包含: -
Client 集成
Apache Kafka 是开源分布式事件流式处理平台。 它可用于生成实时数据管道和流式处理应用程序。
Aspire
Apache Kafka 集成使你能够连接到现有的 Kafka 实例,或使用 .NET从 docker.io/confluentinc/confluent-local 创建新实例。
托管集成
Apache Kafka 托管集成将 Kafka 服务器建模为 KafkaServerResource 类型。 若要访问此类型,请安装 .📦AspireAppHost 项目中的 Hosting.Kafka NuGet 包,然后将其添加到生成器中。
dotnet add package Aspire.Hosting.Kafka
有关详细信息,请参阅 dotnet add package 或 在.NET应用程序中管理包依赖项。
添加 Kafka 服务器资源
在 AppHost 项目中,调用AddKafkabuilder实例以添加 Kafka 服务器资源:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka");
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
当Aspire将容器镜像添加到AppHost时,如前面的示例docker.io/confluentinc/confluent-local所示,会在本地计算机上创建新的 Kafka 服务器实例。 对 Kafka 服务器的引用(kafka 变量)已被添加到 ExampleProject。 Kafka 服务器资源包括默认端口
WithReference 方法在 ExampleProject 中配置一个名为 "kafka"的连接。 有关详细信息,请参阅 容器资源生命周期。
提示
如果想要连接到现有的 Kafka 服务器,请改为调用 AddConnectionString。 有关详细信息,请参阅 引用现有资源。
添加 Kafka UI
若要将 Kafka UI 添加到 Kafka 服务器资源,请调用 WithKafkaUI 以下方法:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithKafkaUI();
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Kafka UI 是一个免费的开源 Web UI,用于监视和管理 Apache Kafka 群集。
Aspire 将另一个容器映像 docker.io/kafbat/kafka-ui 添加到运行 Kafka UI 的 AppHost。
更改 Kafka UI 主机端口
若要更改 Kafka UI 主机端口,请链式调用 WithHostPort 方法:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithKafkaUI(kafkaUI => kafkaUI.WithHostPort(9100));
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
在前面的示例中,Kafka UI 可以在 http://localhost:9100 访问。
添加包含数据卷的 Kafka 服务器资源
若要向 Kafka 服务器资源添加数据卷,请在 Kafka 服务器资源上调用 WithDataVolume 方法:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithDataVolume(isReadOnly: false);
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
数据卷用于在容器的生命周期之外保留 Kafka 服务器数据。 数据卷装载在 Kafka 服务器容器中的 /var/lib/kafka/data 路径上,如果未提供 name 参数,则会随机生成名称。 有关数据卷以及它们为何优于绑定挂载的更多信息,请参阅Docker 文档:卷。
使用数据绑定挂载添加 Kafka 服务器资源
若要将数据绑定挂载添加到 Kafka 服务器资源,请调用 WithDataBindMount 方法:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithDataBindMount(
source: @"C:\Kafka\Data",
isReadOnly: false);
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
重要
与卷相比,数据 绑定装载 的功能有限,这些 卷提供更好的性能、可移植性和安全性,使它们更适用于生产环境。 但是,绑定装载允许直接访问和修改主机系统上的文件,非常适合在需要实时更改的情况下进行开发和测试。
数据绑定挂载依赖于主机的文件系统,以在容器重启时持久化 Kafka 服务器数据。 数据绑定装载安装在主机上的 Kafka 服务器容器中,路径为 Windows 的 C:\Kafka\Data(或 /Kafka/Data上的 Unix)。 有关数据绑定装载的详细信息,请参阅 Docker 文档:绑定装载。
托管集成运行状况检查
Kafka 托管集成会自动为 Kafka 服务器资源添加运行状况检查。 运行状况检查验证具有指定连接名称的 Kafka 生成者是否能够连接到 Kafka 服务器并将主题持久化到 Kafka 服务器。
托管集成依赖于 📦 AspNetCore.HealthChecks.Kafka NuGet 包。
使用大型 Kafka 集群
Aspire Kafka 集成将容器从 confluentinc/confluent-local 映像部署到本地容器主机。 此映像提供在 Apache Kafka运行的简单群集,无需进一步配置。 它非常适合开发和测试生产者和消费者。 但是,此映像仅适用于本地试验,Confluent 不支持此映像。 不建议用于生产环境,可能需要更可靠的 Kafka 群集进行测试和暂存。 例如,可能需要群集中的多个消息中转站。
有关 Kafka 群集设置的详细信息,请参阅 Kafka 文档。 您的工程师创建并配置 Kafka 后,只需使用连接字符串向Aspire提供它的位置。
在以下 AppHost 代码中,本地容器在运行模式下使用。 在其他情况下,连接字符串为 Kafka 中转站提供 URL 和端口号:
var kafka = builder.ExecutionContext.IsRunMode
? builder.AddKafka("kafka").WithKafkaUI()
: builder.AddConnectionString("kafka");
注释
有关连接字符串 Aspire的详细信息,请参阅 使用连接字符串添加现有 Azure 资源
AppHost 从 ConnectionStrings__kafka 配置密钥中解析出连接字符串。 可以通过将连接字符串添加到 AppHost 的 appsettings.json 文件来设置此密钥,但此方法以纯文本存储连接字符串,并且不会保护它。 相反,可以使用环境变量或用户密码来存储连接字符串:
dotnet user-secrets set "ConnectionStrings:kafka" "kafka-broker-1.contoso.com:9092,kafka-broker-2.contoso.com:9092,kafka-broker-3.contoso.com:9092"
注释
有关保护应用机密的详细信息,请参阅开发ASP.NET Core中的应用机密的安全存储
Client 集成
若要开始 AspireApache Kafka 集成,请在为使用 📦 客户端的应用程序服务的项目中安装 Aspire NuGet 包。
dotnet add package Aspire.Confluent.Kafka
添加 Kafka 生成者
在客户端项目的 Program.cs 文件中,调用 AddKafkaProducer 扩展方法注册 IProducer<TKey, TValue>,以便通过依赖注入容器使用。 该方法采用两个对应于键类型的泛型参数和要发送到中转站的消息的类型。
AddKafkaProducer 使用这些泛型参数来创建 ProducerBuilder<TKey, TValue>实例。 此方法还采用连接名称参数。
builder.AddKafkaProducer<string, string>("kafka");
提示
参数 connectionName 必须与在 AppHost 项目中添加 Kafka 资源时使用的名称匹配。 换句话说,当你调用 AddKafka 并提供 kafka 的名称时,应该在调用 AddKafkaProducer时使用相同的名称。 有关详细信息,请参阅 添加 Kafka 服务器资源。
然后,可以使用依赖项注入检索 IProducer<TKey, TValue> 实例。 例如,若要从 IHostedService检索生产者:
internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
// Use producer...
}
有关工人的详细信息,请参阅
添加 Kafka 使用者
若要注册 IConsumer<TKey, TValue> 以通过依赖项注入容器使用,请在客户端使用项目的 AddKafkaConsumer 文件中调用 Program.cs 扩展方法。 该方法采用两个对应于键类型的泛型参数,以及要从中转站接收的消息的类型。
AddKafkaConsumer 使用这些泛型参数来创建 ConsumerBuilder<TKey, TValue>实例。 此方法还采用连接名称参数。
builder.AddKafkaConsumer<string, string>("kafka");
提示
参数 connectionName 必须与在 AppHost 项目中添加 Kafka 资源时使用的名称匹配。 换句话说,当你调用 AddKafka 并提供 kafka 的名称时,应该在调用 AddKafkaComsumer时使用相同的名称。 有关详细信息,请参阅 添加 Kafka 服务器资源。
然后,可以使用依赖项注入检索 IConsumer<TKey, TValue> 实例。 例如,若要从 IHostedService检索使用者:
internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
// Use consumer...
}
添加关键 Kafka 制作者或使用者
在某些情况下,你可能想要使用不同的连接名称注册多个生成者或使用者实例。 若要注册密钥 Kafka 生成者或使用者,请调用相应的 API:
- AddKeyedKafkaProducer:注册带键的 Kafka 生产者。
- AddKeyedKafkaConsumer:注册关键 Kafka 使用者。
有关密钥服务的详细信息,请参阅 .NET 依赖关系注入:密钥服务。
配置
Aspire Apache Kafka 集成提供了多个选项,用于根据项目的要求和约定配置连接。
使用连接字符串
使用 ConnectionStrings 配置部分中的连接字符串时,可以在调用 builder.AddKafkaProducer() 或 builder.AddKafkaProducer()时提供连接字符串的名称:
builder.AddKafkaProducer<string, string>("kafka");
然后,从 ConnectionStrings 配置部分检索连接字符串:
{
"ConnectionStrings": {
"kafka": "broker:9092"
}
}
连接字符串值设置为生成的 BootstrapServers 或 IProducer<TKey, TValue> 实例的 IConsumer<TKey, TValue> 属性。 有关详细信息,请参阅 BootstrapServers。
使用配置供应商
Aspire
Apache Kafka 集成支持 Microsoft.Extensions.Configuration。 它分别使用 KafkaProducerSettings 和 KafkaConsumerSettings 键从配置加载 Aspire:Confluent:Kafka:Producer 和 Aspire.Confluent:Kafka:Consumer。 以下代码片段是一个 appsettings.json 文件的示例,用于配置某些选项:
{
"Aspire": {
"Confluent": {
"Kafka": {
"Producer": {
"DisableHealthChecks": false,
"Config": {
"Acks": "All"
}
}
}
}
}
}
Config 和 Aspire:Confluent:Kafka:Producer 配置部分的 Aspire.Confluent:Kafka:Consumer 属性分别绑定到 ProducerConfig 和 ConsumerConfig实例。
Confluent.Kafka.Consumer<TKey, TValue> 要求将 ClientId 属性设置为允许中转站跟踪已使用的消息偏移量。
有关完整的 Kafka 客户端集成 JSON 架构,请参阅 Aspire。Confluent.Kafka/ConfigurationSchema.json。
使用命名配置
集成 AspireApache Kafka 支持命名配置,通过该配置,可以使用不同的设置来配置同一资源类型的多个实例。 命名配置使用连接名称作为主配置部分下的密钥。
{
"Aspire": {
"Confluent": {
"Kafka": {
"Producer": {
"kafka1": {
"DisableHealthChecks": false,
"Config": {
"Acks": "All"
}
},
"kafka2": {
"DisableHealthChecks": true,
"Config": {
"Acks": "Leader"
}
}
}
}
}
}
}
在此示例中,调用kafka1或kafka2时,可以使用AddKafkaProducer和AddKafkaConsumer连接名称。
builder.AddKafkaProducer<string, string>("kafka1");
builder.AddKafkaConsumer<string, string>("kafka2");
命名配置优先于顶级配置。 如果同时提供这两种设置,则命名配置中的设置将替代顶级设置。
使用内联委托
有多个内联代理可用于配置各种选项。
配置KafkaProducerSettings 和 KafkaConsumerSettings
可以传递 Action<KafkaProducerSettings> configureSettings 委托来设置一些或所有内联选项,例如禁用代码中的运行状况检查:
builder.AddKafkaProducer<string, string>(
"kafka",
static settings => settings.DisableHealthChecks = true);
您可以从代码中直接配置消费者:
builder.AddKafkaConsumer<string, string>(
"kafka",
static settings => settings.DisableHealthChecks = true);
配置 ProducerBuilder<TKey, TValue> 和 ConsumerBuilder<TKey, TValue>
若要配置 Confluent.Kafka 生成器,请传递 Action<ProducerBuilder<TKey, TValue>>(或 Action<ConsumerBuilder<TKey, TValue>>):
builder.AddKafkaProducer<string, MyMessage>(
"kafka",
static producerBuilder =>
{
var messageSerializer = new MyMessageSerializer();
producerBuilder.SetValueSerializer(messageSerializer);
})
注册生产者和消费者时,如果需要访问在 DI 容器中注册的服务,可以分别传递 Action<IServiceProvider, ProducerBuilder<TKey, TValue>> 或 Action<IServiceProvider, ConsumerBuilder<TKey, TValue>>:
- AddKafkaProducer<TKey,TValue>(IHostApplicationBuilder, String, Action<IServiceProvider,ProducerBuilder<TKey,TValue>>)
- AddKafkaConsumer<TKey,TValue>(IHostApplicationBuilder, String, Action<IServiceProvider,ConsumerBuilder<TKey,TValue>>)
请考虑以下生产者注册示例:
builder.AddKafkaProducer<string, MyMessage>(
"kafka",
static (serviceProvider, producerBuilder) =>
{
var messageSerializer = serviceProvider.GetRequiredServices<MyMessageSerializer>();
producerBuilder.SetValueSerializer(messageSerializer);
})
有关详细信息,请参阅 ProducerBuilder<TKey, TValue> 和 ConsumerBuilder<TKey, TValue> API 文档。
Client 集成健康检查
默认情况下,Aspire 集成为所有服务启用 健康检查。 有关详细信息,请参阅 Aspire 集成概述。
Aspire Apache Kafka 集成负责处理以下健康检查场景:
- 添加
Aspire.Confluent.Kafka.Producer运行状况检查,当 KafkaProducerSettings.DisableHealthChecks 是false时。 - 添加
Aspire.Confluent.Kafka.Consumer运行状况检查,当 KafkaConsumerSettings.DisableHealthChecks 是false时。 - 与
/healthHTTP 终结点集成,该终结点指定所有已注册的健康检查都必须通过,才能让应用被视为准备好接受流量。
可观测性和遥测
Aspire 集成会自动设置日志记录、跟踪和指标配置,这些配置有时称为 可观测性支柱。 有关集成可观测性和遥测的详细信息,请参阅 Aspire 集成概述。 根据支持服务,某些集成可能仅支持其中一些功能。 例如,某些集成支持日志记录和跟踪,但不支持指标。 还可以使用 “配置” 部分中介绍的技术禁用遥测功能。
伐木
Aspire Apache Kafka 集成使用以下日志类别:
Aspire.Confluent.Kafka
追踪
Aspire Apache Kafka 集成不生成分布式追踪。
指标
Aspire Apache Kafka 集成通过 OpenTelemetry 发出以下指标:
Aspire.Confluent.Kafkamessaging.kafka.network.txmessaging.kafka.network.transmittedmessaging.kafka.network.rxmessaging.kafka.network.receivedmessaging.publish.messagesmessaging.kafka.message.transmittedmessaging.receive.messagesmessaging.kafka.message.received