Microsoft Agent Framework 工作流核心概念 - 边缘

本文档深入探讨 Microsoft Agent Framework 工作流系统的 Edges 组件。

概述

边缘定义具有可选条件的执行程序之间的消息流动方式。 它们表示工作流图中的连接并确定数据流路径。

边缘类型

框架支持多种边缘模式:

  1. 直接边缘:执行程序之间的简单一对一连接
  2. 条件边缘:具有确定消息何时应流动的条件的边缘
  3. 扇出边缘:一个执行程序将消息发送到多个目标
  4. 扇入边缘:多个执行程序将消息发送到单个目标

Direct Edges

两个执行程序之间最简单的连接形式:

using Microsoft.Agents.Workflows;

WorkflowBuilder builder = new(sourceExecutor);
builder.AddEdge(sourceExecutor, targetExecutor);
from agent_framework import WorkflowBuilder

builder = WorkflowBuilder()
builder.add_edge(source_executor, target_executor)
builder.set_start_executor(source_executor)
workflow = builder.build()

条件边缘

仅当满足特定条件时才激活的边缘:

// Route based on message content
builder.AddEdge(
    source: spamDetector,
    target: emailProcessor,
    condition: result => result is SpamResult spam && !spam.IsSpam
);

builder.AddEdge(
    source: spamDetector,
    target: spamHandler,
    condition: result => result is SpamResult spam && spam.IsSpam
);
from agent_framework import WorkflowBuilder

builder = WorkflowBuilder()
builder.add_edge(spam_detector, email_processor, condition=lambda result: isinstance(result, SpamResult) and not result.is_spam)
builder.add_edge(spam_detector, spam_handler, condition=lambda result: isinstance(result, SpamResult) and result.is_spam)
builder.set_start_executor(spam_detector)
workflow = builder.build()

切换结构的边缘

根据条件将消息路由到不同的执行程序:

builder.AddSwitch(routerExecutor, switchBuilder =>
    switchBuilder
        .AddCase(
            message => message.Priority < Priority.Normal,
            executorA
        )
        .AddCase(
            message => message.Priority < Priority.High,
            executorB
        )
        .SetDefault(executorC)
);
from agent_framework import (
    Case,
    Default,
    WorkflowBuilder,
)

builder = WorkflowBuilder()
builder.set_start_executor(router_executor)
builder.add_switch_case_edge_group(
    router_executor,
    [
        Case(
            condition=lambda message: message.priority < Priority.NORMAL,
            target=executor_a,
        ),
        Case(
            condition=lambda message: message.priority < Priority.HIGH,
            target=executor_b,
        ),
        Default(target=executor_c)
    ],
)
workflow = builder.build()

扇出边缘

将消息从一个执行程序分发到多个目标:

// Send to all targets
builder.AddFanOutEdge(splitterExecutor, targets: [worker1, worker2, worker3]);

// Send to specific targets based on partitioner function
builder.AddFanOutEdge(
    source: routerExecutor,
    partitioner: (message, targetCount) => message.Priority switch
    {
        Priority.High => [0], // Route to first worker only
        Priority.Normal => [1, 2], // Route to workers 2 and 3
        _ => Enumerable.Range(0, targetCount) // Route to all workers
    },
    targets: [highPriorityWorker, normalWorker1, normalWorker2]
);
from agent_framework import WorkflowBuilder

builder = WorkflowBuilder()
builder.set_start_executor(splitter_executor)
builder.add_fan_out_edges(splitter_executor, [worker1, worker2, worker3])
workflow = builder.build()

# Send to specific targets based on partitioner function
builder = WorkflowBuilder()
builder.set_start_executor(splitter_executor)
builder.add_fan_out_edges(
    splitter_executor,
    [worker1, worker2, worker3],
    selection_func=lambda message, target_ids: (
        [0] if message.priority == Priority.HIGH else
        [1, 2] if message.priority == Priority.NORMAL else
        list(range(target_count))
    )
)
workflow = builder.build()

汇聚边缘

将来自多个源的消息收集到单个目标中:

// Aggregate results from multiple workers
builder.AddFanInEdge(aggregatorExecutor, sources: [worker1, worker2, worker3]);
builder.add_fan_in_edge([worker1, worker2, worker3], aggregator_executor)

下一步