Dela via


Ansluta ditt Apache Spark-program till Azure Event Hubs

Den här självstudien beskriver hur du ansluter ditt Spark-program till Event Hubs för direktuppspelning i realtid. Den här integreringen möjliggör strömning utan att du behöver ändra protokollklienter eller köra dina egna Kafka- eller Zookeeper-kluster. Den här handledningen kräver Apache Spark v2.4+ och Apache Kafka v2.0+.

Anmärkning

Det här exemplet finns på GitHub

I den här tutorialen lär du dig följande:

  • Skapa ett Event Hubs-namnområde
  • Klona exempelprojektet
  • Kör Spark
  • Läs från Event Hubs för Kafka
  • Skriv till Event Hubs för Kafka

Förutsättningar

Innan du påbörjar den här självstudien kontrollerar du att du har:

Anmärkning

Spark-Kafka-adaptern uppdaterades för att stödja Kafka v2.0 från och med Spark v2.4. I tidigare versioner av Spark stödde adaptern Kafka v0.10 och senare, men förlitade sig specifikt på Api:er för Kafka v0.10. Eftersom Event Hubs för Kafka inte stöder Kafka v0.10 stöds inte Spark-Kafka-korten från versioner av Spark före v2.4 av Event Hubs for Kafka Ecosystems.

Skapa ett Event Hubs-namnområde

En Event Hubs-namnrymd krävs för att skicka och ta emot från Event Hubs-tjänster. Se Skapa en händelsehubb för instruktioner för att skapa ett namnområde och en händelsehubb. Hämta Event Hubs-anslutningssträngen och fullständigt domännamn (FQDN) för senare användning. Anvisningar finns i avsnittet om att hämta en Event Hubs-anslutningssträng.

Klona exempelprojektet

Klona Azure Event Hubs-lagringsplatsen och gå till undermappen tutorials/spark :

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/spark

Läs från Event Hubs för Kafka

Med några konfigurationsändringar kan du börja läsa från Event Hubs för Kafka. Uppdatera BOOTSTRAP_SERVERS och EH_SASL med information från ditt namnområde och du kan börja strömma med Event Hubs på samma sätt som med Kafka. Den fullständiga exempelkoden finns i filen sparkConsumer.scala på GitHub.

//Read from your Event Hub!
val df = spark.readStream
    .format("kafka")
    .option("subscribe", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("kafka.request.timeout.ms", "60000")
    .option("kafka.session.timeout.ms", "30000")
    .option("kafka.group.id", GROUP_ID)
    .option("failOnDataLoss", "true")
    .load()

//Use dataframe like normal (in this example, write to console)
val df_write = df.writeStream
    .outputMode("append")
    .format("console")
    .start()

Om du får ett fel som liknar följande fel lägger du till .option("spark.streaming.kafka.allowNonConsecutiveOffsets", "true") i anropet spark.readStream och försöker igen.

IllegalArgumentException: requirement failed: Got wrong record for <spark job name> even after seeking to offset 4216 got offset 4217 instead. If this is a compacted topic, consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets 

Skriv till Event Hubs för Kafka

Du kan också skriva till Event Hubs på samma sätt som du skriver till Kafka. Glöm inte att uppdatera konfigurationen för att ändra BOOTSTRAP_SERVERS och EH_SASL med information från Event Hubs-namnområdet. Den fullständiga exempelkoden finns i filen sparkProducer.scala på GitHub.

df = /**Dataframe**/

//Write to your Event Hub!
df.writeStream
    .format("kafka")
    .option("topic", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("checkpointLocation", "./checkpoint")
    .start()

Nästa steg

Mer information om Event Hubs och Event Hubs för Kafka finns i följande artiklar: