Skip to content

Commit

Permalink
2.x: Add efficient mergeWith(Single|Maybe|Completable) overloads. (#5847
Browse files Browse the repository at this point in the history
)

* 2.x: Add efficient mergeWith(Single|Maybe|Completable) overloads.

* Compact tests, use named constants
  • Loading branch information
akarnokd authored Feb 19, 2018
1 parent 4227a59 commit 7ede8ee
Show file tree
Hide file tree
Showing 21 changed files with 3,441 additions and 4 deletions.
85 changes: 84 additions & 1 deletion src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -5771,7 +5771,7 @@ public final Future<T> toFuture() {
}

/**
* Runs the source observable to a terminal event, ignoring any values and rethrowing any exception.
* Runs the source Flowable to a terminal event, ignoring any values and rethrowing any exception.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator consumes the source {@code Flowable} in an unbounded manner
Expand Down Expand Up @@ -10112,6 +10112,89 @@ public final Flowable<T> mergeWith(Publisher<? extends T> other) {
return merge(this, other);
}

/**
* Merges the sequence of items of this Flowable with the success value of the other SingleSource.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/merge.png" alt="">
* <p>
* The success value of the other {@code SingleSource} can get interleaved at any point of this
* {@code Flowable} sequence.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream and ensures the success item from the
* {@code SingleSource} is emitted only when there is a downstream demand.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mergeWith} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param other the {@code SingleSource} whose success value to merge with
* @return the new Flowable instance
* @since 2.1.10 - experimental
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@Experimental
public final Flowable<T> mergeWith(@NonNull SingleSource<? extends T> other) {
ObjectHelper.requireNonNull(other, "other is null");
return RxJavaPlugins.onAssembly(new FlowableMergeWithSingle<T>(this, other));
}

/**
* Merges the sequence of items of this Flowable with the success value of the other MaybeSource
* or waits both to complete normally if the MaybeSource is empty.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/merge.png" alt="">
* <p>
* The success value of the other {@code MaybeSource} can get interleaved at any point of this
* {@code Flowable} sequence.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream and ensures the success item from the
* {@code MaybeSource} is emitted only when there is a downstream demand.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mergeWith} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param other the {@code MaybeSource} which provides a success value to merge with or completes
* @return the new Flowable instance
* @since 2.1.10 - experimental
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@Experimental
public final Flowable<T> mergeWith(@NonNull MaybeSource<? extends T> other) {
ObjectHelper.requireNonNull(other, "other is null");
return RxJavaPlugins.onAssembly(new FlowableMergeWithMaybe<T>(this, other));
}

/**
* Relays the items of this Flowable and completes only when the other CompletableSource completes
* as well.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/merge.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s backpressure
* behavior.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mergeWith} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param other the {@code CompletableSource} to await for completion
* @return the new Flowable instance
* @since 2.1.10 - experimental
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.NONE)
@Experimental
public final Flowable<T> mergeWith(@NonNull CompletableSource other) {
ObjectHelper.requireNonNull(other, "other is null");
return RxJavaPlugins.onAssembly(new FlowableMergeWithCompletable<T>(this, other));
}

/**
* Modifies a Publisher to perform its emissions and notifications on a specified {@link Scheduler},
* asynchronously with a bounded buffer of {@link #bufferSize()} slots.
Expand Down
71 changes: 71 additions & 0 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -8991,6 +8991,77 @@ public final Observable<T> mergeWith(ObservableSource<? extends T> other) {
return merge(this, other);
}

/**
* Merges the sequence of items of this Observable with the success value of the other SingleSource.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/merge.png" alt="">
* <p>
* The success value of the other {@code SingleSource} can get interleaved at any point of this
* {@code Observable} sequence.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mergeWith} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param other the {@code SingleSource} whose success value to merge with
* @return the new Observable instance
* @since 2.1.10 - experimental
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@Experimental
public final Observable<T> mergeWith(@NonNull SingleSource<? extends T> other) {
ObjectHelper.requireNonNull(other, "other is null");
return RxJavaPlugins.onAssembly(new ObservableMergeWithSingle<T>(this, other));
}

/**
* Merges the sequence of items of this Observable with the success value of the other MaybeSource
* or waits both to complete normally if the MaybeSource is empty.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/merge.png" alt="">
* <p>
* The success value of the other {@code MaybeSource} can get interleaved at any point of this
* {@code Observable} sequence.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mergeWith} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param other the {@code MaybeSource} which provides a success value to merge with or completes
* @return the new Observable instance
* @since 2.1.10 - experimental
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@Experimental
public final Observable<T> mergeWith(@NonNull MaybeSource<? extends T> other) {
ObjectHelper.requireNonNull(other, "other is null");
return RxJavaPlugins.onAssembly(new ObservableMergeWithMaybe<T>(this, other));
}

/**
* Relays the items of this Observable and completes only when the other CompletableSource completes
* as well.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/merge.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mergeWith} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param other the {@code CompletableSource} to await for completion
* @return the new Observable instance
* @since 2.1.10 - experimental
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@Experimental
public final Observable<T> mergeWith(@NonNull CompletableSource other) {
ObjectHelper.requireNonNull(other, "other is null");
return RxJavaPlugins.onAssembly(new ObservableMergeWithCompletable<T>(this, other));
}

/**
* Modifies an ObservableSource to perform its emissions and notifications on a specified {@link Scheduler},
* asynchronously with an unbounded buffer with {@link Flowable#bufferSize()} "island size".
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.flowable;

import java.util.concurrent.atomic.*;

import org.reactivestreams.*;

import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.internal.util.*;

/**
* Merges a Flowable and a Completable by emitting the items of the Flowable and waiting until
* both the Flowable and Completable complete normally.
*
* @param <T> the element type of the Flowable
* @since 2.1.10 - experimental
*/
public final class FlowableMergeWithCompletable<T> extends AbstractFlowableWithUpstream<T, T> {

final CompletableSource other;

public FlowableMergeWithCompletable(Flowable<T> source, CompletableSource other) {
super(source);
this.other = other;
}

@Override
protected void subscribeActual(Subscriber<? super T> observer) {
MergeWithSubscriber<T> parent = new MergeWithSubscriber<T>(observer);
observer.onSubscribe(parent);
source.subscribe(parent);
other.subscribe(parent.otherObserver);
}

static final class MergeWithSubscriber<T> extends AtomicInteger
implements FlowableSubscriber<T>, Subscription {

private static final long serialVersionUID = -4592979584110982903L;

final Subscriber<? super T> actual;

final AtomicReference<Subscription> mainSubscription;

final OtherObserver otherObserver;

final AtomicThrowable error;

final AtomicLong requested;

volatile boolean mainDone;

volatile boolean otherDone;

MergeWithSubscriber(Subscriber<? super T> actual) {
this.actual = actual;
this.mainSubscription = new AtomicReference<Subscription>();
this.otherObserver = new OtherObserver(this);
this.error = new AtomicThrowable();
this.requested = new AtomicLong();
}

@Override
public void onSubscribe(Subscription d) {
SubscriptionHelper.deferredSetOnce(mainSubscription, requested, d);
}

@Override
public void onNext(T t) {
HalfSerializer.onNext(actual, t, this, error);
}

@Override
public void onError(Throwable ex) {
SubscriptionHelper.cancel(mainSubscription);
HalfSerializer.onError(actual, ex, this, error);
}

@Override
public void onComplete() {
mainDone = true;
if (otherDone) {
HalfSerializer.onComplete(actual, this, error);
}
}

@Override
public void request(long n) {
SubscriptionHelper.deferredRequest(mainSubscription, requested, n);
}

@Override
public void cancel() {
SubscriptionHelper.cancel(mainSubscription);
DisposableHelper.dispose(otherObserver);
}

void otherError(Throwable ex) {
SubscriptionHelper.cancel(mainSubscription);
HalfSerializer.onError(actual, ex, this, error);
}

void otherComplete() {
otherDone = true;
if (mainDone) {
HalfSerializer.onComplete(actual, this, error);
}
}

static final class OtherObserver extends AtomicReference<Disposable>
implements CompletableObserver {

private static final long serialVersionUID = -2935427570954647017L;

final MergeWithSubscriber<?> parent;

OtherObserver(MergeWithSubscriber<?> parent) {
this.parent = parent;
}

@Override
public void onSubscribe(Disposable d) {
DisposableHelper.setOnce(this, d);
}

@Override
public void onError(Throwable e) {
parent.otherError(e);
}

@Override
public void onComplete() {
parent.otherComplete();
}
}
}
}
Loading

0 comments on commit 7ede8ee

Please sign in to comment.