Publisher[retry|repeat] operators demand management if onNext throws #2639
Motivation:
Publisher `retry*` and `repeat*` operators need to persist outstanding demand between "re-do" operations. Otherwise the downstream Subscriber will have requested signals, but they won't be requested from the new async source after switching. If a downstream operator (or Subscriber) throws from `onNext`, the retry/repeat operator has to decrement demand to avoid violating the Reactive Streams specification (no more `onNext(..)` signals than the amount requested via `request(n)`), but downstream Subscribers that didn't see the `onNext` (because an earlier operator threw) will hang waiting for signals they requested via `request(n)` that will never be delivered.

Modifications:
- Publisher[retry|repeat] introduce a flag to determine if downstream `onNext` throwing should be terminal. This mode should be used unless it is known that the `onNext` signal always makes it to the last Subscriber and its `request(n)` accounting can tolerate the exception it throws.
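The demand bookkeeping described in the motivation can be sketched roughly as follows. This is a minimal illustration, not the actual `RedoPublisher` code: the class `OutstandingDemand` and its method names are hypothetical and exist only to show why the count must survive a "re-do" and why every delivered `onNext` has to decrement it.

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical helper (not part of ServiceTalk): tracks demand that the
 * downstream has requested but that has not yet been satisfied by onNext.
 */
public final class OutstandingDemand {
    private final AtomicLong outstanding = new AtomicLong();

    /** Downstream called request(n): accumulate, saturating at Long.MAX_VALUE. */
    public void onRequest(long n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive: " + n);
        }
        outstanding.accumulateAndGet(n, (cur, add) -> cur + add < 0 ? Long.MAX_VALUE : cur + add);
    }

    /** An onNext is delivered downstream: one unit of demand is consumed. */
    public void onNextDelivered() {
        // Long.MAX_VALUE is treated as "effectively unbounded" and is not decremented.
        outstanding.updateAndGet(cur -> cur == Long.MAX_VALUE ? cur : cur - 1);
    }

    /** Amount to request from the new source after a retry/repeat switch. */
    public long demandForNewSource() {
        return outstanding.get();
    }
}
```

For example, if the downstream requested 3 and one item was delivered before the source terminated, the new source should be asked for the remaining 2. If the decrement happened but an intermediate operator threw before the item reached the last Subscriber, that Subscriber still believes 3 are pending, which is the hang described above.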
     */
    public final Publisher<T> retry(BiIntPredicate<Throwable> shouldRetry) {
-        return new RedoPublisher<>(this,
+        return retry(false, shouldRetry);
I'm tempted to have the default be `true` because it is generally safer, error propagation is more explicit, and you will never "hang". However, that would be a breaking change, and may not be the right choice if the `Subscriber` is the one that throws (see `PublisherBufferTest.java` below).
wdyt?
The spec says:
> Calling onSubscribe, onNext, onError or onComplete MUST return normally except when any provided parameter is null in which case it MUST throw a java.lang.NullPointerException to the caller, for all other situations the only legal way for a Subscriber to signal failure is by cancelling its Subscription. In the case that this rule is violated, any associated Subscription to the Subscriber MUST be considered as cancelled, and the caller MUST raise this error condition in a fashion that is adequate for the runtime environment.

which covers the solutions here, doesn't it?
I think the test may be biased toward our implementation of the operators in the chain. I believe defaulting to `true` makes more sense.
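One way to read the quoted rule in code: the caller catches whatever `onNext` throws, treats the `Subscription` as cancelled, and raises the error by terminating the downstream. The sketch below uses the JDK's `java.util.concurrent.Flow` interfaces rather than ServiceTalk's own types. The class name `TerminateOnNextException` is hypothetical, and the sketch assumes signals arrive serially, so it uses no synchronization.

```java
import java.util.concurrent.Flow;

/**
 * Hypothetical wrapper (not ServiceTalk code): if the delegate throws from
 * onNext, cancel upstream and terminate the delegate with onError.
 */
public final class TerminateOnNextException<T> implements Flow.Subscriber<T> {
    private final Flow.Subscriber<? super T> delegate;
    private Flow.Subscription subscription;
    private boolean terminated;

    public TerminateOnNextException(Flow.Subscriber<? super T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        delegate.onSubscribe(s);
    }

    @Override
    public void onNext(T item) {
        if (terminated) {
            return; // drop signals that race with the cancellation
        }
        try {
            delegate.onNext(item);
        } catch (Throwable t) {
            terminated = true;
            subscription.cancel(); // the Subscription is considered cancelled
            delegate.onError(t);   // make the failure explicit downstream
        }
    }

    @Override
    public void onError(Throwable t) {
        if (!terminated) {
            terminated = true;
            delegate.onError(t);
        }
    }

    @Override
    public void onComplete() {
        if (!terminated) {
            terminated = true;
            delegate.onComplete();
        }
    }
}
```

With this shape the failure is always visible as a terminal signal, so no Subscriber further down the chain is left waiting on demand that will never be satisfied.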
Sounds good. Unless there are objections in this PR, I'll keep the default at `false` and submit a follow-up PR to make the change more explicit/visible.
Oh, that's a very good catch!
Interestingly, I've checked other RS implementations (Reactor and RxJava 3) and they let an exception from `onNext` propagate upstream, as we do when `terminateOnNextException == false`.
> The spec says...

The most frequent source of exceptions will be operators like `Publisher.map` or `filter`. I've seen users throwing from the mapper intentionally.
I would say that we need to handle retry and repeat separately. For repeat, it makes sense to try-catch, cancel, terminate (like `terminateOnNextException == true`) and that should be the only behavior. We do not need an overload that takes a boolean. This will be consistent with `Single.repeat*`.
The retry is a little bit more interesting because it handles exceptions. However, I would say that `terminateOnNextException == true` should be the expected behavior.
Reasoning: `retry*` should retry exceptions originated by the upstream. If we already delivered `onNext` and the exception was thrown by `onNext` downstream, it should not be considered for the retry predicate.
Hesitations:
- Other RS implementations behave like `terminateOnNextException == false` 😞
- It's a behavior change for existing users.
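The reasoning above (retry only what the upstream raised, never what the downstream threw from `onNext`) can be illustrated with a deliberately simplified, synchronous pull model. Everything here is hypothetical: `RetryUpstreamOnly` and `drainWithRetry` are illustration-only names, an `Iterator` stands in for the async source, and a fixed retry count stands in for the retry predicate.

```java
import java.util.Iterator;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical synchronous model of "retry upstream failures only". */
public final class RetryUpstreamOnly {
    private RetryUpstreamOnly() {
    }

    /**
     * Drains the source into downstream, re-subscribing when the upstream fails.
     * Exceptions thrown by downstream.accept are NOT retried; they propagate.
     *
     * @return the number of times the source was subscribed to
     */
    public static <T> int drainWithRetry(Supplier<Iterator<T>> source, int maxRetries,
                                         Consumer<? super T> downstream) {
        int subscriptions = 0;
        for (;;) {
            Iterator<T> upstream = source.get();
            ++subscriptions;
            boolean failed = false;
            while (!failed) {
                T item;
                try {
                    if (!upstream.hasNext()) {
                        return subscriptions; // upstream completed normally
                    }
                    item = upstream.next();
                } catch (RuntimeException upstreamFailure) {
                    if (subscriptions > maxRetries) {
                        throw upstreamFailure; // retries exhausted
                    }
                    failed = true; // leave the inner loop and re-subscribe
                    continue;
                }
                // Outside the try block on purpose: a downstream failure is terminal.
                downstream.accept(item);
            }
        }
    }
}
```

The key design choice is that the downstream call sits outside the `try` that guards the upstream, so a throwing consumer ends the whole operation after a single subscription instead of being fed back into the retry decision.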
> `retry*` should retry exceptions originated by the upstream. If we already delivered `onNext` and the exception was thrown by `onNext` downstream, it should not be considered for the retry predicate.

Exactly that 👍 - It's very unexpected to see this behavior, and hard to debug, because you are paying attention to the wrong part of the stream.
> Other RS implementations behave like `terminateOnNextException == false`

Sadly true, which makes me think that they likely have the same bug.
> It's a behavior change for existing users.

The risk of having a silent bug like this is real. I would say let's use our communication channels to highlight this change, and have a release that introduces just this, so people can move back to the previous version if needed. We can iterate if we notice bad feedback. Maybe even add a WARN in the output when the retry catches an uncaught exception?
servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java
Code change LGTM