Skip to content

Commit

Permalink
Fall back to connection details when configuring Rabbit Streams
Browse files Browse the repository at this point in the history
Closes gh-42489
  • Loading branch information
wilkinsona committed Oct 1, 2024
1 parent 05b4edf commit 95665a4
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 21 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2023 the original author or authors.
* Copyright 2012-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -65,9 +65,9 @@ StreamRabbitListenerContainerFactory streamRabbitListenerContainerFactory(Enviro

@Bean(name = "rabbitStreamEnvironment")
@ConditionalOnMissingBean(name = "rabbitStreamEnvironment")
Environment rabbitStreamEnvironment(RabbitProperties properties,
Environment rabbitStreamEnvironment(RabbitProperties properties, RabbitConnectionDetails connectionDetails,
ObjectProvider<EnvironmentBuilderCustomizer> customizers) {
EnvironmentBuilder builder = configure(Environment.builder(), properties);
EnvironmentBuilder builder = configure(Environment.builder(), properties, connectionDetails);
customizers.orderedStream().forEach((customizer) -> customizer.customize(builder));
return builder.build();
}
Expand Down Expand Up @@ -96,18 +96,29 @@ RabbitStreamTemplate rabbitStreamTemplate(Environment rabbitStreamEnvironment, R
return template;
}

static EnvironmentBuilder configure(EnvironmentBuilder builder, RabbitProperties properties) {
static EnvironmentBuilder configure(EnvironmentBuilder builder, RabbitProperties properties,
RabbitConnectionDetails connectionDetails) {
return configure(builder, properties.getStream(), connectionDetails);
}

private static EnvironmentBuilder configure(EnvironmentBuilder builder, RabbitProperties.Stream stream,
RabbitConnectionDetails connectionDetails) {
builder.lazyInitialization(true);
RabbitProperties.Stream stream = properties.getStream();
PropertyMapper map = PropertyMapper.get();
map.from(stream.getHost()).to(builder::host);
map.from(stream.getPort()).to(builder::port);
map.from(stream.getVirtualHost())
.as(withFallback(properties::getVirtualHost))
.as(withFallback(connectionDetails::getVirtualHost))
.whenNonNull()
.to(builder::virtualHost);
map.from(stream.getUsername()).as(withFallback(properties::getUsername)).whenNonNull().to(builder::username);
map.from(stream.getPassword()).as(withFallback(properties::getPassword)).whenNonNull().to(builder::password);
map.from(stream.getUsername())
.as(withFallback(connectionDetails::getUsername))
.whenNonNull()
.to(builder::username);
map.from(stream.getPassword())
.as(withFallback(connectionDetails::getPassword))
.whenNonNull()
.to(builder::password);
return builder;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2023 the original author or authors.
* Copyright 2012-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@
package org.springframework.boot.autoconfigure.amqp;

import java.time.Duration;
import java.util.List;

import com.rabbitmq.stream.BackOffDelayPolicy;
import com.rabbitmq.stream.Codec;
Expand Down Expand Up @@ -113,12 +114,14 @@ void whenCustomMessageListenerContainerFactoryIsDefinedThenAutoConfiguredContain
}

@Test
void environmentUsesPropertyDefaultsByDefault() {
void environmentUsesConnectionDetailsByDefault() {
EnvironmentBuilder builder = mock(EnvironmentBuilder.class);
RabbitProperties properties = new RabbitProperties();
RabbitStreamConfiguration.configure(builder, properties);
RabbitStreamConfiguration.configure(builder, properties,
new TestRabbitConnectionDetails("guest", "guest", "vhost"));
then(builder).should().port(5552);
then(builder).should().host("localhost");
then(builder).should().virtualHost("vhost");
then(builder).should().lazyInitialization(true);
then(builder).should().username("guest");
then(builder).should().password("guest");
Expand All @@ -130,7 +133,8 @@ void whenStreamPortIsSetThenEnvironmentUsesCustomPort() {
EnvironmentBuilder builder = mock(EnvironmentBuilder.class);
RabbitProperties properties = new RabbitProperties();
properties.getStream().setPort(5553);
RabbitStreamConfiguration.configure(builder, properties);
RabbitStreamConfiguration.configure(builder, properties,
new TestRabbitConnectionDetails("guest", "guest", "vhost"));
then(builder).should().port(5553);
}

Expand All @@ -139,7 +143,8 @@ void whenStreamHostIsSetThenEnvironmentUsesCustomHost() {
EnvironmentBuilder builder = mock(EnvironmentBuilder.class);
RabbitProperties properties = new RabbitProperties();
properties.getStream().setHost("stream.rabbit.example.com");
RabbitStreamConfiguration.configure(builder, properties);
RabbitStreamConfiguration.configure(builder, properties,
new TestRabbitConnectionDetails("guest", "guest", "vhost"));
then(builder).should().host("stream.rabbit.example.com");
}

Expand All @@ -148,28 +153,31 @@ void whenStreamVirtualHostIsSetThenEnvironmentUsesCustomVirtualHost() {
EnvironmentBuilder builder = mock(EnvironmentBuilder.class);
RabbitProperties properties = new RabbitProperties();
properties.getStream().setVirtualHost("stream-virtual-host");
RabbitStreamConfiguration.configure(builder, properties);
RabbitStreamConfiguration.configure(builder, properties,
new TestRabbitConnectionDetails("guest", "guest", "vhost"));
then(builder).should().virtualHost("stream-virtual-host");
}

@Test
void whenStreamVirtualHostIsNotSetButDefaultVirtualHostIsSetThenEnvironmentUsesDefaultVirtualHost() {
EnvironmentBuilder builder = mock(EnvironmentBuilder.class);
RabbitProperties properties = new RabbitProperties();
properties.setVirtualHost("default-virtual-host");
RabbitStreamConfiguration.configure(builder, properties);
properties.setVirtualHost("properties-virtual-host");
RabbitStreamConfiguration.configure(builder, properties,
new TestRabbitConnectionDetails("guest", "guest", "default-virtual-host"));
then(builder).should().virtualHost("default-virtual-host");
}

@Test
void whenStreamCredentialsAreNotSetThenEnvironmentUsesRabbitCredentials() {
void whenStreamCredentialsAreNotSetThenEnvironmentUsesConnectionDetailsCredentials() {
EnvironmentBuilder builder = mock(EnvironmentBuilder.class);
RabbitProperties properties = new RabbitProperties();
properties.setUsername("alice");
properties.setPassword("secret");
RabbitStreamConfiguration.configure(builder, properties);
then(builder).should().username("alice");
then(builder).should().password("secret");
RabbitStreamConfiguration.configure(builder, properties,
new TestRabbitConnectionDetails("bob", "password", "vhost"));
then(builder).should().username("bob");
then(builder).should().password("password");
}

@Test
Expand All @@ -180,7 +188,8 @@ void whenStreamCredentialsAreSetThenEnvironmentUsesStreamCredentials() {
properties.setPassword("secret");
properties.getStream().setUsername("bob");
properties.getStream().setPassword("confidential");
RabbitStreamConfiguration.configure(builder, properties);
RabbitStreamConfiguration.configure(builder, properties,
new TestRabbitConnectionDetails("charlotte", "hidden", "vhost"));
then(builder).should().username("bob");
then(builder).should().password("confidential");
}
Expand Down Expand Up @@ -334,4 +343,40 @@ EnvironmentBuilderCustomizer customizerB() {

}

private static final class TestRabbitConnectionDetails implements RabbitConnectionDetails {

private final String username;

private final String password;

private final String virtualHost;

private TestRabbitConnectionDetails(String username, String password, String virtualHost) {
this.username = username;
this.password = password;
this.virtualHost = virtualHost;
}

@Override
public String getUsername() {
return this.username;
}

@Override
public String getPassword() {
return this.password;
}

@Override
public String getVirtualHost() {
return this.virtualHost;
}

@Override
public List<Address> getAddresses() {
throw new UnsupportedOperationException();
}

}

}

0 comments on commit 95665a4

Please sign in to comment.