
Channel buffers are altered before writing to OS if exceeding Jersey's buffering size #18

Open
scholzi100 opened this issue May 4, 2021 · 13 comments


@scholzi100
Contributor

The current unit tests only exercise very small content transfers from server to client and vice versa. I added a test that sends a lot of data (multiple megabytes) and soon noticed that the content was not received as it was created in the JAX-RS resource.

After some testing, I switched the ByteBufCreator from Unpooled.wrappedBuffer to Unpooled.copiedBuffer, and that made the test pass.
I did some searching and found that the HTTP body content only gets corrupted once it exceeds 8192 bytes (Jersey's default buffer size in CommittingOutputStream). Adding a trace log to the ByteBufCreator revealed that once the buffer size is exceeded, Jersey switches to direct writing, reusing the same byte array (see rest.log). I created a prototype test that checks this behaviour at scholzi100@68394b5.
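To make the reported pattern concrete, here is a simplified, hypothetical model in plain JDK code (it is not Jersey's actual CommittingOutputStream) of a stream that buffers up to a threshold and then forwards the caller's own arrays directly. The class and method names are invented for illustration; the 8192 threshold mirrors the default size mentioned above.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of "buffer first, then write directly".
// NOT Jersey's CommittingOutputStream; it only mimics the behaviour described in this issue.
final class BufferThenDirectModel {

    /** Sink that records which array object each write(byte[], int, int) call received. */
    static final class RecordingSink extends OutputStream {
        final List<byte[]> arraysSeen = new ArrayList<>();

        @Override
        public void write(int b) {
            arraysSeen.add(new byte[] {(byte) b});
        }

        @Override
        public void write(byte[] b, int off, int len) {
            arraysSeen.add(b); // keeps a reference to the array, as a wrapping ByteBuf would
        }
    }

    static final class BufferingStream extends OutputStream {
        private final OutputStream delegate;
        private final int bufferSize;
        private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        private boolean direct; // becomes true once the buffer size would be exceeded

        BufferingStream(OutputStream delegate, int bufferSize) {
            this.delegate = delegate;
            this.bufferSize = bufferSize;
        }

        @Override
        public void write(int b) throws IOException {
            write(new byte[] {(byte) b}, 0, 1);
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            if (!direct && buffer.size() + len <= bufferSize) {
                buffer.write(b, off, len); // still buffering: the bytes are copied
                return;
            }
            if (!direct) {
                flushBuffered(); // commit what was buffered so far
                direct = true;
            }
            delegate.write(b, off, len); // direct mode: the caller's own array is forwarded
        }

        private void flushBuffered() throws IOException {
            if (buffer.size() > 0) {
                byte[] pending = buffer.toByteArray();
                delegate.write(pending, 0, pending.length);
                buffer.reset();
            }
        }

        @Override
        public void close() throws IOException {
            flushBuffered();
            delegate.close();
        }
    }

    /** Writes the same chunk array {@code times} times and returns the arrays the sink received. */
    static List<byte[]> writeReusedChunk(byte[] chunk, int times, int bufferSize) {
        RecordingSink sink = new RecordingSink();
        try (BufferingStream out = new BufferingStream(sink, bufferSize)) {
            for (int i = 0; i < times; i++) {
                out.write(chunk, 0, chunk.length);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sink.arraysSeen;
    }

    public static void main(String[] args) {
        byte[] chunk = new byte[4096];
        for (byte[] array : writeReusedChunk(chunk, 4, 8192)) {
            System.out.println(array == chunk ? "caller's array (direct)" : "private copy (buffered)");
        }
    }
}
```

With 16 KiB written in 4 KiB pieces, the first 8 KiB reach the sink as a private copy, but the last two writes hand over the caller's chunk array itself. That is the situation in which wrapping instead of copying becomes unsafe if the caller refills the array.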

This somewhat breaks one of the main features, "no copying of byte arrays". The solutions I currently see are:

  • increase the Jersey buffer to a very large size
  • use Unpooled.copiedBuffer as the default ByteBufCreator (and turn Jersey buffering off by default)
  • somehow skip CommittingOutputStream (to avoid copying into and out of it) and copy directly into Netty's ByteBuf

Let me know if there are any other solutions.

@ljnelson
Member

ljnelson commented May 4, 2021

Oh, this is interesting; thanks for the report. I'll definitely investigate as well, as time permits.

@ljnelson
Member

ljnelson commented May 4, 2021

A quick thought (which may very well be wrong): I wonder if there is something you could do with the flushThreshold:

If that's set to Integer.MAX_VALUE, maybe things would work? I'll try messing around with this after my day job today.

@scholzi100
Contributor Author

I just tested setting flushThreshold to Integer.MAX_VALUE; the behavior did not change.

@ljnelson
Member

ljnelson commented May 4, 2021

Same sort of thing, just because it's easy to try: set it to 0?

case 0:
  // Flush previous writes, if any
  this.channelOutboundInvoker.flush();
  break;

@scholzi100
Contributor Author

Well, it did not change anything, there was just more flushing. The buffer is already altered before it is flushed.

@ljnelson
Copy link
Member

ljnelson commented May 5, 2021

OK. I'd like to make sure I fully understand the observed problem.

You are saying: inside a JAX-RS resource, something begins writing 9K of data to an OutputStream, as it is allowed to do. When this happens, the observed effect is that the client does not receive the data so written. Is that correct?

Whether Jersey is doing direct writes or not, you can see that ultimately it is sending bytes out over an OutputStream:

https://github.com/eclipse-ee4j/jersey/blob/7b6d2f84391f24310baae7d8e55e644703642826/core-common/src/main/java/org/glassfish/jersey/message/internal/CommittingOutputStream.java#L198-L200

I'm not sure what microbean-jersey-netty can do here. By contract, all I can supply is an OutputStream so that its write method may be called by Jersey whenever it wants, however it wants. So, I do that:

@Override
public final void write(final byte[] bytes, final int offset, final int length) throws IOException {
  // Range check written so that offset + length cannot overflow int.
  if (offset < 0 || length < 0 || offset > bytes.length - length) {
    throw new IndexOutOfBoundsException();
  }
  this.write(this.createMessage(bytes, offset, length), length);
}

This does, of course, involve creating byte arrays, because that's in the contract of the OutputStream class (you can't write a bunch of bytes without creating a byte array).

Anyway, this write method implementation ultimately involves sending a "message" (usually a ByteBuf) to the channel:

I'm not sure how changing the kind of ByteBuf would solve any problems. I believe you, but I can't understand how that would work.

I'm not sure what else can be done or changed in this implementation. I also think maybe I'm misunderstanding you. Is there a problem here? Maybe you have found a Jersey bug? I apologize if I have not understood the issue.

@ljnelson
Member

ljnelson commented May 5, 2021

The other thing I just double-checked is that one can write directly to the channelOutboundInvoker, which in all cases will be an AbstractChannelHandlerContext, and, as you can see, it handles the event loop checking for us:

https://github.com/netty/netty/blob/03c2644c19ca7f800caaf75488c02a92b818dbde/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java#L788-L804
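The hand-off referred to above (run the write inline when already on the event loop, otherwise enqueue it as a task) can be sketched with plain JDK executors. This is a hypothetical stand-in, not Netty's code: `EventLoopSketch` and its methods are invented, and a single-thread `ExecutorService` plays the role of the event loop. The point it illustrates is that a write issued from a non-loop thread returns before the bytes are processed.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for an event loop: one dedicated thread plus an inEventLoop() check.
final class EventLoopSketch implements AutoCloseable {
    private final ExecutorService loop = Executors.newSingleThreadExecutor();
    private final Thread loopThread;

    EventLoopSketch() throws Exception {
        Callable<Thread> whoAmI = Thread::currentThread;
        this.loopThread = loop.submit(whoAmI).get(); // remember the loop's own thread
    }

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    /** Runs the write inline on the loop thread; from any other thread, enqueues it and returns at once. */
    void write(Runnable actualWrite) {
        if (inEventLoop()) {
            actualWrite.run();
        } else {
            loop.execute(actualWrite); // the caller (e.g. a Jersey thread) carries on immediately
        }
    }

    @Override
    public void close() throws InterruptedException {
        loop.shutdown();
        loop.awaitTermination(5, TimeUnit.SECONDS);
    }

    /** Issues one write from the calling thread and reports which thread actually executed it. */
    static String threadThatPerformedWrite() {
        try (EventLoopSketch eventLoop = new EventLoopSketch()) {
            AtomicReference<String> writer = new AtomicReference<>();
            CountDownLatch done = new CountDownLatch(1);
            eventLoop.write(() -> {
                writer.set(Thread.currentThread().getName());
                done.countDown();
            });
            done.await(5, TimeUnit.SECONDS);
            return writer.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("caller: " + Thread.currentThread().getName());
        System.out.println("writer: " + threadThatPerformedWrite());
    }
}
```

Because the enqueued task runs later on a different thread, anything it reads from a shared byte[] reflects whatever the calling thread has put there by then, not what was there when write was called.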

So given all of that, I'm not sure how to proceed. Do you have any ideas?

@ljnelson
Member

ljnelson commented May 5, 2021

(Notes to myself.)

Here is the only place where Jersey interacts with microbean-jersey-netty on the outbound path:

https://github.com/eclipse-ee4j/jersey/blob/7b6d2f84391f24310baae7d8e55e644703642826/core-server/src/main/java/org/glassfish/jersey/server/ServerRuntime.java#L625

For HTTP/1.1, that calls this:

@Override
public final OutputStream writeResponseStatusAndHeaders(final long contentLength,
                                                        final ContainerResponse containerResponse) {
  final OutputStream returnValue;
  if (this.writeStatusAndHeaders(contentLength, Objects.requireNonNull(containerResponse)) && contentLength != 0L) {
    returnValue = this.createOutputStream(contentLength, containerResponse);
  } else {
    returnValue = null;
  }
  return returnValue;
}

That calls this:

@Override
protected final boolean writeStatusAndHeaders(final long contentLength,
                                              final ContainerResponse containerResponse) {
  final ContainerRequest containerRequest = containerResponse.getRequestContext();
  if (containerRequest == null) {
    throw new IllegalArgumentException("containerResponse.getRequestContext() == null");
  }
  final HttpRequest httpRequest;
  final Object httpRequestValue = containerRequest.getProperty(HttpRequest.class.getName());
  if (!(httpRequestValue instanceof HttpRequest)) {
    throw new IllegalArgumentException("containerResponse; !(containerResponse.getRequestContext().getProperty(\"" +
                                       HttpRequest.class.getName() +
                                       "\") instanceof HttpRequest): " + httpRequestValue);
  } else {
    httpRequest = (HttpRequest)httpRequestValue;
  }
  final HttpVersion httpVersion = httpRequest.protocolVersion();
  this.httpVersion = httpVersion;
  final HttpResponseStatus status;
  final StatusType responseStatusType = containerResponse.getStatusInfo();
  if (responseStatusType == null) {
    status = HttpResponseStatus.valueOf(containerResponse.getStatus());
  } else {
    final String reasonPhrase = responseStatusType.getReasonPhrase();
    if (reasonPhrase == null) {
      status = HttpResponseStatus.valueOf(containerResponse.getStatus());
    } else {
      status = HttpResponseStatus.valueOf(containerResponse.getStatus(), reasonPhrase);
    }
  }
  final HttpMessage httpResponse;
  final boolean needsOutputStream;
  if (contentLength < 0L) {
    needsOutputStream = !HttpMethod.HEAD.equalsIgnoreCase(containerRequest.getMethod());
    httpResponse = new DefaultHttpResponse(httpVersion, status);
    copyHeaders(containerResponse.getStringHeaders(), httpResponse.headers());
    HttpUtil.setTransferEncodingChunked(httpResponse, true);
  } else if (contentLength == 0L) {
    needsOutputStream = false;
    httpResponse = new DefaultFullHttpResponse(httpVersion, status);
    copyHeaders(containerResponse.getStringHeaders(), httpResponse.headers());
    HttpUtil.setContentLength(httpResponse, 0L);
  } else {
    needsOutputStream = !HttpMethod.HEAD.equalsIgnoreCase(containerRequest.getMethod());
    httpResponse = new DefaultHttpResponse(httpVersion, status);
    copyHeaders(containerResponse.getStringHeaders(), httpResponse.headers());
    HttpUtil.setContentLength(httpResponse, contentLength);
  }
  if (HttpUtil.isKeepAlive(httpRequest)) {
    HttpUtil.setKeepAlive(httpResponse, true);
  }
  final ChannelHandlerContext channelHandlerContext = Objects.requireNonNull(this.getChannelHandlerContext());
  final ChannelPromise channelPromise = channelHandlerContext.newPromise();
  assert channelPromise != null;
  channelPromise.addListener(listener);
  // Remember that
  // AbstractContainerRequestHandlingResponseWriter#channelReadComplete(ChannelHandlerContext)
  // will call ChannelHandlerContext#flush() in all cases.
  channelHandlerContext.write(httpResponse, channelPromise);
  return needsOutputStream;
}

…and then this:

@Override
protected final AbstractChannelOutboundInvokingOutputStream<? extends HttpContent> createOutputStream(
    final long contentLength, final ContainerResponse containerResponse) {
  if (contentLength == 0L) {
    throw new IllegalArgumentException("contentLength == 0L");
  }
  return new ByteBufBackedChannelOutboundInvokingHttpContentOutputStream(this.getChannelHandlerContext(),
                                                                         this.getFlushThreshold(),
                                                                         false,
                                                                         this.getByteBufCreator()) {
    @Override
    protected final ChannelPromise newPromise() {
      final ChannelPromise returnValue = super.newPromise();
      if (returnValue != null && !returnValue.isVoid()) {
        returnValue.addListener(listener);
      }
      return returnValue;
    }
  };
}

If changing the kind of ByteBuf fixes the problem, this suggests that something in Jersey somewhere is reusing a byte[] when a write call is issued to the OutputStream on the Jersey thread, but, perhaps, only in the directWrite case. Need to dig to see where the write is actually being called. It will be a MessageBodyWriter of some kind, I guess.

@ljnelson
Member

ljnelson commented May 5, 2021

Hmm, I wonder if this is the cause of the problem:

https://github.com/eclipse-ee4j/jersey/blob/7b6d2f84391f24310baae7d8e55e644703642826/core-common/src/main/java/org/glassfish/jersey/message/internal/ReaderWriter.java#L89-L95

On the Jersey thread, this byte[] comes into my OutputStream implementation. By default, I use a ByteBuf that simply wraps it. That ByteBuf is then handed off to the event loop (another thread). Meanwhile, the Jersey thread reuses that byte[] and "fills" it with new content. I think this would result in some kind of corruption.

This would not happen in the case where Jersey buffers, because all of the OutputStream write calls would occur on the CommittingOutputStream where they would be batched up and then committed at the end in one shot.
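The suspected race can be reproduced deterministically in a small, hypothetical JDK-only model: the producer (the "Jersey thread") refills one reused byte[] for each chunk, and the consumer (the "event loop") only reads the queued messages after the producer has finished, which is the worst case of the hand-off. `java.nio.ByteBuffer.wrap` stands in for `Unpooled.wrappedBuffer`, and wrapping an `Arrays.copyOfRange` result stands in for `Unpooled.copiedBuffer`; the class and method names are invented.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the suspected bug. ByteBuffer.wrap(...) plays the part of
// Unpooled.wrappedBuffer (shares the array); wrapping a fresh copy plays Unpooled.copiedBuffer.
final class ReusedArrayModel {

    /** Pushes each chunk through one reused array, queues a message per write, then drains the queue. */
    static String simulate(List<String> chunks, boolean copy) {
        byte[] reused = new byte[64]; // like a copy buffer that a writer refills on every read
        List<ByteBuffer> pending = new ArrayList<>(); // writes the "event loop" has not processed yet
        for (String chunk : chunks) {
            byte[] data = chunk.getBytes(StandardCharsets.US_ASCII); // assumes each chunk fits in 64 bytes
            System.arraycopy(data, 0, reused, 0, data.length); // overwrite the same array
            ByteBuffer message = copy
                    ? ByteBuffer.wrap(Arrays.copyOfRange(reused, 0, data.length)) // private snapshot
                    : ByteBuffer.wrap(reused, 0, data.length); // view onto the producer's array
            pending.add(message);
        }
        // Worst case: the consumer only runs after the producer has finished refilling the array.
        StringBuilder received = new StringBuilder();
        for (ByteBuffer message : pending) {
            byte[] out = new byte[message.remaining()];
            message.get(out);
            received.append(new String(out, StandardCharsets.US_ASCII));
        }
        return received.toString();
    }

    public static void main(String[] args) {
        List<String> chunks = Arrays.asList("AAAA", "BBBB", "CCCC");
        System.out.println("wrapped: " + simulate(chunks, false)); // prints "wrapped: CCCCCCCCCCCC"
        System.out.println("copied:  " + simulate(chunks, true));  // prints "copied:  AAAABBBBCCCC"
    }
}
```

In the real server the event loop may run sooner or later than in this model, so the corruption is intermittent rather than guaranteed. Flushing more or less often, as tried earlier in this thread, cannot make wrapping safe, because the producer overwrites the array regardless of when the flush happens.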

@ljnelson
Copy link
Member

ljnelson commented May 5, 2021

(More notes to myself.)

OK, I think I now understand the problem. I will now see if there is an easy way to detect if the current write will be the only one, ever. If so, then no copying is needed. Otherwise copying is needed.

@ljnelson
Copy link
Member

ljnelson commented May 5, 2021

OK, @scholzi100, yuck. 😄

The quickest workaround for you is, as you've already discovered, to use Unpooled::copiedBuffer as your ByteBufCreator. If Jersey buffering is on, this performs an unnecessary byte array copy in exactly one case: when the entity fits entirely within the Jersey buffer. In that case only one write call is made, so copying the byte array is unnecessary.

In all other cases, because the Jersey thread is not the same as the Netty event loop thread, the byte[] that is supplied to my OutputStream::write implementation must be copied (when it is turned into a ByteBuf) because it is reused by Jersey, and there is no way to override or replace this reuse.

A more involved workaround would be to supply a custom ByteBufCreator implementation that can somehow tell what Jersey's buffer size is. If its toByteBuf method can detect that the byte[] supplied to it is smaller than that size, then I think you can use Unpooled::wrappedBuffer in that case and Unpooled::copiedBuffer in all others.
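A minimal sketch of that size-based idea, assuming the Jersey buffer size is known up front. `BufferCreator`, `sizeAware`, and the threshold parameter are hypothetical names (not microbean-jersey-netty's actual `ByteBufCreator` API), and `ByteBuffer` stands in for Netty's `ByteBuf`. That a short write means the array will not be refilled afterwards is the tentative assumption from the comment above, not something verified against Jersey; note that an `InputStream` may return fewer bytes than requested on a non-final read, in which case this heuristic would wrap an array that is still going to be reused.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical sketch of the "wrap short writes, copy everything else" heuristic.
final class SizeAwareCreatorSketch {

    /** Hypothetical stand-in for a ByteBufCreator-like callback. */
    interface BufferCreator {
        ByteBuffer toBuffer(byte[] bytes, int offset, int length);
    }

    /** Wraps writes shorter than the (assumed known) Jersey buffer size and copies all others. */
    static BufferCreator sizeAware(int jerseyBufferSize) {
        return (bytes, offset, length) -> length < jerseyBufferSize
                ? ByteBuffer.wrap(bytes, offset, length) // assumption: array is not refilled after this write
                : ByteBuffer.wrap(Arrays.copyOfRange(bytes, offset, offset + length)); // defensive copy
    }

    public static void main(String[] args) {
        BufferCreator creator = sizeAware(8192);
        byte[] small = new byte[100];
        byte[] large = new byte[8192];
        System.out.println(creator.toBuffer(small, 0, small.length).array() == small); // true: wrapped
        System.out.println(creator.toBuffer(large, 0, large.length).array() == large); // false: copied
    }
}
```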

The proper fix for all this is a breaking change, which I won't be making for now. The breaking change would be to augment the ByteBufCreator abstract method to accept something like a boolean indicating whether further writes are going to happen.
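Roughly, that breaking change would give the creator an extra argument saying whether more writes will follow, as in the hypothetical interface below. `FlaggedBufferCreator` and `moreWritesExpected` are invented names, `ByteBuffer` again stands in for `ByteBuf`, and it is assumed that the caller does not recycle the array after its final write. How the output stream would learn the flag's value is left open here, as it is in the comment.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical shape of a creator that is told whether further writes will follow.
final class FlaggedCreatorSketch {

    interface FlaggedBufferCreator {
        ByteBuffer toBuffer(byte[] bytes, int offset, int length, boolean moreWritesExpected);
    }

    /** Copies only while the caller may still refill the array for a later write. */
    static final FlaggedBufferCreator DEFAULT = (bytes, offset, length, moreWritesExpected) ->
            moreWritesExpected
                    ? ByteBuffer.wrap(Arrays.copyOfRange(bytes, offset, offset + length))
                    : ByteBuffer.wrap(bytes, offset, length); // last write: zero-copy, assuming no recycling

    public static void main(String[] args) {
        byte[] bytes = {1, 2, 3};
        System.out.println(DEFAULT.toBuffer(bytes, 0, 3, false).array() == bytes); // true: wrapped
        System.out.println(DEFAULT.toBuffer(bytes, 0, 3, true).array() == bytes);  // false: copied
    }
}
```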

The intermediate fix is to change the default ByteBufCreator implementation to Unpooled::copiedBuffer, because that is the safest.

Hopefully this all makes sense.

ljnelson self-assigned this May 5, 2021
ljnelson added the bug Something isn't working label May 5, 2021
ljnelson added this to the Next Release milestone May 5, 2021
@ljnelson
Member

ljnelson commented May 5, 2021

Finally, for completeness, there is no way (other than really retooling a lot of Jersey's innards) to avoid CommittingOutputStream. Eventually, though, we should be considering contentLength here:

@Override
protected final AbstractChannelOutboundInvokingOutputStream<? extends HttpContent> createOutputStream(
    final long contentLength, final ContainerResponse containerResponse) {
  if (contentLength == 0L) {
    throw new IllegalArgumentException("contentLength == 0L");
  }
  return new ByteBufBackedChannelOutboundInvokingHttpContentOutputStream(this.getChannelHandlerContext(),
                                                                         this.getFlushThreshold(),
                                                                         false,
                                                                         this.getByteBufCreator()) {
    @Override
    protected final ChannelPromise newPromise() {
      final ChannelPromise returnValue = super.newPromise();
      if (returnValue != null && !returnValue.isVoid()) {
        returnValue.addListener(listener);
      }
      return returnValue;
    }
  };
}

…rather than ignoring it (as we currently do). If, for example, getByteBufCreator() took a long contentLength, we might be able to detect whether a wrapping or copying ByteBuf should be returned. Or something like that, anyway.
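A hypothetical sketch of that contentLength-driven choice: when the length is known and fits in Jersey's buffer, the assumption (not verified here) is that the entity arrives in a single write and can safely be wrapped; an unknown length (negative, as in the chunked branch of writeStatusAndHeaders) or a larger one falls back to copying. `creatorFor`, `WRAP`, `COPY`, and the buffer-size parameter are invented for illustration, with `ByteBuffer` standing in for `ByteBuf`.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical sketch: choose wrapping or copying once per response, based on contentLength.
final class ContentLengthCreatorSketch {

    /** Hypothetical stand-in for a ByteBufCreator-like callback. */
    interface BufferCreator {
        ByteBuffer toBuffer(byte[] bytes, int offset, int length);
    }

    /** Zero-copy: the buffer shares the caller's array. */
    static final BufferCreator WRAP = ByteBuffer::wrap;

    /** Defensive copy: the buffer owns a private snapshot of the bytes. */
    static final BufferCreator COPY =
            (bytes, offset, length) -> ByteBuffer.wrap(Arrays.copyOfRange(bytes, offset, offset + length));

    /**
     * Assumes a known length that fits in Jersey's buffer arrives in one write (so wrapping is safe);
     * an unknown (negative) or larger length falls back to copying.
     */
    static BufferCreator creatorFor(long contentLength, int jerseyBufferSize) {
        boolean singleWriteExpected = contentLength >= 0L && contentLength <= jerseyBufferSize;
        return singleWriteExpected ? WRAP : COPY;
    }

    public static void main(String[] args) {
        System.out.println(creatorFor(1_000L, 8192) == WRAP);  // true
        System.out.println(creatorFor(-1L, 8192) == COPY);     // true: chunked, length unknown
        System.out.println(creatorFor(50_000L, 8192) == COPY); // true
    }
}
```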

ljnelson added a commit that referenced this issue May 5, 2021
Signed-off-by: Laird Nelson <[email protected]>
@ljnelson
Member

ljnelson commented May 5, 2021

I've switched the default ByteBufCreator to be basically the Unpooled::copiedBuffer method. I've made some other small improvements to hopefully lay the groundwork for future improvements in this area. I'm going to leave this issue open.
