方案:Azure AI 服务 - 多变量异常情况检测

此方案演示如何在 Apache Spark 上使用 SynapseML 和 Azure AI 服务进行多变量异常检测。 多变量异常检测涉及检测许多变量或时序之间的异常情况,同时考虑不同变量之间的所有关联和依赖关系。 此方案使用 SynapseML 和 Azure AI 服务来训练模型进行多变量异常检测。 然后,我们使用模型推断数据集中的多变量异常,该数据集包含来自三个 IoT 传感器的合成度量。

重要

从 2023 年 9 月 20 日开始,无法创建新的异常检测器资源。 异常检测器服务将于 2026 年 10 月 1 日停用。

有关 Azure AI 异常检测器的详细信息,请访问 异常检测器 信息资源。

先决条件

  • Azure 订阅 - 免费创建订阅
  • 将笔记本附加到湖屋。 在左侧,选择“添加”以添加现有湖屋或创建湖屋。

安装

从现有 Anomaly Detector 资源开始,可以探索处理各种表单数据的方法。 Azure AI 中的服务目录提供了多个选项:

创建异常检测器资源

  • 在 Azure 门户中,在资源组中选择创建,然后键入异常检测器。 选择“异常检测器”资源。
  • 为资源命名,理想情况下使用与资源组的其余区域相同的区域。 使用其余选项的默认选项,然后选择“查看 + 创建”,然后选择“创建”。
  • 创建异常检测器资源后,打开它,然后选择 Keys and Endpoints 左侧导航栏中的面板。 将异常检测器资源的密钥复制到 ANOMALY_API_KEY 环境变量中,或将其存储在 anomalyKey 变量中。

创建存储帐户资源

若要保存中间数据,必须创建 Azure Blob 存储帐户。 在该存储帐户中,创建用于存储中间数据的容器。 记下容器名称,并将连接字符串复制到该容器。 稍后需要它来填充 containerName 变量和 BLOB_CONNECTION_STRING 环境变量。

输入服务密钥

首先,为服务密钥设置环境变量。 下一个单元格基于 Azure Key Vault 中存储的值设置 ANOMALY_API_KEY 环境变量和 BLOB_CONNECTION_STRING 环境变量。 如果在自己的环境中运行本教程,请确保在继续作之前设置以下环境变量:

import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import find_secret

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

ANOMALY_API_KEYBLOB_CONNECTION_STRING环境变量读取,并设置containerNamelocation变量。

# An Anomaly Dectector subscription key
anomalyKey = find_secret("anomaly-api-key") # use your own anomaly api key
# Your storage account name
storageName = "anomalydetectiontest" # use your own storage account name
# A connection string to your blob storage account
storageKey = find_secret("madtest-storage-key") # use your own storage key
# A place to save intermediate MVAD results
intermediateSaveDir = (
    "wasbs://madtest@anomalydetectiontest.blob.core.windows.net/intermediateData"
)
# The location of the anomaly detector resource that you created
location = "westus2"

连接到存储帐户,使异常检测器可以在该存储帐户中保存中间结果:

spark.sparkContext._jsc.hadoopConfiguration().set(
    f"fs.azure.account.key.{storageName}.blob.core.windows.net", storageKey
)

导入所有必要的模块:

import numpy as np
import pandas as pd

import pyspark
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.types import DoubleType
import matplotlib.pyplot as plt

import synapse.ml
from synapse.ml.cognitive import *

将示例数据读入 Spark 数据帧:

df = (
    spark.read.format("csv")
    .option("header", "true")
    .load("wasbs://publicwasb@mmlspark.blob.core.windows.net/MVAD/sample.csv")
)

df = (
    df.withColumn("sensor_1", col("sensor_1").cast(DoubleType()))
    .withColumn("sensor_2", col("sensor_2").cast(DoubleType()))
    .withColumn("sensor_3", col("sensor_3").cast(DoubleType()))
)

# Let's inspect the dataframe:
df.show(5)

现在可以创建一个 estimator 对象,用于训练模型。 我们指定训练数据的开始和结束时间。 我们还指定要使用的输入列,以及包含时间戳的列的名称。 最后,我们指定要在异常情况检测滑动窗口中使用的数据点数,并将连接字符串设置为 Azure Blob 存储帐户:

trainingStartTime = "2020-06-01T12:00:00Z"
trainingEndTime = "2020-07-02T17:55:00Z"
timestampColumn = "timestamp"
inputColumns = ["sensor_1", "sensor_2", "sensor_3"]

estimator = (
    FitMultivariateAnomaly()
    .setSubscriptionKey(anomalyKey)
    .setLocation(location)
    .setStartTime(trainingStartTime)
    .setEndTime(trainingEndTime)
    .setIntermediateSaveDir(intermediateSaveDir)
    .setTimestampCol(timestampColumn)
    .setInputCols(inputColumns)
    .setSlidingWindow(200)
)

让我们将 estimator 拟合到数据中:

model = estimator.fit(df)

训练完成后,可以使用模型进行推理。 下一个单元格中的代码指定要在其中检测异常的数据的开始和结束时间:

inferenceStartTime = "2020-07-02T18:00:00Z"
inferenceEndTime = "2020-07-06T05:15:00Z"

result = (
    model.setStartTime(inferenceStartTime)
    .setEndTime(inferenceEndTime)
    .setOutputCol("results")
    .setErrorCol("errors")
    .setInputCols(inputColumns)
    .setTimestampCol(timestampColumn)
    .transform(df)
)

result.show(5)

在上一个单元格中, .show(5) 显示前五个数据帧行。 结果都是 null 因为他们降落在推理窗口外。

若要仅显示推断数据的结果,请选择所需的列。 然后,我们可以按升序对数据帧中的行进行排序,并筛选结果以仅显示推理窗口范围内的行。 在这里, inferenceEndTime 匹配数据帧中的最后一行,因此可以忽略它。

最后,为了更好地绘制结果,请将 Spark 数据帧转换为 Pandas 数据帧:

rdf = (
    result.select(
        "timestamp",
        *inputColumns,
        "results.contributors",
        "results.isAnomaly",
        "results.severity"
    )
    .orderBy("timestamp", ascending=True)
    .filter(col("timestamp") >= lit(inferenceStartTime))
    .toPandas()
)

rdf

设置 contributors 列的格式,该列将每个传感器的贡献分数存储到检测到的异常中。 下一个单元格处理此问题,并将每个传感器的贡献分数拆分为其自己的列:

def parse(x):
    if type(x) is list:
        return dict([item[::-1] for item in x])
    else:
        return {"series_0": 0, "series_1": 0, "series_2": 0}

rdf["contributors"] = rdf["contributors"].apply(parse)
rdf = pd.concat(
    [rdf.drop(["contributors"], axis=1), pd.json_normalize(rdf["contributors"])], axis=1
)
rdf

现在,我们的 series_0series_1series_2 列中分别有传感器 1、2 和 3 的贡献分数。

若要绘制结果,请运行下一个单元格。 该 minSeverity 参数指定要绘制的异常的最小严重性:

minSeverity = 0.1

####### Main Figure #######
plt.figure(figsize=(23, 8))
plt.plot(
    rdf["timestamp"],
    rdf["sensor_1"],
    color="tab:orange",
    linestyle="solid",
    linewidth=2,
    label="sensor_1",
)
plt.plot(
    rdf["timestamp"],
    rdf["sensor_2"],
    color="tab:green",
    linestyle="solid",
    linewidth=2,
    label="sensor_2",
)
plt.plot(
    rdf["timestamp"],
    rdf["sensor_3"],
    color="tab:blue",
    linestyle="solid",
    linewidth=2,
    label="sensor_3",
)
plt.grid(axis="y")
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.legend()

anoms = list(rdf["severity"] >= minSeverity)
_, _, ymin, ymax = plt.axis()
plt.vlines(np.where(anoms), ymin=ymin, ymax=ymax, color="r", alpha=0.8)

plt.legend()
plt.title(
    "A plot of the values from the three sensors with the detected anomalies highlighted in red."
)
plt.show()

####### Severity Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.plot(
    rdf["timestamp"],
    rdf["severity"],
    color="black",
    linestyle="solid",
    linewidth=2,
    label="Severity score",
)
plt.plot(
    rdf["timestamp"],
    [minSeverity] * len(rdf["severity"]),
    color="red",
    linestyle="dotted",
    linewidth=1,
    label="minSeverity",
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("Severity of the detected anomalies")
plt.show()

####### Contributors Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.bar(
    rdf["timestamp"], rdf["series_0"], width=2, color="tab:orange", label="sensor_1"
)
plt.bar(
    rdf["timestamp"],
    rdf["series_1"],
    width=2,
    color="tab:green",
    label="sensor_2",
    bottom=rdf["series_0"],
)
plt.bar(
    rdf["timestamp"],
    rdf["series_2"],
    width=2,
    color="tab:blue",
    label="sensor_3",
    bottom=rdf["series_0"] + rdf["series_1"],
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("The contribution of each sensor to the detected anomaly")
plt.show()

多变量异常情况检测结果绘图的屏幕截图。

绘图以橙色、绿色和蓝色显示来自传感器(推理窗口内)的原始数据。 第一张图中的红色竖线显示检测到的异常,其严重程度大于或等于 minSeverity

第二张图显示所有检测到的异常的严重程度分数,阈值 minSeverity 以红色虚线显示。

最后,最后一张图显示每个传感器的数据对检测到的异常的贡献。 它帮助我们诊断和了解每个异常最可能的原因。