Microsoft Agent 框架工作流 - 与代理协作

本页概述了如何在 Microsoft Agent Framework 工作流中使用 代理

概述

若要将智能添加到工作流,可以将 AI 代理用作工作流执行的一部分。 AI 代理可以轻松集成到工作流中,使你能够创建以前难以实现的复杂智能解决方案。

将代理直接添加到工作流

可以通过边缘将代理添加到工作流:

using Microsoft.Agents.Workflows;
using Microsoft.Extensions.AI;
using Microsoft.Agents.AI;

// Create the agents first
AIAgent agentA = new ChatClientAgent(chatClient, instructions);
AIAgent agentB = new ChatClientAgent(chatClient, instructions);

// Build a workflow with the agents
WorkflowBuilder builder = new(agentA);
builder.AddEdge(agentA, agentB);
Workflow<ChatMessage> workflow = builder.Build<ChatMessage>();

运行工作流

在上面创建的工作流中,代理实际上包装在处理代理与工作流其他部分通信的执行程序内。 执行程序可以处理三种消息类型:

  • ChatMessage:单个聊天消息
  • List<ChatMessage>:聊天消息列表
  • TurnToken:指示新轮次开始的轮次标记

执行程序只有在收到TurnToken后才会触发代理的响应。 收到 TurnToken 之前接收的任何消息都会被缓冲,并在收到 TurnToken 时发送给代理。

StreamingRun run = await InProcessExecution.StreamAsync(workflow, new ChatMessage(ChatRole.User, "Hello World!"));
// Must send the turn token to trigger the agents. The agents are wrapped as executors.
// When they receive messages, they will cache the messages and only start processing
// when they receive a TurnToken. The turn token will be passed from one agent to the next.
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
    // The agents will run in streaming mode and an AgentRunUpdateEvent
    // will be emitted as new chunks are generated.
    if (evt is AgentRunUpdateEvent agentRunUpdate)
    {
        Console.WriteLine($"{agentRunUpdate.ExecutorId}: {agentRunUpdate.Data}");
    }
}

使用内置代理执行器

可以通过边缘将代理添加到工作流:

from agent_framework import WorkflowBuilder
from agent_framework.azure import AzureChatClient
from azure.identity import AzureCliCredential

# Create the agents first
chat_client = AzureChatClient(credential=AzureCliCredential())
writer_agent: ChatAgent = chat_client.create_agent(
    instructions=(
        "You are an excellent content writer. You create new content and edit contents based on the feedback."
    ),
    name="writer_agent",
)
reviewer_agent = chat_client.create_agent(
    instructions=(
        "You are an excellent content reviewer."
        "Provide actionable feedback to the writer about the provided content."
        "Provide the feedback in the most concise manner possible."
    ),
    name="reviewer_agent",
)

# Build a workflow with the agents
builder = WorkflowBuilder()
builder.set_start_executor(writer_agent)
builder.add_edge(writer_agent, reviewer_agent)
workflow = builder.build()

运行工作流

在上面创建的工作流中,代理实际上包装在处理代理与工作流其他部分通信的执行程序内。 执行程序可以处理三种消息类型:

  • str:字符串格式的单个聊天消息
  • ChatMessage:单个聊天消息
  • List<ChatMessage>:聊天消息列表

每当执行程序收到其中一种类型的消息时,都会触发代理进行响应,响应类型将是一个 AgentExecutorResponse 对象。 此编程类包含有关代理程序响应的有用信息,包括:

  • executor_id:生成此响应的执行程序的 ID
  • agent_run_response:代理的完整响应
  • full_conversation:到目前为止的完整对话历史记录

运行工作流时,可以发出两种与代理响应相关的事件类型:

  • AgentRunUpdateEvent 包含代理响应的区块,因为它们是在流式处理模式下生成的。
  • AgentRunEvent 包含非流式处理模式下代理的完整响应。

默认情况下,代理包装在以流式处理模式运行的执行器中。 可以通过创建自定义执行程序来自定义此行为。 有关更多详细信息,请参阅下一部分。

last_executor_id = None
async for event in workflow.run_streaming("Write a short blog post about AI agents."):
    if isinstance(event, AgentRunUpdateEvent):
        if event.executor_id != last_executor_id:
            if last_executor_id is not None:
                print()
            print(f"{event.executor_id}:", end=" ", flush=True)
            last_executor_id = event.executor_id
        print(event.data, end="", flush=True)

使用自定义代理执行程序

有时,你可能想要自定义 AI 代理如何集成到工作流中。 可以通过创建自定义执行程序来实现此目的。 这允许你控制:

  • 代理的调用方式:流式处理或非流式处理
  • 代理将处理的消息类型,包括自定义消息类型
  • 代理的生命周期,包括初始化和清理
  • 代理线程和其他资源的使用情况
  • 在代理执行期间发出的额外事件,包括自定义事件
  • 与其他工作流功能(例如共享状态和请求/响应)集成
internal sealed class CustomAgentExecutor : ReflectingExecutor<CustomAgentExecutor>, IMessageHandler<CustomInput, CustomOutput>
{
    private readonly AIAgent _agent;

    /// <summary>
    /// Creates a new instance of the <see cref="CustomAgentExecutor"/> class.
    /// </summary>
    /// <param name="agent">The AI agent used for custom processing</param>
    public CustomAgentExecutor(AIAgent agent) : base("CustomAgentExecutor")
    {
        this._agent = agent;
    }

    public async ValueTask<CustomOutput> HandleAsync(CustomInput message, IWorkflowContext context)
    {
        // Retrieve any shared states if needed
        var sharedState = await context.ReadStateAsync<SharedStateType>("sharedStateId", scopeName: "SharedStateScope");

        // Render the input for the agent
        var agentInput = RenderInput(message, sharedState);

        // Invoke the agent
        // Assume the agent is configured with structured outputs with type `CustomOutput`
        var response = await this._agent.RunAsync(agentInput);
        var customOutput = JsonSerializer.Deserialize<CustomOutput>(response.Text);

        return customOutput;
    }
}
from agent_framework import (
    ChatAgent,
    ChatMessage,
    Executor,
    WorkflowContext,
    handler
)

class Writer(Executor):

    agent: ChatAgent

    def __init__(self, chat_client: AzureChatClient, id: str = "writer"):
        # Create a domain specific agent using your configured AzureChatClient.
        agent = chat_client.create_agent(
            instructions=(
                "You are an excellent content writer. You create new content and edit contents based on the feedback."
            ),
        )
        # Associate the agent with this executor node. The base Executor stores it on self.agent.
        super().__init__(agent=agent, id=id)

    @handler
    async def handle(self, message: ChatMessage, ctx: WorkflowContext[list[ChatMessage]]) -> None:
        """Handles a single chat message and forwards the accumulated messages to the next executor in the workflow."""
        # Invoke the agent with the incoming message and get the response
        messages: list[ChatMessage] = [message]
        response = await self.agent.run(messages)
        # Accumulate messages and send them to the next executor in the workflow.
        messages.extend(response.messages)
        await ctx.send_message(messages)

后续步骤