Skip to content

Commit

Permalink
+tck reactive-streams#362 wait for request signal in 209, and new add…
Browse files Browse the repository at this point in the history
…itional tests
  • Loading branch information
ktoso committed May 29, 2017
1 parent b7ca6d9 commit 5750f87
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -244,61 +244,29 @@ public void untested_spec208_blackbox_mustBePreparedToReceiveOnNextSignalsAfterH
// Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.9
@Override @Test
public void required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall() throws Throwable {
blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
@Override
blackboxSubscriberTest(new BlackboxTestStageTestRun() {
@Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
public void run(BlackboxTestStage stage) throws Throwable {
final Publisher<T> pub = new Publisher<T>() {
@Override public void subscribe(final Subscriber<? super T> s) {
s.onSubscribe(new Subscription() {
private boolean completed = false;

@Override public void request(long n) {
if (!completed) {
completed = true;
s.onComplete(); // Publisher now realises that it is in fact already completed
}
}

@Override public void cancel() {
// noop, ignore
}
});
}
};

final Subscriber<T> sub = createSubscriber();
final BlackboxSubscriberProxy<T> probe = stage.createBlackboxSubscriberProxy(env, sub);

pub.subscribe(probe);
final Subscriber<? super T> sub = stage.sub();

triggerRequest(sub);
probe.expectCompletion();
probe.expectNone();

env.verifyNoAsyncErrorsNoDelay();
final long notUsed = stage.expectRequest(); // received request signal
sub.onComplete();
stage.subProxy().expectCompletion();
}
});
}

// Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.9
@Override @Test
public void required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall() throws Throwable {
blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
@Override
blackboxSubscriberTest(new BlackboxTestStageTestRun() {
@Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
public void run(BlackboxTestStage stage) throws Throwable {
final Publisher<T> pub = new Publisher<T>() {
@Override
public void subscribe(Subscriber<? super T> s) {
s.onComplete();
}
};

final Subscriber<T> sub = createSubscriber();
final BlackboxSubscriberProxy<T> probe = stage.createBlackboxSubscriberProxy(env, sub);

pub.subscribe(probe);
probe.expectCompletion();
final Subscriber<? super T> sub = stage.sub();

env.verifyNoAsyncErrorsNoDelay();
sub.onComplete();
stage.subProxy().expectCompletion();
}
});
}
Expand All @@ -307,9 +275,25 @@ public void subscribe(Subscriber<? super T> s) {
@Override @Test
public void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall() throws Throwable {
blackboxSubscriberTest(new BlackboxTestStageTestRun() {
@Override
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
@Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
public void run(BlackboxTestStage stage) throws Throwable {
final Subscriber<? super T> sub = stage.sub();

triggerRequest(sub);
final long notUsed = stage.expectRequest(); // received request signal
sub.onError(new TestException()); // in response to that, we fail
stage.subProxy().expectError(Throwable.class);
}
});
}

// Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.10
@Override @Test
public void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall() throws Throwable {
blackboxSubscriberTest(new BlackboxTestStageTestRun() {
@Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
public void run(BlackboxTestStage stage) throws Throwable {

stage.sub().onError(new TestException());
stage.subProxy().expectError(Throwable.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public interface SubscriberBlackboxVerificationRules {
void required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall() throws Throwable;
void required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall() throws Throwable;
void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall() throws Throwable;
void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall() throws Throwable;
void untested_spec211_blackbox_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() throws Exception;
void untested_spec212_blackbox_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality() throws Throwable;
void untested_spec213_blackbox_failingOnSignalInvocation() throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
package org.reactivestreams.tck;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.reactivestreams.example.unicast.AsyncIterablePublisher;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public void required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalW
// don't even request()
}).required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall();
}
}, "did not call `registerOnComplete()`");
}, "Did not receive expected `request` call within");
}

@Test
Expand All @@ -130,11 +130,38 @@ public void required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalW
}

@Test
public void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall_shouldFail() throws Throwable {
public void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall_shouldPass_withRequestingSubscriber() throws Throwable {
customSubscriberVerification(new NoopSubscriber() {
@Override
public void onSubscribe(Subscription s) {
s.request(1); // request anything
}
}).required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall();
}

@Test
public void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall_shouldFail_withNoopSubscriber() throws Throwable {
requireTestFailure(new ThrowingRunnable() {
@Override
public void run() throws Throwable {
customSubscriberVerification(new NoopSubscriber() {
// not requesting, so we can't test the "request followed by failure" scenario
}).required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall();
}
}, "Did not receive expected `request` call within");
}

@Test
public void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall_shouldFail_withThrowingInsideOnError() throws Throwable {
requireTestFailure(new ThrowingRunnable() {
@Override public void run() throws Throwable {

customSubscriberVerification(new NoopSubscriber() {
@Override
public void onSubscribe(Subscription s) {
s.request(1);
}

@Override public void onError(Throwable t) {
// this is wrong in many ways (incl. spec violation), but aims to simulate user code which "blows up" when handling the onError signal
throw new RuntimeException("Wrong, don't do this!", t); // don't do this
Expand Down

0 comments on commit 5750f87

Please sign in to comment.