Anteckning
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
SDK:er för varaktiga uppgifter tillhandahåller ett enkelt klientbibliotek för Durable Task Scheduler. I den här snabbstartsguiden får du lära dig hur du skapar orkestreringar som använder fan-out/fan-in-applikationsmönstret för att utföra parallell bearbetning.
Viktigt!
För närvarande är SDK:erna för varaktiga uppgifter inte tillgängliga för JavaScript och PowerShell.
Viktigt!
För närvarande är SDK:erna för varaktiga uppgifter inte tillgängliga för JavaScript och PowerShell.
- Konfigurera och kör Durable Task Scheduler-emulatorn för lokal utveckling.
- Kör arbetar- och klientprojekten.
- Granska orkestreringsstatus och historik via instrumentpanelen Durable Task Scheduler.
Förutsättningar
Innan du börjar:
- Kontrollera att du har .NET 8 SDK eller senare.
- Installera Docker för att köra emulatorn.
- Klona GitHub-lagringsplatsen durable task scheduler för att använda snabbstartsexemplet.
- Kontrollera att du har Python 3.9+ eller senare.
- Installera Docker för att köra emulatorn.
- Klona GitHub-lagringsplatsen durable task scheduler för att använda snabbstartsexemplet.
- Kontrollera att du har Java 8 eller 11.
- Installera Docker för att köra emulatorn.
- Klona GitHub-lagringsplatsen durable task scheduler för att använda snabbstartsexemplet.
Konfigurera Durable Task Scheduler-emulatorn
Programkoden söker efter en distribuerad schemaläggnings- och uppgiftshubbresurs. Om ingen hittas återgår koden till emulatorn. Emulatorn simulerar en schemaläggare och en aktivitetshubb i en Docker-container, vilket gör den idealisk för den lokala utveckling som krävs i den här snabbstarten.
- Navigera från rotkatalogen till exempelkatalogen för .NET SDK. - cd samples/durable-task-sdks/dotnet/FanOutFanIn
- Hämta Docker-avbildningen för emulatorn. - docker pull mcr.microsoft.com/dts/dts-emulator:latest
- Kör emulatorn. Det kan ta några sekunder innan containern är klar. - docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
Eftersom exempelkoden automatiskt använder standardinställningarna för emulatorn behöver du inte ange några miljövariabler. Standardinställningarna för emulatorn för den här snabbstarten är:
- Slutpunkt: http://localhost:8080
- Aktivitetshubben: default
- Gå till Python SDK-exempelkatalogen från rotkatalogen - Azure-Samples/Durable-Task-Scheduler.- cd samples/durable-task-sdks/python/fan-out-fan-in
- Hämta Docker-avbildningen för emulatorn. - docker pull mcr.microsoft.com/dts/dts-emulator:latest
- Kör emulatorn. Det kan ta några sekunder innan containern är klar. - docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
Eftersom exempelkoden automatiskt använder standardinställningarna för emulatorn behöver du inte ange några miljövariabler. Standardinställningarna för emulatorn för den här snabbstarten är:
- Slutpunkt: http://localhost:8080
- Aktivitetshubben: default
- Gå till Java SDK-exempelkatalogen - Azure-Samples/Durable-Task-Schedulerfrån rotkatalogen.- cd samples/durable-task-sdks/java/fan-out-fan-in
- Hämta Docker-avbildningen för emulatorn. - docker pull mcr.microsoft.com/dts/dts-emulator:latest
- Kör emulatorn. Det kan ta några sekunder innan containern är klar. - docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
Eftersom exempelkoden automatiskt använder standardinställningarna för emulatorn behöver du inte ange några miljövariabler. Standardinställningarna för emulatorn för den här snabbstarten är:
- Slutpunkt: http://localhost:8080
- Aktivitetshubben: default
Kör snabbstarten
- Från katalogen - FanOutFanIngår du till- Workerkatalogen för att skapa och köra arbetaren.- cd Worker dotnet build dotnet run
- I en separat terminal, från - FanOutFanInkatalogen, navigerar du till- Clientkatalogen för att skapa och köra klienten.- cd Client dotnet build dotnet run
Förstå utdata
När du kör det här exemplet får du utdata från både arbets- och klientprocesserna. Packa upp det som hände i koden när du körde projektet.
Arbetsproduktivitet
Arbetsutdata visar:
- Registrering av orkestreraren och aktiviteter
- Loggposter när varje aktivitet anropas
- Parallell bearbetning av flera arbetsobjekt
- Slutlig sammanställning av resultat
Klientens resultat
Klientutdata visar:
- Orkestreringen börjar med en lista över arbetsobjekt
- Det unika orkestreringsinstans-ID:t
- De slutliga aggregerade resultaten, som visar varje arbetsobjekt och dess motsvarande resultat
- Totalt antal bearbetade objekt
Exempel på utdata
Starting Fan-Out Fan-In Pattern - Parallel Processing Client
Using local emulator with no authentication
Starting parallel processing orchestration with 5 work items
Work items: ["Task1","Task2","Task3","LongerTask4","VeryLongTask5"]
Started orchestration with ID: 7f8e9a6b-1c2d-3e4f-5a6b-7c8d9e0f1a2b
Waiting for orchestration to complete...
Orchestration completed with status: Completed
Processing results:
Work item: Task1, Result: 5
Work item: Task2, Result: 5
Work item: Task3, Result: 5
Work item: LongerTask4, Result: 11
Work item: VeryLongTask5, Result: 13
Total items processed: 5
- Aktivera en virtuell Python-miljö. 
- Installera de paket som krävs. - pip install -r requirements.txt
- Starta arbetaren. - python worker.py- Förväntade utdata - Du kan se utdata som visar att arbetaren har startat och väntar på arbetsuppgifter. - Starting Fan Out/Fan In pattern worker... Using taskhub: default Using endpoint: http://localhost:8080 Starting gRPC worker that connects to http://localhost:8080 Successfully connected to http://localhost:8080. Waiting for work items...
- I en ny terminal aktiverar du den virtuella miljön och kör klienten. - Du kan ange antalet arbetsobjekt som argument. Om inget argument anges körs 10 objekt som standard i exemplet. - python client.py [number_of_items]
Förstå utdata
När du kör det här exemplet får du utdata från både arbets- och klientprocesserna. Packa upp det som hände i koden när du körde projektet.
Arbetsproduktivitet
Arbetsutdata visar:
- Registrering av orkestreraren och aktiviteter.
- Statusmeddelanden vid bearbetning av varje arbetsobjekt parallellt, som visar att de körs samtidigt.
- Slumpmässiga fördröjningar för varje arbetsobjekt (mellan 0,5 och 2 sekunder) för att simulera varierande bearbetningstider.
- Ett slutligt meddelande som visar sammanställningen av resultat.
Klientens resultat
Klientutdata visar:
- Orkestreringen börjar med det angivna antalet arbetsobjekt.
- Det unika orkestreringsinstans-ID:t.
- Det slutliga aggregerade resultatet, som innehåller: - Det totala antalet bearbetade objekt
- Summan av alla resultat (varje objektresultat är kvadraten för dess värde)
- Medelvärdet av alla resultat
 
Exempel på utdata
Starting fan out/fan in orchestration with 10 items
Waiting for 10 parallel tasks to complete
Orchestrator yielded with 10 task(s) and 0 event(s) outstanding.
Processing work item: 1
Processing work item: 2
Processing work item: 10
Processing work item: 9
Processing work item: 8
Processing work item: 7
Processing work item: 6
Processing work item: 5
Processing work item: 4
Processing work item: 3
Orchestrator yielded with 9 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 8 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 7 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 6 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 5 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 4 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 3 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 2 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 1 task(s) and 0 event(s) outstanding.
All parallel tasks completed, aggregating results
Orchestrator yielded with 1 task(s) and 0 event(s) outstanding.
Aggregating results from 10 items
Orchestration completed with status: COMPLETED
Från katalogen fan-out-fan-in skapar och kör du programmet med Hjälp av Gradle.
./gradlew runFanOutFanInPattern
Tips/Råd
Om du får felmeddelandet zsh: permission denied: ./gradlewkan du prova att köra chmod +x gradlew innan du kör programmet.
Förstå utdata
När du kör det här exemplet får du utdata som visar:
- Registrering av orkestreraren och aktiviteter.
- Statusmeddelanden vid bearbetning av varje arbetsobjekt parallellt, som visar att de körs samtidigt.
- Slumpmässiga fördröjningar för varje arbetsobjekt (mellan 0,5 och 2 sekunder) för att simulera varierande bearbetningstider.
- Ett slutligt meddelande som visar sammanställningen av resultat.
Packa upp det som hände i koden när du körde projektet.
Exempel på utdata
Starting a Gradle Daemon (subsequent builds will be faster)
> Task :runFanOutFanInPattern
Durable Task worker is connecting to sidecar at localhost:8080.
Started new orchestration instance
Orchestration completed: [Name: 'FanOutFanIn_WordCount', ID: '<id-number>', RuntimeStatus: COMPLETED, CreatedAt: 2025-04-25T15:24:47.170Z, LastUpdatedAt: 2025-04-25T15:24:47.287Z, Input: '["Hello, world!","The quick brown fox jumps over t...', Output: '60']
Output: 60
Nu när du har kört projektet lokalt kan du nu lära dig hur du distribuerar till Azure som finns i Azure Container Apps.
Visa orkestreringsstatus och historik
Du kan visa orkestreringsstatus och historik via instrumentpanelen Durable Task Scheduler. Som standard körs instrumentpanelen på port 8082.
- Navigera till http://localhost:8082 i webbläsaren.
- Klicka på standard aktivitetshubben. Orkestreringsinstansen som du skapade finns i listan.
- Klicka på orkestreringsinstans-ID:t för att visa körningsinformationen, som omfattar: - Parallell körning av flera aktivitetsaktiviteter
- Aggregeringssteget för insamlare
- Indata och utdata i varje steg
- Den tid det tar för varje steg
 
              
               
              
              
            
              
               
              
              
            
              
               
              
              
            
Förstå kodstrukturen
Arbetsprojektet
För att demonstrera fan-out/fan-in-mönstret skapar orkestreringen av arbetsprocessen parallella aktiviteter och väntar på att alla ska slutföras. Dirigeraren:
- Tar en lista över arbetsobjekt som indata.
- Sprider ut sig genom att skapa en separat uppgift för varje arbetsuppgift med hjälp av ProcessWorkItemActivity.
- Kör alla uppgifter parallellt.
- Väntar på att alla uppgifter ska slutföras med hjälp av Task.WhenAll.
- Samlar in genom att aggregera alla enskilda resultat med hjälp av AggregateResultsActivity.
- Returnerar det slutliga aggregerade resultatet till klienten.
Arbetsprojektet innehåller:
- ParallelProcessingOrchestration.cs: Definierar orkestrerings- och aktivitetsfunktionerna i en enda fil.
- Program.cs: Konfigurerar arbetsvärden med korrekt hantering av anslutningssträngar.
ParallelProcessingOrchestration.cs
Med fan-out/fan-in skapar orkestreringen parallella uppgifter och väntar på att alla ska slutföras.
public override async Task<Dictionary<string, int>> RunAsync(TaskOrchestrationContext context, List<string> workItems)
{
    // Step 1: Fan-out by creating a task for each work item in parallel
    List<Task<Dictionary<string, int>>> processingTasks = new List<Task<Dictionary<string, int>>>();
    foreach (string workItem in workItems)
    {
        // Create a task for each work item (fan-out)
        Task<Dictionary<string, int>> task = context.CallActivityAsync<Dictionary<string, int>>(
            nameof(ProcessWorkItemActivity), workItem);
        processingTasks.Add(task);
    }
    // Step 2: Wait for all parallel tasks to complete
    Dictionary<string, int>[] results = await Task.WhenAll(processingTasks);
    // Step 3: Fan-in by aggregating all results
    Dictionary<string, int> aggregatedResults = await context.CallActivityAsync<Dictionary<string, int>>(
        nameof(AggregateResultsActivity), results);
    return aggregatedResults;
}
Varje aktivitet implementeras som en separat klass som är dekorerad med [DurableTask] attributet .
[DurableTask]
public class ProcessWorkItemActivity : TaskActivity<string, Dictionary<string, int>>
{
    // Implementation processes a single work item
}
[DurableTask]
public class AggregateResultsActivity : TaskActivity<Dictionary<string, int>[], Dictionary<string, int>>
{
    // Implementation aggregates individual results
}
Program.cs
Arbetaren använder Microsoft.Extensions.Hosting för korrekt livscykelhantering.
using Microsoft.Extensions.Hosting;
//..
builder.Services.AddDurableTaskWorker()
    .AddTasks(registry =>
    {
        registry.AddOrchestrator<ParallelProcessingOrchestration>();
        registry.AddActivity<ProcessWorkItemActivity>();
        registry.AddActivity<AggregateResultsActivity>();
    })
    .UseDurableTaskScheduler(connectionString);
Klientprojektet
Klientprojektet:
- Använder samma anslutningssträngslogik som arbetaren.
- Skapar en lista över arbetsobjekt som ska bearbetas parallellt.
- Schemalägger en orkestreringsinstans med listan som indata.
- Väntar på att orkestreringen ska slutföras och visar de aggregerade resultaten.
- "WaitForInstanceCompletionAsyncanvänds för effektiv avsökning."
List<string> workItems = new List<string>
{
    "Task1",
    "Task2",
    "Task3",
    "LongerTask4",
    "VeryLongTask5"
};
// Schedule the orchestration with the work items
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
    "ParallelProcessingOrchestration", 
    workItems);
// Wait for completion
var instance = await client.WaitForInstanceCompletionAsync(
    instanceId,
    getInputsAndOutputs: true,
    cts.Token);
worker.py
För att demonstrera fan-out/fan-in-mönstret skapar orkestreringen av arbetsprocessen parallella aktiviteter och väntar på att alla ska slutföras. Dirigeraren:
- Tar emot en lista över arbetsobjekt som indata.
- Den "fläktar ut" genom att skapa parallella uppgifter för varje arbetsobjekt (anropa process_work_itemför var och en).
- Den väntar på att alla uppgifter ska slutföras med hjälp av task.when_all.
- Den samlar sedan in genom att aggregera resultaten med aggregate_results-aktiviteten.
- Det slutliga aggregerade resultatet returneras till klienten.
Med fan-out/fan-in skapar orkestreringen parallella uppgifter och väntar på att alla ska slutföras.
# Orchestrator function
def fan_out_fan_in_orchestrator(ctx, work_items: list) -> dict:
    logger.info(f"Starting fan out/fan in orchestration with {len(work_items)} items")
    # Fan out: Create a task for each work item
    parallel_tasks = []
    for item in work_items:
        parallel_tasks.append(ctx.call_activity("process_work_item", input=item))
    # Wait for all tasks to complete
    logger.info(f"Waiting for {len(parallel_tasks)} parallel tasks to complete")
    results = yield task.when_all(parallel_tasks)
    # Fan in: Aggregate all the results
    logger.info("All parallel tasks completed, aggregating results")
    final_result = yield ctx.call_activity("aggregate_results", input=results)
    return final_result
client.py
Klientprojektet:
- Använder samma anslutningssträngslogik som arbetaren.
- Skapar en lista över arbetsobjekt som ska bearbetas parallellt.
- Schemalägger en orkestreringsinstans med listan som indata.
- Väntar på att orkestreringen ska slutföras och visar de aggregerade resultaten.
- "wait_for_orchestration_completionanvänds för effektiv avsökning."
# Generate work items (default 10 items if not specified)
count = int(sys.argv[1]) if len(sys.argv) > 1 else 10
work_items = list(range(1, count + 1))
logger.info(f"Starting new fan out/fan in orchestration with {count} work items")
# Schedule a new orchestration instance
instance_id = client.schedule_new_orchestration(
    "fan_out_fan_in_orchestrator", 
    input=work_items
)
logger.info(f"Started orchestration with ID = {instance_id}")
# Wait for orchestration to complete
logger.info("Waiting for orchestration to complete...")
result = client.wait_for_orchestration_completion(
    instance_id,
    timeout=60
)
För att demonstrera fan-out/fan-in-mönstret skapar projektorkestreringen parallella aktivitetsuppgifter och väntar på att alla ska slutföras. Dirigeraren:
- Tar en lista över arbetsobjekt som indata.
- Sprider ut sig genom att skapa en separat uppgift för varje arbetsobjekt.
- Kör alla uppgifter parallellt.
- Väntar tills alla uppgifter har slutförts med hjälp av ''.
- Samlar fans genom att aggregera alla individuella resultat med hjälp av "".
- Returnerar det slutliga aggregerade resultatet till klienten.
Projektet innehåller:
- 
              
              DurableTaskSchedulerWorkerExtensionsworker: Definierar orkestrerings- och aktivitetsfunktionerna.
- 
              
              DurableTaskSchedulerClientExtensionklient: Konfigurerar arbetarvärd med korrekt hantering av anslutningssträngar.
Arbetare
Med fan-out/fan-in skapar orkestreringen parallella uppgifter och väntar på att alla ska slutföras.
DurableTaskGrpcWorker worker = DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(connectionString)
    .addOrchestration(new TaskOrchestrationFactory() {
        @Override
        public String getName() { return "FanOutFanIn_WordCount"; }
        @Override
        public TaskOrchestration create() {
            return ctx -> {
                List<?> inputs = ctx.getInput(List.class);
                List<Task<Integer>> tasks = inputs.stream()
                        .map(input -> ctx.callActivity("CountWords", input.toString(), Integer.class))
                        .collect(Collectors.toList());
                List<Integer> allWordCountResults = ctx.allOf(tasks).await();
                int totalWordCount = allWordCountResults.stream().mapToInt(Integer::intValue).sum();
                ctx.complete(totalWordCount);
            };
        }
    })
    .addActivity(new TaskActivityFactory() {
        @Override
        public String getName() { return "CountWords"; }
        @Override
        public TaskActivity create() {
            return ctx -> {
                String input = ctx.getInput(String.class);
                StringTokenizer tokenizer = new StringTokenizer(input);
                return tokenizer.countTokens();
            };
        }
    })
    .build();
// Start the worker
worker.start();
Klient
Klientprojektet:
- Använder samma anslutningssträngslogik som arbetaren.
- Skapar en lista över arbetsobjekt som ska bearbetas parallellt.
- Schemalägger en orkestreringsinstans med listan som indata.
- Väntar på att orkestreringen ska slutföras och visar de aggregerade resultaten.
- "waitForInstanceCompletionanvänds för effektiv avsökning."
DurableTaskClient client = DurableTaskSchedulerClientExtensions.createClientBuilder(connectionString).build();
// The input is an arbitrary list of strings.
List<String> listOfStrings = Arrays.asList(
        "Hello, world!",
        "The quick brown fox jumps over the lazy dog.",
        "If a tree falls in the forest and there is no one there to hear it, does it make a sound?",
        "The greatest glory in living lies not in never falling, but in rising every time we fall.",
        "Always remember that you are absolutely unique. Just like everyone else.");
// Schedule an orchestration which will reliably count the number of words in all the given sentences.
String instanceId = client.scheduleNewOrchestrationInstance(
        "FanOutFanIn_WordCount",
        new NewOrchestrationInstanceOptions().setInput(listOfStrings));
logger.info("Started new orchestration instance: {}", instanceId);
// Block until the orchestration completes. Then print the final status, which includes the output.
OrchestrationMetadata completedInstance = client.waitForInstanceCompletion(
        instanceId,
        Duration.ofSeconds(30),
        true);
logger.info("Orchestration completed: {}", completedInstance);
logger.info("Output: {}", completedInstance.readOutputAs(int.class));
Nästa steg
Nu när du har kört exemplet lokalt med hjälp av Durable Task Scheduler-emulatorn kan du prova att skapa en scheduler- och aktivitetshubbresurs och distribuera till Azure Container Apps.