配置变更事件流传输

适用于: SQL Server 2025 (17.x) 预览版

本文介绍如何配置 SQL Server 2025(17.x)预览版中引入的更改事件流(CES)功能。

注释

更改事件流目前为 SQL Server 2025 预览 版,需要启用 预览功能数据库范围的配置。 在预览期间,此功能可能会更改。 有关当前可支持性,请参阅 “限制”。

概述

若要配置和使用更改事件流式处理,请遵循以下步骤序列:

  1. 使用现有或创建新的 Azure 事件中心 命名空间和事件中心实例。 事件中心实例接收事件。
  2. 为用户数据库启用更改事件流式处理。
  3. 创建事件流组。 使用此组,配置目标、凭据、消息大小限制和分区架构。
  4. 将一个或多个表添加到事件流组。

本文的以下部分详细介绍了每个步骤。

先决条件

若要配置更改事件流式处理,需要满足以下条件:

配置 Azure 事件中心

若要了解如何创建 Azure 事件中心,请查看 使用 Azure 门户创建事件中心

若要使用 AMQP 协议(默认的本机 Azure 事件中心协议)配置流式传输到 Azure 事件中心,请为 Azure 事件中心命名空间和实例名称生成 SAS 令牌。 可以使用任何编程或脚本语言以编程方式执行此作。 本文中的示例演示如何使用 PowerShell 脚本从新的或现有策略生成 SAS 令牌。

安装所需的模块

若要使用 PowerShell 脚本管理 Azure 事件中心资源,需要具有以下模块:

  • Az PowerShell 模块
  • Az.EventHub PowerShell 模块

以下脚本安装所需的模块:

Install-Module -Name Az -AllowClobber -Scope CurrentUser -Repository PSGallery -Force
Install-Module -Name Az.EventHub -Scope CurrentUser -Force

如果已有所需的模块,并且想要将其更新到最新版本,请运行以下脚本:

Update-Module -Name Az -Force
Update-Module -Name Az.EventHub -Force

连接到 Azure

可以使用 Azure Cloud Shell 或登录并设置订阅上下文。

若要使用 Azure Cloud Shell 运行,请查看 “登录到 Azure”。

定义策略

若要创建 SAS 令牌,需要策略。 您可以选择:

  • 使用特定权限创建新策略。

  • 使用具有正确权限的现有策略。

为新策略或现有策略创建 SAS 令牌

注释

为了提高安全性,强烈建议尽可能通过 SAS 令牌身份验证替代基于密钥的身份验证。 SAS 令牌的最佳做法包括:定义适当的访问范围、设置过期日期并定期轮换 SAS 密钥。 对于基于密钥的身份验证,请确保定期轮换密钥。 使用 Azure Key Vault 或类似服务安全地存储所有机密。

创建新策略时,请确保它具有 “发送 ”权限。 如果使用现有策略,请验证它是否具有 “发送 ”权限。

以下脚本将创建新的策略,或获取现有策略,然后以 HTTP 授权标头格式从该策略生成完整的 SAS 令牌。

将尖括号 (<value>) 中的值替换为环境的值。

function Generate-SasToken {
$subscriptionId = "<Azure-Subscription-ID>"
$resourceGroupName = "<Resource-group-name>"
$namespaceName = "<Azure-Event-Hub-Namespace-name>"
$eventHubName = "<Azure-Event-Hubs-instance-name>"
$policyName = "<Policy-name>"

# Modifying the rest of the script is not necessary.

# Login to Azure and set Azure Subscription.
Connect-AzAccount

# Get current context and check subscription
$currentContext = Get-AzContext
if ($currentContext.Subscription.Id -ne $subscriptionId) {
    Write-Host "Current subscription is $($currentContext.Subscription.Id), switching to $subscriptionId..."
    Set-AzContext -SubscriptionId $subscriptionId | Out-Null
} else {
    Write-Host "Already using subscription $subscriptionId."
}

# Try to get the authorization policy (it should have Send rights)
$rights = @("Send")
$policy = Get-AzEventHubAuthorizationRule -ResourceGroupName $resourceGroupName -NamespaceName $namespaceName -EventHubName $eventHubName -AuthorizationRuleName $policyName -ErrorAction SilentlyContinue

# If the policy does not exist, create it
if (-not $policy) {
    Write-Output "Policy '$policyName' does not exist. Creating it now..."

    # Create a new policy with the Manage, Send and Listen rights
    $policy = New-AzEventHubAuthorizationRule -ResourceGroupName $resourceGroupName -NamespaceName $namespaceName -EventHubName $eventHubName -AuthorizationRuleName $policyName -Rights $rights
    if (-not $policy) {
        throw "Error. Policy was not created."
    }
    Write-Output "Policy '$policyName' created successfully."
} else {
    Write-Output "Policy '$policyName' already exists."
}

if ("Send" -in $policy.Rights) {
    Write-Host "Authorization rule has required right: Send."
} else {
    throw "Authorization rule is missing Send right."
}

$keys = Get-AzEventHubKey -ResourceGroupName $resourceGroupName -NamespaceName $namespaceName -EventHubName $eventHubName -AuthorizationRuleName $policyName

if (-not $keys) {
    throw "Could not obtain Azure Event Hub Key. Script failed and will end now."
}
if (-not $keys.PrimaryKey) {
    throw "Could not obtain Primary Key. Script failed and will end now."
}

# Get the Primary Key of the Shared Access Policy
$primaryKey = ($keys.PrimaryKey) 
Write-Host $primaryKey

## Check that the primary key is not empty.

# Define a function to create a SAS token (similar to the C# code provided)
function Create-SasToken {
    param (
        [string]$resourceUri, [string]$keyName, [string]$key
    )

$sinceEpoch = [datetime]::UtcNow - [datetime]"1970-01-01"
    $expiry = [int]$sinceEpoch.TotalSeconds + (60 * 60 * 24 * 31 * 6)  # 6 months
    $stringToSign = [System.Web.HttpUtility]::UrlEncode($resourceUri) + "`n" + $expiry
    $hmac = New-Object System.Security.Cryptography.HMACSHA256
    $hmac.Key = [Text.Encoding]::UTF8.GetBytes($key)
    $signature = [Convert]::ToBase64String($hmac.ComputeHash([Text.Encoding]::UTF8.GetBytes($stringToSign)))
    $sasToken = "SharedAccessSignature sr=$([System.Web.HttpUtility]::UrlEncode($resourceUri))&sig=$([System.Web.HttpUtility]::UrlEncode($signature))&se=$expiry&skn=$keyName"
    return $sasToken
}

# Construct the resource URI for the SAS token
$resourceUri = "https://$namespaceName.servicebus.windows.net/$eventHubName"

# Generate the SAS token using the primary key from the new policy
$sasToken = Create-SasToken -resourceUri $resourceUri -keyName $policyName -key $primaryKey

# Output the SAS token
Write-Output @"
-- Generated SAS Token --
$sasToken
-- End of generated SAS Token --
"@
}

Generate-SasToken

启用和配置变更事件流式处理

若要启用和配置更改事件流式处理,请将数据库上下文更改为用户数据库,然后执行以下步骤:

  1. 如果尚未配置,请将数据库设置为 完整恢复模式
  2. 创建主密钥和数据库范围的凭据。
  3. 启用事件流式处理。
  4. 创建事件流组。
  5. 将一个或多个表添加到事件流组。

本节中的示例演示如何为 AMQP 协议和 Apache Kafka 协议启用 CES。

下面是本部分中示例的示例参数值:

  • @stream_group_name = N'myStreamGroup'
  • @destination_location = N'myEventHubsNamespace.servicebus.windows.net/myEventHubsInstance'
  • @partition_key_scheme = N'None'
  • 主密钥或辅助密钥值: Secret = 'BVFnT3baC/K6I8xNZzio4AeoFt6nHeK0i+ZErNGsxiw='
  • EXEC sys.sp_add_object_to_event_stream_group N'myStreamGroup', N'dbo.myTable'

示例:通过 AMQP 协议流式传输到 Azure 事件中心(SAS 令牌身份验证)

将尖括号 (<value>) 中的值替换为环境的值。

USE <database name>

-- Create the Master Key with a password.
CREATE MASTER KEY ENCRYPTION BY PASSWORD = '<Password>'

CREATE DATABASE SCOPED CREDENTIAL <CredentialName>
    WITH IDENTITY = 'SHARED ACCESS SIGNATURE',
    SECRET = '<Generated SAS Token>'

EXEC sys.sp_enable_event_stream

EXEC sys.sp_create_event_stream_group
    @stream_group_name =      N'<EventStreamGroupName>',
    @destination_type =       N'AzureEventHubsAmqp',
    @destination_location =   N'<AzureEventHubsHostName>/<EventHubsInstance>',
    @destination_credential = <CredentialName>,
    @max_message_size_kb =    <MaxMessageSize>, 
    @partition_key_scheme =   N'<PartitionKeyScheme>'

EXEC sys.sp_add_object_to_event_stream_group
    N'<EventStreamGroupName>',
    N'<SchemaName>.<TableName>'

示例:通过 AMQP 协议流式传输到 Azure 事件中心(密钥值身份验证)

将尖括号 (<value>) 中的值替换为环境的值。

USE <database name>

-- Create the Master Key with a password.
CREATE MASTER KEY ENCRYPTION BY PASSWORD = '<Password>'

CREATE DATABASE SCOPED CREDENTIAL <CredentialName>
    WITH IDENTITY = '<Azure Event Hubs SAS Policy name>',
    SECRET = '<Primary or Secondary key value>'

EXEC sys.sp_enable_event_stream

EXEC sys.sp_create_event_stream_group
    @stream_group_name =      N'<EventStreamGroupName>',
    @destination_type =       N'AzureEventHubsAmqp',
    @destination_location =   N'<AzureEventHubsHostName>/<EventHubsInstance>',
    @destination_credential = <CredentialName>,
    @max_message_size_kb =    <MaxMessageSize>,
    @partition_key_scheme =   N'<PatitionKeyScheme>'

EXEC sys.sp_add_object_to_event_stream_group
    N'<EventStreamGroupName>',
    N'<SchemaName>.<TableName>'

示例:通过 Apache Kafka 协议流式传输到 Azure 事件中心(连接字符串身份验证)

将尖括号 (<value>) 中的值替换为环境的值。

USE <database name>

-- Create the Master Key with a password.
CREATE MASTER KEY ENCRYPTION BY PASSWORD = '<Password>'

CREATE DATABASE SCOPED CREDENTIAL credential1
    WITH IDENTITY = 'SHARED ACCESS SIGNATURE',
    SECRET = '<Event Hubs Namespace – Primary or Secondary connection string>'

EXEC sys.sp_enable_event_stream

EXEC sys.sp_create_event_stream_group
    @stream_group_name =      N'<EventStreamGroupName>',
    @destination_type =       N'AzureEventHubsApacheKafka',
    @destination_location =   N'<AzureEventHubsHostName>:<port>/<EventHubsInstance>',
    @destination_credential = <CredentialName>,
    @max_message_size_kb =    <MaxMessageSize>,
    @partition_key_scheme =   N'<PatitionKeyScheme>'

EXEC sys.sp_add_object_to_event_stream_group
    N'<EventStreamGroupName>',
    N'<SchemaName>.<TableName>'

示例:通过 Apache Kafka 协议流式传输到 Azure 事件中心(密钥值身份验证)

将尖括号 (<value>) 中的值替换为环境的值。

USE <database name>

-- Create the Master Key with a password.
CREATE MASTER KEY ENCRYPTION BY PASSWORD = '<Password>'

CREATE DATABASE SCOPED CREDENTIAL credential1
    WITH IDENTITY = '<Azure Event Hubs SAS Policy name>',
    SECRET = '<Primary or Secondary key value>' -- BVFnT3baC/K6I8xNZzio4AeoFt6nHeK0i+ZErNGsxiw=

EXEC sys.sp_enable_event_stream

EXEC sys.sp_create_event_stream_group
    @stream_group_name =      N'<EventStreamGroupName>',  -- myStreamGroup
    @destination_type =       N'AzureEventHubsApacheKafka',
    @destination_location =   N'<AzureEventHubsHostName>:<port>/<EventHubsInstance>', -- myEventHubsNamespace.servicebus.windows.net:9093/myEventHubsInstance
    @destination_credential = <CredentialName>,
    @max_message_size_kb =    <MaxMessageSize>,       -- 1024
    @partition_key_scheme =   N'<PatitionKeyScheme>'  -- N'None'

EXEC sys.sp_add_object_to_event_stream_group
    N'<EventStreamGroupName>',
    N'<SchemaName>.<TableName>' -- dbo.myTable

查看 CES 配置和功能

sys.databases 中, is_event_stream_enabled = 1 指示为数据库启用了更改事件流式处理。

以下查询返回启用了更改事件流式处理的所有数据库:

SELECT * FROM sys.databases WHERE is_event_stream_enabled = 1

sys.tables 中, is_replicated = 1 指示表已流式传输, sp_help_change_feed_table 提供有关更改事件流式处理表组和表元数据的信息。

以下查询返回启用了更改事件流式处理的所有表,并提供元数据信息:

SELECT name, is_replicated FROM sys.tables

EXEC sp_help_change_feed_table @source_schema = '<schema name>', @source_name = '<table name>'

注释

目前,为 SQL Server 配置了变更数据捕获(CDC)、事务复制或 Fabric 镜像数据库的数据库不支持 CES。

CES 存储过程、系统函数和 DMV

下表列出了用于配置、禁用和监视更改事件流的存储过程、系统函数和 DMV:

System 对象 DESCRIPTION
配置 CES
sys.sp_enable_event_stream 为当前用户数据库启用 CES。
sys.sp_create_event_stream_group 创建流组,该流组是一组表的流配置。 流组还定义了目标和相关详细信息(例如身份验证、消息大小、分区)。 该过程完成后,会自动为最终用户生成并显示stream_group_id。
sys.sp_add_object_to_event_stream_group 将表添加到流组。
禁用 CES
sys.sp_remove_object_from_event_stream_group 从流组中删除表。
sys.sp_drop_event_stream_group 删除流组。 流组必须未被使用。
sys.sp_disable_event_stream 禁用当前用户数据集的 CES。
显示器 CES
sys.dm_change_feed_errors 返回交付错误。
sys.dm_change_feed_log_scan_sessions 返回有关日志扫描活动的信息。
sys.sp_help_change_feed_settings 提供配置的更改事件流式处理的状态和信息。
sys.sp_help_change_feed 监视变更流的当前配置。
sys.sp_help_change_feed_table_groups 返回用于配置更改事件流式处理组的元数据。
sys.sp_help_change_feed_table 提供用于更改事件流的流式处理组和表元数据的状态和信息。

局限性

变更事件流(CES)具有以下限制:

服务器级和常规限制

  • Linux 上的 SQL Server 2025 或 SQL Server 2025 Express 版本不支持 CES。
  • CES 仅针对来自INSERTUPDATEDELETE DML 语句的数据更改发出事件。
  • CES 不会处理模式更改(DDL操作),这意味着它不会对DDL操作发出事件。 但是,不会阻止 DDL 操作,因此,如果执行的话,后续 DML 事件的架构将反映更新的表结构。 用户应正常处理具有更新架构的事件。
  • 当 JSON 是指定的输出格式时,大型事件消息可能会拆分为大约 25% 每个流组配置的最大消息大小。 此限制不适用于二进制输出类型。
  • 如果消息超出 Azure 事件中心消息大小限制,则目前只能通过扩展事件观察到失败。
  • 已配置为 CES 的表无法进行表和列重命名。 允许数据库重命名。

数据库级限制

表级限制

  • 表只能属于一个流式处理组。 不支持将同一个表流式传输到多个目标。
  • 只能为 CES 配置用户表。 不支持系统表。
  • 最多可以配置 4,096 个流组。 每个流组最多可包含 40,000 个表。
  • 在表上启用 CES 时,主键约束不能添加到该表或从该表中删除。
  • ALTER TABLE SWITCH PARTITION 不支持在针对 CES 进行配置的表中使用。
  • TRUNCATE TABLE 在为 CES 启用的表上不受支持。
  • CES 不支持使用以下任何功能的表:
    • 聚集列存储索引
    • 临时历史记录表或账本历史记录表
    • 始终加密 (Always Encrypted)
    • 内存内 OLTP(内存优化表)
    • 图形表
    • 外部表

列级限制

  • CES 不支持以下数据类型。 流式处理会跳过如下这些类型的列:
    • json
    • image
    • text / ntext
    • xml
    • rowversion / timestamp
    • sql_variant
    • 用户定义的类型 (UDT)
    • geometry
    • geography
    • vector

源数据库中的权限

  • 对于行级安全性,CES 会发送所有行的更改,而不考虑用户权限。
  • 动态数据掩码不适用于通过 CES 发送的数据。 数据以未掩码形式流式传输,即使已配置掩码。
  • CES 不会发出与对象级权限更改相关的事件(例如,向特定列授予权限)。