Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spanner: Add specific timeout for Partitioned DML with default of 2 hours #5709

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.spanner.v1.TransactionSelector;
import java.util.Map;
import java.util.concurrent.Callable;
import org.threeten.bp.Duration;

/** Partitioned DML transaction for bulk updates and deletes. */
class PartitionedDMLTransaction implements SessionTransaction {
Expand Down Expand Up @@ -62,7 +63,7 @@ private ByteString initTransaction() {
* Executes the {@link Statement} using a partitioned dml transaction with automatic retry if the
* transaction was aborted.
*/
long executePartitionedUpdate(final Statement statement) {
long executePartitionedUpdate(final Statement statement, final Duration timeout) {
checkState(isValid, "Partitioned DML has been invalidated by a new operation on the session");
Callable<com.google.spanner.v1.ResultSet> callable =
new Callable<com.google.spanner.v1.ResultSet>() {
Expand All @@ -83,7 +84,7 @@ public com.google.spanner.v1.ResultSet call() throws Exception {
builder.putParamTypes(param.getKey(), param.getValue().getType().toProto());
}
}
return rpc.executeQuery(builder.build(), session.getOptions());
return rpc.executePartitionedDml(builder.build(), session.getOptions(), timeout);
}
};
com.google.spanner.v1.ResultSet resultSet =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public String getName() {
public long executePartitionedUpdate(Statement stmt) {
setActive(null);
PartitionedDMLTransaction txn = new PartitionedDMLTransaction(this, spanner.getRpc());
return txn.executePartitionedUpdate(stmt);
return txn.executePartitionedUpdate(stmt, spanner.getOptions().getPartitionedDmlTimeout());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.net.URL;
import java.util.Map;
import java.util.Set;
import org.threeten.bp.Duration;

/** Options for the Cloud Spanner service. */
public class SpannerOptions extends ServiceOptions<Spanner, SpannerOptions> {
Expand All @@ -68,6 +69,7 @@ public class SpannerOptions extends ServiceOptions<Spanner, SpannerOptions> {
private final SpannerStubSettings spannerStubSettings;
private final InstanceAdminStubSettings instanceAdminStubSettings;
private final DatabaseAdminStubSettings databaseAdminStubSettings;
private final Duration partitionedDmlTimeout;

/** Default implementation of {@code SpannerFactory}. */
private static class DefaultSpannerFactory implements SpannerFactory {
Expand Down Expand Up @@ -114,6 +116,7 @@ private SpannerOptions(Builder builder) {
} catch (IOException e) {
throw SpannerExceptionFactory.newSpannerException(e);
}
partitionedDmlTimeout = builder.partitionedDmlTimeout;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

very minor - I would move this above the try-catch, after sessionLabels.

}

/** Builder for {@link SpannerOptions} instances. */
Expand All @@ -139,6 +142,7 @@ public static class Builder
InstanceAdminStubSettings.newBuilder();
private DatabaseAdminStubSettings.Builder databaseAdminStubSettingsBuilder =
DatabaseAdminStubSettings.newBuilder();
private Duration partitionedDmlTimeout = Duration.ofHours(2L);

private Builder() {}

Expand All @@ -151,6 +155,7 @@ private Builder() {}
this.spannerStubSettingsBuilder = options.spannerStubSettings.toBuilder();
this.instanceAdminStubSettingsBuilder = options.instanceAdminStubSettings.toBuilder();
this.databaseAdminStubSettingsBuilder = options.databaseAdminStubSettings.toBuilder();
this.partitionedDmlTimeout = options.partitionedDmlTimeout;
this.channelProvider = options.channelProvider;
this.channelConfigurator = options.channelConfigurator;
this.interceptorProvider = options.interceptorProvider;
Expand Down Expand Up @@ -328,6 +333,15 @@ public DatabaseAdminStubSettings.Builder getDatabaseAdminStubSettingsBuilder() {
return databaseAdminStubSettingsBuilder;
}

/**
* Sets a timeout specifically for Partitioned DML statements executed through {@link
* DatabaseClient#executePartitionedUpdate(Statement)}. The default is 2 hours.
*/
public Builder setPartitionedDmlTimeout(Duration timeout) {
this.partitionedDmlTimeout = timeout;
return this;
}

/**
* Specifying this will allow the client to prefetch up to {@code prefetchChunks} {@code
* PartialResultSet} chunks for each read and query. The data size of each chunk depends on the
Expand Down Expand Up @@ -396,6 +410,10 @@ public DatabaseAdminStubSettings getDatabaseAdminStubSettings() {
return databaseAdminStubSettings;
}

public Duration getPartitionedDmlTimeout() {
return partitionedDmlTimeout;
}

public int getPrefetchChunks() {
return prefetchChunks;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,13 @@ public ResultSet executeQuery(ExecuteSqlRequest request, @Nullable Map<Option, ?
return get(spannerStub.executeSqlCallable().futureCall(request, context));
}

@Override
public ResultSet executePartitionedDml(
ExecuteSqlRequest request, @Nullable Map<Option, ?> options, Duration timeout) {
GrpcCallContext context = newCallContext(options, request.getSession(), timeout);
return get(spannerStub.executeSqlCallable().futureCall(request, context));
}

@Override
public StreamingCall executeQuery(
ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options) {
Expand Down Expand Up @@ -591,11 +598,19 @@ private static <T> T get(final Future<T> future) throws SpannerException {
}

private GrpcCallContext newCallContext(@Nullable Map<Option, ?> options, String resource) {
return newCallContext(options, resource, null);
}

private GrpcCallContext newCallContext(
@Nullable Map<Option, ?> options, String resource, Duration timeout) {
GrpcCallContext context = GrpcCallContext.createDefault();
if (options != null) {
context = context.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue());
}
context = context.withExtraHeaders(metadataProvider.newExtraHeaders(resource, projectName));
if (timeout != null) {
context = context.withTimeout(timeout);
}
return context.withStreamWaitTimeout(waitTimeout).withStreamIdleTimeout(idleTimeout);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

/**
* Abstracts remote calls to the Cloud Spanner service. Typically end-consumer code will never use
Expand Down Expand Up @@ -213,6 +214,9 @@ StreamingCall read(

ResultSet executeQuery(ExecuteSqlRequest request, @Nullable Map<Option, ?> options);

ResultSet executePartitionedDml(
ExecuteSqlRequest request, @Nullable Map<Option, ?> options, Duration timeout);

StreamingCall executeQuery(
ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

import com.google.api.gax.grpc.testing.LocalChannelProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.NoCredentials;
import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime;
import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult;
import com.google.cloud.spanner.TransactionRunner.TransactionCallable;
import com.google.protobuf.ListValue;
import com.google.spanner.v1.ResultSetMetadata;
import com.google.spanner.v1.StructType;
Expand All @@ -32,13 +36,15 @@
import io.grpc.Status;
import io.grpc.inprocess.InProcessServerBuilder;
import java.io.IOException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.threeten.bp.Duration;

@RunWith(JUnit4.class)
public class DatabaseClientImplTest {
Expand Down Expand Up @@ -89,21 +95,24 @@ public static void startStaticServer() throws IOException {
String uniqueName = InProcessServerBuilder.generateName();
server =
InProcessServerBuilder.forName(uniqueName)
.directExecutor()
// We need to use a real executor for timeouts to occur.
.scheduledExecutorService(new ScheduledThreadPoolExecutor(1))
.addService(mockSpanner)
.build()
.start();
channelProvider = LocalChannelProvider.create(uniqueName);
}

@AfterClass
public static void stopServer() {
public static void stopServer() throws InterruptedException {
server.shutdown();
server.awaitTermination();
}

@Before
public void setUp() throws IOException {
mockSpanner.reset();
mockSpanner.removeAllExecutionTimes();
spanner =
SpannerOptions.newBuilder()
.setProjectId("[PROJECT]")
Expand Down Expand Up @@ -158,4 +167,89 @@ public void testExecutePartitionedDmlWithException() {
spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]"));
client.executePartitionedUpdate(INVALID_UPDATE_STATEMENT);
}

@Test
public void testPartitionedDmlDoesNotTimeout() throws Exception {
mockSpanner.setExecuteSqlExecutionTime(SimulatedExecutionTime.ofMinimumAndRandomTime(10, 0));
final RetrySettings retrySettings =
RetrySettings.newBuilder()
.setInitialRpcTimeout(Duration.ofMillis(1L))
.setMaxRpcTimeout(Duration.ofMillis(1L))
.setMaxAttempts(1)
.setTotalTimeout(Duration.ofMillis(1L))
.build();
SpannerOptions.Builder builder =
SpannerOptions.newBuilder()
.setProjectId("[PROJECT]")
.setChannelProvider(channelProvider)
.setCredentials(NoCredentials.getInstance());
// Set normal DML timeout value.
builder.getSpannerStubSettingsBuilder().executeSqlSettings().setRetrySettings(retrySettings);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to add an assertion that the PDML timeout is 2 hrs?

try (Spanner spanner = builder.build().getService()) {
DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]"));

// PDML should not timeout with these settings.
long updateCount = client.executePartitionedUpdate(UPDATE_STATEMENT);
assertThat(updateCount, is(equalTo(UPDATE_COUNT)));

// Normal DML should timeout.
try {
client
.readWriteTransaction()
.run(
new TransactionCallable<Void>() {
@Override
public Void run(TransactionContext transaction) throws Exception {
transaction.executeUpdate(UPDATE_STATEMENT);
return null;
}
});
fail("expected DEADLINE_EXCEEDED");
} catch (SpannerException e) {
if (e.getErrorCode() != ErrorCode.DEADLINE_EXCEEDED) {
fail("expected DEADLINE_EXCEEDED");
}
}
}
}

@Test
public void testPartitionedDmlWithTimeout() throws Exception {
mockSpanner.setExecuteSqlExecutionTime(SimulatedExecutionTime.ofMinimumAndRandomTime(10, 0));
SpannerOptions.Builder builder =
SpannerOptions.newBuilder()
.setProjectId("[PROJECT]")
.setChannelProvider(channelProvider)
.setCredentials(NoCredentials.getInstance());
// Set PDML timeout value.
builder.setPartitionedDmlTimeout(Duration.ofMillis(1L));
try (Spanner spanner = builder.build().getService()) {
DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]"));

// PDML should timeout with these settings.
try {
client.executePartitionedUpdate(UPDATE_STATEMENT);
fail("expected DEADLINE_EXCEEDED");
} catch (SpannerException e) {
if (e.getErrorCode() != ErrorCode.DEADLINE_EXCEEDED) {
fail("expected DEADLINE_EXCEEDED");
}
}

// Normal DML should not timeout.
long updateCount =
client
.readWriteTransaction()
.run(
new TransactionCallable<Long>() {
@Override
public Long run(TransactionContext transaction) throws Exception {
return transaction.executeUpdate(UPDATE_STATEMENT);
}
});
assertThat(updateCount, is(equalTo(UPDATE_COUNT)));
}
}
}