本文档概述了 Microsoft Agent Framework 工作流系统中 的共享状态 。
概述
共享状态允许工作流中的多个执行程序访问和修改常见数据。 此功能在工作流的不同部分需要共享信息时至关重要,尤其是在直接消息传递不可行或效率不高的情况下。
写入共享状态
using Microsoft.Agents.Workflows;
using Microsoft.Agents.Workflows.Reflection;
internal sealed class FileReadExecutor() : ReflectingExecutor<FileReadExecutor>("FileReadExecutor"), IMessageHandler<string, string>
{
/// <summary>
/// Reads a file and stores its content in a shared state.
/// </summary>
/// <param name="message">The path to the embedded resource file.</param>
/// <param name="context">The workflow context for accessing shared states.</param>
/// <returns>The ID of the shared state where the file content is stored.</returns>
public async ValueTask<string> HandleAsync(string message, IWorkflowContext context)
{
// Read file content from embedded resource
string fileContent = File.ReadAllText(message);
// Store file content in a shared state for access by other executors
string fileID = Guid.NewGuid().ToString();
await context.QueueStateUpdateAsync<string>(fileID, fileContent, scopeName: "FileContent");
return fileID;
}
}
from agent_framework import (
Executor,
WorkflowContext,
handler,
)
class FileReadExecutor(Executor):
@handler
async def handle(self, file_path: str, ctx: WorkflowContext[str]):
# Read file content from embedded resource
with open(file_path, 'r') as file:
file_content = file.read()
# Store file content in a shared state for access by other executors
file_id = str(uuid.uuid4())
await ctx.set_shared_state(file_id, file_content)
await ctx.send_message(file_id)
访问共享状态
using Microsoft.Agents.Workflows;
using Microsoft.Agents.Workflows.Reflection;
internal sealed class WordCountingExecutor() : ReflectingExecutor<WordCountingExecutor>("WordCountingExecutor"), IMessageHandler<string, int>
{
/// <summary>
/// Counts the number of words in the file content stored in a shared state.
/// </summary>
/// <param name="message">The ID of the shared state containing the file content.</param>
/// <param name="context">The workflow context for accessing shared states.</param>
/// <returns>The number of words in the file content.</returns>
public async ValueTask<int> HandleAsync(string message, IWorkflowContext context)
{
// Retrieve the file content from the shared state
var fileContent = await context.ReadStateAsync<string>(message, scopeName: "FileContent")
?? throw new InvalidOperationException("File content state not found");
return fileContent.Split([' ', '\n', '\r'], StringSplitOptions.RemoveEmptyEntries).Length;
}
}
from agent_framework import (
Executor,
WorkflowContext,
handler,
)
class WordCountingExecutor(Executor):
@handler
async def handle(self, file_id: str, ctx: WorkflowContext[int]):
# Retrieve the file content from the shared state
file_content = await ctx.get_shared_state(file_id)
if file_content is None:
raise ValueError("File content state not found")
await ctx.send_message(len(file_content.split()))
后续步骤
- 了解如何在工作流中使用代理 生成智能工作流。
- 了解如何将工作流用作代理。
- 了解如何处理工作流中的请求和响应 。
- 了解如何创建检查点并从中恢复。