Delen via


Uw Apache Spark-toepassing verbinden met Azure Event Hubs

In deze zelfstudie leert u hoe u uw Spark-toepassing verbindt met Event Hubs voor realtime streaming. Dankzij deze integratie maakt u streaming mogelijk zonder dat u uw protocolclients hoeft te wijzigen of uw eigen Kafka- of Zookeeper-clusters hoeft uit te voeren. Voor deze zelfstudie is Apache Spark v2.4+ en Apache Kafka v2.0+ vereist.

Notitie

Dit voorbeeld is beschikbaar op GitHub

In deze zelfstudie leert u het volgende:

  • Een Event Hubs-naamruimte maken
  • Het voorbeeldproject klonen
  • Spark uitvoeren
  • Lezen uit Event Hubs voor Kafka
  • Schrijven naar Event Hubs voor Kafka

Vereiste voorwaarden

Voordat u aan deze zelfstudie begint, moet u ervoor zorgen dat u het volgende hebt:

Notitie

De Spark-Kafka-adapter is bijgewerkt ter ondersteuning van Kafka v2.0 vanaf Spark v2.4. In eerdere versies van Spark ondersteunde de adapter Kafka v0.10 en hoger, maar was specifiek afhankelijk van Kafka v0.10 API's. Omdat Event Hubs voor Kafka geen ondersteuning biedt voor Kafka v0.10, worden de Spark-Kafka-adapters van versies van Spark vóór v2.4 niet ondersteund door Event Hubs voor Kafka-ecosystemen.

Een Event Hubs-naamruimte maken

Er is een Event Hubs-naamruimte vereist om gegevens te verzenden naar en te ontvangen van Event Hubs-services. Zie Een Event Hub maken voor instructies voor het maken van een naamruimte en een Event Hub. Haal de Event Hubs-verbindingsreeks en de Fully Qualified Domain Name (FQDN) op voor later gebruik. Zie de instructies in Get an Event Hubs connection string.

Het voorbeeldproject klonen

Kloon de Azure Event Hubs-opslagplaats en navigeer naar de tutorials/spark submap:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/spark

Lezen uit Event Hubs voor Kafka

Met enkele configuratiewijzigingen kunt u beginnen met lezen vanuit Event Hubs voor Kafka. Werk BOOTSTRAP_SERVERS en EH_SASL bij met details uit uw naamruimte en u kunt beginnen met streamen met Event Hubs zoals u dat zou doen met Kafka. Zie het bestand sparkConsumer.scala op GitHub voor de volledige voorbeeldcode.

//Read from your Event Hub!
val df = spark.readStream
    .format("kafka")
    .option("subscribe", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("kafka.request.timeout.ms", "60000")
    .option("kafka.session.timeout.ms", "30000")
    .option("kafka.group.id", GROUP_ID)
    .option("failOnDataLoss", "true")
    .load()

//Use dataframe like normal (in this example, write to console)
val df_write = df.writeStream
    .outputMode("append")
    .format("console")
    .start()

Als u een foutmelding ontvangt die lijkt op de volgende fout, voeg dan .option("spark.streaming.kafka.allowNonConsecutiveOffsets", "true") toe aan de spark.readStream oproep en probeer het opnieuw.

IllegalArgumentException: requirement failed: Got wrong record for <spark job name> even after seeking to offset 4216 got offset 4217 instead. If this is a compacted topic, consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets 

Schrijven naar Event Hubs voor Kafka

U kunt ook schrijven naar Event Hubs op dezelfde manier als u naar Kafka schrijft. Vergeet niet om uw configuratie bij te werken om BOOTSTRAP_SERVERS te wijzigen en EH_SASL met informatie uit uw Event Hubs-naamruimte. Zie het bestand sparkProducer.scala op GitHub voor de volledige voorbeeldcode.

df = /**Dataframe**/

//Write to your Event Hub!
df.writeStream
    .format("kafka")
    .option("topic", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("checkpointLocation", "./checkpoint")
    .start()

Volgende stappen

Zie de volgende artikelen voor meer informatie over Event Hubs en Event Hubs voor Kafka: