Skip to content

Commit

Permalink
[6.3.0] Implement failure circuit breaker (#18541)
Browse files Browse the repository at this point in the history
* feat: Implement failure circuit breaker

Copy of #18120: I accidentally closed #18120 during a rebase and don't have permission to reopen it.

We have noticed that any problems with the remote cache have a detrimental effect on build times. On investigation we found that the interface for the circuit breaker was left unimplemented.

To address this issue, this change implements a failure circuit breaker, which adds three new Bazel flags: 1) experimental_circuitbreaker_strategy, 2) experimental_remote_failure_threshold, and 3) experimental_remote_failure_window.

In this implementation, I have implemented failure strategy for circuit breaker and used failure count to trip the circuit.

Reasoning behind using failure count instead of failure rate: To measure the failure rate I would also need the success count. Both the failure and the success counts would need to be AtomicIntegers, since both are modified concurrently by multiple threads. Even though getAndIncrement is a very lightweight operation, at very high request rates it might contribute to latency.

Reasoning behind using a failure circuit breaker: A new instance of Retrier.CircuitBreaker is created for each build. Therefore, if the circuit breaker trips during a build, the remote cache will be disabled for that build. However, it will be enabled again
for the next build, as a new instance of Retrier.CircuitBreaker will be created. If needed in the future, we may also add a cool-down strategy, e.g. failure_and_cool_down_strategy.

closes #18136

Closes #18359.

PiperOrigin-RevId: 536349954
Change-Id: I5e1c57d4ad0ce07ddc4808bf1f327bc5df6ce704

* remove target included in cherry-pick by mistake
  • Loading branch information
amishra-u authored Jun 1, 2023
1 parent 28ebcdc commit 31f07cc
Show file tree
Hide file tree
Showing 17 changed files with 578 additions and 176 deletions.
2 changes: 2 additions & 0 deletions src/main/java/com/google/devtools/build/lib/remote/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package(
filegroup(
name = "srcs",
srcs = glob(["*"]) + [
"//src/main/java/com/google/devtools/build/lib/remote/circuitbreaker:srcs",
"//src/main/java/com/google/devtools/build/lib/remote/common:srcs",
"//src/main/java/com/google/devtools/build/lib/remote/downloader:srcs",
"//src/main/java/com/google/devtools/build/lib/remote/disk:srcs",
Expand Down Expand Up @@ -84,6 +85,7 @@ java_library(
"//src/main/java/com/google/devtools/build/lib/exec/local",
"//src/main/java/com/google/devtools/build/lib/packages/semantics",
"//src/main/java/com/google/devtools/build/lib/profiler",
"//src/main/java/com/google/devtools/build/lib/remote/circuitbreaker",
"//src/main/java/com/google/devtools/build/lib/remote/common",
"//src/main/java/com/google/devtools/build/lib/remote/common:cache_not_found_exception",
"//src/main/java/com/google/devtools/build/lib/remote/disk",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,4 +495,8 @@ ListenableFuture<Void> uploadChunker(
MoreExecutors.directExecutor());
return f;
}

/** Returns the {@link Retrier} held by this instance. */
Retrier getRetrier() {
  return retrier;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -243,4 +243,8 @@ public void close() {
}
channel.release();
}

/** Returns the {@link RemoteRetrier} held by this instance. */
RemoteRetrier getRetrier() {
  return retrier;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import com.google.devtools.build.lib.exec.SpawnStrategyRegistry;
import com.google.devtools.build.lib.remote.RemoteServerCapabilities.ServerCapabilitiesRequirement;
import com.google.devtools.build.lib.remote.ToplevelArtifactsDownloader.PathToMetadataConverter;
import com.google.devtools.build.lib.remote.circuitbreaker.CircuitBreakerFactory;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
import com.google.devtools.build.lib.remote.common.RemoteExecutionClient;
import com.google.devtools.build.lib.remote.downloader.GrpcRemoteDownloader;
Expand Down Expand Up @@ -475,12 +476,11 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
GoogleAuthUtils.newCallCredentialsProvider(credentials);
CallCredentials callCredentials = callCredentialsProvider.getCallCredentials();

Retrier.CircuitBreaker circuitBreaker =
CircuitBreakerFactory.createCircuitBreaker(remoteOptions);
RemoteRetrier retrier =
new RemoteRetrier(
remoteOptions,
RemoteRetrier.RETRIABLE_GRPC_ERRORS,
retryScheduler,
Retrier.ALLOW_ALL_CALLS);
remoteOptions, RemoteRetrier.RETRIABLE_GRPC_ERRORS, retryScheduler, circuitBreaker);

// We only check required capabilities for a given endpoint.
//
Expand Down Expand Up @@ -598,7 +598,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
remoteOptions,
RemoteRetrier.RETRIABLE_GRPC_ERRORS, // Handle NOT_FOUND internally
retryScheduler,
Retrier.ALLOW_ALL_CALLS);
circuitBreaker);
remoteExecutor =
new ExperimentalGrpcRemoteExecutor(
remoteOptions, execChannel.retain(), callCredentialsProvider, execRetrier);
Expand All @@ -608,7 +608,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
remoteOptions,
RemoteRetrier.RETRIABLE_GRPC_EXEC_ERRORS,
retryScheduler,
Retrier.ALLOW_ALL_CALLS);
circuitBreaker);
remoteExecutor =
new GrpcRemoteExecutor(execChannel.retain(), callCredentialsProvider, execRetrier);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import com.google.devtools.build.lib.profiler.SilentCloseable;
import com.google.devtools.build.lib.remote.RemoteExecutionService.RemoteActionResult;
import com.google.devtools.build.lib.remote.RemoteExecutionService.ServerLogs;
import com.google.devtools.build.lib.remote.circuitbreaker.CircuitBreakerFactory;
import com.google.devtools.build.lib.remote.common.BulkTransferException;
import com.google.devtools.build.lib.remote.common.OperationObserver;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
Expand Down Expand Up @@ -660,6 +661,8 @@ private void report(Event evt) {
private static RemoteRetrier createExecuteRetrier(
RemoteOptions options, ListeningScheduledExecutorService retryService) {
return new ExecuteRetrier(
options.remoteMaxRetryAttempts, retryService, Retrier.ALLOW_ALL_CALLS);
options.remoteMaxRetryAttempts,
retryService,
CircuitBreakerFactory.createCircuitBreaker(options));
}
}
34 changes: 27 additions & 7 deletions src/main/java/com/google/devtools/build/lib/remote/Retrier.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ enum State {
State state();

/** Called after an execution failed. */
void recordFailure();
void recordFailure(Exception e);

/** Called after an execution succeeded. */
void recordSuccess();
Expand Down Expand Up @@ -130,7 +130,7 @@ public State state() {
}

@Override
public void recordFailure() {}
public void recordFailure(Exception e) {}

@Override
public void recordSuccess() {}
Expand Down Expand Up @@ -245,7 +245,7 @@ public <T> T execute(Callable<T> call, Backoff backoff) throws Exception {
circuitBreaker.recordSuccess();
return r;
} catch (Exception e) {
circuitBreaker.recordFailure();
circuitBreaker.recordFailure(e);
Throwables.throwIfInstanceOf(e, InterruptedException.class);
if (State.TRIAL_CALL.equals(circuitState)) {
throw e;
Expand All @@ -272,19 +272,35 @@ public <T> ListenableFuture<T> executeAsync(AsyncCallable<T> call) {
* backoff.
*/
public <T> ListenableFuture<T> executeAsync(AsyncCallable<T> call, Backoff backoff) {
final State circuitState = circuitBreaker.state();
if (State.REJECT_CALLS.equals(circuitState)) {
return Futures.immediateFailedFuture(new CircuitBreakerException());
}
try {
ListenableFuture<T> future =
Futures.transformAsync(
call.call(),
(f) -> {
circuitBreaker.recordSuccess();
return Futures.immediateFuture(f);
},
MoreExecutors.directExecutor());
return Futures.catchingAsync(
call.call(),
future,
Exception.class,
t -> onExecuteAsyncFailure(t, call, backoff),
t -> onExecuteAsyncFailure(t, call, backoff, circuitState),
MoreExecutors.directExecutor());
} catch (Exception e) {
return onExecuteAsyncFailure(e, call, backoff);
return onExecuteAsyncFailure(e, call, backoff, circuitState);
}
}

private <T> ListenableFuture<T> onExecuteAsyncFailure(
Exception t, AsyncCallable<T> call, Backoff backoff) {
Exception t, AsyncCallable<T> call, Backoff backoff, State circuitState) {
circuitBreaker.recordFailure(t);
if (circuitState.equals(State.TRIAL_CALL)) {
return Futures.immediateFailedFuture(t);
}
if (isRetriable(t)) {
long waitMillis = backoff.nextDelayMillis(t);
if (waitMillis >= 0) {
Expand All @@ -310,4 +326,8 @@ public Backoff newBackoff() {
/**
 * Reports whether a failed call may be retried.
 *
 * @param e the exception raised by the failed call
 * @return the verdict of the {@code shouldRetry} predicate for {@code e}
 */
public boolean isRetriable(Exception e) {
  final boolean retriable = shouldRetry.test(e);
  return retriable;
}

/** Returns the {@link CircuitBreaker} this retrier records successes and failures on. */
CircuitBreaker getCircuitBreaker() {
  return circuitBreaker;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Build definitions for the remote circuit breaker package.
load("@rules_java//java:defs.bzl", "java_library")

# Package-wide defaults: license metadata and visibility limited to //src.
package(
default_applicable_licenses = ["//:license"],
default_visibility = ["//src:__subpackages__"],
)

# Every file in this package (sources and this BUILD file alike), exposed as a single target.
filegroup(
name = "srcs",
srcs = glob(["*"]),
visibility = ["//src:__subpackages__"],
)

# Circuit breaker library built from all Java sources in this package.
java_library(
name = "circuitbreaker",
srcs = glob(["*.java"]),
deps = [
"//src/main/java/com/google/devtools/build/lib/remote:Retrier",
"//src/main/java/com/google/devtools/build/lib/remote/common:cache_not_found_exception",
"//src/main/java/com/google/devtools/build/lib/remote/options",
"//third_party:guava",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2023 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.remote.circuitbreaker;

import com.google.common.collect.ImmutableSet;
import com.google.devtools.build.lib.remote.Retrier;
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
import com.google.devtools.build.lib.remote.options.RemoteOptions;

/** Factory for {@link Retrier.CircuitBreaker}. */
public class CircuitBreakerFactory {

  /** Exception types that {@link FailureCircuitBreaker} does not count as failures. */
  public static final ImmutableSet<Class<? extends Exception>> DEFAULT_IGNORED_ERRORS =
      ImmutableSet.of(CacheNotFoundException.class);

  private CircuitBreakerFactory() {}

  /**
   * Creates the {@link Retrier.CircuitBreaker} matching the strategy selected in {@link
   * RemoteOptions}. Any strategy other than {@code FAILURE} yields {@link
   * Retrier#ALLOW_ALL_CALLS}.
   *
   * @param remoteOptions The configuration for the CircuitBreaker implementation.
   * @return an instance of CircuitBreaker.
   */
  public static Retrier.CircuitBreaker createCircuitBreaker(final RemoteOptions remoteOptions) {
    if (remoteOptions.circuitBreakerStrategy == RemoteOptions.CircuitBreakerStrategy.FAILURE) {
      return new FailureCircuitBreaker(
          remoteOptions.remoteFailureThreshold,
          saturatedToInt(remoteOptions.remoteFailureWindowInterval.toMillis()));
    }
    return Retrier.ALLOW_ALL_CALLS;
  }

  /**
   * Clamps {@code value} into the {@code int} range. A plain {@code (int)} cast would wrap around
   * for windows longer than {@link Integer#MAX_VALUE} milliseconds (roughly 24.8 days) and could
   * hand the breaker a negative window, which it treats as "no sliding window".
   */
  private static int saturatedToInt(long value) {
    return (int) Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, value));
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2023 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.remote.circuitbreaker;

import com.google.common.collect.ImmutableSet;
import com.google.devtools.build.lib.remote.Retrier;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * The {@link FailureCircuitBreaker} implementation of the {@link Retrier.CircuitBreaker} prevents
 * further calls to a remote cache once the number of failures within a given window exceeds a
 * specified threshold for a build. In the context of Bazel, a new instance of {@link
 * Retrier.CircuitBreaker} is created for each build. Therefore, if the circuit breaker trips during
 * a build, the remote cache will be disabled for that build. However, it will be enabled again for
 * the next build as a new instance of {@link Retrier.CircuitBreaker} will be created.
 *
 * <p>Instances are meant to be shared between threads: the failure counter is atomic and the
 * state is published through a {@code volatile} field.
 */
public class FailureCircuitBreaker implements Retrier.CircuitBreaker {

  // Written by whichever thread records the tripping failure and read by every thread that asks
  // for the state; volatile guarantees the transition becomes visible to those readers.
  private volatile State state;
  private final AtomicInteger failures;
  private final int failureThreshold;
  private final int slidingWindowSize;
  // Null when no sliding window is configured (slidingWindowSize <= 0).
  private final ScheduledExecutorService scheduledExecutor;
  private final ImmutableSet<Class<? extends Exception>> ignoredErrors;

  /**
   * Creates a {@link FailureCircuitBreaker}.
   *
   * @param failureThreshold is used to set the number of failures required to trip the circuit
   *     breaker in given time window. The circuit trips once the count exceeds this value.
   * @param slidingWindowSize the size of the sliding window in milliseconds to calculate the number
   *     of failures. A value of zero or less disables the window, so failures are never forgotten.
   */
  public FailureCircuitBreaker(int failureThreshold, int slidingWindowSize) {
    this.failureThreshold = failureThreshold;
    this.failures = new AtomicInteger(0);
    this.slidingWindowSize = slidingWindowSize;
    this.state = State.ACCEPT_CALLS;
    this.scheduledExecutor =
        slidingWindowSize > 0
            ? Executors.newSingleThreadScheduledExecutor(FailureCircuitBreaker::newDaemonThread)
            : null;
    this.ignoredErrors = CircuitBreakerFactory.DEFAULT_IGNORED_ERRORS;
  }

  /**
   * Creates the worker thread for the decay scheduler. The executor is never shut down by this
   * class, so the thread is marked as a daemon to keep an idle scheduler from holding the JVM
   * open.
   */
  private static Thread newDaemonThread(Runnable task) {
    Thread thread = new Thread(task, "failure-circuit-breaker-decay");
    thread.setDaemon(true);
    return thread;
  }

  @Override
  public State state() {
    return this.state;
  }

  /**
   * Counts {@code e} as a failure unless its exact class is one of the ignored error types, and
   * trips the breaker once the count exceeds the threshold.
   */
  @Override
  public void recordFailure(Exception e) {
    if (ignoredErrors.contains(e.getClass())) {
      return;
    }
    int failureCount = failures.incrementAndGet();
    if (slidingWindowSize > 0) {
      // Forget this failure again once it falls out of the sliding window.
      var unused =
          scheduledExecutor.schedule(
              failures::decrementAndGet, slidingWindowSize, TimeUnit.MILLISECONDS);
    }
    // The state only ever moves to REJECT_CALLS, so concurrent writers cannot conflict; the
    // volatile field is sufficient and no lock is needed.
    if (failureCount > this.failureThreshold) {
      this.state = State.REJECT_CALLS;
    }
  }

  @Override
  public void recordSuccess() {
    // do nothing, implement if we need to set threshold on failure rate instead of count.
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,16 @@
// limitations under the License.
package com.google.devtools.build.lib.remote.options;

import com.google.devtools.common.options.Converter;
import com.google.devtools.common.options.Converters;
import com.google.devtools.common.options.Option;
import com.google.devtools.common.options.OptionDocumentationCategory;
import com.google.devtools.common.options.OptionEffectTag;
import com.google.devtools.common.options.OptionsBase;
import com.google.devtools.common.options.OptionsParsingException;
import java.time.Duration;
import java.util.List;
import java.util.regex.Pattern;

/** Options for remote execution and distributed caching that shared between Bazel and Blaze. */
public class CommonRemoteOptions extends OptionsBase {
Expand All @@ -33,4 +38,23 @@ public class CommonRemoteOptions extends OptionsBase {
+ " the client to request certain artifacts that might be needed locally (e.g. IDE"
+ " support)")
public List<String> remoteDownloadRegex;

/**
 * Converts an option value into a {@link Duration}. A value consisting solely of digits is
 * interpreted as a number of seconds.
 */
public static class RemoteDurationConverter extends Converter.Contextless<Duration> {

  private static final Pattern UNITLESS_REGEX = Pattern.compile("^[0-9]+$");

  @Override
  public Duration convert(String input) throws OptionsParsingException {
    // A bare number carries no unit: append "s" before delegating to the generic converter.
    final String withUnit = UNITLESS_REGEX.matcher(input).matches() ? input + "s" : input;
    final Converters.DurationConverter delegate = new Converters.DurationConverter();
    return delegate.convert(withUnit, /* conversionContext= */ null);
  }

  @Override
  public String getTypeDescription() {
    return "An immutable length of time.";
  }
}
}
Loading

0 comments on commit 31f07cc

Please sign in to comment.