Skip to content

Commit

Permalink
2.x: reintroduce OnErrorNotImplementedException for 0-1 arg subscribe…
Browse files Browse the repository at this point in the history
…() (#5036)
  • Loading branch information
akarnokd authored Feb 1, 2017
1 parent 7ff1eef commit ceded86
Show file tree
Hide file tree
Showing 12 changed files with 267 additions and 26 deletions.
4 changes: 4 additions & 0 deletions src/main/java/io/reactivex/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1702,6 +1702,10 @@ public final Disposable subscribe(final Action onComplete, final Consumer<? supe
* Subscribes to this Completable and calls the given Action when this Completable
* completes normally.
* <p>
* If the Completable emits an error, it is wrapped into an
* {@link io.reactivex.exceptions.OnErrorNotImplementedException OnErrorNotImplementedException}
* and routed to the RxJavaPlugins.onError handler.
* <p>
* If this Completable emits an error, it is sent to RxJavaPlugins.onError and gets swallowed.
* <dl>
* <dt><b>Scheduler:</b></dt>
Expand Down
25 changes: 19 additions & 6 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -5559,6 +5559,11 @@ public final void blockingSubscribe() {

/**
* Subscribes to the source and calls the given callbacks <strong>on the current thread</strong>.
* <p>
* If the Flowable emits an error, it is wrapped into an
* {@link io.reactivex.exceptions.OnErrorNotImplementedException OnErrorNotImplementedException}
* and routed to the RxJavaPlugins.onError handler.
* <p>
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator consumes the source {@code Flowable} in an unbounded manner
Expand All @@ -5572,7 +5577,7 @@ public final void blockingSubscribe() {
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final void blockingSubscribe(Consumer<? super T> onNext) {
FlowableBlockingSubscribe.subscribe(this, onNext, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION);
FlowableBlockingSubscribe.subscribe(this, onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);
}

/**
Expand Down Expand Up @@ -8983,6 +8988,10 @@ public final Disposable forEach(Consumer<? super T> onNext) {
/**
* Subscribes to the {@link Publisher} and receives notifications for each element until the
* onNext Predicate returns false.
* <p>
* If the Flowable emits an error, it is wrapped into an
* {@link io.reactivex.exceptions.OnErrorNotImplementedException OnErrorNotImplementedException}
* and routed to the RxJavaPlugins.onError handler.
* <dl>
* <dt><b>Backpressure:</b><dt>
* <dd>The operator consumes the source {@code Publisher} in an unbounded manner (i.e., no
Expand All @@ -9003,7 +9012,7 @@ public final Disposable forEach(Consumer<? super T> onNext) {
@BackpressureSupport(BackpressureKind.NONE)
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable forEachWhile(Predicate<? super T> onNext) {
return forEachWhile(onNext, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION);
return forEachWhile(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);
}

/**
Expand Down Expand Up @@ -12713,7 +12722,9 @@ public final Flowable<T> strict() {
/**
* Subscribes to a Publisher and ignores {@code onNext} and {@code onComplete} emissions.
* <p>
* If the Flowable emits an error, it is routed to the RxJavaPlugins.onError handler.
* If the Flowable emits an error, it is wrapped into an
* {@link io.reactivex.exceptions.OnErrorNotImplementedException OnErrorNotImplementedException}
* and routed to the RxJavaPlugins.onError handler.
* <dl>
* <dt><b>Backpressure:</b><dt>
* <dd>The operator consumes the source {@code Publisher} in an unbounded manner (i.e., no
Expand All @@ -12729,14 +12740,16 @@ public final Flowable<T> strict() {
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe() {
return subscribe(Functions.emptyConsumer(), Functions.ERROR_CONSUMER,
return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING,
Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE);
}

/**
* Subscribes to a Publisher and provides a callback to handle the items it emits.
* <p>
* If the Flowable emits an error, it is routed to the RxJavaPlugins.onError handler.
* If the Flowable emits an error, it is wrapped into an
* {@link io.reactivex.exceptions.OnErrorNotImplementedException OnErrorNotImplementedException}
* and routed to the RxJavaPlugins.onError handler.
* <dl>
* <dt><b>Backpressure:</b><dt>
* <dd>The operator consumes the source {@code Publisher} in an unbounded manner (i.e., no
Expand All @@ -12757,7 +12770,7 @@ public final Disposable subscribe() {
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext) {
return subscribe(onNext, Functions.ERROR_CONSUMER,
return subscribe(onNext, Functions.ON_ERROR_MISSING,
Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE);
}

Expand Down
12 changes: 8 additions & 4 deletions src/main/java/io/reactivex/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -3592,7 +3592,9 @@ public final Maybe<T> retryWhen(
/**
* Subscribes to a Maybe and ignores {@code onSuccess} and {@code onComplete} emissions.
* <p>
* If the Maybe emits an error, it is routed to the RxJavaPlugins.onError handler.
* If the Maybe emits an error, it is wrapped into an
* {@link io.reactivex.exceptions.OnErrorNotImplementedException OnErrorNotImplementedException}
* and routed to the RxJavaPlugins.onError handler.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code subscribe} does not operate by default on a particular {@link Scheduler}.</dd>
Expand All @@ -3604,13 +3606,15 @@ public final Maybe<T> retryWhen(
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe() {
return subscribe(Functions.emptyConsumer(), Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION);
return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);
}

/**
* Subscribes to a Maybe and provides a callback to handle the items it emits.
* <p>
* If the Maybe emits an error, it is routed to the RxJavaPlugins.onError handler.
* If the Maybe emits an error, it is wrapped into an
* {@link io.reactivex.exceptions.OnErrorNotImplementedException OnErrorNotImplementedException}
* and routed to the RxJavaPlugins.onError handler.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code subscribe} does not operate by default on a particular {@link Scheduler}.</dd>
Expand All @@ -3627,7 +3631,7 @@ public final Disposable subscribe() {
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onSuccess) {
return subscribe(onSuccess, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION);
return subscribe(onSuccess, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);
}

/**
Expand Down
26 changes: 19 additions & 7 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -4917,6 +4917,10 @@ public final void blockingSubscribe() {

/**
* Subscribes to the source and calls the given callbacks <strong>on the current thread</strong>.
* <p>
* If the Observable emits an error, it is wrapped into an
* {@link io.reactivex.exceptions.OnErrorNotImplementedException OnErrorNotImplementedException}
* and routed to the RxJavaPlugins.onError handler.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
Expand All @@ -4926,7 +4930,7 @@ public final void blockingSubscribe() {
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final void blockingSubscribe(Consumer<? super T> onNext) {
ObservableBlockingSubscribe.subscribe(this, onNext, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION);
ObservableBlockingSubscribe.subscribe(this, onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);
}

/**
Expand Down Expand Up @@ -6761,7 +6765,7 @@ public final Observable<T> doFinally(Action onFinally) {
* The action is shared between subscriptions and thus may be called concurrently from multiple
* threads; the action must be thread safe.
* <p>
* If the action throws a runtime exception, that exception is rethrown by the {@code unsubscribe()} call,
* If the action throws a runtime exception, that exception is rethrown by the {@code dispose()} call,
* sometimes as a {@code CompositeException} if there were multiple exceptions along the way.
* <p>
* Note that terminal events trigger the action unless the {@code ObservableSource} is subscribed to via {@code unsafeSubscribe()}.
Expand Down Expand Up @@ -7785,6 +7789,10 @@ public final Disposable forEach(Consumer<? super T> onNext) {
/**
* Subscribes to the {@link ObservableSource} and receives notifications for each element until the
* onNext Predicate returns false.
* <p>
* If the Observable emits an error, it is wrapped into an
* {@link io.reactivex.exceptions.OnErrorNotImplementedException OnErrorNotImplementedException}
* and routed to the RxJavaPlugins.onError handler.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code forEachWhile} does not operate by default on a particular {@link Scheduler}.</dd>
Expand All @@ -7801,7 +7809,7 @@ public final Disposable forEach(Consumer<? super T> onNext) {
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable forEachWhile(Predicate<? super T> onNext) {
return forEachWhile(onNext, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION);
return forEachWhile(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);
}

/**
Expand Down Expand Up @@ -10536,7 +10544,9 @@ public final Observable<T> startWithArray(T... items) {
/**
* Subscribes to an ObservableSource and ignores {@code onNext} and {@code onComplete} emissions.
* <p>
* If the Observable emits an error, it is routed to the RxJavaPlugins.onError handler.
* If the Observable emits an error, it is wrapped into an
* {@link io.reactivex.exceptions.OnErrorNotImplementedException OnErrorNotImplementedException}
* and routed to the RxJavaPlugins.onError handler.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code subscribe} does not operate by default on a particular {@link Scheduler}.</dd>
Expand All @@ -10548,13 +10558,15 @@ public final Observable<T> startWithArray(T... items) {
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe() {
return subscribe(Functions.emptyConsumer(), Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION, Functions.emptyConsumer());
return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}

/**
* Subscribes to an ObservableSource and provides a callback to handle the items it emits.
* <p>
* If the Observable emits an error, it is routed to the RxJavaPlugins.onError handler.
* If the Observable emits an error, it is wrapped into an
* {@link io.reactivex.exceptions.OnErrorNotImplementedException OnErrorNotImplementedException}
* and routed to the RxJavaPlugins.onError handler.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code subscribe} does not operate by default on a particular {@link Scheduler}.</dd>
Expand All @@ -10571,7 +10583,7 @@ public final Disposable subscribe() {
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext) {
return subscribe(onNext, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION, Functions.emptyConsumer());
return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}

/**
Expand Down
12 changes: 10 additions & 2 deletions src/main/java/io/reactivex/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -2553,6 +2553,10 @@ public final Single<T> retryWhen(Function<? super Flowable<Throwable>, ? extends

/**
* Subscribes to a Single but ignore its emission or notification.
* <p>
* If the Single emits an error, it is wrapped into an
* {@link io.reactivex.exceptions.OnErrorNotImplementedException OnErrorNotImplementedException}
* and routed to the RxJavaPlugins.onError handler.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code subscribe} does not operate by default on a particular {@link Scheduler}.</dd>
Expand All @@ -2563,7 +2567,7 @@ public final Single<T> retryWhen(Function<? super Flowable<Throwable>, ? extends
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe() {
return subscribe(Functions.emptyConsumer(), Functions.ERROR_CONSUMER);
return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING);
}

/**
Expand Down Expand Up @@ -2594,6 +2598,10 @@ public final Disposable subscribe(final BiConsumer<? super T, ? super Throwable>

/**
* Subscribes to a Single and provides a callback to handle the item it emits.
* <p>
* If the Single emits an error, it is wrapped into an
* {@link io.reactivex.exceptions.OnErrorNotImplementedException OnErrorNotImplementedException}
* and routed to the RxJavaPlugins.onError handler.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code subscribe} does not operate by default on a particular {@link Scheduler}.</dd>
Expand All @@ -2609,7 +2617,7 @@ public final Disposable subscribe(final BiConsumer<? super T, ? super Throwable>
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onSuccess) {
return subscribe(onSuccess, Functions.ERROR_CONSUMER);
return subscribe(onSuccess, Functions.ON_ERROR_MISSING);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.exceptions;

import io.reactivex.annotations.Experimental;

/**
* Represents an exception used to signal to the {@code RxJavaPlugins.onError()} that a
* callback-based subscribe() method on a base reactive type didn't specify
* an onError handler.
* @since 2.0.6 - experimental
*/
@Experimental
public final class OnErrorNotImplementedException extends RuntimeException {

private static final long serialVersionUID = -6298857009889503852L;

/**
* Customizes the {@code Throwable} with a custom message and wraps it before it
* is signalled to the {@code RxJavaPlugins.onError()} handler as {@code OnErrorNotImplementedException}.
*
* @param message
* the message to assign to the {@code Throwable} to signal
* @param e
* the {@code Throwable} to signal; if null, a NullPointerException is constructed
*/
public OnErrorNotImplementedException(String message, Throwable e) {
super(message, e != null ? e : new NullPointerException());
}

/**
* Wraps the {@code Throwable} before it
* is signalled to the {@code RxJavaPlugins.onError()}
* handler as {@code OnErrorNotImplementedException}.
*
* @param e
* the {@code Throwable} to signal; if null, a NullPointerException is constructed
*/
public OnErrorNotImplementedException(Throwable e) {
super(e != null ? e.getMessage() : null, e != null ? e : new NullPointerException());
}
}
12 changes: 12 additions & 0 deletions src/main/java/io/reactivex/internal/functions/Functions.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.reactivestreams.Subscription;

import io.reactivex.*;
import io.reactivex.exceptions.OnErrorNotImplementedException;
import io.reactivex.functions.*;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Timed;
Expand Down Expand Up @@ -218,6 +219,17 @@ public void accept(Throwable error) {
}
};

/**
* Wraps the consumed Throwable into an OnErrorNotImplementedException and
* signals it to the plugin error handler.
*/
public static final Consumer<Throwable> ON_ERROR_MISSING = new Consumer<Throwable>() {
@Override
public void accept(Throwable error) {
RxJavaPlugins.onError(new OnErrorNotImplementedException(error));
}
};

public static final LongConsumer EMPTY_LONG_CONSUMER = new LongConsumer() {
@Override
public void accept(long v) { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

import io.reactivex.CompletableObserver;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.exceptions.*;
import io.reactivex.functions.*;
import io.reactivex.internal.disposables.*;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.plugins.RxJavaPlugins;

public final class CallbackCompletableObserver
Expand All @@ -43,7 +43,7 @@ public CallbackCompletableObserver(Consumer<? super Throwable> onError, Action o

@Override
public void accept(Throwable e) {
RxJavaPlugins.onError(e);
RxJavaPlugins.onError(new OnErrorNotImplementedException(e));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

import io.reactivex.CompletableObserver;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.*;
import io.reactivex.exceptions.OnErrorNotImplementedException;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.plugins.RxJavaPlugins;

public final class EmptyCompletableObserver
Expand Down Expand Up @@ -46,7 +47,7 @@ public void onComplete() {
@Override
public void onError(Throwable e) {
lazySet(DisposableHelper.DISPOSED);
RxJavaPlugins.onError(e);
RxJavaPlugins.onError(new OnErrorNotImplementedException(e));
}

@Override
Expand Down
Loading

0 comments on commit ceded86

Please sign in to comment.