Microsoft Agent Framework Workflows Orchestrations - Magentic

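Magentic orchestration is based on the Magentic-One system from AutoGen. It is a flexible, general-purpose multi-agent pattern for complex, open-ended tasks that require dynamic collaboration. In this pattern, a Magentic manager coordinates a team of specialized agents and, at each step, selects which agent should act next based on the evolving context, task progress, and agent capabilities.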

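The Magentic manager maintains a shared context, tracks progress, and adapts the workflow in real time. This lets the system break down complex problems, delegate subtasks, and iteratively refine solutions through agent collaboration. The orchestration is especially well suited to scenarios where the solution path is not known in advance and may require multiple rounds of reasoning, research, and computation.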

Figure: Magentic orchestration

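What you'll learn

  • How to set up a Magentic manager to coordinate multiple specialized agents
  • How to configure callbacks for streaming and event handling
  • How to implement human-in-the-loop plan review
  • How to track agent collaboration and progress on complex tasks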

Define your specialized agents

In Magentic orchestration, you define specialized agents that the manager can select dynamically based on task requirements:

from agent_framework import ChatAgent, HostedCodeInterpreterTool
from agent_framework.openai import OpenAIChatClient, OpenAIResponsesClient

researcher_agent = ChatAgent(
    name="ResearcherAgent",
    description="Specialist in research and information gathering",
    instructions=(
        "You are a Researcher. You find information without additional computation or quantitative analysis."
    ),
    # This agent requires the gpt-4o-search-preview model to perform web searches
    chat_client=OpenAIChatClient(ai_model_id="gpt-4o-search-preview"),
)

coder_agent = ChatAgent(
    name="CoderAgent",
    description="A helpful assistant that writes and executes code to process and analyze data.",
    instructions="You solve questions using code. Please provide detailed analysis and computation process.",
    chat_client=OpenAIResponsesClient(),
    tools=HostedCodeInterpreterTool(),
)

Set up event callbacks

Magentic orchestration provides rich event callbacks for monitoring workflow progress in real time:

from agent_framework import (
    MagenticAgentDeltaEvent,
    MagenticAgentMessageEvent,
    MagenticCallbackEvent,
    MagenticFinalResultEvent,
    MagenticOrchestratorMessageEvent,
)

# Unified callback for all events
async def on_event(event: MagenticCallbackEvent) -> None:
    if isinstance(event, MagenticOrchestratorMessageEvent):
        # Manager's planning and coordination messages
        print(f"\n[ORCH:{event.kind}]\n\n{getattr(event.message, 'text', '')}\n{'-' * 26}")

    elif isinstance(event, MagenticAgentDeltaEvent):
        # Streaming tokens from agents
        print(event.text, end="", flush=True)

    elif isinstance(event, MagenticAgentMessageEvent):
        # Complete agent responses
        msg = event.message
        if msg is not None:
            response_text = (msg.text or "").replace("\n", " ")
            print(f"\n[AGENT:{event.agent_id}] {msg.role.value}\n\n{response_text}\n{'-' * 26}")

    elif isinstance(event, MagenticFinalResultEvent):
        # Final synthesized result
        print("\n" + "=" * 50)
        print("FINAL RESULT:")
        print("=" * 50)
        if event.message is not None:
            print(event.message.text)
        print("=" * 50)

Build the Magentic workflow

Use MagenticBuilder to configure the workflow with a standard manager:

from agent_framework import MagenticBuilder, MagenticCallbackMode

workflow = (
    MagenticBuilder()
    .participants(researcher=researcher_agent, coder=coder_agent)
    .on_event(on_event, mode=MagenticCallbackMode.STREAMING)
    .with_standard_manager(
        chat_client=OpenAIChatClient(),
        max_round_count=10,  # Maximum collaboration rounds
        max_stall_count=3,   # Maximum rounds without progress
        max_reset_count=2,   # Maximum plan resets allowed
    )
    .build()
)

Run the workflow

Execute a complex task that requires multiple agents working together:

from agent_framework import WorkflowCompletedEvent

task = (
    "I am preparing a report on the energy efficiency of different machine learning model architectures. "
    "Compare the estimated training and inference energy consumption of ResNet-50, BERT-base, and GPT-2 "
    "on standard datasets (e.g., ImageNet for ResNet, GLUE for BERT, WebText for GPT-2). "
    "Then, estimate the CO2 emissions associated with each, assuming training on an Azure Standard_NC6s_v3 "
    "VM for 24 hours. Provide tables for clarity, and recommend the most energy-efficient model "
    "per task type (image classification, text classification, and text generation)."
)

completion_event = None
async for event in workflow.run_stream(task):
    if isinstance(event, WorkflowCompletedEvent):
        completion_event = event

if completion_event is not None:
    data = getattr(completion_event, "data", None)
    preview = getattr(data, "text", None) or (str(data) if data is not None else "")
    print(f"Workflow completed with result:\n\n{preview}")

Advanced: human-in-the-loop plan review

Enable human review and approval of the manager's plan before execution:

Configure plan review

from agent_framework import (
    MagenticPlanReviewDecision,
    MagenticPlanReviewReply,
    MagenticPlanReviewRequest,
    RequestInfoEvent,
)

workflow = (
    MagenticBuilder()
    .participants(researcher=researcher_agent, coder=coder_agent)
    .on_event(on_event, mode=MagenticCallbackMode.STREAMING)
    .with_standard_manager(
        chat_client=OpenAIChatClient(),
        max_round_count=10,
        max_stall_count=3,
        max_reset_count=2,
    )
    .with_plan_review()  # Enable plan review
    .build()
)

Handle plan review requests

from typing import cast  # needed for the cast() call below

completion_event: WorkflowCompletedEvent | None = None
pending_request: RequestInfoEvent | None = None

while True:
    # Run until completion or review request
    if pending_request is None:
        async for event in workflow.run_stream(task):
            if isinstance(event, WorkflowCompletedEvent):
                completion_event = event

            if isinstance(event, RequestInfoEvent) and event.request_type is MagenticPlanReviewRequest:
                pending_request = event
                review_req = cast(MagenticPlanReviewRequest, event.data)
                if review_req.plan_text:
                    print(f"\n=== PLAN REVIEW REQUEST ===\n{review_req.plan_text}\n")

    # Check if completed
    if completion_event is not None:
        break

    # Respond to plan review
    if pending_request is not None:
        # Collect human decision (approve/reject/modify)
        # For demo, we auto-approve:
        reply = MagenticPlanReviewReply(decision=MagenticPlanReviewDecision.APPROVE)

        # Or modify the plan:
        # reply = MagenticPlanReviewReply(
        #     decision=MagenticPlanReviewDecision.APPROVE,
        #     edited_plan="Modified plan text here..."
        # )

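        # Clear the pending request before replying; it is set again only if
        # the manager asks for another review.
        request_id = pending_request.request_id
        pending_request = None

        async for event in workflow.send_responses_streaming({request_id: reply}):
            if isinstance(event, WorkflowCompletedEvent):
                completion_event = event
            elif isinstance(event, RequestInfoEvent) and event.request_type is MagenticPlanReviewRequest:
                # Another review cycle if needed
                pending_request = event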

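Key concepts

  • Dynamic coordination: the Magentic manager selects which agent should act next based on the evolving context
  • Iterative refinement: the system can break down complex problems and refine solutions over multiple rounds
  • Progress tracking: built-in mechanisms detect stalls and reset the plan when needed
  • Flexible collaboration: agents can be called multiple times, in any order the manager decides
  • Human oversight: optional human-in-the-loop review allows manual intervention and plan modification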

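Workflow execution process

Magentic orchestration follows this execution pattern:

  1. Planning phase: the manager analyzes the task and creates an initial plan
  2. Agent selection: the manager selects the most suitable agent for each subtask
  3. Execution: the selected agent carries out its part of the task
  4. Progress assessment: the manager evaluates progress and updates the plan
  5. Iteration: steps 2-4 repeat until the task is complete or a limit is reached
  6. Final synthesis: the manager combines all agent outputs into the final result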
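
The six steps above can be sketched as a plain Python loop. This is a toy illustration, not the framework's implementation: ToyManager, its plan method, and the lambda agents are hypothetical stand-ins, and the planning and progress decisions that the real manager delegates to a language model are hard-coded here.

```python
from dataclasses import dataclass, field
from typing import Callable

# Hypothetical stand-in: an "agent" is just a function from subtask to output.
Agent = Callable[[str], str]


@dataclass
class ToyManager:
    agents: dict[str, Agent]
    max_round_count: int = 10
    transcript: list[str] = field(default_factory=list)

    def plan(self, task: str) -> list[tuple[str, str]]:
        # 1. Planning: break the task into (agent name, subtask) pairs.
        return [
            ("researcher", f"find facts for: {task}"),
            ("coder", f"compute results for: {task}"),
        ]

    def run(self, task: str) -> str:
        pending = self.plan(task)
        rounds = 0
        while pending and rounds < self.max_round_count:
            # 2. Agent selection: take the next subtask and its assigned agent.
            name, subtask = pending.pop(0)
            # 3. Execution: the selected agent handles its part.
            output = self.agents[name](subtask)
            # 4. Progress assessment: record the output. A real manager
            #    would also revise the remaining plan at this point.
            self.transcript.append(f"{name}: {output}")
            # 5. Iteration: repeat until the plan is done or the limit is hit.
            rounds += 1
        # 6. Final synthesis: combine all agent outputs.
        return "\n".join(self.transcript)


manager = ToyManager(agents={
    "researcher": lambda subtask: f"notes on '{subtask}'",
    "coder": lambda subtask: f"numbers for '{subtask}'",
})
result = manager.run("compare model energy use")
print(result)
```

Lowering max_round_count to 1 in this sketch stops the loop after the researcher's turn, which mirrors how a round limit cuts a run short before the plan is finished.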

Error handling

Add error handling to make your workflow more robust:

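import logging

logger = logging.getLogger(__name__)

def on_exception(exception: Exception) -> None:
    # Print a short message and log the full traceback
    print(f"Exception occurred: {exception}")
    logger.exception("Workflow exception", exc_info=exception)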

workflow = (
    MagenticBuilder()
    .participants(researcher=researcher_agent, coder=coder_agent)
    .on_exception(on_exception)
    .on_event(on_event, mode=MagenticCallbackMode.STREAMING)
    .with_standard_manager(
        chat_client=OpenAIChatClient(),
        max_round_count=10,
        max_stall_count=3,
        max_reset_count=2,
    )
    .build()
)
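
The text does not say whether an exception raised inside an event callback interrupts the run, so one defensive option is to guard the callback itself. The following is a minimal sketch under that assumption: safe_callback, on_event_demo, and the string events are hypothetical and use only the standard library, not the framework.

```python
import asyncio
import functools
import logging
from typing import Any, Awaitable, Callable

logger = logging.getLogger(__name__)

EventCallback = Callable[[Any], Awaitable[None]]


def safe_callback(callback: EventCallback) -> EventCallback:
    """Wrap an async event callback so its own errors are logged, not raised."""

    @functools.wraps(callback)
    async def wrapper(event: Any) -> None:
        try:
            await callback(event)
        except Exception:
            # Log with traceback and let the caller keep processing events.
            logger.exception("Event callback failed for %r", event)

    return wrapper


seen: list[str] = []


@safe_callback
async def on_event_demo(event: Any) -> None:
    # Hypothetical callback that fails on one particular event.
    if event == "bad":
        raise ValueError("cannot render this event")
    seen.append(event)


async def demo() -> None:
    for event in ["plan", "bad", "result"]:
        await on_event_demo(event)


asyncio.run(demo())
print(seen)
```

If the builder accepts any async callable, the same wrapper could presumably be applied to the on_event function defined earlier before passing it to .on_event(...); that usage is untested here.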

Complete example

Here is a complete example that brings all of these concepts together:

import asyncio
import logging
from typing import cast

from agent_framework import (
    ChatAgent,
    HostedCodeInterpreterTool,
    MagenticAgentDeltaEvent,
    MagenticAgentMessageEvent,
    MagenticBuilder,
    MagenticCallbackEvent,
    MagenticCallbackMode,
    MagenticFinalResultEvent,
    MagenticOrchestratorMessageEvent,
    WorkflowCompletedEvent,
)
from agent_framework.openai import OpenAIChatClient, OpenAIResponsesClient

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

async def main() -> None:
    # Define specialized agents
    researcher_agent = ChatAgent(
        name="ResearcherAgent",
        description="Specialist in research and information gathering",
        instructions=(
            "You are a Researcher. You find information without additional "
            "computation or quantitative analysis."
        ),
        chat_client=OpenAIChatClient(ai_model_id="gpt-4o-search-preview"),
    )

    coder_agent = ChatAgent(
        name="CoderAgent",
        description="A helpful assistant that writes and executes code to process and analyze data.",
        instructions="You solve questions using code. Please provide detailed analysis and computation process.",
        chat_client=OpenAIResponsesClient(),
        tools=HostedCodeInterpreterTool(),
    )

    # State for streaming callback
    last_stream_agent_id: str | None = None
    stream_line_open: bool = False

    # Unified callback for all events
    async def on_event(event: MagenticCallbackEvent) -> None:
        nonlocal last_stream_agent_id, stream_line_open

        if isinstance(event, MagenticOrchestratorMessageEvent):
            print(f"\n[ORCH:{event.kind}]\n\n{getattr(event.message, 'text', '')}\n{'-' * 26}")

        elif isinstance(event, MagenticAgentDeltaEvent):
            if last_stream_agent_id != event.agent_id or not stream_line_open:
                if stream_line_open:
                    print()
                print(f"\n[STREAM:{event.agent_id}]: ", end="", flush=True)
                last_stream_agent_id = event.agent_id
                stream_line_open = True
            print(event.text, end="", flush=True)

        elif isinstance(event, MagenticAgentMessageEvent):
            if stream_line_open:
                print(" (final)")
                stream_line_open = False
                print()
            msg = event.message
            if msg is not None:
                response_text = (msg.text or "").replace("\n", " ")
                print(f"\n[AGENT:{event.agent_id}] {msg.role.value}\n\n{response_text}\n{'-' * 26}")

        elif isinstance(event, MagenticFinalResultEvent):
            print("\n" + "=" * 50)
            print("FINAL RESULT:")
            print("=" * 50)
            if event.message is not None:
                print(event.message.text)
            print("=" * 50)

    # Build the workflow
    print("\nBuilding Magentic Workflow...")

    workflow = (
        MagenticBuilder()
        .participants(researcher=researcher_agent, coder=coder_agent)
        .on_event(on_event, mode=MagenticCallbackMode.STREAMING)
        .with_standard_manager(
            chat_client=OpenAIChatClient(),
            max_round_count=10,
            max_stall_count=3,
            max_reset_count=2,
        )
        .build()
    )

    # Define the task
    task = (
        "I am preparing a report on the energy efficiency of different machine learning model architectures. "
        "Compare the estimated training and inference energy consumption of ResNet-50, BERT-base, and GPT-2 "
        "on standard datasets (e.g., ImageNet for ResNet, GLUE for BERT, WebText for GPT-2). "
        "Then, estimate the CO2 emissions associated with each, assuming training on an Azure Standard_NC6s_v3 "
        "VM for 24 hours. Provide tables for clarity, and recommend the most energy-efficient model "
        "per task type (image classification, text classification, and text generation)."
    )

    print(f"\nTask: {task}")
    print("\nStarting workflow execution...")

    # Run the workflow
    try:
        completion_event = None
        async for event in workflow.run_stream(task):
            print(f"Event: {event}")

            if isinstance(event, WorkflowCompletedEvent):
                completion_event = event

        if completion_event is not None:
            data = getattr(completion_event, "data", None)
            preview = getattr(data, "text", None) or (str(data) if data is not None else "")
            print(f"Workflow completed with result:\n\n{preview}")

    except Exception as e:
        print(f"Workflow execution failed: {e}")
        logger.exception("Workflow exception", exc_info=e)

if __name__ == "__main__":
    asyncio.run(main())

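Configuration options

Manager parameters

  • max_round_count: maximum number of collaboration rounds (default: 10)
  • max_stall_count: maximum number of rounds without progress before a reset (default: 3)
  • max_reset_count: maximum number of plan resets allowed (default: 2)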
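
One way to picture how the three limits interact is the toy counter below. It is a sketch under stated assumptions, not the framework's bookkeeping: LimitTracker and record_round are hypothetical, and the choices that a limit triggers when its count is reached (rather than exceeded) and that the run stops once no resets remain are guesses made for illustration.

```python
from dataclasses import dataclass


@dataclass
class LimitTracker:
    """Toy bookkeeping for round, stall, and reset limits (illustrative only)."""

    max_round_count: int = 10
    max_stall_count: int = 3
    max_reset_count: int = 2
    rounds: int = 0
    stalls: int = 0
    resets: int = 0

    def record_round(self, made_progress: bool) -> str:
        """Return 'continue', 'reset', or 'stop' after one round."""
        self.rounds += 1
        # Any progress clears the stall counter in this sketch.
        self.stalls = 0 if made_progress else self.stalls + 1
        if self.rounds >= self.max_round_count:
            return "stop"
        if self.stalls >= self.max_stall_count:
            if self.resets >= self.max_reset_count:
                return "stop"  # assumption: no resets left ends the run
            self.resets += 1
            self.stalls = 0
            return "reset"  # discard the current plan and replan
        return "continue"


tracker = LimitTracker()
outcomes = [tracker.record_round(made_progress=False) for _ in range(9)]
print(outcomes)
```

With the default values and no progress at all, this sketch resets the plan after rounds 3 and 6 and stops at round 9, before the round limit of 10 is reached.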

Callback modes

  • MagenticCallbackMode.STREAMING: receive incremental token updates
  • MagenticCallbackMode.COMPLETE: receive complete messages only
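
The difference between the two modes can be illustrated without the framework. In the sketch below, DeltaBuffer is a hypothetical helper: add_delta plays the role of a streaming update and finish plays the role of a complete message, assuming that an agent's streamed chunks concatenate to its full response.

```python
from collections import defaultdict


class DeltaBuffer:
    """Collects streamed text chunks per agent (hypothetical helper)."""

    def __init__(self) -> None:
        self._chunks: dict[str, list[str]] = defaultdict(list)

    def add_delta(self, agent_id: str, text: str) -> None:
        # Streaming-style update: one small chunk at a time.
        self._chunks[agent_id].append(text)

    def finish(self, agent_id: str) -> str:
        # Complete-style result: the whole message in one piece.
        # The agent's buffer is cleared so the next message starts fresh.
        return "".join(self._chunks.pop(agent_id, []))


buffer = DeltaBuffer()
for chunk in ["Res", "Net-50 ", "uses less energy."]:
    buffer.add_delta("researcher", chunk)
message = buffer.finish("researcher")
print(message)
```

Streaming suits interactive consoles that show text as it arrives; complete messages are simpler when the output is only logged or stored.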

Plan review decisions

  • APPROVE: accept the plan as-is
  • REJECT: reject the plan and request a new one
  • APPROVE with edited_plan: accept the plan with modifications

Sample output

Coming soon...

Next steps