Dela via


Aspire Apache Kafka integrering

Inkluderar:Värdintegrering ingår Värdintegrering &–& integrering ingår integration

Apache Kafka är en plattform för distribuerad händelseströmning med öppen källkod. Det är användbart för att bygga datapipelines i realtid och strömmande applikationer. Med integreringen AspireApache Kafka kan du ansluta till befintliga Kafka-instanser eller skapa nya instanser från .NET med docker.io/confluentinc/confluent-local containeravbildningen.

Värdintegrering

Apache Kafka värdintegration modellerar en Kafka-server som typen KafkaServerResource. Om du vill komma åt den här typen installerar du 📦Aspire. Hosting.Kafka NuGet-paketet i AppHost-projektet och lägg sedan till det med byggaren.

dotnet add package Aspire.Hosting.Kafka

Mer information finns i dotnet add package eller Hantera paketberoenden i .NET applikationer.

Lägg till Kafka-serverresurs

I Ditt AppHost-projekt anropar du AddKafka instansen builder för att lägga till en Kafka-serverresurs:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka");

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

När Aspire en containeravbildning läggs till i AppHost, som du ser i föregående exempel med avbildningen docker.io/confluentinc/confluent-local , skapas en ny Kafka-serverinstans på den lokala datorn. En referens till Kafka-servern (variabeln kafka) läggs till i ExampleProject. Kafka-serverresursen innehåller standardportar

Metoden WithReference konfigurerar en anslutning i ExampleProject med namnet "kafka". Mer information finns i Livscykel för containerresurser.

Tips

Om du hellre vill ansluta till en befintlig Kafka-server anropar du AddConnectionString i stället. Mer information finns i Referera till befintliga resurser.

Lägg till Kafka-användargränssnitt

Om du vill lägga till Kafka-användargränssnittet till Kafka-serverresursen anropar du metoden WithKafkaUI:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithKafkaUI();

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Kafka-användargränssnittet är ett kostnadsfritt webbgränssnitt med öppen källkod för att övervaka och hantera Apache Kafka kluster. Aspire lägger till en annan containeravbildning docker.io/kafbat/kafka-ui i AppHost som kör Kafka-användargränssnittet.

Ändra värdporten för Kafka-användargränssnittet

Om du vill ändra värdporten för Kafka-användargränssnittet kedjar du ett anrop till metoden WithHostPort:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithKafkaUI(kafkaUI => kafkaUI.WithHostPort(9100));

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Kafka-användargränssnittet är tillgängligt på http://localhost:9100 i föregående exempel.

Lägga till Kafka-serverresurs med datavolym

Om du vill lägga till en datavolym i Kafka-serverresursen anropar du metoden WithDataVolume på Kafka-serverresursen:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithDataVolume(isReadOnly: false);

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Datavolymen används för att bevara Kafka-serverdata utanför containerns livscykel. Datavolymen monteras på sökvägen /var/lib/kafka/data i Kafka-servercontainern och när en name parameter inte anges, genereras namnet slumpvis. Mer information om datavolymer samt varför de föredras framför bindmountsfinns i dokumentet Docker Volymer.

Lägg till Kafka-serverresurs med databindningspunkt

Om du vill lägga till en databindningsmontering till Kafka-serverresursen anropar du metoden WithDataBindMount:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithDataBindMount(
                       source: @"C:\Kafka\Data",
                       isReadOnly: false);

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Viktig

Data bind-mounts har begränsade funktioner jämfört med volymer, som erbjuder bättre prestanda, portabilitet och säkerhet, vilket gör dem mer lämpliga för produktionsmiljöer. Bindningsmonteringar tillåter dock direkt åtkomst och ändring av filer i värdsystemet, perfekt för utveckling och testning där realtidsändringar behövs.

Databindningsmonteringar förlitar sig på värddatorns filsystem för att bevara Kafka-serverdata mellan omstarter av containrar. Databindningspunkten monteras i sökvägen C:\Kafka\Data på Windows (eller /Kafka/Data på Unix) på värddatorn i Kafka-serverns container. Mer information om bindningar av data finns i dokumentationen för Docker: Monteringar av bindningar.

Genomföra hälsokontroller för integrationer

Kafka-hostingintegration lägger automatiskt till en hälsokontroll för Kafka-serverresursen. Hälsokontrollen verifierar att en Kafka-producent med det angivna anslutningsnamnet kan ansluta och bevara ett ämne till Kafka-servern.

Hostningsintegrationen förlitar sig på 📦 AspNetCore.HealthChecks.Kafka NuGet-paketet.

Arbeta med större Kafka-kluster

Aspire Kafka-integreringen distribuerar en container från confluentinc/confluent-local-avbildningen till din lokala containervärd. Den här avbildningen innehåller ett enkelt Apache Kafka kluster som körs i KRaft-läge och som inte kräver någon ytterligare konfiguration. Det är idealiskt för att utveckla och testa producenter och konsumenter. Den här avbildningen är dock endast avsedd för lokala experiment och stöds inte av Confluent. Det rekommenderas inte för produktionsmiljöer och du kan behöva ett mer robust Kafka-kluster för testning och mellanlagring. Du kan till exempel behöva mer än en meddelandebroker i ditt kluster.

Mer information om konfigurationen av Kafka-kluster finns i Kafka-dokumentationen . När dina tekniker har skapat och konfigurerat Kafka behöver du bara tillhandahålla dess plats med hjälp av en anslutningssträng till Aspire.

I följande AppHost-kod används en lokal container i körningsläge. Vid andra tillfällen tillhandahåller en anslutningssträng URL:er och portnummer för Kafka-koordinatorerna:

var kafka = builder.ExecutionContext.IsRunMode
    ? builder.AddKafka("kafka").WithKafkaUI()
    : builder.AddConnectionString("kafka");

Anmärkning

Mer information om anslutningssträngar i Aspire finns i Lägg till befintliga resurser med anslutningssträngar i Azure

AppHost löser anslutningssträngen från konfigurationsnyckeln ConnectionStrings__kafka . Du kan ange den här nyckeln genom att lägga till anslutningssträngen i AppHosts appsettings.json-fil , men den här metoden lagrar anslutningssträngen i oformaterad text och skyddar den inte. I stället kan du använda en miljövariabel eller en användarhemlighet för att lagra anslutningssträngen:

dotnet user-secrets set "ConnectionStrings:kafka" "kafka-broker-1.contoso.com:9092,kafka-broker-2.contoso.com:9092,kafka-broker-3.contoso.com:9092"

Anmärkning

Mer information om hur du skyddar apphemligheter finns i Säker lagring av apphemligheter under utveckling i ASP.NET Core

Client integrering

Kom igång med AspireApache Kafka-integreringen genom att installera 📦Aspire. Confluent.Kafka NuGet-paket i det klientkrävande projektet, det vill säga projektet för det program som använder Apache Kafka-klienten.

dotnet add package Aspire.Confluent.Kafka

Lägg till Kafka-producent

Anropa Program.cs-tilläggsmetoden i AddKafkaProducer-filen för ditt klientprojekt för att registrera en IProducer<TKey, TValue> för användning via beroendeinjektionscontainern. Metoden tar två generiska parametrar som motsvarar typen av nyckel och typen av meddelande som ska skickas till meddelandemäklaren. Dessa allmänna parametrar används av AddKafkaProducer för att skapa en instans av ProducerBuilder<TKey, TValue>. Den här metoden använder även parametern anslutningsnamn.

builder.AddKafkaProducer<string, string>("kafka");

Tips

Parametern connectionName måste matcha namnet som används när du lägger till Kafka-resursen i AppHost-projektet. Med andra ord, när du anropar AddKafka och anger ett namn på kafka ska samma namn användas när du anropar AddKafkaProducer. Mer information finns i Lägg till Kafka-serverresurs.

Du kan sedan hämta IProducer<TKey, TValue>-instansen med hjälp av beroendeinjektion. Om du till exempel vill hämta producenten från en IHostedService:

internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
    // Use producer...
}

Mer information om arbetare finns i Worker-tjänster i .NET.

Lägg till Kafka-konsument

Om du vill registrera en IConsumer<TKey, TValue> för användning via containern för beroendeinjektion anropar du AddKafkaConsumer-tilläggsmetoden i Program.cs-filen för ditt klientprojekt. Metoden tar två generiska parametrar som motsvarar typen av nyckel och typen av meddelande som ska tas emot från mäklaren. Dessa allmänna parametrar används av AddKafkaConsumer för att skapa en instans av ConsumerBuilder<TKey, TValue>. Den här metoden använder även parametern anslutningsnamn.

builder.AddKafkaConsumer<string, string>("kafka");

Tips

Parametern connectionName måste matcha namnet som används när du lägger till Kafka-resursen i AppHost-projektet. Med andra ord, när du anropar AddKafka och anger ett namn på kafka ska samma namn användas när du anropar AddKafkaComsumer. Mer information finns i Lägg till Kafka-serverresurs.

Du kan sedan hämta IConsumer<TKey, TValue>-instansen med hjälp av beroendeinjektion. Om du till exempel vill hämta en konsument från IHostedService:

internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
    // Use consumer...
}

Lägg till nyckelade Kafka-producenter eller konsumenter

Det kan finnas situationer där du vill registrera flera producent- eller konsumentinstanser med olika anslutningsnamn. Om du vill registrera nyckelade Kafka-producenter eller konsumenter anropar du lämpligt API:

För mer information om nyckelade tjänster, se .NET beroendeinjektion: Nyckelade tjänster.

Konfiguration

Den AspireApache Kafka integreringen innehåller flera alternativ för att konfigurera anslutningen baserat på kraven och konventionerna i ditt projekt.

Använda en anslutningssträng

När du använder en anslutningssträng från ConnectionStrings-konfigurationsavsnittet kan du ange namnet på anslutningssträngen när du anropar builder.AddKafkaProducer() eller builder.AddKafkaProducer():

builder.AddKafkaProducer<string, string>("kafka");

Sedan hämtas anslutningssträngen från ConnectionStrings-konfigurationsavsnittet:

{
  "ConnectionStrings": {
    "kafka": "broker:9092"
  }
}

Anslutningssträngsvärdet anges till egenskapen BootstrapServers för den producerade IProducer<TKey, TValue>- eller IConsumer<TKey, TValue>-instansen. Mer information finns i BootstrapServers.

Använda konfigurationsprovidrar

Aspire Apache Kafka-integreringen stöder Microsoft.Extensions.Configuration. Den läser in KafkaProducerSettings eller KafkaConsumerSettings från konfigurationen med hjälp av Aspire:Confluent:Kafka:Producer respektive Aspire.Confluent:Kafka:Consumer nycklar. Följande kodfragment är ett exempel på en appsettings.json fil som konfigurerar några av alternativen:

{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Producer": {
          "DisableHealthChecks": false,
          "Config": {
            "Acks": "All"
          }
        }
      }
    }
  }
}

Egenskaperna för Config för konfigurationsavsnitten både Aspire:Confluent:Kafka:Producer och Aspire.Confluent:Kafka:Consumer binder till instanser av ProducerConfig och ConsumerConfigrespektive.

Confluent.Kafka.Consumer<TKey, TValue> kräver att egenskapen ClientId ställs in för att låta brokern spåra förbrukade meddelandeförskjutningar.

Det fullständiga schemat för Kafka-klientintegrering JSON finns i Aspire. Confluent.Kafka/ConfigurationSchema.json.

Använda namngiven konfiguration

Integreringen AspireApache Kafka stöder namngiven konfiguration, vilket gör att du kan konfigurera flera instanser av samma resurstyp med olika inställningar. Den namngivna konfigurationen använder anslutningsnamnet som en nyckel under huvudkonfigurationsavsnittet.

{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Producer": {
          "kafka1": {
            "DisableHealthChecks": false,
            "Config": {
              "Acks": "All"
            }
          },
          "kafka2": {
            "DisableHealthChecks": true,
            "Config": {
              "Acks": "Leader"
            }
          }
        }
      }
    }
  }
}

I det här exemplet kan anslutningsnamnen kafka1 och kafka2 användas när du anropar AddKafkaProducer eller AddKafkaConsumer:

builder.AddKafkaProducer<string, string>("kafka1");
builder.AddKafkaConsumer<string, string>("kafka2");

Den namngivna konfigurationen har företräde framför konfigurationen på den översta nivån. Om båda anges åsidosätter inställningarna från den namngivna konfigurationen inställningarna på den översta nivån.

Använd inline-delegater

Det finns flera interna ombud tillgängliga för att konfigurera olika alternativ.

KonfigureraKafkaProducerSettings och KafkaConsumerSettings

Du kan skicka delegaten Action<KafkaProducerSettings> configureSettings för att konfigurera vissa eller alla alternativ direkt i koden, till exempel för att inaktivera hälsokontroller från koden.

builder.AddKafkaProducer<string, string>(
    "kafka", 
    static settings => settings.DisableHealthChecks = true);

Du kan konfigurera en konsument direkt i koden.

builder.AddKafkaConsumer<string, string>(
    "kafka",
    static settings => settings.DisableHealthChecks = true);
Konfigurera ProducerBuilder<TKey, TValue> och ConsumerBuilder<TKey, TValue>

För att konfigurera Confluent.Kafka kompilatorer, skicka en Action<ProducerBuilder<TKey, TValue>> (eller Action<ConsumerBuilder<TKey, TValue>>):

builder.AddKafkaProducer<string, MyMessage>(
    "kafka",
    static producerBuilder => 
    {
        var messageSerializer = new MyMessageSerializer();
        producerBuilder.SetValueSerializer(messageSerializer);
    })

När du registrerar producenter och konsumenter kan du skicka en Action<IServiceProvider, ProducerBuilder<TKey, TValue>> eller Action<IServiceProvider, ConsumerBuilder<TKey, TValue>> om du behöver komma åt en tjänst som är registrerad i DI-containern:

Tänk dig följande exempel på producentregistrering:

builder.AddKafkaProducer<string, MyMessage>(
    "kafka",
    static (serviceProvider, producerBuilder) => 
    {
        var messageSerializer = serviceProvider.GetRequiredServices<MyMessageSerializer>();
        producerBuilder.SetValueSerializer(messageSerializer);
    })

Mer information finns i dokumentationen om ProducerBuilder<TKey, TValue> och ConsumerBuilder<TKey, TValue> API.

Client hälsokontroller för integrering

Integreringar aktiverar som standard Aspirehälsokontroller för alla tjänster. Mer information finns i Aspire översikten över integreringar.

Integreringen AspireApache Kafka hanterar följande hälsokontrollscenarier:

Observerbarhet och telemetri

Aspire integreringar konfigurerar automatiskt konfigurationer för loggning, spårning och mått, som ibland kallas grundpelarna för observerbarhet. Mer information om integreringsobservabilitet och telemetri finns i Aspire översikten över integreringar. Beroende på säkerhetskopieringstjänsten kanske vissa integreringar bara stöder vissa av dessa funktioner. Vissa integreringar stöder till exempel loggning och spårning, men inte mått. Telemetrifunktioner kan också inaktiveras med hjälp av de tekniker som visas i avsnittet Configuration.

Skogsavverkning

Aspire Apache Kafka-integreringen använder följande loggkategorier:

  • Aspire.Confluent.Kafka

Spårning

Den AspireApache Kafka integreringen genererar inte distribuerade spårningar.

Mätvärden

Integreringen AspireApache Kafka genererar följande mått med hjälp av OpenTelemetry:

  • Aspire.Confluent.Kafka
    • messaging.kafka.network.tx
    • messaging.kafka.network.transmitted
    • messaging.kafka.network.rx
    • messaging.kafka.network.received
    • messaging.publish.messages
    • messaging.kafka.message.transmitted
    • messaging.receive.messages
    • messaging.kafka.message.received

Se även