A comprehensive guide for migrating from AutoGen to the Microsoft Agent Framework Python SDK.
Background
AutoGen is a framework for building AI agents and multi-agent systems with large language models (LLMs). It began as a Microsoft Research project and pioneered several concepts in multi-agent orchestration, such as GroupChat and the event-driven agent runtime. The project has been a productive collaboration with the open-source community, and many important features came from external contributors.
Microsoft Agent Framework is a new multi-language SDK for building AI agents and workflows with LLMs. It represents a significant evolution of the ideas pioneered in AutoGen and incorporates lessons learned from real-world usage. It is developed by the core AutoGen and Semantic Kernel teams at Microsoft and is intended to be the new foundation for building AI applications going forward.
This guide lays out a practical migration path: it starts with what stays the same and what changes at a glance. It then covers model client setup and single-agent features, and finishes with multi-agent orchestration shown in concrete side-by-side code. Along the way, links to runnable samples in the Agent Framework repository help you validate each step.
Key Similarities and Differences
What Stays the Same
The fundamentals are familiar. You still create agents around a model client, provide instructions, and attach tools. Both libraries support function-style tools, token streaming, multimodal content, and async I/O.
# Both frameworks follow similar patterns
# AutoGen
agent = AssistantAgent(name="assistant", model_client=client, tools=[my_tool])
result = await agent.run(task="Help me with this task")
# Agent Framework
agent = ChatAgent(name="assistant", chat_client=client, tools=[my_tool])
result = await agent.run("Help me with this task")
Key Differences
- Orchestration style: AutoGen pairs an event-driven core with the high-level Team. Agent Framework centers on the typed, graph-based Workflow, which routes data along edges and activates executors when their inputs are ready.
- Tools: AutoGen uses FunctionTool. Agent Framework uses @ai_function, infers schemas automatically, and adds hosted tools such as the code interpreter and web search.
- Agent behavior: AssistantAgent is single-turn unless max_tool_iterations is increased. ChatAgent is multi-turn by default and keeps invoking tools until it can return a final answer.
- Runtime: AutoGen offers embedded and experimental distributed runtimes. Agent Framework currently focuses on single-process composition; distributed execution is planned.
Model Client Creation and Configuration
Both frameworks provide model clients for the major AI providers, with similar but not identical APIs.
| Feature | AutoGen | Agent Framework |
|---|---|---|
| OpenAI client | OpenAIChatCompletionClient | OpenAIChatClient |
| OpenAI Responses client | ❌ Not available | OpenAIResponsesClient |
| Azure OpenAI | AzureOpenAIChatCompletionClient | AzureOpenAIChatClient |
| Azure OpenAI Responses | ❌ Not available | AzureOpenAIResponsesClient |
| Azure AI | AzureAIChatCompletionClient | AzureAIAgentClient |
| Anthropic | AnthropicChatCompletionClient | 🚧 Planned |
| Ollama | OllamaChatCompletionClient | 🚧 Planned |
| Caching | ChatCompletionCache wrapper | 🚧 Planned |
AutoGen Model Clients
from autogen_ext.models.openai import OpenAIChatCompletionClient, AzureOpenAIChatCompletionClient
# OpenAI
client = OpenAIChatCompletionClient(
    model="gpt-5",
    api_key="your-key"
)
# Azure OpenAI
client = AzureOpenAIChatCompletionClient(
    azure_endpoint="https://your-endpoint.openai.azure.com/",
    azure_deployment="gpt-5",
    api_version="2024-12-01",
    api_key="your-key"
)
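The ChatCompletionCache wrapper from the table above has no Agent Framework equivalent yet. As a migration reference, here is a minimal sketch of the AutoGen pattern, assuming the autogen-ext diskcache extras are installed; verify the API against your installed version.
# Hedged sketch: AutoGen's caching wrapper (the Agent Framework equivalent is planned).
# Assumes `pip install "autogen-ext[openai,diskcache]"`.
from autogen_ext.models.cache import ChatCompletionCache, CHAT_CACHE_VALUE_TYPE
from autogen_ext.cache_store.diskcache import DiskCacheStore
from diskcache import Cache
store = DiskCacheStore[CHAT_CACHE_VALUE_TYPE](Cache("./model_cache"))
# Wrap the client; repeated identical requests are served from the cache
cached_client = ChatCompletionCache(client, store)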
Agent Framework ChatClients
from agent_framework.openai import OpenAIChatClient
from agent_framework.azure import AzureOpenAIChatClient
# OpenAI (reads API key from environment)
client = OpenAIChatClient(model_id="gpt-5")
# Azure OpenAI (uses environment or default credentials; see samples for auth options)
client = AzureOpenAIChatClient(model_id="gpt-5")
For detailed examples, see:
- OpenAI Chat Client - basic OpenAI client setup
- Azure OpenAI Chat Client - Azure OpenAI with authentication
- Azure AI Client - Azure AI agent integration
Responses API Support (Agent Framework Exclusive)
Agent Framework provides AzureOpenAIResponsesClient and OpenAIResponsesClient, with dedicated support for reasoning models and structured responses that is not available in AutoGen:
from agent_framework.azure import AzureOpenAIResponsesClient
from agent_framework.openai import OpenAIResponsesClient
# Azure OpenAI with Responses API
azure_responses_client = AzureOpenAIResponsesClient(model_id="gpt-5")
# OpenAI with Responses API
openai_responses_client = OpenAIResponsesClient(model_id="gpt-5")
For Responses API examples, see:
- Azure Responses Client Basic - Azure OpenAI with Responses
- OpenAI Responses Client Basic - OpenAI Responses integration
Single-Agent Feature Mapping
This section maps single-agent functionality between AutoGen and Agent Framework. With a client in place, you create an agent, attach tools, and choose between non-streaming and streaming execution.
Basic Agent Creation and Execution
With the model client configured, the next step is creating an agent. Both frameworks provide similar agent abstractions, but with different default behaviors and configuration options.
AutoGen AssistantAgent
from autogen_agentchat.agents import AssistantAgent
agent = AssistantAgent(
    name="assistant",
    model_client=client,
    system_message="You are a helpful assistant.",
    tools=[my_tool],
    max_tool_iterations=1  # Single-turn by default
)
# Execution
result = await agent.run(task="What's the weather?")
Agent Framework ChatAgent
from agent_framework import ChatAgent, ai_function
from agent_framework.openai import OpenAIChatClient
# Create simple tools for the example
@ai_function
def get_weather(location: str) -> str:
    """Get weather for a location."""
    return f"Weather in {location}: sunny"
@ai_function
def get_time() -> str:
    """Get current time."""
    return "Current time: 2:30 PM"
# Create client
client = OpenAIChatClient(model_id="gpt-5")
async def example():
    # Direct creation
    agent = ChatAgent(
        name="assistant",
        chat_client=client,
        instructions="You are a helpful assistant.",
        tools=[get_weather]  # Multi-turn by default
    )
    # Factory method (more convenient)
    agent = client.create_agent(
        name="assistant",
        instructions="You are a helpful assistant.",
        tools=[get_weather]
    )
    # Execution with runtime tool configuration
    result = await agent.run(
        "What's the weather?",
        tools=[get_time],  # Can add tools at runtime
        tool_choice="auto"
    )
Key differences:
- Default behavior: ChatAgent automatically loops over tool calls, while AssistantAgent requires an explicit max_tool_iterations setting
- Runtime configuration: ChatAgent.run() accepts tools and tool_choice parameters for per-call customization
- Factory methods: Agent Framework provides convenient factory methods directly from chat clients
- State management: ChatAgent is stateless and does not maintain chat history across invocations, unlike AssistantAgent, which keeps the conversation history as part of its state
Managing Chat State with AgentThread
To continue a conversation with ChatAgent, use an AgentThread to manage the conversation history:
# Assume we have an agent from previous examples
async def conversation_example():
    # Create a new thread that will be reused
    thread = agent.get_new_thread()
    # First interaction - thread is empty
    result1 = await agent.run("What's 2+2?", thread=thread)
    print(result1.text)  # "4"
    # Continue conversation - thread contains previous messages
    result2 = await agent.run("What about that number times 10?", thread=thread)
    print(result2.text)  # "40" (understands "that number" refers to 4)
    # AgentThread can use external storage, similar to ChatCompletionContext in AutoGen
Stateless by default: a quick demo
# Without a thread (two independent invocations)
r1 = await agent.run("What's 2+2?")
print(r1.text)  # for example, "4"
r2 = await agent.run("What about that number times 10?")
print(r2.text)  # Ambiguous without prior context; not reliably "40"
# With a thread (shared context across calls)
thread = agent.get_new_thread()
print((await agent.run("What's 2+2?", thread=thread)).text)  # "4"
print((await agent.run("What about that number times 10?", thread=thread)).text)  # "40"
For thread management examples, see:
- Azure AI with Threads - conversation state management
- OpenAI Chat Client with Threads - thread usage patterns
- Redis-backed Threads - persisting conversation state externally
OpenAI Assistants Agent Equivalence
Both frameworks provide OpenAI Assistants API integration:
# AutoGen OpenAIAssistantAgent
from autogen_ext.agents.openai import OpenAIAssistantAgent
# Agent Framework has OpenAI Assistants support via OpenAIAssistantsClient
from agent_framework.openai import OpenAIAssistantsClient
For OpenAI Assistants examples, see:
- OpenAI Assistants Basic - basic assistant setup
- OpenAI Assistants with Function Tools - custom tool integration
- Azure OpenAI Assistants Basic - Azure assistant setup
- OpenAI Assistants with Threads - thread management
Streaming Support
Both frameworks stream tokens in real time, from both clients and agents, to keep UIs responsive.
AutoGen Streaming
# Model client streaming
async for chunk in client.create_stream(messages):
    if isinstance(chunk, str):
        print(chunk, end="")
# Agent streaming
async for event in agent.run_stream(task="Hello"):
    if isinstance(event, ModelClientStreamingChunkEvent):
        print(event.content, end="")
    elif isinstance(event, TaskResult):
        print("Final result received")
Agent Framework Streaming
# Assume we have client, agent, and tools from previous examples
async def streaming_example():
    # Chat client streaming
    async for chunk in client.get_streaming_response("Hello", tools=tools):
        if chunk.text:
            print(chunk.text, end="")
    # Agent streaming
    async for chunk in agent.run_stream("Hello"):
        if chunk.text:
            print(chunk.text, end="", flush=True)
Tip: In Agent Framework, clients and agents produce the same update shape; you can read chunk.text in either case.
Message Types and Creation
Understanding how messages work is essential for effective agent communication. The frameworks take different approaches to message creation and handling: AutoGen uses separate message classes, while Agent Framework uses a unified message system.
AutoGen Message Types
from autogen_agentchat.messages import TextMessage, MultiModalMessage
from autogen_core.models import UserMessage
# Text message
text_msg = TextMessage(content="Hello", source="user")
# Multi-modal message
multi_modal_msg = MultiModalMessage(
    content=["Describe this image", image_data],
    source="user"
)
# Convert to model format for use with model clients
user_message = text_msg.to_model_message()
Agent Framework Message Types
from agent_framework import ChatMessage, TextContent, DataContent, UriContent, Role
import base64
# Text message
text_msg = ChatMessage(role=Role.USER, text="Hello")
# Supply real image bytes, or use a data: URI/URL via UriContent
image_bytes = b"<your_image_bytes>"
image_b64 = base64.b64encode(image_bytes).decode()
image_uri = f"data:image/jpeg;base64,{image_b64}"
# Multi-modal message with mixed content
multi_modal_msg = ChatMessage(
    role=Role.USER,
    contents=[
        TextContent(text="Describe this image"),
        DataContent(uri=image_uri, media_type="image/jpeg")
    ]
)
Key differences:
- AutoGen uses separate message classes (TextMessage, MultiModalMessage) with a source field
- Agent Framework uses a unified ChatMessage with typed content objects and a role field
- Agent Framework messages use a Role enum (USER, ASSISTANT, SYSTEM, TOOL) instead of string sources
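When porting message-handling code, a small adapter can bridge the two shapes. Below is a minimal sketch; the convert_text_message helper is hypothetical and not part of either SDK:
# Hypothetical migration helper: map an AutoGen TextMessage to an
# Agent Framework ChatMessage (not part of either SDK).
from autogen_agentchat.messages import TextMessage
from agent_framework import ChatMessage, Role
def convert_text_message(msg: TextMessage) -> ChatMessage:
    # AutoGen identifies senders with a free-form `source` string;
    # Agent Framework uses the Role enum instead
    role = Role.USER if msg.source == "user" else Role.ASSISTANT
    return ChatMessage(role=role, text=msg.content)
converted = convert_text_message(TextMessage(content="Hello", source="user"))
print(converted.role, converted.text)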
Tool Creation and Integration
Tools extend agent capabilities beyond text generation. The frameworks take different approaches to tool creation, with Agent Framework offering more automated schema generation.
AutoGen FunctionTool
from autogen_core.tools import FunctionTool
async def get_weather(location: str) -> str:
    """Get weather for a location."""
    return f"Weather in {location}: sunny"
# Manual tool creation
tool = FunctionTool(
    func=get_weather,
    description="Get weather information"
)
# Use with agent
agent = AssistantAgent(name="assistant", model_client=client, tools=[tool])
Agent Framework @ai_function
from agent_framework import ai_function
from typing import Annotated
from pydantic import Field
@ai_function
def get_weather(
    location: Annotated[str, Field(description="The location to get weather for")]
) -> str:
    """Get weather for a location."""
    return f"Weather in {location}: sunny"
# Direct use with agent (automatic conversion)
agent = ChatAgent(name="assistant", chat_client=client, tools=[get_weather])
For detailed examples, see:
- OpenAI Chat Agent Basic - a simple OpenAI chat agent
- OpenAI with Function Tools - an agent with custom tools
- Azure OpenAI Basic - Azure OpenAI agent setup
Hosted Tools (Agent Framework Exclusive)
Agent Framework provides hosted tools that are not available in AutoGen:
from agent_framework import ChatAgent, HostedCodeInterpreterTool, HostedWebSearchTool
from agent_framework.azure import AzureOpenAIChatClient
# Azure OpenAI client with a model that supports hosted tools
client = AzureOpenAIChatClient(model_id="gpt-5")
# Code execution tool
code_tool = HostedCodeInterpreterTool()
# Web search tool
search_tool = HostedWebSearchTool()
agent = ChatAgent(
    name="researcher",
    chat_client=client,
    tools=[code_tool, search_tool]
)
For detailed examples, see:
- Azure AI with Code Interpreter - code execution tool
- Azure AI with Multiple Tools - multiple hosted tools
- OpenAI with Web Search - web search integration
Requirements and considerations:
- Hosted tools are only available on models and accounts that support them. Verify provider entitlements and model support before enabling these tools.
- Configuration varies by provider; follow the prerequisites in each sample for setup and permissions.
- Not every model supports every hosted tool (for example, web search versus code interpreter). Choose a compatible model for your environment.
Note
AutoGen supports local code execution tools; this capability is planned for a future Agent Framework release.
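For reference, the AutoGen local code execution pattern you may be migrating from looks roughly like the sketch below, assuming the autogen-ext code-execution extras; verify against your installed version:
# Hedged sketch: AutoGen local code execution tool (no Agent Framework
# equivalent yet). Assumes the autogen-ext code-execution extras.
from autogen_agentchat.agents import AssistantAgent
from autogen_ext.code_executors.local import LocalCommandLineCodeExecutor
from autogen_ext.tools.code_execution import PythonCodeExecutionTool
# The tool runs model-written Python in a local working directory
code_tool = PythonCodeExecutionTool(LocalCommandLineCodeExecutor(work_dir="./scratch"))
agent = AssistantAgent(name="coder", model_client=client, tools=[code_tool])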
Key difference: Agent Framework handles tool iteration automatically at the agent level. Unlike AutoGen's max_tool_iterations parameter, Agent Framework agents keep executing tools until they are done by default, with built-in safeguards to prevent infinite loops.
MCP Server Support
For advanced tool integration, both frameworks support the Model Context Protocol (MCP), which lets agents interact with external services and data sources. Agent Framework offers more comprehensive built-in support.
AutoGen MCP Support
AutoGen provides basic MCP support through its extensions (implementation details vary by version).
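The shape of that support looks roughly like the following sketch, assuming the autogen-ext MCP extras (check your installed version for the exact API):
# Hedged sketch: AutoGen MCP tool discovery via autogen-ext
# (API details vary by version; assumes `autogen-ext[mcp]`).
from autogen_agentchat.agents import AssistantAgent
from autogen_ext.tools.mcp import StdioServerParams, mcp_server_tools
async def build_mcp_agent(client):
    params = StdioServerParams(
        command="uvx",
        args=["mcp-server-filesystem", "/allowed/directory"],
    )
    tools = await mcp_server_tools(params)  # Discover the server's tools
    return AssistantAgent(name="assistant", model_client=client, tools=tools)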
Agent Framework MCP Support
from agent_framework import ChatAgent, MCPStdioTool, MCPStreamableHTTPTool, MCPWebsocketTool
from agent_framework.openai import OpenAIChatClient
# Create client for the example
client = OpenAIChatClient(model_id="gpt-5")
# Stdio MCP server
mcp_tool = MCPStdioTool(
    name="filesystem",
    command="uvx mcp-server-filesystem",
    args=["/allowed/directory"]
)
# HTTP streaming MCP
http_mcp = MCPStreamableHTTPTool(
    name="http_mcp",
    url="http://localhost:8000/sse"
)
# WebSocket MCP
ws_mcp = MCPWebsocketTool(
    name="websocket_mcp",
    url="ws://localhost:8000/ws"
)
agent = ChatAgent(name="assistant", chat_client=client, tools=[mcp_tool])
For MCP examples, see:
- OpenAI with Local MCP - using MCPStreamableHTTPTool with OpenAI
- OpenAI with Hosted MCP - using a hosted MCP service
- Azure AI with Local MCP - using MCP with Azure AI
- Azure AI with Hosted MCP - using hosted MCP with Azure AI
Agent-as-Tool Pattern
A powerful pattern is using agents themselves as tools, enabling hierarchical agent architectures. Both frameworks support this pattern with different implementations.
AutoGen AgentTool
from autogen_agentchat.tools import AgentTool
# Create specialized agent
writer = AssistantAgent(
    name="writer",
    model_client=client,
    system_message="You are a creative writer."
)
# Wrap as tool
writer_tool = AgentTool(agent=writer)
# Use in coordinator (requires disabling parallel tool calls)
coordinator_client = OpenAIChatCompletionClient(
    model="gpt-5",
    parallel_tool_calls=False
)
coordinator = AssistantAgent(
    name="coordinator",
    model_client=coordinator_client,
    tools=[writer_tool]
)
Agent Framework as_tool()
from agent_framework import ChatAgent
# Assume we have client from previous examples
# Create specialized agent
writer = ChatAgent(
    name="writer",
    chat_client=client,
    instructions="You are a creative writer."
)
# Convert to tool
writer_tool = writer.as_tool(
    name="creative_writer",
    description="Generate creative content",
    arg_name="request",
    arg_description="What to write"
)
# Use in coordinator
coordinator = ChatAgent(
    name="coordinator",
    chat_client=client,
    tools=[writer_tool]
)
Explicit migration note: In AutoGen, set parallel_tool_calls=False on the coordinator's model client when wrapping an agent as a tool, to avoid concurrency issues when the same agent instance is invoked.
In Agent Framework, there is no need to disable parallel tool calls for as_tool(), because agents are stateless by default.
Middleware (Agent Framework Feature)
Agent Framework introduces middleware capabilities that AutoGen lacks. Middleware enables powerful cross-cutting concerns such as logging, security, and performance monitoring.
from agent_framework import ChatAgent, AgentRunContext, FunctionInvocationContext
from typing import Callable, Awaitable
# Assume we have client from previous examples
async def logging_middleware(
    context: AgentRunContext,
    next: Callable[[AgentRunContext], Awaitable[None]]
) -> None:
    print(f"Agent {context.agent.name} starting")
    await next(context)
    print(f"Agent {context.agent.name} completed")
async def security_middleware(
    context: FunctionInvocationContext,
    next: Callable[[FunctionInvocationContext], Awaitable[None]]
) -> None:
    if "password" in str(context.arguments):
        print("Blocking function call with sensitive data")
        return  # Don't call next()
    await next(context)
agent = ChatAgent(
    name="secure_agent",
    chat_client=client,
    middleware=[logging_middleware, security_middleware]
)
Benefits:
- Security: input validation and content filtering
- Observability: logging, metrics, and tracing
- Performance: caching and rate limiting
- Error handling: graceful degradation and retry logic
For detailed middleware examples, see:
Custom Agents
Sometimes you don't need a model-backed agent at all; you need a deterministic agent with custom logic, or an API-backed agent. Both frameworks support building custom agents, but the patterns differ.
AutoGen: Subclass BaseChatAgent
from typing import Sequence
from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import BaseChatMessage, TextMessage, StopMessage
from autogen_core import CancellationToken
class StaticAgent(BaseChatAgent):
    def __init__(self, name: str = "static", description: str = "Static responder") -> None:
        super().__init__(name, description)
    @property
    def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:  # Which message types this agent produces
        return (TextMessage,)
    async def on_messages(self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken) -> Response:
        # Always return a static response
        return Response(chat_message=TextMessage(content="Hello from AutoGen custom agent", source=self.name))
Notes:
- Implement on_messages(...) and return a Response containing a chat message.
- Optionally implement on_reset(...) to clear internal state between runs (sketched below).
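A minimal sketch of that reset hook, extending the StaticAgent above (in recent AutoGen releases on_reset is abstract on BaseChatAgent, so concrete agents typically implement it even as a no-op):
class StaticAgentWithReset(StaticAgent):
    async def on_reset(self, cancellation_token: CancellationToken) -> None:
        # StaticAgent holds no mutable state, so there is nothing to clear;
        # a stateful agent would reset its buffers or counters here
        pass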
Agent Framework: Extend BaseAgent (Thread-Aware)
from collections.abc import AsyncIterable
from typing import Any
from agent_framework import (
    AgentRunResponse,
    AgentRunResponseUpdate,
    AgentThread,
    BaseAgent,
    ChatMessage,
    Role,
    TextContent,
)
class StaticAgent(BaseAgent):
    async def run(
        self,
        messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
        *,
        thread: AgentThread | None = None,
        **kwargs: Any,
    ) -> AgentRunResponse:
        # Build a static reply
        reply = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text="Hello from AF custom agent")])
        # Persist conversation to the provided AgentThread (if any)
        if thread is not None:
            normalized = self._normalize_messages(messages)
            await self._notify_thread_of_new_messages(thread, normalized, reply)
        return AgentRunResponse(messages=[reply])
    async def run_stream(
        self,
        messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
        *,
        thread: AgentThread | None = None,
        **kwargs: Any,
    ) -> AsyncIterable[AgentRunResponseUpdate]:
        # Stream the same static response in a single chunk for simplicity
        yield AgentRunResponseUpdate(contents=[TextContent(text="Hello from AF custom agent")], role=Role.ASSISTANT)
        # Notify thread of input and the complete response once streaming ends
        if thread is not None:
            reply = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text="Hello from AF custom agent")])
            normalized = self._normalize_messages(messages)
            await self._notify_thread_of_new_messages(thread, normalized, reply)
Notes:
- AgentThread maintains chat state externally; use agent.get_new_thread() and pass it to run/run_stream.
- Call self._notify_thread_of_new_messages(thread, input_messages, response_messages) so the thread has both sides of the exchange.
- See the complete sample: Custom Agents
Next, let's look at multi-agent orchestration, the area where the frameworks differ the most.
Multi-Agent Feature Mapping
Programming Model Overview
The multi-agent programming model is where the two frameworks differ most significantly.
AutoGen's Dual-Model Approach
AutoGen provides two programming models:
- autogen-core: low-level, event-driven programming with RoutedAgent and message subscriptions
- The Team abstraction: a high-level, run-centric model built on top of autogen-core
# Low-level autogen-core (complex)
class MyAgent(RoutedAgent):
    @message_handler
    async def handle_message(self, message: TextMessage, ctx: MessageContext) -> None:
        # Handle specific message types
        pass
# High-level Team (easier but limited)
team = RoundRobinGroupChat(
    participants=[agent1, agent2],
    termination_condition=StopAfterNMessages(5)
)
result = await team.run(task="Collaborate on this task")
Challenges:
- The low-level model is too complex for most users
- The high-level model can be restrictive for complex behaviors
- Bridging the two models adds implementation complexity
Agent Framework's Unified Workflow Model
Agent Framework provides a single Workflow abstraction that combines the best of both approaches:
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never
# Assume we have agent1 and agent2 from previous examples
@executor(id="agent1")
async def agent1_executor(input_msg: str, ctx: WorkflowContext[str]) -> None:
    response = await agent1.run(input_msg)
    await ctx.send_message(response.text)
@executor(id="agent2")
async def agent2_executor(input_msg: str, ctx: WorkflowContext[Never, str]) -> None:
    response = await agent2.run(input_msg)
    await ctx.yield_output(response.text)  # Final output
# Build typed data flow graph
workflow = (WorkflowBuilder()
           .add_edge(agent1_executor, agent2_executor)
           .set_start_executor(agent1_executor)
           .build())
# Example usage (would be in async context)
# result = await workflow.run("Initial input")
For detailed workflow examples, see:
Benefits:
- Unified model: a single abstraction for every level of complexity
- Type safety: strongly typed inputs and outputs
- Graph visualization: a clear representation of data flow
- Flexible composition: mix agents, functions, and sub-workflows
Workflow vs GraphFlow
Agent Framework's Workflow abstraction was inspired by AutoGen's experimental GraphFlow feature, but represents a significant evolution in design philosophy:
- GraphFlow: control-flow based; edges are transitions, messages are broadcast to all agents, and transition conditions evaluate the broadcast message content
- Workflow: data-flow based; messages are routed over specific edges, executors are activated by their edges, and concurrent execution is supported
Visual Overview
The diagram below contrasts AutoGen's control-flow GraphFlow (left) with Agent Framework's data-flow Workflow (right). GraphFlow models agents as nodes with conditional transitions and broadcasting. Workflow models executors (agents, functions, or sub-workflows) connected by typed edges; it also supports request/response pauses and checkpointing.
flowchart LR
  subgraph AutoGenGraphFlow
    direction TB
    U[User / Task] --> A[Agent A]
    A -->|success| B[Agent B]
    A -->|retry| C[Agent C]
    A -. broadcast .- B
    A -. broadcast .- C
  end
  subgraph AgentFrameworkWorkflow
    direction TB
    I[Input] --> E1[Executor 1]
    E1 -->|"str"| E2[Executor 2]
    E1 -->|"image"| E3[Executor 3]
    E3 -->|"str"| E2
    E2 --> OUT[(Final Output)]
  end
  R[Request / Response Gate]
  E2 -. request .-> R
  R -. resume .-> E2
  CP[Checkpoint]
  E1 -. save .-> CP
  CP -. load .-> E1
In practice:
- GraphFlow uses agents as nodes and broadcasts messages; edges represent conditional transitions.
- Workflow routes typed messages along edges. Nodes (executors) can be agents, plain functions, or sub-workflows.
- Request/response lets a workflow pause for external input; checkpointing persists progress and enables resumption.
Code Comparison
1) Sequential + Conditional
# AutoGen GraphFlow (fluent builder) — writer → reviewer → editor (conditional)
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
writer = AssistantAgent(name="writer", description="Writes a draft", model_client=client)
reviewer = AssistantAgent(name="reviewer", description="Reviews the draft", model_client=client)
editor = AssistantAgent(name="editor", description="Finalizes the draft", model_client=client)
graph = (
    DiGraphBuilder()
    .add_node(writer).add_node(reviewer).add_node(editor)
    .add_edge(writer, reviewer)  # always
    .add_edge(reviewer, editor, condition=lambda msg: "approve" in msg.to_model_text())
    .set_entry_point(writer)
).build()
team = GraphFlow(participants=[writer, reviewer, editor], graph=graph)
result = await team.run(task="Draft a short paragraph about solar power")
# Agent Framework Workflow — sequential executors with conditional logic
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never
@executor(id="writer")
async def writer_exec(task: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"Draft: {task}")
@executor(id="reviewer")
async def reviewer_exec(draft: str, ctx: WorkflowContext[str]) -> None:
    decision = "approve" if "solar" in draft.lower() else "revise"
    await ctx.send_message(f"{decision}:{draft}")
@executor(id="editor")
async def editor_exec(msg: str, ctx: WorkflowContext[Never, str]) -> None:
    if msg.startswith("approve:"):
        await ctx.yield_output(msg.split(":", 1)[1])
    else:
        await ctx.yield_output("Needs revision")
workflow_seq = (
    WorkflowBuilder()
    .add_edge(writer_exec, reviewer_exec)
    .add_edge(reviewer_exec, editor_exec)
    .set_start_executor(writer_exec)
    .build()
)
2) Fan-out + Join (ALL vs ANY)
# AutoGen GraphFlow — A → (B, C) → D with ALL/ANY join
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
A, B, C, D = agent_a, agent_b, agent_c, agent_d
# ALL (default): D runs after both B and C
g_all = (
    DiGraphBuilder()
    .add_node(A).add_node(B).add_node(C).add_node(D)
    .add_edge(A, B).add_edge(A, C)
    .add_edge(B, D).add_edge(C, D)
    .set_entry_point(A)
).build()
# ANY: D runs when either B or C completes
g_any = (
    DiGraphBuilder()
    .add_node(A).add_node(B).add_node(C).add_node(D)
    .add_edge(A, B).add_edge(A, C)
    .add_edge(B, D, activation_group="join_d", activation_condition="any")
    .add_edge(C, D, activation_group="join_d", activation_condition="any")
    .set_entry_point(A)
).build()
# Agent Framework Workflow — A → (B, C) → aggregator (ALL vs ANY)
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never
@executor(id="A")
async def start(task: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"B:{task}", target_id="B")
    await ctx.send_message(f"C:{task}", target_id="C")
@executor(id="B")
async def branch_b(text: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"B_done:{text}")
@executor(id="C")
async def branch_c(text: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"C_done:{text}")
@executor(id="join_any")
async def join_any(msg: str, ctx: WorkflowContext[Never, str]) -> None:
    await ctx.yield_output(f"First: {msg}")  # ANY join (first arrival)
@executor(id="join_all")
async def join_all(msg: str, ctx: WorkflowContext[str, str]) -> None:
    state = await ctx.get_executor_state() or {"items": []}
    state["items"].append(msg)
    await ctx.set_executor_state(state)
    if len(state["items"]) >= 2:
        await ctx.yield_output(" | ".join(state["items"]))  # ALL join
wf_any = (
    WorkflowBuilder()
    .add_edge(start, branch_b).add_edge(start, branch_c)
    .add_edge(branch_b, join_any).add_edge(branch_c, join_any)
    .set_start_executor(start)
    .build()
)
wf_all = (
    WorkflowBuilder()
    .add_edge(start, branch_b).add_edge(start, branch_c)
    .add_edge(branch_b, join_all).add_edge(branch_c, join_all)
    .set_start_executor(start)
    .build()
)
3) Targeted Routing (No Broadcast)
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never
@executor(id="ingest")
async def ingest(task: str, ctx: WorkflowContext[str]) -> None:
    # Route selectively using target_id
    if task.startswith("image:"):
        await ctx.send_message(task.removeprefix("image:"), target_id="vision")
    else:
        await ctx.send_message(task, target_id="writer")
@executor(id="writer")
async def write(text: str, ctx: WorkflowContext[Never, str]) -> None:
    await ctx.yield_output(f"Draft: {text}")
@executor(id="vision")
async def caption(image_ref: str, ctx: WorkflowContext[Never, str]) -> None:
    await ctx.yield_output(f"Caption: {image_ref}")
workflow = (
    WorkflowBuilder()
    .add_edge(ingest, write)
    .add_edge(ingest, caption)
    .set_start_executor(ingest)
    .build()
)
# Example usage (async):
# await workflow.run("Summarize the benefits of solar power")
# await workflow.run("image:https://example.com/panel.jpg")
What to note:
- GraphFlow broadcasts messages and uses conditional transitions. Join behavior is configured via the target-side activation and per-edge activation_group/activation_condition (for example, grouping two edges into join_d with activation_condition="any").
- Workflow routes data explicitly; use target_id to select downstream executors. Join behavior lives in the receiving executor (for example, yield on the first input, or wait for all) or in an orchestration builder/aggregator.
- Executors in a Workflow are free-form: wrap a ChatAgent, a function, or a sub-workflow, and mix them in the same graph.
Key Differences
The table below summarizes the fundamental differences between AutoGen's GraphFlow and Agent Framework Workflows:
| Aspect | AutoGen GraphFlow | Agent Framework Workflow |
|---|---|---|
| Flow type | Control flow (edges are transitions) | Data flow (edges route messages) |
| Node types | Agents only | Agents, functions, sub-workflows |
| Activation | Message broadcast | Edge-based activation |
| Type safety | Limited | Strongly typed throughout |
| Composability | Limited | Highly composable |
Nesting Patterns
AutoGen Team Nesting
# Inner team
inner_team = RoundRobinGroupChat(
    participants=[specialist1, specialist2],
    termination_condition=StopAfterNMessages(3)
)
# Outer team with nested team as participant
outer_team = RoundRobinGroupChat(
    participants=[coordinator, inner_team, reviewer],  # Team as participant
    termination_condition=StopAfterNMessages(10)
)
# Messages are broadcasted to all participants including nested team
result = await outer_team.run("Complex task requiring collaboration")
AutoGen nesting characteristics:
- The nested team receives all messages from the outer team
- Messages from the nested team are broadcast to all outer-team participants
- Message context is shared across all levels
Agent Framework Workflow Nesting
from agent_framework import WorkflowExecutor, WorkflowBuilder
# Assume we have executors from previous examples
# specialist1_executor, specialist2_executor, coordinator_executor, reviewer_executor
# Create sub-workflow
sub_workflow = (WorkflowBuilder()
               .add_edge(specialist1_executor, specialist2_executor)
               .set_start_executor(specialist1_executor)
               .build())
# Wrap as executor
sub_workflow_executor = WorkflowExecutor(
    workflow=sub_workflow,
    id="sub_process"
)
# Use in parent workflow
parent_workflow = (WorkflowBuilder()
                  .add_edge(coordinator_executor, sub_workflow_executor)
                  .add_edge(sub_workflow_executor, reviewer_executor)
                  .set_start_executor(coordinator_executor)
                  .build())
Agent Framework nesting characteristics:
- Isolated input/output via WorkflowExecutor
- No message broadcasting: data flows through specific connections
- Independent state management at each workflow level
Group Chat Patterns
Group chat patterns enable multiple agents to collaborate on complex tasks. Here is how common patterns translate between the frameworks.
RoundRobinGroupChat Pattern
AutoGen implementation:
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import StopAfterNMessages
team = RoundRobinGroupChat(
    participants=[agent1, agent2, agent3],
    termination_condition=StopAfterNMessages(10)
)
result = await team.run("Discuss this topic")
Agent Framework implementation:
from agent_framework import SequentialBuilder, WorkflowOutputEvent
# Assume we have agent1, agent2, agent3 from previous examples
# Sequential workflow through participants
workflow = SequentialBuilder().participants([agent1, agent2, agent3]).build()
# Example usage (would be in async context)
async def sequential_example():
    # Each agent appends to shared conversation
    async for event in workflow.run_stream("Discuss this topic"):
        if isinstance(event, WorkflowOutputEvent):
            conversation_history = event.data  # list[ChatMessage]
For detailed orchestration examples, see:
For concurrent execution patterns, Agent Framework also provides:
from agent_framework import ConcurrentBuilder, WorkflowOutputEvent
# Assume we have agent1, agent2, agent3 from previous examples
# Concurrent workflow for parallel processing
workflow = (ConcurrentBuilder()
           .participants([agent1, agent2, agent3])
           .build())
# Example usage (would be in async context)
async def concurrent_example():
    # All agents process the input concurrently
    async for event in workflow.run_stream("Process this in parallel"):
        if isinstance(event, WorkflowOutputEvent):
            results = event.data  # Combined results from all agents
For concurrent execution examples, see:
MagenticOneGroupChat Pattern
AutoGen implementation:
from autogen_agentchat.teams import MagenticOneGroupChat
team = MagenticOneGroupChat(
    participants=[researcher, coder, executor],
    model_client=coordinator_client,
    termination_condition=StopAfterNMessages(20)
)
result = await team.run("Complex research and analysis task")
Agent Framework implementation:
from agent_framework import (
    MagenticBuilder, MagenticCallbackMode, WorkflowOutputEvent,
    MagenticCallbackEvent, MagenticOrchestratorMessageEvent, MagenticAgentDeltaEvent
)
# Assume we have researcher, coder, and coordinator_client from previous examples
async def on_event(event: MagenticCallbackEvent) -> None:
    if isinstance(event, MagenticOrchestratorMessageEvent):
        print(f"[ORCHESTRATOR]: {event.message.text}")
    elif isinstance(event, MagenticAgentDeltaEvent):
        print(f"[{event.agent_id}]: {event.text}", end="")
workflow = (MagenticBuilder()
           .participants(researcher=researcher, coder=coder)
           .on_event(on_event, mode=MagenticCallbackMode.STREAMING)
           .with_standard_manager(
               chat_client=coordinator_client,
               max_round_count=20,
               max_stall_count=3,
               max_reset_count=2
           )
           .build())
# Example usage (would be in async context)
async def magentic_example():
    async for event in workflow.run_stream("Complex research task"):
        if isinstance(event, WorkflowOutputEvent):
            final_result = event.data
Agent Framework customization options:
Magentic workflows offer extensive customization:
- Manager configuration: customize the orchestrator's model and prompts
- Round limits: max_round_count, max_stall_count, max_reset_count
- Event callbacks: real-time streaming with fine-grained event filtering
- Agent specialization: custom instructions and tools per agent
- Callback modes: STREAMING for real-time updates or BATCH for final results
- Human-in-the-loop planning: custom planner functions for interactive workflows
# Advanced customization example with human-in-the-loop
from agent_framework.openai import OpenAIChatClient
from agent_framework import MagenticBuilder, MagenticCallbackMode, MagenticPlannerContext
# Assume we have researcher_agent, coder_agent, analyst_agent, detailed_event_handler
# and get_human_input function defined elsewhere
async def custom_planner(context: MagenticPlannerContext) -> str:
    """Custom planner with human input for critical decisions."""
    if context.round_count > 5:
        # Request human input for complex decisions
        return await get_human_input(f"Next action for: {context.current_state}")
    return "Continue with automated planning"
workflow = (MagenticBuilder()
           .participants(
               researcher=researcher_agent,
               coder=coder_agent,
               analyst=analyst_agent
           )
           .with_standard_manager(
               chat_client=OpenAIChatClient(model_id="gpt-5"),
               max_round_count=15,      # Limit total rounds
               max_stall_count=2,       # Prevent infinite loops
               max_reset_count=1,       # Allow one reset on failure
               orchestrator_prompt="Custom orchestration instructions..."
           )
           .with_planner(custom_planner)  # Human-in-the-loop planning
           .on_event(detailed_event_handler, mode=MagenticCallbackMode.STREAMING)
           .build())
For detailed Magentic examples, see:
- Basic Magentic Workflow - a standard orchestrated multi-agent workflow
- Magentic with Checkpointing - durable orchestrated workflows
- Magentic Plan Update - human-in-the-loop planning
Future Patterns
The Agent Framework roadmap includes several AutoGen patterns currently under development; the AutoGen versions are sketched below for reference:
- Swarm pattern: handoff-based agent coordination
- SelectorGroupChat: LLM-driven speaker selection
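Until those land, the AutoGen shapes of these patterns look roughly like the following sketch, useful when planning a migration. The researcher, coder, specialist, and client objects are assumed from earlier examples; MaxMessageTermination is AutoGen's built-in message-count condition:
# Hedged sketch: the AutoGen patterns that Agent Framework plans to support.
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import SelectorGroupChat, Swarm
# SelectorGroupChat: an LLM picks the next speaker each turn
selector_team = SelectorGroupChat(
    participants=[researcher, coder],
    model_client=client,
    termination_condition=MaxMessageTermination(10),
)
# Swarm: agents hand off control explicitly; here the triage agent
# may transfer the conversation to the specialist agent
triage = AssistantAgent(
    name="triage",
    model_client=client,
    handoffs=["specialist"],
)
swarm_team = Swarm(
    participants=[triage, specialist],
    termination_condition=MaxMessageTermination(10),
)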
Human-in-the-Loop with Request-Response
A major new capability in Agent Framework Workflows is the concept of requests and responses, which lets a workflow pause execution and wait for external input before continuing. This capability does not exist in AutoGen's Team abstraction, and it enables sophisticated human-in-the-loop patterns.
AutoGen Limitation
AutoGen's Team abstraction runs continuously once started and provides no built-in mechanism to pause execution for human input. Any human-in-the-loop capability requires custom implementation outside the framework.
Agent Framework RequestInfoExecutor
Agent Framework provides RequestInfoExecutor - a workflow-native bridge that pauses the graph when information is requested, emits a RequestInfoEvent carrying a typed payload, and resumes execution once the application supplies a matching RequestResponse.
from agent_framework import (
    RequestInfoExecutor, RequestInfoEvent, RequestInfoMessage,
    RequestResponse, WorkflowBuilder, WorkflowContext, executor
)
from dataclasses import dataclass
from typing_extensions import Never
# Assume we have agent_executor defined elsewhere
# Define typed request payload
@dataclass
class ApprovalRequest(RequestInfoMessage):
    """Request human approval for agent output."""
    content: str = ""
    agent_name: str = ""
# Workflow executor that requests human approval
@executor(id="reviewer")
async def approval_executor(
    agent_response: str,
    ctx: WorkflowContext[ApprovalRequest]
) -> None:
    # Request human input with structured data
    approval_request = ApprovalRequest(
        content=agent_response,
        agent_name="writer_agent"
    )
    await ctx.send_message(approval_request)
# Human feedback handler
@executor(id="processor")
async def process_approval(
    feedback: RequestResponse[ApprovalRequest, str],
    ctx: WorkflowContext[Never, str]
) -> None:
    decision = feedback.data.strip().lower()
    original_content = feedback.original_request.content
    if decision == "approved":
        await ctx.yield_output(f"APPROVED: {original_content}")
    else:
        await ctx.yield_output(f"REVISION NEEDED: {decision}")
# Build workflow with human-in-the-loop
hitl_executor = RequestInfoExecutor(id="request_approval")
workflow = (WorkflowBuilder()
           .add_edge(agent_executor, approval_executor)
           .add_edge(approval_executor, hitl_executor)
           .add_edge(hitl_executor, process_approval)
           .set_start_executor(agent_executor)
           .build())
Running Human-in-the-Loop Workflows
Agent Framework provides streaming APIs to handle the pause-resume cycle:
from agent_framework import RequestInfoEvent, WorkflowOutputEvent
# Assume we have workflow defined from previous examples
async def run_with_human_input():
    pending_responses = None
    completed = False
    while not completed:
        # First iteration uses run_stream, subsequent use send_responses_streaming
        stream = (
            workflow.send_responses_streaming(pending_responses)
            if pending_responses
            else workflow.run_stream("initial input")
        )
        events = [event async for event in stream]
        pending_responses = None
        # Collect human requests and outputs
        for event in events:
            if isinstance(event, RequestInfoEvent):
                # Display request to human and collect response
                request_data = event.data  # ApprovalRequest instance
                print(f"Review needed: {request_data.content}")
                human_response = input("Enter 'approved' or revision notes: ")
                pending_responses = {event.request_id: human_response}
            elif isinstance(event, WorkflowOutputEvent):
                print(f"Final result: {event.data}")
                completed = True
For human-in-the-loop workflow examples, see:
- Guessing Game with Human Input - an interactive workflow with user feedback
- Workflow as Agent with Human Input - a nested workflow with human interaction
Checkpointing and Resuming Workflows
Another key advantage of Agent Framework Workflows over AutoGen's Team abstraction is built-in support for checkpointing and resuming execution. This lets a workflow be paused, persisted, and resumed from any checkpoint, providing fault tolerance and enabling long-running or asynchronous workflows.
AutoGen Limitation
AutoGen's Team abstraction provides no built-in checkpointing. Any persistence or recovery mechanism must be implemented externally, typically requiring complex state management and serialization logic.
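In practice, that external work means serializing the team state yourself, roughly as in this sketch (assuming a recent autogen-agentchat release where teams expose save_state() and load_state()):
# Hedged sketch: manual persistence for an AutoGen team, i.e. the external
# plumbing that Agent Framework checkpointing replaces.
import json
async def run_and_persist(team, task: str, path: str = "team_state.json") -> None:
    await team.run(task=task)
    state = await team.save_state()  # Snapshot team and agent state
    with open(path, "w") as f:
        json.dump(state, f)
async def resume_and_continue(team, task: str, path: str = "team_state.json") -> None:
    with open(path) as f:
        await team.load_state(json.load(f))  # Restore the snapshot
    await team.run(task=task)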
Agent Framework Checkpointing
Agent Framework provides comprehensive checkpointing through FileCheckpointStorage and the with_checkpointing() method on WorkflowBuilder. Checkpoints capture:
- Executor state: each executor's local state via ctx.set_executor_state()
- Shared state: cross-executor state via ctx.set_shared_state()
- Message queues: pending messages between executors
- Workflow position: current execution progress and next steps
from agent_framework import (
    FileCheckpointStorage, WorkflowBuilder, WorkflowContext,
    Executor, handler
)
from typing_extensions import Never
class ProcessingExecutor(Executor):
    @handler
    async def process(self, data: str, ctx: WorkflowContext[str]) -> None:
        # Process the data
        result = f"Processed: {data.upper()}"
        print(f"Processing: '{data}' -> '{result}'")
        # Persist executor-local state
        prev_state = await ctx.get_executor_state() or {}
        count = prev_state.get("count", 0) + 1
        await ctx.set_executor_state({
            "count": count,
            "last_input": data,
            "last_output": result
        })
        # Persist shared state for other executors
        await ctx.set_shared_state("original_input", data)
        await ctx.set_shared_state("processed_output", result)
        await ctx.send_message(result)
class FinalizeExecutor(Executor):
    @handler
    async def finalize(self, data: str, ctx: WorkflowContext[Never, str]) -> None:
        result = f"Final: {data}"
        await ctx.yield_output(result)
# Configure checkpoint storage
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")
processing_executor = ProcessingExecutor(id="processing")
finalize_executor = FinalizeExecutor(id="finalize")
# Build workflow with checkpointing enabled
workflow = (WorkflowBuilder()
           .add_edge(processing_executor, finalize_executor)
           .set_start_executor(processing_executor)
           .with_checkpointing(checkpoint_storage=checkpoint_storage)  # Enable checkpointing
           .build())
# Example usage (would be in async context)
async def checkpoint_example():
    # Run workflow - checkpoints are created automatically
    async for event in workflow.run_stream("input data"):
        print(f"Event: {event}")
Resuming from Checkpoints
Agent Framework provides APIs to list, inspect, and resume from a specific checkpoint:
from agent_framework import (
    RequestInfoExecutor, FileCheckpointStorage, WorkflowBuilder,
    Executor, WorkflowContext, handler
)
from typing_extensions import Never
class UpperCaseExecutor(Executor):
    @handler
    async def process(self, text: str, ctx: WorkflowContext[str]) -> None:
        result = text.upper()
        await ctx.send_message(result)
class ReverseExecutor(Executor):
    @handler
    async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None:
        result = text[::-1]
        await ctx.yield_output(result)
def create_workflow(checkpoint_storage: FileCheckpointStorage):
    """Create a workflow with two executors and checkpointing."""
    upper_executor = UpperCaseExecutor(id="upper")
    reverse_executor = ReverseExecutor(id="reverse")
    return (WorkflowBuilder()
           .add_edge(upper_executor, reverse_executor)
           .set_start_executor(upper_executor)
           .with_checkpointing(checkpoint_storage=checkpoint_storage)
           .build())
# Assume we have checkpoint_storage from previous examples
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")
async def checkpoint_resume_example():
    # List available checkpoints
    checkpoints = await checkpoint_storage.list_checkpoints()
    # Display checkpoint information
    for checkpoint in checkpoints:
        summary = RequestInfoExecutor.checkpoint_summary(checkpoint)
        print(f"Checkpoint {summary.checkpoint_id}: iteration={summary.iteration_count}")
        print(f"  Shared state: {checkpoint.shared_state}")
        print(f"  Executor states: {list(checkpoint.executor_states.keys())}")
    # Resume from a specific checkpoint
    if checkpoints:
        chosen_checkpoint_id = checkpoints[0].checkpoint_id
        # Create new workflow instance and resume
        new_workflow = create_workflow(checkpoint_storage)
        async for event in new_workflow.run_stream_from_checkpoint(
            chosen_checkpoint_id,
            checkpoint_storage=checkpoint_storage
        ):
            print(f"Resumed event: {event}")
Advanced Checkpointing Features
Checkpointing with human-in-the-loop integration:
Checkpointing works seamlessly with human-in-the-loop workflows, allowing a workflow to pause for human input and resume later:
# Assume we have workflow, checkpoint_id, and checkpoint_storage from previous examples
async def resume_with_responses_example():
    # Resume with pre-supplied human responses
    responses = {"request_id_123": "approved"}
    async for event in workflow.run_stream_from_checkpoint(
        checkpoint_id,
        checkpoint_storage=checkpoint_storage,
        responses=responses  # Pre-supply human responses
    ):
        print(f"Event: {event}")
Key Benefits
Compared to AutoGen, Agent Framework checkpointing provides:
- Automatic persistence: no manual state management required
- Granular recovery: resume from any superstep boundary
- State isolation: separate executor-local and shared state
- Human-in-the-loop integration: seamless pause-resume with human input
- Fault tolerance: reliable recovery from failures or interruptions
Examples
For comprehensive checkpointing examples, see:
- Checkpointing with Resume - basic checkpointing and interactive resumption
- Checkpointing with Human-in-the-Loop - durable workflows with human approval gates
- Sub-workflow Checkpointing - checkpointing nested workflows
- Magentic Checkpointing - checkpointing orchestrated multi-agent workflows
Observability
Both AutoGen and Agent Framework provide observability features, with different approaches and capabilities.
AutoGen Observability
AutoGen provides native OpenTelemetry support with instrumentation for:
- Runtime tracing: SingleThreadedAgentRuntime and GrpcWorkerAgentRuntime
- Tool execution: BaseTool with execute_tool spans following the GenAI semantic conventions
- Agent operations: BaseChatAgent with create_agent and invoke_agent spans
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from autogen_core import SingleThreadedAgentRuntime
# Configure OpenTelemetry
tracer_provider = TracerProvider()
trace.set_tracer_provider(tracer_provider)
# Pass to runtime
runtime = SingleThreadedAgentRuntime(tracer_provider=tracer_provider)
Agent Framework Observability
Agent Framework provides comprehensive observability through multiple approaches:
- Zero-code setup: automatic instrumentation via environment variables
- Manual configuration: programmatic setup with custom parameters
- Rich telemetry: agent, workflow, and tool execution tracing
- Console output: built-in console logging and visualization
from agent_framework import ChatAgent
from agent_framework.observability import setup_observability
from agent_framework.openai import OpenAIChatClient
# Zero-code setup via environment variables
# Set ENABLE_OTEL=true
# Set OTLP_ENDPOINT=http://localhost:4317
# Or manual setup
setup_observability(
    otlp_endpoint="http://localhost:4317"
)
# Create client for the example
client = OpenAIChatClient(model_id="gpt-5")
async def observability_example():
    # Observability is automatically applied to all agents and workflows
    agent = ChatAgent(name="assistant", chat_client=client)
    result = await agent.run("Hello")  # Automatically traced
Key differences:
- Setup complexity: Agent Framework offers a simpler zero-code setup option
- Scope: Agent Framework provides broader coverage, including workflow-level observability
- Visualization: Agent Framework includes built-in console output and a dev UI
- Configuration: Agent Framework offers more flexible configuration options
For detailed observability examples, see:
Conclusion
This migration guide provides a comprehensive mapping between AutoGen and Microsoft Agent Framework, covering everything from basic agent creation to complex multi-agent workflows. The key takeaways:
- Single-agent migration is straightforward, with similar APIs and enhanced capabilities in Agent Framework
- Multi-agent patterns require rethinking your approach, moving from an event-driven architecture to a data-flow architecture, but the transition is easier if you already know GraphFlow
- Agent Framework provides additional capabilities such as middleware, hosted tools, and typed workflows
For additional examples and detailed implementation guidance, see the Agent Framework samples directory.
Additional Sample Categories
Agent Framework provides samples in several other important areas: