From 9e3eb1514b47b1f5da7eb951f342499f880c6a7e Mon Sep 17 00:00:00 2001 From: Chris Bono Date: Sun, 8 Sep 2024 15:22:37 -0500 Subject: [PATCH 1/3] Add Pulsar container factory customizers This commit adds the ability for users to customize the auto-configured Spring for Apache Pulsar message container factories. Each container factory holds a set of container properties that is a common target for users to configure. Allowing the customization of these properties prevents a rapid increase of configuration properties. --- ...sarListenerContainerFactoryCustomizer.java | 36 +++++++++++ ...ulsarReaderContainerFactoryCustomizer.java | 36 +++++++++++ ...sarListenerContainerFactoryCustomizer.java | 36 +++++++++++ .../pulsar/PulsarAutoConfiguration.java | 15 ++++- .../PulsarReactiveAutoConfiguration.java | 8 ++- .../pulsar/PulsarAutoConfigurationTests.java | 63 ++++++++++++++++++- .../PulsarReactiveAutoConfigurationTests.java | 31 +++++++++ .../reference/pages/messaging/pulsar.adoc | 13 ++-- 8 files changed, 226 insertions(+), 12 deletions(-) create mode 100644 spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/ConcurrentPulsarListenerContainerFactoryCustomizer.java create mode 100644 spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/DefaultPulsarReaderContainerFactoryCustomizer.java create mode 100644 spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/DefaultReactivePulsarListenerContainerFactoryCustomizer.java diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/ConcurrentPulsarListenerContainerFactoryCustomizer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/ConcurrentPulsarListenerContainerFactoryCustomizer.java new file mode 100644 index 000000000000..a0738988638e --- /dev/null +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/ConcurrentPulsarListenerContainerFactoryCustomizer.java @@ -0,0 +1,36 @@ +/* + * Copyright 2012-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.autoconfigure.pulsar; + +import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory; + +/** + * Callback interface that can be implemented to customize a + * {@link ConcurrentPulsarListenerContainerFactory}. + * + * @author Chris Bono + */ +@FunctionalInterface +public interface ConcurrentPulsarListenerContainerFactoryCustomizer { + + /** + * Customize a {@link ConcurrentPulsarListenerContainerFactory}. + * @param containerFactory the factory to customize + */ + void customize(ConcurrentPulsarListenerContainerFactory containerFactory); + +} diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/DefaultPulsarReaderContainerFactoryCustomizer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/DefaultPulsarReaderContainerFactoryCustomizer.java new file mode 100644 index 000000000000..a09051f2e766 --- /dev/null +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/DefaultPulsarReaderContainerFactoryCustomizer.java @@ -0,0 +1,36 @@ +/* + * Copyright 2012-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.autoconfigure.pulsar; + +import org.springframework.pulsar.config.DefaultPulsarReaderContainerFactory; + +/** + * Callback interface that can be implemented to customize a + * {@link DefaultPulsarReaderContainerFactory}. + * + * @author Chris Bono + */ +@FunctionalInterface +public interface DefaultPulsarReaderContainerFactoryCustomizer { + + /** + * Customize a {@link DefaultPulsarReaderContainerFactory}. + * @param containerFactory the factory to customize + */ + void customize(DefaultPulsarReaderContainerFactory containerFactory); + +} diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/DefaultReactivePulsarListenerContainerFactoryCustomizer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/DefaultReactivePulsarListenerContainerFactoryCustomizer.java new file mode 100644 index 000000000000..e6d419935d9b --- /dev/null +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/DefaultReactivePulsarListenerContainerFactoryCustomizer.java @@ -0,0 +1,36 @@ +/* + * Copyright 2012-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.autoconfigure.pulsar; + +import org.springframework.pulsar.reactive.config.DefaultReactivePulsarListenerContainerFactory; + +/** + * Callback interface that can be implemented to customize a + * {@link DefaultReactivePulsarListenerContainerFactory}. + * + * @author Chris Bono + */ +@FunctionalInterface +public interface DefaultReactivePulsarListenerContainerFactoryCustomizer { + + /** + * Customize a {@link DefaultReactivePulsarListenerContainerFactory}. + * @param containerFactory the factory to customize + */ + void customize(DefaultReactivePulsarListenerContainerFactory containerFactory); + +} diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java index 5b5f9dc41235..ca045eadc069 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java @@ -178,6 +178,7 @@ private void applyConsumerBuilderCustomizers(List> ConcurrentPulsarListenerContainerFactory pulsarListenerContainerFactory( PulsarConsumerFactory pulsarConsumerFactory, SchemaResolver schemaResolver, TopicResolver topicResolver, ObjectProvider pulsarTransactionManager, + ObjectProvider customizersProvider, Environment environment) { PulsarContainerProperties containerProperties = new PulsarContainerProperties(); containerProperties.setSchemaResolver(schemaResolver); @@ -187,7 +188,10 @@ ConcurrentPulsarListenerContainerFactory pulsarListenerContainerFactory( } pulsarTransactionManager.ifUnique(containerProperties.transactions()::setTransactionManager); this.propertiesMapper.customizeContainerProperties(containerProperties); - return new ConcurrentPulsarListenerContainerFactory<>(pulsarConsumerFactory, containerProperties); + ConcurrentPulsarListenerContainerFactory containerFactory = new ConcurrentPulsarListenerContainerFactory<>( + pulsarConsumerFactory, containerProperties); + customizersProvider.orderedStream().forEachOrdered((customizer) -> customizer.customize(containerFactory)); + return containerFactory; } @Bean @@ -215,14 +219,19 @@ private void applyReaderBuilderCustomizers(List> cust @Bean @ConditionalOnMissingBean(name = "pulsarReaderContainerFactory") DefaultPulsarReaderContainerFactory pulsarReaderContainerFactory(PulsarReaderFactory pulsarReaderFactory, - SchemaResolver schemaResolver, Environment environment) { + SchemaResolver schemaResolver, + ObjectProvider customizersProvider, + Environment environment) { PulsarReaderContainerProperties readerContainerProperties = new PulsarReaderContainerProperties(); readerContainerProperties.setSchemaResolver(schemaResolver); if (Threading.VIRTUAL.isActive(environment)) { readerContainerProperties.setReaderTaskExecutor(new VirtualThreadTaskExecutor("pulsar-reader-")); } this.propertiesMapper.customizeReaderContainerProperties(readerContainerProperties); - return new DefaultPulsarReaderContainerFactory<>(pulsarReaderFactory, readerContainerProperties); + DefaultPulsarReaderContainerFactory containerFactory = new DefaultPulsarReaderContainerFactory<>( + pulsarReaderFactory, readerContainerProperties); + customizersProvider.orderedStream().forEachOrdered((customizer) -> customizer.customize(containerFactory)); + return containerFactory; } @Configuration(proxyBeanMethods = false) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfiguration.java index 5ca96e70579a..ca750549680c 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfiguration.java @@ -164,12 +164,16 @@ private void applyMessageConsumerBuilderCustomizers(List reactivePulsarListenerContainerFactory( ReactivePulsarConsumerFactory reactivePulsarConsumerFactory, SchemaResolver schemaResolver, - TopicResolver topicResolver) { + TopicResolver topicResolver, + ObjectProvider customizersProvider) { ReactivePulsarContainerProperties containerProperties = new ReactivePulsarContainerProperties<>(); containerProperties.setSchemaResolver(schemaResolver); containerProperties.setTopicResolver(topicResolver); this.propertiesMapper.customizeContainerProperties(containerProperties); - return new DefaultReactivePulsarListenerContainerFactory<>(reactivePulsarConsumerFactory, containerProperties); + DefaultReactivePulsarListenerContainerFactory containerFactory = new DefaultReactivePulsarListenerContainerFactory<>( + reactivePulsarConsumerFactory, containerProperties); + customizersProvider.orderedStream().forEachOrdered((customizer) -> customizer.customize(containerFactory)); + return containerFactory; } @Bean diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java index d8d30e942e32..8730ab6f3e3d 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java @@ -18,6 +18,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Objects; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -585,6 +586,36 @@ void whenTransactionEnabledFalseListenerContainerShouldNotUseTransactions() { }); } + @Test + void whenHasUserDefinedCustomizersAppliesInCorrectOrder() { + this.contextRunner.withUserConfiguration(ListenerContainerFactoryCustomizersConfig.class) + .run((context) -> assertThat(context).getBean(ConcurrentPulsarListenerContainerFactory.class) + .hasFieldOrPropertyWithValue("containerProperties.subscriptionName", ":bar:foo")); + } + + @TestConfiguration(proxyBeanMethods = false) + static class ListenerContainerFactoryCustomizersConfig { + + @Bean + @Order(200) + ConcurrentPulsarListenerContainerFactoryCustomizer customizerFoo() { + return (containerFactory) -> appendToSubscriptionName(containerFactory, ":foo"); + } + + @Bean + @Order(100) + ConcurrentPulsarListenerContainerFactoryCustomizer customizerBar() { + return (containerFactory) -> appendToSubscriptionName(containerFactory, ":bar"); + } + + private void appendToSubscriptionName(ConcurrentPulsarListenerContainerFactory containerFactory, + String valueToAppend) { + String name = Objects.toString(containerFactory.getContainerProperties().getSubscriptionName(), ""); + containerFactory.getContainerProperties().setSubscriptionName(name.concat(valueToAppend)); + } + + } + } @Nested @@ -617,7 +648,7 @@ void hasNoTopicBuilderWhenTopicDefaultsAreDisabled() { } @Test - void whenHasUserDefinedCustomizersAppliesInCorrectOrder() { + void whenHasUserDefinedReaderBuilderCustomizersAppliesInCorrectOrder() { this.contextRunner.withPropertyValues("spring.pulsar.reader.name=fromPropsCustomizer") .withUserConfiguration(ReaderBuilderCustomizersConfig.class) .run((context) -> { @@ -654,6 +685,13 @@ void whenVirtualThreadsAreEnabledOnJava20AndEarlierReaderShouldNotUseVirtualThre }); } + @Test + void whenHasUserDefinedFactoryCustomizersAppliesInCorrectOrder() { + this.contextRunner.withUserConfiguration(ReaderContainerFactoryCustomizersConfig.class) + .run((context) -> assertThat(context).getBean(DefaultPulsarReaderContainerFactory.class) + .hasFieldOrPropertyWithValue("containerProperties.readerListener", ":bar:foo")); + } + @TestConfiguration(proxyBeanMethods = false) static class ReaderBuilderCustomizersConfig { @@ -671,6 +709,29 @@ ReaderBuilderCustomizer customizerBar() { } + @TestConfiguration(proxyBeanMethods = false) + static class ReaderContainerFactoryCustomizersConfig { + + @Bean + @Order(200) + DefaultPulsarReaderContainerFactoryCustomizer customizerFoo() { + return (containerFactory) -> appendToReaderListener(containerFactory, ":foo"); + } + + @Bean + @Order(100) + DefaultPulsarReaderContainerFactoryCustomizer customizerBar() { + return (containerFactory) -> appendToReaderListener(containerFactory, ":bar"); + } + + private void appendToReaderListener(DefaultPulsarReaderContainerFactory containerFactory, + String valueToAppend) { + String name = Objects.toString(containerFactory.getContainerProperties().getReaderListener(), ""); + containerFactory.getContainerProperties().setReaderListener(name.concat(valueToAppend)); + } + + } + } @Nested diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfigurationTests.java index 0ecb9e85e9fb..eafd91b891fc 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfigurationTests.java @@ -19,6 +19,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.List; +import java.util.Objects; import java.util.function.Supplier; import com.github.benmanes.caffeine.cache.Caffeine; @@ -382,6 +383,36 @@ void injectsExpectedBeans() { }); } + @Test + void whenHasUserDefinedFactoryCustomizersAppliesInCorrectOrder() { + this.contextRunner.withUserConfiguration(ListenerContainerFactoryCustomizersConfig.class) + .run((context) -> assertThat(context).getBean(DefaultReactivePulsarListenerContainerFactory.class) + .hasFieldOrPropertyWithValue("containerProperties.subscriptionName", ":bar:foo")); + } + + @TestConfiguration(proxyBeanMethods = false) + static class ListenerContainerFactoryCustomizersConfig { + + @Bean + @Order(200) + DefaultReactivePulsarListenerContainerFactoryCustomizer customizerFoo() { + return (containerFactory) -> appendToSubscriptionName(containerFactory, ":foo"); + } + + @Bean + @Order(100) + DefaultReactivePulsarListenerContainerFactoryCustomizer customizerBar() { + return (containerFactory) -> appendToSubscriptionName(containerFactory, ":bar"); + } + + private void appendToSubscriptionName(DefaultReactivePulsarListenerContainerFactory containerFactory, + String valueToAppend) { + String name = Objects.toString(containerFactory.getContainerProperties().getSubscriptionName(), ""); + containerFactory.getContainerProperties().setSubscriptionName(name.concat(valueToAppend)); + } + + } + } @Nested diff --git a/spring-boot-project/spring-boot-docs/src/docs/antora/modules/reference/pages/messaging/pulsar.adoc b/spring-boot-project/spring-boot-docs/src/docs/antora/modules/reference/pages/messaging/pulsar.adoc index 558456f1ecad..843b6685d9d7 100644 --- a/spring-boot-project/spring-boot-docs/src/docs/antora/modules/reference/pages/messaging/pulsar.adoc +++ b/spring-boot-project/spring-boot-docs/src/docs/antora/modules/reference/pages/messaging/pulsar.adoc @@ -150,11 +150,11 @@ include-code::MyBean[] Spring Boot auto-configuration provides all the components necessary for `PulsarListener`, such as the `PulsarListenerContainerFactory` and the consumer factory it uses to construct the underlying Pulsar consumers. You can configure these components by specifying any of the `spring.pulsar.listener.\*` and `spring.pulsar.consumer.*` prefixed application properties. -If you need more control over the consumer factory configuration, consider registering one or more `ConsumerBuilderCustomizer` beans. +If you need more control over the configuration of the consumer factory used by the container factory to create consumers, consider registering one or more `ConsumerBuilderCustomizer` beans. These customizers are applied to all consumers created by the factory, and therefore all `@PulsarListener` instances. You can also customize a single listener by setting the `consumerCustomizer` attribute of the `@PulsarListener` annotation. - +If you need more control over the actual container factory configuration, consider registering one or more `ConcurrentPulsarListenerContainerFactoryCustomizer` beans. [[messaging.pulsar.receiving-reactive]] == Receiving a Message Reactively @@ -165,13 +165,13 @@ The following component creates a reactive listener endpoint on the `someTopic` include-code::MyBean[] Spring Boot auto-configuration provides all the components necessary for `ReactivePulsarListener`, such as the `ReactivePulsarListenerContainerFactory` and the consumer factory it uses to construct the underlying reactive Pulsar consumers. -You can configure these components by specifying any of the `spring.pulsar.listener.*` and `spring.pulsar.consumer.*` prefixed application properties. +You can configure these components by specifying any of the `spring.pulsar.listener.\*` and `spring.pulsar.consumer.*` prefixed application properties. -If you need more control over the consumer factory configuration, consider registering one or more `ReactiveMessageConsumerBuilderCustomizer` beans. +If you need more control over the configuration of the consumer factory used by the container factory to create consumers, consider registering one or more `ReactiveMessageConsumerBuilderCustomizer` beans. These customizers are applied to all consumers created by the factory, and therefore all `@ReactivePulsarListener` instances. You can also customize a single listener by setting the `consumerCustomizer` attribute of the `@ReactivePulsarListener` annotation. - +If you need more control over the actual container factory configuration, consider registering one or more `DefaultReactivePulsarListenerContainerFactoryCustomizer` beans. [[messaging.pulsar.reading]] == Reading a Message @@ -187,10 +187,11 @@ include-code::MyBean[] The `@PulsarReader` relies on a `PulsarReaderFactory` to create the underlying Pulsar reader. Spring Boot auto-configuration provides this reader factory which can be customized by setting any of the `spring.pulsar.reader.*` prefixed application properties. -If you need more control over the reader factory configuration, consider registering one or more `ReaderBuilderCustomizer` beans. +If you need more control over the configuration of the reader factory used by the container factory to create readers, consider registering one or more `ReaderBuilderCustomizer` beans. These customizers are applied to all readers created by the factory, and therefore all `@PulsarReader` instances. You can also customize a single listener by setting the `readerCustomizer` attribute of the `@PulsarReader` annotation. +If you need more control over the actual container factory configuration, consider registering one or more `DefaultPulsarReaderContainerFactoryCustomizer` beans. [[messaging.pulsar.reading-reactive]] From cf2a94cadd146b303336d9fb0911a080eba4f98c Mon Sep 17 00:00:00 2001 From: Chris Bono Date: Sun, 8 Sep 2024 21:56:32 -0500 Subject: [PATCH 2/3] Fix checkstyle --- .../ConcurrentPulsarListenerContainerFactoryCustomizer.java | 1 + .../pulsar/DefaultPulsarReaderContainerFactoryCustomizer.java | 1 + .../DefaultReactivePulsarListenerContainerFactoryCustomizer.java | 1 + 3 files changed, 3 insertions(+) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/ConcurrentPulsarListenerContainerFactoryCustomizer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/ConcurrentPulsarListenerContainerFactoryCustomizer.java index a0738988638e..8526c86fc856 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/ConcurrentPulsarListenerContainerFactoryCustomizer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/ConcurrentPulsarListenerContainerFactoryCustomizer.java @@ -23,6 +23,7 @@ * {@link ConcurrentPulsarListenerContainerFactory}. * * @author Chris Bono + * @since 3.4.0 */ @FunctionalInterface public interface ConcurrentPulsarListenerContainerFactoryCustomizer { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/DefaultPulsarReaderContainerFactoryCustomizer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/DefaultPulsarReaderContainerFactoryCustomizer.java index a09051f2e766..05197b36188e 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/DefaultPulsarReaderContainerFactoryCustomizer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/DefaultPulsarReaderContainerFactoryCustomizer.java @@ -23,6 +23,7 @@ * {@link DefaultPulsarReaderContainerFactory}. * * @author Chris Bono + * @since 3.4.0 */ @FunctionalInterface public interface DefaultPulsarReaderContainerFactoryCustomizer { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/DefaultReactivePulsarListenerContainerFactoryCustomizer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/DefaultReactivePulsarListenerContainerFactoryCustomizer.java index e6d419935d9b..1ad8c6c1f99f 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/DefaultReactivePulsarListenerContainerFactoryCustomizer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/DefaultReactivePulsarListenerContainerFactoryCustomizer.java @@ -23,6 +23,7 @@ * {@link DefaultReactivePulsarListenerContainerFactory}. * * @author Chris Bono + * @since 3.4.0 */ @FunctionalInterface public interface DefaultReactivePulsarListenerContainerFactoryCustomizer { From ce0714e87551e3f297e0b4b4fc7734b313866076 Mon Sep 17 00:00:00 2001 From: Chris Bono Date: Mon, 9 Sep 2024 23:14:58 -0500 Subject: [PATCH 3/3] Switch to single generic customizer contract --- ...sarListenerContainerFactoryCustomizer.java | 37 ----- ...sarListenerContainerFactoryCustomizer.java | 37 ----- .../pulsar/PulsarAutoConfiguration.java | 10 +- .../pulsar/PulsarConfiguration.java | 7 + ... => PulsarContainerFactoryCustomizer.java} | 16 +- .../PulsarContainerFactoryCustomizers.java | 58 ++++++++ .../PulsarReactiveAutoConfiguration.java | 5 +- .../pulsar/PulsarAutoConfigurationTests.java | 25 +++- .../pulsar/PulsarConfigurationTests.java | 9 ++ ...ulsarContainerFactoryCustomizersTests.java | 140 ++++++++++++++++++ .../PulsarReactiveAutoConfigurationTests.java | 13 +- .../reference/pages/messaging/pulsar.adoc | 6 +- 12 files changed, 264 insertions(+), 99 deletions(-) delete mode 100644 spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/ConcurrentPulsarListenerContainerFactoryCustomizer.java delete mode 100644 spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/DefaultReactivePulsarListenerContainerFactoryCustomizer.java rename spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/{DefaultPulsarReaderContainerFactoryCustomizer.java => PulsarContainerFactoryCustomizer.java} (57%) create mode 100644 spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarContainerFactoryCustomizers.java create mode 100644 spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarContainerFactoryCustomizersTests.java diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/ConcurrentPulsarListenerContainerFactoryCustomizer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/ConcurrentPulsarListenerContainerFactoryCustomizer.java deleted file mode 100644 index 8526c86fc856..000000000000 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/ConcurrentPulsarListenerContainerFactoryCustomizer.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright 2012-2024 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.boot.autoconfigure.pulsar; - -import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory; - -/** - * Callback interface that can be implemented to customize a - * {@link ConcurrentPulsarListenerContainerFactory}. - * - * @author Chris Bono - * @since 3.4.0 - */ -@FunctionalInterface -public interface ConcurrentPulsarListenerContainerFactoryCustomizer { - - /** - * Customize a {@link ConcurrentPulsarListenerContainerFactory}. - * @param containerFactory the factory to customize - */ - void customize(ConcurrentPulsarListenerContainerFactory containerFactory); - -} diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/DefaultReactivePulsarListenerContainerFactoryCustomizer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/DefaultReactivePulsarListenerContainerFactoryCustomizer.java deleted file mode 100644 index 1ad8c6c1f99f..000000000000 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/DefaultReactivePulsarListenerContainerFactoryCustomizer.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright 2012-2024 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.boot.autoconfigure.pulsar; - -import org.springframework.pulsar.reactive.config.DefaultReactivePulsarListenerContainerFactory; - -/** - * Callback interface that can be implemented to customize a - * {@link DefaultReactivePulsarListenerContainerFactory}. - * - * @author Chris Bono - * @since 3.4.0 - */ -@FunctionalInterface -public interface DefaultReactivePulsarListenerContainerFactoryCustomizer { - - /** - * Customize a {@link DefaultReactivePulsarListenerContainerFactory}. - * @param containerFactory the factory to customize - */ - void customize(DefaultReactivePulsarListenerContainerFactory containerFactory); - -} diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java index ca045eadc069..d5cf598b2038 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java @@ -178,8 +178,7 @@ private void applyConsumerBuilderCustomizers(List> ConcurrentPulsarListenerContainerFactory pulsarListenerContainerFactory( PulsarConsumerFactory pulsarConsumerFactory, SchemaResolver schemaResolver, TopicResolver topicResolver, ObjectProvider pulsarTransactionManager, - ObjectProvider customizersProvider, - Environment environment) { + PulsarContainerFactoryCustomizers containerFactoryCustomizers, Environment environment) { PulsarContainerProperties containerProperties = new PulsarContainerProperties(); containerProperties.setSchemaResolver(schemaResolver); containerProperties.setTopicResolver(topicResolver); @@ -190,7 +189,7 @@ ConcurrentPulsarListenerContainerFactory pulsarListenerContainerFactory( this.propertiesMapper.customizeContainerProperties(containerProperties); ConcurrentPulsarListenerContainerFactory containerFactory = new ConcurrentPulsarListenerContainerFactory<>( pulsarConsumerFactory, containerProperties); - customizersProvider.orderedStream().forEachOrdered((customizer) -> customizer.customize(containerFactory)); + containerFactoryCustomizers.customize(containerFactory); return containerFactory; } @@ -219,8 +218,7 @@ private void applyReaderBuilderCustomizers(List> cust @Bean @ConditionalOnMissingBean(name = "pulsarReaderContainerFactory") DefaultPulsarReaderContainerFactory pulsarReaderContainerFactory(PulsarReaderFactory pulsarReaderFactory, - SchemaResolver schemaResolver, - ObjectProvider customizersProvider, + SchemaResolver schemaResolver, PulsarContainerFactoryCustomizers containerFactoryCustomizers, Environment environment) { PulsarReaderContainerProperties readerContainerProperties = new PulsarReaderContainerProperties(); readerContainerProperties.setSchemaResolver(schemaResolver); @@ -230,7 +228,7 @@ DefaultPulsarReaderContainerFactory pulsarReaderContainerFactory(PulsarReader this.propertiesMapper.customizeReaderContainerProperties(readerContainerProperties); DefaultPulsarReaderContainerFactory containerFactory = new DefaultPulsarReaderContainerFactory<>( pulsarReaderFactory, readerContainerProperties); - customizersProvider.orderedStream().forEachOrdered((customizer) -> customizer.customize(containerFactory)); + containerFactoryCustomizers.customize(containerFactory); return containerFactory; } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfiguration.java index ea60717f7bdf..6716f86eb016 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfiguration.java @@ -188,4 +188,11 @@ PulsarTopicBuilder pulsarTopicBuilder() { this.properties.getDefaults().getTopic().getNamespace()); } + @Bean + @ConditionalOnMissingBean + PulsarContainerFactoryCustomizers pulsarContainerFactoryCustomizers( + ObjectProvider> customizers) { + return new PulsarContainerFactoryCustomizers(customizers.orderedStream().toList()); + } + } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/DefaultPulsarReaderContainerFactoryCustomizer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarContainerFactoryCustomizer.java similarity index 57% rename from spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/DefaultPulsarReaderContainerFactoryCustomizer.java rename to spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarContainerFactoryCustomizer.java index 05197b36188e..17e3de79b45d 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/DefaultPulsarReaderContainerFactoryCustomizer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarContainerFactoryCustomizer.java @@ -16,22 +16,24 @@ package org.springframework.boot.autoconfigure.pulsar; -import org.springframework.pulsar.config.DefaultPulsarReaderContainerFactory; +import org.springframework.pulsar.config.PulsarContainerFactory; /** - * Callback interface that can be implemented to customize a - * {@link DefaultPulsarReaderContainerFactory}. + * Callback interface that can be implemented by beans wishing to customize a + * {@link PulsarContainerFactory} before it is fully initialized, in particular to tune + * its configuration. * + * @param the type of the {@link PulsarContainerFactory} * @author Chris Bono * @since 3.4.0 */ @FunctionalInterface -public interface DefaultPulsarReaderContainerFactoryCustomizer { +public interface PulsarContainerFactoryCustomizer> { /** - * Customize a {@link DefaultPulsarReaderContainerFactory}. - * @param containerFactory the factory to customize + * Customize the container factory. + * @param containerFactory the {@code PulsarContainerFactory} to customize */ - void customize(DefaultPulsarReaderContainerFactory containerFactory); + void customize(T containerFactory); } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarContainerFactoryCustomizers.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarContainerFactoryCustomizers.java new file mode 100644 index 000000000000..82bd14889883 --- /dev/null +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarContainerFactoryCustomizers.java @@ -0,0 +1,58 @@ +/* + * Copyright 2012-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.autoconfigure.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.springframework.boot.util.LambdaSafe; +import org.springframework.pulsar.config.PulsarContainerFactory; +import org.springframework.pulsar.core.PulsarConsumerFactory; + +/** + * Invokes the available {@link PulsarContainerFactoryCustomizer} instances in the context + * for a given {@link PulsarConsumerFactory}. + * + * @author Chris Bono + * @since 3.4.0 + */ +public class PulsarContainerFactoryCustomizers { + + private final List> customizers; + + public PulsarContainerFactoryCustomizers(List> customizers) { + this.customizers = (customizers != null) ? new ArrayList<>(customizers) : Collections.emptyList(); + } + + /** + * Customize the specified {@link PulsarContainerFactory}. Locates all + * {@link PulsarContainerFactoryCustomizer} beans able to handle the specified + * instance and invoke {@link PulsarContainerFactoryCustomizer#customize} on them. + * @param the type of container factory + * @param containerFactory the container factory to customize + * @return the customized container factory + */ + @SuppressWarnings("unchecked") + public > T customize(T containerFactory) { + LambdaSafe.callbacks(PulsarContainerFactoryCustomizer.class, this.customizers, containerFactory) + .withLogger(PulsarContainerFactoryCustomizers.class) + .invoke((customizer) -> customizer.customize(containerFactory)); + return containerFactory; + } + +} diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfiguration.java index ca750549680c..45d8d3037212 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfiguration.java @@ -164,15 +164,14 @@ private void applyMessageConsumerBuilderCustomizers(List reactivePulsarListenerContainerFactory( ReactivePulsarConsumerFactory reactivePulsarConsumerFactory, SchemaResolver schemaResolver, - TopicResolver topicResolver, - ObjectProvider customizersProvider) { + TopicResolver topicResolver, PulsarContainerFactoryCustomizers containerFactoryCustomizers) { ReactivePulsarContainerProperties containerProperties = new ReactivePulsarContainerProperties<>(); containerProperties.setSchemaResolver(schemaResolver); containerProperties.setTopicResolver(topicResolver); this.propertiesMapper.customizeContainerProperties(containerProperties); DefaultReactivePulsarListenerContainerFactory containerFactory = new DefaultReactivePulsarListenerContainerFactory<>( reactivePulsarConsumerFactory, containerProperties); - customizersProvider.orderedStream().forEachOrdered((customizer) -> customizer.customize(containerFactory)); + containerFactoryCustomizers.customize(containerFactory); return containerFactory; } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java index 8730ab6f3e3d..6a467810fc79 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java @@ -73,6 +73,7 @@ import org.springframework.pulsar.core.SchemaResolver; import org.springframework.pulsar.core.TopicResolver; import org.springframework.pulsar.listener.PulsarContainerProperties.TransactionSettings; +import org.springframework.pulsar.reactive.config.DefaultReactivePulsarListenerContainerFactory; import org.springframework.pulsar.transaction.PulsarAwareTransactionManager; import org.springframework.test.util.ReflectionTestUtils; @@ -596,15 +597,23 @@ void whenHasUserDefinedCustomizersAppliesInCorrectOrder() { @TestConfiguration(proxyBeanMethods = false) static class ListenerContainerFactoryCustomizersConfig { + @Bean + @Order(50) + PulsarContainerFactoryCustomizer> customizerIgnored() { + return (__) -> { + throw new RuntimeException("should-not-have-matched"); + }; + } + @Bean @Order(200) - ConcurrentPulsarListenerContainerFactoryCustomizer customizerFoo() { + PulsarContainerFactoryCustomizer> customizerFoo() { return (containerFactory) -> appendToSubscriptionName(containerFactory, ":foo"); } @Bean @Order(100) - ConcurrentPulsarListenerContainerFactoryCustomizer customizerBar() { + PulsarContainerFactoryCustomizer> customizerBar() { return (containerFactory) -> appendToSubscriptionName(containerFactory, ":bar"); } @@ -712,15 +721,23 @@ ReaderBuilderCustomizer customizerBar() { @TestConfiguration(proxyBeanMethods = false) static class ReaderContainerFactoryCustomizersConfig { + @Bean + @Order(50) + PulsarContainerFactoryCustomizer> customizerIgnored() { + return (__) -> { + throw new RuntimeException("should-not-have-matched"); + }; + } + @Bean @Order(200) - DefaultPulsarReaderContainerFactoryCustomizer customizerFoo() { + PulsarContainerFactoryCustomizer> customizerFoo() { return (containerFactory) -> appendToReaderListener(containerFactory, ":foo"); } @Bean @Order(100) - DefaultPulsarReaderContainerFactoryCustomizer customizerBar() { + PulsarContainerFactoryCustomizer> customizerBar() { return (containerFactory) -> appendToReaderListener(containerFactory, ":bar"); } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfigurationTests.java index c61777862e43..a0763714ca5c 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfigurationTests.java @@ -86,6 +86,15 @@ void whenHasUserDefinedConnectionDetailsBeanDoesNotAutoConfigureBean() { .isSameAs(customConnectionDetails)); } + @Test + void whenHasUserDefinedContainerFactoryCustomizersBeanDoesNotAutoConfigureBean() { + PulsarContainerFactoryCustomizers customizers = mock(PulsarContainerFactoryCustomizers.class); + this.contextRunner + .withBean("customContainerFactoryCustomizers", PulsarContainerFactoryCustomizers.class, () -> customizers) + .run((context) -> assertThat(context).getBean(PulsarContainerFactoryCustomizers.class) + .isSameAs(customizers)); + } + @Nested class ClientTests { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarContainerFactoryCustomizersTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarContainerFactoryCustomizersTests.java new file mode 100644 index 000000000000..dfc290745a63 --- /dev/null +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarContainerFactoryCustomizersTests.java @@ -0,0 +1,140 @@ +/* + * Copyright 2012-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.autoconfigure.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.junit.jupiter.api.Test; +import org.mockito.BDDMockito; + +import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory; +import org.springframework.pulsar.config.DefaultPulsarReaderContainerFactory; +import org.springframework.pulsar.config.ListenerContainerFactory; +import org.springframework.pulsar.config.PulsarContainerFactory; +import org.springframework.pulsar.config.PulsarListenerContainerFactory; +import org.springframework.pulsar.core.PulsarConsumerFactory; +import org.springframework.pulsar.listener.PulsarContainerProperties; +import org.springframework.pulsar.reactive.config.DefaultReactivePulsarListenerContainerFactory; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; + +/** + * Unit tests for {@link PulsarContainerFactoryCustomizers}. + * + * @author Chris Bono + */ +class PulsarContainerFactoryCustomizersTests { + + @Test + void customizeWithNullCustomizersShouldDoNothing() { + PulsarContainerFactory containerFactory = mock(PulsarContainerFactory.class); + new PulsarContainerFactoryCustomizers(null).customize(containerFactory); + BDDMockito.verifyNoInteractions(containerFactory); + } + + @SuppressWarnings("unchecked") + @Test + void customizeSimplePulsarContainerFactory() { + PulsarContainerFactoryCustomizers customizers = new PulsarContainerFactoryCustomizers( + Collections.singletonList(new SimplePulsarContainerFactoryCustomizer())); + PulsarContainerProperties containerProperties = new PulsarContainerProperties(); + ConcurrentPulsarListenerContainerFactory pulsarContainerFactory = new ConcurrentPulsarListenerContainerFactory<>( + mock(PulsarConsumerFactory.class), containerProperties); + customizers.customize(pulsarContainerFactory); + assertThat(pulsarContainerFactory.getContainerProperties().getSubscriptionName()).isEqualTo("my-subscription"); + } + + @Test + void customizeShouldCheckGeneric() { + List> list = new ArrayList<>(); + list.add(new TestCustomizer<>()); + list.add(new TestPulsarListenersContainerFactoryCustomizer()); + list.add(new TestConcurrentPulsarListenerContainerFactoryCustomizer()); + PulsarContainerFactoryCustomizers customizers = new PulsarContainerFactoryCustomizers(list); + + customizers.customize(mock(PulsarContainerFactory.class)); + assertThat(list.get(0).getCount()).isOne(); + assertThat(list.get(1).getCount()).isZero(); + assertThat(list.get(2).getCount()).isZero(); + + customizers.customize(mock(ConcurrentPulsarListenerContainerFactory.class)); + assertThat(list.get(0).getCount()).isEqualTo(2); + assertThat(list.get(1).getCount()).isOne(); + assertThat(list.get(2).getCount()).isOne(); + + customizers.customize(mock(DefaultReactivePulsarListenerContainerFactory.class)); + assertThat(list.get(0).getCount()).isEqualTo(3); + assertThat(list.get(1).getCount()).isEqualTo(2); + assertThat(list.get(2).getCount()).isOne(); + + customizers.customize(mock(DefaultPulsarReaderContainerFactory.class)); + assertThat(list.get(0).getCount()).isEqualTo(4); + assertThat(list.get(1).getCount()).isEqualTo(2); + assertThat(list.get(2).getCount()).isOne(); + } + + static class SimplePulsarContainerFactoryCustomizer + implements PulsarContainerFactoryCustomizer> { + + @Override + public void customize(ConcurrentPulsarListenerContainerFactory containerFactory) { + containerFactory.getContainerProperties().setSubscriptionName("my-subscription"); + } + + } + + /** + * Test customizer that will match all {@link PulsarListenerContainerFactory}. + */ + static class TestCustomizer> implements PulsarContainerFactoryCustomizer { + + private int count; + + @Override + public void customize(T pulsarContainerFactory) { + this.count++; + } + + int getCount() { + return this.count; + } + + } + + /** + * Test customizer that will match both + * {@link ConcurrentPulsarListenerContainerFactory} and + * {@link DefaultReactivePulsarListenerContainerFactory} as they both extend + * {@link ListenerContainerFactory}. + */ + static class TestPulsarListenersContainerFactoryCustomizer extends TestCustomizer> { + + } + + /** + * Test customizer that will match only + * {@link ConcurrentPulsarListenerContainerFactory}. + */ + static class TestConcurrentPulsarListenerContainerFactoryCustomizer + extends TestCustomizer> { + + } + +} diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfigurationTests.java index eafd91b891fc..9b2d58d0004e 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfigurationTests.java @@ -46,6 +46,7 @@ import org.springframework.boot.test.context.runner.ApplicationContextRunner; import org.springframework.context.annotation.Bean; import org.springframework.core.annotation.Order; +import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory; import org.springframework.pulsar.core.DefaultSchemaResolver; import org.springframework.pulsar.core.DefaultTopicResolver; import org.springframework.pulsar.core.PulsarAdministration; @@ -393,15 +394,23 @@ void whenHasUserDefinedFactoryCustomizersAppliesInCorrectOrder() { @TestConfiguration(proxyBeanMethods = false) static class ListenerContainerFactoryCustomizersConfig { + @Bean + @Order(50) + PulsarContainerFactoryCustomizer> customizerIgnored() { + return (__) -> { + throw new RuntimeException("should-not-have-matched"); + }; + } + @Bean @Order(200) - DefaultReactivePulsarListenerContainerFactoryCustomizer customizerFoo() { + PulsarContainerFactoryCustomizer> customizerFoo() { return (containerFactory) -> appendToSubscriptionName(containerFactory, ":foo"); } @Bean @Order(100) - DefaultReactivePulsarListenerContainerFactoryCustomizer customizerBar() { + PulsarContainerFactoryCustomizer> customizerBar() { return (containerFactory) -> appendToSubscriptionName(containerFactory, ":bar"); } diff --git a/spring-boot-project/spring-boot-docs/src/docs/antora/modules/reference/pages/messaging/pulsar.adoc b/spring-boot-project/spring-boot-docs/src/docs/antora/modules/reference/pages/messaging/pulsar.adoc index 843b6685d9d7..bc6a8c6e3837 100644 --- a/spring-boot-project/spring-boot-docs/src/docs/antora/modules/reference/pages/messaging/pulsar.adoc +++ b/spring-boot-project/spring-boot-docs/src/docs/antora/modules/reference/pages/messaging/pulsar.adoc @@ -154,7 +154,7 @@ If you need more control over the configuration of the consumer factory used by These customizers are applied to all consumers created by the factory, and therefore all `@PulsarListener` instances. You can also customize a single listener by setting the `consumerCustomizer` attribute of the `@PulsarListener` annotation. -If you need more control over the actual container factory configuration, consider registering one or more `ConcurrentPulsarListenerContainerFactoryCustomizer` beans. +If you need more control over the actual container factory configuration, consider registering one or more `PulsarContainerFactoryCustomizer>` beans. [[messaging.pulsar.receiving-reactive]] == Receiving a Message Reactively @@ -171,7 +171,7 @@ If you need more control over the configuration of the consumer factory used by These customizers are applied to all consumers created by the factory, and therefore all `@ReactivePulsarListener` instances. You can also customize a single listener by setting the `consumerCustomizer` attribute of the `@ReactivePulsarListener` annotation. -If you need more control over the actual container factory configuration, consider registering one or more `DefaultReactivePulsarListenerContainerFactoryCustomizer` beans. +If you need more control over the actual container factory configuration, consider registering one or more `PulsarContainerFactoryCustomizer>` beans. [[messaging.pulsar.reading]] == Reading a Message @@ -191,7 +191,7 @@ If you need more control over the configuration of the reader factory used by th These customizers are applied to all readers created by the factory, and therefore all `@PulsarReader` instances. You can also customize a single listener by setting the `readerCustomizer` attribute of the `@PulsarReader` annotation. -If you need more control over the actual container factory configuration, consider registering one or more `DefaultPulsarReaderContainerFactoryCustomizer` beans. +If you need more control over the actual container factory configuration, consider registering one or more `PulsarContainerFactoryCustomizer>` beans. [[messaging.pulsar.reading-reactive]]