你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

将管道升级到 SDK v2

在 SDK v2 中,“管道”已合并到作业中。

作业有不同类型。 大多数作业都是运行 command 的命令作业,例如 python main.py。 作业中运行的内容与任何编程语言无关,因此你可以运行 bash 脚本、调用 python 解释器、运行一组 curl 命令或其他任何内容。

另一种作业类型是 pipeline,它定义可能具有输入/输出关系的子作业,形成有向无环图 (DAG)。

若要升级,需要更改用于定义管道并将其提交到 SDK v2 的代码。 在子作业运行的内容不需要升级到 SDK v2。 但是,建议从模型训练脚本中删除任何特定于 Azure 机器学习的代码。 这种分离便于更轻松地在本地和云之间进行转换,并且被认为是成熟 MLOps 的最佳做法。 实际上,这意味着删除 azureml.* 代码行。 模型日志记录和跟踪代码应替换为 MLflow。 有关详细信息,请参阅如何在 v2 中使用 MLflow

本文比较了 SDK v1 和 SDK v2 中的方案。 以下示例将在一个虚拟管道作业中生成三个步骤(训练、评分和评估)。 此示例演示如何使用 SDK v1 和 SDK v2 生成管道作业,以及如何在步骤之间使用数据和传输数据。

运行管道

  • SDK v1

    # import required libraries
    import os
    import azureml.core
    from azureml.core import (
        Workspace,
        Dataset,
        Datastore,
        ComputeTarget,
        Experiment,
        ScriptRunConfig,
    )
    from azureml.pipeline.steps import PythonScriptStep
    from azureml.pipeline.core import Pipeline
    
    # check core SDK version number
    print("Azure Machine Learning SDK Version: ", azureml.core.VERSION)
    
    # load workspace
    workspace = Workspace.from_config()
    print(
        "Workspace name: " + workspace.name,
        "Azure region: " + workspace.location,
        "Subscription id: " + workspace.subscription_id,
        "Resource group: " + workspace.resource_group,
        sep="\n",
    )
    
    # create an ML experiment
    experiment = Experiment(workspace=workspace, name="train_score_eval_pipeline")
    
    # create a directory
    script_folder = "./src"
    
    # create compute
    from azureml.core.compute import ComputeTarget, AmlCompute
    from azureml.core.compute_target import ComputeTargetException
    
    # Choose a name for your CPU cluster
    amlcompute_cluster_name = "cpu-cluster"
    
    # Verify that cluster does not exist already
    try:
        aml_compute = ComputeTarget(workspace=workspace, name=amlcompute_cluster_name)
        print('Found existing cluster, use it.')
    except ComputeTargetException:
        compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS12_V2',
                                                               max_nodes=4)
        aml_compute = ComputeTarget.create(ws, amlcompute_cluster_name, compute_config)
    
    aml_compute.wait_for_completion(show_output=True)
    
    # define data set
    data_urls = ["wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv"]
    input_ds = Dataset.File.from_files(data_urls)
    
    # define steps in pipeline
    from azureml.data import OutputFileDatasetConfig
    model_output = OutputFileDatasetConfig('model_output')
    train_step = PythonScriptStep(
        name="train step",
        script_name="train.py",
        arguments=['--training_data', input_ds.as_named_input('training_data').as_mount() ,'--max_epocs', 5, '--learning_rate', 0.1,'--model_output', model_output],
        source_directory=script_folder,
        compute_target=aml_compute,
        allow_reuse=True,
    )
    
    score_output = OutputFileDatasetConfig('score_output')
    score_step = PythonScriptStep(
        name="score step",
        script_name="score.py",
        arguments=['--model_input',model_output.as_input('model_input'), '--test_data', input_ds.as_named_input('test_data').as_mount(), '--score_output', score_output],
        source_directory=script_folder,
        compute_target=aml_compute,
        allow_reuse=True,
    )
    
    eval_output = OutputFileDatasetConfig('eval_output')
    eval_step = PythonScriptStep(
        name="eval step",
        script_name="eval.py",
        arguments=['--scoring_result',score_output.as_input('scoring_result'), '--eval_output', eval_output],
        source_directory=script_folder,
        compute_target=aml_compute,
        allow_reuse=True,
    )
    
    # built pipeline
    from azureml.pipeline.core import Pipeline
    
    pipeline_steps = [train_step, score_step, eval_step]
    
    pipeline = Pipeline(workspace = workspace, steps=pipeline_steps)
    print("Pipeline is built.")
    
    pipeline_run = experiment.submit(pipeline, regenerate_outputs=False)
    
    print("Pipeline submitted for execution.")
    
    
  • SDK v2。 完整示例链接

    # import required libraries
    from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
    
    from azure.ai.ml import MLClient, Input
    from azure.ai.ml.dsl import pipeline
    
    try:
        credential = DefaultAzureCredential()
        # Check if given credential can get token successfully.
        credential.get_token("https://management.azure.com/.default")
    except Exception as ex:
        # Fall back to InteractiveBrowserCredential in case DefaultAzureCredential not work
        credential = InteractiveBrowserCredential()
    
    # Get a handle to workspace
    ml_client = MLClient.from_config(credential=credential)
    
    # Retrieve an already attached Azure Machine Learning Compute.
    cluster_name = "cpu-cluster"
    print(ml_client.compute.get(cluster_name))
    
    # Import components that are defined with Python function
    with open("src/components.py") as fin:
        print(fin.read())
    
    # You need to install mldesigner package to use command_component decorator.
    # Option 1: install directly
    # !pip install mldesigner
    
    # Option 2: install as an extra dependency of azure-ai-ml
    # !pip install azure-ai-ml[designer]
    
    # import the components as functions
    from src.components import train_model, score_data, eval_model
    
    cluster_name = "cpu-cluster"
    # define a pipeline with component
    @pipeline(default_compute=cluster_name)
    def pipeline_with_python_function_components(input_data, test_data, learning_rate):
        """E2E dummy train-score-eval pipeline with components defined via Python function components"""
    
        # Call component obj as function: apply given inputs & parameters to create a node in pipeline
        train_with_sample_data = train_model(
            training_data=input_data, max_epochs=5, learning_rate=learning_rate
        )
    
        score_with_sample_data = score_data(
            model_input=train_with_sample_data.outputs.model_output, test_data=test_data
        )
    
        eval_with_sample_data = eval_model(
            scoring_result=score_with_sample_data.outputs.score_output
        )
    
        # Return: pipeline outputs
        return {
            "eval_output": eval_with_sample_data.outputs.eval_output,
            "model_output": train_with_sample_data.outputs.model_output,
        }
    
    
    pipeline_job = pipeline_with_python_function_components(
        input_data=Input(
            path="wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv", type="uri_file"
        ),
        test_data=Input(
            path="wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv", type="uri_file"
        ),
        learning_rate=0.1,
    )
    
    # submit job to workspace
    pipeline_job = ml_client.jobs.create_or_update(
        pipeline_job, experiment_name="train_score_eval_pipeline"
    )
    

SDK v1 和 SDK v2 中关键功能的映射

SDK v1 中的功能 SDK v2 中的粗略映射
azureml.pipeline.core.Pipeline azure.ai.ml.dsl.pipeline
OutputDatasetConfig 输出
数据集as_mount 输入
StepSequence 数据依赖项

步骤和作业/组件类型映射

SDK v1 中的步骤 SDK v2 中的作业类型 SDK v2 中的组件类型
adla_step
automl_step automl 作业 automl 组件
azurebatch_step
command_step command 作业 command 组件
data_transfer_step
databricks_step
estimator_step command 作业 command 组件
hyper_drive_step sweep 作业
kusto_step
module_step command 组件
mpi_step command 作业 command 组件
parallel_run_step Parallel 作业 Parallel 组件
python_script_step command 作业 command 组件
r_script_step command 作业 command 组件
synapse_spark_step spark 作业 spark 组件

已发布的管道

启动并运行管道之后,你可以发布管道,以便它使用其他输入运行。 这被称为已发布的管道批处理终结点提供了一种类似但更强大的方法来处理在持久 API 下运行的多个资产,正因为如此,“已发布的管道”功能已移到批处理终结点中的管道组件部署

批处理终结点会将接口(终结点)与实际实现(部署)分离,并允许用户决定哪个部署将服务于终结点的默认实现。 批处理终结点中的管道组件部署允许用户部署管道组件而不是管道,从而使那些希望简化 MLOps 实践的组织能够更好地利用可重用资产。

下表显示了各个概念之间的比较:

概念 SDK v1 SDK v2
用于调用的管道 REST 终结点 管道终结点 批处理终结点
终结点下管道的特定版本 已发布管道 管道组件部署
管道的调用参数 管道参数 作业输入
从已发布管道生成的作业 管道作业 批处理作业

有关如何迁移到批处理终结点的具体指导,请参阅将管道终结点升级到 SDK v2

有关详细信息,请参阅以下文档: