-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
2.x: Add Flowable.switchMapCompletable{DelayError} operator #5870
Conversation
Codecov Report
@@ Coverage Diff @@
## 2.x #5870 +/- ##
============================================
+ Coverage 96.54% 96.61% +0.07%
- Complexity 5862 5868 +6
============================================
Files 647 648 +1
Lines 42766 42865 +99
Branches 5933 5952 +19
============================================
+ Hits 41289 41416 +127
+ Misses 568 552 -16
+ Partials 909 897 -12
Continue to review full report at Codecov.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, but comment about mapper
* <dt><b>Error handling:</b></dt> | ||
* <dd>Errors of this {@code Flowable} and all the {@code CompletableSource}s, who had the chance | ||
* to run to their completion, are delayed until | ||
* all of the terminate in some fashion. At this point, if there was only one failure, the respective |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[all of the]m
@Override | ||
public void onNext(T t) { | ||
CompletableSource c; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hm, I'm looking at Flowable.switchMap
and Observable.switchMap
and they dispose current first:
RxJava/src/main/java/io/reactivex/internal/operators/flowable/FlowableSwitchMap.java
Line 109 in 8068404
inner.cancel(); |
RxJava/src/main/java/io/reactivex/internal/operators/observable/ObservableSwitchMap.java
Line 105 in 8068404
inner.cancel(); |
While this operator maps first and then disposes current which give current more time to work which makes it more delayErrors
friendly I guess, however I still have a question about delayErrors
below
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've tried to adapt Flowable.switchMap
behavior to respect delayErrors
if mapper throws but failed so far, this is the test I wrote, maybe it'll help you:
@Test
public void delayErrorsErrorInMapper() {
final PublishProcessor<Object> trigger = PublishProcessor.create();
TestSubscriber<Integer> ts = Flowable
.just(1, 2)
.switchMapDelayError(new Function<Integer, Publisher<Integer>>() {
@Override
public Publisher<Integer> apply(Integer integer) throws Exception {
if (integer.equals(1)) {
return Flowable
.combineLatest(Flowable.just(1), trigger, new BiFunction<Integer, Object, Integer>() {
@Override
public Integer apply(Integer integer, Object o) {
return integer;
}
})
.doOnCancel(new Action() {
@Override
public void run() throws Exception {
trigger.toString();
}
});
} else {
// Fail for second item.
throw new TestException("Item " + integer + " mapping failure.");
}
}
})
.test();
ts.assertNoValues();
ts.assertNotTerminated();
trigger.onNext(new Object());
ts.assertValue(1);
ts.assertError(new Predicate<Throwable>() {
@Override
public boolean test(Throwable throwable) {
return throwable instanceof TestException && throwable.getMessage().equals("Item 2 mapping failure.");
}
});
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, essentially it gives a larger window for errors but otherwise has no effect because there are no items to forward and a late completion has no effect anyway.
Mapping errors are terminal errors, as if the upstream ended with a failure. The problem is that the operator can't decide if it is okay to continue. You can always do try-catch
and return a Completable.error
.
} | ||
if (inner.compareAndSet(current, o)) { | ||
if (current != null) { | ||
current.dispose(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we have to do it in loop? Do we expect concurrent onNext
from upstream
? Or I'm missing something…
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
current
could be null or the previous inner observer. When that old inner observer is completing, it CASes itself to null, which fails this CAS and forces a retry. In addition, a dispose
can also change the inner
to the INNER_DISPOSED
concurrently at which point it shouldn't be overridden and the loop should simply end.
78fee1f
to
75b7558
Compare
Rebased and refactored due to |
This PR adds the
Flowable.switchMapCompletable
andFlowable.switchMapCompletableDelayError
operators as requested by #4853.The associated new marbles are: