Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bigtable: add await replication #3658

Merged
merged 5 commits into from
Sep 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,18 @@
import com.google.api.core.ApiFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.bigtable.admin.v2.CheckConsistencyResponse;
import com.google.bigtable.admin.v2.DeleteTableRequest;
import com.google.bigtable.admin.v2.DropRowRangeRequest;
import com.google.bigtable.admin.v2.GenerateConsistencyTokenRequest;
import com.google.bigtable.admin.v2.GenerateConsistencyTokenResponse;
import com.google.bigtable.admin.v2.GetTableRequest;
import com.google.bigtable.admin.v2.InstanceName;
import com.google.bigtable.admin.v2.ListTablesRequest;
import com.google.bigtable.admin.v2.TableName;
import com.google.cloud.bigtable.admin.v2.BaseBigtableTableAdminClient.ListTablesPage;
import com.google.cloud.bigtable.admin.v2.BaseBigtableTableAdminClient.ListTablesPagedResponse;
import com.google.cloud.bigtable.admin.v2.models.ConsistencyToken;
import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest;
import com.google.cloud.bigtable.admin.v2.models.ModifyColumnFamiliesRequest;
import com.google.cloud.bigtable.admin.v2.models.Table;
import com.google.cloud.bigtable.admin.v2.stub.EnhancedBigtableTableAdminStub;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
Expand Down Expand Up @@ -649,91 +644,57 @@ public ApiFuture<Void> dropAllRowsAsync(String tableId) {
}

/**
* Generates a token to verify the replication status of table mutations invoked before this call.
* Token expires in 90 days
* Blocks until replication has caught up to the point this method was called. This allows callers
* to make sure that their mutations have been replicated across all of their clusters.
*
* <p>Sample code:
* <p>Sample code
*
* <pre>{@code
* ConsistencyToken consistencyToken = client.generateConsistencyToken("my-table");
* client.awaitReplication("my-table");
* }</pre>
*/
@SuppressWarnings("WeakerAccess")
public ConsistencyToken generateConsistencyToken(String tableId) {
return awaitFuture(generateConsistencyTokenAsync(tableId));
}

/**
* Asynchornously generates a token to verify the replication status of table mutations invoked
* before this call. Token expires in 90 days
*
* <p>Sample code:
*
* <pre>{@code
* ApiFuture<ConsistencyToken> consistencyTokenFuture = client.generateConsistencyToken("my-table");
* }</pre>
* @throws com.google.api.gax.retrying.PollException when polling exceeds the total timeout
*/
// TODO(igorbernstein2): add sample code for waiting for the fetch consistency token
@SuppressWarnings("WeakerAccess")
public ApiFuture<ConsistencyToken> generateConsistencyTokenAsync(final String tableId) {
GenerateConsistencyTokenRequest request = GenerateConsistencyTokenRequest.newBuilder()
.setName(getTableName(tableId))
.build();

return ApiFutures.transform(
stub.generateConsistencyTokenCallable().futureCall(request),
new ApiFunction<GenerateConsistencyTokenResponse, ConsistencyToken>() {
@Override
public ConsistencyToken apply(GenerateConsistencyTokenResponse proto) {
TableName tableName = TableName
.of(instanceName.getProject(), instanceName.getInstance(), tableId);
return ConsistencyToken.of(tableName, proto.getConsistencyToken());
}
},
MoreExecutors.directExecutor());
public void awaitReplication(String tableId) {
TableName tableName = TableName
.of(instanceName.getProject(), instanceName.getInstance(), tableId);
awaitFuture(stub.awaitReplicationCallable().futureCall(tableName));
}

/**
* Checks replication consistency for the specified token consistency token
* Returns a future that is resolved when replication has caught up to the point this method was
* called. This allows callers to make sure that their mutations have been replicated across all
* of their clusters.
*
* <p>Sample code:
*
* <pre>{@code
* try(BigtableTableAdminClient client = BigtableTableAdminClient.create(InstanceName.of("[PROJECT]", "[INSTANCE]"))) {
* // Perform some mutations.
* ApiFuture<Void> replicationFuture = client.awaitReplicationAsync("my-table");
*
* ConsistencyToken token = client.generateConsistencyToken("table-id");
* while(!client.isConsistent(token)) {
* Thread.sleep(100);
* }
* ApiFutures.addCallback(
* replicationFuture,
* new ApiFutureCallback<Void>() {
* public void onSuccess(Table table) {
* System.out.println("All clusters are now consistent");
* }
*
* public void onFailure(Throwable t) {
* t.printStackTrace();
* }
* },
* MoreExecutors.directExecutor()
* );
*
* // Now all clusters are consistent
* }
* }</pre>
*/
@SuppressWarnings("WeakerAccess")
public boolean isConsistent(ConsistencyToken token) {
return awaitFuture(isConsistentAsync(token));
public ApiFuture<Void> awaitReplicationAsync(final String tableId) {
TableName tableName = TableName
.of(instanceName.getProject(), instanceName.getInstance(), tableId);
return stub.awaitReplicationCallable().futureCall(tableName);
}

@VisibleForTesting
ApiFuture<Boolean> isConsistentAsync(ConsistencyToken token) {
ApiFuture<CheckConsistencyResponse> checkConsResp = stub.checkConsistencyCallable()
.futureCall(token.toProto(instanceName));

return ApiFutures.transform(
checkConsResp,
new ApiFunction<CheckConsistencyResponse, Boolean>() {
@Override
public Boolean apply(CheckConsistencyResponse input) {
return input.getConsistent();
}
},
MoreExecutors.directExecutor());
}

// TODO(igorbernstein2): add awaitConsist() & awaitConsistAsync() that generate & poll a token

/**
* Helper method to construct the table name in format: projects/{project}/instances/{instance}/tables/{tableId}
*/
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
/*
* Copyright 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.admin.v2.stub;

import com.google.api.core.ApiAsyncFunction;
import com.google.api.core.ApiFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.retrying.ExponentialPollAlgorithm;
import com.google.api.gax.retrying.NonCancellableFuture;
import com.google.api.gax.retrying.ResultRetryAlgorithm;
import com.google.api.gax.retrying.RetryAlgorithm;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.retrying.RetryingExecutor;
import com.google.api.gax.retrying.RetryingFuture;
import com.google.api.gax.retrying.ScheduledRetryingExecutor;
import com.google.api.gax.retrying.TimedAttemptSettings;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ClientContext;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.bigtable.admin.v2.CheckConsistencyRequest;
import com.google.bigtable.admin.v2.CheckConsistencyResponse;
import com.google.bigtable.admin.v2.GenerateConsistencyTokenRequest;
import com.google.bigtable.admin.v2.GenerateConsistencyTokenResponse;
import com.google.bigtable.admin.v2.TableName;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;

/**
* Callable that waits until replication has caught up to the point it was called.
*
* <p>This callable wraps GenerateConsistencyToken and CheckConsistency RPCs. It will generate a
* token then poll until isConsistent is true.
*/
class AwaitReplicationCallable extends UnaryCallable<TableName, Void> {
private final UnaryCallable<GenerateConsistencyTokenRequest, GenerateConsistencyTokenResponse> generateCallable;
private final UnaryCallable<CheckConsistencyRequest, CheckConsistencyResponse> checkCallable;
private final RetryingExecutor<CheckConsistencyResponse> executor;

static AwaitReplicationCallable create(
UnaryCallable<GenerateConsistencyTokenRequest, GenerateConsistencyTokenResponse> generateCallable,
UnaryCallable<CheckConsistencyRequest, CheckConsistencyResponse> checkCallable,
ClientContext clientContext,
RetrySettings pollingSettings) {

This comment was marked as spam.

This comment was marked as spam.


RetryAlgorithm<CheckConsistencyResponse> retryAlgorithm = new RetryAlgorithm<>(
new PollResultAlgorithm(),
new ExponentialPollAlgorithm(pollingSettings, clientContext.getClock())
);

RetryingExecutor<CheckConsistencyResponse> retryingExecutor = new ScheduledRetryingExecutor<>(
retryAlgorithm,
clientContext.getExecutor()
);

return new AwaitReplicationCallable(
generateCallable,
checkCallable,
retryingExecutor
);
}

@VisibleForTesting
AwaitReplicationCallable(
UnaryCallable<GenerateConsistencyTokenRequest, GenerateConsistencyTokenResponse> generateCallable,
UnaryCallable<CheckConsistencyRequest, CheckConsistencyResponse> checkCallable,
RetryingExecutor<CheckConsistencyResponse> executor) {
this.generateCallable = generateCallable;
this.checkCallable = checkCallable;
this.executor = executor;
}

@Override
public ApiFuture<Void> futureCall(final TableName tableName, final ApiCallContext context) {
ApiFuture<GenerateConsistencyTokenResponse> tokenFuture = generateToken(tableName, context);

return ApiFutures.transformAsync(
tokenFuture,
new ApiAsyncFunction<GenerateConsistencyTokenResponse, Void>() {
@Override
public ApiFuture<Void> apply(GenerateConsistencyTokenResponse input) {
CheckConsistencyRequest request = CheckConsistencyRequest.newBuilder()
.setName(tableName.toString())
.setConsistencyToken(input.getConsistencyToken())
.build();

return pollToken(request, context);
}
},
MoreExecutors.directExecutor()
);
}

private ApiFuture<GenerateConsistencyTokenResponse> generateToken(TableName tableName,
ApiCallContext context) {
GenerateConsistencyTokenRequest generateRequest = GenerateConsistencyTokenRequest.newBuilder()
.setName(tableName.toString())
.build();
return generateCallable.futureCall(generateRequest, context);
}

private ApiFuture<Void> pollToken(CheckConsistencyRequest request, ApiCallContext context) {
AttemptCallable<CheckConsistencyRequest, CheckConsistencyResponse> attemptCallable =
new AttemptCallable<>(checkCallable, request, context);
RetryingFuture<CheckConsistencyResponse> retryingFuture = executor
.createFuture(attemptCallable);
attemptCallable.setExternalFuture(retryingFuture);
attemptCallable.call();

return ApiFutures.transform(
retryingFuture,
new ApiFunction<CheckConsistencyResponse, Void>() {
@Override
public Void apply(CheckConsistencyResponse input) {
return null;
}
},
MoreExecutors.directExecutor()
);
}

/**
* A callable representing an attempt to make an RPC call.
*/
private static class AttemptCallable<RequestT, ResponseT> implements Callable<ResponseT> {
private final UnaryCallable<RequestT, ResponseT> callable;
private final RequestT request;

private volatile RetryingFuture<ResponseT> externalFuture;
private volatile ApiCallContext callContext;

AttemptCallable(
UnaryCallable<RequestT, ResponseT> callable, RequestT request, ApiCallContext callContext) {
this.callable = callable;
this.request = request;
this.callContext = callContext;
}

void setExternalFuture(RetryingFuture<ResponseT> externalFuture) {
this.externalFuture = externalFuture;
}

@Override
public ResponseT call() {
try {
// NOTE: unlike gax's AttemptCallable, this ignores rpc timeouts
externalFuture.setAttemptFuture(new NonCancellableFuture<ResponseT>());
if (externalFuture.isDone()) {
return null;
}
ApiFuture<ResponseT> internalFuture = callable.futureCall(request, callContext);
externalFuture.setAttemptFuture(internalFuture);
} catch (Throwable e) {
externalFuture.setAttemptFuture(ApiFutures.<ResponseT>immediateFailedFuture(e));
}

return null;
}
}

/**
* A polling algorithm for waiting for a consistent {@link CheckConsistencyResponse}. Please note
* that this class doesn't handle retryable errors and expects the underlying callable chain to
* handle this.
*/
private static class PollResultAlgorithm implements
ResultRetryAlgorithm<CheckConsistencyResponse> {
@Override
public TimedAttemptSettings createNextAttempt(Throwable prevThrowable,
CheckConsistencyResponse prevResponse, TimedAttemptSettings prevSettings) {
return null;
}

@Override
public boolean shouldRetry(Throwable prevThrowable, CheckConsistencyResponse prevResponse)
throws CancellationException {
return prevResponse != null && !prevResponse.getConsistent();
}
}
}
Loading