Delen via


Quickstart: Gebeurtenissen verzenden naar of ontvangen van event hubs met behulp van Python

In deze quickstart leert u hoe u gebeurtenissen verzendt naar en ontvangt van een Event Hub met behulp van het Python-pakket azure-eventhub .

Vereisten

Als u nog geen ervaring hebt met Azure Event Hubs, raadpleegt u het Event Hubs-overzicht voordat u deze quickstart uitvoert.

Zorg ervoor dat u aan de volgende vereisten voldoet om deze quickstart te voltooien:

  • Microsoft Azure-abonnement: meld u aan voor een gratis proefversie als u er nog geen hebt.
  • Python 3.8 of hoger: Controleren of pip is geïnstalleerd en bijgewerkt.
  • Visual Studio Code (aanbevolen): of gebruik een andere IDE van uw keuze.
  • Event Hubs-naamruimte en Event Hub: volg deze handleiding om deze te maken in Azure Portal.

De pakketten installeren om gebeurtenissen te verzenden

Als u de Python-pakketten voor Event Hubs wilt installeren, opent u een opdrachtprompt met Python in het pad. Verander van map naar de map waarin u uw voorbeelden wilt bewaren.

pip install azure-eventhub
pip install azure-identity
pip install aiohttp

De app verifiëren bij Azure

In deze quickstart ziet u twee manieren om verbinding te maken met Azure Event Hubs:

  • Wachtwoordloos. Gebruik uw beveiligingsprincipaal in Microsoft Entra ID en op rollen gebaseerd toegangsbeheer (RBAC) om verbinding te maken met een Event Hubs-naamruimte. U hoeft zich geen zorgen te maken over het gebruik van vastgelegde verbindingsreeksen in uw code, in een configuratiebestand of in beveiligde opslag, zoals Azure Key Vault.
  • Verbindingsreeks. Gebruik een verbindingsreeks om verbinding te maken met een Event Hubs-naamruimte. Als u nog niet eerder met Azure werkt, kunt u de verbindingsreeks optie gemakkelijker volgen.

We raden u aan de optie zonder wachtwoord te gebruiken in echte toepassingen en productieomgevingen. Zie Service Bus-verificatie en -autorisatie enverbindingen zonder wachtwoord voor Azure-services voor meer informatie.

Rollen toewijzen aan uw Microsoft Entra-gebruiker

Wanneer u lokaal ontwikkelt, moet u ervoor zorgen dat het gebruikersaccount dat verbinding maakt met Azure Event Hubs over de juiste machtigingen beschikt. U hebt de rol Azure Event Hubs-gegevenseigenaar nodig om berichten te verzenden en te ontvangen. Als u uzelf deze rol wilt toewijzen, hebt u de rol Gebruikerstoegangsbeheerder of een andere rol nodig die de Microsoft.Authorization/roleAssignments/write actie bevat. U kunt Azure RBAC-rollen toewijzen aan een gebruiker met behulp van Azure Portal, Azure CLI of Azure PowerShell. Zie de pagina Scope voor Azure RBAC begrijpen voor meer informatie.

In het volgende voorbeeld wordt de Azure Event Hubs Data Owner rol toegewezen aan uw gebruikersaccount, dat volledige toegang biedt tot Azure Event Hubs-resources. Volg in een echt scenario het principe van minimale bevoegdheden om gebruikers alleen de minimale machtigingen te geven die nodig zijn voor een veiligere productieomgeving.

Ingebouwde Azure-rollen voor Azure Event Hubs

Voor Azure Event Hubs is het beheer van naamruimten en alle gerelateerde resources via Azure Portal en de Azure Resource Management-API al beveiligd met behulp van het Azure RBAC-model. Azure biedt de volgende ingebouwde rollen voor het autoriseren van toegang tot een Event Hubs-naamruimte:

Als u een aangepaste rol wilt maken, raadpleegt u Rechten die vereist zijn voor Event Hubs-bewerkingen.

Belangrijk

In de meeste gevallen duurt het een paar minuten voordat de roltoewijzing in Azure is doorgegeven. In zeldzame gevallen kan het maximaal acht minuten duren. Als u verificatiefouten ontvangt wanneer u de code voor het eerst uitvoert, wacht u even en probeert u het opnieuw.

  1. Zoek in Azure Portal uw Event Hubs-naamruimte met behulp van de hoofdzoekbalk of linkernavigatiebalk.

  2. Selecteer op de overzichtspagina toegangsbeheer (IAM) in het linkermenu.

  3. Selecteer op de pagina Toegangsbeheer (IAM) het tabblad Roltoewijzingen .

  4. Selecteer + Toevoegen in het bovenste menu. Selecteer vervolgens Roltoewijzing toevoegen.

    Schermopname die laat zien hoe u een rol toewijst.

  5. Gebruik het zoekvak om de resultaten te filteren op de gewenste rol. In dit voorbeeld, zoek naar Azure Event Hubs Data Owner en selecteer het overeenkomende resultaat. Kies vervolgens Volgende.

  6. Selecteer onder Toegang toewijzen de optie Gebruiker, groep of service-principal. Kies vervolgens + Leden selecteren.

  7. Zoek in het dialoogvenster naar de gebruikersnaam van Microsoft Entra (meestal uw user@domain e-mailadres). Kies Selecteren onder aan het dialoogvenster.

  8. Selecteer Controleren en toewijzen om naar de laatste pagina te gaan. Selecteer Beoordelen en opnieuw toewijzen om het proces te voltooien.

Gebeurtenissen verzenden

In deze sectie maakt u een Python-script om gebeurtenissen te verzenden naar de Event Hub die u eerder hebt gemaakt.

  1. Open uw favoriete Python-editor, bijvoorbeeld Visual Studio Code.

  2. Maak een script met de naam send.py. Met dit script wordt een batch van gebeurtenissen verzonden naar de Event Hub die u eerder hebt gemaakt.

  3. Plak de volgende code in send.py:

    Gebruik in de code echte waarden om de volgende tijdelijke aanduidingen te vervangen:

    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE - U ziet de volledig gekwalificeerde naam op de overzichtspagina van de naamruimte. Deze moet de volgende indeling hebben: <NAMESPACENAME>>.servicebus.windows.net.
    • EVENT_HUB_NAME - Naam van de Event Hub.
    import asyncio
    
    from azure.eventhub import EventData
    from azure.eventhub.aio import EventHubProducerClient
    from azure.identity.aio import DefaultAzureCredential
    
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def run():
        # Create a producer client to send messages to the event hub.
        # Specify a credential that has correct role assigned to access
        # event hubs namespace and the event hub name.
        producer = EventHubProducerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            credential=credential,
        )
        print("Producer client created successfully.") 
        async with producer:
            # Create a batch.
            event_data_batch = await producer.create_batch()
    
            # Add events to the batch.
            event_data_batch.add(EventData("First event "))
            event_data_batch.add(EventData("Second event"))
            event_data_batch.add(EventData("Third event"))
    
            # Send the batch of events to the event hub.
            await producer.send_batch(event_data_batch)
    
            # Close credential when no longer needed.
            await credential.close()
    
    asyncio.run(run())
    

    Notitie

    Zie de pagina GitHub send_async.py voor voorbeelden van andere opties voor het asynchroon verzenden van gebeurtenissen naar een Event Hub met behulp van een verbindingsreeks. De weergegeven patronen zijn ook van toepassing op het verzenden van gebeurtenissen zonder wachtwoord.

Gebeurtenissen ontvangen

In deze quickstart wordt Azure Blob Storage gebruikt als controleopslag. De opslag voor controlepunten wordt gebruikt om controlepunten vast te leggen (dat wil zeggen, de laatste leesposities).

Volg deze aanbevelingen wanneer u Azure Blob Storage als controlepuntarchief gebruikt:

  • Gebruik een afzonderlijke container voor elke consumentengroep. U kunt hetzelfde opslagaccount gebruiken, maar één container per groep gebruiken.
  • Gebruik het opslagaccount niet voor iets anders.
  • Gebruik de container niet voor iets anders.
  • Maak het opslagaccount in dezelfde regio als de geïmplementeerde toepassing. Als de toepassing on-premises is, probeert u de dichtstbijzijnde regio te kiezen.

Controleer op de pagina Opslagaccount in Azure Portal in de sectie Blob-service of de volgende instellingen zijn uitgeschakeld.

  • Hiërarchische naamruimte
  • Blob zacht verwijderen
  • Versiebeheer

Een Azure-opslagaccount en een blobcontainer maken

Maak een Azure-opslagaccount met daarin een blobcontainer door de volgende stappen te volgen:

  1. Een Azure Storage-account maken
  2. Maak een blobcontainer.
  3. Authenticeren bij de blobcontainer.

Zorg ervoor dat u de verbindingsreeks en de containernaam vastlegt voor later gebruik in de receive-code.

Wanneer u lokaal ontwikkelt, moet u ervoor zorgen dat het gebruikersaccount dat toegang heeft tot blobgegevens over de juiste machtigingen beschikt. U hebt Inzender voor Opslagblobgegevens nodig om blobgegevens te lezen en schrijven. Als u uzelf deze rol wilt toewijzen, moet u de rol Beheerder voor gebruikerstoegang of een andere rol met de actie Microsoft.Authorization/roleAssignments/write krijgen toegewezen. U kunt Azure RBAC-rollen toewijzen aan een gebruiker met behulp van Azure Portal, Azure CLI of Azure PowerShell. Zie Inzicht in het bereik voor Azure RBAC voor meer informatie.

In dit scenario wijst u machtigingen toe aan uw gebruikersaccount, dat is afgestemd op het opslagaccount, om het principe van minimale bevoegdheden te volgen. Deze procedure biedt gebruikers alleen de minimale machtigingen die nodig zijn en maakt veiligere productieomgevingen.

In het volgende voorbeeld wordt de rol Inzender voor opslagblobgegevens toegewezen aan uw gebruikersaccount, dat zowel lees- als schrijftoegang biedt tot blobgegevens in uw opslagaccount.

Belangrijk

In de meeste gevallen duurt het een paar minuten voordat de roltoewijzing in Azure is doorgegeven. In zeldzame gevallen kan het maximaal acht minuten duren. Als u verificatiefouten ontvangt wanneer u de code voor het eerst uitvoert, wacht u even en probeert u het opnieuw.

  1. Zoek uw opslagaccount in Azure Portal met behulp van de hoofdzoekbalk of linkernavigatiebalk.

  2. Selecteer op de pagina opslagaccount toegangsbeheer (IAM) in het menu aan de linkerkant.

  3. Selecteer op de pagina Toegangsbeheer (IAM) het tabblad Roltoewijzingen .

  4. Selecteer + Toevoegen in het bovenste menu. Selecteer vervolgens Roltoewijzing toevoegen.

    Schermopname die laat zien hoe u een opslagaccountrol toewijst.

  5. Gebruik het zoekvak om de resultaten te filteren op de gewenste rol. Zoek voor dit voorbeeld naar Inzender voor Opslagblobgegevens. Selecteer het overeenkomende resultaat en kies vervolgens Volgende.

  6. Selecteer onder Toegang toewijzen de optie Gebruiker, groep of service-principal en kies vervolgens + Leden selecteren.

  7. Zoek in het dialoogvenster naar uw Microsoft Entra-gebruikersnaam (meestal uw user@domain e-mailadres) en kies Vervolgens onderaan het dialoogvenster Selecteren .

  8. Selecteer Controleren en toewijzen om naar de laatste pagina te gaan. Selecteer Beoordelen en opnieuw toewijzen om het proces te voltooien.

De pakketten installeren om gebeurtenissen te ontvangen

Voor de ontvangende zijde moet u een of meer pakketten installeren. In deze quickstart gebruikt u Azure Blob Storage om controlepunten vast te leggen, zodat het programma de gebeurtenissen die het programma al heeft gelezen niet leest. Het voert regelmatig metadata-checkpoints uit op ontvangen berichten in een blob. Met deze aanpak kunt u later eenvoudig berichten blijven ontvangen vanaf de plek waar u was gebleven.

pip install azure-eventhub-checkpointstoreblob-aio
pip install azure-identity

Een Python-script maken om gebeurtenissen te ontvangen

In deze sectie maakt u een Python-script om gebeurtenissen van uw Event Hub te ontvangen:

  1. Open uw favoriete Python-editor, bijvoorbeeld Visual Studio Code.

  2. Maak een script met de naam recv.py.

  3. Plak de volgende code in recv.py:

    Gebruik in de code echte waarden om de volgende tijdelijke aanduidingen te vervangen:

    • BLOB_STORAGE_ACCOUNT_URL - Deze waarde moet de volgende notatie hebben: https://<YOURSTORAGEACCOUNTNAME>.blob.core.windows.net/
    • BLOB_CONTAINER_NAME - Naam van de blobcontainer in het Azure-opslagaccount.
    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE - U ziet de volledig gekwalificeerde naam op de overzichtspagina van de naamruimte. Deze moet de volgende indeling hebben: <NAMESPACENAME>>.servicebus.windows.net.
    • EVENT_HUB_NAME - Naam van de Event Hub.
    import asyncio
    
    from azure.eventhub.aio import EventHubConsumerClient
    from azure.eventhub.extensions.checkpointstoreblobaio import (
        BlobCheckpointStore,
    )
    from azure.identity.aio import DefaultAzureCredential
    
    BLOB_STORAGE_ACCOUNT_URL = "BLOB_STORAGE_ACCOUNT_URL"
    BLOB_CONTAINER_NAME = "BLOB_CONTAINER_NAME"
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def on_event(partition_context, event):
        # Print the event data.
        print(
            'Received the event: "{}" from the partition with ID: "{}"'.format(
                event.body_as_str(encoding="UTF-8"), partition_context.partition_id
            )
        )
    
        # Update the checkpoint so that the program doesn't read the events
        # that it has already read when you run it next time.
        await partition_context.update_checkpoint(event)
    
    
    async def main():
        # Create an Azure blob checkpoint store to store the checkpoints.
        checkpoint_store = BlobCheckpointStore(
            blob_account_url=BLOB_STORAGE_ACCOUNT_URL,
            container_name=BLOB_CONTAINER_NAME,
            credential=credential,
        )
    
        # Create a consumer client for the event hub.
        client = EventHubConsumerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            consumer_group="$Default",
            checkpoint_store=checkpoint_store,
            credential=credential,
        )
        async with client:
            # Call the receive method. Read from the beginning of the partition
            # (starting_position: "-1")
            await client.receive(on_event=on_event, starting_position="-1")
    
        # Close credential when no longer needed.
        await credential.close()
    
    if __name__ == "__main__":
        # Run the main method.
        asyncio.run(main())
    

    Notitie

    Zie de GitHub-pagina recv_with_checkpoint_store_async.py voor voorbeelden van andere opties voor het ontvangen van gebeurtenissen van een Event Hub asynchroon met behulp van een verbindingsreeks. De weergegeven patronen zijn ook van toepassing op het ontvangen van gebeurtenissen zonder wachtwoord.

De ontvangende app uitvoeren

  1. Start een opdrachtprompt.

  2. Voer de volgende opdracht uit en meld u aan met het account dat is toegevoegd aan de rol Azure Event Hubs-gegevenseigenaar in de Event Hubs-naamruimte en de rol Inzender voor opslagblobgegevens in het Azure-opslagaccount .

    az login
    
  3. Ga naar de map met het receive.py-bestand en voer de volgende opdracht uit:

    python recv.py
    

De verzendende app uitvoeren

  1. Start een opdrachtprompt.

  2. Voer de volgende opdracht uit en meld u aan met het account dat is toegevoegd aan de rol Azure Event Hubs-gegevenseigenaar in de Event Hubs-naamruimte en de rol Inzender voor opslagblobgegevens in het Azure-opslagaccount .

    az login
    
  3. Ga naar de map met de send.py en voer deze opdracht uit:

    python send.py
    

In het ontvangstvenster worden de berichten weergegeven die naar de Event Hub zijn verzonden.

Probleemoplossing

Als er geen gebeurtenissen worden weergegeven in het ontvangervenster of als de code een fout rapporteert, kunt u de volgende tips voor probleemoplossing proberen:

  • Als u geen resultaten van recy.py ziet, voert u send.py meerdere keren uit.

  • Als er fouten over 'coroutine' worden weergegeven bij het gebruik van de wachtwoordloze code (met referenties), zorg ervoor dat u importeert vanuit azure.identity.aio.

  • Als u 'Niet-afgesloten clientsessie' ziet met code zonder wachtwoord maar met inloggegevens, moet u de inloggegevens sluiten nadat u klaar bent. Zie Async-referenties voor meer informatie.

  • Als u autorisatiefouten met recv.py ziet bij het openen van opslag, controleert u of u de stappen in Een Azure-opslagaccount en een blobcontainer hebt gevolgd en de rol Inzender voor opslagblobgegevens hebt toegewezen aan de service-principal.

  • Als u gebeurtenissen met verschillende partitie-id's ontvangt, wordt dit resultaat verwacht. Partities zijn een mechanisme voor gegevensordening. Ze hebben betrekking op de mate van downstreamparallelheid die is vereist bij het gebruik van toepassingen. Het aantal partities in een Event Hub houdt rechtstreeks verband met het aantal verwachte gelijktijdige lezers. Zie Meer informatie over partities voor meer informatie.

Volgende stappen

In deze quickstart hebt u asynchroon gebeurtenissen verzonden en ontvangen. Ga naar de pagina GitHub sync_samples om te leren hoe u gebeurtenissen synchroon verzendt en ontvangt.

Bekijk meer voorbeelden en geavanceerde scenario's in de Azure Event Hubs-clientbibliotheek voor Python-voorbeelden.