Skip to content

Commit

Permalink
Merge pull request #691 from ajkannan/v1beta3-read-consistency
Browse files Browse the repository at this point in the history
Eventual consistency for Datastore reads in v1beta3
  • Loading branch information
ajkannan committed Mar 1, 2016
2 parents b4bd1b9 + 0d794ea commit 47aae51
Show file tree
Hide file tree
Showing 9 changed files with 418 additions and 290 deletions.
12 changes: 0 additions & 12 deletions gcloud-java-datastore/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,6 @@
<artifactId>datastore-v1beta3-proto-client</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-datastore-protobuf</artifactId>
<version>v1beta2-rev1-2.1.2</version>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>com.google.api-client</groupId>
<artifactId>google-api-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.google.gcloud.Service;

import java.util.Iterator;
import java.util.List;

/**
Expand All @@ -32,7 +33,6 @@ public interface Datastore extends Service<DatastoreOptions>, DatastoreReaderWri
*/
Transaction newTransaction();


/**
* A callback for running with a transactional
* {@link com.google.gcloud.datastore.DatastoreReaderWriter}.
Expand All @@ -45,7 +45,6 @@ interface TransactionCallable<T> {
T run(DatastoreReaderWriter readerWriter) throws Exception;
}


/**
* Invokes the callback's {@link Datastore.TransactionCallable#run} method with a
* {@link DatastoreReaderWriter} that is associated with a new transaction.
Expand Down Expand Up @@ -105,4 +104,39 @@ interface TransactionCallable<T> {
* Returns a new KeyFactory for this service
*/
KeyFactory newKeyFactory();

/**
* Returns an {@link Entity} for the given {@link Key} or {@code null} if it doesn't exist.
* {@link ReadOption}s can be specified if desired.
*
* @throws DatastoreException upon failure
*/
Entity get(Key key, ReadOption... options);

/**
* Returns an {@link Entity} for each given {@link Key} that exists in the Datastore. The order of
* the result is unspecified. Results are loaded lazily, so it is possible to get a
* {@code DatastoreException} from the returned {@code Iterator}'s
* {@link Iterator#hasNext hasNext} or {@link Iterator#next next} methods. {@link ReadOption}s can
* be specified if desired.
*
* @throws DatastoreException upon failure
* @see #get(Key)
*/
Iterator<Entity> get(Iterable<Key> keys, ReadOption... options);

/**
* Returns a list with a value for each given key (ordered by input). {@code null} values are
* returned for nonexistent keys. When possible prefer using {@link #get(Key...)} to avoid eagerly
* loading the results. {@link ReadOption}s can be specified if desired.
*/
List<Entity> fetch(Iterable<Key> keys, ReadOption... options);

/**
* Submits a {@link Query} and returns its result. {@link ReadOption}s can be specified if
* desired.
*
* @throws DatastoreException upon failure
*/
<T> QueryResults<T> run(Query<T> query, ReadOption... options);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.google.common.collect.Maps;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand All @@ -33,13 +35,16 @@ class DatastoreHelper {
private DatastoreHelper() {
}


static Key allocateId(Datastore service, IncompleteKey key) {
return service.allocateId(new IncompleteKey[]{key}).get(0);
}

static Entity get(DatastoreReader reader, Key key) {
return Iterators.getNext(reader.get(new Key[]{key}), null);
static Entity get(Transaction reader, Key key) {
return Iterators.getNext(reader.get(new Key[] {key}), null);
}

static Entity get(Datastore reader, Key key, ReadOption... options) {
return Iterators.getNext(reader.get(Collections.singletonList(key), options), null);
}

static Entity add(DatastoreWriter writer, FullEntity<?> entity) {
Expand All @@ -51,19 +56,30 @@ static KeyFactory newKeyFactory(DatastoreOptions options) {
}

/**
* Returns a list with a value for each given key (ordered by input).
* A {@code null} would be returned for non-existing keys.
* Returns a list with a value for each given key (ordered by input). {@code null} values are
* returned for nonexistent keys.
*/
static List<Entity> fetch(DatastoreReader reader, Key... keys) {
Iterator<Entity> entities = reader.get(keys);
static List<Entity> fetch(Transaction reader, Key... keys) {
return compileEntities(keys, reader.get(keys));
}

/**
* Returns a list with a value for each given key (ordered by input). {@code null} values are
* returned for nonexistent keys.
*/
static List<Entity> fetch(Datastore reader, Key[] keys, ReadOption... options) {
return compileEntities(keys, reader.get(Arrays.asList(keys), options));
}

private static List<Entity> compileEntities(Key[] keys, Iterator<Entity> entities) {
Map<Key, Entity> map = Maps.newHashMapWithExpectedSize(keys.length);
while (entities.hasNext()) {
Entity entity = entities.next();
map.put(entity.key(), entity);
}
List<Entity> list = new ArrayList<>(keys.length);
for (Key key : keys) {
// this will include nulls for non-existing keys
// this will include nulls for nonexistent keys
list.add(map.get(key));
}
return list;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.datastore.v1beta3.ReadOptions.ReadConsistency;
import com.google.gcloud.BaseService;
import com.google.gcloud.RetryHelper;
import com.google.gcloud.RetryHelper.RetryHelperException;
import com.google.gcloud.RetryParams;
import com.google.gcloud.datastore.ReadOption.EventualConsistency;
import com.google.gcloud.spi.DatastoreRpc;
import com.google.protobuf.ByteString;

Expand Down Expand Up @@ -70,6 +73,11 @@ public <T> QueryResults<T> run(Query<T> query) {
return run(null, query);
}

@Override
public <T> QueryResults<T> run(Query<T> query, ReadOption... options) {
return run(toReadOptionsPb(options), query);
}

<T> QueryResults<T> run(com.google.datastore.v1beta3.ReadOptions readOptionsPb, Query<T> query) {
return new QueryResultsImpl<>(this, readOptionsPb, query);
}
Expand Down Expand Up @@ -185,16 +193,42 @@ public Entity get(Key key) {
return DatastoreHelper.get(this, key);
}

@Override
public Entity get(Key key, ReadOption... options) {
return DatastoreHelper.get(this, key, options);
}

@Override
public Iterator<Entity> get(Key... keys) {
return get(null, keys);
}

@Override
public Iterator<Entity> get(Iterable<Key> keys, ReadOption... options) {
return get(toReadOptionsPb(options), Iterables.toArray(keys, Key.class));
}

private static com.google.datastore.v1beta3.ReadOptions toReadOptionsPb(ReadOption... options) {
com.google.datastore.v1beta3.ReadOptions readOptionsPb = null;
if (options != null
&& ReadOption.asImmutableMap(options).containsKey(EventualConsistency.class)) {
readOptionsPb = com.google.datastore.v1beta3.ReadOptions.newBuilder()
.setReadConsistency(ReadConsistency.EVENTUAL)
.build();
}
return readOptionsPb;
}

@Override
public List<Entity> fetch(Key... keys) {
return DatastoreHelper.fetch(this, keys);
}

@Override
public List<Entity> fetch(Iterable<Key> keys, ReadOption... options) {
return DatastoreHelper.fetch(this, Iterables.toArray(keys, Key.class), options);
}

Iterator<Entity> get(com.google.datastore.v1beta3.ReadOptions readOptionsPb, final Key... keys) {
if (keys.length == 0) {
return Collections.emptyIterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,33 +25,32 @@
public interface DatastoreReader {

/**
* Returns an {@link Entity} for the given {@link Key} or {@code null} if does not exists.
* Returns an {@link Entity} for the given {@link Key} or {@code null} if it doesn't exist.
*
* @throws DatastoreException upon failure
*/
Entity get(Key key);

/**
* Returns an {@link Entity} for each given {@link Key} that exists in the Datastore.
* The order of the result is unspecified.
* Results are loaded lazily therefore it is possible to get a {@code DatastoreException}
* from the returned {@code Iterator}'s {@link Iterator#hasNext hasNext} or
* {@link Iterator#next next} methods.
* Returns an {@link Entity} for each given {@link Key} that exists in the Datastore. The order of
* the result is unspecified. Results are loaded lazily, so it is possible to get a
* {@code DatastoreException} from the returned {@code Iterator}'s
* {@link Iterator#hasNext hasNext} or {@link Iterator#next next} methods.
*
* @throws DatastoreException upon failure
* @see #get(Key)
*/
Iterator<Entity> get(Key... key);

/**
* Returns a list with a value for each given key (ordered by input).
* A {@code null} would be returned for non-existing keys.
* When possible prefer using {@link #get(Key...)} which does not eagerly loads the results.
* Returns a list with a value for each given key (ordered by input). {@code null} values are
* returned for nonexistent keys. When possible prefer using {@link #get(Key...)} to avoid eagerly
* loading the results.
*/
List<Entity> fetch(Key... keys);

/**
* Submit a {@link Query} and returns its result.
* Submits a {@link Query} and returns its result.
*
* @throws DatastoreException upon failure
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2016 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.gcloud.datastore;

import com.google.common.collect.ImmutableMap;

import java.io.Serializable;
import java.util.Map;

/**
* Specifies options for read operations in Datastore, namely getting/fetching entities and running
* queries.
*/
public abstract class ReadOption implements Serializable {

private static final long serialVersionUID = -4406964829189800528L;

/**
* Specifies eventual consistency for reads from Datastore. Lookups and ancestor queries using
* this option permit Datastore to return stale results.
*/
public static final class EventualConsistency extends ReadOption {

private static final long serialVersionUID = -6959530217724666172L;

private final boolean eventualConsistency;

private EventualConsistency(boolean eventualConsistency) {
this.eventualConsistency = eventualConsistency;
}

public boolean isEventual() {
return eventualConsistency;
}
}

private ReadOption() {}

/**
* Returns a {@code ReadOption} that specifies eventual consistency, allowing Datastore to return
* stale results from gets, fetches, and ancestor queries.
*/
public static EventualConsistency eventualConsistency() {
return new EventualConsistency(true);
}

static Map<Class<? extends ReadOption>, ReadOption> asImmutableMap(ReadOption... options) {
ImmutableMap.Builder<Class<? extends ReadOption>, ReadOption> builder = ImmutableMap.builder();
for (ReadOption option : options) {
builder.put(option.getClass(), option);
}
return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ static class ResponseImpl implements Transaction.Response {

@Override
public List<Key> generatedKeys() {
Iterator<com.google.datastore.v1beta3.MutationResult> results =
Iterator<com.google.datastore.v1beta3.MutationResult> results =
response.getMutationResultsList().iterator();
List<Key> generated = new ArrayList<>(numAutoAllocatedIds);
for (int i = 0; i < numAutoAllocatedIds; i++) {
Expand All @@ -66,7 +66,7 @@ public Entity get(Key key) {
@Override
public Iterator<Entity> get(Key... keys) {
validateActive();
com.google.datastore.v1beta3.ReadOptions.Builder readOptionsPb =
com.google.datastore.v1beta3.ReadOptions.Builder readOptionsPb =
com.google.datastore.v1beta3.ReadOptions.newBuilder();
readOptionsPb.setTransaction(transaction);
return datastore.get(readOptionsPb.build(), keys);
Expand All @@ -81,7 +81,7 @@ public List<Entity> fetch(Key... keys) {
@Override
public <T> QueryResults<T> run(Query<T> query) {
validateActive();
com.google.datastore.v1beta3.ReadOptions.Builder readOptionsPb =
com.google.datastore.v1beta3.ReadOptions.Builder readOptionsPb =
com.google.datastore.v1beta3.ReadOptions.newBuilder();
readOptionsPb.setTransaction(transaction);
return datastore.run(readOptionsPb.build(), query);
Expand All @@ -91,7 +91,7 @@ public <T> QueryResults<T> run(Query<T> query) {
public Transaction.Response commit() {
validateActive();
List<com.google.datastore.v1beta3.Mutation> mutationsPb = toMutationPbList();
com.google.datastore.v1beta3.CommitRequest.Builder requestPb =
com.google.datastore.v1beta3.CommitRequest.Builder requestPb =
com.google.datastore.v1beta3.CommitRequest.newBuilder();
requestPb.setMode(com.google.datastore.v1beta3.CommitRequest.Mode.TRANSACTIONAL);
requestPb.setTransaction(transaction);
Expand Down
Loading

0 comments on commit 47aae51

Please sign in to comment.