Microsoft Agent Framework Workflows Core Concepts - Events

This document takes an in-depth look at the workflow event system in Microsoft Agent Framework.

Overview

Built-in events provide observability into workflow execution.

Built-in event types

C#:

// Workflow lifecycle events
WorkflowStartedEvent    // Workflow execution begins
WorkflowCompletedEvent  // Workflow reaches completion
WorkflowErrorEvent      // Workflow encounters an error

// Executor events
ExecutorInvokeEvent     // Executor starts processing
ExecutorCompleteEvent   // Executor finishes processing
ExecutorFailureEvent    // Executor encounters an error

// Superstep events
SuperStepStartedEvent   // Superstep begins
SuperStepCompletedEvent // Superstep completes

// Request events
RequestInfoEvent        // A request is issued

Python:

# Workflow lifecycle events
WorkflowStartedEvent    # Workflow execution begins
WorkflowOutputEvent     # Workflow produces an output
WorkflowErrorEvent      # Workflow encounters an error

# Executor events
ExecutorInvokeEvent     # Executor starts processing
ExecutorCompleteEvent   # Executor finishes processing

# Request events
RequestInfoEvent        # A request is issued

Using events

C#:

using Microsoft.Agents.Workflows;

// `run` is assumed to be a streaming run handle created before this snippet.
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    switch (evt)
    {
        case ExecutorInvokeEvent invoke:
            Console.WriteLine($"Starting {invoke.ExecutorId}");
            break;

        case ExecutorCompleteEvent complete:
            Console.WriteLine($"Completed {complete.ExecutorId}: {complete.Data}");
            break;

        case WorkflowCompletedEvent finished:
            Console.WriteLine($"Workflow finished: {finished.Data}");
            return;

        case WorkflowErrorEvent error:
            Console.WriteLine($"Workflow error: {error.Exception}");
            return;
    }
}

Python:

from agent_framework import (
    ExecutorCompleteEvent,
    ExecutorInvokeEvent,
    WorkflowOutputEvent,
    WorkflowErrorEvent,
)

# `workflow` and `input_message` are assumed to be defined earlier; the loop
# must run inside an async function.
async for event in workflow.run_stream(input_message):
    match event:
        case ExecutorInvokeEvent() as invoke:
            print(f"Starting {invoke.executor_id}")
        case ExecutorCompleteEvent() as complete:
            print(f"Completed {complete.executor_id}: {complete.data}")
        case WorkflowOutputEvent() as output:
            print(f"Workflow produced output: {output.data}")
            break  # stop watching once the workflow has produced its output
        case WorkflowErrorEvent() as error:
            print(f"Workflow error: {error.exception}")
            break  # stop watching after an error

Custom events

Users can define and emit custom events during workflow execution to improve observability.

C#:

using Microsoft.Agents.Workflows;
using Microsoft.Agents.Workflows.Reflection;

internal sealed class CustomEvent(string message) : WorkflowEvent(message) { }

internal sealed class CustomExecutor() : ReflectingExecutor<CustomExecutor>("CustomExecutor"), IMessageHandler<string>
{
    public async ValueTask HandleAsync(string message, IWorkflowContext context)
    {
        await context.AddEventAsync(new CustomEvent($"Processing message: {message}"));
        // Executor logic...
    }
}

Python:

from agent_framework import (
    handler,
    Executor,
    WorkflowContext,
    WorkflowEvent,
)

class CustomEvent(WorkflowEvent):
    def __init__(self, message: str):
        super().__init__(message)

class CustomExecutor(Executor):

    @handler
    async def handle(self, message: str, ctx: WorkflowContext[str]) -> None:
        await ctx.add_event(CustomEvent(f"Processing message: {message}"))
        # Executor logic...
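
A custom event is most useful when the consumer can pick it out of the stream and read structured fields from it. The sketch below illustrates that round trip under stated assumptions. `StubWorkflowEvent` and `StubContext` are hypothetical stand-ins, not the agent_framework classes; they copy only the shape used above (a payload passed to the base constructor and an awaitable `add_event`). `ProgressEvent` and `process` are likewise invented for illustration.

```python
import asyncio
from typing import Any


class StubWorkflowEvent:
    """Stand-in base class (NOT agent_framework.WorkflowEvent); keeps a payload in .data."""

    def __init__(self, data: Any = None) -> None:
        self.data = data


class ProgressEvent(StubWorkflowEvent):
    """Hypothetical custom event carrying a step name and a completion percentage."""

    def __init__(self, step: str, percent: int) -> None:
        super().__init__({"step": step, "percent": percent})
        self.step = step
        self.percent = percent


class StubContext:
    """Stand-in for the workflow context: add_event() simply records the event."""

    def __init__(self) -> None:
        self.events: list[StubWorkflowEvent] = []

    async def add_event(self, event: StubWorkflowEvent) -> None:
        self.events.append(event)


async def process(steps: list[str], ctx: StubContext) -> None:
    """Emit one ProgressEvent per step, then a generic completion event."""
    for index, step in enumerate(steps, start=1):
        await ctx.add_event(ProgressEvent(step, 100 * index // len(steps)))
    await ctx.add_event(StubWorkflowEvent("done"))


async def main() -> list[tuple[str, int]]:
    ctx = StubContext()
    await process(["load", "parse", "save", "notify"], ctx)
    # The consumer keeps only the custom events and ignores everything else.
    return [(e.step, e.percent) for e in ctx.events if isinstance(e, ProgressEvent)]


progress = asyncio.run(main())
print(progress)
```

Giving the custom event typed attributes (`step`, `percent`) instead of a formatted string lets consumers filter with `isinstance` and read values without parsing text.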

Next steps