diff --git a/src/main/java/reactor/ipc/netty/channel/ChannelOperationsHandler.java b/src/main/java/reactor/ipc/netty/channel/ChannelOperationsHandler.java index ca7664af12..d044b40d8c 100755 --- a/src/main/java/reactor/ipc/netty/channel/ChannelOperationsHandler.java +++ b/src/main/java/reactor/ipc/netty/channel/ChannelOperationsHandler.java @@ -692,6 +692,17 @@ public void onNext(Object t) { @SuppressWarnings("FutureReturnValueIgnored") private void onNextInternal(Object t, ChannelPromise promise) { + if (!parent.ctx.channel() + .isActive()) { + cancel(); + if (log.isDebugEnabled()) { + log.debug(format(parent.ctx.channel(), "Dropping pending write, " + + "since connection has been closed: {}"), t); + } + ReferenceCountUtil.release(t); + return; + } + produced++; // Returned value is deliberately ignored diff --git a/src/main/java/reactor/ipc/netty/http/client/HttpClientOperations.java b/src/main/java/reactor/ipc/netty/http/client/HttpClientOperations.java index c1289ca798..b4a382f06e 100644 --- a/src/main/java/reactor/ipc/netty/http/client/HttpClientOperations.java +++ b/src/main/java/reactor/ipc/netty/http/client/HttpClientOperations.java @@ -271,7 +271,12 @@ protected void onInboundClose() { return; } if (responseState == null) { - parentContext().fireContextError(new IOException("Connection closed prematurely")); + if (markSentBody()) { + parentContext().fireContextError(new IOException("Connection has been closed, while sending request body")); + } + else { + parentContext().fireContextError(new IOException("Connection closed prematurely")); + } return; } super.onInboundError(new IOException("Connection closed prematurely")); diff --git a/src/test/java/reactor/ipc/netty/http/server/HttpSendFileTests.java b/src/test/java/reactor/ipc/netty/http/server/HttpSendFileTests.java index 75512a1407..89a0d00976 100644 --- a/src/test/java/reactor/ipc/netty/http/server/HttpSendFileTests.java +++ b/src/test/java/reactor/ipc/netty/http/server/HttpSendFileTests.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import java.nio.channels.AsynchronousFileChannel; import java.nio.channels.CompletionHandler; +import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.nio.file.FileSystem; import java.nio.file.FileSystems; @@ -31,6 +32,7 @@ import java.nio.file.StandardOpenOption; import java.time.Duration; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiFunction; import java.util.function.BiPredicate; import java.util.function.Consumer; import java.util.function.Function; @@ -38,8 +40,10 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpMethod; import org.junit.Test; +import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; import reactor.ipc.netty.NettyContext; @@ -268,15 +272,29 @@ private void assertSendFile(Function fn, bool @Test public void sendFileAsync4096() throws IOException, URISyntaxException { - doTestSendFileAsync(4096); + doTestSendFileAsync((req, resp) -> resp.sendByteArray(req.receive() + .aggregate() + .asByteArray()), + 4096, null); + } + + @Test + public void sendFileAsync4096Negative() throws IOException, URISyntaxException { + doTestSendFileAsync((req, resp) -> resp.status(500) + .header(HttpHeaderNames.CONNECTION, "close"), + 4096, "error".getBytes(Charset.defaultCharset())); } @Test public void sendFileAsync1024() throws IOException, URISyntaxException { - doTestSendFileAsync(1024); + doTestSendFileAsync((req, resp) -> resp.sendByteArray(req.receive() + .aggregate() + .asByteArray()), + 1024, null); } - private void doTestSendFileAsync(int chunk) throws IOException, URISyntaxException { + private void doTestSendFileAsync(BiFunction> fn, int chunk, byte[] expectedContent) throws IOException, URISyntaxException { Path largeFile = Paths.get(getClass().getResource("/largeFile.txt").toURI()); Path tempFile = Files.createTempFile(largeFile.getParent(),"temp", ".txt"); tempFile.toFile().deleteOnExit(); @@ -307,9 +325,7 @@ private void doTestSendFileAsync(int chunk) throws IOException, URISyntaxExcepti NettyContext context = HttpServer.create(opt -> customizeServerOptions(opt.host("localhost"))) - .newHandler((req, resp) -> resp.sendByteArray(req.receive() - .aggregate() - .asByteArray())) + .newHandler(fn) .block(); assertThat(context).isNotNull(); @@ -319,9 +335,14 @@ private void doTestSendFileAsync(int chunk) throws IOException, URISyntaxExcepti .flatMap(res -> res.receive() .aggregate() .asByteArray()) + .onErrorReturn(t -> + t instanceof IOException && + "Connection has been closed, while sending request body".equals(t.getMessage()) + , + "error".getBytes(Charset.defaultCharset())) .block(); - assertThat(response).isEqualTo(Files.readAllBytes(tempFile)); + assertThat(response).isEqualTo(expectedContent == null ? Files.readAllBytes(tempFile) : expectedContent); context.dispose(); }