Skip to content

Commit

Permalink
spring-projectsGH-8704: Add global property for defaultTimeout
Browse files Browse the repository at this point in the history
Fixes spring-projects#8704

The default timeout for requests and replies in the integration endpoints
is 30 seconds to avoid indefinite blocking in threads.
Sometime those 30 seconds is not enough.

* Introduce a `spring.integration.endpoints.defaultTimeout` global property
to allow overriding all the timeouts to desired value.
The negative number indicates an indefinite waiting time: similar to what
was there before introducing 30 seconds by default
  • Loading branch information
artembilan committed Aug 17, 2023
1 parent ef5db30 commit 44a997a
Show file tree
Hide file tree
Showing 23 changed files with 144 additions and 82 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,8 +18,7 @@

import java.lang.reflect.Field;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import org.springframework.amqp.core.Address;
Expand All @@ -40,11 +39,11 @@
import org.springframework.integration.test.util.TestUtils;
import org.springframework.messaging.MessageChannel;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import org.springframework.util.ReflectionUtils;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.mockito.ArgumentMatchers.isNull;

/**
Expand All @@ -55,8 +54,7 @@
*
* @since 2.1
*/
@ContextConfiguration
@RunWith(SpringJUnit4ClassRunner.class)
@SpringJUnitConfig
@DirtiesContext
public class AmqpInboundGatewayParserTests {

Expand All @@ -66,16 +64,16 @@ public class AmqpInboundGatewayParserTests {
@Test
public void customMessageConverter() {
Object gateway = context.getBean("gateway");
MessageConverter gatewayConverter = TestUtils.getPropertyValue(gateway, "amqpMessageConverter", MessageConverter.class);
MessageConverter templateConverter = TestUtils.getPropertyValue(gateway, "amqpTemplate.messageConverter", MessageConverter.class);
MessageConverter gatewayConverter =
TestUtils.getPropertyValue(gateway, "amqpMessageConverter", MessageConverter.class);
MessageConverter templateConverter =
TestUtils.getPropertyValue(gateway, "amqpTemplate.messageConverter", MessageConverter.class);
TestConverter testConverter = context.getBean("testConverter", TestConverter.class);
assertThat(gatewayConverter).isSameAs(testConverter);
assertThat(templateConverter).isSameAs(testConverter);
assertThat(TestUtils.getPropertyValue(gateway, "autoStartup")).isEqualTo(Boolean.TRUE);
assertThat(TestUtils.getPropertyValue(gateway, "phase")).isEqualTo(0);
assertThat(TestUtils.getPropertyValue(gateway, "replyTimeout", Long.class)).isEqualTo(Long.valueOf(1234L));
assertThat(TestUtils.getPropertyValue(gateway, "messagingTemplate.receiveTimeout", Long.class))
.isEqualTo(Long.valueOf(1234L));
assertThat(TestUtils.getPropertyValue(gateway, "messagingTemplate.receiveTimeout")).isEqualTo(1234L);
assertThat(TestUtils.getPropertyValue(gateway, "messageListenerContainer.missingQueuesFatal", Boolean.class))
.isTrue();
}
Expand Down Expand Up @@ -145,14 +143,12 @@ public void verifyUsageWithHeaderMapper() throws Exception {

@Test
public void testInt2971HeaderMapperAndMappedHeadersExclusivity() {
try {
new ClassPathXmlApplicationContext("AmqpInboundGatewayParserTests-headerMapper-fail-context.xml",
this.getClass()).close();
}
catch (BeanDefinitionParsingException e) {
assertThat(e.getMessage().startsWith("Configuration problem: The 'header-mapper' attribute " +
"is mutually exclusive with 'mapped-request-headers' or 'mapped-reply-headers'")).isTrue();
}
assertThatExceptionOfType(BeanDefinitionParsingException.class)
.isThrownBy(() ->
new ClassPathXmlApplicationContext("AmqpInboundGatewayParserTests-headerMapper-fail-context.xml",
getClass()))
.withMessageStartingWith("Configuration problem: The 'header-mapper' attribute " +
"is mutually exclusive with 'mapped-request-headers' or 'mapped-reply-headers'");
}

private static class TestConverter extends SimpleMessageConverter {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2022 the original author or authors.
* Copyright 2014-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -37,6 +37,7 @@
* <li> {@code spring.integration.endpoints.noAutoStartup=}
* <li> {@code spring.integration.channels.error.requireSubscribers=true}
* <li> {@code spring.integration.channels.error.ignoreFailures=true}
* <li> {@code spring.integration.endpoints.defaultTimeout=30000}
* </ul>
*
* @author Artem Bilan
Expand Down Expand Up @@ -112,6 +113,12 @@ public final class IntegrationProperties {
*/
public static final String ENDPOINTS_NO_AUTO_STARTUP = INTEGRATION_PROPERTIES_PREFIX + "endpoints.noAutoStartup";

/**
* Specifies the default timeout for blocking operations like send and receive messages.
* @since 6.2
*/
public static final String ENDPOINTS_DEFAULT_TIMEOUT = INTEGRATION_PROPERTIES_PREFIX + "endpoints.defaultTimeout";

private static final Properties DEFAULTS;

private boolean channelsAutoCreate = true;
Expand All @@ -132,6 +139,8 @@ public final class IntegrationProperties {

private String[] noAutoStartupEndpoints = {};

private long endpointsDefaultTimeout = IntegrationContextUtils.DEFAULT_TIMEOUT;

private volatile Properties properties;

static {
Expand Down Expand Up @@ -293,6 +302,23 @@ public String[] getNoAutoStartupEndpoints() {
return Arrays.copyOf(this.noAutoStartupEndpoints, this.noAutoStartupEndpoints.length);
}

/**
* Return the value of {@link #ENDPOINTS_DEFAULT_TIMEOUT} option.
* @return the value of {@link #ENDPOINTS_DEFAULT_TIMEOUT} option.
* @since 6.2
*/
public long getEndpointsDefaultTimeout() {
return this.endpointsDefaultTimeout;
}

/**
* Configure a value for {@link #ENDPOINTS_DEFAULT_TIMEOUT} option.
* @param endpointsDefaultTimeout the value for {@link #ENDPOINTS_DEFAULT_TIMEOUT} option.
*/
public void setEndpointsDefaultTimeout(long endpointsDefaultTimeout) {
this.endpointsDefaultTimeout = endpointsDefaultTimeout;
}

/**
* Represent the current instance as a {@link Properties}.
* @return the {@link Properties} representation.
Expand All @@ -312,6 +338,7 @@ public Properties toProperties() {
props.setProperty(READ_ONLY_HEADERS, StringUtils.arrayToCommaDelimitedString(this.readOnlyHeaders));
props.setProperty(ENDPOINTS_NO_AUTO_STARTUP,
StringUtils.arrayToCommaDelimitedString(this.noAutoStartupEndpoints));
props.setProperty(ENDPOINTS_DEFAULT_TIMEOUT, "" + this.endpointsDefaultTimeout);

this.properties = props;
}
Expand Down Expand Up @@ -348,7 +375,9 @@ public static IntegrationProperties parse(Properties properties) {
StringUtils.commaDelimitedListToStringArray(value)))
.acceptIfHasText(properties.getProperty(ENDPOINTS_NO_AUTO_STARTUP),
(value) -> integrationProperties.setNoAutoStartupEndpoints(
StringUtils.commaDelimitedListToStringArray(value)));
StringUtils.commaDelimitedListToStringArray(value)))
.acceptIfHasText(properties.getProperty(ENDPOINTS_DEFAULT_TIMEOUT),
(value) -> integrationProperties.setEndpointsDefaultTimeout(Long.parseLong(value)));
return integrationProperties;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.springframework.integration.IntegrationPatternType;
import org.springframework.integration.MessageTimeoutException;
import org.springframework.integration.channel.ReactiveStreamsSubscribableChannel;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.endpoint.AbstractEndpoint;
import org.springframework.integration.endpoint.EventDrivenConsumer;
Expand Down Expand Up @@ -124,7 +123,9 @@ public abstract class MessagingGatewaySupport extends AbstractEndpoint

private String errorChannelName;

private long replyTimeout = IntegrationContextUtils.DEFAULT_TIMEOUT;
private boolean requestTimeoutSet;

private boolean replyTimeoutSet;

private InboundMessageMapper<Object> requestMapper = new DefaultRequestMapper();

Expand Down Expand Up @@ -167,8 +168,6 @@ public MessagingGatewaySupport() {
public MessagingGatewaySupport(boolean errorOnTimeout) {
ConvertingMessagingTemplate template = new ConvertingMessagingTemplate();
template.setMessageConverter(this.messageConverter);
template.setSendTimeout(IntegrationContextUtils.DEFAULT_TIMEOUT);
template.setReceiveTimeout(this.replyTimeout);
this.messagingTemplate = template;
this.errorOnTimeout = errorOnTimeout;
}
Expand Down Expand Up @@ -252,6 +251,7 @@ public void setErrorChannelName(String errorChannelName) {
*/
public void setRequestTimeout(long requestTimeout) {
this.messagingTemplate.setSendTimeout(requestTimeout);
this.requestTimeoutSet = true;
}

/**
Expand All @@ -260,8 +260,8 @@ public void setRequestTimeout(long requestTimeout) {
* @param replyTimeout the timeout value in milliseconds
*/
public void setReplyTimeout(long replyTimeout) {
this.replyTimeout = replyTimeout;
this.messagingTemplate.setReceiveTimeout(replyTimeout);
this.replyTimeoutSet = true;
}

/**
Expand Down Expand Up @@ -406,6 +406,13 @@ protected void onInit() {
}
this.messageConverter.setBeanFactory(beanFactory);
}
long endpointsDefaultTimeout = getIntegrationProperties().getEndpointsDefaultTimeout();
if (!this.requestTimeoutSet) {
this.messagingTemplate.setSendTimeout(endpointsDefaultTimeout);
}
if (!this.replyTimeoutSet) {
this.messagingTemplate.setReceiveTimeout(endpointsDefaultTimeout);
}
this.initialized = true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,16 +89,15 @@ public abstract class AbstractMessageProducingHandler extends AbstractMessageHan

private boolean noHeadersPropagation;

{
this.messagingTemplate.setSendTimeout(IntegrationContextUtils.DEFAULT_TIMEOUT);
}
private boolean sendTimeoutSet;

/**
* Set the timeout for sending reply Messages.
* @param sendTimeout The send timeout.
*/
public void setSendTimeout(long sendTimeout) {
this.messagingTemplate.setSendTimeout(sendTimeout);
this.sendTimeoutSet = true;
}

@Override
Expand Down Expand Up @@ -189,7 +188,7 @@ protected final void updateNotPropagatedHeaders(String[] headers, boolean merge)
@Override
public Collection<String> getNotPropagatedHeaders() {
return this.notPropagatedHeaders != null
? Collections.unmodifiableSet(new HashSet<>(Arrays.asList(this.notPropagatedHeaders)))
? Set.of(this.notPropagatedHeaders)
: Collections.emptyList();
}

Expand Down Expand Up @@ -217,6 +216,9 @@ protected void onInit() {
}
this.messagingTemplate.setDestinationResolver(getChannelResolver());
setAsyncIfCan();
if (!this.sendTimeoutSet) {
this.messagingTemplate.setSendTimeout(getIntegrationProperties().getEndpointsDefaultTimeout());
}
}

private void setAsyncIfCan() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.springframework.core.convert.ConversionService;
import org.springframework.core.convert.support.DefaultConversionService;
import org.springframework.integration.IntegrationPatternType;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.support.management.IntegrationManagedResource;
Expand Down Expand Up @@ -63,9 +62,7 @@ public abstract class AbstractMessageRouter extends AbstractMessageHandler imple

private volatile boolean applySequence;

{
this.messagingTemplate.setSendTimeout(IntegrationContextUtils.DEFAULT_TIMEOUT);
}
private boolean sendTimeoutSet;

/**
* Set the default channel where Messages should be sent if channel resolution
Expand Down Expand Up @@ -115,10 +112,11 @@ public void setDefaultOutputChannelName(String defaultOutputChannelName) {
*/
public void setSendTimeout(long timeout) {
this.messagingTemplate.setSendTimeout(timeout);
this.sendTimeoutSet = true;
}

/**
* Specify whether send failures for one or more of the recipients should be ignored. By default this is
* Specify whether send failures for one or more of the recipients should be ignored. By default, this is
* <code>false</code> meaning that an Exception will be thrown whenever a send fails. To override this and suppress
* Exceptions, set the value to <code>true</code>.
* @param ignoreSendFailures true to ignore send failures.
Expand Down Expand Up @@ -174,6 +172,10 @@ protected void onInit() {
if (beanFactory != null) {
this.messagingTemplate.setBeanFactory(beanFactory);
}

if (!this.sendTimeoutSet) {
this.messagingTemplate.setSendTimeout(getIntegrationProperties().getEndpointsDefaultTimeout());
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public class ScatterGatherHandler extends AbstractReplyProducingMessageHandler i

private String errorChannelName = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME;

private long gatherTimeout = IntegrationContextUtils.DEFAULT_TIMEOUT;
private Long gatherTimeout;

private AbstractEndpoint gatherEndpoint;

Expand Down Expand Up @@ -119,6 +119,10 @@ public IntegrationPatternType getIntegrationPatternType() {

@Override
protected void doInit() {
if (this.gatherTimeout == null) {
this.gatherTimeout = getIntegrationProperties().getEndpointsDefaultTimeout();
}

BeanFactory beanFactory = getBeanFactory();
if (this.gatherChannel == null) {
this.gatherChannel =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ spring.integration.messagingTemplate.throwExceptionOnLateReply=false
# Defaults to MessageHeaders.ID and MessageHeaders.TIMESTAMP
spring.integration.readOnly.headers=
spring.integration.endpoints.noAutoStartup=
spring.integration.endpoints.defaultTimeout=30000
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,10 @@ public void initializeSubject() {
outputChannel = mock(MessageChannel.class);
handler = new AggregatingMessageHandler(processor, store, correlationStrategy, ReleaseStrategy);
handler.setOutputChannel(outputChannel);
handler.setBeanFactory(mock());
handler.afterPropertiesSet();
}


@Test
public void bufferCompletesNormally() {
String correlationKey = "key";
Expand All @@ -95,7 +96,7 @@ public void bufferCompletesNormally() {
}

@Test
public void bufferCompletesWithException() throws Exception {
public void bufferCompletesWithException() {

doAnswer(new ThrowsException(new RuntimeException("Planned test exception")))
.when(processor).processMessageGroup(isA(SimpleMessageGroup.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ void testDefaultResequencerProperties() {
ResequencingMessageHandler resequencer = TestUtils.getPropertyValue(endpoint, "handler",
ResequencingMessageHandler.class);
assertThat(getPropertyValue(resequencer, "outputChannel")).isNull();
assertThat(getPropertyValue(resequencer, "messagingTemplate.sendTimeout")).isEqualTo(30000L);
assertThat(getPropertyValue(resequencer, "messagingTemplate.sendTimeout")).isEqualTo(45000L);
assertThat(getPropertyValue(resequencer, "sendPartialResultOnExpiry"))
.as("The ResequencerEndpoint is not configured with the appropriate 'send partial results on " +
"timeout'" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void testAnnotationWithDefaultSettings() {
assertThat(getPropertyValue(aggregator, "releaseStrategy") instanceof SimpleSequenceSizeReleaseStrategy)
.isTrue();
assertThat(getPropertyValue(aggregator, "outputChannel")).isNull();
assertThat(getPropertyValue(aggregator, "messagingTemplate.sendTimeout")).isEqualTo(30000L);
assertThat(getPropertyValue(aggregator, "messagingTemplate.sendTimeout")).isEqualTo(45000L);
assertThat(getPropertyValue(aggregator, "sendPartialResultOnExpiry")).isEqualTo(false);
context.close();
}
Expand All @@ -72,7 +72,7 @@ public void testAnnotationWithCustomSettings() {
}

@Test
public void testAnnotationWithCustomReleaseStrategy() throws Exception {
public void testAnnotationWithCustomReleaseStrategy() {
ConfigurableApplicationContext context = new ClassPathXmlApplicationContext(
new String[] {"classpath:/org/springframework/integration/config/annotation/testAnnotatedAggregator.xml"});
final String endpointName = "endpointWithDefaultAnnotationAndCustomReleaseStrategy";
Expand All @@ -90,7 +90,7 @@ public void testAnnotationWithCustomReleaseStrategy() throws Exception {
}

@Test
public void testAnnotationWithCustomCorrelationStrategy() throws Exception {
public void testAnnotationWithCustomCorrelationStrategy() {
ConfigurableApplicationContext context = new ClassPathXmlApplicationContext(
new String[] {"classpath:/org/springframework/integration/config/annotation/testAnnotatedAggregator.xml"});
final String endpointName = "endpointWithCorrelationStrategy";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class HeaderEnricherParserTests {
void sendTimeoutDefault() {
Object endpoint = context.getBean("headerEnricherWithDefaults");
long sendTimeout = TestUtils.getPropertyValue(endpoint, "handler.messagingTemplate.sendTimeout", Long.class);
assertThat(sendTimeout).isEqualTo(30000L);
assertThat(sendTimeout).isEqualTo(45000L);
}

@Test
Expand Down
Loading

0 comments on commit 44a997a

Please sign in to comment.