Skip to content

Commit

Permalink
feat: handle retry info so client respect the delay server sets
Browse files Browse the repository at this point in the history
  • Loading branch information
mutianf committed Dec 14, 2023
1 parent ccc2764 commit c56b1cf
Show file tree
Hide file tree
Showing 6 changed files with 496 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,8 @@ public Map<String, String> extract(ReadRowsRequest readRowsRequest) {
new ReadRowsRetryCompletedCallable<>(withBigtableTracer);

ServerStreamingCallable<ReadRowsRequest, RowT> retrying2 =
Callables.retrying(retrying1, innerSettings, clientContext);
com.google.cloud.bigtable.gaxx.retrying.Callables.retrying(
retrying1, innerSettings, clientContext);

return new FilterMarkerRowsCallable<>(retrying2, rowAdapter);
}
Expand Down Expand Up @@ -568,7 +569,8 @@ public Map<String, String> extract(
new BigtableTracerUnaryCallable<>(withStatsHeaders);

UnaryCallable<SampleRowKeysRequest, List<SampleRowKeysResponse>> retryable =
Callables.retrying(withBigtableTracer, settings.sampleRowKeysSettings(), clientContext);
com.google.cloud.bigtable.gaxx.retrying.Callables.retrying(
withBigtableTracer, settings.sampleRowKeysSettings(), clientContext);

return createUserFacingUnaryCallable(
methodName, new SampleRowKeysCallable(retryable, requestContext));
Expand Down Expand Up @@ -607,7 +609,8 @@ public Map<String, String> extract(MutateRowRequest mutateRowRequest) {
new BigtableTracerUnaryCallable<>(withStatsHeaders);

UnaryCallable<MutateRowRequest, MutateRowResponse> retrying =
Callables.retrying(withBigtableTracer, settings.mutateRowSettings(), clientContext);
com.google.cloud.bigtable.gaxx.retrying.Callables.retrying(
withBigtableTracer, settings.mutateRowSettings(), clientContext);

return createUserFacingUnaryCallable(
methodName, new MutateRowCallable(retrying, requestContext));
Expand Down Expand Up @@ -810,7 +813,8 @@ public Map<String, String> extract(
new BigtableTracerUnaryCallable<>(withStatsHeaders);

UnaryCallable<CheckAndMutateRowRequest, CheckAndMutateRowResponse> retrying =
Callables.retrying(withBigtableTracer, settings.checkAndMutateRowSettings(), clientContext);
com.google.cloud.bigtable.gaxx.retrying.Callables.retrying(
withBigtableTracer, settings.checkAndMutateRowSettings(), clientContext);

return createUserFacingUnaryCallable(
methodName, new CheckAndMutateRowCallable(retrying, requestContext));
Expand Down Expand Up @@ -851,7 +855,7 @@ public Map<String, String> extract(ReadModifyWriteRowRequest request) {
new BigtableTracerUnaryCallable<>(withStatsHeaders);

UnaryCallable<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse> retrying =
Callables.retrying(
com.google.cloud.bigtable.gaxx.retrying.Callables.retrying(
withBigtableTracer, settings.readModifyWriteRowSettings(), clientContext);

return createUserFacingUnaryCallable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.bigtable.v2.MutateRowsResponse.Entry;
import com.google.cloud.bigtable.data.v2.models.MutateRowsException;
import com.google.cloud.bigtable.data.v2.models.MutateRowsException.FailedMutation;
import com.google.cloud.bigtable.gaxx.retrying.ApiResultRetryAlgorithm;
import com.google.cloud.bigtable.gaxx.retrying.NonCancellableFuture;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -235,7 +236,8 @@ private void handleAttemptError(Throwable rpcError) {
FailedMutation failedMutation = FailedMutation.create(origIndex, entryError);
allFailures.add(failedMutation);

if (!failedMutation.getError().isRetryable()) {
if (ApiResultRetryAlgorithm.extractRetryDelay(failedMutation.getError()) == null
&& !failedMutation.getError().isRetryable()) {
permanentFailures.add(failedMutation);
} else {
// Schedule the mutation entry for the next RPC by adding it to the request builder and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,46 @@
import com.google.api.core.InternalApi;
import com.google.api.gax.retrying.BasicResultRetryAlgorithm;
import com.google.api.gax.retrying.RetryingContext;
import com.google.api.gax.retrying.TimedAttemptSettings;
import com.google.api.gax.rpc.ApiException;
import com.google.protobuf.util.Durations;
import com.google.rpc.RetryInfo;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.protobuf.ProtoUtils;
import org.threeten.bp.Duration;

/** For internal use, public for technical reasons. */
/**
* For internal use, public for technical reasons. This retry algorithm checks the metadata of an
* exception for additional error details. If the metadata has a RetryInfo field, use the retry
* delay to set the wait time between attempts.
*/
@InternalApi
public class ApiResultRetryAlgorithm<ResponseT> extends BasicResultRetryAlgorithm<ResponseT> {

private static final Metadata.Key<RetryInfo> KEY_RETRY_INFO =
ProtoUtils.keyForProto(RetryInfo.getDefaultInstance());

@Override
public TimedAttemptSettings createNextAttempt(
Throwable prevThrowable, ResponseT prevResponse, TimedAttemptSettings prevSettings) {
Duration retryDelay = extractRetryDelay(prevThrowable);
if (retryDelay != null) {
return prevSettings
.toBuilder()
.setRandomizedRetryDelay(retryDelay)
.setAttemptCount(prevSettings.getAttemptCount() + 1)
.build();
}
return null;
}

/** Returns true if previousThrowable is an {@link ApiException} that is retryable. */
@Override
public boolean shouldRetry(Throwable previousThrowable, ResponseT previousResponse) {
return (previousThrowable instanceof ApiException)
&& ((ApiException) previousThrowable).isRetryable();
return (extractRetryDelay(previousThrowable) != null)
|| (previousThrowable instanceof ApiException
&& ((ApiException) previousThrowable).isRetryable());
}

/**
Expand All @@ -43,11 +72,30 @@ public boolean shouldRetry(
if (context.getRetryableCodes() != null) {
// Ignore the isRetryable() value of the throwable if the RetryingContext has a specific list
// of codes that should be retried.
return (previousThrowable instanceof ApiException)
&& context
.getRetryableCodes()
.contains(((ApiException) previousThrowable).getStatusCode().getCode());
return extractRetryDelay(previousThrowable) != null
|| ((previousThrowable instanceof ApiException)
&& context
.getRetryableCodes()
.contains(((ApiException) previousThrowable).getStatusCode().getCode()));
}
return shouldRetry(previousThrowable, previousResponse);
}

public static Duration extractRetryDelay(Throwable throwable) {
if (throwable == null) {
return null;
}
Metadata trailers = Status.trailersFromThrowable(throwable);
if (trailers == null) {
return null;
}
RetryInfo retryInfo = trailers.get(KEY_RETRY_INFO);
if (retryInfo == null) {
return null;
}
if (!retryInfo.hasRetryDelay()) {
return null;
}
return Duration.ofMillis(Durations.toMillis(retryInfo.getRetryDelay()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public static <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> retrying(
}

RetryAlgorithm<ResponseT> retryAlgorithm =
new RetryAlgorithm<>(
new UnaryRetryAlgorithm<>(
new ApiResultRetryAlgorithm<ResponseT>(),
new ExponentialRetryAlgorithm(settings.getRetrySettings(), clientContext.getClock()));
ScheduledRetryingExecutor<ResponseT> retryingExecutor =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.gaxx.retrying;

import com.google.api.core.InternalApi;
import com.google.api.gax.retrying.ResultRetryAlgorithmWithContext;
import com.google.api.gax.retrying.RetryAlgorithm;
import com.google.api.gax.retrying.RetryingContext;
import com.google.api.gax.retrying.TimedAttemptSettings;
import com.google.api.gax.retrying.TimedRetryAlgorithmWithContext;
import java.util.concurrent.CancellationException;

/**
* Retry algorithm for unary calls. It'll use the result from result algorithm first and only fall
* back to timedAlgorithm if resultAlgorithm#shouldRetry is false.
*/
@InternalApi
public class UnaryRetryAlgorithm<ResponseT> extends RetryAlgorithm<ResponseT> {

public UnaryRetryAlgorithm(
ResultRetryAlgorithmWithContext<ResponseT> resultAlgorithm,
TimedRetryAlgorithmWithContext timedAlgorithm) {
super(resultAlgorithm, timedAlgorithm);
}

@Override
public boolean shouldRetry(
RetryingContext context,
Throwable previousThrowable,
ResponseT previousResponse,
TimedAttemptSettings nextAttemptSettings)
throws CancellationException {
if (getResultAlgorithm().shouldRetry(previousThrowable, previousResponse)) {
return true;
}
return super.shouldRetry(context, previousThrowable, previousResponse, nextAttemptSettings);
}
}
Loading

0 comments on commit c56b1cf

Please sign in to comment.