När du har en pipeline igång kan du publicera en pipeline så att den körs med olika indata. Detta kallades publicerade pipelines.
Vad har ändrats?
Batch-slutpunkten föreslår ett liknande men kraftfullare sätt att hantera flera tillgångar som körs under ett beständigt API, vilket är anledningen till att funktionen Publicerade pipelines flyttades till pipelinekomponentdistributioner i batchslutpunkter.
Batch-slutpunkter frikopplar gränssnittet (slutpunkten) från den faktiska implementeringen (distributionen) och låter användaren bestämma vilken distribution som ska användas för standardimplementeringen av slutpunkten.
Distributioner av pipelinekomponenter i batchslutpunkter gör det möjligt för användare att distribuera pipelinekomponenter i stället för pipelines, vilket bättre använder återanvändbara tillgångar för de organisationer som vill effektivisera sin MLOps-praxis.
Följande tabell visar en jämförelse av vart och ett av begreppen:
| Koncept |
SDK v1 |
SDK v2 |
| Pipelinens REST-slutpunkt för anrop |
Pipelineslutpunkt |
Batch-slutpunkt |
| Pipelinens specifika version under slutpunkten |
Publicerad pipeline |
Distribution av pipelinekomponenter |
| Pipelinens argument om anrop |
Pipeline-parameter |
Jobbindata |
| Jobb som genererats från en publicerad pipeline |
Pipelinejobb |
Batchjobb |
Information om hur du skapar din första pipelinekomponentdistribution finns i Distribuera pipelines i Batch-slutpunkter.
Flytta till batchslutpunkter
Använd följande riktlinjer för att lära dig hur du flyttar från SDK v1 till SDK v2 med hjälp av begreppen i Batch-slutpunkter.
Publicera en pipeline
Jämför hur publiceringen av en pipeline har ändrats från v1 till v2:
Först måste vi hämta den pipeline som vi vill publicera:
pipeline1 = Pipeline(workspace=ws, steps=[step1, step2])
Vi kan publicera pipelinen på följande sätt:
from azureml.pipeline.core import PipelineEndpoint
endpoint_name = "PipelineEndpointTest"
pipeline_endpoint = PipelineEndpoint.publish(
workspace=ws,
name=endpoint_name,
pipeline=pipeline,
description="A hello world endpoint for component deployments"
)
Först måste vi definiera den pipeline som vi vill publicera.
@pipeline()
def pipeline(input_data: Input(type=AssetTypes.URI_FOLDER)):
(...)
return {
(..)
}
Batchslutpunkter distribuerar inte pipelines utan pipelinekomponenter. Komponenter föreslår ett mer tillförlitligt sätt att ha källkontroll över de tillgångar som distribueras under en slutpunkt. Vi kan konvertera valfri pipelinedefinition till en pipelinekomponent på följande sätt:
pipeline_component = pipeline().component
Som bästa praxis rekommenderar vi att du registrerar pipelinekomponenter så att du kan behålla versionshantering av dem på ett centraliserat sätt på arbetsytan eller till och med de delade registren.
ml_client.components.create(pipeline_component)
Sedan måste vi skapa slutpunkten som är värd för alla pipelinedistributioner:
endpoint_name = "PipelineEndpointTest"
endpoint = BatchEndpoint(
name=endpoint_name,
description="A hello world endpoint for component deployments",
)
ml_client.batch_endpoints.begin_create_or_update(endpoint)
Skapa en distribution för pipelinekomponenten:
deployment_name = "hello-batch-dpl"
deployment = BatchPipelineComponentDeployment(
name=deployment_name,
description="A hello world deployment with a single step.",
endpoint_name=endpoint.name,
component=pipeline_component
)
ml_client.batch_deployments.begin_create_or_update(deployment)
Skicka ett jobb till en pipelineslutpunkt
Om du vill anropa standardversionen av pipelinen kan du använda:
pipeline_endpoint = PipelineEndpoint.get(workspace=ws, name="PipelineEndpointTest")
run_id = pipeline_endpoint.submit("PipelineEndpointExperiment")
job = ml_client.batch_endpoints.invoke(
endpoint_name=batch_endpoint,
)
Använd inputs för att ange indata för jobbet om det behövs. Se Skapa jobb och indata för batchslutpunkter för en mer detaljerad förklaring om hur du anger indata och utdata.
job = ml_client.batch_endpoints.invoke(
endpoint_name=batch_endpoint,
inputs={
"input_data": Input(type=AssetTypes.URI_FOLDER, path="./my_local_data")
}
)
Du kan också skicka ett jobb till en viss version:
run_id = pipeline_endpoint.submit(endpoint_name, pipeline_version="0")
Distributioner är inte versionshanterade i batchslutpunkter. Du kan dock distribuera flera pipelinekomponenter under samma slutpunkt. I den meningen motsvarar varje pipelineversion i v1 en annan pipelinekomponentversion och dess motsvarande distribution under slutpunkten.
Sedan kan du distribuera en specifik distribution som körs under slutpunkten om distributionen kör den version som du är intresserad av.
job = ml_client.batch_endpoints.invoke(
endpoint_name=endpoint_name,
deployment_name=deployment_name,
)
Få alla pipelines distribuerade
all_pipelines = PublishedPipeline.get_all(ws)
Följande kod visar alla slutpunkter som finns på arbetsytan:
all_endpoints = ml_client.batch_endpoints.list()
Tänk dock på att batchslutpunkter kan vara värdar för distributioner som operationaliserar antingen pipelines eller modeller. Om du vill få en lista över alla distributioner som är värdar för pipelines kan du göra så här:
all_deployments = []
for endpoint in all_endpoints:
all_deployments.extend(ml_client.batch_deployments.list(endpoint_name=endpoint.name))
all_pipeline_deployments = filter(all_endpoints, lamdba x: x is BatchPipelineComponentDeployment)
Använda REST API
Du kan skapa jobb från slutpunkterna med hjälp av REST-API:et för anrops-URL:en. Se följande exempel för att se hur anrop har ändrats från v1 till v2.
pipeline_endpoint = PipelineEndpoint.get(workspace=ws, name=endpoint_name)
rest_endpoint = pipeline_endpoint.endpoint
response = requests.post(
rest_endpoint,
headers=aad_token,
json={
"ExperimentName": "PipelineEndpointExperiment",
"RunSource": "API",
"ParameterAssignments": {"argument1": "united", "argument2":45}
}
)
Batch-slutpunkter stöder flera indatatyper. I följande exempel visas hur du anger två olika indata av typen string och numeric. Mer detaljerade exempel finns i Skapa jobb och indata för batchslutpunkter (REST ):
batch_endpoint = ml_client.batch_endpoints.get(endpoint_name)
rest_endpoint = batch_endpoint.invocation_url
response = requests.post(
rest_endpoint,
headers=aad_token,
json={
"properties": {
"InputData": {
"argument1": {
"JobInputType": "Literal",
"Value": "united"
},
"argument2": {
"JobInputType": "Literal",
"Value": 45
}
}
}
}
)
Nästa steg