This page provides an overview of checkpoints in the Microsoft Agent Framework workflow system.
Overview
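Checkpoints let you save a workflow's state at specific points during execution and resume from those points later. This is particularly useful for:
- Long-running workflows where you want to avoid losing progress if a failure occurs.
- Long-running workflows that you want to pause and resume at a later time.
- Workflows that must save their state periodically for auditing or compliance purposes.
- Workflows that need to be migrated across different environments or instances.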
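When Are Checkpoints Created?
Recall that workflows execute in supersteps, as described in Core Concepts. A checkpoint is created at the end of each superstep, once every executor in that superstep has finished executing. It captures the entire state of the workflow, including:
- The current state of all executors
- All pending messages to be delivered in the next superstep
- Pending requests and responses
- Shared state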
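The timing and contents described above can be sketched as a small snapshot type plus a toy superstep loop. This is a framework-free illustration under stated assumptions: the `Checkpoint` dataclass, the `run_supersteps` function, and the handler signature are hypothetical and are not part of the Microsoft Agent Framework API. What it shows is when the snapshot is taken: only after every message queued for the current superstep has been handled, so it records executor state together with the messages waiting for the next superstep.

```python
import copy
from dataclasses import dataclass
from typing import Any, Callable

# Hypothetical handler signature: (payload, executor_state, shared_state) -> [(target, payload), ...]
Handler = Callable[[Any, dict, dict], list]


@dataclass(frozen=True)
class Checkpoint:
    """Hypothetical snapshot taken at the end of one superstep."""
    superstep: int
    executor_states: dict
    pending_messages: list  # (target executor, payload) pairs for the next superstep
    pending_requests: list  # always empty here: this toy runner has no request/response support
    shared_state: dict


def run_supersteps(initial_messages: list, handlers: dict[str, Handler]) -> list:
    states = {name: {} for name in handlers}
    shared: dict = {}
    pending = list(initial_messages)
    checkpoints = []
    step = 0
    while pending:
        step += 1
        outgoing = []
        # Deliver every message queued for this superstep.
        for target, payload in pending:
            outgoing.extend(handlers[target](payload, states[target], shared))
        pending = outgoing
        # Snapshot only after all executors in this superstep have finished.
        checkpoints.append(Checkpoint(
            superstep=step,
            executor_states=copy.deepcopy(states),
            pending_messages=list(pending),
            pending_requests=[],
            shared_state=copy.deepcopy(shared),
        ))
    return checkpoints


def upper(payload, state, shared):
    state["seen"] = state.get("seen", 0) + 1
    return [("count", payload.upper())]


def count(payload, state, shared):
    shared["last"] = payload
    return []


cps = run_supersteps([("upper", "hi")], {"upper": upper, "count": count})
```

Here `cps[0]` holds the executor states after the first superstep together with the message still waiting for `count`, while `cps[1]` has no pending messages and contains the shared state written in the second superstep.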
Capturing Checkpoints
In C#, enable checkpointing by providing a CheckpointManager when you create the workflow run. Each checkpoint can then be accessed through a SuperStepCompletedEvent.
using Microsoft.Agents.Workflows;
// Create a checkpoint manager to manage checkpoints
var checkpointManager = new CheckpointManager();
// List to store checkpoint info for later use
var checkpoints = new List<CheckpointInfo>();
// Run the workflow with checkpointing enabled
Checkpointed<StreamingRun> checkpointedRun = await InProcessExecution
    .StreamAsync(workflow, input, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is SuperStepCompletedEvent superStepCompletedEvt)
    {
        // Access the checkpoint and store it
        CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo!.Checkpoint;
        if (checkpoint != null)
        {
            checkpoints.Add(checkpoint);
        }
    }
}
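The C# loop above keeps only the superstep-completed events that actually carry a checkpoint. The same filtering pattern can be sketched in Python as a self-contained example; the event classes and the `fake_stream` generator are hypothetical stand-ins rather than framework types, and they mimic a stream in which some supersteps report no checkpoint (the `null` case the C# code guards against).

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Optional, Union


@dataclass
class CheckpointInfo:
    checkpoint_id: str


@dataclass
class SuperStepCompleted:
    # Hypothetical event; checkpoint is None when the superstep produced no checkpoint.
    checkpoint: Optional[CheckpointInfo]


@dataclass
class ExecutorOutput:
    data: str


Event = Union[SuperStepCompleted, ExecutorOutput]


async def fake_stream() -> AsyncIterator[Event]:
    # Stand-in for a real event stream such as the one returned by WatchStreamAsync().
    yield ExecutorOutput("a")
    yield SuperStepCompleted(CheckpointInfo("cp-1"))
    yield ExecutorOutput("b")
    yield SuperStepCompleted(None)
    yield SuperStepCompleted(CheckpointInfo("cp-2"))


async def collect_checkpoints(events: AsyncIterator[Event]) -> list[CheckpointInfo]:
    checkpoints = []
    async for evt in events:
        # Keep only completed supersteps that actually carry a checkpoint.
        if isinstance(evt, SuperStepCompleted) and evt.checkpoint is not None:
            checkpoints.append(evt.checkpoint)
    return checkpoints


checkpoints = asyncio.run(collect_checkpoints(fake_stream()))
```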
In Python, enable checkpointing by providing a CheckpointStorage when you build the workflow. Checkpoints can then be accessed through the storage.
from agent_framework import (
    InMemoryCheckpointStorage,
    WorkflowBuilder,
)
# Create a checkpoint storage to manage checkpoints.
# There are different implementations of CheckpointStorage, such as InMemoryCheckpointStorage and FileCheckpointStorage.
checkpoint_storage = InMemoryCheckpointStorage()
# Build a workflow with checkpointing enabled.
# start_executor, executor_b, executor_c and end_executor are assumed to be defined elsewhere.
builder = WorkflowBuilder()
builder.set_start_executor(start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
workflow = builder.with_checkpointing(checkpoint_storage).build()
# Run the workflow
async for event in workflow.run_stream(input):
    ...
# Access checkpoints from the storage
checkpoints = await checkpoint_storage.list_checkpoints()
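Conceptually, a checkpoint storage is a keyed store that records each snapshot and lists them back later. The sketch below is a hypothetical in-memory stand-in (`ToyCheckpointStorage` with `save`, `load`, and `list_checkpoints`); only the `list_checkpoints` name echoes the example above, and the real `InMemoryCheckpointStorage` may have different signatures and return types. In this sketch the store returns snapshots in creation order, so an index such as `checkpoints[5]` refers to a predictable superstep; whether the real storage guarantees an order is not stated on this page.

```python
import asyncio
import itertools
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class StoredCheckpoint:
    checkpoint_id: str
    workflow_id: str
    superstep: int
    state: dict


class ToyCheckpointStorage:
    """Hypothetical in-memory store; not the agent_framework implementation."""

    def __init__(self) -> None:
        self._items: dict[str, StoredCheckpoint] = {}
        self._ids = itertools.count(1)

    async def save(self, workflow_id: str, superstep: int, state: dict[str, Any]) -> str:
        checkpoint_id = f"cp-{next(self._ids)}"
        # Store a shallow copy so later changes to the caller's dict are not recorded.
        self._items[checkpoint_id] = StoredCheckpoint(checkpoint_id, workflow_id, superstep, dict(state))
        return checkpoint_id

    async def load(self, checkpoint_id: str) -> StoredCheckpoint:
        return self._items[checkpoint_id]

    async def list_checkpoints(self, workflow_id: Optional[str] = None) -> list[StoredCheckpoint]:
        # Python dicts preserve insertion order, so this lists checkpoints oldest first.
        return [c for c in self._items.values() if workflow_id is None or c.workflow_id == workflow_id]


async def demo() -> list[StoredCheckpoint]:
    storage = ToyCheckpointStorage()
    for step in range(1, 4):
        await storage.save("wf-1", step, {"step": step})
    await storage.save("wf-2", 1, {"step": 1})
    return await storage.list_checkpoints("wf-1")


wf1 = asyncio.run(demo())
```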
Resuming from Checkpoints
In C#, you can resume a workflow from a specific checkpoint directly on the same run.
// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = checkpoints[5];
// Note that we are restoring the state directly to the same run instance.
await checkpointedRun.RestoreCheckpointAsync(savedCheckpoint, CancellationToken.None).ConfigureAwait(false);
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowCompletedEvent workflowCompletedEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowCompletedEvt.Data}");
    }
}
In Python, you can resume a workflow from a specific checkpoint directly on the same workflow instance.
# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run_stream_from_checkpoint(saved_checkpoint.checkpoint_id):
    ...
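Resuming from a checkpoint amounts to restoring the saved executor states and pending messages, then continuing the superstep loop from the step after the checkpoint. The self-contained sketch below is hypothetical (the `ToyWorkflow` class and its `resume_from` method are not framework API). It shows that resuming from the snapshot taken after superstep 2 reaches the same final state as an uninterrupted run, and that restoring from a deep copy leaves the stored checkpoint reusable.

```python
import copy
from dataclasses import dataclass
from typing import Any, Callable

# Hypothetical handler signature: (payload, executor_state) -> [(target, payload), ...]
Handler = Callable[[Any, dict], list]


@dataclass(frozen=True)
class Checkpoint:
    superstep: int
    states: dict
    pending: list


class ToyWorkflow:
    """Hypothetical superstep runner; not the agent_framework Workflow class."""

    def __init__(self, handlers: dict[str, Handler]) -> None:
        self.handlers = handlers
        self.checkpoints: list[Checkpoint] = []

    def _loop(self, step: int, states: dict, pending: list) -> dict:
        while pending:
            step += 1
            outgoing = []
            for target, payload in pending:
                outgoing.extend(self.handlers[target](payload, states[target]))
            pending = outgoing
            # Checkpoint at the end of every superstep.
            self.checkpoints.append(Checkpoint(step, copy.deepcopy(states), list(pending)))
        return states

    def run(self, initial: list) -> dict:
        states = {name: {} for name in self.handlers}
        return self._loop(0, states, list(initial))

    def resume_from(self, cp: Checkpoint) -> dict:
        # Restore a private copy so the stored checkpoint can be reused later.
        return self._loop(cp.superstep, copy.deepcopy(cp.states), list(cp.pending))


def countdown(n, state):
    state["visits"] = state.get("visits", 0) + 1
    return [("countdown", n - 1)] if n > 0 else []


wf = ToyWorkflow({"countdown": countdown})
full = wf.run([("countdown", 3)])
saved = wf.checkpoints[1]  # snapshot taken after superstep 2
resumed = wf.resume_from(saved)
```

After resuming, `wf.checkpoints` contains the original four snapshots followed by new ones for supersteps 3 and 4, because the resumed run keeps checkpointing as it continues.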
Rehydrating from Checkpoints
Alternatively, in C#, you can rehydrate a workflow from a checkpoint into a new run instance.
// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = checkpoints[5];
Checkpointed<StreamingRun> newCheckpointedRun = await InProcessExecution
    .ResumeStreamAsync(newWorkflow, savedCheckpoint, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in newCheckpointedRun.Run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowCompletedEvent workflowCompletedEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowCompletedEvt.Data}");
    }
}
Alternatively, in Python, you can rehydrate a new workflow instance from a checkpoint.
from agent_framework import WorkflowBuilder
builder = WorkflowBuilder()
builder.set_start_executor(start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
# This workflow instance is built without checkpointing; the storage is passed when resuming instead.
workflow = builder.build()
# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run_stream_from_checkpoint(
    saved_checkpoint.checkpoint_id,
    checkpoint_storage,
):
    ...
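Saving Executor State
To ensure that an executor's state is captured in a checkpoint, the executor must override the OnCheckpointingAsync method and save its state to the workflow context.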
using Microsoft.Agents.Workflows;
using Microsoft.Agents.Workflows.Reflection;
internal sealed class CustomExecutor() : ReflectingExecutor<CustomExecutor>("CustomExecutor"), IMessageHandler<string>
{
    private const string StateKey = "CustomExecutorState";
    private List<string> messages = new();
    public async ValueTask HandleAsync(string message, IWorkflowContext context)
    {
        this.messages.Add(message);
        // Executor logic...
    }
    protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellation = default)
    {
        return context.QueueStateUpdateAsync(StateKey, this.messages);
    }
}
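One detail worth considering when saving executor state is aliasing. In the hypothetical Python sketch below (`ToyContext` and `on_checkpointing` are stand-ins for `IWorkflowContext` and `OnCheckpointingAsync`, not real APIs), the executor queues a copy of its list; had it queued the list itself, messages handled after the checkpoint would silently change the saved snapshot. Whether the real framework copies or serializes the value at queue time is not stated on this page.

```python
from typing import Any


class ToyContext:
    """Hypothetical stand-in for IWorkflowContext that records queued state updates."""

    def __init__(self) -> None:
        self.committed: dict[str, Any] = {}

    def queue_state_update(self, key: str, value: Any) -> None:
        self.committed[key] = value


class CustomExecutor:
    STATE_KEY = "CustomExecutorState"

    def __init__(self) -> None:
        self.messages: list[str] = []

    def handle(self, message: str, context: ToyContext) -> None:
        self.messages.append(message)

    def on_checkpointing(self, context: ToyContext) -> None:
        # Queue a copy: queuing self.messages itself would let messages handled
        # after the checkpoint mutate the saved snapshot in this in-memory sketch.
        context.queue_state_update(self.STATE_KEY, list(self.messages))


ctx = ToyContext()
executor = CustomExecutor()
executor.handle("first", ctx)
executor.on_checkpointing(ctx)
executor.handle("second", ctx)
snapshot = ctx.committed[CustomExecutor.STATE_KEY]
```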
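Likewise, to ensure that the state is correctly restored when resuming from a checkpoint, the executor must override the OnCheckpointRestoredAsync method and load its state from the workflow context.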
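// Member of the CustomExecutor class shown above
protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
    // Fall back to an empty list if no state was saved under StateKey.
    this.messages = await context.ReadStateAsync<List<string>>(StateKey).ConfigureAwait(false) ?? new List<string>();
}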
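The restore side can be sketched the same way. In this hypothetical Python version (`ToyContext.read_state` and `on_checkpoint_restored` are illustrative stand-ins, not framework APIs), a missing entry, for example from a checkpoint taken before the executor ever saved its state, falls back to an empty list rather than leaving the field set to `None`.

```python
from typing import Any, Optional


class ToyContext:
    """Hypothetical stand-in for IWorkflowContext holding restored state."""

    def __init__(self, state: Optional[dict[str, Any]] = None) -> None:
        self._state = dict(state or {})

    def read_state(self, key: str) -> Optional[Any]:
        # Returns None when nothing was saved under `key`.
        return self._state.get(key)


class CustomExecutor:
    STATE_KEY = "CustomExecutorState"

    def __init__(self) -> None:
        self.messages: list[str] = []

    def on_checkpoint_restored(self, context: ToyContext) -> None:
        saved = context.read_state(self.STATE_KEY)
        # No saved entry: start from an empty list instead of storing None.
        self.messages = list(saved) if saved is not None else []


restored = CustomExecutor()
restored.on_checkpoint_restored(ToyContext({"CustomExecutorState": ["first"]}))
fresh = CustomExecutor()
fresh.on_checkpoint_restored(ToyContext())
```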
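Next Steps
- Learn how to use agents in workflows to build intelligent workflows.
- Learn how to use workflows as agents.
- Learn how to handle requests and responses in workflows.
- Learn how to manage state in workflows.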