diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java index 2ecdec215..272194bb2 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2018 the original author or authors. + * Copyright 2015-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -348,7 +348,7 @@ private Flux handleRequestStream(final Payload payload) { } final UnboundedProcessor sendProcessor = this.sendProcessor; - final UnicastProcessor receiver = UnicastProcessor.create(); + final UnicastProcessor receiver = UnicastProcessor.create(Queues.one().get()); final AtomicBoolean once = new AtomicBoolean(); return Flux.defer( @@ -456,7 +456,7 @@ private Flux handleChannel(Flux request) { private Flux handleChannel(Payload initialPayload, Flux inboundFlux) { final UnboundedProcessor sendProcessor = this.sendProcessor; - final UnicastProcessor receiver = UnicastProcessor.create(); + final UnicastProcessor receiver = UnicastProcessor.create(Queues.one().get()); return receiver .transform( diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java index 581605ff4..3e2c06e92 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2018 the original author or authors. + * Copyright 2015-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -48,6 +48,7 @@ import reactor.core.Exceptions; import reactor.core.publisher.*; import reactor.util.annotation.Nullable; +import reactor.util.concurrent.Queues; /** Responder side of RSocket. Receives {@link ByteBuf}s from a peer's {@link RSocketRequester} */ class RSocketResponder implements RSocket { @@ -537,7 +538,7 @@ protected void hookOnError(Throwable throwable) { } private void handleChannel(int streamId, Payload payload, long initialRequestN) { - UnicastProcessor frames = UnicastProcessor.create(); + UnicastProcessor frames = UnicastProcessor.create(Queues.one().get()); channelProcessors.put(streamId, frames); Flux payloads = diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java index 1e7bb337f..d3e614e1c 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java @@ -158,13 +158,13 @@ public Flux requestChannel(Publisher payloads) { } @Test(timeout = 2000) - public void testStream() throws Exception { + public void testStream() { Flux responses = rule.crs.requestStream(DefaultPayload.create("Payload In")); StepVerifier.create(responses).expectNextCount(10).expectComplete().verify(); } @Test(timeout = 2000) - public void testChannel() throws Exception { + public void testChannel() { Flux requests = Flux.range(0, 10).map(i -> DefaultPayload.create("streaming in -> " + i)); Flux responses = rule.crs.requestChannel(requests); @@ -543,6 +543,7 @@ public Mono requestResponse(Payload payload) { @Override public Flux requestStream(Payload payload) { return Flux.range(1, 10) + .delaySubscription(Duration.ofMillis(100)) .map( i -> DefaultPayload.create("server got -> [" + payload.toString() + "]")); } @@ -556,6 +557,7 @@ public Flux requestChannel(Publisher payloads) { .subscribe(); return Flux.range(1, 10) + .delaySubscription(Duration.ofMillis(100)) .map( payload -> DefaultPayload.create("server got -> [" + payload.toString() + "]")); diff --git a/rsocket-examples/src/test/java/io/rsocket/integration/TestingStreaming.java b/rsocket-examples/src/test/java/io/rsocket/integration/TestingStreaming.java index 7d34ba478..f583355e6 100644 --- a/rsocket-examples/src/test/java/io/rsocket/integration/TestingStreaming.java +++ b/rsocket-examples/src/test/java/io/rsocket/integration/TestingStreaming.java @@ -49,6 +49,7 @@ public void testRangeButThrowException() { } }) .map(l -> DefaultPayload.create("l -> " + l)) + .delaySubscription(Duration.ofMillis(100)) .cast(Payload.class))) .bind(serverTransport) .block(); @@ -71,6 +72,7 @@ public void testRangeOfConsumers() { payload -> Flux.range(1, 1000) .map(l -> DefaultPayload.create("l -> " + l)) + .delaySubscription(Duration.ofMillis(100)) .cast(Payload.class))) .bind(serverTransport) .block(); @@ -104,6 +106,7 @@ public void testSingleConsumer() { payload -> Flux.range(1, 10_000) .map(l -> DefaultPayload.create("l -> " + l)) + .delaySubscription(Duration.ofMillis(100)) .cast(Payload.class))) .bind(serverTransport) .block();