Learn how to use the Apache Kafka Connect Azure IoT Hub connector to move data between Apache Kafka on HDInsight and Azure IoT Hub. In this document, you learn how to run the IoT Hub connector from an edge node in the cluster.
The Kafka Connect API allows you to implement connectors that continuously pull data into Kafka, or push data from Kafka to another system. Apache Kafka Connect Azure IoT Hub is a connector that pulls data from Azure IoT Hub into Kafka. It can also push data from Kafka to IoT Hub.
When pulling from the IoT Hub, you use a source connector. When pushing to IoT Hub, you use a sink connector. The IoT Hub connector provides both the source and sink connectors.
The following diagram shows the data flow between Azure IoT Hub and Kafka on HDInsight when using the connector.
For more information on the Connect API, see https://kafka.apache.org/documentation/#connect.
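As a sketch of the general pattern used throughout this article, standalone mode pairs one worker configuration file with one or more connector configuration files. The file names here are placeholders, not the ones used later:

```bash
# General pattern: connect-standalone.sh takes the worker config first,
# then one or more connector (source or sink) configs.
connect-standalone.sh worker.properties my-connector.properties
```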
Prerequisites
- An Apache Kafka cluster on HDInsight. For more information, see the Kafka on HDInsight quickstart document.
- An edge node in the Kafka cluster. For more information, see the Use edge nodes with HDInsight document.
- An SSH client. For more information, see Connect to HDInsight (Apache Hadoop) using SSH.
- An Azure IoT hub and device. For this article, consider using the Connect Raspberry Pi online simulator to Azure IoT Hub.
Build the connector
Download the source for the connector from https://github.com/Azure/toketi-kafka-connect-iothub/ to your local environment.
From a command prompt, navigate to the `toketi-kafka-connect-iothub-master` directory. Then use the following command to build and package the project:

```bash
sbt assembly
```

The build takes a few minutes to complete. The command creates a file named `kafka-connect-iothub-assembly_2.11-0.7.0.jar` in the `toketi-kafka-connect-iothub-master\target\scala-2.11` directory for the project.
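Before uploading the .jar file, you can optionally confirm that the build produced the expected artifact. This sketch, run from the `target/scala-2.11` directory, assumes a JDK (for the `jar` tool) is on your path:

```bash
# Optional: confirm the assembly jar exists and contains the connector classes.
ls -l kafka-connect-iothub-assembly*.jar
jar tf kafka-connect-iothub-assembly*.jar | grep -i iothub | head
```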
Install the connector
Upload the .jar file to the edge node of your Kafka on HDInsight cluster. Edit the following command by replacing `CLUSTERNAME` with the actual name of your cluster. Default values for the SSH user account and the edge node name are used; modify them as needed.

```bash
scp kafka-connect-iothub-assembly*.jar sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net:
```

Once the file copy completes, connect to the edge node using SSH:

```bash
ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
```

To install the connector into the Kafka `libs` directory, use the following command:

```bash
sudo mv kafka-connect-iothub-assembly*.jar /usr/hdp/current/kafka-broker/libs/
```
Keep your SSH connection active for the remaining steps.
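Optionally, verify that the .jar file landed in the `libs` directory before moving on:

```bash
# Optional: confirm the connector jar is now in the Kafka libs directory.
ls -l /usr/hdp/current/kafka-broker/libs/kafka-connect-iothub*
```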
Configure Apache Kafka
From your SSH connection to the edge node, use the following steps to configure Kafka to run the connector in standalone mode:
1. Set up the password variable. Replace `PASSWORD` with the cluster login password, then enter the command:

   ```bash
   export password='PASSWORD'
   ```

2. Install the jq utility. jq makes it easier to process JSON documents returned from Ambari queries. Enter the following command:

   ```bash
   sudo apt -y install jq
   ```

3. Get the address of the Kafka brokers. There may be many brokers in your cluster, but you only need to reference one or two. To get the address of two broker hosts, use the following command:

   ```bash
   export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')

   export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`

   echo $KAFKABROKERS
   ```

   Copy the values for later use. The value returned is similar to the following text:

   `<brokername1>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092,<brokername2>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092`

4. Get the address of the Apache Zookeeper nodes. There are several Zookeeper nodes in the cluster, but you only need to reference one or two. Use the following command to store the addresses in the variable `KAFKAZKHOSTS`:

   ```bash
   export KAFKAZKHOSTS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`
   ```

5. When you run the connector in standalone mode, the `/usr/hdp/current/kafka-broker/config/connect-standalone.properties` file is used to communicate with the Kafka brokers. To edit the `connect-standalone.properties` file, use the following command:

   ```bash
   sudo nano /usr/hdp/current/kafka-broker/config/connect-standalone.properties
   ```

   Make the following edits:

   | Current value | New value | Comment |
   | --- | --- | --- |
   | `bootstrap.servers=localhost:9092` | Replace the `localhost:9092` value with the broker hosts from the previous step | Configures the standalone configuration for the edge node to find the Kafka brokers. |
   | `key.converter=org.apache.kafka.connect.json.JsonConverter` | `key.converter=org.apache.kafka.connect.storage.StringConverter` | This change allows you to test using the console producer included with Kafka. You may need different converters for other producers and consumers. For information on using other converter values, see https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md. |
   | `value.converter=org.apache.kafka.connect.json.JsonConverter` | `value.converter=org.apache.kafka.connect.storage.StringConverter` | Same as for the key converter. |
   | N/A | `consumer.max.poll.records=10` | Add to the end of the file. This change prevents timeouts in the sink connector by limiting it to 10 records at a time. For more information, see https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md. |

   To save the file, use Ctrl + X, Y, and then Enter.
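After these edits, the relevant lines of `connect-standalone.properties` should look similar to the following sketch, where the broker host names are placeholders for the values returned by the earlier Ambari query:

```properties
# Broker hosts from the earlier query (placeholders shown here)
bootstrap.servers=<brokername1>:9092,<brokername2>:9092
# String converters so the Kafka console producer can be used for testing
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
# Added at the end of the file to prevent sink connector timeouts
consumer.max.poll.records=10
```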
To create the topics used by the connector, use the following commands:
```bash
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotin --zookeeper $KAFKAZKHOSTS

/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotout --zookeeper $KAFKAZKHOSTS
```

To verify that the `iotin` and `iotout` topics exist, use the following command:

```bash
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $KAFKAZKHOSTS
```

The `iotin` topic is used to receive messages from IoT Hub. The `iotout` topic is used to send messages to IoT Hub.
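If you want more detail than the topic list provides, the `--describe` option shows the partition and replica layout for each topic; an optional check:

```bash
# Optional: inspect partition and replica assignments for the new topics.
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --describe --topic iotin --zookeeper $KAFKAZKHOSTS
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --describe --topic iotout --zookeeper $KAFKAZKHOSTS
```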
Get IoT Hub connection information
To retrieve IoT hub information used by the connector, use the following steps:
Get the Event Hub-compatible endpoint and Event Hub-compatible endpoint name for your IoT hub. To get this information, use one of the following methods:
From the Azure portal, use the following steps:
Navigate to your IoT Hub and select Endpoints.
From Built-in endpoints, select Events.
From Properties, copy the value of the following fields:
- Event Hub-compatible name
- Event Hub-compatible endpoint
- Partitions
Important
The endpoint value from the portal may contain extra text that isn't needed in this example. Extract the text that matches the pattern `sb://<randomnamespace>.servicebus.windows.net/`.
From the Azure CLI, use the following command:
```bash
az iot hub show --name myhubname --query "{EventHubCompatibleName:properties.eventHubEndpoints.events.path,EventHubCompatibleEndpoint:properties.eventHubEndpoints.events.endpoint,Partitions:properties.eventHubEndpoints.events.partitionCount}"
```

Replace `myhubname` with the name of your IoT hub. The response is similar to the following text:

```json
"EventHubCompatibleEndpoint": "sb://ihsuprodbnres006dednamespace.servicebus.windows.net/",
"EventHubCompatibleName": "iothub-ehub-myhub08-207673-d44b2a856e",
"Partitions": 2
```
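If you'd rather capture these values from the shell than copy them by hand, you can query each property individually with tab-separated output. This is a sketch; the variable names are illustrative, and it assumes the Azure CLI is available where you run it:

```bash
# Sketch: store each value for later reference. Replace myhubname with your hub name.
export EHNAME=$(az iot hub show --name myhubname --query "properties.eventHubEndpoints.events.path" -o tsv)
export EHENDPOINT=$(az iot hub show --name myhubname --query "properties.eventHubEndpoints.events.endpoint" -o tsv)
export EHPARTITIONS=$(az iot hub show --name myhubname --query "properties.eventHubEndpoints.events.partitionCount" -o tsv)
```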
Get the shared access policy and key. For this example, use the service key. To get this information, use one of the following methods:
From the Azure portal, use the following steps:
- Select Shared access policies, and then select service.
- Copy the Primary key value.
- Copy the Connection string - primary key value.
From the Azure CLI, use the following commands:

To get the primary key value, use the following command:

```bash
az iot hub policy show --hub-name myhubname --name service --query "primaryKey"
```

Replace `myhubname` with the name of your IoT hub. The response is the primary key to the `service` policy for this hub.

To get the connection string for the `service` policy, use the following command:

```bash
az iot hub connection-string show --name myhubname --policy-name service --query "connectionString"
```

Replace `myhubname` with the name of your IoT hub. The response is the connection string for the `service` policy.
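As with the endpoint values, you can optionally capture the key and connection string in shell variables instead of copying them manually; a sketch with illustrative variable names:

```bash
# Sketch: store the service policy key and connection string. Replace myhubname.
export IOTKEY=$(az iot hub policy show --hub-name myhubname --name service --query "primaryKey" -o tsv)
export IOTCONNSTR=$(az iot hub connection-string show --name myhubname --policy-name service --query "connectionString" -o tsv)
```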
Configure the source connection
To configure the source to work with your IoT Hub, perform the following actions from an SSH connection to the edge node:
Create a copy of the `connect-iothub-source.properties` file in the `/usr/hdp/current/kafka-broker/config/` directory. To download the file from the toketi-kafka-connect-iothub project, use the following command:

```bash
sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-source.properties
```

To edit the `connect-iothub-source.properties` file and add the IoT hub information, use the following command:

```bash
sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
```

In the editor, find and change the following entries:

| Current value | Edit |
| --- | --- |
| `Kafka.Topic=PLACEHOLDER` | Replace `PLACEHOLDER` with `iotin`. Messages received from IoT hub are placed in the `iotin` topic. |
| `IotHub.EventHubCompatibleName=PLACEHOLDER` | Replace `PLACEHOLDER` with the Event Hub-compatible name. |
| `IotHub.EventHubCompatibleEndpoint=PLACEHOLDER` | Replace `PLACEHOLDER` with the Event Hub-compatible endpoint. |
| `IotHub.AccessKeyName=PLACEHOLDER` | Replace `PLACEHOLDER` with `service`. |
| `IotHub.AccessKeyValue=PLACEHOLDER` | Replace `PLACEHOLDER` with the primary key of the `service` policy. |
| `IotHub.Partitions=PLACEHOLDER` | Replace `PLACEHOLDER` with the number of partitions from the previous steps. |
| `IotHub.StartTime=PLACEHOLDER` | Replace `PLACEHOLDER` with a UTC date. This date is when the connector starts checking for messages. The date format is `yyyy-mm-ddThh:mm:ssZ`. |
| `BatchSize=100` | Replace `100` with `5`. This change causes the connector to read messages into Kafka once there are five new messages in IoT hub. |

For an example configuration, see Kafka Connect Source Connector for Azure IoT Hub.
To save changes, use Ctrl + X, Y, and then Enter.
For more information on configuring the connector source, see https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Source.md.
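Putting the table's edits together, the modified entries in `connect-iothub-source.properties` end up looking similar to this sketch. Every angle-bracket value is a placeholder for your own hub's values, and the `IotHub.StartTime` shown is only an example of the required format:

```properties
Kafka.Topic=iotin
IotHub.EventHubCompatibleName=<Event Hub-compatible name>
IotHub.EventHubCompatibleEndpoint=sb://<randomnamespace>.servicebus.windows.net/
IotHub.AccessKeyName=service
IotHub.AccessKeyValue=<primary key of the service policy>
IotHub.Partitions=<partition count>
IotHub.StartTime=2017-08-29T00:00:00Z
BatchSize=5
```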
Configure the sink connection
To configure the sink connection to work with your IoT Hub, perform the following actions from an SSH connection to the edge node:
Create a copy of the `connect-iothub-sink.properties` file in the `/usr/hdp/current/kafka-broker/config/` directory. To download the file from the toketi-kafka-connect-iothub project, use the following command:

```bash
sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-sink.properties
```

To edit the `connect-iothub-sink.properties` file and add the IoT hub information, use the following command:

```bash
sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
```

In the editor, find and change the following entries:

| Current value | Edit |
| --- | --- |
| `topics=PLACEHOLDER` | Replace `PLACEHOLDER` with `iotout`. Messages written to the `iotout` topic are forwarded to the IoT hub. |
| `IotHub.ConnectionString=PLACEHOLDER` | Replace `PLACEHOLDER` with the connection string for the `service` policy. |

For an example configuration, see Kafka Connect Sink Connector for Azure IoT Hub.
To save changes, use Ctrl + X, Y, and then Enter.
For more information on configuring the connector sink, see https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
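For comparison, the modified entries in `connect-iothub-sink.properties` are shorter; a sketch with a placeholder connection string:

```properties
topics=iotout
IotHub.ConnectionString=<connection string for the service policy>
```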
Start the source connector
To start the source connector, use the following command from an SSH connection to the edge node:
```bash
/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
```

Once the connector starts, send messages to IoT hub from your device(s). As the connector reads messages from the IoT hub and stores them in the Kafka topic, it logs information to the console:

```
[2017-08-29 20:15:46,112] INFO Polling for data - Obtained 5 SourceRecords from IotHub (com.microsoft.azure.iot.kafka.connect.IotHubSourceTask:39)
[2017-08-29 20:15:54,106] INFO Finished WorkerSourceTask{id=AzureIotHubConnector-0} commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:356)
```

Note
You may see several warnings as the connector starts. These warnings do not cause problems with receiving messages from IoT hub.
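While the source connector is running, you can optionally confirm from a second SSH session that device messages are arriving in the `iotin` topic by using the console consumer included with Kafka. This sketch assumes `$KAFKABROKERS` is set in that session as in the earlier configuration steps:

```bash
# Optional: read the messages the source connector wrote to the iotin topic.
/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic iotin --from-beginning
```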
Stop the connector after a few minutes using Ctrl + C twice. It takes a few minutes for the connector to stop.
Start the sink connector
From an SSH connection to the edge node, use the following command to start the sink connector in standalone mode:
```bash
/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
```
As the connector runs, information similar to the following text is displayed:
```
[2017-08-30 17:49:16,150] INFO Started tasks to send 1 messages to devices. (com.microsoft.azure.iot.kafka.connect.sink.IotHubSinkTask:47)
[2017-08-30 17:49:16,150] INFO WorkerSinkTask{id=AzureIotHubSinkConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
```
Note
You may notice several warnings as the connector starts. You can safely ignore these.
Send messages
To send messages through the connector, use the following steps:
1. Open a second SSH session to the Kafka cluster:

   ```bash
   ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
   ```

2. Get the address of the Kafka brokers for the new SSH session. Replace `PASSWORD` with the cluster login password, then enter the commands:

   ```bash
   export password='PASSWORD'

   export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')

   export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
   ```

3. To send messages to the `iotout` topic, use the following command:

   ```bash
   /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic iotout
   ```

   This command doesn't return you to the normal Bash prompt. Instead, it sends keyboard input to the `iotout` topic.

4. To send a message to your device, paste a JSON document into the SSH session for the `kafka-console-producer`.

   Important

   You must set the value of the `"deviceId"` entry to the ID of your device. In the following example, the device is named `myDeviceId`:

   ```json
   {"messageId":"msg1","message":"Turn On","deviceId":"myDeviceId"}
   ```

   The schema for this JSON document is described in more detail at https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
If you're using the simulated Raspberry Pi device, and it's running, the device logs the following message:

```
Receive message: Turn On
```
Resend the JSON document, but change the value of the `"message"` entry. The new value is logged by the device.
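If you prefer not to type into the interactive producer session, you can pipe each document into the producer instead; a sketch using the same example message:

```bash
# Sketch: send a single message non-interactively, then return to the prompt.
echo '{"messageId":"msg1","message":"Turn On","deviceId":"myDeviceId"}' | \
  /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic iotout
```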
For more information on using the sink connector, see https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
Next steps
In this document, you learned how to use the Apache Kafka Connect API to start the IoT Kafka Connector on HDInsight. Use the following links to discover other ways to work with Kafka: