Retry streaming exceptions (redux) #3930

Closed · wants to merge 14 commits · showing changes from 13 commits
40 changes: 0 additions & 40 deletions docs/spanner/snapshot-usage.rst
@@ -62,26 +62,6 @@ fails if the result set is too large,
manually, perform all iteration within the context of the
``with database.snapshot()`` block.

.. note::

If streaming a chunk raises an exception, the application can
retry the ``read``, passing the ``resume_token`` from the ``StreamedResultSet``
which raised the error. E.g.:

.. code:: python

result = snapshot.read(table, columns, keys)
while True:
    try:
        for row in result.rows:
            print(row)
    except Exception:
        result = snapshot.read(
            table, columns, keys, resume_token=result.resume_token)
        continue
    else:
        break
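
This PR makes the manual retry loop above unnecessary: the ``StreamedResultSet`` now re-issues the request itself when streaming fails with a retriable error. A minimal sketch of the resulting usage, assuming a ``database`` set up as in the surrounding examples:

.. code:: python

    with database.snapshot() as snapshot:
        result = snapshot.read(table, columns, keys)
        for row in result:
            print(row)

Retriable failures are resumed internally from the stored ``resume_token``, so no rows are lost or duplicated.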



Execute a SQL Select Statement
@@ -112,26 +92,6 @@ fails if the result set is too large,
manually, perform all iteration within the context of the
``with database.snapshot()`` block.

.. note::

If streaming a chunk raises an exception, the application can
retry the query, passing the ``resume_token`` from the ``StreamedResultSet``
which raised the error. E.g.:

.. code:: python

result = snapshot.execute_sql(QUERY)
while True:
    try:
        for row in result.rows:
            print(row)
    except Exception:
        result = snapshot.execute_sql(
            QUERY, resume_token=result.resume_token)
        continue
    else:
        break


Next Step
---------
6 changes: 0 additions & 6 deletions docs/spanner/transaction-usage.rst
@@ -32,12 +32,6 @@ fails if the result set is too large,
for row in result.rows:
print(row)

.. note::

If streaming a chunk fails due to a "resumable" error,
:meth:`Session.read` retries the ``StreamingRead`` API request,
passing the ``resume_token`` from the last partial result streamed.


Execute a SQL Select Statement
------------------------------
17 changes: 4 additions & 13 deletions spanner/google/cloud/spanner/session.py
@@ -165,8 +165,7 @@ def snapshot(self, **kw):

return Snapshot(self, **kw)

def read(self, table, columns, keyset, index='', limit=0,
resume_token=b''):
def read(self, table, columns, keyset, index='', limit=0):
"""Perform a ``StreamingRead`` API request for rows in a table.

:type table: str
@@ -185,17 +184,12 @@ def read(self, table, columns, keyset, index='', limit=0,
:type limit: int
:param limit: (Optional) maximum number of rows to return

:type resume_token: bytes
:param resume_token: token for resuming previously-interrupted read

:rtype: :class:`~google.cloud.spanner.streamed.StreamedResultSet`
:returns: a result set instance which can be used to consume rows.
"""
return self.snapshot().read(
table, columns, keyset, index, limit, resume_token)
return self.snapshot().read(table, columns, keyset, index, limit)

def execute_sql(self, sql, params=None, param_types=None, query_mode=None,
resume_token=b''):
def execute_sql(self, sql, params=None, param_types=None, query_mode=None):
"""Perform an ``ExecuteStreamingSql`` API request.

:type sql: str
@@ -216,14 +210,11 @@ def execute_sql(self, sql, params=None, param_types=None, query_mode=None,
:param query_mode: Mode governing return of results / query plan. See
https://cloud.google.com/spanner/reference/rpc/google.spanner.v1#google.spanner.v1.ExecuteSqlRequest.QueryMode1

:type resume_token: bytes
:param resume_token: token for resuming previously-interrupted query

:rtype: :class:`~google.cloud.spanner.streamed.StreamedResultSet`
:returns: a result set instance which can be used to consume rows.
"""
return self.snapshot().execute_sql(
sql, params, param_types, query_mode, resume_token)
sql, params, param_types, query_mode)

def batch(self):
"""Factory to create a batch for this session.
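With ``resume_token`` gone from the session-level API, a read or query is now a plain delegation to a fresh single-use snapshot. A short usage sketch under that assumption (``session``, ``table``, ``columns``, and ``keys`` are presumed to exist):

    result = session.read(table, columns, keys, limit=10)
    for row in result:
        process(row)  # ``process`` is a hypothetical application callback

    result = session.execute_sql('SELECT name FROM citizens')
    rows = list(result)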
34 changes: 18 additions & 16 deletions spanner/google/cloud/spanner/snapshot.py
@@ -14,6 +14,8 @@

"""Model a set of read-only queries to a database as a snapshot."""

import functools

from google.protobuf.struct_pb2 import Struct
from google.cloud.proto.spanner.v1.transaction_pb2 import TransactionOptions
from google.cloud.proto.spanner.v1.transaction_pb2 import TransactionSelector
@@ -49,8 +51,7 @@ def _make_txn_selector(self): # pylint: disable=redundant-returns-doc
"""
raise NotImplementedError

def read(self, table, columns, keyset, index='', limit=0,
resume_token=b''):
def read(self, table, columns, keyset, index='', limit=0):
"""Perform a ``StreamingRead`` API request for rows in a table.

:type table: str
@@ -69,9 +70,6 @@ def read(self, table, columns, keyset, index='', limit=0,
:type limit: int
:param limit: (Optional) maximum number of rows to return

:type resume_token: bytes
:param resume_token: token for resuming previously-interrupted read

:rtype: :class:`~google.cloud.spanner.streamed.StreamedResultSet`
:returns: a result set instance which can be used to consume rows.
:raises ValueError:
@@ -92,17 +90,20 @@
iterator = api.streaming_read(
self._session.name, table, columns, keyset.to_pb(),
transaction=transaction, index=index, limit=limit,
resume_token=resume_token, options=options)
options=options)

self._read_request_count += 1

restart = functools.partial(
api.streaming_read, self._session.name, table, columns,
keyset.to_pb(),  # pass the serialized KeySet, matching the initial request
index=index, limit=limit)

if self._multi_use:
return StreamedResultSet(iterator, source=self)
return StreamedResultSet(iterator, restart, source=self)
else:
return StreamedResultSet(iterator)
return StreamedResultSet(iterator, restart)
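
The ``restart`` callable is the core of the retry scheme: ``functools.partial`` freezes every argument of the original streaming RPC except the resume token, so the result set can later re-issue an identical request from the last checkpoint. A standalone sketch of the pattern (the names below are illustrative, not the library's API):

    import functools

    def streaming_rpc(session, table, resume_token=b''):
        # Stand-in for the real gRPC streaming call.
        return iter([(session, table, resume_token)])

    # Curry everything except the resume token.
    restart = functools.partial(streaming_rpc, 'session-1', 'citizens')

    first = restart()                          # initial attempt
    resumed = restart(resume_token=b'abc123')  # retry from the last token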

def execute_sql(self, sql, params=None, param_types=None, query_mode=None,
resume_token=b''):
def execute_sql(self, sql, params=None, param_types=None, query_mode=None):
"""Perform an ``ExecuteStreamingSql`` API request for rows in a table.

:type sql: str
@@ -122,9 +123,6 @@
:param query_mode: Mode governing return of results / query plan. See
https://cloud.google.com/spanner/reference/rpc/google.spanner.v1#google.spanner.v1.ExecuteSqlRequest.QueryMode1

:type resume_token: bytes
:param resume_token: token for resuming previously-interrupted query

:rtype: :class:`~google.cloud.spanner.streamed.StreamedResultSet`
:returns: a result set instance which can be used to consume rows.
:raises ValueError:
@@ -153,14 +151,18 @@ def execute_sql(self, sql, params=None, param_types=None, query_mode=None,
iterator = api.execute_streaming_sql(
self._session.name, sql,
transaction=transaction, params=params_pb, param_types=param_types,
query_mode=query_mode, resume_token=resume_token, options=options)
query_mode=query_mode, options=options)

self._read_request_count += 1

restart = functools.partial(
api.execute_streaming_sql, self._session.name, sql,
params=params_pb,  # pass the serialized Struct, matching the initial request
param_types=param_types, query_mode=query_mode)

if self._multi_use:
return StreamedResultSet(iterator, source=self)
return StreamedResultSet(iterator, restart, source=self)
else:
return StreamedResultSet(iterator)
return StreamedResultSet(iterator, restart)


class Snapshot(_SnapshotBase):
87 changes: 61 additions & 26 deletions spanner/google/cloud/spanner/streamed.py
@@ -34,12 +34,20 @@ class StreamedResultSet(object):
:class:`google.cloud.proto.spanner.v1.result_set_pb2.PartialResultSet`
instances.

:type restart: callable
:param restart:
Function (typically curried via :func:`functools.partial`) used to
restart the initial request if a retriable error is raised during
streaming.

:type source: :class:`~google.cloud.spanner.snapshot.Snapshot`
:param source: Snapshot from which the result set was fetched.
"""
def __init__(self, response_iterator, source=None):
def __init__(self, response_iterator, restart, source=None):
self._response_iterator = response_iterator
self._rows = [] # Fully-processed rows
self._restart = restart
self._pending_rows = [] # Rows pending new token / EOT
self._complete_rows = [] # Fully-processed rows
self._counter = 0 # Counter for processed responses
self._metadata = None # Until set from first PRS
self._stats = None # Until set from last PRS
@@ -55,7 +63,7 @@ def rows(self):
:rtype: list of row-data lists.
:returns: list of completed row data, from processed PRS responses.
"""
return self._rows
return self._complete_rows

@property
def fields(self):
@@ -122,17 +130,50 @@ def _merge_values(self, values):
field = self.fields[index]
self._current_row.append(_parse_value_pb(value, field.type))
if len(self._current_row) == width:
self._rows.append(self._current_row)
self._pending_rows.append(self._current_row)
self._current_row = []

def _flush_pending_rows(self):
"""Helper for :meth:`consume_next`."""
flushed = self._pending_rows[:]
self._pending_rows[:] = ()
self._complete_rows.extend(flushed)

def _do_restart(self):
"""Helper for :meth:`consume_next`."""
self._pending_chunk = None
self._pending_rows[:] = ()
self._current_row[:] = ()

if self._resume_token:
# Pass the token by keyword so the positional arguments
# curried into ``restart`` stay in place.
self._response_iterator = self._restart(
resume_token=self._resume_token)
else:
self._response_iterator = self._restart()

def consume_next(self):

"""Consume the next partial result set from the stream.

Parse the result set into new/existing rows in :attr:`_rows`
Parse the result set into new/existing rows in :attr:`_complete_rows`

:raises StopIteration: if the iterator is empty.
"""
response = six.next(self._response_iterator)
try:
response = six.next(self._response_iterator)
except exceptions.ServiceUnavailable:
self._do_restart()
return
except StopIteration:
self._flush_pending_rows()
raise

self._counter += 1
self._resume_token = response.resume_token
if response.resume_token:
self._flush_pending_rows()
self._resume_token = response.resume_token

if response.HasField('stats'): # last response
self._flush_pending_rows()
self._stats = response.stats

if self._metadata is None: # first response
metadata = self._metadata = response.metadata
Expand All @@ -141,9 +182,6 @@ def consume_next(self):
if source is not None and source._transaction_id is None:
source._transaction_id = metadata.transaction.id

if response.HasField('stats'): # last response
self._stats = response.stats

values = list(response.values)
if self._pending_chunk is not None:
values[0] = self._merge_chunk(values[0])
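
The pending/complete split implemented above is what makes restarts safe: a row moves to ``_complete_rows`` only once a ``resume_token`` (or end of stream) confirms the server will not resend it, and a restart simply discards the unconfirmed tail. A toy model of that invariant, independent of the Spanner types:

    # Toy model of the flush-on-token invariant; ``pending`` and
    # ``complete`` mirror _pending_rows and _complete_rows above.
    pending, complete = [], []

    def on_response(rows, resume_token):
        pending.extend(rows)
        if resume_token:           # server checkpoint: rows are durable
            complete.extend(pending)
            del pending[:]

    def on_restart():
        del pending[:]             # unconfirmed rows will be re-streamed

    on_response([1, 2], b'')       # rows still tentative
    on_restart()                   # discarded, not duplicated
    on_response([1, 2, 3], b't1')  # re-streamed and now confirmed
    assert complete == [1, 2, 3]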
@@ -162,11 +200,15 @@ def consume_all(self):
break

def __iter__(self):
iter_rows, self._rows[:] = self._rows[:], ()
iter_rows, self._complete_rows[:] = self._complete_rows[:], ()
while True:
if not iter_rows:
self.consume_next() # raises StopIteration
iter_rows, self._rows[:] = self._rows[:], ()
try:
self.consume_next() # raises StopIteration
except StopIteration:
if not self._complete_rows:
raise
iter_rows, self._complete_rows[:] = self._complete_rows[:], ()
while iter_rows:
yield iter_rows.pop(0)

Expand All @@ -179,6 +221,7 @@ def one(self):
in whole or in part.
"""
answer = self.one_or_none()

if answer is None:
raise exceptions.NotFound('No rows matched the given query.')
return answer
@@ -196,21 +239,13 @@ def one_or_none(self):
raise RuntimeError('Can not call `.one` or `.one_or_none` after '
'stream consumption has already started.')

# Consume the first result of the stream.
# If there is no first result, then return None.
iterator = iter(self)
try:
answer = next(iterator)
except StopIteration:
return None
self.consume_all()

# Attempt to consume more. This should no-op; if we get additional
# rows, then this is an error case.
try:
next(iterator)
if len(self._complete_rows) > 1:
raise ValueError('Expected one result; got more.')
except StopIteration:
return answer

if self._complete_rows:
return self._complete_rows[0]
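
Rebuilt on ``consume_all``, both helpers now drain the stream exactly once, with retries handled underneath. A short usage sketch, assuming ``QUERY`` is expected to match at most one row:

    result = snapshot.execute_sql(QUERY)
    row = result.one_or_none()
    if row is None:
        handle_missing()  # hypothetical application fallback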


class Unmergeable(ValueError):