-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ensure Sink.contextView is propagated #1450
Conversation
When creating a Mono via Mono.create any nested Monos should have the higher level sinks context view propogated down. JAVA-5345
I was unable to create a test to replicate the tracing issue reported in: spring-projects/spring-data-mongodb#4650. However, I manually tested against the repo in ticket and confirmed it works. |
Shouldn't we also do a similar change here? We have a collectionInfoRetriever.filter(databaseName, cryptContext.getMongoOperation())
.contextWrite(sink.contextView())
.doOnSuccess...
...
.subscribe(); If "yes", then there is plenty more places that need the change. |
...ctive-streams/src/main/com/mongodb/reactivestreams/client/internal/BatchCursorPublisher.java
Outdated
Show resolved
Hide resolved
...s/src/main/com/mongodb/reactivestreams/client/internal/gridfs/GridFSUploadPublisherImpl.java
Outdated
Show resolved
Hide resolved
...s/src/main/com/mongodb/reactivestreams/client/internal/gridfs/GridFSUploadPublisherImpl.java
Outdated
Show resolved
Hide resolved
@stIncMale I agree its not helped by not being able to replicate the original test without the full stack. Let me investigate some more. |
...r-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/BatchCursorFlux.java
Show resolved
Hide resolved
...r-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/BatchCursorFlux.java
Show resolved
Hide resolved
...ctive-streams/src/main/com/mongodb/reactivestreams/client/internal/BatchCursorPublisher.java
Show resolved
Hide resolved
@@ -120,7 +120,7 @@ public void subscribe(final Subscriber<? super Void> s) { | |||
return originalError; | |||
}) | |||
.then(Mono.error(originalError))) | |||
.doOnCancel(() -> createCancellationMono(terminated, timeout).subscribe()) | |||
.doOnCancel(() -> createCancellationMono(terminated, timeout).contextWrite(ctx).subscribe()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cancellation has a side effect - so we need to propagate context there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't us calling subscribe()
here not with a subscriber that was given to us (the s
argument), the only thing that requires us to propagate the context from the subscriber s
? I suspect that the fact that createCancellationMono
has a side-effect (it mutates terminated
) is irrelevant. If the side-effect is relevant to context propagation, then could you please explain why?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On cancellation of the main Publisher
we are calling subscribe() on a async clean up Publisher
and we ignore any errors, results / completion.
If the side-effect is relevant to context propagation, then could you please explain why?
The user may have tracing and want to trace from the web app to the driver. Without adding the context here we break that chain and they cannot tie the cleanup operations to a web request.
...s/src/main/com/mongodb/reactivestreams/client/internal/gridfs/GridFSUploadPublisherImpl.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a small code suggestion
...s/src/main/com/mongodb/reactivestreams/client/internal/gridfs/GridFSUploadPublisherImpl.java
Outdated
Show resolved
Hide resolved
@@ -120,7 +120,7 @@ public void subscribe(final Subscriber<? super Void> s) { | |||
return originalError; | |||
}) | |||
.then(Mono.error(originalError))) | |||
.doOnCancel(() -> createCancellationMono(terminated, timeout).subscribe()) | |||
.doOnCancel(() -> createCancellationMono(terminated, timeout).contextWrite(ctx).subscribe()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't us calling subscribe()
here not with a subscriber that was given to us (the s
argument), the only thing that requires us to propagate the context from the subscriber s
? I suspect that the fact that createCancellationMono
has a side-effect (it mutates terminated
) is irrelevant. If the side-effect is relevant to context propagation, then could you please explain why?
@stIncMale answered your questions - not sure why the reply to |
When creating a Mono via Mono.create any nested
Monos should have the higher level sinks context view propogated down.
JAVA-5345