Skip to content

Commit

Permalink
Fixing broken exception handling after GAX 0.13.0 upgrade.
Browse files Browse the repository at this point in the history
  • Loading branch information
dhermes committed Sep 19, 2016
1 parent 5f1cda9 commit f916093
Show file tree
Hide file tree
Showing 8 changed files with 165 additions and 215 deletions.
19 changes: 0 additions & 19 deletions google/cloud/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,9 @@
except ImportError:
app_identity = None
try:
from google.gax.grpc import exc_to_code as beta_exc_to_code
import grpc
from grpc._channel import _Rendezvous
except ImportError: # pragma: NO COVER
beta_exc_to_code = None
grpc = None
_Rendezvous = Exception
import six
from six.moves import http_client
from six.moves import configparser
Expand Down Expand Up @@ -685,21 +681,6 @@ def make_insecure_stub(stub_class, host, port=None):
return stub_class(channel)


def exc_to_code(exc):
"""Retrieves the status code from a gRPC exception.
:type exc: :class:`Exception`
:param exc: An exception from gRPC beta or stable.
:rtype: :class:`grpc.StatusCode`
:returns: The status code attached to the exception.
"""
if isinstance(exc, _Rendezvous):
return exc.code()
else:
return beta_exc_to_code(exc)


try:
from pytz import UTC # pylint: disable=unused-import,wrong-import-order
except ImportError:
Expand Down
39 changes: 19 additions & 20 deletions google/cloud/logging/_gax.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,17 @@

from google.gax import CallOptions
from google.gax import INITIAL_PAGE
from google.gax.errors import GaxError
from google.logging.type.log_severity_pb2 import LogSeverity
from google.logging.v2.logging_config_pb2 import LogSink
from google.logging.v2.logging_metrics_pb2 import LogMetric
from google.logging.v2.log_entry_pb2 import LogEntry
from google.protobuf.json_format import Parse
from grpc import StatusCode
from grpc._channel import _Rendezvous

# pylint: disable=ungrouped-imports
from google.cloud._helpers import _datetime_to_pb_timestamp
from google.cloud._helpers import _pb_timestamp_to_rfc3339
from google.cloud._helpers import exc_to_code
from google.cloud.exceptions import Conflict
from google.cloud.exceptions import NotFound
# pylint: enable=ungrouped-imports
Expand Down Expand Up @@ -123,8 +122,8 @@ def logger_delete(self, project, logger_name):
path = 'projects/%s/logs/%s' % (project, logger_name)
try:
self._gax_api.delete_log(path, options)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
except _Rendezvous as exc:
if exc.code() == StatusCode.NOT_FOUND:
raise NotFound(path)
raise

Expand Down Expand Up @@ -195,8 +194,8 @@ def sink_create(self, project, sink_name, filter_, destination):
destination=destination)
try:
self._gax_api.create_sink(parent, sink_pb, options)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.FAILED_PRECONDITION:
except _Rendezvous as exc:
if exc.code() == StatusCode.FAILED_PRECONDITION:
path = 'projects/%s/sinks/%s' % (project, sink_name)
raise Conflict(path)
raise
Expand All @@ -218,8 +217,8 @@ def sink_get(self, project, sink_name):
path = 'projects/%s/sinks/%s' % (project, sink_name)
try:
sink_pb = self._gax_api.get_sink(path, options)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
except _Rendezvous as exc:
if exc.code() == StatusCode.NOT_FOUND:
raise NotFound(path)
raise
return _log_sink_pb_to_mapping(sink_pb)
Expand Down Expand Up @@ -250,8 +249,8 @@ def sink_update(self, project, sink_name, filter_, destination):
sink_pb = LogSink(name=path, filter=filter_, destination=destination)
try:
self._gax_api.update_sink(path, sink_pb, options)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
except _Rendezvous as exc:
if exc.code() == StatusCode.NOT_FOUND:
raise NotFound(path)
raise
return _log_sink_pb_to_mapping(sink_pb)
Expand All @@ -269,8 +268,8 @@ def sink_delete(self, project, sink_name):
path = 'projects/%s/sinks/%s' % (project, sink_name)
try:
self._gax_api.delete_sink(path, options)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
except _Rendezvous as exc:
if exc.code() == StatusCode.NOT_FOUND:
raise NotFound(path)
raise

Expand Down Expand Up @@ -340,8 +339,8 @@ def metric_create(self, project, metric_name, filter_, description):
description=description)
try:
self._gax_api.create_log_metric(parent, metric_pb, options)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.FAILED_PRECONDITION:
except _Rendezvous as exc:
if exc.code() == StatusCode.FAILED_PRECONDITION:
path = 'projects/%s/metrics/%s' % (project, metric_name)
raise Conflict(path)
raise
Expand All @@ -363,8 +362,8 @@ def metric_get(self, project, metric_name):
path = 'projects/%s/metrics/%s' % (project, metric_name)
try:
metric_pb = self._gax_api.get_log_metric(path, options)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
except _Rendezvous as exc:
if exc.code() == StatusCode.NOT_FOUND:
raise NotFound(path)
raise
return _log_metric_pb_to_mapping(metric_pb)
Expand Down Expand Up @@ -395,8 +394,8 @@ def metric_update(self, project, metric_name, filter_, description):
description=description)
try:
self._gax_api.update_log_metric(path, metric_pb, options)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
except _Rendezvous as exc:
if exc.code() == StatusCode.NOT_FOUND:
raise NotFound(path)
raise
return _log_metric_pb_to_mapping(metric_pb)
Expand All @@ -414,8 +413,8 @@ def metric_delete(self, project, metric_name):
path = 'projects/%s/metrics/%s' % (project, metric_name)
try:
self._gax_api.delete_log_metric(path, options)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
except _Rendezvous as exc:
if exc.code() == StatusCode.NOT_FOUND:
raise NotFound(path)
raise

Expand Down
57 changes: 28 additions & 29 deletions google/cloud/pubsub/_gax.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@
from google.cloud.gapic.pubsub.v1.subscriber_api import SubscriberApi
from google.gax import CallOptions
from google.gax import INITIAL_PAGE
from google.gax.errors import GaxError
from google.pubsub.v1.pubsub_pb2 import PubsubMessage
from google.pubsub.v1.pubsub_pb2 import PushConfig
from grpc.beta.implementations import insecure_channel
from grpc import insecure_channel
from grpc import StatusCode
from grpc._channel import _Rendezvous

# pylint: disable=ungrouped-imports
from google.cloud._helpers import _to_bytes
from google.cloud._helpers import exc_to_code
from google.cloud._helpers import _pb_timestamp_to_rfc3339
from google.cloud.exceptions import Conflict
from google.cloud.exceptions import NotFound
Expand Down Expand Up @@ -93,8 +92,8 @@ def topic_create(self, topic_path):
"""
try:
topic_pb = self._gax_api.create_topic(topic_path)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.FAILED_PRECONDITION:
except _Rendezvous as exc:
if exc.code() == StatusCode.FAILED_PRECONDITION:
raise Conflict(topic_path)
raise
return {'name': topic_pb.name}
Expand All @@ -116,8 +115,8 @@ def topic_get(self, topic_path):
"""
try:
topic_pb = self._gax_api.get_topic(topic_path)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
except _Rendezvous as exc:
if exc.code() == StatusCode.NOT_FOUND:
raise NotFound(topic_path)
raise
return {'name': topic_pb.name}
Expand All @@ -134,8 +133,8 @@ def topic_delete(self, topic_path):
"""
try:
self._gax_api.delete_topic(topic_path)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
except _Rendezvous as exc:
if exc.code() == StatusCode.NOT_FOUND:
raise NotFound(topic_path)
raise

Expand Down Expand Up @@ -163,8 +162,8 @@ def topic_publish(self, topic_path, messages):
try:
result = self._gax_api.publish(topic_path, message_pbs,
options=options)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
except _Rendezvous as exc:
if exc.code() == StatusCode.NOT_FOUND:
raise NotFound(topic_path)
raise
return result.message_ids
Expand Down Expand Up @@ -201,8 +200,8 @@ def topic_list_subscriptions(self, topic_path, page_size=0,
try:
page_iter = self._gax_api.list_topic_subscriptions(
topic_path, page_size=page_size, options=options)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
except _Rendezvous as exc:
if exc.code() == StatusCode.NOT_FOUND:
raise NotFound(topic_path)
raise
subs = page_iter.next()
Expand Down Expand Up @@ -294,8 +293,8 @@ def subscription_create(self, subscription_path, topic_path,
try:
sub_pb = self._gax_api.create_subscription(
subscription_path, topic_path, push_config, ack_deadline)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.FAILED_PRECONDITION:
except _Rendezvous as exc:
if exc.code() == StatusCode.FAILED_PRECONDITION:
raise Conflict(topic_path)
raise
return _subscription_pb_to_mapping(sub_pb)
Expand All @@ -316,8 +315,8 @@ def subscription_get(self, subscription_path):
"""
try:
sub_pb = self._gax_api.get_subscription(subscription_path)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
except _Rendezvous as exc:
if exc.code() == StatusCode.NOT_FOUND:
raise NotFound(subscription_path)
raise
return _subscription_pb_to_mapping(sub_pb)
Expand All @@ -335,8 +334,8 @@ def subscription_delete(self, subscription_path):
"""
try:
self._gax_api.delete_subscription(subscription_path)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
except _Rendezvous as exc:
if exc.code() == StatusCode.NOT_FOUND:
raise NotFound(subscription_path)
raise

Expand All @@ -360,8 +359,8 @@ def subscription_modify_push_config(self, subscription_path,
push_config = PushConfig(push_endpoint=push_endpoint)
try:
self._gax_api.modify_push_config(subscription_path, push_config)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
except _Rendezvous as exc:
if exc.code() == StatusCode.NOT_FOUND:
raise NotFound(subscription_path)
raise

Expand Down Expand Up @@ -392,8 +391,8 @@ def subscription_pull(self, subscription_path, return_immediately=False,
try:
response_pb = self._gax_api.pull(
subscription_path, max_messages, return_immediately)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
except _Rendezvous as exc:
if exc.code() == StatusCode.NOT_FOUND:
raise NotFound(subscription_path)
raise
return [_received_message_pb_to_mapping(rmpb)
Expand All @@ -415,8 +414,8 @@ def subscription_acknowledge(self, subscription_path, ack_ids):
"""
try:
self._gax_api.acknowledge(subscription_path, ack_ids)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
except _Rendezvous as exc:
if exc.code() == StatusCode.NOT_FOUND:
raise NotFound(subscription_path)
raise

Expand All @@ -442,8 +441,8 @@ def subscription_modify_ack_deadline(self, subscription_path, ack_ids,
try:
self._gax_api.modify_ack_deadline(
subscription_path, ack_ids, ack_deadline)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
except _Rendezvous as exc:
if exc.code() == StatusCode.NOT_FOUND:
raise NotFound(subscription_path)
raise

Expand Down Expand Up @@ -520,7 +519,7 @@ def make_gax_publisher_api(connection):
"""
channel = None
if connection.in_emulator:
channel = insecure_channel(connection.host, None)
channel = insecure_channel(connection.host)
return PublisherApi(channel=channel)


Expand All @@ -540,5 +539,5 @@ def make_gax_subscriber_api(connection):
"""
channel = None
if connection.in_emulator:
channel = insecure_channel(connection.host, None)
channel = insecure_channel(connection.host)
return SubscriberApi(channel=channel)
6 changes: 2 additions & 4 deletions system_tests/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@
import os
import unittest

from google.gax.errors import GaxError
from grpc import StatusCode
from grpc._channel import _Rendezvous
import httplib2

# pylint: disable=ungrouped-imports
from google.cloud import _helpers
from google.cloud.environment_vars import PUBSUB_EMULATOR
from google.cloud.pubsub import client
# pylint: enable=ungrouped-imports
Expand All @@ -34,10 +32,10 @@


def _unavailable(exc):
return _helpers.exc_to_code(exc) == StatusCode.UNAVAILABLE
return exc.code() == StatusCode.UNAVAILABLE


retry_unavailable = RetryErrors((GaxError, _Rendezvous), _unavailable)
retry_unavailable = RetryErrors(_Rendezvous, _unavailable)


class Config(object):
Expand Down
6 changes: 5 additions & 1 deletion unit_tests/_testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,13 @@ class _GAXBaseAPI(object):
def __init__(self, **kw):
self.__dict__.update(kw)

def _make_grpc_error(self, status_code):
def _make_grpc_error(self, status_code=None):
from grpc._channel import _Rendezvous
from grpc._channel import _RPCState
from grpc import StatusCode

if status_code is None:
status_code = StatusCode.UNKNOWN

details = 'Some error details.'
exc_state = _RPCState((), None, None, status_code, details)
Expand Down
Loading

0 comments on commit f916093

Please sign in to comment.