启动并运行管道之后,你可以发布管道,以便它使用其他输入运行。 这称为已发布的管道。
什么发生了变化?
Batch Endpoint 提出了一种类似的更强大的方法来处理在持久 API 下运行的多个资产,这就是为什么已发布的管道功能在批处理终结点中移动到管道组件部署的原因。
批处理终结点会将接口(终结点)与实际实现(部署)分离,并允许用户决定哪个部署将服务于终结点的默认实现。
批处理终结点中的管道组件部署允许用户部署管道组件而不是管道,从而使那些希望简化 MLOps 实践的组织能够更好地利用可重用资产。
下表显示了各个概念之间的比较:
| 概念 |
SDK v1 |
SDK v2 |
| 用于调用的管道 REST 终结点 |
管道终结点 |
批处理终结点 |
| 终结点下管道的特定版本 |
已发布管道 |
管道组件部署 |
| 管道的调用参数 |
管道参数 |
作业输入 |
| 从已发布管道生成的作业 |
管道作业 |
批处理作业 |
若要了解如何创建你的首个管道组件部署,请参阅如何在批处理终结点中部署管道。
迁移到批处理终结点
使用以下指南了解如何使用批处理终结点中的概念从 SDK v1 迁移到 SDK v2。
发布管道
比较发布管道从 v1 到 v2 的变化:
首先,我们需要获取要发布的管道:
pipeline1 = Pipeline(workspace=ws, steps=[step1, step2])
可以按如下所示发布管道:
from azureml.pipeline.core import PipelineEndpoint
endpoint_name = "PipelineEndpointTest"
pipeline_endpoint = PipelineEndpoint.publish(
workspace=ws,
name=endpoint_name,
pipeline=pipeline,
description="A hello world endpoint for component deployments"
)
首先,我们需要定义要发布的管道。
@pipeline()
def pipeline(input_data: Input(type=AssetTypes.URI_FOLDER)):
(...)
return {
(..)
}
Batch 终结点不部署管道,而是部署管道组件。 组件提出了一种更可靠的方法,用于对在终结点下部署的资产进行源代码管理。 我们可以将任何管道定义转换为管道组件,如下所示:
pipeline_component = pipeline().component
作为最佳做法,我们建议注册管道组件,以便能够在工作区甚至共享注册表中以集中的方式对它们进行版本控制。
ml_client.components.create(pipeline_component)
然后,我们需要创建托管所有管道部署的终结点:
endpoint_name = "PipelineEndpointTest"
endpoint = BatchEndpoint(
name=endpoint_name,
description="A hello world endpoint for component deployments",
)
ml_client.batch_endpoints.begin_create_or_update(endpoint)
为管道组件创建部署:
deployment_name = "hello-batch-dpl"
deployment = BatchPipelineComponentDeployment(
name=deployment_name,
description="A hello world deployment with a single step.",
endpoint_name=endpoint.name,
component=pipeline_component
)
ml_client.batch_deployments.begin_create_or_update(deployment)
将作业提交到管道终结点
若要调用管道的默认版本,可以使用:
pipeline_endpoint = PipelineEndpoint.get(workspace=ws, name="PipelineEndpointTest")
run_id = pipeline_endpoint.submit("PipelineEndpointExperiment")
job = ml_client.batch_endpoints.invoke(
endpoint_name=batch_endpoint,
)
用于 inputs 指示作业的输入(如果需要)。 有关如何指示输入和输出的更详细说明,请参阅 为批处理终结点 创建作业和输入数据。
job = ml_client.batch_endpoints.invoke(
endpoint_name=batch_endpoint,
inputs={
"input_data": Input(type=AssetTypes.URI_FOLDER, path="./my_local_data")
}
)
还可将作业提交到特定的版本:
run_id = pipeline_endpoint.submit(endpoint_name, pipeline_version="0")
在批处理终结点中,不会对部署进行版本控制。 但是,可以在同一终结点下部署多个管道组件版本。 从这个意义上说,v1 中的每个管道版本都对应于不同的管道组件版本及其在终结点下的相应部署。
然后,可以部署在终结点下运行的特定部署,前提是该部署运行了你感兴趣的版本。
job = ml_client.batch_endpoints.invoke(
endpoint_name=endpoint_name,
deployment_name=deployment_name,
)
获取已部署的所有管道
all_pipelines = PublishedPipeline.get_all(ws)
以下代码列出了工作区中存在的所有终结点:
all_endpoints = ml_client.batch_endpoints.list()
但是,请记住,批处理终结点可以托管运行管道或模型的部署。 如果要获取托管管道的所有部署的列表,可以执行以下操作:
all_deployments = []
for endpoint in all_endpoints:
all_deployments.extend(ml_client.batch_deployments.list(endpoint_name=endpoint.name))
all_pipeline_deployments = filter(all_endpoints, lamdba x: x is BatchPipelineComponentDeployment)
使用 REST API
可以使用调用 URL 的 REST API 从终结点创建作业。 请参阅以下示例,了解调用如何从 v1 更改为 v2。
pipeline_endpoint = PipelineEndpoint.get(workspace=ws, name=endpoint_name)
rest_endpoint = pipeline_endpoint.endpoint
response = requests.post(
rest_endpoint,
headers=aad_token,
json={
"ExperimentName": "PipelineEndpointExperiment",
"RunSource": "API",
"ParameterAssignments": {"argument1": "united", "argument2":45}
}
)
批处理终结点支持多种输入类型。 下面的示例演示如何指示两个不同的类型 string 输入和 numeric。 有关更详细的示例,请参阅 创建批处理终结点的 作业和输入数据:
batch_endpoint = ml_client.batch_endpoints.get(endpoint_name)
rest_endpoint = batch_endpoint.invocation_url
response = requests.post(
rest_endpoint,
headers=aad_token,
json={
"properties": {
"InputData": {
"argument1": {
"JobInputType": "Literal",
"Value": "united"
},
"argument2": {
"JobInputType": "Literal",
"Value": 45
}
}
}
}
)
后续步骤