适用于:✅Microsoft Fabric 中的数据工程和数据科学
了解如何使用 Livy API for Fabric 数据工程提交 Spark 会话作业。
重要
此功能目前为预览版。
先决条件
具有 Jupyter Notebook、PySpark 和用于 Python 的 Microsoft 身份验证库(MSAL)的远程客户端(例如 Visual Studio Code)
或者是 Microsoft Entra 应用令牌。 将应用程序注册到 Microsoft 标识平台
或者Microsoft Entra SPN 令牌。 在 Microsoft Entra 中添加和管理应用程序凭据
对于湖屋中的一些数据,此示例使用纽约市出租车和豪华轿车委员会 green_tripdata_2022_08,这是加载到湖屋中的一个 parquet 文件
Livy API 定义用于操作的统一终结点。 按照本文中的示例作,请将占位符 {Entra_TenantID}、{Entra_ClientID}、{Fabric_WorkspaceID}、{Fabric_LakehouseID} 和 {Entra_ClientSecret} 替换为相应的值。
为 Livy API 会话配置 Visual Studio Code
在 Fabric 湖屋中选择“湖屋设置”。
导航到“Livy 终结点”部分。
请将图像中第一个红色框内的会话作业连接字符串复制到您的代码中。
导航到 Microsoft Entra 管理中心,并将应用程序(客户端)ID 和目录(租户)ID 复制到代码。
使用 Entra 用户令牌或 Entra SPN 令牌对 Livy API Spark 会话进行身份验证
使用 Entra SPN 令牌对 Livy API Spark 会话进行身份验证
在 Visual Studio Code 中创建
.ipynb笔记本并插入以下代码。from msal import ConfidentialClientApplication import requests import time tenant_id = "Entra_TenantID" client_id = "Entra_ClientID" client_secret = "Entra_ClientSecret" audience = "https://api.fabric.microsoft.com/.default" workspace_id = "Fabric_WorkspaceID" lakehouse_id = "Fabric_LakehouseID" # Get the app-only token def get_app_only_token(tenant_id, client_id, client_secret, audience): """ Get an app-only access token for a Service Principal using OAuth 2.0 client credentials flow. Args: tenant_id (str): The Azure Active Directory tenant ID. client_id (str): The Service Principal's client ID. client_secret (str): The Service Principal's client secret. audience (str): The audience for the token (e.g., resource-specific scope). Returns: str: The access token. """ try: # Define the authority URL for the tenant authority = f"https://login.microsoftonline.com/{tenant_id}" # Create a ConfidentialClientApplication instance app = ConfidentialClientApplication( client_id = client_id, client_credential = client_secret, authority = authority ) # Acquire a token using the client credentials flow result = app.acquire_token_for_client(scopes = [audience]) # Check if the token was successfully retrieved if "access_token" in result: return result["access_token"] else: raise Exception("Failed to retrieve token: {result.get('error_description', 'Unknown error')}") except Exception as e: print(f"Error retrieving token: {e}", fil = sys.stderr) sys.exit(1) token = get_app_only_token(tenant_id, client_id, client_secret, audience) api_base_url = 'https://api.fabric.microsoft.com/v1/' livy_base_url = api_base_url + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/batches" headers = {"Authorization": "Bearer " + token} print(token)在 Visual Studio Code 中,应会看到返回的 Microsoft Entra 令牌。
使用 Entra 用户令牌对 Livy API Spark 会话进行身份验证
在 Visual Studio Code 中创建
.ipynb笔记本并插入以下代码。from msal import PublicClientApplication import requests import time tenant_id = "Entra_TenantID" client_id = "Entra_ClientID" workspace_id = "Fabric_WorkspaceID" lakehouse_id = "Fabric_LakehouseID" app = PublicClientApplication( client_id, authority = "https://login.microsoftonline.com/"Entra_TenantID" ) result = None # If no cached tokens or user interaction needed, acquire tokens interactively if not result: result = app.acquire_token_interactive(scopes = ["https://api.fabric.microsoft.com/Lakehouse.Execute.All", "https://api.fabric.microsoft.com/Lakehouse.Read.All", "https://api.fabric.microsoft.com/Item. ReadWrite.All", "https://api.fabric.microsoft.com/Workspace.ReadWrite.All", "https://api.fabric.microsoft.com/Code.AccessStorage.All", "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All", "https://api.fabric.microsoft.com/Code.AccessFabric.All"]) # Print the access token (you can use it to call APIs) if "access_token" in result: print(f"Access token: {result['access_token']}") else: print("Authentication failed or no access token obtained.") if "access_token" in result: access_token = result['access_token'] api_base_url ='https://api.fabric.microsoft.com/v1' livy_base_url = api_base_url + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/sessions" headers = {"Authorization": "Bearer " + access_token}在 Visual Studio Code 中,应会看到返回的 Microsoft Entra 令牌。
创建 Livy API Spark 会话
添加另一个笔记本单元格并插入此代码。
create_livy_session = requests.post(livy_base_url, headers = headers, json={}) print('The request to create the Livy session is submitted:' + str(create_livy_session.json())) livy_session_id = create_livy_session.json()['id'] livy_session_url = livy_base_url + "/" + livy_session_id get_session_response = requests.get(livy_session_url, headers = headers) print(get_session_response.json())运行笔记本单元格,应该会看到在创建 Livy 会话时打印了一行。
可以使用[在监视中心查看作业](#在监视中心查看作业)来验证是否已创建 Livy 会话。
与 Fabric 环境集成
默认情况下,此 Livy API 会话针对工作区的默认初学者池运行。 或者,你可以使用 Fabric 环境。在 Microsoft Fabric 中创建、配置并使用环境来自定义由 Livy API 会话用于这些 Spark 作业的 Spark 池。 若要使用 Fabric 环境,只需使用此 JSON 数据包更新以前的笔记本单元。
create_livy_session = requests.post(livy_base_url, headers = headers, json = {
"conf" : {
"spark.fabric.environmentDetails" : "{\"id\" : \""EnvironmentID""}"}
}
)
使用 Livy API Spark 会话提交 spark.sql 语句
添加另一个笔记本单元格并插入此代码。
# call get session API livy_session_id = create_livy_session.json()['id'] livy_session_url = livy_base_url + "/" + livy_session_id get_session_response = requests.get(livy_session_url, headers = headers) print(get_session_response.json()) while get_session_response.json()["state"] != "idle": time.sleep(5) get_session_response = requests.get(livy_session_url, headers = headers) execute_statement = livy_session_url + "/statements" payload_data = { "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where fare_amount = 60\").show()", "kind": "spark" } execute_statement_response = requests.post(execute_statement, headers = headers, json = payload_data) print('the statement code is submitted as: ' + str(execute_statement_response.json())) statement_id = str(execute_statement_response.json()['id']) get_statement = livy_session_url + "/statements/" + statement_id get_statement_response = requests.get(get_statement, headers = headers) while get_statement_response.json()["state"] != "available": # Sleep for 5 seconds before making the next request time.sleep(5) print('the statement code is submitted and running : ' + str(execute_statement_response.json())) # Make the next request get_statement_response = requests.get(get_statement, headers = headers) rst = get_statement_response.json()['output']['data']['text/plain'] print(rst)运行笔记本单元格,你应该会看到在作业提交和结果返回时打印出的几条递增的行。
使用 Livy API Spark 会话提交第二个 spark.sql 语句
添加另一个笔记本单元格并插入此代码。
# call get session API livy_session_id = create_livy_session.json()['id'] livy_session_url = livy_base_url + "/" + livy_session_id get_session_response = requests.get(livy_session_url, headers = headers) print(get_session_response.json()) while get_session_response.json()["state"] != "idle": time.sleep(5) get_session_response = requests.get(livy_session_url, headers = headers) execute_statement = livy_session_url + "/statements" payload_data = { "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where tip_amount = 10\").show()", "kind": "spark" } execute_statement_response = requests.post(execute_statement, headers = headers, json = payload_data) print('the statement code is submitted as: ' + str(execute_statement_response.json())) statement_id = str(execute_statement_response.json()['id']) get_statement = livy_session_url + "/statements/" + statement_id get_statement_response = requests.get(get_statement, headers = headers) while get_statement_response.json()["state"] != "available": # Sleep for 5 seconds before making the next request time.sleep(5) print('the statement code is submitted and running : ' + str(execute_statement_response.json())) # Make the next request get_statement_response = requests.get(get_statement, headers = headers) rst = get_statement_response.json()['output']['data']['text/plain'] print(rst)运行笔记本单元格,你应该会看到在作业提交和结果返回时打印出的几条递增的行。
使用第三个语句关闭 Livy 会话
添加另一个笔记本单元格并插入此代码。
# call get session API with a delete session statement get_session_response = requests.get(livy_session_url, header = headers) print('Livy statement URL ' + livy_session_url) response = requests.delete(livy_session_url, headers = headers) print (response)
在监视中心查看作业
可以通过在左侧导航链接中选择“监视”来访问监视中心,以查看各种 Apache Spark 活动。
当会话正在进行或处于完成状态后,可以通过导航到“监视”来查看会话状态。
选择并打开最新活动名称。
在此 Livy API 会话案例中,可以看到以前的会话提交、运行详细信息、Spark 版本和配置。 请注意右上角的已停止状态。
若要回顾整个过程,需要一个远程客户端(例如 Visual Studio Code)、Microsoft Entra 应用/SPN 令牌、Livy API 终结点 URL、针对湖屋的身份验证以及会话 Livy API。