你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
重要
目前,持久任务 SDK 不适用于 JavaScript 和 PowerShell。
重要
目前,持久任务 SDK 不适用于 JavaScript 和 PowerShell。
在此快速入门中,您将学习如何:
- 设置并运行 Durable Task Scheduler 模拟器进行本地开发。
- 运行辅助角色和客户端项目。
- 检查 Azure 容器应用日志。
- 通过 Durable Task Scheduler 仪表板查看业务流程状态和历史记录。
先决条件
开始之前:
- 请确保具有 .NET 8 SDK 或更高版本。
- 安装 Docker 来运行模拟器。
- 安装 Azure Developer CLI
- 克隆 Durable Task Scheduler GitHub 存储库 以使用快速入门示例。
- 请确保具有 Python 3.9+ 或更高版本。
- 安装 Docker 来运行模拟器。
- 安装 Azure Developer CLI
- 克隆 Durable Task Scheduler GitHub 存储库 以使用快速入门示例。
- 请确保具有 Java 8 或 11。
- 安装 Docker 来运行模拟器。
- 安装 Azure Developer CLI
- 克隆 Durable Task Scheduler GitHub 存储库 以使用快速入门示例。
准备项目
在新的终端窗口中,从 Azure-Samples/Durable-Task-Scheduler 目录中导航到示例目录。
cd /samples/durable-task-sdks/dotnet/FunctionChaining
cd /samples/durable-task-sdks/python/function-chaining
cd /samples/durable-task-sdks/java/function-chaining
使用 Azure 开发人员 CLI 进行部署
- 运行 - azd up以预配基础结构,并通过单个命令将应用程序部署到 Azure 容器应用。- azd up
- 当终端中出现提示时,请提供以下参数。 - 参数 - DESCRIPTION - 环境名称 - 为保存所有 Azure 资源而创建的资源组的前缀。 - Azure 位置 - 资源的 Azure 位置。 - Azure 订阅 - 资源的 Azure 订阅。 - 此过程可能需要一段时间才能完成。 完成 - azd up命令后,CLI 输出将显示两个用于监视部署进度的 Azure 门户链接。 输出还演示了如何运行- azd up:- 使用 ./infra通过azd provision目录中提供的 Bicep 文件创建和配置所有必要的 Azure 资源。 Azure Developer CLI 预配这些资源后,你可以通过 Azure 门户访问这些资源。 用于预配 Azure 资源的文件包括:- main.parameters.json
- main.bicep
- 按功能组织的 app资源目录
- 一个 core参考库,其中包含azd模板使用的 Bicep 模块
 
- 使用 azd deploy部署代码
 - 预期输出- Packaging services (azd package) (✓) Done: Packaging service client - Image Hash: {IMAGE_HASH} - Target Image: {TARGET_IMAGE} (✓) Done: Packaging service worker - Image Hash: {IMAGE_HASH} - Target Image: {TARGET_IMAGE} Provisioning Azure resources (azd provision) Provisioning Azure resources can take some time. Subscription: SUBSCRIPTION_NAME (SUBSCRIPTION_ID) Location: West US 2 You can view detailed progress in the Azure Portal: https://portal.azure.com/#view/HubsExtension/DeploymentDetailsBlade/~/overview/id/%2Fsubscriptions%SUBSCRIPTION_ID%2Fproviders%2FMicrosoft.Resources%2Fdeployments%2FCONTAINER_APP_ENVIRONMENT (✓) Done: Resource group: GENERATED_RESOURCE_GROUP (1.385s) (✓) Done: Container Apps Environment: GENERATED_CONTAINER_APP_ENVIRONMENT (54.125s) (✓) Done: Container Registry: GENERATED_REGISTRY (1m27.747s) (✓) Done: Container App: SAMPLE_CLIENT_APP (21.39s) (✓) Done: Container App: SAMPLE_WORKER_APP (24.136s) Deploying services (azd deploy) (✓) Done: Deploying service client - Endpoint: https://SAMPLE_CLIENT_APP.westus2.azurecontainerapps.io/ (✓) Done: Deploying service worker - Endpoint: https://SAMPLE_WORKER_APP.westus2.azurecontainerapps.io/ SUCCESS: Your up workflow to provision and deploy to Azure completed in 10 minutes 34 seconds.
- 使用 
确认部署是否成功
在 Azure 门户中,验证业务流程是否成功运行。
- 从终端输出复制资源组名称。 
- 登录到 Azure 门户 并搜索该资源组名称。 
- 在资源组概述页中,单击客户端容器应用资源。 
- 选择“ 监视>日志流”。 
- 确认客户端容器正在记录函数链任务。   
- 导航回资源组页以选择 - worker容器。
- 选择“ 监视>日志流”。 
- 确认辅助角色容器正在记录函数链任务。   
- 确认示例容器应用正在记录函数链接任务。   
了解代码
客户项目
客户项目:
- 使用与辅助角色相同的连接字符串逻辑
- 实现一个顺序编排调度器,该调度器: - 计划 20 个业务流程实例,一次一个
- 在计划每个业务流程之间等待 5 秒
- 跟踪列表中的所有编排实例
- 在退出前等待所有业务流程完成
 
- 使用标准日志记录显示进度和结果
// Schedule 20 orchestrations sequentially
for (int i = 0; i < TotalOrchestrations; i++)
{
    // Create a unique instance ID
    string instanceName = $"{name}_{i+1}";
    // Schedule the orchestration
    string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
        "GreetingOrchestration", 
        instanceName);
    // Wait 5 seconds before scheduling the next one
    await Task.Delay(TimeSpan.FromSeconds(IntervalSeconds));
}
// Wait for all orchestrations to complete
foreach (string id in allInstanceIds)
{
    OrchestrationMetadata instance = await client.WaitForInstanceCompletionAsync(
        id, getInputsAndOutputs: false, CancellationToken.None);
}
辅助角色项目
辅助角色项目包含:
- GreetingOrchestration.cs:在单个文件中定义协调程序和活动函数
- Program.cs:设置工作主机并正确管理连接字符串
业务流程实现
业务流程使用标准 CallActivityAsync 方法按顺序直接调用每个活动:
public override async Task<string> RunAsync(TaskOrchestrationContext context, string name)
{
    // Step 1: Say hello to the person
    string greeting = await context.CallActivityAsync<string>(nameof(SayHelloActivity), name);
    // Step 2: Process the greeting
    string processedGreeting = await context.CallActivityAsync<string>(nameof(ProcessGreetingActivity), greeting);
    // Step 3: Finalize the response
    string finalResponse = await context.CallActivityAsync<string>(nameof(FinalizeResponseActivity), processedGreeting);
    return finalResponse;
}
每个活动都实现为一个用 [DurableTask] 属性修饰的独立类。
[DurableTask]
public class SayHelloActivity : TaskActivity<string, string>
{
    // Implementation details
}
工作人员使用Microsoft.Extensions.Hosting进行适当的生命周期管理。
var builder = Host.CreateApplicationBuilder();
builder.Services.AddDurableTaskWorker()
    .AddTasks(registry => {
        registry.AddAllGeneratedTasks();
    })
    .UseDurableTaskScheduler(connectionString);
var host = builder.Build();
await host.StartAsync();
客户
客户项目:
- 使用与辅助角色相同的连接字符串逻辑
- 实现一个顺序编排调度器,该调度器: - 计划 20 个业务流程实例,一次一个
- 在计划每个业务流程之间等待 5 秒
- 跟踪列表中的所有编排实例
- 在退出前等待所有业务流程完成
 
- 使用标准日志记录显示进度和结果
# Schedule all orchestrations first
instance_ids = []
for i in range(TOTAL_ORCHESTRATIONS):
    try:
        # Create a unique instance name
        instance_name = f"{name}_{i+1}"
        logger.info(f"Scheduling orchestration #{i+1} ({instance_name})")
        # Schedule the orchestration
        instance_id = client.schedule_new_orchestration(
            "function_chaining_orchestrator",
            input=instance_name
        )
        instance_ids.append(instance_id)
        logger.info(f"Orchestration #{i+1} scheduled with ID: {instance_id}")
        # Wait before scheduling next orchestration (except for the last one)
        if i < TOTAL_ORCHESTRATIONS - 1:
            logger.info(f"Waiting {INTERVAL_SECONDS} seconds before scheduling next orchestration...")
        await asyncio.sleep(INTERVAL_SECONDS)
# ...
# Wait for all orchestrations to complete
for idx, instance_id in enumerate(instance_ids):
    try:
        logger.info(f"Waiting for orchestration {idx+1}/{len(instance_ids)} (ID: {instance_id})...")
        result = client.wait_for_orchestration_completion(
            instance_id,
            timeout=120
        )
工人
业务流程实现
业务流程使用标准 call_activity 函数按顺序直接调用每个活动:
# Orchestrator function
def function_chaining_orchestrator(ctx, name: str) -> str:
    """Orchestrator that demonstrates function chaining pattern."""
    logger.info(f"Starting function chaining orchestration for {name}")
    # Call first activity - passing input directly without named parameter
    greeting = yield ctx.call_activity('say_hello', input=name)
    # Call second activity with the result from first activity
    processed_greeting = yield ctx.call_activity('process_greeting', input=greeting)
    # Call third activity with the result from second activity
    final_response = yield ctx.call_activity('finalize_response', input=processed_greeting)
    return final_response
每个活动都作为单独的函数实现:
# Activity functions
def say_hello(ctx, name: str) -> str:
    """First activity that greets the user."""
    logger.info(f"Activity say_hello called with name: {name}")
    return f"Hello {name}!"
def process_greeting(ctx, greeting: str) -> str:
    """Second activity that processes the greeting."""
    logger.info(f"Activity process_greeting called with greeting: {greeting}")
    return f"{greeting} How are you today?"
def finalize_response(ctx, response: str) -> str:
    """Third activity that finalizes the response."""
    logger.info(f"Activity finalize_response called with response: {response}")
    return f"{response} I hope you're doing well!"
工作人员使用DurableTaskSchedulerWorker进行适当的生命周期管理。
with DurableTaskSchedulerWorker(
    host_address=host_address, 
    secure_channel=endpoint != "http://localhost:8080",
    taskhub=taskhub_name, 
    token_credential=credential
) as worker:
    # Register activities and orchestrators
    worker.add_activity(say_hello)
    worker.add_activity(process_greeting)
    worker.add_activity(finalize_response)
    worker.add_orchestrator(function_chaining_orchestrator)
    # Start the worker (without awaiting)
    worker.start()
示例容器应用包含工作器和客户端代码。
客户
客户端代码:
- 使用与辅助角色相同的连接字符串逻辑
- 实现一个顺序编排调度器,该调度器: - 计划 20 个业务流程实例,一次一个
- 在计划每个业务流程之间等待 5 秒
- 跟踪列表中的所有编排实例
- 在退出前等待所有业务流程完成
 
- 使用标准日志记录显示进度和结果
// Create client using Azure-managed extensions
DurableTaskClient client = (credential != null 
    ? DurableTaskSchedulerClientExtensions.createClientBuilder(endpoint, taskHubName, credential)
    : DurableTaskSchedulerClientExtensions.createClientBuilder(connectionString)).build();
// Start a new instance of the registered "ActivityChaining" orchestration
String instanceId = client.scheduleNewOrchestrationInstance(
        "ActivityChaining",
        new NewOrchestrationInstanceOptions().setInput("Hello, world!"));
logger.info("Started new orchestration instance: {}", instanceId);
// Block until the orchestration completes. Then print the final status, which includes the output.
OrchestrationMetadata completedInstance = client.waitForInstanceCompletion(
        instanceId,
        Duration.ofSeconds(30),
        true);
logger.info("Orchestration completed: {}", completedInstance);
logger.info("Output: {}", completedInstance.readOutputAs(String.class))
工人
业务流程使用标准 callActivity 方法按顺序直接调用每个活动:
DurableTaskGrpcWorker worker = (credential != null 
    ? DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(endpoint, taskHubName, credential)
    : DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(connectionString))
    .addOrchestration(new TaskOrchestrationFactory() {
        @Override
        public String getName() { return "ActivityChaining"; }
        @Override
        public TaskOrchestration create() {
            return ctx -> {
                String input = ctx.getInput(String.class);
                String x = ctx.callActivity("Reverse", input, String.class).await();
                String y = ctx.callActivity("Capitalize", x, String.class).await();
                String z = ctx.callActivity("ReplaceWhitespace", y, String.class).await();
                ctx.complete(z);
            };
        }
    })
    .addActivity(new TaskActivityFactory() {
        @Override
        public String getName() { return "Reverse"; }
        @Override
        public TaskActivity create() {
            return ctx -> {
                String input = ctx.getInput(String.class);
                StringBuilder builder = new StringBuilder(input);
                builder.reverse();
                return builder.toString();
            };
        }
    })
    .addActivity(new TaskActivityFactory() {
        @Override
        public String getName() { return "Capitalize"; }
        @Override
        public TaskActivity create() {
            return ctx -> ctx.getInput(String.class).toUpperCase();
        }
    })
    .addActivity(new TaskActivityFactory() {
        @Override
        public String getName() { return "ReplaceWhitespace"; }
        @Override
        public TaskActivity create() {
            return ctx -> {
                String input = ctx.getInput(String.class);
                return input.trim().replaceAll("\\s", "-");
            };
        }
    })
    .build();
// Start the worker
worker.start();