Skip to content

Commit

Permalink
Modifies rules 1.09 and 2.13 to mandate `java.lang.NullPointerExcepti…
Browse files Browse the repository at this point in the history
…on` be thrown.

Updates the TCK, Spec and example implementations.
  • Loading branch information
viktorklang committed Feb 4, 2015
1 parent bf8ea54 commit 46cdc0a
Show file tree
Hide file tree
Showing 11 changed files with 158 additions and 5 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ onError | (onSubscribe onNext* (onError | onComplete)?)
- The specifications below use binding words in capital letters from https://www.ietf.org/rfc/rfc2119.txt
- The terms `emit`, `signal` or `send` are interchangeable. The specifications below will use `signal`.
- The terms `synchronously` or `synchronous` refer to executing in the calling `Thread`.
- The term "return normally" means "only throws exceptions that are explicitly allowed by the rule".

### SPECIFICATION

Expand All @@ -86,7 +87,7 @@ public interface Publisher<T> {
| <a name="1.6">6</a> | If a `Publisher` signals either `onError` or `onComplete` on a `Subscriber`, that `Subscriber`’s `Subscription` MUST be considered cancelled. |
| <a name="1.7">7</a> | Once a terminal state has been signaled (`onError`, `onComplete`) it is REQUIRED that no further signals occur. |
| <a name="1.8">8</a> | If a `Subscription` is cancelled its `Subscriber` MUST eventually stop being signaled. |
| <a name="1.9">9</a> | Calling `Publisher.subscribe` MUST return normally. The only legal way to signal failure (or reject a `Subscriber`) is via the `onError` method. |
| <a name="1.9">9</a> | Calling `Publisher.subscribe` MUST return normally except in the case where the provided `Subscriber` is `null` in which case it MUST throw a `java.lang.NullPointerException` to the caller, this aside the only legal way to signal failure (or reject a `Subscriber`) is via the `onError` method. |
| <a name="1.10">10</a> | `Publisher.subscribe` MAY be called as many times as wanted but MUST be with a different `Subscriber` each time [see [2.12](#2.12)]. |
| <a name="1.11">11</a> | A `Publisher` MAY support multi-subscribe and choose whether each `Subscription` is unicast or multicast. |
| <a name="1.12">12</a> | A `Publisher` MAY reject calls to its `subscribe` method if it is unable or unwilling to serve them [[1](#footnote-1-1)]. If rejecting it MUST do this by calling `onError` on the `Subscriber` passed to `Publisher.subscribe` instead of calling `onSubscribe`. |
Expand Down Expand Up @@ -119,7 +120,7 @@ public interface Subscriber<T> {
| <a name="2.10">10</a> | A `Subscriber` MUST be prepared to receive an `onError` signal with or without a preceding `Subscription.request(long n)` call. |
| <a name="2.11">11</a> | A `Subscriber` MUST make sure that all calls on its `onXXX` methods happen-before [[1]](#footnote-2-1) the processing of the respective signals. I.e. the Subscriber must take care of properly publishing the signal to its processing logic. |
| <a name="2.12">12</a> | `Subscriber.onSubscribe` MUST be called at most once for a given `Subscriber` (based on object equality). |
| <a name="2.13">13</a> | Calling `onSubscribe`, `onNext`, `onError` or `onComplete` MUST return normally. The only legal way for a `Subscriber` to signal failure is by cancelling its `Subscription`. In the case that this rule is violated, any associated `Subscription` to the `Subscriber` MUST be considered as cancelled, and the caller MUST raise this error condition in a fashion that is adequate for the runtime environment. |
| <a name="2.13">13</a> | Calling `onSubscribe`, `onNext`, `onError` or `onComplete` MUST return normally except in the case where any provided parameter is `null` in which case it MUST throw a `java.lang.NullPointerException` to the caller, this aside the only legal way for a `Subscriber` to signal failure is by cancelling its `Subscription`. In the case that this rule is violated, any associated `Subscription` to the `Subscriber` MUST be considered as cancelled, and the caller MUST raise this error condition in a fashion that is adequate for the runtime environment. |
[<a name="footnote-2-1">1</a>] : See JMM definition of Happen-Before in section 17.4.5. on http://docs.oracle.com/javase/specs/jls/se7/html/jls-17.html
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ final class SubscriptionImpl implements Subscription, Runnable {
private Iterator<T> iterator; // This is our cursor into the data stream, which we will send to the `Subscriber`

SubscriptionImpl(final Subscriber<? super T> subscriber) {
// As per rule 1.09, we need to throw a `java.lang.NullPointerException` if the `Subscriber` is `null`
if (subscriber == null) throw null;
this.subscriber = subscriber;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,23 @@ private void handleOnError(final Throwable error) {
// We implement the OnX methods on `Subscriber` to send Signals that we will process asycnhronously, but only one at a time

@Override public final void onSubscribe(final Subscription s) {
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Subscription` is `null`
if (s == null) throw null;
signal(new OnSubscribe(s));
}

@Override public final void onNext(final T element) {
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `element` is `null`
if (element == null) throw null;

signal(new OnNext<T>(element));
}

@Override public final void onError(final Throwable t) {
signal(new OnError(t));
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Throwable` is `null`
if (t == null) throw null;

signal(new OnError(t));
}

@Override public final void onComplete() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ public abstract class SyncSubscriber<T> implements Subscriber<T> {
private boolean done = false;

@Override public void onSubscribe(final Subscription s) {
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Subscription` is `null`
if (s == null) throw null;

if (subscription != null) { // If someone has made a mistake and added this Subscriber multiple times, let's handle it gracefully
try {
s.cancel(); // Cancel the additional subscription
Expand All @@ -38,6 +41,9 @@ public abstract class SyncSubscriber<T> implements Subscriber<T> {
}

@Override public void onNext(final T element) {
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `element` is `null`
if (element == null) throw null;

if (!done) { // If we aren't already done
try {
if (foreach(element)) {
Expand Down Expand Up @@ -80,6 +86,8 @@ private void done() {
protected abstract boolean foreach(final T element);

@Override public void onError(final Throwable t) {
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Throwable` is `null`
if (t == null) throw null;
// Here we are not allowed to call any methods on the `Subscription` or the `Publisher`, as per rule 2.3
// And anyway, the `Subscription` is considered to be cancelled if this method gets called, as per rule 2.4
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,11 @@ public void untested_spec109_subscribeShouldNotThrowNonFatalThrowable() throws T
publisherVerification.untested_spec109_subscribeShouldNotThrowNonFatalThrowable();
}

@Override @Test
public void required_spec109_subscribeThrowNPEOnNullSubscriber() throws Throwable {
publisherVerification.required_spec109_subscribeThrowNPEOnNullSubscriber();
}

@Override @Test
public void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable {
publisherVerification.untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice();
Expand Down Expand Up @@ -558,6 +563,11 @@ public void untested_spec213_failingOnSignalInvocation() throws Exception {
subscriberVerification.untested_spec213_failingOnSignalInvocation();
}

@Override @Test
public void required_spec213_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
subscriberVerification.required_spec213_mustThrowNullPointerExceptionWhenParametersAreNull();
}

@Override @Test
public void untested_spec301_mustNotBeCalledOutsideSubscriberContext() throws Exception {
subscriberVerification.untested_spec301_mustNotBeCalledOutsideSubscriberContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,10 +432,24 @@ public void untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnError
notVerified(); // can we meaningfully test this?
}

// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.9
@Override @Test
public void untested_spec109_subscribeShouldNotThrowNonFatalThrowable() throws Throwable {
notVerified(); // cannot be meaningfully tested, or can it?
notVerified(); // can we meaningfully test this?
}

// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.9
@Override @Test
public void required_spec109_subscribeThrowNPEOnNullSubscriber() throws Throwable {
optionalActivePublisherTest(1, true, new PublisherTestRun<T>() {
@Override
public void run(Publisher<T> pub) throws Throwable {
try {
pub.subscribe(null);
env.flop(String.format("Publisher (%s) did not throw a NullPointerException when given a null Subscribe in subscribe", pub));
} catch (NullPointerException npe) { }
env.verifyNoAsyncErrors();
}
});
}

// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.10
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.lang.NullPointerException;
import java.lang.Override;
import java.lang.RuntimeException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.reactivestreams.tck.SubscriberWhiteboxVerification.BlackboxSubscriberProxy;
import static org.testng.Assert.assertTrue;

/**
* Provides tests for verifying {@link org.reactivestreams.Subscriber} and {@link org.reactivestreams.Subscription}
Expand Down Expand Up @@ -310,6 +314,57 @@ public void untested_spec213_blackbox_failingOnSignalInvocation() throws Excepti
notVerified(); // cannot be meaningfully tested, or can it?
}

// Verifies rule: https://github.com/reactive-streams/reactive-streams#2.13
@Override @Test
public void required_spec213_blackbox_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
@Override
public void run(BlackboxTestStage stage) throws Throwable {
final Subscription subscription = new Subscription() {
@Override public void request(final long elements) {}
@Override public void cancel() {}
};

{
final Subscriber<T> sub = createSubscriber();
boolean gotNPE = false;
try {
sub.onSubscribe(null);
} catch(final NullPointerException expected) {
gotNPE = true;
}
assertTrue(gotNPE, "onSubscribe(null) did not throw NullPointerException");
}

{
final Subscriber<T> sub = createSubscriber();
boolean gotNPE = false;
sub.onSubscribe(subscription);
try {
sub.onNext(null);
} catch(final NullPointerException expected) {
gotNPE = true;
}
assertTrue(gotNPE, "onNext(null) did not throw NullPointerException");
}

{
final Subscriber<T> sub = createSubscriber();
boolean gotNPE = false;
sub.onSubscribe(subscription);
try {
sub.onError(null);
} catch(final NullPointerException expected) {
gotNPE = true;
}
assertTrue(gotNPE, "onError(null) did not throw NullPointerException");
}

env.verifyNoAsyncErrors();
}
});
}

////////////////////// SUBSCRIPTION SPEC RULE VERIFICATION //////////////////

// Verifies rule: https://github.com/reactive-streams/reactive-streams#3.1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,59 @@ public void untested_spec213_failingOnSignalInvocation() throws Exception {
notVerified(); // cannot be meaningfully tested, or can it?
}

// Verifies rule: https://github.com/reactive-streams/reactive-streams#2.13
@Override @Test
public void required_spec213_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
subscriberTest(new TestStageTestRun() {
@Override
public void run(WhiteboxTestStage stage) throws Throwable {

final Subscription subscription = new Subscription() {
@Override public void request(final long elements) {}
@Override public void cancel() {}
};

{
final Subscriber<T> sub = createSubscriber(stage.probe());
boolean gotNPE = false;
try {
sub.onSubscribe(null);
} catch(final NullPointerException expected) {
gotNPE = true;
}
assertTrue(gotNPE, "onSubscribe(null) did not throw NullPointerException");
}

{
final Subscriber<T> sub = createSubscriber(stage.probe());
boolean gotNPE = false;
sub.onSubscribe(subscription);
try {
sub.onNext(null);
} catch(final NullPointerException expected) {
gotNPE = true;
}
assertTrue(gotNPE, "onNext(null) did not throw NullPointerException");
}

{
final Subscriber<T> sub = createSubscriber(stage.probe());
boolean gotNPE = false;
sub.onSubscribe(subscription);
try {
sub.onError(null);
} catch(final NullPointerException expected) {
gotNPE = true;
}
assertTrue(gotNPE, "onError(null) did not throw NullPointerException");
}

env.verifyNoAsyncErrors();
}
});
}


////////////////////// SUBSCRIPTION SPEC RULE VERIFICATION //////////////////

// Verifies rule: https://github.com/reactive-streams/reactive-streams#3.1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public interface PublisherVerificationRules {
void untested_spec107_mustNotEmitFurtherSignalsOnceOnErrorHasBeenSignalled() throws Throwable;
void untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals() throws Throwable;
void untested_spec109_subscribeShouldNotThrowNonFatalThrowable() throws Throwable;
void required_spec109_subscribeThrowNPEOnNullSubscriber() throws Throwable;
void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable;
void optional_spec111_maySupportMultiSubscribe() throws Throwable;
void required_spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe() throws Throwable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public interface SubscriberBlackboxVerificationRules {
void untested_spec211_blackbox_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() throws Exception;
void untested_spec212_blackbox_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality() throws Throwable;
void untested_spec213_blackbox_failingOnSignalInvocation() throws Exception;
void required_spec213_blackbox_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable;
void untested_spec301_blackbox_mustNotBeCalledOutsideSubscriberContext() throws Exception;
void required_spec308_blackbox_requestMustRegisterGivenNumberElementsToBeProduced() throws Throwable;
void untested_spec310_blackbox_requestMaySynchronouslyCallOnNextOnSubscriber() throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public interface SubscriberWhiteboxVerificationRules {
void untested_spec211_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() throws Exception;
void untested_spec212_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality_specViolation() throws Throwable;
void untested_spec213_failingOnSignalInvocation() throws Exception;
void required_spec213_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable;
void untested_spec301_mustNotBeCalledOutsideSubscriberContext() throws Exception;
void required_spec308_requestMustRegisterGivenNumberElementsToBeProduced() throws Throwable;
void untested_spec310_requestMaySynchronouslyCallOnNextOnSubscriber() throws Exception;
Expand Down

0 comments on commit 46cdc0a

Please sign in to comment.