
Pool doesn't reconnect when DB server is restarted #68

Closed
michaelr524 opened this issue Apr 5, 2020 · 8 comments
Labels
type: bug A general bug

Comments

@michaelr524

michaelr524 commented Apr 5, 2020

When I restart the database (Postgres) while the pool is running, every subsequent query attempt fails with the error below.

  1. Is there a way to make it automatically reconnect?
  2. If the pool completely fails for any reason, I'd like to detect it and restart the pool or the application. What would be the correct way to do that?
io.r2dbc.spi.R2dbcNonTransientResourceException: Connection validation failed
	at io.r2dbc.pool.Validation.lambda$validate$2(Validation.java:44) ~[r2dbc-pool-0.8.1.RELEASE.jar:0.8.1.RELEASE]
	at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:163) [reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1637) [reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
	at reactor.core.publisher.MonoSupplier.subscribe(MonoSupplier.java:61) [reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
	at reactor.core.publisher.Mono.subscribe(Mono.java:4105) [reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:172) [reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
	at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) [reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:55) [reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150) [reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:192) [reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
	at reactor.pool.AbstractPool$Borrower.deliver(AbstractPool.java:373) [reactor-pool-0.1.2.RELEASE.jar:0.1.2.RELEASE]
	at reactor.pool.SimplePool.lambda$drainLoop$13(SimplePool.java:245) [reactor-pool-0.1.2.RELEASE.jar:0.1.2.RELEASE]
	at reactor.core.scheduler.ImmediateScheduler.schedule(ImmediateScheduler.java:47) [reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
	at reactor.pool.SimplePool.drainLoop(SimplePool.java:245) [reactor-pool-0.1.2.RELEASE.jar:0.1.2.RELEASE]
	at reactor.pool.SimplePool.drain(SimplePool.java:172) [reactor-pool-0.1.2.RELEASE.jar:0.1.2.RELEASE]
	at reactor.pool.SimplePool.doAcquire(SimplePool.java:132) [reactor-pool-0.1.2.RELEASE.jar:0.1.2.RELEASE]
	at reactor.pool.AbstractPool$Borrower.request(AbstractPool.java:336) [reactor-pool-0.1.2.RELEASE.jar:0.1.2.RELEASE]
	at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:130) [reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe(MonoFlatMap.java:103) [reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onSubscribe(FluxPeek.java:163) [reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
	at reactor.pool.SimplePool$QueueBorrowerMono.subscribe(SimplePool.java:324) [reactor-pool-0.1.2.RELEASE.jar:0.1.2.RELEASE]
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:55) [reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) [reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
	at reactor.core.publisher.FluxRetry$RetrySubscriber.resubscribe(FluxRetry.java:110) [reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
	at reactor.core.publisher.MonoRetry.subscribeOrReturn(MonoRetry.java:49) [reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
	at reactor.core.publisher.Flux.subscribe(Flux.java:8160) [reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:418) [reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121) [reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1637) [reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
	at reactor.core.publisher.MonoSingle$SingleSubscriber.onComplete(MonoSingle.java:171) [reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:1871) [reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onComplete(MonoFlatMapMany.java:252) [reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
	at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:138) [reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
	at reactor.core.publisher.FluxHandle$HandleSubscriber.onComplete(FluxHandle.java:206) [reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
	at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onComplete(FluxMap.java:262) [reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
	at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:138) [reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
	at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onComplete(FluxHandleFuseable.java:223) [reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
	at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onComplete(FluxContextStart.java:122) [reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1638) [reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
	at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onComplete(MonoCollectList.java:121) [reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252) [reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
	at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136) [reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
	at reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:419) [reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
	at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:209) [reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
	at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:367) [reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
	at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:363) [reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
	at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:412) [reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
	at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:572) [reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
	at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:90) [reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) [netty-transport-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) [netty-transport-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355) [netty-transport-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) [netty-handler-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) [netty-transport-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) [netty-transport-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355) [netty-transport-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) [netty-transport-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:316) [netty-codec-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290) [netty-codec-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) [netty-transport-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) [netty-transport-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) [netty-transport-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355) [netty-transport-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1470) [netty-handler-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1219) [netty-handler-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1266) [netty-handler-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:493) [netty-codec-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:432) [netty-codec-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:271) [netty-codec-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) [netty-transport-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) [netty-transport-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355) [netty-transport-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [netty-transport-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) [netty-transport-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) [netty-transport-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [netty-transport-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) [netty-transport-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) [netty-transport-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) [netty-transport-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) [netty-transport-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) [netty-transport-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) [netty-common-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.44.Final.jar:4.1.44.Final]
	at java.lang.Thread.run(Thread.java:835) [?:?]
@michaelr524
Author

After a few more attempts, it looks like the pool re-established the lost connections and started functioning again.

Updated questions:

  1. Is there a way to clean up these stale connections so that no requests to the DB fail once the DB is back up?
  2. If the pool completely fails for any reason, I'd like to detect it and restart the pool or the application. What would be the correct way to do that?

@itzikiusa

itzikiusa commented May 7, 2020

I'm having the same issue; for me it does not reconnect.

Caused by: org.springframework.dao.DataAccessResourceFailureException: Failed to obtain R2DBC Connection; nested exception is java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 30000ms in 'peek' (and no fallback has been configured)
at org.springframework.data.r2dbc.connectionfactory.ConnectionFactoryUtils.lambda$getConnection$0(ConnectionFactoryUtils.java:70) ~[spring-data-r2dbc-1.0.0.RELEASE.jar!/:1.0.0.RELEASE]
at reactor.core.publisher.Mono.lambda$onErrorMap$30(Mono.java:3270) ~[reactor-core-3.3.2.RELEASE.jar!/:3.3.2.RELEASE]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:88) ~[reactor-core-3.3.2.RELEASE.jar!/:3.3.2.RELEASE]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:100) ~[reactor-core-3.3.2.RELEASE.jar!/:3.3.2.RELEASE]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:185) ~[reactor-core-3.3.2.RELEASE.jar!/:3.3.2.RELEASE]
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:251) ~[reactor-core-3.3.2.RELEASE.jar!/:3.3.2.RELEASE]
at reactor.core.publisher.FluxRetry$RetrySubscriber.onError(FluxRetry.java:88) ~[reactor-core-3.3.2.RELEASE.jar!/:3.3.2.RELEASE]
at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:114) ~[reactor-core-3.3.2.RELEASE.jar!/:3.3.2.RELEASE]
at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.handleTimeout(FluxTimeout.java:289) ~[reactor-core-3.3.2.RELEASE.jar!/:3.3.2.RELEASE]
at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.doTimeout(FluxTimeout.java:274) ~[reactor-core-3.3.2.RELEASE.jar!/:3.3.2.RELEASE]
at reactor.core.publisher.FluxTimeout$TimeoutTimeoutSubscriber.onNext(FluxTimeout.java:396) ~[reactor-core-3.3.2.RELEASE.jar!/:3.3.2.RELEASE]
at reactor.core.publisher.StrictSubscriber.onNext(StrictSubscriber.java:89) ~[reactor-core-3.3.2.RELEASE.jar!/:3.3.2.RELEASE]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73) ~[reactor-core-3.3.2.RELEASE.jar!/:3.3.2.RELEASE]
at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:117) ~[reactor-core-3.3.2.RELEASE.jar!/:3.3.2.RELEASE]
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) [reactor-core-3.3.2.RELEASE.jar!/:3.3.2.RELEASE]
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) [reactor-core-3.3.2.RELEASE.jar!/:3.3.2.RELEASE]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_212]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_212]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[na:1.8.0_212]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_212]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_212]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_212]
Caused by: java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 30000ms in 'peek' (and no fallback has been configured)
... 14 common frames omitted

@mp911de mp911de added the type: bug A general bug label May 7, 2020
@mp911de mp911de added this to the 0.8.3.RELEASE milestone May 7, 2020
@mp911de
Member

mp911de commented May 7, 2020

Right now, connection validation is not guarded by a timeout. So if the network request hangs for some reason or the driver gets stuck, the blockage propagates all the way down. Adding a timeout to the validation should address the issue.
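The general pattern behind the fix can be illustrated with plain java.util.concurrent rather than the actual r2dbc-pool code (the class and method names below are illustrative, not from the library): an unbounded validation call hangs forever, while a timeout turns the hang into a failure the pool can act on.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class ValidationTimeoutDemo {

    // Returns true only if validation completes successfully within the budget.
    // A hung validation (modeled here as a future that never completes) now
    // fails fast instead of blocking connection acquisition indefinitely.
    static boolean validateWithTimeout(CompletableFuture<Boolean> validation, long millis) {
        try {
            return validation.orTimeout(millis, TimeUnit.MILLISECONDS).join();
        } catch (Exception e) {
            return false; // timed out or failed: treat the connection as invalid
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Boolean> hung = new CompletableFuture<>(); // never completes
        System.out.println("hung validation valid? " + validateWithTimeout(hung, 100));
        System.out.println("healthy validation valid? "
                + validateWithTimeout(CompletableFuture.completedFuture(true), 100));
    }
}
```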

mp911de added a commit that referenced this issue May 7, 2020
We now apply the connection acquisition timeout to the validation to avoid hanging connections.

[#68]
mp911de added a commit that referenced this issue May 7, 2020
We now apply the connection acquisition timeout to the validation to avoid hanging connections.

[#68]
@mp911de
Member

mp911de commented May 7, 2020

To @michaelr524's question: The pool cannot be cleared. That's a feature that needs to be provided by reactor-pool first.

The change is available in 0.8.3.BUILD-SNAPSHOT builds. Please test the change and report any findings so we can either fix the remaining issues or close this ticket.

@setevoy

setevoy commented May 12, 2020

It seems I have a related issue.
I run the following code against a Patroni-backed PostgreSQL cluster behind HAProxy.

        testRepository.save(new TestDta("Test "))
                .doOnNext(a -> log.info("SAVED!"))
                .log()
                .timeout(ofSeconds(5))
                .retryWhen(Retry.any()
                        .retryMax(1000)
                        .timeout(Duration.ofMinutes(2))
                        .fixedBackoff(ofSeconds(1))
                        .doOnRetry(a -> log.info("Retry!")))
                .delayElement(ofSeconds(1))
                .repeat()
                .subscribe();

I kill the Postgres leader, Patroni successfully promotes another node, and I expect the code above to reconnect after some delay.

When I use PostgresqlConnectionFactory without a pool, everything works fine: a new connection is initiated on each attempt, and after some delay it is successfully established and data continues to be saved.

But when I add r2dbc-pool

        ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactory)
                .acquireRetry(Integer.MAX_VALUE)
                .maxAcquireTime(Duration.ofSeconds(2))
                .validationQuery("select 1")
                .maxSize(1)
                .build();

        return new ConnectionPool(configuration);

when the pool loses PostgreSQL, it seems it does not try to reconnect at all:

2020-05-12 21:24:34.726  INFO 3520 --- [     parallel-2] reactor.Mono.Peek.1                      : request(unbounded)
2020-05-12 21:24:34.802  INFO 3520 --- [actor-tcp-nio-1] sample.sample.DbTest                 : SAVED!
2020-05-12 21:24:34.802  INFO 3520 --- [actor-tcp-nio-1] reactor.Mono.Peek.1                      : onNext(sample.sample.TestDta@5be35f7)
2020-05-12 21:24:34.802  INFO 3520 --- [actor-tcp-nio-1] reactor.Mono.Peek.1                      : onComplete()
2020-05-12 21:24:35.803  INFO 3520 --- [     parallel-2] reactor.Mono.Peek.1                      : onSubscribe(FluxPeek.PeekSubscriber)
2020-05-12 21:24:35.804  INFO 3520 --- [     parallel-2] reactor.Mono.Peek.1                      : request(unbounded)
2020-05-12 21:24:35.877  INFO 3520 --- [actor-tcp-nio-1] sample.sample.DbTest                 : SAVED!
2020-05-12 21:24:35.877  INFO 3520 --- [actor-tcp-nio-1] reactor.Mono.Peek.1                      : onNext(sample.sample.TestDta@5be35f7)
2020-05-12 21:24:35.877  INFO 3520 --- [actor-tcp-nio-1] reactor.Mono.Peek.1                      : onComplete()
2020-05-12 21:24:35.910 DEBUG 3520 --- [actor-tcp-nio-1] reactor.netty.ReactorNetty               : [id: 0xf1fc8119, L:/127.0.0.1:32289 ! R:/127.0.0.1:35432] Non Removed handler: SSLSessionHandlerAdapter, context: null, pipeline: DefaultChannelPipeline{(SslHandler#0 = io.netty.handler.ssl.SslHandler), (EnsureSubscribersCompleteChannelHandler = io.r2dbc.postgresql.client.ReactorNettyClient$EnsureSubscribersCompleteChannelHandler), (LengthFieldBasedFrameDecoder = io.netty.handler.codec.LengthFieldBasedFrameDecoder), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
2020-05-12 21:24:35.910 DEBUG 3520 --- [actor-tcp-nio-1] reactor.netty.ReactorNetty               : [id: 0xf1fc8119, L:/127.0.0.1:32289 ! R:/127.0.0.1:35432] Non Removed handler: LengthFieldBasedFrameDecoder, context: ChannelHandlerContext(LengthFieldBasedFrameDecoder, [id: 0xf1fc8119, L:/127.0.0.1:32289 ! R:/127.0.0.1:35432]), pipeline: DefaultChannelPipeline{(SslHandler#0 = io.netty.handler.ssl.SslHandler), (EnsureSubscribersCompleteChannelHandler = io.r2dbc.postgresql.client.ReactorNettyClient$EnsureSubscribersCompleteChannelHandler), (LengthFieldBasedFrameDecoder = io.netty.handler.codec.LengthFieldBasedFrameDecoder), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
2020-05-12 21:24:35.910 DEBUG 3520 --- [actor-tcp-nio-1] reactor.netty.ReactorNetty               : [id: 0xf1fc8119, L:/127.0.0.1:32289 ! R:/127.0.0.1:35432] Non Removed handler: EnsureSubscribersCompleteChannelHandler, context: ChannelHandlerContext(EnsureSubscribersCompleteChannelHandler, [id: 0xf1fc8119, L:/127.0.0.1:32289 ! R:/127.0.0.1:35432]), pipeline: DefaultChannelPipeline{(SslHandler#0 = io.netty.handler.ssl.SslHandler), (EnsureSubscribersCompleteChannelHandler = io.r2dbc.postgresql.client.ReactorNettyClient$EnsureSubscribersCompleteChannelHandler), (LengthFieldBasedFrameDecoder = io.netty.handler.codec.LengthFieldBasedFrameDecoder), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
2020-05-12 21:24:35.914 DEBUG 3520 --- [actor-tcp-nio-1] r.netty.resources.NewConnectionProvider  : [id: 0xf1fc8119, L:/127.0.0.1:32289 ! R:/127.0.0.1:35432] onStateChange([disconnecting], ChannelOperations{SimpleConnection{channel=[id: 0xf1fc8119, L:/127.0.0.1:32289 ! R:/127.0.0.1:35432]}})
2020-05-12 21:24:36.879  INFO 3520 --- [     parallel-2] reactor.Mono.Peek.1                      : onSubscribe(FluxPeek.PeekSubscriber)
2020-05-12 21:24:36.879  INFO 3520 --- [     parallel-2] reactor.Mono.Peek.1                      : request(unbounded)
2020-05-12 21:24:36.890 DEBUG 3520 --- [actor-tcp-nio-2] reactor.netty.channel.BootstrapHandlers  : [id: 0xe6548e1e] Initialized pipeline DefaultChannelPipeline{(BootstrapHandlers$BootstrapInitializerHandler#0 = reactor.netty.channel.BootstrapHandlers$BootstrapInitializerHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
2020-05-12 21:24:36.891 DEBUG 3520 --- [actor-tcp-nio-2] r.netty.resources.NewConnectionProvider  : [id: 0xe6548e1e, L:/127.0.0.1:32360 - R:/127.0.0.1:35432] Connected new channel
2020-05-12 21:24:36.891 DEBUG 3520 --- [actor-tcp-nio-2] r.netty.resources.NewConnectionProvider  : [id: 0xe6548e1e, L:/127.0.0.1:32360 - R:/127.0.0.1:35432] onStateChange([connected], SimpleConnection{channel=[id: 0xe6548e1e, L:/127.0.0.1:32360 - R:/127.0.0.1:35432]})
2020-05-12 21:24:36.891 DEBUG 3520 --- [actor-tcp-nio-2] r.netty.resources.NewConnectionProvider  : [id: 0xe6548e1e, L:/127.0.0.1:32360 - R:/127.0.0.1:35432] onStateChange([configured], ChannelOperations{SimpleConnection{channel=[id: 0xe6548e1e, L:/127.0.0.1:32360 - R:/127.0.0.1:35432]}})
2020-05-12 21:24:37.165 DEBUG 3520 --- [actor-tcp-nio-2] reactor.netty.ReactorNetty               : [id: 0xe6548e1e, L:/127.0.0.1:32360 - R:/127.0.0.1:35432] Added encoder [SSLSessionHandlerAdapter] at the beginning of the user pipeline, full pipeline: [SSLSessionHandlerAdapter, reactor.right.reactiveBridge, DefaultChannelPipeline$TailContext#0]
2020-05-12 21:24:37.165 DEBUG 3520 --- [actor-tcp-nio-2] reactor.netty.ReactorNetty               : [id: 0xe6548e1e, L:/127.0.0.1:32360 - R:/127.0.0.1:35432] Added decoder [LengthFieldBasedFrameDecoder] at the end of the user pipeline, full pipeline: [SSLSessionHandlerAdapter, LengthFieldBasedFrameDecoder, reactor.right.reactiveBridge, DefaultChannelPipeline$TailContext#0]
2020-05-12 21:24:37.165 DEBUG 3520 --- [actor-tcp-nio-2] reactor.netty.ReactorNetty               : [id: 0xe6548e1e, L:/127.0.0.1:32360 - R:/127.0.0.1:35432] Added encoder [EnsureSubscribersCompleteChannelHandler] at the beginning of the user pipeline, full pipeline: [EnsureSubscribersCompleteChannelHandler, SSLSessionHandlerAdapter, LengthFieldBasedFrameDecoder, reactor.right.reactiveBridge, DefaultChannelPipeline$TailContext#0]
2020-05-12 21:24:37.165 DEBUG 3520 --- [actor-tcp-nio-2] reactor.netty.channel.FluxReceive        : [id: 0xe6548e1e, L:/127.0.0.1:32360 - R:/127.0.0.1:35432] Subscribing inbound receiver [pending: 0, cancelled:false, inboundDone: false]
2020-05-12 21:24:41.880  INFO 3520 --- [     parallel-3] reactor.Mono.Peek.1                      : cancel()
2020-05-12 21:24:41.882  INFO 3520 --- [     parallel-3] sample.sample.DbTest                 : Retry!
2020-05-12 21:24:42.174 DEBUG 3520 --- [actor-tcp-nio-2] reactor.netty.ReactorNetty               : [id: 0xe6548e1e, L:/127.0.0.1:32360 ! R:/127.0.0.1:35432] Non Removed handler: SSLSessionHandlerAdapter, context: ChannelHandlerContext(SSLSessionHandlerAdapter, [id: 0xe6548e1e, L:/127.0.0.1:32360 ! R:/127.0.0.1:35432]), pipeline: DefaultChannelPipeline{(EnsureSubscribersCompleteChannelHandler = io.r2dbc.postgresql.client.ReactorNettyClient$EnsureSubscribersCompleteChannelHandler), (SSLSessionHandlerAdapter = io.r2dbc.postgresql.client.SSLSessionHandlerAdapter), (LengthFieldBasedFrameDecoder = io.netty.handler.codec.LengthFieldBasedFrameDecoder), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
2020-05-12 21:24:42.175 DEBUG 3520 --- [actor-tcp-nio-2] reactor.netty.ReactorNetty               : [id: 0xe6548e1e, L:/127.0.0.1:32360 ! R:/127.0.0.1:35432] Non Removed handler: LengthFieldBasedFrameDecoder, context: ChannelHandlerContext(LengthFieldBasedFrameDecoder, [id: 0xe6548e1e, L:/127.0.0.1:32360 ! R:/127.0.0.1:35432]), pipeline: DefaultChannelPipeline{(EnsureSubscribersCompleteChannelHandler = io.r2dbc.postgresql.client.ReactorNettyClient$EnsureSubscribersCompleteChannelHandler), (SSLSessionHandlerAdapter = io.r2dbc.postgresql.client.SSLSessionHandlerAdapter), (LengthFieldBasedFrameDecoder = io.netty.handler.codec.LengthFieldBasedFrameDecoder), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
2020-05-12 21:24:42.175 DEBUG 3520 --- [actor-tcp-nio-2] reactor.netty.ReactorNetty               : [id: 0xe6548e1e, L:/127.0.0.1:32360 ! R:/127.0.0.1:35432] Non Removed handler: EnsureSubscribersCompleteChannelHandler, context: ChannelHandlerContext(EnsureSubscribersCompleteChannelHandler, [id: 0xe6548e1e, L:/127.0.0.1:32360 ! R:/127.0.0.1:35432]), pipeline: DefaultChannelPipeline{(EnsureSubscribersCompleteChannelHandler = io.r2dbc.postgresql.client.ReactorNettyClient$EnsureSubscribersCompleteChannelHandler), (SSLSessionHandlerAdapter = io.r2dbc.postgresql.client.SSLSessionHandlerAdapter), (LengthFieldBasedFrameDecoder = io.netty.handler.codec.LengthFieldBasedFrameDecoder), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
2020-05-12 21:24:42.175 DEBUG 3520 --- [actor-tcp-nio-2] r.netty.resources.NewConnectionProvider  : [id: 0xe6548e1e, L:/127.0.0.1:32360 ! R:/127.0.0.1:35432] onStateChange([disconnecting], ChannelOperations{SimpleConnection{channel=[id: 0xe6548e1e, L:/127.0.0.1:32360 ! R:/127.0.0.1:35432]}})
2020-05-12 21:24:42.883  INFO 3520 --- [     parallel-1] reactor.Mono.Peek.1                      : onSubscribe(FluxPeek.PeekSubscriber)
2020-05-12 21:24:42.883  INFO 3520 --- [     parallel-1] reactor.Mono.Peek.1                      : request(unbounded)
2020-05-12 21:24:47.883  INFO 3520 --- [     parallel-2] reactor.Mono.Peek.1                      : cancel()
2020-05-12 21:24:47.884  INFO 3520 --- [     parallel-2] sample.sample.DbTest                 : Retry!
2020-05-12 21:24:48.886  INFO 3520 --- [     parallel-2] reactor.Mono.Peek.1                      : onSubscribe(FluxPeek.PeekSubscriber)
2020-05-12 21:24:48.886  INFO 3520 --- [     parallel-2] reactor.Mono.Peek.1                      : request(unbounded)
2020-05-12 21:24:53.886  INFO 3520 --- [     parallel-3] reactor.Mono.Peek.1                      : cancel()
2020-05-12 21:24:53.887  INFO 3520 --- [     parallel-3] sample.sample.DbTest                 : Retry!
2020-05-12 21:24:54.887  INFO 3520 --- [     parallel-3] reactor.Mono.Peek.1                      : onSubscribe(FluxPeek.PeekSubscriber)
2020-05-12 21:24:54.887  INFO 3520 --- [     parallel-3] reactor.Mono.Peek.1                      : request(unbounded)
2020-05-12 21:24:59.887  INFO 3520 --- [     parallel-4] reactor.Mono.Peek.1                      : cancel()
2020-05-12 21:24:59.887  INFO 3520 --- [     parallel-4] sample.sample.DbTest                 : Retry!

I independently verified with psql that the new leader comes online, but r2dbc-pool cannot establish a new connection and the retries continue.

I just tried 0.8.3.BUILD-SNAPSHOT but it did not help.
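One variation of the configuration above that might help broken connections get recycled sooner is to bound how long a pooled connection may sit idle. This is only a sketch: the timeout values are illustrative assumptions, not recommendations, and availability of the individual builder options depends on the r2dbc-pool version in use.

```java
// Illustrative sketch (values are assumptions): evict idle connections quickly
// so that connections severed by a failover are not handed out again.
ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactory)
        .validationQuery("SELECT 1")               // checked before handing out a connection
        .maxAcquireTime(Duration.ofSeconds(5))     // bound acquisition (and validation, as of 0.8.3)
        .maxIdleTime(Duration.ofSeconds(30))       // recycle idle connections aggressively
        .maxSize(10)
        .build();

ConnectionPool pool = new ConnectionPool(configuration);
```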

@mp911de
Member

mp911de commented May 18, 2020

I successfully validated that the pool invalidates connections using Toxiproxy and a remote server that either dropped connections or whose process was killed entirely. In both cases, the pool recovered (see the log output).

I'm closing this issue as solved. Feel free to submit a fully reproducible test case if the issue arises again.

@mp911de mp911de closed this as completed May 18, 2020
@AmitBRD

AmitBRD commented Sep 29, 2020

I successfully validated that the pool invalidates the connection with Toxiproxy and a remote server that either dropped connections or where the remote process was killed entirely. In both cases, the pool recovered (see the log output).

I'm closing this issue as solved. Feel free to submit a fully reproducible test case if the issue arises again.

Hi @mp911de, can you outline the configuration you used to achieve automatic reconnection? I am trying to implement reconnects with PostgreSQL, but the connection pool becomes stagnant after an error.

@mp911de
Member

mp911de commented Sep 29, 2020

We'd be happy if you could provide a reproducer (even one using mocked connections would be fine) so we can sort out any pool reliability issues.
