Skip to content

Commit

Permalink
provides tests, fixes bugs and polish API
Browse files Browse the repository at this point in the history
Signed-off-by: Oleh Dokuka <[email protected]>
  • Loading branch information
OlegDokuka committed Oct 16, 2020
1 parent 8c7dfa7 commit cc4595b
Show file tree
Hide file tree
Showing 22 changed files with 1,605 additions and 338 deletions.
1 change: 1 addition & 0 deletions rsocket-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ dependencies {

implementation 'org.slf4j:slf4j-api'

testImplementation (project(":rsocket-transport-local"))
testImplementation 'io.projectreactor:reactor-test'
testImplementation 'org.assertj:assertj-core'
testImplementation 'org.junit.jupiter:junit-jupiter-api'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,15 @@ final class FireAndForgetRequesterMono extends Mono<Void> implements Subscriptio
public void subscribe(CoreSubscriber<? super Void> actual) {
long previousState = markSubscribed(STATE, this);
if (isSubscribedOrTerminated(previousState)) {
Operators.error(
actual, new IllegalStateException("FireAndForgetMono allows only a single Subscriber"));
final IllegalStateException e =
new IllegalStateException("FireAndForgetMono allows only a single Subscriber");

final RequestInterceptor requestInterceptor = this.requestInterceptor;
if (requestInterceptor != null) {
requestInterceptor.onReject(e, FrameType.REQUEST_FNF);
}

Operators.error(actual, e);
return;
}

Expand All @@ -81,14 +88,28 @@ public void subscribe(CoreSubscriber<? super Void> actual) {
try {
if (!isValid(mtu, this.maxFrameLength, p, false)) {
lazyTerminate(STATE, this);

p.release();
actual.onError(

final IllegalArgumentException e =
new IllegalArgumentException(
String.format(INVALID_PAYLOAD_ERROR_MESSAGE, this.maxFrameLength)));
String.format(INVALID_PAYLOAD_ERROR_MESSAGE, this.maxFrameLength));
final RequestInterceptor requestInterceptor = this.requestInterceptor;
if (requestInterceptor != null) {
requestInterceptor.onReject(e, FrameType.REQUEST_FNF);
}

actual.onError(e);
return;
}
} catch (IllegalReferenceCountException e) {
lazyTerminate(STATE, this);

final RequestInterceptor requestInterceptor = this.requestInterceptor;
if (requestInterceptor != null) {
requestInterceptor.onReject(e, FrameType.REQUEST_FNF);
}

actual.onError(e);
return;
}
Expand All @@ -98,14 +119,22 @@ public void subscribe(CoreSubscriber<? super Void> actual) {
streamId = this.requesterResponderSupport.getNextStreamId();
} catch (Throwable t) {
lazyTerminate(STATE, this);

p.release();
actual.onError(Exceptions.unwrap(t));

final Throwable ut = Exceptions.unwrap(t);
final RequestInterceptor requestInterceptor = this.requestInterceptor;
if (requestInterceptor != null) {
requestInterceptor.onReject(ut, FrameType.REQUEST_FNF);
}

actual.onError(ut);
return;
}

final RequestInterceptor interceptor = this.requestInterceptor;
if (interceptor != null) {
interceptor.onStart(streamId, FrameType.REQUEST_FNF, p.sliceMetadata());
interceptor.onStart(streamId, FrameType.REQUEST_FNF);
}

try {
Expand Down Expand Up @@ -162,19 +191,40 @@ public Void block(Duration m) {
public Void block() {
long previousState = markSubscribed(STATE, this);
if (isSubscribedOrTerminated(previousState)) {
throw new IllegalStateException("FireAndForgetMono allows only a single Subscriber");
final IllegalStateException e =
new IllegalStateException("FireAndForgetMono allows only a single Subscriber");
final RequestInterceptor requestInterceptor = this.requestInterceptor;
if (requestInterceptor != null) {
requestInterceptor.onReject(e, FrameType.REQUEST_FNF);
}
throw e;
}

final Payload p = this.payload;
try {
if (!isValid(this.mtu, this.maxFrameLength, p, false)) {
lazyTerminate(STATE, this);

p.release();
throw new IllegalArgumentException(
String.format(INVALID_PAYLOAD_ERROR_MESSAGE, this.maxFrameLength));

final IllegalArgumentException e =
new IllegalArgumentException(
String.format(INVALID_PAYLOAD_ERROR_MESSAGE, this.maxFrameLength));
final RequestInterceptor requestInterceptor = this.requestInterceptor;
if (requestInterceptor != null) {
requestInterceptor.onReject(e, FrameType.REQUEST_FNF);
}

throw e;
}
} catch (IllegalReferenceCountException e) {
lazyTerminate(STATE, this);

final RequestInterceptor requestInterceptor = this.requestInterceptor;
if (requestInterceptor != null) {
requestInterceptor.onReject(e, FrameType.REQUEST_FNF);
}

throw Exceptions.propagate(e);
}

Expand All @@ -183,13 +233,20 @@ public Void block() {
streamId = this.requesterResponderSupport.getNextStreamId();
} catch (Throwable t) {
lazyTerminate(STATE, this);

p.release();

final RequestInterceptor requestInterceptor = this.requestInterceptor;
if (requestInterceptor != null) {
requestInterceptor.onReject(t, FrameType.REQUEST_FNF);
}

throw Exceptions.propagate(t);
}

final RequestInterceptor interceptor = this.requestInterceptor;
if (interceptor != null) {
interceptor.onStart(streamId, FrameType.REQUEST_FNF, p.sliceMetadata());
interceptor.onStart(streamId, FrameType.REQUEST_FNF);
}

try {
Expand Down
Loading

0 comments on commit cc4595b

Please sign in to comment.