Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
In deze zelfstudie leert u hoe u uw Spark-toepassing verbindt met Event Hubs voor realtime streaming. Dankzij deze integratie maakt u streaming mogelijk zonder dat u uw protocolclients hoeft te wijzigen of uw eigen Kafka- of Zookeeper-clusters hoeft uit te voeren. Voor deze zelfstudie is Apache Spark v2.4+ en Apache Kafka v2.0+ vereist.
Notitie
Dit voorbeeld is beschikbaar op GitHub
In deze zelfstudie leert u het volgende:
- Een Event Hubs-naamruimte maken
- Het voorbeeldproject klonen
- Spark uitvoeren
- Lezen uit Event Hubs voor Kafka
- Schrijven naar Event Hubs voor Kafka
Vereiste voorwaarden
Voordat u aan deze zelfstudie begint, moet u ervoor zorgen dat u het volgende hebt:
- Azure-abonnement. Maak een gratis account aan als u er nog geen hebt, .
- Apache Spark v2.4
- Apache Kafka v2.0
- Git
Notitie
De Spark-Kafka-adapter is bijgewerkt ter ondersteuning van Kafka v2.0 vanaf Spark v2.4. In eerdere versies van Spark ondersteunde de adapter Kafka v0.10 en hoger, maar was specifiek afhankelijk van Kafka v0.10 API's. Omdat Event Hubs voor Kafka geen ondersteuning biedt voor Kafka v0.10, worden de Spark-Kafka-adapters van versies van Spark vóór v2.4 niet ondersteund door Event Hubs voor Kafka-ecosystemen.
Een Event Hubs-naamruimte maken
Er is een Event Hubs-naamruimte vereist om gegevens te verzenden naar en te ontvangen van Event Hubs-services. Zie Een Event Hub maken voor instructies voor het maken van een naamruimte en een Event Hub. Haal de Event Hubs-verbindingsreeks en de Fully Qualified Domain Name (FQDN) op voor later gebruik. Zie de instructies in Get an Event Hubs connection string.
Het voorbeeldproject klonen
Kloon de Azure Event Hubs-opslagplaats en navigeer naar de tutorials/spark submap:
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/spark
Lezen uit Event Hubs voor Kafka
Met enkele configuratiewijzigingen kunt u beginnen met lezen vanuit Event Hubs voor Kafka. Werk BOOTSTRAP_SERVERS en EH_SASL bij met details uit uw naamruimte en u kunt beginnen met streamen met Event Hubs zoals u dat zou doen met Kafka. Zie het bestand sparkConsumer.scala op GitHub voor de volledige voorbeeldcode.
//Read from your Event Hub!
val df = spark.readStream
.format("kafka")
.option("subscribe", TOPIC)
.option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.jaas.config", EH_SASL)
.option("kafka.request.timeout.ms", "60000")
.option("kafka.session.timeout.ms", "30000")
.option("kafka.group.id", GROUP_ID)
.option("failOnDataLoss", "true")
.load()
//Use dataframe like normal (in this example, write to console)
val df_write = df.writeStream
.outputMode("append")
.format("console")
.start()
Als u een foutmelding ontvangt die lijkt op de volgende fout, voeg dan .option("spark.streaming.kafka.allowNonConsecutiveOffsets", "true") toe aan de spark.readStream oproep en probeer het opnieuw.
IllegalArgumentException: requirement failed: Got wrong record for <spark job name> even after seeking to offset 4216 got offset 4217 instead. If this is a compacted topic, consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets
Schrijven naar Event Hubs voor Kafka
U kunt ook schrijven naar Event Hubs op dezelfde manier als u naar Kafka schrijft. Vergeet niet om uw configuratie bij te werken om BOOTSTRAP_SERVERS te wijzigen en EH_SASL met informatie uit uw Event Hubs-naamruimte. Zie het bestand sparkProducer.scala op GitHub voor de volledige voorbeeldcode.
df = /**Dataframe**/
//Write to your Event Hub!
df.writeStream
.format("kafka")
.option("topic", TOPIC)
.option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.jaas.config", EH_SASL)
.option("checkpointLocation", "./checkpoint")
.start()
Volgende stappen
Zie de volgende artikelen voor meer informatie over Event Hubs en Event Hubs voor Kafka: