Dela via


Migreringsguide för AutoGen till Microsoft Agent Framework

En omfattande guide för migrering från AutoGen till Microsoft Agent Framework Python SDK.

Innehållsförteckning

Bakgrund

AutoGen är ett ramverk för att skapa AI-agenter och system med flera agenter med hjälp av stora språkmodeller (LLM). Det startade som ett forskningsprojekt på Microsoft Research och banade väg för flera begrepp inom orkestrering med flera agenter, till exempel GroupChat och händelsedriven agentkörning. Projektet har varit ett givande samarbete med communityn med öppen källkod och många viktiga funktioner kom från externa deltagare.

Microsoft Agent Framework är ett nytt SDK med flera språk för att skapa AI-agenter och arbetsflöden med hjälp av LLM:er. Det representerar en betydande utveckling av de idéer som banat väg för AutoGen och innehåller lärdomar från verklig användning. Den har utvecklats av kärnteamen AutoGen och Semantic Kernel på Microsoft och är utformad för att vara en ny grund för att skapa AI-program framöver.

Den här guiden beskriver en praktisk migreringsväg: den börjar med att ta upp vad som förblir detsamma och vilka förändringar som sker snabbt. Sedan omfattar den modellklientkonfiguration, funktioner för en agent och slutligen orkestrering med flera agenter med konkret kod sida vid sida. Längs vägen hjälper länkar till körbara exempel på Agent Framework-lagringsplatsen dig att verifiera varje steg.

Viktiga likheter och skillnader

Vad förblir detsamma

Grunderna är bekanta. Du skapar fortfarande agenter runt en modellklient, ger instruktioner och bifogar verktyg. Båda biblioteken stöder verktyg i funktionsstil, tokenströmning, multimodalt innehåll och asynkron I/O.

# Both frameworks follow similar patterns
# AutoGen
agent = AssistantAgent(name="assistant", model_client=client, tools=[my_tool])
result = await agent.run(task="Help me with this task")

# Agent Framework
agent = ChatAgent(name="assistant", chat_client=client, tools=[my_tool])
result = await agent.run("Help me with this task")

Viktiga skillnader

  1. Orkestreringsformat: AutoGen parar ihop en händelsedriven kärna med en hög nivå Team. Agent Framework fokuserar på ett skrivet diagrambaserat Workflow som dirigerar data längs kanter och aktiverar exekutorer när indata är klara.

  2. Verktyg: AutoGen omsluter funktioner med FunctionTool. Agent Framework använder @ai_function, härleder scheman automatiskt och lägger till värdbaserade verktyg som en kodtolkare och webbsökning.

  3. Agentbeteende: AssistantAgent är en enda tur om du inte ökar max_tool_iterations. ChatAgent är multi-turn som standard och fortsätter att anropa verktyg tills det kan returnera ett slutligt svar.

  4. Körning: AutoGen erbjuder inbäddade och experimentella distribuerade körningar. Agent Framework fokuserar på en processsammansättning idag; distribuerad körning planeras.

Skapa och konfigurera modellklient

Båda ramverken tillhandahåller modellklienter för större AI-leverantörer, med liknande men inte identiska API:er.

Egenskap AutoGen Agentramverk
OpenAI-klient OpenAIChatCompletionClient OpenAIChatClient
OpenAI-svarsklient ❌ Inte tillgängligt OpenAIResponsesClient
Azure OpenAI AzureOpenAIChatCompletionClient AzureOpenAIChatClient
Azure OpenAI-svar ❌ Inte tillgängligt AzureOpenAIResponsesClient
Azure AI AzureAIChatCompletionClient AzureAIAgentClient
Människoorienterad AnthropicChatCompletionClient 🚧 Planerad
Ollama OllamaChatCompletionClient 🚧 Planerad
Cache ChatCompletionCache omslag 🚧 Planerad

AutoGen-modellklienter

from autogen_ext.models.openai import OpenAIChatCompletionClient, AzureOpenAIChatCompletionClient

# OpenAI
client = OpenAIChatCompletionClient(
    model="gpt-5",
    api_key="your-key"
)

# Azure OpenAI
client = AzureOpenAIChatCompletionClient(
    azure_endpoint="https://your-endpoint.openai.azure.com/",
    azure_deployment="gpt-5",
    api_version="2024-12-01",
    api_key="your-key"
)

Agent Framework ChatClients

from agent_framework.openai import OpenAIChatClient
from agent_framework.azure import AzureOpenAIChatClient

# OpenAI (reads API key from environment)
client = OpenAIChatClient(model_id="gpt-5")

# Azure OpenAI (uses environment or default credentials; see samples for auth options)
client = AzureOpenAIChatClient(model_id="gpt-5")

Detaljerade exempel finns i:

Api-stöd för svar (exklusivt agentramverk)

Agent Framework och ger särskilt stöd för resonemangsmodeller AzureOpenAIResponsesClientOpenAIResponsesClient och strukturerade svar som inte är tillgängliga i AutoGen:

from agent_framework.azure import AzureOpenAIResponsesClient
from agent_framework.openai import OpenAIResponsesClient

# Azure OpenAI with Responses API
azure_responses_client = AzureOpenAIResponsesClient(model_id="gpt-5")

# OpenAI with Responses API
openai_responses_client = OpenAIResponsesClient(model_id="gpt-5")

För svars-API-exempel, se:

Single-Agent funktionsmappning

Det här avsnittet mappar funktioner med en agent mellan AutoGen och Agent Framework. Med en klient på plats skapar du en agent, bifogar verktyg och väljer mellan körning av icke-direktuppspelning och strömning.

Skapande och körning av grundläggande agent

När du har konfigurerat en modellklient är nästa steg att skapa agenter. Båda ramverken tillhandahåller liknande agentabstraktioner, men med olika standardbeteenden och konfigurationsalternativ.

AutoGen AssistantAgent

from autogen_agentchat.agents import AssistantAgent

agent = AssistantAgent(
    name="assistant",
    model_client=client,
    system_message="You are a helpful assistant.",
    tools=[my_tool],
    max_tool_iterations=1  # Single-turn by default
)

# Execution
result = await agent.run(task="What's the weather?")

Agent Framework ChatAgent

from agent_framework import ChatAgent, ai_function
from agent_framework.openai import OpenAIChatClient

# Create simple tools for the example
@ai_function
def get_weather(location: str) -> str:
    """Get weather for a location."""
    return f"Weather in {location}: sunny"

@ai_function
def get_time() -> str:
    """Get current time."""
    return "Current time: 2:30 PM"

# Create client
client = OpenAIChatClient(model_id="gpt-5")

async def example():
    # Direct creation
    agent = ChatAgent(
        name="assistant",
        chat_client=client,
        instructions="You are a helpful assistant.",
        tools=[get_weather]  # Multi-turn by default
    )

    # Factory method (more convenient)
    agent = client.create_agent(
        name="assistant",
        instructions="You are a helpful assistant.",
        tools=[get_weather]
    )

    # Execution with runtime tool configuration
    result = await agent.run(
        "What's the weather?",
        tools=[get_time],  # Can add tools at runtime
        tool_choice="auto"
    )

Viktiga skillnader:

  • Standardbeteende: ChatAgent itererar automatiskt via verktygsanrop, samtidigt AssistantAgent som explicit inställning krävs max_tool_iterations
  • KörningskonfigurationChatAgent.run(): tools accepterar och tool_choice parametrar för anpassning per anrop
  • Fabriksmetoder: Agent Framework tillhandahåller praktiska fabriksmetoder direkt från chattklienter
  • Tillståndshantering: ChatAgent är tillståndslös och upprätthåller inte konversationshistorik mellan anrop, till skillnad från AssistantAgent vilket upprätthåller konversationshistoriken som en del av dess tillstånd

Hantera konversationstillstånd med AgentThread

Om du vill fortsätta konversationer med ChatAgentanvänder du AgentThread för att hantera konversationshistorik:

# Assume we have an agent from previous examples
async def conversation_example():
    # Create a new thread that will be reused
    thread = agent.get_new_thread()

    # First interaction - thread is empty
    result1 = await agent.run("What's 2+2?", thread=thread)
    print(result1.text)  # "4"

    # Continue conversation - thread contains previous messages
    result2 = await agent.run("What about that number times 10?", thread=thread)
    print(result2.text)  # "40" (understands "that number" refers to 4)

    # AgentThread can use external storage, similar to ChatCompletionContext in AutoGen

Tillståndslös som standard: snabbdemo

# Without a thread (two independent invocations)
r1 = await agent.run("What's 2+2?")
print(r1.text)  # for example, "4"

r2 = await agent.run("What about that number times 10?")
print(r2.text)  # Likely ambiguous without prior context; cannot be "40"

# With a thread (shared context across calls)
thread = agent.get_new_thread()
print((await agent.run("What's 2+2?", thread=thread)).text)  # "4"
print((await agent.run("What about that number times 10?", thread=thread)).text)  # "40"

Exempel på trådhantering finns i:

OpenAI Assistant Agent Equivalence

Båda ramverken tillhandahåller OpenAI Assistant API-integrering:

# AutoGen OpenAIAssistantAgent
from autogen_ext.agents.openai import OpenAIAssistantAgent
# Agent Framework has OpenAI Assistants support via OpenAIAssistantsClient
from agent_framework.openai import OpenAIAssistantsClient

Exempel på OpenAI Assistant finns i:

Stöd för direktuppspelning

Båda ramverken strömmar token i realtid – från klienter och från agenter – för att hålla UIs responsiva.

AutoGen-direktuppspelning

# Model client streaming
async for chunk in client.create_stream(messages):
    if isinstance(chunk, str):
        print(chunk, end="")

# Agent streaming
async for event in agent.run_stream(task="Hello"):
    if isinstance(event, ModelClientStreamingChunkEvent):
        print(event.content, end="")
    elif isinstance(event, TaskResult):
        print("Final result received")

Agent Framework-direktuppspelning

# Assume we have client, agent, and tools from previous examples
async def streaming_example():
    # Chat client streaming
    async for chunk in client.get_streaming_response("Hello", tools=tools):
        if chunk.text:
            print(chunk.text, end="")

    # Agent streaming
    async for chunk in agent.run_stream("Hello"):
        if chunk.text:
            print(chunk.text, end="", flush=True)

Tips: I Agent Framework ger både klienter och agenter samma uppdateringsform. du kan läsa chunk.text i båda fallen.

Meddelandetyper och skapande

Att förstå hur meddelanden fungerar är avgörande för effektiv agentkommunikation. Båda ramverken tillhandahåller olika metoder för att skapa och hantera meddelanden, med AutoGen med separata meddelandeklasser och Agent Framework med hjälp av ett enhetligt meddelandesystem.

Autogen meddelandetyper

from autogen_agentchat.messages import TextMessage, MultiModalMessage
from autogen_core.models import UserMessage

# Text message
text_msg = TextMessage(content="Hello", source="user")

# Multi-modal message
multi_modal_msg = MultiModalMessage(
    content=["Describe this image", image_data],
    source="user"
)

# Convert to model format for use with model clients
user_message = text_msg.to_model_message()

Meddelandetyper för Agent Framework

from agent_framework import ChatMessage, TextContent, DataContent, UriContent, Role
import base64

# Text message
text_msg = ChatMessage(role=Role.USER, text="Hello")

# Supply real image bytes, or use a data: URI/URL via UriContent
image_bytes = b"<your_image_bytes>"
image_b64 = base64.b64encode(image_bytes).decode()
image_uri = f"data:image/jpeg;base64,{image_b64}"

# Multi-modal message with mixed content
multi_modal_msg = ChatMessage(
    role=Role.USER,
    contents=[
        TextContent(text="Describe this image"),
        DataContent(uri=image_uri, media_type="image/jpeg")
    ]
)

Viktiga skillnader:

  • AutoGen använder separata meddelandeklasser (TextMessage, MultiModalMessage) med ett source fält
  • Agent Framework använder ett enhetligt ChatMessage innehåll med inskrivna innehållsobjekt och ett role fält
  • Agent Framework-meddelanden använder Role uppräkning (ANVÄNDARE, ASSISTENT, SYSTEM, VERKTYG) i stället för strängkällor

Skapa och integrera verktyg

Verktyg utökar agentfunktioner utöver textgenerering. Ramverken använder olika metoder för att skapa verktyg, där Agent Framework tillhandahåller mer automatiserad schemagenerering.

AutoGen FunctionTool

from autogen_core.tools import FunctionTool

async def get_weather(location: str) -> str:
    """Get weather for a location."""
    return f"Weather in {location}: sunny"

# Manual tool creation
tool = FunctionTool(
    func=get_weather,
    description="Get weather information"
)

# Use with agent
agent = AssistantAgent(name="assistant", model_client=client, tools=[tool])

Agent Framework @ai_function

from agent_framework import ai_function
from typing import Annotated
from pydantic import Field

@ai_function
def get_weather(
    location: Annotated[str, Field(description="The location to get weather for")]
) -> str:
    """Get weather for a location."""
    return f"Weather in {location}: sunny"

# Direct use with agent (automatic conversion)
agent = ChatAgent(name="assistant", chat_client=client, tools=[get_weather])

Detaljerade exempel finns i:

Värdbaserade verktyg (exklusivt agentramverk)

Agent Framework tillhandahåller värdbaserade verktyg som inte är tillgängliga i AutoGen:

from agent_framework import ChatAgent, HostedCodeInterpreterTool, HostedWebSearchTool
from agent_framework.azure import AzureOpenAIChatClient

# Azure OpenAI client with a model that supports hosted tools
client = AzureOpenAIChatClient(model_id="gpt-5")

# Code execution tool
code_tool = HostedCodeInterpreterTool()

# Web search tool
search_tool = HostedWebSearchTool()

agent = ChatAgent(
    name="researcher",
    chat_client=client,
    tools=[code_tool, search_tool]
)

Detaljerade exempel finns i:

Krav och varningar:

  • Värdbaserade verktyg är endast tillgängliga på modeller/konton som stöder dem. Kontrollera berättiganden och modellstöd för din leverantör innan du aktiverar dessa verktyg.
  • Konfigurationen skiljer sig åt beroende på leverantör. följ kraven i varje exempel för konfiguration och behörigheter.
  • Alla modeller stöder inte alla värdbaserade verktyg (till exempel webbsökning kontra kodtolk). Välj en kompatibel modell i din miljö.

Anmärkning

AutoGen stöder verktyg för lokal kodkörning, men den här funktionen planeras för framtida Agent Framework-versioner.

Nyckelskillnad: Agent Framework hanterar verktygs iteration automatiskt på agentnivå. Till skillnad från AutoGens max_tool_iterations parameter fortsätter Agent Framework-agenter att köra verktyg tills de är klara som standard, med inbyggda säkerhetsmekanismer för att förhindra oändliga loopar.

Stöd för MCP-server

För avancerad verktygsintegrering stöder båda ramverken McP (Model Context Protocol), vilket gör det möjligt för agenter att interagera med externa tjänster och datakällor. Agent Framework ger mer omfattande inbyggt stöd.

Stöd för AutoGen MCP

AutoGen har grundläggande MCP-stöd via tillägg (specifik implementeringsinformation varierar beroende på version).

Agent Framework MCP Support

from agent_framework import ChatAgent, MCPStdioTool, MCPStreamableHTTPTool, MCPWebsocketTool
from agent_framework.openai import OpenAIChatClient

# Create client for the example
client = OpenAIChatClient(model_id="gpt-5")

# Stdio MCP server
mcp_tool = MCPStdioTool(
    name="filesystem",
    command="uvx mcp-server-filesystem",
    args=["/allowed/directory"]
)

# HTTP streaming MCP
http_mcp = MCPStreamableHTTPTool(
    name="http_mcp",
    url="http://localhost:8000/sse"
)

# WebSocket MCP
ws_mcp = MCPWebsocketTool(
    name="websocket_mcp",
    url="ws://localhost:8000/ws"
)

agent = ChatAgent(name="assistant", chat_client=client, tools=[mcp_tool])

McP-exempel finns i:

Mönster för agent som ett verktyg

Ett kraftfullt mönster är att använda själva agenterna som verktyg, vilket möjliggör hierarkiska agentarkitekturer. Båda ramverken stöder det här mönstret med olika implementeringar.

AutoGen AgentTool

from autogen_agentchat.tools import AgentTool

# Create specialized agent
writer = AssistantAgent(
    name="writer",
    model_client=client,
    system_message="You are a creative writer."
)

# Wrap as tool
writer_tool = AgentTool(agent=writer)

# Use in coordinator (requires disabling parallel tool calls)
coordinator_client = OpenAIChatCompletionClient(
    model="gpt-5",
    parallel_tool_calls=False
)
coordinator = AssistantAgent(
    name="coordinator",
    model_client=coordinator_client,
    tools=[writer_tool]
)

Agent Framework as_tool()

from agent_framework import ChatAgent

# Assume we have client from previous examples
# Create specialized agent
writer = ChatAgent(
    name="writer",
    chat_client=client,
    instructions="You are a creative writer."
)

# Convert to tool
writer_tool = writer.as_tool(
    name="creative_writer",
    description="Generate creative content",
    arg_name="request",
    arg_description="What to write"
)

# Use in coordinator
coordinator = ChatAgent(
    name="coordinator",
    chat_client=client,
    tools=[writer_tool]
)

Explicit migreringsanteckning: I AutoGen anger du parallel_tool_calls=False på koordinatorns modellklient när du omsluter agenter som verktyg för att undvika samtidighetsproblem när du anropar samma agentinstans. I Agent Framework as_tool() kräver inte inaktivering av parallella verktygsanrop eftersom agenter är tillståndslösa som standard.

Mellanprogram (Agent Framework-funktion)

Agent Framework introducerar mellanprogramsfunktioner som AutoGen saknar. Mellanprogram möjliggör kraftfulla övergripande problem som loggning, säkerhet och prestandaövervakning.

from agent_framework import ChatAgent, AgentRunContext, FunctionInvocationContext
from typing import Callable, Awaitable

# Assume we have client from previous examples
async def logging_middleware(
    context: AgentRunContext,
    next: Callable[[AgentRunContext], Awaitable[None]]
) -> None:
    print(f"Agent {context.agent.name} starting")
    await next(context)
    print(f"Agent {context.agent.name} completed")

async def security_middleware(
    context: FunctionInvocationContext,
    next: Callable[[FunctionInvocationContext], Awaitable[None]]
) -> None:
    if "password" in str(context.arguments):
        print("Blocking function call with sensitive data")
        return  # Don't call next()
    await next(context)

agent = ChatAgent(
    name="secure_agent",
    chat_client=client,
    middleware=[logging_middleware, security_middleware]
)

Fördelar:

  • Säkerhet: Validering av indata och innehållsfiltrering
  • Observerbarhet: Loggning, mått och spårning
  • Prestanda: Cachelagring och hastighetsbegränsning
  • Felhantering: Graciös nedbrytnings- och återförsökslogik

Detaljerade exempel på mellanprogram finns i:

Skräddarsydda agenter

Ibland vill du inte ha någon modellbaserad agent alls – du vill ha en deterministisk eller API-baserad agent med anpassad logik. Båda ramverken har stöd för att skapa anpassade agenter, men mönstren skiljer sig åt.

AutoGen: Subclass BaseChatAgent

from typing import Sequence
from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import BaseChatMessage, TextMessage, StopMessage
from autogen_core import CancellationToken

class StaticAgent(BaseChatAgent):
    def __init__(self, name: str = "static", description: str = "Static responder") -> None:
        super().__init__(name, description)

    @property
    def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:  # Which message types this agent produces
        return (TextMessage,)

    async def on_messages(self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken) -> Response:
        # Always return a static response
        return Response(chat_message=TextMessage(content="Hello from AutoGen custom agent", source=self.name))

Notes:

  • Implementera on_messages(...) och returnera ett Response med ett chattmeddelande.
  • Du kan också implementera on_reset(...) för att rensa internt tillstånd mellan körningar.

Agent Framework: Utöka BaseAgent (trådmedveten)

from collections.abc import AsyncIterable
from typing import Any
from agent_framework import (
    AgentRunResponse,
    AgentRunResponseUpdate,
    AgentThread,
    BaseAgent,
    ChatMessage,
    Role,
    TextContent,
)

class StaticAgent(BaseAgent):
    async def run(
        self,
        messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
        *,
        thread: AgentThread | None = None,
        **kwargs: Any,
    ) -> AgentRunResponse:
        # Build a static reply
        reply = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text="Hello from AF custom agent")])

        # Persist conversation to the provided AgentThread (if any)
        if thread is not None:
            normalized = self._normalize_messages(messages)
            await self._notify_thread_of_new_messages(thread, normalized, reply)

        return AgentRunResponse(messages=[reply])

    async def run_stream(
        self,
        messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
        *,
        thread: AgentThread | None = None,
        **kwargs: Any,
    ) -> AsyncIterable[AgentRunResponseUpdate]:
        # Stream the same static response in a single chunk for simplicity
        yield AgentRunResponseUpdate(contents=[TextContent(text="Hello from AF custom agent")], role=Role.ASSISTANT)

        # Notify thread of input and the complete response once streaming ends
        if thread is not None:
            reply = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text="Hello from AF custom agent")])
            normalized = self._normalize_messages(messages)
            await self._notify_thread_of_new_messages(thread, normalized, reply)

Notes:

  • AgentThread upprätthåller konversationstillståndet externt. använd agent.get_new_thread() och skicka den till run/run_stream.
  • Anropa self._notify_thread_of_new_messages(thread, input_messages, response_messages) så att tråden har båda sidor av utbytet.
  • Se det fullständiga exemplet: Anpassad agent

Nu ska vi titta på orkestrering med flera agenter – det område där ramverken skiljer sig mest åt.

Funktionsmappning för flera agenter

Översikt över programmeringsmodell

Programmeringsmodellerna för flera agenter representerar den viktigaste skillnaden mellan de två ramverken.

AutoGens metod med dubbla modeller

AutoGen innehåller två programmeringsmodeller:

  1. autogen-core: Låg nivå, händelsedriven programmering med RoutedAgent och meddelandeprenumerationer
  2. Team abstraktion: En övergripande, körningscentrerad modell som bygger på autogen-core
# Low-level autogen-core (complex)
class MyAgent(RoutedAgent):
    @message_handler
    async def handle_message(self, message: TextMessage, ctx: MessageContext) -> None:
        # Handle specific message types
        pass

# High-level Team (easier but limited)
team = RoundRobinGroupChat(
    participants=[agent1, agent2],
    termination_condition=StopAfterNMessages(5)
)
result = await team.run(task="Collaborate on this task")

Utmaningar:

  • Lågnivåmodellen är för komplex för de flesta användare
  • Högnivåmodell kan bli begränsande för komplexa beteenden
  • Bryggning mellan de två modellerna ger implementeringskomplexitet

Agent Frameworks enhetliga arbetsflödesmodell

Agent Framework ger en enda Workflow abstraktion som kombinerar det bästa av båda metoderna:

from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never

# Assume we have agent1 and agent2 from previous examples
@executor(id="agent1")
async def agent1_executor(input_msg: str, ctx: WorkflowContext[str]) -> None:
    response = await agent1.run(input_msg)
    await ctx.send_message(response.text)

@executor(id="agent2")
async def agent2_executor(input_msg: str, ctx: WorkflowContext[Never, str]) -> None:
    response = await agent2.run(input_msg)
    await ctx.yield_output(response.text)  # Final output

# Build typed data flow graph
workflow = (WorkflowBuilder()
           .add_edge(agent1_executor, agent2_executor)
           .set_start_executor(agent1_executor)
           .build())

# Example usage (would be in async context)
# result = await workflow.run("Initial input")

Detaljerade arbetsflödesexempel finns i:

Fördelar:

  • Enhetlig modell: Enkel abstraktion för alla komplexitetsnivåer
  • Typsäkerhet: Starkt typinmatningar och utdata
  • Diagramvisualisering: Rensa dataflödesrepresentation
  • Flexibel sammansättning: Blanda agenter, funktioner och underarbetsflöden

Arbetsflöde jämfört med GraphFlow

Agent Frameworks Workflow abstraktion är inspirerad av AutoGens experimentella GraphFlow funktion, men representerar en betydande utveckling inom designfilosofi:

  • GraphFlow: Kontrollflöde baserat på var kanter är övergångar och meddelanden sänds till alla agenter. övergångar villkoras på innehåll för utsända meddelanden
  • Arbetsflöde: Dataflöde baserat på var meddelanden dirigeras via specifika kanter och utförare aktiveras av kanter, med stöd för samtidig körning.

Visuell översikt

Diagrammet nedan kontrasterar AutoGens graphflow för kontrollflöde (vänster) med Agent Frameworks dataflödesarbetsflöde (höger). GraphFlow modellerar agenter som noder med villkorsstyrda övergångar och sändningar. Körmodeller för arbetsflödesmodeller (agenter, funktioner eller underarbetsflöden) som är anslutna via inskrivna kanter. Den stöder också pauser och kontrollpunkter för begäran/svar.

flowchart LR

  subgraph AutoGenGraphFlow
    direction TB
    U[User / Task] --> A[Agent A]
    A -->|success| B[Agent B]
    A -->|retry| C[Agent C]
    A -. broadcast .- B
    A -. broadcast .- C
  end

  subgraph AgentFrameworkWorkflow
    direction TB
    I[Input] --> E1[Executor 1]
    E1 -->|"str"| E2[Executor 2]
    E1 -->|"image"| E3[Executor 3]
    E3 -->|"str"| E2
    E2 --> OUT[(Final Output)]
  end

  R[Request / Response Gate]
  E2 -. request .-> R
  R -. resume .-> E2

  CP[Checkpoint]
  E1 -. save .-> CP
  CP -. load .-> E1

I praktiken:

  • GraphFlow använder agenter som noder och sänder meddelanden. kanter representerar villkorsstyrda övergångar.
  • Arbetsflödesvägar skrev meddelanden längs kanterna. Noder (exekutorer) kan vara agenter, rena funktioner eller underarbetsflöden.
  • Med begäran/svar kan ett arbetsflöde pausas för externa indata. kontrollpunkter bevarar förloppet och aktiverar återuppta.

Kodjämförelse

1) Sekventiell + villkorsstyrd
# AutoGen GraphFlow (fluent builder) — writer → reviewer → editor (conditional)
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow

writer = AssistantAgent(name="writer", description="Writes a draft", model_client=client)
reviewer = AssistantAgent(name="reviewer", description="Reviews the draft", model_client=client)
editor = AssistantAgent(name="editor", description="Finalizes the draft", model_client=client)

graph = (
    DiGraphBuilder()
    .add_node(writer).add_node(reviewer).add_node(editor)
    .add_edge(writer, reviewer)  # always
    .add_edge(reviewer, editor, condition=lambda msg: "approve" in msg.to_model_text())
    .set_entry_point(writer)
).build()

team = GraphFlow(participants=[writer, reviewer, editor], graph=graph)
result = await team.run(task="Draft a short paragraph about solar power")
# Agent Framework Workflow — sequential executors with conditional logic
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never

@executor(id="writer")
async def writer_exec(task: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"Draft: {task}")

@executor(id="reviewer")
async def reviewer_exec(draft: str, ctx: WorkflowContext[str]) -> None:
    decision = "approve" if "solar" in draft.lower() else "revise"
    await ctx.send_message(f"{decision}:{draft}")

@executor(id="editor")
async def editor_exec(msg: str, ctx: WorkflowContext[Never, str]) -> None:
    if msg.startswith("approve:"):
        await ctx.yield_output(msg.split(":", 1)[1])
    else:
        await ctx.yield_output("Needs revision")

workflow_seq = (
    WorkflowBuilder()
    .add_edge(writer_exec, reviewer_exec)
    .add_edge(reviewer_exec, editor_exec)
    .set_start_executor(writer_exec)
    .build()
)
2) Utrullning + koppling (ALLA vs ANY)
# AutoGen GraphFlow — A → (B, C) → D with ALL/ANY join
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
A, B, C, D = agent_a, agent_b, agent_c, agent_d

# ALL (default): D runs after both B and C
g_all = (
    DiGraphBuilder()
    .add_node(A).add_node(B).add_node(C).add_node(D)
    .add_edge(A, B).add_edge(A, C)
    .add_edge(B, D).add_edge(C, D)
    .set_entry_point(A)
).build()

# ANY: D runs when either B or C completes
g_any = (
    DiGraphBuilder()
    .add_node(A).add_node(B).add_node(C).add_node(D)
    .add_edge(A, B).add_edge(A, C)
    .add_edge(B, D, activation_group="join_d", activation_condition="any")
    .add_edge(C, D, activation_group="join_d", activation_condition="any")
    .set_entry_point(A)
).build()
# Agent Framework Workflow — A → (B, C) → aggregator (ALL vs ANY)
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never

@executor(id="A")
async def start(task: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"B:{task}", target_id="B")
    await ctx.send_message(f"C:{task}", target_id="C")

@executor(id="B")
async def branch_b(text: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"B_done:{text}")

@executor(id="C")
async def branch_c(text: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"C_done:{text}")

@executor(id="join_any")
async def join_any(msg: str, ctx: WorkflowContext[Never, str]) -> None:
    await ctx.yield_output(f"First: {msg}")  # ANY join (first arrival)

@executor(id="join_all")
async def join_all(msg: str, ctx: WorkflowContext[str, str]) -> None:
    state = await ctx.get_executor_state() or {"items": []}
    state["items"].append(msg)
    await ctx.set_executor_state(state)
    if len(state["items"]) >= 2:
        await ctx.yield_output(" | ".join(state["items"]))  # ALL join

wf_any = (
    WorkflowBuilder()
    .add_edge(start, branch_b).add_edge(start, branch_c)
    .add_edge(branch_b, join_any).add_edge(branch_c, join_any)
    .set_start_executor(start)
    .build()
)

wf_all = (
    WorkflowBuilder()
    .add_edge(start, branch_b).add_edge(start, branch_c)
    .add_edge(branch_b, join_all).add_edge(branch_c, join_all)
    .set_start_executor(start)
    .build()
)
3) Riktad routning (ingen sändning)
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never

@executor(id="ingest")
async def ingest(task: str, ctx: WorkflowContext[str]) -> None:
    # Route selectively using target_id
    if task.startswith("image:"):
        await ctx.send_message(task.removeprefix("image:"), target_id="vision")
    else:
        await ctx.send_message(task, target_id="writer")

@executor(id="writer")
async def write(text: str, ctx: WorkflowContext[Never, str]) -> None:
    await ctx.yield_output(f"Draft: {text}")

@executor(id="vision")
async def caption(image_ref: str, ctx: WorkflowContext[Never, str]) -> None:
    await ctx.yield_output(f"Caption: {image_ref}")

workflow = (
    WorkflowBuilder()
    .add_edge(ingest, write)
    .add_edge(ingest, caption)
    .set_start_executor(ingest)
    .build()
)

# Example usage (async):
# await workflow.run("Summarize the benefits of solar power")
# await workflow.run("image:https://example.com/panel.jpg")

Vad du bör lägga märke till:

  • GraphFlow sänder meddelanden och använder villkorsstyrda övergångar. Kopplingsbeteendet konfigureras via målsidan activation och per kant activation_group/activation_condition (till exempel gruppera båda kanterna i join_d med activation_condition="any").
  • Arbetsflödet dirigerar data explicit; använd target_id för att välja underordnade körbara filer. Join behavior lives in the receiving executor (till exempel yield on first input vs wait for all) eller via orchestration builders/aggregators.
  • Körbara filer i arbetsflödet är i fritt format: omsluta en ChatAgent, en funktion eller ett underarbetsflöde och blanda dem i samma graf.

Viktiga skillnader

Tabellen nedan sammanfattar de grundläggande skillnaderna mellan AutoGens GraphFlow och Agent Frameworks arbetsflöde:

Aspekt AutoGen GraphFlow Agent Framework-arbetsflöde
Flödestyp Kontrollflöde (kanter är övergångar) Dataflöde (kanter dirigera meddelanden)
Nodtyper Endast agenter Agenter, funktioner, underarbetsflöden
Aktivering Meddelandesändning Edge-baserad aktivering
Typsäkerhet Limited Stark skrivning genomgående
Sammansättning Limited Mycket komposterbar

Kapslingsmönster

AutoGen Team Nesting

# Inner team
inner_team = RoundRobinGroupChat(
    participants=[specialist1, specialist2],
    termination_condition=StopAfterNMessages(3)
)

# Outer team with nested team as participant
outer_team = RoundRobinGroupChat(
    participants=[coordinator, inner_team, reviewer],  # Team as participant
    termination_condition=StopAfterNMessages(10)
)

# Messages are broadcasted to all participants including nested team
result = await outer_team.run("Complex task requiring collaboration")

Egenskaper för AutoGen-kapsling:

  • Kapslat team tar emot alla meddelanden från det yttre teamet
  • Kapslade teammeddelanden sänds till alla deltagare i det yttre teamet
  • Kontext för delat meddelande på alla nivåer

Kapsling av Agent Framework-arbetsflöde

from agent_framework import WorkflowExecutor, WorkflowBuilder

# Assume we have executors from previous examples
# specialist1_executor, specialist2_executor, coordinator_executor, reviewer_executor

# Create sub-workflow
sub_workflow = (WorkflowBuilder()
               .add_edge(specialist1_executor, specialist2_executor)
               .set_start_executor(specialist1_executor)
               .build())

# Wrap as executor
sub_workflow_executor = WorkflowExecutor(
    workflow=sub_workflow,
    id="sub_process"
)

# Use in parent workflow
parent_workflow = (WorkflowBuilder()
                  .add_edge(coordinator_executor, sub_workflow_executor)
                  .add_edge(sub_workflow_executor, reviewer_executor)
                  .set_start_executor(coordinator_executor)
                  .build())

Egenskaper för Agent Framework-kapsling:

  • Isolerade indata/utdata via WorkflowExecutor
  • Inga meddelandesändningar – dataflöden via specifika anslutningar
  • Oberoende tillståndshantering för varje arbetsflödesnivå

Gruppchattmönster

Med gruppchattmönster kan flera agenter samarbeta om komplexa uppgifter. Så här översätts vanliga mönster mellan ramverk.

RoundRobinGroupChat-mönster

AutoGen-implementering:

from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import StopAfterNMessages

team = RoundRobinGroupChat(
    participants=[agent1, agent2, agent3],
    termination_condition=StopAfterNMessages(10)
)
result = await team.run("Discuss this topic")

Implementering av Agent Framework:

from agent_framework import SequentialBuilder, WorkflowOutputEvent

# Assume we have agent1, agent2, agent3 from previous examples
# Sequential workflow through participants
workflow = SequentialBuilder().participants([agent1, agent2, agent3]).build()

# Example usage (would be in async context)
async def sequential_example():
    # Each agent appends to shared conversation
    async for event in workflow.run_stream("Discuss this topic"):
        if isinstance(event, WorkflowOutputEvent):
            conversation_history = event.data  # list[ChatMessage]

Detaljerade orkestreringsexempel finns i:

För samtidiga körningsmönster tillhandahåller Agent Framework även:

from agent_framework import ConcurrentBuilder, WorkflowOutputEvent

# Assume we have agent1, agent2, agent3 from previous examples
# Concurrent workflow for parallel processing
workflow = (ConcurrentBuilder()
           .participants([agent1, agent2, agent3])
           .build())

# Example usage (would be in async context)
async def concurrent_example():
    # All agents process the input concurrently
    async for event in workflow.run_stream("Process this in parallel"):
        if isinstance(event, WorkflowOutputEvent):
            results = event.data  # Combined results from all agents

Exempel på samtidig körning finns i:

MagenticOneGroupChat-mönster

AutoGen-implementering:

from autogen_agentchat.teams import MagenticOneGroupChat

team = MagenticOneGroupChat(
    participants=[researcher, coder, executor],
    model_client=coordinator_client,
    termination_condition=StopAfterNMessages(20)
)
result = await team.run("Complex research and analysis task")

Implementering av Agent Framework:

from agent_framework import (
    MagenticBuilder, MagenticCallbackMode, WorkflowOutputEvent,
    MagenticCallbackEvent, MagenticOrchestratorMessageEvent, MagenticAgentDeltaEvent
)

# Assume we have researcher, coder, and coordinator_client from previous examples
async def on_event(event: MagenticCallbackEvent) -> None:
    if isinstance(event, MagenticOrchestratorMessageEvent):
        print(f"[ORCHESTRATOR]: {event.message.text}")
    elif isinstance(event, MagenticAgentDeltaEvent):
        print(f"[{event.agent_id}]: {event.text}", end="")

workflow = (MagenticBuilder()
           .participants(researcher=researcher, coder=coder)
           .on_event(on_event, mode=MagenticCallbackMode.STREAMING)
           .with_standard_manager(
               chat_client=coordinator_client,
               max_round_count=20,
               max_stall_count=3,
               max_reset_count=2
           )
           .build())

# Example usage (would be in async context)
async def magentic_example():
    async for event in workflow.run_stream("Complex research task"):
        if isinstance(event, WorkflowOutputEvent):
            final_result = event.data

Anpassningsalternativ för Agent Framework:

Det magentiska arbetsflödet innehåller omfattande anpassningsalternativ:

  • Konfiguration av chef: Anpassade orkestreringsmodeller och uppmaningar
  • Avrunda gränser: max_round_count, max_stall_count, max_reset_count
  • Återanrop till händelser: Direktuppspelning i realtid med detaljerad händelsefiltrering
  • Agentspecialisering: Anpassade instruktioner och verktyg per agent
  • Återanropslägen: STREAMING för realtidsuppdateringar eller BATCH för slutliga resultat
  • Planering av människa i loopen: Anpassade planeringsfunktioner för interaktiva arbetsflöden
# Advanced customization example with human-in-the-loop
from agent_framework.openai import OpenAIChatClient
from agent_framework import MagenticBuilder, MagenticCallbackMode, MagenticPlannerContext

# Assume we have researcher_agent, coder_agent, analyst_agent, detailed_event_handler
# and get_human_input function defined elsewhere

async def custom_planner(context: MagenticPlannerContext) -> str:
    """Custom planner with human input for critical decisions."""
    if context.round_count > 5:
        # Request human input for complex decisions
        return await get_human_input(f"Next action for: {context.current_state}")
    return "Continue with automated planning"

workflow = (MagenticBuilder()
           .participants(
               researcher=researcher_agent,
               coder=coder_agent,
               analyst=analyst_agent
           )
           .with_standard_manager(
               chat_client=OpenAIChatClient(model_id="gpt-5"),
               max_round_count=15,      # Limit total rounds
               max_stall_count=2,       # Prevent infinite loops
               max_reset_count=1,       # Allow one reset on failure
               orchestrator_prompt="Custom orchestration instructions..."
           )
           .with_planner(custom_planner)  # Human-in-the-loop planning
           .on_event(detailed_event_handler, mode=MagenticCallbackMode.STREAMING)
           .build())

Detaljerade magentiska exempel finns i:

Framtida mönster

Översikten för Agent Framework innehåller flera AutoGen-mönster som för närvarande är under utveckling:

  • Swarm-mönster: Handoff-baserad agentsamordning
  • SelectorGroupChat: LLM-driven talarval

Human-in-the-Loop med begärandesvar

En viktig ny funktion i Agent Framework är Workflow begreppet begäran och svar, vilket gör att arbetsflöden kan pausa körningen och vänta på externa indata innan de fortsätter. Den här funktionen finns inte i AutoGens Team abstraktion och möjliggör avancerade mönster för människor i loopen.

AutoGen-begränsningar

AutoGens abstraktion körs kontinuerligt när den har startats Team och tillhandahåller inte inbyggda mekanismer för att pausa körningen av mänskliga indata. Alla funktioner för människa i loopen kräver anpassade implementeringar utanför ramverket.

Agent Framework RequestInfoExecutor

Agent Framework tillhandahåller RequestInfoExecutor – en arbetsflödesbaserad brygga som pausar grafen vid en begäran om information, genererar en RequestInfoEvent med en skriven nyttolast och återupptar körningen först när programmet tillhandahåller en matchande RequestResponse.

from agent_framework import (
    RequestInfoExecutor, RequestInfoEvent, RequestInfoMessage,
    RequestResponse, WorkflowBuilder, WorkflowContext, executor
)
from dataclasses import dataclass
from typing_extensions import Never

# Assume we have agent_executor defined elsewhere

# Define typed request payload
@dataclass
class ApprovalRequest(RequestInfoMessage):
    """Request human approval for agent output."""
    content: str = ""
    agent_name: str = ""

# Workflow executor that requests human approval
@executor(id="reviewer")
async def approval_executor(
    agent_response: str,
    ctx: WorkflowContext[ApprovalRequest]
) -> None:
    # Request human input with structured data
    approval_request = ApprovalRequest(
        content=agent_response,
        agent_name="writer_agent"
    )
    await ctx.send_message(approval_request)

# Human feedback handler
@executor(id="processor")
async def process_approval(
    feedback: RequestResponse[ApprovalRequest, str],
    ctx: WorkflowContext[Never, str]
) -> None:
    decision = feedback.data.strip().lower()
    original_content = feedback.original_request.content

    if decision == "approved":
        await ctx.yield_output(f"APPROVED: {original_content}")
    else:
        await ctx.yield_output(f"REVISION NEEDED: {decision}")

# Build workflow with human-in-the-loop
hitl_executor = RequestInfoExecutor(id="request_approval")

workflow = (WorkflowBuilder()
           .add_edge(agent_executor, approval_executor)
           .add_edge(approval_executor, hitl_executor)
           .add_edge(hitl_executor, process_approval)
           .set_start_executor(agent_executor)
           .build())

Köra human-in-the-Loop-arbetsflöden

Agent Framework tillhandahåller API:er för direktuppspelning för att hantera paus-återuppta-cykeln:

from agent_framework import RequestInfoEvent, WorkflowOutputEvent

# Assume we have workflow defined from previous examples
async def run_with_human_input():
    pending_responses = None
    completed = False

    while not completed:
        # First iteration uses run_stream, subsequent use send_responses_streaming
        stream = (
            workflow.send_responses_streaming(pending_responses)
            if pending_responses
            else workflow.run_stream("initial input")
        )

        events = [event async for event in stream]
        pending_responses = None

        # Collect human requests and outputs
        for event in events:
            if isinstance(event, RequestInfoEvent):
                # Display request to human and collect response
                request_data = event.data  # ApprovalRequest instance
                print(f"Review needed: {request_data.content}")

                human_response = input("Enter 'approved' or revision notes: ")
                pending_responses = {event.request_id: human_response}

            elif isinstance(event, WorkflowOutputEvent):
                print(f"Final result: {event.data}")
                completed = True

Exempel på arbetsflöden för människa i loopen finns i:

Kontrollpunkter och återuppta arbetsflöden

En annan viktig fördel med Agent Framework jämfört Workflow med AutoGens Team abstraktion är inbyggt stöd för kontrollpunkter och återupptagande av körning. Detta gör att arbetsflöden kan pausas, bevaras och återupptas senare från alla kontrollpunkter, vilket ger feltolerans och aktiverar långvariga eller asynkrona arbetsflöden.

AutoGen-begränsningar

AutoGens Team abstraktion ger inte inbyggda kontrollpunktsfunktioner. Beständighets- eller återställningsmekanismer måste implementeras externt, vilket ofta kräver komplex tillståndshantering och serialiseringslogik.

Kontrollpunkter för Agent Framework

Agent Framework tillhandahåller omfattande kontrollpunkter genom FileCheckpointStorage och metoden på with_checkpointing()WorkflowBuilder. Samla in kontrollpunkter:

  • Körtillstånd: Lokalt tillstånd för varje köre med hjälp av ctx.set_executor_state()
  • Delat tillstånd: Korsexekutortillstånd med hjälp av ctx.set_shared_state()
  • Meddelandeköer: Väntande meddelanden mellan utförare
  • Arbetsflödesposition: Aktuell körningsstatus och nästa steg
from agent_framework import (
    FileCheckpointStorage, WorkflowBuilder, WorkflowContext,
    Executor, handler
)
from typing_extensions import Never

class ProcessingExecutor(Executor):
    @handler
    async def process(self, data: str, ctx: WorkflowContext[str]) -> None:
        # Process the data
        result = f"Processed: {data.upper()}"
        print(f"Processing: '{data}' -> '{result}'")

        # Persist executor-local state
        prev_state = await ctx.get_executor_state() or {}
        count = prev_state.get("count", 0) + 1
        await ctx.set_executor_state({
            "count": count,
            "last_input": data,
            "last_output": result
        })

        # Persist shared state for other executors
        await ctx.set_shared_state("original_input", data)
        await ctx.set_shared_state("processed_output", result)

        await ctx.send_message(result)

class FinalizeExecutor(Executor):
    @handler
    async def finalize(self, data: str, ctx: WorkflowContext[Never, str]) -> None:
        result = f"Final: {data}"
        await ctx.yield_output(result)

# Configure checkpoint storage
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")
processing_executor = ProcessingExecutor(id="processing")
finalize_executor = FinalizeExecutor(id="finalize")

# Build workflow with checkpointing enabled
workflow = (WorkflowBuilder()
           .add_edge(processing_executor, finalize_executor)
           .set_start_executor(processing_executor)
           .with_checkpointing(checkpoint_storage=checkpoint_storage)  # Enable checkpointing
           .build())

# Example usage (would be in async context)
async def checkpoint_example():
    # Run workflow - checkpoints are created automatically
    async for event in workflow.run_stream("input data"):
        print(f"Event: {event}")

Återuppta från kontrollpunkter

Agent Framework tillhandahåller API:er för att lista, inspektera och återuppta från specifika kontrollpunkter:

from agent_framework import (
    RequestInfoExecutor, FileCheckpointStorage, WorkflowBuilder,
    Executor, WorkflowContext, handler
)
from typing_extensions import Never

class UpperCaseExecutor(Executor):
    @handler
    async def process(self, text: str, ctx: WorkflowContext[str]) -> None:
        result = text.upper()
        await ctx.send_message(result)

class ReverseExecutor(Executor):
    @handler
    async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None:
        result = text[::-1]
        await ctx.yield_output(result)

def create_workflow(checkpoint_storage: FileCheckpointStorage):
    """Create a workflow with two executors and checkpointing."""
    upper_executor = UpperCaseExecutor(id="upper")
    reverse_executor = ReverseExecutor(id="reverse")

    return (WorkflowBuilder()
           .add_edge(upper_executor, reverse_executor)
           .set_start_executor(upper_executor)
           .with_checkpointing(checkpoint_storage=checkpoint_storage)
           .build())

# Assume we have checkpoint_storage from previous examples
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")

async def checkpoint_resume_example():
    # List available checkpoints
    checkpoints = await checkpoint_storage.list_checkpoints()

    # Display checkpoint information
    for checkpoint in checkpoints:
        summary = RequestInfoExecutor.checkpoint_summary(checkpoint)
        print(f"Checkpoint {summary.checkpoint_id}: iteration={summary.iteration_count}")
        print(f"  Shared state: {checkpoint.shared_state}")
        print(f"  Executor states: {list(checkpoint.executor_states.keys())}")

    # Resume from a specific checkpoint
    if checkpoints:
        chosen_checkpoint_id = checkpoints[0].checkpoint_id

        # Create new workflow instance and resume
        new_workflow = create_workflow(checkpoint_storage)
        async for event in new_workflow.run_stream_from_checkpoint(
            chosen_checkpoint_id,
            checkpoint_storage=checkpoint_storage
        ):
            print(f"Resumed event: {event}")

Avancerade kontrollpunktsfunktioner

Kontrollpunkt med Integrering av människa i loopen:

Kontrollpunkter fungerar sömlöst med arbetsflöden för människor i loopen, vilket gör att arbetsflöden kan pausas för mänsklig indata och återupptas senare:

# Assume we have workflow, checkpoint_id, and checkpoint_storage from previous examples
async def resume_with_responses_example():
    # Resume with pre-supplied human responses
    responses = {"request_id_123": "approved"}

    async for event in workflow.run_stream_from_checkpoint(
        checkpoint_id,
        checkpoint_storage=checkpoint_storage,
        responses=responses  # Pre-supply human responses
    ):
        print(f"Event: {event}")

Viktiga fördelar

Jämfört med AutoGen tillhandahåller Agent Framework kontrollpunkter:

  • Automatisk beständighet: Ingen manuell tillståndshantering krävs
  • Detaljerad återställning: Återuppta från valfri gräns för supersteg
  • Tillståndsisolering: Separat kör-lokalt och delat tillstånd
  • Integrering av människa i loopen: Sömlös paus-cv med mänskliga indata
  • Feltolerans: Robust återställning från fel eller avbrott

Praktiska exempel

Omfattande kontrollpunktsexempel finns i:


Observability

Både AutoGen och Agent Framework tillhandahåller observerbarhetsfunktioner, men med olika metoder och funktioner.

AutoGen-observabilitet

AutoGen har inbyggt stöd för OpenTelemetry med instrumentation för:

  • Körningsspårning: SingleThreadedAgentRuntime och GrpcWorkerAgentRuntime
  • Verktygskörning: BaseTool med execute_tool intervall som följer GenAI-semantiska konventioner
  • Agentåtgärder: BaseChatAgent med create_agent och invoke_agent sträcker sig över
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from autogen_core import SingleThreadedAgentRuntime

# Configure OpenTelemetry
tracer_provider = TracerProvider()
trace.set_tracer_provider(tracer_provider)

# Pass to runtime
runtime = SingleThreadedAgentRuntime(tracer_provider=tracer_provider)

Observerbarhet för Agent Framework

Agent Framework ger omfattande observerbarhet genom flera metoder:

  • Nollkodskonfiguration: Automatisk instrumentering via miljövariabler
  • Manuell konfiguration: Programmatisk installation med anpassade parametrar
  • Omfattande telemetri: Agenter, arbetsflöden och spårning av verktygskörning
  • Konsolutdata: Inbyggd konsolloggning och visualisering
from agent_framework import ChatAgent
from agent_framework.observability import setup_observability
from agent_framework.openai import OpenAIChatClient

# Zero-code setup via environment variables
# Set ENABLE_OTEL=true
# Set OTLP_ENDPOINT=http://localhost:4317

# Or manual setup
setup_observability(
    otlp_endpoint="http://localhost:4317"
)

# Create client for the example
client = OpenAIChatClient(model_id="gpt-5")

async def observability_example():
    # Observability is automatically applied to all agents and workflows
    agent = ChatAgent(name="assistant", chat_client=client)
    result = await agent.run("Hello")  # Automatically traced

Viktiga skillnader:

  • Konfigurationskomplexitet: Agent Framework erbjuder enklare alternativ för nollkodskonfiguration
  • Omfång: Agent Framework ger bredare täckning, inklusive observerbarhet på arbetsflödesnivå
  • Visualisering: Agent Framework innehåller inbyggda konsolutdata och utvecklingsgränssnitt
  • Konfiguration: Agent Framework erbjuder mer flexibla konfigurationsalternativ

Detaljerade exempel på observerbarhet finns i:


Conclusion

Den här migreringsguiden innehåller en omfattande mappning mellan AutoGen och Microsoft Agent Framework, som omfattar allt från grundläggande agentskapande till komplexa arbetsflöden för flera agenter. Viktiga lärdomar för migrering:

  • Migrering med en agent är enkel, med liknande API:er och förbättrade funktioner i Agent Framework
  • Multiagentmönster kräver att du omprövar din metod från händelsedrivna till dataflödesbaserade arkitekturer, men om du redan är bekant med GraphFlow blir övergången enklare
  • Agent Framework erbjuder ytterligare funktioner som mellanprogram, värdbaserade verktyg och typade arbetsflöden

Ytterligare exempel och detaljerad implementeringsvägledning finns i katalogen Agent Framework-exempel .

Ytterligare exempelkategorier

Agent Framework innehåller exempel på flera andra viktiga områden:

Nästa steg