Skip to content

Commit

Permalink
Retry/repeat operators consistently not copy AsyncContext (#2670)
Browse files Browse the repository at this point in the history
Motivation:
Some of the retry/repeat operators copy AsyncContext on subscribe
and others doen't. We should consistently not copy in these operators
and if copy is desired it can be implemented externally via another
operator.
  • Loading branch information
Scottmitch authored Aug 10, 2023
1 parent 02f67a6 commit 28b7494
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,12 @@ final class RedoPublisher<T> extends AbstractNoHandleSubscribePublisher<T> {
@Override
void handleSubscribe(Subscriber<? super T> subscriber, ContextMap contextMap,
AsyncContextProvider contextProvider) {
// For the current subscribe operation we want to use contextMap directly, but in the event a re-subscribe
// operation occurs we want to restore the original state of the AsyncContext map, so we save a copy upfront.
// Current expected behavior is to capture the context on the first subscribe, save it, and re-use it on each
// resubscribe. This allows for async context to be shared across each request retry, and follows the same
// shared state model as the request object on the client. If copy-on-each-resubscribe is desired this could
// be provided by an independent operator, or manually cleared/overwritten.
original.delegateSubscribe(new RedoSubscriber<>(terminateOnNextException, new SequentialSubscription(), 0,
subscriber, contextMap.copy(), contextProvider, this), contextMap, contextProvider);
subscriber, contextMap, contextProvider, this), contextMap, contextProvider);
}

abstract static class AbstractRedoSubscriber<T> implements Subscriber<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@ final class RedoWhenPublisher<T> extends AbstractNoHandleSubscribePublisher<T> {
@Override
void handleSubscribe(Subscriber<? super T> subscriber,
ContextMap contextMap, AsyncContextProvider contextProvider) {
// For the current subscribe operation we want to use contextMap directly, but in the event a re-subscribe
// operation occurs we want to restore the original state of the AsyncContext map, so we save a copy upfront.
original.delegateSubscribe(
new RedoSubscriber<>(terminateOnNextException, new SequentialSubscription(), 0, subscriber,
contextMap.copy(), contextProvider, this),
contextMap, contextProvider);
// Current expected behavior is to capture the context on the first subscribe, save it, and re-use it on each
// resubscribe. This allows for async context to be shared across each request retry, and follows the same
// shared state model as the request object on the client. If copy-on-each-resubscribe is desired this could
// be provided by an independent operator, or manually cleared/overwritten.
original.delegateSubscribe(new RedoSubscriber<>(terminateOnNextException, new SequentialSubscription(), 0,
subscriber, contextMap, contextProvider, this), contextMap, contextProvider);
}

private static final class RedoSubscriber<T> extends AbstractRedoSubscriber<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
* @param <T> Type of result of this {@link Single}.
*/
final class RetrySingle<T> extends AbstractNoHandleSubscribeSingle<T> {

private final Single<T> original;
private final BiIntPredicate<Throwable> shouldRetry;

Expand All @@ -41,11 +40,12 @@ final class RetrySingle<T> extends AbstractNoHandleSubscribeSingle<T> {
@Override
void handleSubscribe(final Subscriber<? super T> subscriber,
final ContextMap contextMap, final AsyncContextProvider contextProvider) {
// For the current subscribe operation we want to use contextMap directly, but in the event a re-subscribe
// operation occurs we want to restore the original state of the AsyncContext map, so we save a copy upfront.
original.delegateSubscribe(new RetrySubscriber<>(new SequentialCancellable(), this, subscriber,
0, contextMap.copy(), contextProvider),
contextMap, contextProvider);
// Current expected behavior is to capture the context on the first subscribe, save it, and re-use it on each
// resubscribe. This allows for async context to be shared across each request retry, and follows the same
// shared state model as the request object on the client. If copy-on-each-resubscribe is desired this could
// be provided by an independent operator, or manually cleared/overwritten.
original.delegateSubscribe(new RetrySubscriber<>(new SequentialCancellable(), this, subscriber, 0, contextMap,
contextProvider), contextMap, contextProvider);
}

abstract static class AbstractRetrySubscriber<T> implements Subscriber<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
* @param <T> Type of result of this {@link Single}.
*/
final class RetryWhenSingle<T> extends AbstractNoHandleSubscribeSingle<T> {

private final Single<T> original;
private final BiIntFunction<Throwable, ? extends Completable> shouldRetry;

Expand All @@ -43,10 +42,12 @@ final class RetryWhenSingle<T> extends AbstractNoHandleSubscribeSingle<T> {
@Override
void handleSubscribe(final Subscriber<? super T> subscriber,
final ContextMap contextMap, final AsyncContextProvider contextProvider) {
// For the current subscribe operation we want to use contextMap directly, but in the event a re-subscribe
// operation occurs we want to restore the original state of the AsyncContext map, so we save a copy upfront.
original.delegateSubscribe(new RetrySubscriber<>(new SequentialCancellable(), 0, subscriber,
contextMap, contextProvider, this), contextMap, contextProvider);
// Current expected behavior is to capture the context on the first subscribe, save it, and re-use it on each
// resubscribe. This allows for async context to be shared across each request retry, and follows the same
// shared state model as the request object on the client. If copy-on-each-resubscribe is desired this could
// be provided by an independent operator, or manually cleared/overwritten.
original.delegateSubscribe(new RetrySubscriber<>(new SequentialCancellable(), 0, subscriber, contextMap,
contextProvider, this), contextMap, contextProvider);
}

private static final class RetrySubscriber<T> extends RetrySingle.AbstractRetrySubscriber<T> {
Expand Down

0 comments on commit 28b7494

Please sign in to comment.