Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resume related Netty ByteBuf leak #779

Closed
piotr-signalfx opened this issue Apr 14, 2020 · 9 comments · Fixed by #934
Closed

Resume related Netty ByteBuf leak #779

piotr-signalfx opened this issue Apr 14, 2020 · 9 comments · Fixed by #934
Assignees
Labels
Milestone

Comments

@piotr-signalfx
Copy link

We observe the following leak reported by Netty from time to time:

LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
#1:
	io.netty.buffer.AdvancedLeakAwareByteBuf.getShort(AdvancedLeakAwareByteBuf.java:166)
	io.rsocket.buffer.AbstractTupleByteBuf._getShort(AbstractTupleByteBuf.java:85)
	io.netty.buffer.AbstractByteBuf.readShort(AbstractByteBuf.java:751)
	io.rsocket.frame.FrameHeaderFlyweight.nativeFrameType(FrameHeaderFlyweight.java:89)
	io.rsocket.resume.ResumableDuplexConnection.isResumableFrame(ResumableDuplexConnection.java:366)
	io.rsocket.resume.ResumableDuplexConnection.sendFrame(ResumableDuplexConnection.java:221)
	io.rsocket.resume.ResumableDuplexConnection.dispatch(ResumableDuplexConnection.java:241)
	io.rsocket.resume.UpstreamFramesSubscriber.processFrame(UpstreamFramesSubscriber.java:156)
	io.rsocket.resume.UpstreamFramesSubscriber.onNext(UpstreamFramesSubscriber.java:73)
	io.rsocket.resume.UpstreamFramesSubscriber.onNext(UpstreamFramesSubscriber.java:32)
	reactor.core.publisher.StrictSubscriber.onNext(StrictSubscriber.java:89)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:695)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:571)
	reactor.core.publisher.FluxFlatMap$FlatMapInner.onNext(FluxFlatMap.java:971)
	io.rsocket.internal.UnboundedProcessor.drainFused(UnboundedProcessor.java:144)
	io.rsocket.internal.UnboundedProcessor.drain(UnboundedProcessor.java:177)
	io.rsocket.internal.UnboundedProcessor.onNext(UnboundedProcessor.java:255)
	io.rsocket.RSocketResponder$3.hookOnNext(RSocketResponder.java:461)
	io.rsocket.RSocketResponder$3.hookOnNext(RSocketResponder.java:447)
	reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:158)
	io.rsocket.internal.RateLimitableRequestPublisher$InnerOperator.onNext(RateLimitableRequestPublisher.java:173)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:480)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:415)
	reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:793)
	reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:718)
	reactor.core.publisher.FluxCreate$SerializedSink.next(FluxCreate.java:153)
	xxx.xxxx.lambda$listXXX$1(XXXXService.java:74)
	java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072)
	java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	java.base/java.lang.Thread.run(Thread.java:834)
#2:
	io.netty.buffer.AdvancedLeakAwareByteBuf.writeByte(AdvancedLeakAwareByteBuf.java:544)
	io.rsocket.frame.DataAndMetadataFlyweight.encodeLength(DataAndMetadataFlyweight.java:21)
	io.rsocket.frame.DataAndMetadataFlyweight.encode(DataAndMetadataFlyweight.java:47)
	io.rsocket.frame.RequestFlyweight.encode(RequestFlyweight.java:59)
	io.rsocket.frame.PayloadFrameFlyweight.encodeNext(PayloadFrameFlyweight.java:56)
	io.rsocket.RSocketResponder$3.hookOnNext(RSocketResponder.java:453)
	io.rsocket.RSocketResponder$3.hookOnNext(RSocketResponder.java:447)
	reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:158)
	io.rsocket.internal.RateLimitableRequestPublisher$InnerOperator.onNext(RateLimitableRequestPublisher.java:173)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:480)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:415)
	reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:793)
	reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:718)
	reactor.core.publisher.FluxCreate$SerializedSink.next(FluxCreate.java:153)
	xxx.xxxx.lambda$listXXX$1(XXXXService.java:74)
	java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072)
	java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	java.base/java.lang.Thread.run(Thread.java:834)
#3:
	io.netty.buffer.AdvancedLeakAwareByteBuf.writeByte(AdvancedLeakAwareByteBuf.java:544)
	io.rsocket.frame.DataAndMetadataFlyweight.encodeLength(DataAndMetadataFlyweight.java:19)
	io.rsocket.frame.DataAndMetadataFlyweight.encode(DataAndMetadataFlyweight.java:47)
	io.rsocket.frame.RequestFlyweight.encode(RequestFlyweight.java:59)
	io.rsocket.frame.PayloadFrameFlyweight.encodeNext(PayloadFrameFlyweight.java:56)
	io.rsocket.RSocketResponder$3.hookOnNext(RSocketResponder.java:453)
	io.rsocket.RSocketResponder$3.hookOnNext(RSocketResponder.java:447)
	reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:158)
	io.rsocket.internal.RateLimitableRequestPublisher$InnerOperator.onNext(RateLimitableRequestPublisher.java:173)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:480)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:415)
	reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:793)
	reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:718)
	reactor.core.publisher.FluxCreate$SerializedSink.next(FluxCreate.java:153)
	xxx.xxxx.lambda$listXXX$1(XXXXService.java:74)
	java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072)
	java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	java.base/java.lang.Thread.run(Thread.java:834)
#4:
	io.netty.buffer.AdvancedLeakAwareByteBuf.writeShort(AdvancedLeakAwareByteBuf.java:550)
	io.rsocket.frame.FrameHeaderFlyweight.encode(FrameHeaderFlyweight.java:56)
	io.rsocket.frame.RequestFlyweight.encode(RequestFlyweight.java:50)
	io.rsocket.frame.PayloadFrameFlyweight.encodeNext(PayloadFrameFlyweight.java:56)
	io.rsocket.RSocketResponder$3.hookOnNext(RSocketResponder.java:453)
	io.rsocket.RSocketResponder$3.hookOnNext(RSocketResponder.java:447)
	reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:158)
	io.rsocket.internal.RateLimitableRequestPublisher$InnerOperator.onNext(RateLimitableRequestPublisher.java:173)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:480)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:415)
	reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:793)
	reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:718)
	reactor.core.publisher.FluxCreate$SerializedSink.next(FluxCreate.java:153)
	xxx.xxxx.lambda$listXXX$1(XXXXService.java:74)
	java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072)
	java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	java.base/java.lang.Thread.run(Thread.java:834)
#5:
	io.netty.buffer.AdvancedLeakAwareByteBuf.writeInt(AdvancedLeakAwareByteBuf.java:562)
	io.rsocket.frame.FrameHeaderFlyweight.encode(FrameHeaderFlyweight.java:56)
	io.rsocket.frame.RequestFlyweight.encode(RequestFlyweight.java:50)
	io.rsocket.frame.PayloadFrameFlyweight.encodeNext(PayloadFrameFlyweight.java:56)
	io.rsocket.RSocketResponder$3.hookOnNext(RSocketResponder.java:453)
	io.rsocket.RSocketResponder$3.hookOnNext(RSocketResponder.java:447)
	reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:158)
	io.rsocket.internal.RateLimitableRequestPublisher$InnerOperator.onNext(RateLimitableRequestPublisher.java:173)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:480)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:415)
	reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:793)
	reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:718)
	reactor.core.publisher.FluxCreate$SerializedSink.next(FluxCreate.java:153)
	xxx.xxxx.lambda$listXXX$1(XXXXService.java:74)
	java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072)
	java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	java.base/java.lang.Thread.run(Thread.java:834)
Created at:
	io.netty.buffer.PooledByteBufAllocator.newHeapBuffer(PooledByteBufAllocator.java:332)
	io.netty.buffer.AbstractByteBufAllocator.heapBuffer(AbstractByteBufAllocator.java:168)
	io.netty.buffer.AbstractByteBufAllocator.heapBuffer(AbstractByteBufAllocator.java:154)
	io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:109)
	io.rsocket.frame.FrameHeaderFlyweight.encode(FrameHeaderFlyweight.java:56)
	io.rsocket.frame.RequestFlyweight.encode(RequestFlyweight.java:50)
	io.rsocket.frame.PayloadFrameFlyweight.encodeNext(PayloadFrameFlyweight.java:56)
	io.rsocket.RSocketResponder$3.hookOnNext(RSocketResponder.java:453)
	io.rsocket.RSocketResponder$3.hookOnNext(RSocketResponder.java:447)
	reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:158)
	io.rsocket.internal.RateLimitableRequestPublisher$InnerOperator.onNext(RateLimitableRequestPublisher.java:173)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:480)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:415)
	reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:793)
	reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:718)
	reactor.core.publisher.FluxCreate$SerializedSink.next(FluxCreate.java:153)
	xxx.xxxx.lambda$listXXX$1(XXXXService.java:74)
	java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072)
	java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	java.base/java.lang.Thread.run(Thread.java:834)
: 1 leak records were discarded because the leak record count is targeted to 4. Use system property io.netty.leakDetection.targetRecords to increase the limit.
@OlegDokuka
Copy link
Member

Hi @piotr-signalfx!

Thanks for reporting that.
We need to know the version of RSocket-Java along with the use case which can reproduce that.

Also, you may find similar the following issue #733.

In any case, we are working on improvements toward catching possible leaks in #777. However, we cannot say anything until we know that is the case that reproduces your leak.

Cheers,
Oleh

@piotr-signalfx
Copy link
Author

Hi Oleh,

We use rsocket-java 1.0.0-RC5.
We do not use zero copy.
I will try to reproduce the issue and put a test case together, but as mentioned it does not happen frequently, just a few times in a week in a production environment which constantly sees some load. My wild guess would be it might be related to session resumption for instance.

Cheers,
Piotr

@OlegDokuka
Copy link
Member

Definitely, an example of a code that simulates what you do may simplify the investigation dramatically.

In any case, we will investigate more. It might be some race cases which is not releasing all the buffers in the resuming queue

@OlegDokuka
Copy link
Member

Also, please try RC7 and see whether it has the same issue as RC5

@piotr-signalfx
Copy link
Author

Thanks. I will upgrade to RC7 and let you know if we find something more.

@OlegDokuka
Copy link
Member

OlegDokuka commented Apr 14, 2020

Just FYI, RC-7 is in the snapshot. Briefly looking into resumability implementation, I can see a few cases which might produce leaks so I will try to work around those shortly.

@OlegDokuka OlegDokuka added this to the 1.x Backlog milestone Apr 15, 2020
@OlegDokuka OlegDokuka self-assigned this Apr 15, 2020
@rstoyanchev rstoyanchev changed the title Memory leak reported by Netty Resume related Netty ByteBuf leak Apr 17, 2020
@piotr-signalfx
Copy link
Author

piotr-signalfx commented Apr 27, 2020

I can confirm the same issue affects RC-7 (480b501).
We tried to simulate some network problems with https://github.com/alexei-led/pumba in order to reproduce the issue locally with a reasonably simple test case but we could not.

Logs from RC-7:

LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
#1:
	io.netty.buffer.AdvancedLeakAwareByteBuf.order(AdvancedLeakAwareByteBuf.java:70)
	io.netty.buffer.CompositeByteBuf.newComponent(CompositeByteBuf.java:335)
	io.netty.buffer.CompositeByteBuf.addComponents0(CompositeByteBuf.java:374)
	io.netty.buffer.CompositeByteBuf.addComponents(CompositeByteBuf.java:236)
	io.rsocket.frame.DataAndMetadataFlyweight.encode(DataAndMetadataFlyweight.java:47)
	io.rsocket.frame.RequestFlyweight.encode(RequestFlyweight.java:59)
	io.rsocket.frame.PayloadFrameFlyweight.encodeNext(PayloadFrameFlyweight.java:56)
	io.rsocket.core.RSocketResponder$3.hookOnNext(RSocketResponder.java:474)
	io.rsocket.core.RSocketResponder$3.hookOnNext(RSocketResponder.java:443)
	reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:158)
	reactor.core.publisher.FluxContextStart$ContextStartSubscriber.tryOnNext(FluxContextStart.java:111)
	reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.runAsync(FluxPublishOn.java:866)
	reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.run(FluxPublishOn.java:939)
	reactor.core.scheduler.ImmediateScheduler$ImmediateSchedulerWorker.schedule(ImmediateScheduler.java:79)
	reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.trySchedule(FluxPublishOn.java:759)
	reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.onNext(FluxPublishOn.java:704)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:480)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:415)
	reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:793)
	reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:718)
	reactor.core.publisher.FluxCreate$SerializedSink.next(FluxCreate.java:153)
	java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
	java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
	java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1746)
	java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
	java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
	java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
	java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
	java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
	x.x.XService.lambda$null$4(XService.java:92)
	java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714)
	java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	java.base/java.lang.Thread.run(Thread.java:834)
#2:
	io.netty.buffer.AdvancedLeakAwareByteBuf.writeByte(AdvancedLeakAwareByteBuf.java:544)
	io.rsocket.frame.DataAndMetadataFlyweight.encodeLength(DataAndMetadataFlyweight.java:19)
	io.rsocket.frame.DataAndMetadataFlyweight.encode(DataAndMetadataFlyweight.java:46)
	io.rsocket.frame.RequestFlyweight.encode(RequestFlyweight.java:59)
	io.rsocket.frame.PayloadFrameFlyweight.encodeNext(PayloadFrameFlyweight.java:56)
	io.rsocket.core.RSocketResponder$3.hookOnNext(RSocketResponder.java:474)
	io.rsocket.core.RSocketResponder$3.hookOnNext(RSocketResponder.java:443)
	reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:158)
	reactor.core.publisher.FluxContextStart$ContextStartSubscriber.tryOnNext(FluxContextStart.java:111)
	reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.runAsync(FluxPublishOn.java:866)
	reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.run(FluxPublishOn.java:939)
	reactor.core.scheduler.ImmediateScheduler$ImmediateSchedulerWorker.schedule(ImmediateScheduler.java:79)
	reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.trySchedule(FluxPublishOn.java:759)
	reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.onNext(FluxPublishOn.java:704)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:480)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:415)
	reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:793)
	reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:718)
	reactor.core.publisher.FluxCreate$SerializedSink.next(FluxCreate.java:153)
	java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
	java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
	java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1746)
	java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
	java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
	java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
	java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
	java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
	x.x.XService.lambda$null$4(XService.java:92)
	java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714)
	java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	java.base/java.lang.Thread.run(Thread.java:834)
#3:
	io.netty.buffer.AdvancedLeakAwareByteBuf.writeByte(AdvancedLeakAwareByteBuf.java:544)
	io.rsocket.frame.DataAndMetadataFlyweight.encodeLength(DataAndMetadataFlyweight.java:18)
	io.rsocket.frame.DataAndMetadataFlyweight.encode(DataAndMetadataFlyweight.java:46)
	io.rsocket.frame.RequestFlyweight.encode(RequestFlyweight.java:59)
	io.rsocket.frame.PayloadFrameFlyweight.encodeNext(PayloadFrameFlyweight.java:56)
	io.rsocket.core.RSocketResponder$3.hookOnNext(RSocketResponder.java:474)
	io.rsocket.core.RSocketResponder$3.hookOnNext(RSocketResponder.java:443)
	reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:158)
	reactor.core.publisher.FluxContextStart$ContextStartSubscriber.tryOnNext(FluxContextStart.java:111)
	reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.runAsync(FluxPublishOn.java:866)
	reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.run(FluxPublishOn.java:939)
	reactor.core.scheduler.ImmediateScheduler$ImmediateSchedulerWorker.schedule(ImmediateScheduler.java:79)
	reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.trySchedule(FluxPublishOn.java:759)
	reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.onNext(FluxPublishOn.java:704)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:480)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:415)
	reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:793)
	reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:718)
	reactor.core.publisher.FluxCreate$SerializedSink.next(FluxCreate.java:153)
	java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
	java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
	java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1746)
	java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
	java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
	java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
	java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
	java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
	x.x.XService.lambda$null$4(XService.java:92)
	java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714)
	java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	java.base/java.lang.Thread.run(Thread.java:834)
#4:
	io.netty.buffer.AdvancedLeakAwareByteBuf.writeShort(AdvancedLeakAwareByteBuf.java:550)
	io.rsocket.frame.FrameHeaderFlyweight.encode(FrameHeaderFlyweight.java:56)
	io.rsocket.frame.RequestFlyweight.encode(RequestFlyweight.java:50)
	io.rsocket.frame.PayloadFrameFlyweight.encodeNext(PayloadFrameFlyweight.java:56)
	io.rsocket.core.RSocketResponder$3.hookOnNext(RSocketResponder.java:474)
	io.rsocket.core.RSocketResponder$3.hookOnNext(RSocketResponder.java:443)
	reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:158)
	reactor.core.publisher.FluxContextStart$ContextStartSubscriber.tryOnNext(FluxContextStart.java:111)
	reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.runAsync(FluxPublishOn.java:866)
	reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.run(FluxPublishOn.java:939)
	reactor.core.scheduler.ImmediateScheduler$ImmediateSchedulerWorker.schedule(ImmediateScheduler.java:79)
	reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.trySchedule(FluxPublishOn.java:759)
	reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.onNext(FluxPublishOn.java:704)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:480)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:415)
	reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:793)
	reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:718)
	reactor.core.publisher.FluxCreate$SerializedSink.next(FluxCreate.java:153)
	java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
	java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
	java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1746)
	java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
	java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
	java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
	java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
	java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
	x.x.XService.lambda$null$4(XService.java:92)
	java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714)
	java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	java.base/java.lang.Thread.run(Thread.java:834)
#5:
	io.netty.buffer.AdvancedLeakAwareByteBuf.writeInt(AdvancedLeakAwareByteBuf.java:562)
	io.rsocket.frame.FrameHeaderFlyweight.encode(FrameHeaderFlyweight.java:56)
	io.rsocket.frame.RequestFlyweight.encode(RequestFlyweight.java:50)
	io.rsocket.frame.PayloadFrameFlyweight.encodeNext(PayloadFrameFlyweight.java:56)
	io.rsocket.core.RSocketResponder$3.hookOnNext(RSocketResponder.java:474)
	io.rsocket.core.RSocketResponder$3.hookOnNext(RSocketResponder.java:443)
	reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:158)
	reactor.core.publisher.FluxContextStart$ContextStartSubscriber.tryOnNext(FluxContextStart.java:111)
	reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.runAsync(FluxPublishOn.java:866)
	reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.run(FluxPublishOn.java:939)
	reactor.core.scheduler.ImmediateScheduler$ImmediateSchedulerWorker.schedule(ImmediateScheduler.java:79)
	reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.trySchedule(FluxPublishOn.java:759)
	reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.onNext(FluxPublishOn.java:704)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:480)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:415)
	reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:793)
	reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:718)
	reactor.core.publisher.FluxCreate$SerializedSink.next(FluxCreate.java:153)
	java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
	java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
	java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1746)
	java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
	java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
	java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
	java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
	java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
	x.x.XService.lambda$null$4(XService.java:92)
	java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714)
	java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	java.base/java.lang.Thread.run(Thread.java:834)
Created at:
	io.netty.buffer.PooledByteBufAllocator.newHeapBuffer(PooledByteBufAllocator.java:332)
	io.netty.buffer.AbstractByteBufAllocator.heapBuffer(AbstractByteBufAllocator.java:168)
	io.netty.buffer.AbstractByteBufAllocator.heapBuffer(AbstractByteBufAllocator.java:154)
	io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:109)
	io.rsocket.frame.FrameHeaderFlyweight.encode(FrameHeaderFlyweight.java:56)
	io.rsocket.frame.RequestFlyweight.encode(RequestFlyweight.java:50)
	io.rsocket.frame.PayloadFrameFlyweight.encodeNext(PayloadFrameFlyweight.java:56)
	io.rsocket.core.RSocketResponder$3.hookOnNext(RSocketResponder.java:474)
	io.rsocket.core.RSocketResponder$3.hookOnNext(RSocketResponder.java:443)
	reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:158)
	reactor.core.publisher.FluxContextStart$ContextStartSubscriber.tryOnNext(FluxContextStart.java:111)
	reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.runAsync(FluxPublishOn.java:866)
	reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.run(FluxPublishOn.java:939)
	reactor.core.scheduler.ImmediateScheduler$ImmediateSchedulerWorker.schedule(ImmediateScheduler.java:79)
	reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.trySchedule(FluxPublishOn.java:759)
	reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.onNext(FluxPublishOn.java:704)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:480)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:415)
	reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:793)
	reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:718)
	reactor.core.publisher.FluxCreate$SerializedSink.next(FluxCreate.java:153)
	java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
	java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
	java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1746)
	java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
	java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
	java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
	java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
	java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
	x.x.XService.lambda$null$4(XService.java:92)
	java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714)
	java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	java.base/java.lang.Thread.run(Thread.java:834)
: 1 leak records were discarded because the leak record count is targeted to 4. Use system property io.netty.leakDetection.targetRecords to increase the limit.```

@piotr-signalfx
Copy link
Author

@OlegDokuka After upgrading to RC7 we are still facing some leaks. It looks like they might be related to this error we see in the logs as the leak is always reported a few seconds later.

Message:  Error receiving frame:
Thread:     reactor-tcp-epoll-4
Logger:     io.rsocket.FrameLogger
Stacktrace: io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer

And the leak log:


LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
#1:
	io.rsocket.resume.InMemoryResumableFramesStore.dispose(InMemoryResumableFramesStore.java:132)
	io.rsocket.resume.ResumableDuplexConnection.dispose(ResumableDuplexConnection.java:211)
	io.rsocket.resume.ServerRSocketSession.lambda$new$3(ServerRSocketSession.java:84)
	reactor.core.publisher.LambdaSubscriber.onError(LambdaSubscriber.java:149)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:819)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:589)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:569)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.innerError(FluxFlatMap.java:840)
	reactor.core.publisher.FluxFlatMap$FlatMapInner.onError(FluxFlatMap.java:979)
	reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:126)
	reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:76)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:819)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:589)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:569)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.innerError(FluxFlatMap.java:840)
	reactor.core.publisher.FluxFlatMap$FlatMapInner.onError(FluxFlatMap.java:979)
	reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:114)
	reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.handleTimeout(FluxTimeout.java:289)
	reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.doTimeout(FluxTimeout.java:274)
	reactor.core.publisher.FluxTimeout$TimeoutTimeoutSubscriber.onNext(FluxTimeout.java:396)
	reactor.core.publisher.StrictSubscriber.onNext(StrictSubscriber.java:89)
	reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73)
	reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:117)
	reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
	reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
	java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	java.base/java.lang.Thread.run(Thread.java:834)
#2:
	io.netty.buffer.AdvancedLeakAwareCompositeByteBuf.retain(AdvancedLeakAwareCompositeByteBuf.java:36)
	io.rsocket.resume.InMemoryResumableFramesStore.saveFrame(InMemoryResumableFramesStore.java:171)
	io.rsocket.resume.InMemoryResumableFramesStore$FramesSubscriber.onNext(InMemoryResumableFramesStore.java:225)
	io.rsocket.resume.InMemoryResumableFramesStore$FramesSubscriber.onNext(InMemoryResumableFramesStore.java:206)
	reactor.core.publisher.StrictSubscriber.onNext(StrictSubscriber.java:89)
	reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:123)
	reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:192)
	reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:426)
	reactor.core.publisher.EmitterProcessor.onNext(EmitterProcessor.java:268)
	io.rsocket.resume.ResumableDuplexConnection.sendFrame(ResumableDuplexConnection.java:232)
	io.rsocket.resume.ResumableDuplexConnection.dispatch(ResumableDuplexConnection.java:251)
	io.rsocket.resume.UpstreamFramesSubscriber.processFrame(UpstreamFramesSubscriber.java:156)
	io.rsocket.resume.UpstreamFramesSubscriber.onNext(UpstreamFramesSubscriber.java:73)
	io.rsocket.resume.UpstreamFramesSubscriber.onNext(UpstreamFramesSubscriber.java:32)
	reactor.core.publisher.StrictSubscriber.onNext(StrictSubscriber.java:89)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:693)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:569)
	reactor.core.publisher.FluxFlatMap$FlatMapInner.onNext(FluxFlatMap.java:969)
	io.rsocket.internal.UnboundedProcessor.drainFused(UnboundedProcessor.java:169)
	io.rsocket.internal.UnboundedProcessor.drain(UnboundedProcessor.java:205)
	io.rsocket.internal.UnboundedProcessor.onNext(UnboundedProcessor.java:291)
	io.rsocket.core.RSocketResponder$3.hookOnNext(RSocketResponder.java:471)
	io.rsocket.core.RSocketResponder$3.hookOnNext(RSocketResponder.java:440)
	reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
	reactor.core.publisher.FluxContextStart$ContextStartSubscriber.tryOnNext(FluxContextStart.java:111)
	reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.runAsync(FluxPublishOn.java:866)
	reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.run(FluxPublishOn.java:939)
	reactor.core.scheduler.ImmediateScheduler$ImmediateSchedulerWorker.schedule(ImmediateScheduler.java:79)
	reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.trySchedule(FluxPublishOn.java:759)
	reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.onNext(FluxPublishOn.java:704)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:478)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:413)
	reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:793)
	reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:718)
	reactor.core.publisher.FluxCreate$SerializedSink.next(FluxCreate.java:153)
	java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714)
	java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	java.base/java.lang.Thread.run(Thread.java:834)
#3:
	io.netty.buffer.AdvancedLeakAwareCompositeByteBuf.readShort(AdvancedLeakAwareCompositeByteBuf.java:392)
	io.rsocket.frame.FrameHeaderFlyweight.nativeFrameType(FrameHeaderFlyweight.java:89)
	io.rsocket.resume.ResumableDuplexConnection.isResumableFrame(ResumableDuplexConnection.java:376)
	io.rsocket.resume.ResumableDuplexConnection.sendFrame(ResumableDuplexConnection.java:231)
	io.rsocket.resume.ResumableDuplexConnection.dispatch(ResumableDuplexConnection.java:251)
	io.rsocket.resume.UpstreamFramesSubscriber.processFrame(UpstreamFramesSubscriber.java:156)
	io.rsocket.resume.UpstreamFramesSubscriber.onNext(UpstreamFramesSubscriber.java:73)
	io.rsocket.resume.UpstreamFramesSubscriber.onNext(UpstreamFramesSubscriber.java:32)
	reactor.core.publisher.StrictSubscriber.onNext(StrictSubscriber.java:89)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:693)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:569)
	reactor.core.publisher.FluxFlatMap$FlatMapInner.onNext(FluxFlatMap.java:969)
	io.rsocket.internal.UnboundedProcessor.drainFused(UnboundedProcessor.java:169)
	io.rsocket.internal.UnboundedProcessor.drain(UnboundedProcessor.java:205)
	io.rsocket.internal.UnboundedProcessor.onNext(UnboundedProcessor.java:291)
	io.rsocket.core.RSocketResponder$3.hookOnNext(RSocketResponder.java:471)
	io.rsocket.core.RSocketResponder$3.hookOnNext(RSocketResponder.java:440)
	reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
	reactor.core.publisher.FluxContextStart$ContextStartSubscriber.tryOnNext(FluxContextStart.java:111)
	reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.runAsync(FluxPublishOn.java:866)
	reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.run(FluxPublishOn.java:939)
	reactor.core.scheduler.ImmediateScheduler$ImmediateSchedulerWorker.schedule(ImmediateScheduler.java:79)
	reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.trySchedule(FluxPublishOn.java:759)
	reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.onNext(FluxPublishOn.java:704)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:478)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:413)
	reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:793)
	reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:718)
	reactor.core.publisher.FluxCreate$SerializedSink.next(FluxCreate.java:153)
	java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714)
	java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	java.base/java.lang.Thread.run(Thread.java:834)
#4:
	io.netty.buffer.AdvancedLeakAwareCompositeByteBuf.skipBytes(AdvancedLeakAwareCompositeByteBuf.java:512)
	io.netty.buffer.AdvancedLeakAwareCompositeByteBuf.skipBytes(AdvancedLeakAwareCompositeByteBuf.java:36)
	io.rsocket.frame.FrameHeaderFlyweight.nativeFrameType(FrameHeaderFlyweight.java:88)
	io.rsocket.resume.ResumableDuplexConnection.isResumableFrame(ResumableDuplexConnection.java:376)
	io.rsocket.resume.ResumableDuplexConnection.sendFrame(ResumableDuplexConnection.java:231)
	io.rsocket.resume.ResumableDuplexConnection.dispatch(ResumableDuplexConnection.java:251)
	io.rsocket.resume.UpstreamFramesSubscriber.processFrame(UpstreamFramesSubscriber.java:156)
	io.rsocket.resume.UpstreamFramesSubscriber.onNext(UpstreamFramesSubscriber.java:73)
	io.rsocket.resume.UpstreamFramesSubscriber.onNext(UpstreamFramesSubscriber.java:32)
	reactor.core.publisher.StrictSubscriber.onNext(StrictSubscriber.java:89)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:693)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:569)
	reactor.core.publisher.FluxFlatMap$FlatMapInner.onNext(FluxFlatMap.java:969)
	io.rsocket.internal.UnboundedProcessor.drainFused(UnboundedProcessor.java:169)
	io.rsocket.internal.UnboundedProcessor.drain(UnboundedProcessor.java:205)
	io.rsocket.internal.UnboundedProcessor.onNext(UnboundedProcessor.java:291)
	io.rsocket.core.RSocketResponder$3.hookOnNext(RSocketResponder.java:471)
	io.rsocket.core.RSocketResponder$3.hookOnNext(RSocketResponder.java:440)
	reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
	reactor.core.publisher.FluxContextStart$ContextStartSubscriber.tryOnNext(FluxContextStart.java:111)
	reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.runAsync(FluxPublishOn.java:866)
	reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.run(FluxPublishOn.java:939)
	reactor.core.scheduler.ImmediateScheduler$ImmediateSchedulerWorker.schedule(ImmediateScheduler.java:79)
	reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.trySchedule(FluxPublishOn.java:759)
	reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.onNext(FluxPublishOn.java:704)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:478)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:413)
	reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:793)
	reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:718)
	reactor.core.publisher.FluxCreate$SerializedSink.next(FluxCreate.java:153)
	java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714)
	java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	java.base/java.lang.Thread.run(Thread.java:834)
#5:
	io.netty.buffer.AdvancedLeakAwareCompositeByteBuf.addComponents(AdvancedLeakAwareCompositeByteBuf.java:926)
	io.rsocket.frame.DataAndMetadataFlyweight.encode(DataAndMetadataFlyweight.java:75)
	io.rsocket.frame.RequestFlyweight.encode(RequestFlyweight.java:59)
	io.rsocket.frame.PayloadFrameFlyweight.encode(PayloadFrameFlyweight.java:68)
	io.rsocket.frame.PayloadFrameFlyweight.encodeReleasingPayload(PayloadFrameFlyweight.java:53)
	io.rsocket.frame.PayloadFrameFlyweight.encodeNextReleasingPayload(PayloadFrameFlyweight.java:15)
	io.rsocket.core.RSocketResponder$3.hookOnNext(RSocketResponder.java:470)
	io.rsocket.core.RSocketResponder$3.hookOnNext(RSocketResponder.java:440)
	reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
	reactor.core.publisher.FluxContextStart$ContextStartSubscriber.tryOnNext(FluxContextStart.java:111)
	reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.runAsync(FluxPublishOn.java:866)
	reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.run(FluxPublishOn.java:939)
	reactor.core.scheduler.ImmediateScheduler$ImmediateSchedulerWorker.schedule(ImmediateScheduler.java:79)
	reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.trySchedule(FluxPublishOn.java:759)
	reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.onNext(FluxPublishOn.java:704)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:478)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:413)
	reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:793)
	reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:718)
	reactor.core.publisher.FluxCreate$SerializedSink.next(FluxCreate.java:153)
	java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714)
	java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	java.base/java.lang.Thread.run(Thread.java:834)
Created at:
	io.netty.buffer.AbstractByteBufAllocator.compositeHeapBuffer(AbstractByteBufAllocator.java:213)
	io.netty.buffer.AbstractByteBufAllocator.compositeBuffer(AbstractByteBufAllocator.java:203)
	io.rsocket.frame.DataAndMetadataFlyweight.encode(DataAndMetadataFlyweight.java:75)
	io.rsocket.frame.RequestFlyweight.encode(RequestFlyweight.java:59)
	io.rsocket.frame.PayloadFrameFlyweight.encode(PayloadFrameFlyweight.java:68)
	io.rsocket.frame.PayloadFrameFlyweight.encodeReleasingPayload(PayloadFrameFlyweight.java:53)
	io.rsocket.frame.PayloadFrameFlyweight.encodeNextReleasingPayload(PayloadFrameFlyweight.java:15)
	io.rsocket.core.RSocketResponder$3.hookOnNext(RSocketResponder.java:470)
	io.rsocket.core.RSocketResponder$3.hookOnNext(RSocketResponder.java:440)
	reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
	reactor.core.publisher.FluxContextStart$ContextStartSubscriber.tryOnNext(FluxContextStart.java:111)
	reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.runAsync(FluxPublishOn.java:866)
	reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.run(FluxPublishOn.java:939)
	reactor.core.scheduler.ImmediateScheduler$ImmediateSchedulerWorker.schedule(ImmediateScheduler.java:79)
	reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.trySchedule(FluxPublishOn.java:759)
	reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.onNext(FluxPublishOn.java:704)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:478)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:413)
	reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:793)
	reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:718)
	reactor.core.publisher.FluxCreate$SerializedSink.next(FluxCreate.java:153)
	java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714)
	java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	java.base/java.lang.Thread.run(Thread.java:834)
# offset	   	71,569,077

@OlegDokuka
Copy link
Member

@piotr-signalfx I was able to reproduce that as well locally, but unfortunately at this stage, we can nothing to do with that because of a few bugs in Reactor and in our Resumability impl as well.

The good news is that the issue will be addressed in 1.1, thus, please stay tuned!

@OlegDokuka OlegDokuka added on-hold and removed minor labels Apr 30, 2020
@OlegDokuka OlegDokuka linked a pull request Sep 27, 2020 that will close this issue
@OlegDokuka OlegDokuka removed this from the 1.x Backlog milestone Sep 27, 2020
@OlegDokuka OlegDokuka added this to the 1.1 RC1 milestone Sep 30, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants