Skip to content

Commit

Permalink
add nullable annotation to simple queue (fixes #5053) (#5054)
Browse files Browse the repository at this point in the history
  • Loading branch information
jschneider authored and akarnokd committed Feb 3, 2017
1 parent 8720828 commit e5d3b0e
Show file tree
Hide file tree
Showing 55 changed files with 131 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.reactivex.internal.disposables;

import io.reactivex.*;
import io.reactivex.annotations.Nullable;
import io.reactivex.internal.fuseable.QueueDisposable;

/**
Expand Down Expand Up @@ -93,6 +94,7 @@ public boolean offer(Object v1, Object v2) {
throw new UnsupportedOperationException("Should not be called!");
}

@Nullable
@Override
public Object poll() throws Exception {
return null; // always empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@

package io.reactivex.internal.fuseable;

import io.reactivex.annotations.Nullable;

/**
* Override of the SimpleQueue interface with no throws Exception on poll.
*
* @param <T> the value type to enqueue and dequeue, not null
*/
public interface SimplePlainQueue<T> extends SimpleQueue<T> {

@Nullable
@Override
T poll();
}
6 changes: 6 additions & 0 deletions src/main/java/io/reactivex/internal/fuseable/SimpleQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

package io.reactivex.internal.fuseable;

import io.reactivex.annotations.Nullable;

/**
* A minimalist queue interface without the method bloat of java.util.Collection and java.util.Queue.
*
Expand All @@ -24,6 +26,10 @@ public interface SimpleQueue<T> {

boolean offer(T v1, T v2);

/**
* @return null to indicate an empty queue
*/
@Nullable
T poll() throws Exception;

boolean isEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.reactivex.internal.observers;

import io.reactivex.Observer;
import io.reactivex.annotations.Nullable;
import io.reactivex.plugins.RxJavaPlugins;

/**
Expand Down Expand Up @@ -110,6 +111,7 @@ public final void complete() {
actual.onComplete();
}

@Nullable
@Override
public final T poll() throws Exception {
if (get() == FUSED_READY) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.Iterator;
import java.util.concurrent.atomic.*;

import io.reactivex.annotations.Nullable;
import org.reactivestreams.*;

import io.reactivex.Flowable;
Expand Down Expand Up @@ -466,6 +467,7 @@ public int requestFusion(int requestedMode) {
return m;
}

@Nullable
@SuppressWarnings("unchecked")
@Override
public R poll() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.Collection;
import java.util.concurrent.Callable;

import io.reactivex.annotations.Nullable;
import org.reactivestreams.*;

import io.reactivex.exceptions.Exceptions;
Expand Down Expand Up @@ -117,6 +118,7 @@ public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}

@Nullable
@Override
public T poll() throws Exception {
for (;;) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.reactivex.internal.operators.flowable;

import io.reactivex.annotations.Nullable;
import org.reactivestreams.*;

import io.reactivex.functions.*;
Expand Down Expand Up @@ -106,6 +107,7 @@ public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}

@Nullable
@Override
public T poll() throws Exception {
for (;;) {
Expand Down Expand Up @@ -195,6 +197,7 @@ public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}

@Nullable
@Override
public T poll() throws Exception {
for (;;) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.reactivex.internal.operators.flowable;

import io.reactivex.annotations.Nullable;
import org.reactivestreams.*;

import io.reactivex.annotations.Experimental;
Expand Down Expand Up @@ -74,6 +75,7 @@ public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}

@Nullable
@Override
public T poll() throws Exception {
T v = qs.poll();
Expand Down Expand Up @@ -122,6 +124,7 @@ public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}

@Nullable
@Override
public T poll() throws Exception {
T v = qs.poll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.reactivex.internal.operators.flowable;

import io.reactivex.annotations.Nullable;
import org.reactivestreams.*;

import io.reactivex.annotations.Experimental;
Expand Down Expand Up @@ -130,6 +131,7 @@ public boolean isEmpty() {
return qs.isEmpty();
}

@Nullable
@Override
public T poll() throws Exception {
T v = qs.poll();
Expand Down Expand Up @@ -239,6 +241,7 @@ public boolean isEmpty() {
return qs.isEmpty();
}

@Nullable
@Override
public T poll() throws Exception {
T v = qs.poll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.reactivex.internal.operators.flowable;

import io.reactivex.annotations.Nullable;
import org.reactivestreams.*;

import io.reactivex.exceptions.*;
Expand Down Expand Up @@ -144,6 +145,7 @@ public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}

@Nullable
@Override
public T poll() throws Exception {
T v = qs.poll();
Expand Down Expand Up @@ -276,6 +278,7 @@ public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}

@Nullable
@Override
public T poll() throws Exception {
T v = qs.poll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.reactivex.internal.operators.flowable;

import io.reactivex.annotations.Nullable;
import org.reactivestreams.*;

import io.reactivex.functions.Predicate;
Expand Down Expand Up @@ -79,6 +80,7 @@ public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}

@Nullable
@Override
public T poll() throws Exception {
QueueSubscription<T> qs = this.qs;
Expand Down Expand Up @@ -143,6 +145,7 @@ public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}

@Nullable
@Override
public T poll() throws Exception {
QueueSubscription<T> qs = this.qs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.annotations.Nullable;
import org.reactivestreams.*;

import io.reactivex.*;
Expand Down Expand Up @@ -174,6 +175,7 @@ public void request(long n) {
// ignored, no values emitted
}

@Nullable
@Override
public T poll() throws Exception {
return null; // always empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.*;

import io.reactivex.annotations.Nullable;
import org.reactivestreams.*;

import io.reactivex.exceptions.*;
Expand Down Expand Up @@ -413,6 +414,7 @@ public boolean isEmpty() {
return (it != null && !it.hasNext()) || queue.isEmpty();
}

@Nullable
@Override
public R poll() throws Exception {
Iterator<? extends R> it = current;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.reactivex.internal.operators.flowable;

import io.reactivex.annotations.Nullable;
import org.reactivestreams.Subscriber;

import io.reactivex.Flowable;
Expand Down Expand Up @@ -55,6 +56,7 @@ public final int requestFusion(int mode) {
return mode & SYNC;
}

@Nullable
@Override
public final T poll() {
int i = index;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import java.util.Iterator;

import io.reactivex.annotations.Nullable;
import org.reactivestreams.Subscriber;

import io.reactivex.Flowable;
Expand Down Expand Up @@ -87,6 +88,7 @@ public final int requestFusion(int mode) {
return mode & SYNC;
}

@Nullable
@Override
public final T poll() {
if (it == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.*;

import io.reactivex.annotations.Nullable;
import org.reactivestreams.*;

import io.reactivex.exceptions.Exceptions;
Expand Down Expand Up @@ -353,6 +354,7 @@ public int requestFusion(int mode) {
return NONE;
}

@Nullable
@Override
public GroupedFlowable<K, V> poll() {
return queue.poll();
Expand Down Expand Up @@ -627,6 +629,7 @@ public int requestFusion(int mode) {
return NONE;
}

@Nullable
@Override
public T poll() {
T v = queue.poll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.reactivex.internal.operators.flowable;

import io.reactivex.annotations.Nullable;
import org.reactivestreams.*;

import io.reactivex.internal.fuseable.QueueSubscription;
Expand Down Expand Up @@ -72,6 +73,7 @@ public boolean offer(T v1, T v2) {
throw new UnsupportedOperationException("Should not be called!");
}

@Nullable
@Override
public T poll() {
return null; // empty, always
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package io.reactivex.internal.operators.flowable;

import io.reactivex.annotations.Nullable;
import org.reactivestreams.*;

import io.reactivex.functions.Function;
Expand Down Expand Up @@ -72,6 +73,7 @@ public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}

@Nullable
@Override
public U poll() throws Exception {
T t = qs.poll();
Expand Down Expand Up @@ -131,6 +133,7 @@ public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}

@Nullable
@Override
public U poll() throws Exception {
T t = qs.poll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import java.util.concurrent.atomic.AtomicLong;

import io.reactivex.annotations.Nullable;
import org.reactivestreams.*;

import io.reactivex.Scheduler;
Expand Down Expand Up @@ -457,6 +458,7 @@ void runBackfused() {
}
}

@Nullable
@Override
public T poll() throws Exception {
T v = queue.poll();
Expand Down Expand Up @@ -695,6 +697,7 @@ void runBackfused() {
}
}

@Nullable
@Override
public T poll() throws Exception {
T v = queue.poll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import java.util.concurrent.atomic.AtomicLong;

import io.reactivex.annotations.Nullable;
import org.reactivestreams.*;

import io.reactivex.exceptions.*;
Expand Down Expand Up @@ -251,6 +252,7 @@ public int requestFusion(int mode) {
return NONE;
}

@Nullable
@Override
public T poll() throws Exception {
return queue.poll();
Expand Down
Loading

0 comments on commit e5d3b0e

Please sign in to comment.