你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

EventProcessorClient 类

  • java.lang.Object
    • com.azure.messaging.eventhubs.EventProcessorClient

public class EventProcessorClient

EventProcessorClient 提供了一种便捷机制,用于在使用者组的上下文中使用事件中心的所有分区中的事件。 基于事件处理程序的应用程序包含一个或多个 EventProcessorClient () 这些实例设置为使用同一事件中心的事件,使用者组用于平衡不同实例之间的工作负载,并在处理事件时跟踪进度。 根据运行的实例数,每个 EventProcessorClient 可能拥有零个或多个分区,以平衡所有实例之间的工作负荷。

示例:构造 EventProcessorClient

下面的示例使用内存CheckpointStore中,但 azure-messaging-eventhubs-checkpointstore-blob 提供由Azure Blob 存储支持的检查点存储。 此外, fullyQualifiedNamespace 是事件中心命名空间的主机名。 通过 Azure 门户导航到事件中心命名空间后,它列在“概要”面板下。 使用的凭据是 DefaultAzureCredential 因为它合并了部署和开发中常用的凭据,并根据其运行环境选择要使用的凭据。 consumerGroup导航到事件中心实例,并选择“实体”面板下的“使用者组”即可找到 。 consumerGroup 是必需的。 使用的凭据是 DefaultAzureCredential 因为它合并了部署和开发中常用的凭据,并根据其运行环境选择要使用的凭据。

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
 // "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
 EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
     .consumerGroup("<< CONSUMER GROUP NAME >>")
     .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
         credential)
     .checkpointStore(new SampleCheckpointStore())
     .processEvent(eventContext -> {
         System.out.printf("Partition id = %s and sequence number of event = %s%n",
             eventContext.getPartitionContext().getPartitionId(),
             eventContext.getEventData().getSequenceNumber());
     })
     .processError(errorContext -> {
         System.out.printf("Error occurred in partition processor for partition %s, %s%n",
             errorContext.getPartitionContext().getPartitionId(),
             errorContext.getThrowable());
     })
     .buildEventProcessorClient();

方法摘要

修饰符和类型 方法和描述
String getIdentifier()

标识符是为此事件处理程序实例提供的唯一名称。

synchronized boolean isRunning()

true如果事件处理器正在运行,则返回 。

synchronized void start()

开始处理此事件处理程序可以拥有的事件中心的所有分区的事件,并为每个分区分配专用 PartitionProcessor 的 。

synchronized void stop()

停止处理此事件处理程序拥有的所有分区的事件。

方法继承自 java.lang.Object

方法详细信息

getIdentifier

public String getIdentifier()

标识符是为此事件处理程序实例提供的唯一名称。

Returns:

此事件处理器的标识符。

isRunning

public synchronized boolean isRunning()

true如果事件处理器正在运行,则返回 。 如果事件处理器已在运行,则调用 start() 不起作用。

Returns:

true 如果事件处理器正在运行,则为 。

start

public synchronized void start()

开始处理此事件处理程序可以拥有的事件中心的所有分区的事件,并为每个分区分配专用 PartitionProcessor 的 。 如果事件中心上的同一使用者组还有其他事件处理器处于活动状态,则它们之间将共同负责分区。

如果此事件处理器已在运行,则将忽略对 start 的后续调用。 调用 后 stop() 调用 start 将重启此事件处理程序。

启动处理器以使用来自所有分区的事件

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
 // "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
 EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
     .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
     .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
         credential)
     .processEvent(eventContext -> {
         System.out.printf("Partition id = %s and sequence number of event = %s%n",
             eventContext.getPartitionContext().getPartitionId(),
             eventContext.getEventData().getSequenceNumber());
     })
     .processError(errorContext -> {
         System.out.printf("Error occurred in partition processor for partition %s, %s%n",
             errorContext.getPartitionContext().getPartitionId(),
             errorContext.getThrowable());
     })
     .checkpointStore(new SampleCheckpointStore())
     .buildEventProcessorClient();

 eventProcessorClient.start();

 // Continue to perform other tasks while the processor is running in the background.
 //
 // Finally, stop the processor client when application is finished.
 eventProcessorClient.stop();

stop

public synchronized void stop()

停止处理此事件处理程序拥有的所有分区的事件。 所有 PartitionProcessor 资源都将关闭,任何打开的资源都将关闭。

如果事件处理器未运行,将忽略对停止的后续调用。

停止处理器

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
 // "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
 EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
     .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
     .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
         credential)
     .processEvent(eventContext -> {
         System.out.printf("Partition id = %s and sequence number of event = %s%n",
             eventContext.getPartitionContext().getPartitionId(),
             eventContext.getEventData().getSequenceNumber());
     })
     .processError(errorContext -> {
         System.out.printf("Error occurred in partition processor for partition %s, %s%n",
             errorContext.getPartitionContext().getPartitionId(),
             errorContext.getThrowable());
     })
     .checkpointStore(new SampleCheckpointStore())
     .buildEventProcessorClient();

 eventProcessorClient.start();

 // Continue to perform other tasks while the processor is running in the background.
 //
 // Finally, stop the processor client when application is finished.
 eventProcessorClient.stop();

适用于