Skip to content

Commit

Permalink
Avoid queueing in UnicastProcessor receivers
Browse files Browse the repository at this point in the history
Closes gh-887

Signed-off-by: Rossen Stoyanchev <[email protected]>
  • Loading branch information
rstoyanchev committed Sep 22, 2020
1 parent 5a3dc4b commit 8d9a54a
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2018 the original author or authors.
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -348,7 +348,7 @@ private Flux<Payload> handleRequestStream(final Payload payload) {
}

final UnboundedProcessor<ByteBuf> sendProcessor = this.sendProcessor;
final UnicastProcessor<Payload> receiver = UnicastProcessor.create();
final UnicastProcessor<Payload> receiver = UnicastProcessor.create(Queues.<Payload>one().get());
final AtomicBoolean once = new AtomicBoolean();

return Flux.defer(
Expand Down Expand Up @@ -456,7 +456,7 @@ private Flux<Payload> handleChannel(Flux<Payload> request) {
private Flux<? extends Payload> handleChannel(Payload initialPayload, Flux<Payload> inboundFlux) {
final UnboundedProcessor<ByteBuf> sendProcessor = this.sendProcessor;

final UnicastProcessor<Payload> receiver = UnicastProcessor.create();
final UnicastProcessor<Payload> receiver = UnicastProcessor.create(Queues.<Payload>one().get());

return receiver
.transform(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2018 the original author or authors.
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -48,6 +48,7 @@
import reactor.core.Exceptions;
import reactor.core.publisher.*;
import reactor.util.annotation.Nullable;
import reactor.util.concurrent.Queues;

/** Responder side of RSocket. Receives {@link ByteBuf}s from a peer's {@link RSocketRequester} */
class RSocketResponder implements RSocket {
Expand Down Expand Up @@ -537,7 +538,7 @@ protected void hookOnError(Throwable throwable) {
}

private void handleChannel(int streamId, Payload payload, long initialRequestN) {
UnicastProcessor<Payload> frames = UnicastProcessor.create();
UnicastProcessor<Payload> frames = UnicastProcessor.create(Queues.<Payload>one().get());
channelProcessors.put(streamId, frames);

Flux<Payload> payloads =
Expand Down
6 changes: 4 additions & 2 deletions rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,13 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
}

@Test(timeout = 2000)
public void testStream() throws Exception {
public void testStream() {
Flux<Payload> responses = rule.crs.requestStream(DefaultPayload.create("Payload In"));
StepVerifier.create(responses).expectNextCount(10).expectComplete().verify();
}

@Test(timeout = 2000)
public void testChannel() throws Exception {
public void testChannel() {
Flux<Payload> requests =
Flux.range(0, 10).map(i -> DefaultPayload.create("streaming in -> " + i));
Flux<Payload> responses = rule.crs.requestChannel(requests);
Expand Down Expand Up @@ -543,6 +543,7 @@ public Mono<Payload> requestResponse(Payload payload) {
@Override
public Flux<Payload> requestStream(Payload payload) {
return Flux.range(1, 10)
.delaySubscription(Duration.ofMillis(100))
.map(
i -> DefaultPayload.create("server got -> [" + payload.toString() + "]"));
}
Expand All @@ -556,6 +557,7 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
.subscribe();

return Flux.range(1, 10)
.delaySubscription(Duration.ofMillis(100))
.map(
payload ->
DefaultPayload.create("server got -> [" + payload.toString() + "]"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public void testRangeButThrowException() {
}
})
.map(l -> DefaultPayload.create("l -> " + l))
.delaySubscription(Duration.ofMillis(100))
.cast(Payload.class)))
.bind(serverTransport)
.block();
Expand All @@ -71,6 +72,7 @@ public void testRangeOfConsumers() {
payload ->
Flux.range(1, 1000)
.map(l -> DefaultPayload.create("l -> " + l))
.delaySubscription(Duration.ofMillis(100))
.cast(Payload.class)))
.bind(serverTransport)
.block();
Expand Down Expand Up @@ -104,6 +106,7 @@ public void testSingleConsumer() {
payload ->
Flux.range(1, 10_000)
.map(l -> DefaultPayload.create("l -> " + l))
.delaySubscription(Duration.ofMillis(100))
.cast(Payload.class)))
.bind(serverTransport)
.block();
Expand Down

0 comments on commit 8d9a54a

Please sign in to comment.