Dela via


Snabbstart: Vara värd för en Durable Task SDK-app i Azure Container Apps (förhandsversion)

Viktigt!

För närvarande är SDK:erna för varaktiga uppgifter inte tillgängliga för JavaScript och PowerShell.

Viktigt!

För närvarande är SDK:erna för varaktiga uppgifter inte tillgängliga för JavaScript och PowerShell.

I den här snabbstarten lär du dig att:

  • Konfigurera och kör Durable Task Scheduler-emulatorn för lokal utveckling.
  • Kör arbetar- och klientprojekten.
  • Kontrollera Azure Container Apps-loggarna.
  • Granska orkestreringsstatus och historik via instrumentpanelen Durable Task Scheduler.

Förutsättningar

Innan du börjar:

Förbereda projektet

I ett nytt terminalfönster, gå från katalogen Azure-Samples/Durable-Task-Scheduler in i exempelkatalogen.

cd /samples/durable-task-sdks/dotnet/FunctionChaining
cd /samples/durable-task-sdks/python/function-chaining
cd /samples/durable-task-sdks/java/function-chaining

Distribuera med Hjälp av Azure Developer CLI

  1. Kör azd up för att etablera infrastrukturen och distribuera programmet till Azure Container Apps i ett enda kommando.

    azd up
    
  2. Ange följande parametrar när du uppmanas till det i terminalen.

    Parameter Beskrivning
    Miljönamn Prefix för resursgruppen som skapats för att lagra alla Azure-resurser.
    Azure-plats Azure-platsen för dina resurser.
    Azure-prenumeration Azure-prenumerationen för dina resurser.

    Den här processen kan ta lite tid att slutföra. azd up När kommandot har slutförts visar CLI-utdata två Azure Portal länkar för att övervaka distributionens förlopp. Utdata visar också hur azd up:

    • Skapar och konfigurerar alla nödvändiga Azure-resurser via de angivna Bicep-filerna i ./infra katalogen med azd provision. När du har tillhandahållits av Azure Developer CLI kan du komma åt dessa resurser via Azure-portalen. De filer som tillhandahåller Azure-resurserna är:
      • main.parameters.json
      • main.bicep
      • En app resurskatalog ordnad efter funktion
      • Ett core referensbibliotek som innehåller de Bicep-moduler som används av mallen azd
    • Distribuerar koden med hjälp av azd deploy

    Förväntad utdata

    Packaging services (azd package)
    
    (✓) Done: Packaging service client
    - Image Hash: {IMAGE_HASH}
    - Target Image: {TARGET_IMAGE}
    
    
    (✓) Done: Packaging service worker
    - Image Hash: {IMAGE_HASH}
    - Target Image: {TARGET_IMAGE}
    
    
    Provisioning Azure resources (azd provision)
    Provisioning Azure resources can take some time.
    
    Subscription: SUBSCRIPTION_NAME (SUBSCRIPTION_ID)
    Location: West US 2
    
     You can view detailed progress in the Azure Portal:
     https://portal.azure.com/#view/HubsExtension/DeploymentDetailsBlade/~/overview/id/%2Fsubscriptions%SUBSCRIPTION_ID%2Fproviders%2FMicrosoft.Resources%2Fdeployments%2FCONTAINER_APP_ENVIRONMENT
    
     (✓) Done: Resource group: GENERATED_RESOURCE_GROUP (1.385s)
     (✓) Done: Container Apps Environment: GENERATED_CONTAINER_APP_ENVIRONMENT (54.125s)
     (✓) Done: Container Registry: GENERATED_REGISTRY (1m27.747s)
     (✓) Done: Container App: SAMPLE_CLIENT_APP (21.39s)
     (✓) Done: Container App: SAMPLE_WORKER_APP (24.136s)   
    
    Deploying services (azd deploy)
    
     (✓) Done: Deploying service client
     - Endpoint: https://SAMPLE_CLIENT_APP.westus2.azurecontainerapps.io/
    
     (✓) Done: Deploying service worker
     - Endpoint: https://SAMPLE_WORKER_APP.westus2.azurecontainerapps.io/
    
    
    SUCCESS: Your up workflow to provision and deploy to Azure completed in 10 minutes 34 seconds.   
    

Bekräfta lyckad distribution

I Azure-portalen, kontrollera att orkestreringarna körs framgångsrikt.

  1. Kopiera resursgruppens namn från terminalutdata.

  2. Logga in på Azure-portalen och sök efter resursgruppens namn.

  3. På översiktssidan för resursgruppen klickar du på resursen för klientcontainerappen.

  4. Välj Övervakning>Loggström.

  1. Bekräfta att klientcontainern loggar funktionslänkningsuppgifterna.

    Skärmbild av klientcontainerns loggström i Azure-portalen.

  2. Gå tillbaka till resursgruppssidan för att välja containern worker .

  3. Välj Övervakning>Loggström.

  4. Bekräfta att arbetscontainern loggar funktionslänkningsuppgifterna.

    Skärmbild av arbetscontainerns loggström i Azure-portalen.

  1. Bekräfta att exempelcontainerappen loggar uppgifter för funktionskedjor.

    Skärmbild av Java-exempelappens loggström i Azure-portalen.

Förstå koden

Klientprojekt

Klientprojektet:

  • Använder samma anslutningssträngslogik som arbetaren
  • Implementerar en sekventiell orkestreringsschemaläggare som:
    • Schemalägger 20 orkestreringsinstanser, en åt gången
    • Väntar 5 sekunder mellan schemaläggningen av varje orkestrering
    • Spårar samtliga orkestreringsinstanser i listan
    • Väntar på att alla orkestreringar ska slutföras innan processen avslutas.
  • Använder standardloggning för att visa förlopp och resultat
// Schedule 20 orchestrations sequentially
for (int i = 0; i < TotalOrchestrations; i++)
{
    // Create a unique instance ID
    string instanceName = $"{name}_{i+1}";

    // Schedule the orchestration
    string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
        "GreetingOrchestration", 
        instanceName);

    // Wait 5 seconds before scheduling the next one
    await Task.Delay(TimeSpan.FromSeconds(IntervalSeconds));
}

// Wait for all orchestrations to complete
foreach (string id in allInstanceIds)
{
    OrchestrationMetadata instance = await client.WaitForInstanceCompletionAsync(
        id, getInputsAndOutputs: false, CancellationToken.None);
}

Arbetarprojekt

Worker-projektet innehåller:

  • GreetingOrchestration.cs: Definierar orkestrerings- och aktivitetsfunktionerna i en enda fil
  • Program.cs: Konfigurerar tjänstvärden med korrekt hantering av anslutningssträngar

Orkestreringsimplementering

Orkestreringen anropar direkt varje aktivitet i sekvens med hjälp av standardmetoden CallActivityAsync :

public override async Task<string> RunAsync(TaskOrchestrationContext context, string name)
{
    // Step 1: Say hello to the person
    string greeting = await context.CallActivityAsync<string>(nameof(SayHelloActivity), name);

    // Step 2: Process the greeting
    string processedGreeting = await context.CallActivityAsync<string>(nameof(ProcessGreetingActivity), greeting);

    // Step 3: Finalize the response
    string finalResponse = await context.CallActivityAsync<string>(nameof(FinalizeResponseActivity), processedGreeting);

    return finalResponse;
}

Varje aktivitet implementeras som en separat klass som är dekorerad med [DurableTask] attributet:

[DurableTask]
public class SayHelloActivity : TaskActivity<string, string>
{
    // Implementation details
}

Arbetaren använder Microsoft.Extensions.Hosting för korrekt livscykelhantering:

var builder = Host.CreateApplicationBuilder();
builder.Services.AddDurableTaskWorker()
    .AddTasks(registry => {
        registry.AddAllGeneratedTasks();
    })
    .UseDurableTaskScheduler(connectionString);
var host = builder.Build();
await host.StartAsync();

Klient

Klientprojektet:

  • Använder samma anslutningssträngslogik som arbetaren
  • Implementerar en sekventiell orkestreringsschemaläggare som:
    • Schemalägger 20 orkestreringsinstanser, en åt gången
    • Väntar 5 sekunder mellan schemaläggningen av varje orkestrering
    • Spårar samtliga orkestreringsinstanser i listan
    • Väntar på att alla orkestreringar ska slutföras innan processen avslutas.
  • Använder standardloggning för att visa förlopp och resultat
# Schedule all orchestrations first
instance_ids = []
for i in range(TOTAL_ORCHESTRATIONS):
    try:
        # Create a unique instance name
        instance_name = f"{name}_{i+1}"
        logger.info(f"Scheduling orchestration #{i+1} ({instance_name})")

        # Schedule the orchestration
        instance_id = client.schedule_new_orchestration(
            "function_chaining_orchestrator",
            input=instance_name
        )

        instance_ids.append(instance_id)
        logger.info(f"Orchestration #{i+1} scheduled with ID: {instance_id}")

        # Wait before scheduling next orchestration (except for the last one)
        if i < TOTAL_ORCHESTRATIONS - 1:
            logger.info(f"Waiting {INTERVAL_SECONDS} seconds before scheduling next orchestration...")
        await asyncio.sleep(INTERVAL_SECONDS)
# ...
# Wait for all orchestrations to complete
for idx, instance_id in enumerate(instance_ids):
    try:
        logger.info(f"Waiting for orchestration {idx+1}/{len(instance_ids)} (ID: {instance_id})...")
        result = client.wait_for_orchestration_completion(
            instance_id,
            timeout=120
        )

Arbetare

Orkestreringsimplementering

Orkestreringen anropar direkt varje aktivitet i sekvens med hjälp av standardfunktionen call_activity :

# Orchestrator function
def function_chaining_orchestrator(ctx, name: str) -> str:
    """Orchestrator that demonstrates function chaining pattern."""
    logger.info(f"Starting function chaining orchestration for {name}")

    # Call first activity - passing input directly without named parameter
    greeting = yield ctx.call_activity('say_hello', input=name)

    # Call second activity with the result from first activity
    processed_greeting = yield ctx.call_activity('process_greeting', input=greeting)

    # Call third activity with the result from second activity
    final_response = yield ctx.call_activity('finalize_response', input=processed_greeting)

    return final_response

Varje aktivitet implementeras som en separat funktion:

# Activity functions
def say_hello(ctx, name: str) -> str:
    """First activity that greets the user."""
    logger.info(f"Activity say_hello called with name: {name}")
    return f"Hello {name}!"

def process_greeting(ctx, greeting: str) -> str:
    """Second activity that processes the greeting."""
    logger.info(f"Activity process_greeting called with greeting: {greeting}")
    return f"{greeting} How are you today?"

def finalize_response(ctx, response: str) -> str:
    """Third activity that finalizes the response."""
    logger.info(f"Activity finalize_response called with response: {response}")
    return f"{response} I hope you're doing well!"

Arbetaren använder DurableTaskSchedulerWorker för korrekt livscykelhantering:

with DurableTaskSchedulerWorker(
    host_address=host_address, 
    secure_channel=endpoint != "http://localhost:8080",
    taskhub=taskhub_name, 
    token_credential=credential
) as worker:

    # Register activities and orchestrators
    worker.add_activity(say_hello)
    worker.add_activity(process_greeting)
    worker.add_activity(finalize_response)
    worker.add_orchestrator(function_chaining_orchestrator)

    # Start the worker (without awaiting)
    worker.start()

Exempelcontainerappen innehåller både arbets- och klientkoden.

Klient

Klientkoden:

  • Använder samma anslutningssträngslogik som arbetaren
  • Implementerar en sekventiell orkestreringsschemaläggare som:
    • Schemalägger 20 orkestreringsinstanser, en åt gången
    • Väntar 5 sekunder mellan schemaläggningen av varje orkestrering
    • Spårar samtliga orkestreringsinstanser i listan
    • Väntar på att alla orkestreringar ska slutföras innan processen avslutas.
  • Använder standardloggning för att visa förlopp och resultat
// Create client using Azure-managed extensions
DurableTaskClient client = (credential != null 
    ? DurableTaskSchedulerClientExtensions.createClientBuilder(endpoint, taskHubName, credential)
    : DurableTaskSchedulerClientExtensions.createClientBuilder(connectionString)).build();

// Start a new instance of the registered "ActivityChaining" orchestration
String instanceId = client.scheduleNewOrchestrationInstance(
        "ActivityChaining",
        new NewOrchestrationInstanceOptions().setInput("Hello, world!"));
logger.info("Started new orchestration instance: {}", instanceId);

// Block until the orchestration completes. Then print the final status, which includes the output.
OrchestrationMetadata completedInstance = client.waitForInstanceCompletion(
        instanceId,
        Duration.ofSeconds(30),
        true);
logger.info("Orchestration completed: {}", completedInstance);
logger.info("Output: {}", completedInstance.readOutputAs(String.class))

Arbetare

Orkestreringen anropar direkt varje aktivitet i sekvens med hjälp av standardmetoden callActivity :

DurableTaskGrpcWorker worker = (credential != null 
    ? DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(endpoint, taskHubName, credential)
    : DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(connectionString))
    .addOrchestration(new TaskOrchestrationFactory() {
        @Override
        public String getName() { return "ActivityChaining"; }

        @Override
        public TaskOrchestration create() {
            return ctx -> {
                String input = ctx.getInput(String.class);
                String x = ctx.callActivity("Reverse", input, String.class).await();
                String y = ctx.callActivity("Capitalize", x, String.class).await();
                String z = ctx.callActivity("ReplaceWhitespace", y, String.class).await();
                ctx.complete(z);
            };
        }
    })
    .addActivity(new TaskActivityFactory() {
        @Override
        public String getName() { return "Reverse"; }

        @Override
        public TaskActivity create() {
            return ctx -> {
                String input = ctx.getInput(String.class);
                StringBuilder builder = new StringBuilder(input);
                builder.reverse();
                return builder.toString();
            };
        }
    })
    .addActivity(new TaskActivityFactory() {
        @Override
        public String getName() { return "Capitalize"; }

        @Override
        public TaskActivity create() {
            return ctx -> ctx.getInput(String.class).toUpperCase();
        }
    })
    .addActivity(new TaskActivityFactory() {
        @Override
        public String getName() { return "ReplaceWhitespace"; }

        @Override
        public TaskActivity create() {
            return ctx -> {
                String input = ctx.getInput(String.class);
                return input.trim().replaceAll("\\s", "-");
            };
        }
    })
    .build();

// Start the worker
worker.start();

Nästa steg