From ba9e2f8fe666b4fc6110d86ea93cc11779e77068 Mon Sep 17 00:00:00 2001 From: Benjamin Peterson Date: Tue, 7 Feb 2023 03:28:47 -0800 Subject: [PATCH] Remove usage of gRPC Context cancellation in the remote execution client. The gRPC remote execution client frequently "converts" gRPC calls into `ListenableFuture`s by setting a `SettableFuture` in the `onCompleted` or `onError` gRPC stub callbacks. If the future has direct executor callbacks, those callbacks will execute with the gRPC Context of the freshly completed call. That is problematic if the `Context` was canceled (canceling the call `Context` is good hygiene after completing a gRPC call), and the future callback goes to make further gRPC calls. Therefore, this change removes all usage of gRPC `Context` cancellation. It would be nice if there was instead some way to avoid leaking `Context`s between calls instead of having totally forswear `Context` cancellation. However, I can't see a good way to enforce proper isolation. Fixes https://github.com/bazelbuild/bazel/issues/17298. Closes #17426. PiperOrigin-RevId: 507730469 Change-Id: Iea74acad4592952700e41d34672f6478de509d5e --- .../build/lib/remote/ByteStreamUploader.java | 34 ++-- .../build/lib/remote/GrpcCacheClient.java | 159 +++++++++--------- 2 files changed, 96 insertions(+), 97 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java index 22b3706eb2f994..b7774ecf942a19 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java @@ -42,8 +42,6 @@ import com.google.devtools.build.lib.remote.util.TracingMetadataUtils; import com.google.devtools.build.lib.remote.util.Utils; import io.grpc.Channel; -import io.grpc.Context; -import io.grpc.Context.CancellableContext; import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; @@ -231,7 +229,6 @@ private ListenableFuture startAsyncUpload( ListenableFuture currUpload = newUpload.start(); currUpload.addListener( () -> { - newUpload.cancel(); if (openedFilePermits != null) { openedFilePermits.release(); } @@ -249,7 +246,6 @@ private static final class AsyncUpload implements AsyncCallable { private final String resourceName; private final Chunker chunker; private final ProgressiveBackoff progressiveBackoff; - private final CancellableContext grpcContext; private long lastCommittedOffset = -1; @@ -269,7 +265,6 @@ private static final class AsyncUpload implements AsyncCallable { this.progressiveBackoff = new ProgressiveBackoff(retrier::newBackoff); this.resourceName = resourceName; this.chunker = chunker; - this.grpcContext = Context.current().withCancellation(); } ListenableFuture start() { @@ -369,13 +364,11 @@ private ListenableFuture query() { Futures.transform( channel.withChannelFuture( channel -> - grpcContext.call( - () -> - bsFutureStub(channel) - .queryWriteStatus( - QueryWriteStatusRequest.newBuilder() - .setResourceName(resourceName) - .build()))), + bsFutureStub(channel) + .queryWriteStatus( + QueryWriteStatusRequest.newBuilder() + .setResourceName(resourceName) + .build())), QueryWriteStatusResponse::getCommittedSize, MoreExecutors.directExecutor()); return Futures.catchingAsync( @@ -397,18 +390,10 @@ private ListenableFuture upload(long pos) { return channel.withChannelFuture( channel -> { SettableFuture uploadResult = SettableFuture.create(); - grpcContext.run( - () -> - bsAsyncStub(channel) - .write(new Writer(resourceName, chunker, pos, uploadResult))); + bsAsyncStub(channel).write(new Writer(resourceName, chunker, pos, uploadResult)); return uploadResult; }); } - - void cancel() { - grpcContext.cancel( - Status.CANCELLED.withDescription("Cancelled by user").asRuntimeException()); - } } private static final class Writer @@ -432,6 +417,13 @@ private Writer( @Override public void beforeStart(ClientCallStreamObserver requestObserver) { this.requestObserver = requestObserver; + uploadResult.addListener( + () -> { + if (uploadResult.isCancelled()) { + requestObserver.cancel("cancelled by user", null); + } + }, + MoreExecutors.directExecutor()); requestObserver.setOnReadyHandler(this); } diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java index c605335a8b81d4..dfd9c2a61a571c 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java @@ -34,6 +34,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Ascii; import com.google.common.base.Preconditions; +import com.google.common.base.VerifyException; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.flogger.GoogleLogger; @@ -58,11 +59,11 @@ import com.google.devtools.build.lib.vfs.Path; import com.google.protobuf.ByteString; import io.grpc.Channel; -import io.grpc.Context; import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; -import io.grpc.stub.StreamObserver; +import io.grpc.stub.ClientCallStreamObserver; +import io.grpc.stub.ClientResponseObserver; import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; @@ -371,81 +372,87 @@ private ListenableFuture requestRead( } catch (IOException e) { return Futures.immediateFailedFuture(e); } - Context.CancellableContext grpcContext = Context.current().withCancellation(); - future.addListener(() -> grpcContext.cancel(null), MoreExecutors.directExecutor()); - grpcContext.run( - () -> - bsAsyncStub(context, channel) - .read( - ReadRequest.newBuilder() - .setResourceName(resourceName) - .setReadOffset(rawOut.getCount()) - .build(), - new StreamObserver() { - @Override - public void onNext(ReadResponse readResponse) { - ByteString data = readResponse.getData(); - try { - data.writeTo(out); - } catch (IOException e) { - // Cancel the call. - throw new RuntimeException(e); - } - // reset the stall backoff because we've made progress or been kept alive - progressiveBackoff.reset(); - } - - @Override - public void onError(Throwable t) { - if (rawOut.getCount() == digest.getSizeBytes()) { - // If the file was fully downloaded, it doesn't matter if there was an - // error at - // the end of the stream. - logger.atInfo().withCause(t).log( - "ignoring error because file was fully received"); - onCompleted(); - return; - } - releaseOut(); - Status status = Status.fromThrowable(t); - if (status.getCode() == Status.Code.NOT_FOUND) { - future.setException(new CacheNotFoundException(digest)); - } else { - future.setException(t); - } - } - - @Override - public void onCompleted() { - try { - try { - out.flush(); - } finally { - releaseOut(); - } - if (digestSupplier != null) { - Utils.verifyBlobContents(digest, digestSupplier.get()); - } - } catch (IOException e) { - future.setException(e); - } catch (RuntimeException e) { - logger.atWarning().withCause(e).log("Unexpected exception"); - future.setException(e); - } - future.set(rawOut.getCount()); + bsAsyncStub(context, channel) + .read( + ReadRequest.newBuilder() + .setResourceName(resourceName) + .setReadOffset(rawOut.getCount()) + .build(), + new ClientResponseObserver() { + @Override + public void beforeStart(ClientCallStreamObserver requestStream) { + future.addListener( + () -> { + if (future.isCancelled()) { + requestStream.cancel("canceled by user", null); } - - private void releaseOut() { - if (out instanceof ZstdDecompressingOutputStream) { - try { - ((ZstdDecompressingOutputStream) out).closeShallow(); - } catch (IOException e) { - logger.atWarning().withCause(e).log( - "failed to cleanly close output stream"); - } - } - } - })); + }, + MoreExecutors.directExecutor()); + } + + @Override + public void onNext(ReadResponse readResponse) { + ByteString data = readResponse.getData(); + try { + data.writeTo(out); + } catch (IOException e) { + // Cancel the call. + throw new VerifyException(e); + } + // reset the stall backoff because we've made progress or been kept alive + progressiveBackoff.reset(); + } + + @Override + public void onError(Throwable t) { + if (rawOut.getCount() == digest.getSizeBytes()) { + // If the file was fully downloaded, it doesn't matter if there was an + // error at + // the end of the stream. + logger.atInfo().withCause(t).log( + "ignoring error because file was fully received"); + onCompleted(); + return; + } + releaseOut(); + Status status = Status.fromThrowable(t); + if (status.getCode() == Status.Code.NOT_FOUND) { + future.setException(new CacheNotFoundException(digest)); + } else { + future.setException(t); + } + } + + @Override + public void onCompleted() { + try { + try { + out.flush(); + } finally { + releaseOut(); + } + if (digestSupplier != null) { + Utils.verifyBlobContents(digest, digestSupplier.get()); + } + } catch (IOException e) { + future.setException(e); + } catch (RuntimeException e) { + logger.atWarning().withCause(e).log("Unexpected exception"); + future.setException(e); + } + future.set(rawOut.getCount()); + } + + private void releaseOut() { + if (out instanceof ZstdDecompressingOutputStream) { + try { + ((ZstdDecompressingOutputStream) out).closeShallow(); + } catch (IOException e) { + logger.atWarning().withCause(e).log("failed to cleanly close output stream"); + } + } + } + }); return future; }