延迟

✅ Azure 流分析 ✅ 构造事件流

LAG 分析运算符允许在特定约束内在事件流中查找“上一个”事件。 它非常适用于计算变量的增长率、检测变量何时越过阈值或条件开始或停止为 true 的情况。

在流分析中,使用 LIMIT DURATION 子句(即,与当前事件相去甚远)的范围始终限制为有限的时间间隔。 LAG 可以选择性地限制为仅考虑使用 PARTITION BY 和 WHEN 子句与特定属性或条件上的当前事件匹配的事件。

LAG 不受 WHERE 子句中的谓词、JOIN 子句中的联接条件或当前查询的 GROUP BY 子句中的分组表达式的影响,因为它是在这些子句之前计算的。

语法

LAG(<scalar_expression >, [<offset >], [<default>])  
     OVER ([PARTITION BY <partition key>] LIMIT DURATION(<unit>, <length>) [WHEN boolean_expression])
  

例如:

LAG(reading) OVER (LIMIT DURATION(hour, 3))  
LAG(name, 2, 'none such') OVER (PARTITION BY userId LIMIT DURATION(minute, 2))  

论据

scalar_expression

要根据指定偏移量返回的值。 它是返回单个(标量)值或通配符表达式“*”的任何类型的表达式。 对于“*”,将返回根据指定的偏移量返回整个事件,并将包含在结果事件(嵌套记录) 中。
scalar_expression不能包含其他分析函数或外部函数。

抵消

要从中获取值的当前事件返回的事件数。 如果未指定,则默认值为 1,这意味着它将返回上一个事件。 偏移量必须是大于或等于 1 的整数。 事件按时态顺序进行处理。 如果有多个事件具有相同时间戳事件,则按到达顺序进行处理。

默认

在指定偏移量处没有事件时要返回的值。 如果未指定默认值,则返回 NULL。 如果指定的偏移量处的事件数小于指定的偏移量或 2),则“指定偏移量处没有事件”可以是 1)的情况,前提是指定的偏移量的事件根据指定的limit_duration_clause 3) 事件存在,但与when_clause中指定的布尔条件不匹配。

如果指定偏移量处的事件存在且scalar_expression的值为 NULL,则为 NULL
返回 。 default 可以是列、子查询或其他表达式,但它不能包含其他表达式
分析函数或外部函数。 default 必须与 具有相同的类型
scalar_expression。

OVER ([ partition_by_clause ] limit_duration_clause [when_clause])

partition_by_clause PARTITION BY <分区键> 子句仅请求其值为的事件
<分区键与当前事件的分区键> 相同。 例如,

LAG(reading) OVER (PARTITION BY sensorId LIMIT DURATION(hour, 1))  

将返回与当前事件相同的传感器的上一个读数(如果在前 1 小时内发生此类事件)。

limit_duration 子句 DURATION(<单位>、 <长度>)

指定必须考虑当前事件中的历史记录量。 有关支持的单位及其缩写的详细说明,请参阅 DATEDIFF。 如果在 DURATION 间隔内找不到足够的匹配事件,则 <返回默认值> 。

when_clause
指定在 LAG 计算中要考虑的事件的布尔条件。 如果在 DURATION 间隔内找不到足够的匹配事件,则 <返回默认值> 。 when_clause是可选的。

返回类型

指定scalar_expression的数据类型。 如果scalar_expression,则返回 NULL

一般备注

LAG 具有不确定性。 事件按时态顺序进行处理。 如果有多个事件具有相同时间戳事件,则按到达顺序进行处理。

开窗函数 的结果集应用 LAG 可能会产生意外的结果。 窗口化函数会更改事件的时间戳,因为每个窗口作都会在窗口末尾输出事件。 在窗口作后,可以使用 system.timestamp()访问事件的当前时间戳,它与原始事件时间属性不同。 如果在窗口作之前无法移动 LAG,请考虑使用 CollectTop,按原始事件时间排序。

例子

计算每个传感器的增长率:

SELECT sensorId,  
       growth = reading -
                        LAG(reading) OVER (PARTITION BY sensorId LIMIT DURATION(hour, 1))  
FROM input  
  

查找以前的非 null 传感器读数:

SELECT  
     sensorId,  
     LAG(reading) OVER (PARTITION BY sensorId LIMIT DURATION(hour, 1) WHEN reading IS NOT NULL)  
     FROM input  
  

查找特定传感器类型的以前的非 null 传感器读数:

WITH filterSensor AS
(
  SELECT *
  FROM input
  WHERE input.sensorType = 4 AND sensorId IS NOT NULL
)

SELECT
  LAG(reading) OVER (PARTITION BY sensorId LIMIT DURATION(hour, 1))
FROM filterSensor

确定变量何时越过阈值:

SELECT
    sensorId, reading
FROM input
WHERE
    devicetype = 'thermostat'
    AND reading > 100
    AND LAG(reading) OVER (PARTITION BY sensorId LIMIT DURATION(hour, 1) WHEN devicetype = 'thermostat') <= 100

另请参阅

ISFIRST
上次