Dela via


Snabbstart: Skapa en app med Durable Task SDK:er och Durable Task Scheduler (förhandsversion)

SDK:er för varaktiga uppgifter tillhandahåller ett enkelt klientbibliotek för Durable Task Scheduler. I den här snabbstartsguiden får du lära dig hur du skapar orkestreringar som använder fan-out/fan-in-applikationsmönstret för att utföra parallell bearbetning.

Viktigt!

För närvarande är SDK:erna för varaktiga uppgifter inte tillgängliga för JavaScript och PowerShell.

Viktigt!

För närvarande är SDK:erna för varaktiga uppgifter inte tillgängliga för JavaScript och PowerShell.

  • Konfigurera och kör Durable Task Scheduler-emulatorn för lokal utveckling.
  • Kör arbetar- och klientprojekten.
  • Granska orkestreringsstatus och historik via instrumentpanelen Durable Task Scheduler.

Förutsättningar

Innan du börjar:

Konfigurera Durable Task Scheduler-emulatorn

Programkoden söker efter en distribuerad schemaläggnings- och uppgiftshubbresurs. Om ingen hittas återgår koden till emulatorn. Emulatorn simulerar en schemaläggare och en aktivitetshubb i en Docker-container, vilket gör den idealisk för den lokala utveckling som krävs i den här snabbstarten.

  1. Navigera från rotkatalogen till exempelkatalogen för .NET SDK.

    cd samples/durable-task-sdks/dotnet/FanOutFanIn
    
  2. Hämta Docker-avbildningen för emulatorn.

    docker pull mcr.microsoft.com/dts/dts-emulator:latest
    
  3. Kör emulatorn. Det kan ta några sekunder innan containern är klar.

    docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
    

Eftersom exempelkoden automatiskt använder standardinställningarna för emulatorn behöver du inte ange några miljövariabler. Standardinställningarna för emulatorn för den här snabbstarten är:

  • Slutpunkt: http://localhost:8080
  • Aktivitetshubben: default
  1. Gå till Python SDK-exempelkatalogen från rotkatalogen Azure-Samples/Durable-Task-Scheduler .

    cd samples/durable-task-sdks/python/fan-out-fan-in
    
  2. Hämta Docker-avbildningen för emulatorn.

    docker pull mcr.microsoft.com/dts/dts-emulator:latest
    
  3. Kör emulatorn. Det kan ta några sekunder innan containern är klar.

    docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
    

Eftersom exempelkoden automatiskt använder standardinställningarna för emulatorn behöver du inte ange några miljövariabler. Standardinställningarna för emulatorn för den här snabbstarten är:

  • Slutpunkt: http://localhost:8080
  • Aktivitetshubben: default
  1. Gå till Java SDK-exempelkatalogen Azure-Samples/Durable-Task-Scheduler från rotkatalogen.

    cd samples/durable-task-sdks/java/fan-out-fan-in
    
  2. Hämta Docker-avbildningen för emulatorn.

    docker pull mcr.microsoft.com/dts/dts-emulator:latest
    
  3. Kör emulatorn. Det kan ta några sekunder innan containern är klar.

    docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
    

Eftersom exempelkoden automatiskt använder standardinställningarna för emulatorn behöver du inte ange några miljövariabler. Standardinställningarna för emulatorn för den här snabbstarten är:

  • Slutpunkt: http://localhost:8080
  • Aktivitetshubben: default

Kör snabbstarten

  1. Från katalogen FanOutFanIn går du till Worker katalogen för att skapa och köra arbetaren.

    cd Worker
    dotnet build
    dotnet run
    
  2. I en separat terminal, från FanOutFanIn katalogen, navigerar du till Client katalogen för att skapa och köra klienten.

    cd Client
    dotnet build
    dotnet run
    

Förstå utdata

När du kör det här exemplet får du utdata från både arbets- och klientprocesserna. Packa upp det som hände i koden när du körde projektet.

Arbetsproduktivitet

Arbetsutdata visar:

  • Registrering av orkestreraren och aktiviteter
  • Loggposter när varje aktivitet anropas
  • Parallell bearbetning av flera arbetsobjekt
  • Slutlig sammanställning av resultat

Klientens resultat

Klientutdata visar:

  • Orkestreringen börjar med en lista över arbetsobjekt
  • Det unika orkestreringsinstans-ID:t
  • De slutliga aggregerade resultaten, som visar varje arbetsobjekt och dess motsvarande resultat
  • Totalt antal bearbetade objekt

Exempel på utdata

Starting Fan-Out Fan-In Pattern - Parallel Processing Client
Using local emulator with no authentication
Starting parallel processing orchestration with 5 work items
Work items: ["Task1","Task2","Task3","LongerTask4","VeryLongTask5"]
Started orchestration with ID: 7f8e9a6b-1c2d-3e4f-5a6b-7c8d9e0f1a2b
Waiting for orchestration to complete...
Orchestration completed with status: Completed
Processing results:
Work item: Task1, Result: 5
Work item: Task2, Result: 5
Work item: Task3, Result: 5
Work item: LongerTask4, Result: 11
Work item: VeryLongTask5, Result: 13
Total items processed: 5
  1. Aktivera en virtuell Python-miljö.

    python -m venv venv
    /venv/Scripts/activate
    
  2. Installera de paket som krävs.

    pip install -r requirements.txt
    
  3. Starta arbetaren.

    python worker.py
    

    Förväntade utdata

    Du kan se utdata som visar att arbetaren har startat och väntar på arbetsuppgifter.

    Starting Fan Out/Fan In pattern worker...
    Using taskhub: default
    Using endpoint: http://localhost:8080
    Starting gRPC worker that connects to http://localhost:8080
    Successfully connected to http://localhost:8080. Waiting for work items...
    
  4. I en ny terminal aktiverar du den virtuella miljön och kör klienten.

    venv/Scripts/activate
    python client.py
    

    Du kan ange antalet arbetsobjekt som argument. Om inget argument anges körs 10 objekt som standard i exemplet.

    python client.py [number_of_items]
    

Förstå utdata

När du kör det här exemplet får du utdata från både arbets- och klientprocesserna. Packa upp det som hände i koden när du körde projektet.

Arbetsproduktivitet

Arbetsutdata visar:

  • Registrering av orkestreraren och aktiviteter.
  • Statusmeddelanden vid bearbetning av varje arbetsobjekt parallellt, som visar att de körs samtidigt.
  • Slumpmässiga fördröjningar för varje arbetsobjekt (mellan 0,5 och 2 sekunder) för att simulera varierande bearbetningstider.
  • Ett slutligt meddelande som visar sammanställningen av resultat.

Klientens resultat

Klientutdata visar:

  • Orkestreringen börjar med det angivna antalet arbetsobjekt.
  • Det unika orkestreringsinstans-ID:t.
  • Det slutliga aggregerade resultatet, som innehåller:
    • Det totala antalet bearbetade objekt
    • Summan av alla resultat (varje objektresultat är kvadraten för dess värde)
    • Medelvärdet av alla resultat

Exempel på utdata

Starting fan out/fan in orchestration with 10 items
Waiting for 10 parallel tasks to complete
Orchestrator yielded with 10 task(s) and 0 event(s) outstanding.
Processing work item: 1
Processing work item: 2
Processing work item: 10
Processing work item: 9
Processing work item: 8
Processing work item: 7
Processing work item: 6
Processing work item: 5
Processing work item: 4
Processing work item: 3
Orchestrator yielded with 9 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 8 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 7 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 6 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 5 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 4 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 3 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 2 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 1 task(s) and 0 event(s) outstanding.
All parallel tasks completed, aggregating results
Orchestrator yielded with 1 task(s) and 0 event(s) outstanding.
Aggregating results from 10 items
Orchestration completed with status: COMPLETED

Från katalogen fan-out-fan-in skapar och kör du programmet med Hjälp av Gradle.

./gradlew runFanOutFanInPattern

Tips/Råd

Om du får felmeddelandet zsh: permission denied: ./gradlewkan du prova att köra chmod +x gradlew innan du kör programmet.

Förstå utdata

När du kör det här exemplet får du utdata som visar:

  • Registrering av orkestreraren och aktiviteter.
  • Statusmeddelanden vid bearbetning av varje arbetsobjekt parallellt, som visar att de körs samtidigt.
  • Slumpmässiga fördröjningar för varje arbetsobjekt (mellan 0,5 och 2 sekunder) för att simulera varierande bearbetningstider.
  • Ett slutligt meddelande som visar sammanställningen av resultat.

Packa upp det som hände i koden när du körde projektet.

Exempel på utdata

Starting a Gradle Daemon (subsequent builds will be faster)

> Task :runFanOutFanInPattern
Durable Task worker is connecting to sidecar at localhost:8080.
Started new orchestration instance
Orchestration completed: [Name: 'FanOutFanIn_WordCount', ID: '<id-number>', RuntimeStatus: COMPLETED, CreatedAt: 2025-04-25T15:24:47.170Z, LastUpdatedAt: 2025-04-25T15:24:47.287Z, Input: '["Hello, world!","The quick brown fox jumps over t...', Output: '60']
Output: 60

Nu när du har kört projektet lokalt kan du nu lära dig hur du distribuerar till Azure som finns i Azure Container Apps.

Visa orkestreringsstatus och historik

Du kan visa orkestreringsstatus och historik via instrumentpanelen Durable Task Scheduler. Som standard körs instrumentpanelen på port 8082.

  1. Navigera till http://localhost:8082 i webbläsaren.
  2. Klicka på standard aktivitetshubben. Orkestreringsinstansen som du skapade finns i listan.
  3. Klicka på orkestreringsinstans-ID:t för att visa körningsinformationen, som omfattar:
    • Parallell körning av flera aktivitetsaktiviteter
    • Aggregeringssteget för insamlare
    • Indata och utdata i varje steg
    • Den tid det tar för varje steg

Skärmbild som visar orkestreringsinstansens information för .NET-exemplet.

Skärmbild som visar orkestreringsinstansens information för Python-exemplet.

Skärmbild som visar orkestreringsinstansens information för Java-exemplet.

Förstå kodstrukturen

Arbetsprojektet

För att demonstrera fan-out/fan-in-mönstret skapar orkestreringen av arbetsprocessen parallella aktiviteter och väntar på att alla ska slutföras. Dirigeraren:

  1. Tar en lista över arbetsobjekt som indata.
  2. Sprider ut sig genom att skapa en separat uppgift för varje arbetsuppgift med hjälp av ProcessWorkItemActivity.
  3. Kör alla uppgifter parallellt.
  4. Väntar på att alla uppgifter ska slutföras med hjälp av Task.WhenAll.
  5. Samlar in genom att aggregera alla enskilda resultat med hjälp av AggregateResultsActivity.
  6. Returnerar det slutliga aggregerade resultatet till klienten.

Arbetsprojektet innehåller:

  • ParallelProcessingOrchestration.cs: Definierar orkestrerings- och aktivitetsfunktionerna i en enda fil.
  • Program.cs: Konfigurerar arbetsvärden med korrekt hantering av anslutningssträngar.

ParallelProcessingOrchestration.cs

Med fan-out/fan-in skapar orkestreringen parallella uppgifter och väntar på att alla ska slutföras.

public override async Task<Dictionary<string, int>> RunAsync(TaskOrchestrationContext context, List<string> workItems)
{
    // Step 1: Fan-out by creating a task for each work item in parallel
    List<Task<Dictionary<string, int>>> processingTasks = new List<Task<Dictionary<string, int>>>();

    foreach (string workItem in workItems)
    {
        // Create a task for each work item (fan-out)
        Task<Dictionary<string, int>> task = context.CallActivityAsync<Dictionary<string, int>>(
            nameof(ProcessWorkItemActivity), workItem);
        processingTasks.Add(task);
    }

    // Step 2: Wait for all parallel tasks to complete
    Dictionary<string, int>[] results = await Task.WhenAll(processingTasks);

    // Step 3: Fan-in by aggregating all results
    Dictionary<string, int> aggregatedResults = await context.CallActivityAsync<Dictionary<string, int>>(
        nameof(AggregateResultsActivity), results);

    return aggregatedResults;
}

Varje aktivitet implementeras som en separat klass som är dekorerad med [DurableTask] attributet .

[DurableTask]
public class ProcessWorkItemActivity : TaskActivity<string, Dictionary<string, int>>
{
    // Implementation processes a single work item
}

[DurableTask]
public class AggregateResultsActivity : TaskActivity<Dictionary<string, int>[], Dictionary<string, int>>
{
    // Implementation aggregates individual results
}

Program.cs

Arbetaren använder Microsoft.Extensions.Hosting för korrekt livscykelhantering.

using Microsoft.Extensions.Hosting;
//..

builder.Services.AddDurableTaskWorker()
    .AddTasks(registry =>
    {
        registry.AddOrchestrator<ParallelProcessingOrchestration>();
        registry.AddActivity<ProcessWorkItemActivity>();
        registry.AddActivity<AggregateResultsActivity>();
    })
    .UseDurableTaskScheduler(connectionString);

Klientprojektet

Klientprojektet:

  • Använder samma anslutningssträngslogik som arbetaren.
  • Skapar en lista över arbetsobjekt som ska bearbetas parallellt.
  • Schemalägger en orkestreringsinstans med listan som indata.
  • Väntar på att orkestreringen ska slutföras och visar de aggregerade resultaten.
  • "WaitForInstanceCompletionAsync används för effektiv avsökning."
List<string> workItems = new List<string>
{
    "Task1",
    "Task2",
    "Task3",
    "LongerTask4",
    "VeryLongTask5"
};

// Schedule the orchestration with the work items
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
    "ParallelProcessingOrchestration", 
    workItems);

// Wait for completion
var instance = await client.WaitForInstanceCompletionAsync(
    instanceId,
    getInputsAndOutputs: true,
    cts.Token);

worker.py

För att demonstrera fan-out/fan-in-mönstret skapar orkestreringen av arbetsprocessen parallella aktiviteter och väntar på att alla ska slutföras. Dirigeraren:

  1. Tar emot en lista över arbetsobjekt som indata.
  2. Den "fläktar ut" genom att skapa parallella uppgifter för varje arbetsobjekt (anropa process_work_item för var och en).
  3. Den väntar på att alla uppgifter ska slutföras med hjälp av task.when_all.
  4. Den samlar sedan in genom att aggregera resultaten med aggregate_results-aktiviteten.
  5. Det slutliga aggregerade resultatet returneras till klienten.

Med fan-out/fan-in skapar orkestreringen parallella uppgifter och väntar på att alla ska slutföras.

# Orchestrator function
def fan_out_fan_in_orchestrator(ctx, work_items: list) -> dict:
    logger.info(f"Starting fan out/fan in orchestration with {len(work_items)} items")

    # Fan out: Create a task for each work item
    parallel_tasks = []
    for item in work_items:
        parallel_tasks.append(ctx.call_activity("process_work_item", input=item))

    # Wait for all tasks to complete
    logger.info(f"Waiting for {len(parallel_tasks)} parallel tasks to complete")
    results = yield task.when_all(parallel_tasks)

    # Fan in: Aggregate all the results
    logger.info("All parallel tasks completed, aggregating results")
    final_result = yield ctx.call_activity("aggregate_results", input=results)

    return final_result

client.py

Klientprojektet:

  • Använder samma anslutningssträngslogik som arbetaren.
  • Skapar en lista över arbetsobjekt som ska bearbetas parallellt.
  • Schemalägger en orkestreringsinstans med listan som indata.
  • Väntar på att orkestreringen ska slutföras och visar de aggregerade resultaten.
  • "wait_for_orchestration_completion används för effektiv avsökning."
# Generate work items (default 10 items if not specified)
count = int(sys.argv[1]) if len(sys.argv) > 1 else 10
work_items = list(range(1, count + 1))

logger.info(f"Starting new fan out/fan in orchestration with {count} work items")

# Schedule a new orchestration instance
instance_id = client.schedule_new_orchestration(
    "fan_out_fan_in_orchestrator", 
    input=work_items
)

logger.info(f"Started orchestration with ID = {instance_id}")

# Wait for orchestration to complete
logger.info("Waiting for orchestration to complete...")
result = client.wait_for_orchestration_completion(
    instance_id,
    timeout=60
)

För att demonstrera fan-out/fan-in-mönstret skapar projektorkestreringen parallella aktivitetsuppgifter och väntar på att alla ska slutföras. Dirigeraren:

  1. Tar en lista över arbetsobjekt som indata.
  2. Sprider ut sig genom att skapa en separat uppgift för varje arbetsobjekt.
  3. Kör alla uppgifter parallellt.
  4. Väntar tills alla uppgifter har slutförts med hjälp av ''.
  5. Samlar fans genom att aggregera alla individuella resultat med hjälp av "".
  6. Returnerar det slutliga aggregerade resultatet till klienten.

Projektet innehåller:

  • DurableTaskSchedulerWorkerExtensions worker: Definierar orkestrerings- och aktivitetsfunktionerna.
  • DurableTaskSchedulerClientExtension klient: Konfigurerar arbetarvärd med korrekt hantering av anslutningssträngar.

Arbetare

Med fan-out/fan-in skapar orkestreringen parallella uppgifter och väntar på att alla ska slutföras.

DurableTaskGrpcWorker worker = DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(connectionString)
    .addOrchestration(new TaskOrchestrationFactory() {
        @Override
        public String getName() { return "FanOutFanIn_WordCount"; }

        @Override
        public TaskOrchestration create() {
            return ctx -> {
                List<?> inputs = ctx.getInput(List.class);
                List<Task<Integer>> tasks = inputs.stream()
                        .map(input -> ctx.callActivity("CountWords", input.toString(), Integer.class))
                        .collect(Collectors.toList());
                List<Integer> allWordCountResults = ctx.allOf(tasks).await();
                int totalWordCount = allWordCountResults.stream().mapToInt(Integer::intValue).sum();
                ctx.complete(totalWordCount);
            };
        }
    })
    .addActivity(new TaskActivityFactory() {
        @Override
        public String getName() { return "CountWords"; }

        @Override
        public TaskActivity create() {
            return ctx -> {
                String input = ctx.getInput(String.class);
                StringTokenizer tokenizer = new StringTokenizer(input);
                return tokenizer.countTokens();
            };
        }
    })
    .build();

// Start the worker
worker.start();

Klient

Klientprojektet:

  • Använder samma anslutningssträngslogik som arbetaren.
  • Skapar en lista över arbetsobjekt som ska bearbetas parallellt.
  • Schemalägger en orkestreringsinstans med listan som indata.
  • Väntar på att orkestreringen ska slutföras och visar de aggregerade resultaten.
  • "waitForInstanceCompletion används för effektiv avsökning."
DurableTaskClient client = DurableTaskSchedulerClientExtensions.createClientBuilder(connectionString).build();

// The input is an arbitrary list of strings.
List<String> listOfStrings = Arrays.asList(
        "Hello, world!",
        "The quick brown fox jumps over the lazy dog.",
        "If a tree falls in the forest and there is no one there to hear it, does it make a sound?",
        "The greatest glory in living lies not in never falling, but in rising every time we fall.",
        "Always remember that you are absolutely unique. Just like everyone else.");

// Schedule an orchestration which will reliably count the number of words in all the given sentences.
String instanceId = client.scheduleNewOrchestrationInstance(
        "FanOutFanIn_WordCount",
        new NewOrchestrationInstanceOptions().setInput(listOfStrings));
logger.info("Started new orchestration instance: {}", instanceId);

// Block until the orchestration completes. Then print the final status, which includes the output.
OrchestrationMetadata completedInstance = client.waitForInstanceCompletion(
        instanceId,
        Duration.ofSeconds(30),
        true);
logger.info("Orchestration completed: {}", completedInstance);
logger.info("Output: {}", completedInstance.readOutputAs(int.class));

Nästa steg

Nu när du har kört exemplet lokalt med hjälp av Durable Task Scheduler-emulatorn kan du prova att skapa en scheduler- och aktivitetshubbresurs och distribuera till Azure Container Apps.