并发业务流程使多个代理能够并行处理同一任务。 每个代理独立处理输入,并收集并聚合其结果。 这种方法非常适合在集思广益、集体推理或投票系统等场景中,需要多种观点或解决方案时应用。
学习内容
- 如何定义具有不同专业知识的多个代理
- 如何协调这些代理以在单个任务上并发工作
- 如何收集和处理结果
在并发业务流程中,多个代理同时和独立地处理同一任务,提供对相同输入的不同视角。
设置 Azure OpenAI 客户端
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.Workflows;
using Microsoft.Extensions.AI;
using Microsoft.Agents.AI;
// 1) Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ??
throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var client = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential())
.GetChatClient(deploymentName)
.AsIChatClient();
定义代理
创建多个专用代理,该代理将同时处理同一任务:
// 2) Helper method to create translation agents
static ChatClientAgent GetTranslationAgent(string targetLanguage, IChatClient chatClient) =>
new(chatClient,
$"You are a translation assistant who only responds in {targetLanguage}. Respond to any " +
$"input by outputting the name of the input language and then translating the input to {targetLanguage}.");
// Create translation agents for concurrent processing
var translationAgents = (from lang in (string[])["French", "Spanish", "English"]
select GetTranslationAgent(lang, client));
设置并发编排
使用 AgentWorkflowBuilder 生成可并行执行代理的工作流。
// 3) Build concurrent workflow
var workflow = AgentWorkflowBuilder.BuildConcurrent(translationAgents);
运行并发工作流并收集结果
从同时运行的所有代理执行工作流和处理事件:
// 4) Run the workflow
var messages = new List<ChatMessage> { new(ChatRole.User, "Hello, world!") };
StreamingRun run = await InProcessExecution.StreamAsync(workflow, messages);
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
List<ChatMessage> result = new();
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is AgentRunUpdateEvent e)
{
Console.WriteLine($"{e.ExecutorId}: {e.Data}");
}
else if (evt is WorkflowCompletedEvent completed)
{
result = (List<ChatMessage>)completed.Data!;
break;
}
}
// Display aggregated results from all agents
Console.WriteLine("===== Final Aggregated Results =====");
foreach (var message in result)
{
Console.WriteLine($"{message.Role}: {message.Content}");
}
示例输出
French_Agent: English detected. Bonjour, le monde !
Spanish_Agent: English detected. ¡Hola, mundo!
English_Agent: English detected. Hello, world!
===== Final Aggregated Results =====
User: Hello, world!
Assistant: English detected. Bonjour, le monde !
Assistant: English detected. ¡Hola, mundo!
Assistant: English detected. Hello, world!
关键概念
- 并行执行:所有代理同时独立处理输入
- AgentWorkflowBuilder.BuildConcurrent():从代理集合创建并发工作流
- 自动聚合:所有代理的结果都会自动收集到最终结果中
-
事件流式处理:通过实时监视代理进度
AgentRunUpdateEvent - 多样化观点:每个代理都为同一问题带来了独特的专业知识
代理是可以处理任务的专用实体。 在这里,我们定义了三个代理:研究专家、营销专家和法律专家。
from agent_framework.azure import AzureChatClient
# 1) Create three domain agents using AzureChatClient
chat_client = AzureChatClient(credential=AzureCliCredential())
researcher = chat_client.create_agent(
instructions=(
"You're an expert market and product researcher. Given a prompt, provide concise, factual insights,"
" opportunities, and risks."
),
name="researcher",
)
marketer = chat_client.create_agent(
instructions=(
"You're a creative marketing strategist. Craft compelling value propositions and target messaging"
" aligned to the prompt."
),
name="marketer",
)
legal = chat_client.create_agent(
instructions=(
"You're a cautious legal/compliance reviewer. Highlight constraints, disclaimers, and policy concerns"
" based on the prompt."
),
name="legal",
)
设置并发编排
类 ConcurrentBuilder 允许你构造工作流以并行运行多个代理。 将“代理”名单传递给参与者。
from agent_framework import ConcurrentBuilder
# 2) Build a concurrent workflow
# Participants are either Agents (type of AgentProtocol) or Executors
workflow = ConcurrentBuilder().participants([researcher, marketer, legal]).build()
运行并发工作流并收集结果
from agent_framework import ChatMessage, WorkflowCompletedEvent
# 3) Run with a single prompt, stream progress, and pretty-print the final combined messages
completion: WorkflowCompletedEvent | None = None
async for event in workflow.run_stream("We are launching a new budget-friendly electric bike for urban commuters."):
if isinstance(event, WorkflowCompletedEvent):
completion = event
if completion:
print("===== Final Aggregated Conversation (messages) =====")
messages: list[ChatMessage] | Any = completion.data
for i, msg in enumerate(messages, start=1):
name = msg.author_name if msg.author_name else "user"
print(f"{'-' * 60}\n\n{i:02d} [{name}]:\n{msg.text}")
示例输出
Sample Output:
===== Final Aggregated Conversation (messages) =====
------------------------------------------------------------
01 [user]:
We are launching a new budget-friendly electric bike for urban commuters.
------------------------------------------------------------
02 [researcher]:
**Insights:**
- **Target Demographic:** Urban commuters seeking affordable, eco-friendly transport;
likely to include students, young professionals, and price-sensitive urban residents.
- **Market Trends:** E-bike sales are growing globally, with increasing urbanization,
higher fuel costs, and sustainability concerns driving adoption.
- **Competitive Landscape:** Key competitors include brands like Rad Power Bikes, Aventon,
Lectric, and domestic budget-focused manufacturers in North America, Europe, and Asia.
- **Feature Expectations:** Customers expect reliability, ease-of-use, theft protection,
lightweight design, sufficient battery range for daily city commutes (typically 25-40 miles),
and low-maintenance components.
**Opportunities:**
- **First-time Buyers:** Capture newcomers to e-biking by emphasizing affordability, ease of
operation, and cost savings vs. public transit/car ownership.
...
------------------------------------------------------------
03 [marketer]:
**Value Proposition:**
"Empowering your city commute: Our new electric bike combines affordability, reliability, and
sustainable design—helping you conquer urban journeys without breaking the bank."
**Target Messaging:**
*For Young Professionals:*
...
------------------------------------------------------------
04 [legal]:
**Constraints, Disclaimers, & Policy Concerns for Launching a Budget-Friendly Electric Bike for Urban Commuters:**
**1. Regulatory Compliance**
- Verify that the electric bike meets all applicable federal, state, and local regulations
regarding e-bike classification, speed limits, power output, and safety features.
- Ensure necessary certifications (e.g., UL certification for batteries, CE markings if sold internationally) are obtained.
**2. Product Safety**
- Include consumer safety warnings regarding use, battery handling, charging protocols, and age restrictions.
高级:自定义代理执行程序
并发编排支持使用其他逻辑包装代理的自定义执行程序。 当你需要更好地控制代理的初始化方式及其处理请求的方式时,这非常有用:
定义自定义代理执行程序
from agent_framework import (
AgentExecutorRequest,
AgentExecutorResponse,
ChatAgent,
Executor,
WorkflowContext,
handler,
)
class ResearcherExec(Executor):
agent: ChatAgent
def __init__(self, chat_client: AzureChatClient, id: str = "researcher"):
agent = chat_client.create_agent(
instructions=(
"You're an expert market and product researcher. Given a prompt, provide concise, factual insights,"
" opportunities, and risks."
),
name=id,
)
super().__init__(agent=agent, id=id)
@handler
async def run(self, request: AgentExecutorRequest, ctx: WorkflowContext[AgentExecutorResponse]) -> None:
response = await self.agent.run(request.messages)
full_conversation = list(request.messages) + list(response.messages)
await ctx.send_message(AgentExecutorResponse(self.id, response, full_conversation=full_conversation))
class MarketerExec(Executor):
agent: ChatAgent
def __init__(self, chat_client: AzureChatClient, id: str = "marketer"):
agent = chat_client.create_agent(
instructions=(
"You're a creative marketing strategist. Craft compelling value propositions and target messaging"
" aligned to the prompt."
),
name=id,
)
super().__init__(agent=agent, id=id)
@handler
async def run(self, request: AgentExecutorRequest, ctx: WorkflowContext[AgentExecutorResponse]) -> None:
response = await self.agent.run(request.messages)
full_conversation = list(request.messages) + list(response.messages)
await ctx.send_message(AgentExecutorResponse(self.id, response, full_conversation=full_conversation))
使用自定义执行程序生成工作流
chat_client = AzureChatClient(credential=AzureCliCredential())
researcher = ResearcherExec(chat_client)
marketer = MarketerExec(chat_client)
legal = LegalExec(chat_client)
workflow = ConcurrentBuilder().participants([researcher, marketer, legal]).build()
高级:自定义聚合器
默认情况下,并发业务流程将所有代理响应聚合到消息列表中。 您可以使用以特定方式处理结果的自定义聚合器覆盖此行为:
定义自定义聚合器
# Define a custom aggregator callback that uses the chat client to summarize
async def summarize_results(results: list[Any]) -> str:
# Extract one final assistant message per agent
expert_sections: list[str] = []
for r in results:
try:
messages = getattr(r.agent_run_response, "messages", [])
final_text = messages[-1].text if messages and hasattr(messages[-1], "text") else "(no content)"
expert_sections.append(f"{getattr(r, 'executor_id', 'expert')}:\n{final_text}")
except Exception as e:
expert_sections.append(f"{getattr(r, 'executor_id', 'expert')}: (error: {type(e).__name__}: {e})")
# Ask the model to synthesize a concise summary of the experts' outputs
system_msg = ChatMessage(
Role.SYSTEM,
text=(
"You are a helpful assistant that consolidates multiple domain expert outputs "
"into one cohesive, concise summary with clear takeaways. Keep it under 200 words."
),
)
user_msg = ChatMessage(Role.USER, text="\n\n".join(expert_sections))
response = await chat_client.get_response([system_msg, user_msg])
# Return the model's final assistant text as the completion result
return response.messages[-1].text if response.messages else ""
使用自定义聚合器生成工作流
workflow = (
ConcurrentBuilder()
.participants([researcher, marketer, legal])
.with_aggregator(summarize_results)
.build()
)
completion: WorkflowCompletedEvent | None = None
async for event in workflow.run_stream("We are launching a new budget-friendly electric bike for urban commuters."):
if isinstance(event, WorkflowCompletedEvent):
completion = event
if completion:
print("===== Final Consolidated Output =====")
print(completion.data)
使用自定义聚合器的示例输出
===== Final Consolidated Output =====
Urban e-bike demand is rising rapidly due to eco-awareness, urban congestion, and high fuel costs,
with market growth projected at a ~10% CAGR through 2030. Key customer concerns are affordability,
easy maintenance, convenient charging, compact design, and theft protection. Differentiation opportunities
include integrating smart features (GPS, app connectivity), offering subscription or leasing options, and
developing portable, space-saving designs. Partnering with local governments and bike shops can boost visibility.
Risks include price wars eroding margins, regulatory hurdles, battery quality concerns, and heightened expectations
for after-sales support. Accurate, substantiated product claims and transparent marketing (with range disclaimers)
are essential. All e-bikes must comply with local and federal regulations on speed, wattage, safety certification,
and labeling. Clear warranty, safety instructions (especially regarding batteries), and inclusive, accessible
marketing are required. For connected features, data privacy policies and user consents are mandatory.
Effective messaging should target young professionals, students, eco-conscious commuters, and first-time buyers,
emphasizing affordability, convenience, and sustainability. Slogan suggestion: "Charge Ahead—City Commutes Made
Affordable." Legal review in each target market, compliance vetting, and robust customer support policies are
critical before launch.
关键概念
- 并行执行:所有代理同时和独立地处理任务
- 结果聚合:收集结果,可由默认聚合器或自定义聚合器处理
- 多样化观点:每个代理都为同一问题带来了独特的专业知识
- 灵活参与者:可以直接使用代理,也可以将它们包装在自定义执行器中
- 自定义处理:覆盖默认聚合器,以符合特定领域的方式合成结果