使用 Livy API 提交和执行会话作业

适用于:✅Microsoft Fabric 中的数据工程和数据科学

了解如何使用 Livy API for Fabric 数据工程提交 Spark 会话作业。

重要

此功能目前为预览版

先决条件

Livy API 定义用于操作的统一终结点。 按照本文中的示例作,请将占位符 {Entra_TenantID}、{Entra_ClientID}、{Fabric_WorkspaceID}、{Fabric_LakehouseID} 和 {Entra_ClientSecret} 替换为相应的值。

为 Livy API 会话配置 Visual Studio Code

  1. 在 Fabric 湖屋中选择“湖屋设置”

    屏幕截图显示湖屋设置。

  2. 导航到“Livy 终结点”部分。

    屏幕截图显示湖屋 Livy 终结点和会话作业连接字符串。

  3. 请将图像中第一个红色框内的会话作业连接字符串复制到您的代码中。

  4. 导航到 Microsoft Entra 管理中心,并将应用程序(客户端)ID 和目录(租户)ID 复制到代码。

    在 Microsoft Entra 管理中心显示 Livy API 应用概述的屏幕截图。

使用 Entra 用户令牌或 Entra SPN 令牌对 Livy API Spark 会话进行身份验证

使用 Entra SPN 令牌对 Livy API Spark 会话进行身份验证

  1. 在 Visual Studio Code 中创建 .ipynb 笔记本并插入以下代码。

    from msal import ConfidentialClientApplication
    import requests
    import time
    
    tenant_id = "Entra_TenantID" 
    client_id = "Entra_ClientID"
    client_secret = "Entra_ClientSecret"
    audience = "https://api.fabric.microsoft.com/.default"  
    
    workspace_id = "Fabric_WorkspaceID"
    lakehouse_id = "Fabric_LakehouseID"
    
    # Get the app-only token
    def get_app_only_token(tenant_id, client_id, client_secret, audience):
        """
        Get an app-only access token for a Service Principal using OAuth 2.0 client credentials flow.
    
        Args:
            tenant_id (str): The Azure Active Directory tenant ID.
            client_id (str): The Service Principal's client ID.
            client_secret (str): The Service Principal's client secret.
            audience (str): The audience for the token (e.g., resource-specific scope).
    
        Returns:
            str: The access token.
        """
        try:
            # Define the authority URL for the tenant
            authority = f"https://login.microsoftonline.com/{tenant_id}"
    
            # Create a ConfidentialClientApplication instance
            app = ConfidentialClientApplication(
                client_id = client_id,
                client_credential = client_secret,
                authority = authority
            )
    
            # Acquire a token using the client credentials flow
            result = app.acquire_token_for_client(scopes = [audience])
    
            # Check if the token was successfully retrieved
            if "access_token" in result:
                return result["access_token"]
            else:
                raise Exception("Failed to retrieve token: {result.get('error_description', 'Unknown error')}")
        except Exception as e:
            print(f"Error retrieving token: {e}", fil = sys.stderr)
            sys.exit(1)
    
    token = get_app_only_token(tenant_id, client_id, client_secret, audience)
    
    api_base_url = 'https://api.fabric.microsoft.com/v1/'
    livy_base_url = api_base_url + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/batches"    
    headers = {"Authorization": "Bearer " + token}
    
    print(token)
    
    
  2. 在 Visual Studio Code 中,应会看到返回的 Microsoft Entra 令牌。

    屏幕截图显示运行单元格后返回的 Microsoft Entra SPN 令牌。 ```

使用 Entra 用户令牌对 Livy API Spark 会话进行身份验证

  1. 在 Visual Studio Code 中创建 .ipynb 笔记本并插入以下代码。

    from msal import PublicClientApplication
    import requests
    import time
    
    tenant_id = "Entra_TenantID"
    client_id = "Entra_ClientID"
    workspace_id = "Fabric_WorkspaceID"
    lakehouse_id = "Fabric_LakehouseID"
    
    app = PublicClientApplication(
        client_id,
            authority = "https://login.microsoftonline.com/"Entra_TenantID"
        )
    
     result = None
    
     # If no cached tokens or user interaction needed, acquire tokens interactively
     if not result:
     result = app.acquire_token_interactive(scopes = ["https://api.fabric.microsoft.com/Lakehouse.Execute.All", "https://api.fabric.microsoft.com/Lakehouse.Read.All", "https://api.fabric.microsoft.com/Item. ReadWrite.All", "https://api.fabric.microsoft.com/Workspace.ReadWrite.All", "https://api.fabric.microsoft.com/Code.AccessStorage.All", "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All", 
     "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All", "https://api.fabric.microsoft.com/Code.AccessFabric.All"])
    
    # Print the access token (you can use it to call APIs)
    if "access_token" in result:
        print(f"Access token: {result['access_token']}")
    else:
    print("Authentication failed or no access token obtained.")
    
    if "access_token" in result:
        access_token = result['access_token']
        api_base_url ='https://api.fabric.microsoft.com/v1'
        livy_base_url = api_base_url + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/sessions"
        headers = {"Authorization": "Bearer " + access_token}
    
  2. 在 Visual Studio Code 中,应会看到返回的 Microsoft Entra 令牌。

    屏幕截图显示运行单元格后返回的 Microsoft Entra 用户令牌。

创建 Livy API Spark 会话

  1. 添加另一个笔记本单元格并插入此代码。

    create_livy_session = requests.post(livy_base_url, headers = headers, json={})
    print('The request to create the Livy session is submitted:' + str(create_livy_session.json()))
    
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers = headers)
    print(get_session_response.json())
    
  2. 运行笔记本单元格,应该会看到在创建 Livy 会话时打印了一行。

    显示第一个笔记本单元格执行结果的屏幕截图。

  3. 可以使用[在监视中心查看作业](#在监视中心查看作业)来验证是否已创建 Livy 会话。

与 Fabric 环境集成

默认情况下,此 Livy API 会话针对工作区的默认初学者池运行。 或者,你可以使用 Fabric 环境。在 Microsoft Fabric 中创建、配置并使用环境来自定义由 Livy API 会话用于这些 Spark 作业的 Spark 池。 若要使用 Fabric 环境,只需使用此 JSON 数据包更新以前的笔记本单元。

create_livy_session = requests.post(livy_base_url, headers = headers, json = {
    "conf" : {
        "spark.fabric.environmentDetails" : "{\"id\" : \""EnvironmentID""}"}
        }
)

使用 Livy API Spark 会话提交 spark.sql 语句

  1. 添加另一个笔记本单元格并插入此代码。

    # call get session API
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers = headers)
    
    print(get_session_response.json())
    while get_session_response.json()["state"] != "idle":
        time.sleep(5)
        get_session_response = requests.get(livy_session_url, headers = headers)
    
    execute_statement = livy_session_url + "/statements"
    payload_data = {
        "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where fare_amount = 60\").show()",
            "kind": "spark"
        }
    
    execute_statement_response = requests.post(execute_statement, headers = headers, json = payload_data)
    print('the statement code is submitted as: ' + str(execute_statement_response.json()))
    
    statement_id = str(execute_statement_response.json()['id'])
    get_statement = livy_session_url + "/statements/" + statement_id
    get_statement_response = requests.get(get_statement, headers = headers)
    
    while get_statement_response.json()["state"] != "available":
        # Sleep for 5 seconds before making the next request
        time.sleep(5)
        print('the statement code is submitted and running : ' + str(execute_statement_response.json()))
    
    # Make the next request
    get_statement_response = requests.get(get_statement, headers = headers)
    
    rst = get_statement_response.json()['output']['data']['text/plain']
    print(rst)
    
  2. 运行笔记本单元格,你应该会看到在作业提交和结果返回时打印出的几条递增的行。

    显示第一个包含 Spark.sql 执行的笔记本单元格结果的屏幕截图。

使用 Livy API Spark 会话提交第二个 spark.sql 语句

  1. 添加另一个笔记本单元格并插入此代码。

    # call get session API
    
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers = headers)
    
    print(get_session_response.json())
    while get_session_response.json()["state"] != "idle":
        time.sleep(5)
        get_session_response = requests.get(livy_session_url, headers = headers)
    
    execute_statement = livy_session_url + "/statements"
    payload_data = {
        "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where tip_amount = 10\").show()",
        "kind": "spark"
    }
    
    execute_statement_response = requests.post(execute_statement, headers = headers, json = payload_data)
    print('the statement code is submitted as: ' + str(execute_statement_response.json()))
    
    statement_id = str(execute_statement_response.json()['id'])
    get_statement = livy_session_url + "/statements/" + statement_id
    get_statement_response = requests.get(get_statement, headers = headers)
    
    while get_statement_response.json()["state"] != "available":
    # Sleep for 5 seconds before making the next request
        time.sleep(5)
        print('the statement code is submitted and running : ' + str(execute_statement_response.json()))
    
    # Make the next request
    get_statement_response = requests.get(get_statement, headers = headers)
    
    rst = get_statement_response.json()['output']['data']['text/plain']
    print(rst)
    
  2. 运行笔记本单元格,你应该会看到在作业提交和结果返回时打印出的几条递增的行。

    显示第二个笔记本单元格执行结果的屏幕截图。

使用第三个语句关闭 Livy 会话

  1. 添加另一个笔记本单元格并插入此代码。

    # call get session API with a delete session statement
    
    get_session_response = requests.get(livy_session_url, header = headers)
    print('Livy statement URL ' + livy_session_url)
    
    response = requests.delete(livy_session_url, headers = headers)
    print (response)
    

在监视中心查看作业

可以通过在左侧导航链接中选择“监视”来访问监视中心,以查看各种 Apache Spark 活动。

  1. 当会话正在进行或处于完成状态后,可以通过导航到“监视”来查看会话状态。

    显示监控中心中以前的 Livy API 提交的屏幕截图。

  2. 选择并打开最新活动名称。

    显示监视中心的最新 Livy API 活动的屏幕截图。

  3. 在此 Livy API 会话案例中,可以看到以前的会话提交、运行详细信息、Spark 版本和配置。 请注意右上角的已停止状态。

    屏幕截图,其中显示了监视中心的最新 Livy API 活动详细信息。

若要回顾整个过程,需要一个远程客户端(例如 Visual Studio Code)、Microsoft Entra 应用/SPN 令牌、Livy API 终结点 URL、针对湖屋的身份验证以及会话 Livy API。