From 4fec71c66ad9b0c73b10d5457f44cfb45ea1ee82 Mon Sep 17 00:00:00 2001 From: Jay Chia <17691182+jaychia@users.noreply.github.com> Date: Mon, 29 Jul 2024 20:27:00 -0700 Subject: [PATCH] [FEAT] Enable buffered iteration on plans (#2566) Helps close part of #2561 This PR enables buffering of result partition tasks, preventing "runaway execution" of executions when run concurrently. The problem previously was that if we ran two executions in parallel (`e1` and `e2`) on a machine with 8 CPUs: 1. `e1` could potentially run `8` tasks and keep them buffered (not releasing the resource request) 2. When `e2` attempts to run the next task, it notices that the task cannot be admitted on the system (due to memory constraints) * `e2` thinks that it is deadlocking because there is a strong assumption in the pyrunner today that if a task cannot be admitted, it merely has to wait for some other tasks in the same execution to finish up. * However, `e2` doesn't have any tasks currently pending (because it is starved). The pending tasks are all buffered in `e1`. Thus it thinks that it is deadlocking. ## Solution * This PR sets the default buffering behavior to `1` instead of allowing each execution to run as many tasks as it wants * We introduce logic in the physical plan to have an upper limit on the size of the materialization buffer. If that buffer gets too large, it will start yielding `None` to indicate that the plan is unable to proceed. Note that there is still potentially a problem here, e.g. running > NUM_CPU number of executions concurrently. That can be solved in a follow-up PR for refactoring the way we do resource accounting. --------- Co-authored-by: Jay Chia Co-authored-by: Desmond Cheong --- daft/dataframe/dataframe.py | 19 ++- daft/execution/physical_plan.py | 145 ++++++++++++++--- .../plan_scheduler/physical_plan_scheduler.py | 6 +- daft/runners/pyrunner.py | 15 +- daft/runners/ray_runner.py | 14 +- tests/physical_plan/__init__.py | 0 .../test_physical_plan_buffering.py | 147 ++++++++++++++++++ 7 files changed, 309 insertions(+), 37 deletions(-) create mode 100644 tests/physical_plan/__init__.py create mode 100644 tests/physical_plan/test_physical_plan_buffering.py diff --git a/daft/dataframe/dataframe.py b/daft/dataframe/dataframe.py index 0e651d4b03..c5d9a6ad01 100644 --- a/daft/dataframe/dataframe.py +++ b/daft/dataframe/dataframe.py @@ -210,7 +210,7 @@ def __iter__(self) -> Iterator[Dict[str, Any]]: else: # Execute the dataframe in a streaming fashion. context = get_context() - partitions_iter = context.runner().run_iter_tables(self._builder) + partitions_iter = context.runner().run_iter_tables(self._builder, results_buffer_size=1) # Iterate through partitions. for partition in partitions_iter: @@ -222,15 +222,24 @@ def __iter__(self) -> Iterator[Dict[str, Any]]: yield row @DataframePublicAPI - def iter_partitions(self) -> Iterator[Union[MicroPartition, "ray.ObjectRef[MicroPartition]"]]: + def iter_partitions( + self, results_buffer_size: Optional[int] = 1 + ) -> Iterator[Union[MicroPartition, "ray.ObjectRef[MicroPartition]"]]: """Begin executing this dataframe and return an iterator over the partitions. Each partition will be returned as a daft.Table object (if using Python runner backend) or a ray ObjectRef (if using Ray runner backend). - .. WARNING:: - This method is experimental and may change in future versions. + Args: + results_buffer_size: how many partitions to allow in the results buffer (defaults to 1). 
+ Setting this value will buffer results up to the provided size and provide backpressure + to dataframe execution based on the rate of consumption from the returned iterator. Setting this to + `None` will result in a buffer of unbounded size, causing the dataframe to run asynchronously + to completion. """ + if results_buffer_size is not None and not results_buffer_size > 0: + raise ValueError(f"Provided `results_buffer_size` value must be > 0, received: {results_buffer_size}") + if self._result is not None: # If the dataframe has already finished executing, # use the precomputed results. @@ -240,7 +249,7 @@ def iter_partitions(self) -> Iterator[Union[MicroPartition, "ray.ObjectRef[Micro else: # Execute the dataframe in a streaming fashion. context = get_context() - results_iter = context.runner().run_iter(self._builder) + results_iter = context.runner().run_iter(self._builder, results_buffer_size=results_buffer_size) for result in results_iter: yield result.partition() diff --git a/daft/execution/physical_plan.py b/daft/execution/physical_plan.py index c0050a2ece..99affa1098 100644 --- a/daft/execution/physical_plan.py +++ b/daft/execution/physical_plan.py @@ -1513,39 +1513,136 @@ def fanout_random(child_plan: InProgressPhysicalPlan[PartitionT], num_partitions seed += 1 -def materialize( - child_plan: InProgressPhysicalPlan[PartitionT], -) -> MaterializedPhysicalPlan: +def _best_effort_next_step( + stage_id: int, child_plan: InProgressPhysicalPlan[PartitionT] +) -> tuple[PartitionTask[PartitionT] | None, bool]: + """Performs a best-effort attempt at retrieving the next step from a child plan + + Returns None in cases where there is nothing to run, or the plan has been exhausted. + + Returns: + step: the step (potentially None) to run + is_final_task: a boolean indicating whether or not this step was a final step + """ + try: + step = next(child_plan) + except StopIteration: + return (None, False) + else: + if isinstance(step, PartitionTaskBuilder): + step = step.finalize_partition_task_single_output(stage_id=stage_id) + return (step, True) + elif isinstance(step, PartitionTask): + return (step, False) + else: + return (None, False) + + +class Materialize: """Materialize the child plan. Repeatedly yields either a PartitionTask (to produce an intermediate partition) or a PartitionT (which is part of the final result). """ - materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque() - stage_id = next(stage_id_counter) - while True: - # Check if any inputs finished executing. - while len(materializations) > 0 and materializations[0].done(): - done_task = materializations.popleft() - yield done_task.result() + def __init__( + self, + child_plan: InProgressPhysicalPlan[PartitionT], + results_buffer_size: int | None, + ): + self.child_plan = child_plan + self.materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque() + self.results_buffer_size = results_buffer_size + + def __iter__(self) -> MaterializedPhysicalPlan: + num_materialized_yielded = 0 + num_intermediate_yielded = 0 + num_final_yielded = 0 + stage_id = next(stage_id_counter) + + logger.debug( + "[plan-%s] Starting to emit tasks from `materialize` with results_buffer_size=%s", + stage_id, + self.results_buffer_size, + ) - # Materialize a single dependency. 
- try: - step = next(child_plan) - if isinstance(step, PartitionTaskBuilder): - step = step.finalize_partition_task_single_output(stage_id=stage_id) - materializations.append(step) - assert isinstance(step, (PartitionTask, type(None))) + while True: + # If any inputs have finished executing, we want to drain the `materializations` buffer + while len(self.materializations) > 0 and self.materializations[0].done(): + # Make space on buffer by popping the task that was done + done_task = self.materializations.popleft() + + # Best-effort attempt to yield new work and fill up the buffer to the desired `results_buffer_size` + if self.results_buffer_size is not None: + for _ in range(self.results_buffer_size - len(self.materializations)): + best_effort_step, is_final_task = _best_effort_next_step(stage_id, self.child_plan) + if best_effort_step is None: + break + elif is_final_task: + assert isinstance(best_effort_step, SingleOutputPartitionTask) + self.materializations.append(best_effort_step) + num_final_yielded += 1 + logger.debug( + "[plan-%s] YIELDING final task to replace done materialized task (%s so far)", + stage_id, + num_final_yielded, + ) + else: + num_intermediate_yielded += 1 + logger.debug( + "[plan-%s] YIELDING an intermediate task to replace done materialized task (%s so far)", + stage_id, + num_intermediate_yielded, + ) + yield best_effort_step + + # Yield the task that was done + num_materialized_yielded += 1 + logger.debug("[plan-%s] YIELDING a materialized task (%s so far)", stage_id, num_materialized_yielded) + yield done_task.result() + + # If the buffer has too many results already, we yield None until some are completed + if self.results_buffer_size is not None and len(self.materializations) >= self.results_buffer_size: + logger.debug( + "[plan-%s] YIELDING none, waiting on tasks in buffer to complete: %s in buffer, but maximum is %s", + stage_id, + len(self.materializations), + self.results_buffer_size, + ) + yield None - yield step + # Important: start again at the top and drain materialized results + # Otherwise it may lead to a weird corner-case where the plan has ended (raising StopIteration) + # but some of the completed materializations haven't been drained from the buffer. + continue - except StopIteration: - if len(materializations) > 0: - logger.debug("materialize blocked on completion of all sources: %s", materializations) - yield None - else: - return + # Materialize a single dependency. 
+ try: + step = next(self.child_plan) + if isinstance(step, PartitionTaskBuilder): + step = step.finalize_partition_task_single_output(stage_id=stage_id) + self.materializations.append(step) + num_final_yielded += 1 + logger.debug("[plan-%s] YIELDING final task (%s so far)", stage_id, num_final_yielded) + elif isinstance(step, PartitionTask): + num_intermediate_yielded += 1 + logger.debug( + "[plan-%s] YIELDING an intermediate task (%s so far)", stage_id, num_intermediate_yielded + ) + + assert isinstance(step, (PartitionTask, type(None))) + yield step + + except StopIteration: + if len(self.materializations) > 0: + logger.debug( + "[plan-%s] YIELDING none, iterator completed but materialize is blocked on completion of all sources: %s", + stage_id, + self.materializations, + ) + yield None + else: + return def enumerate_open_executions( diff --git a/daft/plan_scheduler/physical_plan_scheduler.py b/daft/plan_scheduler/physical_plan_scheduler.py index 8c5d0c7dcf..743848063f 100644 --- a/daft/plan_scheduler/physical_plan_scheduler.py +++ b/daft/plan_scheduler/physical_plan_scheduler.py @@ -45,8 +45,10 @@ def pretty_print(self, simple: bool = False) -> str: def __repr__(self) -> str: return self._scheduler.repr_ascii(simple=False) - def to_partition_tasks(self, psets: dict[str, list[PartitionT]]) -> physical_plan.MaterializedPhysicalPlan: - return physical_plan.materialize(self._scheduler.to_partition_tasks(psets)) + def to_partition_tasks( + self, psets: dict[str, list[PartitionT]], results_buffer_size: int | None + ) -> physical_plan.MaterializedPhysicalPlan: + return iter(physical_plan.Materialize(self._scheduler.to_partition_tasks(psets), results_buffer_size)) class AdaptivePhysicalPlanScheduler: diff --git a/daft/runners/pyrunner.py b/daft/runners/pyrunner.py index df6f277c42..4cf0130132 100644 --- a/daft/runners/pyrunner.py +++ b/daft/runners/pyrunner.py @@ -152,7 +152,6 @@ def run(self, builder: LogicalPlanBuilder) -> PartitionCacheEntry: def run_iter( self, builder: LogicalPlanBuilder, - # NOTE: PyRunner does not run any async execution, so it ignores `results_buffer_size` which is essentially 0 results_buffer_size: int | None = None, ) -> Iterator[PyMaterializedResult]: # NOTE: Freeze and use this same execution config for the entire execution @@ -167,7 +166,8 @@ def run_iter( source_id, plan_scheduler = adaptive_planner.next() # don't store partition sets in variable to avoid reference tasks = plan_scheduler.to_partition_tasks( - {k: v.values() for k, v in self._part_set_cache.get_all_partition_sets().items()} + {k: v.values() for k, v in self._part_set_cache.get_all_partition_sets().items()}, + results_buffer_size, ) del plan_scheduler results_gen = self._physical_plan_to_partitions(tasks) @@ -198,7 +198,7 @@ def run_iter( plan_scheduler = builder.to_physical_plan_scheduler(daft_execution_config) psets = {k: v.values() for k, v in self._part_set_cache.get_all_partition_sets().items()} # Get executable tasks from planner. - tasks = plan_scheduler.to_partition_tasks(psets) + tasks = plan_scheduler.to_partition_tasks(psets, results_buffer_size) del psets with profiler("profile_PyRunner.run_{datetime.now().isoformat()}.json"): results_gen = self._physical_plan_to_partitions(tasks) @@ -226,12 +226,14 @@ def _physical_plan_to_partitions( while True: if next_step is None: # Blocked on already dispatched tasks; await some tasks. 
+ logger.debug("Skipping to wait on dispatched tasks: plan waiting on work") break elif isinstance(next_step, MaterializedResult): assert isinstance(next_step, PyMaterializedResult) # A final result. + logger.debug("Yielding completed step") yield next_step next_step = next(plan) continue @@ -240,6 +242,7 @@ def _physical_plan_to_partitions( next_step.resource_request, ): # Insufficient resources; await some tasks. + logger.debug("Skipping to wait on dispatched tasks: insufficient resources") break else: @@ -294,7 +297,11 @@ def _physical_plan_to_partitions( next_step = next(plan) # Await at least one task and process the results. - assert len(future_to_task) > 0, "Scheduler deadlocked! This should never happen. Please file an issue." + if not len(future_to_task) > 0: + raise RuntimeError( + f"Scheduler deadlocked! This should never happen. Please file an issue. Current step: {type(next_step)}" + ) + done_set, _ = futures.wait(list(future_to_task.keys()), return_when=futures.FIRST_COMPLETED) for done_future in done_set: done_id = future_to_task.pop(done_future) diff --git a/daft/runners/ray_runner.py b/daft/runners/ray_runner.py index 0c48f9da74..935359bed5 100644 --- a/daft/runners/ray_runner.py +++ b/daft/runners/ray_runner.py @@ -440,6 +440,7 @@ def __init__(self, max_task_backlog: int | None, use_ray_tqdm: bool) -> None: self.threads_by_df: dict[str, threading.Thread] = dict() self.results_by_df: dict[str, Queue] = {} self.active_by_df: dict[str, bool] = dict() + self.results_buffer_size_by_df: dict[str, int | None] = dict() self.use_ray_tqdm = use_ray_tqdm @@ -467,8 +468,9 @@ def start_plan( results_buffer_size: int | None = None, ) -> None: self.execution_configs_objref_by_df[result_uuid] = ray.put(daft_execution_config) - self.results_by_df[result_uuid] = Queue(maxsize=results_buffer_size or -1) + self.results_by_df[result_uuid] = Queue(maxsize=1 if results_buffer_size is not None else -1) self.active_by_df[result_uuid] = True + self.results_buffer_size_by_df[result_uuid] = results_buffer_size t = threading.Thread( target=self._run_plan, @@ -495,6 +497,7 @@ def stop_plan(self, result_uuid: str) -> None: del self.threads_by_df[result_uuid] del self.active_by_df[result_uuid] del self.results_by_df[result_uuid] + del self.results_buffer_size_by_df[result_uuid] def _run_plan( self, @@ -503,7 +506,14 @@ def _run_plan( result_uuid: str, ) -> None: # Get executable tasks from plan scheduler. 
- tasks = plan_scheduler.to_partition_tasks(psets) + results_buffer_size = self.results_buffer_size_by_df[result_uuid] + tasks = plan_scheduler.to_partition_tasks( + psets, + # Attempt to subtract 1 from results_buffer_size because the return Queue size is already 1 + # If results_buffer_size=1 though, we can't do much and the total buffer size actually has to be >= 2 + # because we have two buffers (the Queue and the buffer inside the `materialize` generator) + None if results_buffer_size is None else max(results_buffer_size - 1, 1), + ) daft_execution_config = self.execution_configs_objref_by_df[result_uuid] inflight_tasks: dict[str, PartitionTask[ray.ObjectRef]] = dict() diff --git a/tests/physical_plan/__init__.py b/tests/physical_plan/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/physical_plan/test_physical_plan_buffering.py b/tests/physical_plan/test_physical_plan_buffering.py new file mode 100644 index 0000000000..ca0e71817c --- /dev/null +++ b/tests/physical_plan/test_physical_plan_buffering.py @@ -0,0 +1,147 @@ +from unittest.mock import MagicMock + +import pytest + +from daft.execution.physical_plan import Materialize, PartitionTask +from daft.execution.rust_physical_plan_shim import scan_with_tasks + + +def test_single_non_buffered_plan(): + result1, result2, result3 = MagicMock(), MagicMock(), MagicMock() + scan = scan_with_tasks([MagicMock() for i in range(3)]) + materialized = Materialize(scan, results_buffer_size=None) + plan = iter(materialized) + + # Buffer is unbounded and grows as we iterate on the plan (when no tasks are completed) + assert len(materialized.materializations) == 0 + task1 = next(plan) + assert isinstance(task1, PartitionTask) + assert len(materialized.materializations) == 1 + task2 = next(plan) + assert isinstance(task2, PartitionTask) + assert len(materialized.materializations) == 2 + task3 = next(plan) + assert isinstance(task3, PartitionTask) + assert len(materialized.materializations) == 3 + + # Ran out of work, waiting on new tasks to be completed + assert next(plan) is None + + # Manually "complete" the tasks + task1.set_result([result1]) + task2.set_result([result2]) + task3.set_result([result3]) + + # Results should be as we expect + assert next(plan) == result1 + assert next(plan) == result2 + assert next(plan) == result3 + with pytest.raises(StopIteration): + next(plan) + + +def test_single_non_buffered_plan_done_while_planning(): + result1, result2, result3 = MagicMock(), MagicMock(), MagicMock() + scan = scan_with_tasks([MagicMock() for i in range(3)]) + materialized = Materialize(scan, results_buffer_size=None) + plan = iter(materialized) + + # Buffer is unbounded and grows as we iterate on the plan (when no tasks are completed) + assert len(materialized.materializations) == 0 + task1 = next(plan) + assert isinstance(task1, PartitionTask) + assert len(materialized.materializations) == 1 + task2 = next(plan) + assert isinstance(task2, PartitionTask) + assert len(materialized.materializations) == 2 + + # Manually "complete" the tasks + task1.set_result([result1]) + task2.set_result([result2]) + + # On the next iteration, we should receive the result + assert next(plan) == result1 + assert next(plan) == result2 + + # Add more work to completion + task3 = next(plan) + assert isinstance(task3, PartitionTask) + assert len(materialized.materializations) == 1 # only task3 on the buffer + + # Manually "complete" the last task + task3.set_result([result3]) + + # Results should be as we expect + assert next(plan) == 
result3 + with pytest.raises(StopIteration): + next(plan) + + +def test_single_plan_with_buffer_slow_tasks(): + result1, result2, result3 = MagicMock(), MagicMock(), MagicMock() + scan = scan_with_tasks([MagicMock() for i in range(3)]) + materialized = Materialize(scan, results_buffer_size=2) + plan = iter(materialized) + + # Buffer and grows to size 2 (tasks are "slow" and don't complete faster than the next plan loop call) + assert len(materialized.materializations) == 0 + task1 = next(plan) + assert isinstance(task1, PartitionTask) + assert len(materialized.materializations) == 1 + task2 = next(plan) + assert isinstance(task2, PartitionTask) + assert len(materialized.materializations) == 2 + + # Plan cannot make forward progress until task1 finishes + assert next(plan) is None + task2.set_result([result2]) + assert next(plan) is None + task1.set_result([result1]) + + # Plan should fill its buffer with new tasks before starting to yield results again + task3 = next(plan) + assert isinstance(task3, PartitionTask) + assert next(plan) == result1 + assert next(plan) == result2 + + # Finish the last task + task3.set_result([result3]) + assert next(plan) == result3 + + with pytest.raises(StopIteration): + next(plan) + + +def test_single_plan_with_buffer_saturation_fast_tasks(): + result1, result2, result3 = MagicMock(), MagicMock(), MagicMock() + scan = scan_with_tasks([MagicMock() for i in range(3)]) + materialized = Materialize(scan, results_buffer_size=2) + plan = iter(materialized) + + # Buffer grows to size 1 + assert len(materialized.materializations) == 0 + task1 = next(plan) + assert isinstance(task1, PartitionTask) + assert len(materialized.materializations) == 1 + + # Finish up on task 1 (task is "fast" and completes so quickly even before the next plan loop call) + task1.set_result([result1]) + + # Plan should fill its buffer completely with new tasks before starting to yield results again + task2 = next(plan) + assert isinstance(task2, PartitionTask) + assert len(materialized.materializations) == 1 # task1 has been popped + task3 = next(plan) + assert isinstance(task3, PartitionTask) + assert len(materialized.materializations) == 2 # task1 has been popped + assert next(plan) == result1 + assert next(plan) is None + + # Finish the last task(s) + task2.set_result([result2]) + task3.set_result([result3]) + assert next(plan) == result2 + assert next(plan) == result3 + + with pytest.raises(StopIteration): + next(plan)
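
A quick consumer-side sketch of how the new `results_buffer_size` parameter is meant to behave. This is illustrative only: the `from_pydict`/`into_partitions` construction is just one assumed way to get a multi-partition dataframe, and the loop bodies are placeholders.

```python
import daft

# Any multi-partition dataframe will do; this construction is illustrative.
df = daft.from_pydict({"x": list(range(8))}).into_partitions(4)

# With the default results_buffer_size=1, at most one materialized partition
# sits in the buffer; further tasks are only dispatched as the consumer pulls
# results, so a slow consumer applies backpressure to execution.
for partition in df.iter_partitions(results_buffer_size=1):
    ...  # consume the partition (a MicroPartition on PyRunner, an ObjectRef on RayRunner)

# results_buffer_size=None removes the bound: the plan may run ahead of the
# consumer and buffer every result, which is the pre-PR "runaway" behavior.
for partition in df.iter_partitions(results_buffer_size=None):
    ...
```

Because `DataFrame.__iter__` now also runs with `results_buffer_size=1`, two dataframes iterated concurrently (the `e1`/`e2` scenario above) each keep at most one task's worth of results outstanding instead of hoarding all of the machine's CPU slots.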
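
A minimal sketch (not the actual runner code) of the contract between the materialized plan generator and a runner: the generator now yields `None` both when it is blocked on in-flight work and when its buffer has reached `results_buffer_size`, so the consumer must treat `None` as "wait for a completion", not as a deadlock. `dispatch` and `wait_for_any_completion` are hypothetical callables standing in for the runner's scheduling machinery.

```python
from daft.execution.physical_plan import PartitionTask


def drive(plan, dispatch, wait_for_any_completion):
    """Consume a MaterializedPhysicalPlan, yielding final results as they finish."""
    while True:
        try:
            step = next(plan)
        except StopIteration:
            return
        if step is None:
            # Plan cannot make progress right now: either all remaining work is
            # already dispatched, or `materializations` is at results_buffer_size.
            wait_for_any_completion()
        elif isinstance(step, PartitionTask):
            dispatch(step)  # an intermediate or final task to execute
        else:
            yield step  # a MaterializedResult that is part of the final output
```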
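
The RayRunner splits the requested buffer between the results `Queue` (capped at 1 when a bound is given) and the plan-side `materializations` buffer. A small worked illustration of the `max(results_buffer_size - 1, 1)` expression in `_run_plan`; the helper name below is made up for illustration.

```python
def plan_side_buffer_size(results_buffer_size):
    # Mirrors the expression passed to to_partition_tasks() in RayRunner._run_plan:
    # the Queue already buffers 1 result, so the plan buffer gets the remainder,
    # but can never drop below 1.
    return None if results_buffer_size is None else max(results_buffer_size - 1, 1)


assert plan_side_buffer_size(None) is None  # unbounded buffering
assert plan_side_buffer_size(1) == 1        # effective total is 2: Queue(1) + plan(1)
assert plan_side_buffer_size(4) == 3        # Queue(1) + plan(3) == 4, as requested
```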