From 8383e794e70fb910cefafb61248d8842b27d9895 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Thu, 15 Aug 2019 08:59:27 +0300 Subject: [PATCH] #533 drafts `receiveCloseStatus` method Signed-off-by: Oleh Dokuka --- .../client/WebsocketClientOperations.java | 22 +- .../server/WebsocketServerOperations.java | 35 ++- .../http/websocket/WebsocketInbound.java | 8 + .../netty/http/server/HttpServerTests.java | 201 ++++++++++++++++++ 4 files changed, 259 insertions(+), 7 deletions(-) diff --git a/src/main/java/reactor/netty/http/client/WebsocketClientOperations.java b/src/main/java/reactor/netty/http/client/WebsocketClientOperations.java index ef3e05da49..56da7e4dde 100644 --- a/src/main/java/reactor/netty/http/client/WebsocketClientOperations.java +++ b/src/main/java/reactor/netty/http/client/WebsocketClientOperations.java @@ -33,10 +33,12 @@ import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker; import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory; +import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus; import io.netty.handler.codec.http.websocketx.WebSocketVersion; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoProcessor; import reactor.netty.FutureMono; import reactor.netty.NettyOutbound; import reactor.netty.NettyPipeline; @@ -55,6 +57,7 @@ final class WebsocketClientOperations extends HttpClientOperations implements WebsocketInbound, WebsocketOutbound { final WebSocketClientHandshaker handshaker; + final MonoProcessor onCloseState; volatile int closeSent; @@ -64,6 +67,7 @@ final class WebsocketClientOperations extends HttpClientOperations HttpClientOperations replaced) { super(replaced); Channel channel = channel(); + onCloseState = MonoProcessor.create(); handshaker = WebSocketClientHandshakerFactory.newHandshaker(currentURI, WebSocketVersion.V13, @@ -195,6 +199,12 @@ public Mono sendClose(int rsv, int statusCode, @javax.annotation.Nullable return sendClose(new CloseWebSocketFrame(true, rsv, statusCode, reasonText)); } + @Override + @SuppressWarnings("unchecked") + public Mono receiveCloseStatus() { + return onCloseState.or((Mono)onTerminate()); + } + Mono sendClose(CloseWebSocketFrame frame) { if (CLOSE_SENT.get(this) == 0) { //commented for now as we assume the close is always scheduled (deferFuture runs) @@ -202,6 +212,7 @@ Mono sendClose(CloseWebSocketFrame frame) { return FutureMono.deferFuture(() -> { if (CLOSE_SENT.getAndSet(this, 1) == 0) { discard(); + onCloseState.onNext(new WebSocketCloseStatus(frame.statusCode(), frame.reasonText())); return channel().writeAndFlush(frame) .addListener(ChannelFutureListener.CLOSE); } @@ -219,8 +230,15 @@ void sendCloseNow(@Nullable CloseWebSocketFrame frame) { return; } if (CLOSE_SENT.getAndSet(this, 1) == 0) { - channel().writeAndFlush(frame == null ? new CloseWebSocketFrame() : frame) - .addListener(ChannelFutureListener.CLOSE); + if (frame != null) { + onCloseState.onNext(new WebSocketCloseStatus(frame.statusCode(), frame.reasonText())); + channel().writeAndFlush(frame) + .addListener(ChannelFutureListener.CLOSE); + } else { + onCloseState.onNext(new WebSocketCloseStatus(-1, "")); + channel().writeAndFlush(new CloseWebSocketFrame()) + .addListener(ChannelFutureListener.CLOSE); + } } else if (frame != null) { frame.release(); diff --git a/src/main/java/reactor/netty/http/server/WebsocketServerOperations.java b/src/main/java/reactor/netty/http/server/WebsocketServerOperations.java index 110704250c..23643e430e 100644 --- a/src/main/java/reactor/netty/http/server/WebsocketServerOperations.java +++ b/src/main/java/reactor/netty/http/server/WebsocketServerOperations.java @@ -33,12 +33,14 @@ import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; +import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory; import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoProcessor; import reactor.netty.FutureMono; import reactor.netty.NettyOutbound; import reactor.netty.NettyPipeline; @@ -58,8 +60,9 @@ final class WebsocketServerOperations extends HttpServerOperations implements WebsocketInbound, WebsocketOutbound { - final WebSocketServerHandshaker handshaker; - final ChannelPromise handshakerResult; + final WebSocketServerHandshaker handshaker; + final ChannelPromise handshakerResult; + final MonoProcessor onCloseState; volatile int closeSent; @@ -70,6 +73,7 @@ final class WebsocketServerOperations extends HttpServerOperations super(replaced); Channel channel = replaced.channel(); + onCloseState = MonoProcessor.create(); // Handshake WebSocketServerHandshakerFactory wsFactory = @@ -162,6 +166,14 @@ protected void onOutboundError(Throwable err) { } } + @Override + protected void onInboundCancel() { + if (log.isDebugEnabled()) { + log.debug(format(channel(), "Cancelling Websocket inbound. Closing Websocket")); + } + sendCloseNow(null, null); + } + @Override public Mono sendClose() { return sendClose(new CloseWebSocketFrame()); @@ -182,6 +194,12 @@ public Mono sendClose(int rsv, int statusCode, @Nullable String reasonText return sendClose(new CloseWebSocketFrame(true, rsv, statusCode, reasonText)); } + @Override + @SuppressWarnings("unchecked") + public Mono receiveCloseStatus() { + return onCloseState.or((Mono)onTerminate()); + } + Mono sendClose(CloseWebSocketFrame frame) { if (CLOSE_SENT.get(this) == 0) { //commented for now as we assume the close is always scheduled (deferFuture runs) @@ -189,6 +207,7 @@ Mono sendClose(CloseWebSocketFrame frame) { return FutureMono.deferFuture(() -> { if (CLOSE_SENT.getAndSet(this, 1) == 0) { discard(); + onCloseState.onNext(new WebSocketCloseStatus(frame.statusCode(), frame.reasonText())); return channel().writeAndFlush(frame) .addListener(ChannelFutureListener.CLOSE); } @@ -206,9 +225,15 @@ void sendCloseNow(@Nullable CloseWebSocketFrame frame, ChannelFutureListener lis return; } if (CLOSE_SENT.getAndSet(this, 1) == 0) { - ChannelFuture f = channel().writeAndFlush( - frame == null ? new CloseWebSocketFrame() : frame); - f.addListener(listener); + if (frame != null) { + onCloseState.onNext(new WebSocketCloseStatus(frame.statusCode(), frame.reasonText())); + channel().writeAndFlush(frame) + .addListener(ChannelFutureListener.CLOSE); + } else { + onCloseState.onNext(new WebSocketCloseStatus(-1, "")); + channel().writeAndFlush(new CloseWebSocketFrame()) + .addListener(ChannelFutureListener.CLOSE); + } } else if (frame != null) { frame.release(); diff --git a/src/main/java/reactor/netty/http/websocket/WebsocketInbound.java b/src/main/java/reactor/netty/http/websocket/WebsocketInbound.java index 98a67b6502..ef36505f63 100644 --- a/src/main/java/reactor/netty/http/websocket/WebsocketInbound.java +++ b/src/main/java/reactor/netty/http/websocket/WebsocketInbound.java @@ -17,9 +17,11 @@ package reactor.netty.http.websocket; import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketFrameAggregator; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import reactor.netty.NettyInbound; import reactor.util.annotation.Nullable; @@ -48,6 +50,12 @@ public interface WebsocketInbound extends NettyInbound { */ HttpHeaders headers(); + /** + * Receive the close status code and reason if sent by the remote peer, + * or empty if the connection completes otherwise. + */ + Mono receiveCloseStatus(); + /** * Turn this {@link WebsocketInbound} into aggregating mode which will only produce * fully formed frame that have been received fragmented. diff --git a/src/test/java/reactor/netty/http/server/HttpServerTests.java b/src/test/java/reactor/netty/http/server/HttpServerTests.java index baec41cd2a..e18f6f7ed1 100644 --- a/src/test/java/reactor/netty/http/server/HttpServerTests.java +++ b/src/test/java/reactor/netty/http/server/HttpServerTests.java @@ -56,6 +56,7 @@ import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; @@ -64,8 +65,11 @@ import org.junit.Ignore; import org.junit.Test; import org.reactivestreams.Publisher; +import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoProcessor; +import reactor.core.publisher.UnicastProcessor; import reactor.netty.ByteBufFlux; import reactor.netty.ChannelBindException; import reactor.netty.Connection; @@ -1153,4 +1157,201 @@ public void testExpectErrorWhenConnectionClosed() throws Exception { assertThat(error.get()).isInstanceOf(AbortedException.class); server.dispose(); } + + @Test + public void testNormalConnectionCloseForWebSocketClient() { + Flux flux = Flux.range(0, 100) + .map(n -> String.format("%010d", n)); + UnicastProcessor receiver = UnicastProcessor.create(); + MonoProcessor statusServer = MonoProcessor.create(); + MonoProcessor statusClient = MonoProcessor.create(); + List test = + flux.collectList() + .block(); + assertThat(test).isNotNull(); + + DisposableServer c = HttpServer.create() + .port(0) + .handle((req, resp) -> resp.sendWebsocket((in, out) -> out.send( + flux.map(s -> ByteBufAllocator.DEFAULT.buffer().writeBytes(s.getBytes())) + ) + .then(out.sendClose(4404, "test")) + .then(in.receiveCloseStatus().subscribeWith(statusServer).then()) + )) + .wiretap(true) + .bindNow(); + + HttpClient.create() + .port(c.address() + .getPort()) + .wiretap(true) + .websocket() + .uri("/") + .handle((in, out) -> { + MonoProcessor done = MonoProcessor.create(); + in.receiveCloseStatus() + .subscribeWith(statusClient); + in.receive() + .map(bb -> { + byte[] content = new byte[bb.capacity()]; + bb.readBytes(content); + return new String(content); + }) + .doFinally((s) -> done.onComplete()) + .subscribeWith(receiver); + return done.then(Mono.delay(Duration.ofMillis(500))); + }) + .blockLast(); + + StepVerifier.create(receiver) + .expectNextSequence(test) + .expectComplete() + .verify(Duration.ofSeconds(30)); + + StepVerifier.create(statusClient) + .expectNext(new WebSocketCloseStatus(4404, "test")) + .expectComplete() + .verify(Duration.ofSeconds(30)); + + + StepVerifier.create(statusServer) + .expectNext(new WebSocketCloseStatus(4404, "test")) + .expectComplete() + .verify(Duration.ofSeconds(30)); + + c.disposeNow(); + } + + + @Test + public void testNormalConnectionCloseForWebSocketServer() { + MonoProcessor statusServer = MonoProcessor.create(); + MonoProcessor statusClient = MonoProcessor.create(); + + DisposableServer c = HttpServer.create() + .port(0) + .handle((req, resp) -> + resp.sendWebsocket((in, out) -> in.receiveCloseStatus() + .subscribeWith(statusServer) + .then()) + ) + .wiretap(true) + .bindNow(); + + HttpClient.create() + .port(c.address() + .getPort()) + .wiretap(true) + .websocket() + .uri("/") + .handle((in, out) -> out.sendClose(4404, "test") + .then(in.receiveCloseStatus() + .subscribeWith(statusClient))) + .blockLast(); + + StepVerifier.create(statusClient) + .expectNext(new WebSocketCloseStatus(4404, "test")) + .expectComplete() + .verify(Duration.ofSeconds(30)); + + StepVerifier.create(statusServer) + .expectNext(new WebSocketCloseStatus(4404, "test")) + .expectComplete() + .verify(Duration.ofSeconds(30)); + + c.disposeNow(); + } + + @Test + public void testCancelConnectionCloseForWebSocketClient() { + MonoProcessor statusServer = MonoProcessor.create(); + MonoProcessor statusClient = MonoProcessor.create(); + + DisposableServer c = HttpServer.create() + .port(0) + .handle((req, resp) -> + resp.sendWebsocket((in, out) -> in.receiveCloseStatus() + .subscribeWith(statusServer) + .then()) + ) + .wiretap(true) + .bindNow(); + + HttpClient.create() + .port(c.address() + .getPort()) + .wiretap(true) + .websocket() + .uri("/") + .handle((in, out) -> { + in.receiveCloseStatus() + .subscribeWith(statusClient); + + ((Connection) in).dispose(); + + return Mono.never(); + }) + .subscribe(); + + + StepVerifier.create(statusClient) + .expectNext(new WebSocketCloseStatus(-1, "")) + .expectComplete() + .verify(Duration.ofSeconds(30)); + + StepVerifier.create(statusServer) + .expectNext(new WebSocketCloseStatus(-1, "")) + .expectComplete() + .verify(Duration.ofSeconds(30)); + + c.disposeNow(); + } + + @Test + public void testCancelConnectionCloseForWebSocketServer() { + MonoProcessor statusServer = MonoProcessor.create(); + MonoProcessor statusClient = MonoProcessor.create(); + + DisposableServer c = HttpServer + .create() + .port(0) + .handle((req, resp) -> resp.sendWebsocket((in, out) -> { + in.receiveCloseStatus() + .subscribeWith(statusServer) + .then(); + + ((Connection) in).dispose(); + + return Mono.never(); + })) + .wiretap(true) + .bindNow(); + + HttpClient.create() + .port(c.address() + .getPort()) + .wiretap(true) + .websocket() + .uri("/") + .handle((in, out) -> { + in.receiveCloseStatus() + .subscribeWith(statusClient); + + return Mono.never(); + }) + .subscribe(); + + + StepVerifier.create(statusClient) + .expectNext(new WebSocketCloseStatus(-1, "")) + .expectComplete() + .verify(Duration.ofSeconds(30)); + + StepVerifier.create(statusServer) + .expectNext(new WebSocketCloseStatus(-1, "")) + .expectComplete() + .verify(Duration.ofSeconds(30)); + + c.disposeNow(); + } }