Anteckning
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
En omfattande guide för migrering från AutoGen till Microsoft Agent Framework Python SDK.
Innehållsförteckning
- Bakgrund
- Viktiga likheter och skillnader
- Skapa och konfigurera modellklient
-
Single-Agent funktionsmappning
- Skapande och körning av grundläggande agent
- Hantera konversationstillstånd med AgentThread
- OpenAI Assistant Agent Equivalence
- Stöd för direktuppspelning
- Meddelandetyper och skapande
- Skapa och integrera verktyg
- Värdbaserade verktyg (exklusivt agentramverk)
- Stöd för MCP-server
- Mönster för agent som ett verktyg
- Mellanprogram (Agent Framework-funktion)
- Anpassade agenter
- Funktionsmappning för flera agenter
- Observerbarhet
- Slutsats
Bakgrund
AutoGen är ett ramverk för att skapa AI-agenter och system med flera agenter med hjälp av stora språkmodeller (LLM). Det startade som ett forskningsprojekt på Microsoft Research och banade väg för flera begrepp inom orkestrering med flera agenter, till exempel GroupChat och händelsedriven agentkörning. Projektet har varit ett givande samarbete med communityn med öppen källkod och många viktiga funktioner kom från externa deltagare.
Microsoft Agent Framework är ett nytt SDK med flera språk för att skapa AI-agenter och arbetsflöden med hjälp av LLM:er. Det representerar en betydande utveckling av de idéer som banat väg för AutoGen och innehåller lärdomar från verklig användning. Den har utvecklats av kärnteamen AutoGen och Semantic Kernel på Microsoft och är utformad för att vara en ny grund för att skapa AI-program framöver.
Den här guiden beskriver en praktisk migreringsväg: den börjar med att ta upp vad som förblir detsamma och vilka förändringar som sker snabbt. Sedan omfattar den modellklientkonfiguration, funktioner för en agent och slutligen orkestrering med flera agenter med konkret kod sida vid sida. Längs vägen hjälper länkar till körbara exempel på Agent Framework-lagringsplatsen dig att verifiera varje steg.
Viktiga likheter och skillnader
Vad förblir detsamma
Grunderna är bekanta. Du skapar fortfarande agenter runt en modellklient, ger instruktioner och bifogar verktyg. Båda biblioteken stöder verktyg i funktionsstil, tokenströmning, multimodalt innehåll och asynkron I/O.
# Both frameworks follow similar patterns
# AutoGen
agent = AssistantAgent(name="assistant", model_client=client, tools=[my_tool])
result = await agent.run(task="Help me with this task")
# Agent Framework
agent = ChatAgent(name="assistant", chat_client=client, tools=[my_tool])
result = await agent.run("Help me with this task")
Viktiga skillnader
Orkestreringsformat: AutoGen parar ihop en händelsedriven kärna med en hög nivå
Team. Agent Framework fokuserar på ett skrivet diagrambaseratWorkflowsom dirigerar data längs kanter och aktiverar exekutorer när indata är klara.Verktyg: AutoGen omsluter funktioner med
FunctionTool. Agent Framework använder@ai_function, härleder scheman automatiskt och lägger till värdbaserade verktyg som en kodtolkare och webbsökning.Agentbeteende:
AssistantAgentär en enda tur om du inte ökarmax_tool_iterations.ChatAgentär multi-turn som standard och fortsätter att anropa verktyg tills det kan returnera ett slutligt svar.Körning: AutoGen erbjuder inbäddade och experimentella distribuerade körningar. Agent Framework fokuserar på en processsammansättning idag; distribuerad körning planeras.
Skapa och konfigurera modellklient
Båda ramverken tillhandahåller modellklienter för större AI-leverantörer, med liknande men inte identiska API:er.
| Egenskap | AutoGen | Agentramverk |
|---|---|---|
| OpenAI-klient | OpenAIChatCompletionClient |
OpenAIChatClient |
| OpenAI-svarsklient | ❌ Inte tillgängligt | OpenAIResponsesClient |
| Azure OpenAI | AzureOpenAIChatCompletionClient |
AzureOpenAIChatClient |
| Azure OpenAI-svar | ❌ Inte tillgängligt | AzureOpenAIResponsesClient |
| Azure AI | AzureAIChatCompletionClient |
AzureAIAgentClient |
| Människoorienterad | AnthropicChatCompletionClient |
🚧 Planerad |
| Ollama | OllamaChatCompletionClient |
🚧 Planerad |
| Cache |
ChatCompletionCache omslag |
🚧 Planerad |
AutoGen-modellklienter
from autogen_ext.models.openai import OpenAIChatCompletionClient, AzureOpenAIChatCompletionClient
# OpenAI
client = OpenAIChatCompletionClient(
model="gpt-5",
api_key="your-key"
)
# Azure OpenAI
client = AzureOpenAIChatCompletionClient(
azure_endpoint="https://your-endpoint.openai.azure.com/",
azure_deployment="gpt-5",
api_version="2024-12-01",
api_key="your-key"
)
Agent Framework ChatClients
from agent_framework.openai import OpenAIChatClient
from agent_framework.azure import AzureOpenAIChatClient
# OpenAI (reads API key from environment)
client = OpenAIChatClient(model_id="gpt-5")
# Azure OpenAI (uses environment or default credentials; see samples for auth options)
client = AzureOpenAIChatClient(model_id="gpt-5")
Detaljerade exempel finns i:
- OpenAI-chattklient – Grundläggande OpenAI-klientkonfiguration
- Azure OpenAI-chattklient – Azure OpenAI med autentisering
- Azure AI-klient – Azure AI-agentintegrering
Api-stöd för svar (exklusivt agentramverk)
Agent Framework och ger särskilt stöd för resonemangsmodeller AzureOpenAIResponsesClientOpenAIResponsesClient och strukturerade svar som inte är tillgängliga i AutoGen:
from agent_framework.azure import AzureOpenAIResponsesClient
from agent_framework.openai import OpenAIResponsesClient
# Azure OpenAI with Responses API
azure_responses_client = AzureOpenAIResponsesClient(model_id="gpt-5")
# OpenAI with Responses API
openai_responses_client = OpenAIResponsesClient(model_id="gpt-5")
För svars-API-exempel, se:
- Azure Responses Client Basic – Azure OpenAI med svar
- OpenAI Responses Client Basic – OpenAI-svarsintegrering
Single-Agent funktionsmappning
Det här avsnittet mappar funktioner med en agent mellan AutoGen och Agent Framework. Med en klient på plats skapar du en agent, bifogar verktyg och väljer mellan körning av icke-direktuppspelning och strömning.
Skapande och körning av grundläggande agent
När du har konfigurerat en modellklient är nästa steg att skapa agenter. Båda ramverken tillhandahåller liknande agentabstraktioner, men med olika standardbeteenden och konfigurationsalternativ.
AutoGen AssistantAgent
from autogen_agentchat.agents import AssistantAgent
agent = AssistantAgent(
name="assistant",
model_client=client,
system_message="You are a helpful assistant.",
tools=[my_tool],
max_tool_iterations=1 # Single-turn by default
)
# Execution
result = await agent.run(task="What's the weather?")
Agent Framework ChatAgent
from agent_framework import ChatAgent, ai_function
from agent_framework.openai import OpenAIChatClient
# Create simple tools for the example
@ai_function
def get_weather(location: str) -> str:
"""Get weather for a location."""
return f"Weather in {location}: sunny"
@ai_function
def get_time() -> str:
"""Get current time."""
return "Current time: 2:30 PM"
# Create client
client = OpenAIChatClient(model_id="gpt-5")
async def example():
# Direct creation
agent = ChatAgent(
name="assistant",
chat_client=client,
instructions="You are a helpful assistant.",
tools=[get_weather] # Multi-turn by default
)
# Factory method (more convenient)
agent = client.create_agent(
name="assistant",
instructions="You are a helpful assistant.",
tools=[get_weather]
)
# Execution with runtime tool configuration
result = await agent.run(
"What's the weather?",
tools=[get_time], # Can add tools at runtime
tool_choice="auto"
)
Viktiga skillnader:
-
Standardbeteende:
ChatAgentitererar automatiskt via verktygsanrop, samtidigtAssistantAgentsom explicit inställning krävsmax_tool_iterations -
Körningskonfiguration
ChatAgent.run():toolsaccepterar ochtool_choiceparametrar för anpassning per anrop - Fabriksmetoder: Agent Framework tillhandahåller praktiska fabriksmetoder direkt från chattklienter
-
Tillståndshantering:
ChatAgentär tillståndslös och upprätthåller inte konversationshistorik mellan anrop, till skillnad frånAssistantAgentvilket upprätthåller konversationshistoriken som en del av dess tillstånd
Hantera konversationstillstånd med AgentThread
Om du vill fortsätta konversationer med ChatAgentanvänder du AgentThread för att hantera konversationshistorik:
# Assume we have an agent from previous examples
async def conversation_example():
# Create a new thread that will be reused
thread = agent.get_new_thread()
# First interaction - thread is empty
result1 = await agent.run("What's 2+2?", thread=thread)
print(result1.text) # "4"
# Continue conversation - thread contains previous messages
result2 = await agent.run("What about that number times 10?", thread=thread)
print(result2.text) # "40" (understands "that number" refers to 4)
# AgentThread can use external storage, similar to ChatCompletionContext in AutoGen
Tillståndslös som standard: snabbdemo
# Without a thread (two independent invocations)
r1 = await agent.run("What's 2+2?")
print(r1.text) # for example, "4"
r2 = await agent.run("What about that number times 10?")
print(r2.text) # Likely ambiguous without prior context; cannot be "40"
# With a thread (shared context across calls)
thread = agent.get_new_thread()
print((await agent.run("What's 2+2?", thread=thread)).text) # "4"
print((await agent.run("What about that number times 10?", thread=thread)).text) # "40"
Exempel på trådhantering finns i:
- Azure AI med tråd – Hantering av konversationstillstånd
- OpenAI-chattklient med tråd – Trådanvändningsmönster
- Redis-backade trådar – Bevara konversationstillstånd externt
OpenAI Assistant Agent Equivalence
Båda ramverken tillhandahåller OpenAI Assistant API-integrering:
# AutoGen OpenAIAssistantAgent
from autogen_ext.agents.openai import OpenAIAssistantAgent
# Agent Framework has OpenAI Assistants support via OpenAIAssistantsClient
from agent_framework.openai import OpenAIAssistantsClient
Exempel på OpenAI Assistant finns i:
- OpenAI Assistants Basic – Grundläggande assistentkonfiguration
- OpenAI-assistenter med funktionsverktyg – Integrering av anpassade verktyg
- Azure OpenAI Assistants Basic – Konfiguration av Azure Assistant
- OpenAI-assistenter med tråd – Trådhantering
Stöd för direktuppspelning
Båda ramverken strömmar token i realtid – från klienter och från agenter – för att hålla UIs responsiva.
AutoGen-direktuppspelning
# Model client streaming
async for chunk in client.create_stream(messages):
if isinstance(chunk, str):
print(chunk, end="")
# Agent streaming
async for event in agent.run_stream(task="Hello"):
if isinstance(event, ModelClientStreamingChunkEvent):
print(event.content, end="")
elif isinstance(event, TaskResult):
print("Final result received")
Agent Framework-direktuppspelning
# Assume we have client, agent, and tools from previous examples
async def streaming_example():
# Chat client streaming
async for chunk in client.get_streaming_response("Hello", tools=tools):
if chunk.text:
print(chunk.text, end="")
# Agent streaming
async for chunk in agent.run_stream("Hello"):
if chunk.text:
print(chunk.text, end="", flush=True)
Tips: I Agent Framework ger både klienter och agenter samma uppdateringsform. du kan läsa chunk.text i båda fallen.
Meddelandetyper och skapande
Att förstå hur meddelanden fungerar är avgörande för effektiv agentkommunikation. Båda ramverken tillhandahåller olika metoder för att skapa och hantera meddelanden, med AutoGen med separata meddelandeklasser och Agent Framework med hjälp av ett enhetligt meddelandesystem.
Autogen meddelandetyper
from autogen_agentchat.messages import TextMessage, MultiModalMessage
from autogen_core.models import UserMessage
# Text message
text_msg = TextMessage(content="Hello", source="user")
# Multi-modal message
multi_modal_msg = MultiModalMessage(
content=["Describe this image", image_data],
source="user"
)
# Convert to model format for use with model clients
user_message = text_msg.to_model_message()
Meddelandetyper för Agent Framework
from agent_framework import ChatMessage, TextContent, DataContent, UriContent, Role
import base64
# Text message
text_msg = ChatMessage(role=Role.USER, text="Hello")
# Supply real image bytes, or use a data: URI/URL via UriContent
image_bytes = b"<your_image_bytes>"
image_b64 = base64.b64encode(image_bytes).decode()
image_uri = f"data:image/jpeg;base64,{image_b64}"
# Multi-modal message with mixed content
multi_modal_msg = ChatMessage(
role=Role.USER,
contents=[
TextContent(text="Describe this image"),
DataContent(uri=image_uri, media_type="image/jpeg")
]
)
Viktiga skillnader:
- AutoGen använder separata meddelandeklasser (
TextMessage,MultiModalMessage) med ettsourcefält - Agent Framework använder ett enhetligt
ChatMessageinnehåll med inskrivna innehållsobjekt och ettrolefält - Agent Framework-meddelanden använder
Roleuppräkning (ANVÄNDARE, ASSISTENT, SYSTEM, VERKTYG) i stället för strängkällor
Skapa och integrera verktyg
Verktyg utökar agentfunktioner utöver textgenerering. Ramverken använder olika metoder för att skapa verktyg, där Agent Framework tillhandahåller mer automatiserad schemagenerering.
AutoGen FunctionTool
from autogen_core.tools import FunctionTool
async def get_weather(location: str) -> str:
"""Get weather for a location."""
return f"Weather in {location}: sunny"
# Manual tool creation
tool = FunctionTool(
func=get_weather,
description="Get weather information"
)
# Use with agent
agent = AssistantAgent(name="assistant", model_client=client, tools=[tool])
Agent Framework @ai_function
from agent_framework import ai_function
from typing import Annotated
from pydantic import Field
@ai_function
def get_weather(
location: Annotated[str, Field(description="The location to get weather for")]
) -> str:
"""Get weather for a location."""
return f"Weather in {location}: sunny"
# Direct use with agent (automatic conversion)
agent = ChatAgent(name="assistant", chat_client=client, tools=[get_weather])
Detaljerade exempel finns i:
- OpenAI Chat Agent Basic – Enkel OpenAI-chattagent
- OpenAI med funktionsverktyg – Agent med anpassade verktyg
- Azure OpenAI Basic – Konfiguration av Azure OpenAI-agent
Värdbaserade verktyg (exklusivt agentramverk)
Agent Framework tillhandahåller värdbaserade verktyg som inte är tillgängliga i AutoGen:
from agent_framework import ChatAgent, HostedCodeInterpreterTool, HostedWebSearchTool
from agent_framework.azure import AzureOpenAIChatClient
# Azure OpenAI client with a model that supports hosted tools
client = AzureOpenAIChatClient(model_id="gpt-5")
# Code execution tool
code_tool = HostedCodeInterpreterTool()
# Web search tool
search_tool = HostedWebSearchTool()
agent = ChatAgent(
name="researcher",
chat_client=client,
tools=[code_tool, search_tool]
)
Detaljerade exempel finns i:
- Azure AI med kodtolkare – kodkörningsverktyg
- Azure AI med flera verktyg – flera värdbaserade verktyg
- OpenAI med webbsökning – webbsökningsintegrering
Krav och varningar:
- Värdbaserade verktyg är endast tillgängliga på modeller/konton som stöder dem. Kontrollera berättiganden och modellstöd för din leverantör innan du aktiverar dessa verktyg.
- Konfigurationen skiljer sig åt beroende på leverantör. följ kraven i varje exempel för konfiguration och behörigheter.
- Alla modeller stöder inte alla värdbaserade verktyg (till exempel webbsökning kontra kodtolk). Välj en kompatibel modell i din miljö.
Anmärkning
AutoGen stöder verktyg för lokal kodkörning, men den här funktionen planeras för framtida Agent Framework-versioner.
Nyckelskillnad: Agent Framework hanterar verktygs iteration automatiskt på agentnivå. Till skillnad från AutoGens max_tool_iterations parameter fortsätter Agent Framework-agenter att köra verktyg tills de är klara som standard, med inbyggda säkerhetsmekanismer för att förhindra oändliga loopar.
Stöd för MCP-server
För avancerad verktygsintegrering stöder båda ramverken McP (Model Context Protocol), vilket gör det möjligt för agenter att interagera med externa tjänster och datakällor. Agent Framework ger mer omfattande inbyggt stöd.
Stöd för AutoGen MCP
AutoGen har grundläggande MCP-stöd via tillägg (specifik implementeringsinformation varierar beroende på version).
Agent Framework MCP Support
from agent_framework import ChatAgent, MCPStdioTool, MCPStreamableHTTPTool, MCPWebsocketTool
from agent_framework.openai import OpenAIChatClient
# Create client for the example
client = OpenAIChatClient(model_id="gpt-5")
# Stdio MCP server
mcp_tool = MCPStdioTool(
name="filesystem",
command="uvx mcp-server-filesystem",
args=["/allowed/directory"]
)
# HTTP streaming MCP
http_mcp = MCPStreamableHTTPTool(
name="http_mcp",
url="http://localhost:8000/sse"
)
# WebSocket MCP
ws_mcp = MCPWebsocketTool(
name="websocket_mcp",
url="ws://localhost:8000/ws"
)
agent = ChatAgent(name="assistant", chat_client=client, tools=[mcp_tool])
McP-exempel finns i:
- OpenAI med lokal MCP – Använda MCPStreamableHTTPTool med OpenAI
- OpenAI med värdbaserad MCP – Använda värdbaserade MCP-tjänster
- Azure AI med lokal MCP – Använda MCP med Azure AI
- Azure AI med värdbaserad MCP – Använda värdbaserad MCP med Azure AI
Mönster för agent som ett verktyg
Ett kraftfullt mönster är att använda själva agenterna som verktyg, vilket möjliggör hierarkiska agentarkitekturer. Båda ramverken stöder det här mönstret med olika implementeringar.
AutoGen AgentTool
from autogen_agentchat.tools import AgentTool
# Create specialized agent
writer = AssistantAgent(
name="writer",
model_client=client,
system_message="You are a creative writer."
)
# Wrap as tool
writer_tool = AgentTool(agent=writer)
# Use in coordinator (requires disabling parallel tool calls)
coordinator_client = OpenAIChatCompletionClient(
model="gpt-5",
parallel_tool_calls=False
)
coordinator = AssistantAgent(
name="coordinator",
model_client=coordinator_client,
tools=[writer_tool]
)
Agent Framework as_tool()
from agent_framework import ChatAgent
# Assume we have client from previous examples
# Create specialized agent
writer = ChatAgent(
name="writer",
chat_client=client,
instructions="You are a creative writer."
)
# Convert to tool
writer_tool = writer.as_tool(
name="creative_writer",
description="Generate creative content",
arg_name="request",
arg_description="What to write"
)
# Use in coordinator
coordinator = ChatAgent(
name="coordinator",
chat_client=client,
tools=[writer_tool]
)
Explicit migreringsanteckning: I AutoGen anger du parallel_tool_calls=False på koordinatorns modellklient när du omsluter agenter som verktyg för att undvika samtidighetsproblem när du anropar samma agentinstans.
I Agent Framework as_tool() kräver inte inaktivering av parallella verktygsanrop eftersom agenter är tillståndslösa som standard.
Mellanprogram (Agent Framework-funktion)
Agent Framework introducerar mellanprogramsfunktioner som AutoGen saknar. Mellanprogram möjliggör kraftfulla övergripande problem som loggning, säkerhet och prestandaövervakning.
from agent_framework import ChatAgent, AgentRunContext, FunctionInvocationContext
from typing import Callable, Awaitable
# Assume we have client from previous examples
async def logging_middleware(
context: AgentRunContext,
next: Callable[[AgentRunContext], Awaitable[None]]
) -> None:
print(f"Agent {context.agent.name} starting")
await next(context)
print(f"Agent {context.agent.name} completed")
async def security_middleware(
context: FunctionInvocationContext,
next: Callable[[FunctionInvocationContext], Awaitable[None]]
) -> None:
if "password" in str(context.arguments):
print("Blocking function call with sensitive data")
return # Don't call next()
await next(context)
agent = ChatAgent(
name="secure_agent",
chat_client=client,
middleware=[logging_middleware, security_middleware]
)
Fördelar:
- Säkerhet: Validering av indata och innehållsfiltrering
- Observerbarhet: Loggning, mått och spårning
- Prestanda: Cachelagring och hastighetsbegränsning
- Felhantering: Graciös nedbrytnings- och återförsökslogik
Detaljerade exempel på mellanprogram finns i:
- Funktionsbaserat mellanprogram – Enkel funktionsmellanprogram
- Klassbaserat mellanprogram – objektorienterat mellanprogram
- Undantagshantering av mellanprogram – Mönster för felhantering
- Mellanprogram för delat tillstånd – Tillståndshantering mellan agenter
Skräddarsydda agenter
Ibland vill du inte ha någon modellbaserad agent alls – du vill ha en deterministisk eller API-baserad agent med anpassad logik. Båda ramverken har stöd för att skapa anpassade agenter, men mönstren skiljer sig åt.
AutoGen: Subclass BaseChatAgent
from typing import Sequence
from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import BaseChatMessage, TextMessage, StopMessage
from autogen_core import CancellationToken
class StaticAgent(BaseChatAgent):
def __init__(self, name: str = "static", description: str = "Static responder") -> None:
super().__init__(name, description)
@property
def produced_message_types(self) -> Sequence[type[BaseChatMessage]]: # Which message types this agent produces
return (TextMessage,)
async def on_messages(self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken) -> Response:
# Always return a static response
return Response(chat_message=TextMessage(content="Hello from AutoGen custom agent", source=self.name))
Notes:
- Implementera
on_messages(...)och returnera ettResponsemed ett chattmeddelande. - Du kan också implementera
on_reset(...)för att rensa internt tillstånd mellan körningar.
Agent Framework: Utöka BaseAgent (trådmedveten)
from collections.abc import AsyncIterable
from typing import Any
from agent_framework import (
AgentRunResponse,
AgentRunResponseUpdate,
AgentThread,
BaseAgent,
ChatMessage,
Role,
TextContent,
)
class StaticAgent(BaseAgent):
async def run(
self,
messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
*,
thread: AgentThread | None = None,
**kwargs: Any,
) -> AgentRunResponse:
# Build a static reply
reply = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text="Hello from AF custom agent")])
# Persist conversation to the provided AgentThread (if any)
if thread is not None:
normalized = self._normalize_messages(messages)
await self._notify_thread_of_new_messages(thread, normalized, reply)
return AgentRunResponse(messages=[reply])
async def run_stream(
self,
messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
*,
thread: AgentThread | None = None,
**kwargs: Any,
) -> AsyncIterable[AgentRunResponseUpdate]:
# Stream the same static response in a single chunk for simplicity
yield AgentRunResponseUpdate(contents=[TextContent(text="Hello from AF custom agent")], role=Role.ASSISTANT)
# Notify thread of input and the complete response once streaming ends
if thread is not None:
reply = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text="Hello from AF custom agent")])
normalized = self._normalize_messages(messages)
await self._notify_thread_of_new_messages(thread, normalized, reply)
Notes:
-
AgentThreadupprätthåller konversationstillståndet externt. användagent.get_new_thread()och skicka den tillrun/run_stream. - Anropa
self._notify_thread_of_new_messages(thread, input_messages, response_messages)så att tråden har båda sidor av utbytet. - Se det fullständiga exemplet: Anpassad agent
Nu ska vi titta på orkestrering med flera agenter – det område där ramverken skiljer sig mest åt.
Funktionsmappning för flera agenter
Översikt över programmeringsmodell
Programmeringsmodellerna för flera agenter representerar den viktigaste skillnaden mellan de två ramverken.
AutoGens metod med dubbla modeller
AutoGen innehåller två programmeringsmodeller:
-
autogen-core: Låg nivå, händelsedriven programmering medRoutedAgentoch meddelandeprenumerationer -
Teamabstraktion: En övergripande, körningscentrerad modell som bygger påautogen-core
# Low-level autogen-core (complex)
class MyAgent(RoutedAgent):
@message_handler
async def handle_message(self, message: TextMessage, ctx: MessageContext) -> None:
# Handle specific message types
pass
# High-level Team (easier but limited)
team = RoundRobinGroupChat(
participants=[agent1, agent2],
termination_condition=StopAfterNMessages(5)
)
result = await team.run(task="Collaborate on this task")
Utmaningar:
- Lågnivåmodellen är för komplex för de flesta användare
- Högnivåmodell kan bli begränsande för komplexa beteenden
- Bryggning mellan de två modellerna ger implementeringskomplexitet
Agent Frameworks enhetliga arbetsflödesmodell
Agent Framework ger en enda Workflow abstraktion som kombinerar det bästa av båda metoderna:
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never
# Assume we have agent1 and agent2 from previous examples
@executor(id="agent1")
async def agent1_executor(input_msg: str, ctx: WorkflowContext[str]) -> None:
response = await agent1.run(input_msg)
await ctx.send_message(response.text)
@executor(id="agent2")
async def agent2_executor(input_msg: str, ctx: WorkflowContext[Never, str]) -> None:
response = await agent2.run(input_msg)
await ctx.yield_output(response.text) # Final output
# Build typed data flow graph
workflow = (WorkflowBuilder()
.add_edge(agent1_executor, agent2_executor)
.set_start_executor(agent1_executor)
.build())
# Example usage (would be in async context)
# result = await workflow.run("Initial input")
Detaljerade arbetsflödesexempel finns i:
- Grundläggande arbetsflöde – Introduktion till kör- och kanter
- Agenter i arbetsflödet – Integrera agenter i arbetsflöden
- Arbetsflödesströmning – körning av arbetsflöden i realtid
Fördelar:
- Enhetlig modell: Enkel abstraktion för alla komplexitetsnivåer
- Typsäkerhet: Starkt typinmatningar och utdata
- Diagramvisualisering: Rensa dataflödesrepresentation
- Flexibel sammansättning: Blanda agenter, funktioner och underarbetsflöden
Arbetsflöde jämfört med GraphFlow
Agent Frameworks Workflow abstraktion är inspirerad av AutoGens experimentella GraphFlow funktion, men representerar en betydande utveckling inom designfilosofi:
- GraphFlow: Kontrollflöde baserat på var kanter är övergångar och meddelanden sänds till alla agenter. övergångar villkoras på innehåll för utsända meddelanden
- Arbetsflöde: Dataflöde baserat på var meddelanden dirigeras via specifika kanter och utförare aktiveras av kanter, med stöd för samtidig körning.
Visuell översikt
Diagrammet nedan kontrasterar AutoGens graphflow för kontrollflöde (vänster) med Agent Frameworks dataflödesarbetsflöde (höger). GraphFlow modellerar agenter som noder med villkorsstyrda övergångar och sändningar. Körmodeller för arbetsflödesmodeller (agenter, funktioner eller underarbetsflöden) som är anslutna via inskrivna kanter. Den stöder också pauser och kontrollpunkter för begäran/svar.
flowchart LR
subgraph AutoGenGraphFlow
direction TB
U[User / Task] --> A[Agent A]
A -->|success| B[Agent B]
A -->|retry| C[Agent C]
A -. broadcast .- B
A -. broadcast .- C
end
subgraph AgentFrameworkWorkflow
direction TB
I[Input] --> E1[Executor 1]
E1 -->|"str"| E2[Executor 2]
E1 -->|"image"| E3[Executor 3]
E3 -->|"str"| E2
E2 --> OUT[(Final Output)]
end
R[Request / Response Gate]
E2 -. request .-> R
R -. resume .-> E2
CP[Checkpoint]
E1 -. save .-> CP
CP -. load .-> E1
I praktiken:
- GraphFlow använder agenter som noder och sänder meddelanden. kanter representerar villkorsstyrda övergångar.
- Arbetsflödesvägar skrev meddelanden längs kanterna. Noder (exekutorer) kan vara agenter, rena funktioner eller underarbetsflöden.
- Med begäran/svar kan ett arbetsflöde pausas för externa indata. kontrollpunkter bevarar förloppet och aktiverar återuppta.
Kodjämförelse
1) Sekventiell + villkorsstyrd
# AutoGen GraphFlow (fluent builder) — writer → reviewer → editor (conditional)
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
writer = AssistantAgent(name="writer", description="Writes a draft", model_client=client)
reviewer = AssistantAgent(name="reviewer", description="Reviews the draft", model_client=client)
editor = AssistantAgent(name="editor", description="Finalizes the draft", model_client=client)
graph = (
DiGraphBuilder()
.add_node(writer).add_node(reviewer).add_node(editor)
.add_edge(writer, reviewer) # always
.add_edge(reviewer, editor, condition=lambda msg: "approve" in msg.to_model_text())
.set_entry_point(writer)
).build()
team = GraphFlow(participants=[writer, reviewer, editor], graph=graph)
result = await team.run(task="Draft a short paragraph about solar power")
# Agent Framework Workflow — sequential executors with conditional logic
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never
@executor(id="writer")
async def writer_exec(task: str, ctx: WorkflowContext[str]) -> None:
await ctx.send_message(f"Draft: {task}")
@executor(id="reviewer")
async def reviewer_exec(draft: str, ctx: WorkflowContext[str]) -> None:
decision = "approve" if "solar" in draft.lower() else "revise"
await ctx.send_message(f"{decision}:{draft}")
@executor(id="editor")
async def editor_exec(msg: str, ctx: WorkflowContext[Never, str]) -> None:
if msg.startswith("approve:"):
await ctx.yield_output(msg.split(":", 1)[1])
else:
await ctx.yield_output("Needs revision")
workflow_seq = (
WorkflowBuilder()
.add_edge(writer_exec, reviewer_exec)
.add_edge(reviewer_exec, editor_exec)
.set_start_executor(writer_exec)
.build()
)
2) Utrullning + koppling (ALLA vs ANY)
# AutoGen GraphFlow — A → (B, C) → D with ALL/ANY join
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
A, B, C, D = agent_a, agent_b, agent_c, agent_d
# ALL (default): D runs after both B and C
g_all = (
DiGraphBuilder()
.add_node(A).add_node(B).add_node(C).add_node(D)
.add_edge(A, B).add_edge(A, C)
.add_edge(B, D).add_edge(C, D)
.set_entry_point(A)
).build()
# ANY: D runs when either B or C completes
g_any = (
DiGraphBuilder()
.add_node(A).add_node(B).add_node(C).add_node(D)
.add_edge(A, B).add_edge(A, C)
.add_edge(B, D, activation_group="join_d", activation_condition="any")
.add_edge(C, D, activation_group="join_d", activation_condition="any")
.set_entry_point(A)
).build()
# Agent Framework Workflow — A → (B, C) → aggregator (ALL vs ANY)
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never
@executor(id="A")
async def start(task: str, ctx: WorkflowContext[str]) -> None:
await ctx.send_message(f"B:{task}", target_id="B")
await ctx.send_message(f"C:{task}", target_id="C")
@executor(id="B")
async def branch_b(text: str, ctx: WorkflowContext[str]) -> None:
await ctx.send_message(f"B_done:{text}")
@executor(id="C")
async def branch_c(text: str, ctx: WorkflowContext[str]) -> None:
await ctx.send_message(f"C_done:{text}")
@executor(id="join_any")
async def join_any(msg: str, ctx: WorkflowContext[Never, str]) -> None:
await ctx.yield_output(f"First: {msg}") # ANY join (first arrival)
@executor(id="join_all")
async def join_all(msg: str, ctx: WorkflowContext[str, str]) -> None:
state = await ctx.get_executor_state() or {"items": []}
state["items"].append(msg)
await ctx.set_executor_state(state)
if len(state["items"]) >= 2:
await ctx.yield_output(" | ".join(state["items"])) # ALL join
wf_any = (
WorkflowBuilder()
.add_edge(start, branch_b).add_edge(start, branch_c)
.add_edge(branch_b, join_any).add_edge(branch_c, join_any)
.set_start_executor(start)
.build()
)
wf_all = (
WorkflowBuilder()
.add_edge(start, branch_b).add_edge(start, branch_c)
.add_edge(branch_b, join_all).add_edge(branch_c, join_all)
.set_start_executor(start)
.build()
)
3) Riktad routning (ingen sändning)
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never
@executor(id="ingest")
async def ingest(task: str, ctx: WorkflowContext[str]) -> None:
# Route selectively using target_id
if task.startswith("image:"):
await ctx.send_message(task.removeprefix("image:"), target_id="vision")
else:
await ctx.send_message(task, target_id="writer")
@executor(id="writer")
async def write(text: str, ctx: WorkflowContext[Never, str]) -> None:
await ctx.yield_output(f"Draft: {text}")
@executor(id="vision")
async def caption(image_ref: str, ctx: WorkflowContext[Never, str]) -> None:
await ctx.yield_output(f"Caption: {image_ref}")
workflow = (
WorkflowBuilder()
.add_edge(ingest, write)
.add_edge(ingest, caption)
.set_start_executor(ingest)
.build()
)
# Example usage (async):
# await workflow.run("Summarize the benefits of solar power")
# await workflow.run("image:https://example.com/panel.jpg")
Vad du bör lägga märke till:
- GraphFlow sänder meddelanden och använder villkorsstyrda övergångar. Kopplingsbeteendet konfigureras via målsidan
activationoch per kantactivation_group/activation_condition(till exempel gruppera båda kanterna ijoin_dmedactivation_condition="any"). - Arbetsflödet dirigerar data explicit; använd
target_idför att välja underordnade körbara filer. Join behavior lives in the receiving executor (till exempel yield on first input vs wait for all) eller via orchestration builders/aggregators. - Körbara filer i arbetsflödet är i fritt format: omsluta en
ChatAgent, en funktion eller ett underarbetsflöde och blanda dem i samma graf.
Viktiga skillnader
Tabellen nedan sammanfattar de grundläggande skillnaderna mellan AutoGens GraphFlow och Agent Frameworks arbetsflöde:
| Aspekt | AutoGen GraphFlow | Agent Framework-arbetsflöde |
|---|---|---|
| Flödestyp | Kontrollflöde (kanter är övergångar) | Dataflöde (kanter dirigera meddelanden) |
| Nodtyper | Endast agenter | Agenter, funktioner, underarbetsflöden |
| Aktivering | Meddelandesändning | Edge-baserad aktivering |
| Typsäkerhet | Limited | Stark skrivning genomgående |
| Sammansättning | Limited | Mycket komposterbar |
Kapslingsmönster
AutoGen Team Nesting
# Inner team
inner_team = RoundRobinGroupChat(
participants=[specialist1, specialist2],
termination_condition=StopAfterNMessages(3)
)
# Outer team with nested team as participant
outer_team = RoundRobinGroupChat(
participants=[coordinator, inner_team, reviewer], # Team as participant
termination_condition=StopAfterNMessages(10)
)
# Messages are broadcasted to all participants including nested team
result = await outer_team.run("Complex task requiring collaboration")
Egenskaper för AutoGen-kapsling:
- Kapslat team tar emot alla meddelanden från det yttre teamet
- Kapslade teammeddelanden sänds till alla deltagare i det yttre teamet
- Kontext för delat meddelande på alla nivåer
Kapsling av Agent Framework-arbetsflöde
from agent_framework import WorkflowExecutor, WorkflowBuilder
# Assume we have executors from previous examples
# specialist1_executor, specialist2_executor, coordinator_executor, reviewer_executor
# Create sub-workflow
sub_workflow = (WorkflowBuilder()
.add_edge(specialist1_executor, specialist2_executor)
.set_start_executor(specialist1_executor)
.build())
# Wrap as executor
sub_workflow_executor = WorkflowExecutor(
workflow=sub_workflow,
id="sub_process"
)
# Use in parent workflow
parent_workflow = (WorkflowBuilder()
.add_edge(coordinator_executor, sub_workflow_executor)
.add_edge(sub_workflow_executor, reviewer_executor)
.set_start_executor(coordinator_executor)
.build())
Egenskaper för Agent Framework-kapsling:
- Isolerade indata/utdata via
WorkflowExecutor - Inga meddelandesändningar – dataflöden via specifika anslutningar
- Oberoende tillståndshantering för varje arbetsflödesnivå
Gruppchattmönster
Med gruppchattmönster kan flera agenter samarbeta om komplexa uppgifter. Så här översätts vanliga mönster mellan ramverk.
RoundRobinGroupChat-mönster
AutoGen-implementering:
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import StopAfterNMessages
team = RoundRobinGroupChat(
participants=[agent1, agent2, agent3],
termination_condition=StopAfterNMessages(10)
)
result = await team.run("Discuss this topic")
Implementering av Agent Framework:
from agent_framework import SequentialBuilder, WorkflowOutputEvent
# Assume we have agent1, agent2, agent3 from previous examples
# Sequential workflow through participants
workflow = SequentialBuilder().participants([agent1, agent2, agent3]).build()
# Example usage (would be in async context)
async def sequential_example():
# Each agent appends to shared conversation
async for event in workflow.run_stream("Discuss this topic"):
if isinstance(event, WorkflowOutputEvent):
conversation_history = event.data # list[ChatMessage]
Detaljerade orkestreringsexempel finns i:
- Sekventiella agenter – agentkörning med resursallokeringsformat
- Sekventiella anpassade utförare – Anpassade körmönster
För samtidiga körningsmönster tillhandahåller Agent Framework även:
from agent_framework import ConcurrentBuilder, WorkflowOutputEvent
# Assume we have agent1, agent2, agent3 from previous examples
# Concurrent workflow for parallel processing
workflow = (ConcurrentBuilder()
.participants([agent1, agent2, agent3])
.build())
# Example usage (would be in async context)
async def concurrent_example():
# All agents process the input concurrently
async for event in workflow.run_stream("Process this in parallel"):
if isinstance(event, WorkflowOutputEvent):
results = event.data # Combined results from all agents
Exempel på samtidig körning finns i:
- Samtidiga agenter – parallell agentkörning
- Samtidiga anpassade körbara filer – anpassade parallella mönster
- Samtidig med anpassad aggregator – resultataggregeringsmönster
MagenticOneGroupChat-mönster
AutoGen-implementering:
from autogen_agentchat.teams import MagenticOneGroupChat
team = MagenticOneGroupChat(
participants=[researcher, coder, executor],
model_client=coordinator_client,
termination_condition=StopAfterNMessages(20)
)
result = await team.run("Complex research and analysis task")
Implementering av Agent Framework:
from agent_framework import (
MagenticBuilder, MagenticCallbackMode, WorkflowOutputEvent,
MagenticCallbackEvent, MagenticOrchestratorMessageEvent, MagenticAgentDeltaEvent
)
# Assume we have researcher, coder, and coordinator_client from previous examples
async def on_event(event: MagenticCallbackEvent) -> None:
if isinstance(event, MagenticOrchestratorMessageEvent):
print(f"[ORCHESTRATOR]: {event.message.text}")
elif isinstance(event, MagenticAgentDeltaEvent):
print(f"[{event.agent_id}]: {event.text}", end="")
workflow = (MagenticBuilder()
.participants(researcher=researcher, coder=coder)
.on_event(on_event, mode=MagenticCallbackMode.STREAMING)
.with_standard_manager(
chat_client=coordinator_client,
max_round_count=20,
max_stall_count=3,
max_reset_count=2
)
.build())
# Example usage (would be in async context)
async def magentic_example():
async for event in workflow.run_stream("Complex research task"):
if isinstance(event, WorkflowOutputEvent):
final_result = event.data
Anpassningsalternativ för Agent Framework:
Det magentiska arbetsflödet innehåller omfattande anpassningsalternativ:
- Konfiguration av chef: Anpassade orkestreringsmodeller och uppmaningar
-
Avrunda gränser:
max_round_count,max_stall_count,max_reset_count - Återanrop till händelser: Direktuppspelning i realtid med detaljerad händelsefiltrering
- Agentspecialisering: Anpassade instruktioner och verktyg per agent
-
Återanropslägen:
STREAMINGför realtidsuppdateringar ellerBATCHför slutliga resultat - Planering av människa i loopen: Anpassade planeringsfunktioner för interaktiva arbetsflöden
# Advanced customization example with human-in-the-loop
from agent_framework.openai import OpenAIChatClient
from agent_framework import MagenticBuilder, MagenticCallbackMode, MagenticPlannerContext
# Assume we have researcher_agent, coder_agent, analyst_agent, detailed_event_handler
# and get_human_input function defined elsewhere
async def custom_planner(context: MagenticPlannerContext) -> str:
"""Custom planner with human input for critical decisions."""
if context.round_count > 5:
# Request human input for complex decisions
return await get_human_input(f"Next action for: {context.current_state}")
return "Continue with automated planning"
workflow = (MagenticBuilder()
.participants(
researcher=researcher_agent,
coder=coder_agent,
analyst=analyst_agent
)
.with_standard_manager(
chat_client=OpenAIChatClient(model_id="gpt-5"),
max_round_count=15, # Limit total rounds
max_stall_count=2, # Prevent infinite loops
max_reset_count=1, # Allow one reset on failure
orchestrator_prompt="Custom orchestration instructions..."
)
.with_planner(custom_planner) # Human-in-the-loop planning
.on_event(detailed_event_handler, mode=MagenticCallbackMode.STREAMING)
.build())
Detaljerade magentiska exempel finns i:
- Grundläggande magentiskt arbetsflöde – Standardorkestrerat arbetsflöde för flera agenter
- Magentisk med kontrollpunkter – Beständiga orkestrerade arbetsflöden
- Uppdatering av magentisk mänsklig plan – Planering av människa i loopen
Framtida mönster
Översikten för Agent Framework innehåller flera AutoGen-mönster som för närvarande är under utveckling:
- Swarm-mönster: Handoff-baserad agentsamordning
- SelectorGroupChat: LLM-driven talarval
Human-in-the-Loop med begärandesvar
En viktig ny funktion i Agent Framework är Workflow begreppet begäran och svar, vilket gör att arbetsflöden kan pausa körningen och vänta på externa indata innan de fortsätter. Den här funktionen finns inte i AutoGens Team abstraktion och möjliggör avancerade mönster för människor i loopen.
AutoGen-begränsningar
AutoGens abstraktion körs kontinuerligt när den har startats Team och tillhandahåller inte inbyggda mekanismer för att pausa körningen av mänskliga indata. Alla funktioner för människa i loopen kräver anpassade implementeringar utanför ramverket.
Agent Framework RequestInfoExecutor
Agent Framework tillhandahåller RequestInfoExecutor – en arbetsflödesbaserad brygga som pausar grafen vid en begäran om information, genererar en RequestInfoEvent med en skriven nyttolast och återupptar körningen först när programmet tillhandahåller en matchande RequestResponse.
from agent_framework import (
RequestInfoExecutor, RequestInfoEvent, RequestInfoMessage,
RequestResponse, WorkflowBuilder, WorkflowContext, executor
)
from dataclasses import dataclass
from typing_extensions import Never
# Assume we have agent_executor defined elsewhere
# Define typed request payload
@dataclass
class ApprovalRequest(RequestInfoMessage):
"""Request human approval for agent output."""
content: str = ""
agent_name: str = ""
# Workflow executor that requests human approval
@executor(id="reviewer")
async def approval_executor(
agent_response: str,
ctx: WorkflowContext[ApprovalRequest]
) -> None:
# Request human input with structured data
approval_request = ApprovalRequest(
content=agent_response,
agent_name="writer_agent"
)
await ctx.send_message(approval_request)
# Human feedback handler
@executor(id="processor")
async def process_approval(
feedback: RequestResponse[ApprovalRequest, str],
ctx: WorkflowContext[Never, str]
) -> None:
decision = feedback.data.strip().lower()
original_content = feedback.original_request.content
if decision == "approved":
await ctx.yield_output(f"APPROVED: {original_content}")
else:
await ctx.yield_output(f"REVISION NEEDED: {decision}")
# Build workflow with human-in-the-loop
hitl_executor = RequestInfoExecutor(id="request_approval")
workflow = (WorkflowBuilder()
.add_edge(agent_executor, approval_executor)
.add_edge(approval_executor, hitl_executor)
.add_edge(hitl_executor, process_approval)
.set_start_executor(agent_executor)
.build())
Köra human-in-the-Loop-arbetsflöden
Agent Framework tillhandahåller API:er för direktuppspelning för att hantera paus-återuppta-cykeln:
from agent_framework import RequestInfoEvent, WorkflowOutputEvent
# Assume we have workflow defined from previous examples
async def run_with_human_input():
pending_responses = None
completed = False
while not completed:
# First iteration uses run_stream, subsequent use send_responses_streaming
stream = (
workflow.send_responses_streaming(pending_responses)
if pending_responses
else workflow.run_stream("initial input")
)
events = [event async for event in stream]
pending_responses = None
# Collect human requests and outputs
for event in events:
if isinstance(event, RequestInfoEvent):
# Display request to human and collect response
request_data = event.data # ApprovalRequest instance
print(f"Review needed: {request_data.content}")
human_response = input("Enter 'approved' or revision notes: ")
pending_responses = {event.request_id: human_response}
elif isinstance(event, WorkflowOutputEvent):
print(f"Final result: {event.data}")
completed = True
Exempel på arbetsflöden för människa i loopen finns i:
- Gissa spel med mänsklig indata – interaktivt arbetsflöde med användarfeedback
- Arbetsflöde som agent med mänsklig indata – kapslade arbetsflöden med mänsklig interaktion
Kontrollpunkter och återuppta arbetsflöden
En annan viktig fördel med Agent Framework jämfört Workflow med AutoGens Team abstraktion är inbyggt stöd för kontrollpunkter och återupptagande av körning. Detta gör att arbetsflöden kan pausas, bevaras och återupptas senare från alla kontrollpunkter, vilket ger feltolerans och aktiverar långvariga eller asynkrona arbetsflöden.
AutoGen-begränsningar
AutoGens Team abstraktion ger inte inbyggda kontrollpunktsfunktioner. Beständighets- eller återställningsmekanismer måste implementeras externt, vilket ofta kräver komplex tillståndshantering och serialiseringslogik.
Kontrollpunkter för Agent Framework
Agent Framework tillhandahåller omfattande kontrollpunkter genom FileCheckpointStorage och metoden på with_checkpointing()WorkflowBuilder. Samla in kontrollpunkter:
-
Körtillstånd: Lokalt tillstånd för varje köre med hjälp av
ctx.set_executor_state() -
Delat tillstånd: Korsexekutortillstånd med hjälp av
ctx.set_shared_state() - Meddelandeköer: Väntande meddelanden mellan utförare
- Arbetsflödesposition: Aktuell körningsstatus och nästa steg
from agent_framework import (
FileCheckpointStorage, WorkflowBuilder, WorkflowContext,
Executor, handler
)
from typing_extensions import Never
class ProcessingExecutor(Executor):
@handler
async def process(self, data: str, ctx: WorkflowContext[str]) -> None:
# Process the data
result = f"Processed: {data.upper()}"
print(f"Processing: '{data}' -> '{result}'")
# Persist executor-local state
prev_state = await ctx.get_executor_state() or {}
count = prev_state.get("count", 0) + 1
await ctx.set_executor_state({
"count": count,
"last_input": data,
"last_output": result
})
# Persist shared state for other executors
await ctx.set_shared_state("original_input", data)
await ctx.set_shared_state("processed_output", result)
await ctx.send_message(result)
class FinalizeExecutor(Executor):
@handler
async def finalize(self, data: str, ctx: WorkflowContext[Never, str]) -> None:
result = f"Final: {data}"
await ctx.yield_output(result)
# Configure checkpoint storage
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")
processing_executor = ProcessingExecutor(id="processing")
finalize_executor = FinalizeExecutor(id="finalize")
# Build workflow with checkpointing enabled
workflow = (WorkflowBuilder()
.add_edge(processing_executor, finalize_executor)
.set_start_executor(processing_executor)
.with_checkpointing(checkpoint_storage=checkpoint_storage) # Enable checkpointing
.build())
# Example usage (would be in async context)
async def checkpoint_example():
# Run workflow - checkpoints are created automatically
async for event in workflow.run_stream("input data"):
print(f"Event: {event}")
Återuppta från kontrollpunkter
Agent Framework tillhandahåller API:er för att lista, inspektera och återuppta från specifika kontrollpunkter:
from agent_framework import (
RequestInfoExecutor, FileCheckpointStorage, WorkflowBuilder,
Executor, WorkflowContext, handler
)
from typing_extensions import Never
class UpperCaseExecutor(Executor):
@handler
async def process(self, text: str, ctx: WorkflowContext[str]) -> None:
result = text.upper()
await ctx.send_message(result)
class ReverseExecutor(Executor):
@handler
async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None:
result = text[::-1]
await ctx.yield_output(result)
def create_workflow(checkpoint_storage: FileCheckpointStorage):
"""Create a workflow with two executors and checkpointing."""
upper_executor = UpperCaseExecutor(id="upper")
reverse_executor = ReverseExecutor(id="reverse")
return (WorkflowBuilder()
.add_edge(upper_executor, reverse_executor)
.set_start_executor(upper_executor)
.with_checkpointing(checkpoint_storage=checkpoint_storage)
.build())
# Assume we have checkpoint_storage from previous examples
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")
async def checkpoint_resume_example():
# List available checkpoints
checkpoints = await checkpoint_storage.list_checkpoints()
# Display checkpoint information
for checkpoint in checkpoints:
summary = RequestInfoExecutor.checkpoint_summary(checkpoint)
print(f"Checkpoint {summary.checkpoint_id}: iteration={summary.iteration_count}")
print(f" Shared state: {checkpoint.shared_state}")
print(f" Executor states: {list(checkpoint.executor_states.keys())}")
# Resume from a specific checkpoint
if checkpoints:
chosen_checkpoint_id = checkpoints[0].checkpoint_id
# Create new workflow instance and resume
new_workflow = create_workflow(checkpoint_storage)
async for event in new_workflow.run_stream_from_checkpoint(
chosen_checkpoint_id,
checkpoint_storage=checkpoint_storage
):
print(f"Resumed event: {event}")
Avancerade kontrollpunktsfunktioner
Kontrollpunkt med Integrering av människa i loopen:
Kontrollpunkter fungerar sömlöst med arbetsflöden för människor i loopen, vilket gör att arbetsflöden kan pausas för mänsklig indata och återupptas senare:
# Assume we have workflow, checkpoint_id, and checkpoint_storage from previous examples
async def resume_with_responses_example():
# Resume with pre-supplied human responses
responses = {"request_id_123": "approved"}
async for event in workflow.run_stream_from_checkpoint(
checkpoint_id,
checkpoint_storage=checkpoint_storage,
responses=responses # Pre-supply human responses
):
print(f"Event: {event}")
Viktiga fördelar
Jämfört med AutoGen tillhandahåller Agent Framework kontrollpunkter:
- Automatisk beständighet: Ingen manuell tillståndshantering krävs
- Detaljerad återställning: Återuppta från valfri gräns för supersteg
- Tillståndsisolering: Separat kör-lokalt och delat tillstånd
- Integrering av människa i loopen: Sömlös paus-cv med mänskliga indata
- Feltolerans: Robust återställning från fel eller avbrott
Praktiska exempel
Omfattande kontrollpunktsexempel finns i:
- Kontrollpunkt med Cv – Grundläggande kontrollpunkter och interaktivt CV
- Kontrollpunkt med Human-in-the-Loop – Beständiga arbetsflöden med grindar för mänskligt godkännande
- Kontrollpunkt för underarbetsflöde – Checkpointing kapslade arbetsflöden
- Magentisk kontrollpunkt – Kontrollpunktsorkestrerade arbetsflöden för flera agenter
Observability
Både AutoGen och Agent Framework tillhandahåller observerbarhetsfunktioner, men med olika metoder och funktioner.
AutoGen-observabilitet
AutoGen har inbyggt stöd för OpenTelemetry med instrumentation för:
-
Körningsspårning:
SingleThreadedAgentRuntimeochGrpcWorkerAgentRuntime -
Verktygskörning:
BaseToolmedexecute_toolintervall som följer GenAI-semantiska konventioner -
Agentåtgärder:
BaseChatAgentmedcreate_agentochinvoke_agentsträcker sig över
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from autogen_core import SingleThreadedAgentRuntime
# Configure OpenTelemetry
tracer_provider = TracerProvider()
trace.set_tracer_provider(tracer_provider)
# Pass to runtime
runtime = SingleThreadedAgentRuntime(tracer_provider=tracer_provider)
Observerbarhet för Agent Framework
Agent Framework ger omfattande observerbarhet genom flera metoder:
- Nollkodskonfiguration: Automatisk instrumentering via miljövariabler
- Manuell konfiguration: Programmatisk installation med anpassade parametrar
- Omfattande telemetri: Agenter, arbetsflöden och spårning av verktygskörning
- Konsolutdata: Inbyggd konsolloggning och visualisering
from agent_framework import ChatAgent
from agent_framework.observability import setup_observability
from agent_framework.openai import OpenAIChatClient
# Zero-code setup via environment variables
# Set ENABLE_OTEL=true
# Set OTLP_ENDPOINT=http://localhost:4317
# Or manual setup
setup_observability(
otlp_endpoint="http://localhost:4317"
)
# Create client for the example
client = OpenAIChatClient(model_id="gpt-5")
async def observability_example():
# Observability is automatically applied to all agents and workflows
agent = ChatAgent(name="assistant", chat_client=client)
result = await agent.run("Hello") # Automatically traced
Viktiga skillnader:
- Konfigurationskomplexitet: Agent Framework erbjuder enklare alternativ för nollkodskonfiguration
- Omfång: Agent Framework ger bredare täckning, inklusive observerbarhet på arbetsflödesnivå
- Visualisering: Agent Framework innehåller inbyggda konsolutdata och utvecklingsgränssnitt
- Konfiguration: Agent Framework erbjuder mer flexibla konfigurationsalternativ
Detaljerade exempel på observerbarhet finns i:
- Nollkodskonfiguration – Miljövariabelkonfiguration
- Manuell installation – Programmatisk konfiguration
- Agentobservabilitet – telemetri med en enda agent
- Arbetsflödesobservabilitet – spårning av arbetsflöden med flera agenter
Conclusion
Den här migreringsguiden innehåller en omfattande mappning mellan AutoGen och Microsoft Agent Framework, som omfattar allt från grundläggande agentskapande till komplexa arbetsflöden för flera agenter. Viktiga lärdomar för migrering:
- Migrering med en agent är enkel, med liknande API:er och förbättrade funktioner i Agent Framework
- Multiagentmönster kräver att du omprövar din metod från händelsedrivna till dataflödesbaserade arkitekturer, men om du redan är bekant med GraphFlow blir övergången enklare
- Agent Framework erbjuder ytterligare funktioner som mellanprogram, värdbaserade verktyg och typade arbetsflöden
Ytterligare exempel och detaljerad implementeringsvägledning finns i katalogen Agent Framework-exempel .
Ytterligare exempelkategorier
Agent Framework innehåller exempel på flera andra viktiga områden:
- Trådar: Trådexempel – Hantera konversationstillstånd och kontext
- Multimodala indata: Multimodala exempel – Arbeta med bilder och andra medietyper
- Kontextprovidrar: Exempel på kontextprovider – Integreringsmönster för extern kontext