Fabric 用户数据函数编程模型概述

Fabric 用户数据函数编程模型是一种 SDK,它提供在 Fabric 中创作和发布可运行的函数所需的功能。 SDK 还支持与 Fabric 生态系统中的其他项(如 Fabric 数据源)无缝集成。 此库在 PyPI 中公开提供,并预安装在用户数据函数项中。

用户数据函数 SDK

用户数据函数项包含一个或多个可从 Fabric 门户、另一个 Fabric 项或使用提供的 REST 终结点从外部应用程序调用的函数。 每个函数都是 Python 脚本中的一种方法,它支持将参数传递给调用程序并返回输出。 用户数据函数编程模型包含以下组件:

  • fabric.functions 库提供采用 Python 创建用户数据函数所需的代码。 创建新的用户数据函数项时,可以看到此库导入到第一个函数模板中。

  • 该方法 fn.UserDataFunctions() 提供在所有新的用户数据函数项(在任何函数定义之前)代码文件的开头找到的执行上下文。

    示例:

    import datetime
    import fabric.functions as fn
    import logging
    
    udf = fn.UserDataFunctions()
    
  • 每个函数都使用 @udf.function() 修饰器进行标识。 此修饰器决定您的函数是否能够分别从门户或外部调用器调用。 使用此修饰器还需要函数具有返回值。 具有此修饰器的函数可以访问修饰器表示 @udf.connection 的连接对象。

    可调用的函数示例

    # This is a hello fabric function sample that can be invoked from the Fabric portal, another Fabric item, or an external application.
    
    @udf.function()
    def hello_fabric(name: str) -> str:
        logging.info('Python UDF trigger function processed a request.')
        logging.info('Executing hello fabric function.')
    
        return f"Welcome to Fabric Functions, {name}, at {datetime.datetime.now()}!"
    
  • 任何没有 @udf.function() 修饰器的 Python 方法都无法直接被调用。 它们只能从包含修饰器的函数进行调用,并可用作帮助程序函数。

    帮助程序函数示例

    # This is a helper function that can be invoked from other functions, but can't be invoked or run directly because it doesn't have the @udf.function() decorator
    
    def uppercase_name(name: str) -> str:
        return name.upper()
    

支持的输入类型

可以为函数定义输入参数,例如 str、int、float 等基元数据类型。支持的输入数据类型为:

JSON 类型 Python 数据类型
字符串 str
日期/时间字符串 日期/时间
布尔值 布尔
数字 int、float
数组 list[],例如 list[int]
对象 dict
对象 pandas DataFrame
对象对象的数组 pandas 系列

注释

若要使用 pandas DataFrame 和 Series 类型,请在 Fabric 门户中为用户数据功能选择 库管理 ,并将版本更新 fabric-user-data-function 为 1.0.0。

支持的输入类型的请求正文示例:

{
  "name": "Alice",                          // String (str)
  "signup_date": "2025-07-08T13:44:40Z",    // Datetime string (datetime)
  "is_active": true,                        // Boolean (bool)
  "age": 30,                                // Number (int)
  "height": 5.6,                            // Number (float)
  "favorite_numbers": [3, 7, 42],           // Array (list[int])
  "profile": {                              // Object (dict)
    "email": "alice@example.com",
    "location": "Sammamish"
  },
  "sales_data": {                           // Object (pandas DataFrame)
    "2025-07-01": {"product": "A", "units": 10},
    "2025-07-02": {"product": "B", "units": 15}
  },
  "weekly_scores": [                        // Object or Array of Objects (pandas Series)
    {"week": 1, "score": 88},
    {"week": 2, "score": 92},
    {"week": 3, "score": 85}
  ]
}

支持的输出类型

支持的输出数据类型为:

Python 数据类型
str
日期/时间
布尔
int、float
list[data-type],例如 list[int]
dict
没有
pandas 系列
pandas DataFrame

如何编写异步函数

在代码中的函数定义上添加异步装饰器。 借助函数 async ,可以通过同时处理多个任务来提高应用程序的响应能力和效率。 这些工具非常适合管理大规模的 I/O 绑定操作。 此示例函数使用 pandas 从 lakehouse 读取 CSV 文件。 函数采用文件名作为输入参数。

import pandas as pd 

# Replace the alias "<My Lakehouse alias>" with your connection alias.
@udf.connection(argName="myLakehouse", alias="<My Lakehouse alias>")
@udf.function()
async def read_csv_from_lakehouse(myLakehouse: fn.FabricLakehouseClient, csvFileName: str) -> str:

    # Connect to the Lakehouse
    connection = myLakehouse.connectToFilesAsync()   

    # Download the CSV file from the Lakehouse
    csvFile = connection.get_file_client(csvFileName)

    downloadFile = await csvFile.download_file()
    csvData = await downloadFile.readall()
    
    # Read the CSV data into a pandas DataFrame
    from io import StringIO
    df = pd.read_csv(StringIO(csvData.decode('utf-8')))

    # Display the DataFrame    
    result="" 
    for index, row in df.iterrows():
        result=result + "["+ (",".join([str(item) for item in row]))+"]"
    
    # Close the connection
    csvFile.close()
    connection.close()

    return f"CSV file read successfully.{result}"

与 Fabric 数据源的数据连接

使用此模块,可以引用数据连接,而无需在代码中编写连接字符串。 fabric.functions 库提供两种方法来处理数据连接:

  • fabric.functions.FabricSqlConnection:支持在 Fabric 中使用 SQL 数据库,包括 SQL Analytics 终结点和 Fabric 仓库。
  • fabric.functions.FabricLakehouseClient:支持使用湖屋,并通过一种方法连接到湖屋表和湖屋文件。

若要引用与数据源的连接,需要使用 @udf.connection 修饰器。 可以采用以下任何格式应用它:

  • @udf.connection(alias="<alias for data connection>", argName="sqlDB")
  • @udf.connection("<alias for data connection>", "<argName>")
  • @udf.connection("<alias for data connection>")

@udf.connection 的参数为:

  • argName,连接在函数中使用的变量的名称。
  • alias,使用“管理连接”菜单添加的连接的别名。
  • 如果 argNamealias 的值相同,则可以使用 @udf.connection("<alias and argName for the data connection>")

示例

# Where demosqldatabase is the argument name and the alias for my data connection used for this function
@udf.connection("demosqldatabase")
@udf.function()
def read_from_sql_db(demosqldatabase: fn.FabricSqlConnection)-> list:
  # Replace with the query you want to run
  query = "SELECT * FROM (VALUES ('John Smith', 31), ('Kayla Jones', 33)) AS Employee(EmpName, DepID);"

  # [...] Here is where the rest of your SqlConnection code would be.

  return results

Fabric 项目或 Azure 资源的通用连接

泛型连接允许使用用户数据函数项所有者标识创建与 Fabric 项或 Azure 资源的连接。 此功能生成一个 Entra ID 令牌,其中包含项所有者的标识和提供的受众类型。 此令牌用于对支持该受众类型的 Fabric 项或 Azure 资源进行身份验证。 此过程将为你提供与使用 “管理连接”功能 中的托管连接对象类似的编程体验,但仅适用于连接中提供的受众类型。

此功能使用带有以下参数的 @udf.generic_connection() 修饰器。

参数 说明 价值
argName 传递给函数的变量的名称。 用户需要在函数的参数中指定此变量,并使用其类型fn.FabricItem 例如,如果为 argName=CosmosDb,则函数应包含此参数 cosmosDb: fn.FabricItem
audienceType 为其创建连接的受众类型。 此参数与 Fabric 项或 Azure 服务的类型相关联,并确定用于连接的客户端。 此参数 CosmosDb 的允许值为或 KeyVault

使用通用连接连接到 Fabric Cosmos DB 容器

通用连接使用 CosmosDB 受众类型支持原生 Fabric Cosmos DB 项。 包含的用户数据函数 SDK 提供了一种名为 get_cosmos_client 的帮助程序方法,用于在每次调用时获取单一实例 Cosmos DB 客户端。

可以按照以下步骤使用泛型连接连接到 Fabric Cosmos DB 项

  1. 在 Fabric 用户数据函数项中,使用库管理功能安装azure-cosmos库。

  2. 转到 Fabric Cosmos DB 项 设置。

    显示 Fabric Cosmos DB 设置按钮位置的屏幕截图。

  3. 检索 Fabric Cosmos DB 终结点 URL

    显示 Fabric Cosmos DB 终结点 URL 的屏幕截图。

  4. 转到 用户数据功能选项。 使用以下示例代码连接到 Fabric Cosmos DB 容器,并使用 Cosmos DB 示例数据集运行读取查询。 替换以下变量的值:

    • COSMOS_DB_URI 使用 Fabric Cosmos DB 终结点。
    • DB_NAME 填入您的 Fabric Cosmos DB 项目名称。
    from fabric.functions.cosmosdb import get_cosmos_client
    import json
    
    @udf.generic_connection(argName="cosmosDb", audienceType="CosmosDB")
    @udf.function()
    def get_product_by_category(cosmosDb: fn.FabricItem, category: str) -> list:
    
        COSMOS_DB_URI = "YOUR_COSMOS_DB_URL"
        DB_NAME = "YOUR_COSMOS_DB_NAME" # Note: This is the Fabric item name
        CONTAINER_NAME = "SampleData" # Note: This is your container name. In this example, we are using the SampleData container.
    
        cosmosClient = get_cosmos_client(cosmosDb, COSMOS_DB_URI)
    
        # Get the database and container
        database = cosmosClient.get_database_client(DB_NAME)
        container = database.get_container_client(CONTAINER_NAME)
    
        query = 'select * from c WHERE c.category=@category' #"select * from c where c.category=@category"
        parameters = [
            {
                "name": "@category", "value": category
            }
        ]
        results = container.query_items(query=query, parameters=parameters)
        items = [item for item in results]
    
        logging.info(f"Found {len(items)} products in {category}")
    
        return json.dumps(items)
    
  5. 通过提供类别名称(例如在调用参数中)Accessory

注释

还可以使用这些步骤通过帐户 URL 和数据库名称连接到 Azure Cosmos DB 数据库。 用户数据函数所有者帐户需要对该 Azure Cosmos DB 帐户的访问权限。

使用通用连接连接到 Azure Key Vault

泛型连接支持通过使用 KeyVault 受众类型与 Azure Key Vault 进行连接。 这种类型的连接要求 Fabric 用户数据函数所有者有权连接到 Azure Key Vault。 可以使用此连接按名称检索密钥、机密或证书。

可以连接到 Azure Key Vault 以检索客户端密码,以使用通用连接调用 API,请执行以下步骤:

  1. Fabric 用户数据函数项中,使用requests安装azure-keyvault-secrets库和库。

  2. 转到 Azure Key Vault 资源 并检索 Vault URI 密钥、机密或证书的名称。

    显示 Azure Key Vault 终结点 URL 和值的屏幕截图。

  3. 返回到 Fabric 用户数据函数项 并使用此示例。 在此示例中,我们将从 Azure Key Vault 检索机密以连接到公共 API。 替换以下变量的值:

    • 使用在上一步中检索到的Vault URIKEY_VAULT_URL
    • KEY_VAULT_SECRET_NAME 以您的机密名称。
    • API_URL 变量,其中包含要连接到的 API 的 URL。 此示例假定你连接到接受 GET 请求并采用以下参数 api-key 的公共 request-bodyAPI。
    from azure.keyvault.secrets import SecretClient
    from azure.identity import DefaultAzureCredential
    import requests
    
    @udf.generic_connection(argName="keyVaultClient", audienceType="KeyVault")
    @udf.function()
    def retrieveNews(keyVaultClient: fn.FabricItem, requestBody:str) -> str:
        KEY_VAULT_URL = 'YOUR_KEY_VAULT_URL'
        KEY_VAULT_SECRET_NAME= 'YOUR_SECRET'
        API_URL = 'YOUR_API_URL'
    
        credential = keyVaultClient.get_access_token()
    
        client = SecretClient(vault_url=KEY_VAULT_URL, credential=credential)
    
        api_key = client.get_secret(KEY_VAULT_SECRET_NAME).value
    
        api_url = API_URL
        params = {
            "api-key": api_key,
            "request-body": requestBody
        }
    
        response = requests.get(api_url, params=params)
    
        data = "" 
    
        if response.status_code == 200:
            data = response.json()
        else:
            print(f"Error {response.status_code}: {response.text}")
    
        return f"Response: {data}"
    
  4. 通过在代码中提供请求正文来测试或运行此函数

使用 UserDataFunctionContext 获取调用属性

编程模型还包含 UserDataFunctionContext 对象。 此对象包含函数调用元数据,可用于为某些调用机制创建特定的应用逻辑。

下表显示了 UserDataFunctionContext 对象的属性:

属性名称 数据类型 说明
调用 ID 字符串 绑定到用户数据函数项调用的唯一 GUID。
执行用户 对象 用于授权调用的用户信息的元数据。

ExecutingUser 对象包含以下信息:

属性名称 数据类型 说明
OID (对象标识符) 字符串 (GUID) 用户的对象 ID,它是请求者的不可变标识符。 这是用于跨应用程序调用此函数的用户或服务主体的验证标识。
租户ID 字符串 (GUID) 用户登录到的租户的 ID。
首选用户名 字符串 调用用户的首选用户名,由用户设置。 此值是可变的。

若要访问 UserDataFunctionContext 参数,必须在函数定义的基础上使用以下修饰器:@udf.context(argName="<parameter name>")

示例

@udf.context(argName="myContext")
@udf.function()
def getContext(myContext: fabric.functions.UserDataFunctionContext)-> str:
    logging.info('Python UDF trigger function processed a request.')
    return f"Hello oid = {context.executing_user['Oid']}, TenantId = {context.executing_user['TenantId']}, PreferredUsername = {context.executing_user['PreferredUsername']}, InvocationId = {context.invocation_id}"

使用 UserThrownError 引发句柄错误

开发函数时,可以使用 Python 编程模型中提供的 UserThrownError 方法引发预期的错误响应。 如果用户提供的输入无法通过业务验证规则,则可以使用此方法。

示例

import datetime

@udf.function()
def raise_userthrownerror(age: int)-> str:
    if age < 18:
        raise fn.UserThrownError("You must be 18 years or older to use this service.", {"age": age})

    return f"Welcome to Fabric Functions at {datetime.datetime.now()}!"

UserThrownError 方法采用以下两种参数:

  • Message:此字符串作为错误消息返回给调用此函数的应用程序。
  • 属性字典将返回到调用此函数的应用程序。

后续步骤