将聊天历史记录存储在第三方存储中

本教程演示如何通过实现自定义 ChatMessageStore 并将其与 ChatClientAgent 一起使用,将代理聊天记录存储在外部存储中。

默认情况下,使用 ChatClientAgent时,聊天历史记录存储在对象或基础推理服务(如果服务支持)中的 AgentThread 内存中。

如果服务不需要将聊天历史记录存储在服务中,则可以提供用于保存聊天历史记录的自定义存储,而不是依赖于默认内存中行为。

先决条件

有关先决条件,请参阅本教程中的 “创建并运行简单代理 ”步骤。

安装 NuGet 包

若要将 Microsoft Agent Framework 与 Azure OpenAI 配合使用,需要安装以下 NuGet 包:

dotnet add package Azure.AI.OpenAI --prerelease
dotnet add package Azure.Identity
dotnet add package Microsoft.Agents.AI.OpenAI --prerelease

此外,你将使用内存中矢量存储来存储聊天消息。

dotnet add package Microsoft.SemanticKernel.Connectors.InMemory --prerelease

创建自定义 ChatMessage 应用商店

若要创建自定义 ChatMessageStore,需要实现抽象 ChatMessageStore 类并为所需的方法提供实现。

消息存储和检索方法

实现的最重要方法是:

  • AddMessagesAsync - 调用以向存储区添加新消息。
  • GetMessagesAsync - 调用以从存储区检索消息。

GetMessagesAsync 应按时间顺序按升序返回消息。 它返回的所有消息将在调用ChatClientAgent的基础IChatClient时使用。 因此,此方法必须考虑基础模型的限制,并且只返回模型可以处理的消息数。

在从GetMessagesAsync返回消息之前,应完成任何聊天记录减少逻辑处理,例如摘要或修整。

Serialization

在创建线程以及从序列化状态恢复线程时,ChatMessageStore 实例会被创建并附加到 AgentThread

虽然组成聊天历史记录的实际消息存储在外部, ChatMessageStore 但实例可能需要存储密钥或其他状态来标识外部存储中的聊天历史记录。

若要允许持久化线程,需要实现 SerializeStateAsync 类的 ChatMessageStore 方法。 还需要提供采用 JsonElement 参数的构造函数,该构造函数可用于在恢复线程时反序列化状态。

示例 ChatMessageStore 实现

以下示例实现将聊天消息存储在向量存储中。

AddMessagesAsync 对每个消息使用唯一键将消息插入向量存储中。

GetMessagesAsync 从向量存储中检索当前线程的消息,按时间戳对它们进行排序,并按升序返回它们。

收到第一条消息时,存储会生成线程的唯一键,然后用于标识矢量存储中的聊天历史记录以供后续调用。

唯一键存储在ThreadDbKey属性中,该属性通过SerializeStateAsync方法和接受JsonElement的构造函数进行序列化和反序列化。 因此,此密钥将保留为状态的 AgentThread 一部分,允许稍后恢复线程并继续使用相同的聊天历史记录。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Agents.AI;
using Microsoft.Extensions.AI;
using Microsoft.Extensions.VectorData;
using Microsoft.SemanticKernel.Connectors.InMemory;

internal sealed class VectorChatMessageStore : ChatMessageStore
{
    private readonly VectorStore _vectorStore;

    public VectorChatMessageStore(
        VectorStore vectorStore,
        JsonElement serializedStoreState,
        JsonSerializerOptions? jsonSerializerOptions = null)
    {
        this._vectorStore = vectorStore ?? throw new ArgumentNullException(nameof(vectorStore));
        if (serializedStoreState.ValueKind is JsonValueKind.String)
        {
            this.ThreadDbKey = serializedStoreState.Deserialize<string>();
        }
    }

    public string? ThreadDbKey { get; private set; }

    public override async Task AddMessagesAsync(
        IEnumerable<ChatMessage> messages,
        CancellationToken cancellationToken)
    {
        this.ThreadDbKey ??= Guid.NewGuid().ToString("N");
        var collection = this._vectorStore.GetCollection<string, ChatHistoryItem>("ChatHistory");
        await collection.EnsureCollectionExistsAsync(cancellationToken);
        await collection.UpsertAsync(messages.Select(x => new ChatHistoryItem()
        {
            Key = this.ThreadDbKey + x.MessageId,
            Timestamp = DateTimeOffset.UtcNow,
            ThreadId = this.ThreadDbKey,
            SerializedMessage = JsonSerializer.Serialize(x),
            MessageText = x.Text
        }), cancellationToken);
    }

    public override async Task<IEnumerable<ChatMessage>> GetMessagesAsync(
        CancellationToken cancellationToken)
    {
        var collection = this._vectorStore.GetCollection<string, ChatHistoryItem>("ChatHistory");
        await collection.EnsureCollectionExistsAsync(cancellationToken);
        var records = collection
            .GetAsync(
                x => x.ThreadId == this.ThreadDbKey, 10,
                new() { OrderBy = x => x.Descending(y => y.Timestamp) },
                cancellationToken);

        List<ChatMessage> messages = [];
        await foreach (var record in records)
        {
            messages.Add(JsonSerializer.Deserialize<ChatMessage>(record.SerializedMessage!)!);
        }

        messages.Reverse();
        return messages;
    }

    public override JsonElement Serialize(JsonSerializerOptions? jsonSerializerOptions = null) =>
        // We have to serialize the thread id, so that on deserialization you can retrieve the messages using the same thread id.
        JsonSerializer.SerializeToElement(this.ThreadDbKey);

    private sealed class ChatHistoryItem
    {
        [VectorStoreKey]
        public string? Key { get; set; }
        [VectorStoreData]
        public string? ThreadId { get; set; }
        [VectorStoreData]
        public DateTimeOffset? Timestamp { get; set; }
        [VectorStoreData]
        public string? SerializedMessage { get; set; }
        [VectorStoreData]
        public string? MessageText { get; set; }
    }
}

将自定义 ChatMessageStore 与 ChatClientAgent 配合使用

若要使用自定义 ChatMessageStore,需要在创建代理时提供一个 ChatMessageStoreFactory 。 此工厂允许代理为每个线程创建所需的 ChatMessageStore 新实例。

当创建 ChatClientAgent 时,可以提供一个 ChatClientAgentOptions 对象,该对象不仅允许提供 ChatMessageStoreFactory,还可以提供其他所有代理选项。

using Azure.AI.OpenAI;
using Azure.Identity;
using OpenAI;

AIAgent agent = new AzureOpenAIClient(
    new Uri("https://<myresource>.openai.azure.com"),
    new AzureCliCredential())
     .GetChatClient("gpt-4o-mini")
     .CreateAIAgent(new ChatClientAgentOptions
     {
         Name = "Joker",
         Instructions = "You are good at telling jokes.",
         ChatMessageStoreFactory = ctx =>
         {
             // Create a new chat message store for this agent that stores the messages in a vector store.
             return new VectorChatMessageStore(
                new InMemoryVectorStore(),
                ctx.SerializedState,
                ctx.JsonSerializerOptions);
         }
     });

本教程演示如何通过实现自定义 ChatMessageStore 并将其与 ChatAgent 一起使用,将代理聊天记录存储在外部存储中。

默认情况下,使用 ChatAgent时,聊天历史记录存储在对象或基础推理服务(如果服务支持)中的 AgentThread 内存中。

如果服务不需要或无法将聊天历史记录存储在服务中,则可以提供用于保存聊天历史记录的自定义存储,而不是依赖于默认内存中行为。

先决条件

有关先决条件,请参阅本教程中的 “创建并运行简单代理 ”步骤。

创建自定义 ChatMessage 应用商店

若要创建自定义 ChatMessageStore,需要实现 ChatMessageStore 协议并为所需的方法提供实现。

消息存储和检索方法

实现的最重要方法是:

  • add_messages - 调用以向存储区添加新消息。
  • list_messages - 调用以从存储区检索消息。

list_messages 应按时间顺序按升序返回消息。 调用基础聊天客户端时,所有它返回的消息将被 ChatAgent 使用。 因此,此方法必须考虑基础模型的限制,并且只返回模型可以处理的消息数。

在从list_messages返回消息之前,应完成任何聊天记录减少逻辑处理,例如摘要或修整。

Serialization

在创建线程以及从序列化状态恢复线程时,ChatMessageStore 实例会被创建并附加到 AgentThread

虽然组成聊天历史记录的实际消息存储在外部, ChatMessageStore 但实例可能需要存储密钥或其他状态来标识外部存储中的聊天历史记录。

若要允许持久化线程,需要实现serialize_state协议的deserialize_stateChatMessageStore方法。 这些方法允许在恢复线程时持久保存和还原存储的状态。

示例 ChatMessageStore 实现

以下示例实现使用 Redis 列表数据结构将聊天消息存储在 Redis 中。

在其中 add_messages,它使用 RPUSH 将消息存储在 Redis 中,以按时间顺序将它们追加到列表的末尾。

list_messages 使用 LRANGE 从 Redis 检索当前线程的消息,并按升序顺序返回它们。

当接收到第一条消息时,店会生成该线程的唯一键,然后用于在 Redis 中标识聊天记录,以便后续调用。

唯一键和其他配置可以存储,并可使用serialize_statedeserialize_state方法进行序列化和反序列化。 因此,此状态将保留为状态的 AgentThread 一部分,从而允许以后恢复线程并继续使用相同的聊天历史记录。

from collections.abc import Sequence
from typing import Any
from uuid import uuid4
from pydantic import BaseModel
import json
import redis.asyncio as redis
from agent_framework import ChatMessage


class RedisStoreState(BaseModel):
    """State model for serializing and deserializing Redis chat message store data."""

    thread_id: str
    redis_url: str | None = None
    key_prefix: str = "chat_messages"
    max_messages: int | None = None


class RedisChatMessageStore:
    """Redis-backed implementation of ChatMessageStore using Redis Lists."""

    def __init__(
        self,
        redis_url: str | None = None,
        thread_id: str | None = None,
        key_prefix: str = "chat_messages",
        max_messages: int | None = None,
    ) -> None:
        """Initialize the Redis chat message store.

        Args:
            redis_url: Redis connection URL (for example, "redis://localhost:6379").
            thread_id: Unique identifier for this conversation thread.
                      If not provided, a UUID will be auto-generated.
            key_prefix: Prefix for Redis keys to namespace different applications.
            max_messages: Maximum number of messages to retain in Redis.
                         When exceeded, oldest messages are automatically trimmed.
        """
        if redis_url is None:
            raise ValueError("redis_url is required for Redis connection")

        self.redis_url = redis_url
        self.thread_id = thread_id or f"thread_{uuid4()}"
        self.key_prefix = key_prefix
        self.max_messages = max_messages

        # Initialize Redis client
        self._redis_client = redis.from_url(redis_url, decode_responses=True)

    @property
    def redis_key(self) -> str:
        """Get the Redis key for this thread's messages."""
        return f"{self.key_prefix}:{self.thread_id}"

    async def add_messages(self, messages: Sequence[ChatMessage]) -> None:
        """Add messages to the Redis store.

        Args:
            messages: Sequence of ChatMessage objects to add to the store.
        """
        if not messages:
            return

        # Serialize messages and add to Redis list
        serialized_messages = [self._serialize_message(msg) for msg in messages]
        await self._redis_client.rpush(self.redis_key, *serialized_messages)

        # Apply message limit if configured
        if self.max_messages is not None:
            current_count = await self._redis_client.llen(self.redis_key)
            if current_count > self.max_messages:
                # Keep only the most recent max_messages using LTRIM
                await self._redis_client.ltrim(self.redis_key, -self.max_messages, -1)

    async def list_messages(self) -> list[ChatMessage]:
        """Get all messages from the store in chronological order.

        Returns:
            List of ChatMessage objects in chronological order (oldest first).
        """
        # Retrieve all messages from Redis list (oldest to newest)
        redis_messages = await self._redis_client.lrange(self.redis_key, 0, -1)

        messages = []
        for serialized_message in redis_messages:
            message = self._deserialize_message(serialized_message)
            messages.append(message)

        return messages

    async def serialize_state(self, **kwargs: Any) -> Any:
        """Serialize the current store state for persistence.

        Returns:
            Dictionary containing serialized store configuration.
        """
        state = RedisStoreState(
            thread_id=self.thread_id,
            redis_url=self.redis_url,
            key_prefix=self.key_prefix,
            max_messages=self.max_messages,
        )
        return state.model_dump(**kwargs)

    async def deserialize_state(self, serialized_store_state: Any, **kwargs: Any) -> None:
        """Deserialize state data into this store instance.

        Args:
            serialized_store_state: Previously serialized state data.
            **kwargs: Additional arguments for deserialization.
        """
        if serialized_store_state:
            state = RedisStoreState.model_validate(serialized_store_state, **kwargs)
            self.thread_id = state.thread_id
            self.key_prefix = state.key_prefix
            self.max_messages = state.max_messages

            # Recreate Redis client if the URL changed
            if state.redis_url and state.redis_url != self.redis_url:
                self.redis_url = state.redis_url
                self._redis_client = redis.from_url(self.redis_url, decode_responses=True)

    def _serialize_message(self, message: ChatMessage) -> str:
        """Serialize a ChatMessage to JSON string."""
        message_dict = message.model_dump()
        return json.dumps(message_dict, separators=(",", ":"))

    def _deserialize_message(self, serialized_message: str) -> ChatMessage:
        """Deserialize a JSON string to ChatMessage."""
        message_dict = json.loads(serialized_message)
        return ChatMessage.model_validate(message_dict)

    async def clear(self) -> None:
        """Remove all messages from the store."""
        await self._redis_client.delete(self.redis_key)

    async def aclose(self) -> None:
        """Close the Redis connection."""
        await self._redis_client.aclose()

将自定义 ChatMessageStore 与 ChatAgent 配合使用

若要使用自定义 ChatMessageStore,需要在创建代理时提供一个 chat_message_store_factory 。 此工厂允许代理为每个线程创建所需的 ChatMessageStore 新实例。

创建 a ChatAgent时,除了所有其他代理选项外,还可以提供 chat_message_store_factory 参数。

from azure.identity import AzureCliCredential
from agent_framework import ChatAgent
from agent_framework.openai import AzureOpenAIChatClient

# Create the chat agent with custom message store factory
agent = ChatAgent(
    chat_client=AzureOpenAIChatClient(
        endpoint="https://<myresource>.openai.azure.com",
        credential=AzureCliCredential(),
        ai_model_id="gpt-4o-mini"
    ),
    name="Joker",
    instructions="You are good at telling jokes.",
    chat_message_store_factory=lambda: RedisChatMessageStore(
        redis_url="redis://localhost:6379"
    )
)

# Use the agent with persistent chat history
thread = agent.get_new_thread()
response = await agent.run("Tell me a joke about pirates", thread=thread)
print(response.text)

后续步骤