Skip to content

Commit

Permalink
fix #536 When the connection is closed, stop publishing data
Browse files Browse the repository at this point in the history
When the remote closes the connection, ensure to stop
publishing data, otherwise an exception like "SSLEngine closed already"
will be observed in the logs. Enhance the message for the exception
that will be thrown, so that it is obvious that the connection was closed
while sending the request body.
  • Loading branch information
violetagg committed Nov 30, 2018
1 parent 58bfd67 commit b576b87
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,17 @@ public void onNext(Object t) {

@SuppressWarnings("FutureReturnValueIgnored")
private void onNextInternal(Object t, ChannelPromise promise) {
if (!parent.ctx.channel()
.isActive()) {
cancel();
if (log.isDebugEnabled()) {
log.debug(format(parent.ctx.channel(), "Dropping pending write, " +
"since connection has been closed: {}"), t);
}
ReferenceCountUtil.release(t);
return;
}

produced++;

// Returned value is deliberately ignored
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,12 @@ protected void onInboundClose() {
return;
}
if (responseState == null) {
parentContext().fireContextError(new IOException("Connection closed prematurely"));
if (markSentBody()) {
parentContext().fireContextError(new IOException("Connection has been closed, while sending request body"));
}
else {
parentContext().fireContextError(new IOException("Connection closed prematurely"));
}
return;
}
super.onInboundError(new IOException("Connection closed prematurely"));
Expand Down
35 changes: 28 additions & 7 deletions src/test/java/reactor/ipc/netty/http/server/HttpSendFileTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
Expand All @@ -31,15 +32,18 @@
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.function.Function;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import org.junit.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.ipc.netty.NettyContext;
Expand Down Expand Up @@ -268,15 +272,29 @@ private void assertSendFile(Function<HttpServerResponse, NettyOutbound> fn, bool

@Test
public void sendFileAsync4096() throws IOException, URISyntaxException {
doTestSendFileAsync(4096);
doTestSendFileAsync((req, resp) -> resp.sendByteArray(req.receive()
.aggregate()
.asByteArray()),
4096, null);
}

@Test
public void sendFileAsync4096Negative() throws IOException, URISyntaxException {
doTestSendFileAsync((req, resp) -> resp.status(500)
.header(HttpHeaderNames.CONNECTION, "close"),
4096, "error".getBytes(Charset.defaultCharset()));
}

@Test
public void sendFileAsync1024() throws IOException, URISyntaxException {
doTestSendFileAsync(1024);
doTestSendFileAsync((req, resp) -> resp.sendByteArray(req.receive()
.aggregate()
.asByteArray()),
1024, null);
}

private void doTestSendFileAsync(int chunk) throws IOException, URISyntaxException {
private void doTestSendFileAsync(BiFunction<? super HttpServerRequest, ? super
HttpServerResponse, ? extends Publisher<Void>> fn, int chunk, byte[] expectedContent) throws IOException, URISyntaxException {
Path largeFile = Paths.get(getClass().getResource("/largeFile.txt").toURI());
Path tempFile = Files.createTempFile(largeFile.getParent(),"temp", ".txt");
tempFile.toFile().deleteOnExit();
Expand Down Expand Up @@ -307,9 +325,7 @@ private void doTestSendFileAsync(int chunk) throws IOException, URISyntaxExcepti

NettyContext context =
HttpServer.create(opt -> customizeServerOptions(opt.host("localhost")))
.newHandler((req, resp) -> resp.sendByteArray(req.receive()
.aggregate()
.asByteArray()))
.newHandler(fn)
.block();
assertThat(context).isNotNull();

Expand All @@ -319,9 +335,14 @@ private void doTestSendFileAsync(int chunk) throws IOException, URISyntaxExcepti
.flatMap(res -> res.receive()
.aggregate()
.asByteArray())
.onErrorReturn(t ->
t instanceof IOException &&
"Connection has been closed, while sending request body".equals(t.getMessage())
,
"error".getBytes(Charset.defaultCharset()))
.block();

assertThat(response).isEqualTo(Files.readAllBytes(tempFile));
assertThat(response).isEqualTo(expectedContent == null ? Files.readAllBytes(tempFile) : expectedContent);
context.dispose();
}

Expand Down

0 comments on commit b576b87

Please sign in to comment.