From 526b5d21b28993585657f5d7b84862e46476951b Mon Sep 17 00:00:00 2001 From: Vedran Pavic Date: Wed, 4 Sep 2024 12:22:37 +0200 Subject: [PATCH] Improve Pulsar listener container concurrency configuration This is a follow-up to gh-42062 that utilizes newly introduced `concurrency` property in `PulsarContainerProperties` to simplify auto-configuration support for Pulsar listener container concurrency. See: https://github.com/spring-projects/spring-pulsar/issues/820 --- .../pulsar/PulsarAutoConfiguration.java | 6 +----- .../pulsar/PulsarPropertiesMapper.java | 9 +-------- .../pulsar/PulsarPropertiesMapperTests.java | 14 ++------------ 3 files changed, 4 insertions(+), 25 deletions(-) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java index 9d2f4d88d319..5b5f9dc41235 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java @@ -69,7 +69,6 @@ * @author Alexander Preuß * @author Phillip Webb * @author Jonas Geiregat - * @author Vedran Pavic * @since 3.2.0 */ @AutoConfiguration @@ -188,10 +187,7 @@ ConcurrentPulsarListenerContainerFactory pulsarListenerContainerFactory( } pulsarTransactionManager.ifUnique(containerProperties.transactions()::setTransactionManager); this.propertiesMapper.customizeContainerProperties(containerProperties); - ConcurrentPulsarListenerContainerFactory listenerContainerFactory = new ConcurrentPulsarListenerContainerFactory<>( - pulsarConsumerFactory, containerProperties); - this.propertiesMapper.customizeConcurrentPulsarListenerContainerFactory(listenerContainerFactory); - return listenerContainerFactory; + return new ConcurrentPulsarListenerContainerFactory<>(pulsarConsumerFactory, containerProperties); } @Bean diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapper.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapper.java index d02d04f457d9..ccedefa7a397 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapper.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapper.java @@ -39,7 +39,6 @@ import org.springframework.boot.context.properties.PropertyMapper; import org.springframework.boot.json.JsonWriter; -import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory; import org.springframework.pulsar.core.PulsarTemplate; import org.springframework.pulsar.listener.PulsarContainerProperties; import org.springframework.pulsar.reader.PulsarReaderContainerProperties; @@ -196,16 +195,10 @@ private void customizePulsarContainerListenerProperties(PulsarContainerPropertie PulsarProperties.Listener properties = this.properties.getListener(); PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); map.from(properties::getSchemaType).to(containerProperties::setSchemaType); + map.from(properties::getConcurrency).to(containerProperties::setConcurrency); map.from(properties::isObservationEnabled).to(containerProperties::setObservationEnabled); } - void customizeConcurrentPulsarListenerContainerFactory( - ConcurrentPulsarListenerContainerFactory listenerContainerFactory) { - PulsarProperties.Listener properties = this.properties.getListener(); - PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); - map.from(properties::getConcurrency).to(listenerContainerFactory::setConcurrency); - } - void customizeReaderBuilder(ReaderBuilder readerBuilder) { PulsarProperties.Reader properties = this.properties.getReader(); PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapperTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapperTests.java index 79e9818685a0..08101633bd88 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapperTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapperTests.java @@ -41,7 +41,6 @@ import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Consumer; import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Failover.BackupCluster; -import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory; import org.springframework.pulsar.core.PulsarProducerFactory; import org.springframework.pulsar.core.PulsarTemplate; import org.springframework.pulsar.listener.PulsarContainerProperties; @@ -263,27 +262,18 @@ void customizeContainerProperties() { PulsarProperties properties = new PulsarProperties(); properties.getConsumer().getSubscription().setType(SubscriptionType.Shared); properties.getListener().setSchemaType(SchemaType.AVRO); + properties.getListener().setConcurrency(10); properties.getListener().setObservationEnabled(true); properties.getTransaction().setEnabled(true); PulsarContainerProperties containerProperties = new PulsarContainerProperties("my-topic-pattern"); new PulsarPropertiesMapper(properties).customizeContainerProperties(containerProperties); assertThat(containerProperties.getSubscriptionType()).isEqualTo(SubscriptionType.Shared); assertThat(containerProperties.getSchemaType()).isEqualTo(SchemaType.AVRO); + assertThat(containerProperties.getConcurrency()).isEqualTo(10); assertThat(containerProperties.isObservationEnabled()).isTrue(); assertThat(containerProperties.transactions().isEnabled()).isTrue(); } - @Test - void customizeConcurrentPulsarListenerContainerFactory() { - PulsarProperties properties = new PulsarProperties(); - properties.getListener().setConcurrency(10); - ConcurrentPulsarListenerContainerFactory listenerContainerFactory = mock( - ConcurrentPulsarListenerContainerFactory.class); - new PulsarPropertiesMapper(properties) - .customizeConcurrentPulsarListenerContainerFactory(listenerContainerFactory); - then(listenerContainerFactory).should().setConcurrency(10); - } - @Test @SuppressWarnings("unchecked") void customizeReaderBuilder() {