Microsoft代理框架工作流 - 请求和响应

本页概述了 请求和响应 处理在 Microsoft Agent Framework 工作流系统中的工作原理。

概述

工作流中的执行程序可以将请求发送到工作流外部并等待响应。 这对于执行体需要与外部系统交互(例如人工参与交互或任何其他异步操作)的情况非常有用。

在工作流中启用请求和响应处理

请求和响应通过称为 InputPort的特殊类型进行处理。

// Create an input port that receives requests of type CustomRequestType and responses of type CustomResponseType.
var inputPort = InputPort.Create<CustomRequestType, CustomResponseType>("input-port");

将输入端口添加到工作流。

var executorA = new SomeExecutor();
var workflow = new WorkflowBuilder(inputPort)
    .AddEdge(inputPort, executorA)
    .AddEdge(executorA, inputPort)
    .Build<CustomRequestType>();

现在,由于在工作流中,我们的executorAinputPort是双向连接的,因此executorA需要能够通过inputPort发送请求和接收响应。 下面是我们需要在SomeExecutor执行的步骤,以发送请求并接收响应。

internal sealed class SomeExecutor() : ReflectingExecutor<SomeExecutor>("SomeExecutor"), IMessageHandler<CustomResponseType>
{
    public async ValueTask HandleAsync(CustomResponseType message, IWorkflowContext context)
    {
        // Process the response...
        ...
        // Send a request
        await context.SendMessageAsync(new CustomRequestType(...)).ConfigureAwait(false);
    }
}

或者, SomeExecutor 可以将请求发送和响应处理分为两个处理程序。

internal sealed class SomeExecutor() : ReflectingExecutor<SomeExecutor>("SomeExecutor"), IMessageHandler<CustomResponseType>, IMessageHandler<OtherDataType>
{
    public async ValueTask HandleAsync(CustomResponseType message, IWorkflowContext context)
    {
        // Process the response...
        ...
    }

    public async ValueTask HandleAsync(OtherDataType message, IWorkflowContext context)
    {
        // Process the message...
        ...
        // Send a request
        await context.SendMessageAsync(new CustomRequestType(...)).ConfigureAwait(false);
    }
}

请求和响应通过名为 RequestInfoExecutor 的特殊内置执行程序进行处理。

from agent_framework import RequestInfoExecutor

# Create a RequestInfoExecutor with an ID
request_info_executor = RequestInfoExecutor(id="request-info-executor")

RequestInfoExecutor 添加到工作流中。

from agent_framework import WorkflowBuilder

executor_a = SomeExecutor()
workflow_builder = WorkflowBuilder()
workflow_builder.set_start_executor(request_info_executor)
workflow_builder.add_edge(request_info_executor, executor_a)
workflow = workflow_builder.build()

现在,因为在工作流中,executor_a 已连接到 request_info_executor,并且是双向连接,所以 executor_a 需要能够通过 request_info_executor 发送请求和接收响应。 这是我们需要在 SomeExecutor 上做的,以便发送请求并接收响应。

from agent_framework import (
    Executor,
    RequestResponse,
    WorkflowContext,
    handler,
)

class SomeExecutor(Executor):

    @handler
    async def handle(
        self,
        request: RequestResponse[CustomRequestType, CustomResponseType],
        context: WorkflowContext[CustomResponseType],
    ):
        # Process the response...
        ...
        # Send a request
        await context.send_message(CustomRequestType(...))

或者, SomeExecutor 可以将请求发送和响应处理分为两个处理程序。

class SomeExecutor(Executor):

    @handler
    async def handle_response(
        self,
        response: CustomResponseType[CustomRequestType, CustomResponseType],
        context: WorkflowContext,
    ):
        # Process the response...
        ...

    @handler
    async def handle_other_data(
        self,
        data: OtherDataType,
        context: WorkflowContext[CustomRequestType],
    ):
        # Process the message...
        ...
        # Send a request
        await context.send_message(CustomRequestType(...))

处理请求和响应

在收到请求时,InputPort 会发出 RequestInfoEvent。 可以订阅这些事件来处理来自工作流的传入请求。 从外部系统收到响应时,请使用响应机制将其发送回工作流。 框架会自动将响应路由到发送原始请求的执行程序。

StreamingRun handle = await InProcessExecution.StreamAsync(workflow, input).ConfigureAwait(false);
await foreach (WorkflowEvent evt in handle.WatchStreamAsync().ConfigureAwait(false))
{
    switch (evt)
    {
        case RequestInfoEvent requestInputEvt:
            // Handle `RequestInfoEvent` from the workflow
            ExternalResponse response = requestInputEvt.Request.CreateResponse<CustomResponseType>(...);
            await handle.SendResponseAsync(response).ConfigureAwait(false);
            break;

        case WorkflowCompletedEvent workflowCompleteEvt:
            // The workflow has completed successfully
            Console.WriteLine($"Workflow completed with result: {workflowCompleteEvt.Data}");
            return;
    }
}

RequestInfoExecutor收到请求时,发出RequestInfoEvent。 可以订阅这些事件来处理来自工作流的传入请求。 从外部系统收到响应时,请使用响应机制将其发送回工作流。 框架会自动将响应路由到发送原始请求的执行程序。

from agent_framework import RequestInfoEvent

while True:
    request_info_events : list[RequestInfoEvent] = []
    pending_responses : dict[str, CustomResponseType] = {}

    stream = workflow.run_stream(input) if not pending_responses else workflow.send_responses_streaming(pending_responses)

    async for event in stream:
        if isinstance(event, RequestInfoEvent):
            # Handle `RequestInfoEvent` from the workflow
            request_info_events.append(event)

    if not request_info_events:
        break

    for request_info_event in request_info_events:
        # Handle `RequestInfoEvent` from the workflow
        response = CustomResponseType(...)
        pending_responses[request_info_event.request_id] = response

检查点和请求

若要了解有关检查点的详细信息,请参阅 此页面

创建检查点时,挂起的请求也会作为检查点状态的一部分被保存。 从检查点恢复时,所有挂起的请求将被重新发送,这样工作流就可以从中断的位置继续处理。

后续步骤