本文档深入探讨 Microsoft Agent Framework 工作流系统的 Edges 组件。
概述
边缘定义具有可选条件的执行程序之间的消息流动方式。 它们表示工作流图中的连接并确定数据流路径。
边缘类型
框架支持多种边缘模式:
- 直接边缘:执行程序之间的简单一对一连接
- 条件边缘:具有确定消息何时应流动的条件的边缘
- 扇出边缘:一个执行程序将消息发送到多个目标
- 扇入边缘:多个执行程序将消息发送到单个目标
Direct Edges
两个执行程序之间最简单的连接形式:
using Microsoft.Agents.Workflows;
WorkflowBuilder builder = new(sourceExecutor);
builder.AddEdge(sourceExecutor, targetExecutor);
from agent_framework import WorkflowBuilder
builder = WorkflowBuilder()
builder.add_edge(source_executor, target_executor)
builder.set_start_executor(source_executor)
workflow = builder.build()
条件边缘
仅当满足特定条件时才激活的边缘:
// Route based on message content
builder.AddEdge(
source: spamDetector,
target: emailProcessor,
condition: result => result is SpamResult spam && !spam.IsSpam
);
builder.AddEdge(
source: spamDetector,
target: spamHandler,
condition: result => result is SpamResult spam && spam.IsSpam
);
from agent_framework import WorkflowBuilder
builder = WorkflowBuilder()
builder.add_edge(spam_detector, email_processor, condition=lambda result: isinstance(result, SpamResult) and not result.is_spam)
builder.add_edge(spam_detector, spam_handler, condition=lambda result: isinstance(result, SpamResult) and result.is_spam)
builder.set_start_executor(spam_detector)
workflow = builder.build()
切换结构的边缘
根据条件将消息路由到不同的执行程序:
builder.AddSwitch(routerExecutor, switchBuilder =>
switchBuilder
.AddCase(
message => message.Priority < Priority.Normal,
executorA
)
.AddCase(
message => message.Priority < Priority.High,
executorB
)
.SetDefault(executorC)
);
from agent_framework import (
Case,
Default,
WorkflowBuilder,
)
builder = WorkflowBuilder()
builder.set_start_executor(router_executor)
builder.add_switch_case_edge_group(
router_executor,
[
Case(
condition=lambda message: message.priority < Priority.NORMAL,
target=executor_a,
),
Case(
condition=lambda message: message.priority < Priority.HIGH,
target=executor_b,
),
Default(target=executor_c)
],
)
workflow = builder.build()
扇出边缘
将消息从一个执行程序分发到多个目标:
// Send to all targets
builder.AddFanOutEdge(splitterExecutor, targets: [worker1, worker2, worker3]);
// Send to specific targets based on partitioner function
builder.AddFanOutEdge(
source: routerExecutor,
partitioner: (message, targetCount) => message.Priority switch
{
Priority.High => [0], // Route to first worker only
Priority.Normal => [1, 2], // Route to workers 2 and 3
_ => Enumerable.Range(0, targetCount) // Route to all workers
},
targets: [highPriorityWorker, normalWorker1, normalWorker2]
);
from agent_framework import WorkflowBuilder
builder = WorkflowBuilder()
builder.set_start_executor(splitter_executor)
builder.add_fan_out_edges(splitter_executor, [worker1, worker2, worker3])
workflow = builder.build()
# Send to specific targets based on partitioner function
builder = WorkflowBuilder()
builder.set_start_executor(splitter_executor)
builder.add_fan_out_edges(
splitter_executor,
[worker1, worker2, worker3],
selection_func=lambda message, target_ids: (
[0] if message.priority == Priority.HIGH else
[1, 2] if message.priority == Priority.NORMAL else
list(range(target_count))
)
)
workflow = builder.build()
汇聚边缘
将来自多个源的消息收集到单个目标中:
// Aggregate results from multiple workers
builder.AddFanInEdge(aggregatorExecutor, sources: [worker1, worker2, worker3]);
builder.add_fan_in_edge([worker1, worker2, worker3], aggregator_executor)
下一步
- 了解工作流 ,了解如何生成和执行工作流。