你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

使用带有数据流图的 WebAssembly (WASM)(预览版)

重要

带有数据流图的 WebAssembly (WASM) 现在为预览版。 此功能存在限制,不适用于生产工作负载。

有关 beta 版本、预览版或尚未正式发布的版本的 Azure 功能所适用的法律条款,请参阅 Microsoft Azure 预览版的补充使用条款

Azure IoT 操作数据流图支持 WebAssembly (WASM) 模块,便于在边缘进行自定义数据处理。 可以将自定义业务逻辑和数据转换部署为数据流管道的一部分。

小窍门

想要在带内运行 AI? 请参阅 WebAssembly 数据流图中的“运行 ONNX 推理 ”,以打包和执行 WASM 运算符内的小型 ONNX 模型。

重要

数据流图目前仅支持 MQTT、Kafka 和 OpenTelemetry 终结点。 不支持其他终结点类型,例如 Data Lake、Microsoft Fabric OneLake、Azure 数据资源管理器和本地存储。 有关详细信息,请参阅 已知问题

先决条件

  • 在已启用 Arc 的 Kubernetes 群集上部署 Azure IoT 操作实例版本 1.2 预览版或更高版本。 有关详细信息,请参阅部署 Azure IoT 操作
  • 使用 Azure 容器注册表 (ACR) 存储 WASM 模块和图形。
  • 安装 OCI 注册表即存储 (ORAS) CLI 以将 WASM 模块推送到注册表。
  • 按照为数据流图开发 WebAssembly 模块中的指南开发自定义 WASM 模块。

概述

通过 Azure IoT 操作数据流图中的 WebAssembly (WASM) 模块,可在边缘以高性能和高安全性处理数据。 WASM 在沙盒环境中运行,并支持 Rust 和 Python。

WASM 数据流图的工作原理

WASM 数据流实现遵循以下工作流:

  1. 开发 WASM 模块:用受支持的语言编写自定义处理逻辑,并将其编译为 WebAssembly 组件模型格式。
  2. 开发图定义:使用 YAML 配置文件定义数据在模块中移动的方式。 有关详细信息,请参阅配置 WebAssembly 图定义
  3. 在注册表中存储项目:使用与 OCI 兼容的工具(如 ORAS)将编译后的 WASM 模块推送到容器注册表。
  4. 配置注册表终结点:设置身份验证和连接详细信息,以便 Azure IoT 操作能够访问容器注册表。
  5. 创建数据流:定义数据源、项目名称和目标。
  6. 部署和执行:Azure IoT 操作从注册表拉取 WASM 模块,并根据图定义运行它们。

通过示例入门

这些示例演示如何为常见场景设置和部署 WASM 数据流图。 这些示例使用硬编码的值和简化的配置,使你能够快速入门。

设置容器注册表

Azure IoT 操作需要容器注册表来拉取 WASM 模块和图定义。 可以使用 Azure 容器注册表 (ACR) 或其他兼容 OCI 的注册表。

若要创建和配置 Azure 容器注册表,请参阅部署 Azure 容器注册表

安装 ORAS CLI

使用 ORAS CLI 将 WASM 模块和图定义推送到容器注册表。 有关安装说明,请参阅安装 ORAS

从公共注册表拉取示例模块

对于此预览版,请使用预生成的示例模块:

# Pull sample modules and graphs
oras pull ghcr.io/azure-samples/explore-iot-operations/graph-simple:1.0.0
oras pull ghcr.io/azure-samples/explore-iot-operations/graph-complex:1.0.0
oras pull ghcr.io/azure-samples/explore-iot-operations/temperature:1.0.0
oras pull ghcr.io/azure-samples/explore-iot-operations/window:1.0.0
oras pull ghcr.io/azure-samples/explore-iot-operations/snapshot:1.0.0
oras pull ghcr.io/azure-samples/explore-iot-operations/format:1.0.0
oras pull ghcr.io/azure-samples/explore-iot-operations/humidity:1.0.0
oras pull ghcr.io/azure-samples/explore-iot-operations/collection:1.0.0
oras pull ghcr.io/azure-samples/explore-iot-operations/enrichment:1.0.0
oras pull ghcr.io/azure-samples/explore-iot-operations/filter:1.0.0

将模块推送到注册表

获取示例模块和图形后,将其推送到容器注册表。 将 <YOUR_ACR_NAME> 替换为您的 Azure 容器注册表的名称。

# Log in to your ACR
az acr login --name <YOUR_ACR_NAME>

# Push modules to your registry
oras push <YOUR_ACR_NAME>.azurecr.io/graph-simple:1.0.0 graph-simple.yaml
oras push <YOUR_ACR_NAME>.azurecr.io/graph-complex:1.0.0 graph-complex.yaml
oras push <YOUR_ACR_NAME>.azurecr.io/temperature:1.0.0 temperature-1.0.0.wasm
oras push <YOUR_ACR_NAME>.azurecr.io/window:1.0.0 window-1.0.0.wasm
oras push <YOUR_ACR_NAME>.azurecr.io/snapshot:1.0.0 snapshot-1.0.0.wasm
oras push <YOUR_ACR_NAME>.azurecr.io/format:1.0.0 format-1.0.0.wasm
oras push <YOUR_ACR_NAME>.azurecr.io/humidity:1.0.0 humidity-1.0.0.wasm
oras push <YOUR_ACR_NAME>.azurecr.io/collection:1.0.0 collection-1.0.0.wasm
oras push <YOUR_ACR_NAME>.azurecr.io/enrichment:1.0.0 enrichment-1.0.0.wasm
oras push <YOUR_ACR_NAME>.azurecr.io/filter:1.0.0 filter-1.0.0.wasm

小窍门

还可以推送自己的模块并创建自定义图形,请参阅 自定义数据流图的配置

创建注册表终结点

注册表终结点定义了与容器注册表的连接。 数据流图使用注册表终结点从容器注册表拉取 WASM 模块和图定义。 若要详细了解如何使用不同的身份验证方法和注册表类型配置注册表终结点,请参阅配置注册表终结点

若要使用 Azure 容器注册表快速设置,请使用系统分配的托管标识身份验证创建注册表终结点:

目前,无法在运营环境中创建注册表端点。 必须使用 Bicep 或 Kubernetes 配置文件。 创建注册表终结点后,您推送到容器注册表的图表已准备好在数据流图中的操作体验中使用。

注释

可以跨多个数据流图和其他 Azure IoT 操作组件(如 Akri 连接器)重复使用注册表终结点。

获取扩展名称

# Get extension name
az k8s-extension list \
  --resource-group <RESOURCE_GROUP> \
  --cluster-name <CLUSTER_NAME> \
  --cluster-type connectedClusters \
  --query "[?extensionType=='microsoft.iotoperations'].name" \
  --output tsv

第一个命令会返回扩展名称(例如,azure-iot-operations-4gh3y)。

配置托管标识权限

要让 Azure IoT 操作从容器注册表拉取 WASM 模块,请为托管标识提供适当的权限。 IoT 操作扩展使用系统分配的托管标识,该标识需要对 Azure 容器注册表具有 AcrPull 角色。 确保满足以下先决条件:

  • 对 Azure 容器注册表具有所有者权限。
  • 容器注册表可以位于不同的资源组或订阅中,但必须与 IoT 操作部署位于同一租户中。

运行以下命令,将 AcrPull 角色分配给 IoT 操作托管标识:

# Get the IoT Operations extension managed identity
export EXTENSION_OBJ_ID=$(az k8s-extension list --cluster-name $CLUSTER_NAME -g $RESOURCE_GROUP --cluster-type connectedClusters --query "[?extensionType=='microsoft.iotoperations'].identity.principalId" -o tsv)

# Get the application ID for the managed identity
export SYSTEM_ASSIGNED_MAN_ID=$(az ad sp show --id $EXTENSION_OBJ_ID --query "appId" -o tsv)

# Assign the AcrPull role to the managed identity
az role assignment create --role "AcrPull" --assignee $SYSTEM_ASSIGNED_MAN_ID --scope "/subscriptions/$SUBSCRIPTION_ID/resourceGroups/$RESOURCE_GROUP/providers/Microsoft.ContainerRegistry/registries/$ACR_NAME"

有关容器注册表角色的详细信息,请参阅 Azure 容器注册表角色和权限

如果使用 Azure CLI 时遇到身份验证错误,可在 Azure 门户中分配权限:

  1. 在 Azure 门户中转到 Azure 容器注册表。
  2. 从菜单中选择“访问控制(IAM)”。
  3. 选择添加>添加角色分配
  4. 选择 AcrPull 内置角色。
  5. 对于“将访问权限分配给”,选择“用户、组或服务主体”。
  6. 搜索并选择 IoT 操作扩展名称(例如 azure-iot-operations-4gh3y)。
  7. 选择“保存”以完成角色分配。

有关详细说明,请参阅使用 Azure 门户分配 Azure 角色

示例 1:使用一个 WASM 模块进行基本部署

此示例使用 WASM 模块将温度数据的单位从华氏度转换为摄氏度。 可在 GitHub 上获取温度模块源代码。 使用推送到容器注册表的预编译版本 graph-simple:1.0.0

工作原理

图定义创建一个简单的三阶段管道:

  1. :从 MQTT 接收温度数据
  2. 映射:使用温度 WASM 模块处理数据
  3. 接收器:将转换后的数据发送回 MQTT

若要详细了解简单图定义的工作原理及其结构,请参阅示例 1:简单图定义

输入格式:

{"temperature": {"value": 100.0, "unit": "F"}}

输出格式:

{"temperature": {"value": 37.8, "unit": "C"}}

以下配置会创建使用此温度转换管道的数据流图。 该图会引用 graph-simple:1.0.0 项目,该项目包含 YAML 定义,并从容器注册表拉取温度模块。

配置数据流图

此配置定义了实现温度转换工作流的三个节点:一个订阅传入温度数据的源节点、一个运行 WASM 模块的图处理节点、一个发布转换后的结果的目标节点。

数据流图资源会“包装”图定义项目,并将其抽象源/接收器操作连接到具体终结点:

  • 图定义的 source 操作连接到数据流的源节点(MQTT 主题)
  • 图定义的 sink 操作连接到数据流的目标节点(MQTT 主题)
  • 图定义的处理操作在图形处理节点中运行

通过这种分离,可以跨环境使用不同的终结点部署同一图定义,同时使处理逻辑保持不变。

  1. 若要在 作体验中创建数据流图,请转到 “数据流 ”选项卡。

  2. 选择+创建旁边的下拉菜单,然后选择“创建数据流图

    作体验接口的屏幕截图,其中显示了如何创建数据流图。

  3. 选择占位符名称 new-data-flow 以设置数据流属性。 输入数据流图的名称,然后选择要使用的数据流配置文件。

  4. 在数据流关系图中,选择“ ”以配置源节点。 在 “源详细信息”下,选择“ 资产 ”或 “数据流终结点”。

    作体验界面的屏幕截图,其中显示了如何为数据流图选择源。

    1. 如果选择 “资产”,请选择要从中提取数据的资产,然后单击“ 应用”。

    2. 如果选择 “数据流终结点”,请输入以下详细信息,然后单击“ 应用”。

      设置 Description
      数据流终结点 选择“默认”以使用默认的 MQTT 消息代理终结点。
      主题 用于订阅传入消息的主题筛选器。 使用“主题”“添加行”以添加多个主题。>
      消息架构 用于反序列化传入消息的架构。
  5. 在数据流关系图中,选择“ 添加图形转换”(可选) 以添加图形处理节点。 在 “图形选择 ”窗格中,选择 图形简单:1 ,然后单击“ 应用”。

    作体验界面的屏幕截图,其中显示了如何创建简单的数据流图。

    重要

    此示例使用推送到容器注册表graph-simple:1.0.0 项目。 可以通过 开发自己的 WASM 模块 并将其推送到容器注册表来创建自定义图形。 推送到容器注册表的图形可在 “图形选择 ”窗格中使用。

  6. 可以通过选择关系图中的图形节点来配置某些图形运算符设置。 例如,可以选择 模块温度/地图 运算符并输入 key2example-value-2。 单击 “应用” 保存更改。

    作体验界面的屏幕截图,其中显示了如何配置简单的数据流图。

  7. 在数据流关系图中,选择 “目标 ”以配置目标节点。

  8. 在数据流图名称下选择 “保存” 以保存数据流图。

测试数据流

若要测试数据流,请从群集内发送 MQTT 消息。 首先,按照使用 MQTT 客户端测试与 MQTT 代理的连接的说明部署 MQTT 客户端 Pod。 MQTT 客户端提供用于连接到代理的身份验证令牌和证书。 若要部署 MQTT 客户端,请运行以下命令:

kubectl apply -f https://raw.githubusercontent.com/Azure-Samples/explore-iot-operations/main/samples/quickstarts/mqtt-client.yaml

发送温度消息

在第一个终端会话中,创建并运行脚本以发送华氏度单位的温度数据:

# Connect to the MQTT client pod
kubectl exec --stdin --tty mqtt-client -n azure-iot-operations -- sh -c '
# Create and run temperature.sh from within the MQTT client pod
while true; do
  # Generate a random temperature value between 0 and 6000 Fahrenheit
  random_value=$(shuf -i 0-6000 -n 1)
  payload="{\"temperature\":{\"value\":$random_value,\"unit\":\"F\"}}"

  echo "Publishing temperature: $payload"

  # Publish to the input topic
  mosquitto_pub -h aio-broker -p 18883 \
    -m "$payload" \
    -t "sensor/temperature/raw" \
    -d \
    --cafile /var/run/certs/ca.crt \
    -D PUBLISH user-property __ts $(date +%s)000:0:df \
    -D CONNECT authentication-method 'K8S-SAT' \
    -D CONNECT authentication-data $(cat /var/run/secrets/tokens/broker-sat)

  sleep 1
done'

注释

MQTT 用户属性 __ts 用于向消息添加时间戳,以确保使用混合逻辑时钟 (HLC) 及时处理消息。 使用时间戳有助于数据流决定是接受还是删除消息。 属性的格式为 <timestamp>:<counter>:<nodeid>。 它使数据流处理更加准确,但处理不是必需的。

该脚本每秒将随机温度数据发布到 sensor/temperature/raw 主题。 它看起来应该如下所示:

Publishing temperature: {"temperature":{"value":1234,"unit":"F"}}
Publishing temperature: {"temperature":{"value":5678,"unit":"F"}}

使脚本保持运行状态以继续发布温度数据。

订阅已处理的消息

在第二个终端会话(也连接到 MQTT 客户端 Pod)中,订阅输出主题以查看转换后的温度值:

# Connect to the MQTT client pod
kubectl exec --stdin --tty mqtt-client -n azure-iot-operations -- sh -c '
mosquitto_sub -h aio-broker -p 18883 -t "sensor/temperature/processed" --cafile /var/run/certs/ca.crt \
-D CONNECT authentication-method "K8S-SAT" \
-D CONNECT authentication-data "$(cat /var/run/secrets/tokens/broker-sat)"'

你会看到 WASM 模块将温度数据的单位从华氏度转换为摄氏度。

{"temperature":{"value":1292.2222222222222,"count":0,"max":0.0,"min":0.0,"average":0.0,"last":0.0,"unit":"C","overtemp":false}}
{"temperature":{"value":203.33333333333334,"count":0,"max":0.0,"min":0.0,"average":0.0,"last":0.0,"unit":"C","overtemp":false}}

示例 2:部署复杂图

此示例演示了一个复杂的数据处理工作流,它会处理温度、湿度和图像数据等多种数据类型。 复杂图定义可协调多个 WASM 模块来执行高级分析和对象检测。

工作原理

复杂图可处理三个数据流,并将其组合到扩充的传感器分析中:

  • 温度处理:将华氏度转换为摄氏度、筛选无效读数并计算统计信息
  • 湿度处理:在时间间隔内累积湿度度量
  • 图像处理:对相机快照执行对象检测并设置结果格式

若要详细了解复杂图定义的工作原理、其结构及经历多个处理阶段的数据流,请参阅示例 2:复杂图定义

该图使用 Rust 示例中的专用模块。

配置复杂数据流图

此配置使用 graph-complex:1.0.0 项目实现多传感器处理工作流。 请注意,数据流图部署与示例 1 相似 - 即使处理逻辑不同,也使用相同的三节点模式(源、图处理器、目标)。

发生这种相似性的原因是数据流图资源充当加载和执行图定义的主机环境。 实际处理逻辑驻留在图定义项目(graph-simple:1.0.0graph-complex:1.0.0)中,其中包含 WASM 模块之间的操作和连接的 YAML 规范。 数据流图资源提供运行时基础结构来拉取项目、实例化模块,并通过定义的工作流路由数据。

  1. 若要在 作体验中创建数据流图,请转到 “数据流 ”选项卡。

  2. 选择+创建旁边的下拉菜单,然后选择“创建数据流图

    作体验界面的屏幕截图,其中显示了如何创建数据流复杂图。

  3. 选择占位符名称 new-data-flow 以设置数据流属性。 输入数据流图的名称,然后选择要使用的数据流配置文件。

  4. 在数据流关系图中,选择“ ”以配置源节点。 在 “源详细信息”下,选择“ 资产 ”或 “数据流终结点”。

    作体验界面的屏幕截图,其中显示了如何为数据流图选择源。

    1. 如果选择 “资产”,请选择要从中提取数据的资产,然后单击“ 应用”。

    2. 如果选择 “数据流终结点”,请输入以下详细信息,然后单击“ 应用”。

      设置 Description
      数据流终结点 选择“默认”以使用默认的 MQTT 消息代理终结点。
      主题 用于订阅传入消息的主题筛选器。 使用“主题”“添加行”以添加多个主题。>
      消息架构 用于反序列化传入消息的架构。
  5. 在数据流关系图中,选择“ 添加图形转换”(可选) 以添加图形处理节点。 在 “图形选择 ”窗格中,选择 图形复杂:1 ,然后单击“ 应用”。

    作体验界面的屏幕截图,其中显示了如何创建复杂的数据流图。

    重要

    此示例使用推送到容器注册表graph-complex:1.0.0 项目。 可以通过 开发自己的 WASM 模块 并将其推送到容器注册表来创建自定义图形。 推送到容器注册表的图形可在 “图形选择 ”窗格中使用。

  6. 可以通过选择关系图中的图形节点来配置某些图形运算符设置。

    操作体验界面的屏幕截图,展示了如何配置复杂的数据流图。

    Operator Description
    module-snapshot/branch 配置 snapshot 模块以对图像执行对象检测。 可以设置 snapshot_topic 配置键来指定图像数据的输入主题。
    模块-温度/映射 key2 温度值转换为另一种标度。
  7. 单击 “应用” 保存更改。

  8. 在数据流关系图中,选择 “目标 ”以配置目标节点。

  9. 在数据流图名称下选择 “保存” 以保存数据流图。

测试复杂数据流

需要获取源数据设置,然后才能看到输出。

将 RAW 图像文件上传到 mqtt-client Pod

这些图像文件用于 snapshot 模块来检测图像中的对象。 可在 GitHub 上的 images 文件夹中找到它们。

首先,克隆存储库以获取对图像文件的访问权限:

git clone https://github.com/Azure-Samples/explore-iot-operations.git
cd explore-iot-operations

若要将 RAW 图像文件从 ./samples/wasm/images 文件夹上传到 mqtt-client Pod,可使用以下命令:

kubectl cp ./samples/wasm/images azure-iot-operations/mqtt-client:/tmp

检查文件是否已上传:

kubectl exec -it mqtt-client -n azure-iot-operations -- ls /tmp/images

应该会在 /tmp/images 文件夹中看到文件列表。

beaker.raw          laptop.raw          sunny2.raw
binoculars.raw      lawnmower.raw       sunny4.raw
broom.raw           milkcan.raw         thimble.raw
camera.raw          photocopier.raw     tripod.raw
computer_mouse.raw  radiator.raw        typewriter.raw
daisy3.raw          screwdriver.raw     vacuum_cleaner.raw
digital_clock.raw   sewing_machine.raw
hammer.raw          sliding_door.raw

发布模拟温度、湿度数据并发送图像

可将发布温度、湿度数据以及发送图像的命令组合到单个脚本中。 使用以下命令:

# Connect to the MQTT client pod and run the script
kubectl exec --stdin --tty mqtt-client -n azure-iot-operations -- sh -c '
while true; do 
  # Generate a random temperature value between 0 and 6000
  temp_value=$(shuf -i 0-6000 -n 1)
  temp_payload="{\"temperature\":{\"value\":$temp_value,\"unit\":\"F\"}}"
  echo "Publishing temperature: $temp_payload"
  mosquitto_pub -h aio-broker -p 18883 \
    -m "$temp_payload" \
    -t "sensor/temperature/raw" \
    --cafile /var/run/certs/ca.crt \
    -D CONNECT authentication-method "K8S-SAT" \
    -D CONNECT authentication-data "$(cat /var/run/secrets/tokens/broker-sat)" \
    -D PUBLISH user-property __ts $(date +%s)000:0:df

  # Generate a random humidity value between 30 and 90
  humidity_value=$(shuf -i 30-90 -n 1)
  humidity_payload="{\"humidity\":{\"value\":$humidity_value}}"
  echo "Publishing humidity: $humidity_payload"
  mosquitto_pub -h aio-broker -p 18883 \
    -m "$humidity_payload" \
    -t "sensor/humidity/raw" \
    --cafile /var/run/certs/ca.crt \
    -D CONNECT authentication-method "K8S-SAT" \
    -D CONNECT authentication-data "$(cat /var/run/secrets/tokens/broker-sat)" \
    -D PUBLISH user-property __ts $(date +%s)000:0:df

  # Send an image every 2 seconds
  if [ $(( $(date +%s) % 2 )) -eq 0 ]; then
    file=$(ls /tmp/images/*.raw | shuf -n 1)
    echo "Sending file: $file"
    mosquitto_pub -h aio-broker -p 18883 \
      -f $file \
      -t "sensor/images/raw" \
      --cafile /var/run/certs/ca.crt \
      -D CONNECT authentication-method "K8S-SAT" \
      -D CONNECT authentication-data "$(cat /var/run/secrets/tokens/broker-sat)" \
      -D PUBLISH user-property __ts $(date +%s)000:0:df
  fi

  # Wait for 1 second before the next iteration
  sleep 1
done'

检查输出

在新终端中,订阅输出主题:

kubectl exec --stdin --tty mqtt-client -n azure-iot-operations -- sh -c '
mosquitto_sub -h aio-broker -p 18883 -t "analytics/sensor/processed" --cafile /var/run/certs/ca.crt \
-D CONNECT authentication-method "K8S-SAT" \
-D CONNECT authentication-data "$(cat /var/run/secrets/tokens/broker-sat)"'

输出应如下所示

{"temperature":[{"count":9,"max":2984.4444444444443,"min":248.33333333333337,"average":1849.6296296296296,"last":2612.222222222222,"unit":"C","overtemp":true}],"humidity":[{"count":10,"max":76.0,"min":30.0,"average":49.7,"last":38.0}],"object":[{"result":"milk can; broom; screwdriver; binoculars, field glasses, opera glasses; toy terrier"}]}
{"temperature":[{"count":10,"max":2490.5555555555557,"min":430.55555555555554,"average":1442.6666666666667,"last":1270.5555555555557,"unit":"C","overtemp":true}],"humidity":[{"count":9,"max":87.0,"min":34.0,"average":57.666666666666664,"last":42.0}],"object":[{"result":"broom; Saint Bernard, St Bernard; radiator"}]}

此处,输出包含温度和湿度数据,还包含在图像中检测到的对象。

开发自定义 WASM 模块

若要为数据流图创建自定义数据处理逻辑,请在 Rust 或 Python 中开发 WebAssembly 模块。 使用自定义模块,可以实现内置运算符中不可用的专用业务逻辑、数据转换和分析。

有关全面的开发指南,包括:

  • 设置开发环境
  • 在 Rust 和 Python 中创建运算符
  • 了解数据模型和接口
  • 生成和测试模块

请参阅为数据流图开发 WebAssembly 模块

若要详细了解如何创建和配置可定义数据处理工作流的 YAML 图定义,请参阅配置 WebAssembly 图定义

自定义数据流图的配置

本部分详细说明如何使用 WASM 模块配置数据流图。 它涵盖所有配置选项、数据流终结点和高级设置。

数据流图概述

数据流图定义了数据流如何流经 WebAssembly 模块进行处理。 每个图都包含:

  • 控制图是启用还是禁用的模式
  • 链接到定义缩放和资源设置的数据流配置文件的配置文件引用
  • 可选择性地为图状态启用持久存储的磁盘持久性
  • 定义源、处理和目标组件的节点
  • 指定节点之间数据流动的方式的节点连接

模式配置

模式属性可确定数据流图是否正在主动处理数据。 可以将模式设置为 EnabledDisabled(不区分大小写)。 禁用后,图形将停止处理数据,但会保留其配置。

创建或编辑数据流图时,在“数据流 属性 ”窗格中,可以选中“ 启用数据流 ”为 “是 ”以将模式设置为 Enabled“是”。 如果将其保留为未选中状态,则模式设置为 Disabled

作体验界面的屏幕截图,其中显示了如何启用或禁用模式配置。

配置文件引用

配置文件引用将数据流图连接到数据流配置文件,该配置文件定义了缩放设置、实例计数和资源限制。 如果未指定配置文件引用,必须改用 Kubernetes 所有者引用。 大多数场景使用 Azure IoT 操作提供的默认配置文件。

创建或编辑数据流图时,在 “数据流属性 ”窗格中,选择数据流配置文件。 默认选择默认数据流配置文件。 有关数据流配置文件的详细信息,请参阅 “配置数据流配置文件”。

重要

只有在创建数据流图时才能选择数据流配置文件。 创建数据流图后,无法更改数据流配置。 如果您想更改现有数据流图的数据流配置文件,请删除原始数据流图,并使用新的数据流配置文件创建新的数据流图。

请求磁盘持久性

重要

数据流图的磁盘持久性存在一个已知问题。 此功能当前未按预期工作。 有关详细信息,请参阅 已知问题

请求磁盘持久性使数据流图在重启时能够保持状态不变。 启用此功能时,如果连接的代理重启,图形可恢复“正在处理”状态。 此功能对于有状态处理场景非常有用;在该场景中,丢失中间数据会造成问题。 启用请求磁盘持久性时,代理会将 MQTT 数据(例如订阅服务器队列中的消息)保存到磁盘。 此方法可确保数据流的数据源在断电或代理重启期间不会发生数据丢失。 代理保持最佳性能,因为每个数据流都配置了持久性,所以只有需要持久性的数据流才使用此功能。

数据流图使用 MQTTv5 用户属性在订阅期间发出此持久性请求。 此功能仅在以下情况下有效:

  • 数据流使用 MQTT 代理作为源(具有 MQTT 终结点的源节点)
  • MQTT 代理已启用持久性,其中对于订阅服务器队列等数据类型,动态持久性模式设置为 Enabled

此配置使 MQTT 客户端(如数据流图)能够使用 MQTTv5 用户属性为其订阅请求磁盘持久性。 有关详细的 MQTT 代理持久性配置,请参阅配置 MQTT 代理持久性

该设置接受 EnabledDisabled,其中默认值为 Disabled

创建或编辑数据流图时,在 “数据流属性 ”窗格中,可以将 请求数据暂留 检查为 “是 ”,以将请求磁盘持久性设置为 Enabled“ 。 如果将其保留为未选中状态,则设置为 Disabled

节点配置

节点是数据流图的构建基块。 每个节点在图形中都有唯一的名称,并执行特定函数。 有三种类型的节点:

源节点

源节点定义数据进入图形的位置。 它们连接到从 MQTT 代理或 Kafka 主题接收数据的数据流终结点。 每个源节点必须指定:

  • 指向已配置的数据流终结点的终结点引用
  • 作为要订阅的 MQTT 主题或 Kafka 主题列表的数据源
  • 链接到用于架构推理的 Azure 设备注册表资产的资产引用(可选)

通过数据源数组可订阅多个主题,而无需修改终结点配置。 这种灵活性使得可在不同的数据流中重用终结点。

注释

目前,仅支持 MQTT 和 Kafka 终结点作为数据流图的数据源。 有关详细信息,请参阅 配置数据流终结点

在数据流关系图中,选择“ ”以配置源节点。 在 “源详细信息”下,选择 “数据流终结点”,然后使用 “主题 ”字段指定要订阅的传入消息的 MQTT 主题筛选器。 可以通过选择“添加行”并输入新主题来添加多个 MQTT 主题。

图形处理节点

图形处理节点包含用于转换数据的 WebAssembly 模块。 这些节点从容器注册表中提取 WASM 项目,并使用指定的配置参数执行它们。 每个图形节点都需要:

  • 指向用于拉取项目的注册表终结点的注册表终结点引用
  • 用于定义要拉取的模块名称和版本的项目规范
  • 作为传递到 WASM 模块的键值对的配置参数

通过配置数组可自定义模块行为,而无需重新生成 WASM 项目。 常见配置选项包括处理参数、阈值、转换设置和功能标志。

在数据流关系图中,选择“ 添加图形转换”(可选) 以添加图形处理节点。 在 “图形选择 ”窗格中,选择所需的图形项目(简单或复杂的图形)并单击“ 应用”。 可以通过选择关系图中的图形节点来配置某些图形运算符设置。

配置键值对在运行时会传递到 WASM 模块。 模块可以访问这些值来自定义其行为。 此方法允许:

  • 使用不同的配置部署同一个 WASM 模块
  • 在不重新生成模块的情况下调整处理参数
  • 根据部署要求启用或禁用功能
  • 设置特定于环境的值,例如阈值或终结点

目标节点

目标节点定义发送已处理数据的位置。 它们连接到将数据发送到 MQTT 代理、云存储或其他系统的数据流终结点。 每个目标节点均指定:

  • 指向已配置的数据流终结点的终结点引用
  • 作为输出数据的特定主题、路径或位置的数据目标
  • 定义序列化格式和架构验证的输出架构设置(可选)

对于 Azure Data Lake 或 Fabric OneLake 等存储目标,可以指定输出架构设置来控制数据的序列化和验证方式。

注释

目前,仅支持 MQTT、Kafka 和 OpenTelemetry 终结点作为数据流图的数据目标。 有关详细信息,请参阅 配置数据流终结点

  1. 在数据流关系图中,选择 “目标 ”节点。
  2. “数据流终结点详细信息 ”下拉列表中选择所需的数据流终结点。
  3. 选择“继续”以配置目标。
  4. 输入目标 所需的设置 ,包括要将数据发送到的主题或表。 数据目标字段根据终结点类型自动解释。 例如,如果数据流终结点是 MQTT 终结点,则目标详细信息页会提示你输入主题。

节点连接

节点连接定义节点之间的数据流路径。 每个连接指定源节点和目标节点,从而创建处理管道。 连接可选择性地包含架构验证,确保处理阶段之间的数据完整性。

指定架构验证时,系统会在数据于节点之间传输时验证数据格式和结构。 验证有助于尽早捕获数据不一致的情况,并确保 WASM 模块以预期格式接收数据。

当您选择图处理节点时,操作体验会自动创建节点连接。 创建图形后,无法修改连接。

数据流终结点

数据流图通过数据流终结点连接到外部系统。 终结点类型确定它是否可用作源和/或目标:

MQTT 终结点

MQTT 终结点可以充当源和目标。 它们连接到 MQTT 代理,包括:

  • Azure IoT 操作本地 MQTT 代理(在每个数据流中都是必需的)
  • Azure 事件网格 MQTT
  • 自定义 MQTT 代理

有关详细配置信息,请参阅配置 MQTT 数据流终结点

Kafka 终结点

Kafka 终结点可以充当源和目标。 它们连接到与 Kafka 兼容的系统,包括:

  • Azure 事件中心(与 Kafka 兼容)
  • Apache Kafka 群集
  • Confluent Cloud

有关详细配置信息,请参阅配置 Azure 事件中心和 Kafka 数据流终结点

存储终结点

存储终结点只能用作目标。 它们连接到云存储系统进行长期数据保留和分析:

  • Azure Data Lake Storage
  • Microsoft Fabric OneLake
  • 本地存储

存储终结点通常需要输出架构设置来定义数据序列化格式。

注册表终结点

注册表终结点提供对容器注册表的访问权限,用于拉取 WASM 模块和图定义。 它们不直接在数据流中使用,而是由图形处理节点引用。

有关详细配置信息,请参阅配置注册表终结点