Anteckning
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Apache Kafka är en distribuerad strömningsplattform för att skapa strömmande datapipelines i realtid som på ett tillförlitligt sätt flyttar data mellan system eller program. Kafka Connect är ett verktyg för skalbar och tillförlitlig strömning av data mellan Apache Kafka och andra datasystem. Kusto Kafka-sink fungerar som kopplingen från Kafka och kräver inte användning av kod. Ladda ned sink connector jar-filen från Git-lagringsplatsen eller Confluent Connector Hub.
Den här artikeln visar hur du matar in data med Kafka med hjälp av en fristående Docker-konfiguration för att förenkla konfigurationen av Kafka-klustret och Kafka-anslutningsklustret.
Mer information finns i Git-lagringsplatsen och versionsspecifika detaljer.
Förutsättningar
- Ett Azure-abonnemang. Skapa ett kostnadsfritt Azure-konto.
 - Ett Azure Data Explorer-kluster och en databas med standardprinciperna för cachelagring och kvarhållning.
 - Azure CLI.
 - Docker och Docker Compose.
 
Skapa ett huvudnamn för Microsoft Entra-tjänsten
Microsoft Entra-tjänstens huvudnamn kan skapas via Azure-portalen eller programmässigt, som i följande exempel.
Tjänstens huvudnamn är den identitet som används av anslutningsappen för att skriva data i tabellen i Kusto. Du beviljar behörigheter för tjänstens behörighetshanterare för åtkomst till Kusto-resurser.
Logga in på din Azure-prenumeration via Azure CLI. Autentisera sedan i webbläsaren.
az loginVälj den prenumeration som ska vara värd för huvudkontot. Det här steget behövs när du har flera prenumerationer.
az account set --subscription YOUR_SUBSCRIPTION_GUIDSkapa tjänstens huvudnamn. I det här exemplet kallas
my-service-principaltjänstens huvudnamn .az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}Från de returnerade JSON-data kopierar du
appId,passwordochtenantför framtida användning.{ "appId": "00001111-aaaa-2222-bbbb-3333cccc4444", "displayName": "my-service-principal", "name": "my-service-principal", "password": "00001111-aaaa-2222-bbbb-3333cccc4444", "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444" }
Du har skapat ditt Microsoft Entra-program och tjänstens huvudnamn.
Skapa en måltabell
Skapa en tabell med
Stormsnamnet från frågemiljön med följande kommando:.create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)Skapa motsvarande tabellmappning
Storms_CSV_Mappingför inmatade data med hjälp av följande kommando:.create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'Skapa en inmatningsbatchprincip i tabellen för konfigurerbar svarstid för inmatning i kö.
Tips/Råd
Inmatningsbatchprincipen är en prestandaoptimerare och innehåller tre parametrar. Det första villkoret som uppfylls utlöser inmatning i tabellen.
.alter table Storms policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:15", "MaximumNumberOfItems": 100, "MaximumRawDataSizeMB": 300}'Använd tjänstens huvudnamn från Skapa ett Microsoft Entra-tjänsthuvudnamn för att bevilja behörighet att arbeta med databasen.
.add database YOUR_DATABASE_NAME admins ('aadapp=YOUR_APP_ID;YOUR_TENANT_ID') 'AAD App'
Kör labbet
Följande labb är utformat för att ge dig erfarenhet av att börja skapa data, konfigurera Kafka-anslutningsappen och strömma dessa data. Du kan sedan titta på den insamlade datan.
Klona git-lagringsplatsen
Klona labbets git-lagringsplats.
Skapa en lokal katalog på datorn.
mkdir ~/kafka-kusto-hol cd ~/kafka-kusto-holKlona arkivet.
cd ~/kafka-kusto-hol git clone https://github.com/Azure/azure-kusto-labs cd azure-kusto-labs/kafka-integration/dockerized-quickstart
Innehållet i den klonade lagringsplatsen
Kör följande kommando för att visa innehållet i den klonade lagringsplatsen:
cd ~/kafka-kusto-hol/azure-kusto-labs/kafka-integration/dockerized-quickstart
tree
Det här resultatet av den här sökningen är:
├── README.md
├── adx-query.png
├── adx-sink-config.json
├── connector
│   └── Dockerfile
├── docker-compose.yaml
└── storm-events-producer
    ├── Dockerfile
    ├── StormEvents.csv
    ├── go.mod
    ├── go.sum
    ├── kafka
    │   └── kafka.go
    └── main.go
Granska filerna på den klonade lagringsplatsen
I följande avsnitt förklaras de viktiga delarna av filerna i filträdet.
adx-sink-config.json
Den här filen innehåller Kusto mottagaregenskapsfilen där du uppdaterar specifika konfigurationsdetaljer.
{
    "name": "storm",
    "config": {
        "connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
        "flush.size.bytes": 10000,
        "flush.interval.ms": 10000,
        "tasks.max": 1,
        "topics": "storm-events",
        "kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
        "aad.auth.authority": "<enter tenant ID>",
        "aad.auth.appid": "<enter application ID>",
        "aad.auth.appkey": "<enter client secret>",
        "kusto.ingestion.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net",
        "kusto.query.url": "https://<name of cluster>.<region>.kusto.windows.net",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
}
Ersätt värdena för följande attribut enligt konfigurationen: aad.auth.authority, aad.auth.appid, , aad.auth.appkeykusto.tables.topics.mapping(databasnamnet), kusto.ingestion.urloch kusto.query.url.
Anslutningsprogram – Dockerfile
Den här filen har kommandon för att generera docker-avbildningen för anslutningsinstansen. Den innehåller nedladdningen av anslutningsappen från git-lagringsplatsens versionskatalog.
Stormeventsproducentkatalog
Den här katalogen har ett Go-program som läser en lokal "StormEvents.csv" -fil och publicerar data till ett Kafka-ämne.
docker-compose.yaml
version: "2"
services:
  zookeeper:
    image: debezium/zookeeper:1.2
    ports:
      - 2181:2181
  kafka:
    image: debezium/kafka:1.2
    ports:
      - 9092:9092
    links:
      - zookeeper
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
  kusto-connect:
    build:
      context: ./connector
      args:
        KUSTO_KAFKA_SINK_VERSION: 1.0.1
    ports:
      - 8083:8083
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=adx
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
  events-producer:
    build:
      context: ./storm-events-producer
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - KAFKA_BOOTSTRAP_SERVER=kafka:9092
      - KAFKA_TOPIC=storm-events
      - SOURCE_FILE=StormEvents.csv
Starta containrarna
Starta containrarna i en terminal:
docker-compose upProducentprogrammet börjar att skicka händelser till ämnet
storm-events. Du bör se loggar som liknar följande loggar:.... events-producer_1 | sent message to partition 0 offset 0 events-producer_1 | event 2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public events-producer_1 | events-producer_1 | sent message to partition 0 offset 1 events-producer_1 | event 2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer ....Kontrollera loggarna genom att köra följande kommando i en separat terminal:
docker-compose logs -f | grep kusto-connect
Starta anslutningsappen
Använd ett Kafka Connect REST-anrop för att starta anslutningsappen.
Starta sink-uppgiften i en separat terminal med följande kommando:
curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8083/connectorsKontrollera statusen genom att köra följande kommando i en separat terminal:
curl http://localhost:8083/connectors/storm/status
Anslutningsappen börjar köa inmatningsprocesser.
Anmärkning
Om du har problem med logganslutningarna, rapportera ett problem.
Hanterad identitet
Som standard använder Kafka-anslutningen applikationsmetoden för autentisering under inmatning. Så här autentiserar du med hanterad identitet:
Tilldela klustret en hanterad identitet och ge ditt lagringskonto läsbehörighet. Mer information finns i Mata in data med hanterad identitetsautentisering.
I din adx-sink-config.json-fil anger du
aad.auth.strategytillmanaged_identityoch ser till attaad.auth.appidär inställd på managed identity-klient-ID:t för applikationen.Använd en tjänsttoken för privata instanser i stället för tjänstens huvudnamn för Microsoft Entra.
Anmärkning
När du använder en hanterad identitet appId och tenant härleds från samtalswebbplatsens kontext och password behövs inte.
Fråga efter och granska data
Bekräfta datainmatning
När data har anlänt till
Stormstabellen bekräftar du dataöverföringen genom att kontrollera radantalet:Storms | countBekräfta att det inte finns några fel i inmatningsprocessen:
.show ingestion failuresNär du ser data kan du prova några frågor.
Fråga efter data
Kör följande fråga för att se alla poster:
Storms | take 10Använd
whereochprojectför att filtrera specifika data:Storms | where EventType == 'Drought' and State == 'TEXAS' | project StartTime, EndTime, Source, EventIdAnvänd operatorn
summarize:Storms | summarize event_count=count() by State | where event_count > 10 | project State, event_count | render columnchart
              
              
            
Fler frågeexempel och vägledning finns i Skriva frågor i KQL - och Kusto Query Language-dokumentationen.
Återställ
Gör följande för att återställa:
- Stoppa containrarna (
docker-compose down -v) - Ta bort (
drop table Storms) - 
              
StormsÅterskapa tabellen - Återskapa tabellmappning
 - Starta om containrar (
docker-compose up) 
Rensa resurser
Om du vill ta bort Azure Data Explorer-resurserna använder du az kusto cluster delete (kusto extension) eller az kusto database delete (kusto extension):
az kusto cluster delete --name "<cluster name>" --resource-group "<resource group name>"
az kusto database delete --cluster-name "<cluster name>" --database-name "<database name>" --resource-group "<resource group name>"
Du kan också ta bort klustret och databasen via Azure-portalen. Mer information finns i Ta bort ett Azure Data Explorer-kluster och Ta bort en databas i Azure Data Explorer.
Justera Kafka Sink-anslutningsappen
Anpassa Kafka Sink-anslutningen så att den fungerar med batchningspolicy för inmatning:
- Justera storleksgränsen för Kafka-sink 
flush.size.bytesbörja från 1 MB och öka med steg om 10 MB eller 100 MB. - När du använder Kafka Sink aggregeras data två gånger. På anslutningssidan aggregeras data enligt inställningarna för tömning och på tjänstsidan enligt batchningpolicyn. Om batchtiden är för kort för att data inte kan matas in av både connector och tjänsten, måste batchtiden ökas. Ange batchstorleken till 1 GB och öka eller minska med 100 MB efter behov. Om tömningsstorleken till exempel är 1 MB och batchprincipstorleken är 100 MB, aggregerar Kafka Sink-anslutningsappen data till en batch på 100 MB. Den batchen matas sedan in av tjänsten. Om batchprinciptiden är 20 sekunder och Kafka Sink-anslutningen töms 50 MB under en 20-sekundersperiod matar tjänsten in en 50 MB-batch.
 - Du kan skala genom att lägga till instanser och Kafka-partitioner. Öka 
tasks.maxtill antalet partitioner. Skapa en partition om du har tillräckligt med data för att skapa en blob med inställningensflush.size.bytesstorlek. Om blobben är mindre bearbetas batchen när den når tidsgränsen, så partitionen får inte tillräckligt med dataflöde. Ett stort antal partitioner innebär mer bearbetningskostnader. 
Relaterat innehåll
- Läs mer om arkitektur för stordata.
 - Lär dig hur du matar in JSON-formaterade exempeldata i Azure Data Explorer.
 - Läs mer med Kafka-labb: