你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
本教程逐步讲解如何将 Spark 应用程序连接到事件中心进行实时流式处理。 此集成使流式处理变得更加便捷,无需更改协议客户端,也无需运行自己的 Kafka 或 Zookeeper 群集。 本教程需要 Apache Spark v2.4+ 和 Apache Kafka v2.0+。
注释
此示例在 GitHub 上可用
本教程中,您将学习如何:
- 创建事件中心命名空间
- 克隆示例项目
- 运行 Spark
- 从用于 Kafka 的事件中心读取
- 写入到用于 Kafka 的事件中心
先决条件
在开始本教程之前,请确保具备:
- Azure 订阅。 如果没有帐户,请创建一个免费帐户。
- Apache Spark v2.4
- Apache Kafka v2.0
- Git
注释
自 Spark v2.4 起,Spark-Kafka 适配器已更新为支持 Kafka v2.0。 在以前版本的 Spark 中,适配器支持 Kafka v0.10 及更高版本,但专门依赖于 Kafka v0.10 API。 由于适用于 Kafka 的事件中心不支持 Kafka v0.10,因此适用于 Kafka 生态系统的事件中心不支持从 v2.4 之前的 Spark 版本 Spark-Kafka 适配器。
创建事件中心命名空间
要从任何事件中心服务发送和接收,必须具备一个事件中心命名空间。 有关创建命名空间和事件中心的说明,请参阅 创建事件中心 。 获取事件中心连接字符串和完全限定的域名(FQDN),供以后使用。 有关说明,请参阅 获取事件中心连接字符串。
克隆示例项目
克隆 Azure 事件中心存储库并导航到 tutorials/spark 子文件夹:
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/spark
从适用于 Kafka 的事件中心读取
通过一些配置更改,可以开始从 Kafka 的事件中心读取数据。 根据命名空间提供的详细信息更新 BOOTSTRAP_SERVERS 和 EH_SASL 以后,即可使用事件中心进行流式处理,就像使用 Kafka 一样。 有关完整的示例代码,请参阅 GitHub 上的 sparkConsumer.scala 文件。
//Read from your Event Hub!
val df = spark.readStream
.format("kafka")
.option("subscribe", TOPIC)
.option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.jaas.config", EH_SASL)
.option("kafka.request.timeout.ms", "60000")
.option("kafka.session.timeout.ms", "30000")
.option("kafka.group.id", GROUP_ID)
.option("failOnDataLoss", "true")
.load()
//Use dataframe like normal (in this example, write to console)
val df_write = df.writeStream
.outputMode("append")
.format("console")
.start()
如果您收到类似以下错误的信息,请在spark.readStream调用中添加.option("spark.streaming.kafka.allowNonConsecutiveOffsets", "true"),然后重试。
IllegalArgumentException: requirement failed: Got wrong record for <spark job name> even after seeking to offset 4216 got offset 4217 instead. If this is a compacted topic, consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets
写入到用于 Kafka 的事件中心
也可向事件中心写入数据,所用方式与向 Kafka 写入数据一样。 不要忘记更新您的配置,以将来自事件中心命名空间的信息应用于 BOOTSTRAP_SERVERS 和 EH_SASL 。 有关完整的示例代码,请参阅 GitHub 上的 sparkProducer.scala 文件。
df = /**Dataframe**/
//Write to your Event Hub!
df.writeStream
.format("kafka")
.option("topic", TOPIC)
.option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.jaas.config", EH_SASL)
.option("checkpointLocation", "./checkpoint")
.start()
后续步骤
若要详细了解事件中心和适用于 Kafka 的事件中心,请参阅以下文章: