[spring-webflux/netty-reactor] Reactor subscriber context can't access token to link async thread to initiating transaction #185

Closed
jasonjkeller opened this issue Jan 11, 2021 · 2 comments
Labels
bug Something isn't working as designed/intended

Comments

@jasonjkeller
Contributor

There appears to be a gap in how our spring-webflux and netty-reactor instrumentation share a token.

The spring-webflux instrumentation that starts the transaction creates a token and stores it in the reactive subscriber context. Async threads, such as those used by our netty-reactor instrumentation, read the token from the subscriber context and use it to link their work back to the thread that started the transaction, which ties the transaction together.

At some point, however, work is handed off to a parallel thread, and a subscriber context switch happens that we don't account for and that cannot access the stored token. This subscriber is part of reactor core, which we do not instrument and would prefer not to. Because it cannot access the shared token, none of the work on that thread is linked to the transaction. This happens when the subscribeOn(Schedulers.parallel()) operator is called.
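A minimal, JDK-only model of this mechanism may help. It assumes, for illustration only, that the current transaction lives in a `ThreadLocal` and that a token's `link()` binds the captured transaction to the calling thread. `TxnModel`, `Token`, `capture()`, and `runOnWorker()` are hypothetical names, not the New Relic agent API, and a single-thread executor stands in for a `Schedulers.parallel()` worker.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Hypothetical, simplified model of agent-side transaction tracking (not the
 * New Relic agent API). The current transaction lives in a ThreadLocal, so it is
 * visible only on the thread that started it unless a token is linked elsewhere.
 */
final class TxnModel {
    static final ThreadLocal<String> CURRENT_TXN = new ThreadLocal<>();

    /** Hypothetical token that captures a transaction so another thread can join it. */
    static final class Token {
        private final String txn;

        private Token(String txn) {
            this.txn = txn;
        }

        /** Captures the transaction that is current on the calling thread. */
        static Token capture() {
            return new Token(CURRENT_TXN.get());
        }

        /** Binds the captured transaction to the calling thread. */
        void link() {
            CURRENT_TXN.set(txn);
        }
    }

    /**
     * Runs a unit of work on a separate worker thread (standing in for a
     * Schedulers.parallel() thread) and reports which transaction it saw.
     * Pass null to simulate a worker on which nothing ever links the token.
     */
    static String runOnWorker(Token token) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            Future<String> result = worker.submit(() -> {
                try {
                    if (token != null) {
                        token.link();
                    }
                    // Work here is attributed to whatever transaction is current on THIS thread.
                    return CURRENT_TXN.get();
                } finally {
                    CURRENT_TXN.remove(); // do not leak state into the pooled thread
                }
            });
            return result.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            worker.shutdown();
        }
    }
}
```

In the scenario described in this issue, the parallel thread behaves like `runOnWorker(null)`: nothing instrumented runs there, so the equivalent of `link()` is never called and the work is not attributed to the transaction.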

Here is a repro app; the issue occurs when hitting the http://localhost:8080/fireAndForget route. Ultimately, the external call made from postExternalMonoAlternative doesn't get linked to the transaction.

This seems to depend on the type of reactor core Scheduler used when calling Flux#publishOn or Mono#subscribeOn. In the example above, the parallel scheduler is used, which causes work to execute on a parallel thread that cannot access the token. If the Schedulers.immediate() scheduler is used instead, the token is accessible, and the thread that makes the second external call is successfully linked back to the original transaction thread.
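The scheduler dependence can be illustrated with plain JDK executors as an analogy: an executor that runs tasks inline (`Runnable::run`) behaves like `Schedulers.immediate()`, while a thread pool behaves like `Schedulers.parallel()` by hopping to a worker thread. This is a sketch under the same assumption as above (thread-bound transaction state); `SchedulerAnalogy`, `txnSeenBy`, and `compare` are hypothetical names, and no Reactor classes are used.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Hypothetical analogy for the two schedulers: an inline executor runs the task
 * on the calling thread, while a pool-backed executor hops to a worker thread.
 */
final class SchedulerAnalogy {
    static final ThreadLocal<String> CURRENT_TXN = new ThreadLocal<>();

    /** Returns the transaction visible to a task run through the given executor. */
    static String txnSeenBy(Executor executor) {
        return CompletableFuture.supplyAsync(CURRENT_TXN::get, executor).join();
    }

    /** Returns {seen via inline executor, seen via pool executor}. */
    static String[] compare(String txnName) {
        CURRENT_TXN.set(txnName);
        ExecutorService parallelLike = Executors.newFixedThreadPool(2);
        try {
            Executor immediateLike = Runnable::run; // runs inline, like Schedulers.immediate()
            return new String[] {txnSeenBy(immediateLike), txnSeenBy(parallelLike)};
        } finally {
            parallelLike.shutdown();
            CURRENT_TXN.remove();
        }
    }
}
```

The inline executor sees the transaction because it never leaves the thread that owns it, which matches the immediate-scheduler trace below; the pool thread sees nothing, which matches the parallel-scheduler trace.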

The parallel scheduler spins off the following thread, which executes only reactor core classes that we don’t instrument and therefore has no way to retrieve the token saved by our spring-webflux instrumentation:

"parallel-1@11947" daemon prio=5 tid=0x5a nid=NA runnable
  java.lang.Thread.State: RUNNABLE
      at com.example.springwebfluxrepro.controllers.WebFluxReproController.postExternalMonoAlternative(WebFluxReproController.java:83)
      at com.example.springwebfluxrepro.controllers.WebFluxReproController$$Lambda$1053.1474671496.apply(Unknown Source:-1)
      at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125)
      at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
      at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2346)
      at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.request(FluxOnAssembly.java:497)
      at reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe(MonoFlatMap.java:110)
      at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onSubscribe(FluxOnAssembly.java:481)
      at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54)
      at reactor.core.publisher.Mono.subscribe(Mono.java:4046)
      at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.run(MonoSubscribeOn.java:126)
      at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
      at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
      at java.util.concurrent.FutureTask.run(FutureTask.java:264)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      at java.lang.Thread.run(Thread.java:834)

The immediate scheduler executes on a thread that can access and link the saved token via com.nr.instrumentation.reactor.netty.TokenLinkingSubscriber (which is created by the spring-webflux instrumentation and should be accessible to the netty-reactor instrumentation from the current subscriber context):

"reactor-http-nio-4@11526" daemon prio=5 tid=0x4c nid=NA runnable
  java.lang.Thread.State: RUNNABLE
      at com.example.springwebfluxrepro.controllers.WebFluxReproController.postExternalMonoAlternative(WebFluxReproController.java:83)
      at com.example.springwebfluxrepro.controllers.WebFluxReproController$$Lambda$1053.1987417235.apply(Unknown Source:-1)
      at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125)
      at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
      at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2346)
      at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.request(FluxOnAssembly.java:497)
      at reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe(MonoFlatMap.java:110)
      at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onSubscribe(FluxOnAssembly.java:481)
      at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54)
      at reactor.core.publisher.Mono.subscribe(Mono.java:4046)
      at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.run(MonoSubscribeOn.java:126)
      at reactor.core.scheduler.ImmediateScheduler$ImmediateSchedulerWorker.schedule(ImmediateScheduler.java:83)
      at reactor.core.publisher.MonoSubscribeOn.subscribeOrReturn(MonoSubscribeOn.java:55)
      at reactor.core.publisher.Mono.subscribe(Mono.java:4031)
      at reactor.core.publisher.Mono.subscribeWith(Mono.java:4161)
      at reactor.core.publisher.Mono.subscribe(Mono.java:3878)
      at com.example.springwebfluxrepro.controllers.WebFluxReproController.fireAndForgetLogic(WebFluxReproController.java:101)
      at com.example.springwebfluxrepro.controllers.WebFluxReproController$$Lambda$951.660733125.accept(Unknown Source:-1)
      at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:196)
      at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
      at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
      at com.nr.instrumentation.reactor.netty.TokenLinkingSubscriber.lambda$onNext$1(TokenLinkingSubscriber.java:41)
      at com.nr.instrumentation.reactor.netty.TokenLinkingSubscriber$$Lambda$907.667781731.run(Unknown Source:-1)
      at com.nr.instrumentation.reactor.netty.TokenLinkingSubscriber.withNRToken(TokenLinkingSubscriber.java:64)
      at com.nr.instrumentation.reactor.netty.TokenLinkingSubscriber.onNext(TokenLinkingSubscriber.java:41)
      at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:136)
      at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
      at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
      at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
      at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
      at com.nr.instrumentation.reactor.netty.TokenLinkingSubscriber.lambda$onNext$1(TokenLinkingSubscriber.java:41)
      at com.nr.instrumentation.reactor.netty.TokenLinkingSubscriber$$Lambda$907.667781731.run(Unknown Source:-1)
      at com.nr.instrumentation.reactor.netty.TokenLinkingSubscriber.withNRToken(TokenLinkingSubscriber.java:64)
      at com.nr.instrumentation.reactor.netty.TokenLinkingSubscriber.onNext(TokenLinkingSubscriber.java:41)
      at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
      at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
      at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
      at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
      at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
      at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
      at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
      at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
      at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
      at com.nr.instrumentation.reactor.netty.TokenLinkingSubscriber.lambda$onNext$1(TokenLinkingSubscriber.java:41)
      at com.nr.instrumentation.reactor.netty.TokenLinkingSubscriber$$Lambda$907.667781731.run(Unknown Source:-1)
      at com.nr.instrumentation.reactor.netty.TokenLinkingSubscriber.withNRToken(TokenLinkingSubscriber.java:64)
      at com.nr.instrumentation.reactor.netty.TokenLinkingSubscriber.onNext(TokenLinkingSubscriber.java:41)
      at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:136)
      at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
      at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
      at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
      at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
      at com.nr.instrumentation.reactor.netty.TokenLinkingSubscriber.lambda$onNext$1(TokenLinkingSubscriber.java:41)
      at com.nr.instrumentation.reactor.netty.TokenLinkingSubscriber$$Lambda$907.667781731.run(Unknown Source:-1)
      at com.nr.instrumentation.reactor.netty.TokenLinkingSubscriber.withNRToken(TokenLinkingSubscriber.java:64)
      at com.nr.instrumentation.reactor.netty.TokenLinkingSubscriber.onNext(TokenLinkingSubscriber.java:41)
      at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:136)
      at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
      at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
      at com.nr.instrumentation.reactor.netty.TokenLinkingSubscriber.lambda$onNext$1(TokenLinkingSubscriber.java:41)
      at com.nr.instrumentation.reactor.netty.TokenLinkingSubscriber$$Lambda$907.667781731.run(Unknown Source:-1)
      at com.nr.instrumentation.reactor.netty.TokenLinkingSubscriber.withNRToken(TokenLinkingSubscriber.java:64)
      at com.nr.instrumentation.reactor.netty.TokenLinkingSubscriber.onNext(TokenLinkingSubscriber.java:41)
      at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:136)
      at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onNext(FluxFilterFuseable.java:118)
      at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
      at com.nr.instrumentation.reactor.netty.TokenLinkingSubscriber.lambda$onNext$1(TokenLinkingSubscriber.java:41)
      at com.nr.instrumentation.reactor.netty.TokenLinkingSubscriber$$Lambda$907.667781731.run(Unknown Source:-1)
      at com.nr.instrumentation.reactor.netty.TokenLinkingSubscriber.withNRToken(TokenLinkingSubscriber.java:64)
      at com.nr.instrumentation.reactor.netty.TokenLinkingSubscriber.onNext(TokenLinkingSubscriber.java:41)
      at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:136)
      at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
      at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:159)
      at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onComplete(FluxOnAssembly.java:397)
      at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onComplete(FluxOnAssembly.java:397)
      at com.nr.instrumentation.reactor.netty.TokenLinkingSubscriber.onComplete(TokenLinkingSubscriber.java:51)
      at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onComplete(FluxHide.java:146)
      at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:150)
      at com.nr.instrumentation.reactor.netty.TokenLinkingSubscriber.onComplete(TokenLinkingSubscriber.java:51)
      at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onComplete(FluxHide.java:146)
      at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onComplete(FluxPeekFuseable.java:277)
      at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onComplete(FluxOnAssembly.java:397)
      at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onComplete(FluxOnAssembly.java:397)
      at com.nr.instrumentation.reactor.netty.TokenLinkingSubscriber.onComplete(TokenLinkingSubscriber.java:51)
      at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onComplete(FluxHide.java:146)
      at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:150)
      at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onComplete(FluxOnAssembly.java:397)
      at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onComplete(FluxOnAssembly.java:397)
      at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:383)
      at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:396)
      at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:452)
      at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:664)
      at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
      at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
      at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
      at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
      at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
      at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
      at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
      at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
      at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
      at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
      at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
      at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
      at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
      at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
      at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
      at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
      at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
      at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
      at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
      at java.lang.Thread.run(Thread.java:834)
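The repeated `TokenLinkingSubscriber.onNext` → `withNRToken` frames in the trace above suggest a decorator that runs each signal with the token linked. The sketch below models that pattern with the JDK's `java.util.concurrent.Flow` API instead of Reactor. It is an assumption-laden illustration, not the actual `TokenLinkingSubscriber`: `LinkingSubscriber`, `RecordingSubscriber`, and `LinkingDemo` are hypothetical names, and a transaction name string stands in for the agent's token.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical decorator in the spirit of TokenLinkingSubscriber: each signal is
 * delivered to the delegate only after the captured transaction is bound to the
 * current thread, and the previous binding is restored afterwards.
 */
final class LinkingSubscriber<T> implements Flow.Subscriber<T> {
    static final ThreadLocal<String> CURRENT_TXN = new ThreadLocal<>();

    private final Flow.Subscriber<T> delegate;
    private final String txn; // stands in for the agent's token

    LinkingSubscriber(Flow.Subscriber<T> delegate, String txn) {
        this.delegate = delegate;
        this.txn = txn;
    }

    private void withTxn(Runnable signal) {
        String previous = CURRENT_TXN.get();
        CURRENT_TXN.set(txn);
        try {
            signal.run();
        } finally {
            if (previous == null) CURRENT_TXN.remove(); else CURRENT_TXN.set(previous);
        }
    }

    @Override public void onSubscribe(Flow.Subscription s) { withTxn(() -> delegate.onSubscribe(s)); }
    @Override public void onNext(T item) { withTxn(() -> delegate.onNext(item)); }
    @Override public void onError(Throwable t) { withTxn(() -> delegate.onError(t)); }
    @Override public void onComplete() { withTxn(delegate::onComplete); }
}

/** Records which transaction was current on the delivery thread for each item. */
final class RecordingSubscriber implements Flow.Subscriber<Integer> {
    final List<String> seen = Collections.synchronizedList(new ArrayList<>());
    final CountDownLatch done = new CountDownLatch(1);

    @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
    @Override public void onNext(Integer item) { seen.add(LinkingSubscriber.CURRENT_TXN.get()); }
    @Override public void onError(Throwable t) { done.countDown(); }
    @Override public void onComplete() { done.countDown(); }
}

final class LinkingDemo {
    /** Publishes two items asynchronously and returns the transaction seen for each. */
    static List<String> demo(String txn) {
        RecordingSubscriber rec = new RecordingSubscriber();
        try (SubmissionPublisher<Integer> pub = new SubmissionPublisher<>()) {
            pub.subscribe(new LinkingSubscriber<>(rec, txn));
            pub.submit(1);
            pub.submit(2);
        } // close() signals onComplete after the buffered items are delivered
        try {
            if (!rec.done.await(5, TimeUnit.SECONDS)) throw new IllegalStateException("timed out");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(rec.seen);
    }
}
```

Items are delivered on a publisher-owned thread, yet each one sees the transaction, because the wrapper links it per signal. The bug is that no such wrapper sits between the reactor core subscribers on the parallel thread.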

It is not yet clear how to fix this or how long a fix might take. We would prefer to avoid adding new instrumentation for reactor core, as it would likely add significant overhead. We probably need a way to merge the old subscriber context with the new one in our netty instrumentation so that the token is available to every subscriber.
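The merge idea can be sketched with plain maps standing in for subscriber contexts. This is only a model: `ContextMerge`, `TOKEN_KEY`, and `carryToken` are hypothetical names, and a real fix would operate on Reactor's own immutable `Context` inside the netty instrumentation. The rule shown carries the token forward only when the new context lacks one, so a context that already holds its own token is left untouched.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical illustration of merging an old subscriber context into a new one,
 * with contexts modeled as immutable maps.
 */
final class ContextMerge {
    static final String TOKEN_KEY = "hypothetical.token.key";

    /** Returns newCtx plus the old token, if the old context had one and newCtx does not. */
    static Map<String, Object> carryToken(Map<String, Object> oldCtx, Map<String, Object> newCtx) {
        Object token = oldCtx.get(TOKEN_KEY);
        if (token == null || newCtx.containsKey(TOKEN_KEY)) {
            return newCtx; // nothing to carry, or the new context already has its own token
        }
        Map<String, Object> merged = new HashMap<>(newCtx);
        merged.put(TOKEN_KEY, token); // only the token is copied, not the rest of oldCtx
        return Collections.unmodifiableMap(merged);
    }
}
```

Copying only the token, rather than every old entry, keeps unrelated values from the earlier context from leaking into the new one.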

@jasonjkeller jasonjkeller added the bug Something isn't working as designed/intended label Jan 11, 2021
@jasonjkeller
Contributor Author

Might be related to #174 and/or #125

@tspring tspring self-assigned this Jan 15, 2021
@tspring
Contributor

tspring commented Jan 15, 2021

should be fixed in #190
