本文包含 Python 用户定义函数 (UDF) 示例。 其中介绍了如何注册 UDF、如何调用 UDF,并提供有关 Spark SQL 中子表达式求值顺序的注意事项。
在 Databricks Runtime 14.0 及更高版本中,可以使用 Python 用户定义的表函数 (UDTF) 来注册返回整个关系而不是标量值的函数。 请参阅 Python 用户定义表函数 (UDTF)。
注意
在 Databricks Runtime 12.2 LTS 及更低版本中,使用标准访问模式的 Unity 目录计算不支持 Python UDF 和 Pandas UDF。 Databricks Runtime 13.3 LTS 及更高版本支持标量 Python UDF 和 Pandas UDF,适用于所有访问模式。
在 Databricks Runtime 13.3 LTS 及更高版本中,可以使用 SQL 语法将标量 Python UDF 注册到 Unity Catalog。 请参阅 Unity Catalog 中的用户定义函数 (UDF)。
将函数注册为 UDF
def squared(s):
return s * s
spark.udf.register("squaredWithPython", squared)
可以选择设置 UDF 的返回类型。 默认返回类型为 StringType。
from pyspark.sql.types import LongType
def squared_typed(s):
return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())
在 Spark SQL 中调用 UDF
spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test
将 UDF 与数据帧配合使用
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
另外,还可以使用注释语法声明同一 UDF:
from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
计算顺序和 NULL 检查
Spark SQL(包括 SQL、数据帧和数据集 API)不保证子表达式的计算顺序。 具体而言,运算符或函数的输入不一定按从左到右的顺序或任何其他固定顺序进行计算。 例如,逻辑 AND 和 OR 表达式没有从左到右的“短路”语义。
因此,依赖于布尔表达式计算的副作用或顺序以及 WHERE 和 HAVING 子句的顺序是危险的,因为在查询优化和规划过程中,这些表达式和子句可能重新排序。 具体而言,如果 UDF 依赖于 SQL 中的短路语义进行 NULL 检查,则不能保证在调用 UDF 之前执行 NULL 检查。 例如,
spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee
这个 WHERE 子句并不保证在筛选掉 NULL 后调用 strlen UDF。
若要执行正确的 NULL 检查,建议执行以下操作之一:
- 使 UDF 自身能够识别 NULL,在 UDF 自身内部进行 NULL 检查
- 使用
IF或CASE WHEN表达式来执行 NULL 检查并在条件分支中调用 UDF
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok
Scalar Python UDF 中的服务凭据
标量 Python UDF 可以使用 Unity 目录服务凭据安全地访问外部云服务。 这可用于将基于云的令牌化、加密或机密管理等作直接集成到数据转换中。
SQL 仓库和常规计算仅支持标量 Python UDF 的服务凭据。
注意
Scalar Python UDF 中的服务凭据需要 Databricks Runtime 17.1 及更高版本。
若要创建服务凭据,请参阅 “创建服务凭据”。
注意
UDF 专用的服务凭据 API:
在 UDF 中,使用 databricks.service_credentials.getServiceCredentialsProvider() 访问服务凭据。
这不同于 dbutils.credentials.getServiceCredentialsProvider() 笔记本中使用的函数,该函数在 UDF 执行上下文中不可用。
若要访问服务凭据,请使用 databricks.service_credentials.getServiceCredentialsProvider() UDF 逻辑中的实用工具通过相应的凭据初始化云 SDK。 所有代码都必须封装在 UDF 正文中。
@udf
def use_service_credential():
from azure.mgmt.web import WebSiteManagementClient
# Assuming there is a service credential named 'testcred' set up in Unity Catalog
web_client = WebSiteManagementClient(subscription_id, credential = getServiceCredentialsProvider('testcred'))
# Use web_client to perform operations
服务凭据权限
UDF 的创建者必须具有 Unity 目录服务凭据的 ACCESS 权限。
在 No-PE 范围内运行的 UDF(也称为专用群集)需要对服务凭据拥有 MANAGE 权限。
默认凭据
在 Scalar Python UDF 中使用时,Databricks 会自动使用计算环境变量中的默认服务凭据。 此行为允许在不显式管理 UDF 代码中的凭据别名的情况下安全地引用外部服务。 请参阅 为计算资源指定默认服务凭据
默认凭据支持仅适用于标准和专用访问模式群集。 它在 DBSQL 中不可用。
必须安装包 azure-identity 才能使用 DefaultAzureCredential 提供程序。 若要安装包,请参阅 笔记本范围的 Python 库 或 计算范围的库。
@udf
def use_service_credential():
from azure.identity import DefaultAzureCredential
from azure.mgmt.web import WebSiteManagementClient
# DefaultAzureCredential is automatically using the default service credential for the compute
web_client_default = WebSiteManagementClient(DefaultAzureCredential(), subscription_id)
# Use web_client to perform operations
获取任务执行上下文
使用 TaskContext PySpark API 获取上下文信息,例如用户的标识、群集标记、spark 作业 ID 等。 请参阅 UDF 中的“获取任务上下文”。
限制
以下限制适用于 PySpark UDF:
文件访问限制: 在 Databricks Runtime 14.2 及更低版本上,共享群集上的 PySpark UDF 无法访问 Git 文件夹、工作区文件或 Unity 目录卷。
广播变量: 标准访问模式群集和无服务器计算上的 PySpark UDF 不支持广播变量。
服务凭据: 服务凭据仅在 Batch Unity 目录 Python UDF 和标量 Python UDF 中可用。 标准 Unity 目录 Python UDF 不支持它们。
服务凭据:仅当使用无服务器环境版本 3 或更高版本时,服务凭据才可用于无服务器计算。 请参阅 无服务器环境版本。
- 无服务器内存限制:无服务器计算上的 PySpark UDF 的内存限制为每个 PySpark UDF 限制 1GB。 超出此限制会导致类型 UDF_PYSPARK_USER_CODE_ERROR.MEMORY_LIMIT_SERVERLESS 错误。
- 标准访问模式上的内存限制:标准访问模式上的 PySpark UDF 基于所选实例类型的可用内存限制。 超出可用内存会导致 UDF_PYSPARK_USER_CODE_ERROR.MEMORY_LIMIT 类型错误。