Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
U kunt een Event Hub zo configureren dat de gegevens die naar een Event Hub worden verzonden, worden vastgelegd in een Azure-opslagaccount of Azure Data Lake Storage Gen 1 of Gen 2. In dit artikel leest u hoe u Python-code schrijft om gebeurtenissen naar een Event Hub te verzenden en de vastgelegde gegevens uit Azure Blob Storage te lezen. Zie het overzicht van de Event Hubs Capture-functie voor meer informatie over deze functie.
In deze quickstart wordt de Azure Python SDK gebruikt om de capture-functie te demonstreren. De sender.py-app verzendt gesimuleerde omgevingstelemetrie naar Event Hubs in JSON-indeling. De Event Hub is geconfigureerd voor het gebruik van de functie Capture om deze gegevens in batches naar Blob Storage te schrijven. De capturereader.py-app leest deze blobs en maakt een toevoegbestand voor elk apparaat. De app schrijft de gegevens vervolgens naar CSV-bestanden.
In deze snelstart, gaat u het volgende doen:
- Maak een Azure Blob Storage-account en -container in Azure Portal.
- Maak een Event Hubs-naamruimte met behulp van Azure Portal.
- Maak een Event Hub met de functie Capture ingeschakeld en verbind deze met uw opslagaccount.
- Gegevens verzenden naar uw Event Hub met behulp van een Python-script.
- Bestanden lezen en verwerken vanuit Event Hubs Capture met behulp van een ander Python-script.
Vereiste voorwaarden
Python 3.8 of hoger, waarbij PIP is geïnstalleerd en bijgewerkt.
Een Azure-abonnement. Als u nog geen abonnement hebt, maakt u een gratis account voordat u begint.
Een actieve Event Hubs-naamruimte en de Event Hub. Maak een Event Hubs-naamruimte en een Event Hub binnen de naamruimte. Noteer de naam van de Event Hubs-naamruimte, de naam van de Event Hub en de primaire toegangssleutel voor de naamruimte. Zie Een Event Hubs-verbindingsreeks verkrijgen om de toegangssleutel te verkrijgen. De standaardsleutelnaam is RootManageSharedAccessKey. Voor deze quickstart hebt u alleen de primaire sleutel nodig. U hebt de verbindingsreeks niet nodig.
Een Azure-opslagaccount, een blobcontainer in het opslagaccount en een verbindingsreeks voor het opslagaccount. Als u deze items niet hebt, voert u de volgende stappen uit:
- Een Azure-opslagaccount maken
- Een blobcontainer maken in het opslagaccount
- De verbindingsreeks ophalen voor het opslagaccount
Zorg ervoor dat u de verbindingsreeks en containernaam opneemt voor later gebruik in deze quickstart.
Capture-functie inschakelen voor de Event Hub
Schakel de functie Capture in voor de Event Hub. Volg hiervoor de instructies in Event Hubs Capture inschakelen met behulp van Azure Portal. Selecteer het opslagaccount en de blobcontainer die u in de vorige stap hebt gemaakt. Selecteer Avro voor de serialisatie-indeling van uitvoerevenementen.
Een Python-script maken om gebeurtenissen naar uw Event Hub te verzenden
In deze sectie maakt u een Python-script waarmee 200 gebeurtenissen (10 apparaten * 20 gebeurtenissen) worden verzonden naar een Event Hub. Deze gebeurtenissen zijn een voorbeeld van omgevingslezing die wordt verzonden in JSON-indeling.
Open uw favoriete Python-editor, zoals Visual Studio Code.
Maak een script met de naam sender.py.
Plak de volgende code in sender.py.
import time import os import uuid import datetime import random import json from azure.eventhub import EventHubProducerClient, EventData # This script simulates the production of events for 10 devices. devices = [] for x in range(0, 10): devices.append(str(uuid.uuid4())) # Create a producer client to produce and publish events to the event hub. producer = EventHubProducerClient.from_connection_string(conn_str="EVENT HUBS NAMESPACE CONNECTION STRING", eventhub_name="EVENT HUB NAME") for y in range(0,20): # For each device, produce 20 events. event_data_batch = producer.create_batch() # Create a batch. You will add events to the batch later. for dev in devices: # Create a dummy reading. reading = { 'id': dev, 'timestamp': str(datetime.datetime.utcnow()), 'uv': random.random(), 'temperature': random.randint(70, 100), 'humidity': random.randint(70, 100) } s = json.dumps(reading) # Convert the reading into a JSON string. event_data_batch.add(EventData(s)) # Add event data to the batch. producer.send_batch(event_data_batch) # Send the batch of events to the event hub. # Close the producer. producer.close()Vervang de volgende waarden in de scripts:
- Vervang
EVENT HUBS NAMESPACE CONNECTION STRINGdoor de verbindingsreeks voor uw Event Hubs-naamruimte. - Vervang door
EVENT HUB NAMEde naam van uw Event Hub.
- Vervang
Voer het script uit om gebeurtenissen naar de Event Hub te verzenden.
In Azure Portal kunt u controleren of de Event Hub de berichten heeft ontvangen. Schakel over naar de weergave Berichten in de sectie Metrische gegevens . Vernieuw de pagina om de grafiek bij te werken. Het kan enkele seconden duren voordat de pagina weergeeft dat de berichten zijn ontvangen.
Een Python-script maken om uw Capture-bestanden te lezen
In dit voorbeeld worden de vastgelegde gegevens opgeslagen in Azure Blob Storage. Het script in deze sectie leest de vastgelegde gegevensbestanden uit uw Azure-opslagaccount en genereert CSV-bestanden die u gemakkelijk kunt openen en weergeven. U ziet 10 bestanden in de huidige werkmap van de toepassing. Deze bestanden bevatten de omgevingsmetingen voor de 10 apparaten.
Maak in de Python-editor een script met de naam capturereader.py. Dit script leest de vastgelegde bestanden en maakt een bestand voor elk apparaat om de gegevens alleen voor dat apparaat te schrijven.
Plak de volgende code in capturereader.py.
import os import string import json import uuid import avro.schema from azure.storage.blob import ContainerClient, BlobClient from avro.datafile import DataFileReader, DataFileWriter from avro.io import DatumReader, DatumWriter def processBlob2(filename): reader = DataFileReader(open(filename, 'rb'), DatumReader()) dict = {} for reading in reader: parsed_json = json.loads(reading["Body"]) if not 'id' in parsed_json: return if not parsed_json['id'] in dict: list = [] dict[parsed_json['id']] = list else: list = dict[parsed_json['id']] list.append(parsed_json) reader.close() for device in dict.keys(): filename = os.getcwd() + '\\' + str(device) + '.csv' deviceFile = open(filename, "a") for r in dict[device]: deviceFile.write(", ".join([str(r[x]) for x in r.keys()])+'\n') def startProcessing(): print('Processor started using path: ' + os.getcwd()) # Create a blob container client. container = ContainerClient.from_connection_string("AZURE STORAGE CONNECTION STRING", container_name="BLOB CONTAINER NAME") blob_list = container.list_blobs() # List all the blobs in the container. for blob in blob_list: # Content_length == 508 is an empty file, so process only content_length > 508 (skip empty files). if blob.size > 508: print('Downloaded a non empty blob: ' + blob.name) # Create a blob client for the blob. blob_client = ContainerClient.get_blob_client(container, blob=blob.name) # Construct a file name based on the blob name. cleanName = str.replace(blob.name, '/', '_') cleanName = os.getcwd() + '\\' + cleanName with open(cleanName, "wb+") as my_file: # Open the file to write. Create it if it doesn't exist. my_file.write(blob_client.download_blob().readall()) # Write blob contents into the file. processBlob2(cleanName) # Convert the file into a CSV file. os.remove(cleanName) # Remove the original downloaded file. # Delete the blob from the container after it's read. container.delete_blob(blob.name) startProcessing()Vervang
AZURE STORAGE CONNECTION STRINGdoor de verbindingsreeks voor uw Azure-opslagaccount. De naam van de container die u in deze quickstart hebt gemaakt, is capture. Als u een andere naam voor de container hebt gebruikt, vervangt u capture door de naam van de container in het opslagaccount.
De scripts uitvoeren
Open een opdrachtprompt met Python in het pad en voer vervolgens deze opdrachten uit om vereiste Python-pakketten te installeren:
pip install azure-storage-blob pip install azure-eventhub pip install avro-python3Wijzig uw map in de map waarin u sender.py en capturereader.py hebt opgeslagen en voer deze opdracht uit:
python sender.pyMet deze opdracht wordt een nieuw Python-proces gestart om de afzender uit te voeren.
Wacht enkele minuten totdat de opname is uitgevoerd en voer vervolgens de volgende opdracht in het oorspronkelijke opdrachtvenster in:
python capturereader.pyDeze captureprocessor gebruikt de lokale map om alle blobs te downloaden uit het opslagaccount en de container. Het verwerkt bestanden die niet leeg zijn en schrijft de resultaten als CSV-bestanden naar de lokale map.
Volgende stappen
Bekijk Python-voorbeelden op GitHub.