
Overhaul logging background thread transport #3407

Merged
theacodes merged 6 commits into googleapis:master from the new-background-thread branch on May 12, 2017

Conversation

theacodes (Contributor)

No description provided.

theacodes added the api: logging label (Issues related to the Cloud Logging API) on May 12, 2017
googlebot added the cla: yes label (This human has signed the Contributor License Agreement) on May 12, 2017

[Inline review threads on the diff (the background_thread transport's module constants such as _DEFAULT_GRACE_PERIOD, _DEFAULT_MAX_BATCH_SIZE, _WORKER_THREAD_NAME, and _WORKER_TERMINATOR, the _Worker class, the _get_many helper, the EXCLUDED_LOGGER_DEFAULTS tuple, and the TestWorker unit tests); every comment in these threads was marked as spam.]

theacodes (Contributor, Author) left a comment

@dhermes ready for another round.


waprin (Contributor) commented May 12, 2017

So the excluded loggers change, particularly oauth2client -> google.auth, looks very important (maybe the simple explanation for the deadlock). In fact, I'm now highly suspicious that it's the root cause of the deadlocks that have been popping up, so that should get merged ASAP.

Just as a tangent, I think the most useful change to make would be to automatically detect recursive logging traces so that type of breakage is impossible. I think Jon gave me some ideas on how that might be possible with logging filters but I haven't dug deep on it yet.

As for switching from the conditions to the queue, I am 99% indifferent but 1% 👎. I thought about doing it that way, but if you're removing all the items from the Queue at once you might as well just use the Batch entries list, instead of basically copying the whole list over every time you commit, which seems awkward to me. But obviously you found the existing code ugly, and it's mostly a stylistic judgement call, so it's up to you and the other maintainers; like I said, I'm largely indifferent one way or the other since it accomplishes more or less the same thing.
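
A minimal sketch, not the PR's code, of the logging-filter idea mentioned above: drop any record emitted by the client's own dependencies so it can never re-enter the Cloud Logging handler. The class name and prefix list here are illustrative; the PR itself only extends the EXCLUDED_LOGGER_DEFAULTS tuple.

import logging

# Loggers whose records must never reach the Cloud Logging handler, because
# they fire while the handler itself is doing work (illustrative list,
# modeled on EXCLUDED_LOGGER_DEFAULTS and the oauth2client -> google.auth move).
_EXCLUDED_PREFIXES = ('google.cloud', 'google.auth')

class _ExcludeInternalLoggers(logging.Filter):
    """Reject records produced by the transport's own dependencies."""

    def filter(self, record):
        # Returning False drops the record before the handler processes it.
        return not record.name.startswith(_EXCLUDED_PREFIXES)

# Usage (hypothetical): handler.addFilter(_ExcludeInternalLoggers())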

theacodes (Contributor, Author)

As for switching from the conditions to the queue, I am 99% indifferent but 1% 👎 ... But obviously you found the existing code ugly and it's mostly a stylistic judgement call so it's up to you and other maintainers, like I said largely indifferent one way or the other since it's more or less accomplishing the same thing.

My reasoning for doing this wasn't that the code was ugly. I switched to the queue pattern because it's widely deployed, widely tested, well understood, and easy to reason about. It gets us closer to shared-nothing, which is ideal when working with concurrency. I did not like the idea of using locks/conditions/mutexes to patch around a non-thread-safe data structure.

I thought about doing it that way but removing all the items from the Queue at once seemed like you might as well just use the Batch entries list, instead of having to basically copy the whole list over every time you commit which seems awkward to me

We're not draining the entire queue each time, just a subset. And I'm less worried about copying the list because Python essentially just passes around lists of pointers in this case.
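
A rough sketch of the pattern under discussion. The names _get_many and _WORKER_TERMINATOR mirror helpers visible in the diff, but the bodies below are illustrative reconstructions rather than the PR's exact code: block for one item, drain at most max_batch_size more without blocking, commit that slice as one batch, and stop when the terminator sentinel appears.

import queue

_WORKER_TERMINATOR = object()  # sentinel enqueued to tell the worker to exit

def _get_many(q, max_items=None):
    """Get at least one item (blocking) and at most max_items items from q."""
    items = [q.get()]
    while max_items is None or len(items) < max_items:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            break
    return items

def _thread_main(q, commit_batch, max_batch_size):
    # commit_batch stands in for "build a Batch, add the entries, commit it
    # safely"; the real transport talks to the Cloud Logging API here.
    quit_ = False
    while not quit_:
        items = _get_many(q, max_items=max_batch_size)
        entries = [item for item in items if item is not _WORKER_TERMINATOR]
        quit_ = len(entries) != len(items)
        if entries:
            commit_batch(entries)
        for _ in items:
            q.task_done()

Each iteration copies at most max_batch_size references into a new list, so the per-commit copying mentioned above stays bounded no matter how deep the queue gets.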


dhermes (Contributor) commented May 12, 2017

LGTM

theacodes merged commit a623929 into googleapis:master on May 12, 2017
theacodes deleted the new-background-thread branch on May 12, 2017 at 20:30
waprin mentioned this pull request on Jan 5, 2018