Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make sure handler.flush() doesn't deadlock. #1112

Merged
merged 2 commits into from
Mar 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,20 @@ def log_record_to_envelope(self, record):
raise NotImplementedError # pragma: NO COVER

def flush(self, timeout=None):
if self._queue.is_empty():
return
aabmass marked this conversation as resolved.
Show resolved Hide resolved

# We must check the worker thread is alive, because otherwise flush
# is useless. Also, it would deadlock if no timeout is given, and the
# queue isn't empty.
# This is a very possible scenario during process termination, when
# atexit first calls handler.close() and then logging.shutdown(),
# that in turn calls handler.flush() without arguments.
if not self._worker.is_alive():
logger.warning("Can't flush %s, worker thread is dead. "
"Any pending messages will be lost.", self)
return

self._queue.flush(timeout=timeout)


Expand Down
5 changes: 4 additions & 1 deletion opencensus/common/schedule/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ def _gets(self, count, timeout):
def gets(self, count, timeout):
return tuple(self._gets(count, timeout))

def is_empty(self):
return not self._queue.qsize()

def flush(self, timeout=None):
if self._queue.qsize() == 0:
return 0
Expand All @@ -124,7 +127,7 @@ def flush(self, timeout=None):
return
elapsed_time = time.time() - start_time
wait_time = timeout and max(timeout - elapsed_time, 0)
if event.wait(timeout):
if event.wait(wait_time):
return time.time() - start_time # time taken to flush

def put(self, item, block=True, timeout=None):
Expand Down