Aspire Apache Kafka integration

Includes: Hosting integration and Client integration

Apache Kafka is an open-source distributed event streaming platform. It's useful for building real-time data pipelines and streaming applications. The Aspire Apache Kafka integration enables you to connect to existing Kafka instances, or create new instances from .NET with the docker.io/confluentinc/confluent-local container image.

Hosting integration

The Apache Kafka hosting integration models a Kafka server as the KafkaServerResource type. To access this type, install the 📦 Aspire.Hosting.Kafka NuGet package in the AppHost project, then add it with the builder.

dotnet add package Aspire.Hosting.Kafka

For more information, see dotnet add package or Manage package dependencies in .NET applications.

Add Kafka server resource

In your AppHost project, call AddKafka on the builder instance to add a Kafka server resource:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka");

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

When Aspire adds a container image to the AppHost, as shown in the preceding example with the docker.io/confluentinc/confluent-local image, it creates a new Kafka server instance on your local machine. A reference to your Kafka server (the kafka variable) is added to the ExampleProject. The Kafka server resource includes default ports.

The WithReference method configures a connection in the ExampleProject named "kafka". For more information, see Container resource lifecycle.

Tip

If you'd rather connect to an existing Kafka server, call AddConnectionString instead. For more information, see Reference existing resources.

Add Kafka UI

To add the Kafka UI to the Kafka server resource, call the WithKafkaUI method:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithKafkaUI();

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

The Kafka UI is a free, open-source web UI to monitor and manage Apache Kafka clusters. Aspire adds another container image docker.io/kafbat/kafka-ui to the AppHost that runs the Kafka UI.

Change the Kafka UI host port

To change the Kafka UI host port, chain a call to the WithHostPort method:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithKafkaUI(kafkaUI => kafkaUI.WithHostPort(9100));

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

The Kafka UI is accessible at http://localhost:9100 in the preceding example.

Add Kafka server resource with data volume

To add a data volume to the Kafka server resource, call the WithDataVolume method on the Kafka server resource:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithDataVolume(isReadOnly: false);

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

The data volume is used to persist the Kafka server data outside the lifecycle of its container. The data volume is mounted at the /var/lib/kafka/data path in the Kafka server container. When a name parameter isn't provided, the name is generated at random. For more information on data volumes and details on why they're preferred over bind mounts, see Docker docs: Volumes.

Add Kafka server resource with data bind mount

To add a data bind mount to the Kafka server resource, call the WithDataBindMount method:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithDataBindMount(
                       source: @"C:\Kafka\Data",
                       isReadOnly: false);

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Important

Data bind mounts have limited functionality compared to volumes, which offer better performance, portability, and security, making them more suitable for production environments. However, bind mounts allow direct access and modification of files on the host system, ideal for development and testing where real-time changes are needed.

Data bind mounts rely on the host machine's filesystem to persist the Kafka server data across container restarts. In the preceding example, the C:\Kafka\Data directory on a Windows host (or /Kafka/Data on Unix) is mounted into the Kafka server container. For more information on data bind mounts, see Docker docs: Bind mounts.

Hosting integration health checks

The Kafka hosting integration automatically adds a health check for the Kafka server resource. The health check verifies that a Kafka producer with the specified connection name is able to connect and persist a topic to the Kafka server.

The hosting integration relies on the 📦 AspNetCore.HealthChecks.Kafka NuGet package.

Work with larger Kafka clusters

The Aspire Kafka integration deploys a container from the confluentinc/confluent-local image to your local container host. This image provides a simple Apache Kafka cluster that runs in KRaft mode and requires no further configuration. It's ideal for developing and testing producers and consumers. However, this image is for local experimentation only and isn't supported by Confluent. It isn't recommended for production environments, and you may require a more robust Kafka cluster for testing and staging. For example, you may require more than one message broker in your cluster.

See the Kafka documentation for details of Kafka cluster setup. Once your engineers have created and configured Kafka, you only need to provide its location to Aspire by using a connection string.

In the following AppHost code, a local container is used in run mode. At other times, a connection string provides URLs and port numbers for the Kafka brokers:

var kafka = builder.ExecutionContext.IsRunMode
    ? builder.AddKafka("kafka").WithKafkaUI()
    : builder.AddConnectionString("kafka");

Note

For more information about connection strings in Aspire, see Add existing Azure resources with connection strings.

The AppHost resolves the connection string from the ConnectionStrings__kafka configuration key. It's possible to set this key by adding the connection string to the AppHost's appsettings.json file but this method stores the connection string in plain text and doesn't protect it. Instead, you can use an environment variable or a user secret to store the connection string:

dotnet user-secrets set "ConnectionStrings:kafka" "kafka-broker-1.contoso.com:9092,kafka-broker-2.contoso.com:9092,kafka-broker-3.contoso.com:9092"

Note

For more information about protecting app secrets, see Safe storage of app secrets in development in ASP.NET Core.

Client integration

To get started with the Aspire Apache Kafka integration, install the 📦 Aspire.Confluent.Kafka NuGet package in the client-consuming project, that is, the project for the application that uses the Apache Kafka client.

dotnet add package Aspire.Confluent.Kafka

Add Kafka producer

In the Program.cs file of your client-consuming project, call the AddKafkaProducer extension method to register an IProducer<TKey, TValue> for use via the dependency injection container. The method takes two generic parameters corresponding to the type of the key and the type of the message to send to the broker. These generic parameters are used by AddKafkaProducer to create an instance of ProducerBuilder<TKey, TValue>. This method also takes a connection name parameter.

builder.AddKafkaProducer<string, string>("kafka");

Tip

The connectionName parameter must match the name used when adding the Kafka resource in the AppHost project. In other words, when you call AddKafka and provide a name of kafka that same name should be used when calling AddKafkaProducer. For more information, see Add Kafka server resource.

You can then retrieve the IProducer<TKey, TValue> instance using dependency injection. For example, to retrieve the producer from an IHostedService:

internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
    // Use producer...
}

For more information on workers, see Worker services in .NET.
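
As a rough illustration of what the "Use producer..." placeholder might contain, the following sketch publishes one message per second with ProduceAsync. The topic name example-topic, the one-second interval, and the message contents are assumptions made for this example and aren't part of the integration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;

internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
    // Hypothetical topic name, used only for this sketch.
    private const string Topic = "example-topic";

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        using var timer = new PeriodicTimer(TimeSpan.FromSeconds(1));
        long counter = 0;

        // WaitForNextTickAsync throws OperationCanceledException when the host stops.
        while (await timer.WaitForNextTickAsync(stoppingToken))
        {
            var message = new Message<string, string>
            {
                Key = Guid.NewGuid().ToString(),
                Value = $"Message {counter++}"
            };

            // Awaits the delivery report for the message.
            await producer.ProduceAsync(Topic, message, stoppingToken);
        }
    }
}
```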

Add Kafka consumer

To register an IConsumer<TKey, TValue> for use via the dependency injection container, call the AddKafkaConsumer extension method in the Program.cs file of your client-consuming project. The method takes two generic parameters corresponding to the type of the key and the type of the message to receive from the broker. These generic parameters are used by AddKafkaConsumer to create an instance of ConsumerBuilder<TKey, TValue>. This method also takes a connection name parameter.

builder.AddKafkaConsumer<string, string>("kafka");

Tip

The connectionName parameter must match the name used when adding the Kafka resource in the AppHost project. In other words, when you call AddKafka and provide a name of kafka that same name should be used when calling AddKafkaConsumer. For more information, see Add Kafka server resource.

You can then retrieve the IConsumer<TKey, TValue> instance using dependency injection. For example, to retrieve the consumer from an IHostedService:

internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
    // Use consumer...
}
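
As a rough illustration of what the "Use consumer..." placeholder might contain, the following sketch subscribes to a topic and logs each message it receives. The topic name example-topic and the injected ILogger<Worker> are assumptions made for this example, and the sketch also assumes the consumer has been given a consumer group ID through configuration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

internal sealed class Worker(
    IConsumer<string, string> consumer,
    ILogger<Worker> logger) : BackgroundService
{
    // Hypothetical topic name, used only for this sketch.
    private const string Topic = "example-topic";

    // Consume blocks the calling thread, so the loop runs on a thread-pool thread.
    protected override Task ExecuteAsync(CancellationToken stoppingToken) =>
        Task.Run(() => ConsumeLoop(stoppingToken), stoppingToken);

    private void ConsumeLoop(CancellationToken stoppingToken)
    {
        consumer.Subscribe(Topic);
        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                // Blocks until a message arrives or the token is canceled.
                var result = consumer.Consume(stoppingToken);
                logger.LogInformation(
                    "Received {Key}: {Value}",
                    result.Message.Key,
                    result.Message.Value);
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when the host shuts down.
        }
        finally
        {
            // Leave the consumer group cleanly.
            consumer.Close();
        }
    }
}
```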

Add keyed Kafka producers or consumers

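There might be situations where you want to register multiple producer or consumer instances with different connection names. To register keyed Kafka producers or consumers, call the appropriate API:

  • AddKeyedKafkaProducer: Registers a keyed Kafka producer.
  • AddKeyedKafkaConsumer: Registers a keyed Kafka consumer.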

For more information on keyed services, see .NET dependency injection: Keyed services.
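
The following sketch shows one way keyed registration and resolution might fit together, using the AddKeyedKafkaProducer and AddKeyedKafkaConsumer extension methods and the FromKeyedServices attribute. The connection names kafka1 and kafka2 and the Worker class are assumptions made for this example.

```csharp
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

var builder = Host.CreateApplicationBuilder(args);

// "kafka1" and "kafka2" are hypothetical connection names.
builder.AddKeyedKafkaProducer<string, string>("kafka1");
builder.AddKeyedKafkaConsumer<string, string>("kafka2");

builder.Services.AddHostedService<Worker>();

builder.Build().Run();

// Each constructor parameter is resolved by the key used at registration.
internal sealed class Worker(
    [FromKeyedServices("kafka1")] IProducer<string, string> producer,
    [FromKeyedServices("kafka2")] IConsumer<string, string> consumer) : BackgroundService
{
    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Use producer and consumer...
        return Task.CompletedTask;
    }
}
```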

Configuration

The Aspire Apache Kafka integration provides multiple options to configure the connection based on the requirements and conventions of your project.

Use a connection string

When using a connection string from the ConnectionStrings configuration section, you can provide the name of the connection string when calling builder.AddKafkaProducer() or builder.AddKafkaConsumer():

builder.AddKafkaProducer<string, string>("kafka");

Then the connection string is retrieved from the ConnectionStrings configuration section:

{
  "ConnectionStrings": {
    "kafka": "broker:9092"
  }
}

The connection string value is set to the BootstrapServers property of the produced IProducer<TKey, TValue> or IConsumer<TKey, TValue> instance. For more information, see BootstrapServers.

Use configuration providers

The Aspire Apache Kafka integration supports Microsoft.Extensions.Configuration. It loads the KafkaProducerSettings or KafkaConsumerSettings from configuration by respectively using the Aspire:Confluent:Kafka:Producer and Aspire:Confluent:Kafka:Consumer keys. The following snippet is an example of an appsettings.json file that configures some of the options:

{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Producer": {
          "DisableHealthChecks": false,
          "Config": {
            "Acks": "All"
          }
        }
      }
    }
  }
}

The Config properties of both Aspire:Confluent:Kafka:Producer and Aspire:Confluent:Kafka:Consumer configuration sections respectively bind to instances of ProducerConfig and ConsumerConfig.

Confluent.Kafka.Consumer<TKey, TValue> requires the GroupId property to be set to let the broker track consumed message offsets.

For the complete Kafka client integration JSON schema, see Aspire.Confluent.Kafka/ConfigurationSchema.json.

Use named configuration

The Aspire Apache Kafka integration supports named configuration, which allows you to configure multiple instances of the same resource type with different settings. The named configuration uses the connection name as a key under the main configuration section.

{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Producer": {
          "kafka1": {
            "DisableHealthChecks": false,
            "Config": {
              "Acks": "All"
            }
          },
          "kafka2": {
            "DisableHealthChecks": true,
            "Config": {
              "Acks": "Leader"
            }
          }
        }
      }
    }
  }
}

In this example, the kafka1 and kafka2 connection names can be used when calling AddKafkaProducer or AddKafkaConsumer:

builder.AddKafkaProducer<string, string>("kafka1");
builder.AddKafkaConsumer<string, string>("kafka2");

Named configuration takes precedence over the top-level configuration. If both are provided, the settings from the named configuration override the top-level settings.

Use inline delegates

There are several inline delegates available to configure various options.

Configure KafkaProducerSettings and KafkaConsumerSettings

You can pass the Action<KafkaProducerSettings> configureSettings delegate to set up some or all the options inline, for example to disable health checks from code:

builder.AddKafkaProducer<string, string>(
    "kafka", 
    static settings => settings.DisableHealthChecks = true);

You can also configure a consumer inline from code:

builder.AddKafkaConsumer<string, string>(
    "kafka",
    static settings => settings.DisableHealthChecks = true);
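
The same settings delegate can also reach the underlying Confluent.Kafka configuration through the Config property. The following sketch assumes that Config exposes a ConsumerConfig, as described in the configuration providers section, and uses it to set a consumer group from code. The group name example-consumer-group is a made-up value for this example.

```csharp
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;

var builder = Host.CreateApplicationBuilder(args);

builder.AddKafkaConsumer<string, string>(
    "kafka",
    static settings =>
    {
        // Hypothetical consumer group name for this sketch.
        settings.Config.GroupId = "example-consumer-group";

        // Start from the earliest offset when the group has no committed offset.
        settings.Config.AutoOffsetReset = AutoOffsetReset.Earliest;
    });

builder.Build().Run();
```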

Configure ProducerBuilder<TKey, TValue> and ConsumerBuilder<TKey, TValue>

To configure Confluent.Kafka builders, pass an Action<ProducerBuilder<TKey, TValue>> (or Action<ConsumerBuilder<TKey, TValue>>):

builder.AddKafkaProducer<string, MyMessage>(
    "kafka",
    static producerBuilder => 
    {
        var messageSerializer = new MyMessageSerializer();
        producerBuilder.SetValueSerializer(messageSerializer);
    });

When registering producers and consumers, if you need to access a service registered in the DI container, you can pass an Action<IServiceProvider, ProducerBuilder<TKey, TValue>> or Action<IServiceProvider, ConsumerBuilder<TKey, TValue>> respectively.

Consider the following producer registration example:

builder.AddKafkaProducer<string, MyMessage>(
    "kafka",
    static (serviceProvider, producerBuilder) => 
    {
        var messageSerializer = serviceProvider.GetRequiredService<MyMessageSerializer>();
        producerBuilder.SetValueSerializer(messageSerializer);
    });

For more information, see ProducerBuilder<TKey, TValue> and ConsumerBuilder<TKey, TValue> API documentation.
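
The preceding examples reference MyMessage and MyMessageSerializer without defining them. As a minimal sketch of what such types might look like, the following code implements the Confluent.Kafka ISerializer<T> interface with System.Text.Json. The shape of MyMessage (an Id and a Text property) and the choice of JSON are assumptions made for this example.

```csharp
using System.Text.Json;
using Confluent.Kafka;

// Hypothetical message type; its properties are assumptions for this sketch.
public sealed record MyMessage(string Id, string Text);

// Serializes MyMessage values as UTF-8 encoded JSON.
public sealed class MyMessageSerializer : ISerializer<MyMessage>
{
    public byte[] Serialize(MyMessage data, SerializationContext context) =>
        JsonSerializer.SerializeToUtf8Bytes(data);
}
```

To resolve the serializer from the service provider, as in the second example, it would first need to be registered, for instance with builder.Services.AddSingleton<MyMessageSerializer>().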

Client integration health checks

By default, Aspire integrations enable health checks for all services. For more information, see Aspire integrations overview.

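The Aspire Apache Kafka integration handles the following health check scenarios:

  • Adds the Aspire.Confluent.Kafka.Producer health check when KafkaProducerSettings.DisableHealthChecks is false.
  • Adds the Aspire.Confluent.Kafka.Consumer health check when KafkaConsumerSettings.DisableHealthChecks is false.
  • Integrates with the /health HTTP endpoint, which specifies that all registered health checks must pass for the app to be considered ready to accept traffic.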

Observability and telemetry

Aspire integrations automatically set up Logging, Tracing, and Metrics configurations, which are sometimes known as the pillars of observability. For more information about integration observability and telemetry, see Aspire integrations overview. Depending on the backing service, some integrations may only support some of these features. For example, some integrations support logging and tracing, but not metrics. Telemetry features can also be disabled using the techniques presented in the Configuration section.

Logging

The Aspire Apache Kafka integration uses the following log categories:

  • Aspire.Confluent.Kafka

Tracing

The Aspire Apache Kafka integration does not emit distributed traces.

Metrics

The Aspire Apache Kafka integration emits the following metrics using OpenTelemetry:

  • Aspire.Confluent.Kafka
    • messaging.kafka.network.tx
    • messaging.kafka.network.transmitted
    • messaging.kafka.network.rx
    • messaging.kafka.network.received
    • messaging.publish.messages
    • messaging.kafka.message.transmitted
    • messaging.receive.messages
    • messaging.kafka.message.received
