Delen via


Quickstart: Event Hubs-gegevens vastleggen in Azure Storage en lezen met behulp van Python (azure-eventhub)

U kunt een Event Hub zo configureren dat de gegevens die naar een Event Hub worden verzonden, worden vastgelegd in een Azure-opslagaccount of Azure Data Lake Storage Gen 1 of Gen 2. In dit artikel leest u hoe u Python-code schrijft om gebeurtenissen naar een Event Hub te verzenden en de vastgelegde gegevens uit Azure Blob Storage te lezen. Zie het overzicht van de Event Hubs Capture-functie voor meer informatie over deze functie.

In deze quickstart wordt de Azure Python SDK gebruikt om de capture-functie te demonstreren. De sender.py-app verzendt gesimuleerde omgevingstelemetrie naar Event Hubs in JSON-indeling. De Event Hub is geconfigureerd voor het gebruik van de functie Capture om deze gegevens in batches naar Blob Storage te schrijven. De capturereader.py-app leest deze blobs en maakt een toevoegbestand voor elk apparaat. De app schrijft de gegevens vervolgens naar CSV-bestanden.

In deze snelstart, gaat u het volgende doen:

  • Maak een Azure Blob Storage-account en -container in Azure Portal.
  • Maak een Event Hubs-naamruimte met behulp van Azure Portal.
  • Maak een Event Hub met de functie Capture ingeschakeld en verbind deze met uw opslagaccount.
  • Gegevens verzenden naar uw Event Hub met behulp van een Python-script.
  • Bestanden lezen en verwerken vanuit Event Hubs Capture met behulp van een ander Python-script.

Vereiste voorwaarden

Capture-functie inschakelen voor de Event Hub

Schakel de functie Capture in voor de Event Hub. Volg hiervoor de instructies in Event Hubs Capture inschakelen met behulp van Azure Portal. Selecteer het opslagaccount en de blobcontainer die u in de vorige stap hebt gemaakt. Selecteer Avro voor de serialisatie-indeling van uitvoerevenementen.

Een Python-script maken om gebeurtenissen naar uw Event Hub te verzenden

In deze sectie maakt u een Python-script waarmee 200 gebeurtenissen (10 apparaten * 20 gebeurtenissen) worden verzonden naar een Event Hub. Deze gebeurtenissen zijn een voorbeeld van omgevingslezing die wordt verzonden in JSON-indeling.

  1. Open uw favoriete Python-editor, zoals Visual Studio Code.

  2. Maak een script met de naam sender.py.

  3. Plak de volgende code in sender.py.

    import time
    import os
    import uuid
    import datetime
    import random
    import json
    
    from azure.eventhub import EventHubProducerClient, EventData
    
    # This script simulates the production of events for 10 devices.
    devices = []
    for x in range(0, 10):
        devices.append(str(uuid.uuid4()))
    
    # Create a producer client to produce and publish events to the event hub.
    producer = EventHubProducerClient.from_connection_string(conn_str="EVENT HUBS NAMESPACE CONNECTION STRING", eventhub_name="EVENT HUB NAME")
    
    for y in range(0,20):    # For each device, produce 20 events. 
        event_data_batch = producer.create_batch() # Create a batch. You will add events to the batch later. 
        for dev in devices:
            # Create a dummy reading.
        reading = {
                'id': dev, 
                'timestamp': str(datetime.datetime.utcnow()), 
                'uv': random.random(), 
                'temperature': random.randint(70, 100), 
                'humidity': random.randint(70, 100)
            }
            s = json.dumps(reading) # Convert the reading into a JSON string.
            event_data_batch.add(EventData(s)) # Add event data to the batch.
        producer.send_batch(event_data_batch) # Send the batch of events to the event hub.
    
    # Close the producer.    
    producer.close()
    
  4. Vervang de volgende waarden in de scripts:

    • Vervang EVENT HUBS NAMESPACE CONNECTION STRING door de verbindingsreeks voor uw Event Hubs-naamruimte.
    • Vervang door EVENT HUB NAME de naam van uw Event Hub.
  5. Voer het script uit om gebeurtenissen naar de Event Hub te verzenden.

  6. In Azure Portal kunt u controleren of de Event Hub de berichten heeft ontvangen. Schakel over naar de weergave Berichten in de sectie Metrische gegevens . Vernieuw de pagina om de grafiek bij te werken. Het kan enkele seconden duren voordat de pagina weergeeft dat de berichten zijn ontvangen.

    Controleer of de Event Hub de berichten heeft ontvangen

Een Python-script maken om uw Capture-bestanden te lezen

In dit voorbeeld worden de vastgelegde gegevens opgeslagen in Azure Blob Storage. Het script in deze sectie leest de vastgelegde gegevensbestanden uit uw Azure-opslagaccount en genereert CSV-bestanden die u gemakkelijk kunt openen en weergeven. U ziet 10 bestanden in de huidige werkmap van de toepassing. Deze bestanden bevatten de omgevingsmetingen voor de 10 apparaten.

  1. Maak in de Python-editor een script met de naam capturereader.py. Dit script leest de vastgelegde bestanden en maakt een bestand voor elk apparaat om de gegevens alleen voor dat apparaat te schrijven.

  2. Plak de volgende code in capturereader.py.

    import os
    import string
    import json
    import uuid
    import avro.schema
    
    from azure.storage.blob import ContainerClient, BlobClient
    from avro.datafile import DataFileReader, DataFileWriter
    from avro.io import DatumReader, DatumWriter
    
    
    def processBlob2(filename):
        reader = DataFileReader(open(filename, 'rb'), DatumReader())
        dict = {}
        for reading in reader:
            parsed_json = json.loads(reading["Body"])
            if not 'id' in parsed_json:
                return
            if not parsed_json['id'] in dict:
                list = []
                dict[parsed_json['id']] = list
            else:
                list = dict[parsed_json['id']]
                list.append(parsed_json)
        reader.close()
        for device in dict.keys():
            filename = os.getcwd() + '\\' + str(device) + '.csv'
            deviceFile = open(filename, "a")
            for r in dict[device]:
                deviceFile.write(", ".join([str(r[x]) for x in r.keys()])+'\n')
    
    def startProcessing():
        print('Processor started using path: ' + os.getcwd())
        # Create a blob container client.
        container = ContainerClient.from_connection_string("AZURE STORAGE CONNECTION STRING", container_name="BLOB CONTAINER NAME")
        blob_list = container.list_blobs() # List all the blobs in the container.
        for blob in blob_list:
            # Content_length == 508 is an empty file, so process only content_length > 508 (skip empty files).        
            if blob.size > 508:
                print('Downloaded a non empty blob: ' + blob.name)
                # Create a blob client for the blob.
                blob_client = ContainerClient.get_blob_client(container, blob=blob.name)
                # Construct a file name based on the blob name.
                cleanName = str.replace(blob.name, '/', '_')
                cleanName = os.getcwd() + '\\' + cleanName 
                with open(cleanName, "wb+") as my_file: # Open the file to write. Create it if it doesn't exist. 
                    my_file.write(blob_client.download_blob().readall()) # Write blob contents into the file.
                processBlob2(cleanName) # Convert the file into a CSV file.
                os.remove(cleanName) # Remove the original downloaded file.
                # Delete the blob from the container after it's read.
                container.delete_blob(blob.name)
    
    startProcessing()    
    
  3. Vervang AZURE STORAGE CONNECTION STRING door de verbindingsreeks voor uw Azure-opslagaccount. De naam van de container die u in deze quickstart hebt gemaakt, is capture. Als u een andere naam voor de container hebt gebruikt, vervangt u capture door de naam van de container in het opslagaccount.

De scripts uitvoeren

  1. Open een opdrachtprompt met Python in het pad en voer vervolgens deze opdrachten uit om vereiste Python-pakketten te installeren:

    pip install azure-storage-blob
    pip install azure-eventhub
    pip install avro-python3
    
  2. Wijzig uw map in de map waarin u sender.py en capturereader.py hebt opgeslagen en voer deze opdracht uit:

    python sender.py
    

    Met deze opdracht wordt een nieuw Python-proces gestart om de afzender uit te voeren.

  3. Wacht enkele minuten totdat de opname is uitgevoerd en voer vervolgens de volgende opdracht in het oorspronkelijke opdrachtvenster in:

    python capturereader.py
    

    Deze captureprocessor gebruikt de lokale map om alle blobs te downloaden uit het opslagaccount en de container. Het verwerkt bestanden die niet leeg zijn en schrijft de resultaten als CSV-bestanden naar de lokale map.

Volgende stappen

Bekijk Python-voorbeelden op GitHub.