你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
重要
数据流图的 WebAssembly (WASM) 开发处于 预览状态。 此功能有限制,不适用于生产工作负荷。
有关适用于 Beta 版、预览版或尚未正式发布的 Azure 功能的法律条款,请参阅 适用于 Microsoft azure 预览版的补充使用条款 。
本文介绍如何为 Azure IoT作数据流图开发自定义 WebAssembly (WASM) 模块和图形定义。 在 Rust 或 Python 中创建模块以实现自定义处理逻辑。 定义图形配置,指定模块如何连接到完整的处理工作流。
重要
数据流图目前仅支持 MQTT、Kafka 和 OpenTelemetry 终结点。 不支持其他终结点类型,例如 Data Lake、Microsoft Fabric OneLake、Azure 数据资源管理器和本地存储。 有关详细信息,请参阅 已知问题。
概述
Azure IoT作数据流图通过实现为 WebAssembly 模块的可配置运算符处理流数据。 每个操作员在维护时态顺序时处理时间戳数据,从而启用具有确定性结果的实时分析。
主要优势
- 实时处理:以一致的低延迟处理流数据
- 事件时间语义:根据事件发生的时间处理数据,而不是处理事件时的数据
- 容错:内置支持处理故障并确保数据一致性
- 可伸缩性:在维护订单保证的同时跨多个节点分配处理
- 多语言支持:使用一致的接口在 Rust 或 Python 中开发
体系结构基础
数据流图基于 “及时数据流 ”计算模型构建,该模型源自Microsoft Research 的 Naiad 项目。 此方法可确保:
- 确定性处理:同一输入始终生成相同的输出
- 进度跟踪:系统知道计算何时完成
- 分布式协调:多个处理节点保持同步
为什么及时数据流?
传统的流处理系统存在一些挑战。 无序数据意味着事件可以晚于预期到达。 部分结果使得计算完成时很难知道。 同步分布式处理时发生协调问题。
通过以下方法及时数据流解决这些问题:
时间戳和进度跟踪
每个数据项都带有表示其逻辑时间的时间戳。 系统通过时间戳跟踪进度,启用多个关键功能:
- 确定性处理:相同的输入始终生成相同的输出
- 完全一次语义:没有重复处理或错过处理
- 水印:知道何时没有更多数据将到达给定时间
混合逻辑时钟
时间戳机制使用混合方法:
pub struct HybridLogicalClock {
    pub physical_time: u64,  // Wall-clock time when event occurred
    pub logical_time: u64,   // Logical ordering for events at same physical time
}
混合逻辑时钟方法可确保多种功能:
- 因果排序:效果遵循原因
- 进度保证:系统知道何时完成处理
- 分布式协调:多个节点保持同步
了解运算符和模块
了解运算符和模块之间的区别对于 WASM 开发至关重要:
运营商
运算符是基于 “及时”数据流运算符的基本处理单元。 每个运算符类型都提供特定用途:
- 映射:转换每个数据项(如转换温度单位)
- 筛选器:仅允许某些数据项根据条件传递(例如删除无效读取)
- 分支:根据条件将数据路由到不同的路径(例如分离温度和湿度数据)
- 累积:在时间范围内收集和聚合数据(例如计算统计摘要)
- 连接:在保留临时顺序时合并多个数据流
- 延迟:通过推进时间戳来控制计时
模块
模块是运算符逻辑作为 WASM 代码的实现。 单个模块可以实现多个运算符类型。 例如,温度模块可能提供:
- 用于单元转换的地图运算符
- 用于阈值检查的筛选器运算符
- 路由决策的分支操作员
- 统计聚合的累积运算符
关系
图形定义、模块和运算符之间的关系遵循特定模式:
Graph Definition → References Module → Provides Operator → Processes Data
     ↓                    ↓               ↓              ↓
"temperature:1.0.0" → temperature.wasm → map function → °F to °C
通过分离,可以:
- 模块重用:在不同的图形配置中部署同一 WASM 模块
- 独立版本控制:在不重新生成模块的情况下更新图形定义
- 动态配置:将不同的参数传递给同一模块以执行不同的行为
先决条件
选择开发语言并设置所需的工具:
- 
              Rust 工具链:使用: curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
- 
              WASM 目标:添加: rustup target add wasm32-wasip2
- 
              生成工具:使用: cargo install wasm-tools --version '=1.201.0' --locked
配置开发环境
WASM Rust SDK 通过自定义 Azure DevOps 注册表提供。 通过设置以下环境变量来配置访问权限:
export CARGO_REGISTRIES_AZURE_VSCODE_TINYKUBE_INDEX="sparse+https://pkgs.dev.azure.com/azure-iot-sdks/iot-operations/_packaging/preview/Cargo/index/"
export CARGO_NET_GIT_FETCH_WITH_CLI=true
将以下环境变量添加到 shell 配置文件进行持久访问:
echo 'export CARGO_REGISTRIES_AZURE_VSCODE_TINYKUBE_INDEX="sparse+https://pkgs.dev.azure.com/azure-iot-sdks/iot-operations/_packaging/preview/Cargo/index/"' >> ~/.bashrc
echo 'export CARGO_NET_GIT_FETCH_WITH_CLI=true' >> ~/.bashrc
source ~/.bashrc
创建项目
首先,为操作员模块创建新的项目目录。 项目结构取决于所选语言。
cargo new --lib temperature-converter
cd temperature-converter
配置 Cargo.toml
              Cargo.toml编辑文件以包含 WASM SDK 和其他库的依赖项:
[package]
name = "temperature-converter"
version = "0.1.0"
edition = "2021"
[dependencies]
# WebAssembly Interface Types (WIT) code generation
wit-bindgen = "0.22"
# Azure IoT Operations WASM SDK - provides operator macros and host APIs
tinykube_wasm_sdk = { version = "0.2.0", registry = "azure-vscode-tinykube" }
# JSON serialization/deserialization for data processing
serde = { version = "1", default-features = false, features = ["derive"] }
serde_json = { version = "1", default-features = false, features = ["alloc"] }
[lib]
# Required for WASM module compilation
crate-type = ["cdylib"]
介绍的关键依赖项:
- 
              wit-bindgen:从 WebAssembly 接口类型(WIT)定义生成 Rust 绑定,使代码能够与 WASM 运行时进行交互
- 
              tinykube_wasm_sdk:Azure IoT Operations SDK 提供用于日志记录、指标和状态管理的操作员宏(#[map_operator]#[filter_operator]等)和主机 API
- 
              serde+serde_json:用于分析和生成数据有效负载的 JSON 处理库;default-features = false针对 WASM 大小约束进行优化
- 
              crate-type = ["cdylib"]:将 Rust 库编译为 C 兼容的动态库,这是 WASM 模块生成所必需的
创建简单模块
创建一个简单的模块,用于将温度从摄氏度转换为华氏度。 此示例演示 Rust 和 Python 实现的基本结构和处理逻辑。
// src/lib.rs
use tinykube_wasm_sdk::logger::{self, Level};
use tinykube_wasm_sdk::macros::map_operator;
use serde_json::{json, Value};
// Import the generated types from wit-bindgen
use crate::tinykube_graph::processor::types::{DataModel, ModuleConfiguration, BufferOrBytes};
fn temperature_converter_init(_configuration: ModuleConfiguration) -> bool {
    logger::log(Level::Info, "temperature-converter", "Init invoked");
    true
}
#[map_operator(init = "temperature_converter_init")]
fn temperature_converter(input: DataModel) -> DataModel {
    let DataModel::Message(mut result) = input else {
        return input;
    };
    let payload = &result.payload.read();
    if let Ok(data_str) = std::str::from_utf8(payload) {
        if let Ok(mut data) = serde_json::from_str::<Value>(data_str) {
            if let Some(temp) = data["value"]["temperature"].as_f64() {
                let fahrenheit = (temp * 9.0 / 5.0) + 32.0;
                data["value"] = json!({
                    "temperature_fahrenheit": fahrenheit,
                    "original_celsius": temp
                });
                
                if let Ok(output_str) = serde_json::to_string(&data) {
                    result.payload = BufferOrBytes::Bytes(output_str.into_bytes());
                }
            }
        }
    }
    DataModel::Message(result)
}
生成模块
根据开发工作流和环境要求在本地开发生成或容器化生成之间进行选择。
本地生成
直接在开发计算机上生成,以便在开发期间进行最快的迭代,并且需要完全控制生成环境时。
# Build WASM module
cargo build --release --target wasm32-wasip2
# Find your module  
ls target/wasm32-wasip2/release/*.wasm
Docker 生成
使用预配置所有依赖项和架构的容器化环境进行生成。 这些 Docker 映像在不同的环境中提供一致的生成,非常适合 CI/CD 管道。
Rust Docker 生成器在 Azure IoT作示例存储库中维护,并包含所有必要的依赖项。 有关详细文档,请参阅 Rust Docker 生成器使用情况。
# Build release version (optimized for production)
docker run --rm -v "$(pwd):/workspace" ghcr.io/azure-samples/explore-iot-operations/rust-wasm-builder --app-name temperature-converter
# Build debug version (includes debugging symbols and less optimization)
docker run --rm -v "$(pwd):/workspace" ghcr.io/azure-samples/explore-iot-operations/rust-wasm-builder --app-name temperature-converter --build-mode debug
Docker 生成选项:
- 
              --app-name:必须与 Rust 箱名称匹配Cargo.toml
- 
              --build-mode:为优化的生成或release具有符号的开发生成选择debug(默认值)
更多示例
有关综合示例,请参阅示例存储库中的 Rust 示例 。 完整的实现包括:
- 映射运算符:数据转换和转换逻辑
- 筛选器运算符:条件数据处理和验证
- 分支运算符:基于数据内容的多路径路由
- 累积运算符:时间窗口聚合和统计处理
- 延迟运算符:基于时间的处理控件
这些示例演示显示每个运算符类型的完整结构(包括正确的错误处理和日志记录模式)的工作实现。
SDK 参考和 API
WASM Rust SDK 提供全面的开发工具:
运算符宏
use tinykube_wasm_sdk::macros::{map_operator, filter_operator, branch_operator};
use tinykube_wasm_sdk::{DataModel, HybridLogicalClock};
// Map operator - transforms each data item
#[map_operator(init = "my_init_function")]
fn my_map(input: DataModel) -> DataModel {
    // Transform logic here
}
// Filter operator - allows/rejects data based on predicate  
#[filter_operator(init = "my_init_function")]
fn my_filter(input: DataModel) -> bool {
    // Return true to pass data through, false to filter out
}
// Branch operator - routes data to different arms
#[branch_operator(init = "my_init_function")]
fn my_branch(input: DataModel, timestamp: HybridLogicalClock) -> bool {
    // Return true for "True" arm, false for "False" arm
}
模块配置参数
WASM 运算符可以通过传递给ModuleConfiguration函数的结构接收运行时配置参数init。 这些参数在图形定义中定义,并允许运行时自定义,而无需重新生成模块。
use tinykube_wasm_sdk::logger::{self, Level};
use tinykube_wasm_sdk::ModuleConfiguration;
fn my_operator_init(configuration: ModuleConfiguration) -> bool {
    // Access required parameters
    if let Some(threshold_param) = configuration.parameters.get("temperature_threshold") {
        let threshold: f64 = threshold_param.parse().unwrap_or(25.0);
        logger::log(Level::Info, "my-operator", &format!("Using threshold: {}", threshold));
    }
    
    // Access optional parameters with defaults
    let unit = configuration.parameters
        .get("output_unit")
        .map(|s| s.as_str())
        .unwrap_or("celsius");
    
    logger::log(Level::Info, "my-operator", &format!("Output unit: {}", unit));
    true
}
有关在图形定义中定义配置参数的详细信息,请参阅 模块配置参数。
主机 API
使用 SDK 处理分布式服务:
持久数据的状态存储:
use tinykube_wasm_sdk::state_store;
// Set value
state_store::set(key.as_bytes(), value.as_bytes(), None, None, options)?;
// Get value  
let response = state_store::get(key.as_bytes(), None)?;
// Delete key
state_store::del(key.as_bytes(), None, None)?;
结构化日志记录:
use tinykube_wasm_sdk::logger::{self, Level};
logger::log(Level::Info, "my-operator", "Processing started");
logger::log(Level::Error, "my-operator", &format!("Error: {}", error));
OpenTelemetry 兼容的指标:
use tinykube_wasm_sdk::metrics;
// Increment counter
metrics::add_to_counter("requests_total", 1.0, Some(labels))?;
// Record histogram value
metrics::record_to_histogram("processing_duration", duration_ms, Some(labels))?;
ONNX 推理与 WASM (预览版)
若要在模块中嵌入和运行小型 ONNX 模型进行带内推理,请参阅 WebAssembly 数据流图中的“运行 ONNX 推理”。 本文介绍使用模块打包模型、在图形定义中启用 wasi-nn 功能以及预览限制。
WebAssembly 接口类型 (WIT)
所有运算符都实现使用 WebAssembly 接口类型(WIT)定义的标准化接口。 WIT 提供与语言无关的接口定义,以确保 WASM 模块与主机运行时之间的兼容性。
示例 存储库中提供了 Azure IoT作的完整 WIT 架构。 这些架构定义开发 WASM 模块时将使用的所有接口、类型和数据结构。
数据模型和接口
所有 WASM 运算符都使用使用 WebAssembly 接口类型定义的标准化数据模型(WIT):
核心数据模型
// Core timestamp structure using hybrid logical clock
record timestamp {
    timestamp: timespec,     // Physical time (seconds + nanoseconds)
    node-id: buffer-or-string,  // Logical node identifier
}
// Union type supporting multiple data formats
variant data-model {
    buffer-or-bytes(buffer-or-bytes),    // Raw byte data
    message(message),                    // Structured messages with metadata
    snapshot(snapshot),                  // Video/image frames with timestamps
}
// Structured message format
record message {
    timestamp: timestamp,
    content_type: buffer-or-string,
    payload: message-payload,
}
WIT 接口定义
每个运算符类型实现特定的 WIT 接口:
// Core operator interfaces
interface map {
    use types.{data-model};
    process: func(message: data-model) -> data-model;
}
interface filter {
    use types.{data-model};
    process: func(message: data-model) -> bool;
}
interface branch {
    use types.{data-model, hybrid-logical-clock};
    process: func(timestamp: hybrid-logical-clock, message: data-model) -> bool;
}
interface accumulate {
    use types.{data-model};
    process: func(staged: data-model, message: list<data-model>) -> data-model;
}
图形定义和 WASM 集成
图形定义定义定义 WASM 模块如何连接到处理工作流。 它们指定创建完整数据处理管道的作、连接和参数。
有关创建和配置图形定义(包括简单和复杂工作流的详细示例)的综合信息,请参阅 配置数据流图的 WebAssembly 图形定义。
图形定义指南中介绍的关键主题:
- 图形定义结构:了解 YAML 架构和所需组件
- 简单图示例:基本三阶段温度转换管道
- 复杂图形示例:使用分支和聚合进行多传感器处理
- 模块配置参数:WASM 运算符的运行时自定义
- 注册表部署:将图形定义打包和存储为 OCI 项目
后续步骤
- 请参阅 Azure IoT Operations WASM 示例 存储库中的完整示例和高级模式。
- 了解如何通过 数据流图在 Use WebAssembly 中部署模块。
- 在 “配置数据流终结点”中配置数据流终结点。