使用 Postgres 矢量存储库连接器(预览版)

警告

Postgres 向量存储功能处于预览状态,需要重大更改的改进可能在发布前的有限情况下发生。

警告

语义内核向量存储功能处于预览状态,在发布前的某些有限情况下,可能仍会进行需要重大更改的改进。

警告

语义内核向量存储功能处于预览状态,在发布前的某些有限情况下,可能仍会进行需要重大更改的改进。

概述

Postgres 矢量存储连接器可用于访问和管理 Postgres 中的数据,并支持 Neon 无服务器 Postgres

连接器具有以下特征。

功能区域 支持
集合映射到 Postgres 表
支持的键属性类型
  • 整数
  • 字符串
  • Guid
支持的数据属性类型
  • 布尔
  • 整数
  • float
  • 两倍
  • 十进制
  • 字符串
  • 日期时间
  • 日期时间偏移 (DateTimeOffset)
  • Guid
  • byte[]
  • bool 枚举集合
  • 简短枚举ables
  • int 枚举类型
  • 长可枚举对象
  • 浮点数枚举
  • 双 Enumerables
  • 十进制可枚举的
  • 字符串可枚举集合
  • DateTime 可枚举对象
  • DateTimeOffset 枚举集合
  • 指导枚举器
支持的向量属性类型
  • 只读内存(ReadOnlyMemory)<浮点数(float)>
  • <嵌入浮点>
  • float[]
  • 只读内存<Half>
  • 嵌入<一半>
  • 一半[]
  • BitArray
  • Pgvector.SparseVector
支持的索引类型 Hnsw
支持的距离函数
  • CosineDistance
  • 余弦相似度
  • 点积相似度 (DotProductSimilarity)
  • EuclideanDistance
  • 曼哈顿距离
支持的过滤条件
  • AnyTagEqualTo
  • EqualTo
支持记录中的多个向量 是的
是否支持 IsIndexed?
是否支持FullTextIndexed?
StorageName支持吗? 是的
支持 HybridSearch?

局限性

重要

手动初始化 NpgsqlDataSource 时,必须在 UseVector 上调用 NpgsqlDataSourceBuilder。 这可实现向量支持。 如果没有此情况,VectorStore 实现的使用将失败。

下面是如何调用 UseVector的示例。

NpgsqlDataSourceBuilder dataSourceBuilder = new("Host=localhost;Port=5432;Username=postgres;Password=example;Database=postgres;");
dataSourceBuilder.UseVector();
NpgsqlDataSource dataSource = dataSourceBuilder.Build();

使用 AddPostgresVectorStore 依赖项注入注册方法并结合连接字符串时,数据源将通过此方法构造,并自动应用 UseVector

入门指南

将 Postgres Vector Store 连接器 NuGet 包添加到项目。

dotnet add package Microsoft.SemanticKernel.Connectors.PgVector --prerelease

可以使用语义内核提供的扩展方法将矢量存储添加到 IServiceCollection 依赖项注入容器。

using Microsoft.Extensions.DependencyInjection;

var kernelBuilder = Kernel.CreateBuilder();
kernelBuilder.Services.AddPostgresVectorStore("<Connection String>");

其中 <Connection String> Postgres 实例的连接字符串采用 Npgsql 所需的格式,例如 Host=localhost;Port=5432;Database=postgres;Username=postgres;Password=postgres

还提供不带参数的扩展方法。 这些要求 将 NpgsqlDataSource 的实例单独注册到依赖项注入容器。 请注意, UseVector 必须在生成器上调用,才能通过 pgvector-dotnet 启用矢量支持:

using Microsoft.Extensions.DependencyInjection;
using Microsoft.SemanticKernel;

// Using IServiceCollection with ASP.NET Core.
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddPostgresVectorStore("<Connection String>");
using Microsoft.Extensions.DependencyInjection;
using Microsoft.SemanticKernel;
using Npgsql;

// Using IServiceCollection with ASP.NET Core.
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddSingleton<NpgsqlDataSource>(sp => 
{
    NpgsqlDataSourceBuilder dataSourceBuilder = new("<Connection String>");
    dataSourceBuilder.UseVector();
    return dataSourceBuilder.Build();
});
builder.Services.AddPostgresVectorStore();

可以使用自定义数据源或连接字符串直接构造 Postgres Vector Store 实例。

using Microsoft.SemanticKernel.Connectors.PgVector;
using Npgsql;

NpgsqlDataSourceBuilder dataSourceBuilder = new("<Connection String>");
dataSourceBuilder.UseVector();
NpgsqlDataSource dataSource = dataSourceBuilder.Build();
var vectorStore = new PostgresVectorStore(dataSource, ownsDataSource: true);
using Microsoft.SemanticKernel.Connectors.PgVector;

var connection = new PostgresVectorStore("<Connection String>");

可以使用自定义数据源或连接字符串构造对命名集合的直接引用。

using Microsoft.SemanticKernel.Connectors.PgVector;
using Npgsql;

NpgsqlDataSourceBuilder dataSourceBuilder = new("<Connection String>");
dataSourceBuilder.UseVector();
var dataSource = dataSourceBuilder.Build();

var collection = new PostgresCollection<string, Hotel>(dataSource, "skhotels", ownsDataSource: true);
using Microsoft.SemanticKernel.Connectors.PgVector;

var collection = new PostgresCollection<string, Hotel>("<Connection String>", "skhotels");

数据映射

Postgres 连接器在将数据从数据模型映射到存储时提供默认映射器。 默认映射器使用模型注释或记录定义来确定每个属性的类型,并将模型映射到可序列化为 Postgres 的字典。

  • 批注为键的数据模型属性将映射到 Postgres 表中的主键。
  • 作为数据批注的数据模型属性将映射到 Postgres 中的表列。
  • 作为向量批注的数据模型属性将映射到 Postgres 中具有 pgvector VECTOR 类型的表列。

属性名称重写

可以提供覆盖字段名称,以便在存储时使用与数据模型属性名称不同的名称。 这允许你匹配表列名,即使它们与数据模型的属性名称不匹配。

属性名称重写是通过数据模型属性或记录定义设置 StorageName 选项来完成的。

下面是一个在其属性上设置了 StorageName 的数据模型示例,以及它将在 Postgres 中作为一个表表示的方式,假设集合的名称为 Hotels

using System;
using Microsoft.Extensions.VectorData;

public class Hotel
{
    [VectorStoreKey(StorageName = "hotel_id")]
    public int HotelId { get; set; }

    [VectorStoreData(StorageName = "hotel_name")]
    public string HotelName { get; set; }

    [VectorStoreData(StorageName = "hotel_description")]
    public string Description { get; set; }

    [VectorStoreVector(Dimensions: 4, DistanceFunction = DistanceFunction.CosineDistance, IndexKind = IndexKind.Hnsw, StorageName = "hotel_description_embedding")]
    public ReadOnlyMemory<float>? DescriptionEmbedding { get; set; }
}
CREATE TABLE IF NOT EXISTS public."Hotels" (
    "hotel_id" INTEGER PRIMARY KEY NOT NULL,
    hotel_name TEXT,
    hotel_description TEXT,
    hotel_description_embedding VECTOR(4)
);

矢量索引

在以上Hotel模型中的hotel_description_embedding是带IndexKind.HNSW索引的向量属性。 创建集合时,将自动创建此索引。 HNSW 是创建索引时唯一支持的索引类型。 IVFFlat 索引生成要求在创建索引时表中已存在数据,因此它不适合创建空表。 可以自由地在连接器外部的表上创建和修改索引,该索引将在执行查询时由连接器使用。

与 Entra 身份验证一起使用

Azure Database for PostgreSQL 提供使用 Entra 身份验证连接到数据库的功能。 这样就无需在连接字符串中存储用户名和密码。 若要对 Azure DB for PostgreSQL 数据库使用 Entra 身份验证,可以使用以下 Npgsql 扩展方法并设置没有用户名或密码的连接字符串:

using System.Text;
using System.Text.Json;
using Azure.Core;
using Azure.Identity;
using Npgsql;

namespace Program;

public static class NpgsqlDataSourceBuilderExtensions
{
    private static readonly TokenRequestContext s_azureDBForPostgresTokenRequestContext = new(["https://ossrdbms-aad.database.windows.net/.default"]);

    public static NpgsqlDataSourceBuilder UseEntraAuthentication(this NpgsqlDataSourceBuilder dataSourceBuilder, TokenCredential? credential = default)
    {
        credential ??= new DefaultAzureCredential();

        if (dataSourceBuilder.ConnectionStringBuilder.Username == null)
        {
            var token = credential.GetToken(s_azureDBForPostgresTokenRequestContext, default);
            SetUsernameFromToken(dataSourceBuilder, token.Token);
        }

        SetPasswordProvider(dataSourceBuilder, credential, s_azureDBForPostgresTokenRequestContext);

        return dataSourceBuilder;
    }

    public static async Task<NpgsqlDataSourceBuilder> UseEntraAuthenticationAsync(this NpgsqlDataSourceBuilder dataSourceBuilder, TokenCredential? credential = default, CancellationToken cancellationToken = default)
    {
        credential ??= new DefaultAzureCredential();

        if (dataSourceBuilder.ConnectionStringBuilder.Username == null)
        {
            var token = await credential.GetTokenAsync(s_azureDBForPostgresTokenRequestContext, cancellationToken).ConfigureAwait(false);
            SetUsernameFromToken(dataSourceBuilder, token.Token);
        }

        SetPasswordProvider(dataSourceBuilder, credential, s_azureDBForPostgresTokenRequestContext);

        return dataSourceBuilder;
    }

    private static void SetPasswordProvider(NpgsqlDataSourceBuilder dataSourceBuilder, TokenCredential credential, TokenRequestContext tokenRequestContext)
    {
        dataSourceBuilder.UsePasswordProvider(_ =>
        {
            var token = credential.GetToken(tokenRequestContext, default);
            return token.Token;
        }, async (_, ct) =>
        {
            var token = await credential.GetTokenAsync(tokenRequestContext, ct).ConfigureAwait(false);
            return token.Token;
        });
    }

    private static void SetUsernameFromToken(NpgsqlDataSourceBuilder dataSourceBuilder, string token)
    {
        var username = TryGetUsernameFromToken(token);

        if (username != null)
        {
            dataSourceBuilder.ConnectionStringBuilder.Username = username;
        }
        else
        {
            throw new Exception("Could not determine username from token claims");
        }
    }

    private static string? TryGetUsernameFromToken(string jwtToken)
    {
        // Split the token into its parts (Header, Payload, Signature)
        var tokenParts = jwtToken.Split('.');
        if (tokenParts.Length != 3)
        {
            return null;
        }

        // The payload is the second part, Base64Url encoded
        var payload = tokenParts[1];

        // Add padding if necessary
        payload = AddBase64Padding(payload);

        // Decode the payload from Base64Url
        var decodedBytes = Convert.FromBase64String(payload);
        var decodedPayload = Encoding.UTF8.GetString(decodedBytes);

        // Parse the decoded payload as JSON
        var payloadJson = JsonSerializer.Deserialize<JsonElement>(decodedPayload);

        // Try to get the username from 'upn', 'preferred_username', or 'unique_name' claims
        if (payloadJson.TryGetProperty("upn", out var upn))
        {
            return upn.GetString();
        }
        else if (payloadJson.TryGetProperty("preferred_username", out var preferredUsername))
        {
            return preferredUsername.GetString();
        }
        else if (payloadJson.TryGetProperty("unique_name", out var uniqueName))
        {
            return uniqueName.GetString();
        }

        return null;
    }

    private static string AddBase64Padding(string base64) => (base64.Length % 4) switch
    {
        2 => base64 + "==",
        3 => base64 + "=",
        _ => base64,
    };
}

现在,可以使用该方法 UseEntraAuthentication 为 Postgres 连接器设置连接字符串:

using Microsoft.SemanticKernel.Connectors.Postgres;

var connectionString = "Host=mydb.postgres.database.azure.com;Port=5432;Database=postgres;SSL Mode=Require;";  // No Username or Password
var dataSourceBuilder = new NpgsqlDataSourceBuilder(connectionString);
dataSourceBuilder.UseEntraAuthentication();
dataSourceBuilder.UseVector();
var dataSource = dataSourceBuilder.Build();

var vectorStore = new PostgresVectorStore(dataSource, ownsDataSource: true);

默认情况下,该方法 UseEntraAuthentication 使用 DefaultAzureCredential 通过 Azure AD 进行身份验证。 如果需要,还可以提供自定义 TokenCredential 实现。

入门指南

使用 Postgres 附加组件安装语义内核,其中包括 Postgres 客户端

pip install semantic-kernel[postgres]

然后,可以使用该类创建矢量存储实例 PostgresStore 。 可以直接传递 psycopg_poolAsyncConnectionPool ,也可以使用 PostgresSettings 环境变量创建连接池。

from semantic_kernel.connectors.postgres import PostgresStore, PostgresSettings

settings = PostgresSettings()

pool = await settings.create_connection_pool()
async with pool:
    vector_store = PostgresStore(connection_pool=pool)
    ...

您可以在环境中设置 POSTGRES_CONNECTION_STRING,或者根据 libpq 的定义设置环境变量 PGHOSTPGPORTPGUSERPGPASSWORDPGDATABASE,以及可选的 PGSSLMODE。 类将使用 PostgresSettings 这些值来创建连接池。

也可以直接创建集合。 集合本身是上下文管理器,因此可以在 with 块中使用它。 如果未传入连接池,集合对象将使用 PostgresSettings 类来创建一个连接池。

from semantic_kernel.connectors.postgres import PostgresCollection

collection = PostgresCollection(collection_name="skhotels", record_type=Hotel)
async with collection:  # This will create a connection pool using PostgresSettings
    ...

数据映射

Postgres 连接器在将数据从数据模型映射到存储时提供默认映射器。 默认映射器使用模型注释或记录定义来确定每个属性的类型,并将模型转换为一个可以被序列化为 Postgres 行的 dict

  • 批注为键的数据模型属性将映射到 Postgres 表中的主键。
  • 数据模型中标记为数据的属性将映射到PostgreSQL的表列中。
  • 被标注为向量的数据模型属性将映射到 Postgres 中具有 pgvector VECTOR 类型的表列。
from typing import Annotated
from pydantic import BaseModel

from semantic_kernel.connectors.postgres import PostgresCollection
from semantic_kernel.data.vector import (
    DistanceFunction,
    IndexKind,
    VectorStoreField,
    vectorstoremodel,
)

@vectorstoremodel
class Hotel(BaseModel):
    hotel_id: Annotated[int, VectorStoreField("key")]
    hotel_name: Annotated[str, VectorStoreField("data")]
    hotel_description: Annotated[str, VectorStoreField("data")]
    hotel_description_embedding: Annotated[
        list[float] | None,
        VectorStoreField(
            "vector",
            dimensions=4,
            index_kind=IndexKind.HNSW,
            distance_function=DistanceFunction.COSINE_SIMILARITY,
        ),
    ] = None

collection = PostgresCollection(collection_name="Hotels", record_type=Hotel)

async with collection:
    await collection.ensure_collection_exists()
CREATE TABLE IF NOT EXISTS public."Hotels" (
    "hotel_id" INTEGER NOT NULL,
    "hotel_name" TEXT,
    "hotel_description" TEXT,
    "hotel_description_embedding" VECTOR(4)
    PRIMARY KEY ("hotel_id")
);

矢量索引

在上面的Hotel模型中,hotel_description_embedding是具有IndexKind.HNSW索引的向量属性。 创建集合时,将自动创建此索引。 HNSW 是创建索引时唯一支持的索引类型。 IVFFlat 索引生成要求在创建索引时表中已存在数据,因此它不适合创建空表。 可以自由地在连接器外部的表上创建和修改索引,该索引将在执行查询时由连接器使用。

与 Entra 身份验证一起使用

Azure Database for PostgreSQL 提供使用 Entra 身份验证连接到数据库的功能。 这样就无需在连接字符串中存储用户名和密码。 若要对 Azure DB for PostgreSQL 数据库使用 Entra 身份验证,可以使用以下自定义 AsyncConnection 类:

import base64
import json
import logging
from functools import lru_cache

from azure.core.credentials import TokenCredential
from azure.core.credentials_async import AsyncTokenCredential
from azure.identity import DefaultAzureCredential
from psycopg import AsyncConnection

AZURE_DB_FOR_POSTGRES_SCOPE = "https://ossrdbms-aad.database.windows.net/.default"

logger = logging.getLogger(__name__)

async def get_entra_token_async(credential: AsyncTokenCredential) -> str:
    """Get the password from Entra using the provided credential."""
    logger.info("Acquiring Entra token for postgres password")

    async with credential:
        cred = await credential.get_token(AZURE_DB_FOR_POSTGRES_SCOPE)
        return cred.token

def get_entra_token(credential: TokenCredential | None) -> str:
    """Get the password from Entra using the provided credential."""
    logger.info("Acquiring Entra token for postgres password")
    credential = credential or get_default_azure_credentials()

    return credential.get_token(AZURE_DB_FOR_POSTGRES_SCOPE).token

@lru_cache(maxsize=1)
def get_default_azure_credentials() -> DefaultAzureCredential:
    """Get the default Azure credentials.

    This method caches the credentials to avoid creating new instances.
    """
    return DefaultAzureCredential()

def decode_jwt(token):
    """Decode the JWT payload to extract claims."""
    payload = token.split(".")[1]
    padding = "=" * (4 - len(payload) % 4)
    decoded_payload = base64.urlsafe_b64decode(payload + padding)
    return json.loads(decoded_payload)

async def get_entra_conninfo(credential: TokenCredential | AsyncTokenCredential | None) -> dict[str, str]:
    """Fetches a token returns the username and token."""
    # Fetch a new token and extract the username
    if isinstance(credential, AsyncTokenCredential):
        token = await get_entra_token_async(credential)
    else:
        token = get_entra_token(credential)
    claims = decode_jwt(token)
    username = claims.get("upn") or claims.get("preferred_username") or claims.get("unique_name")
    if not username:
        raise ValueError("Could not extract username from token. Have you logged in?")

    return {"user": username, "password": token}

class AsyncEntraConnection(AsyncConnection):
    """Asynchronous connection class for using Entra auth with Azure DB for PostgreSQL."""

    @classmethod
    async def connect(cls, *args, **kwargs):
        """Establish an asynchronous connection using Entra auth with Azure DB for PostgreSQL."""
        credential = kwargs.pop("credential", None)
        if credential and not isinstance(credential, (TokenCredential, AsyncTokenCredential)):
            raise ValueError("credential must be a TokenCredential or AsyncTokenCredential")
        if not kwargs.get("user") or not kwargs.get("password"):
            credential = credential or get_default_azure_credentials()
            entra_conninfo = await get_entra_conninfo(credential)
            if kwargs.get("user"):
                entra_conninfo.pop("user", None)
            kwargs.update(entra_conninfo)
        return await super().connect(*args, **kwargs)

可以将自定义连接类与方法一起使用 PostgresSettings.get_connection_pool 来创建连接池。

from semantic_kernel.connectors.postgres import PostgresSettings, PostgresStore


pool = await PostgresSettings().create_connection_pool(connection_class=AsyncEntraConnection)
async with pool:
    vector_store = PostgresStore(connection_pool=pool)
    ...

默认情况下,该 AsyncEntraConnection 类使用 DefaultAzureCredential 通过 Azure AD 进行身份验证。 如果需要,您还可以在 kwargs 中提供另一个 TokenCredential

from azure.identity import ManagedIdentityCredential


pool = await PostgresSettings().create_connection_pool(
    connection_class=AsyncEntraConnection, credential=ManagedIdentityCredential()
)
async with pool:
    vector_store = PostgresStore(connection_pool=pool)
    ...

JDBC

JDBC 连接器可用于连接到 Postgres。