Skip to content

Commit

Permalink
blacken api_core and core (googleapis#6668)
Browse files Browse the repository at this point in the history
* blacken api_core and core
  • Loading branch information
crwilcox authored and erikwebb committed Dec 3, 2018
1 parent 36b7ac1 commit 3fd85ed
Show file tree
Hide file tree
Showing 65 changed files with 2,204 additions and 2,100 deletions.
1 change: 1 addition & 0 deletions api_core/.flake8
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import-order-style=google
# Note: this forces all google imports to be in the third group. See
# https://github.com/PyCQA/flake8-import-order/issues/111
application-import-names=google
ignore = E203, E266, E501, W503
exclude =
__pycache__,
.git,
Expand Down
2 changes: 2 additions & 0 deletions api_core/google/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

try:
import pkg_resources

pkg_resources.declare_namespace(__name__)
except ImportError:
import pkgutil

__path__ = pkgutil.extend_path(__path__, __name__)
2 changes: 1 addition & 1 deletion api_core/google/api_core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@
from pkg_resources import get_distribution


__version__ = get_distribution('google-api-core').version
__version__ = get_distribution("google-api-core").version
89 changes: 46 additions & 43 deletions api_core/google/api_core/bidi.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from google.api_core import exceptions

_LOGGER = logging.getLogger(__name__)
_BIDIRECTIONAL_CONSUMER_NAME = 'Thread-ConsumeBidirectionalStream'
_BIDIRECTIONAL_CONSUMER_NAME = "Thread-ConsumeBidirectionalStream"


class _RequestQueueGenerator(object):
Expand Down Expand Up @@ -79,6 +79,7 @@ class _RequestQueueGenerator(object):
easily restarting streams that require some initial configuration
request.
"""

def __init__(self, queue, period=1, initial_request=None):
self._queue = queue
self._period = period
Expand Down Expand Up @@ -107,8 +108,8 @@ def __iter__(self):
except queue.Empty:
if not self._is_active():
_LOGGER.debug(
'Empty queue and inactive call, exiting request '
'generator.')
"Empty queue and inactive call, exiting request " "generator."
)
return
else:
# call is still active, keep waiting for queue items.
Expand All @@ -117,16 +118,17 @@ def __iter__(self):
# The consumer explicitly sent "None", indicating that the request
# should end.
if item is None:
_LOGGER.debug('Cleanly exiting request generator.')
_LOGGER.debug("Cleanly exiting request generator.")
return

if not self._is_active():
# We have an item, but the call is closed. We should put the
# item back on the queue so that the next call can consume it.
self._queue.put(item)
_LOGGER.debug(
'Inactive call, replacing item on queue and exiting '
'request generator.')
"Inactive call, replacing item on queue and exiting "
"request generator."
)
return

yield item
Expand Down Expand Up @@ -164,6 +166,7 @@ class BidiRpc(object):
yield. This is useful if an initial request is needed to start the
stream.
"""

def __init__(self, start_rpc, initial_request=None):
self._start_rpc = start_rpc
self._initial_request = initial_request
Expand Down Expand Up @@ -192,17 +195,18 @@ def _on_call_done(self, future):
def open(self):
"""Opens the stream."""
if self.is_active:
raise ValueError('Can not open an already open stream.')
raise ValueError("Can not open an already open stream.")

request_generator = _RequestQueueGenerator(
self._request_queue, initial_request=self._initial_request)
self._request_queue, initial_request=self._initial_request
)
call = self._start_rpc(iter(request_generator))

request_generator.call = call

# TODO: api_core should expose the future interface for wrapped
# callables as well.
if hasattr(call, '_wrapped'): # pragma: NO COVER
if hasattr(call, "_wrapped"): # pragma: NO COVER
call._wrapped.add_done_callback(self._on_call_done)
else:
call.add_done_callback(self._on_call_done)
Expand Down Expand Up @@ -232,8 +236,7 @@ def send(self, request):
request (protobuf.Message): The request to send.
"""
if self.call is None:
raise ValueError(
'Can not send() on an RPC that has never been open()ed.')
raise ValueError("Can not send() on an RPC that has never been open()ed.")

# Don't use self.is_active(), as ResumableBidiRpc will overload it
# to mean something semantically different.
Expand All @@ -254,8 +257,7 @@ def recv(self):
protobuf.Message: The received message.
"""
if self.call is None:
raise ValueError(
'Can not recv() on an RPC that has never been open()ed.')
raise ValueError("Can not recv() on an RPC that has never been open()ed.")

return next(self.call)

Expand Down Expand Up @@ -309,6 +311,7 @@ def should_recover(exc):
True if the stream should be recovered. This will be called
whenever an error is encountered on the stream.
"""

def __init__(self, start_rpc, should_recover, initial_request=None):
super(ResumableBidiRpc, self).__init__(start_rpc, initial_request)
self._should_recover = should_recover
Expand All @@ -334,14 +337,14 @@ def _on_call_done(self, future):
if not self._should_recover(future):
self._finalize(future)
else:
_LOGGER.debug('Re-opening stream from gRPC callback.')
_LOGGER.debug("Re-opening stream from gRPC callback.")
self._reopen()

def _reopen(self):
with self._operational_lock:
# Another thread already managed to re-open this stream.
if self.call is not None and self.call.is_active():
_LOGGER.debug('Stream was already re-established.')
_LOGGER.debug("Stream was already re-established.")
return

self.call = None
Expand All @@ -362,11 +365,11 @@ def _reopen(self):
# If re-opening or re-calling the method fails for any reason,
# consider it a terminal error and finalize the stream.
except Exception as exc:
_LOGGER.debug('Failed to re-open stream due to %s', exc)
_LOGGER.debug("Failed to re-open stream due to %s", exc)
self._finalize(exc)
raise

_LOGGER.info('Re-established stream')
_LOGGER.info("Re-established stream")

def _recoverable(self, method, *args, **kwargs):
"""Wraps a method to recover the stream and retry on error.
Expand All @@ -388,18 +391,15 @@ def _recoverable(self, method, *args, **kwargs):

except Exception as exc:
with self._operational_lock:
_LOGGER.debug(
'Call to retryable %r caused %s.', method, exc)
_LOGGER.debug("Call to retryable %r caused %s.", method, exc)

if not self._should_recover(exc):
self.close()
_LOGGER.debug(
'Not retrying %r due to %s.', method, exc)
_LOGGER.debug("Not retrying %r due to %s.", method, exc)
self._finalize(exc)
raise exc

_LOGGER.debug(
'Re-opening stream from retryable %r.', method)
_LOGGER.debug("Re-opening stream from retryable %r.", method)
self._reopen()

def _send(self, request):
Expand All @@ -414,8 +414,7 @@ def _send(self, request):
call = self.call

if call is None:
raise ValueError(
'Can not send() on an RPC that has never been open()ed.')
raise ValueError("Can not send() on an RPC that has never been open()ed.")

# Don't use self.is_active(), as ResumableBidiRpc will overload it
# to mean something semantically different.
Expand All @@ -434,8 +433,7 @@ def _recv(self):
call = self.call

if call is None:
raise ValueError(
'Can not recv() on an RPC that has never been open()ed.')
raise ValueError("Can not recv() on an RPC that has never been open()ed.")

return next(call)

Expand Down Expand Up @@ -493,6 +491,7 @@ def on_response(response):
on_response (Callable[[protobuf.Message], None]): The callback to
be called for every response on the stream.
"""

def __init__(self, bidi_rpc, on_response):
self._bidi_rpc = bidi_rpc
self._on_response = on_response
Expand Down Expand Up @@ -522,43 +521,47 @@ def _thread_main(self):
# Python 2.7.
with self._wake:
if self._paused:
_LOGGER.debug('paused, waiting for waking.')
_LOGGER.debug("paused, waiting for waking.")
self._wake.wait()
_LOGGER.debug('woken.')
_LOGGER.debug("woken.")

_LOGGER.debug('waiting for recv.')
_LOGGER.debug("waiting for recv.")
response = self._bidi_rpc.recv()
_LOGGER.debug('recved response.')
_LOGGER.debug("recved response.")
self._on_response(response)

except exceptions.GoogleAPICallError as exc:
_LOGGER.debug(
'%s caught error %s and will exit. Generally this is due to '
'the RPC itself being cancelled and the error will be '
'surfaced to the calling code.',
_BIDIRECTIONAL_CONSUMER_NAME, exc, exc_info=True)
"%s caught error %s and will exit. Generally this is due to "
"the RPC itself being cancelled and the error will be "
"surfaced to the calling code.",
_BIDIRECTIONAL_CONSUMER_NAME,
exc,
exc_info=True,
)

except Exception as exc:
_LOGGER.exception(
'%s caught unexpected exception %s and will exit.',
_BIDIRECTIONAL_CONSUMER_NAME, exc)
"%s caught unexpected exception %s and will exit.",
_BIDIRECTIONAL_CONSUMER_NAME,
exc,
)

else:
_LOGGER.error(
'The bidirectional RPC exited.')
_LOGGER.error("The bidirectional RPC exited.")

_LOGGER.info('%s exiting', _BIDIRECTIONAL_CONSUMER_NAME)
_LOGGER.info("%s exiting", _BIDIRECTIONAL_CONSUMER_NAME)

def start(self):
"""Start the background thread and begin consuming the thread."""
with self._operational_lock:
thread = threading.Thread(
name=_BIDIRECTIONAL_CONSUMER_NAME,
target=self._thread_main)
name=_BIDIRECTIONAL_CONSUMER_NAME, target=self._thread_main
)
thread.daemon = True
thread.start()
self._thread = thread
_LOGGER.debug('Started helper thread %s', thread.name)
_LOGGER.debug("Started helper thread %s", thread.name)

def stop(self):
"""Stop consuming the stream and shutdown the background thread."""
Expand Down
Loading

0 comments on commit 3fd85ed

Please sign in to comment.