Skip to content

Commit

Permalink
HttpCacheClient: make upload async
Browse files Browse the repository at this point in the history
This matches the behavior for download, and improves performance for actions
with a lot of outputs.

The code was already using futures in the API and in the implementation,
but the glue code blocked on the internal future and returned an immediate
future to the caller, rather than doing it async.

Fixes #6091.

Change-Id: I3151aa96b879323e0000d3209f6b9bc8be0066d4

Closes #12115.

Change-Id: I72032f7b3e88de4d2e2b5abd2d89380f7fcc31e5
PiperOrigin-RevId: 332796965
  • Loading branch information
ulfjack authored and copybara-github committed Sep 21, 2020
1 parent 4c0febc commit 159c3fe
Showing 1 changed file with 123 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
import build.bazel.remote.execution.v2.ActionResult;
import build.bazel.remote.execution.v2.Digest;
import com.google.auth.Credentials;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.hash.HashingOutputStream;
import com.google.common.io.Closeables;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
Expand All @@ -31,7 +33,6 @@
import com.google.protobuf.ByteString;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
Expand Down Expand Up @@ -66,7 +67,6 @@
import io.netty.handler.timeout.WriteTimeoutException;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.PlatformDependent;
import java.io.FileInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
Expand Down Expand Up @@ -285,7 +285,7 @@ public void channelCreated(Channel ch) {
}

@SuppressWarnings("FutureReturnValueIgnored")
private Channel acquireUploadChannel() throws InterruptedException {
private Promise<Channel> acquireUploadChannel() {
Promise<Channel> channelReady = eventLoop.next().newPromise();
channelPool
.acquire()
Expand All @@ -297,52 +297,46 @@ private Channel acquireUploadChannel() throws InterruptedException {
}

try {
Channel ch = channelAcquired.getNow();
ChannelPipeline p = ch.pipeline();
Channel channel = channelAcquired.getNow();
ChannelPipeline pipeline = channel.pipeline();

if (!isChannelPipelineEmpty(p)) {
if (!isChannelPipelineEmpty(pipeline)) {
channelReady.setFailure(
new IllegalStateException("Channel pipeline is not empty."));
return;
}

p.addFirst(
pipeline.addFirst(
"timeout-handler",
new IdleTimeoutHandler(timeoutSeconds, WriteTimeoutException.INSTANCE));
p.addLast(new HttpResponseDecoder());
pipeline.addLast(new HttpResponseDecoder());
// The 10KiB limit was chosen arbitrarily. We only expect HTTP servers to respond
// with an error message in the body, and that should always be less than 10KiB. If
// the response is larger than 10KiB, HttpUploadHandler will catch the
// TooLongFrameException that HttpObjectAggregator throws and convert it to an
// IOException.
p.addLast(new HttpObjectAggregator(10 * 1024));
p.addLast(new HttpRequestEncoder());
p.addLast(new ChunkedWriteHandler());
pipeline.addLast(new HttpObjectAggregator(10 * 1024));
pipeline.addLast(new HttpRequestEncoder());
pipeline.addLast(new ChunkedWriteHandler());
synchronized (credentialsLock) {
p.addLast(new HttpUploadHandler(creds, extraHttpHeaders));
pipeline.addLast(new HttpUploadHandler(creds, extraHttpHeaders));
}

if (!ch.eventLoop().inEventLoop()) {
if (!channel.eventLoop().inEventLoop()) {
// If addLast is called outside an event loop, then it doesn't complete until the
// event loop is run again. In that case, a message sent to the last handler gets
// delivered to the last non-pending handler, which will most likely end up
// throwing UnsupportedMessageTypeException. Therefore, we only complete the
// promise in the event loop.
ch.eventLoop().execute(() -> channelReady.setSuccess(ch));
channel.eventLoop().execute(() -> channelReady.setSuccess(channel));
} else {
channelReady.setSuccess(ch);
channelReady.setSuccess(channel);
}
} catch (Throwable t) {
channelReady.setFailure(t);
}
});

try {
return channelReady.get();
} catch (ExecutionException e) {
PlatformDependent.throwException(e.getCause());
return null;
}
return channelReady;
}

@SuppressWarnings("FutureReturnValueIgnored")
Expand Down Expand Up @@ -378,32 +372,32 @@ private Future<Channel> acquireDownloadChannel() {
}

try {
Channel ch = channelAcquired.getNow();
ChannelPipeline p = ch.pipeline();
Channel channel = channelAcquired.getNow();
ChannelPipeline pipeline = channel.pipeline();

if (!isChannelPipelineEmpty(p)) {
if (!isChannelPipelineEmpty(pipeline)) {
channelReady.setFailure(
new IllegalStateException("Channel pipeline is not empty."));
return;
}
p.addFirst(
pipeline.addFirst(
"timeout-handler",
new IdleTimeoutHandler(timeoutSeconds, ReadTimeoutException.INSTANCE));
p.addLast(new HttpClientCodec());
p.addLast("inflater", new HttpContentDecompressor());
pipeline.addLast(new HttpClientCodec());
pipeline.addLast("inflater", new HttpContentDecompressor());
synchronized (credentialsLock) {
p.addLast(new HttpDownloadHandler(creds, extraHttpHeaders));
pipeline.addLast(new HttpDownloadHandler(creds, extraHttpHeaders));
}

if (!ch.eventLoop().inEventLoop()) {
if (!channel.eventLoop().inEventLoop()) {
// If addLast is called outside an event loop, then it doesn't complete until the
// event loop is run again. In that case, a message sent to the last handler gets
// delivered to the last non-pending handler, which will most likely end up
// throwing UnsupportedMessageTypeException. Therefore, we only complete the
// promise in the event loop.
ch.eventLoop().execute(() -> channelReady.setSuccess(ch));
channel.eventLoop().execute(() -> channelReady.setSuccess(channel));
} else {
channelReady.setSuccess(ch);
channelReady.setSuccess(channel);
}
} catch (Throwable t) {
channelReady.setFailure(t);
Expand Down Expand Up @@ -491,13 +485,13 @@ public void flush() throws IOException {
SettableFuture<Void> outerF = SettableFuture.create();
acquireDownloadChannel()
.addListener(
(Future<Channel> chP) -> {
if (!chP.isSuccess()) {
outerF.setException(chP.cause());
(Future<Channel> channelPromise) -> {
if (!channelPromise.isSuccess()) {
outerF.setException(channelPromise.cause());
return;
}

Channel ch = chP.getNow();
Channel ch = channelPromise.getNow();
ch.writeAndFlush(downloadCmd)
.addListener(
(f) -> {
Expand Down Expand Up @@ -537,13 +531,13 @@ public void flush() throws IOException {
private void getAfterCredentialRefresh(DownloadCommand cmd, SettableFuture<Void> outerF) {
acquireDownloadChannel()
.addListener(
(Future<Channel> chP) -> {
if (!chP.isSuccess()) {
outerF.setException(chP.cause());
(Future<Channel> channelPromise) -> {
if (!channelPromise.isSuccess()) {
outerF.setException(channelPromise.cause());
return;
}

Channel ch = chP.getNow();
Channel ch = channelPromise.getNow();
ch.writeAndFlush(cmd)
.addListener(
(f) -> {
Expand Down Expand Up @@ -576,8 +570,8 @@ public ListenableFuture<ActionResult> downloadActionResult(
}

@SuppressWarnings("FutureReturnValueIgnored")
private void uploadBlocking(String key, long length, InputStream in, boolean casUpload)
throws IOException, InterruptedException {
private ListenableFuture<Void> uploadAsync(
String key, long length, InputStream in, boolean casUpload) {
InputStream wrappedIn =
new FilterInputStream(in) {
@Override
Expand All @@ -588,85 +582,99 @@ public void close() {
}
};
UploadCommand upload = new UploadCommand(uri, casUpload, key, wrappedIn, length);
Channel ch = null;
boolean success = false;
if (storedBlobs.putIfAbsent((casUpload ? CAS_PREFIX : AC_PREFIX) + key, true) == null) {
try {
ch = acquireUploadChannel();
ChannelFuture uploadFuture = ch.writeAndFlush(upload);
uploadFuture.sync();
success = true;
} catch (Exception e) {
// e can be of type HttpException, because Netty uses Unsafe.throwException to re-throw a
// checked exception that hasn't been declared in the method signature.
if (e instanceof HttpException) {
HttpResponse response = ((HttpException) e).response();
if (authTokenExpired(response)) {
refreshCredentials();
// The error is due to an auth token having expired. Let's try again.
if (!reset(in)) {
// The InputStream can't be reset and thus we can't retry as most likely
// bytes have already been read from the InputStream.
throw e;
}
putAfterCredentialRefresh(upload);
success = true;
return;
}
}
throw e;
} finally {
if (!success) {
storedBlobs.remove(key);
}
in.close();
if (ch != null) {
releaseUploadChannel(ch);
}
}
SettableFuture<Void> result = SettableFuture.create();
acquireUploadChannel()
.addListener(
(Future<Channel> channelPromise) -> {
if (!channelPromise.isSuccess()) {
result.setException(channelPromise.cause());
return;
}

Channel ch = channelPromise.getNow();
ch.writeAndFlush(upload)
.addListener(
(f) -> {
releaseUploadChannel(ch);
if (f.isSuccess()) {
result.set(null);
} else {
Throwable cause = f.cause();
if (cause instanceof HttpException) {
HttpResponse response = ((HttpException) cause).response();
try {
// If the error is due to an expired auth token and we can reset
// the input stream, then try again.
if (authTokenExpired(response) && reset(in)) {
refreshCredentials();
uploadAfterCredentialRefresh(upload, result);
} else {
result.setException(cause);
}
} catch (IOException e) {
result.setException(e);
}
} else {
result.setException(cause);
}
}
});
});
result.addListener(() -> Closeables.closeQuietly(in), MoreExecutors.directExecutor());
return result;
} else {
Closeables.closeQuietly(in);
return Futures.immediateFuture(null);
}
}

@SuppressWarnings("FutureReturnValueIgnored")
private void uploadAfterCredentialRefresh(UploadCommand upload, SettableFuture<Void> result) {
acquireUploadChannel()
.addListener(
(Future<Channel> channelPromise) -> {
if (!channelPromise.isSuccess()) {
result.setException(channelPromise.cause());
return;
}

Channel ch = channelPromise.getNow();
ch.writeAndFlush(upload)
.addListener(
(f) -> {
releaseUploadChannel(ch);
if (f.isSuccess()) {
result.set(null);
} else {
result.setException(f.cause());
}
});
});
}

@Override
public ListenableFuture<Void> uploadFile(Digest digest, Path file) {
try (InputStream in = file.getInputStream()) {
uploadBlocking(digest.getHash(), digest.getSizeBytes(), in, /* casUpload= */ true);
} catch (IOException | InterruptedException e) {
try {
return uploadAsync(
digest.getHash(), digest.getSizeBytes(), file.getInputStream(), /* casUpload= */ true);
} catch (IOException e) {
// Can be thrown from file.getInputStream.
return Futures.immediateFailedFuture(e);
}
return Futures.immediateFuture(null);
}

@Override
public ListenableFuture<Void> uploadBlob(Digest digest, ByteString data) {
try (InputStream in = data.newInput()) {
uploadBlocking(digest.getHash(), digest.getSizeBytes(), in, /* casUpload= */ true);
} catch (IOException | InterruptedException e) {
return Futures.immediateFailedFuture(e);
}
return Futures.immediateFuture(null);
return uploadAsync(
digest.getHash(), digest.getSizeBytes(), data.newInput(), /* casUpload= */ true);
}

@Override
public ListenableFuture<ImmutableSet<Digest>> findMissingDigests(Iterable<Digest> digests) {
return Futures.immediateFuture(ImmutableSet.copyOf(digests));
}

@SuppressWarnings("FutureReturnValueIgnored")
private void putAfterCredentialRefresh(UploadCommand cmd) throws InterruptedException {
Channel ch = null;
try {
// TODO(buchgr): Look into simplifying the retry logic after a credentials refresh.
ch = acquireUploadChannel();
ChannelFuture uploadFuture = ch.writeAndFlush(cmd);
uploadFuture.sync();
} finally {
if (ch != null) {
releaseUploadChannel(ch);
}
}
}

private boolean reset(InputStream in) throws IOException {
if (in.markSupported()) {
in.reset();
Expand All @@ -684,8 +692,19 @@ private boolean reset(InputStream in) throws IOException {
public void uploadActionResult(ActionKey actionKey, ActionResult actionResult)
throws IOException, InterruptedException {
ByteString serialized = actionResult.toByteString();
try (InputStream in = serialized.newInput()) {
uploadBlocking(actionKey.getDigest().getHash(), serialized.size(), in, false);
ListenableFuture<Void> uploadFuture =
uploadAsync(
actionKey.getDigest().getHash(),
serialized.size(),
serialized.newInput(),
/* casUpload= */ false);
try {
uploadFuture.get();
} catch (ExecutionException e) {
Throwables.throwIfUnchecked(e.getCause());
Throwables.throwIfInstanceOf(e.getCause(), IOException.class);
Throwables.throwIfInstanceOf(e.getCause(), InterruptedException.class);
throw new IOException(e.getCause());
}
}

Expand Down

0 comments on commit 159c3fe

Please sign in to comment.