From 674574afa29ec805b26f844e66a21b539dbe1d25 Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Thu, 9 Apr 2020 18:54:13 -0400 Subject: [PATCH 1/7] Spawn workers on demand in ProcessPoolExecutor --- Doc/whatsnew/3.9.rst | 5 ++++ Lib/concurrent/futures/process.py | 20 ++++++++++--- Lib/test/test_concurrent_futures.py | 45 ++++++++++++++++++++++++++--- 3 files changed, 62 insertions(+), 8 deletions(-) diff --git a/Doc/whatsnew/3.9.rst b/Doc/whatsnew/3.9.rst index e49d4264c65916..fdfa4ac2e93840 100644 --- a/Doc/whatsnew/3.9.rst +++ b/Doc/whatsnew/3.9.rst @@ -200,6 +200,11 @@ and :class:`~concurrent.futures.ProcessPoolExecutor`. This improves compatibility with subinterpreters and predictability in their shutdown processes. (Contributed by Kyle Stanley in :issue:`39812`.) +Workers in :class:`~concurrent.futures.ProcessPoolExecutor` are now spawned on +demand, only when there are no available idle workers to reuse. This optimizes +startup overhead and reduces the amount of lost CPU time to idle workers. +(Contributed by Kyle Stanley in :issue:`39207`.) + curses ------ diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 4c39500d675ff5..5ffc9c0ebf8101 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -209,7 +209,8 @@ def _sendback_result(result_queue, work_id, result=None, exception=None): result_queue.put(_ResultItem(work_id, exception=exc)) -def _process_worker(call_queue, result_queue, initializer, initargs): +def _process_worker(call_queue, result_queue, initializer, initargs, + idle_worker_semaphore): """Evaluates calls from call_queue and places the results in result_queue. This worker is run in a separate process. @@ -221,6 +222,8 @@ def _process_worker(call_queue, result_queue, initializer, initargs): to by the worker. initializer: A callable initializer, or None initargs: A tuple of args for the initializer + idle_worker_semaphore: A multiprocessing.Semaphore that is used to + prevent new workers from being spawned when there are idle workers. """ if initializer is not None: try: @@ -249,6 +252,8 @@ def _process_worker(call_queue, result_queue, initializer, initargs): # open files or shared memory that is not needed anymore del call_item + # increment idle process count after worker finishes job + idle_worker_semaphore.release() class _ExecutorManagerThread(threading.Thread): """Manages the communication between this process and the worker processes. @@ -601,6 +606,7 @@ def __init__(self, max_workers=None, mp_context=None, # Shutdown is a two-step process. self._shutdown_thread = False self._shutdown_lock = threading.Lock() + self._idle_worker_semaphore = mp_context.Semaphore(0) self._broken = False self._queue_count = 0 self._pending_work_items = {} @@ -633,20 +639,25 @@ def __init__(self, max_workers=None, mp_context=None, def _start_executor_manager_thread(self): if self._executor_manager_thread is None: # Start the processes so that their sentinels are known. - self._adjust_process_count() self._executor_manager_thread = _ExecutorManagerThread(self) self._executor_manager_thread.start() _threads_wakeups[self._executor_manager_thread] = \ self._executor_manager_thread_wakeup def _adjust_process_count(self): - for _ in range(len(self._processes), self._max_workers): + # if there's an idle process, we don't need to spawn a new one. 
+ if self._idle_worker_semaphore.acquire(block=False): + return + + process_count = len(self._processes) + if process_count < self._max_workers: p = self._mp_context.Process( target=_process_worker, args=(self._call_queue, self._result_queue, self._initializer, - self._initargs)) + self._initargs, + self._idle_worker_semaphore)) p.start() self._processes[p.pid] = p @@ -669,6 +680,7 @@ def submit(self, fn, /, *args, **kwargs): # Wake up queue management thread self._executor_manager_thread_wakeup.wakeup() + self._adjust_process_count() self._start_executor_manager_thread() return f submit.__doc__ = _base.Executor.submit.__doc__ diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 868415ab29916d..7cf4db6fa4878a 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -486,10 +486,16 @@ def _prime_executor(self): pass def test_processes_terminate(self): - self.executor.submit(mul, 21, 2) - self.executor.submit(mul, 6, 7) - self.executor.submit(mul, 3, 14) - self.assertEqual(len(self.executor._processes), 5) + def acquire_lock(lock): + lock.acquire() + + mp_context = get_context() + sem = mp_context.Semaphore(0) + for _ in range(3): + self.executor.submit(acquire_lock, sem) + self.assertEqual(len(self.executor._processes), 3) + for _ in range(3): + sem.release() processes = self.executor._processes self.executor.shutdown() @@ -964,6 +970,37 @@ def test_ressources_gced_in_workers(self): mgr.shutdown() mgr.join() + def test_saturation(self): + executor = self.executor_type(4) + def acquire_lock(lock): + lock.aquire() + + mp_context = get_context() + sem = mp_context.Semaphore(0) + job_count = 15 * executor._max_workers + for _ in range(job_count): + executor.submit(acquire_lock, sem) + self.assertEqual(len(executor._processes), executor._max_workers) + for _ in range(job_count): + sem.release() + executor.shutdown() + + def test_idle_process_reuse_one(self): + executor = self.executor_type() + executor.submit(mul, 21, 2).result() + executor.submit(mul, 6, 7).result() + executor.submit(mul, 3, 14).result() + self.assertEqual(len(executor._processes), 1) + executor.shutdown() + + def test_idle_process_reuse_multiple(self): + executor = self.executor_type() + executor.submit(mul, 12, 7).result() + executor.submit(mul, 33, 25) + executor.submit(mul, 25, 26).result() + executor.submit(mul, 18, 29) + self.assertEqual(len(executor._processes), 2) + executor.shutdown() create_executor_tests(ProcessPoolExecutorTest, executor_mixins=(ProcessPoolForkMixin, From 5e5e4f115af7b3e0b9507bb6f3d789a51223492e Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Thu, 9 Apr 2020 21:15:50 -0400 Subject: [PATCH 2/7] Set _idle_worker_semaphore to None in executor.shutdown() --- Lib/concurrent/futures/process.py | 1 + 1 file changed, 1 insertion(+) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 5ffc9c0ebf8101..0d60b23989de9a 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -730,6 +730,7 @@ def shutdown(self, wait=True, *, cancel_futures=False): self._call_queue = None self._result_queue = None self._processes = None + self._idle_worker_semaphore = None if self._executor_manager_thread_wakeup: self._executor_manager_thread_wakeup = None From 04e592760f2ddefd0ea197273f0a80f332afba19 Mon Sep 17 00:00:00 2001 From: "blurb-it[bot]" <43283697+blurb-it[bot]@users.noreply.github.com> Date: Fri, 10 Apr 2020 01:25:00 +0000 Subject: [PATCH 3/7] 
=?UTF-8?q?=F0=9F=93=9C=F0=9F=A4=96=20Added=20by=20blu?= =?UTF-8?q?rb=5Fit.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../next/Library/2020-04-10-01-24-58.bpo-39207.2dE5Ox.rst | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 Misc/NEWS.d/next/Library/2020-04-10-01-24-58.bpo-39207.2dE5Ox.rst diff --git a/Misc/NEWS.d/next/Library/2020-04-10-01-24-58.bpo-39207.2dE5Ox.rst b/Misc/NEWS.d/next/Library/2020-04-10-01-24-58.bpo-39207.2dE5Ox.rst new file mode 100644 index 00000000000000..3fa82771ded237 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2020-04-10-01-24-58.bpo-39207.2dE5Ox.rst @@ -0,0 +1,4 @@ +Workers in :class:`~concurrent.futures.ProcessPoolExecutor` are now spawned on +demand, only when there are no available idle workers to reuse. This optimizes +startup overhead and reduces the amount of lost CPU time to idle workers. +Patch by Kyle Stanley. \ No newline at end of file From 28bb66998114a83025860555dd88177217f811a3 Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Fri, 10 Apr 2020 00:22:57 -0400 Subject: [PATCH 4/7] Use executor weakref to access semaphore --- Lib/concurrent/futures/process.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 0d60b23989de9a..40ffa60811a47b 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -209,8 +209,7 @@ def _sendback_result(result_queue, work_id, result=None, exception=None): result_queue.put(_ResultItem(work_id, exception=exc)) -def _process_worker(call_queue, result_queue, initializer, initargs, - idle_worker_semaphore): +def _process_worker(call_queue, result_queue, initializer, initargs): """Evaluates calls from call_queue and places the results in result_queue. This worker is run in a separate process. @@ -222,8 +221,6 @@ def _process_worker(call_queue, result_queue, initializer, initargs, to by the worker. initializer: A callable initializer, or None initargs: A tuple of args for the initializer - idle_worker_semaphore: A multiprocessing.Semaphore that is used to - prevent new workers from being spawned when there are idle workers. """ if initializer is not None: try: @@ -252,8 +249,6 @@ def _process_worker(call_queue, result_queue, initializer, initargs, # open files or shared memory that is not needed anymore del call_item - # increment idle process count after worker finishes job - idle_worker_semaphore.release() class _ExecutorManagerThread(threading.Thread): """Manages the communication between this process and the worker processes. @@ -323,6 +318,12 @@ def run(self): # while waiting on new results. 
del result_item + # attempt to increment idle process count + executor = self.executor_reference() + if executor is not None: + executor._idle_worker_semaphore.release() + del executor + if self.is_shutting_down(): self.flag_executor_shutting_down() @@ -656,8 +657,7 @@ def _adjust_process_count(self): args=(self._call_queue, self._result_queue, self._initializer, - self._initargs, - self._idle_worker_semaphore)) + self._initargs)) p.start() self._processes[p.pid] = p @@ -730,7 +730,6 @@ def shutdown(self, wait=True, *, cancel_futures=False): self._call_queue = None self._result_queue = None self._processes = None - self._idle_worker_semaphore = None if self._executor_manager_thread_wakeup: self._executor_manager_thread_wakeup = None From 04e6b2979ee9b519ae8e29c224b60be0a5f623af Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Fri, 10 Apr 2020 20:32:30 -0400 Subject: [PATCH 5/7] Fix race condition in unit tests --- Lib/test/test_concurrent_futures.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 7cf4db6fa4878a..815bd20bbd9e8a 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -999,7 +999,7 @@ def test_idle_process_reuse_multiple(self): executor.submit(mul, 33, 25) executor.submit(mul, 25, 26).result() executor.submit(mul, 18, 29) - self.assertEqual(len(executor._processes), 2) + self.assertTrue(len(executor._processes) <= 2) executor.shutdown() create_executor_tests(ProcessPoolExecutorTest, From 75690c743dbd5359bb5c42a214f3b7f5a435de17 Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Sat, 18 Apr 2020 19:50:26 -0400 Subject: [PATCH 6/7] Change mp.Semaphore to threading.Semaphore --- Lib/concurrent/futures/process.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 40ffa60811a47b..36355ae8756dbc 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -607,7 +607,7 @@ def __init__(self, max_workers=None, mp_context=None, # Shutdown is a two-step process. self._shutdown_thread = False self._shutdown_lock = threading.Lock() - self._idle_worker_semaphore = mp_context.Semaphore(0) + self._idle_worker_semaphore = threading.Semaphore(0) self._broken = False self._queue_count = 0 self._pending_work_items = {} @@ -647,7 +647,7 @@ def _start_executor_manager_thread(self): def _adjust_process_count(self): # if there's an idle process, we don't need to spawn a new one. - if self._idle_worker_semaphore.acquire(block=False): + if self._idle_worker_semaphore.acquire(blocking=False): return process_count = len(self._processes) From d390e51bc2ccad6a80640e50fc62b4203a79d448 Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Sat, 18 Apr 2020 19:53:37 -0400 Subject: [PATCH 7/7] Several improvements to ProcessPoolExecutor tests. 
* Use try-finally for executor.shutdown() in test_saturation * Use assertLessEqual * Simplify semaphore acquire Co-authored-by: Antoine Pitrou --- Lib/test/test_concurrent_futures.py | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 815bd20bbd9e8a..a8c5bb6aa1a3a5 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -972,21 +972,20 @@ def test_ressources_gced_in_workers(self): def test_saturation(self): executor = self.executor_type(4) - def acquire_lock(lock): - lock.aquire() - mp_context = get_context() sem = mp_context.Semaphore(0) job_count = 15 * executor._max_workers - for _ in range(job_count): - executor.submit(acquire_lock, sem) - self.assertEqual(len(executor._processes), executor._max_workers) - for _ in range(job_count): - sem.release() - executor.shutdown() + try: + for _ in range(job_count): + executor.submit(sem.acquire) + self.assertEqual(len(executor._processes), executor._max_workers) + for _ in range(job_count): + sem.release() + finally: + executor.shutdown() def test_idle_process_reuse_one(self): - executor = self.executor_type() + executor = self.executor_type(4) executor.submit(mul, 21, 2).result() executor.submit(mul, 6, 7).result() executor.submit(mul, 3, 14).result() @@ -994,12 +993,12 @@ def test_idle_process_reuse_one(self): executor.shutdown() def test_idle_process_reuse_multiple(self): - executor = self.executor_type() + executor = self.executor_type(4) executor.submit(mul, 12, 7).result() executor.submit(mul, 33, 25) executor.submit(mul, 25, 26).result() executor.submit(mul, 18, 29) - self.assertTrue(len(executor._processes) <= 2) + self.assertLessEqual(len(executor._processes), 2) executor.shutdown() create_executor_tests(ProcessPoolExecutorTest,
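
The tests above pin down the new behaviour; the short standalone script below (not part of the patch) is a rough way to observe it by hand. The helper names (square, nap), the max_workers=4 choice and the job counts are illustrative only, and it peeks at the private _processes mapping purely for inspection. Exact counts can vary slightly, since the idle-worker semaphore is released by the executor manager thread after each result is delivered:

    import time
    from concurrent.futures import ProcessPoolExecutor

    def square(x):
        return x * x

    def nap(seconds):
        time.sleep(seconds)

    if __name__ == "__main__":
        # Sequential submissions: each job finishes before the next submit(),
        # so the idle-worker semaphore normally lets the executor reuse its
        # single existing worker instead of starting max_workers up front.
        with ProcessPoolExecutor(max_workers=4) as executor:
            for i in range(3):
                executor.submit(square, i).result()
            print("workers after sequential jobs:", len(executor._processes))  # typically 1

        # Concurrent submissions: no worker is idle while the sleeps run, so a
        # new process is spawned on each submit(), but never beyond max_workers.
        with ProcessPoolExecutor(max_workers=4) as executor:
            for _ in range(16):
                executor.submit(nap, 1)
            print("workers while saturated:", len(executor._processes))  # at most 4

Before this change, the first submit() started all max_workers processes eagerly via _adjust_process_count(), so both prints would have shown 4.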
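
For readers who want the accounting in isolation from the executor plumbing, the sketch below is a thread-based analogue of the same idea, not the patch itself: a worker advertises itself as idle by releasing a semaphore token after each job, and a submission only spawns a new worker when no token can be taken and the pool is below max_workers. The OnDemandWorkers class and its names are hypothetical:

    import collections
    import threading

    class OnDemandWorkers:
        """Illustrative thread-based analogue of the idle-worker accounting."""

        def __init__(self, max_workers):
            self._max_workers = max_workers
            self._idle = threading.Semaphore(0)       # one token per idle worker
            self._workers = []
            self._jobs = collections.deque()
            self._jobs_available = threading.Condition()

        def submit(self, fn, *args):
            with self._jobs_available:
                self._jobs.append((fn, args))
                self._jobs_available.notify()
            self._adjust_worker_count()

        def _adjust_worker_count(self):
            # Same shape as _adjust_process_count() in this patch: taking an
            # idle token means an existing worker will pick the job up, so it
            # replaces spawning; otherwise spawn while below the cap.
            if self._idle.acquire(blocking=False):
                return
            if len(self._workers) < self._max_workers:
                t = threading.Thread(target=self._run_worker, daemon=True)
                t.start()
                self._workers.append(t)

        def _run_worker(self):
            # No shutdown logic here: daemon threads simply end with the process.
            while True:
                with self._jobs_available:
                    while not self._jobs:
                        self._jobs_available.wait()
                    fn, args = self._jobs.popleft()
                fn(*args)
                self._idle.release()                  # advertise idleness

Usage would be along the lines of pool = OnDemandWorkers(4); pool.submit(print, "hello"), with len(pool._workers) growing only when submissions overlap, mirroring what test_saturation and the idle-reuse tests assert for the real executor.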