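Magentic orchestration is based on the Magentic-One system introduced by AutoGen. It is a flexible, general-purpose multi-agent pattern designed for complex, open-ended tasks that require dynamic collaboration. In this pattern, a Magentic manager coordinates a team of specialized agents and chooses which agent should act next based on the evolving context, task progress, and agent capabilities.
The Magentic manager maintains a shared context, tracks progress, and adapts the workflow in real time. This lets the system break down complex problems, delegate subtasks, and iteratively refine solutions through agent collaboration. The orchestration is especially well suited to scenarios where the solution path is not known in advance and several rounds of reasoning, research, and computation may be needed.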
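What you'll learn
- How to set up a Magentic manager to coordinate multiple specialized agents
- How to configure callbacks for streaming and event handling
- How to implement human-in-the-loop plan review
- How to track agent collaboration and progress through complex tasks
Define specialized agents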
In Magentic orchestration, you define specialized agents that the manager can select dynamically based on task requirements.
from agent_framework import ChatAgent, HostedCodeInterpreterTool
from agent_framework.openai import OpenAIChatClient, OpenAIResponsesClient

researcher_agent = ChatAgent(
    name="ResearcherAgent",
    description="Specialist in research and information gathering",
    instructions=(
        "You are a Researcher. You find information without additional computation or quantitative analysis."
    ),
    # This agent requires the gpt-4o-search-preview model to perform web searches
    chat_client=OpenAIChatClient(ai_model_id="gpt-4o-search-preview"),
)

coder_agent = ChatAgent(
    name="CoderAgent",
    description="A helpful assistant that writes and executes code to process and analyze data.",
    instructions="You solve questions using code. Please provide detailed analysis and computation process.",
    chat_client=OpenAIResponsesClient(),
    tools=HostedCodeInterpreterTool(),
)
Set up event callbacks
Magentic orchestration provides rich event callbacks for monitoring workflow progress in real time.
from agent_framework import (
    MagenticAgentDeltaEvent,
    MagenticAgentMessageEvent,
    MagenticCallbackEvent,
    MagenticFinalResultEvent,
    MagenticOrchestratorMessageEvent,
)

# Unified callback for all events
async def on_event(event: MagenticCallbackEvent) -> None:
    if isinstance(event, MagenticOrchestratorMessageEvent):
        # Manager's planning and coordination messages
        print(f"\n[ORCH:{event.kind}]\n\n{getattr(event.message, 'text', '')}\n{'-' * 26}")

    elif isinstance(event, MagenticAgentDeltaEvent):
        # Streaming tokens from agents
        print(event.text, end="", flush=True)

    elif isinstance(event, MagenticAgentMessageEvent):
        # Complete agent responses
        msg = event.message
        if msg is not None:
            response_text = (msg.text or "").replace("\n", " ")
            print(f"\n[AGENT:{event.agent_id}] {msg.role.value}\n\n{response_text}\n{'-' * 26}")

    elif isinstance(event, MagenticFinalResultEvent):
        # Final synthesized result
        print("\n" + "=" * 50)
        print("FINAL RESULT:")
        print("=" * 50)
        if event.message is not None:
            print(event.message.text)
        print("=" * 50)
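The callback above prints streamed tokens as they arrive. If you would rather collect them, one option is to buffer the chunks per agent and join them when that agent's complete message arrives. The following is a minimal sketch of that idea and does not use the framework: `FakeDelta` and `DeltaBuffer` are hypothetical names, and `FakeDelta` only mirrors the `agent_id` and `text` attributes used in the callback. Whether deltas from different agents can actually interleave is an assumption; the buffer simply tolerates it.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class FakeDelta:
    """Hypothetical stand-in for a streaming delta event (agent_id + text chunk)."""

    agent_id: str
    text: str


class DeltaBuffer:
    """Collects streamed text chunks per agent until they are flushed."""

    def __init__(self) -> None:
        self._chunks: dict[str, list[str]] = defaultdict(list)

    def add(self, delta: FakeDelta) -> None:
        # Append the chunk to the buffer of the agent that produced it.
        self._chunks[delta.agent_id].append(delta.text)

    def flush(self, agent_id: str) -> str:
        # Join and clear the agent's chunks, e.g. when its complete message arrives.
        return "".join(self._chunks.pop(agent_id, []))


buffer = DeltaBuffer()
for delta in [
    FakeDelta("researcher", "Res"),
    FakeDelta("coder", "import "),
    FakeDelta("researcher", "Net-50"),
    FakeDelta("coder", "math"),
]:
    buffer.add(delta)

researcher_text = buffer.flush("researcher")
coder_text = buffer.flush("coder")
print(researcher_text)
print(coder_text)
```

In a real callback, `add` would be called in the delta branch and `flush` in the complete-message branch.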
Build the Magentic workflow
Use MagenticBuilder to configure the workflow with the standard manager.
from agent_framework import MagenticBuilder, MagenticCallbackMode

workflow = (
    MagenticBuilder()
    .participants(researcher=researcher_agent, coder=coder_agent)
    .on_event(on_event, mode=MagenticCallbackMode.STREAMING)
    .with_standard_manager(
        chat_client=OpenAIChatClient(),
        max_round_count=10,  # Maximum collaboration rounds
        max_stall_count=3,  # Maximum rounds without progress
        max_reset_count=2,  # Maximum plan resets allowed
    )
    .build()
)
Run the workflow
Execute a complex task that requires multiple agents to work together:
from agent_framework import WorkflowCompletedEvent

task = (
    "I am preparing a report on the energy efficiency of different machine learning model architectures. "
    "Compare the estimated training and inference energy consumption of ResNet-50, BERT-base, and GPT-2 "
    "on standard datasets (e.g., ImageNet for ResNet, GLUE for BERT, WebText for GPT-2). "
    "Then, estimate the CO2 emissions associated with each, assuming training on an Azure Standard_NC6s_v3 "
    "VM for 24 hours. Provide tables for clarity, and recommend the most energy-efficient model "
    "per task type (image classification, text classification, and text generation)."
)

completion_event = None
async for event in workflow.run_stream(task):
    if isinstance(event, WorkflowCompletedEvent):
        completion_event = event

if completion_event is not None:
    data = getattr(completion_event, "data", None)
    preview = getattr(data, "text", None) or (str(data) if data is not None else "")
    print(f"Workflow completed with result:\n\n{preview}")
Advanced: Human-in-the-loop plan review
Enable human review and approval of the manager's plan before execution:
Configure plan review
from agent_framework import (
    MagenticPlanReviewDecision,
    MagenticPlanReviewReply,
    MagenticPlanReviewRequest,
    RequestInfoEvent,
)

workflow = (
    MagenticBuilder()
    .participants(researcher=researcher_agent, coder=coder_agent)
    .on_event(on_event, mode=MagenticCallbackMode.STREAMING)
    .with_standard_manager(
        chat_client=OpenAIChatClient(),
        max_round_count=10,
        max_stall_count=3,
        max_reset_count=2,
    )
    .with_plan_review()  # Enable plan review
    .build()
)
Handle plan review requests
from typing import cast

completion_event: WorkflowCompletedEvent | None = None
pending_request: RequestInfoEvent | None = None

while True:
    # Run until completion or review request
    if pending_request is None:
        async for event in workflow.run_stream(task):
            if isinstance(event, WorkflowCompletedEvent):
                completion_event = event

            if isinstance(event, RequestInfoEvent) and event.request_type is MagenticPlanReviewRequest:
                pending_request = event
                review_req = cast(MagenticPlanReviewRequest, event.data)
                if review_req.plan_text:
                    print(f"\n=== PLAN REVIEW REQUEST ===\n{review_req.plan_text}\n")

    # Check if completed
    if completion_event is not None:
        break

    # Respond to plan review
    if pending_request is not None:
        # Collect human decision (approve/reject/modify)
        # For demo, we auto-approve:
        reply = MagenticPlanReviewReply(decision=MagenticPlanReviewDecision.APPROVE)

        # Or modify the plan:
        # reply = MagenticPlanReviewReply(
        #     decision=MagenticPlanReviewDecision.APPROVE,
        #     edited_plan="Modified plan text here..."
        # )

        # Clear the answered request before streaming, so that only a new review
        # request (not an unrelated later event) leaves one pending for the next pass.
        answered_request_id = pending_request.request_id
        pending_request = None

        async for event in workflow.send_responses_streaming({answered_request_id: reply}):
            if isinstance(event, WorkflowCompletedEvent):
                completion_event = event
            elif isinstance(event, RequestInfoEvent):
                # Another review cycle if needed
                pending_request = event
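Key concepts
- Dynamic coordination: The Magentic manager dynamically chooses which agent should act next based on the evolving context
- Iterative refinement: The system can break down complex problems and refine solutions over multiple rounds
- Progress tracking: Built-in mechanisms detect stalls and reset the plan when needed
- Flexible collaboration: Agents can be invoked multiple times, in any order the manager decides
- Human oversight: Optional human-in-the-loop review allows manual intervention and plan modification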
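Workflow execution process
Magentic orchestration follows this execution pattern:
1. Planning: The manager analyzes the task and creates an initial plan
2. Agent selection: The manager selects the most suitable agent for each subtask
3. Execution: The selected agent carries out its part of the task
4. Progress assessment: The manager evaluates progress and updates the plan
5. Iteration: Steps 2-4 repeat until the task is complete or a limit is reached
6. Final synthesis: The manager combines all agent outputs into a final result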
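To make the interplay between iteration, stall detection, and plan resets concrete, here is a toy simulation of the loop driven by a scripted list of round outcomes. It is a sketch under stated assumptions, not the manager's actual logic: `Limits` and `simulate` are hypothetical names, the field names merely echo the manager parameters used in this article, and the exact comparison rules (for example, resetting once the stall count reaches the limit) are assumptions.

```python
from dataclasses import dataclass


@dataclass
class Limits:
    """Hypothetical container echoing the manager's three limits."""

    max_round_count: int = 10
    max_stall_count: int = 3
    max_reset_count: int = 2


def simulate(outcomes: list[str], limits: Limits) -> str:
    """Walk through scripted round outcomes ("progress", "stall", "done").

    Returns a short description of why the loop ended.
    """
    stalls = 0
    resets = 0
    for round_number, outcome in enumerate(outcomes, start=1):
        if round_number > limits.max_round_count:
            return "stopped: round limit"
        if outcome == "done":
            return f"completed in round {round_number}"
        if outcome == "progress":
            stalls = 0  # any progress clears the stall counter
            continue
        # outcome == "stall": count it and reset the plan if stalled too long
        stalls += 1
        if stalls >= limits.max_stall_count:
            if resets >= limits.max_reset_count:
                return "stopped: reset limit"
            resets += 1
            stalls = 0  # a reset starts over with a fresh plan
    return "stopped: script ended"


print(simulate(["progress", "stall", "progress", "done"], Limits()))
print(simulate(["stall"] * 9, Limits()))
print(simulate(["progress"] * 12, Limits()))
```

With the defaults above, nine consecutive stalls exhaust both allowed resets, while twelve rounds of steady progress without completion run into the round limit instead.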
Error handling
Add error handling to make the workflow more robust:
import logging

logger = logging.getLogger(__name__)

def on_exception(exception: Exception) -> None:
    print(f"Exception occurred: {exception}")
    logger.exception("Workflow exception", exc_info=exception)

workflow = (
    MagenticBuilder()
    .participants(researcher=researcher_agent, coder=coder_agent)
    .on_exception(on_exception)
    .on_event(on_event, mode=MagenticCallbackMode.STREAMING)
    .with_standard_manager(
        chat_client=OpenAIChatClient(),
        max_round_count=10,
        max_stall_count=3,
        max_reset_count=2,
    )
    .build()
)
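If you want to inspect failures after a run instead of only printing them, the handler can be a small callable object that records each exception. This is a sketch only: `ExceptionCollector` and the logger name are hypothetical, and whether the builder accepts any callable with the `(Exception) -> None` shape, rather than only a plain function, is an assumption.

```python
import logging

logger = logging.getLogger("magentic_demo")  # hypothetical logger name


class ExceptionCollector:
    """Callable with the same shape as on_exception: (Exception) -> None."""

    def __init__(self) -> None:
        self.errors: list[Exception] = []

    def __call__(self, exception: Exception) -> None:
        # Record the exception and log it together with its traceback.
        self.errors.append(exception)
        logger.error("Workflow exception: %s", exception, exc_info=exception)

    @property
    def failed(self) -> bool:
        # True once at least one exception has been reported.
        return bool(self.errors)


collector = ExceptionCollector()
try:
    raise RuntimeError("model call failed")  # simulated failure
except RuntimeError as exc:
    collector(exc)

print(collector.failed, len(collector.errors))
```

After the run, `collector.errors` holds the exceptions in the order they were reported, which can be useful for deciding whether to retry or surface an error to the user.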
Complete example
Here is a complete example that brings all of these concepts together:
import asyncio
import logging
from typing import cast

from agent_framework import (
    ChatAgent,
    HostedCodeInterpreterTool,
    MagenticAgentDeltaEvent,
    MagenticAgentMessageEvent,
    MagenticBuilder,
    MagenticCallbackEvent,
    MagenticCallbackMode,
    MagenticFinalResultEvent,
    MagenticOrchestratorMessageEvent,
    WorkflowCompletedEvent,
)
from agent_framework.openai import OpenAIChatClient, OpenAIResponsesClient

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)


async def main() -> None:
    # Define specialized agents
    researcher_agent = ChatAgent(
        name="ResearcherAgent",
        description="Specialist in research and information gathering",
        instructions=(
            "You are a Researcher. You find information without additional "
            "computation or quantitative analysis."
        ),
        chat_client=OpenAIChatClient(ai_model_id="gpt-4o-search-preview"),
    )

    coder_agent = ChatAgent(
        name="CoderAgent",
        description="A helpful assistant that writes and executes code to process and analyze data.",
        instructions="You solve questions using code. Please provide detailed analysis and computation process.",
        chat_client=OpenAIResponsesClient(),
        tools=HostedCodeInterpreterTool(),
    )

    # State for streaming callback
    last_stream_agent_id: str | None = None
    stream_line_open: bool = False

    # Unified callback for all events
    async def on_event(event: MagenticCallbackEvent) -> None:
        nonlocal last_stream_agent_id, stream_line_open

        if isinstance(event, MagenticOrchestratorMessageEvent):
            print(f"\n[ORCH:{event.kind}]\n\n{getattr(event.message, 'text', '')}\n{'-' * 26}")

        elif isinstance(event, MagenticAgentDeltaEvent):
            if last_stream_agent_id != event.agent_id or not stream_line_open:
                if stream_line_open:
                    print()
                print(f"\n[STREAM:{event.agent_id}]: ", end="", flush=True)
                last_stream_agent_id = event.agent_id
                stream_line_open = True
            print(event.text, end="", flush=True)

        elif isinstance(event, MagenticAgentMessageEvent):
            if stream_line_open:
                print(" (final)")
                stream_line_open = False
                print()
            msg = event.message
            if msg is not None:
                response_text = (msg.text or "").replace("\n", " ")
                print(f"\n[AGENT:{event.agent_id}] {msg.role.value}\n\n{response_text}\n{'-' * 26}")

        elif isinstance(event, MagenticFinalResultEvent):
            print("\n" + "=" * 50)
            print("FINAL RESULT:")
            print("=" * 50)
            if event.message is not None:
                print(event.message.text)
            print("=" * 50)

    # Build the workflow
    print("\nBuilding Magentic Workflow...")

    workflow = (
        MagenticBuilder()
        .participants(researcher=researcher_agent, coder=coder_agent)
        .on_event(on_event, mode=MagenticCallbackMode.STREAMING)
        .with_standard_manager(
            chat_client=OpenAIChatClient(),
            max_round_count=10,
            max_stall_count=3,
            max_reset_count=2,
        )
        .build()
    )

    # Define the task
    task = (
        "I am preparing a report on the energy efficiency of different machine learning model architectures. "
        "Compare the estimated training and inference energy consumption of ResNet-50, BERT-base, and GPT-2 "
        "on standard datasets (e.g., ImageNet for ResNet, GLUE for BERT, WebText for GPT-2). "
        "Then, estimate the CO2 emissions associated with each, assuming training on an Azure Standard_NC6s_v3 "
        "VM for 24 hours. Provide tables for clarity, and recommend the most energy-efficient model "
        "per task type (image classification, text classification, and text generation)."
    )

    print(f"\nTask: {task}")
    print("\nStarting workflow execution...")

    # Run the workflow
    try:
        completion_event = None
        async for event in workflow.run_stream(task):
            print(f"Event: {event}")

            if isinstance(event, WorkflowCompletedEvent):
                completion_event = event

        if completion_event is not None:
            data = getattr(completion_event, "data", None)
            preview = getattr(data, "text", None) or (str(data) if data is not None else "")
            print(f"Workflow completed with result:\n\n{preview}")

    except Exception as e:
        print(f"Workflow execution failed: {e}")
        logger.exception("Workflow exception", exc_info=e)


if __name__ == "__main__":
    asyncio.run(main())
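Configuration options
Manager parameters
- max_round_count: Maximum number of collaboration rounds (default: 10)
- max_stall_count: Maximum number of rounds without progress before a reset (default: 3)
- max_reset_count: Maximum number of plan resets allowed (default: 2)
Callback modes
- MagenticCallbackMode.STREAMING: Receive incremental token updates
- MagenticCallbackMode.COMPLETE: Receive only complete messages
Plan review decisions
- APPROVE: Accept the plan as-is
- REJECT: Reject the plan and request a new one
- APPROVE with edited_plan: Accept the plan with modifications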
Sample output
Coming soon...