This document takes an in-depth look at the workflow event system in Microsoft Agent Framework.
Overview
Built-in events provide observability into the execution of a workflow.
Built-in event types
C#:

// Workflow lifecycle events
WorkflowStartedEvent     // Workflow execution begins
WorkflowCompletedEvent   // Workflow reaches completion
WorkflowErrorEvent       // Workflow encounters an error

// Executor events
ExecutorInvokeEvent      // Executor starts processing
ExecutorCompleteEvent    // Executor finishes processing
ExecutorFailureEvent     // Executor encounters an error

// Superstep events
SuperStepStartedEvent    // Superstep begins
SuperStepCompletedEvent  // Superstep completes

// Request events
RequestInfoEvent         // A request is issued

Python:

# Workflow lifecycle events
WorkflowStartedEvent     # Workflow execution begins
WorkflowOutputEvent      # Workflow produces an output
WorkflowErrorEvent       # Workflow encounters an error

# Executor events
ExecutorInvokeEvent      # Executor starts processing
ExecutorCompleteEvent    # Executor finishes processing

# Request events
RequestInfoEvent         # A request is issued
Consuming events
C#:

using Microsoft.Agents.Workflows;

// `run` is a streaming workflow run obtained elsewhere.
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    switch (evt)
    {
        case ExecutorInvokeEvent invoke:
            Console.WriteLine($"Starting {invoke.ExecutorId}");
            break;
        case ExecutorCompleteEvent complete:
            Console.WriteLine($"Completed {complete.ExecutorId}: {complete.Data}");
            break;
        case WorkflowCompletedEvent finished:
            // The workflow is done, so stop watching the stream.
            Console.WriteLine($"Workflow finished: {finished.Data}");
            return;
        case WorkflowErrorEvent error:
            Console.WriteLine($"Workflow error: {error.Exception}");
            return;
    }
}

Python:

from agent_framework import (
    ExecutorCompleteEvent,
    ExecutorInvokeEvent,
    WorkflowErrorEvent,
    WorkflowOutputEvent,
)


# `watch_events` is an illustrative wrapper name: `async for` and `return`
# are only valid inside a coroutine. `workflow` and `input_message` are
# supplied by the caller.
async def watch_events(workflow, input_message) -> None:
    async for event in workflow.run_stream(input_message):
        match event:
            case ExecutorInvokeEvent() as invoke:
                print(f"Starting {invoke.executor_id}")
            case ExecutorCompleteEvent() as complete:
                print(f"Completed {complete.executor_id}: {complete.data}")
            case WorkflowOutputEvent() as output:
                print(f"Workflow produced output: {output.data}")
                return
            case WorkflowErrorEvent() as error:
                print(f"Workflow error: {error.exception}")
                return
Custom events
Users can define and emit custom events during workflow execution to improve observability.
C#:

using Microsoft.Agents.Workflows;
using Microsoft.Agents.Workflows.Reflection;

internal sealed class CustomEvent(string message) : WorkflowEvent(message) { }

internal sealed class CustomExecutor() : ReflectingExecutor<CustomExecutor>("CustomExecutor"), IMessageHandler<string>
{
    public async ValueTask HandleAsync(string message, IWorkflowContext context)
    {
        await context.AddEventAsync(new CustomEvent($"Processing message: {message}"));
        // Executor logic...
    }
}

Python:

from agent_framework import (
    handler,
    Executor,
    WorkflowContext,
    WorkflowEvent,
)


class CustomEvent(WorkflowEvent):
    def __init__(self, message: str):
        super().__init__(message)


class CustomExecutor(Executor):
    @handler
    async def handle(self, message: str, ctx: WorkflowContext[str]) -> None:
        await ctx.add_event(CustomEvent(f"Processing message: {message}"))
        # Executor logic...
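
To illustrate how emitted custom events can later be told apart from other events, here is a small self-contained sketch. `StubWorkflowEvent`, `StubContext`, and `ProgressEvent` are hypothetical names defined only for this example. The stub context simply records whatever is passed to `add_event`, which is an assumption made for illustration and not a description of how the framework's `WorkflowContext` behaves internally.

```python
import asyncio
from typing import Any


class StubWorkflowEvent:
    """Hypothetical stand-in for the framework's base event class."""

    def __init__(self, data: Any = None) -> None:
        self.data = data


class ProgressEvent(StubWorkflowEvent):
    """Custom event carrying a progress message."""


class StubContext:
    """Hypothetical stand-in for a workflow context: it only records events."""

    def __init__(self) -> None:
        self.events: list[StubWorkflowEvent] = []

    async def add_event(self, event: StubWorkflowEvent) -> None:
        self.events.append(event)


async def handle(message: str, ctx: StubContext) -> str:
    """Executor-style handler that reports progress before and after its work."""
    await ctx.add_event(ProgressEvent(f"Processing message: {message}"))
    result = message.upper()  # placeholder for the executor logic
    await ctx.add_event(ProgressEvent(f"Finished message: {message}"))
    return result


async def main() -> tuple[str, list[str]]:
    ctx = StubContext()
    result = await handle("hello", ctx)
    # A consumer can pick out the custom events by their type.
    progress = [e.data for e in ctx.events if isinstance(e, ProgressEvent)]
    return result, progress


result, progress = asyncio.run(main())
print(result)
print(progress)
```

Giving each kind of custom event its own subclass lets a consumer select it with `isinstance` or a `case` pattern, in the same way the built-in event types are matched when consuming events.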
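Next steps
- Learn how to use agents in workflows to build intelligent workflows.
- Learn how to use workflows as agents.
- Learn how to handle requests and responses in workflows.
- Learn how to manage state in workflows.
- Learn how to create checkpoints and resume from them.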