Dela via


Spring Cloud Azure-stöd för Spring Integration

Spring Integration Extension för Azure tillhandahåller Spring Integration-kort för de olika tjänster som tillhandahålls av Azure SDK för Java. Vi tillhandahåller Spring Integration-stöd för dessa Azure-tjänster: Event Hubs, Service Bus, Storage Queue. Följande är en lista över kort som stöds:

Spring-integrering med Azure Event Hubs

Viktiga begrepp

Azure Event Hubs är en stordataströmningsplattform och händelseinmatningstjänst. Den kan ta emot och bearbeta miljontals händelser per sekund. Data som skickas till en händelsehubb kan transformeras och lagras med hjälp av valfri realtidsanalysprovider eller batchbearbetning/lagringskort.

Spring Integration möjliggör enklare meddelanden i Spring-baserade program och stöder integrering med externa system via deklarativa kort. Dessa kort ger en högre abstraktionsnivå jämfört med Spring-stöd för fjärrkommunikation, meddelanden och schemaläggning. Spring Integration for Event Hubs-tilläggsprojektet tillhandahåller inkommande och utgående kanalkort och gatewayer för Azure Event Hubs.

Not

RxJava-stöd-API:er tas bort från version 4.0.0. Mer information finns i Javadoc.

Konsumentgrupp

Event Hubs har liknande stöd för konsumentgruppen som Apache Kafka, men med lite annan logik. Medan Kafka lagrar alla incheckade förskjutningar i asynkron meddelandekö måste du lagra förskjutningar av Event Hubs-meddelanden som bearbetas manuellt. Event Hubs SDK tillhandahåller funktionen för att lagra sådana förskjutningar i Azure Storage.

Stöd för partitionering

Event Hubs har ett liknande koncept för fysisk partition som Kafka. Men till skillnad från Kafkas automatiska ombalansering mellan konsumenter och partitioner tillhandahåller Event Hubs ett slags förebyggande läge. Lagringskontot fungerar som ett lån för att avgöra vilken partition som ägs av vilken konsument. När en ny konsument startar försöker den stjäla vissa partitioner från de flesta tunga användare för att uppnå arbetsbelastningsutjämningen.

För att ange belastningsutjämningsstrategin kan utvecklare använda EventHubsContainerProperties för konfigurationen. I följande avsnitt ett exempel på hur du konfigurerar EventHubsContainerProperties.

Stöd för Batch-konsument

EventHubsInboundChannelAdapter stöder batchförbrukningsläget. För att aktivera det kan användarna ange lyssnarläget som ListenerMode.BATCH när de skapar en EventHubsInboundChannelAdapter instans. När det är aktiverat tas ett meddelande som nyttolasten är en lista över batchade händelser emot och skickas till nedströmskanalen. Varje meddelanderubrik konverteras också som en lista, där innehållet är det associerade rubrikvärdet som parsas från varje händelse. För de gemensamma huvudena för partitions-ID, checkpointer och de senaste egenskaperna visas de som ett enda värde för hela batchen med händelser som delar samma. Mer information finns i avsnittet Event Hubs-meddelandehuvuden.

Not

Kontrollpunktshuvudet finns bara när LÄGET MANUELL kontrollpunkt används.

Kontrollpunkter för batchkonsumenten stöder två lägen: BATCH och MANUAL. BATCH läget är ett automatiskt kontrollpunktsläge för att kontrollera hela batchen med händelser tillsammans när de tas emot. MANUAL läge är att kontrollera händelserna av användare. När den används skickas Checkpointer- till meddelanderubriken och användarna kan använda den för att göra kontrollpunkter.

Principen för batchförbrukning kan anges av egenskaperna för max-size och max-wait-time, där max-size är en nödvändig egenskap medan max-wait-time är valfritt. Utvecklare kan använda EventHubsContainerProperties för konfigurationen för att ange den batchkrävande strategin. I följande avsnitt ett exempel på hur du konfigurerar EventHubsContainerProperties.

Beroendekonfiguration

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>

Konfiguration

Den här startprogrammet innehåller följande tre delar av konfigurationsalternativen:

Egenskaper för anslutningskonfiguration

Det här avsnittet innehåller de konfigurationsalternativ som används för att ansluta till Azure Event Hubs.

Not

Om du väljer att använda ett säkerhetsobjekt för att autentisera och auktorisera med Microsoft Entra-ID för åtkomst till en Azure-resurs kan du läsa Auktorisera åtkomst med Microsoft Entra-ID för att se till att säkerhetsobjektet har beviljats tillräcklig behörighet för åtkomst till Azure-resursen.

Anslutningskonfigurerbara egenskaper för spring-cloud-azure-starter-integration-eventhubs:

Egenskap Typ Beskrivning
spring.cloud.azure.eventhubs.enabled boolesk Om en Azure Event Hubs är aktiverad.
spring.cloud.azure.eventhubs.connection-string Sträng Anslutningssträngsvärde för Event Hubs-namnområde.
spring.cloud.azure.eventhubs.namespace Sträng Event Hubs-namnområdesvärde, som är prefixet för det fullständiga domännamnet. Ett FQDN ska bestå av NamespaceName.DomainName
spring.cloud.azure.eventhubs.domain-name Sträng Domännamn för ett Namnområdesvärde för Azure Event Hubs.
spring.cloud.azure.eventhubs.custom-endpoint-address Sträng Anpassad slutpunktsadress.
spring.cloud.azure.eventhubs.shared-connection Boolesk Om den underliggande EventProcessorClient och EventHubProducerAsyncClient använder samma anslutning. Som standard skapas en ny anslutning och används för varje händelsehubbklient som skapas.

Konfigurationsegenskaper för kontrollpunkt

Det här avsnittet innehåller konfigurationsalternativen för Storage Blobs-tjänsten, som används för att bevara partitionsägarskap och kontrollpunktsinformation.

Not

Från version 4.0.0, när egenskapen för spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists inte aktiveras manuellt, skapas ingen lagringscontainer automatiskt.

Kontrollpunktskonfigurerbara egenskaper för spring-cloud-azure-starter-integration-eventhubs:

Egenskap Typ Beskrivning
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists Boolesk Om du vill tillåta att containrar skapas om de inte finns.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name Sträng Namn på lagringskontot.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key Sträng Åtkomstnyckel för lagringskonto.
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name Sträng Namn på lagringscontainer.

Vanliga konfigurationsalternativ för Azure Service SDK kan även konfigureras för Lagringsblobkontrollpunktsarkiv. Konfigurationsalternativen som stöds introduceras i Spring Cloud Azure-konfigurationenoch kan konfigureras med antingen det enhetliga prefixet spring.cloud.azure. eller prefixet för spring.cloud.azure.eventhubs.processor.checkpoint-store.

Konfigurationsegenskaper för Event Hub-processor

EventHubsInboundChannelAdapter använder EventProcessorClient för att använda meddelanden från en händelsehubb för att konfigurera de övergripande egenskaperna för en EventProcessorClientkan utvecklare använda EventHubsContainerProperties för konfigurationen. Se följande avsnitt om hur du arbetar med EventHubsInboundChannelAdapter.

Grundläggande användning

Skicka meddelanden till Azure Event Hubs

  1. Fyll i konfigurationsalternativen för autentiseringsuppgifter.

    • För autentiseringsuppgifter som anslutningssträng konfigurerar du följande egenskaper i filen application.yml:

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${AZURE_EVENT_HUBS_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT-CONTAINER}
                  account-name: ${CHECKPOINT-STORAGE-ACCOUNT}
                  account-key: ${CHECKPOINT-ACCESS-KEY}
      

      Not

      Microsoft rekommenderar att du använder det säkraste tillgängliga autentiseringsflödet. Det autentiseringsflöde som beskrivs i den här proceduren, till exempel för databaser, cacheminnen, meddelanden eller AI-tjänster, kräver en mycket hög grad av förtroende för programmet och medför risker som inte finns i andra flöden. Använd endast det här flödet när säkrare alternativ, till exempel hanterade identiteter för lösenordslösa eller nyckellösa anslutningar, inte är genomförbara. För lokala datoråtgärder föredrar du användaridentiteter för lösenordslösa eller nyckellösa anslutningar.

    • För autentiseringsuppgifter som hanterade identiteter konfigurerar du följande egenskaper i din application.yml-fil:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            eventhubs:
              namespace: ${AZURE_EVENT_HUBS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      
    • För autentiseringsuppgifter som tjänstens huvudnamn konfigurerar du följande egenskaper i din application.yml-fil:

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${AZURE_EVENT_HUBS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      

Not

De värden som tillåts för tenant-id är: common, organizations, consumerseller klientorganisations-ID. Mer information om dessa värden finns i avsnittet Used the wrong endpoint (personal and organization accounts) i Error AADSTS50020 – Användarkonto från identitetsprovidern finns inte i klientorganisationen. Information om hur du konverterar din app för en klientorganisation finns i Konvertera enklientapp till flera klientorganisationer på Microsoft Entra-ID.

  1. Skapa DefaultMessageHandler med den EventHubsTemplate bönan för att skicka meddelanden till Event Hubs.

    class Demo {
        private static final String OUTPUT_CHANNEL = "output";
        private static final String EVENTHUB_NAME = "eh1";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.error("There was an error sending the message.", ex);
                }
            });
            return handler;
        }
    }
    
  2. Skapa en meddelandegatewaybindning med meddelandehanteraren ovan via en meddelandekanal.

    class Demo {
        @Autowired
        EventHubOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface EventHubOutboundGateway {
            void send(String text);
        }
    }
    
  3. Skicka meddelanden med hjälp av gatewayen.

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Ta emot meddelanden från Azure Event Hubs

  1. Fyll i konfigurationsalternativen för autentiseringsuppgifter.

  2. Skapa en böna med meddelandekanalen som indatakanal.

    @Configuration
    class Demo {
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Skapa EventHubsInboundChannelAdapter med EventHubsMessageListenerContainer böna för att ta emot meddelanden från Event Hubs.

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
        private static final String EVENTHUB_NAME = "eh1";
        private static final String CONSUMER_GROUP = "$Default";
    
        @Bean
        public EventHubsInboundChannelAdapter messageChannelAdapter(
                @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
                EventHubsMessageListenerContainer listenerContainer) {
            EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    
        @Bean
        public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
            EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
            containerProperties.setEventHubName(EVENTHUB_NAME);
            containerProperties.setConsumerGroup(CONSUMER_GROUP);
            containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
            return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
        }
    }
    
  4. Skapa en meddelandemottagarebindning med EventHubsInboundChannelAdapter via meddelandekanalen som skapades tidigare.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

Konfigurera EventHubsMessageConverter för att anpassa objectMapper

EventHubsMessageConverter görs som en konfigurerbar böna så att användarna kan anpassa ObjectMapper.

Stöd för Batch-konsument

Att använda meddelanden från Event Hubs i batchar liknar exemplet ovan, förutom att användarna bör ange de batchkrävande relaterade konfigurationsalternativen för EventHubsInboundChannelAdapter.

När du skapar EventHubsInboundChannelAdapterska lyssnarläget anges som BATCH. När du skapar en böna för EventHubsMessageListenerContaineranger du kontrollpunktsläget som antingen MANUAL eller BATCH, och batchalternativen kan konfigureras efter behov.

@Configuration
class Demo {
    private static final String INPUT_CHANNEL = "input";
    private static final String EVENTHUB_NAME = "eh1";
    private static final String CONSUMER_GROUP = "$Default";

    @Bean
    public EventHubsInboundChannelAdapter messageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            EventHubsMessageListenerContainer listenerContainer) {
        EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
        adapter.setOutputChannel(inputChannel);
        return adapter;
    }

    @Bean
    public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
        EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
        containerProperties.setEventHubName(EVENTHUB_NAME);
        containerProperties.setConsumerGroup(CONSUMER_GROUP);
        containerProperties.getBatch().setMaxSize(100);
        containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
        return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
    }
}

Meddelandehuvuden för Event Hubs

I följande tabell visas hur event hubs-meddelandeegenskaper mappas till Spring-meddelandehuvuden. För Azure Event Hubs anropas meddelandet som event.

Mappning mellan Event Hubs-meddelande/händelseegenskaper och Spring Message-huvuden i inspelningslyssningsläge:

Händelseegenskaper för Event Hubs Spring Message-huvudkonstanter Typ Beskrivning
Köad tid EventHubsHeaders#ENQUEUED_TIME Ögonblick Omedelbart, i UTC, när händelsen angavs i Event Hub-partitionen.
Uppväga EventHubsHeaders#OFFSET Lång Förskjutningen av händelsen när den togs emot från den associerade Event Hub-partitionen.
Partitionsnyckel AzureHeaders#PARTITION_KEY Sträng Partitionens hashningsnyckel om den angavs när händelsen ursprungligen publicerades.
Partitions-ID AzureHeaders#RAW_PARTITION_ID Sträng Partitions-ID för händelsehubben.
Sekvensnummer EventHubsHeaders#SEQUENCE_NUMBER Lång Sekvensnumret som tilldelades händelsen när den angavs i den associerade Event Hub-partitionen.
Senaste egenskaper för enqueued-händelse EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES LastEnqueuedEventProperties (på engelska) Egenskaperna för den senaste köade händelsen i den här partitionen.
NA AzureHeaders#CHECKPOINTER Kontrollpunkt Huvudet för kontrollpunkten det specifika meddelandet.

Användare kan parsa meddelanderubrikerna för relaterad information om varje händelse. Om du vill ange ett meddelandehuvud för händelsen placeras alla anpassade rubriker som en programegenskap för en händelse, där rubriken anges som egenskapsnyckel. När händelser tas emot från Event Hubs konverteras alla programegenskaper till meddelandehuvudet.

Not

Meddelandehuvuden för partitionsnyckel, köad tid, förskjutning och sekvensnummer stöds inte för att ställas in manuellt.

När batch-konsumentläget är aktiverat visas de specifika rubrikerna för batchmeddelanden på följande sätt, som innehåller en lista med värden från varje händelse i Händelsehubbar.

Mappning mellan Event Hubs-meddelande/händelseegenskaper och Spring Message-huvuden i Batch-lyssnarläge:

Händelseegenskaper för Event Hubs Spring Batch-meddelandehuvudkonstanter Typ Beskrivning
Köad tid EventHubsHeaders#ENQUEUED_TIME Lista över snabbmeddelanden Lista över ögonblicket, i UTC, över när varje händelse angavs i Event Hub-partitionen.
Uppväga EventHubsHeaders#OFFSET Lista över långa Lista över förskjutningen för varje händelse när den togs emot från den associerade Event Hub-partitionen.
Partitionsnyckel AzureHeaders#PARTITION_KEY Lista över sträng Lista över partitionshashingnyckeln om den angavs när varje händelse ursprungligen publicerades.
Sekvensnummer EventHubsHeaders#SEQUENCE_NUMBER Lista över långa Lista över sekvensnumret som tilldelades varje händelse när den angavs i den associerade Event Hub-partitionen.
Systemegenskaper EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES Lista över karta Lista över systemegenskaperna för varje händelse.
Programegenskaper EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES Lista över karta Lista över programegenskaperna för varje händelse, där alla anpassade meddelandehuvuden eller händelseegenskaper placeras.

Not

När du publicerar meddelanden tas alla batchhuvuden ovan bort från meddelandena om de finns.

Prover

Mer information finns i azure-spring-boot-samples lagringsplats på GitHub.

Spring-integrering med Azure Service Bus

Viktiga begrepp

Spring Integration möjliggör enklare meddelanden i Spring-baserade program och stöder integrering med externa system via deklarativa kort.

Spring Integration for Azure Service Bus-tilläggsprojektet tillhandahåller inkommande och utgående kanalkort för Azure Service Bus.

Not

CompletableFuture-stöd-API:er har föråldrats från version 2.10.0 och ersätts av Reactor Core från version 4.0.0. Mer information finns i Javadoc.

Beroendekonfiguration

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>

Konfiguration

Den här startprogrammet innehåller följande två delar av konfigurationsalternativen:

Egenskaper för anslutningskonfiguration

Det här avsnittet innehåller de konfigurationsalternativ som används för att ansluta till Azure Service Bus.

Not

Om du väljer att använda ett säkerhetsobjekt för att autentisera och auktorisera med Microsoft Entra-ID för åtkomst till en Azure-resurs kan du läsa Auktorisera åtkomst med Microsoft Entra-ID för att se till att säkerhetsobjektet har beviljats tillräcklig behörighet för åtkomst till Azure-resursen.

Anslutningskonfigurerbara egenskaper för spring-cloud-azure-starter-integration-servicebus:

Egenskap Typ Beskrivning
spring.cloud.azure.servicebus.enabled boolesk Om en Azure Service Bus är aktiverad.
spring.cloud.azure.servicebus.connection-string Sträng Anslutningssträngsvärde för Service Bus-namnområde.
spring.cloud.azure.servicebus.custom-endpoint-address Sträng Den anpassade slutpunktsadress som ska användas vid anslutning till Service Bus.
spring.cloud.azure.servicebus.namespace Sträng Service Bus-namnområdesvärde, som är prefixet för FQDN. Ett FQDN ska bestå av NamespaceName.DomainName
spring.cloud.azure.servicebus.domain-name Sträng Domännamn för ett Azure Service Bus-namnområdesvärde.

Konfigurationsegenskaper för Service Bus-processor

ServiceBusInboundChannelAdapter använder ServiceBusProcessorClient för att använda meddelanden för att konfigurera de övergripande egenskaperna för en ServiceBusProcessorClient, kan utvecklare använda ServiceBusContainerProperties för konfigurationen. Se följande avsnitt om hur du arbetar med ServiceBusInboundChannelAdapter.

Grundläggande användning

Skicka meddelanden till Azure Service Bus

  1. Fyll i konfigurationsalternativen för autentiseringsuppgifter.

    • För autentiseringsuppgifter som anslutningssträng konfigurerar du följande egenskaper i filen application.yml:

      spring:
        cloud:
          azure:
            servicebus:
              connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
      

      Not

      Microsoft rekommenderar att du använder det säkraste tillgängliga autentiseringsflödet. Det autentiseringsflöde som beskrivs i den här proceduren, till exempel för databaser, cacheminnen, meddelanden eller AI-tjänster, kräver en mycket hög grad av förtroende för programmet och medför risker som inte finns i andra flöden. Använd endast det här flödet när säkrare alternativ, till exempel hanterade identiteter för lösenordslösa eller nyckellösa anslutningar, inte är genomförbara. För lokala datoråtgärder föredrar du användaridentiteter för lösenordslösa eller nyckellösa anslutningar.

    • För autentiseringsuppgifter som hanterade identiteter konfigurerar du följande egenskaper i din application.yml-fil:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            servicebus:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
      

Not

De värden som tillåts för tenant-id är: common, organizations, consumerseller klientorganisations-ID. Mer information om dessa värden finns i avsnittet Used the wrong endpoint (personal and organization accounts) i Error AADSTS50020 – Användarkonto från identitetsprovidern finns inte i klientorganisationen. Information om hur du konverterar din app för en klientorganisation finns i Konvertera enklientapp till flera klientorganisationer på Microsoft Entra-ID.

  • För autentiseringsuppgifter som tjänstens huvudnamn konfigurerar du följande egenskaper i din application.yml-fil:

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          servicebus:
            namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
    

Not

De värden som tillåts för tenant-id är: common, organizations, consumerseller klientorganisations-ID. Mer information om dessa värden finns i avsnittet Used the wrong endpoint (personal and organization accounts) i Error AADSTS50020 – Användarkonto från identitetsprovidern finns inte i klientorganisationen. Information om hur du konverterar din app för en klientorganisation finns i Konvertera enklientapp till flera klientorganisationer på Microsoft Entra-ID.

  1. Skapa DefaultMessageHandler med den ServiceBusTemplate bönan för att skicka meddelanden till Service Bus, ange entitetstypen för ServiceBusTemplate. Det här exemplet tar Service Bus Queue som exempel.

    class Demo {
        private static final String OUTPUT_CHANNEL = "queue.output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) {
            serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE);
            DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
    
            return handler;
        }
    }
    
  2. Skapa en meddelandegatewaybindning med meddelandehanteraren ovan via en meddelandekanal.

    class Demo {
        @Autowired
        QueueOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface QueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. Skicka meddelanden med hjälp av gatewayen.

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Ta emot meddelanden från Azure Service Bus

  1. Fyll i konfigurationsalternativen för autentiseringsuppgifter.

  2. Skapa en böna med meddelandekanalen som indatakanal.

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Skapa ServiceBusInboundChannelAdapter med den ServiceBusMessageListenerContainer bönan för att ta emot meddelanden till Service Bus. Det här exemplet tar Service Bus Queue som exempel.

    @Configuration
    class Demo {
        private static final String QUEUE_NAME = "queue1";
    
        @Bean
        public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) {
            ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
            containerProperties.setEntityName(QUEUE_NAME);
            containerProperties.setAutoComplete(false);
            return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
        }
    
        @Bean
        public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            ServiceBusMessageListenerContainer listenerContainer) {
            ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    }
    
  4. Skapa en meddelandemottagarebindning med ServiceBusInboundChannelAdapter via meddelandekanalen som vi skapade tidigare.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

Konfigurera ServiceBusMessageConverter för att anpassa objectMapper

ServiceBusMessageConverter görs som en konfigurerbar böna så att användarna kan anpassa ObjectMapper.

Service Bus-meddelandehuvuden

För vissa Service Bus-huvuden som kan mappas till flera Spring-huvudkonstanter visas prioriteten för olika Spring-huvuden.

Mappning mellan Service Bus-huvuden och Spring-huvuden:

Service Bus-meddelandehuvuden och egenskaper Spring-meddelandehuvudkonstanter Typ Konfigurerbara Beskrivning
Innehållstyp MessageHeaders#CONTENT_TYPE Sträng Ja Beskrivningen RFC2045 innehållstyp för meddelandet.
Korrelations-ID ServiceBusMessageHeaders#CORRELATION_ID Sträng Ja Korrelations-ID för meddelandet
Meddelande-ID ServiceBusMessageHeaders#MESSAGE_ID Sträng Ja Meddelandets meddelande-ID har högre prioritet än MessageHeaders#ID.
Meddelande-ID MessageHeaders#ID universellt unik identifierare (UUID) Ja Meddelande-ID:t för meddelandet har den här rubriken lägre prioritet än ServiceBusMessageHeaders#MESSAGE_ID.
Partitionsnyckel ServiceBusMessageHeaders#PARTITION_KEY Sträng Ja Partitionsnyckeln för att skicka meddelandet till en partitionerad entitet.
Besvara MessageHeaders#REPLY_CHANNEL Sträng Ja Adressen till en entitet som svar ska skickas till.
Svara på sessions-ID ServiceBusMessageHeaders#REPLY_TO_SESSION_ID Sträng Ja Egenskapen ReplyToGroupId för meddelandet.
Schemalagd kötidsutc ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME OffsetDateTime Ja Den datumtid då meddelandet ska anges i Service Bus har den här rubriken högre prioritet än AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE.
Schemalagd kötidsutc AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE Heltal Ja Den datumtid då meddelandet ska anges i Service Bus har den här rubriken lägre prioritet än ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME.
Sessions-ID ServiceBusMessageHeaders#SESSION_ID Sträng Ja Sessions-IDentifier för en sessionsmedveten entitet.
Tid att leva ServiceBusMessageHeaders#TIME_TO_LIVE Varaktighet Ja Hur lång tid det tar innan det här meddelandet upphör att gälla.
Till ServiceBusMessageHeaders#TO Sträng Ja Meddelandets "till"-adress, reserverad för framtida användning i routningsscenarier och som för närvarande ignoreras av själva koordinatorn.
Subjekt ServiceBusMessageHeaders#SUBJECT Sträng Ja Ämnet för meddelandet.
Beskrivning av fel med obeställbara bokstäver ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION Sträng Nej Beskrivningen av ett meddelande som har fått en obeställd bokstav.
Orsak till död bokstav ServiceBusMessageHeaders#DEAD_LETTER_REASON Sträng Nej Anledningen till att ett meddelande var obeställt.
Källa med obeställbara bokstäver ServiceBusMessageHeaders#DEAD_LETTER_SOURCE Sträng Nej Entiteten där meddelandet var obeställt.
Antal leveranser ServiceBusMessageHeaders#DELIVERY_COUNT lång Nej Antalet gånger meddelandet levererades till klienter.
Kodat sekvensnummer ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER lång Nej Det kodade sekvensnumret som tilldelats ett meddelande av Service Bus.
Köad tid ServiceBusMessageHeaders#ENQUEUED_TIME OffsetDateTime Nej Den datetime då det här meddelandet angavs i Service Bus.
Upphör att gälla kl. ServiceBusMessageHeaders#EXPIRES_AT OffsetDateTime Nej Den datumtid då det här meddelandet upphör att gälla.
Lås token ServiceBusMessageHeaders#LOCK_TOKEN Sträng Nej Låstoken för det aktuella meddelandet.
Låst tills ServiceBusMessageHeaders#LOCKED_UNTIL OffsetDateTime Nej Den datumtid då låset för det här meddelandet upphör att gälla.
Sekvensnummer ServiceBusMessageHeaders#SEQUENCE_NUMBER lång Nej Det unika nummer som tilldelats ett meddelande av Service Bus.
Stat ServiceBusMessageHeaders#STATE ServiceBusMessageState Nej Tillståndet för meddelandet, som kan vara Aktiv, Uppskjuten eller Schemalagd.

Stöd för partitionsnyckel

Den här startprogrammet stöder Service Bus-partitionering genom att tillåta inställning av partitionsnyckel och sessions-ID i meddelandehuvudet. I det här avsnittet beskrivs hur du anger partitionsnyckel för meddelanden.

Rekommenderas: Använd ServiceBusMessageHeaders.PARTITION_KEY som nyckel i rubriken.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Rekommenderas inte men stöds för närvarande: AzureHeaders.PARTITION_KEY som nyckeln i rubriken.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Not

När både ServiceBusMessageHeaders.PARTITION_KEY och AzureHeaders.PARTITION_KEY anges i meddelanderubrikerna är ServiceBusMessageHeaders.PARTITION_KEY att föredra.

Sessionsstöd

Det här exemplet visar hur du manuellt anger sessions-ID för ett meddelande i programmet.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Not

När ServiceBusMessageHeaders.SESSION_ID anges i meddelanderubrikerna och en annan ServiceBusMessageHeaders.PARTITION_KEY rubrik också anges, används värdet för sessions-ID:t så småningom för att skriva över värdet för partitionsnyckeln.

Anpassa Service Bus-klientegenskaper

Utvecklare kan använda AzureServiceClientBuilderCustomizer för att anpassa Service Bus-klientegenskaper. I följande exempel anpassas egenskapen sessionIdleTimeout i ServiceBusClientBuilder:

@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
    return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}

Prover

Mer information finns i azure-spring-boot-samples lagringsplats på GitHub.

Spring-integrering med Azure Storage Queue

Viktiga begrepp

Azure Queue Storage är en tjänst för att lagra ett stort antal meddelanden. Du kommer åt meddelanden var som helst i världen via autentiserade anrop med HTTP eller HTTPS. Ett kömeddelande kan vara upp till 64 KB stort. En kö kan innehålla miljontals meddelanden, upp till den totala kapacitetsgränsen för ett lagringskonto. Köer används ofta för att skapa en kvarvarande arbetslogg för att bearbeta asynkront.

Beroendekonfiguration

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>

Konfiguration

Den här startprogrammet innehåller följande konfigurationsalternativ:

Egenskaper för anslutningskonfiguration

Det här avsnittet innehåller de konfigurationsalternativ som används för att ansluta till Azure Storage Queue.

Not

Om du väljer att använda ett säkerhetsobjekt för att autentisera och auktorisera med Microsoft Entra-ID för åtkomst till en Azure-resurs kan du läsa Auktorisera åtkomst med Microsoft Entra-ID för att se till att säkerhetsobjektet har beviljats tillräcklig behörighet för åtkomst till Azure-resursen.

Anslutningskonfigurerbara egenskaper för spring-cloud-azure-starter-integration-storage-queue:

Egenskap Typ Beskrivning
spring.cloud.azure.storage.queue.enabled boolesk Om en Azure Storage-kö är aktiverad.
spring.cloud.azure.storage.queue.connection-string Sträng Anslutningssträngsvärde för lagringskönamnområde.
spring.cloud.azure.storage.queue.accountName Sträng Namn på lagringskökonto.
spring.cloud.azure.storage.queue.accountKey Sträng Kontonyckel för lagringskö.
spring.cloud.azure.storage.queue.endpoint Sträng Tjänstslutpunkt för lagringskö.
spring.cloud.azure.storage.queue.sasToken Sträng Sas-tokenautentiseringsuppgifter
spring.cloud.azure.storage.queue.serviceVersion QueueServiceVersion (på engelska) QueueServiceVersion som används när API-begäranden görs.
spring.cloud.azure.storage.queue.messageEncoding Sträng Kodning av kömeddelanden.

Grundläggande användning

Skicka meddelanden till Azure Storage Queue

  1. Fyll i konfigurationsalternativen för autentiseringsuppgifter.

    • För autentiseringsuppgifter som anslutningssträng konfigurerar du följande egenskaper i filen application.yml:

      spring:
        cloud:
          azure:
            storage:
              queue:
                connection-string: ${AZURE_STORAGE_QUEUE_CONNECTION_STRING}
      

      Not

      Microsoft rekommenderar att du använder det säkraste tillgängliga autentiseringsflödet. Det autentiseringsflöde som beskrivs i den här proceduren, till exempel för databaser, cacheminnen, meddelanden eller AI-tjänster, kräver en mycket hög grad av förtroende för programmet och medför risker som inte finns i andra flöden. Använd endast det här flödet när säkrare alternativ, till exempel hanterade identiteter för lösenordslösa eller nyckellösa anslutningar, inte är genomförbara. För lokala datoråtgärder föredrar du användaridentiteter för lösenordslösa eller nyckellösa anslutningar.

    • För autentiseringsuppgifter som hanterade identiteter konfigurerar du följande egenskaper i din application.yml-fil:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            storage:
              queue:
                account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
      

Not

De värden som tillåts för tenant-id är: common, organizations, consumerseller klientorganisations-ID. Mer information om dessa värden finns i avsnittet Used the wrong endpoint (personal and organization accounts) i Error AADSTS50020 – Användarkonto från identitetsprovidern finns inte i klientorganisationen. Information om hur du konverterar din app för en klientorganisation finns i Konvertera enklientapp till flera klientorganisationer på Microsoft Entra-ID.

  • För autentiseringsuppgifter som tjänstens huvudnamn konfigurerar du följande egenskaper i din application.yml-fil:

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          storage:
            queue:
              account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
    

Not

De värden som tillåts för tenant-id är: common, organizations, consumerseller klientorganisations-ID. Mer information om dessa värden finns i avsnittet Used the wrong endpoint (personal and organization accounts) i Error AADSTS50020 – Användarkonto från identitetsprovidern finns inte i klientorganisationen. Information om hur du konverterar din app för en klientorganisation finns i Konvertera enklientapp till flera klientorganisationer på Microsoft Entra-ID.

  1. Skapa DefaultMessageHandler med den StorageQueueTemplate bönan för att skicka meddelanden till Lagringskö.

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
        private static final String OUTPUT_CHANNEL = "output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
            return handler;
        }
    }
    
  2. Skapa en meddelandegatewaybindning med meddelandehanteraren ovan via en meddelandekanal.

    class Demo {
        @Autowired
        StorageQueueOutboundGateway storageQueueOutboundGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface StorageQueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. Skicka meddelanden med hjälp av gatewayen.

    class Demo {
        public void demo() {
            this.storageQueueOutboundGateway.send(message);
        }
    }
    

Ta emot meddelanden från Azure Storage Queue

  1. Fyll i konfigurationsalternativen för autentiseringsuppgifter.

  2. Skapa en böna med meddelandekanalen som indatakanal.

    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Skapa StorageQueueMessageSource med den StorageQueueTemplate bönan för att ta emot meddelanden till Lagringskö.

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
    
        @Bean
        @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000"))
        public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) {
            return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate);
        }
    }
    
  4. Skapa en meddelandemottagarebindning med StorageQueueMessageSource som skapades i det sista steget via meddelandekanalen som vi skapade tidigare.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                .doOnError(Throwable::printStackTrace)
                .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message))
                .block();
        }
    }
    

Prover

Mer information finns i azure-spring-boot-samples lagringsplats på GitHub.