适用于: SQL Server 2025 (17.x) 预览版
本文介绍如何配置 SQL Server 2025(17.x)预览版中引入的更改事件流(CES)功能。
注释
更改事件流目前为 SQL Server 2025 预览 版,需要启用 预览功能数据库范围的配置。 在预览期间,此功能可能会更改。 有关当前可支持性,请参阅 “限制”。
概述
若要配置和使用更改事件流式处理,请遵循以下步骤序列:
- 使用现有或创建新的 Azure 事件中心 命名空间和事件中心实例。 事件中心实例接收事件。
- 为用户数据库启用更改事件流式处理。
- 创建事件流组。 使用此组,配置目标、凭据、消息大小限制和分区架构。
- 将一个或多个表添加到事件流组。
本文的以下部分详细介绍了每个步骤。
先决条件
若要配置更改事件流式处理,需要满足以下条件:
- Azure 事件中心命名空间
- Azure 事件中心实例
- Azure 事件中心主机名
- 具有 发送 访问级别的策略
- db_owner 角色中的登录名,或具有要为其启用 CES 的数据库的 CONTROL DATABASE 权限的登录名。
- 将 数据库范围的凭据预览配置选项 设置为“是”。
配置 Azure 事件中心
若要了解如何创建 Azure 事件中心,请查看 使用 Azure 门户创建事件中心。
若要使用 AMQP 协议(默认的本机 Azure 事件中心协议)配置流式传输到 Azure 事件中心,请为 Azure 事件中心命名空间和实例名称生成 SAS 令牌。 可以使用任何编程或脚本语言以编程方式执行此作。 本文中的示例演示如何使用 PowerShell 脚本从新的或现有策略生成 SAS 令牌。
安装所需的模块
若要使用 PowerShell 脚本管理 Azure 事件中心资源,需要具有以下模块:
- Az PowerShell 模块
- Az.EventHub PowerShell 模块
以下脚本安装所需的模块:
Install-Module -Name Az -AllowClobber -Scope CurrentUser -Repository PSGallery -Force
Install-Module -Name Az.EventHub -Scope CurrentUser -Force
如果已有所需的模块,并且想要将其更新到最新版本,请运行以下脚本:
Update-Module -Name Az -Force
Update-Module -Name Az.EventHub -Force
连接到 Azure
可以使用 Azure Cloud Shell 或登录并设置订阅上下文。
若要使用 Azure Cloud Shell 运行,请查看 “登录到 Azure”。
定义策略
若要创建 SAS 令牌,需要策略。 您可以选择:
使用特定权限创建新策略。
或
使用具有正确权限的现有策略。
为新策略或现有策略创建 SAS 令牌
注释
为了提高安全性,强烈建议尽可能通过 SAS 令牌身份验证替代基于密钥的身份验证。 SAS 令牌的最佳做法包括:定义适当的访问范围、设置过期日期并定期轮换 SAS 密钥。 对于基于密钥的身份验证,请确保定期轮换密钥。 使用 Azure Key Vault 或类似服务安全地存储所有机密。
创建新策略时,请确保它具有 “发送 ”权限。 如果使用现有策略,请验证它是否具有 “发送 ”权限。
以下脚本将创建新的策略,或获取现有策略,然后以 HTTP 授权标头格式从该策略生成完整的 SAS 令牌。
将尖括号 (<value>) 中的值替换为环境的值。
function Generate-SasToken {
$subscriptionId = "<Azure-Subscription-ID>"
$resourceGroupName = "<Resource-group-name>"
$namespaceName = "<Azure-Event-Hub-Namespace-name>"
$eventHubName = "<Azure-Event-Hubs-instance-name>"
$policyName = "<Policy-name>"
# Modifying the rest of the script is not necessary.
# Login to Azure and set Azure Subscription.
Connect-AzAccount
# Get current context and check subscription
$currentContext = Get-AzContext
if ($currentContext.Subscription.Id -ne $subscriptionId) {
Write-Host "Current subscription is $($currentContext.Subscription.Id), switching to $subscriptionId..."
Set-AzContext -SubscriptionId $subscriptionId | Out-Null
} else {
Write-Host "Already using subscription $subscriptionId."
}
# Try to get the authorization policy (it should have Send rights)
$rights = @("Send")
$policy = Get-AzEventHubAuthorizationRule -ResourceGroupName $resourceGroupName -NamespaceName $namespaceName -EventHubName $eventHubName -AuthorizationRuleName $policyName -ErrorAction SilentlyContinue
# If the policy does not exist, create it
if (-not $policy) {
Write-Output "Policy '$policyName' does not exist. Creating it now..."
# Create a new policy with the Manage, Send and Listen rights
$policy = New-AzEventHubAuthorizationRule -ResourceGroupName $resourceGroupName -NamespaceName $namespaceName -EventHubName $eventHubName -AuthorizationRuleName $policyName -Rights $rights
if (-not $policy) {
throw "Error. Policy was not created."
}
Write-Output "Policy '$policyName' created successfully."
} else {
Write-Output "Policy '$policyName' already exists."
}
if ("Send" -in $policy.Rights) {
Write-Host "Authorization rule has required right: Send."
} else {
throw "Authorization rule is missing Send right."
}
$keys = Get-AzEventHubKey -ResourceGroupName $resourceGroupName -NamespaceName $namespaceName -EventHubName $eventHubName -AuthorizationRuleName $policyName
if (-not $keys) {
throw "Could not obtain Azure Event Hub Key. Script failed and will end now."
}
if (-not $keys.PrimaryKey) {
throw "Could not obtain Primary Key. Script failed and will end now."
}
# Get the Primary Key of the Shared Access Policy
$primaryKey = ($keys.PrimaryKey)
Write-Host $primaryKey
## Check that the primary key is not empty.
# Define a function to create a SAS token (similar to the C# code provided)
function Create-SasToken {
param (
[string]$resourceUri, [string]$keyName, [string]$key
)
$sinceEpoch = [datetime]::UtcNow - [datetime]"1970-01-01"
$expiry = [int]$sinceEpoch.TotalSeconds + (60 * 60 * 24 * 31 * 6) # 6 months
$stringToSign = [System.Web.HttpUtility]::UrlEncode($resourceUri) + "`n" + $expiry
$hmac = New-Object System.Security.Cryptography.HMACSHA256
$hmac.Key = [Text.Encoding]::UTF8.GetBytes($key)
$signature = [Convert]::ToBase64String($hmac.ComputeHash([Text.Encoding]::UTF8.GetBytes($stringToSign)))
$sasToken = "SharedAccessSignature sr=$([System.Web.HttpUtility]::UrlEncode($resourceUri))&sig=$([System.Web.HttpUtility]::UrlEncode($signature))&se=$expiry&skn=$keyName"
return $sasToken
}
# Construct the resource URI for the SAS token
$resourceUri = "https://$namespaceName.servicebus.windows.net/$eventHubName"
# Generate the SAS token using the primary key from the new policy
$sasToken = Create-SasToken -resourceUri $resourceUri -keyName $policyName -key $primaryKey
# Output the SAS token
Write-Output @"
-- Generated SAS Token --
$sasToken
-- End of generated SAS Token --
"@
}
Generate-SasToken
启用和配置变更事件流式处理
若要启用和配置更改事件流式处理,请将数据库上下文更改为用户数据库,然后执行以下步骤:
- 如果尚未配置,请将数据库设置为 完整恢复模式。
- 创建主密钥和数据库范围的凭据。
- 启用事件流式处理。
- 创建事件流组。
- 将一个或多个表添加到事件流组。
本节中的示例演示如何为 AMQP 协议和 Apache Kafka 协议启用 CES。
下面是本部分中示例的示例参数值:
@stream_group_name = N'myStreamGroup'@destination_location = N'myEventHubsNamespace.servicebus.windows.net/myEventHubsInstance'@partition_key_scheme = N'None'- 主密钥或辅助密钥值:
Secret = 'BVFnT3baC/K6I8xNZzio4AeoFt6nHeK0i+ZErNGsxiw=' EXEC sys.sp_add_object_to_event_stream_group N'myStreamGroup', N'dbo.myTable'
示例:通过 AMQP 协议流式传输到 Azure 事件中心(SAS 令牌身份验证)
将尖括号 (<value>) 中的值替换为环境的值。
USE <database name>
-- Create the Master Key with a password.
CREATE MASTER KEY ENCRYPTION BY PASSWORD = '<Password>'
CREATE DATABASE SCOPED CREDENTIAL <CredentialName>
WITH IDENTITY = 'SHARED ACCESS SIGNATURE',
SECRET = '<Generated SAS Token>'
EXEC sys.sp_enable_event_stream
EXEC sys.sp_create_event_stream_group
@stream_group_name = N'<EventStreamGroupName>',
@destination_type = N'AzureEventHubsAmqp',
@destination_location = N'<AzureEventHubsHostName>/<EventHubsInstance>',
@destination_credential = <CredentialName>,
@max_message_size_kb = <MaxMessageSize>,
@partition_key_scheme = N'<PartitionKeyScheme>'
EXEC sys.sp_add_object_to_event_stream_group
N'<EventStreamGroupName>',
N'<SchemaName>.<TableName>'
示例:通过 AMQP 协议流式传输到 Azure 事件中心(密钥值身份验证)
将尖括号 (<value>) 中的值替换为环境的值。
USE <database name>
-- Create the Master Key with a password.
CREATE MASTER KEY ENCRYPTION BY PASSWORD = '<Password>'
CREATE DATABASE SCOPED CREDENTIAL <CredentialName>
WITH IDENTITY = '<Azure Event Hubs SAS Policy name>',
SECRET = '<Primary or Secondary key value>'
EXEC sys.sp_enable_event_stream
EXEC sys.sp_create_event_stream_group
@stream_group_name = N'<EventStreamGroupName>',
@destination_type = N'AzureEventHubsAmqp',
@destination_location = N'<AzureEventHubsHostName>/<EventHubsInstance>',
@destination_credential = <CredentialName>,
@max_message_size_kb = <MaxMessageSize>,
@partition_key_scheme = N'<PatitionKeyScheme>'
EXEC sys.sp_add_object_to_event_stream_group
N'<EventStreamGroupName>',
N'<SchemaName>.<TableName>'
示例:通过 Apache Kafka 协议流式传输到 Azure 事件中心(连接字符串身份验证)
将尖括号 (<value>) 中的值替换为环境的值。
USE <database name>
-- Create the Master Key with a password.
CREATE MASTER KEY ENCRYPTION BY PASSWORD = '<Password>'
CREATE DATABASE SCOPED CREDENTIAL credential1
WITH IDENTITY = 'SHARED ACCESS SIGNATURE',
SECRET = '<Event Hubs Namespace – Primary or Secondary connection string>'
EXEC sys.sp_enable_event_stream
EXEC sys.sp_create_event_stream_group
@stream_group_name = N'<EventStreamGroupName>',
@destination_type = N'AzureEventHubsApacheKafka',
@destination_location = N'<AzureEventHubsHostName>:<port>/<EventHubsInstance>',
@destination_credential = <CredentialName>,
@max_message_size_kb = <MaxMessageSize>,
@partition_key_scheme = N'<PatitionKeyScheme>'
EXEC sys.sp_add_object_to_event_stream_group
N'<EventStreamGroupName>',
N'<SchemaName>.<TableName>'
示例:通过 Apache Kafka 协议流式传输到 Azure 事件中心(密钥值身份验证)
将尖括号 (<value>) 中的值替换为环境的值。
USE <database name>
-- Create the Master Key with a password.
CREATE MASTER KEY ENCRYPTION BY PASSWORD = '<Password>'
CREATE DATABASE SCOPED CREDENTIAL credential1
WITH IDENTITY = '<Azure Event Hubs SAS Policy name>',
SECRET = '<Primary or Secondary key value>' -- BVFnT3baC/K6I8xNZzio4AeoFt6nHeK0i+ZErNGsxiw=
EXEC sys.sp_enable_event_stream
EXEC sys.sp_create_event_stream_group
@stream_group_name = N'<EventStreamGroupName>', -- myStreamGroup
@destination_type = N'AzureEventHubsApacheKafka',
@destination_location = N'<AzureEventHubsHostName>:<port>/<EventHubsInstance>', -- myEventHubsNamespace.servicebus.windows.net:9093/myEventHubsInstance
@destination_credential = <CredentialName>,
@max_message_size_kb = <MaxMessageSize>, -- 1024
@partition_key_scheme = N'<PatitionKeyScheme>' -- N'None'
EXEC sys.sp_add_object_to_event_stream_group
N'<EventStreamGroupName>',
N'<SchemaName>.<TableName>' -- dbo.myTable
查看 CES 配置和功能
在 sys.databases 中, is_event_stream_enabled = 1 指示为数据库启用了更改事件流式处理。
以下查询返回启用了更改事件流式处理的所有数据库:
SELECT * FROM sys.databases WHERE is_event_stream_enabled = 1
在 sys.tables 中, is_replicated = 1 指示表已流式传输, sp_help_change_feed_table 提供有关更改事件流式处理表组和表元数据的信息。
以下查询返回启用了更改事件流式处理的所有表,并提供元数据信息:
SELECT name, is_replicated FROM sys.tables
EXEC sp_help_change_feed_table @source_schema = '<schema name>', @source_name = '<table name>'
注释
目前,为 SQL Server 配置了变更数据捕获(CDC)、事务复制或 Fabric 镜像数据库的数据库不支持 CES。
CES 存储过程、系统函数和 DMV
下表列出了用于配置、禁用和监视更改事件流的存储过程、系统函数和 DMV:
| System 对象 | DESCRIPTION |
|---|---|
|
|
|
| sys.sp_enable_event_stream | 为当前用户数据库启用 CES。 |
| sys.sp_create_event_stream_group | 创建流组,该流组是一组表的流配置。 流组还定义了目标和相关详细信息(例如身份验证、消息大小、分区)。 该过程完成后,会自动为最终用户生成并显示stream_group_id。 |
| sys.sp_add_object_to_event_stream_group | 将表添加到流组。 |
|
|
|
| sys.sp_remove_object_from_event_stream_group | 从流组中删除表。 |
| sys.sp_drop_event_stream_group | 删除流组。 流组必须未被使用。 |
| sys.sp_disable_event_stream | 禁用当前用户数据集的 CES。 |
|
|
|
| sys.dm_change_feed_errors | 返回交付错误。 |
| sys.dm_change_feed_log_scan_sessions | 返回有关日志扫描活动的信息。 |
| sys.sp_help_change_feed_settings | 提供配置的更改事件流式处理的状态和信息。 |
| sys.sp_help_change_feed | 监视变更流的当前配置。 |
| sys.sp_help_change_feed_table_groups | 返回用于配置更改事件流式处理组的元数据。 |
| sys.sp_help_change_feed_table | 提供用于更改事件流的流式处理组和表元数据的状态和信息。 |
局限性
变更事件流(CES)具有以下限制:
服务器级和常规限制
- Linux 上的 SQL Server 2025 或 SQL Server 2025 Express 版本不支持 CES。
- CES 仅针对来自
INSERTUPDATE和DELETEDML 语句的数据更改发出事件。 - CES 不会处理模式更改(DDL操作),这意味着它不会对DDL操作发出事件。 但是,不会阻止 DDL 操作,因此,如果执行的话,后续 DML 事件的架构将反映更新的表结构。 用户应正常处理具有更新架构的事件。
- 当 JSON 是指定的输出格式时,大型事件消息可能会拆分为大约 25% 每个流组配置的最大消息大小。 此限制不适用于二进制输出类型。
- 如果消息超出 Azure 事件中心消息大小限制,则目前只能通过扩展事件观察到失败。
- 已配置为 CES 的表无法进行表和列重命名。 允许数据库重命名。
数据库级限制
- CES 仅在配置了完整恢复模式的数据库上受支持。
- 不支持在配置了 用于 SQL Server 的 Fabric 镜像数据库、事务复制、更改数据捕获 或 Azure Synapse Link 的数据库上使用 CES。 数据库在配置 CES 后,支持更改跟踪。
- CES 只能从可写的主数据库流式传输。 属于 AlwaysOn 可用性组或使用 托管实例链接 的辅助数据库无法配置为流式处理源。
- 无法在视图或物化视图上启用 CES。
表级限制
- 表只能属于一个流式处理组。 不支持将同一个表流式传输到多个目标。
- 只能为 CES 配置用户表。 不支持系统表。
- 最多可以配置 4,096 个流组。 每个流组最多可包含 40,000 个表。
- 在表上启用 CES 时,主键约束不能添加到该表或从该表中删除。
-
ALTER TABLE SWITCH PARTITION不支持在针对 CES 进行配置的表中使用。 -
TRUNCATE TABLE在为 CES 启用的表上不受支持。 - CES 不支持使用以下任何功能的表:
- 聚集列存储索引
- 临时历史记录表或账本历史记录表
- 始终加密 (Always Encrypted)
- 内存内 OLTP(内存优化表)
- 图形表
- 外部表
列级限制
- CES 不支持以下数据类型。 流式处理会跳过如下这些类型的列:
jsonimagetext/ntextxmlrowversion/timestampsql_variant- 用户定义的类型 (UDT)
geometrygeographyvector
源数据库中的权限
- 对于行级安全性,CES 会发送所有行的更改,而不考虑用户权限。
- 动态数据掩码不适用于通过 CES 发送的数据。 数据以未掩码形式流式传输,即使已配置掩码。
- CES 不会发出与对象级权限更改相关的事件(例如,向特定列授予权限)。