✅ Azure 流分析 ✅ 构造事件流
LAG 分析运算符允许在特定约束内在事件流中查找“上一个”事件。 它非常适用于计算变量的增长率、检测变量何时越过阈值或条件开始或停止为 true 的情况。
在流分析中,使用 LIMIT DURATION 子句(即,与当前事件相去甚远)的范围始终限制为有限的时间间隔。 LAG 可以选择性地限制为仅考虑使用 PARTITION BY 和 WHEN 子句与特定属性或条件上的当前事件匹配的事件。
LAG 不受 WHERE 子句中的谓词、JOIN 子句中的联接条件或当前查询的 GROUP BY 子句中的分组表达式的影响,因为它是在这些子句之前计算的。
语法
LAG(<scalar_expression >, [<offset >], [<default>])
OVER ([PARTITION BY <partition key>] LIMIT DURATION(<unit>, <length>) [WHEN boolean_expression])
例如:
LAG(reading) OVER (LIMIT DURATION(hour, 3))
LAG(name, 2, 'none such') OVER (PARTITION BY userId LIMIT DURATION(minute, 2))
论据
scalar_expression
要根据指定偏移量返回的值。 它是返回单个(标量)值或通配符表达式“*”的任何类型的表达式。 对于“*”,将返回根据指定的偏移量返回整个事件,并将包含在结果事件(嵌套记录) 中。
scalar_expression不能包含其他分析函数或外部函数。
抵消
要从中获取值的当前事件返回的事件数。 如果未指定,则默认值为 1,这意味着它将返回上一个事件。 偏移量必须是大于或等于 1 的整数。 事件按时态顺序进行处理。 如果有多个事件具有相同时间戳事件,则按到达顺序进行处理。
默认
在指定偏移量处没有事件时要返回的值。 如果未指定默认值,则返回 NULL。 如果指定的偏移量处的事件数小于指定的偏移量或 2),则“指定偏移量处没有事件”可以是 1)的情况,前提是指定的偏移量的事件根据指定的limit_duration_clause 3) 事件存在,但与when_clause中指定的布尔条件不匹配。
如果指定偏移量处的事件存在且scalar_expression的值为 NULL,则为 NULL
返回 。 default 可以是列、子查询或其他表达式,但它不能包含其他表达式
分析函数或外部函数。 default 必须与 具有相同的类型
scalar_expression。
OVER ([ partition_by_clause ] limit_duration_clause [when_clause])
partition_by_clause PARTITION BY <分区键> 子句仅请求其值为的事件
<分区键与当前事件的分区键> 相同。 例如,
LAG(reading) OVER (PARTITION BY sensorId LIMIT DURATION(hour, 1))
将返回与当前事件相同的传感器的上一个读数(如果在前 1 小时内发生此类事件)。
limit_duration 子句 DURATION(<单位>、 <长度>)
指定必须考虑当前事件中的历史记录量。 有关支持的单位及其缩写的详细说明,请参阅 DATEDIFF。 如果在 DURATION 间隔内找不到足够的匹配事件,则 <返回默认值> 。
when_clause
指定在 LAG 计算中要考虑的事件的布尔条件。 如果在 DURATION 间隔内找不到足够的匹配事件,则 <返回默认值> 。 when_clause是可选的。
返回类型
指定scalar_expression的数据类型。 如果scalar_expression,则返回 NULL
一般备注
LAG 具有不确定性。 事件按时态顺序进行处理。 如果有多个事件具有相同时间戳事件,则按到达顺序进行处理。
对 开窗函数 的结果集应用 LAG 可能会产生意外的结果。 窗口化函数会更改事件的时间戳,因为每个窗口作都会在窗口末尾输出事件。 在窗口作后,可以使用 system.timestamp()访问事件的当前时间戳,它与原始事件时间属性不同。 如果在窗口作之前无法移动 LAG,请考虑使用 CollectTop,按原始事件时间排序。
例子
计算每个传感器的增长率:
SELECT sensorId,
growth = reading -
LAG(reading) OVER (PARTITION BY sensorId LIMIT DURATION(hour, 1))
FROM input
查找以前的非 null 传感器读数:
SELECT
sensorId,
LAG(reading) OVER (PARTITION BY sensorId LIMIT DURATION(hour, 1) WHEN reading IS NOT NULL)
FROM input
查找特定传感器类型的以前的非 null 传感器读数:
WITH filterSensor AS
(
SELECT *
FROM input
WHERE input.sensorType = 4 AND sensorId IS NOT NULL
)
SELECT
LAG(reading) OVER (PARTITION BY sensorId LIMIT DURATION(hour, 1))
FROM filterSensor
确定变量何时越过阈值:
SELECT
sensorId, reading
FROM input
WHERE
devicetype = 'thermostat'
AND reading > 100
AND LAG(reading) OVER (PARTITION BY sensorId LIMIT DURATION(hour, 1) WHEN devicetype = 'thermostat') <= 100
另请参阅
ISFIRST
上次