From f2e0b0e9d4f347de217db9f90fe48d1b1e9d00ec Mon Sep 17 00:00:00 2001 From: Dave Moten Date: Wed, 8 Mar 2017 19:39:31 +1100 Subject: [PATCH] add Observable.switchMapSingle --- src/main/java/io/reactivex/Observable.java | 59 +++++++++++++++++++ .../observable/ObservableInternalHelper.java | 27 +++++++++ .../observable/ObservableSwitchTest.java | 59 ++++++++++++++++++- 3 files changed, 144 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index 169073a5b55..2089a4002aa 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -10865,6 +10865,65 @@ public final Observable switchMap(Function(this, mapper, bufferSize, false)); } + /** + * Returns a new ObservableSource by applying a function that you supply to each item emitted by the source + * ObservableSource that returns a SingleSource, and then emitting the item emitted by the most recently emitted + * of these SingleSources. + *

+ * The resulting ObservableSource completes if both the upstream ObservableSource and the last inner SingleSource, if any, complete. + * If the upstream ObservableSource signals an onError, the inner SingleSource is unsubscribed and the error delivered in-sequence. + *

+ * + *

+ *
Scheduler:
+ *
{@code switchMapSingle} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the element type of the inner SingleSources and the output + * @param mapper + * a function that, when applied to an item emitted by the source ObservableSource, returns a + * SingleSource + * @return an Observable that emits the item emitted by the SingleSource returned from applying {@code func} to the most recently emitted item emitted by the source ObservableSource + * @see ReactiveX operators documentation: FlatMap + * @since 2.0.8 + */ + @Experimental + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + public final Observable switchMapSingle(Function> mapper) { + return ObservableInternalHelper.switchMapSingle(this, mapper); + } + + /** + * Returns a new ObservableSource by applying a function that you supply to each item emitted by the source + * ObservableSource that returns a SingleSource, and then emitting the item emitted by the most recently emitted + * of these SingleSources and delays any error until all SingleSources terminate. + *

+ * The resulting ObservableSource completes if both the upstream ObservableSource and the last inner SingleSource, if any, complete. + * If the upstream ObservableSource signals an onError, the termination of the last inner SingleSource will emit that error as is + * or wrapped into a CompositeException along with the other possible errors the former inner SingleSources signalled. + *

+ * + *

+ *
Scheduler:
+ *
{@code switchMapSingleDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the element type of the inner SingleSources and the output + * @param mapper + * a function that, when applied to an item emitted by the source ObservableSource, returns a + * SingleSource + * @return an Observable that emits the item emitted by the SingleSource returned from applying {@code func} to the most recently emitted item emitted by the source ObservableSource + * @see ReactiveX operators documentation: FlatMap + * @since 2.0.8 + */ + @Experimental + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + public final Observable switchMapSingleDelayError(Function> mapper) { + return ObservableInternalHelper.switchMapSingleDelayError(this, mapper); + } + /** * Returns a new ObservableSource by applying a function that you supply to each item emitted by the source * ObservableSource that returns an ObservableSource, and then emitting the items emitted by the most recently emitted diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java index 17a8df604fb..11242f835a2 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java @@ -315,4 +315,31 @@ public static Function>, ObservableSou return new ZipIterableFunction(zipper); } + public static Observable switchMapSingle(Observable source, final Function> mapper) { + return source.switchMap(convertSingleMapperToObservableMapper(mapper), 1); + } + + public static Observable switchMapSingleDelayError(Observable source, + Function> mapper) { + return source.switchMapDelayError(convertSingleMapperToObservableMapper(mapper), 1); + } + + private static Function> convertSingleMapperToObservableMapper( + final Function> mapper) { + return new Function>() { + @SuppressWarnings("unchecked") + @Override + public Observable apply(T t) throws Exception { + SingleSource source = mapper.apply(t); + Single single; + if (source instanceof Single) { + single = (Single) source; + } else { + single = Single.unsafeCreate(source); + } + return (Observable) single.toObservable(); + } + }; + } + } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableSwitchTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableSwitchTest.java index c725767201f..d0b5f0dba35 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableSwitchTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableSwitchTest.java @@ -26,6 +26,7 @@ import io.reactivex.*; import io.reactivex.disposables.*; import io.reactivex.exceptions.*; +import io.reactivex.functions.Consumer; import io.reactivex.functions.Function; import io.reactivex.internal.functions.Functions; import io.reactivex.internal.util.ExceptionHelper; @@ -579,7 +580,6 @@ public ObservableSource apply(Object v) throws Exception { }, 16) .test() .assertResult(1); - } @Test @@ -622,7 +622,64 @@ public void switchMapInnerCancelled() { assertFalse(pp.hasObservers()); } + + @Test + public void switchMapSingleJustSource() { + Observable.just(0) + .switchMapSingle(new Function>() { + @Override + public SingleSource apply(Object v) throws Exception { + return Single.just(1); + } + }) + .test() + .assertResult(1); + } + + @Test + public void switchMapSingleFunctionDoesntReturnSingle() { + Observable.just(0) + .switchMapSingle(new Function>() { + @Override + public SingleSource apply(Object v) throws Exception { + return new SingleSource() { + @Override + public void subscribe(SingleObserver s) { + s.onSubscribe(Disposables.empty()); + s.onSuccess(1); + } + }; + } + }) + .test() + .assertResult(1); + } + @Test + public void switchMapSingleDelayErrorJustSource() { + final AtomicBoolean completed = new AtomicBoolean(); + Observable.just(0, 1) + .switchMapSingleDelayError(new Function>() { + @Override + public SingleSource apply(Integer v) throws Exception { + if (v == 0) { + return Single.error(new RuntimeException()); + } else { + return Single.just(1).doOnSuccess(new Consumer() { + + @Override + public void accept(Integer n) throws Exception { + completed.set(true); + }}); + } + } + }) + .test() + .assertValue(1) + .assertError(RuntimeException.class); + assertTrue(completed.get()); + } + @Test public void scalarMap() { Observable.switchOnNext(Observable.just(Observable.just(1)))