diff --git a/nameko_sentry.py b/nameko_sentry.py index f3abc0d..d1e7fd5 100644 --- a/nameko_sentry.py +++ b/nameko_sentry.py @@ -1,6 +1,5 @@ import logging -from eventlet.queue import Queue from nameko.extensions import DependencyProvider from raven import Client @@ -8,77 +7,72 @@ class SentryReporter(DependencyProvider): """ Send exceptions generated by entrypoints to a sentry server. """ - _gt = None - queue = None - client = None - - def _run(self): - - while True: - item = self.queue.get() - if item is None: - break - - exc_info, message, extra, data = item - self.client.captureException( - exc_info, message=message, extra=extra, data=data) - - # these will remain in scope until the next iteration and - # can potentially be large, so delete to reclaim the memory now - del exc_info, message, extra, data, item - - def start(self): - self._gt = self.container.spawn_managed_thread( - self._run, protected=True) - - def stop(self): - self.queue.put(None) - - if self._gt is not None: - self._gt.wait() - def setup(self): sentry_config = self.container.config.get('SENTRY') dsn = sentry_config['DSN'] kwargs = sentry_config.get('CLIENT_CONFIG', {}) + report_expected_exceptions = sentry_config.get( + 'REPORT_EXPECTED_EXCEPTIONS', True + ) - self.queue = Queue() self.client = Client(dsn, **kwargs) + self.report_expected_exceptions = report_expected_exceptions + + def format_message(self, worker_ctx, exc_info): + exc_type, exc, _ = exc_info + return ( + 'Unhandled exception in call {}: ' + '{} {!r}'.format(worker_ctx.call_id, exc_type.__name__, str(exc)) + ) + + def is_expected_exception(self, worker_ctx, exc_info): + _, exc, _ = exc_info + expected_exceptions = getattr( + worker_ctx.entrypoint, 'expected_exceptions', tuple()) + return isinstance(exc, expected_exceptions) + + def build_tags(self, worker_ctx, exc_info): + return { + 'call_id': worker_ctx.call_id, + 'parent_call_id': worker_ctx.immediate_parent_call_id, + } + + def build_extra(self, worker_ctx, exc_info): + _, exc, _ = exc_info + return { + 'exc': exc + } def worker_result(self, worker_ctx, result, exc_info): if exc_info is None: return + self.capture_exception(worker_ctx, exc_info) - exc = exc_info[1] - call_id = worker_ctx.call_id - parent_call_id = worker_ctx.immediate_parent_call_id + def capture_exception(self, worker_ctx, exc_info): - expected_exceptions = getattr( - worker_ctx.entrypoint, 'expected_exceptions', tuple()) + logger = '{}.{}'.format( + worker_ctx.service_name, worker_ctx.entrypoint.method_name + ) - level = logging.ERROR - if expected_exceptions and isinstance(exc, expected_exceptions): + if self.is_expected_exception(worker_ctx, exc_info): + if not self.report_expected_exceptions: + return # nothing to do level = logging.WARNING + else: + level = logging.ERROR - message = ( - 'Unhandled exception in call {}: ' - '{} {!r}'.format(call_id, exc_info[0].__name__, str(exc)) - ) - - logger = '{}.{}'.format( - worker_ctx.service_name, worker_ctx.entrypoint.method_name) + message = self.format_message(worker_ctx, exc_info) + extra = self.build_extra(worker_ctx, exc_info) + tags = self.build_tags(worker_ctx, exc_info) data = { 'logger': logger, 'level': level, 'message': message, - 'tags': { - 'call_id': call_id, - 'parent_call_id': parent_call_id, - }, + 'tags': tags } - extra = {'exc': exc} - - self.queue.put((exc_info, message, extra, data)) + self.client.captureException( + exc_info, message=message, extra=extra, data=data + ) diff --git a/test_nameko_sentry.py b/test_nameko_sentry.py index 618043b..908d498 100644 --- a/test_nameko_sentry.py +++ b/test_nameko_sentry.py @@ -1,15 +1,18 @@ import logging +import socket import pytest from eventlet.event import Event -from eventlet.queue import Queue -from mock import Mock, call, patch +from mock import Mock, patch from nameko.containers import WorkerContext from nameko.extensions import Entrypoint from nameko.testing.services import dummy, entrypoint_hook, entrypoint_waiter -from nameko.testing.utils import get_extension +from nameko.web.handlers import http from nameko_sentry import SentryReporter -from raven.transport.threaded import ThreadedHTTPTransport +from raven import Client +from raven.transport.eventlet import EventletHTTPTransport + +from six.moves.urllib import parse class CustomException(Exception): @@ -20,7 +23,7 @@ class CustomException(Exception): def config(): return { 'SENTRY': { - 'DSN': 'http://user:pass@localhost:9000/1', + 'DSN': 'eventlet+http://user:pass@localhost:9000/1', 'CLIENT_CONFIG': { 'site': 'site name' } @@ -48,11 +51,11 @@ def container(config): return Mock(config=config) -@pytest.fixture(params=[tuple(), CustomException]) # expected exceptions -def worker_ctx(request, container): +@pytest.fixture +def worker_ctx(container): service = Mock() - entrypoint = Mock(spec=Entrypoint, expected_exceptions=request.param) + entrypoint = Mock(spec=Entrypoint, expected_exceptions=CustomException) args = ("a", "b", "c") kwargs = {"d": "d", "e": "e"} data = { @@ -67,9 +70,10 @@ def worker_ctx(request, container): ) -@pytest.fixture +@pytest.yield_fixture def reporter(container): - return SentryReporter().bind(container, "sentry") + with patch.object(Client, 'captureException'): + yield SentryReporter().bind(container, "sentry") def test_setup(reporter): @@ -82,10 +86,7 @@ def test_setup(reporter): # transport set correctly transport = reporter.client.remote.get_transport() - assert isinstance(transport, ThreadedHTTPTransport) - - # queue created - assert isinstance(reporter.queue, Queue) + assert isinstance(transport, EventletHTTPTransport) def test_setup_without_optional_config(config): @@ -102,10 +103,7 @@ def test_setup_without_optional_config(config): # transport set correctly transport = reporter.client.remote.get_transport() - assert isinstance(transport, ThreadedHTTPTransport) - - # queue created - assert isinstance(reporter.queue, Queue) + assert isinstance(transport, EventletHTTPTransport) def test_disabled(config): @@ -126,125 +124,152 @@ def test_worker_result(reporter, worker_ctx): reporter.setup() reporter.worker_result(worker_ctx, result, None) - assert reporter.queue.qsize() == 0 - + assert reporter.client.captureException.call_count == 0 -def test_worker_exception(reporter, worker_ctx): - exc = CustomException("Error!") - exc_info = (CustomException, exc, None) +@pytest.mark.parametrize("exception_cls,expected_level", [ + (CustomException, logging.WARNING), + (KeyError, logging.ERROR) +]) +def test_worker_exception( + exception_cls, expected_level, reporter, worker_ctx +): + exc = exception_cls("Error!") + exc_info = (exception_cls, exc, None) reporter.setup() reporter.worker_result(worker_ctx, None, exc_info) # generate expected call args - logger = "{}.{}".format( - worker_ctx.service_name, worker_ctx.entrypoint.method_name) + expected_logger = "{}.{}".format( + worker_ctx.service_name, worker_ctx.entrypoint.method_name + ) expected_message = "Unhandled exception in call {}: {} {!r}".format( - worker_ctx.call_id, CustomException.__name__, str(exc) + worker_ctx.call_id, exception_cls.__name__, str(exc) ) expected_extra = {'exc': exc} - - if isinstance(exc, worker_ctx.entrypoint.expected_exceptions): - loglevel = logging.WARNING - else: - loglevel = logging.ERROR - + expected_tags = { + 'call_id': worker_ctx.call_id, + 'parent_call_id': worker_ctx.immediate_parent_call_id + } expected_data = { - 'logger': logger, - 'level': loglevel, + 'logger': expected_logger, + 'level': expected_level, 'message': expected_message, - 'tags': { - 'call_id': worker_ctx.call_id, - 'parent_call_id': worker_ctx.immediate_parent_call_id - } + 'tags': expected_tags } - assert reporter.queue.qsize() == 1 + assert reporter.client.captureException.call_count == 1 - _, message, extra, data = reporter.queue.get() - assert message == expected_message - assert extra == expected_extra - assert data == expected_data + _, kwargs = reporter.client.captureException.call_args + assert kwargs['message'] == expected_message + assert kwargs['extra'] == expected_extra + assert kwargs['data'] == expected_data -def test_run(reporter): +@pytest.mark.parametrize("exception_cls,expected_count", [ + (CustomException, 0), + (KeyError, 1) +]) +def test_expected_exception_not_reported( + exception_cls, expected_count, config, worker_ctx +): - exc = CustomException("Error!") - exc_info = (CustomException, exc, None) + exc = exception_cls("Error!") + exc_info = (exception_cls, exc, None) - message = "message" - extra = "extra" - data = "data" + config['SENTRY']['REPORT_EXPECTED_EXCEPTIONS'] = False + container = Mock(config=config) + reporter = SentryReporter().bind(container, "sentry") reporter.setup() - reporter.queue.put((exc_info, message, extra, data)) - reporter.queue.put(None) + with patch.object(reporter.client, 'captureException') as capture: + reporter.worker_result(worker_ctx, None, exc_info) - with patch.object(reporter, 'client') as client: - reporter._run() + assert capture.call_count == expected_count - assert client.captureException.call_args_list == [ - call(exc_info, message=message, extra=extra, data=data) - ] +@patch.object(EventletHTTPTransport, '_send_payload') +def test_raven_transport_does_not_affect_container( + send_mock, container_factory, service_cls, config +): + """ Allowing raven to use the eventlet transport should not affect the + nameko container, even if raven blocks trying to make calls. + """ + def block(*args): + Event().wait() -def test_start(container_factory, service_cls, config): + send_mock.side_effect = block container = container_factory(service_cls, config) - reporter = get_extension(container, SentryReporter) - - reporter.setup() + container.start() - running = Event() + with entrypoint_hook(container, 'broken') as broken: + with entrypoint_waiter(container, 'broken'): + with pytest.raises(CustomException): + broken() - def run(): - running.send(True) + container.stop() - with patch.object(reporter, '_run', wraps=run) as patched_run: - reporter.start() - running.wait() - assert patched_run.call_count == 1 - assert reporter._gt is not None +class TestEndToEnd(object): + @pytest.fixture + def tracker(self): + return Mock() -def test_stop(container_factory, service_cls, config): + @pytest.fixture + def free_port(self): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.bind(('127.0.0.1', 0)) + port = sock.getsockname()[1] + sock.close() + return port - container = container_factory(service_cls, config) - reporter = get_extension(container, SentryReporter) + @pytest.fixture + def sentry_dsn(self, free_port): + return 'eventlet+http://user:pass@localhost:{}/1'.format(free_port) - reporter.setup() - reporter.start() - assert not reporter._gt.dead - reporter.stop() - assert reporter._gt.dead + @pytest.fixture + def sentry_stub(self, container_factory, sentry_dsn, tracker): + """ Start a container to imitate a sentry server + """ - # subsequent stop has no adverse effect - reporter.stop() + class SentryStub(object): + name = "sentry" + @http('POST', "/api/1/store/") + def report(self, request): + tracker(request.get_data()) + return 200, "OK" -def test_stop_not_started(container_factory, service_cls, config): + address = parse.urlparse(sentry_dsn).netloc.split("@")[-1] + config = { + 'WEB_SERVER_ADDRESS': address + } - container = container_factory(service_cls, config) - reporter = get_extension(container, SentryReporter) + container = container_factory(SentryStub, config) + container.start() - reporter.setup() - assert reporter._gt is None - reporter.stop() + return container + @pytest.fixture + def config(self, config, sentry_dsn): + config['SENTRY']['DSN'] = sentry_dsn + return config -def test_end_to_end(container_factory, service_cls, config): + def test_end_to_end( + self, container_factory, service_cls, config, sentry_stub, tracker + ): - container = container_factory(service_cls, config) - container.start() + container = container_factory(service_cls, config) + container.start() - reporter = get_extension(container, SentryReporter) + with entrypoint_waiter(sentry_stub, 'report'): + with entrypoint_hook(container, 'broken') as broken: + with entrypoint_waiter(container, 'broken'): + with pytest.raises(CustomException): + broken() - with patch.object(reporter, 'client') as client: - with entrypoint_hook(container, 'broken') as broken: - with entrypoint_waiter(container, 'broken'): - with pytest.raises(CustomException): - broken() - assert client.captureException.call_count == 1 + assert tracker.called