创建简单的并发工作流

本教程演示如何使用代理框架创建并发工作流。 你将学习如何实现扇出模式和扇入模式,这些模式支持并行处理,允许多个执行程序或代理同时工作,然后聚合其结果。

你将构建的内容

你将创建一个工作流,该工作流:

  • 将问题作为输入(例如,“什么是温度?”)
  • 同时向两位专家 AI 代理发送相同的问题(物理学家和化学家)
  • 收集两个代理的响应并将其合并到单个输出中
  • 使用扇出/扇入模式演示 AI 代理的并发执行

先决条件

步骤 1:安装 NuGet 包

首先,安装 .NET 项目所需的包:

dotnet add package Azure.AI.OpenAI --prerelease
dotnet add package Azure.Identity
dotnet add package Microsoft.Agents.AI.Workflows --prerelease
dotnet add package Microsoft.Extensions.AI.OpenAI --prerelease

步骤 2:设置依赖项和 Azure OpenAI

首先,使用所需的 NuGet 包和 Azure OpenAI 客户端设置项目:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;

public static class Program
{
    private static async Task Main()
    {
        // Set up the Azure OpenAI client
        var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
        var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
        var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential())
            .GetChatClient(deploymentName).AsIChatClient();

步骤 3:创建专家 AI 代理

创建两个专用 AI 代理,提供专家观点:

        // Create the AI agents with specialized expertise
        ChatClientAgent physicist = new(
            chatClient,
            name: "Physicist",
            instructions: "You are an expert in physics. You answer questions from a physics perspective."
        );

        ChatClientAgent chemist = new(
            chatClient,
            name: "Chemist",
            instructions: "You are an expert in chemistry. You answer questions from a chemistry perspective."
        );

步骤 4:创建启动执行程序

创建一个执行程序,该执行程序通过向多个代理发送输入来启动并发处理:

        var startExecutor = new ConcurrentStartExecutor();

ConcurrentStartExecutor 实现:

/// <summary>
/// Executor that starts the concurrent processing by sending messages to the agents.
/// </summary>
internal sealed class ConcurrentStartExecutor() :
    Executor<string>("ConcurrentStartExecutor")
{
    /// <summary>
    /// Starts the concurrent processing by sending messages to the agents.
    /// </summary>
    /// <param name="message">The user message to process</param>
    /// <param name="context">Workflow context for accessing workflow services and adding events</param>
    /// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
    /// The default is <see cref="CancellationToken.None"/>.</param>
    /// <returns>A task representing the asynchronous operation</returns>
    public override async ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // Broadcast the message to all connected agents. Receiving agents will queue
        // the message but will not start processing until they receive a turn token.
        await context.SendMessageAsync(new ChatMessage(ChatRole.User, message), cancellationToken);

        // Broadcast the turn token to kick off the agents.
        await context.SendMessageAsync(new TurnToken(emitEvents: true), cancellationToken);
    }
}

步骤 5:创建聚合执行程序

创建一个执行程序,用于收集和合并来自多个代理的响应:

        var aggregationExecutor = new ConcurrentAggregationExecutor();

ConcurrentAggregationExecutor 实现:

/// <summary>
/// Executor that aggregates the results from the concurrent agents.
/// </summary>
internal sealed class ConcurrentAggregationExecutor() :
    Executor<ChatMessage>("ConcurrentAggregationExecutor")
{
    private readonly List<ChatMessage> _messages = [];

    /// <summary>
    /// Handles incoming messages from the agents and aggregates their responses.
    /// </summary>
    /// <param name="message">The message from the agent</param>
    /// <param name="context">Workflow context for accessing workflow services and adding events</param>
    /// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
    /// The default is <see cref="CancellationToken.None"/>.</param>
    /// <returns>A task representing the asynchronous operation</returns>
    public override async ValueTask HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        this._messages.Add(message);

        if (this._messages.Count == 2)
        {
            var formattedMessages = string.Join(Environment.NewLine,
                this._messages.Select(m => $"{m.AuthorName}: {m.Text}"));
            await context.YieldOutputAsync(formattedMessages, cancellationToken);
        }
    }
}

步骤 6:生成工作流

使用扇出模式和扇入边缘模式连接执行器和代理:

        // Build the workflow by adding executors and connecting them
        var workflow = new WorkflowBuilder(startExecutor)
            .AddFanOutEdge(startExecutor, targets: [physicist, chemist])
            .AddFanInEdge(aggregationExecutor, sources: [physicist, chemist])
            .WithOutputFrom(aggregationExecutor)
            .Build();

步骤 7:执行工作流

运行工作流,并捕获流输出。

        // Execute the workflow in streaming mode
        await using StreamingRun run = await InProcessExecution.StreamAsync(workflow, "What is temperature?");
        await foreach (WorkflowEvent evt in run.WatchStreamAsync())
        {
            if (evt is WorkflowOutputEvent output)
            {
                Console.WriteLine($"Workflow completed with results:\n{output.Data}");
            }
        }
    }
}

工作原理

  1. 扇出ConcurrentStartExecutor接收输入问题,扇出接口将其同时发送给物理学家和化学家代理。
  2. 并行处理:两个 AI 代理同时处理相同的问题,每个代理都提供其专家视角。
  3. 扇入ConcurrentAggregationExecutor从两个代理ChatMessage收集响应。
  4. 聚合:收到这两个响应后,聚合器将它们合并为格式化输出。

关键概念

  • Fan-Out 边:使用AddFanOutEdge()来将相同的输入分发到多个执行器或代理。
  • Fan-In 边缘:使用 AddFanInEdge() 从多个源执行器收集结果。
  • AI 代理集成:可以将 AI 代理直接用作工作流中的执行程序。
  • 执行程序基类:自定义执行程序继承自 Executor<TInput> 并重写 HandleAsync 方法。
  • 轮次令牌:使用TurnToken来向代理发出信号,让他们开始处理排队的消息。
  • 流式执行:用于 StreamAsync() 在工作流进行时获取实时更新。

完成实现

有关此并发工作流与 AI 代理的完整工作实现,请参阅 Agent Framework 存储库中的 Concurrent/Program.cs 示例。

在 Python 实现中,你将生成一个并发工作流,该工作流通过多个并行执行程序处理数据,并聚合不同类型的结果。 此示例演示框架如何处理并发处理中的混合结果类型。

你将构建的内容

你将创建一个工作流,该工作流:

  • 将数字列表作为输入
  • 将列表分配到两个并行执行程序(一个计算平均值,一个计算总和)
  • 将不同的结果类型(float 和 int)聚合到最终输出中
  • 演示如何框架处理并发执行程序的不同结果类型

先决条件

  • Python 3.10 或更高版本
  • 已安装 Agent Framework Core: pip install agent-framework-core

步骤 1:导入所需的依赖项

首先从代理框架导入必要的组件:

import asyncio
import random

from agent_framework import Executor, WorkflowBuilder, WorkflowContext, WorkflowOutputEvent, handler
from typing_extensions import Never

步骤 2:创建调度器执行器

调度程序负责将初始输入分发到多个并行执行程序:

class Dispatcher(Executor):
    """
    The sole purpose of this executor is to dispatch the input of the workflow to
    other executors.
    """

    @handler
    async def handle(self, numbers: list[int], ctx: WorkflowContext[list[int]]):
        if not numbers:
            raise RuntimeError("Input must be a valid list of integers.")

        await ctx.send_message(numbers)

步骤 3:创建并行处理执行程序

创建两个将同时处理数据的执行程序:

class Average(Executor):
    """Calculate the average of a list of integers."""

    @handler
    async def handle(self, numbers: list[int], ctx: WorkflowContext[float]):
        average: float = sum(numbers) / len(numbers)
        await ctx.send_message(average)


class Sum(Executor):
    """Calculate the sum of a list of integers."""

    @handler
    async def handle(self, numbers: list[int], ctx: WorkflowContext[int]):
        total: int = sum(numbers)
        await ctx.send_message(total)

步骤 4:创建聚合器执行程序

聚合器从并行执行程序收集结果并生成最终输出:

class Aggregator(Executor):
    """Aggregate the results from the different tasks and yield the final output."""

    @handler
    async def handle(self, results: list[int | float], ctx: WorkflowContext[Never, list[int | float]]):
        """Receive the results from the source executors.

        The framework will automatically collect messages from the source executors
        and deliver them as a list.

        Args:
            results (list[int | float]): execution results from upstream executors.
                The type annotation must be a list of union types that the upstream
                executors will produce.
            ctx (WorkflowContext[Never, list[int | float]]): A workflow context that can yield the final output.
        """
        await ctx.yield_output(results)

步骤 5:生成工作流

使用扇出和扇入边缘模式连接执行程序:

async def main() -> None:
    # 1) Create the executors
    dispatcher = Dispatcher(id="dispatcher")
    average = Average(id="average")
    summation = Sum(id="summation")
    aggregator = Aggregator(id="aggregator")

    # 2) Build a simple fan out and fan in workflow
    workflow = (
        WorkflowBuilder()
        .set_start_executor(dispatcher)
        .add_fan_out_edges(dispatcher, [average, summation])
        .add_fan_in_edges([average, summation], aggregator)
        .build()
    )

步骤 6:运行工作流

使用示例数据执行工作流并捕获输出:

    # 3) Run the workflow
    output: list[int | float] | None = None
    async for event in workflow.run_stream([random.randint(1, 100) for _ in range(10)]):
        if isinstance(event, WorkflowOutputEvent):
            output = event.data

    if output is not None:
        print(output)

if __name__ == "__main__":
    asyncio.run(main())

工作原理

  1. 扇出:接收Dispatcher输入列表并将其同时发送到AverageSum执行器
  2. 并行处理:两个执行程序同时处理相同的输入,生成不同的结果类型:
    • Average 执行程序生成 float 结果
    • Sum 执行程序生成 int 结果
  3. 扇入Aggregator 接收来自两个执行器的结果,并将其作为包含两种类型的列表输出。
  4. 类型处理:框架使用联合类型自动处理不同的结果类型(int | float

关键概念

  • 分布式输出:用于 add_fan_out_edges() 向多个执行程序发送相同的输入
  • Fan-In 边:使用 add_fan_in_edges() 从多个源执行程序中收集结果
  • 联合类型:使用类型注释处理不同的结果类型,例如 list[int | float]
  • 并发执行:多个执行程序同时处理数据,提高性能

完成实现

有关此并发工作流的完整工作实现,请参阅 Agent Framework 存储库中的 aggregate_results_of_different_types.py 示例。

后续步骤