重要
截至 2025 年 9 月 30 日,Azure SQL Edge 已停用。 有关详细信息和迁移选项,请参阅停用通知。
注释
Azure SQL Edge 不再支持 ARM64 平台。
本文介绍如何在 Azure SQL Edge 中创建 T-SQL 流式处理作业。 创建外部流输入和输出对象,然后在创建流式处理作业的过程中定义流式处理作业查询。
配置外部流输入和输出对象
T-SQL 流式处理使用 SQL Server 的外部数据源功能,来定义与流式处理作业的外部流输入和输出相关联的数据源。 使用以下 T-SQL 命令创建外部流输入或输出对象:
- CREATE EXTERNAL FILE FORMAT (Transact-SQL)
- CREATE EXTERNAL DATA SOURCE (Transact-SQL)
- CREATE EXTERNAL STREAM (Transact-SQL)
此外,如果将 Azure SQL Edge、SQL Server 或 Azure SQL 数据库用作输出流,则需使用 CREATE DATABASE SCOPED CREDENTIAL (Transact-SQL)。 此 T-SQL 命令定义用于访问数据库的凭据。
支持的输入和输出流数据源
Azure SQL Edge 目前仅支持以下数据源作为流输入和输出。
| 数据源类型 | Input | 输出 | Description |
|---|---|---|---|
| Azure IoT Edge 中心 | Y | Y | 用于通过 Azure IoT Edge 中心读取和写入流式处理数据的数据源。 有关详细信息,请参阅 IoT Edge 中心。 |
| SQL Database | N | Y | 将流式处理数据写入 SQL 数据库的数据源连接。 数据库可以是 Azure SQL Edge 中的本地数据库,也可以是 SQL Server 或 Azure SQL 数据库中的远程数据库。 |
| Kafka | Y | N | 从 Kafka 主题读取流式处理数据的数据源。 |
示例:为 Azure IoT Edge 中心创建外部流输入/输出对象
以下示例为 Azure IoT Edge 中心创建外部流对象。 若要为 Azure IoT Edge 中心创建外部流输入/输出数据源,首先还需要针对要读取或写入的数据的布局创建一个外部文件格式。
创建 JSON 类型的外部文件格式。
CREATE EXTERNAL FILE format InputFileFormat WITH (FORMAT_TYPE = JSON); GO为 Azure IoT Edge 中心创建外部数据源。 以下 T-SQL 脚本创建到 IoT Edge 中心的数据源连接,该中心与 Azure SQL Edge 在同一 Docker 主机上运行。
CREATE EXTERNAL DATA SOURCE EdgeHubInput WITH (LOCATION = 'edgehub://'); GO为 Azure IoT Edge 中心创建外部流对象。 以下 T-SQL 脚本为 IoT Edge 中心创建流对象。 对于 IoT Edge 中心流对象,LOCATION 参数是要读取或写入的 IoT Edge 中心主题或通道的名称。
CREATE EXTERNAL STREAM MyTempSensors WITH ( DATA_SOURCE = EdgeHubInput, FILE_FORMAT = InputFileFormat, LOCATION = N'TemperatureSensors', INPUT_OPTIONS = N'', OUTPUT_OPTIONS = N'' ); GO
示例:创建 Azure SQL 数据库的外部流对象
以下示例为 Azure SQL Edge 中的本地数据库创建一个外部流对象。
在数据库上创建主密钥。 这是加密凭据密钥所必需的。
CREATE MASTER KEY ENCRYPTION BY PASSWORD = '<<Strong_Password_For_Master_Key_Encryption>>';创建数据库范围凭据以访问 SQL Server 源。 以下示例为外部数据源创建一个凭据,其中 IDENTITY = 用户名,SECRET = 密码。
CREATE DATABASE SCOPED CREDENTIAL SQLCredential WITH IDENTITY = '<SQL_Login>', SECRET = '<SQL_Login_PASSWORD>'; GO使用 CREATE EXTERNAL DATA SOURCE 创建外部数据源。 下面的示例:
- 创建名为 LocalSQLOutput 的外部数据源。
- 标识外部数据源 (
LOCATION = '<vendor>://<server>[:<port>]')。 在示例中,它指向 Azure SQL Edge 的本地实例。 - 使用先前创建的凭据。
CREATE EXTERNAL DATA SOURCE LocalSQLOutput WITH ( LOCATION = 'sqlserver://tcp:.,1433', CREDENTIAL = SQLCredential ); GO创建外部流对象。 以下示例创建一个指向 MySQLDatabase 数据库中的 dbo.TemperatureMeasurements 表的外部流对象。
CREATE EXTERNAL STREAM TemperatureMeasurements WITH ( DATA_SOURCE = LocalSQLOutput, LOCATION = N'MySQLDatabase.dbo.TemperatureMeasurements', INPUT_OPTIONS = N'', OUTPUT_OPTIONS = N'' );
示例:为 Kafka 创建外部流对象
以下示例为 Azure SQL Edge 中的本地数据库创建一个外部流对象。 此示例假设已将 kafka 服务器配置为使用匿名访问。
使用 CREATE EXTERNAL DATA SOURCE 创建外部数据源。 下面的示例:
CREATE EXTERNAL DATA SOURCE [KafkaInput] WITH (LOCATION = N'kafka://<kafka_bootstrap_server_name_ip>:<port_number>'); GO为 Kafka 输入创建外部文件格式。 以下示例创建一个采用 GZipped 压缩的 JSON 文件格式。
CREATE EXTERNAL FILE FORMAT JsonGzipped WITH ( FORMAT_TYPE = JSON, DATA_COMPRESSION = 'org.apache.hadoop.io.compress.GzipCodec' ); GO创建外部流对象。 以下示例创建指向 Kafka 主题
TemperatureMeasurement的外部流对象。CREATE EXTERNAL STREAM TemperatureMeasurement WITH ( DATA_SOURCE = KafkaInput, FILE_FORMAT = JsonGzipped, LOCATION = 'TemperatureMeasurement', INPUT_OPTIONS = 'PARTITIONS: 10' ); GO
创建流式处理作业和流式处理查询
使用 sys.sp_create_streaming_job 系统存储过程来定义流式处理查询并创建流式处理作业。
sp_create_streaming_job 存储过程采用以下参数:
-
@job_name:流式处理作业的名称。 流式处理作业名称在实例中是唯一的。 -
@statement:基于流分析查询语言的流式处理查询语句。
以下示例创建一个简单的流式处理作业,其中包含一个流式处理查询。 此查询从 IoT Edge 中心读取输入,并将数据写入到数据库中的 dbo.TemperatureMeasurements。
EXEC sys.sp_create_streaming_job @name = N'StreamingJob1',
@statement = N'Select * INTO TemperatureMeasurements from MyEdgeHubInput'
以下示例创建一个更复杂的流式处理作业,其中包含多个不同的查询。 这些查询中有一个查询使用内置的 AnomalyDetection_ChangePoint 函数来识别温度数据中的异常。
EXEC sys.sp_create_streaming_job @name = N'StreamingJob2',
@statement = N'
SELECT *
INTO TemperatureMeasurements1
FROM MyEdgeHubInput1
SELECT *
INTO TemperatureMeasurements2
FROM MyEdgeHubInput2
SELECT *
INTO TemperatureMeasurements3
FROM MyEdgeHubInput3
SELECT timestamp AS [Time],
[Temperature] AS [Temperature],
GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER (LIMIT DURATION(minute, 20)), '' Score '') AS ChangePointScore,
GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER (LIMIT DURATION(minute, 20)), '' IsAnomaly '') AS IsChangePointAnomaly
INTO TemperatureAnomalies
FROM MyEdgeHubInput2;
';
GO
启动、停止、删除和监视流式处理作业
若要在 Azure SQL Edge 中启动流式处理作业,请运行 sys.sp_start_streaming_job 存储过程。 该存储过程需要使用要启动的流式处理作业的名称作为输入。
EXEC sys.sp_start_streaming_job @name = N'StreamingJob1';
GO
若要停止流式处理作业,请运行 sys.sp_stop_streaming_job 存储过程。 该存储过程需要使用要停止的流式处理作业的名称作为输入。
EXEC sys.sp_stop_streaming_job @name = N'StreamingJob1';
GO
若要丢弃(或删除)流式处理作业,请运行 sys.sp_drop_streaming_job 存储过程。 该存储过程需要使用要丢弃的流式处理作业的名称作为输入。
EXEC sys.sp_drop_streaming_job @name = N'StreamingJob1';
GO
若要获取流式处理作业的当前状态,请运行 sys.sp_get_streaming_job 存储过程。 该存储过程需要使用要丢弃的流式处理作业的名称作为输入。 它输出流式处理作业的名称和当前状态。
EXEC sys.sp_get_streaming_job @name = N'StreamingJob1'
WITH RESULT SETS (
(
name NVARCHAR(256),
status NVARCHAR(256),
error NVARCHAR(256)
)
);
GO
流式处理作业可以处于以下任一状态:
| 状态 | Description |
|---|---|
| 已创建 | 流式处理作业已创建,但尚未启动。 |
| 正在启动 | 流式处理作业处于开始阶段。 |
| 空闲 | 流式处理作业正在运行,但没有要处理的输入。 |
| Processing | 流式处理作业正在运行,且正在处理输入。 此状态指示流式处理作业的正常运行状态。 |
| 已降级 | 流式处理作业正在运行,但在处理输入期间出现一些非致命错误。 输入作业继续运行,但会丢弃遇到错误的输入。 |
| 已停止 | 流式处理作业已停止。 |
| 已失败 | 流式处理作业失败。 这通常表示在处理过程中出现灾难性错误。 |
注释
由于流式处理作业是异步执行的,因此作业可能会在运行时遇到错误。 若要排查流式处理作业失败问题,请使用 sys.sp_get_streaming_job 存储过程,或查看来自 Azure SQL Edge 容器的 Docker 日志,该容器可以提供流式处理作业中的错误详细信息。