From b576b8761e61ebaaf0f885ff3088c9179ea2ac6e Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Thu, 29 Nov 2018 21:33:09 +0200 Subject: [PATCH] fix #536 When the connection is closed, stop publishing data When the remote closes the connection, ensure to stop publishing data, otherwise an exception like "SSLEngine closed already" will be observed in the logs. Enhance the message for the exception that will be thrown, so that it is obvious that the connection was closed while sending the request body. --- .../channel/ChannelOperationsHandler.java | 11 ++++++ .../http/client/HttpClientOperations.java | 7 +++- .../netty/http/server/HttpSendFileTests.java | 35 +++++++++++++++---- 3 files changed, 45 insertions(+), 8 deletions(-) diff --git a/src/main/java/reactor/ipc/netty/channel/ChannelOperationsHandler.java b/src/main/java/reactor/ipc/netty/channel/ChannelOperationsHandler.java index ca7664af12..d044b40d8c 100755 --- a/src/main/java/reactor/ipc/netty/channel/ChannelOperationsHandler.java +++ b/src/main/java/reactor/ipc/netty/channel/ChannelOperationsHandler.java @@ -692,6 +692,17 @@ public void onNext(Object t) { @SuppressWarnings("FutureReturnValueIgnored") private void onNextInternal(Object t, ChannelPromise promise) { + if (!parent.ctx.channel() + .isActive()) { + cancel(); + if (log.isDebugEnabled()) { + log.debug(format(parent.ctx.channel(), "Dropping pending write, " + + "since connection has been closed: {}"), t); + } + ReferenceCountUtil.release(t); + return; + } + produced++; // Returned value is deliberately ignored diff --git a/src/main/java/reactor/ipc/netty/http/client/HttpClientOperations.java b/src/main/java/reactor/ipc/netty/http/client/HttpClientOperations.java index c1289ca798..b4a382f06e 100644 --- a/src/main/java/reactor/ipc/netty/http/client/HttpClientOperations.java +++ b/src/main/java/reactor/ipc/netty/http/client/HttpClientOperations.java @@ -271,7 +271,12 @@ protected void onInboundClose() { return; } if (responseState == null) { - parentContext().fireContextError(new IOException("Connection closed prematurely")); + if (markSentBody()) { + parentContext().fireContextError(new IOException("Connection has been closed, while sending request body")); + } + else { + parentContext().fireContextError(new IOException("Connection closed prematurely")); + } return; } super.onInboundError(new IOException("Connection closed prematurely")); diff --git a/src/test/java/reactor/ipc/netty/http/server/HttpSendFileTests.java b/src/test/java/reactor/ipc/netty/http/server/HttpSendFileTests.java index 75512a1407..89a0d00976 100644 --- a/src/test/java/reactor/ipc/netty/http/server/HttpSendFileTests.java +++ b/src/test/java/reactor/ipc/netty/http/server/HttpSendFileTests.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import java.nio.channels.AsynchronousFileChannel; import java.nio.channels.CompletionHandler; +import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.nio.file.FileSystem; import java.nio.file.FileSystems; @@ -31,6 +32,7 @@ import java.nio.file.StandardOpenOption; import java.time.Duration; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiFunction; import java.util.function.BiPredicate; import java.util.function.Consumer; import java.util.function.Function; @@ -38,8 +40,10 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpMethod; import org.junit.Test; +import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; import reactor.ipc.netty.NettyContext; @@ -268,15 +272,29 @@ private void assertSendFile(Function fn, bool @Test public void sendFileAsync4096() throws IOException, URISyntaxException { - doTestSendFileAsync(4096); + doTestSendFileAsync((req, resp) -> resp.sendByteArray(req.receive() + .aggregate() + .asByteArray()), + 4096, null); + } + + @Test + public void sendFileAsync4096Negative() throws IOException, URISyntaxException { + doTestSendFileAsync((req, resp) -> resp.status(500) + .header(HttpHeaderNames.CONNECTION, "close"), + 4096, "error".getBytes(Charset.defaultCharset())); } @Test public void sendFileAsync1024() throws IOException, URISyntaxException { - doTestSendFileAsync(1024); + doTestSendFileAsync((req, resp) -> resp.sendByteArray(req.receive() + .aggregate() + .asByteArray()), + 1024, null); } - private void doTestSendFileAsync(int chunk) throws IOException, URISyntaxException { + private void doTestSendFileAsync(BiFunction> fn, int chunk, byte[] expectedContent) throws IOException, URISyntaxException { Path largeFile = Paths.get(getClass().getResource("/largeFile.txt").toURI()); Path tempFile = Files.createTempFile(largeFile.getParent(),"temp", ".txt"); tempFile.toFile().deleteOnExit(); @@ -307,9 +325,7 @@ private void doTestSendFileAsync(int chunk) throws IOException, URISyntaxExcepti NettyContext context = HttpServer.create(opt -> customizeServerOptions(opt.host("localhost"))) - .newHandler((req, resp) -> resp.sendByteArray(req.receive() - .aggregate() - .asByteArray())) + .newHandler(fn) .block(); assertThat(context).isNotNull(); @@ -319,9 +335,14 @@ private void doTestSendFileAsync(int chunk) throws IOException, URISyntaxExcepti .flatMap(res -> res.receive() .aggregate() .asByteArray()) + .onErrorReturn(t -> + t instanceof IOException && + "Connection has been closed, while sending request body".equals(t.getMessage()) + , + "error".getBytes(Charset.defaultCharset())) .block(); - assertThat(response).isEqualTo(Files.readAllBytes(tempFile)); + assertThat(response).isEqualTo(expectedContent == null ? Files.readAllBytes(tempFile) : expectedContent); context.dispose(); }