Develop WebAssembly (WASM) modules and graph definitions for data flow graphs (preview)

Important

WebAssembly (WASM) development for data flow graphs is in preview. This feature has limitations and isn't for production workloads.

See the Supplemental Terms of Use for Microsoft Azure Previews for legal terms that apply to Azure features that are in beta, preview, or not yet released into general availability.

This article shows you how to develop custom WebAssembly (WASM) modules and graph definitions for Azure IoT Operations data flow graphs. Create modules in Rust or Python to implement custom processing logic. Define graph configurations that specify how your modules connect into complete processing workflows.

Important

Data flow graphs currently only support MQTT, Kafka, and OpenTelemetry endpoints. Other endpoint types like Data Lake, Microsoft Fabric OneLake, Azure Data Explorer, and Local Storage are not supported. For more information, see Known issues.

Overview

Azure IoT Operations data flow graphs process streaming data through configurable operators implemented as WebAssembly modules. Each operator processes timestamped data while maintaining temporal ordering, enabling real-time analytics with deterministic results.

Key benefits

  • Real-time processing: Handle streaming data with consistent low latency
  • Event-time semantics: Process data based on when events occurred, not when they're processed
  • Fault tolerance: Built-in support for handling failures and ensuring data consistency
  • Scalability: Distribute processing across multiple nodes while maintaining order guarantees
  • Multi-language support: Develop in Rust or Python with consistent interfaces

Architecture foundation

Data flow graphs build on the Timely dataflow computational model, which originated from Microsoft Research's Naiad project. This approach ensures:

  • Deterministic processing: Same input always produces the same output
  • Progress tracking: The system knows when computations are complete
  • Distributed coordination: Multiple processing nodes stay synchronized

Why timely dataflow?

Traditional stream processing systems face several challenges: out-of-order data, where events arrive later than expected; partial results, which make it hard to know when a computation has finished; and coordination overhead when keeping distributed processing synchronized.

Timely dataflow solves these problems through:

Timestamps and progress tracking

Every data item carries a timestamp representing its logical time. The system tracks progress through timestamps, enabling several key capabilities:

  • Deterministic processing: Same input always produces the same output
  • Exactly-once semantics: No duplicate or missed processing
  • Watermarks: Know when no more data will arrive for a given time
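
To make watermarks concrete, the following plain-Rust sketch (an illustration, not the runtime's implementation) buffers out-of-order events and releases them in event-time order only once the watermark guarantees that no earlier event can still arrive:

use std::collections::BTreeMap;

// Buffer for out-of-order events, keyed by event time.
struct Reorderer {
    buffer: BTreeMap<u64, Vec<String>>,
}

impl Reorderer {
    fn insert(&mut self, event_time: u64, payload: String) {
        self.buffer.entry(event_time).or_default().push(payload);
    }

    // Release everything at or before the watermark, in event-time order.
    fn advance(&mut self, watermark: u64) -> Vec<(u64, String)> {
        // Keys > watermark stay buffered; keys <= watermark are released.
        let pending = self.buffer.split_off(&(watermark + 1));
        let released = std::mem::replace(&mut self.buffer, pending);
        released
            .into_iter()
            .flat_map(|(t, items)| items.into_iter().map(move |p| (t, p)))
            .collect()
    }
}

fn main() {
    let mut reorderer = Reorderer { buffer: BTreeMap::new() };
    reorderer.insert(5, "late event".to_string());
    reorderer.insert(3, "early event".to_string()); // arrives second, occurred first
    // Watermark 5 means no event with timestamp <= 5 can still arrive.
    for (t, payload) in reorderer.advance(5) {
        println!("t={t}: {payload}"); // emits t=3, then t=5
    }
}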

Hybrid logical clock

The timestamp mechanism uses a hybrid approach:

pub struct HybridLogicalClock {
    pub physical_time: u64,  // Wall-clock time when event occurred
    pub logical_time: u64,   // Logical ordering for events at same physical time
}

The hybrid logical clock approach provides several guarantees:

  • Causal ordering: Effects follow causes
  • Progress guarantees: The system knows when processing is complete
  • Distributed coordination: Multiple nodes stay synchronized
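
As a minimal illustration of this ordering (a sketch, not the SDK's implementation), deriving a total order over the two fields sorts events by physical time first and breaks ties with the logical counter:

// Deriving Ord compares fields in declaration order: physical time first,
// then the logical counter for events at the same physical instant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct HybridLogicalClock {
    physical_time: u64,
    logical_time: u64,
}

fn main() {
    let a = HybridLogicalClock { physical_time: 1_700_000_000, logical_time: 0 };
    let b = HybridLogicalClock { physical_time: 1_700_000_000, logical_time: 1 };
    let c = HybridLogicalClock { physical_time: 1_700_000_001, logical_time: 0 };

    // a and b share a physical instant; logical_time orders them causally.
    assert!(a < b && b < c);
    println!("causal order preserved: {:?} < {:?} < {:?}", a, b, c);
}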

Understand operators and modules

Understanding the distinction between operators and modules is essential for WASM development:

Operators

Operators are the fundamental processing units based on Timely dataflow operators. Each operator type serves a specific purpose:

  • Map: Transform each data item (such as converting temperature units)
  • Filter: Allow only certain data items to pass through based on conditions (such as removing invalid readings)
  • Branch: Route data to different paths based on conditions (such as separating temperature and humidity data)
  • Accumulate: Collect and aggregate data within time windows (such as computing statistical summaries)
  • Concatenate: Merge multiple data streams while preserving temporal order
  • Delay: Control timing by advancing timestamps

Modules

Modules are the implementation of operator logic as WASM code. A single module can implement multiple operator types. For example, a temperature module might provide:

  • A map operator for unit conversion
  • A filter operator for threshold checking
  • A branch operator for routing decisions
  • An accumulate operator for statistical aggregation

The relationship

The relationship between graph definitions, modules, and operators follows a specific pattern:

Graph Definition → References Module → Provides Operator → Processes Data
     ↓                    ↓               ↓              ↓
"temperature:1.0.0" → temperature.wasm → map function → °F to °C

This separation provides several benefits:

  • Module reuse: Deploy the same WASM module in different graph configurations
  • Independent versioning: Update graph definitions without rebuilding modules
  • Dynamic configuration: Pass different parameters to the same module for different behaviors

Prerequisites

Choose your development language and set up the required tools:

  • Rust toolchain: Install with:
    curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
    
  • WASM target: Add with:
    rustup target add wasm32-wasip2
    
  • Build tools: Install with:
    cargo install wasm-tools --version '=1.201.0' --locked
    

Configure development environment

The WASM Rust SDK is available through a custom Azure DevOps registry. Configure access by setting these environment variables:

export CARGO_REGISTRIES_AZURE_VSCODE_TINYKUBE_INDEX="sparse+https://pkgs.dev.azure.com/azure-iot-sdks/iot-operations/_packaging/preview/Cargo/index/"
export CARGO_NET_GIT_FETCH_WITH_CLI=true

Add the following environment variables to your shell profile for persistent access:

echo 'export CARGO_REGISTRIES_AZURE_VSCODE_TINYKUBE_INDEX="sparse+https://pkgs.dev.azure.com/azure-iot-sdks/iot-operations/_packaging/preview/Cargo/index/"' >> ~/.bashrc
echo 'export CARGO_NET_GIT_FETCH_WITH_CLI=true' >> ~/.bashrc
source ~/.bashrc

Create project

Start by creating a new project directory for your operator module. The project structure depends on your chosen language.

cargo new --lib temperature-converter
cd temperature-converter

Configure Cargo.toml

Edit the Cargo.toml file to include dependencies for the WASM SDK and other libraries:

[package]
name = "temperature-converter"
version = "0.1.0"
edition = "2021"

[dependencies]
# WebAssembly Interface Types (WIT) code generation
wit-bindgen = "0.22"

# Azure IoT Operations WASM SDK - provides operator macros and host APIs
tinykube_wasm_sdk = { version = "0.2.0", registry = "azure-vscode-tinykube" }

# JSON serialization/deserialization for data processing
serde = { version = "1", default-features = false, features = ["derive"] }
serde_json = { version = "1", default-features = false, features = ["alloc"] }

[lib]
# Required for WASM module compilation
crate-type = ["cdylib"]

Key dependencies explained:

  • wit-bindgen: Generates Rust bindings from WebAssembly Interface Types (WIT) definitions, enabling your code to interface with the WASM runtime
  • tinykube_wasm_sdk: Azure IoT Operations SDK providing operator macros (#[map_operator], #[filter_operator], etc.) and host APIs for logging, metrics, and state management
  • serde + serde_json: JSON processing libraries for parsing and generating data payloads; default-features = false optimizes for WASM size constraints
  • crate-type = ["cdylib"]: Compiles the Rust library as a C-compatible dynamic library, which is required for WASM module generation

Create a simple module

Create a simple module that converts temperature from Celsius to Fahrenheit. This example demonstrates the basic module structure and processing logic; the Rust implementation follows.

// src/lib.rs
use tinykube_wasm_sdk::logger::{self, Level};
use tinykube_wasm_sdk::macros::map_operator;
use serde_json::{json, Value};

// Import the generated types from wit-bindgen
use crate::tinykube_graph::processor::types::{DataModel, ModuleConfiguration, BufferOrBytes};

fn temperature_converter_init(_configuration: ModuleConfiguration) -> bool {
    logger::log(Level::Info, "temperature-converter", "Init invoked");
    true
}

#[map_operator(init = "temperature_converter_init")]
fn temperature_converter(input: DataModel) -> DataModel {
    let DataModel::Message(mut result) = input else {
        return input;
    };

    let payload = &result.payload.read();
    if let Ok(data_str) = std::str::from_utf8(payload) {
        if let Ok(mut data) = serde_json::from_str::<Value>(data_str) {
            if let Some(temp) = data["value"]["temperature"].as_f64() {
                let fahrenheit = (temp * 9.0 / 5.0) + 32.0;
                data["value"] = json!({
                    "temperature_fahrenheit": fahrenheit,
                    "original_celsius": temp
                });
                
                if let Ok(output_str) = serde_json::to_string(&data) {
                    result.payload = BufferOrBytes::Bytes(output_str.into_bytes());
                }
            }
        }
    }

    DataModel::Message(result)
}
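
For example, an input payload of {"value": {"temperature": 25.0}} produces {"value": {"temperature_fahrenheit": 77.0, "original_celsius": 25.0}}. Messages with non-JSON payloads or without a numeric value.temperature field pass through unchanged.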

Build module

Choose between local development builds or containerized builds based on your development workflow and environment requirements.

Local build

Build directly on your development machine for fastest iteration during development and when you need full control over the build environment.

# Build WASM module
cargo build --release --target wasm32-wasip2

# Find your module  
ls target/wasm32-wasip2/release/*.wasm

Docker build

Build using containerized environments with all dependencies and schemas preconfigured. These Docker images provide consistent builds across different environments and are ideal for CI/CD pipelines.

The Rust Docker builder is maintained in the Azure IoT Operations samples repository and includes all necessary dependencies. For detailed documentation, see Rust Docker builder usage.

# Build release version (optimized for production)
docker run --rm -v "$(pwd):/workspace" ghcr.io/azure-samples/explore-iot-operations/rust-wasm-builder --app-name temperature-converter

# Build debug version (includes debugging symbols and less optimization)
docker run --rm -v "$(pwd):/workspace" ghcr.io/azure-samples/explore-iot-operations/rust-wasm-builder --app-name temperature-converter --build-mode debug

Docker build options:

  • --app-name: Must match your Rust crate name from Cargo.toml
  • --build-mode: Choose release (default) for optimized builds or debug for development builds with symbols

More examples

For comprehensive examples, see the Rust examples in the samples repository. Complete implementations include:

  • Map operators: Data transformation and conversion logic
  • Filter operators: Conditional data processing and validation
  • Branch operators: Multi-path routing based on data content
  • Accumulate operators: Time-windowed aggregation and statistical processing
  • Delay operators: Time-based processing control

The examples demonstrate working implementations that show the complete structure for each operator type, including proper error handling and logging patterns.

SDK reference and APIs

The WASM Rust SDK provides comprehensive development tools:

Operator macros

use tinykube_wasm_sdk::macros::{map_operator, filter_operator, branch_operator};
use tinykube_wasm_sdk::{DataModel, HybridLogicalClock};

// Map operator - transforms each data item
#[map_operator(init = "my_init_function")]
fn my_map(input: DataModel) -> DataModel {
    // Transform logic here; return the (possibly modified) item
    input
}

// Filter operator - allows/rejects data based on a predicate
#[filter_operator(init = "my_init_function")]
fn my_filter(input: DataModel) -> bool {
    // Return true to pass data through, false to filter it out
    true
}

// Branch operator - routes data to different arms
#[branch_operator(init = "my_init_function")]
fn my_branch(input: DataModel, timestamp: HybridLogicalClock) -> bool {
    // Return true for the "True" arm, false for the "False" arm
    true
}
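
The same pattern extends to a complete filter. The following is a minimal sketch modeled on the temperature-converter example, assuming the same DataModel accessors and a JSON payload shaped like {"value": {"temperature": <number>}}; the operator and function names are illustrative:

use tinykube_wasm_sdk::logger::{self, Level};
use tinykube_wasm_sdk::macros::filter_operator;
use serde_json::Value;

use crate::tinykube_graph::processor::types::{DataModel, ModuleConfiguration};

fn threshold_filter_init(_configuration: ModuleConfiguration) -> bool {
    logger::log(Level::Info, "threshold-filter", "Init invoked");
    true
}

#[filter_operator(init = "threshold_filter_init")]
fn threshold_filter(input: DataModel) -> bool {
    // Only structured messages are inspected; pass other variants through.
    let DataModel::Message(message) = &input else {
        return true;
    };

    let payload = message.payload.read();
    if let Ok(data_str) = std::str::from_utf8(&payload) {
        if let Ok(data) = serde_json::from_str::<Value>(data_str) {
            if let Some(temp) = data["value"]["temperature"].as_f64() {
                // Keep only readings inside a plausible sensor range.
                return (-40.0..=125.0).contains(&temp);
            }
        }
    }

    // Reject payloads that can't be parsed as expected.
    false
}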

Module configuration parameters

Your WASM operators can receive runtime configuration parameters through the ModuleConfiguration struct passed to the init function. These parameters are defined in the graph definition and allow runtime customization without rebuilding modules.

use tinykube_wasm_sdk::logger::{self, Level};
use tinykube_wasm_sdk::ModuleConfiguration;

fn my_operator_init(configuration: ModuleConfiguration) -> bool {
    // Access required parameters
    if let Some(threshold_param) = configuration.parameters.get("temperature_threshold") {
        let threshold: f64 = threshold_param.parse().unwrap_or(25.0);
        logger::log(Level::Info, "my-operator", &format!("Using threshold: {}", threshold));
    }
    
    // Access optional parameters with defaults
    let unit = configuration.parameters
        .get("output_unit")
        .map(|s| s.as_str())
        .unwrap_or("celsius");
    
    logger::log(Level::Info, "my-operator", &format!("Output unit: {}", unit));
    true
}
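
When an operator takes several parameters, it can help to parse them once into a typed struct at init time instead of re-reading the raw map. A minimal sketch, assuming parameters behaves as a string-to-string map as in the preceding example; OperatorConfig and parse_config are hypothetical names, not SDK types:

use tinykube_wasm_sdk::ModuleConfiguration;

// Hypothetical typed view over the raw parameter map.
struct OperatorConfig {
    threshold: f64,
    output_unit: String,
}

fn parse_config(configuration: &ModuleConfiguration) -> OperatorConfig {
    OperatorConfig {
        // Fall back to a default when the parameter is absent or unparsable.
        threshold: configuration
            .parameters
            .get("temperature_threshold")
            .and_then(|s| s.parse().ok())
            .unwrap_or(25.0),
        output_unit: configuration
            .parameters
            .get("output_unit")
            .cloned()
            .unwrap_or_else(|| "celsius".to_string()),
    }
}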

For detailed information about defining configuration parameters in graph definitions, see Module configuration parameters.

Host APIs

Use the SDK to work with distributed services:

State store for persistent data:

use tinykube_wasm_sdk::state_store;

// Set value
state_store::set(key.as_bytes(), value.as_bytes(), None, None, options)?;

// Get value  
let response = state_store::get(key.as_bytes(), None)?;

// Delete key
state_store::del(key.as_bytes(), None, None)?;

Structured logging:

use tinykube_wasm_sdk::logger::{self, Level};

logger::log(Level::Info, "my-operator", "Processing started");
logger::log(Level::Error, "my-operator", &format!("Error: {}", error));

OpenTelemetry-compatible metrics:

use tinykube_wasm_sdk::metrics;

// Increment counter
metrics::add_to_counter("requests_total", 1.0, Some(labels))?;

// Record histogram value
metrics::record_to_histogram("processing_duration", duration_ms, Some(labels))?;

ONNX inference with WASM (preview)

To embed and run small ONNX models inside your modules for in-band inference, see Run ONNX inference in WebAssembly data flow graphs. That article covers packaging models with modules, enabling the wasi-nn feature in graph definitions, and preview limitations.

WebAssembly Interface Types (WIT)

All operators implement standardized interfaces defined using WebAssembly Interface Types (WIT). WIT provides language-agnostic interface definitions that ensure compatibility between WASM modules and the host runtime.

The complete WIT schemas for Azure IoT Operations are available in the samples repository. These schemas define all the interfaces, types, and data structures you'll work with when developing WASM modules.

Data model and interfaces

All WASM operators work with standardized data models defined using WebAssembly Interface Types (WIT):

Core data model

// Core timestamp structure using hybrid logical clock
record timestamp {
    timestamp: timespec,     // Physical time (seconds + nanoseconds)
    node-id: buffer-or-string,  // Logical node identifier
}

// Union type supporting multiple data formats
variant data-model {
    buffer-or-bytes(buffer-or-bytes),    // Raw byte data
    message(message),                    // Structured messages with metadata
    snapshot(snapshot),                  // Video/image frames with timestamps
}

// Structured message format
record message {
    timestamp: timestamp,
    content-type: buffer-or-string,
    payload: message-payload,
}

WIT interface definitions

Each operator type implements a specific WIT interface:

// Core operator interfaces
interface map {
    use types.{data-model};
    process: func(message: data-model) -> data-model;
}

interface filter {
    use types.{data-model};
    process: func(message: data-model) -> bool;
}

interface branch {
    use types.{data-model, hybrid-logical-clock};
    process: func(timestamp: hybrid-logical-clock, message: data-model) -> bool;
}

interface accumulate {
    use types.{data-model};
    process: func(staged: data-model, message: list<data-model>) -> data-model;
}
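
The accumulate interface passes your function the previously staged result together with a batch of new messages and expects a single merged result back. The following plain-Rust sketch illustrates that contract with a hypothetical Aggregate type over raw f64 readings, independent of the SDK's data-model types:

// Running statistics carried forward between accumulate calls.
#[derive(Clone, Copy, Default)]
struct Aggregate {
    count: u64,
    sum: f64,
    min: f64,
    max: f64,
}

// Fold a batch of new readings into the previously staged aggregate.
fn accumulate(staged: Aggregate, batch: &[f64]) -> Aggregate {
    batch.iter().fold(staged, |acc, &value| Aggregate {
        count: acc.count + 1,
        sum: acc.sum + value,
        min: if acc.count == 0 { value } else { acc.min.min(value) },
        max: if acc.count == 0 { value } else { acc.max.max(value) },
    })
}

fn main() {
    // The staged value from one window feeds the next call.
    let first = accumulate(Aggregate::default(), &[21.5, 22.0, 20.8]);
    let merged = accumulate(first, &[23.1]);
    println!("count={} mean={:.2}", merged.count, merged.sum / merged.count as f64);
}

Because the staged result from one window feeds the next call, an accumulate operator can maintain running statistics without buffering the full stream.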

Graph definitions and WASM integration

Graph definitions specify how your WASM modules connect into complete processing workflows. They define the operations, connections, and parameters that make up a data processing pipeline.

For comprehensive information about creating and configuring graph definitions, including detailed examples of simple and complex workflows, see Configure WebAssembly graph definitions for data flow graphs.

Key topics covered in the graph definitions guide:

  • Graph definition structure: Understanding the YAML schema and required components
  • Simple graph example: Basic three-stage temperature conversion pipeline
  • Complex graph example: Multi-sensor processing with branching and aggregation
  • Module configuration parameters: Runtime customization of WASM operators
  • Registry deployment: Packaging and storing graph definitions as OCI artifacts

Next steps