Anteckning
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Azure Event Hubs är en strömningstjänst för stordata och händelseinmatningstjänst som kan ta emot och bearbeta flera miljoner händelser per sekund. Event Hubs kan bearbeta och lagra händelser, data eller telemetri som producerats av distribuerade program och enheter. Data som skickas till en händelsehubb kan omvandlas och lagras med valfri provider för realtidsanalys eller batchbearbetnings-/lagringsadapter. En detaljerad översikt över Event Hubs finns i Översikt över Event Hubs och Event Hubs-funktioner.
Den här snabbstarten beskriver hur du skriver Go-program för att skicka händelser till eller ta emot händelser från en händelsehubb.
Anmärkning
Den här snabbstarten baseras på exempel på GitHub på https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs. Avsnittet Skicka händelser baseras på exemplet example_producing_events_test.go och den mottagna baseras på exemplet example_processor_test.go . Koden förenklas för snabbstarten och alla detaljerade kommentarer tas bort, så titta på exemplen för mer information och förklaringar.
Förutsättningar
För att slutföra den här snabbstarten, behöver du följande förhandskrav:
- Gå installerad lokalt. Följ dessa instruktioner om det behövs.
- Ett aktivt Azure-konto. Om du inte har en Azure-prenumeration, skapa ett gratis konto innan du börjar.
- Skapa ett Event Hubs-namnområde och en händelsehubb. Använd Azure-portalen för att skapa ett namnområde av typen Event Hubs och hämta de autentiseringsuppgifter för hantering som programmet behöver för att kommunicera med händelsehubben. Om du behöver skapa ett namnområde och en händelsehubb följer du anvisningarna i den här artikeln.
Skicka händelser
Det här avsnittet visar hur du skapar ett Go-program för att skicka händelser till en händelsehubb.
Installera Go-paketet
Hämta Go-paketet för Event Hubs enligt följande exempel.
go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
Kod för att skicka händelser till en händelsehubb
Här är koden för att skicka händelser till en händelsehubb. De viktigaste stegen i koden är:
- Skapa en Event Hubs-producentklient med hjälp av en anslutningssträng till Event Hubs-namnområdet och händelsehubbens namn.
- Skapa ett batchobjekt och lägg till exempelhändelser i batchen.
- Skicka batchen med händelser till de önskade händelserna.
Viktigt!
Ersätt NAMESPACE CONNECTION STRING med anslutningssträngen till Event Hubs-namnområdet och EVENT HUB NAME med händelsehubbens namn i exempelkoden.
package main
import (
"context"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)
func main() {
// create an Event Hubs producer client using a connection string to the namespace and the event hub
producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)
if err != nil {
panic(err)
}
defer producerClient.Close(context.TODO())
// create sample events
events := createEventsForSample()
// create a batch object and add sample events to the batch
newBatchOptions := &azeventhubs.EventDataBatchOptions{}
batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)
if err != nil {
panic(err)
}
for i := 0; i < len(events); i++ {
err = batch.AddEventData(events[i], nil)
if err != nil {
panic(err)
}
}
// send the batch of events to the event hub
err = producerClient.SendEventDataBatch(context.TODO(), batch, nil)
if err != nil {
panic(err)
}
}
func createEventsForSample() []*azeventhubs.EventData {
return []*azeventhubs.EventData{
{
Body: []byte("hello"),
},
{
Body: []byte("world"),
},
}
}
Kör inte programmet än. Du måste först köra mottagarappen och sedan avsändarappen.
Ta emot händelser
Skapa ett lagringskonto och en container
Tillstånd som leasingavtal på partitioner och kontroller i händelser delas mellan mottagare genom en Azure Storage-container. Du kan skapa ett lagringskonto och en container med Go SDK, men du kan också skapa ett genom att följa anvisningarna i Om Azure-lagringskonton.
Följ dessa rekommendationer när du använder Azure Blob Storage som kontrollpunktslager:
- Använd en separat container för varje konsumentgrupp. Du kan använda samma lagringskonto, men använda en container per grupp.
- Använd inte lagringskontot för något annat.
- Använd inte containern för något annat.
- Skapa lagringskontot i samma region som det distribuerade programmet. Om programmet är installerat lokalt, försök att välja den region som är närmast.
På sidan Lagringskonto i Azure Portal i avsnittet Blob Service kontrollerar du att följande inställningar är inaktiverade.
- Hierarkisk namnrymd
- Mjuk borttagning av blob
- Versionshantering
Go-paket
Om du vill ta emot meddelandena hämtar du Go-paketen för Event Hubs enligt följande exempel.
go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
go get github.com/Azure/azure-sdk-for-go/sdk/storage/azblob
Kod för att ta emot händelser från en händelsehubb
Här är koden för att ta emot händelser från en händelsehubb. De viktigaste stegen i koden är:
- Kontrollera ett kontrollpunktsarkivobjekt som representerar Azure Blob Storage som används av händelsehubben för kontrollpunkter.
- Skapa en Event Hubs-konsumentklient med hjälp av en anslutningssträng till Event Hubs-namnområdet och händelsehubbens namn.
- Skapa en händelseprocessor med hjälp av klientobjektet och kontrollpunktsarkivobjektet. Processorn tar emot och bearbetar händelser.
- För varje partition i händelsehubben skapar du en partitionsklient med processEvents som funktion för att bearbeta händelser.
- Kör alla partitionsklienter för att ta emot och bearbeta händelser.
Viktigt!
Ersätt följande platshållarvärden med faktiska värden:
-
AZURE STORAGE CONNECTION STRINGmed anslutningssträngen för ditt Azure-lagringskonto -
BLOB CONTAINER NAMEmed namnet på blobcontainern som du skapade i lagringskontot -
NAMESPACE CONNECTION STRINGmed anslutningssträngen för Event Hubs-namnområdet -
EVENT HUB NAMEmed händelsehubbens namn i exempelkoden.
package main
import (
"context"
"errors"
"fmt"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)
func main() {
// create a container client using a connection string and container name
checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)
if err != nil {
panic(err)
}
// create a checkpoint store that will be used by the event hub
checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)
if err != nil {
panic(err)
}
// create a consumer client using a connection string to the namespace and the event hub
consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)
if err != nil {
panic(err)
}
defer consumerClient.Close(context.TODO())
// create a processor to receive and process events
processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)
if err != nil {
panic(err)
}
// for each partition in the event hub, create a partition client with processEvents as the function to process events
dispatchPartitionClients := func() {
for {
partitionClient := processor.NextPartitionClient(context.TODO())
if partitionClient == nil {
break
}
go func() {
if err := processEvents(partitionClient); err != nil {
panic(err)
}
}()
}
}
// run all partition clients
go dispatchPartitionClients()
processorCtx, processorCancel := context.WithCancel(context.TODO())
defer processorCancel()
if err := processor.Run(processorCtx); err != nil {
panic(err)
}
}
func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
defer closePartitionResources(partitionClient)
for {
receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
receiveCtxCancel()
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
return err
}
fmt.Printf("Processing %d event(s)\n", len(events))
for _, event := range events {
fmt.Printf("Event received with body %v\n", string(event.Body))
}
if len(events) != 0 {
if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1], nil); err != nil {
return err
}
}
}
}
func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
defer partitionClient.Close(context.TODO())
}
Köra mottagar- och avsändarappar
Kör mottagarappen först.
Kör avsändarappen.
Vänta en minut för att se följande utdata i mottagarfönstret.
Processing 2 event(s) Event received with body hello Event received with body world
Nästa steg
Se exempel på GitHub på https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs.