Skip to content

Commit

Permalink
Add property for Integration's default endpoint timeout
Browse files Browse the repository at this point in the history
Fixes gh-41477
  • Loading branch information
wilkinsona committed Jul 16, 2024
1 parent 17d6f90 commit 1cc7498
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 37 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2023 the original author or authors.
* Copyright 2012-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -105,6 +105,9 @@ public static org.springframework.integration.context.IntegrationProperties inte
map.from(properties.getError().isIgnoreFailures()).to(integrationProperties::setErrorChannelIgnoreFailures);
map.from(properties.getEndpoint().isThrowExceptionOnLateReply())
.to(integrationProperties::setMessagingTemplateThrowExceptionOnLateReply);
map.from(properties.getEndpoint().getDefaultTimeout())
.as(Duration::toMillis)
.to(integrationProperties::setEndpointsDefaultTimeout);
map.from(properties.getEndpoint().getReadOnlyHeaders())
.as(StringUtils::toStringArray)
.to(integrationProperties::setReadOnlyHeaders);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2023 the original author or authors.
* Copyright 2012-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -141,6 +141,11 @@ public static class Endpoint {
*/
private List<String> noAutoStartup = new ArrayList<>();

/**
* Default timeout for blocking operations such as sending or receiving messages.
*/
private Duration defaultTimeout = Duration.ofSeconds(30);

public void setThrowExceptionOnLateReply(boolean throwExceptionOnLateReply) {
this.throwExceptionOnLateReply = throwExceptionOnLateReply;
}
Expand All @@ -165,6 +170,14 @@ public void setNoAutoStartup(List<String> noAutoStartup) {
this.noAutoStartup = noAutoStartup;
}

public Duration getDefaultTimeout() {
return this.defaultTimeout;
}

public void setDefaultTimeout(Duration defaultTimeout) {
this.defaultTimeout = defaultTimeout;
}

}

public static class Error {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2023 the original author or authors.
* Copyright 2012-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -85,6 +85,7 @@ private static final class IntegrationPropertiesPropertySource extends PropertyS
IntegrationProperties.CHANNELS_MAX_BROADCAST_SUBSCRIBERS);
mappings.put(PREFIX + "error.require-subscribers", IntegrationProperties.ERROR_CHANNEL_REQUIRE_SUBSCRIBERS);
mappings.put(PREFIX + "error.ignore-failures", IntegrationProperties.ERROR_CHANNEL_IGNORE_FAILURES);
mappings.put(PREFIX + "endpoint.default-timeout", IntegrationProperties.ENDPOINTS_DEFAULT_TIMEOUT);
mappings.put(PREFIX + "endpoint.throw-exception-on-late-reply",
IntegrationProperties.THROW_EXCEPTION_ON_LATE_REPLY);
mappings.put(PREFIX + "endpoint.read-only-headers", IntegrationProperties.READ_ONLY_HEADERS);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2023 the original author or authors.
* Copyright 2012-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,10 +16,13 @@

package org.springframework.boot.autoconfigure.integration;

import java.beans.PropertyDescriptor;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import javax.management.MBeanServer;
import javax.sql.DataSource;
Expand All @@ -32,6 +35,7 @@
import reactor.core.publisher.Mono;

import org.springframework.beans.DirectFieldAccessor;
import org.springframework.beans.PropertyAccessorFactory;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.autoconfigure.flyway.FlywayAutoConfiguration;
import org.springframework.boot.autoconfigure.integration.IntegrationAutoConfiguration.IntegrationComponentScanConfiguration;
Expand Down Expand Up @@ -82,6 +86,7 @@
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.scheduling.support.PeriodicTrigger;
import org.springframework.test.util.ReflectionTestUtils;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
Expand Down Expand Up @@ -295,55 +300,63 @@ void taskSchedulerCanBeCustomized() {

@Test
void integrationGlobalPropertiesAutoConfigured() {
this.contextRunner.withPropertyValues("spring.integration.channel.auto-create=false",
String[] propertyValues = { "spring.integration.channel.auto-create=false",
"spring.integration.channel.max-unicast-subscribers=2",
"spring.integration.channel.max-broadcast-subscribers=3",
"spring.integration.error.require-subscribers=false", "spring.integration.error.ignore-failures=false",
"spring.integration.endpoint.defaultTimeout=60s",
"spring.integration.endpoint.throw-exception-on-late-reply=true",
"spring.integration.endpoint.read-only-headers=ignoredHeader",
"spring.integration.endpoint.no-auto-startup=notStartedEndpoint,_org.springframework.integration.errorLogger")
.run((context) -> {
assertThat(context).hasSingleBean(org.springframework.integration.context.IntegrationProperties.class);
org.springframework.integration.context.IntegrationProperties integrationProperties = context
.getBean(org.springframework.integration.context.IntegrationProperties.class);
assertThat(integrationProperties.isChannelsAutoCreate()).isFalse();
assertThat(integrationProperties.getChannelsMaxUnicastSubscribers()).isEqualTo(2);
assertThat(integrationProperties.getChannelsMaxBroadcastSubscribers()).isEqualTo(3);
assertThat(integrationProperties.isErrorChannelRequireSubscribers()).isFalse();
assertThat(integrationProperties.isErrorChannelIgnoreFailures()).isFalse();
assertThat(integrationProperties.isMessagingTemplateThrowExceptionOnLateReply()).isTrue();
assertThat(integrationProperties.getReadOnlyHeaders()).containsOnly("ignoredHeader");
assertThat(integrationProperties.getNoAutoStartupEndpoints()).containsOnly("notStartedEndpoint",
"_org.springframework.integration.errorLogger");
});
"spring.integration.endpoint.no-auto-startup=notStartedEndpoint,_org.springframework.integration.errorLogger" };
assertThat(propertyValues).hasSameSizeAs(globalIntegrationPropertyNames());
this.contextRunner.withPropertyValues(propertyValues).run((context) -> {
assertThat(context).hasSingleBean(org.springframework.integration.context.IntegrationProperties.class);
org.springframework.integration.context.IntegrationProperties integrationProperties = context
.getBean(org.springframework.integration.context.IntegrationProperties.class);
assertThat(integrationProperties.isChannelsAutoCreate()).isFalse();
assertThat(integrationProperties.getChannelsMaxUnicastSubscribers()).isEqualTo(2);
assertThat(integrationProperties.getChannelsMaxBroadcastSubscribers()).isEqualTo(3);
assertThat(integrationProperties.isErrorChannelRequireSubscribers()).isFalse();
assertThat(integrationProperties.isErrorChannelIgnoreFailures()).isFalse();
assertThat(integrationProperties.getEndpointsDefaultTimeout()).isEqualTo(60000);
assertThat(integrationProperties.isMessagingTemplateThrowExceptionOnLateReply()).isTrue();
assertThat(integrationProperties.getReadOnlyHeaders()).containsOnly("ignoredHeader");
assertThat(integrationProperties.getNoAutoStartupEndpoints()).containsOnly("notStartedEndpoint",
"_org.springframework.integration.errorLogger");
});
}

@Test
void integrationGlobalPropertiesUseConsistentDefault() {
List<PropertyAccessor> properties = List
.of("isChannelsAutoCreate", "getChannelsMaxUnicastSubscribers", "getChannelsMaxBroadcastSubscribers",
"isErrorChannelRequireSubscribers", "isErrorChannelIgnoreFailures", "getEndpointsDefaultTimeout",
"isMessagingTemplateThrowExceptionOnLateReply", "getReadOnlyHeaders", "getNoAutoStartupEndpoints")
.stream()
.map(PropertyAccessor::new)
.toList();
assertThat(properties).hasSameSizeAs(globalIntegrationPropertyNames());
org.springframework.integration.context.IntegrationProperties defaultIntegrationProperties = new org.springframework.integration.context.IntegrationProperties();
this.contextRunner.run((context) -> {
assertThat(context).hasSingleBean(org.springframework.integration.context.IntegrationProperties.class);
org.springframework.integration.context.IntegrationProperties integrationProperties = context
.getBean(org.springframework.integration.context.IntegrationProperties.class);
assertThat(integrationProperties.isChannelsAutoCreate())
.isEqualTo(defaultIntegrationProperties.isChannelsAutoCreate());
assertThat(integrationProperties.getChannelsMaxUnicastSubscribers())
.isEqualTo(defaultIntegrationProperties.getChannelsMaxBroadcastSubscribers());
assertThat(integrationProperties.getChannelsMaxBroadcastSubscribers())
.isEqualTo(defaultIntegrationProperties.getChannelsMaxBroadcastSubscribers());
assertThat(integrationProperties.isErrorChannelRequireSubscribers())
.isEqualTo(defaultIntegrationProperties.isErrorChannelIgnoreFailures());
assertThat(integrationProperties.isErrorChannelIgnoreFailures())
.isEqualTo(defaultIntegrationProperties.isErrorChannelIgnoreFailures());
assertThat(integrationProperties.isMessagingTemplateThrowExceptionOnLateReply())
.isEqualTo(defaultIntegrationProperties.isMessagingTemplateThrowExceptionOnLateReply());
assertThat(integrationProperties.getReadOnlyHeaders())
.isEqualTo(defaultIntegrationProperties.getReadOnlyHeaders());
assertThat(integrationProperties.getNoAutoStartupEndpoints())
.isEqualTo(defaultIntegrationProperties.getNoAutoStartupEndpoints());
properties.forEach((property) -> assertThat(property.get(integrationProperties))
.isEqualTo(property.get(defaultIntegrationProperties)));
});
}

private List<String> globalIntegrationPropertyNames() {
return Stream
.of(PropertyAccessorFactory
.forBeanPropertyAccess(new org.springframework.integration.context.IntegrationProperties())
.getPropertyDescriptors())
.map(PropertyDescriptor::getName)
.filter((name) -> !"class".equals(name))
.filter((name) -> !"taskSchedulerPoolSize".equals(name))
.toList();
}

@Test
void integrationGlobalPropertiesUserBeanOverridesAutoConfiguration() {
org.springframework.integration.context.IntegrationProperties userIntegrationProperties = new org.springframework.integration.context.IntegrationProperties();
Expand Down Expand Up @@ -604,4 +617,23 @@ MessageHandler handler(BlockingQueue<Message<?>> sink) {

}

static class PropertyAccessor {

private final String name;

PropertyAccessor(String name) {
this.name = name;
}

Object get(org.springframework.integration.context.IntegrationProperties properties) {
return ReflectionTestUtils.invokeMethod(properties, this.name);
}

@Override
public String toString() {
return this.name;
}

}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2023 the original author or authors.
* Copyright 2012-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,12 +17,23 @@
package org.springframework.boot.autoconfigure.integration;

import java.io.FileNotFoundException;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

import io.lettuce.core.dynamic.support.ReflectionUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.context.properties.bind.BindResult;
import org.springframework.boot.context.properties.bind.Bindable;
import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.boot.origin.Origin;
import org.springframework.boot.origin.OriginLookup;
import org.springframework.boot.origin.TextResourceOrigin;
Expand All @@ -32,6 +43,10 @@
import org.springframework.core.env.StandardEnvironment;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.integration.context.IntegrationProperties;
import org.springframework.mock.env.MockEnvironment;
import org.springframework.test.util.ReflectionTestUtils;
import org.springframework.util.ClassUtils;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
Expand Down Expand Up @@ -114,6 +129,56 @@ void registerIntegrationPropertiesPropertySourceWithResourceCanRetrieveOrigin()
.satisfies(textOrigin(resource, 2, 52));
}

@Test
@SuppressWarnings("unchecked")
void hasMappingsForAllMappableProperties() throws Exception {
Class<?> propertySource = ClassUtils.forName("%s.IntegrationPropertiesPropertySource"
.formatted(IntegrationPropertiesEnvironmentPostProcessor.class.getName()), getClass().getClassLoader());
Map<String, String> mappings = (Map<String, String>) ReflectionTestUtils.getField(propertySource,
"KEYS_MAPPING");
assertThat(mappings.values()).containsExactlyInAnyOrderElementsOf(integrationPropertyNames());
}

private static List<String> integrationPropertyNames() {
List<String> propertiesToMap = new ArrayList<>();
ReflectionUtils.doWithFields(IntegrationProperties.class, (field) -> {
String value = (String) ReflectionUtils.getField(field, null);
if (value.startsWith(IntegrationProperties.INTEGRATION_PROPERTIES_PREFIX)
&& value.length() > IntegrationProperties.INTEGRATION_PROPERTIES_PREFIX.length()) {
propertiesToMap.add(value);
}
}, (field) -> Modifier.isStatic(field.getModifiers()) && field.getType().equals(String.class));
propertiesToMap.remove(IntegrationProperties.TASK_SCHEDULER_POOL_SIZE);
return propertiesToMap;
}

@MethodSource("mappedConfigurationProperties")
@ParameterizedTest
void mappedPropertiesExistOnBootsIntegrationProperties(String mapping) {
Bindable<org.springframework.boot.autoconfigure.integration.IntegrationProperties> bindable = Bindable
.of(org.springframework.boot.autoconfigure.integration.IntegrationProperties.class);
MockEnvironment environment = new MockEnvironment().withProperty(mapping,
(mapping.contains("max") || mapping.contains("timeout")) ? "1" : "true");
BindResult<org.springframework.boot.autoconfigure.integration.IntegrationProperties> result = Binder
.get(environment)
.bind("spring.integration", bindable);
assertThat(result.isBound()).isTrue();
}

@SuppressWarnings("unchecked")
private static Collection<String> mappedConfigurationProperties() {
try {
Class<?> propertySource = ClassUtils.forName("%s.IntegrationPropertiesPropertySource"
.formatted(IntegrationPropertiesEnvironmentPostProcessor.class.getName()), null);
Map<String, String> mappings = (Map<String, String>) ReflectionTestUtils.getField(propertySource,
"KEYS_MAPPING");
return mappings.keySet();
}
catch (Exception ex) {
throw new RuntimeException(ex);
}
}

private Consumer<Origin> textOrigin(Resource resource, int line, int column) {
return (origin) -> {
assertThat(origin).isInstanceOf(TextResourceOrigin.class);
Expand Down

0 comments on commit 1cc7498

Please sign in to comment.