Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add 'eventual' flag to 'Connection.lookup()' / 'Connection.run_query()'. #449

Merged
merged 4 commits into from
Dec 19, 2014
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 65 additions & 14 deletions gcloud/datastore/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ def dataset(self, *args, **kwargs):
kwargs['connection'] = self
return Dataset(*args, **kwargs)

def lookup(self, dataset_id, key_pbs, missing=None, deferred=None):
def lookup(self, dataset_id, key_pbs,
missing=None, deferred=None, eventual=False):
"""Lookup keys from a dataset in the Cloud Datastore.

Maps the ``DatastoreService.Lookup`` protobuf RPC.
Expand Down Expand Up @@ -211,22 +212,40 @@ def lookup(self, dataset_id, key_pbs, missing=None, deferred=None):
by the backend as "deferred" will be copied into it.
Use only as a keyword param.

:type eventual: bool
:param eventual: If False (the default), request ``STRONG`` read
consistency. If True, request ``EVENTUAL`` read
consistency. If the connection has a current
transaction, this value *must* be false.

:rtype: list of :class:`gcloud.datastore.datastore_v1_pb2.Entity`
(or a single Entity)
:returns: The entities corresponding to the keys provided.
If a single key was provided and no results matched,
this will return None.
If multiple keys were provided and no results matched,
this will return an empty list.
:raises: ValueError if ``eventual`` is True
"""
if missing is not None and missing != []:
raise ValueError('missing must be None or an empty list')

if deferred is not None and deferred != []:
raise ValueError('deferred must be None or an empty list')

transaction = self.transaction()
if eventual and transaction:
raise ValueError('eventual must be False when in a transaction')

lookup_request = datastore_pb.LookupRequest()

opts = lookup_request.read_options
ro_enum = datastore_pb.ReadOptions.ReadConsistency
if eventual:
opts.read_consistency = ro_enum.Value('EVENTUAL')

This comment was marked as spam.

This comment was marked as spam.

elif transaction:
opts.transaction = transaction

single_key = isinstance(key_pbs, datastore_pb.Key)

if single_key:
Expand All @@ -235,19 +254,42 @@ def lookup(self, dataset_id, key_pbs, missing=None, deferred=None):
for key_pb in key_pbs:
lookup_request.key.add().CopyFrom(key_pb)

results, m_found, d_found = self._lookup(lookup_request, dataset_id,

This comment was marked as spam.

deferred is not None)

if missing is not None:
missing.extend(m_found)

if deferred is not None:
deferred.extend(d_found)

if single_key:
if results:
return results[0]
else:
return None

return results

def _lookup(self, lookup_request, dataset_id, stop_on_deferred):
"""Repeat lookup until all keys found (unless stop requested).

Helper method for ``lookup()``.
"""
results = []
missing = []
deferred = []
while True: # loop against possible deferred.
lookup_response = self._rpc(dataset_id, 'lookup', lookup_request,
datastore_pb.LookupResponse)

results.extend(
[result.entity for result in lookup_response.found])

if missing is not None:
missing.extend(
[result.entity for result in lookup_response.missing])
missing.extend(
[result.entity for result in lookup_response.missing])

if deferred is not None:
if stop_on_deferred:
deferred.extend([key for key in lookup_response.deferred])
break

Expand All @@ -257,16 +299,9 @@ def lookup(self, dataset_id, key_pbs, missing=None, deferred=None):
# We have deferred keys, and the user didn't ask to know about
# them, so retry (but only with the deferred ones).
_copy_deferred_keys(lookup_request, lookup_response)
return results, missing, deferred

if single_key:
if results:
return results[0]
else:
return None

return results

def run_query(self, dataset_id, query_pb, namespace=None):
def run_query(self, dataset_id, query_pb, namespace=None, eventual=False):
"""Run a query on the Cloud Datastore.

Maps the ``DatastoreService.RunQuery`` protobuf RPC.
Expand Down Expand Up @@ -310,8 +345,24 @@ def run_query(self, dataset_id, query_pb, namespace=None):

:type namespace: string
:param namespace: The namespace over which to run the query.

:type eventual: bool
:param eventual: If False (the default), request ``STRONG`` read
consistency. If True, request ``EVENTUAL`` read
consistency. If the connection has a current
transaction, this value *must* be false.
"""
transaction = self.transaction()

This comment was marked as spam.

This comment was marked as spam.

if eventual and transaction:
raise ValueError('eventual must be False when in a transaction')

request = datastore_pb.RunQueryRequest()
opts = request.read_options

This comment was marked as spam.

ro_enum = datastore_pb.ReadOptions.ReadConsistency
if eventual:
opts.read_consistency = ro_enum.Value('EVENTUAL')
elif transaction:
opts.transaction = transaction

if namespace:
request.partition_id.namespace = namespace
Expand Down
174 changes: 174 additions & 0 deletions gcloud/datastore/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,78 @@ def test_lookup_single_key_empty_response(self):
self.assertEqual(len(keys), 1)
self.assertEqual(keys[0], key_pb)

def test_lookup_single_key_empty_response_w_eventual(self):
from gcloud.datastore.connection import datastore_pb
from gcloud.datastore.key import Key

DATASET_ID = 'DATASET'
key_pb = Key(path=[{'kind': 'Kind', 'id': 1234}]).to_protobuf()
rsp_pb = datastore_pb.LookupResponse()
conn = self._makeOne()
URI = '/'.join([
conn.API_BASE_URL,
'datastore',
conn.API_VERSION,
'datasets',
DATASET_ID,
'lookup',
])
http = conn._http = Http({'status': '200'}, rsp_pb.SerializeToString())
self.assertEqual(conn.lookup(DATASET_ID, key_pb, eventual=True), None)
cw = http._called_with
self._verifyProtobufCall(cw, URI, conn)
rq_class = datastore_pb.LookupRequest
request = rq_class()
request.ParseFromString(cw['body'])
keys = list(request.key)
self.assertEqual(len(keys), 1)
self.assertEqual(keys[0], key_pb)
ro_enum = datastore_pb.ReadOptions.ReadConsistency
self.assertEqual(request.read_options.read_consistency,
ro_enum.Value('EVENTUAL'))
self.assertEqual(request.read_options.transaction, '')

def test_lookup_single_key_empty_response_w_eventual_and_transaction(self):
from gcloud.datastore.key import Key

DATASET_ID = 'DATASET'
TRANSACTION = 'TRANSACTION'
key_pb = Key(path=[{'kind': 'Kind', 'id': 1234}]).to_protobuf()
conn = self._makeOne()
conn.transaction(TRANSACTION)
self.assertRaises(
ValueError, conn.lookup, DATASET_ID, key_pb, eventual=True)

def test_lookup_single_key_empty_response_w_transaction(self):
from gcloud.datastore.connection import datastore_pb
from gcloud.datastore.key import Key

DATASET_ID = 'DATASET'
TRANSACTION = 'TRANSACTION'
key_pb = Key(path=[{'kind': 'Kind', 'id': 1234}]).to_protobuf()
rsp_pb = datastore_pb.LookupResponse()
conn = self._makeOne()
conn.transaction(TRANSACTION)
URI = '/'.join([
conn.API_BASE_URL,
'datastore',
conn.API_VERSION,
'datasets',
DATASET_ID,
'lookup',
])
http = conn._http = Http({'status': '200'}, rsp_pb.SerializeToString())
self.assertEqual(conn.lookup(DATASET_ID, key_pb), None)
cw = http._called_with
self._verifyProtobufCall(cw, URI, conn)
rq_class = datastore_pb.LookupRequest
request = rq_class()
request.ParseFromString(cw['body'])
keys = list(request.key)
self.assertEqual(len(keys), 1)
self.assertEqual(keys[0], key_pb)
self.assertEqual(request.read_options.transaction, TRANSACTION)

def test_lookup_single_key_nonempty_response(self):
from gcloud.datastore.connection import datastore_pb
from gcloud.datastore.key import Key
Expand Down Expand Up @@ -443,6 +515,108 @@ def test_lookup_multiple_keys_w_deferred_from_backend_but_not_passed(self):
self.assertEqual(len(keys), 1)
self.assertEqual(keys[0], key_pb2)

def test_run_query_w_eventual_no_transaction(self):
from gcloud.datastore.connection import datastore_pb
from gcloud.datastore.query import Query

DATASET_ID = 'DATASET'
KIND = 'Nonesuch'
CURSOR = b'\x00'
q_pb = Query(KIND, DATASET_ID).to_protobuf()
rsp_pb = datastore_pb.RunQueryResponse()
rsp_pb.batch.end_cursor = CURSOR
no_more = datastore_pb.QueryResultBatch.NO_MORE_RESULTS
rsp_pb.batch.more_results = no_more
rsp_pb.batch.entity_result_type = datastore_pb.EntityResult.FULL
conn = self._makeOne()
URI = '/'.join([
conn.API_BASE_URL,
'datastore',
conn.API_VERSION,
'datasets',
DATASET_ID,
'runQuery',
])
http = conn._http = Http({'status': '200'}, rsp_pb.SerializeToString())
pbs, end, more, skipped = conn.run_query(DATASET_ID, q_pb,
eventual=True)
self.assertEqual(pbs, [])
self.assertEqual(end, CURSOR)
self.assertTrue(more)
self.assertEqual(skipped, 0)
cw = http._called_with
self._verifyProtobufCall(cw, URI, conn)
rq_class = datastore_pb.RunQueryRequest
request = rq_class()
request.ParseFromString(cw['body'])
self.assertEqual(request.partition_id.namespace, '')
self.assertEqual(request.query, q_pb)
ro_enum = datastore_pb.ReadOptions.ReadConsistency
self.assertEqual(request.read_options.read_consistency,
ro_enum.Value('EVENTUAL'))
self.assertEqual(request.read_options.transaction, '')

def test_run_query_wo_eventual_w_transaction(self):
from gcloud.datastore.connection import datastore_pb
from gcloud.datastore.query import Query

DATASET_ID = 'DATASET'
KIND = 'Nonesuch'
CURSOR = b'\x00'
TRANSACTION = 'TRANSACTION'
q_pb = Query(KIND, DATASET_ID).to_protobuf()
rsp_pb = datastore_pb.RunQueryResponse()
rsp_pb.batch.end_cursor = CURSOR
no_more = datastore_pb.QueryResultBatch.NO_MORE_RESULTS
rsp_pb.batch.more_results = no_more
rsp_pb.batch.entity_result_type = datastore_pb.EntityResult.FULL
conn = self._makeOne()
conn.transaction(TRANSACTION)
URI = '/'.join([
conn.API_BASE_URL,
'datastore',
conn.API_VERSION,
'datasets',
DATASET_ID,
'runQuery',
])
http = conn._http = Http({'status': '200'}, rsp_pb.SerializeToString())
pbs, end, more, skipped = conn.run_query(DATASET_ID, q_pb)
self.assertEqual(pbs, [])
self.assertEqual(end, CURSOR)
self.assertTrue(more)
self.assertEqual(skipped, 0)
cw = http._called_with
self._verifyProtobufCall(cw, URI, conn)
rq_class = datastore_pb.RunQueryRequest
request = rq_class()
request.ParseFromString(cw['body'])
self.assertEqual(request.partition_id.namespace, '')
self.assertEqual(request.query, q_pb)
ro_enum = datastore_pb.ReadOptions.ReadConsistency
self.assertEqual(request.read_options.read_consistency,
ro_enum.Value('DEFAULT'))
self.assertEqual(request.read_options.transaction, TRANSACTION)

def test_run_query_w_eventual_and_transaction(self):
from gcloud.datastore.connection import datastore_pb
from gcloud.datastore.query import Query

DATASET_ID = 'DATASET'
KIND = 'Nonesuch'
CURSOR = b'\x00'
TRANSACTION = 'TRANSACTION'
q_pb = Query(KIND, DATASET_ID).to_protobuf()
rsp_pb = datastore_pb.RunQueryResponse()
rsp_pb.batch.end_cursor = CURSOR
no_more = datastore_pb.QueryResultBatch.NO_MORE_RESULTS
rsp_pb.batch.more_results = no_more
rsp_pb.batch.entity_result_type = datastore_pb.EntityResult.FULL
conn = self._makeOne()
conn.transaction(TRANSACTION)
self.assertRaises(
ValueError, conn.run_query, DATASET_ID, q_pb, eventual=True)

def test_run_query_wo_namespace_empty_result(self):
from gcloud.datastore.connection import datastore_pb
from gcloud.datastore.query import Query
Expand Down