From b36b5ecadb3e2ed0af361f875e9be016fb44fa42 Mon Sep 17 00:00:00 2001
From: Jay Chia <17691182+jaychia@users.noreply.github.com>
Date: Tue, 30 Jul 2024 16:56:24 -0700
Subject: [PATCH] [FEAT] Fix resource accounting in PyRunner (#2567)

Together with #2566, closes #2561

This PR changes the way the PyRunner performs resource accounting. Instead of updating the number of CPUs, GPUs, and bytes of memory used only when futures are retrieved, we now do this just before each task completes. These variables are protected with a lock to allow for safe concurrent access across worker threads.

Additionally, this PR now tracks the inflight `Futures` across all executions globally in the PyRunner singleton. This is because there will be instances where a single execution cannot make forward progress on its own (e.g. there are only 8 CPUs available, and 8 partitions from other executions are currently executing). In this case, we need to wait for **some** task globally to complete before attempting to make forward progress on the current execution.

---------

Co-authored-by: Jay Chia
---
 daft/runners/pyrunner.py | 107 ++++++++++++++++++++++++---------------
 1 file changed, 65 insertions(+), 42 deletions(-)

diff --git a/daft/runners/pyrunner.py b/daft/runners/pyrunner.py
index 4cf0130132..0820af6987 100644
--- a/daft/runners/pyrunner.py
+++ b/daft/runners/pyrunner.py
@@ -1,6 +1,7 @@
 from __future__ import annotations
 
 import logging
+import threading
 from concurrent import futures
 from dataclasses import dataclass
 from typing import Iterator
@@ -121,8 +122,7 @@ def __init__(self, use_thread_pool: bool | None) -> None:
         self._thread_pool = futures.ThreadPoolExecutor()
 
         # Global accounting of tasks and resources
-        self._inflight_tasks_resources: dict[str, ResourceRequest] = dict()
-        self._inflight_tasks: dict[str, PartitionTask] = dict()
+        self._inflight_futures: dict[str, futures.Future] = {}
 
         system_info = SystemInfo()
         num_cpus = system_info.cpu_count()
@@ -134,7 +134,13 @@ def __init__(self, use_thread_pool: bool | None) -> None:
 
         self.num_cpus = num_cpus
         self.num_gpus = cuda_device_count()
-        self.bytes_memory = system_info.total_memory()
+        self.total_bytes_memory = system_info.total_memory()
+
+        # Resource accounting:
+        self._resource_accounting_lock = threading.Lock()
+        self._available_bytes_memory = self.total_bytes_memory
+        self._available_cpus = float(self.num_cpus)
+        self._available_gpus = float(self.num_gpus)
 
     def runner_io(self) -> PyRunnerIO:
         return PyRunnerIO()
@@ -213,8 +219,7 @@ def run_iter_tables(
     def _physical_plan_to_partitions(
         self, plan: physical_plan.MaterializedPhysicalPlan[MicroPartition]
     ) -> Iterator[PyMaterializedResult]:
-        future_to_task: dict[futures.Future, str] = dict()
-
+        local_futures_to_task: dict[futures.Future, PartitionTask] = {}
         pbar = ProgressBar(use_ray_tqdm=False)
 
         try:
@@ -238,15 +243,16 @@ def _physical_plan_to_partitions(
                         next_step = next(plan)
                         continue
 
-                    elif not self._can_admit_task(
-                        next_step.resource_request,
-                    ):
-                        # Insufficient resources; await some tasks.
-                        logger.debug("Skipping to wait on dispatched tasks: insufficient resources")
-                        break
-
                     else:
                         # next_task is a task to run.
+                        task_admitted = self._attempt_admit_task(
+                            next_step.resource_request,
+                        )
+
+                        if not task_admitted:
+                            # Insufficient resources; await some tasks.
+                            logger.debug("Skipping to wait on dispatched tasks: insufficient resources")
+                            break
 
                         # Run the task in the main thread, instead of the thread pool, in certain conditions:
                         # - Threading is disabled in runner config.
@@ -269,6 +275,7 @@ def _physical_plan_to_partitions(
                                 next_step.instructions,
                                 next_step.inputs,
                                 next_step.partial_metadatas,
+                                next_step.resource_request,
                             )
                             next_step.set_result(materialized_results)
 
@@ -284,36 +291,38 @@ def _physical_plan_to_partitions(
                                 next_step.instructions,
                                 next_step.inputs,
                                 next_step.partial_metadatas,
+                                next_step.resource_request,
                             )
 
-                            # Register the inflight task and resources used.
-                            future_to_task[future] = next_step.id()
+                            # Register the inflight task
                             assert (
-                                next_step.id() not in self._inflight_tasks_resources
+                                next_step.id() not in local_futures_to_task
                             ), "Step IDs should be unique - this indicates an internal error, please file an issue!"
-                            self._inflight_tasks[next_step.id()] = next_step
-                            self._inflight_tasks_resources[next_step.id()] = next_step.resource_request
+                            self._inflight_futures[next_step.id()] = future
+                            local_futures_to_task[future] = next_step
 
                         next_step = next(plan)
 
-            # Await at least one task and process the results.
-            if not len(future_to_task) > 0:
+            if next_step is None and not len(local_futures_to_task) > 0:
                 raise RuntimeError(
                     f"Scheduler deadlocked! This should never happen. Please file an issue. Current step: {type(next_step)}"
                 )
 
-            done_set, _ = futures.wait(list(future_to_task.keys()), return_when=futures.FIRST_COMPLETED)
+            # Await at least one task in the global futures to finish before proceeding
+            _ = futures.wait(list(self._inflight_futures.values()), return_when=futures.FIRST_COMPLETED)
+
+            # Now await at least one task in the local futures to finish, so as to progress the local execution
+            done_set, _ = futures.wait(list(local_futures_to_task), return_when=futures.FIRST_COMPLETED)
 
             for done_future in done_set:
-                done_id = future_to_task.pop(done_future)
-                del self._inflight_tasks_resources[done_id]
-                done_task = self._inflight_tasks.pop(done_id)
+                done_task = local_futures_to_task.pop(done_future)
                 materialized_results = done_future.result()
 
                 pbar.mark_task_done(done_task)
+                del self._inflight_futures[done_task.id()]
 
                 logger.debug(
                     "Task completed: %s -> <%s partitions>",
-                    done_id,
+                    done_task.id(),
                     len(materialized_results),
                 )
@@ -333,39 +342,53 @@ def _check_resource_requests(self, resource_request: ResourceRequest) -> None:
             raise RuntimeError(f"Requested {resource_request.num_cpus} CPUs but found only {self.num_cpus} available")
         if resource_request.num_gpus is not None and resource_request.num_gpus > self.num_gpus:
             raise RuntimeError(f"Requested {resource_request.num_gpus} GPUs but found only {self.num_gpus} available")
-        if resource_request.memory_bytes is not None and resource_request.memory_bytes > self.bytes_memory:
+        if resource_request.memory_bytes is not None and resource_request.memory_bytes > self.total_bytes_memory:
             raise RuntimeError(
-                f"Requested {resource_request.memory_bytes} bytes of memory but found only {self.bytes_memory} available"
+                f"Requested {resource_request.memory_bytes} bytes of memory but found only {self.total_bytes_memory} available"
             )
 
-    def _can_admit_task(
+    def _attempt_admit_task(
         self,
         resource_request: ResourceRequest,
     ) -> bool:
         self._check_resource_requests(resource_request)
 
-        inflight_resources = self._inflight_tasks_resources.values()
-        total_inflight_resources: ResourceRequest = sum(inflight_resources, ResourceRequest())
-        cpus_okay = (total_inflight_resources.num_cpus or 0) + (resource_request.num_cpus or 0) <= self.num_cpus
-        gpus_okay = (total_inflight_resources.num_gpus or 0) + (resource_request.num_gpus or 0) <= self.num_gpus
-        memory_okay = (total_inflight_resources.memory_bytes or 0) + (
-            resource_request.memory_bytes or 0
-        ) <= self.bytes_memory
+        with self._resource_accounting_lock:
+            memory_okay = (resource_request.memory_bytes or 0) <= self._available_bytes_memory
+            cpus_okay = (resource_request.num_cpus or 0) <= self._available_cpus
+            gpus_okay = (resource_request.num_gpus or 0) <= self._available_gpus
+            all_okay = all((cpus_okay, gpus_okay, memory_okay))
+
+            # Update resource accounting if we have the resources (this is considered as the task being "admitted")
+            if all_okay:
+                self._available_bytes_memory -= resource_request.memory_bytes or 0
+                self._available_cpus -= resource_request.num_cpus or 0.0
+                self._available_gpus -= resource_request.num_gpus or 0.0
 
-        return all((cpus_okay, gpus_okay, memory_okay))
+            return all_okay
 
-    @staticmethod
     def build_partitions(
+        self,
         instruction_stack: list[Instruction],
         partitions: list[MicroPartition],
         final_metadata: list[PartialPartitionMetadata],
+        resource_request: ResourceRequest,
     ) -> list[MaterializedResult[MicroPartition]]:
-        for instruction in instruction_stack:
-            partitions = instruction.run(partitions)
-        return [
-            PyMaterializedResult(part, PartitionMetadata.from_table(part).merge_with_partial(partial))
-            for part, partial in zip(partitions, final_metadata)
-        ]
+        try:
+            for instruction in instruction_stack:
+                partitions = instruction.run(partitions)
+
+            results: list[MaterializedResult[MicroPartition]] = [
+                PyMaterializedResult(part, PartitionMetadata.from_table(part).merge_with_partial(partial))
+                for part, partial in zip(partitions, final_metadata)
+            ]
+            return results
+        finally:
+            # Release CPU, GPU and memory resources
+            with self._resource_accounting_lock:
+                self._available_bytes_memory += resource_request.memory_bytes or 0
+                self._available_cpus += resource_request.num_cpus or 0.0
+                self._available_gpus += resource_request.num_gpus or 0.0
 
 
 @dataclass
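
For readers skimming the diff, the admission/release scheme above reduces to a small, self-contained pattern: atomically check and reserve resources under one lock, and return them in a `finally` block when the task ends. The sketch below is illustrative only; `ResourcePool`, `admit`, and `release` are hypothetical names, not part of Daft's API.

```python
import threading


class ResourcePool:
    """Hypothetical helper mirroring the admit/release accounting in this patch."""

    def __init__(self, num_cpus: float, num_gpus: float, bytes_memory: int) -> None:
        self._lock = threading.Lock()
        self._available_cpus = num_cpus
        self._available_gpus = num_gpus
        self._available_bytes_memory = bytes_memory

    def admit(self, cpus: float, gpus: float, memory: int) -> bool:
        # Check and reserve under a single lock acquisition: checking and
        # reserving separately would let two tasks both pass the check and
        # oversubscribe the machine.
        with self._lock:
            if (
                cpus <= self._available_cpus
                and gpus <= self._available_gpus
                and memory <= self._available_bytes_memory
            ):
                self._available_cpus -= cpus
                self._available_gpus -= gpus
                self._available_bytes_memory -= memory
                return True
            return False

    def release(self, cpus: float, gpus: float, memory: int) -> None:
        # Call from a `finally` block when the task ends, so resources are
        # returned even if an instruction raised mid-task.
        with self._lock:
            self._available_cpus += cpus
            self._available_gpus += gpus
            self._available_bytes_memory += memory
```

A worker would wrap execution as `if pool.admit(...): try: run_task() finally: pool.release(...)`, which is the same shape as `_attempt_admit_task` paired with the `finally` in `build_partitions` above.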
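The global-vs-local futures change can likewise be distilled into a two-level wait. This is a minimal sketch assuming hypothetical `global_inflight`/`local_inflight` mappings of the same shape as `self._inflight_futures` and `local_futures_to_task` in the diff: first block until some task from *any* execution finishes (which may free resources this execution is blocked on), then block until one of *this* execution's tasks finishes so local progress can be made.

```python
from concurrent import futures


def await_progress(
    global_inflight: dict[str, futures.Future],
    local_inflight: dict[futures.Future, object],
) -> set[futures.Future]:
    # Level 1: wait on all inflight futures across executions. Another
    # execution's task completing may release CPUs/GPUs/memory even if none
    # of this execution's own tasks are done yet.
    futures.wait(list(global_inflight.values()), return_when=futures.FIRST_COMPLETED)

    # Level 2: wait for (and collect) at least one of this execution's own
    # tasks, whose results are what this plan needs to emit next.
    done, _ = futures.wait(list(local_inflight), return_when=futures.FIRST_COMPLETED)
    return done
```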