✅ Azure 流分析 ✅ 构造事件流
检测时序事件中的临时异常。
基础机器学习模型使用自适应内核密度估计算法。
语法
AnomalyDetection_SpikeAndDip(
<scalar_expression>,
<confidence>,
<historySize>,
<mode>)
OVER ([PARTITION BY <partition key>]
LIMIT DURATION(<unit>, <length>)
[WHEN boolean_expression])
论据
scalar_expression
模型对其执行异常情况检测的事件列或计算字段。 此参数的允许值包括返回单个(标量)值的 FLOAT 或 BIGINT 数据类型。
不允许通配符表达式 * 。 此外, scalar_expression 不能包含其他分析函数或外部函数。
信心
从 1.00 到 100(含)的百分比数字,用于设置机器学习模型的敏感度。 置信度越低,检测到的异常数越高,反之亦然。 从 70 到 90 之间的任意数字开始,并根据开发或测试中观察到的结果调整此值。
historySize
模型持续学习的滑动窗口中的事件数,并用于为下一个事件评分以异常情况。 通常,这应表示正常行为的时间段,使模型能够标记后续异常。 首先使用历史日志进行经过教育的猜测,并根据开发或测试中观察到的结果进行调整。
模式
一个字符串参数,其值为“spikes”、“dips”或“spikesanddips”,用于分别仅检测峰值、仅下降或同时检测峰值和下降。
OVER ([ partition_by_clause ] limit_duration_clause [when_clause])
partition_by_clause
用于根据事件中的特定列对模型的训练进行分区。 该模型在所有分区中应用相同的函数参数设置。
limit_duration_clause DURATION(单位、长度)
流分析中滑动窗口的大小(按时间计算)。 此时间窗口的建议大小相当于生成 处于 稳定状态的事件数所花费的时间。
when_clause
为模型接受的事件指定布尔条件以执行异常情况检测。 when_clause是可选的。
返回类型
该函数返回由下列列组成的嵌套记录:
IsAnomaly
一个 BIGINT (0 或 1),指示事件是否异常。
分数
指示事件异常情况的计算 p 值分数(浮点数)。 较低的分数意味着事件属于同一分布的概率较低,因此其异常性就越低。
例子
以下示例假定在 2 分钟的滑动窗口中,平均输入速率为每秒 1 个事件,历史记录大小为 120 个事件。 最终的 SELECT 语句提取并输出分数和异常状态,置信度为 95%。
WITH AnomalyDetectionStep AS
(
SELECT
EVENTENQUEUEDUTCTIME as time,
CAST(temperature AS FLOAT) as temp,
AnomalyDetection_SpikeAndDip(CAST(temperature AS FLOAT), 95, 120, 'spikesanddips')
OVER(LIMIT DURATION(second, 120)) AS SpikeAndDipScores
FROM input
)
SELECT
time,
temp,
CAST(GetRecordPropertyValue(SpikeAndDipScores, 'Score') as FLOAT) AS
SpikeAndDipScore,
CAST(GetRecordPropertyValue(SpikeAndDipScores, 'IsAnomaly') AS BIGINT) AS
IsSpikeAndDipAnomaly
INTO output
FROM AnomalyDetectionStep
使用 1 秒翻转窗口进行统一的非统一输入流的示例:
WITH SmootheningStep AS
(
SELECT
System.Timestamp() as time,
AVG(CAST(temperature as float)) as temp
FROM input
GROUP BY TUMBLINGWINDOW(second, 1)
),
AnomalyDetectionStep AS
(
SELECT
time,
temp,
AnomalyDetection_SpikeAndDip(temp, 95, 120, 'spikesanddips')
OVER(LIMIT DURATION(second, 120)) as SpikeAndDipScores
FROM SmootheningStep
)
SELECT
time,
temp,
CAST(GetRecordPropertyValue(SpikeAndDipScores, 'Score') AS FLOAT) As
SpikeAndDipScore,
CAST(GetRecordPropertyValue(SpikeAndDipScores, 'IsAnomaly') AS BIGINT) AS
IsSpikeAndDipAnomaly
INTO output
FROM AnomalyDetectionStep
使用分区查询来训练每个传感器的单独模型的示例:
WITH AnomalyDetectionStep AS
(
SELECT
sensorid,
System.Timestamp() AS time,
CAST(temperature AS FLOAT) AS temp,
AnomalyDetection_SpikeAndDip(CAST(temperature AS FLOAT), 95, 120, 'spikesanddips')
OVER(PARTITION BY sensorid LIMIT DURATION(second, 120)) AS SpikeAndDipScores
FROM input
)
SELECT
CAST (sensorid AS NVARCHAR(max)) AS sensoridstring,
time,
temp,
CAST(GetRecordPropertyValue(SpikeAndDipScores, 'Score') as float) AS
SpikeAndDipScore,
CAST(GetRecordPropertyValue(SpikeAndDipScores, 'IsAnomaly') AS BIGINT) AS
IsSpikeAndDipAnomaly
INTO output
FROM AnomalyDetectionStep