Dela via


Spring Cloud Azure-stöd för Spring Cloud Stream

Spring Cloud Stream är ett ramverk för att skapa mycket skalbara händelsedrivna mikrotjänster som är anslutna till delade meddelandesystem.

Ramverket tillhandahåller en flexibel programmeringsmodell som bygger på redan etablerade och välbekanta Spring-idiom och bästa praxis. Dessa metodtips omfattar stöd för beständiga pub/sub-semantik, konsumentgrupper och tillståndskänsliga partitioner.

Aktuella implementeringar av pärm är:

Spring Cloud Stream Binder för Azure Event Hubs

Viktiga begrepp

Spring Cloud Stream Binder för Azure Event Hubs tillhandahåller bindningsimplementeringen för Spring Cloud Stream-ramverket. Den här implementeringen använder Spring Integration Event Hubs-kanalkort i grunden. Från designens perspektiv liknar Event Hubs Kafka. Event Hubs kan också nås via Kafka API. Om ditt projekt har ett nära beroende av Kafka API kan du prova Events Hub med Kafka API Sample

Konsumentgrupp

Event Hubs har liknande stöd för konsumentgruppen som Apache Kafka, men med lite annan logik. Medan Kafka lagrar alla incheckade förskjutningar i asynkron meddelandekö måste du lagra förskjutningar av Event Hubs-meddelanden som bearbetas manuellt. Event Hubs SDK tillhandahåller funktionen för att lagra sådana förskjutningar i Azure Storage.

Stöd för partitionering

Event Hubs har ett liknande koncept för fysisk partition som Kafka. Men till skillnad från Kafkas automatiska ombalansering mellan konsumenter och partitioner tillhandahåller Event Hubs ett slags förebyggande läge. Lagringskontot fungerar som ett lån för att avgöra vilken konsument som äger vilken partition. När en ny konsument startar försöker den stjäla vissa partitioner från de mest belastade konsumenterna för att uppnå arbetsbelastningsbalansen.

För att ange belastningsutjämningsstrategin tillhandahålls egenskaper för spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.*. Mer information finns i avsnittet Konsumentegenskaper.

Stöd för Batch-konsument

Spring Cloud Azure Stream Event Hubs binder stöder Spring Cloud Stream Batch Consumer-funktion.

Om du vill arbeta med batch-konsumentläget anger du egenskapen spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode till true. När det är aktiverat tas ett meddelande med en nyttolast av en lista över batchade händelser emot och skickas till funktionen Consumer. Varje meddelanderubrik konverteras också till en lista, där innehållet är det associerade rubrikvärdet som parsas från varje händelse. De gemensamma huvudena för partitions-ID, checkpointer och de senaste egenskaperna visas som ett enda värde eftersom hela batchen med händelser delar samma värde. Mer information finns i meddelanderubrikerna Event Hubs i Spring Cloud Azure-stöd för Spring Integration.

Not

Kontrollpunktshuvudet finns bara när MANUAL kontrollpunktsläge används.

Kontrollpunkter för batchkonsumenten stöder två lägen: BATCH och MANUAL. BATCH läget är ett automatiskt kontrollpunktsläge för att kontrollera hela batchen med händelser tillsammans när bindemedlet tar emot dem. MANUAL läge är att kontrollera händelserna av användare. När den används skickas Checkpointer till meddelandehuvudet och användarna kan använda den för att göra kontrollpunkter.

Du kan ange batchstorleken genom att ange egenskaperna max-size och max-wait-time som har prefixet spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch.. Egenskapen max-size är nödvändig och egenskapen max-wait-time är valfri. Mer information finns i avsnittet Konsumentegenskaper.

Beroendekonfiguration

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>

Du kan också använda Spring Cloud Azure Stream Event Hubs Starter, som du ser i följande exempel för Maven:

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>

Konfiguration

Pärmen innehåller följande tre delar av konfigurationsalternativen:

Egenskaper för anslutningskonfiguration

Det här avsnittet innehåller de konfigurationsalternativ som används för att ansluta till Azure Event Hubs.

Not

Om du väljer att använda ett säkerhetsobjekt för att autentisera och auktorisera med Microsoft Entra-ID för åtkomst till en Azure-resurs kan du läsa Auktorisera åtkomst med Microsoft Entra-ID för att se till att säkerhetsobjektet har beviljats tillräcklig behörighet för åtkomst till Azure-resursen.

Anslutningskonfigurerbara egenskaper för spring-cloud-azure-stream-binder-eventhubs:

Egenskap Typ Beskrivning
spring.cloud.azure.eventhubs.enabled boolesk Om en Azure Event Hubs är aktiverad.
spring.cloud.azure.eventhubs.connection-string Sträng Anslutningssträngsvärde för Event Hubs-namnområde.
spring.cloud.azure.eventhubs.namespace Sträng Event Hubs-namnområdesvärde, som är prefixet för det fullständiga domännamnet. Ett FQDN ska bestå av NamespaceName.DomainName
spring.cloud.azure.eventhubs.domain-name Sträng Domännamn för ett Namnområdesvärde för Azure Event Hubs.
spring.cloud.azure.eventhubs.custom-endpoint-address Sträng Anpassad slutpunktsadress.

Dricks

Vanliga konfigurationsalternativ för Azure Service SDK kan konfigureras även för Spring Cloud Azure Stream Event Hubs-pärmen. Konfigurationsalternativen som stöds introduceras i Spring Cloud Azure-konfigurationenoch kan konfigureras med antingen det enhetliga prefixet spring.cloud.azure. eller prefixet för spring.cloud.azure.eventhubs..

Pärmen stöder också Spring Could Azure Resource Manager- som standard. Mer information om hur du hämtar anslutningssträngen med säkerhetsobjekt som inte beviljas med Data relaterade roller finns i avsnittet Grundläggande användning i Spring Could Azure Resource Manager.

Konfigurationsegenskaper för kontrollpunkt

Det här avsnittet innehåller konfigurationsalternativen för Storage Blobs-tjänsten, som används för att bevara partitionsägarskap och kontrollpunktsinformation.

Not

Från version 4.0.0, när egenskapen för spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists inte aktiveras manuellt, skapas ingen Lagringscontainer automatiskt med namnet från spring.cloud.stream.bindings.binding-name.destination.

Kontrollpunktskonfigurerbara egenskaper för spring-cloud-azure-stream-binder-eventhubs:

Egenskap Typ Beskrivning
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists Boolesk Om du vill tillåta att containrar skapas om de inte finns.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name Sträng Namn på lagringskontot.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key Sträng Åtkomstnyckel för lagringskonto.
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name Sträng Namn på lagringscontainer.

Dricks

Vanliga konfigurationsalternativ för Azure Service SDK kan även konfigureras för Lagringsblobkontrollpunktsarkiv. Konfigurationsalternativen som stöds introduceras i Spring Cloud Azure-konfigurationenoch kan konfigureras med antingen det enhetliga prefixet spring.cloud.azure. eller prefixet för spring.cloud.azure.eventhubs.processor.checkpoint-store.

Konfigurationsegenskaper för Azure Event Hubs-bindning

Följande alternativ är indelade i fyra avsnitt: Konsumentegenskaper, Avancerade konsumentkonfigurationer, Producentegenskaper och Avancerade producentkonfigurationer.

Konsumentegenskaper

Dessa egenskaper exponeras via EventHubsConsumerProperties.

Not

För att undvika upprepning, eftersom version 4.17.0 och 5.11.0, stöder Spring Cloud Azure Stream Binder Event Hubs inställningsvärden för alla kanaler, i formatet spring.cloud.stream.eventhubs.default.consumer.<property>=<value>.

Konfigurerbara egenskaper för spring-cloud-azure-stream-binder-eventhubs:

Egenskap Typ Beskrivning
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.mode CheckpointMode (Kontrollpunkt) Kontrollpunktsläge som används när konsumenten bestämmer hur ett kontrollpunktsmeddelande ska visas
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.count Heltal Bestämmer mängden meddelande för varje partition för att göra en kontrollpunkt. Börjar gälla endast när PARTITION_COUNT kontrollpunktsläge används.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.interval Varaktighet Bestämmer tidsintervallet för att göra en kontrollpunkt. Börjar gälla endast när TIME kontrollpunktsläge används.
spring.cloud.stream.eventhubs.bindings.<binding-name.consumer.batch.max-storlek Heltal Det maximala antalet händelser i en batch. Krävs för batch-konsumentläge.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time Varaktighet Den maximala tidsperioden för batchförbrukning. Börjar gälla endast när batch-konsumentläget är aktiverat och är valfritt.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.update-interval Varaktighet Varaktighet för intervalltid för uppdatering.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.strategy LastbalanseringStrategi Belastningsutjämningsstrategin.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.partition-ownership-expiration-interval Varaktighet Tidsåtgången efter vilken ägarskapet för partitionen upphör att gälla.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.track-last-enqueued-event-properties Boolesk Om händelseprocessorn ska begära information om den senaste begärda händelsen på den associerade partitionen och spåra den informationen när händelser tas emot.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count Heltal Antalet som används av konsumenten för att kontrollera antalet händelser som händelsehubbens konsument aktivt tar emot och köar lokalt.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position Mappa med nyckeln som partitions-ID och värden för StartPositionProperties Kartan som innehåller den händelseposition som ska användas för varje partition om en kontrollpunkt för partitionen inte finns i kontrollpunktsarkivet. Den här kartan är bortkopplad från partitions-ID:t.

Not

Konfigurationen initial-partition-event-position accepterar en map för att ange den första positionen för varje händelsehubb. Därför är dess nyckel partitions-ID och värdet är StartPositionProperties, som innehåller egenskaper för offset, sekvensnummer, köad datumtid och huruvida inkluderande. Du kan till exempel ange den som

spring:
  cloud:
    stream:
      eventhubs:
        bindings:
          <binding-name>:
            consumer:
              initial-partition-event-position:
                0:
                  offset: earliest
                1:
                  sequence-number: 100
                2:
                  enqueued-date-time: 2022-01-12T13:32:47.650005Z
                4:
                  inclusive: false
Avancerad konsumentkonfiguration

Ovanstående anslutning, kontrollpunktoch vanliga Azure SDK-klient konfiguration stöder anpassning för varje bindemedelskonsument, som du kan konfigurera med prefixet spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer..

Producentegenskaper

Dessa egenskaper exponeras via EventHubsProducerProperties.

Not

För att undvika upprepning, eftersom version 4.17.0 och 5.11.0, stöder Spring Cloud Azure Stream Binder Event Hubs inställningsvärden för alla kanaler, i formatet spring.cloud.stream.eventhubs.default.producer.<property>=<value>.

Producentkonfigurerbara egenskaper för spring-cloud-azure-stream-binder-eventhubs:

Egenskap Typ Beskrivning
spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync boolesk Switch-flaggan för synkronisering av producent. Om det är sant väntar producenten på ett svar efter en sändningsåtgärd.
spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout lång Hur lång tid det går att vänta på ett svar efter en sändningsåtgärd. Börjar gälla endast när en synkroniseringsproducent är aktiverad.
Avancerad producentkonfiguration

Ovanstående anslutning och vanliga Azure SDK-klient konfiguration stöder anpassning för varje pärmproducent, som du kan konfigurera med prefixet spring.cloud.stream.eventhubs.bindings.<binding-name>.producer..

Grundläggande användning

Skicka och ta emot meddelanden från/till Event Hubs

  1. Fyll i konfigurationsalternativen med information om autentiseringsuppgifter.

    • För autentiseringsuppgifter som anslutningssträng konfigurerar du följande egenskaper i filen application.yml:

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT_CONTAINER}
                  account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                  account-key: ${CHECKPOINT_ACCESS_KEY}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      

      Not

      Microsoft rekommenderar att du använder det säkraste tillgängliga autentiseringsflödet. Det autentiseringsflöde som beskrivs i den här proceduren, till exempel för databaser, cacheminnen, meddelanden eller AI-tjänster, kräver en mycket hög grad av förtroende för programmet och medför risker som inte finns i andra flöden. Använd endast det här flödet när säkrare alternativ, till exempel hanterade identiteter för lösenordslösa eller nyckellösa anslutningar, inte är genomförbara. För lokala datoråtgärder föredrar du användaridentiteter för lösenordslösa eller nyckellösa anslutningar.

    • För autentiseringsuppgifter som tjänstens huvudnamn konfigurerar du följande egenskaper i din application.yml-fil:

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${EVENTHUB_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      

Not

De värden som tillåts för tenant-id är: common, organizations, consumerseller klientorganisations-ID. Mer information om dessa värden finns i avsnittet Used the wrong endpoint (personal and organization accounts) i Error AADSTS50020 – Användarkonto från identitetsprovidern finns inte i klientorganisationen. Information om hur du konverterar din app för en klientorganisation finns i Konvertera enklientapp till flera klientorganisationer på Microsoft Entra-ID.

  • För autentiseringsuppgifter som hanterade identiteter konfigurerar du följande egenskaper i din application.yml-fil:

    spring:
      cloud:
        azure:
          credential:
            managed-identity-enabled: true
            client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
          eventhubs:
            namespace: ${EVENTHUB_NAMESPACE}
            processor:
              checkpoint-store:
                container-name: ${CONTAINER_NAME}
                account-name: ${ACCOUNT_NAME}
        function:
          definition: consume;supply
        stream:
          bindings:
            consume-in-0:
              destination: ${EVENTHUB_NAME}
              group: ${CONSUMER_GROUP}
            supply-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
    
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
    
  1. Definiera leverantör och konsument.

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload(),
                    message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
                    message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                    message.getHeaders().get(EventHubsHeaders.OFFSET),
                    message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)
            );
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

Stöd för partitionering

En PartitionSupplier med partitionsinformation från användaren skapas för att konfigurera partitionsinformationen om meddelandet som ska skickas. Följande flödesschema visar processen för att hämta olika prioriteringar för partitions-ID och nyckel:

diagram som visar ett flödesschema för partitioneringsstödprocessen.

Stöd för Batch-konsument

  1. Ange konfigurationsalternativen för batchen enligt följande exempel:

    spring:
      cloud:
        function:
          definition: consume
        stream:
          bindings:
            consume-in-0:
              destination: ${AZURE_EVENTHUB_NAME}
              group: ${AZURE_EVENTHUB_CONSUMER_GROUP}
              consumer:
                batch-mode: true
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  batch:
                    max-batch-size: 10 # Required for batch-consumer mode
                    max-wait-time: 1m # Optional, the default value is null
                  checkpoint:
                    mode: BATCH # or MANUAL as needed
    
  2. Definiera leverantör och konsument.

    För kontrollpunktsläge som BATCHkan du använda följande kod för att skicka meddelanden och använda i batchar.

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                        message.getPayload().get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

    För kontrollpunktsläge som MANUALkan du använda följande kod för att skicka meddelanden och använda/kontrollpunkt i batchar.

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload().get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
    
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            checkpointer.success()
                        .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                        .doOnError(error -> LOGGER.error("Exception found", error))
                        .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

Not

I batchförbrukningsläget är standardinnehållstypen för Spring Cloud Stream-pärm application/json, så se till att meddelandets nyttolast är anpassad till innehållstypen. När du till exempel använder standardinnehållstypen för application/json för att ta emot meddelanden med String nyttolast ska nyttolasten vara JSON String, omgiven av dubbla citattecken för den ursprungliga String texten. För text/plain innehållstyp kan det vara ett String objekt direkt. Mer information finns i Spring Cloud Stream Content Type Negotiation.

Hantera felmeddelanden

  • Hantera utgående bindningsfelmeddelanden

    Som standard skapar Spring Integration en global felkanal med namnet errorChannel. Konfigurera följande meddelandeslutpunkt för att hantera utgående bindningsfelmeddelanden.

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • Hantera inkommande bindningsfelmeddelanden

    Spring Cloud Stream Event Hubs Binder stöder en lösning för att hantera fel för inkommande meddelandebindningar: felhanterare.

    Felhanterare:

    Spring Cloud Stream exponerar en mekanism som du kan använda för att tillhandahålla en anpassad felhanterare genom att lägga till en Consumer som accepterar ErrorMessage instanser. Mer information finns i Hantera felmeddelanden i Spring Cloud Stream-dokumentationen.

    • Felhanterare för bindningsstandard

      Konfigurera en enda Consumer böna för att använda alla inkommande bindningsfelmeddelanden. Följande standardfunktion prenumererar på varje inkommande bindningsfelkanal:

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Du måste också ange egenskapen spring.cloud.stream.default.error-handler-definition till funktionsnamnet.

    • Bindningsspecifik felhanterare

      Konfigurera en Consumer böna för att använda de specifika inkommande bindningsfelmeddelandena. Följande funktion prenumererar på den specifika felkanalen för inkommande bindning och har högre prioritet än felhanteraren för bindningsstandard:

      @Bean
      public Consumer<ErrorMessage> myErrorHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Du måste också ange egenskapen spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition till funktionsnamnet.

Meddelandehuvuden för Event Hubs

De grundläggande meddelandehuvuden som stöds finns i meddelanderubrikerna Event Hubs i Spring Cloud Azure-stöd för Spring Integration.

Stöd för flera pärmar

Anslutning till flera Event Hubs-namnområden stöds också med hjälp av flera pärmar. Det här exemplet tar en anslutningssträng som exempel. Autentiseringsuppgifter för tjänstens huvudnamn och hanterade identiteter stöds också. Du kan ange relaterade egenskaper i varje bindemedels miljöinställningar.

  1. Om du vill använda flera pärmar med Event Hubs konfigurerar du följande egenskaper i din application.yml-fil:

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${EVENTHUB_NAME_01}
              group: ${CONSUMER_GROUP_01}
            supply1-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE}
            consume2-in-0:
              binder: eventhub-2
              destination: ${EVENTHUB_NAME_02}
              group: ${CONSUMER_GROUP_02}
            supply2-out-0:
              binder: eventhub-2
              destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE}
          binders:
            eventhub-1:
              type: eventhubs
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_01}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
            eventhub-2:
              type: eventhubs
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_02}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
          eventhubs:
            bindings:
              consume1-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
              consume2-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    Not

    Den tidigare programfilen visar hur du konfigurerar en enda standardsökare för programmet till alla bindningar. Om du vill konfigurera polleraren för en specifik bindning kan du använda en konfiguration som spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000.

    Not

    Microsoft rekommenderar att du använder det säkraste tillgängliga autentiseringsflödet. Det autentiseringsflöde som beskrivs i den här proceduren, till exempel för databaser, cacheminnen, meddelanden eller AI-tjänster, kräver en mycket hög grad av förtroende för programmet och medför risker som inte finns i andra flöden. Använd endast det här flödet när säkrare alternativ, till exempel hanterade identiteter för lösenordslösa eller nyckellösa anslutningar, inte är genomförbara. För lokala datoråtgärder föredrar du användaridentiteter för lösenordslösa eller nyckellösa anslutningar.

  2. Vi behöver definiera två leverantörer och två konsumenter:

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    

Resursetablering

Event Hubs binder stöder etablering av händelsehubb och konsumentgrupp. Användarna kan använda följande egenskaper för att aktivera etablering.

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      eventhubs:
        resource:
          resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}

Not

De värden som tillåts för tenant-id är: common, organizations, consumerseller klientorganisations-ID. Mer information om dessa värden finns i avsnittet Used the wrong endpoint (personal and organization accounts) i Error AADSTS50020 – Användarkonto från identitetsprovidern finns inte i klientorganisationen. Information om hur du konverterar din app för en klientorganisation finns i Konvertera enklientapp till flera klientorganisationer på Microsoft Entra-ID.

Prover

Mer information finns i azure-spring-boot-samples lagringsplats på GitHub.

Spring Cloud Stream Binder för Azure Service Bus

Viktiga begrepp

Spring Cloud Stream Binder för Azure Service Bus tillhandahåller bindningsimplementeringen för Spring Cloud Stream Framework. Den här implementeringen använder Spring Integration Service Bus-kanalkort i grunden.

Schemalagt meddelande

Den här pärmen stöder sändning av meddelanden till ett ämne för fördröjd bearbetning. Användare kan skicka schemalagda meddelanden med rubrik x-delay uttrycker i millisekunder en fördröjningstid för meddelandet. Meddelandet levereras till respektive avsnitt efter x-delay millisekunder.

Konsumentgrupp

Service Bus-ämnet ger liknande stöd för konsumentgruppen som Apache Kafka, men med lite annan logik. Den här pärmen förlitar sig på Subscription av ett ämne för att fungera som en konsumentgrupp.

Beroendekonfiguration

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>

Du kan också använda Spring Cloud Azure Stream Service Bus Starter, som du ser i följande exempel för Maven:

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>

Konfiguration

Pärmen innehåller följande två delar av konfigurationsalternativen:

Egenskaper för anslutningskonfiguration

Det här avsnittet innehåller de konfigurationsalternativ som används för att ansluta till Azure Service Bus.

Not

Om du väljer att använda ett säkerhetsobjekt för att autentisera och auktorisera med Microsoft Entra-ID för åtkomst till en Azure-resurs kan du läsa Auktorisera åtkomst med Microsoft Entra-ID för att se till att säkerhetsobjektet har beviljats tillräcklig behörighet för åtkomst till Azure-resursen.

Anslutningskonfigurerbara egenskaper för spring-cloud-azure-stream-binder-servicebus:

Egenskap Typ Beskrivning
spring.cloud.azure.servicebus.enabled boolesk Om en Azure Service Bus är aktiverad.
spring.cloud.azure.servicebus.connection-string Sträng Anslutningssträngsvärde för Service Bus-namnområde.
spring.cloud.azure.servicebus.custom-endpoint-address Sträng Den anpassade slutpunktsadress som ska användas vid anslutning till Service Bus.
spring.cloud.azure.servicebus.namespace Sträng Service Bus-namnområdesvärde, som är prefixet för FQDN. Ett FQDN ska bestå av NamespaceName.DomainName
spring.cloud.azure.servicebus.domain-name Sträng Domännamn för ett Azure Service Bus-namnområdesvärde.

Not

Vanliga konfigurationsalternativ för Azure Service SDK kan konfigureras även för Spring Cloud Azure Stream Service Bus-pärmen. Konfigurationsalternativen som stöds introduceras i Spring Cloud Azure-konfigurationenoch kan konfigureras med antingen det enhetliga prefixet spring.cloud.azure. eller prefixet för spring.cloud.azure.servicebus..

Pärmen stöder också Spring Could Azure Resource Manager- som standard. Mer information om hur du hämtar anslutningssträngen med säkerhetsobjekt som inte beviljas med Data relaterade roller finns i avsnittet Grundläggande användning i Spring Could Azure Resource Manager.

Konfigurationsegenskaper för Azure Service Bus-bindning

Följande alternativ är indelade i fyra avsnitt: Konsumentegenskaper, Avancerade konsumentkonfigurationer, Producentegenskaper och Avancerade producentkonfigurationer.

Konsumentegenskaper

Dessa egenskaper exponeras via ServiceBusConsumerProperties.

Not

För att undvika upprepning, eftersom version 4.17.0 och 5.11.0, stöder Spring Cloud Azure Stream Binder Service Bus inställningsvärden för alla kanaler, i formatet spring.cloud.stream.servicebus.default.consumer.<property>=<value>.

Konsumentkonfigurerbara egenskaper för spring-cloud-azure-stream-binder-servicebus:

Egenskap Typ Standard Beskrivning
spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-rejected boolesk falsk Om de misslyckade meddelandena dirigeras till DLQ.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls Heltal 1 Maximalt antal samtidiga meddelanden som Service Bus-processorklienten ska bearbeta. När sessionen är aktiverad gäller den för varje session.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-sessions Heltal noll Maximalt antal samtidiga sessioner som ska bearbetas vid en viss tidpunkt.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-enabled Boolesk noll Om sessionen är aktiverad.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-idle-timeout Varaktighet noll Anger den maximala tiden (varaktighet) för att vänta tills ett meddelande tas emot för den aktiva sessionen.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count Heltal 0 Antalet prefetch för Service Bus-processorklienten.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.sub-queue Underkö ingen Typen av underkö som ska anslutas till.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration Varaktighet 5 m Hur lång tid det går att fortsätta förnya låset automatiskt.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.receive-mode ServiceBusReceiveMode peek_lock Mottagningsläget för Service Bus-processorklienten.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.auto-complete Boolesk sann Om meddelanden ska regleras automatiskt. Om det anges som falskt läggs ett meddelandehuvud för Checkpointer till för att göra det möjligt för utvecklare att reglera meddelanden manuellt.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-storlek-i-megabyte Lång 1024 Den maximala storleken på kön/ämnet i megabyte, vilket är storleken på det minne som allokerats för kön/ämnet.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.default-message-time-to-live Varaktighet P10675199DT2H48M5.4775807S. (10675199 dagar, 2 timmar, 48 minuter, 5 sekunder och 477 millisekunder) Varaktigheten efter vilken meddelandet upphör att gälla, från och med när meddelandet skickas till Service Bus.

Viktig

När du använder Azure Resource Manager (ARM) måste du konfigurera egenskapen spring.cloud.stream.servicebus.bindings.<binding-name>.consume.entity-type. Mer information finns i exemplet servicebus-queue-binder-arm på GitHub.

Avancerad konsumentkonfiguration

Ovanstående anslutning och vanliga Azure SDK-klient konfiguration stöder anpassning för varje bindemedelskonsument, som du kan konfigurera med prefixet spring.cloud.stream.servicebus.bindings.<binding-name>.consumer..

Producentegenskaper

Dessa egenskaper exponeras via ServiceBusProducerProperties.

Not

För att undvika upprepning, eftersom version 4.17.0 och 5.11.0, stöder Spring Cloud Azure Stream Binder Service Bus inställningsvärden för alla kanaler, i formatet spring.cloud.stream.servicebus.default.producer.<property>=<value>.

Producentkonfigurerbara egenskaper för spring-cloud-azure-stream-binder-servicebus:

Egenskap Typ Standard Beskrivning
spring.cloud.stream.servicebus.bindings.binding-name.producer.sync boolesk falsk Växla flagga för synkronisering av producent.
spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout lång 10 000 Timeout-värde för sändning av producent.
spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type ServiceBusEntityType noll Service Bus-entitetstypen för producenten, som krävs för den bindande producenten.
spring.cloud.stream.servicebus.bindings.binding-name.producer.max-size-in-megabytes Lång 1024 Den maximala storleken på kön/ämnet i megabyte, vilket är storleken på det minne som allokerats för kön/ämnet.
spring.cloud.stream.servicebus.bindings.binding-name.producer.default-message-time-to-live Varaktighet P10675199DT2H48M5.4775807S. (10675199 dagar, 2 timmar, 48 minuter, 5 sekunder och 477 millisekunder) Varaktigheten efter vilken meddelandet upphör att gälla, från och med när meddelandet skickas till Service Bus.

Viktig

När du använder bindningsproducenten måste egenskapen för spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type konfigureras.

Avancerad producentkonfiguration

Ovanstående anslutning och vanliga Azure SDK-klient konfiguration stöder anpassning för varje pärmproducent, som du kan konfigurera med prefixet spring.cloud.stream.servicebus.bindings.<binding-name>.producer..

Grundläggande användning

Skicka och ta emot meddelanden från/till Service Bus

  1. Fyll i konfigurationsalternativen med information om autentiseringsuppgifter.

    • För autentiseringsuppgifter som anslutningssträng konfigurerar du följande egenskaper i filen application.yml:

          spring:
            cloud:
              azure:
                servicebus:
                  connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      

      Not

      Microsoft rekommenderar att du använder det säkraste tillgängliga autentiseringsflödet. Det autentiseringsflöde som beskrivs i den här proceduren, till exempel för databaser, cacheminnen, meddelanden eller AI-tjänster, kräver en mycket hög grad av förtroende för programmet och medför risker som inte finns i andra flöden. Använd endast det här flödet när säkrare alternativ, till exempel hanterade identiteter för lösenordslösa eller nyckellösa anslutningar, inte är genomförbara. För lokala datoråtgärder föredrar du användaridentiteter för lösenordslösa eller nyckellösa anslutningar.

    • För autentiseringsuppgifter som tjänstens huvudnamn konfigurerar du följande egenskaper i din application.yml-fil:

          spring:
            cloud:
              azure:
                credential:
                  client-id: ${AZURE_CLIENT_ID}
                  client-secret: ${AZURE_CLIENT_SECRET}
                profile:
                  tenant-id: <tenant>
                servicebus:
                  namespace: ${SERVICEBUS_NAMESPACE}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      

Not

De värden som tillåts för tenant-id är: common, organizations, consumerseller klientorganisations-ID. Mer information om dessa värden finns i avsnittet Used the wrong endpoint (personal and organization accounts) i Error AADSTS50020 – Användarkonto från identitetsprovidern finns inte i klientorganisationen. Information om hur du konverterar din app för en klientorganisation finns i Konvertera enklientapp till flera klientorganisationer på Microsoft Entra-ID.

  • För autentiseringsuppgifter som hanterade identiteter konfigurerar du följande egenskaper i din application.yml-fil:

        spring:
          cloud:
            azure:
              credential:
                managed-identity-enabled: true
                client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
              servicebus:
                namespace: ${SERVICEBUS_NAMESPACE}
            function:
              definition: consume;supply
            stream:
              bindings:
                consume-in-0:
                  destination: ${SERVICEBUS_ENTITY_NAME}
                  # If you use Service Bus Topic, add the following configuration
                  # group: ${SUBSCRIPTION_NAME}
                supply-out-0:
                  destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
              servicebus:
                bindings:
                  consume-in-0:
                    consumer:
                      auto-complete: false
                  supply-out-0:
                    producer:
                      entity-type: queue # set as "topic" if you use Service Bus Topic
    
  1. Definiera leverantör och konsument.

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}'", message.getPayload());
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

Stöd för partitionsnyckel

Pärmen stöder Service Bus-partitionering genom att tillåta inställning av partitionsnyckel och sessions-ID i meddelandehuvudet. I det här avsnittet beskrivs hur du anger partitionsnyckel för meddelanden.

Spring Cloud Stream tillhandahåller en spEL-uttrycksegenskap för partitionsnyckeln spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression. Du kan till exempel ange den här egenskapen som "'partitionKey-' + headers[<message-header-key>]" och lägga till ett huvud med namnet message-header-key. Spring Cloud Stream använder värdet för det här huvudet när uttrycket utvärderas för att tilldela en partitionsnyckel. Följande kod ger en exempelproducent:

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader("<message-header-key>", value.length() % 4)
            .build();
    };
}

Sessionsstöd

Pärmen stöder meddelandesessioner i Service Bus. Sessions-ID för ett meddelande kan anges via meddelanderubriken.

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
            .build();
    };
}

Not

Enligt Service Bus-partitioneringhar sessions-ID högre prioritet än partitionsnyckel. Så när både ServiceBusMessageHeaders#SESSION_ID och ServiceBusMessageHeaders#PARTITION_KEY huvuden anges används värdet för sessions-ID:t så småningom för att skriva över värdet för partitionsnyckeln.

Hantera felmeddelanden

  • Hantera utgående bindningsfelmeddelanden

    Som standard skapar Spring Integration en global felkanal med namnet errorChannel. Konfigurera följande meddelandeslutpunkt för att hantera utgående bindningsfel.

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • Hantera inkommande bindningsfelmeddelanden

    Spring Cloud Stream Service Bus Binder stöder två lösningar för att hantera fel för inkommande meddelandebindningar: binder-felhanteraren och hanterare.

    Binder-felhanterare:

    Standardhanteraren för bindemedelsfel hanterar den inkommande bindningen. Du använder den här hanteraren för att skicka misslyckade meddelanden till kön med obeställbara meddelanden när spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected är aktiverat. Annars avbryts de misslyckade meddelandena. Hanteraren för bindemedelsfel är ömsesidigt uteslutande med andra angivna felhanterare.

    Felhanterare:

    Spring Cloud Stream exponerar en mekanism som du kan använda för att tillhandahålla en anpassad felhanterare genom att lägga till en Consumer som accepterar ErrorMessage instanser. Mer information finns i Hantera felmeddelanden i Spring Cloud Stream-dokumentationen.

    • Felhanterare för bindningsstandard

      Konfigurera en enda Consumer böna för att använda alla inkommande bindningsfelmeddelanden. Följande standardfunktion prenumererar på varje inkommande bindningsfelkanal:

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Du måste också ange egenskapen spring.cloud.stream.default.error-handler-definition till funktionsnamnet.

    • Bindningsspecifik felhanterare

      Konfigurera en Consumer böna för att använda de specifika inkommande bindningsfelmeddelandena. Följande funktion prenumererar på den specifika inkommande bindningsfelkanalen med högre prioritet än felhanteraren för bindningsstandard.

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Du måste också ange egenskapen spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition till funktionsnamnet.

Service Bus-meddelandehuvuden

De grundläggande meddelandehuvuden som stöds finns i avsnittet Service Bus-meddelandehuvuden i Spring Cloud Azure-stöd för Spring Integration.

Not

När du ställer in partitionsnyckeln är prioriteten för meddelandehuvudet högre än spring cloud stream-egenskapen. Så spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression börjar gälla endast när ingen av ServiceBusMessageHeaders#SESSION_ID- och ServiceBusMessageHeaders#PARTITION_KEY-rubrikerna har konfigurerats.

Stöd för flera pärmar

Anslutning till flera Service Bus-namnområden stöds också med hjälp av flera pärmar. Det här exemplet tar anslutningssträngen som exempel. Autentiseringsuppgifter för tjänstens huvudnamn och hanterade identiteter stöds också, användare kan ange relaterade egenskaper i varje pärms miljöinställningar.

  1. Om du vill använda flera bindemedel för ServiceBus konfigurerar du följande egenskaper i din application.yml-fil:

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${SERVICEBUS_TOPIC_NAME}
              group: ${SUBSCRIPTION_NAME}
            supply1-out-0:
              destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE}
            consume2-in-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME}
            supply2-out-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE}
          binders:
            servicebus-1:
              type: servicebus
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING}
            servicebus-2:
              type: servicebus
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING}
          servicebus:
            bindings:
              consume1-in-0:
                consumer:
                  auto-complete: false
              supply1-out-0:
                producer:
                  entity-type: topic
              consume2-in-0:
                consumer:
                  auto-complete: false
              supply2-out-0:
                producer:
                  entity-type: queue
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    Not

    Den tidigare programfilen visar hur du konfigurerar en enda standardsökare för programmet till alla bindningar. Om du vill konfigurera polleraren för en specifik bindning kan du använda en konfiguration som spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000.

    Not

    Microsoft rekommenderar att du använder det säkraste tillgängliga autentiseringsflödet. Det autentiseringsflöde som beskrivs i den här proceduren, till exempel för databaser, cacheminnen, meddelanden eller AI-tjänster, kräver en mycket hög grad av förtroende för programmet och medför risker som inte finns i andra flöden. Använd endast det här flödet när säkrare alternativ, till exempel hanterade identiteter för lösenordslösa eller nyckellösa anslutningar, inte är genomförbara. För lokala datoråtgärder föredrar du användaridentiteter för lösenordslösa eller nyckellösa anslutningar.

  2. vi behöver definiera två leverantörer och två konsumenter

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    
    }
    

Resursetablering

Service Bus Binder stöder etablering av kö, ämne och prenumeration. Användarna kan använda följande egenskaper för att aktivera etablering.

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      servicebus:
        resource:
          resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
    stream:
      servicebus:
        bindings:
          <binding-name>:
            consumer:
              entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}

Not

De värden som tillåts för tenant-id är: common, organizations, consumerseller klientorganisations-ID. Mer information om dessa värden finns i avsnittet Used the wrong endpoint (personal and organization accounts) i Error AADSTS50020 – Användarkonto från identitetsprovidern finns inte i klientorganisationen. Information om hur du konverterar din app för en klientorganisation finns i Konvertera enklientapp till flera klientorganisationer på Microsoft Entra-ID.

Anpassa Service Bus-klientegenskaper

Utvecklare kan använda AzureServiceClientBuilderCustomizer för att anpassa Service Bus-klientegenskaper. I följande exempel anpassas egenskapen sessionIdleTimeout i ServiceBusClientBuilder:

@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
    return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}

Prover

Mer information finns i azure-spring-boot-samples lagringsplats på GitHub.