Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cursor handling to datastore.query.Query. #204

Merged
merged 3 commits into from
Oct 2, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions gcloud/datastore/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ def run_query(self, dataset_id, query_pb, namespace=None):
Under the hood this is doing...

>>> connection.run_query('dataset-id', query.to_protobuf())
[<list of Entity Protobufs>]
[<list of Entity Protobufs>], cursor, more_results, skipped_results

:type dataset_id: string
:param dataset_id: The ID of the dataset over which to run the query.
Expand All @@ -205,7 +205,11 @@ def run_query(self, dataset_id, query_pb, namespace=None):
request.query.CopyFrom(query_pb)
response = self._rpc(dataset_id, 'runQuery', request,
datastore_pb.RunQueryResponse)
return [e.entity for e in response.batch.entity_result]
return ([e.entity for e in response.batch.entity_result],
response.batch.end_cursor,
response.batch.more_results,
response.batch.skipped_results,
)

def lookup(self, dataset_id, key_pbs):
"""Lookup keys from a dataset in the Cloud Datastore.
Expand Down
47 changes: 46 additions & 1 deletion gcloud/datastore/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from gcloud.datastore import helpers
from gcloud.datastore.entity import Entity
from gcloud.datastore.key import Key
import base64


class Query(object):
Expand Down Expand Up @@ -53,6 +54,7 @@ class Query(object):
def __init__(self, kind=None, dataset=None):
self._dataset = dataset
self._pb = datastore_pb.Query()
self._cursor = None

if kind:
self._pb.kind.add().name = kind
Expand Down Expand Up @@ -303,12 +305,55 @@ def fetch(self, limit=None):
if limit:
clone = self.limit(limit)

entity_pbs = self.dataset().connection().run_query(
(entity_pbs,
end_cursor,
more_results,
skipped_results) = self.dataset().connection().run_query(
query_pb=clone.to_protobuf(), dataset_id=self.dataset().id())

self._cursor = end_cursor
return [Entity.from_protobuf(entity, dataset=self.dataset())
for entity in entity_pbs]

def cursor(self):
"""Returns cursor ID

.. Caution:: Invoking this method on a query that has not yet been
executed will raise a RuntimeError.

:rtype: string
:returns: base64-encoded cursor ID string denoting the last position
consumed in the query's result set.
"""
if not self._cursor:
raise RuntimeError('No cursor')
return base64.b64encode(self._cursor)

def with_cursor(self, start_cursor, end_cursor=None):
"""Specifies the starting / ending positions in a query's result set.

:type start_cursor: bytes
:param start_cursor: Base64-encoded cursor string specifying where to
start reading query results.

:type end_cursor: bytes
:param end_cursor: Base64-encoded cursor string specifying where to stop
reading query results.

:rtype: :class:`Query`
:returns: If neither cursor is passed, returns self; else, returns a
clone of the :class:`Query`, with cursors updated.

"""
clone = self
if start_cursor or end_cursor:
clone = self._clone()
if start_cursor:
clone._pb.start_cursor = base64.b64decode(start_cursor)
if end_cursor:
clone._pb.end_cursor = base64.b64decode(end_cursor)
return clone

def order(self, *properties):
"""Adds a sort order to the query.

Expand Down
10 changes: 7 additions & 3 deletions gcloud/datastore/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,11 @@ def test_run_query_wo_namespace_empty_result(self):
'runQuery',
])
http = conn._http = Http({'status': '200'}, rsp_pb.SerializeToString())
self.assertEqual(conn.run_query(DATASET_ID, q_pb), [])
pbs, end, more, skipped = conn.run_query(DATASET_ID, q_pb)
self.assertEqual(pbs, [])
self.assertEqual(end, '')
self.assertTrue(more)
self.assertEqual(skipped, 0)
cw = http._called_with
self.assertEqual(cw['uri'], URI)
self.assertEqual(cw['method'], 'POST')
Expand Down Expand Up @@ -357,8 +361,8 @@ def test_run_query_w_namespace_nonempty_result(self):
'runQuery',
])
http = conn._http = Http({'status': '200'}, rsp_pb.SerializeToString())
result = conn.run_query(DATASET_ID, q_pb, 'NS')
returned, = result # One entity.
pbs, end, more, skipped = conn.run_query(DATASET_ID, q_pb, 'NS')
returned, = pbs, # One entity.
cw = http._called_with
self.assertEqual(cw['uri'], URI)
self.assertEqual(cw['method'], 'POST')
Expand Down
82 changes: 81 additions & 1 deletion gcloud/datastore/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ def test_fetch_default_limit(self):

def test_fetch_explicit_limit(self):
from gcloud.datastore.datastore_v1_pb2 import Entity
_CURSOR = 'CURSOR'
_DATASET = 'DATASET'
_KIND = 'KIND'
_ID = 123
Expand All @@ -213,10 +214,12 @@ def test_fetch_explicit_limit(self):
prop.name = 'foo'
prop.value.string_value = 'Foo'
connection = _Connection(entity_pb)
connection._cursor = _CURSOR
dataset = _Dataset(_DATASET, connection)
query = self._makeOne(_KIND, dataset)
limited = query.limit(13)
entities = query.fetch(13)
self.assertEqual(query._cursor, _CURSOR)
self.assertEqual(len(entities), 1)
self.assertEqual(entities[0].key().path(),
[{'kind': _KIND, 'id': _ID}])
Expand All @@ -225,6 +228,80 @@ def test_fetch_explicit_limit(self):
'query_pb': limited.to_protobuf(),
})

def test_cursor_not_fetched(self):
_DATASET = 'DATASET'
_KIND = 'KIND'
connection = _Connection()
dataset = _Dataset(_DATASET, connection)
query = self._makeOne(_KIND, dataset)
self.assertRaises(RuntimeError, query.cursor)

def test_cursor_fetched(self):
import base64
_CURSOR = 'CURSOR'
_DATASET = 'DATASET'
_KIND = 'KIND'
connection = _Connection()
dataset = _Dataset(_DATASET, connection)
query = self._makeOne(_KIND, dataset)
query._cursor = _CURSOR
self.assertEqual(query.cursor(), base64.b64encode(_CURSOR))

def test_with_cursor_neither(self):
_DATASET = 'DATASET'
_KIND = 'KIND'
connection = _Connection()
dataset = _Dataset(_DATASET, connection)
query = self._makeOne(_KIND, dataset)
self.assertTrue(query.with_cursor(None) is query)

def test_with_cursor_w_start(self):
import base64
_CURSOR = 'CURSOR'
_CURSOR_B64 = base64.b64encode(_CURSOR)
_DATASET = 'DATASET'
_KIND = 'KIND'
connection = _Connection()
dataset = _Dataset(_DATASET, connection)
query = self._makeOne(_KIND, dataset)
after = query.with_cursor(_CURSOR_B64)
self.assertFalse(after is query)
q_pb = after.to_protobuf()
self.assertEqual(q_pb.start_cursor, _CURSOR)
self.assertEqual(q_pb.end_cursor, '')

def test_with_cursor_w_end(self):
import base64
_CURSOR = 'CURSOR'
_CURSOR_B64 = base64.b64encode(_CURSOR)
_DATASET = 'DATASET'
_KIND = 'KIND'
connection = _Connection()
dataset = _Dataset(_DATASET, connection)
query = self._makeOne(_KIND, dataset)
after = query.with_cursor(None, _CURSOR_B64)
self.assertFalse(after is query)
q_pb = after.to_protobuf()
self.assertEqual(q_pb.start_cursor, '')
self.assertEqual(q_pb.end_cursor, _CURSOR)

def test_with_cursor_w_both(self):
import base64
_START = 'START'
_START_B64 = base64.b64encode(_START)
_END = 'CURSOR'
_END_B64 = base64.b64encode(_END)
_DATASET = 'DATASET'
_KIND = 'KIND'
connection = _Connection()
dataset = _Dataset(_DATASET, connection)
query = self._makeOne(_KIND, dataset)
after = query.with_cursor(_START_B64, _END_B64)
self.assertFalse(after is query)
q_pb = after.to_protobuf()
self.assertEqual(q_pb.start_cursor, _START)
self.assertEqual(q_pb.end_cursor, _END)

def test_order_empty(self):
_KIND = 'KIND'
before = self._makeOne(_KIND)
Expand Down Expand Up @@ -285,10 +362,13 @@ def connection(self):

class _Connection(object):
_called_with = None
_cursor = ''
_more = True
_skipped = 0

def __init__(self, *result):
self._result = list(result)

def run_query(self, **kw):
self._called_with = kw
return self._result
return self._result, self._cursor, self._more, self._skipped