本文介绍了 Databricks REST API 中的 clone a pipeline 请求,以及如何使用它将发布至 Hive 元数据存储的现有管道复制到发布至 Unity 目录系统的新管道。 调用 clone a pipeline 请求时,它会:
- 将源代码和配置从现有管道复制到新管道,应用指定的任何配置替代。
- 更新具体化视图和流式处理表定义及其引用,进行必要的更改以由 Unity Catalog 管理这些对象。
- 启动管道更新,以迁移管道中任意流式表的现有数据和元数据,如检查点等。 这样,这些流式处理表就可以在与原始管道相同的时间点恢复处理。
克隆作完成后,原始管道和新管道都可以独立运行。
本文包含从 Databricks 笔记本直接或通过 Python 脚本调用 API 请求的示例。
在您开始之前
克隆管道之前,需要满足以下条件:
若要克隆 Hive 元存储管道,管道中定义的表和视图必须将表发布到目标架构。 若要了解如何将目标架构添加到管道,请参阅 配置管道以发布到 Hive 元存储。
对要克隆的管道中的 Hive 元存储托管表或视图的引用必须具有目录(
hive_metastore)、架构和表名称的完全限定。 例如,在创建customers数据集的以下代码中,表名称参数必须更新为hive_metastore.sales.customers:@dp.table def customers(): return spark.read.table("sales.customers").where(...)克隆作正在进行时,请勿编辑源 Hive 元存储管道的源代码,包括配置为管道的一部分的笔记本以及存储在 Git 文件夹或工作区文件中的任何模块。
启动克隆操作时,源 Hive metastore 管道不得运行。 如果更新正在运行,请停止更新或等待更新完成。
以下是克隆管道之前的其他重要注意事项:
- 如果 Hive 元存储管道中的表使用
pathPython 或LOCATIONSQL 中的参数指定存储位置,请将"pipelines.migration.ignoreExplicitPath": "true"配置传递给克隆请求。 以下说明中包括设置此配置。 - 如果 Hive 元存储管道包含一个自动加载程序源,其中指定了
cloudFiles.schemaLocation选项的值,并且在创建 Unity 目录克隆后 Hive 元存储管道将保持正常运行,则必须在 Hive 元存储管道和克隆后的 Unity 目录管道中,将mergeSchema选项设置为true。 在克隆之前,将此选项添加到 Hive 元存储管道中将会把该选项复制到新的管道中。
使用 Databricks REST API 克隆管道
以下示例使用 curl 命令在 Databricks REST API 中调用 clone a pipeline 请求:
curl -X POST \
--header "Authorization: Bearer <personal-access-token>" \
<databricks-instance>/api/2.0/pipelines/<pipeline-id>/clone \
--data @clone-pipeline.json
替换为:
-
<personal-access-token>使用 Databricks 个人访问令牌。 -
<databricks-instance>使用 Azure Databricks 工作区实例名称,例如adb-1234567890123456.7.azuredatabricks.net -
<pipeline-id>包含要克隆的 Hive 元存储管道的唯一标识符。 可以在 Lakeflow 声明性管道 UI 中找到管道 ID。
clone-pipeline.json:
{
"catalog": "<target-catalog-name>",
"target": "<target-schema-name>",
"name": "<new-pipeline-name>"
"clone_mode": "MIGRATE_TO_UC",
"configuration": {
"pipelines.migration.ignoreExplicitPath": "true"
}
}
替换为:
-
<target-catalog-name>包含新管道应发布到的 Unity 目录中的目录的名称。 这必须是现有目录。 -
<target-schema-name>如果新管道的发布目标与当前架构名称不同,则应标记为 Unity Catalog 中的另一架构名称。 此参数是可选的,如果未指定,则使用现有架构名称。 -
<new-pipeline-name>具有新管道的可选名称。 如果未指定,则使用追加的源管道名称[UC]命名新管道。
clone_mode 指定要用于克隆作的模式。
MIGRATE_TO_UC 是唯一支持的选项。
使用 configuration 字段在新管道上指定配置。 此处设置的值将替代原始管道中的配置。
来自 REST API 请求的响应 clone 是新 Unity Catalog 管道的管道 ID。
从 Databricks 笔记本克隆管道
以下示例从 Python 脚本调用 create a pipeline 请求。 可以使用 Databricks 笔记本运行此脚本:
- 为脚本创建新笔记本。 请参阅创建笔记本。
- 将以下 Python 脚本复制到笔记本的第一个单元中。
- 通过替换以下代码更新脚本中的占位符值:
-
<databricks-instance>使用 Azure Databricks 工作区实例名称,例如adb-1234567890123456.7.azuredatabricks.net -
<pipeline-id>包含要克隆的 Hive 元存储管道的唯一标识符。 可以在 Lakeflow 声明性管道 UI 中找到管道 ID。 -
<target-catalog-name>是 Unity Catalog 中一个目录的名称,新管道应发布到该目录。 这必须是一个现有的目录。 -
<target-schema-name>如果新的管道要发布到的架构名称与当前架构名称不同,则为 Unity 目录中的架构名称。 此参数是可选的,如果未指定,则使用现有架构名称。 -
<new-pipeline-name>具有新管道的可选名称。 如果未指定,则使用追加的源管道名称[UC]命名新管道。
-
- 运行脚本。 请参阅 “运行 Databricks 笔记本”。
import requests
# Your Databricks workspace URL, with no trailing spaces
WORKSPACE = "<databricks-instance>"
# The pipeline ID of the Hive metastore pipeline to clone
SOURCE_PIPELINE_ID = "<pipeline-id>"
# The target catalog name in Unity Catalog
TARGET_CATALOG = "<target-catalog-name>"
# (Optional) The name of a target schema in Unity Catalog. If empty, the same schema name as the Hive metastore pipeline is used
TARGET_SCHEMA = "<target-schema-name>"
# (Optional) The name of the new pipeline. If empty, the following is used for the new pipeline name: f"{originalPipelineName} [UC]"
CLONED_PIPELINE_NAME = "<new-pipeline-name>"
# This is the only supported clone mode
CLONE_MODE = "MIGRATE_TO_UC"
# Specify override configurations
OVERRIDE_CONFIGS = {"pipelines.migration.ignoreExplicitPath": "true"}
def get_token():
ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
return getattr(ctx, "apiToken")().get()
def check_source_pipeline_exists():
data = requests.get(
f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}",
headers={"Authorization": f"Bearer {get_token()}"},
)
assert data.json()["pipeline_id"] == SOURCE_PIPELINE_ID, "The provided source pipeline does not exist!"
def request_pipeline_clone():
payload = {
"catalog": TARGET_CATALOG,
"clone_mode": CLONE_MODE,
}
if TARGET_SCHEMA != "":
payload["target"] = TARGET_SCHEMA
if CLONED_PIPELINE_NAME != "":
payload["name"] = CLONED_PIPELINE_NAME
if OVERRIDE_CONFIGS:
payload["configuration"] = OVERRIDE_CONFIGS
data = requests.post(
f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}/clone",
headers={"Authorization": f"Bearer {get_token()}"},
json=payload,
)
response = data.json()
return response
check_source_pipeline_exists()
request_pipeline_clone()
局限性
下面是 Lakeflow 声明性管道 clone a pipeline API 请求的限制:
- 仅支持从配置为使用 Hive 元数据存储的管道克隆到 Unity Catalog 管道。
- 您只能在您正在克隆的管道所属的同一个 Azure Databricks 工作区中创建克隆。
- 要克隆的管道只能包括以下流媒体源:
- 增量源
- 自动加载程序,包括自动加载程序支持的任何数据源。 请参阅 从云对象存储加载文件。
- Apached Kafka 与结构化流式处理。 但是,无法将 Kafka 源配置为使用
kafka.group.id该选项。 请参阅使用 Apache Kafka 和 Azure Databricks 进行流处理。 - Amazon Kinesis 结合结构化流式处理。 但是,无法将 Kinesis 源配置为
consumerMode或efo。
- Databricks 建议如果要克隆的 Hive 元存储管道使用 Auto Loader 文件通知模式,则在克隆后不要运行 Hive 元存储管道。 这是因为运行 Hive 元存储管道会导致从 Unity 目录克隆中删除某些文件通知事件。 如果源 Hive 元存储管道在克隆操作完成后运行,则可以使用自动加载器配合
cloudFiles.backfillInterval选项回填缺失的文件。 若要了解自动加载程序文件通知模式,请参阅 在文件通知模式下配置自动加载程序流。 若要了解如何使用自动加载程序回填文件,请参阅 使用 cloudFiles.backfillInterval 和 Common Auto Loader 选项触发常规回填。 - 当克隆正在进行时,两个管道的维护任务会自动暂停。
- 在克隆版 Unity Catalog 管道中的表上执行时间范围查询如下:
- 如果表版本最初被写入到由 Hive Metastore 管理的对象中,那么当查询克隆的 Unity Catalog 对象时,使用
timestamp_expression子句的时光旅行查询将是未定义的。 - 但是,如果表版本已写入克隆的 Unity 目录对象,则使用
timestamp_expression子句的时间旅行查询能够正常运行。 - 使用
version子句的时间旅行查询在查询克隆的 Unity Catalog 对象时正常工作,即使其版本最初被写入 Hive 元存储托管对象也是如此。
- 如果表版本最初被写入到由 Hive Metastore 管理的对象中,那么当查询克隆的 Unity Catalog 对象时,使用
- 有关将 Lakeflow 声明性管道与 Unity 目录配合使用时的其他限制,请参阅 Unity 目录管道限制。
- 有关 Unity 目录限制,请参阅 Unity 目录限制。