Skip to content

Commit

Permalink
Fixed #253: added a timeout for threads to join
Browse files Browse the repository at this point in the history
  • Loading branch information
rgaudin committed May 17, 2022
1 parent 31c9ce3 commit 30221b0
Showing 1 changed file with 12 additions and 2 deletions.
14 changes: 12 additions & 2 deletions src/sotoki/utils/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# -*- coding: utf-8 -*-
# vim: ai ts=4 sts=4 et sw=4 nu

import datetime
import queue
import threading
from typing import Callable
Expand All @@ -12,6 +13,7 @@
# Lock that ensures that new workers are not created while the interpreter is
# shutting down. Must be held while mutating _threads_queues and _shutdown.
_global_shutdown_lock = threading.Lock()
thread_deadline_sec = 60


def excepthook(args):
Expand Down Expand Up @@ -129,11 +131,19 @@ def join(self):
"""Await completion of workers, requesting them to stop taking new task"""
logger.debug(f"joining all threads for {self.prefix}")
self.no_more = True
for t in self._workers:
for num, t in enumerate(self._workers):
deadline = datetime.datetime.now() + datetime.timedelta(
seconds=thread_deadline_sec
)
logger.debug(f"Giving {self.prefix}{num} {thread_deadline_sec}s to join")
e = threading.Event()
while t.is_alive():
while t.is_alive() and datetime.datetime.now() < deadline:
t.join(1)
e.wait(timeout=2)
if t.is_alive():
logger.debug(f"Thread {self.prefix}{num} is not joining. Skipping…")
else:
logger.debug(f"Thread {self.prefix}{num} joined")
logger.debug(f"all threads joined for {self.prefix}")

def release_halt(self):
Expand Down

0 comments on commit 30221b0

Please sign in to comment.