Note
Access to this page requires authorization. You can try signing in or changing directories.
Access to this page requires authorization. You can try changing directories.
In this walkthrough, you deploy a Dapr application to the cluster. The Dapr application consumes simulated MQTT data published to MQTT broker, applies a windowing function, and then publishes the result back to MQTT broker. The published output represents how high volume data can be aggregated on the edge to reduce message frequency and size. The Dapr application is stateless, and uses the MQTT broker state store to cache past values needed for the window calculations.
The Dapr application performs the following steps:
- Subscribes to the
sensor/datatopic for sensor data. - When data is receiving on the topic, it's published to the MQTT broker state store.
- Every 10 seconds, it fetches the data from the state store and calculates the min, max, mean, median, and 75th percentile values on any sensor data timestamped in the last 30 seconds.
- Data older than 30 seconds is expired from the state store.
- The result is published to the
sensor/window_datatopic in JSON format.
Note
This tutorial disables Dapr CloudEvents which enables it to publish and subscribe using raw MQTT.
Prerequisites
- Azure IoT Operations installed - Quickstart: Run Azure IoT Operations in GitHub Codespaces with K3s
- MQTT broker Dapr components installed - Install MQTT broker Dapr Components
Deploy the Dapr application
At this point, you can deploy the Dapr application. Registering the components doesn't deploy the associated binary that is packaged in a container. To deploy the binary along with your application, you can use a Deployment to group the containerized Dapr application and the two components together.
To start, create a yaml file that uses the following definitions:
| Component | Description |
|---|---|
volumes.mqtt-client-token |
The SAT used for authenticating the Dapr pluggable components with the MQTT broker and State Store |
volumes.aio-internal-ca-cert-chain |
The chain of trust to validate the MQTT broker TLS cert |
containers.mq-event-driven |
The prebuilt Dapr application container. |
Save the following deployment yaml to a file named
app.yaml:apiVersion: v1 kind: ServiceAccount metadata: name: dapr-client namespace: azure-iot-operations annotations: aio-broker-auth/group: dapr-workload --- apiVersion: apps/v1 kind: Deployment metadata: name: mq-event-driven-dapr namespace: azure-iot-operations spec: selector: matchLabels: app: mq-event-driven-dapr template: metadata: labels: app: mq-event-driven-dapr annotations: dapr.io/enabled: "true" dapr.io/inject-pluggable-components: "true" dapr.io/app-id: "mq-event-driven-dapr" dapr.io/app-port: "6001" dapr.io/app-protocol: "grpc" spec: serviceAccountName: dapr-client volumes: # SAT token used to authenticate between Dapr and the MQTT broker - name: mqtt-client-token projected: sources: - serviceAccountToken: path: mqtt-client-token audience: aio-internal expirationSeconds: 86400 # Certificate chain for Dapr to validate the MQTT broker - name: aio-ca-trust-bundle configMap: name: azure-iot-operations-aio-ca-trust-bundle containers: - name: mq-event-driven-dapr image: ghcr.io/azure-samples/explore-iot-operations/mq-event-driven-dapr:latestDeploy the application by running the following command:
kubectl apply -f app.yamlConfirm that the application deployed successfully. The pod should report all containers are ready after a short interval, as shown with the following command:
kubectl get pods -l app=mq-event-driven-dapr -n azure-iot-operationsWith the following output:
NAME READY STATUS RESTARTS AGE mq-event-driven-dapr 3/3 Running 0 30s
Deploy the simulator
Simulate test data by deploying a Kubernetes workload. It simulates a sensor by sending sample temperature, vibration, and pressure readings periodically to the MQTT broker using an MQTT client on the sensor/data topic.
Deploy the simulator from the Explore IoT Operations repository:
kubectl apply -f https://raw.githubusercontent.com/Azure-Samples/explore-iot-operations/main/tutorials/mq-event-driven-dapr/simulate-data.yamlConfirm the simulator is running correctly:
kubectl logs deployment/mqtt-publisher-deployment -n azure-iot-operations -fWith the following output:
Get:1 http://deb.debian.org/debian stable InRelease [151 kB] Get:2 http://deb.debian.org/debian stable-updates InRelease [52.1 kB] Get:3 http://deb.debian.org/debian-security stable-security InRelease [48.0 kB] Get:4 http://deb.debian.org/debian stable/main amd64 Packages [8780 kB] Get:5 http://deb.debian.org/debian stable-updates/main amd64 Packages [6668 B] Get:6 http://deb.debian.org/debian-security stable-security/main amd64 Packages [101 kB] Fetched 9139 kB in 3s (3570 kB/s) ... Messages published in the last 10 seconds: 10 Messages published in the last 10 seconds: 10 Messages published in the last 10 seconds: 10
Deploy an MQTT client
To verify the MQTT bridge is working, deploy an MQTT client to the cluster.
In a new file named
client.yaml, specify the client deployment:apiVersion: v1 kind: ServiceAccount metadata: name: mqtt-client namespace: azure-iot-operations --- apiVersion: v1 kind: Pod metadata: name: mqtt-client namespace: azure-iot-operations spec: serviceAccountName: mqtt-client containers: - image: alpine name: mqtt-client command: ["sh", "-c"] args: ["apk add mosquitto-clients mqttui && sleep infinity"] volumeMounts: - name: mqtt-client-token mountPath: /var/run/secrets/tokens - name: aio-ca-trust-bundle mountPath: /var/run/certs/aio-internal-ca-cert/ volumes: - name: mqtt-client-token projected: sources: - serviceAccountToken: path: mqtt-client-token audience: aio-internal expirationSeconds: 86400 - name: aio-ca-trust-bundle configMap: name: azure-iot-operations-aio-ca-trust-bundleApply the deployment file with kubectl:
kubectl apply -f client.yamlVerify output:
pod/mqtt-client created
Verify the Dapr application output
Open a shell to the Mosquitto client pod:
kubectl exec --stdin --tty mqtt-client -n azure-iot-operations -- shSubscribe to the
sensor/window_datatopic to observe the published output from the Dapr application:mosquitto_sub -L mqtt://aio-broker/sensor/window_dataVerify the application is outputting a sliding windows calculation for the various sensors every 10 seconds:
{ "timestamp": "2023-11-16T21:59:53.939690+00:00", "window_size": 30, "temperature": { "min": 553.024, "max": 598.907, "mean": 576.4647857142858, "median": 577.4905, "75_per": 585.96125, "count": 28 }, "pressure": { "min": 290.605, "max": 299.781, "mean": 295.521, "median": 295.648, "75_per": 297.64050000000003, "count": 28 }, "vibration": { "min": 0.00124192, "max": 0.00491257, "mean": 0.0031171810714285715, "median": 0.003199235, "75_per": 0.0038769150000000003, "count": 28 } }
Optional - Create the Dapr application
This tutorial uses a prebuilt container of the Dapr application. If you would like to modify and build the code yourself, follow these steps:
Prerequisites
- Docker - for building the application container
- A Container registry - for hosting the application container
Build the application
Clone the Explore IoT Operations repository:
git clone https://github.com/Azure-Samples/explore-iot-operationsChange to the Dapr tutorial directory:
cd explore-iot-operations/tutorials/mq-event-driven-dapr/srcBuild the docker image:
docker build docker build . -t mq-event-driven-daprTo consume the application in your Kubernetes cluster, you need to push the image to a container registry such as the Azure Container Registry. You could also push to a local container registry such as minikube or Docker.
docker tag mq-event-driven-dapr <container-alias> docker push <container-alias>Update your
app.yamlto pull your newly created image.
Troubleshooting
If the application doesn't start or you see the containers in CrashLoopBackoff, the daprd container log often contains useful information.
Run the following command to view the logs for the daprd component:
kubectl logs -l app=mq-event-driven-dapr -n azure-iot-operations -c daprd