Skip to content

Commit

Permalink
[UNDERTOW-1858] Fix ReadTimeoutTestCase (#1050)
Browse files Browse the repository at this point in the history
[UNDERTOW-1858] Fix ReadTimeoutTestCase
  • Loading branch information
fl4via authored Jul 20, 2021
1 parent 21e6428 commit 4d3a8a3
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 78 deletions.
3 changes: 3 additions & 0 deletions core/src/main/java/io/undertow/UndertowMessages.java
Original file line number Diff line number Diff line change
Expand Up @@ -618,4 +618,7 @@ public interface UndertowMessages {

@Message(id = 198, value = "Blocking write timed out after %s nanoseconds.")
WriteTimeoutException blockingWriteTimedOut(long timeoutNanoseconds);

@Message(id = 199, value = "Read timed out after %s milliseconds.")
ReadTimeoutException readTimedOut(long timeoutMilliseconds);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,29 @@

package io.undertow.conduits;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.TimeUnit;

import io.undertow.UndertowLogger;
import io.undertow.UndertowMessages;
import io.undertow.UndertowOptions;
import io.undertow.server.OpenListener;
import io.undertow.util.WorkerUtils;

import org.xnio.ChannelListener;
import org.xnio.ChannelListeners;
import org.xnio.IoUtils;
import org.xnio.Options;
import org.xnio.StreamConnection;
import org.xnio.XnioExecutor;
import org.xnio.channels.ReadTimeoutException;
import org.xnio.channels.StreamSinkChannel;
import org.xnio.conduits.AbstractStreamSourceConduit;
import org.xnio.conduits.ConduitStreamSourceChannel;
import org.xnio.conduits.ReadReadyHandler;
import org.xnio.conduits.StreamSourceConduit;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.util.concurrent.TimeUnit;

/**
* Wrapper for read timeout. This should always be the first wrapper applied to the underlying channel.
*
Expand All @@ -53,6 +55,7 @@ public final class ReadTimeoutStreamSourceConduit extends AbstractStreamSourceCo
private final OpenListener openListener;

private static final int FUZZ_FACTOR = 50; //we add 50ms to the timeout to make sure the underlying channel has actually timed out
private volatile boolean expired;

private final Runnable timeoutCommand = new Runnable() {
@Override
Expand All @@ -68,13 +71,20 @@ public void run() {
return;
}
UndertowLogger.REQUEST_LOGGER.tracef("Timing out channel %s due to inactivity", connection.getSourceChannel());
IoUtils.safeClose(connection);
if (connection.getSourceChannel().isReadResumed()) {
ChannelListeners.invokeChannelListener(connection.getSourceChannel(), connection.getSourceChannel().getReadListener());
synchronized (ReadTimeoutStreamSourceConduit.this) {
expired = true;
}
boolean readResumed = connection.getSourceChannel().isReadResumed();
ChannelListener<? super ConduitStreamSourceChannel> readListener = connection.getSourceChannel().getReadListener();

if (readResumed) {
ChannelListeners.invokeChannelListener(connection.getSourceChannel(), readListener);
}
if (connection.getSinkChannel().isWriteResumed()) {
ChannelListeners.invokeChannelListener(connection.getSinkChannel(), connection.getSinkChannel().getWriteListener());
}
// close only after invoking listeners, to allow space for listener getting ReadTimeoutException
IoUtils.safeClose(connection);
}
};

Expand Down Expand Up @@ -108,56 +118,63 @@ private void handleReadTimeout(final long ret) throws IOException {
cleanup();
return;
}
if(ret == -1) {
if (ret == -1) {
cleanup();
return;
}
if (ret == 0 && handle != null) {
return;
}
Integer timeout = getTimeout();
if (timeout == null || timeout <= 0) {
return;
}
long currentTime = System.currentTimeMillis();
long expireTimeVar = expireTime;
if (expireTimeVar != -1 && currentTime > expireTimeVar) {
IoUtils.safeClose(connection);
throw new ClosedChannelException();
final long currentTime = System.currentTimeMillis();
if (ret == 0) {
final long expireTimeVar = expireTime;
if (expireTimeVar != -1 && currentTime > expireTimeVar) {
IoUtils.safeClose(connection);
throw UndertowMessages.MESSAGES.readTimedOut(this.getTimeout());
}
}
expireTime = currentTime + timeout;
if (handle == null) {
handle = connection.getIoThread().executeAfter(timeoutCommand, timeout, TimeUnit.MILLISECONDS);
}
}

@Override
public long transferTo(final long position, final long count, final FileChannel target) throws IOException {
checkExpired();
long ret = super.transferTo(position, count, target);
handleReadTimeout(ret);
return ret;
}

@Override
public long transferTo(final long count, final ByteBuffer throughBuffer, final StreamSinkChannel target) throws IOException {
checkExpired();
long ret = super.transferTo(count, throughBuffer, target);
handleReadTimeout(ret);
return ret;
}

@Override
public long read(final ByteBuffer[] dsts, final int offset, final int length) throws IOException {
checkExpired();
long ret = super.read(dsts, offset, length);
handleReadTimeout(ret);
return ret;
}

@Override
public int read(final ByteBuffer dst) throws IOException {
checkExpired();
int ret = super.read(dst);
handleReadTimeout(ret);
return ret;
}

@Override
public void awaitReadable() throws IOException {
checkExpired();
Integer timeout = getTimeout();
if (timeout != null && timeout > 0) {
super.awaitReadable(timeout + FUZZ_FACTOR, TimeUnit.MILLISECONDS);
Expand All @@ -168,6 +185,7 @@ public void awaitReadable() throws IOException {

@Override
public void awaitReadable(long time, TimeUnit timeUnit) throws IOException {
checkExpired();
Integer timeout = getTimeout();
if (timeout != null && timeout > 0) {
long millis = timeUnit.toMillis(time);
Expand Down Expand Up @@ -195,49 +213,34 @@ private Integer getTimeout() {

@Override
public void terminateReads() throws IOException {
checkExpired();
super.terminateReads();
cleanup();
}

private void cleanup() {
if(handle != null) {
if (handle != null) {
handle.remove();
handle = null;
expireTime = -1;
}
}

@Override
public void resumeReads() {
super.resumeReads();
handleResumeTimeout();
}

@Override
public void suspendReads() {
super.suspendReads();
XnioExecutor.Key handle = this.handle;
if(handle != null) {
handle.remove();
this.handle = null;
}
cleanup();
}

@Override
public void wakeupReads() {
super.wakeupReads();
handleResumeTimeout();
private void checkExpired() throws ReadTimeoutException {
synchronized (this) {
if (expired) {
throw UndertowMessages.MESSAGES.readTimedOut(System.currentTimeMillis());
}
}
}

private void handleResumeTimeout() {
Integer timeout = getTimeout();
if (timeout == null || timeout <= 0) {
return;
}
long currentTime = System.currentTimeMillis();
expireTime = currentTime + timeout;
XnioExecutor.Key key = handle;
if (key == null) {
handle = connection.getIoThread().executeAfter(timeoutCommand, timeout, TimeUnit.MILLISECONDS);
}
public String toString() {
return super.toString() + " (next: " + next + ")";
}
}
83 changes: 51 additions & 32 deletions core/src/test/java/io/undertow/server/ReadTimeoutTestCase.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,25 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.SocketException;
import java.nio.channels.Channel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import io.undertow.testutils.DefaultServer;
import io.undertow.testutils.HttpOneOnly;
import io.undertow.testutils.TestHttpClient;
import io.undertow.util.Headers;
import io.undertow.util.StringWriteChannelListener;
import io.undertow.testutils.TestHttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.AbstractHttpEntity;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.xnio.ChannelExceptionHandler;
import org.xnio.ChannelListener;
import org.xnio.ChannelListeners;
import org.xnio.OptionMap;
import org.xnio.Options;
import org.xnio.channels.ReadTimeoutException;
import org.xnio.channels.StreamSinkChannel;
Expand All @@ -49,59 +50,55 @@
* Tests read timeout with a slow request
*
* @author Stuart Douglas
* @author Flavia Rainone
*/
@RunWith(DefaultServer.class)
@HttpOneOnly
@Ignore
public class ReadTimeoutTestCase {

private volatile Exception exception;
private static final CountDownLatch errorLatch = new CountDownLatch(1);

@DefaultServer.BeforeServerStarts
public static void beforeClass() {
DefaultServer.setServerOptions(OptionMap.create(Options.READ_TIMEOUT, 10));
}

@DefaultServer.AfterServerStops
public static void afterClass() {
DefaultServer.setServerOptions(OptionMap.EMPTY);
}

@Test
public void testReadTimeout() throws IOException, InterruptedException {
DefaultServer.setRootHandler(new HttpHandler() {
@Override
public void handleRequest(final HttpServerExchange exchange) throws Exception {
public void testReadTimeout() throws InterruptedException, IOException {
final CountDownLatch errorLatch = new CountDownLatch(1);
DefaultServer.setRootHandler((final HttpServerExchange exchange) -> {
final StreamSinkChannel response = exchange.getResponseChannel();
final StreamSourceChannel request = exchange.getRequestChannel();
try {
request.setOption(Options.READ_TIMEOUT, 100);
} catch (IOException e) {
throw new RuntimeException(e);
}

request.getReadSetter().set(ChannelListeners.drainListener(Long.MAX_VALUE, new ChannelListener<Channel>() {
@Override
public void handleEvent(final Channel channel) {
request.getReadSetter().set(ChannelListeners.drainListener(Long.MAX_VALUE, (final Channel channel) -> {
new StringWriteChannelListener("COMPLETED") {
@Override
protected void writeDone(final StreamSinkChannel channel) {
exchange.endExchange();
}
}.setup(response);
}
}, new ChannelExceptionHandler<StreamSourceChannel>() {
@Override
public void handleException(final StreamSourceChannel channel, final IOException e) {
}, (final StreamSourceChannel channel, final IOException e) -> {
e.printStackTrace();
exchange.endExchange();
exception = e;
errorLatch.countDown();
}
}
));
request.wakeupReads();

}
});
});

final TestHttpClient client = new TestHttpClient();
try {
HttpPost post = new HttpPost(DefaultServer.getDefaultServerURL());
post.setEntity(new AbstractHttpEntity() {

@Override
public InputStream getContent() throws IOException, IllegalStateException {
public InputStream getContent() throws IllegalStateException {
return null;
}

Expand Down Expand Up @@ -134,18 +131,40 @@ public long getContentLength() {
}
});
post.addHeader(Headers.CONNECTION_STRING, "close");
boolean socketFailure = false;
try {
client.execute(post);
} catch (IOException e) {

} catch (SocketException e) {
Assert.assertTrue(e.getMessage(), e.getMessage().contains("Broken pipe")
|| e.getMessage().contains("connection abort"));
socketFailure = true;
}
Assert.assertTrue("Test sent request without any exception", socketFailure);
if (errorLatch.await(5, TimeUnit.SECONDS)) {
Assert.assertEquals(ReadTimeoutException.class, exception.getClass());
} else {
Assert.fail("Read did not time out");
Assert.assertTrue(getExceptionDescription(exception), exception instanceof ReadTimeoutException ||
(DefaultServer.isProxy() && exception instanceof IOException));
if (exception.getSuppressed() != null && exception.getSuppressed().length > 0) {
for (Throwable supressed : exception.getSuppressed()) {
Assert.assertEquals(getExceptionDescription(supressed), ReadTimeoutException.class, exception.getClass());
}
}
} else if (!DefaultServer.isProxy()) {
// ignore if proxy, because when we're on proxy, we might not be able to see the exception
Assert.fail("Did not get ReadTimeoutException");
}
} finally {
client.getConnectionManager().shutdown();
}
}

// TODO move this to an utility class
private String getExceptionDescription(Throwable exception) {
try (StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw)) {
exception.printStackTrace(pw);
return pw.toString();
} catch (IOException ioe) {
throw new IllegalStateException(ioe);
}
}
}

0 comments on commit 4d3a8a3

Please sign in to comment.