Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add 'eventual' flag to 'Connection.lookup()' / 'Connection.run_query()'. #449

Merged
merged 4 commits into from
Dec 19, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 69 additions & 22 deletions gcloud/datastore/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ def dataset(self, *args, **kwargs):
kwargs['connection'] = self
return Dataset(*args, **kwargs)

def lookup(self, dataset_id, key_pbs, missing=None, deferred=None):
def lookup(self, dataset_id, key_pbs,
missing=None, deferred=None, eventual=False):
"""Lookup keys from a dataset in the Cloud Datastore.

Maps the ``DatastoreService.Lookup`` protobuf RPC.
Expand Down Expand Up @@ -211,13 +212,20 @@ def lookup(self, dataset_id, key_pbs, missing=None, deferred=None):
by the backend as "deferred" will be copied into it.
Use only as a keyword param.

:type eventual: bool
:param eventual: If False (the default), request ``STRONG`` read
consistency. If True, request ``EVENTUAL`` read
consistency. If the connection has a current
transaction, this value *must* be false.

:rtype: list of :class:`gcloud.datastore.datastore_v1_pb2.Entity`
(or a single Entity)
:returns: The entities corresponding to the keys provided.
If a single key was provided and no results matched,
this will return None.
If multiple keys were provided and no results matched,
this will return an empty list.
:raises: ValueError if ``eventual`` is True
"""
if missing is not None and missing != []:
raise ValueError('missing must be None or an empty list')
Expand All @@ -226,6 +234,7 @@ def lookup(self, dataset_id, key_pbs, missing=None, deferred=None):
raise ValueError('deferred must be None or an empty list')

lookup_request = datastore_pb.LookupRequest()
self._set_read_options(lookup_request, eventual)

single_key = isinstance(key_pbs, datastore_pb.Key)

Expand All @@ -235,28 +244,14 @@ def lookup(self, dataset_id, key_pbs, missing=None, deferred=None):
for key_pb in key_pbs:
lookup_request.key.add().CopyFrom(key_pb)

results = []
while True: # loop against possible deferred.
lookup_response = self._rpc(dataset_id, 'lookup', lookup_request,
datastore_pb.LookupResponse)
results, missing_found, deferred_found = self._lookup(
lookup_request, dataset_id, deferred is not None)

results.extend(
[result.entity for result in lookup_response.found])
if missing is not None:
missing.extend(missing_found)

if missing is not None:
missing.extend(
[result.entity for result in lookup_response.missing])

if deferred is not None:
deferred.extend([key for key in lookup_response.deferred])
break

if not lookup_response.deferred:
break

# We have deferred keys, and the user didn't ask to know about
# them, so retry (but only with the deferred ones).
_copy_deferred_keys(lookup_request, lookup_response)
if deferred is not None:
deferred.extend(deferred_found)

if single_key:
if results:
Expand All @@ -266,7 +261,7 @@ def lookup(self, dataset_id, key_pbs, missing=None, deferred=None):

return results

def run_query(self, dataset_id, query_pb, namespace=None):
def run_query(self, dataset_id, query_pb, namespace=None, eventual=False):
"""Run a query on the Cloud Datastore.

Maps the ``DatastoreService.RunQuery`` protobuf RPC.
Expand Down Expand Up @@ -310,8 +305,15 @@ def run_query(self, dataset_id, query_pb, namespace=None):

:type namespace: string
:param namespace: The namespace over which to run the query.

:type eventual: bool
:param eventual: If False (the default), request ``STRONG`` read
consistency. If True, request ``EVENTUAL`` read
consistency. If the connection has a current
transaction, this value *must* be false.
"""
request = datastore_pb.RunQueryRequest()
self._set_read_options(request, eventual)

if namespace:
request.partition_id.namespace = namespace
Expand Down Expand Up @@ -514,6 +516,51 @@ def delete_entities(self, dataset_id, key_pbs):

return True

def _lookup(self, lookup_request, dataset_id, stop_on_deferred):
"""Repeat lookup until all keys found (unless stop requested).

Helper method for ``lookup()``.
"""
results = []
missing = []
deferred = []
while True: # loop against possible deferred.
lookup_response = self._rpc(dataset_id, 'lookup', lookup_request,
datastore_pb.LookupResponse)

results.extend(
[result.entity for result in lookup_response.found])

missing.extend(
[result.entity for result in lookup_response.missing])

if stop_on_deferred:
deferred.extend([key for key in lookup_response.deferred])
break

if not lookup_response.deferred:
break

# We have deferred keys, and the user didn't ask to know about
# them, so retry (but only with the deferred ones).
_copy_deferred_keys(lookup_request, lookup_response)
return results, missing, deferred

def _set_read_options(self, request, eventual):
"""Validate rules for read options, and assign to the request.

Helper method for ``lookup()`` and ``run_query``.
"""
transaction = self.transaction()
if eventual and transaction:
raise ValueError('eventual must be False when in a transaction')

opts = request.read_options
if eventual:
opts.read_consistency = datastore_pb.ReadOptions.EVENTUAL
elif transaction:
opts.transaction = transaction


def _copy_deferred_keys(lookup_request, lookup_response):
"""Clear requested keys and copy deferred keys back in.
Expand Down
171 changes: 171 additions & 0 deletions gcloud/datastore/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,77 @@ def test_lookup_single_key_empty_response(self):
self.assertEqual(len(keys), 1)
self.assertEqual(keys[0], key_pb)

def test_lookup_single_key_empty_response_w_eventual(self):
from gcloud.datastore.connection import datastore_pb
from gcloud.datastore.key import Key

DATASET_ID = 'DATASET'
key_pb = Key(path=[{'kind': 'Kind', 'id': 1234}]).to_protobuf()
rsp_pb = datastore_pb.LookupResponse()
conn = self._makeOne()
URI = '/'.join([
conn.API_BASE_URL,
'datastore',
conn.API_VERSION,
'datasets',
DATASET_ID,
'lookup',
])
http = conn._http = Http({'status': '200'}, rsp_pb.SerializeToString())
self.assertEqual(conn.lookup(DATASET_ID, key_pb, eventual=True), None)
cw = http._called_with
self._verifyProtobufCall(cw, URI, conn)
rq_class = datastore_pb.LookupRequest
request = rq_class()
request.ParseFromString(cw['body'])
keys = list(request.key)
self.assertEqual(len(keys), 1)
self.assertEqual(keys[0], key_pb)
self.assertEqual(request.read_options.read_consistency,
datastore_pb.ReadOptions.EVENTUAL)
self.assertEqual(request.read_options.transaction, '')

def test_lookup_single_key_empty_response_w_eventual_and_transaction(self):
from gcloud.datastore.key import Key

DATASET_ID = 'DATASET'
TRANSACTION = 'TRANSACTION'
key_pb = Key(path=[{'kind': 'Kind', 'id': 1234}]).to_protobuf()
conn = self._makeOne()
conn.transaction(TRANSACTION)
self.assertRaises(
ValueError, conn.lookup, DATASET_ID, key_pb, eventual=True)

def test_lookup_single_key_empty_response_w_transaction(self):
from gcloud.datastore.connection import datastore_pb
from gcloud.datastore.key import Key

DATASET_ID = 'DATASET'
TRANSACTION = 'TRANSACTION'
key_pb = Key(path=[{'kind': 'Kind', 'id': 1234}]).to_protobuf()
rsp_pb = datastore_pb.LookupResponse()
conn = self._makeOne()
conn.transaction(TRANSACTION)
URI = '/'.join([
conn.API_BASE_URL,
'datastore',
conn.API_VERSION,
'datasets',
DATASET_ID,
'lookup',
])
http = conn._http = Http({'status': '200'}, rsp_pb.SerializeToString())
self.assertEqual(conn.lookup(DATASET_ID, key_pb), None)
cw = http._called_with
self._verifyProtobufCall(cw, URI, conn)
rq_class = datastore_pb.LookupRequest
request = rq_class()
request.ParseFromString(cw['body'])
keys = list(request.key)
self.assertEqual(len(keys), 1)
self.assertEqual(keys[0], key_pb)
self.assertEqual(request.read_options.transaction, TRANSACTION)

def test_lookup_single_key_nonempty_response(self):
from gcloud.datastore.connection import datastore_pb
from gcloud.datastore.key import Key
Expand Down Expand Up @@ -443,6 +514,106 @@ def test_lookup_multiple_keys_w_deferred_from_backend_but_not_passed(self):
self.assertEqual(len(keys), 1)
self.assertEqual(keys[0], key_pb2)

def test_run_query_w_eventual_no_transaction(self):
from gcloud.datastore.connection import datastore_pb
from gcloud.datastore.query import Query

DATASET_ID = 'DATASET'
KIND = 'Nonesuch'
CURSOR = b'\x00'
q_pb = Query(KIND, DATASET_ID).to_protobuf()
rsp_pb = datastore_pb.RunQueryResponse()
rsp_pb.batch.end_cursor = CURSOR
no_more = datastore_pb.QueryResultBatch.NO_MORE_RESULTS
rsp_pb.batch.more_results = no_more
rsp_pb.batch.entity_result_type = datastore_pb.EntityResult.FULL
conn = self._makeOne()
URI = '/'.join([
conn.API_BASE_URL,
'datastore',
conn.API_VERSION,
'datasets',
DATASET_ID,
'runQuery',
])
http = conn._http = Http({'status': '200'}, rsp_pb.SerializeToString())
pbs, end, more, skipped = conn.run_query(DATASET_ID, q_pb,
eventual=True)
self.assertEqual(pbs, [])
self.assertEqual(end, CURSOR)
self.assertTrue(more)
self.assertEqual(skipped, 0)
cw = http._called_with
self._verifyProtobufCall(cw, URI, conn)
rq_class = datastore_pb.RunQueryRequest
request = rq_class()
request.ParseFromString(cw['body'])
self.assertEqual(request.partition_id.namespace, '')
self.assertEqual(request.query, q_pb)
self.assertEqual(request.read_options.read_consistency,
datastore_pb.ReadOptions.EVENTUAL)
self.assertEqual(request.read_options.transaction, '')

def test_run_query_wo_eventual_w_transaction(self):
from gcloud.datastore.connection import datastore_pb
from gcloud.datastore.query import Query

DATASET_ID = 'DATASET'
KIND = 'Nonesuch'
CURSOR = b'\x00'
TRANSACTION = 'TRANSACTION'
q_pb = Query(KIND, DATASET_ID).to_protobuf()
rsp_pb = datastore_pb.RunQueryResponse()
rsp_pb.batch.end_cursor = CURSOR
no_more = datastore_pb.QueryResultBatch.NO_MORE_RESULTS
rsp_pb.batch.more_results = no_more
rsp_pb.batch.entity_result_type = datastore_pb.EntityResult.FULL
conn = self._makeOne()
conn.transaction(TRANSACTION)
URI = '/'.join([
conn.API_BASE_URL,
'datastore',
conn.API_VERSION,
'datasets',
DATASET_ID,
'runQuery',
])
http = conn._http = Http({'status': '200'}, rsp_pb.SerializeToString())
pbs, end, more, skipped = conn.run_query(DATASET_ID, q_pb)
self.assertEqual(pbs, [])
self.assertEqual(end, CURSOR)
self.assertTrue(more)
self.assertEqual(skipped, 0)
cw = http._called_with
self._verifyProtobufCall(cw, URI, conn)
rq_class = datastore_pb.RunQueryRequest
request = rq_class()
request.ParseFromString(cw['body'])
self.assertEqual(request.partition_id.namespace, '')
self.assertEqual(request.query, q_pb)
self.assertEqual(request.read_options.read_consistency,
datastore_pb.ReadOptions.DEFAULT)
self.assertEqual(request.read_options.transaction, TRANSACTION)

def test_run_query_w_eventual_and_transaction(self):
from gcloud.datastore.connection import datastore_pb
from gcloud.datastore.query import Query

DATASET_ID = 'DATASET'
KIND = 'Nonesuch'
CURSOR = b'\x00'
TRANSACTION = 'TRANSACTION'
q_pb = Query(KIND, DATASET_ID).to_protobuf()
rsp_pb = datastore_pb.RunQueryResponse()
rsp_pb.batch.end_cursor = CURSOR
no_more = datastore_pb.QueryResultBatch.NO_MORE_RESULTS
rsp_pb.batch.more_results = no_more
rsp_pb.batch.entity_result_type = datastore_pb.EntityResult.FULL
conn = self._makeOne()
conn.transaction(TRANSACTION)
self.assertRaises(
ValueError, conn.run_query, DATASET_ID, q_pb, eventual=True)

def test_run_query_wo_namespace_empty_result(self):
from gcloud.datastore.connection import datastore_pb
from gcloud.datastore.query import Query
Expand Down