Make sure handler.flush() doesn't deadlock.
Currently it deadlocks during process termination, when
atexit first calls handler.close() and then logging.shutdown(),
which in turn calls handler.flush() without arguments.

handler.close() kills the worker, and handler.flush() then waits forever for the dead worker to send the messages from the queue.

After this change, the deadlock is still possible if something concurrently closes the handler from another thread during the flush. However, this scenario is much less likely.
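
The shutdown ordering described above can be sketched with a small stand-in handler. This is a minimal illustration, not the opencensus code: `SketchHandler`, `_drain`, and the use of `queue.Queue.join()` as the blocking flush are assumptions made for the example. The record is enqueued after `close()` only to make the "dead worker, non-empty queue" state deterministic.

```python
import logging
import queue
import threading

logger = logging.getLogger(__name__)


class SketchHandler:
    """Hypothetical stand-in for the handler; not the opencensus class."""

    def __init__(self):
        self._queue = queue.Queue()
        self._stop = threading.Event()
        self._worker = threading.Thread(target=self._drain, daemon=True)
        self._worker.start()

    def _drain(self):
        # Worker loop: consume items until close() asks it to stop.
        while not self._stop.is_set():
            try:
                self._queue.get(timeout=0.05)
            except queue.Empty:
                continue
            self._queue.task_done()

    def close(self):
        # Stops the worker; anything still queued is never consumed.
        self._stop.set()
        self._worker.join()

    def flush(self):
        if self._queue.empty():
            return True
        # Without this guard, join() below would block forever once the
        # worker is gone, because nobody calls task_done() any more.
        if not self._worker.is_alive():
            logger.warning("Can't flush %s, worker thread is dead. "
                           "Pending messages will be lost.", self)
            return False
        self._queue.join()
        return True


handler = SketchHandler()
handler.close()                     # what atexit does first
handler._queue.put("late record")   # a message left behind in the queue
flushed = handler.flush()           # what logging.shutdown() does next
```

With the guard in place, `flush()` returns immediately instead of hanging. As the commit message notes, a concurrent `close()` between the liveness check and the blocking wait can still cause a hang.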
gukoff committed Mar 15, 2022
1 parent 56386a1 commit 0bdd025
Showing 2 changed files with 18 additions and 1 deletion.
@@ -106,6 +106,20 @@ def log_record_to_envelope(self, record):
         raise NotImplementedError  # pragma: NO COVER

     def flush(self, timeout=None):
+        if self._queue.is_empty():
+            return
+
+        # We must check the worker thread is alive, because otherwise flush
+        # is useless. Also, it would deadlock if no timeout is given, and the
+        # queue isn't empty.
+        # This is a very possible scenario during process termination, when
+        # atexit first calls handler.close() and then logging.shutdown(),
+        # that in turn calls handler.flush() without arguments.
+        if not self._worker.is_alive():
+            logger.warning("Can't flush %s, worker thread is dead. "
+                           "Pending messages will be lost.", self)
+            return
+
         self._queue.flush(timeout=timeout)


5 changes: 4 additions & 1 deletion opencensus/common/schedule/__init__.py
@@ -112,6 +112,9 @@ def _gets(self, count, timeout):
     def gets(self, count, timeout):
         return tuple(self._gets(count, timeout))

+    def is_empty(self):
+        return not self._queue.qsize()
+
     def flush(self, timeout=None):
         if self._queue.qsize() == 0:
             return 0
@@ -124,7 +127,7 @@ def flush(self, timeout=None):
             return
         elapsed_time = time.time() - start_time
         wait_time = timeout and max(timeout - elapsed_time, 0)
-        if event.wait(timeout):
+        if event.wait(wait_time):
             return time.time() - start_time  # time taken to flush

     def put(self, item, block=True, timeout=None):
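
The one-line change in `flush` makes the final wait use the remaining time budget instead of the full timeout. A minimal sketch of that computation follows, assuming a hypothetical helper named `remaining_wait` that is not part of the library:

```python
def remaining_wait(timeout, elapsed_time):
    """Return how long the final wait may still block.

    None means "wait indefinitely" and is passed through unchanged;
    otherwise the time already spent is subtracted, clamped at zero.
    """
    # `timeout and ...` short-circuits for None (and for 0), mirroring
    # the expression used in the diff above.
    return timeout and max(timeout - elapsed_time, 0)


no_limit = remaining_wait(None, 5.0)    # no timeout given
partial = remaining_wait(2.0, 0.5)      # part of the budget is left
exhausted = remaining_wait(1.0, 3.0)    # budget already used up
```

Passing the original `timeout` to `event.wait()` would let the call block for up to the full timeout on top of the time already spent enqueueing the sync event, so the total could approach twice the requested limit.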