Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release 6.4.0 remote #18959

Merged
merged 5 commits into from
Jul 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,12 @@ ExecuteResponse start() throws IOException, InterruptedException {
// retrying when received a unauthenticated error, and propagate to refreshIfUnauthenticated
// which will then call retrier again. It will reset the retry time counter so we could
// retry more than --remote_retry times which is not expected.
response =
retrier.execute(
() -> Utils.refreshIfUnauthenticated(this::execute, callCredentialsProvider),
executeBackoff);
if (lastOperation == null) {
response =
retrier.execute(
() -> Utils.refreshIfUnauthenticated(this::execute, callCredentialsProvider),
executeBackoff);
}

// If no response from Execute(), use WaitExecution() in a "loop" which is implemented
// inside the retry block.
Expand Down Expand Up @@ -177,7 +179,7 @@ ExecuteResponse execute() throws IOException {

try {
Iterator<Operation> operationStream = executeFunction.apply(request);
return handleOperationStream(operationStream);
return handleOperationStream(operationStream, /* waitExecution= */ false);
} catch (Throwable e) {
// If lastOperation is not null, we know the execution request is accepted by the server. In
// this case, we will fallback to WaitExecution() loop when the stream is broken.
Expand All @@ -197,33 +199,43 @@ ExecuteResponse waitExecution() throws IOException {
WaitExecutionRequest.newBuilder().setName(lastOperation.getName()).build();
try {
Iterator<Operation> operationStream = waitExecutionFunction.apply(request);
return handleOperationStream(operationStream);
return handleOperationStream(operationStream, /* waitExecution= */ true);
} catch (StatusRuntimeException e) {
throw new IOException(e);
} catch (Throwable e) {
// A NOT_FOUND error means Operation was lost on the server, retry Execute().
//
// However, we only retry Execute() if executeBackoff should retry. Also increase the retry
// counter at the same time (done by nextDelayMillis()).
if (e instanceof StatusRuntimeException) {
StatusRuntimeException sre = (StatusRuntimeException) e;
if (sre.getStatus().getCode() == Code.NOT_FOUND
&& executeBackoff.nextDelayMillis(sre) >= 0) {
lastOperation = null;
return null;
}
}
lastOperation = null;
throw new IOException(e);
}
}

/** Process a stream of operations from Execute() or WaitExecution(). */
ExecuteResponse handleOperationStream(Iterator<Operation> operationStream) throws IOException {
@Nullable
ExecuteResponse handleOperationStream(
Iterator<Operation> operationStream, boolean waitExecution) throws IOException {
try {
while (operationStream.hasNext()) {
Operation operation = operationStream.next();
ExecuteResponse response = extractResponseOrThrowIfError(operation);

// At this point, we successfully received a response that is not an error.
lastOperation = operation;
// Either done or should be repeated
lastOperation = operation.getDone() ? null : operation;

ExecuteResponse response;
try {
response = extractResponseOrThrowIfError(operation);
} catch (StatusRuntimeException e) {
// An operation error means Operation has been terminally completed, retry Execute().
//
// However, we only retry Execute() if executeBackoff should retry. Also increase the
// retry
// counter at the same time (done by nextDelayMillis()).
if (waitExecution
&& (retrier.isRetriable(e) || e.getStatus().getCode() == Code.NOT_FOUND)
&& executeBackoff.nextDelayMillis(e) >= 0) {
lastOperation = null;
return null;
}
throw e;
}

// We don't want to reset executeBackoff since if there is an error:
// 1. If happened before we received a first response, we want to ensure the retry
Expand Down Expand Up @@ -258,8 +270,8 @@ ExecuteResponse handleOperationStream(Iterator<Operation> operationStream) throw
}
}

// The operation completed successfully but without a result.
throw new IOException("Remote server error: execution terminated with no result");
// The operation stream completed successfully but without a result.
return null;
} finally {
close(operationStream);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@
// limitations under the License.
package com.google.devtools.build.lib.remote;

import static com.google.common.base.Throwables.throwIfInstanceOf;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;

import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.remote.grpc.ChannelConnectionFactory;
import com.google.devtools.build.lib.remote.grpc.ChannelConnectionFactory.ChannelConnection;
import com.google.devtools.build.lib.remote.grpc.DynamicConnectionPool;
Expand All @@ -28,9 +29,12 @@
import io.netty.util.ReferenceCounted;
import io.reactivex.rxjava3.annotations.CheckReturnValue;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.core.SingleObserver;
import io.reactivex.rxjava3.core.SingleSource;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.functions.Function;
import java.io.IOException;
import java.util.concurrent.ExecutionException;

/**
* A wrapper around a {@link DynamicConnectionPool} exposing {@link Channel} and a reference count.
Expand Down Expand Up @@ -80,19 +84,48 @@ public <T> ListenableFuture<T> withChannelFuture(
}

public <T> T withChannelBlocking(Function<Channel, T> source)
throws IOException, InterruptedException {
throws ExecutionException, IOException, InterruptedException {
try {
return withChannel(channel -> Single.just(source.apply(channel))).blockingGet();
} catch (RuntimeException e) {
return withChannelBlockingGet(source);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause != null) {
throwIfInstanceOf(cause, IOException.class);
throwIfInstanceOf(cause, InterruptedException.class);
}
Throwables.throwIfInstanceOf(cause, IOException.class);
Throwables.throwIfUnchecked(cause);
throw e;
}
}

// prevents rxjava silent possible wrap of RuntimeException and misinterpretation
private <T> T withChannelBlockingGet(Function<Channel, T> source)
throws ExecutionException, InterruptedException {
SettableFuture<T> future = SettableFuture.create();
withChannel(channel -> Single.just(source.apply(channel)))
.subscribe(
new SingleObserver<T>() {
@Override
public void onError(Throwable t) {
future.setException(t);
}

@Override
public void onSuccess(T t) {
future.set(t);
}

@Override
public void onSubscribe(Disposable d) {
future.addListener(
() -> {
if (future.isCancelled()) {
d.dispose();
}
},
directExecutor());
}
});
return future.get();
}

@CheckReturnValue
public <T> Single<T> withChannel(Function<Channel, ? extends SingleSource<? extends T>> source) {
return dynamicConnectionPool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ private static boolean shouldEnableRemoteDownloader(RemoteOptions options) {
return !Strings.isNullOrEmpty(options.remoteDownloader);
}

@Nullable
private static ServerCapabilities getAndVerifyServerCapabilities(
RemoteOptions remoteOptions,
ReferenceCountedChannel channel,
Expand Down Expand Up @@ -535,9 +536,12 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
digestUtil,
ServerCapabilitiesRequirement.CACHE);
}
} catch (IOException e) {
} catch (AbruptExitException e) {
throw e; // prevent abrupt interception
} catch (Exception e) {
String errorMessage =
"Failed to query remote execution capabilities: " + Utils.grpcAwareErrorMessage(e);
"Failed to query remote execution capabilities: "
+ Utils.grpcAwareErrorMessage(e, verboseFailures);
if (remoteOptions.remoteLocalFallback) {
if (verboseFailures) {
errorMessage += System.lineSeparator() + Throwables.getStackTraceAsString(e);
Expand All @@ -559,12 +563,12 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
if (Strings.isNullOrEmpty(remoteBytestreamUriPrefix)) {
try {
remoteBytestreamUriPrefix = cacheChannel.withChannelBlocking(Channel::authority);
} catch (IOException e) {
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
handleInitFailure(env, e, Code.CACHE_INIT_FAILURE);
return;
} catch (InterruptedException e) {
handleInitFailure(env, new IOException(e), Code.CACHE_INIT_FAILURE);
return;
}
if (!Strings.isNullOrEmpty(remoteOptions.remoteInstanceName)) {
remoteBytestreamUriPrefix += "/" + remoteOptions.remoteInstanceName;
Expand All @@ -587,7 +591,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
!remoteOptions.remoteOutputsMode.downloadAllOutputs(),
digestUtil,
cacheClient);
} catch (IOException e) {
} catch (Exception e) {
handleInitFailure(env, e, Code.CACHE_INIT_FAILURE);
return;
}
Expand Down Expand Up @@ -652,7 +656,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
!remoteOptions.remoteOutputsMode.downloadAllOutputs(),
digestUtil,
cacheClient);
} catch (IOException e) {
} catch (Exception e) {
handleInitFailure(env, e, Code.CACHE_INIT_FAILURE);
return;
}
Expand Down Expand Up @@ -698,7 +702,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
}

private static void handleInitFailure(
CommandEnvironment env, IOException e, Code remoteExecutionCode) {
CommandEnvironment env, Exception e, Code remoteExecutionCode) {
env.getReporter().handle(Event.error(e.getMessage()));
env.getBlazeModuleEnvironment()
.exit(
Expand Down Expand Up @@ -793,7 +797,7 @@ private static void checkClientServerCompatibility(
}

@Override
public void afterCommand() throws AbruptExitException {
public void afterCommand() {
Preconditions.checkNotNull(blockWaitingModule, "blockWaitingModule must not be null");

// Some cleanup tasks must wait until every other BlazeModule's afterCommand() has run, as
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
import io.grpc.CallCredentials;
import io.grpc.Channel;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -73,21 +72,17 @@ public ServerCapabilities get(String buildRequestId, String commandId)
RequestMetadata metadata =
TracingMetadataUtils.buildMetadata(buildRequestId, commandId, "capabilities", null);
RemoteActionExecutionContext context = RemoteActionExecutionContext.create(metadata);
try {
GetCapabilitiesRequest request =
instanceName == null
? GetCapabilitiesRequest.getDefaultInstance()
: GetCapabilitiesRequest.newBuilder().setInstanceName(instanceName).build();
return retrier.execute(
() ->
channel.withChannelBlocking(
channel -> capabilitiesBlockingStub(context, channel).getCapabilities(request)));
} catch (StatusRuntimeException e) {
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
}
throw new IOException(e);
}
GetCapabilitiesRequest request =
instanceName == null
? GetCapabilitiesRequest.getDefaultInstance()
: GetCapabilitiesRequest.newBuilder().setInstanceName(instanceName).build();
ServerCapabilities caps =
retrier.execute(
() ->
channel.withChannelBlocking(
channel ->
capabilitiesBlockingStub(context, channel).getCapabilities(request)));
return caps;
}

static class ClientServerCompatibilityStatus {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ExecException;
import com.google.devtools.build.lib.actions.FileArtifactValue;
Expand Down Expand Up @@ -141,13 +140,7 @@ public CacheHandle lookup(Spawn spawn, SpawnExecutionContext context)
if (BulkTransferException.allCausedByCacheNotFoundException(e)) {
// Intentionally left blank
} else {
String errorMessage;
if (!verboseFailures) {
errorMessage = Utils.grpcAwareErrorMessage(e);
} else {
// On --verbose_failures print the whole stack trace
errorMessage = "\n" + Throwables.getStackTraceAsString(e);
}
String errorMessage = Utils.grpcAwareErrorMessage(e, verboseFailures);
if (isNullOrEmpty(errorMessage)) {
errorMessage = e.getClass().getSimpleName();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.CommandLines.ParamFileActionInput;
Expand Down Expand Up @@ -573,11 +572,7 @@ private SpawnResult handleError(
catastrophe = false;
}

String errorMessage = Utils.grpcAwareErrorMessage(exception);
if (verboseFailures) {
// On --verbose_failures print the whole stack trace
errorMessage += "\n" + Throwables.getStackTraceAsString(exception);
}
String errorMessage = Utils.grpcAwareErrorMessage(exception, verboseFailures);

if (exception.getCause() instanceof ExecutionStatusException) {
ExecutionStatusException e = (ExecutionStatusException) exception.getCause();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ private static String executionStatusExceptionErrorMessage(ExecutionStatusExcept
+ errorDetailsMessage(status.getDetailsList());
}

public static String grpcAwareErrorMessage(IOException e) {
private static String grpcAwareErrorMessage(IOException e) {
io.grpc.Status errStatus = io.grpc.Status.fromThrowable(e);
if (e.getCause() instanceof ExecutionStatusException) {
// Display error message returned by the remote service.
Expand Down
Loading