diff --git a/libcst/codemod/_cli.py b/libcst/codemod/_cli.py index a9c9e81ab..39a3ff27e 100644 --- a/libcst/codemod/_cli.py +++ b/libcst/codemod/_cli.py @@ -19,7 +19,7 @@ from multiprocessing import Process, Queue, cpu_count from pathlib import Path from queue import Empty -from typing import AnyStr, List, Optional, Sequence, Union, cast +from typing import AnyStr, Dict, List, Optional, Sequence, Set, Union, cast from libcst import PartialParserConfig, parse_module from libcst.codemod._codemod import Codemod @@ -682,30 +682,31 @@ def parallel_exec_transform_with_prettyprint( # noqa: C901 pending_processes: List[Process] = [] # Start processes + filename_to_process: Dict[str, Process] = {} for f in files: - pending_processes.append( - Process( - target=_parallel_exec_process_stub, - args=( - queue, - transform, - f, - repo_root, - unified_diff, - include_generated, - generated_code_marker, - format_code, - formatter_args, - blacklist_patterns, - python_version, - ), - ) + process = Process( + target=_parallel_exec_process_stub, + args=( + queue, + transform, + f, + repo_root, + unified_diff, + include_generated, + generated_code_marker, + format_code, + formatter_args, + blacklist_patterns, + python_version, + ), ) + pending_processes.append(process) + filename_to_process[f] = process # Start the processes, allowing no more than num_processes to be running # at once. results_left = len(pending_processes) - joinable_processes: List[Process] = [] + joinable_processes: Set[Process] = set() processes_started = 0 interrupted = False @@ -714,7 +715,7 @@ def parallel_exec_transform_with_prettyprint( # noqa: C901 try: # Move this process to the joinables process = pending_processes.pop(0) - joinable_processes.append(process) + joinable_processes.add(process) # Start it, bookkeep that we did process.start() @@ -756,6 +757,14 @@ def parallel_exec_transform_with_prettyprint( # noqa: C901 warnings += len(result.transform_result.warning_messages) + # Join the process to free any related resources. + # Remove all references to the process to allow the GC to + # clean up any file handles. + process = filename_to_process.pop(result.filename, None) + if process: + process.join() + joinable_processes.discard(process) + # Now, join on all of them so we don't leave zombies or hang for p in joinable_processes: p.join()