From 8dcfed4836d26c57ef78be68214d186e9ca78b84 Mon Sep 17 00:00:00 2001 From: Venkat <39379800+010gvr@users.noreply.github.com> Date: Fri, 29 Nov 2019 02:58:02 -0500 Subject: [PATCH] Rewritten Netty Jersey implementation using direct ByteBuf consumption (#4312) Signed-off-by: Venkat Ganesh <010gvr@gmail.com> --- .../netty/connector/JerseyClientHandler.java | 21 ++- .../internal/JerseyChunkedInput.java | 7 +- .../connector/internal/NettyInputStream.java | 132 ++++++++---------- .../httpserver/JerseyHttp2ServerHandler.java | 12 +- .../netty/httpserver/JerseyServerHandler.java | 93 ++++++------ .../netty/httpserver/NettyResponseWriter.java | 8 +- 6 files changed, 132 insertions(+), 141 deletions(-) diff --git a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/JerseyClientHandler.java b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/JerseyClientHandler.java index 1a9cbe5f2f..5531498358 100644 --- a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/JerseyClientHandler.java +++ b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/JerseyClientHandler.java @@ -16,7 +16,6 @@ package org.glassfish.jersey.netty.connector; -import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.util.Map; @@ -31,6 +30,7 @@ import org.glassfish.jersey.netty.connector.internal.NettyInputStream; import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.HttpContent; @@ -50,7 +50,7 @@ class JerseyClientHandler extends SimpleChannelInboundHandler { private final NettyConnector connector; - private final LinkedBlockingDeque isList = new LinkedBlockingDeque<>(); + private final LinkedBlockingDeque isList = new LinkedBlockingDeque<>(); private final AsyncConnectorCallback asyncConnectorCallback; private final ClientRequest jerseyRequest; @@ -89,7 +89,7 @@ public String getReasonPhrase() { for (Map.Entry entry : response.headers().entries()) { jerseyResponse.getHeaders().add(entry.getKey(), entry.getValue()); } - + isList.clear(); // clearing the content - possible leftover from previous request processing. // request entity handling. if ((response.headers().contains(HttpHeaderNames.CONTENT_LENGTH) && HttpUtil.getContentLength(response) > 0) || HttpUtil.isTransferEncodingChunked(response)) { @@ -97,7 +97,7 @@ public String getReasonPhrase() { ctx.channel().closeFuture().addListener(new GenericFutureListener>() { @Override public void operationComplete(Future future) throws Exception { - isList.add(NettyInputStream.END_OF_INPUT_ERROR); + isList.add(Unpooled.EMPTY_BUFFER); } }); @@ -123,21 +123,16 @@ public void run() { } if (msg instanceof HttpContent) { - HttpContent httpContent = (HttpContent) msg; ByteBuf content = httpContent.content(); - if (content.isReadable()) { - // copy bytes - when netty reads last chunk, it automatically closes the channel, which invalidates all - // relates ByteBuffs. - byte[] bytes = new byte[content.readableBytes()]; - content.getBytes(content.readerIndex(), bytes); - isList.add(new ByteArrayInputStream(bytes)); + content.retain(); + isList.add(content); } if (msg instanceof LastHttpContent) { - isList.add(NettyInputStream.END_OF_INPUT); + isList.add(Unpooled.EMPTY_BUFFER); } } } @@ -153,6 +148,6 @@ public void run() { }); } future.completeExceptionally(cause); - isList.add(NettyInputStream.END_OF_INPUT_ERROR); + ctx.close(); } } diff --git a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/JerseyChunkedInput.java b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/JerseyChunkedInput.java index ef14ba3a1e..e5262d3474 100644 --- a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/JerseyChunkedInput.java +++ b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/JerseyChunkedInput.java @@ -42,10 +42,9 @@ public class JerseyChunkedInput extends OutputStream implements ChunkedInput, ChannelFutureListener { private static final ByteBuffer VOID = ByteBuffer.allocate(0); - private static final int CAPACITY = 8; - // TODO this needs to be configurable, see JERSEY-3228 - private static final int WRITE_TIMEOUT = 10000; - private static final int READ_TIMEOUT = 10000; + private static final int CAPACITY = Integer.getInteger("jersey.ci.capacity", 8); + private static final int WRITE_TIMEOUT = Integer.getInteger("jersey.ci.read.timeout", 10000); + private static final int READ_TIMEOUT = Integer.getInteger("jersey.ci.write.timeout", 10000); private final LinkedBlockingDeque queue = new LinkedBlockingDeque<>(CAPACITY); private final Channel ctx; diff --git a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java index aab83b3535..741121f506 100644 --- a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java +++ b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java @@ -20,77 +20,47 @@ import java.io.InputStream; import java.util.concurrent.LinkedBlockingDeque; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + /** * Input stream which servers as Request entity input. *

- * Converts Netty NIO buffers to an input streams and stores them in the queue, - * waiting for Jersey to process it. - * - * @author Pavel Bucek + * Consumes a list of pending {@link ByteBuf}s and processes them on request by Jersey */ public class NettyInputStream extends InputStream { - private volatile boolean end = false; - - /** - * End of input. - */ - public static final InputStream END_OF_INPUT = new InputStream() { - @Override - public int read() throws IOException { - return 0; - } - - @Override - public String toString() { - return "END_OF_INPUT " + super.toString(); - } - }; - - /** - * Unexpected end of input. - */ - public static final InputStream END_OF_INPUT_ERROR = new InputStream() { - @Override - public int read() throws IOException { - return 0; - } - - @Override - public String toString() { - return "END_OF_INPUT_ERROR " + super.toString(); - } - }; - - private final LinkedBlockingDeque isList; + private final LinkedBlockingDeque isList; - public NettyInputStream(LinkedBlockingDeque isList) { + public NettyInputStream(LinkedBlockingDeque isList) { this.isList = isList; } - private interface ISReader { - int readFrom(InputStream take) throws IOException; - } - - private int readInternal(ISReader isReader) throws IOException { - if (end) { - return -1; - } + @Override + public int read(byte[] b, int off, int len) throws IOException { - InputStream take; + ByteBuf take; try { take = isList.take(); - - if (checkEndOfInput(take)) { + boolean isReadable = take.isReadable(); + int read = -1; + if (checkEndOfInputOrError(take)) { + take.release(); return -1; } - int read = isReader.readFrom(take); - - if (take.available() > 0) { - isList.addFirst(take); + if (isReadable) { + int readableBytes = take.readableBytes(); + read = Math.min(readableBytes, len); + take.readBytes(b, off, read); + if (read < len) { + take.release(); + } else { + isList.addFirst(take); + } } else { - take.close(); + read = 0; + take.release(); //We don't need `0` } return read; @@ -100,33 +70,53 @@ private int readInternal(ISReader isReader) throws IOException { } @Override - public int read(byte[] b, int off, int len) throws IOException { - return readInternal(take -> take.read(b, off, len)); + public int read() throws IOException { + + ByteBuf take; + try { + take = isList.take(); + boolean isReadable = take.isReadable(); + if (checkEndOfInputOrError(take)) { + take.release(); + return -1; + } + + if (isReadable) { + return take.readInt(); + } else { + take.release(); //We don't need `0` + } + + return 0; + } catch (InterruptedException e) { + throw new IOException("Interrupted.", e); + } } @Override - public int read() throws IOException { - return readInternal(InputStream::read); + public void close() throws IOException { + if (isList != null) { + while (!isList.isEmpty()) { + try { + isList.take().release(); + } catch (InterruptedException e) { + throw new IOException("Interrupted. Potential ByteBuf Leak.", e); + } + } + } + super.close(); } @Override public int available() throws IOException { - InputStream peek = isList.peek(); - if (peek != null) { - return peek.available(); + ByteBuf peek = isList.peek(); + if (peek != null && peek.isReadable()) { + return peek.readableBytes(); } - return 0; } - private boolean checkEndOfInput(InputStream take) throws IOException { - if (take == END_OF_INPUT) { - end = true; - return true; - } else if (take == END_OF_INPUT_ERROR) { - end = true; - throw new IOException("Connection was closed prematurely."); - } - return false; + private boolean checkEndOfInputOrError(ByteBuf take) throws IOException { + return take == Unpooled.EMPTY_BUFFER; } } diff --git a/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyHttp2ServerHandler.java b/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyHttp2ServerHandler.java index b8da5fe88e..e64476db71 100644 --- a/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyHttp2ServerHandler.java +++ b/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyHttp2ServerHandler.java @@ -28,8 +28,8 @@ import java.util.concurrent.LinkedBlockingDeque; import javax.ws.rs.core.SecurityContext; - -import io.netty.buffer.ByteBufInputStream; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; @@ -55,7 +55,7 @@ class JerseyHttp2ServerHandler extends ChannelDuplexHandler { private final URI baseUri; - private final LinkedBlockingDeque isList = new LinkedBlockingDeque<>(); + private final LinkedBlockingDeque isList = new LinkedBlockingDeque<>(); private final NettyHttpContainer container; private final ResourceConfig resourceConfig; @@ -92,9 +92,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception * Process incoming data. */ private void onDataRead(ChannelHandlerContext ctx, Http2DataFrame data) throws Exception { - isList.add(new ByteBufInputStream(data.content(), true)); + isList.add(data.content()); if (data.isEndStream()) { - isList.add(NettyInputStream.END_OF_INPUT); + isList.add(Unpooled.EMPTY_BUFFER); } } @@ -163,7 +163,7 @@ public void removeProperty(String name) { ctx.channel().closeFuture().addListener(new GenericFutureListener>() { @Override public void operationComplete(Future future) throws Exception { - isList.add(NettyInputStream.END_OF_INPUT_ERROR); + isList.add(Unpooled.EMPTY_BUFFER); } }); diff --git a/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyServerHandler.java b/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyServerHandler.java index 06d1782503..712cb1f531 100644 --- a/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyServerHandler.java +++ b/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyServerHandler.java @@ -16,16 +16,17 @@ package org.glassfish.jersey.netty.httpserver; -import java.io.IOException; -import java.io.InputStream; import java.net.URI; import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.concurrent.LinkedBlockingDeque; +import javax.ws.rs.core.Response.Status; +import javax.ws.rs.core.MediaType; + import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufInputStream; +import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.codec.http.DefaultFullHttpResponse; @@ -36,8 +37,6 @@ import io.netty.handler.codec.http.HttpUtil; import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.LastHttpContent; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.GenericFutureListener; import org.glassfish.jersey.internal.PropertiesDelegate; import org.glassfish.jersey.netty.connector.internal.NettyInputStream; import org.glassfish.jersey.server.ContainerRequest; @@ -46,23 +45,26 @@ /** * {@link io.netty.channel.ChannelInboundHandler} which servers as a bridge - * between Netty and Jersey. + * between Netty and Jersey. Handles additional validation on the payload size + * that is controlled by a JVM property {@code max.http.request.entitySizeMb}. * - * @author Pavel Bucek + * @author Pavel Bucek (pavel.bucek at oracle.com) */ class JerseyServerHandler extends ChannelInboundHandlerAdapter { private final URI baseUri; - private final LinkedBlockingDeque isList = new LinkedBlockingDeque<>(); + private final LinkedBlockingDeque isList = new LinkedBlockingDeque<>(); private final NettyHttpContainer container; private final ResourceConfig resourceConfig; + private static final long MAX_REQUEST_ENTITY_BYTES = Long.getLong("jersey.max.http.request.entitySizeMb", new Long(50000)) + .longValue() * 1024 * 1024; //50 MB default limit + /** * Constructor. * * @param baseUri base {@link URI} of the container (includes context path, if any). * @param container Netty container implementation. - * @param resourceConfig the application {@link ResourceConfig} */ public JerseyServerHandler(URI baseUri, NettyHttpContainer container, ResourceConfig resourceConfig) { this.baseUri = baseUri; @@ -85,6 +87,33 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) { requestContext.setWriter(new NettyResponseWriter(ctx, req, container)); + long contentLength = req.headers().contains(HttpHeaderNames.CONTENT_LENGTH) ? HttpUtil.getContentLength(req) + : -1L; + if (contentLength >= MAX_REQUEST_ENTITY_BYTES) { + requestContext.abortWith(javax.ws.rs.core.Response.status(Status.REQUEST_ENTITY_TOO_LARGE).build()); + } else { + /** + * Jackson JSON decoder tries to read a minimum of 2 bytes (4 + * for BOM). So, during an empty or 1-byte input, we'd want to + * avoid reading the entity to safely handle this edge case by + * eventually throwing a malformed JSON exception. + */ + String contentType = req.headers().get(HttpHeaderNames.CONTENT_TYPE); + boolean isJson = contentType != null ? contentType.toLowerCase().contains(MediaType.APPLICATION_JSON) + : false; + //process entity streams only if there is an entity issued in the request (i.e., content-length >=0). + //Otherwise, it's safe to discard during next processing + if ((!isJson && contentLength != -1) || HttpUtil.isTransferEncodingChunked(req) + || (isJson && contentLength >= 2)) { + requestContext.setEntityStream(new NettyInputStream(isList)); + } + } + + // copying headers from netty request to jersey container request context. + for (String name : req.headers().names()) { + requestContext.headers(name, req.headers().getAll(name)); + } + // must be like this, since there is a blocking read from Jersey container.getExecutorService().execute(new Runnable() { @Override @@ -95,18 +124,17 @@ public void run() { } if (msg instanceof HttpContent) { - HttpContent httpContent = (HttpContent) msg; - - ByteBuf content = httpContent.content(); + HttpContent httpContent = (HttpContent) msg; - if (content.isReadable()) { - isList.add(new ByteBufInputStream(content, true)); - } + ByteBuf content = httpContent.content(); + if (content.isReadable()) { + isList.add(content); + } - if (msg instanceof LastHttpContent) { - isList.add(NettyInputStream.END_OF_INPUT); - } - } + if (msg instanceof LastHttpContent) { + isList.add(Unpooled.EMPTY_BUFFER); + } + } } /** @@ -148,35 +176,10 @@ public void removeProperty(String name) { } }, resourceConfig); - // request entity handling. - if ((req.headers().contains(HttpHeaderNames.CONTENT_LENGTH) && HttpUtil.getContentLength(req) > 0) - || HttpUtil.isTransferEncodingChunked(req)) { - - ctx.channel().closeFuture().addListener(new GenericFutureListener>() { - @Override - public void operationComplete(Future future) throws Exception { - isList.add(NettyInputStream.END_OF_INPUT_ERROR); - } - }); - - requestContext.setEntityStream(new NettyInputStream(isList)); - } else { - requestContext.setEntityStream(new InputStream() { - @Override - public int read() throws IOException { - return -1; - } - }); - } - - // copying headers from netty request to jersey container request context. - for (String name : req.headers().names()) { - requestContext.headers(name, req.headers().getAll(name)); - } - return requestContext; } + private NettySecurityContext getSecurityContext(ChannelHandlerContext ctx) { return new NettySecurityContext(ctx); } diff --git a/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/NettyResponseWriter.java b/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/NettyResponseWriter.java index 00998c347b..e2a4931bb8 100644 --- a/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/NettyResponseWriter.java +++ b/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/NettyResponseWriter.java @@ -46,7 +46,7 @@ /** * Netty implementation of {@link ContainerResponseWriter}. * - * @author Pavel Bucek + * @author Pavel Bucek (pavel.bucek at oracle.com) */ class NettyResponseWriter implements ContainerResponseWriter { @@ -119,7 +119,11 @@ public synchronized OutputStream writeResponseStatusAndHeaders(long contentLengt JerseyChunkedInput jerseyChunkedInput = new JerseyChunkedInput(ctx.channel()); - ctx.write(new HttpChunkedInput(jerseyChunkedInput)).addListener(FLUSH_FUTURE); + if (HttpUtil.isTransferEncodingChunked(response)) { + ctx.writeAndFlush(new HttpChunkedInput(jerseyChunkedInput)); + } else { + ctx.write(new HttpChunkedInput(jerseyChunkedInput)).addListener(FLUSH_FUTURE); + } return jerseyChunkedInput; } else {