你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
本快速入门介绍了如何使用 Azure.Messaging.EventHubs .NET 库向事件中心发送事件,然后从事件中心接收事件。
注意
快速入门可供你快速了解该服务。 如果已熟悉该服务,可能需要在 GitHub 上的 .NET SDK 存储库中查看事件中心的 .NET 示例: GitHub 上的事件中心示例、 GitHub 上的事件处理器示例。
Prerequisites
如果不熟悉 Azure 事件中心,请在阅读本快速入门之前先参阅 事件中心概述。
若要完成本快速入门,需要具备以下先决条件:
- Microsoft Azure 订阅。 若要使用 Azure 服务(包括 Azure 事件中心),需要一个订阅。 如果没有现成的 Azure 帐户,可以注册免费试用版。
-
Microsoft Visual Studio 2022。 Azure 事件中心客户端库利用 C# 8.0 中引入的新功能。 仍然可以使用之前的 C# 语言版本的库,但新语法不可用。 若要使用完整语法,建议使用 .NET Core SDK 3.0 或更高版本进行编译,并将语言版本设置为
latest。 如果使用 Visual Studio,Visual Studio 2022 之前的版本与生成 C# 8.0 项目所需的工具将不兼容。 可在此处下载 Visual Studio 2022(包括免费的 Community Edition)。 - 创建事件中心命名空间和事件中心。 第一步是使用 Azure 门户在命名空间中创建事件中心命名空间和事件中心。 然后,获取应用程序与事件中心通信所需的管理凭据。 要创建命名空间和事件中心,请参阅 快速入门:使用 Azure 门户创建事件中心。
向 Azure 验证应用
本快速入门介绍了连接到 Azure 事件中心的两种方法:
- 无密码(Microsoft Entra 身份验证)
- 连接字符串
第一个选项演示如何在 Azure Microsoft Entra ID 和基于角色的访问控制(RBAC) 中使用安全主体连接到事件中心命名空间。 无需担心代码、配置文件或安全存储(如 Azure 密钥保管库)中存在硬编码的连接字符串。
第二个选项展示了如何使用 连接字符串 连接到事件中心命名空间。 如果不熟悉 Azure,你可能会感觉连接字符串选项更易于使用。 建议在实际应用程序和生产环境中使用无密码选项。 有关详细信息,请参阅身份验证和授权。 还可以在概述页上阅读有关无密码身份验证的详细信息。
向 Microsoft Entra 用户分配角色
在本地开发时,请确保连接到 Azure 事件中心的用户帐户具有正确的权限。 需要 Azure 事件中心数据所有者 角色来发送和接收消息。 若要将此角色分配给自己,您需要 "User Access Administrator" 角色或其他包含 Microsoft.Authorization/roleAssignments/write 操作的角色。 可使用 Azure 门户、Azure CLI 或 Azure PowerShell 向用户分配 Azure RBAC 角色。 有关详细信息,请参阅 “了解 Azure RBAC”页的范围 。
以下示例将“Azure Event Hubs Data Owner”角色分配给用户帐户,该角色提供对 Azure 事件中心资源的完全访问权限。 在实际方案中,遵循最小特权原则,仅向用户提供更安全的生产环境所需的最小权限。
Azure 事件中心的内置 Azure 角色
对于 Azure 事件中心,通过 Azure 门户和 Azure 资源管理 API 对命名空间和所有相关资源进行的管理已使用 Azure RBAC 模型进行了保护。 Azure 提供以下内置角色,用于授权访问事件中心命名空间:
- Azure 事件中心数据所有者:允许数据访问事件中心命名空间及其实体(队列、主题、订阅和筛选器)。
- Azure 事件中心数据发送者:使用此角色可以提供对事件中心命名空间及其实体的发送者访问权限。
- Azure 事件中心数据接收者:使用此角色可以提供对事件中心命名空间及其实体的接收者访问权限。
如果要创建自定义角色,请参阅事件中心操作所需的权限。
重要说明
在大多数情况下,角色分配在 Azure 中传播需要一两分钟。 在极少数情况下,可能需要长达 8 分钟的时间。 如果在首次运行代码时收到身份验证错误,请稍等片刻再试。
在 Azure 门户中,使用主搜索栏或左侧导航栏找到你的事件中心命名空间。
在概述页面上,从左侧菜单中选择“访问控制(IAM)”。
在“访问控制(IAM)”页上,选择“角色分配”选项卡。
从顶部菜单中选择 “+ 添加 ”。 然后选择“ 添加角色分配”。
使用搜索框筛选结果来找到所需角色。 对于此示例,请搜索
Azure Event Hubs Data Owner并选择匹配的结果。 然后选择“下一步”。在“ 分配访问权限”下,选择“ 用户”、“组”或服务主体。 然后选择 “+ 选择成员”。
在对话框中,搜索Microsoft Entra 用户名(通常是 user@domain 电子邮件地址)。 选择对话框底部的 “选择 ”。
选择 “审阅 + 分配 ”以转到最后一页。 再次选择 “查看 + 分配 ”以完成该过程。
启动 Visual Studio 并登录到 Azure
可以使用以下步骤授权对服务总线命名空间的访问:
启动 Visual Studio。 如果看到“入门”窗口,请在右窗格中选择“在不使用代码的情况下继续”链接。
选择 Visual Studio 右上角的“登录”按钮。
使用你之前为其分配角色的 Microsoft Entra 帐户登录。
向事件中心发送事件
本部分介绍了如何创建 .NET Core 控制台应用程序以向你创建的事件中心发送事件。
创建控制台应用程序
如果 Visual Studio 2022 已打开,请依次选择菜单上的“文件”、“新建”、“项目”。 否则,如果看到弹出窗口,请启动 Visual Studio 2022 并选择“新建项目”。
在“创建新项目”对话框中,执行以下步骤:如果看不到此对话框,请在菜单中选择“文件”,然后依次选择“新建”、“项目”。
选择“C#”作为编程语言。
选择“控制台”作为应用程序类型。
从结果列表中选择“控制台应用程序”。
然后,选择“下一步”。
输入 EventHubsSender 作为项目名称、EventHubsQuickStart 作为解决方案名称,然后选择“下一步”。
在“其他信息”页上,选择“创建”。
向项目添加 NuGet 包
在菜单中选择“工具”>“NuGet 包管理器”>“包管理器控制台”。
运行以下命令以安装 Azure.Messaging.EventHubs 和 Azure.Identity NuGet 包。 按 ENTER 运行第二个命令。
Install-Package Azure.Messaging.EventHubs Install-Package Azure.Identity
编写代码以将事件发送到事件中心
将
Program.cs文件中的现有代码替换为以下示例代码。 然后,将EventHubProducerClient参数的<EVENT_HUB_NAMESPACE>和<HUB_NAME>占位符值替换为事件中心命名空间和事件中心的名称。 例如"spehubns0309.servicebus.windows.net"和"spehub"。下面是代码中的重要步骤:
- 使用命名空间和事件中心名称创建 EventHubProducerClient 对象。
- 对 EventHubProducerClient 对象调用 CreateBatchAsync 方法,创建一个 EventDataBatch 对象。
- 使用 EventDataBatch.TryAdd 方法将事件添加到批处理中。
- 使用 EventHubProducerClient.SendAsync 方法将这批消息发送到事件中心。
using Azure.Identity; using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Producer; using System.Text; // number of events to be sent to the event hub int numOfEvents = 3; // The Event Hubs client types are safe to cache and use as a singleton for the lifetime // of the application, which is best practice when events are being published or read regularly. // TODO: Replace the <EVENT_HUB_NAMESPACE> and <HUB_NAME> placeholder values EventHubProducerClient producerClient = new EventHubProducerClient( "<EVENT_HUB_NAMESPACE>.servicebus.windows.net", "<HUB_NAME>", new DefaultAzureCredential()); // Create a batch of events using EventDataBatch eventBatch = await producerClient.CreateBatchAsync(); for (int i = 1; i <= numOfEvents; i++) { if (!eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes($"Event {i}")))) { // if it is too large for the batch throw new Exception($"Event {i} is too large for the batch and cannot be sent."); } } try { // Use the producer client to send the batch of events to the event hub await producerClient.SendAsync(eventBatch); Console.WriteLine($"A batch of {numOfEvents} events has been published."); Console.ReadLine(); } finally { await producerClient.DisposeAsync(); }
生成项目并确保没有错误。
运行程序并等待出现确认消息。
A batch of 3 events has been published.注意
如果使用 Microsoft Entra 身份验证时出现错误“InvalidIssuer:令牌颁发者无效”,则可能是因为使用了错误的 Microsoft Entra 租户 ID。 在代码中,将“new DefaultAzureCredential()”替换为“new DefaultAzureCredential(new DefaultAzureCredentialOptions {TenantId = ”xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx“})以显式指定Microsoft Entra Tenant ID。
重要说明
如果使用无密码(Microsoft Entra 的基于角色的访问控制)身份验证,请选择 “工具”,然后选择“ 选项”。 在“选项”窗口中,展开“Azure 服务身份验证”,然后选择“帐户选择”。 请确认你正在使用已添加到该事件中心命名空间上的 Azure 事件中心数据所有者角色的帐户。
在 Azure 门户中 的事件中心命名空间 页上,“ 消息 ”图表中会显示三条传入消息。 根据需要刷新页面以更新图表。 可能需要在几秒钟后才会显示已收到消息。
注意
有关包含更详细注释的完整源代码,请参阅 GitHub 上的此文件
从事件中心接收事件
本部分介绍如何编写一个使用事件处理程序从事件中心接收事件的 .NET Core 控制台应用程序。 事件处理器简化了从事件中心接收事件的过程。
创建 Azure 存储帐户和 Blob 容器
本快速入门使用 Azure 存储作为检查点存储。 按照以下步骤创建 Azure 存储帐户。
- 创建 Azure 存储帐户
- 创建一个 blob 容器
- 使用 Microsoft Entra ID(无密码)身份验证或命名空间的连接字符串对 blob 容器进行身份验证。
使用 Azure Blob 存储作为检查点存储时,请遵循以下建议:
- 对每个使用者组使用单独的容器。 可以使用同一存储帐户,但每个组使用一个容器。
- 不要将存储帐户用于任何其他用途。
- 不要将容器用于任何其他用途。
- 在部署的应用程序所在的同一区域中创建存储帐户。 如果应用程序位于本地,请尝试选择最近的区域。
在 Azure 门户的“存储帐户”页上的“Blob 服务”部分,确保禁用以下设置。
- 分层命名空间
- Blob 软删除
- 版本控制
在本地开发时,请确保访问 Blob 数据的用户帐户具有正确的权限。 需要具备存储 Blob 数据参与者角色才能读写 Blob 数据。 若要为自己分配此角色,需要拥有 “用户访问管理员” 角色,或拥有另一个包含 Microsoft.Authorization/roleAssignments/write 操作的角色。 可使用 Azure 门户、Azure CLI 或 Azure PowerShell 向用户分配 Azure RBAC 角色。 有关详细信息,请参阅了解 Azure RBAC 的范围。
在此方案中,将权限分配给用户帐户(作用域为存储帐户)以遵循 最低特权原则。 这种做法只为用户提供所需的最低权限,并创建更安全的生产环境。
以下示例将 存储 Blob 数据参与者 角色分配给用户帐户,该角色提供对存储帐户中 Blob 数据的读取和写入访问权限。
重要说明
在大多数情况下,角色分配在 Azure 中传播需要一两分钟。 在极少数情况下,可能需要长达 8 分钟的时间。 如果在首次运行代码时收到身份验证错误,请稍等片刻再试。
在 Azure 门户中,使用主搜索栏或左侧导航找到你的存储帐户。
在存储帐户页上,从左侧菜单中选择 “访问控制”(IAM )。
在“访问控制(IAM)”页上,选择“角色分配”选项卡。
从顶部菜单中选择 “+ 添加 ”。 然后选择“ 添加角色分配”。
使用搜索框筛选结果来找到所需角色。 对于此示例,搜索“存储 Blob 数据参与者”。 选择匹配的结果,然后选择 “下一步”。
在“将访问权限分配到”下,选择“用户、组或服务主体”,然后选择“+ 选择成员”。
在对话框中,搜索 Microsoft Entra 用户名(通常是 user@domain 电子邮件地址),然后选中对话框底部的“选择”。
选择 “审阅 + 分配 ”以转到最后一页。 再次选择 “查看 + 分配 ”以完成该过程。
为接收器创建项目
- 在“解决方案资源管理器”窗口中,右键单击“EventHubQuickStart”解决方案,指向“添加”,然后选择“新建项目”。
- 依次选择“控制台应用程序”、“下一步”。
- 输入 EventHubsReceiver 作为“项目名称”,然后选择“创建”。
- 在“解决方案资源管理器”窗口中,右键单击“EventHubsReceiver”并选择“设为启动项目”。
向项目添加 NuGet 包
在菜单中选择“工具”>“NuGet 包管理器”>“包管理器控制台”。
在“包管理器控制台”窗口中,确认为“默认项目”选择了“EventHubsReceiver”。 如果不是,请使用下拉列表选择“EventHubsReceiver”。
运行以下命令,以安装 Azure.Messaging.EventHubs 和 Azure.Identity NuGet 包。 按 ENTER 运行最新命令。
Install-Package Azure.Messaging.EventHubs Install-Package Azure.Messaging.EventHubs.Processor Install-Package Azure.Identity
更新代码
将 Program.cs 的内容替换为以下代码:
将
Program.cs文件中的现有代码替换为以下示例代码。 然后,替换BlobContainerClientURI 的<STORAGE_ACCOUNT_NAME>和<BLOB_CONTAINER_NAME>占位符值。 同时替换EventProcessorClient的<EVENT_HUB_NAMESPACE>和<HUB_NAME>占位符值。下面是代码中的重要步骤:
- 使用事件中心命名空间和事件中心名称创建 EventProcessorClient 对象。 需要在先前创建的 Azure 存储中为容器生成 BlobContainerClient 对象。
- 为 EventProcessorClient 对象的 ProcessEventAsync 和 ProcessErrorAsync 事件指定处理程序。
- 通过对 EventProcessorClient 对象调用 StartProcessingAsync,开始处理事件。
- 通过在 EventProcessorClient 对象上调用 StopProcessingAsync 在 30 秒后停止处理事件。
using Azure.Identity; using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Consumer; using Azure.Messaging.EventHubs.Processor; using Azure.Storage.Blobs; using System.Text; // Create a blob container client that the event processor will use // TODO: Replace <STORAGE_ACCOUNT_NAME> and <BLOB_CONTAINER_NAME> with actual names BlobContainerClient storageClient = new BlobContainerClient( new Uri("https://<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/<BLOB_CONTAINER_NAME>"), new DefaultAzureCredential()); // Create an event processor client to process events in the event hub // TODO: Replace the <EVENT_HUBS_NAMESPACE> and <HUB_NAME> placeholder values var processor = new EventProcessorClient( storageClient, EventHubConsumerClient.DefaultConsumerGroupName, "<EVENT_HUB_NAMESPACE>.servicebus.windows.net", "<HUB_NAME>", new DefaultAzureCredential()); // Register handlers for processing events and handling errors processor.ProcessEventAsync += ProcessEventHandler; processor.ProcessErrorAsync += ProcessErrorHandler; // Start the processing await processor.StartProcessingAsync(); // Wait for 30 seconds for the events to be processed await Task.Delay(TimeSpan.FromSeconds(30)); // Stop the processing await processor.StopProcessingAsync(); Task ProcessEventHandler(ProcessEventArgs eventArgs) { // Write the body of the event to the console window Console.WriteLine("\tReceived event: {0}", Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray())); Console.ReadLine(); return Task.CompletedTask; } Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs) { // Write details about the error to the console window Console.WriteLine($"\tPartition '{eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen."); Console.WriteLine(eventArgs.Exception.Message); Console.ReadLine(); return Task.CompletedTask; }
生成项目并确保没有错误。
注意
有关包含更详细注释的完整源代码,请参阅 GitHub 上的此文件。
运行接收器应用程序。
应会看到一条消息,指出已收到事件。 看到收到的事件消息后按 Enter。
Received event: Event 1 Received event: Event 2 Received event: Event 3这些事件是前面通过运行发送器程序发送到事件中心的三个事件。
在 Azure 门户,可以验证事件中心向接收应用程序发送的三封传出的邮件。 刷新页面以更新图表。 可能需要在几秒钟后才会显示已收到消息。
基于事件中心 SDK 的应用程序的架构验证
使用基于事件中心 SDK 的应用程序流式传输数据时,可以使用 Azure 架构注册表来执行架构验证。 事件中心的 Azure 架构注册表提供了一个用于管理架构的集中式存储库,你可以将新的或现有的应用程序与架构注册表无缝连接。
若要了解详细信息,请参阅使用事件中心 SDK 验证架构。
示例和参考
本快速入门分步介绍了如何实现场景:向事件中心发送一批事件,然后接收这些事件。 有关更多示例,请选择以下链接。
有关完整的 .NET 库参考,请参阅 SDK 文档。
清理资源
删除具有事件中心命名空间的资源组,或仅删除命名空间(如果要保留资源组)。
相关内容
参阅以下教程: