代理中间件

代理框架中的中间件提供了一种在执行的各个阶段截获、修改和增强代理交互的强大方法。 可以使用中间件实现交叉问题,例如日志记录、安全验证、错误处理和结果转换,而无需修改核心代理或函数逻辑。

可以使用三种不同类型的中间件自定义代理框架:

  1. 代理运行中间件:允许截获所有代理运行,以便根据需要检查和/或修改输入和输出。
  2. 函数调用中间件:允许截获代理执行的所有函数调用,以便根据需要检查和修改输入和输出。
  3. IChatClient 中间件:允许截获对 IChatClient 实现的调用,其中代理用于 IChatClient 推理调用,例如使用 ChatClientAgent时。

所有类型的中间件都通过函数回调实现,当注册同一类型的多个中间件实例时,它们会形成一个链,其中每个中间件实例应通过提供的 nextFunc链调用下一个中间件实例。

代理运行和函数调用中间件类型可以在代理上注册,方法是将代理生成器与现有代理对象一起使用。

var middlewareEnabledAgent = originalAgent
    .AsBuilder()
        .Use(CustomAgentRunMiddleware)
        .Use(CustomFunctionCallingMiddleware)
    .Build();

IChatClient中间件可以在与聊天客户端生成器模式一起使用ChatClientAgent之前先在中间IChatClient件上注册。

var chatClient = new AzureOpenAIClient(new Uri("https://<myresource>.openai.azure.com"), new AzureCliCredential())
    .GetChatClient(deploymentName)
    .AsIChatClient();

var middlewareEnabledChatClient = chatClient
    .AsBuilder()
        .Use(getResponseFunc: CustomChatClientMiddleware, getStreamingResponseFunc: null)
    .Build();

var agent = new ChatClientAgent(middlewareEnabledChatClient, instructions: "You are a helpful assistant.");

IChatClient 当通过 SDK 客户端上的某个帮助程序方法构造代理时,还可以使用工厂方法注册中间件。

var agent = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential())
    .GetChatClient(deploymentName)
    .CreateAIAgent("You are a helpful assistant.", clientFactory: (chatClient) => chatClient
        .AsBuilder()
            .Use(getResponseFunc: CustomChatClientMiddleware, getStreamingResponseFunc: null)
        .Build());

代理运行中间件

下面是代理运行中间件的示例,可以检查和/或修改代理运行的输入和输出。

async Task<AgentRunResponse> CustomAgentRunMiddleware(
    IEnumerable<ChatMessage> messages,
    AgentThread? thread,
    AgentRunOptions? options,
    AIAgent innerAgent,
    CancellationToken cancellationToken)
{
    Console.WriteLine(messages.Count());
    var response = await innerAgent.RunAsync(messages, thread, options, cancellationToken).ConfigureAwait(false);
    Console.WriteLine(response.Messages.Count);
    return response;
}

函数调用中间件

注释

函数调用中间件当前仅支持 AIAgent 使用该 Microsoft.Extensions.AI.FunctionInvokingChatClient中间件,例如 ChatClientAgent

下面是函数调用中间件的示例,可以检查和/或修改所调用的函数,以及函数调用的结果。

async ValueTask<object?> CustomFunctionCallingMiddleware(
    AIAgent agent,
    FunctionInvocationContext context,
    Func<FunctionInvocationContext, CancellationToken, ValueTask<object?>> next,
    CancellationToken cancellationToken)
{
    Console.WriteLine($"Function Name: {context!.Function.Name}");
    var result = await next(context, cancellationToken);
    Console.WriteLine($"Function Call Result: {result}");

    return result;
}

通过将提供的 FunctionInvocationContext.Terminate 设置为 true,可以使用函数调用中间件终止函数调用循环。 这将阻止函数调用循环向推理服务发出请求,其中包含函数调用后函数调用结果。 如果在此迭代期间有多个可用于调用的函数,则它还可能会阻止执行任何剩余的函数。

警告

终止函数调用循环可能会导致线程处于不一致状态,例如包含没有函数结果内容的函数调用内容。 这可能会导致线程无法进一步运行。

IChatClient 中间件

下面是聊天客户端中间件的示例,可以检查和/或修改聊天客户端提供的推理服务请求的输入和输出。

async Task<ChatResponse> CustomChatClientMiddleware(
    IEnumerable<ChatMessage> messages,
    ChatOptions? options,
    IChatClient innerChatClient,
    CancellationToken cancellationToken)
{
    Console.WriteLine(messages.Count());
    var response = await innerChatClient.GetResponseAsync(messages, options, cancellationToken);
    Console.WriteLine(response.Messages.Count);

    return response;
}

注释

有关中间件的详细信息 IChatClient ,请参阅 Microsoft.Extensions.AI 文档中的 自定义 IChatClient 中间件

Function-Based 中间件

基于函数的中间件是使用异步函数实现中间件的最简单方法。 此方法非常适合无状态作,并为常见中间件方案提供轻型解决方案。

代理中间件

代理中间件截获和修改代理运行执行。 它使用 AgentRunContext 包含:

  • agent:正在调用的代理
  • messages:聊天中的聊天消息列表
  • is_streaming:指示响应是否正在流式传输的布尔值
  • metadata:用于在中间件之间存储其他数据的字典
  • result:代理的响应(可以修改)
  • terminate:用于停止进一步处理的标志
  • kwargs:传递给代理运行方法的其他关键字参数

next可调用者继续中间件链,或者执行代理(如果它是最后一个中间件)。

下面是一个简单的日志记录示例,其中包含可调用之前和之后 next 的逻辑:

async def logging_agent_middleware(
    context: AgentRunContext,
    next: Callable[[AgentRunContext], Awaitable[None]],
) -> None:
    """Agent middleware that logs execution timing."""
    # Pre-processing: Log before agent execution
    print("[Agent] Starting execution")

    # Continue to next middleware or agent execution
    await next(context)

    # Post-processing: Log after agent execution
    print("[Agent] Execution completed")

函数中间件

函数中间件截获代理中的函数调用。 它使用 FunctionInvocationContext 包含:

  • function:要调用的函数
  • arguments:函数的已验证参数
  • metadata:用于在中间件之间存储其他数据的字典
  • result:函数的返回值(可以修改)
  • terminate:用于停止进一步处理的标志
  • kwargs:传递给调用此函数的聊天方法的其他关键字参数

next可调用方继续执行下一个中间件或执行实际函数。

下面是一个简单的日志记录示例,其中包含可调用之前和之后 next 的逻辑:

async def logging_function_middleware(
    context: FunctionInvocationContext,
    next: Callable[[FunctionInvocationContext], Awaitable[None]],
) -> None:
    """Function middleware that logs function execution."""
    # Pre-processing: Log before function execution
    print(f"[Function] Calling {context.function.name}")

    # Continue to next middleware or function execution
    await next(context)

    # Post-processing: Log after function execution
    print(f"[Function] {context.function.name} completed")

聊天中间件

聊天中间件截获发送到 AI 模型的聊天请求。 它使用 ChatContext 包含:

  • chat_client:正在调用的聊天客户端
  • messages:发送到 AI 服务的消息列表
  • chat_options:聊天请求的选项
  • is_streaming:指示这是否为流式调用的布尔值
  • metadata:用于在中间件之间存储其他数据的字典
  • result:来自 AI 的聊天响应(可以修改)
  • terminate:用于停止进一步处理的标志
  • kwargs:传递给聊天客户端的其他关键字参数

next可调用者会继续下一个中间件或将请求发送到 AI 服务。

下面是一个简单的日志记录示例,其中包含可调用之前和之后 next 的逻辑:

async def logging_chat_middleware(
    context: ChatContext,
    next: Callable[[ChatContext], Awaitable[None]],
) -> None:
    """Chat middleware that logs AI interactions."""
    # Pre-processing: Log before AI call
    print(f"[Chat] Sending {len(context.messages)} messages to AI")

    # Continue to next middleware or AI service
    await next(context)

    # Post-processing: Log after AI response
    print("[Chat] AI response received")

函数中间件修饰器

修饰器提供显式中间件类型声明,而无需类型注释。 当以下情况下,它们非常有用:

  • 不使用类型批注
  • 需要显式中间件类型声明
  • 你想要防止类型不匹配
from agent_framework import agent_middleware, function_middleware, chat_middleware

@agent_middleware  # Explicitly marks as agent middleware
async def simple_agent_middleware(context, next):
    """Agent middleware with decorator - types are inferred."""
    print("Before agent execution")
    await next(context)
    print("After agent execution")

@function_middleware  # Explicitly marks as function middleware
async def simple_function_middleware(context, next):
    """Function middleware with decorator - types are inferred."""
    print(f"Calling function: {context.function.name}")
    await next(context)
    print("Function call completed")

@chat_middleware  # Explicitly marks as chat middleware
async def simple_chat_middleware(context, next):
    """Chat middleware with decorator - types are inferred."""
    print(f"Processing {len(context.messages)} chat messages")
    await next(context)
    print("Chat processing completed")

Class-Based 中间件

基于类的中间件对于受益于面向对象的设计模式的有状态作或复杂逻辑非常有用。

代理中间件类

基于类的代理中间件使用与 process 基于函数的中间件具有相同签名和行为的方法。 该方法 process 接收相同的 context 参数和 next 参数,以完全相同的方式调用。

from agent_framework import AgentMiddleware, AgentRunContext

class LoggingAgentMiddleware(AgentMiddleware):
    """Agent middleware that logs execution."""

    async def process(
        self,
        context: AgentRunContext,
        next: Callable[[AgentRunContext], Awaitable[None]],
    ) -> None:
        # Pre-processing: Log before agent execution
        print("[Agent Class] Starting execution")

        # Continue to next middleware or agent execution
        await next(context)

        # Post-processing: Log after agent execution
        print("[Agent Class] Execution completed")

函数中间件类

基于类的函数中间件还使用与 process 基于函数的中间件相同的签名和行为的方法。 该方法接收相同的 context 参数和 next 参数。

from agent_framework import FunctionMiddleware, FunctionInvocationContext

class LoggingFunctionMiddleware(FunctionMiddleware):
    """Function middleware that logs function execution."""

    async def process(
        self,
        context: FunctionInvocationContext,
        next: Callable[[FunctionInvocationContext], Awaitable[None]],
    ) -> None:
        # Pre-processing: Log before function execution
        print(f"[Function Class] Calling {context.function.name}")

        # Continue to next middleware or function execution
        await next(context)

        # Post-processing: Log after function execution
        print(f"[Function Class] {context.function.name} completed")

聊天中间件类

基于类的聊天中间件采用与基于函数的聊天中间件相同的签名和行为的方法遵循相同的模式 process

from agent_framework import ChatMiddleware, ChatContext

class LoggingChatMiddleware(ChatMiddleware):
    """Chat middleware that logs AI interactions."""

    async def process(
        self,
        context: ChatContext,
        next: Callable[[ChatContext], Awaitable[None]],
    ) -> None:
        # Pre-processing: Log before AI call
        print(f"[Chat Class] Sending {len(context.messages)} messages to AI")

        # Continue to next middleware or AI service
        await next(context)

        # Post-processing: Log after AI response
        print("[Chat Class] AI response received")

中间件注册

中间件可以在两个级别注册,其范围和行为不同。

Agent-Level 与 Run-Level 中间件

from agent_framework.azure import AzureAIAgentClient
from azure.identity.aio import AzureCliCredential

# Agent-level middleware: Applied to ALL runs of the agent
async with AzureAIAgentClient(async_credential=credential).create_agent(
    name="WeatherAgent",
    instructions="You are a helpful weather assistant.",
    tools=get_weather,
    middleware=[
        SecurityAgentMiddleware(),  # Applies to all runs
        TimingFunctionMiddleware(),  # Applies to all runs
    ],
) as agent:

    # This run uses agent-level middleware only
    result1 = await agent.run("What's the weather in Seattle?")

    # This run uses agent-level + run-level middleware
    result2 = await agent.run(
        "What's the weather in Portland?",
        middleware=[  # Run-level middleware (this run only)
            logging_chat_middleware,
        ]
    )

    # This run uses agent-level middleware only (no run-level)
    result3 = await agent.run("What's the weather in Vancouver?")

主要差异:

  • 代理级别:在创建代理时在所有运行中持久配置一次
  • 运行级别:仅适用于特定运行,允许按请求自定义
  • 执行顺序:代理中间件(最外层)→运行中间件(最内部)→代理执行

中间件终止

中间件可以使用 < a0/> 提前 终止执行。 这对于安全检查、速率限制或验证失败非常有用。

async def blocking_middleware(
    context: AgentRunContext,
    next: Callable[[AgentRunContext], Awaitable[None]],
) -> None:
    """Middleware that blocks execution based on conditions."""
    # Check for blocked content
    last_message = context.messages[-1] if context.messages else None
    if last_message and last_message.text:
        if "blocked" in last_message.text.lower():
            print("Request blocked by middleware")
            context.terminate = True
            return

    # If no issues, continue normally
    await next(context)

终止的含义:

  • 设置 context.terminate = True 处理应停止的信号
  • 可以在终止之前提供自定义结果,为用户提供反馈
  • 中间件终止时完全跳过代理执行

中间件结果重写

中间件可以替代非流式处理和流式处理方案的结果,从而允许修改或完全替换代理响应。

结果类型 context.result 取决于代理调用是流式处理还是非流式处理:

  • 非流式处理context.result 包含 AgentRunResponse 完整响应
  • 流式处理context.result 包含生成 AgentRunResponseUpdate 区块的异步生成器

可用于 context.is_streaming 区分这些方案并适当处理结果替代。

async def weather_override_middleware(
    context: AgentRunContext, 
    next: Callable[[AgentRunContext], Awaitable[None]]
) -> None:
    """Middleware that overrides weather results for both streaming and non-streaming."""

    # Execute the original agent logic
    await next(context)

    # Override results if present
    if context.result is not None:
        custom_message_parts = [
            "Weather Override: ",
            "Perfect weather everywhere today! ",
            "22°C with gentle breezes. ",
            "Great day for outdoor activities!"
        ]

        if context.is_streaming:
            # Streaming override
            async def override_stream() -> AsyncIterable[AgentRunResponseUpdate]:
                for chunk in custom_message_parts:
                    yield AgentRunResponseUpdate(contents=[TextContent(text=chunk)])

            context.result = override_stream()
        else:
            # Non-streaming override
            custom_message = "".join(custom_message_parts)
            context.result = AgentRunResponse(
                messages=[ChatMessage(role=Role.ASSISTANT, text=custom_message)]
            )

此中间件方法允许实现复杂的响应转换、内容筛选、结果增强和流式处理自定义,同时使代理逻辑保持干净且专注。

后续步骤