在顺序编排中,代理程序按流水线组织。 每个代理反过来处理任务,将其输出传递给序列中的下一个代理。 对于每个步骤基于上一步(例如文档审阅、数据处理管道或多阶段推理)构建的工作流来说,这是理想的选择。
学习内容
- 如何创建一个顺序的代理流水线
- 如何链接代理,其中每个代理在上一个输出的基础上生成
- 如何将代理与专用任务的自定义执行程序混合使用
- 如何跟踪通过管道的会话流
定义代理
在顺序业务流程中,代理组织在管道中,每个代理依次处理任务,并将输出传递到序列中的下一个代理。
设置 Azure OpenAI 客户端
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.Workflows;
using Microsoft.Extensions.AI;
using Microsoft.Agents.AI;
// 1) Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ??
throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var client = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential())
.GetChatClient(deploymentName)
.AsIChatClient();
创建将按顺序工作的专用代理:
// 2) Helper method to create translation agents
static ChatClientAgent GetTranslationAgent(string targetLanguage, IChatClient chatClient) =>
new(chatClient,
$"You are a translation assistant who only responds in {targetLanguage}. Respond to any " +
$"input by outputting the name of the input language and then translating the input to {targetLanguage}.");
// Create translation agents for sequential processing
var translationAgents = (from lang in (string[])["French", "Spanish", "English"]
select GetTranslationAgent(lang, client));
设置顺序编排
使用 AgentWorkflowBuilder以下命令生成工作流:
// 3) Build sequential workflow
var workflow = AgentWorkflowBuilder.BuildSequential(translationAgents);
运行顺序工作流
执行工作流并处理事件:
// 4) Run the workflow
var messages = new List<ChatMessage> { new(ChatRole.User, "Hello, world!") };
StreamingRun run = await InProcessExecution.StreamAsync(workflow, messages);
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
List<ChatMessage> result = new();
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is AgentRunUpdateEvent e)
{
Console.WriteLine($"{e.ExecutorId}: {e.Data}");
}
else if (evt is WorkflowCompletedEvent completed)
{
result = (List<ChatMessage>)completed.Data!;
break;
}
}
// Display final result
foreach (var message in result)
{
Console.WriteLine($"{message.Role}: {message.Content}");
}
示例输出
French_Translation: User: Hello, world!
French_Translation: Assistant: English detected. Bonjour, le monde !
Spanish_Translation: Assistant: French detected. ¡Hola, mundo!
English_Translation: Assistant: Spanish detected. Hello, world!
关键概念
- 顺序处理:每个代理按顺序处理上一个代理的输出
- AgentWorkflowBuilder.BuildSequential():从代理集合创建管道工作流
- ChatClientAgent:表示由聊天客户端支持的代理,其中包含特定说明
- StreamingRun:使用事件流式处理功能提供实时执行
-
事件处理:通过
AgentRunUpdateEvent监视代理进度,通过WorkflowCompletedEvent监视其完成情况
在顺序业务流程中,每个代理依次处理任务,输出从一个流向下一个。 首先,为两个阶段过程定义代理:
from agent_framework.azure import AzureChatClient
from azure.identity import AzureCliCredential
# 1) Create agents using AzureChatClient
chat_client = AzureChatClient(credential=AzureCliCredential())
writer = chat_client.create_agent(
instructions=(
"You are a concise copywriter. Provide a single, punchy marketing sentence based on the prompt."
),
name="writer",
)
reviewer = chat_client.create_agent(
instructions=(
"You are a thoughtful reviewer. Give brief feedback on the previous assistant message."
),
name="reviewer",
)
设置顺序编排
该 SequentialBuilder 类创建一个管道,其中代理按顺序处理任务。 每个代理都会看到完整的对话历史记录并添加其响应:
from agent_framework import SequentialBuilder
# 2) Build sequential workflow: writer -> reviewer
workflow = SequentialBuilder().participants([writer, reviewer]).build()
运行顺序工作流
执行工作流并收集显示每个代理贡献的最终对话:
from agent_framework import ChatMessage, WorkflowCompletedEvent
# 3) Run and print final conversation
completion: WorkflowCompletedEvent | None = None
async for event in workflow.run_stream("Write a tagline for a budget-friendly eBike."):
if isinstance(event, WorkflowCompletedEvent):
completion = event
if completion:
print("===== Final Conversation =====")
messages: list[ChatMessage] | Any = completion.data
for i, msg in enumerate(messages, start=1):
name = msg.author_name or ("assistant" if msg.role == Role.ASSISTANT else "user")
print(f"{'-' * 60}\n{i:02d} [{name}]\n{msg.text}")
示例输出
===== Final Conversation =====
------------------------------------------------------------
01 [user]
Write a tagline for a budget-friendly eBike.
------------------------------------------------------------
02 [writer]
Ride farther, spend less—your affordable eBike adventure starts here.
------------------------------------------------------------
03 [reviewer]
This tagline clearly communicates affordability and the benefit of extended travel, making it
appealing to budget-conscious consumers. It has a friendly and motivating tone, though it could
be slightly shorter for more punch. Overall, a strong and effective suggestion!
高级:将代理与自定义执行程序混合
顺序业务流程支持将代理与自定义执行程序混合,以便进行专用处理。 当你需要不需要 LLM 的自定义逻辑时,这非常有用:
定义自定义执行程序
from agent_framework import Executor, WorkflowContext, handler
from agent_framework import ChatMessage, Role
class Summarizer(Executor):
"""Simple summarizer: consumes full conversation and appends an assistant summary."""
@handler
async def summarize(
self,
conversation: list[ChatMessage],
ctx: WorkflowContext[list[ChatMessage]]
) -> None:
users = sum(1 for m in conversation if m.role == Role.USER)
assistants = sum(1 for m in conversation if m.role == Role.ASSISTANT)
summary = ChatMessage(
role=Role.ASSISTANT,
text=f"Summary -> users:{users} assistants:{assistants}"
)
await ctx.send_message(list(conversation) + [summary])
生成混合顺序工作流
# Create a content agent
content = chat_client.create_agent(
instructions="Produce a concise paragraph answering the user's request.",
name="content",
)
# Build sequential workflow: content -> summarizer
summarizer = Summarizer(id="summarizer")
workflow = SequentialBuilder().participants([content, summarizer]).build()
使用自定义执行程序的示例输出
------------------------------------------------------------
01 [user]
Explain the benefits of budget eBikes for commuters.
------------------------------------------------------------
02 [content]
Budget eBikes offer commuters an affordable, eco-friendly alternative to cars and public transport.
Their electric assistance reduces physical strain and allows riders to cover longer distances quickly,
minimizing travel time and fatigue. Budget models are low-cost to maintain and operate, making them accessible
for a wider range of people. Additionally, eBikes help reduce traffic congestion and carbon emissions,
supporting greener urban environments. Overall, budget eBikes provide cost-effective, efficient, and
sustainable transportation for daily commuting needs.
------------------------------------------------------------
03 [assistant]
Summary -> users:1 assistants:1
关键概念
- 共享上下文:每个参与者都会收到完整的对话历史记录,包括所有以前的消息
-
订单事项:代理严格按照列表中的指定
participants()顺序执行 - 灵活参与者:可以按任意顺序混合代理和自定义执行程序
- 对话流:每个代理/执行器附加到会话中,构建完整的对话