Anteckning
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Viktigt!
WebAssembly -utveckling (WASM) för dataflödesdiagram är i förhandsversion. Den här funktionen har begränsningar och gäller inte för produktionsarbetsbelastningar.
Se kompletterande användningsvillkor för Förhandsversioner av Microsoft Azure för juridiska villkor som gäller för Azure-funktioner som är i betaversion, förhandsversion eller som ännu inte har släppts i allmän tillgänglighet.
Den här artikeln visar hur du utvecklar anpassade WebAssembly-moduler (WASM) och grafdefinitioner för Azure IoT Operations-dataflödesdiagram. Skapa moduler i Rust eller Python för att implementera anpassad bearbetningslogik. Definiera grafkonfigurationer som anger hur dina moduler ansluter till fullständiga bearbetningsarbetsflöden.
Viktigt!
Dataflödesdiagram stöder för närvarande endast MQTT-, Kafka- och OpenTelemetry-slutpunkter. Andra slutpunktstyper som Data Lake, Microsoft Fabric OneLake, Azure Data Explorer och Local Storage stöds inte. Mer information finns i Kända problem.
Översikt
Dataflödesdiagram i Azure IoT Operations bearbetar strömmande data via konfigurerbara operatorer som implementeras som WebAssembly-moduler. Varje operator bearbetar tidsstämplade data samtidigt som tidsordningen bibehålls, vilket möjliggör realtidsanalys med deterministiska resultat.
Viktiga fördelar
- Realtidsbearbetning: Hantera strömmande data med konsekvent låg svarstid
- Händelse-tidssemantik: Bearbeta data baserat på när händelser inträffade, inte när de bearbetas
- Feltolerans: Inbyggt stöd för hantering av fel och säkerställa datakonsekvens
- Skalbarhet: Distribuera bearbetning över flera noder samtidigt som ordergarantier bibehålls
- Stöd för flera språk: Utveckla i Rust eller Python med konsekventa gränssnitt
Grund för arkitektur
Dataflödesdiagram bygger på beräkningsmodellen för dataflöde i tid , som kommer från Microsoft Researchs Naiad-projekt. Den här metoden säkerställer:
- Deterministisk bearbetning: Samma indata genererar alltid samma utdata
- Förloppsspårning: Systemet vet när beräkningen är klar
- Distribuerad samordning: Flera bearbetningsnoder förblir synkroniserade
Varför snabbt dataflöde?
Traditionella dataströmbearbetningssystem har flera utmaningar. Data som inte är i ordning innebär att händelser kan komma senare än förväntat. Partiella resultat gör det svårt att veta när beräkningar slutförs. Samordningsproblem uppstår vid synkronisering av distribuerad bearbetning.
Dataflödet i rätt tid löser dessa problem genom:
Tidsstämplar och förloppsspårning
Varje dataobjekt har en tidsstämpel som representerar dess logiska tid. Systemet spårar förloppet genom tidsstämplar, vilket möjliggör flera viktiga funktioner:
- Deterministisk bearbetning: Samma indata genererar alltid samma utdata
- Exakt en gång semantik: Ingen duplicerad eller missad bearbetning
- Vattenstämplar: Vet när inga fler data kommer att tas emot under en viss tid
Logisk hybridklocka
Tidsstämpelmekanismen använder en hybridmetod:
pub struct HybridLogicalClock {
    pub physical_time: u64,  // Wall-clock time when event occurred
    pub logical_time: u64,   // Logical ordering for events at same physical time
}
Metoden för logisk hybridklocka säkerställer flera funktioner:
- Kausal ordning: Effekter följer orsaker
- Förloppsgarantier: Systemet vet när bearbetningen är klar
- Distribuerad samordning: Flera noder förblir synkroniserade
Förstå operatorer och moduler
Att förstå skillnaden mellan operatorer och moduler är viktigt för WASM-utveckling:
Operatörer
Operatorer är de grundläggande bearbetningsenheterna som baseras på dataflödesoperatorer i tid. Varje operatortyp har ett specifikt syfte:
- Karta: Transformera varje dataobjekt (till exempel konvertera temperaturenheter)
- Filter: Tillåt endast vissa dataobjekt att passera baserat på villkor (till exempel att ta bort ogiltiga avläsningar)
- Gren: Dirigera data till olika sökvägar baserat på förhållanden (till exempel att separera temperatur- och luftfuktighetsdata)
- Ackumulera: Samla in och aggregera data inom tidsperioder (till exempel beräkning av statistiska sammanfattningar)
- Sammanfoga: Sammanfoga flera dataströmmar samtidigt som tidsordningen bevaras
- Fördröjning: Kontrollera tidsscheman genom att flytta fram tidsstämplar
Modules
Moduler är implementeringen av operatorlogik som WASM-kod. En enskild modul kan implementera flera operatortyper. En temperaturmodul kan till exempel ge:
- En kartoperator för enhetskonvertering
- En filteroperator för tröskelvärdeskontroll
- En grenoperator för routningsbeslut
- En ackumulerad operator för statistisk aggregering
Relationen
Relationen mellan grafdefinitioner, moduler och operatorer följer ett specifikt mönster:
Graph Definition → References Module → Provides Operator → Processes Data
     ↓                    ↓               ↓              ↓
"temperature:1.0.0" → temperature.wasm → map function → °F to °C
Med separationen kan du:
- Återanvändning av modul: Distribuera samma WASM-modul i olika grafkonfigurationer
- Oberoende versionshantering: Uppdatera diagramdefinitioner utan att återskapa moduler
- Dynamisk konfiguration: Skicka olika parametrar till samma modul för olika beteenden
Förutsättningar
Välj ditt utvecklingsspråk och konfigurera de verktyg som krävs:
- 
              Rust-verktygskedja: Installera med: curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
- 
              WASM-mål: Lägg till med: rustup target add wasm32-wasip2
- 
              Byggverktyg: Installera med: cargo install wasm-tools --version '=1.201.0' --locked
Konfigurera utvecklingsmiljön
WASM Rust SDK är tillgängligt via ett anpassat Azure DevOps-register. Konfigurera åtkomst genom att ange dessa miljövariabler:
export CARGO_REGISTRIES_AZURE_VSCODE_TINYKUBE_INDEX="sparse+https://pkgs.dev.azure.com/azure-iot-sdks/iot-operations/_packaging/preview/Cargo/index/"
export CARGO_NET_GIT_FETCH_WITH_CLI=true
Lägg till följande miljövariabler i din gränssnittsprofil för beständig åtkomst:
echo 'export CARGO_REGISTRIES_AZURE_VSCODE_TINYKUBE_INDEX="sparse+https://pkgs.dev.azure.com/azure-iot-sdks/iot-operations/_packaging/preview/Cargo/index/"' >> ~/.bashrc
echo 'export CARGO_NET_GIT_FETCH_WITH_CLI=true' >> ~/.bashrc
source ~/.bashrc
Skapa projekt
Börja med att skapa en ny projektkatalog för din operatörsmodul. Projektstrukturen beror på ditt valda språk.
cargo new --lib temperature-converter
cd temperature-converter
Konfigurera Cargo.toml
Redigera filen så att den Cargo.toml innehåller beroenden för WASM SDK och andra bibliotek:
[package]
name = "temperature-converter"
version = "0.1.0"
edition = "2021"
[dependencies]
# WebAssembly Interface Types (WIT) code generation
wit-bindgen = "0.22"
# Azure IoT Operations WASM SDK - provides operator macros and host APIs
tinykube_wasm_sdk = { version = "0.2.0", registry = "azure-vscode-tinykube" }
# JSON serialization/deserialization for data processing
serde = { version = "1", default-features = false, features = ["derive"] }
serde_json = { version = "1", default-features = false, features = ["alloc"] }
[lib]
# Required for WASM module compilation
crate-type = ["cdylib"]
Viktiga beroenden förklaras:
- 
              wit-bindgen: Genererar Rust-bindningar från WebAssembly-gränssnittstyper (WIT)-definitioner, vilket möjliggör för koden att samverka med WASM-runtime.
- 
              tinykube_wasm_sdk: Azure IoT Operations SDK som tillhandahåller operatormakro (#[map_operator],#[filter_operator]osv.) och värd-API:er för loggning, mått och tillståndshantering
- 
              serde+serde_json: JSON-bearbetningsbibliotek för parsning och generering av datanyttolaster;default-features = falseoptimerar för WASM-storleksbegränsningar
- 
              crate-type = ["cdylib"]: Kompilerar Rust-biblioteket som ett C-kompatibelt dynamiskt bibliotek, vilket krävs för generering av WASM-moduler
Skapa en enkel modul
Skapa en enkel modul som konverterar temperatur från Celsius till Fahrenheit. Det här exemplet visar den grundläggande strukturen och bearbetningslogik för både Rust- och Python-implementeringar.
// src/lib.rs
use tinykube_wasm_sdk::logger::{self, Level};
use tinykube_wasm_sdk::macros::map_operator;
use serde_json::{json, Value};
// Import the generated types from wit-bindgen
use crate::tinykube_graph::processor::types::{DataModel, ModuleConfiguration, BufferOrBytes};
fn temperature_converter_init(_configuration: ModuleConfiguration) -> bool {
    logger::log(Level::Info, "temperature-converter", "Init invoked");
    true
}
#[map_operator(init = "temperature_converter_init")]
fn temperature_converter(input: DataModel) -> DataModel {
    let DataModel::Message(mut result) = input else {
        return input;
    };
    let payload = &result.payload.read();
    if let Ok(data_str) = std::str::from_utf8(payload) {
        if let Ok(mut data) = serde_json::from_str::<Value>(data_str) {
            if let Some(temp) = data["value"]["temperature"].as_f64() {
                let fahrenheit = (temp * 9.0 / 5.0) + 32.0;
                data["value"] = json!({
                    "temperature_fahrenheit": fahrenheit,
                    "original_celsius": temp
                });
                
                if let Ok(output_str) = serde_json::to_string(&data) {
                    result.payload = BufferOrBytes::Bytes(output_str.into_bytes());
                }
            }
        }
    }
    DataModel::Message(result)
}
Byggmodul
Välj mellan lokala utvecklingsversioner eller containerbaserade byggen baserat på ditt utvecklingsarbetsflöde och dina miljökrav.
Lokal version
Skapa direkt på utvecklingsdatorn för snabbast iteration under utveckling och när du behöver fullständig kontroll över byggmiljön.
# Build WASM module
cargo build --release --target wasm32-wasip2
# Find your module  
ls target/wasm32-wasip2/release/*.wasm
Docker-version
Skapa med hjälp av containerbaserade miljöer med alla beroenden och scheman förkonfigurerade. Dessa Docker-avbildningar ger konsekventa byggen i olika miljöer och är idealiska för CI/CD-pipelines.
Rust Docker-byggaren underhålls i Azure IoT Operations-exempellagringsplatsen och innehåller alla nödvändiga beroenden. Detaljerad dokumentation finns i Användning av Rust Docker Builder.
# Build release version (optimized for production)
docker run --rm -v "$(pwd):/workspace" ghcr.io/azure-samples/explore-iot-operations/rust-wasm-builder --app-name temperature-converter
# Build debug version (includes debugging symbols and less optimization)
docker run --rm -v "$(pwd):/workspace" ghcr.io/azure-samples/explore-iot-operations/rust-wasm-builder --app-name temperature-converter --build-mode debug
Docker-byggalternativ:
- 
              --app-name: Måste matcha ditt Rust crate-namn frånCargo.toml
- 
              --build-mode: Väljrelease(standard) för optimerade versioner ellerdebugför utvecklingsversioner med symboler
Fler exempel
Omfattande exempel finns i Rust-exemplen i exempellagringsplatsen. Kompletta implementeringar omfattar:
- Kartoperatorer: Datatransformering och konverteringslogik
- Filteroperatorer: Bearbetning och validering av villkorsstyrda data
- Grenoperatorer: Routning med flera sökvägar baserat på datainnehåll
- Ackumulerade operatorer: Tidsfönsterbaserad aggregering och statistisk bearbetning
- Fördröjningsoperatorer: Tidsbaserad bearbetningskontroll
Exemplen visar fungerande implementeringar som visar den fullständiga strukturen för varje operatortyp, inklusive korrekt felhantering och loggningsmönster.
SDK-referens och API:er
WASM Rust SDK innehåller omfattande utvecklingsverktyg:
Operatormakro
use tinykube_wasm_sdk::macros::{map_operator, filter_operator, branch_operator};
use tinykube_wasm_sdk::{DataModel, HybridLogicalClock};
// Map operator - transforms each data item
#[map_operator(init = "my_init_function")]
fn my_map(input: DataModel) -> DataModel {
    // Transform logic here
}
// Filter operator - allows/rejects data based on predicate  
#[filter_operator(init = "my_init_function")]
fn my_filter(input: DataModel) -> bool {
    // Return true to pass data through, false to filter out
}
// Branch operator - routes data to different arms
#[branch_operator(init = "my_init_function")]
fn my_branch(input: DataModel, timestamp: HybridLogicalClock) -> bool {
    // Return true for "True" arm, false for "False" arm
}
Modulkonfigurationsparametrar
Dina WASM-operatorer kan ta emot konfigurationsparametrar för körning via strukturen ModuleConfiguration som skickas till init funktionen. De här parametrarna definieras i grafdefinitionen och tillåter anpassning av körningsmiljön utan att modulerna återskapas.
use tinykube_wasm_sdk::logger::{self, Level};
use tinykube_wasm_sdk::ModuleConfiguration;
fn my_operator_init(configuration: ModuleConfiguration) -> bool {
    // Access required parameters
    if let Some(threshold_param) = configuration.parameters.get("temperature_threshold") {
        let threshold: f64 = threshold_param.parse().unwrap_or(25.0);
        logger::log(Level::Info, "my-operator", &format!("Using threshold: {}", threshold));
    }
    
    // Access optional parameters with defaults
    let unit = configuration.parameters
        .get("output_unit")
        .map(|s| s.as_str())
        .unwrap_or("celsius");
    
    logger::log(Level::Info, "my-operator", &format!("Output unit: {}", unit));
    true
}
Detaljerad information om hur du definierar konfigurationsparametrar i grafdefinitioner finns i Modulkonfigurationsparametrar.
Värd-API:er
Använd SDK för att arbeta med distribuerade tjänster:
Tillståndslager för beständiga data:
use tinykube_wasm_sdk::state_store;
// Set value
state_store::set(key.as_bytes(), value.as_bytes(), None, None, options)?;
// Get value  
let response = state_store::get(key.as_bytes(), None)?;
// Delete key
state_store::del(key.as_bytes(), None, None)?;
Strukturerad loggning:
use tinykube_wasm_sdk::logger::{self, Level};
logger::log(Level::Info, "my-operator", "Processing started");
logger::log(Level::Error, "my-operator", &format!("Error: {}", error));
OpenTelemetry-kompatibla mått:
use tinykube_wasm_sdk::metrics;
// Increment counter
metrics::add_to_counter("requests_total", 1.0, Some(labels))?;
// Record histogram value
metrics::record_to_histogram("processing_duration", duration_ms, Some(labels))?;
ONNX-slutsatsdragning med WASM (förhandsversion)
Information om hur du bäddar in och kör små ONNX-modeller i dina moduler för in-band-slutsatsdragning finns i Köra ONNX-slutsatsdragning i WebAssembly-dataflödesdiagram. Den här artikeln beskriver paketeringsmodeller med moduler, aktivering av funktionen wasi-nn i grafdefinitioner och förhandsgranskningsbegränsningar.
WebAssembly-gränssnittstyper (WIT)
Alla operatorer implementerar standardiserade gränssnitt som definierats med hjälp av WebAssembly Interface Types (WIT). WIT tillhandahåller språkagnostiska gränssnittsdefinitioner som säkerställer kompatibilitet mellan WASM-moduler och värdkörningen.
De fullständiga WIT-schemana för Azure IoT Operations är tillgängliga på exempellagringsplatsen. Dessa scheman definierar alla gränssnitt, typer och datastrukturer som du arbetar med när du utvecklar WASM-moduler.
Datamodell och gränssnitt
Alla WASM-operatorer arbetar med standardiserade datamodeller som definierats med WebAssembly Interface Types (WIT):
Kärndatamodell
// Core timestamp structure using hybrid logical clock
record timestamp {
    timestamp: timespec,     // Physical time (seconds + nanoseconds)
    node-id: buffer-or-string,  // Logical node identifier
}
// Union type supporting multiple data formats
variant data-model {
    buffer-or-bytes(buffer-or-bytes),    // Raw byte data
    message(message),                    // Structured messages with metadata
    snapshot(snapshot),                  // Video/image frames with timestamps
}
// Structured message format
record message {
    timestamp: timestamp,
    content_type: buffer-or-string,
    payload: message-payload,
}
WIT-gränssnittsdefinitioner
Varje operatortyp implementerar ett specifikt WIT-gränssnitt:
// Core operator interfaces
interface map {
    use types.{data-model};
    process: func(message: data-model) -> data-model;
}
interface filter {
    use types.{data-model};
    process: func(message: data-model) -> bool;
}
interface branch {
    use types.{data-model, hybrid-logical-clock};
    process: func(timestamp: hybrid-logical-clock, message: data-model) -> bool;
}
interface accumulate {
    use types.{data-model};
    process: func(staged: data-model, message: list<data-model>) -> data-model;
}
Diagramdefinitioner och WASM-integrering
Diagramdefinitioner definierar hur dina WASM-moduler ansluter till bearbetningsarbetsflöden. De anger de åtgärder, anslutningar och parametrar som skapar fullständiga databehandlingspipelines.
Omfattande information om hur du skapar och konfigurerar grafdefinitioner, inklusive detaljerade exempel på enkla och komplexa arbetsflöden, finns i Konfigurera WebAssembly-grafdefinitioner för dataflödesdiagram.
Viktiga ämnen som beskrivs i guiden för grafdefinitioner:
- Diagramdefinitionsstruktur: Förstå YAML-schemat och nödvändiga komponenter
- Enkelt diagramexempel: Grundläggande pipeline för temperaturkonvertering i tre steg
- Komplext diagramexempel: Bearbetning av flera sensorer med förgrening och aggregering
- Modulkonfigurationsparametrar: Körningsanpassning av WASM-operatorer
- Registerdistribution: Paketera och lagra grafdefinitioner som OCI-artefakter
Nästa steg
- Se fullständiga exempel och avancerade mönster i AZURE IoT Operations WASM-exempellagringsplatsen .
- Lär dig hur du distribuerar dina moduler i Använda WebAssembly med dataflödesdiagram.
- Konfigurera dina dataflödesslutpunkter i Konfigurera dataflödesslutpunkter.