教程:创建、评估和评分计算机故障检测模型

本教程演示了 Microsoft Fabric 中 Synapse 数据科学工作流的端到端示例。 此方案使用机器学习进行更系统的方法进行故障诊断、主动识别问题并在实际计算机故障之前采取措施。 目标是根据进程温度、旋转速度等预测计算机是否会遇到故障。

本教程介绍以下步骤:

  • 安装自定义库
  • 加载和处理数据
  • 通过探索性数据分析了解数据
  • 使用 scikit-learn、LightGBM 和 MLflow 训练机器学习模型,并使用 Fabric 自动记录功能跟踪试验
  • 使用 Fabric PREDICT 功能为训练的模型评分,保存最佳模型,并为预测加载该模型
  • 使用 Power BI 可视化效果显示加载的模型性能

先决条件

在笔记本中继续操作

可以选择以下选项之一以在笔记中继续操作:

  • 打开并运行内置笔记本。
  • 从 GitHub 上传笔记本。

打开内置笔记本

本教程随附“计算机故障”示例笔记本

  1. 若要打开本教程的示例笔记本,请按照 为数据科学教程准备系统中的说明进行操作。

  2. 在开始运行代码之前,请务必将湖屋附加到笔记本

从 GitHub 导入笔记本

本教程随附 AISample - 预测性维护笔记本。

步骤 1:安装自定义库

对于机器学习模型开发或即席数据分析,可能需要为 Apache Spark 会话快速安装自定义库。 有两个选项可用于安装库。

  • 使用笔记本的内联安装功能(%pip%conda),仅在当前笔记本中安装库。
  • 或者,可以创建 Fabric 环境、从公共源安装库或将自定义库上传到该环境,然后工作区管理员可以将环境附加为工作区的默认值。 然后,环境中的所有库都将可用于工作区中的任何笔记本和 Spark 作业定义。 有关环境的详细信息,请参阅 在 Microsoft Fabric中创建、配置和使用环境。

在本教程中,请使用 %pip install 在笔记本中安装 imblearn 库。

注意

运行 %pip install 后,PySpark 内核将重启。 在运行任何其他单元格之前安装所需的库。

# Use pip to install imblearn
%pip install imblearn

步骤 2:加载数据

数据集模拟将制造机器的参数记录为时间函数,这在工业环境中很常见。 它包含 10,000 个数据点,这些数据点以行的形式存储,其中特征为列。 这些功能包括:

  • 范围从 1 到 10000 的唯一标识符 (UID)

  • 产品编号,由字母 L(表示低)、M(表示中等)或 H(表示高)标识产品质量等级,以及变体特有的序列号。 低质量、中质量和高质量变体分别占所有产品的 60%、30% 和 10%

  • 气温,以凯尔文度为单位(K)

  • 过程温度,以开尔文度计

  • 转速,以每分钟转数为单位(RPM)

  • 扭矩,单位 Newton-Meters(Nm)

  • 工具磨损,以分钟为单位。 质量变体 H、M 和 L 分别给用于流程的工具增加了 5、3 和 2 分钟的磨损时间。

  • 计算机故障标签,指示计算机是否在特定数据点中失败。 此特定数据点可以具有以下五种独立故障模式中的任何一种:

    • 工具磨损故障 (TWF):工具在随机选择的工具磨损时间(200 到 240 分钟)更换或发生故障
    • 散热故障(HDF):如果空气温度与过程温度之间的差异小于 8.6 K,并且工具的旋转速度小于 1380 RPM,则散热不足会导致过程故障。
    • 功率故障(PWF):扭矩和旋转速度(以弧度/秒为单位)的乘积等于过程所需的功率。 如果此电源低于 3,500 W 或超过 9,000 W,该过程将失败
    • 过度应变故障 (OSF):如果工具磨损和扭矩的乘积超过 11,000 最小 Nm(L 产品变体,M 为 12,000,H 为 13,000),则工序因过度应变而失败。
    • 随机故障(RNF):无论进程参数如何,每个进程都有 0.1%的失败机会

注意

如果上述故障模式中至少有一种为 true,则进程会失败,并且“计算机故障”标签设置为 1。 机器学习方法无法确定哪个故障模式导致进程失败。

下载数据集并上传到湖仓 (Lakehouse)

连接到 Azure 开放数据集容器,并加载预测性维护数据集。 此代码下载公开可用的数据集版本,然后将其存储在 Fabric Lakehouse 中:

重要

在运行代码之前,将湖屋添加到笔记本。 否则,您会遇到错误。 有关添加湖屋的信息,请参阅连接湖屋和笔记本

# Download demo data files into the lakehouse if they don't exist
import os, requests
DATA_FOLDER = "Files/predictive_maintenance/"  # Folder that contains the dataset
DATA_FILE = "predictive_maintenance.csv"  # Data file name
remote_url = "https://synapseaisolutionsa.z13.web.core.windows.net/data/MachineFaultDetection"
file_list = ["predictive_maintenance.csv"]
download_path = f"/lakehouse/default/{DATA_FOLDER}/raw"

if not os.path.exists("/lakehouse/default"):
    raise FileNotFoundError(
        "Default lakehouse not found, please add a lakehouse and restart the session."
    )
os.makedirs(download_path, exist_ok=True)
for fname in file_list:
    if not os.path.exists(f"{download_path}/{fname}"):
        r = requests.get(f"{remote_url}/{fname}", timeout=30)
        with open(f"{download_path}/{fname}", "wb") as f:
            f.write(r.content)
print("Downloaded demo data files into lakehouse.")

将数据集下载到 Lakehouse 后,可以将其加载为 Spark 数据帧:

df = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv(f"{DATA_FOLDER}raw/{DATA_FILE}")
    .cache()
)
df.show(5)

下表显示了数据的预览:

UDI 产品 ID 类型 气温 [K] 进程温度 [K] 旋转速度 [rpm] 扭力 [Nm] 工具磨损时间 [分钟] 目标 失败类型
1 M14860 M 298.1 308.6 1551 42.8 0 0 无故障
2 L47181 L 298.2 308.7 1408 46.3 3 0 无故障
3 L47182 L 298.1 308.5 1498 49.4 5 0 无故障
4 L47183 L 298.2 308.6 1433 39.5 7 0 无故障
5 L47184 L 298.2 308.7 1408 40.0 9 0 无故障

将 Spark DataFrame 写入湖屋 Delta 表

设置数据的格式(例如,将空格替换为下划线),以便在后续步骤中促进 Spark 操作:

# Replace the space in the column name with an underscore to avoid an invalid character while saving 
df = df.toDF(*(c.replace(' ', '_') for c in df.columns))
table_name = "predictive_maintenance_data"
df.show(5)

下表显示了具有重新格式化列名称的数据预览:

UDI Product_ID 类型 空气温度_[K] 过程温度_[K] 转速[rpm] Torque_[Nm] Tool_wear_[min] 目标 故障类型
1 M14860 M 298.1 308.6 1551 42.8 0 0 无故障
2 L47181 L 298.2 308.7 1408 46.3 3 0 无故障
3 L47182 L 298.1 308.5 1498 49.4 5 0 无故障
4 L47183 L 298.2 308.6 1433 39.5 7 0 无故障
5 L47184 L 298.2 308.7 1408 40.0 9 0 无故障
# Save data with processed columns to the lakehouse 
df.write.mode("overwrite").format("delta").save(f"Tables/{table_name}")
print(f"Spark DataFrame saved to delta table: {table_name}")

步骤 3:预处理数据并执行探索性数据分析

将 Spark 数据帧转换为 pandas 数据帧,以使用与 Pandas 兼容的常用绘图库。

提示

对于大型数据集,可能需要加载该数据集的一部分。

data = spark.read.format("delta").load("Tables/predictive_maintenance_data")
SEED = 1234
df = data.toPandas()
df.drop(['UDI', 'Product_ID'],axis=1,inplace=True)
# Rename the Target column to IsFail
df = df.rename(columns = {'Target': "IsFail"})
df.info()

根据需要将数据集的特定列转换为浮点或整数类型,并将字符串('L''M''H')映射到数值(012):

# Convert temperature, rotational speed, torque, and tool wear columns to float
df['Air_temperature_[K]'] = df['Air_temperature_[K]'].astype(float)
df['Process_temperature_[K]'] = df['Process_temperature_[K]'].astype(float)
df['Rotational_speed_[rpm]'] = df['Rotational_speed_[rpm]'].astype(float)
df['Torque_[Nm]'] = df['Torque_[Nm]'].astype(float)
df['Tool_wear_[min]'] = df['Tool_wear_[min]'].astype(float)

# Convert the 'Target' column to an integer 
df['IsFail'] = df['IsFail'].astype(int)
# Map 'L', 'M', 'H' to numerical values 
df['Type'] = df['Type'].map({'L': 0, 'M': 1, 'H': 2})

通过可视化效果浏览数据

# Import packages and set plotting style
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd
sns.set_style('darkgrid')

# Create the correlation matrix
corr_matrix = df.corr(numeric_only=True)

# Plot a heatmap
plt.figure(figsize=(10, 8))
sns.heatmap(corr_matrix, annot=True)
plt.show()

显示特征相关矩阵绘图的屏幕截图。

如预期所示,失败(IsFail)与所选特征(列)相关联。 相关矩阵显示,Air_temperatureProcess_temperatureRotational_speedTorqueTool_wearIsFail 变量具有最高相关性。

# Plot histograms of select features
fig, axes = plt.subplots(2, 3, figsize=(18,10))
columns = ['Air_temperature_[K]', 'Process_temperature_[K]', 'Rotational_speed_[rpm]', 'Torque_[Nm]', 'Tool_wear_[min]']
data=df.copy()
for ind, item in enumerate (columns):
    column = columns[ind]
    df_column = data[column]
    df_column.hist(ax = axes[ind%2][ind//2], bins=32).set_title(item)
fig.supylabel('count')
fig.subplots_adjust(hspace=0.2)
fig.delaxes(axes[1,2])

显示特征图绘图的屏幕截图。

如绘图图所示,Air_temperatureProcess_temperatureRotational_speedTorqueTool_wear 变量不稀疏。 它们似乎在特征空间中具有良好的连续性。 这些绘图确认在此数据集上训练机器学习模型可能会生成可通用化到新数据集的可靠结果。

检查目标变量的类别不平衡

计算失败和未失败设备的样本数,并检查每个类的数据平衡(IsFail=0IsFail=1):

# Plot the counts for no failure and each failure type
plt.figure(figsize=(12, 2))
ax = sns.countplot(x='Failure_Type', data=df)
for p in ax.patches:
    ax.annotate(f'{p.get_height()}', (p.get_x()+0.4, p.get_height()+50))

plt.show()

# Plot the counts for no failure versus the sum of all failure types
plt.figure(figsize=(4, 2))
ax = sns.countplot(x='IsFail', data=df)
for p in ax.patches:
    ax.annotate(f'{p.get_height()}', (p.get_x()+0.4, p.get_height()+50))

plt.show()

显示样本不平衡的图表的屏幕截图。

这些绘图指示无故障类(显示为第二个绘图中的 IsFail=0)构成了大部分样本。 使用过度采样技术创建更均衡的训练数据集:

# Separate features and target
features = df[['Type', 'Air_temperature_[K]', 'Process_temperature_[K]', 'Rotational_speed_[rpm]', 'Torque_[Nm]', 'Tool_wear_[min]']]
labels = df['IsFail']

# Split the dataset into the training and testing sets
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size=0.2, random_state=42)

# Ignore warnings
import warnings
warnings.filterwarnings('ignore')
# Save test data to the lakehouse for use in future sections
table_name = "predictive_maintenance_test_data"
df_test_X = spark.createDataFrame(X_test)
df_test_X.write.mode("overwrite").format("delta").save(f"Tables/{table_name}")
print(f"Spark DataFrame saved to delta table: {table_name}")

过采样以平衡训练数据集中的类别

前面的分析显示数据集高度不平衡。 这种不平衡成为一个问题,因为少数类的示例太少,模型无法有效地了解决策边界。

SMOTE 可以解决此问题。 SMOTE 是一种广泛使用的过度采样技术,用于生成综合示例。 它基于数据点之间的欧几里得距离生成少数类别的样本。 此方法不同于随机过采样,因为它创建的新示例并不是简单地复制少数类别的数据。 该方法成为处理不平衡数据集的更有效的技术。

# Disable MLflow autologging because you don't want to track SMOTE fitting
import mlflow

mlflow.autolog(disable=True)

from imblearn.combine import SMOTETomek
smt = SMOTETomek(random_state=SEED)
X_train_res, y_train_res = smt.fit_resample(X_train, y_train)

# Plot the counts for both classes
plt.figure(figsize=(4, 2))
ax = sns.countplot(x='IsFail', data=pd.DataFrame({'IsFail': y_train_res.values}))
for p in ax.patches:
    ax.annotate(f'{p.get_height()}', (p.get_x()+0.4, p.get_height()+50))

plt.show()

一张显示样本均衡的图表的截图。

已成功均衡数据集。 现在可以进行模型训练了。

步骤 4:训练和评估模型

MLflow 注册模型、训练和比较各种模型,并选取用于预测的最佳模型。 可以使用以下三个模型进行模型训练:

  • 随机林分类器
  • 逻辑回归分类器
  • XGBoost 分类器

训练随机林分类器

import numpy as np 
from sklearn.ensemble import RandomForestClassifier
from mlflow.models.signature import infer_signature
from sklearn.metrics import f1_score, accuracy_score, recall_score

mlflow.set_experiment("Machine_Failure_Classification")
mlflow.autolog(exclusive=False) # This is needed to override the preconfigured autologging behavior

with mlflow.start_run() as run:
    rfc_id = run.info.run_id
    print(f"run_id {rfc_id}, status: {run.info.status}")
    rfc = RandomForestClassifier(max_depth=5, n_estimators=50)
    rfc.fit(X_train_res, y_train_res) 
    signature = infer_signature(X_train_res, y_train_res)

    mlflow.sklearn.log_model(
        rfc,
        "machine_failure_model_rf",
        signature=signature,
        registered_model_name="machine_failure_model_rf"
    ) 

    y_pred_train = rfc.predict(X_train)
    # Calculate the classification metrics for test data
    f1_train = f1_score(y_train, y_pred_train, average='weighted')
    accuracy_train = accuracy_score(y_train, y_pred_train)
    recall_train = recall_score(y_train, y_pred_train, average='weighted')

    # Log the classification metrics to MLflow
    mlflow.log_metric("f1_score_train", f1_train)
    mlflow.log_metric("accuracy_train", accuracy_train)
    mlflow.log_metric("recall_train", recall_train)

    # Print the run ID and the classification metrics
    print("F1 score_train:", f1_train)
    print("Accuracy_train:", accuracy_train)
    print("Recall_train:", recall_train)    

    y_pred_test = rfc.predict(X_test)
    # Calculate the classification metrics for test data
    f1_test = f1_score(y_test, y_pred_test, average='weighted')
    accuracy_test = accuracy_score(y_test, y_pred_test)
    recall_test = recall_score(y_test, y_pred_test, average='weighted')

    # Log the classification metrics to MLflow
    mlflow.log_metric("f1_score_test", f1_test)
    mlflow.log_metric("accuracy_test", accuracy_test)
    mlflow.log_metric("recall_test", recall_test)

    # Print the classification metrics
    print("F1 score_test:", f1_test)
    print("Accuracy_test:", accuracy_test)
    print("Recall_test:", recall_test)

从输出中可以看到,使用随机森林分类器时,训练和测试数据集的 F1 分数、准确率和召回率均约为 0.9。

训练逻辑回归分类器

from sklearn.linear_model import LogisticRegression

with mlflow.start_run() as run:
    lr_id = run.info.run_id
    print(f"run_id {lr_id}, status: {run.info.status}")
    lr = LogisticRegression(random_state=42)
    lr.fit(X_train_res, y_train_res)
    signature = infer_signature(X_train_res, y_train_res)
  
    mlflow.sklearn.log_model(
        lr,
        "machine_failure_model_lr",
        signature=signature,
        registered_model_name="machine_failure_model_lr"
    ) 

    y_pred_train = lr.predict(X_train)
    # Calculate the classification metrics for training data
    f1_train = f1_score(y_train, y_pred_train, average='weighted')
    accuracy_train = accuracy_score(y_train, y_pred_train)
    recall_train = recall_score(y_train, y_pred_train, average='weighted')

    # Log the classification metrics to MLflow
    mlflow.log_metric("f1_score_train", f1_train)
    mlflow.log_metric("accuracy_train", accuracy_train)
    mlflow.log_metric("recall_train", recall_train)

    # Print the run ID and the classification metrics
    print("F1 score_train:", f1_train)
    print("Accuracy_train:", accuracy_train)
    print("Recall_train:", recall_train)    

    y_pred_test = lr.predict(X_test)
    # Calculate the classification metrics for test data
    f1_test = f1_score(y_test, y_pred_test, average='weighted')
    accuracy_test = accuracy_score(y_test, y_pred_test)
    recall_test = recall_score(y_test, y_pred_test, average='weighted')

    # Log the classification metrics to MLflow
    mlflow.log_metric("f1_score_test", f1_test)
    mlflow.log_metric("accuracy_test", accuracy_test)
    mlflow.log_metric("recall_test", recall_test)

训练 XGBoost 分类器

from xgboost import XGBClassifier

with mlflow.start_run() as run:
    xgb = XGBClassifier()
    xgb_id = run.info.run_id 
    print(f"run_id {xgb_id}, status: {run.info.status}")
    xgb.fit(X_train_res.to_numpy(), y_train_res.to_numpy()) 
    signature = infer_signature(X_train_res, y_train_res)
  
    mlflow.xgboost.log_model(
        xgb,
        "machine_failure_model_xgb",
        signature=signature,
        registered_model_name="machine_failure_model_xgb"
    ) 

    y_pred_train = xgb.predict(X_train)
    # Calculate the classification metrics for training data
    f1_train = f1_score(y_train, y_pred_train, average='weighted')
    accuracy_train = accuracy_score(y_train, y_pred_train)
    recall_train = recall_score(y_train, y_pred_train, average='weighted')

    # Log the classification metrics to MLflow
    mlflow.log_metric("f1_score_train", f1_train)
    mlflow.log_metric("accuracy_train", accuracy_train)
    mlflow.log_metric("recall_train", recall_train)

    # Print the run ID and the classification metrics
    print("F1 score_train:", f1_train)
    print("Accuracy_train:", accuracy_train)
    print("Recall_train:", recall_train)    

    y_pred_test = xgb.predict(X_test)
    # Calculate the classification metrics for test data
    f1_test = f1_score(y_test, y_pred_test, average='weighted')
    accuracy_test = accuracy_score(y_test, y_pred_test)
    recall_test = recall_score(y_test, y_pred_test, average='weighted')

    # Log the classification metrics to MLflow
    mlflow.log_metric("f1_score_test", f1_test)
    mlflow.log_metric("accuracy_test", accuracy_test)
    mlflow.log_metric("recall_test", recall_test)

步骤 5:选择最佳模型并预测输出

在上一部分中,你训练了三个不同的分类器:随机林、逻辑回归和 XGBoost。 现在可以选择以编程方式访问结果或使用用户界面(UI)。

对于 UI 路径选项,请导航到工作区并筛选模型。

筛选器的屏幕截图,其中选择了模型。

选择单个模型,了解模型性能的详细信息。

模型性能详细信息的屏幕截图。

此示例演示如何通过 MLflow 以编程方式访问模型:

runs = {'random forest classifier':   rfc_id,
        'logistic regression classifier': lr_id,
        'xgboost classifier': xgb_id}

# Create an empty DataFrame to hold the metrics
df_metrics = pd.DataFrame()

# Loop through the run IDs and retrieve the metrics for each run
for run_name, run_id in runs.items():
    metrics = mlflow.get_run(run_id).data.metrics
    metrics["run_name"] = run_name
    df_metrics = df_metrics.append(metrics, ignore_index=True)

# Print the DataFrame
print(df_metrics)

尽管 XGBoost 在训练集上生成了最佳结果,但它在测试数据集上表现不佳。 性能不佳意味着过度拟合。 逻辑回归分类器在训练数据集和测试数据集上表现不佳。 总体而言,随机林在训练性能和避免过度拟合之间取得了良好的平衡。

在下一部分中,选择已注册的随机林模型,并使用 PREDICT 功能执行预测:

from synapse.ml.predict import MLFlowTransformer

model = MLFlowTransformer(
    inputCols=list(X_test.columns),
    outputCol='predictions',
    modelName='machine_failure_model_rf',
    modelVersion=1
)

使用创建的用于加载模型进行推理的 MLFlowTransformer 对象,请使用转换器 API 对测试数据集上的模型评分:

predictions = model.transform(spark.createDataFrame(X_test))
predictions.show()

此表显示输出:

类型 空气温度_[K] 过程温度_[K] 转速_[rpm] Torque_[Nm] Tool_wear_[min] 预测
0 300.6 309.7 1639.0 30.4 121.0 0
0 303.9 313.0 1551.0 36.8 140.0 0
1 299.1 308.6 1491.0 38.5 166.0 0
0 300.9 312.1 1359.0 51.7 146.0 1
0 303.7 312.6 1621.0 38.8 182.0 0
0 299.0 310.3 1868.0 24.0 221.0 1
2 297.8 307.5 1631.0 31.3 124.0 0
0 297.5 308.2 1327.0 56.5 189.0 1
0 301.3 310.3 1460.0 41.5 197.0 0
2 297.6 309.0 1413.0 40.2 51.0 0
1 300.9 309.4 1724.0 25.6 119.0 0
0 303.3 311.3 1389.0 53.9 39.0 0
0 298.4 307.9 1981.0 23.2 16.0 0
0 299.3 308.8 1636.0 29.9 201.0 0
1 298.1 309.2 1460.0 45.8 80.0 0
0 300.0 309.5 1728.0 26.0 37.0 0
2 299.0 308.7 1940.0 19.9 98.0 0
0 302.2 310.8 1383.0 46.9 45.0 0
0 300.2 309.2 1431.0 51.3 57.0 0
0 299.6 310.2 1468.0 48.0 9.0 0

将数据保存到湖屋中。 然后,数据可供以后使用 ,例如 Power BI 仪表板。

# Save test data to the lakehouse for use in the next section. 
table_name = "predictive_maintenance_test_with_predictions"
predictions.write.mode("overwrite").format("delta").save(f"Tables/{table_name}")
print(f"Spark DataFrame saved to delta table: {table_name}")

步骤 6:通过 Power BI 中的可视化效果查看商业智能

使用 Power BI 仪表板以脱机格式显示结果。

Power BI 仪表板中显示的数据的屏幕截图。

仪表板显示,Tool_wearTorque 在失败和未失败的事例之间创建明显的边界,如步骤 2 中先前的相关分析的预期一样。