Anteckning
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Lär dig hur du använder Apache Kafka-producent- och konsument-API:er med Kafka i HDInsight.
Med Kafka-producent-API:et kan program skicka dataströmmar till Kafka-klustret. Med Kafka Consumer API kan program läsa dataströmmar från klustret.
I den här tutorialen lär du dig följande:
- Förutsättningar
 - Förstå koden
 - Skapa och distribuera programmet
 - Kör programmet i klustret
 
Mer information om API:erna finns i Apache-dokumentationen om producent-API :et och konsument-API:et.
Förutsättningar
- Apache Kafka på HDInsight-kluster. Information om hur du skapar klustret finns i Börja med Apache Kafka i HDInsight.
 - Java Developer Kit (JDK) version 8 eller motsvarande, till exempel OpenJDK.
 - Apache Maven har installerats korrekt enligt Apache. Maven är ett projektbyggsystem för Java-projekt.
 - En SSH-klient som Putty. Mer information finns i Ansluta till HDInsight (Apache Hadoop) med hjälp av SSH.
 
Förstå koden
Exempelprogrammet finns på https://github.com/Azure-Samples/hdinsight-kafka-java-get-started, i underkatalogen Producer-Consumer . Om du använder Enterprise Security Package (ESP) aktiverat Kafka-kluster bör du använda programversionen som finns i underkatalogen DomainJoined-Producer-Consumer .
Programmet består främst av fyra filer:
- 
              
pom.xml: Den här filen definierar projektberoenden, Java-version och paketeringsmetoder. - 
              
Producer.java: Den här filen skickar slumpmässiga meningar till Kafka med hjälp av producent-API:et. - 
              
Consumer.java: Den här filen använder konsument-API:et för att läsa data från Kafka och skicka dem till STDOUT. - 
              
AdminClientWrapper.java: Den här filen använder administratörs-API:et för att skapa, beskriva och ta bort Kafka-ämnen. - 
              
Run.java: Kommandoradsgränssnittet som används för att köra producent- och konsumentkoden. 
Pom.xml
Viktiga saker att förstå i pom.xml filen är:
Beroenden: Det här projektet förlitar sig på Kafka-producent- och konsument-API:er som tillhandahålls av
kafka-clientspaketet. Följande XML-kod definierar det här beroendet:<!-- Kafka client for producer/consumer operations --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency>Posten
${kafka.version}deklareras i<properties>..</properties>avsnittetpom.xmloch är konfigurerad för Kafka-versionen av HDInsight-klustret.Plugin-program: Maven-plugin-program har olika funktioner. I det här projektet används följande plugin-program:
- 
              
maven-compiler-plugin: Används för att ange javaversionen som används av projektet till 8. Det här är den version av Java som används av HDInsight 3.6. - 
              
maven-shade-plugin: Används för att generera en uber-jar som innehåller det här programmet samt eventuella beroenden. Det används också för att ange startpunkten för programmet, så att du kan köra Jar-filen direkt utan att behöva ange huvudklassen. 
- 
              
 
Producer.java
Producenten kommunicerar med Kafka-brokervärdarna (arbetsnoder) och skickar data till ett Kafka-tema. Följande kodfragment kommer från Producer.java-filen från GitHub-lagringsplatsen och visar hur du anger producentegenskaperna. För Enterprise Security-aktiverade kluster måste ytterligare en egenskap läggas till "properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");"
Properties properties = new Properties();
// Set the brokers (bootstrap servers)
properties.setProperty("bootstrap.servers", brokers);
// Set how to serialize key/value pairs
properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
Consumer.java
Konsumenten kommunicerar med Kafka-koordinatorvärdarna (arbetsnoder) och läser poster i en loop. Följande kodfragment från filen Consumer.java anger konsumentegenskaperna. För Enterprise Security-aktiverade kluster måste ytterligare en egenskap läggas till "properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");"
KafkaConsumer<String, String> consumer;
// Configure the consumer
Properties properties = new Properties();
// Point it to the brokers
properties.setProperty("bootstrap.servers", brokers);
// Set the consumer group (all consumers must belong to a group).
properties.setProperty("group.id", groupId);
// Set how to serialize key/value pairs
properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
// When a group is first created, it has no offset stored to start reading from. This tells it to start
// with the earliest record in the stream.
properties.setProperty("auto.offset.reset","earliest");
consumer = new KafkaConsumer<>(properties);
I den här koden är konsumenten konfigurerad att läsa från början av ämnet (auto.offset.reset är inställd på earliest.)
Run.java
Filen Run.java innehåller ett kommandoradsgränssnitt som kör antingen producent- eller konsumentkoden. Du måste ange värdinformationen för Kafka-koordinatorn som en parameter. Du kan också inkludera ett grupp-ID-värde som används av konsumentprocessen. Om du skapar flera konsumentinstanser med samma grupp-ID kommer de att dela upp läsningen från ämnet.
Skapa och distribuera exemplet
Använda färdiga JAR-filer
Ladda ned jar-filerna från Azure-exemplet Kafka Kom igång. Om klustret är aktiverat för Enterprise Security Package (ESP) använder du kafka-producer-consumer-esp.jar. Använd kommandot nedan för att kopiera jar-filerna till klustret.
scp kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
Skapa JAR-filerna från kod
Om du vill hoppa över det här steget kan fördefinierade jar-filer laddas ned från underkatalogen Prebuilt-Jars . Ladda ned kafka-producer-consumer.jar. Om ditt kluster har Enterprise Security Package (ESP) aktiverat, använd kafka-producer-consumer-esp.jar. Kör steg 3 för att kopiera jar-filen till HDInsight-klustret.
Ladda ned och extrahera exemplen från https://github.com/Azure-Samples/hdinsight-kafka-java-get-started.
Ställ in din nuvarande katalog till platsen för
hdinsight-kafka-java-get-started\Producer-Consumerkatalogen. Om du använder ett Kafka-kluster aktiverat med Enterprise Security Package (ESP) bör du ange platsen somDomainJoined-Producer-Consumersubkatalog. Använd följande kommando för att skapa programmet:mvn clean packageDet här kommandot skapar en katalog med namnet
target, som innehåller en fil med namnetkafka-producer-consumer-1.0-SNAPSHOT.jar. För ESP-kluster kommer filen attkafka-producer-consumer-esp-1.0-SNAPSHOT.jarErsätt
sshusermed SSH-användaren för klustret och ersättCLUSTERNAMEmed namnet på klustret. Ange följande kommando för att kopierakafka-producer-consumer-1.0-SNAPSHOT.jarfilen till HDInsight-klustret. När du uppmanas att ange lösenordet för SSH-användaren.scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
Kör exemplet
Ersätt
sshusermed SSH-användaren för klustret och ersättCLUSTERNAMEmed namnet på klustret. Öppna en SSH-anslutning till klustret genom att ange följande kommando. Ange lösenordet för SSH-användarkontot om du uppmanas att göra det.ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.netOm du vill hämta Kafka-koordinatorvärdarna ersätter du värdena för
<clustername>och<password>i följande kommando och kör det. Använd samma hölje för<clustername>som i Azure-portalen. Ersätt<password>med lösenordet för klusterinloggning och kör sedan:sudo apt -y install jq export CLUSTER_NAME='<clustername>' export PASSWORD='<password>' export KAFKABROKERS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);Anmärkning
Det här kommandot kräver Ambari-åtkomst. Om klustret ligger bakom en NSG kör du det här kommandot från en dator som har åtkomst till Ambari.
Skapa Kafka-ämne,
myTest, genom att ange följande kommando:java -jar kafka-producer-consumer.jar create myTest $KAFKABROKERSOm du vill köra producenten och skriva data till ämnet använder du följande kommando:
java -jar kafka-producer-consumer.jar producer myTest $KAFKABROKERSNär producenten är klar använder du följande kommando för att läsa från ämnet:
java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jarDe poster som har lästs, tillsammans med antalet poster, visas.
Använd Ctrl + C för att avsluta konsumenten.
Flera konsumenter
Kafka-konsumenter använder en konsumentgrupp när de läser poster. Att använda samma grupp med flera konsumenter resulterar i lastbalanserade läsningar från ett ämne. Varje konsument i gruppen får en del av posterna.
Konsumentprogrammet accepterar en parameter som används som grupp-ID. Följande kommando startar till exempel en konsument med ett grupp-ID för myGroup:
java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup
Använd Ctrl + C för att avsluta konsumenten.
Om du vill se hur den här processen fungerar använder du följande kommando:
tmux new-session 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; split-window -h 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; attach
Det här kommandot använder tmux för att dela upp terminalen i två kolumner. En konsument startas i varje kolumn, med ett och samma värde för grupp-ID. När användarna har läst klart kan du observera att var och en endast läser en del av posterna. Använd Ctrl + C två gånger för att avsluta tmux.
Förbrukning av klienter i samma grupp hanteras via partitionerna för ämnet. I det här kodexemplet har ämnet test som skapades tidigare åtta partitioner. Om du startar åtta konsumenter läser varje konsument poster från en enda partition för ämnet.
Viktigt!
Det kan inte finnas fler konsumentinstanser i en konsumentgrupp än partitioner. I det här exemplet kan en konsumentgrupp innehålla upp till åtta konsumenter eftersom det är antalet partitioner i ämnet. Eller så kan du ha flera konsumentgrupper, var och en med högst åtta konsumenter.
Poster som lagras i Kafka lagras i den ordning de tas emot i en partition. Skapa en konsumentgrupp där antalet konsumentinstanser matchar antalet partitioner för att säkerställa ordnad leverans av poster inom en partition. Skapa en konsumentgrupp med endast en konsumentinstans för att uppnå ordnad leverans av poster i ämnet.
Vanliga problem som har uppstått
Det går inte att skapa ämne Om ditt kluster har Enterprise Security Pack aktiverat använder du de färdigbyggda JAR-filerna för producent och konsument. ESP-jar-filen kan skapas från koden i underkatalogen
DomainJoined-Producer-Consumer. Producent- och konsumentegenskaperna har ytterligare en egenskapCommonClientConfigs.SECURITY_PROTOCOL_CONFIGför kluster som har ESP aktiverat.Fel i ESP-aktiverade kluster: Om produktions- och konsumtionsoperationer misslyckas och du använder ett ESP-aktiverat kluster, kontrollerar du att användaren
kafkafinns i alla Ranger-regler. Om den inte finns, lägg till den i alla Ranger-policyer.
Rensa resurser
Du kan ta bort resursgruppen för att rensa de resurser som skapats av denna handledning. När du tar bort resursgruppen raderas även det kopplade HDInsight-klustret och eventuella andra resurser som är associerade med resursgruppen.
Ta bort en resursgrupp med Azure Portal:
- I Azure Portal expanderar du menyn på vänster sida för att öppna tjänstemenyn och väljer sedan Resursgrupper för att visa listan med dina resursgrupper.
 - Leta reda på den resursgrupp du vill ta bort och högerklicka på knappen Mer (...) till höger om listan.
 - Välj Ta bort resursgrupp och bekräfta.
 
Nästa steg
I det här dokumentet har du lärt dig hur du använder Apache Kafka-producent- och konsument-API:et med Kafka i HDInsight. Använd följande för att lära dig mer om att arbeta med Kafka:
- Använda Kafka REST Proxy
 - Analysera Apache Kafka-loggar