Dela via


Magentic - Orkestreringar för arbetsflöden i Microsoft Agent Framework

Magentisk orkestrering är utformad baserat på Magentic-One-systemet som uppfunnits av AutoGen. Det är ett flexibelt, generellt mönster för flera agenter som är utformat för komplexa, öppna uppgifter som kräver dynamiskt samarbete. I det här mönstret samordnar en dedikerad Magentic Manager ett team med specialiserade agenter och väljer vilken agent som ska agera härnäst baserat på den föränderliga kontexten, aktivitetsstatusen och agentfunktionerna.

Magentic Manager har en delad kontext, spårar förloppet och anpassar arbetsflödet i realtid. På så sätt kan systemet dela upp komplexa problem, delegera underaktiviteter och iterativt förfina lösningar genom agentsamarbete. Orkestreringen passar särskilt bra för scenarier där lösningsvägen inte är känd i förväg och kan kräva flera omgångar av resonemang, forskning och beräkning.

Magentisk orkestrering

Vad du ska lära dig

  • Så här konfigurerar du en magentisk chef för att samordna flera specialiserade agenter
  • Så här konfigurerar du återanrop för strömning och händelsehantering
  • Så här implementerar du en human-in-the-loop-översyn av planer
  • Så här spårar du agentsamarbete och framsteg genom komplexa uppgifter

Definiera dina specialiserade agenter

Kommer snart...

I Magentisk orkestrering definierar du specialiserade agenter som chefen dynamiskt kan välja baserat på uppgiftskrav:

from agent_framework import ChatAgent, HostedCodeInterpreterTool
from agent_framework.openai import OpenAIChatClient, OpenAIResponsesClient

researcher_agent = ChatAgent(
    name="ResearcherAgent",
    description="Specialist in research and information gathering",
    instructions=(
        "You are a Researcher. You find information without additional computation or quantitative analysis."
    ),
    # This agent requires the gpt-4o-search-preview model to perform web searches
    chat_client=OpenAIChatClient(ai_model_id="gpt-4o-search-preview"),
)

coder_agent = ChatAgent(
    name="CoderAgent",
    description="A helpful assistant that writes and executes code to process and analyze data.",
    instructions="You solve questions using code. Please provide detailed analysis and computation process.",
    chat_client=OpenAIResponsesClient(),
    tools=HostedCodeInterpreterTool(),
)

Konfigurera händelseåteranrop

Magnetisk orkestrering ger omfattande händelseåteranrop för att övervaka arbetsprocessens förlopp i realtid.

from agent_framework import (
    MagenticAgentDeltaEvent,
    MagenticAgentMessageEvent,
    MagenticCallbackEvent,
    MagenticFinalResultEvent,
    MagenticOrchestratorMessageEvent,
)

# Unified callback for all events
async def on_event(event: MagenticCallbackEvent) -> None:
    if isinstance(event, MagenticOrchestratorMessageEvent):
        # Manager's planning and coordination messages
        print(f"\n[ORCH:{event.kind}]\n\n{getattr(event.message, 'text', '')}\n{'-' * 26}")

    elif isinstance(event, MagenticAgentDeltaEvent):
        # Streaming tokens from agents
        print(event.text, end="", flush=True)

    elif isinstance(event, MagenticAgentMessageEvent):
        # Complete agent responses
        msg = event.message
        if msg is not None:
            response_text = (msg.text or "").replace("\n", " ")
            print(f"\n[AGENT:{event.agent_id}] {msg.role.value}\n\n{response_text}\n{'-' * 26}")

    elif isinstance(event, MagenticFinalResultEvent):
        # Final synthesized result
        print("\n" + "=" * 50)
        print("FINAL RESULT:")
        print("=" * 50)
        if event.message is not None:
            print(event.message.text)
        print("=" * 50)

Skapa det magentiska arbetsflödet

Använd MagenticBuilder för att konfigurera arbetsflödet med en standardhanterare:

from agent_framework import MagenticBuilder, MagenticCallbackMode

workflow = (
    MagenticBuilder()
    .participants(researcher=researcher_agent, coder=coder_agent)
    .on_event(on_event, mode=MagenticCallbackMode.STREAMING)
    .with_standard_manager(
        chat_client=OpenAIChatClient(),
        max_round_count=10,  # Maximum collaboration rounds
        max_stall_count=3,   # Maximum rounds without progress
        max_reset_count=2,   # Maximum plan resets allowed
    )
    .build()
)

Kör arbetsflödet

Kör en komplex uppgift som kräver att flera agenter arbetar tillsammans:

from agent_framework import WorkflowCompletedEvent

task = (
    "I am preparing a report on the energy efficiency of different machine learning model architectures. "
    "Compare the estimated training and inference energy consumption of ResNet-50, BERT-base, and GPT-2 "
    "on standard datasets (e.g., ImageNet for ResNet, GLUE for BERT, WebText for GPT-2). "
    "Then, estimate the CO2 emissions associated with each, assuming training on an Azure Standard_NC6s_v3 "
    "VM for 24 hours. Provide tables for clarity, and recommend the most energy-efficient model "
    "per task type (image classification, text classification, and text generation)."
)

completion_event = None
async for event in workflow.run_stream(task):
    if isinstance(event, WorkflowCompletedEvent):
        completion_event = event

if completion_event is not None:
    data = getattr(completion_event, "data", None)
    preview = getattr(data, "text", None) or (str(data) if data is not None else "")
    print(f"Workflow completed with result:\n\n{preview}")

Avancerat: Granskning av planering för Människan i loopen

Aktivera mänsklig granskning och godkännande av chefens plan före genomförande.

Konfigurera plangranskning

from agent_framework import (
    MagenticPlanReviewDecision,
    MagenticPlanReviewReply,
    MagenticPlanReviewRequest,
    RequestInfoEvent,
)

workflow = (
    MagenticBuilder()
    .participants(researcher=researcher_agent, coder=coder_agent)
    .on_event(on_event, mode=MagenticCallbackMode.STREAMING)
    .with_standard_manager(
        chat_client=OpenAIChatClient(),
        max_round_count=10,
        max_stall_count=3,
        max_reset_count=2,
    )
    .with_plan_review()  # Enable plan review
    .build()
)

Hantera begäranden om plangranskning

completion_event: WorkflowCompletedEvent | None = None
pending_request: RequestInfoEvent | None = None

while True:
    # Run until completion or review request
    if pending_request is None:
        async for event in workflow.run_stream(task):
            if isinstance(event, WorkflowCompletedEvent):
                completion_event = event

            if isinstance(event, RequestInfoEvent) and event.request_type is MagenticPlanReviewRequest:
                pending_request = event
                review_req = cast(MagenticPlanReviewRequest, event.data)
                if review_req.plan_text:
                    print(f"\n=== PLAN REVIEW REQUEST ===\n{review_req.plan_text}\n")

    # Check if completed
    if completion_event is not None:
        break

    # Respond to plan review
    if pending_request is not None:
        # Collect human decision (approve/reject/modify)
        # For demo, we auto-approve:
        reply = MagenticPlanReviewReply(decision=MagenticPlanReviewDecision.APPROVE)

        # Or modify the plan:
        # reply = MagenticPlanReviewReply(
        #     decision=MagenticPlanReviewDecision.APPROVE,
        #     edited_plan="Modified plan text here..."
        # )

        async for event in workflow.send_responses_streaming({pending_request.request_id: reply}):
            if isinstance(event, WorkflowCompletedEvent):
                completion_event = event
            elif isinstance(event, RequestInfoEvent):
                # Another review cycle if needed
                pending_request = event
            else:
                pending_request = None

Viktiga begrepp

  • Dynamisk samordning: Den magentiska chefen väljer dynamiskt vilken agent som ska agera härnäst baserat på den föränderliga kontexten
  • Iterativ förfining: Systemet kan bryta ned komplexa problem och iterativt förfina lösningar genom flera rundor
  • Förloppsspårning: Inbyggda mekanismer för att identifiera förseningar och återställa planen om det behövs
  • Flexibelt samarbete: Agenter kan anropas flera gånger i valfri ordning enligt chefens beslut
  • Mänsklig tillsyn: Valfri mänsklig medverkan möjliggör manuell intervention och ändring av plan

Arbetsflödeskörning

Den magnetiska orkestreringen följer detta utförandemönster:

  1. Planeringsfas: Chefen analyserar uppgiften och skapar en första plan
  2. Val av agent: Chefen väljer den lämpligaste agenten för varje underavdelning
  3. Körning: Den valda agenten kör sin del av uppgiften
  4. Förloppsbedömning: Chefen utvärderar förloppet och uppdaterar planen
  5. Iteration: Steg 2–4 upprepas tills uppgiften har slutförts eller gränserna har nåtts
  6. Slutlig syntes: Chefen syntetiserar alla agentutdata till ett slutligt resultat

Felhantering

Lägg till felhantering för att göra arbetsflödet robust:

def on_exception(exception: Exception) -> None:
    print(f"Exception occurred: {exception}")
    logger.exception("Workflow exception", exc_info=exception)

workflow = (
    MagenticBuilder()
    .participants(researcher=researcher_agent, coder=coder_agent)
    .on_exception(on_exception)
    .on_event(on_event, mode=MagenticCallbackMode.STREAMING)
    .with_standard_manager(
        chat_client=OpenAIChatClient(),
        max_round_count=10,
        max_stall_count=3,
        max_reset_count=2,
    )
    .build()
)

Fullständigt exempel

Här är ett fullständigt exempel som sammanför alla begrepp:

import asyncio
import logging
from typing import cast

from agent_framework import (
    ChatAgent,
    HostedCodeInterpreterTool,
    MagenticAgentDeltaEvent,
    MagenticAgentMessageEvent,
    MagenticBuilder,
    MagenticCallbackEvent,
    MagenticCallbackMode,
    MagenticFinalResultEvent,
    MagenticOrchestratorMessageEvent,
    WorkflowCompletedEvent,
)
from agent_framework.openai import OpenAIChatClient, OpenAIResponsesClient

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

async def main() -> None:
    # Define specialized agents
    researcher_agent = ChatAgent(
        name="ResearcherAgent",
        description="Specialist in research and information gathering",
        instructions=(
            "You are a Researcher. You find information without additional "
            "computation or quantitative analysis."
        ),
        chat_client=OpenAIChatClient(ai_model_id="gpt-4o-search-preview"),
    )

    coder_agent = ChatAgent(
        name="CoderAgent",
        description="A helpful assistant that writes and executes code to process and analyze data.",
        instructions="You solve questions using code. Please provide detailed analysis and computation process.",
        chat_client=OpenAIResponsesClient(),
        tools=HostedCodeInterpreterTool(),
    )

    # State for streaming callback
    last_stream_agent_id: str | None = None
    stream_line_open: bool = False

    # Unified callback for all events
    async def on_event(event: MagenticCallbackEvent) -> None:
        nonlocal last_stream_agent_id, stream_line_open

        if isinstance(event, MagenticOrchestratorMessageEvent):
            print(f"\n[ORCH:{event.kind}]\n\n{getattr(event.message, 'text', '')}\n{'-' * 26}")

        elif isinstance(event, MagenticAgentDeltaEvent):
            if last_stream_agent_id != event.agent_id or not stream_line_open:
                if stream_line_open:
                    print()
                print(f"\n[STREAM:{event.agent_id}]: ", end="", flush=True)
                last_stream_agent_id = event.agent_id
                stream_line_open = True
            print(event.text, end="", flush=True)

        elif isinstance(event, MagenticAgentMessageEvent):
            if stream_line_open:
                print(" (final)")
                stream_line_open = False
                print()
            msg = event.message
            if msg is not None:
                response_text = (msg.text or "").replace("\n", " ")
                print(f"\n[AGENT:{event.agent_id}] {msg.role.value}\n\n{response_text}\n{'-' * 26}")

        elif isinstance(event, MagenticFinalResultEvent):
            print("\n" + "=" * 50)
            print("FINAL RESULT:")
            print("=" * 50)
            if event.message is not None:
                print(event.message.text)
            print("=" * 50)

    # Build the workflow
    print("\nBuilding Magentic Workflow...")

    workflow = (
        MagenticBuilder()
        .participants(researcher=researcher_agent, coder=coder_agent)
        .on_event(on_event, mode=MagenticCallbackMode.STREAMING)
        .with_standard_manager(
            chat_client=OpenAIChatClient(),
            max_round_count=10,
            max_stall_count=3,
            max_reset_count=2,
        )
        .build()
    )

    # Define the task
    task = (
        "I am preparing a report on the energy efficiency of different machine learning model architectures. "
        "Compare the estimated training and inference energy consumption of ResNet-50, BERT-base, and GPT-2 "
        "on standard datasets (e.g., ImageNet for ResNet, GLUE for BERT, WebText for GPT-2). "
        "Then, estimate the CO2 emissions associated with each, assuming training on an Azure Standard_NC6s_v3 "
        "VM for 24 hours. Provide tables for clarity, and recommend the most energy-efficient model "
        "per task type (image classification, text classification, and text generation)."
    )

    print(f"\nTask: {task}")
    print("\nStarting workflow execution...")

    # Run the workflow
    try:
        completion_event = None
        async for event in workflow.run_stream(task):
            print(f"Event: {event}")

            if isinstance(event, WorkflowCompletedEvent):
                completion_event = event

        if completion_event is not None:
            data = getattr(completion_event, "data", None)
            preview = getattr(data, "text", None) or (str(data) if data is not None else "")
            print(f"Workflow completed with result:\n\n{preview}")

    except Exception as e:
        print(f"Workflow execution failed: {e}")
        logger.exception("Workflow exception", exc_info=e)

if __name__ == "__main__":
    asyncio.run(main())

Konfigurationsalternativ

Manager-Parametrar

  • max_round_count: Maximalt antal samarbetsrundor (standard: 10)
  • max_stall_count: Maximalt antal rundor utan förlopp före återställning (standard: 3)
  • max_reset_count: Maximalt antal tillåtna planåterställningar (standard: 2)

Återanropslägen

  • MagenticCallbackMode.STREAMING: Ta emot inkrementella tokenuppdateringar
  • MagenticCallbackMode.COMPLETE: Ta bara emot fullständiga meddelanden

Beslut om plangranskning

  • APPROVE: Godkänn planen as-is
  • REJECT: Avvisa och begära en ny plan
  • APPROVE med edited_plan: Acceptera med ändringar

Exempelutdata

Kommer snart...

Nästa steg