Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #612 API for configuring pooled connection idle time #792

Merged
merged 1 commit into from
Jul 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 46 additions & 2 deletions src/main/java/reactor/netty/resources/ConnectionProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package reactor.netty.resources;

import java.net.SocketAddress;
import java.time.Duration;
import java.util.concurrent.TimeoutException;

import io.netty.bootstrap.Bootstrap;
Expand All @@ -27,6 +28,8 @@
import reactor.pool.PoolBuilder;
import reactor.util.annotation.NonNull;

import javax.annotation.Nullable;

/**
* A {@link ConnectionProvider} will produce {@link Connection}
*
Expand Down Expand Up @@ -78,11 +81,30 @@ static ConnectionProvider newConnection() {
* {@link Connection}
*/
static ConnectionProvider elastic(String name) {
return elastic(name, null);
}

/**
* Create a {@link ConnectionProvider} to cache and grow on demand {@link Connection}.
* <p>An elastic {@link ConnectionProvider} will never wait before opening a new
* connection. The reuse window is limited but it cannot starve an undetermined volume
* of clients using it.
*
* @param name the channel pool map name
* @param maxIdleTime the {@link Duration} after which the channel will be closed (resolution: ms),
* if {@code NULL} there is no max idle time
*
* @return a new {@link ConnectionProvider} to cache and grow on demand
* {@link Connection}
*/
static ConnectionProvider elastic(String name, @Nullable Duration maxIdleTime) {
return new PooledConnectionProvider(name,
(allocator, destroyHandler, evictionPredicate) ->
PoolBuilder.from(allocator)
.destroyHandler(destroyHandler)
.evictionPredicate(evictionPredicate)
.evictionPredicate(evictionPredicate
.or((poolable, meta) -> maxIdleTime != null &&
meta.idleTime() >= maxIdleTime.toMillis()))
.fifo());
}

Expand Down Expand Up @@ -134,6 +156,26 @@ static ConnectionProvider fixed(String name, int maxConnections) {
* number of {@link Connection}
*/
static ConnectionProvider fixed(String name, int maxConnections, long acquireTimeout) {
return fixed(name, maxConnections, acquireTimeout, null);
}

/**
* Create a new {@link ConnectionProvider} to cache and reuse a fixed maximum
* number of {@link Connection}.
* <p>A Fixed {@link ConnectionProvider} will open up to the given max connection value.
* Further connections will be pending acquisition indefinitely.
*
* @param name the connection pool name
* @param maxConnections the maximum number of connections before starting pending
* @param acquireTimeout the maximum time in millis after which a pending acquire
* must complete or the {@link TimeoutException} will be thrown.
* @param maxIdleTime the {@link Duration} after which the channel will be closed (resolution: ms),
* if {@code NULL} there is no max idle time
*
* @return a new {@link ConnectionProvider} to cache and reuse a fixed maximum
* number of {@link Connection}
*/
static ConnectionProvider fixed(String name, int maxConnections, long acquireTimeout, @Nullable Duration maxIdleTime) {
if (maxConnections == -1) {
return elastic(name);
}
Expand All @@ -149,7 +191,9 @@ static ConnectionProvider fixed(String name, int maxConnections, long acquireTim
.sizeMax(maxConnections)
.maxPendingAcquireUnbounded()
.destroyHandler(destroyHandler)
.evictionPredicate(evictionPredicate)
.evictionPredicate(evictionPredicate
.or((poolable, meta) -> maxIdleTime != null &&
meta.idleTime() >= maxIdleTime.toMillis()))
.fifo(),
acquireTimeout,
maxConnections);
Expand Down
56 changes: 55 additions & 1 deletion src/test/java/reactor/netty/http/client/HttpClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
Expand Down Expand Up @@ -81,7 +82,6 @@
import reactor.netty.ByteBufMono;
import reactor.netty.DisposableServer;
import reactor.netty.FutureMono;
import reactor.netty.NettyPipeline;
import reactor.netty.SocketUtils;
import reactor.netty.channel.AbortedException;
import reactor.netty.http.server.HttpServer;
Expand Down Expand Up @@ -1828,4 +1828,58 @@ private void doTestIssue777_2(HttpClient client, String uri, String expectation,
.expectErrorMessage(expectation)
.verify(Duration.ofSeconds(30));
}

@Test
public void testConnectionIdleTimeFixedPool() throws Exception {
ConnectionProvider provider = ConnectionProvider.fixed("test", 1, 100, Duration.ofMillis(10));
ChannelId[] ids = doTestConnectionIdleTime(provider);
assertThat(ids[0]).isNotEqualTo(ids[1]);
}

@Test
public void testConnectionIdleTimeElasticPool() throws Exception {
ConnectionProvider provider = ConnectionProvider.elastic("test", Duration.ofMillis(10));
ChannelId[] ids = doTestConnectionIdleTime(provider);
assertThat(ids[0]).isNotEqualTo(ids[1]);
}

@Test
public void testConnectionNoIdleTimeFixedPool() throws Exception {
ConnectionProvider provider = ConnectionProvider.fixed("test", 1, 100);
ChannelId[] ids = doTestConnectionIdleTime(provider);
assertThat(ids[0]).isEqualTo(ids[1]);
}

@Test
public void testConnectionNoIdleTimeElasticPool() throws Exception {
ConnectionProvider provider = ConnectionProvider.elastic("test");
ChannelId[] ids = doTestConnectionIdleTime(provider);
assertThat(ids[0]).isEqualTo(ids[1]);
}

private ChannelId[] doTestConnectionIdleTime(ConnectionProvider provider) throws Exception {
DisposableServer server =
HttpServer.create()
.port(0)
.wiretap(true)
.handle((req, res) -> res.sendString(Mono.just("hello")))
.bindNow();

Flux<ChannelId> id = createHttpClientForContextWithAddress(server, provider)
.get()
.uri("/")
.responseConnection((res, conn) -> Mono.just(conn.channel().id())
.delayUntil(ch -> conn.inbound().receive()));

ChannelId id1 = id.blockLast(Duration.ofSeconds(30));
Thread.sleep(30);
ChannelId id2 = id.blockLast(Duration.ofSeconds(30));

assertThat(id1).isNotNull();
assertThat(id2).isNotNull();

server.disposeNow();
provider.dispose();
return new ChannelId[] {id1, id2};
}
}