Skip to content

Commit

Permalink
2.x: coverage and cleanup 10/11-1 (#4689)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Oct 11, 2016
1 parent 497f35f commit 5888b23
Show file tree
Hide file tree
Showing 43 changed files with 3,209 additions and 263 deletions.
16 changes: 1 addition & 15 deletions src/main/java/io/reactivex/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,7 @@ public static <T> Maybe<T> fromCompletable(CompletableSource completableSource)
* @throws NullPointerException if single is null
*/
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Maybe<T> fromSingle(SingleSource singleSource) {
public static <T> Maybe<T> fromSingle(SingleSource<T> singleSource) {
ObjectHelper.requireNonNull(singleSource, "singleSource is null");
return RxJavaPlugins.onAssembly(new MaybeFromSingle<T>(singleSource));
}
Expand Down Expand Up @@ -2883,20 +2883,6 @@ public final <R> R to(Function<? super Maybe<T>, R> convert) {
}
}

/**
* Converts this Maybe into a Completable instance composing cancellation
* through and dropping a success value if emitted.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code toCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @return the new Completable instance
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable toCompletable() {
return RxJavaPlugins.onAssembly(new MaybeToCompletable<T>(this));
}

/**
* Converts this Maybe into a backpressure-aware Flowable instance composing cancellation
* through.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ public boolean isDisposed() {
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.d, d)) {
this.d = d;

actual.onSubscribe(this);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void dispose() {

@Override
public boolean isDisposed() {
return observer1.isDisposed();
return DisposableHelper.isDisposed(observer1.get());
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -125,7 +125,7 @@ void error(EqualObserver<T> sender, Throwable ex) {

static final class EqualObserver<T>
extends AtomicReference<Disposable>
implements MaybeObserver<T>, Disposable {
implements MaybeObserver<T> {


private static final long serialVersionUID = -3031974433025990931L;
Expand All @@ -138,16 +138,10 @@ static final class EqualObserver<T>
this.parent = parent;
}

@Override
public void dispose() {
DisposableHelper.dispose(this);
}

@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}

@Override
public void onSubscribe(Disposable d) {
DisposableHelper.setOnce(this, d);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,46 @@ public void cancel() {
d = DisposableHelper.DISPOSED;
}

void fastPath(Subscriber<? super R> a, Iterator<? extends R> iter) {
for (;;) {
if (cancelled) {
return;
}

R v;

try {
v = iter.next();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
a.onError(ex);
return;
}

a.onNext(v);

if (cancelled) {
return;
}


boolean b;

try {
b = iter.hasNext();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
a.onError(ex);
return;
}

if (!b) {
a.onComplete();
return;
}
}
}

void drain() {
if (getAndIncrement() != 0) {
return;
Expand All @@ -155,48 +195,14 @@ void drain() {

if (iter != null) {
long r = requested.get();
long e = 0L;

if (r == Long.MAX_VALUE) {
for (;;) {
if (cancelled) {
return;
}

R v;

try {
v = iter.next();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
a.onError(ex);
return;
}

a.onNext(v);

if (cancelled) {
return;
}


boolean b;

try {
b = iter.hasNext();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
a.onError(ex);
return;
}

if (!b) {
a.onComplete();
return;
}
}
fastPath(a, iter);
return;
}

long e = 0L;

while (e != r) {
if (cancelled) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import io.reactivex.functions.Function;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.observers.BasicIntQueueDisposable;
import io.reactivex.internal.observers.BasicQueueDisposable;

/**
* Maps a success value into an Iterable and streams it back as a Flowable.
Expand All @@ -47,11 +47,9 @@ protected void subscribeActual(Observer<? super R> s) {
}

static final class FlatMapIterableObserver<T, R>
extends BasicIntQueueDisposable<R>
extends BasicQueueDisposable<R>
implements MaybeObserver<T> {

private static final long serialVersionUID = -8938804753851907758L;

final Observer<? super R> actual;

final Function<? super T, ? extends Iterable<? extends R>> mapper;
Expand Down Expand Up @@ -81,6 +79,8 @@ public void onSubscribe(Disposable d) {

@Override
public void onSuccess(T value) {
Observer<? super R> a = actual;

Iterator<? extends R> iter;
boolean has;
try {
Expand All @@ -89,17 +89,60 @@ public void onSuccess(T value) {
has = iter.hasNext();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
actual.onError(ex);
a.onError(ex);
return;
}

if (!has) {
actual.onComplete();
a.onComplete();
return;
}

this.it = iter;
drain();

if (outputFused && iter != null) {
a.onNext(null);
a.onComplete();
return;
}

for (;;) {
if (cancelled) {
return;
}

R v;

try {
v = iter.next();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
a.onError(ex);
return;
}

a.onNext(v);

if (cancelled) {
return;
}


boolean b;

try {
b = iter.hasNext();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
a.onError(ex);
return;
}

if (!b) {
a.onComplete();
return;
}
}
}

@Override
Expand All @@ -125,75 +168,6 @@ public boolean isDisposed() {
return cancelled;
}

void drain() {
if (getAndIncrement() != 0) {
return;
}

Observer<? super R> a = actual;
Iterator<? extends R> iter = this.it;

if (outputFused && iter != null) {
a.onNext(null);
a.onComplete();
return;
}

int missed = 1;

for (;;) {

if (iter != null) {
for (;;) {
if (cancelled) {
return;
}

R v;

try {
v = iter.next();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
a.onError(ex);
return;
}

a.onNext(v);

if (cancelled) {
return;
}


boolean b;

try {
b = iter.hasNext();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
a.onError(ex);
return;
}

if (!b) {
a.onComplete();
return;
}
}
}

missed = addAndGet(-missed);
if (missed == 0) {
break;
}

if (iter == null) {
iter = it;
}
}
}

@Override
public int requestFusion(int mode) {
if ((mode & ASYNC) != 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.d, d)) {
this.d = d;

actual.onSubscribe(d);
actual.onSubscribe(this);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.d, d)) {
this.d = d;

actual.onSubscribe(d);
actual.onSubscribe(this);
}
}

Expand Down
Loading

0 comments on commit 5888b23

Please sign in to comment.