Skip to content

Commit

Permalink
Merge pull request #374 from ktoso/wip-209-210-improve
Browse files Browse the repository at this point in the history
+tck #362 wait for request signal in 209, and new additional tests
  • Loading branch information
viktorklang authored Jun 16, 2017
2 parents 8d4b33b + 9910b8b commit 940a51f
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -249,79 +249,49 @@ public void untested_spec208_blackbox_mustBePreparedToReceiveOnNextSignalsAfterH

@Override @Test
public void required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall() throws Throwable {
blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
@Override
blackboxSubscriberTest(new BlackboxTestStageTestRun() {
@Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
public void run(BlackboxTestStage stage) throws Throwable {
final Publisher<T> pub = new Publisher<T>() {
@Override public void subscribe(final Subscriber<? super T> s) {
s.onSubscribe(new Subscription() {
private boolean completed = false;

@Override public void request(long n) {
if (!completed) {
completed = true;
s.onComplete(); // Publisher now realises that it is in fact already completed
}
}

@Override public void cancel() {
// noop, ignore
}
});
}
};

final Subscriber<T> sub = createSubscriber();
final BlackboxSubscriberProxy<T> probe = stage.createBlackboxSubscriberProxy(env, sub);

pub.subscribe(probe);
triggerRequest(sub);
probe.expectCompletion();
probe.expectNone();

env.verifyNoAsyncErrorsNoDelay();
triggerRequest(stage.subProxy().sub());
final long notUsed = stage.expectRequest(); // received request signal
stage.sub().onComplete();
stage.subProxy().expectCompletion();
}
});
}

@Override @Test
public void required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall() throws Throwable {
blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
@Override
blackboxSubscriberTest(new BlackboxTestStageTestRun() {
@Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
public void run(BlackboxTestStage stage) throws Throwable {
final Publisher<T> pub = new Publisher<T>() {
@Override
public void subscribe(final Subscriber<? super T> s) {
s.onSubscribe(new Subscription() {
@Override public void request(long n) {
// do nothing...
}
@Override public void cancel() {
// do nothing...
}
});
// immediately complete
s.onComplete();
}
};

final Subscriber<T> sub = createSubscriber();
final BlackboxSubscriberProxy<T> probe = stage.createBlackboxSubscriberProxy(env, sub);

pub.subscribe(probe);
probe.expectCompletion();

env.verifyNoAsyncErrorsNoDelay();
final Subscriber<? super T> sub = stage.sub();
sub.onComplete();
stage.subProxy().expectCompletion();
}
});
}

@Override @Test
public void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall() throws Throwable {
blackboxSubscriberTest(new BlackboxTestStageTestRun() {
@Override
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
@Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
public void run(BlackboxTestStage stage) throws Throwable {
triggerRequest(stage.subProxy().sub());
final long notUsed = stage.expectRequest(); // received request signal
stage.sub().onError(new TestException()); // in response to that, we fail
stage.subProxy().expectError(Throwable.class);
}
});
}

// Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.10
@Override @Test
public void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall() throws Throwable {
blackboxSubscriberTest(new BlackboxTestStageTestRun() {
@Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
public void run(BlackboxTestStage stage) throws Throwable {

stage.sub().onError(new TestException());
stage.subProxy().expectError(Throwable.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,20 @@ public interface SubscriberBlackboxVerificationRules {
* </ul>
*/
void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall() throws Throwable;

/**
* Asks for a {@code Subscriber}, signals {@code onSubscribe} followed by an {@code onError} synchronously.
* <p>
* <b>Verifies rule:</b> <a href='https://github.com/reactive-streams/reactive-streams-jvm#2.10'>2.10</a>
* <p>
* If this test fails, the following could be checked within the {@code Subscriber} implementation:
* <ul>
* <li>if the {@code Subscriber} throws an unchecked exception from its {@code onSubscribe} or
* {@code onError} methods.
* </ul>
*/
void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall() throws Throwable;

/**
* Currently, this test is skipped because it would require analyzing what the {@code Subscriber} implementation
* does.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
package org.reactivestreams.tck;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.reactivestreams.example.unicast.AsyncIterablePublisher;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public void required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalW
// don't even request()
}).required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall();
}
}, "did not call `registerOnComplete()`");
}, "Did not receive expected `request` call within");
}

@Test
Expand All @@ -160,11 +160,38 @@ public void required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalW
}

@Test
public void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall_shouldFail() throws Throwable {
public void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall_shouldPass_withRequestingSubscriber() throws Throwable {
customSubscriberVerification(new NoopSubscriber() {
@Override
public void onSubscribe(Subscription s) {
s.request(1); // request anything
}
}).required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall();
}

@Test
public void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall_shouldFail_withNoopSubscriber() throws Throwable {
requireTestFailure(new ThrowingRunnable() {
@Override
public void run() throws Throwable {
customSubscriberVerification(new NoopSubscriber() {
// not requesting, so we can't test the "request followed by failure" scenario
}).required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall();
}
}, "Did not receive expected `request` call within");
}

@Test
public void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall_shouldFail_withThrowingInsideOnError() throws Throwable {
requireTestFailure(new ThrowingRunnable() {
@Override public void run() throws Throwable {

customSubscriberVerification(new NoopSubscriber() {
@Override
public void onSubscribe(Subscription s) {
s.request(1);
}

@Override public void onError(Throwable t) {
// this is wrong in many ways (incl. spec violation), but aims to simulate user code which "blows up" when handling the onError signal
throw new RuntimeException("Wrong, don't do this!", t); // don't do this
Expand Down

0 comments on commit 940a51f

Please sign in to comment.