Microsoft代理框架工作流编排 - 顺序

在顺序编排中,代理程序按流水线组织。 每个代理反过来处理任务,将其输出传递给序列中的下一个代理。 对于每个步骤基于上一步(例如文档审阅、数据处理管道或多阶段推理)构建的工作流来说,这是理想的选择。

顺序编排

学习内容

  • 如何创建一个顺序的代理流水线
  • 如何链接代理,其中每个代理在上一个输出的基础上生成
  • 如何将代理与专用任务的自定义执行程序混合使用
  • 如何跟踪通过管道的会话流

定义代理

在顺序业务流程中,代理组织在管道中,每个代理依次处理任务,并将输出传递到序列中的下一个代理。

设置 Azure OpenAI 客户端

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.Workflows;
using Microsoft.Extensions.AI;
using Microsoft.Agents.AI;

// 1) Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ??
    throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var client = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential())
    .GetChatClient(deploymentName)
    .AsIChatClient();

创建将按顺序工作的专用代理:

// 2) Helper method to create translation agents
static ChatClientAgent GetTranslationAgent(string targetLanguage, IChatClient chatClient) =>
    new(chatClient,
        $"You are a translation assistant who only responds in {targetLanguage}. Respond to any " +
        $"input by outputting the name of the input language and then translating the input to {targetLanguage}.");

// Create translation agents for sequential processing
var translationAgents = (from lang in (string[])["French", "Spanish", "English"]
                         select GetTranslationAgent(lang, client));

设置顺序编排

使用 AgentWorkflowBuilder以下命令生成工作流:

// 3) Build sequential workflow
var workflow = AgentWorkflowBuilder.BuildSequential(translationAgents);

运行顺序工作流

执行工作流并处理事件:

// 4) Run the workflow
var messages = new List<ChatMessage> { new(ChatRole.User, "Hello, world!") };

StreamingRun run = await InProcessExecution.StreamAsync(workflow, messages);
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

List<ChatMessage> result = new();
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is AgentRunUpdateEvent e)
    {
        Console.WriteLine($"{e.ExecutorId}: {e.Data}");
    }
    else if (evt is WorkflowCompletedEvent completed)
    {
        result = (List<ChatMessage>)completed.Data!;
        break;
    }
}

// Display final result
foreach (var message in result)
{
    Console.WriteLine($"{message.Role}: {message.Content}");
}

示例输出

French_Translation: User: Hello, world!
French_Translation: Assistant: English detected. Bonjour, le monde !
Spanish_Translation: Assistant: French detected. ¡Hola, mundo!
English_Translation: Assistant: Spanish detected. Hello, world!

关键概念

  • 顺序处理:每个代理按顺序处理上一个代理的输出
  • AgentWorkflowBuilder.BuildSequential():从代理集合创建管道工作流
  • ChatClientAgent:表示由聊天客户端支持的代理,其中包含特定说明
  • StreamingRun:使用事件流式处理功能提供实时执行
  • 事件处理:通过 AgentRunUpdateEvent 监视代理进度,通过 WorkflowCompletedEvent 监视其完成情况

在顺序业务流程中,每个代理依次处理任务,输出从一个流向下一个。 首先,为两个阶段过程定义代理:

from agent_framework.azure import AzureChatClient
from azure.identity import AzureCliCredential

# 1) Create agents using AzureChatClient
chat_client = AzureChatClient(credential=AzureCliCredential())

writer = chat_client.create_agent(
    instructions=(
        "You are a concise copywriter. Provide a single, punchy marketing sentence based on the prompt."
    ),
    name="writer",
)

reviewer = chat_client.create_agent(
    instructions=(
        "You are a thoughtful reviewer. Give brief feedback on the previous assistant message."
    ),
    name="reviewer",
)

设置顺序编排

SequentialBuilder 类创建一个管道,其中代理按顺序处理任务。 每个代理都会看到完整的对话历史记录并添加其响应:

from agent_framework import SequentialBuilder

# 2) Build sequential workflow: writer -> reviewer
workflow = SequentialBuilder().participants([writer, reviewer]).build()

运行顺序工作流

执行工作流并收集显示每个代理贡献的最终对话:

from agent_framework import ChatMessage, WorkflowCompletedEvent

# 3) Run and print final conversation
completion: WorkflowCompletedEvent | None = None
async for event in workflow.run_stream("Write a tagline for a budget-friendly eBike."):
    if isinstance(event, WorkflowCompletedEvent):
        completion = event

if completion:
    print("===== Final Conversation =====")
    messages: list[ChatMessage] | Any = completion.data
    for i, msg in enumerate(messages, start=1):
        name = msg.author_name or ("assistant" if msg.role == Role.ASSISTANT else "user")
        print(f"{'-' * 60}\n{i:02d} [{name}]\n{msg.text}")

示例输出

===== Final Conversation =====
------------------------------------------------------------
01 [user]
Write a tagline for a budget-friendly eBike.
------------------------------------------------------------
02 [writer]
Ride farther, spend less—your affordable eBike adventure starts here.
------------------------------------------------------------
03 [reviewer]
This tagline clearly communicates affordability and the benefit of extended travel, making it
appealing to budget-conscious consumers. It has a friendly and motivating tone, though it could
be slightly shorter for more punch. Overall, a strong and effective suggestion!

高级:将代理与自定义执行程序混合

顺序业务流程支持将代理与自定义执行程序混合,以便进行专用处理。 当你需要不需要 LLM 的自定义逻辑时,这非常有用:

定义自定义执行程序

from agent_framework import Executor, WorkflowContext, handler
from agent_framework import ChatMessage, Role

class Summarizer(Executor):
    """Simple summarizer: consumes full conversation and appends an assistant summary."""

    @handler
    async def summarize(
        self,
        conversation: list[ChatMessage],
        ctx: WorkflowContext[list[ChatMessage]]
    ) -> None:
        users = sum(1 for m in conversation if m.role == Role.USER)
        assistants = sum(1 for m in conversation if m.role == Role.ASSISTANT)
        summary = ChatMessage(
            role=Role.ASSISTANT,
            text=f"Summary -> users:{users} assistants:{assistants}"
        )
        await ctx.send_message(list(conversation) + [summary])

生成混合顺序工作流

# Create a content agent
content = chat_client.create_agent(
    instructions="Produce a concise paragraph answering the user's request.",
    name="content",
)

# Build sequential workflow: content -> summarizer
summarizer = Summarizer(id="summarizer")
workflow = SequentialBuilder().participants([content, summarizer]).build()

使用自定义执行程序的示例输出

------------------------------------------------------------
01 [user]
Explain the benefits of budget eBikes for commuters.
------------------------------------------------------------
02 [content]
Budget eBikes offer commuters an affordable, eco-friendly alternative to cars and public transport.
Their electric assistance reduces physical strain and allows riders to cover longer distances quickly,
minimizing travel time and fatigue. Budget models are low-cost to maintain and operate, making them accessible
for a wider range of people. Additionally, eBikes help reduce traffic congestion and carbon emissions,
supporting greener urban environments. Overall, budget eBikes provide cost-effective, efficient, and
sustainable transportation for daily commuting needs.
------------------------------------------------------------
03 [assistant]
Summary -> users:1 assistants:1

关键概念

  • 共享上下文:每个参与者都会收到完整的对话历史记录,包括所有以前的消息
  • 订单事项:代理严格按照列表中的指定 participants() 顺序执行
  • 灵活参与者:可以按任意顺序混合代理和自定义执行程序
  • 对话流:每个代理/执行器附加到会话中,构建完整的对话

后续步骤