Anteckning
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Spring Cloud Stream är ett ramverk för att skapa mycket skalbara händelsedrivna mikrotjänster som är anslutna till delade meddelandesystem.
Ramverket tillhandahåller en flexibel programmeringsmodell som bygger på redan etablerade och välbekanta Spring-idiom och bästa praxis. Dessa metodtips omfattar stöd för beständiga pub/sub-semantik, konsumentgrupper och tillståndskänsliga partitioner.
Aktuella implementeringar av pärm är:
- 
              spring-cloud-azure-stream-binder-eventhubs– mer information finns i Spring Cloud Stream Binder för Azure Event Hubs
- 
              spring-cloud-azure-stream-binder-servicebus– mer information finns i Spring Cloud Stream Binder för Azure Service Bus
Spring Cloud Stream Binder för Azure Event Hubs
Viktiga begrepp
Spring Cloud Stream Binder för Azure Event Hubs tillhandahåller bindningsimplementeringen för Spring Cloud Stream-ramverket. Den här implementeringen använder Spring Integration Event Hubs-kanalkort i grunden. Från designens perspektiv liknar Event Hubs Kafka. Event Hubs kan också nås via Kafka API. Om ditt projekt har ett nära beroende av Kafka API kan du prova Events Hub med Kafka API Sample
Konsumentgrupp
Event Hubs har liknande stöd för konsumentgruppen som Apache Kafka, men med lite annan logik. Medan Kafka lagrar alla incheckade förskjutningar i asynkron meddelandekö måste du lagra förskjutningar av Event Hubs-meddelanden som bearbetas manuellt. Event Hubs SDK tillhandahåller funktionen för att lagra sådana förskjutningar i Azure Storage.
Stöd för partitionering
Event Hubs har ett liknande koncept för fysisk partition som Kafka. Men till skillnad från Kafkas automatiska ombalansering mellan konsumenter och partitioner tillhandahåller Event Hubs ett slags förebyggande läge. Lagringskontot fungerar som ett lån för att avgöra vilken konsument som äger vilken partition. När en ny konsument startar försöker den stjäla vissa partitioner från de mest belastade konsumenterna för att uppnå arbetsbelastningsbalansen.
För att ange belastningsutjämningsstrategin tillhandahålls egenskaper för spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.*. Mer information finns i avsnittet Konsumentegenskaper.
Stöd för Batch-konsument
Spring Cloud Azure Stream Event Hubs binder stöder Spring Cloud Stream Batch Consumer-funktion.
Om du vill arbeta med batch-konsumentläget anger du egenskapen spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode till true. När det är aktiverat tas ett meddelande med en nyttolast av en lista över batchade händelser emot och skickas till funktionen Consumer. Varje meddelanderubrik konverteras också till en lista, där innehållet är det associerade rubrikvärdet som parsas från varje händelse. De gemensamma huvudena för partitions-ID, checkpointer och de senaste egenskaperna visas som ett enda värde eftersom hela batchen med händelser delar samma värde. Mer information finns i meddelanderubrikerna Event Hubs i Spring Cloud Azure-stöd för Spring Integration.
Not
Kontrollpunktshuvudet finns bara när MANUAL kontrollpunktsläge används.
Kontrollpunkter för batchkonsumenten stöder två lägen: BATCH och MANUAL. 
              BATCH läget är ett automatiskt kontrollpunktsläge för att kontrollera hela batchen med händelser tillsammans när bindemedlet tar emot dem. 
              MANUAL läge är att kontrollera händelserna av användare. När den används skickas Checkpointer till meddelandehuvudet och användarna kan använda den för att göra kontrollpunkter.
Du kan ange batchstorleken genom att ange egenskaperna max-size och max-wait-time som har prefixet spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch.. Egenskapen max-size är nödvändig och egenskapen max-wait-time är valfri. Mer information finns i avsnittet Konsumentegenskaper.
Beroendekonfiguration
<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>
Du kan också använda Spring Cloud Azure Stream Event Hubs Starter, som du ser i följande exempel för Maven:
<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>
Konfiguration
Pärmen innehåller följande tre delar av konfigurationsalternativen:
Egenskaper för anslutningskonfiguration
Det här avsnittet innehåller de konfigurationsalternativ som används för att ansluta till Azure Event Hubs.
Not
Om du väljer att använda ett säkerhetsobjekt för att autentisera och auktorisera med Microsoft Entra-ID för åtkomst till en Azure-resurs kan du läsa Auktorisera åtkomst med Microsoft Entra-ID för att se till att säkerhetsobjektet har beviljats tillräcklig behörighet för åtkomst till Azure-resursen.
Anslutningskonfigurerbara egenskaper för spring-cloud-azure-stream-binder-eventhubs:
| Egenskap | Typ | Beskrivning | 
|---|---|---|
| spring.cloud.azure.eventhubs.enabled | boolesk | Om en Azure Event Hubs är aktiverad. | 
| spring.cloud.azure.eventhubs.connection-string | Sträng | Anslutningssträngsvärde för Event Hubs-namnområde. | 
| spring.cloud.azure.eventhubs.namespace | Sträng | Event Hubs-namnområdesvärde, som är prefixet för det fullständiga domännamnet. Ett FQDN ska bestå av NamespaceName.DomainName | 
| spring.cloud.azure.eventhubs.domain-name | Sträng | Domännamn för ett Namnområdesvärde för Azure Event Hubs. | 
| spring.cloud.azure.eventhubs.custom-endpoint-address | Sträng | Anpassad slutpunktsadress. | 
Dricks
Vanliga konfigurationsalternativ för Azure Service SDK kan konfigureras även för Spring Cloud Azure Stream Event Hubs-pärmen. Konfigurationsalternativen som stöds introduceras i Spring Cloud Azure-konfigurationenoch kan konfigureras med antingen det enhetliga prefixet spring.cloud.azure. eller prefixet för spring.cloud.azure.eventhubs..
Pärmen stöder också Spring Could Azure Resource Manager- som standard. Mer information om hur du hämtar anslutningssträngen med säkerhetsobjekt som inte beviljas med Data relaterade roller finns i avsnittet Grundläggande användning i Spring Could Azure Resource Manager.
Konfigurationsegenskaper för kontrollpunkt
Det här avsnittet innehåller konfigurationsalternativen för Storage Blobs-tjänsten, som används för att bevara partitionsägarskap och kontrollpunktsinformation.
Not
Från version 4.0.0, när egenskapen för spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists inte aktiveras manuellt, skapas ingen Lagringscontainer automatiskt med namnet från spring.cloud.stream.bindings.binding-name.destination.
Kontrollpunktskonfigurerbara egenskaper för spring-cloud-azure-stream-binder-eventhubs:
| Egenskap | Typ | Beskrivning | 
|---|---|---|
| spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists | Boolesk | Om du vill tillåta att containrar skapas om de inte finns. | 
| spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name | Sträng | Namn på lagringskontot. | 
| spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key | Sträng | Åtkomstnyckel för lagringskonto. | 
| spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name | Sträng | Namn på lagringscontainer. | 
Dricks
Vanliga konfigurationsalternativ för Azure Service SDK kan även konfigureras för Lagringsblobkontrollpunktsarkiv. Konfigurationsalternativen som stöds introduceras i Spring Cloud Azure-konfigurationenoch kan konfigureras med antingen det enhetliga prefixet spring.cloud.azure. eller prefixet för spring.cloud.azure.eventhubs.processor.checkpoint-store.
Konfigurationsegenskaper för Azure Event Hubs-bindning
Följande alternativ är indelade i fyra avsnitt: Konsumentegenskaper, Avancerade konsumentkonfigurationer, Producentegenskaper och Avancerade producentkonfigurationer.
Konsumentegenskaper
Dessa egenskaper exponeras via EventHubsConsumerProperties.
Not
För att undvika upprepning, eftersom version 4.17.0 och 5.11.0, stöder Spring Cloud Azure Stream Binder Event Hubs inställningsvärden för alla kanaler, i formatet spring.cloud.stream.eventhubs.default.consumer.<property>=<value>.
Konfigurerbara egenskaper för spring-cloud-azure-stream-binder-eventhubs:
| Egenskap | Typ | Beskrivning | 
|---|---|---|
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.mode | CheckpointMode (Kontrollpunkt) | Kontrollpunktsläge som används när konsumenten bestämmer hur ett kontrollpunktsmeddelande ska visas | 
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.count | Heltal | Bestämmer mängden meddelande för varje partition för att göra en kontrollpunkt. Börjar gälla endast när PARTITION_COUNTkontrollpunktsläge används. | 
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.interval | Varaktighet | Bestämmer tidsintervallet för att göra en kontrollpunkt. Börjar gälla endast när TIMEkontrollpunktsläge används. | 
| spring.cloud.stream.eventhubs.bindings.<binding-name.consumer.batch.max-storlek | Heltal | Det maximala antalet händelser i en batch. Krävs för batch-konsumentläge. | 
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time | Varaktighet | Den maximala tidsperioden för batchförbrukning. Börjar gälla endast när batch-konsumentläget är aktiverat och är valfritt. | 
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.update-interval | Varaktighet | Varaktighet för intervalltid för uppdatering. | 
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.strategy | LastbalanseringStrategi | Belastningsutjämningsstrategin. | 
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.partition-ownership-expiration-interval | Varaktighet | Tidsåtgången efter vilken ägarskapet för partitionen upphör att gälla. | 
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.track-last-enqueued-event-properties | Boolesk | Om händelseprocessorn ska begära information om den senaste begärda händelsen på den associerade partitionen och spåra den informationen när händelser tas emot. | 
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count | Heltal | Antalet som används av konsumenten för att kontrollera antalet händelser som händelsehubbens konsument aktivt tar emot och köar lokalt. | 
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position | Mappa med nyckeln som partitions-ID och värden för StartPositionProperties | Kartan som innehåller den händelseposition som ska användas för varje partition om en kontrollpunkt för partitionen inte finns i kontrollpunktsarkivet. Den här kartan är bortkopplad från partitions-ID:t. | 
Not
Konfigurationen initial-partition-event-position accepterar en map för att ange den första positionen för varje händelsehubb. Därför är dess nyckel partitions-ID och värdet är StartPositionProperties, som innehåller egenskaper för offset, sekvensnummer, köad datumtid och huruvida inkluderande. Du kan till exempel ange den som
spring:
  cloud:
    stream:
      eventhubs:
        bindings:
          <binding-name>:
            consumer:
              initial-partition-event-position:
                0:
                  offset: earliest
                1:
                  sequence-number: 100
                2:
                  enqueued-date-time: 2022-01-12T13:32:47.650005Z
                4:
                  inclusive: false
Avancerad konsumentkonfiguration
Ovanstående anslutning, kontrollpunktoch vanliga Azure SDK-klient konfiguration stöder anpassning för varje bindemedelskonsument, som du kan konfigurera med prefixet spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer..
Producentegenskaper
Dessa egenskaper exponeras via EventHubsProducerProperties.
Not
För att undvika upprepning, eftersom version 4.17.0 och 5.11.0, stöder Spring Cloud Azure Stream Binder Event Hubs inställningsvärden för alla kanaler, i formatet spring.cloud.stream.eventhubs.default.producer.<property>=<value>.
Producentkonfigurerbara egenskaper för spring-cloud-azure-stream-binder-eventhubs:
| Egenskap | Typ | Beskrivning | 
|---|---|---|
| spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync | boolesk | Switch-flaggan för synkronisering av producent. Om det är sant väntar producenten på ett svar efter en sändningsåtgärd. | 
| spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout | lång | Hur lång tid det går att vänta på ett svar efter en sändningsåtgärd. Börjar gälla endast när en synkroniseringsproducent är aktiverad. | 
Avancerad producentkonfiguration
Ovanstående anslutning och vanliga Azure SDK-klient konfiguration stöder anpassning för varje pärmproducent, som du kan konfigurera med prefixet spring.cloud.stream.eventhubs.bindings.<binding-name>.producer..
Grundläggande användning
Skicka och ta emot meddelanden från/till Event Hubs
- Fyll i konfigurationsalternativen med information om autentiseringsuppgifter. - För autentiseringsuppgifter som anslutningssträng konfigurerar du följande egenskaper i filen application.yml: - spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL- Not - Microsoft rekommenderar att du använder det säkraste tillgängliga autentiseringsflödet. Det autentiseringsflöde som beskrivs i den här proceduren, till exempel för databaser, cacheminnen, meddelanden eller AI-tjänster, kräver en mycket hög grad av förtroende för programmet och medför risker som inte finns i andra flöden. Använd endast det här flödet när säkrare alternativ, till exempel hanterade identiteter för lösenordslösa eller nyckellösa anslutningar, inte är genomförbara. För lokala datoråtgärder föredrar du användaridentiteter för lösenordslösa eller nyckellösa anslutningar. 
- För autentiseringsuppgifter som tjänstens huvudnamn konfigurerar du följande egenskaper i din application.yml-fil: - spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
 
Not
De värden som tillåts för tenant-id är: common, organizations, consumerseller klientorganisations-ID. Mer information om dessa värden finns i avsnittet Used the wrong endpoint (personal and organization accounts) i Error AADSTS50020 – Användarkonto från identitetsprovidern finns inte i klientorganisationen. Information om hur du konverterar din app för en klientorganisation finns i Konvertera enklientapp till flera klientorganisationer på Microsoft Entra-ID.
- För autentiseringsuppgifter som hanterade identiteter konfigurerar du följande egenskaper i din application.yml-fil: - spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
- Definiera leverantör och konsument. - @Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload(), message.getHeaders().get(EventHubsHeaders.PARTITION_KEY), message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER), message.getHeaders().get(EventHubsHeaders.OFFSET), message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME) ); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
Stöd för partitionering
En PartitionSupplier med partitionsinformation från användaren skapas för att konfigurera partitionsinformationen om meddelandet som ska skickas. Följande flödesschema visar processen för att hämta olika prioriteringar för partitions-ID och nyckel:
               
              
            
Stöd för Batch-konsument
- Ange konfigurationsalternativen för batchen enligt följande exempel: - spring: cloud: function: definition: consume stream: bindings: consume-in-0: destination: ${AZURE_EVENTHUB_NAME} group: ${AZURE_EVENTHUB_CONSUMER_GROUP} consumer: batch-mode: true eventhubs: bindings: consume-in-0: consumer: batch: max-batch-size: 10 # Required for batch-consumer mode max-wait-time: 1m # Optional, the default value is null checkpoint: mode: BATCH # or MANUAL as needed
- Definiera leverantör och konsument. - För kontrollpunktsläge som - BATCHkan du använda följande kod för att skicka meddelanden och använda i batchar.- @Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("\"test"+ i++ +"\"").build(); }; }- För kontrollpunktsläge som - MANUALkan du använda följande kod för att skicka meddelanden och använda/kontrollpunkt i batchar.- @Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("\"test"+ i++ +"\"").build(); }; }
Not
I batchförbrukningsläget är standardinnehållstypen för Spring Cloud Stream-pärm application/json, så se till att meddelandets nyttolast är anpassad till innehållstypen. När du till exempel använder standardinnehållstypen för application/json för att ta emot meddelanden med String nyttolast ska nyttolasten vara JSON String, omgiven av dubbla citattecken för den ursprungliga String texten. För text/plain innehållstyp kan det vara ett String objekt direkt. Mer information finns i Spring Cloud Stream Content Type Negotiation.
Hantera felmeddelanden
- Hantera utgående bindningsfelmeddelanden - Som standard skapar Spring Integration en global felkanal med namnet - errorChannel. Konfigurera följande meddelandeslutpunkt för att hantera utgående bindningsfelmeddelanden.- @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling outbound binding error: " + message); }
- Hantera inkommande bindningsfelmeddelanden - Spring Cloud Stream Event Hubs Binder stöder en lösning för att hantera fel för inkommande meddelandebindningar: felhanterare. - Felhanterare: - Spring Cloud Stream exponerar en mekanism som du kan använda för att tillhandahålla en anpassad felhanterare genom att lägga till en - Consumersom accepterar- ErrorMessageinstanser. Mer information finns i Hantera felmeddelanden i Spring Cloud Stream-dokumentationen.- Felhanterare för bindningsstandard - Konfigurera en enda - Consumerböna för att använda alla inkommande bindningsfelmeddelanden. Följande standardfunktion prenumererar på varje inkommande bindningsfelkanal:- @Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }- Du måste också ange egenskapen - spring.cloud.stream.default.error-handler-definitiontill funktionsnamnet.
- Bindningsspecifik felhanterare - Konfigurera en - Consumerböna för att använda de specifika inkommande bindningsfelmeddelandena. Följande funktion prenumererar på den specifika felkanalen för inkommande bindning och har högre prioritet än felhanteraren för bindningsstandard:- @Bean public Consumer<ErrorMessage> myErrorHandler() { return message -> { // consume the error message }; }- Du måste också ange egenskapen - spring.cloud.stream.bindings.<input-binding-name>.error-handler-definitiontill funktionsnamnet.
 
Meddelandehuvuden för Event Hubs
De grundläggande meddelandehuvuden som stöds finns i meddelanderubrikerna Event Hubs i Spring Cloud Azure-stöd för Spring Integration.
Stöd för flera pärmar
Anslutning till flera Event Hubs-namnområden stöds också med hjälp av flera pärmar. Det här exemplet tar en anslutningssträng som exempel. Autentiseringsuppgifter för tjänstens huvudnamn och hanterade identiteter stöds också. Du kan ange relaterade egenskaper i varje bindemedels miljöinställningar.
- Om du vill använda flera pärmar med Event Hubs konfigurerar du följande egenskaper i din application.yml-fil: - spring: cloud: function: definition: consume1;supply1;consume2;supply2 stream: bindings: consume1-in-0: destination: ${EVENTHUB_NAME_01} group: ${CONSUMER_GROUP_01} supply1-out-0: destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE} consume2-in-0: binder: eventhub-2 destination: ${EVENTHUB_NAME_02} group: ${CONSUMER_GROUP_02} supply2-out-0: binder: eventhub-2 destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE} binders: eventhub-1: type: eventhubs default-candidate: true environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_01} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhub-2: type: eventhubs default-candidate: false environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_02} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhubs: bindings: consume1-in-0: consumer: checkpoint: mode: MANUAL consume2-in-0: consumer: checkpoint: mode: MANUAL poller: initial-delay: 0 fixed-delay: 1000- Not - Den tidigare programfilen visar hur du konfigurerar en enda standardsökare för programmet till alla bindningar. Om du vill konfigurera polleraren för en specifik bindning kan du använda en konfiguration som - spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000.- Not - Microsoft rekommenderar att du använder det säkraste tillgängliga autentiseringsflödet. Det autentiseringsflöde som beskrivs i den här proceduren, till exempel för databaser, cacheminnen, meddelanden eller AI-tjänster, kräver en mycket hög grad av förtroende för programmet och medför risker som inte finns i andra flöden. Använd endast det här flödet när säkrare alternativ, till exempel hanterade identiteter för lösenordslösa eller nyckellösa anslutningar, inte är genomförbara. För lokala datoråtgärder föredrar du användaridentiteter för lösenordslösa eller nyckellösa anslutningar. 
- Vi behöver definiera två leverantörer och två konsumenter: - @Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; }
Resursetablering
Event Hubs binder stöder etablering av händelsehubb och konsumentgrupp. Användarna kan använda följande egenskaper för att aktivera etablering.
spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      eventhubs:
        resource:
          resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}
Not
De värden som tillåts för tenant-id är: common, organizations, consumerseller klientorganisations-ID. Mer information om dessa värden finns i avsnittet Used the wrong endpoint (personal and organization accounts) i Error AADSTS50020 – Användarkonto från identitetsprovidern finns inte i klientorganisationen. Information om hur du konverterar din app för en klientorganisation finns i Konvertera enklientapp till flera klientorganisationer på Microsoft Entra-ID.
Prover
Mer information finns i azure-spring-boot-samples lagringsplats på GitHub.
Spring Cloud Stream Binder för Azure Service Bus
Viktiga begrepp
Spring Cloud Stream Binder för Azure Service Bus tillhandahåller bindningsimplementeringen för Spring Cloud Stream Framework. Den här implementeringen använder Spring Integration Service Bus-kanalkort i grunden.
Schemalagt meddelande
Den här pärmen stöder sändning av meddelanden till ett ämne för fördröjd bearbetning. Användare kan skicka schemalagda meddelanden med rubrik x-delay uttrycker i millisekunder en fördröjningstid för meddelandet. Meddelandet levereras till respektive avsnitt efter x-delay millisekunder.
Konsumentgrupp
Service Bus-ämnet ger liknande stöd för konsumentgruppen som Apache Kafka, men med lite annan logik.
Den här pärmen förlitar sig på Subscription av ett ämne för att fungera som en konsumentgrupp.
Beroendekonfiguration
<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>
Du kan också använda Spring Cloud Azure Stream Service Bus Starter, som du ser i följande exempel för Maven:
<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>
Konfiguration
Pärmen innehåller följande två delar av konfigurationsalternativen:
Egenskaper för anslutningskonfiguration
Det här avsnittet innehåller de konfigurationsalternativ som används för att ansluta till Azure Service Bus.
Not
Om du väljer att använda ett säkerhetsobjekt för att autentisera och auktorisera med Microsoft Entra-ID för åtkomst till en Azure-resurs kan du läsa Auktorisera åtkomst med Microsoft Entra-ID för att se till att säkerhetsobjektet har beviljats tillräcklig behörighet för åtkomst till Azure-resursen.
Anslutningskonfigurerbara egenskaper för spring-cloud-azure-stream-binder-servicebus:
| Egenskap | Typ | Beskrivning | 
|---|---|---|
| spring.cloud.azure.servicebus.enabled | boolesk | Om en Azure Service Bus är aktiverad. | 
| spring.cloud.azure.servicebus.connection-string | Sträng | Anslutningssträngsvärde för Service Bus-namnområde. | 
| spring.cloud.azure.servicebus.custom-endpoint-address | Sträng | Den anpassade slutpunktsadress som ska användas vid anslutning till Service Bus. | 
| spring.cloud.azure.servicebus.namespace | Sträng | Service Bus-namnområdesvärde, som är prefixet för FQDN. Ett FQDN ska bestå av NamespaceName.DomainName | 
| spring.cloud.azure.servicebus.domain-name | Sträng | Domännamn för ett Azure Service Bus-namnområdesvärde. | 
Not
Vanliga konfigurationsalternativ för Azure Service SDK kan konfigureras även för Spring Cloud Azure Stream Service Bus-pärmen. Konfigurationsalternativen som stöds introduceras i Spring Cloud Azure-konfigurationenoch kan konfigureras med antingen det enhetliga prefixet spring.cloud.azure. eller prefixet för spring.cloud.azure.servicebus..
Pärmen stöder också Spring Could Azure Resource Manager- som standard. Mer information om hur du hämtar anslutningssträngen med säkerhetsobjekt som inte beviljas med Data relaterade roller finns i avsnittet Grundläggande användning i Spring Could Azure Resource Manager.
Konfigurationsegenskaper för Azure Service Bus-bindning
Följande alternativ är indelade i fyra avsnitt: Konsumentegenskaper, Avancerade konsumentkonfigurationer, Producentegenskaper och Avancerade producentkonfigurationer.
Konsumentegenskaper
Dessa egenskaper exponeras via ServiceBusConsumerProperties.
Not
För att undvika upprepning, eftersom version 4.17.0 och 5.11.0, stöder Spring Cloud Azure Stream Binder Service Bus inställningsvärden för alla kanaler, i formatet spring.cloud.stream.servicebus.default.consumer.<property>=<value>.
Konsumentkonfigurerbara egenskaper för spring-cloud-azure-stream-binder-servicebus:
| Egenskap | Typ | Standard | Beskrivning | 
|---|---|---|---|
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-rejected | boolesk | falsk | Om de misslyckade meddelandena dirigeras till DLQ. | 
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls | Heltal | 1 | Maximalt antal samtidiga meddelanden som Service Bus-processorklienten ska bearbeta. När sessionen är aktiverad gäller den för varje session. | 
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-sessions | Heltal | noll | Maximalt antal samtidiga sessioner som ska bearbetas vid en viss tidpunkt. | 
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-enabled | Boolesk | noll | Om sessionen är aktiverad. | 
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-idle-timeout | Varaktighet | noll | Anger den maximala tiden (varaktighet) för att vänta tills ett meddelande tas emot för den aktiva sessionen. | 
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count | Heltal | 0 | Antalet prefetch för Service Bus-processorklienten. | 
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.sub-queue | Underkö | ingen | Typen av underkö som ska anslutas till. | 
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration | Varaktighet | 5 m | Hur lång tid det går att fortsätta förnya låset automatiskt. | 
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.receive-mode | ServiceBusReceiveMode | peek_lock | Mottagningsläget för Service Bus-processorklienten. | 
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.auto-complete | Boolesk | sann | Om meddelanden ska regleras automatiskt. Om det anges som falskt läggs ett meddelandehuvud för Checkpointertill för att göra det möjligt för utvecklare att reglera meddelanden manuellt. | 
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-storlek-i-megabyte | Lång | 1024 | Den maximala storleken på kön/ämnet i megabyte, vilket är storleken på det minne som allokerats för kön/ämnet. | 
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.default-message-time-to-live | Varaktighet | P10675199DT2H48M5.4775807S. (10675199 dagar, 2 timmar, 48 minuter, 5 sekunder och 477 millisekunder) | Varaktigheten efter vilken meddelandet upphör att gälla, från och med när meddelandet skickas till Service Bus. | 
Viktig
När du använder Azure Resource Manager (ARM) måste du konfigurera egenskapen spring.cloud.stream.servicebus.bindings.<binding-name>.consume.entity-type. Mer information finns i exemplet servicebus-queue-binder-arm på GitHub.
Avancerad konsumentkonfiguration
Ovanstående anslutning och vanliga Azure SDK-klient konfiguration stöder anpassning för varje bindemedelskonsument, som du kan konfigurera med prefixet spring.cloud.stream.servicebus.bindings.<binding-name>.consumer..
Producentegenskaper
Dessa egenskaper exponeras via ServiceBusProducerProperties.
Not
För att undvika upprepning, eftersom version 4.17.0 och 5.11.0, stöder Spring Cloud Azure Stream Binder Service Bus inställningsvärden för alla kanaler, i formatet spring.cloud.stream.servicebus.default.producer.<property>=<value>.
Producentkonfigurerbara egenskaper för spring-cloud-azure-stream-binder-servicebus:
| Egenskap | Typ | Standard | Beskrivning | 
|---|---|---|---|
| spring.cloud.stream.servicebus.bindings.binding-name.producer.sync | boolesk | falsk | Växla flagga för synkronisering av producent. | 
| spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout | lång | 10 000 | Timeout-värde för sändning av producent. | 
| spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type | ServiceBusEntityType | noll | Service Bus-entitetstypen för producenten, som krävs för den bindande producenten. | 
| spring.cloud.stream.servicebus.bindings.binding-name.producer.max-size-in-megabytes | Lång | 1024 | Den maximala storleken på kön/ämnet i megabyte, vilket är storleken på det minne som allokerats för kön/ämnet. | 
| spring.cloud.stream.servicebus.bindings.binding-name.producer.default-message-time-to-live | Varaktighet | P10675199DT2H48M5.4775807S. (10675199 dagar, 2 timmar, 48 minuter, 5 sekunder och 477 millisekunder) | Varaktigheten efter vilken meddelandet upphör att gälla, från och med när meddelandet skickas till Service Bus. | 
Viktig
När du använder bindningsproducenten måste egenskapen för spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type konfigureras.
Avancerad producentkonfiguration
Ovanstående anslutning och vanliga Azure SDK-klient konfiguration stöder anpassning för varje pärmproducent, som du kan konfigurera med prefixet spring.cloud.stream.servicebus.bindings.<binding-name>.producer..
Grundläggande användning
Skicka och ta emot meddelanden från/till Service Bus
- Fyll i konfigurationsalternativen med information om autentiseringsuppgifter. - För autentiseringsuppgifter som anslutningssträng konfigurerar du följande egenskaper i filen application.yml: - spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic- Not - Microsoft rekommenderar att du använder det säkraste tillgängliga autentiseringsflödet. Det autentiseringsflöde som beskrivs i den här proceduren, till exempel för databaser, cacheminnen, meddelanden eller AI-tjänster, kräver en mycket hög grad av förtroende för programmet och medför risker som inte finns i andra flöden. Använd endast det här flödet när säkrare alternativ, till exempel hanterade identiteter för lösenordslösa eller nyckellösa anslutningar, inte är genomförbara. För lokala datoråtgärder föredrar du användaridentiteter för lösenordslösa eller nyckellösa anslutningar. 
- För autentiseringsuppgifter som tjänstens huvudnamn konfigurerar du följande egenskaper i din application.yml-fil: - spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> servicebus: namespace: ${SERVICEBUS_NAMESPACE} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
 
Not
De värden som tillåts för tenant-id är: common, organizations, consumerseller klientorganisations-ID. Mer information om dessa värden finns i avsnittet Used the wrong endpoint (personal and organization accounts) i Error AADSTS50020 – Användarkonto från identitetsprovidern finns inte i klientorganisationen. Information om hur du konverterar din app för en klientorganisation finns i Konvertera enklientapp till flera klientorganisationer på Microsoft Entra-ID.
- För autentiseringsuppgifter som hanterade identiteter konfigurerar du följande egenskaper i din application.yml-fil: - spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity servicebus: namespace: ${SERVICEBUS_NAMESPACE} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
- Definiera leverantör och konsument. - @Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}'", message.getPayload()); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
Stöd för partitionsnyckel
Pärmen stöder Service Bus-partitionering genom att tillåta inställning av partitionsnyckel och sessions-ID i meddelandehuvudet. I det här avsnittet beskrivs hur du anger partitionsnyckel för meddelanden.
Spring Cloud Stream tillhandahåller en spEL-uttrycksegenskap för partitionsnyckeln spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression. Du kan till exempel ange den här egenskapen som "'partitionKey-' + headers[<message-header-key>]" och lägga till ett huvud med namnet message-header-key. Spring Cloud Stream använder värdet för det här huvudet när uttrycket utvärderas för att tilldela en partitionsnyckel. Följande kod ger en exempelproducent:
@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader("<message-header-key>", value.length() % 4)
            .build();
    };
}
Sessionsstöd
Pärmen stöder meddelandesessioner i Service Bus. Sessions-ID för ett meddelande kan anges via meddelanderubriken.
@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
            .build();
    };
}
Not
Enligt Service Bus-partitioneringhar sessions-ID högre prioritet än partitionsnyckel. Så när både ServiceBusMessageHeaders#SESSION_ID och ServiceBusMessageHeaders#PARTITION_KEY huvuden anges används värdet för sessions-ID:t så småningom för att skriva över värdet för partitionsnyckeln.
Hantera felmeddelanden
- Hantera utgående bindningsfelmeddelanden - Som standard skapar Spring Integration en global felkanal med namnet - errorChannel. Konfigurera följande meddelandeslutpunkt för att hantera utgående bindningsfel.- @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling outbound binding error: " + message); }
- Hantera inkommande bindningsfelmeddelanden - Spring Cloud Stream Service Bus Binder stöder två lösningar för att hantera fel för inkommande meddelandebindningar: binder-felhanteraren och hanterare. - Binder-felhanterare: - Standardhanteraren för bindemedelsfel hanterar den inkommande bindningen. Du använder den här hanteraren för att skicka misslyckade meddelanden till kön med obeställbara meddelanden när - spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejectedär aktiverat. Annars avbryts de misslyckade meddelandena. Hanteraren för bindemedelsfel är ömsesidigt uteslutande med andra angivna felhanterare.- Felhanterare: - Spring Cloud Stream exponerar en mekanism som du kan använda för att tillhandahålla en anpassad felhanterare genom att lägga till en - Consumersom accepterar- ErrorMessageinstanser. Mer information finns i Hantera felmeddelanden i Spring Cloud Stream-dokumentationen.- Felhanterare för bindningsstandard - Konfigurera en enda - Consumerböna för att använda alla inkommande bindningsfelmeddelanden. Följande standardfunktion prenumererar på varje inkommande bindningsfelkanal:- @Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }- Du måste också ange egenskapen - spring.cloud.stream.default.error-handler-definitiontill funktionsnamnet.
- Bindningsspecifik felhanterare - Konfigurera en - Consumerböna för att använda de specifika inkommande bindningsfelmeddelandena. Följande funktion prenumererar på den specifika inkommande bindningsfelkanalen med högre prioritet än felhanteraren för bindningsstandard.- @Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }- Du måste också ange egenskapen - spring.cloud.stream.bindings.<input-binding-name>.error-handler-definitiontill funktionsnamnet.
 
Service Bus-meddelandehuvuden
De grundläggande meddelandehuvuden som stöds finns i avsnittet Service Bus-meddelandehuvuden i Spring Cloud Azure-stöd för Spring Integration.
Not
När du ställer in partitionsnyckeln är prioriteten för meddelandehuvudet högre än spring cloud stream-egenskapen. Så spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression börjar gälla endast när ingen av ServiceBusMessageHeaders#SESSION_ID- och ServiceBusMessageHeaders#PARTITION_KEY-rubrikerna har konfigurerats.
Stöd för flera pärmar
Anslutning till flera Service Bus-namnområden stöds också med hjälp av flera pärmar. Det här exemplet tar anslutningssträngen som exempel. Autentiseringsuppgifter för tjänstens huvudnamn och hanterade identiteter stöds också, användare kan ange relaterade egenskaper i varje pärms miljöinställningar.
- Om du vill använda flera bindemedel för ServiceBus konfigurerar du följande egenskaper i din application.yml-fil: - spring: cloud: function: definition: consume1;supply1;consume2;supply2 stream: bindings: consume1-in-0: destination: ${SERVICEBUS_TOPIC_NAME} group: ${SUBSCRIPTION_NAME} supply1-out-0: destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE} consume2-in-0: binder: servicebus-2 destination: ${SERVICEBUS_QUEUE_NAME} supply2-out-0: binder: servicebus-2 destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE} binders: servicebus-1: type: servicebus default-candidate: true environment: spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING} servicebus-2: type: servicebus default-candidate: false environment: spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING} servicebus: bindings: consume1-in-0: consumer: auto-complete: false supply1-out-0: producer: entity-type: topic consume2-in-0: consumer: auto-complete: false supply2-out-0: producer: entity-type: queue poller: initial-delay: 0 fixed-delay: 1000- Not - Den tidigare programfilen visar hur du konfigurerar en enda standardsökare för programmet till alla bindningar. Om du vill konfigurera polleraren för en specifik bindning kan du använda en konfiguration som - spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000.- Not - Microsoft rekommenderar att du använder det säkraste tillgängliga autentiseringsflödet. Det autentiseringsflöde som beskrivs i den här proceduren, till exempel för databaser, cacheminnen, meddelanden eller AI-tjänster, kräver en mycket hög grad av förtroende för programmet och medför risker som inte finns i andra flöden. Använd endast det här flödet när säkrare alternativ, till exempel hanterade identiteter för lösenordslösa eller nyckellösa anslutningar, inte är genomförbara. För lokala datoråtgärder föredrar du användaridentiteter för lösenordslösa eller nyckellösa anslutningar. 
- vi behöver definiera två leverantörer och två konsumenter - @Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; }
Resursetablering
Service Bus Binder stöder etablering av kö, ämne och prenumeration. Användarna kan använda följande egenskaper för att aktivera etablering.
spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      servicebus:
        resource:
          resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
    stream:
      servicebus:
        bindings:
          <binding-name>:
            consumer:
              entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}
Not
De värden som tillåts för tenant-id är: common, organizations, consumerseller klientorganisations-ID. Mer information om dessa värden finns i avsnittet Used the wrong endpoint (personal and organization accounts) i Error AADSTS50020 – Användarkonto från identitetsprovidern finns inte i klientorganisationen. Information om hur du konverterar din app för en klientorganisation finns i Konvertera enklientapp till flera klientorganisationer på Microsoft Entra-ID.
Anpassa Service Bus-klientegenskaper
Utvecklare kan använda AzureServiceClientBuilderCustomizer för att anpassa Service Bus-klientegenskaper. I följande exempel anpassas egenskapen sessionIdleTimeout i ServiceBusClientBuilder:
@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
    return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}
Prover
Mer information finns i azure-spring-boot-samples lagringsplats på GitHub.