[FEAT] Enable buffered iteration on plans (#2566)
Helps close part of #2561 

This PR enables buffering of result partition tasks, preventing "runaway
execution" when multiple executions are run concurrently.

The problem previously was that if we ran two executions in parallel
(`e1` and `e2`) on a machine with 8 CPUs:

1. `e1` could potentially run `8` tasks and keep them buffered (without releasing their resource requests)
2. When `e2` attempts to run its next task, it notices that the task cannot be admitted on the system (due to memory constraints)
   * `e2` thinks that it is deadlocking, because the pyrunner today makes the strong assumption that if a task cannot be admitted, it merely has to wait for some other task in the same execution to finish.
   * However, `e2` has no tasks currently pending (because it is starved); the pending tasks are all buffered in `e1`. It therefore concludes that it is deadlocking. A toy sketch of this failure mode follows below.
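
To make the failure mode concrete, here is a minimal toy sketch (hypothetical names only; this is not Daft's actual scheduler or resource-accounting code) of one execution hogging all resource slots while a second execution starves:

```python
# Toy illustration of the starvation scenario above (hypothetical names;
# this is not Daft's actual scheduler code).
from collections import deque

TOTAL_SLOTS = 8  # e.g. a machine with 8 CPUs

slots_in_use = 0
e1_buffer: deque[str] = deque()

# Old behavior: `e1` greedily dispatches tasks and buffers them without
# releasing their resource requests.
for i in range(8):
    if slots_in_use < TOTAL_SLOTS:
        slots_in_use += 1
        e1_buffer.append(f"e1-task-{i}")

# `e2` now tries to admit its first task but finds no free slots. Since it
# has no tasks of its own in flight, "waiting for my own tasks to finish"
# can never make progress, so it looks like a deadlock.
if slots_in_use >= TOTAL_SLOTS:
    print(f"e2 starved: all {TOTAL_SLOTS} slots held by {len(e1_buffer)} buffered e1 tasks")
```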

## Solution

* This PR sets the default buffering behavior to `1` instead of allowing
each execution to buffer as many tasks as it wants.
* We introduce logic in the physical plan that enforces an upper limit on
the size of the materialization buffer. If that buffer grows too large, the
plan starts yielding `None` to indicate that it is unable to proceed (see
the sketch after this list).
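
A simplified sketch of that bounded-buffer behavior (an illustration only, not the actual `Materialize` class added in `daft/execution/physical_plan.py`):

```python
# Simplified sketch of a bounded materialization buffer: tasks are handed out
# for execution, and once `buffer_size` tasks are in flight the generator
# yields None to signal "cannot proceed" until older work completes.
from collections import deque
from typing import Iterator, Optional

def bounded_materialize(tasks: Iterator[str], buffer_size: Optional[int]) -> Iterator[Optional[str]]:
    buffer: deque[str] = deque()
    for task in tasks:
        while buffer_size is not None and len(buffer) >= buffer_size:
            yield None                             # backpressure signal
            yield f"result-of-{buffer.popleft()}"  # pretend the oldest task completed
        buffer.append(task)                        # "dispatch" the task
        yield task
    while buffer:                                  # source exhausted: drain in-flight work
        yield f"result-of-{buffer.popleft()}"

# With buffer_size=1, at most one task is in flight at a time:
print(list(bounded_materialize(iter(["t0", "t1", "t2"]), buffer_size=1)))
# ['t0', None, 'result-of-t0', 't1', None, 'result-of-t1', 't2', 'result-of-t2']
```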

Note that there is still a potential problem here, e.g. running more than
`NUM_CPU` executions concurrently. That can be solved in a follow-up PR
refactoring the way we do resource accounting.

---------

Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
Co-authored-by: Desmond Cheong <[email protected]>
3 people authored Jul 30, 2024
1 parent 8fce5b5 commit 4fec71c
Showing 7 changed files with 309 additions and 37 deletions.
19 changes: 14 additions & 5 deletions daft/dataframe/dataframe.py
@@ -210,7 +210,7 @@ def __iter__(self) -> Iterator[Dict[str, Any]]:
else:
# Execute the dataframe in a streaming fashion.
context = get_context()
partitions_iter = context.runner().run_iter_tables(self._builder)
partitions_iter = context.runner().run_iter_tables(self._builder, results_buffer_size=1)

# Iterate through partitions.
for partition in partitions_iter:
@@ -222,15 +222,24 @@ def __iter__(self) -> Iterator[Dict[str, Any]]:
yield row

@DataframePublicAPI
def iter_partitions(self) -> Iterator[Union[MicroPartition, "ray.ObjectRef[MicroPartition]"]]:
def iter_partitions(
self, results_buffer_size: Optional[int] = 1
) -> Iterator[Union[MicroPartition, "ray.ObjectRef[MicroPartition]"]]:
"""Begin executing this dataframe and return an iterator over the partitions.
Each partition will be returned as a daft.Table object (if using Python runner backend)
or a ray ObjectRef (if using Ray runner backend).
.. WARNING::
This method is experimental and may change in future versions.
Args:
results_buffer_size: how many partitions to allow in the results buffer (defaults to 1).
Setting this value will buffer results up to the provided size and provide backpressure
to dataframe execution based on the rate of consumption from the returned iterator. Setting this to
`None` will result in a buffer of unbounded size, causing the dataframe to run asynchronously
to completion.
"""
if results_buffer_size is not None and not results_buffer_size > 0:
raise ValueError(f"Provided `results_buffer_size` value must be > 0, received: {results_buffer_size}")

if self._result is not None:
# If the dataframe has already finished executing,
# use the precomputed results.
@@ -240,7 +249,7 @@ def iter_partitions(self) -> Iterator[Union[MicroPartition, "ray.ObjectRef[Micro
else:
# Execute the dataframe in a streaming fashion.
context = get_context()
results_iter = context.runner().run_iter(self._builder)
results_iter = context.runner().run_iter(self._builder, results_buffer_size=results_buffer_size)
for result in results_iter:
yield result.partition()

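A possible usage example for the new `results_buffer_size` parameter on `DataFrame.iter_partitions` (a sketch; the setup calls `daft.from_pydict` and `into_partitions` are just illustrative ways to obtain a multi-partition dataframe):

```python
# Hypothetical usage sketch of the new `results_buffer_size` parameter.
import daft

df = daft.from_pydict({"x": list(range(1000))}).into_partitions(8)

# Buffer at most 2 partitions ahead of the consumer, applying backpressure
# to execution; pass results_buffer_size=None to run ahead without a bound.
for partition in df.iter_partitions(results_buffer_size=2):
    print(partition)
```
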
145 changes: 121 additions & 24 deletions daft/execution/physical_plan.py
@@ -1513,39 +1513,136 @@ def fanout_random(child_plan: InProgressPhysicalPlan[PartitionT], num_partitions
seed += 1


def materialize(
child_plan: InProgressPhysicalPlan[PartitionT],
) -> MaterializedPhysicalPlan:
def _best_effort_next_step(
stage_id: int, child_plan: InProgressPhysicalPlan[PartitionT]
) -> tuple[PartitionTask[PartitionT] | None, bool]:
"""Performs a best-effort attempt at retrieving the next step from a child plan
Returns None in cases where there is nothing to run, or the plan has been exhausted.
Returns:
step: the step (potentially None) to run
is_final_task: a boolean indicating whether or not this step was a final step
"""
try:
step = next(child_plan)
except StopIteration:
return (None, False)
else:
if isinstance(step, PartitionTaskBuilder):
step = step.finalize_partition_task_single_output(stage_id=stage_id)
return (step, True)
elif isinstance(step, PartitionTask):
return (step, False)
else:
return (None, False)


class Materialize:
"""Materialize the child plan.
Repeatedly yields either a PartitionTask (to produce an intermediate partition)
or a PartitionT (which is part of the final result).
"""

materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()
stage_id = next(stage_id_counter)
while True:
# Check if any inputs finished executing.
while len(materializations) > 0 and materializations[0].done():
done_task = materializations.popleft()
yield done_task.result()
def __init__(
self,
child_plan: InProgressPhysicalPlan[PartitionT],
results_buffer_size: int | None,
):
self.child_plan = child_plan
self.materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()
self.results_buffer_size = results_buffer_size

def __iter__(self) -> MaterializedPhysicalPlan:
num_materialized_yielded = 0
num_intermediate_yielded = 0
num_final_yielded = 0
stage_id = next(stage_id_counter)

logger.debug(
"[plan-%s] Starting to emit tasks from `materialize` with results_buffer_size=%s",
stage_id,
self.results_buffer_size,
)

# Materialize a single dependency.
try:
step = next(child_plan)
if isinstance(step, PartitionTaskBuilder):
step = step.finalize_partition_task_single_output(stage_id=stage_id)
materializations.append(step)
assert isinstance(step, (PartitionTask, type(None)))
while True:
# If any inputs have finished executing, we want to drain the `materializations` buffer
while len(self.materializations) > 0 and self.materializations[0].done():
# Make space on buffer by popping the task that was done
done_task = self.materializations.popleft()

# Best-effort attempt to yield new work and fill up the buffer to the desired `results_buffer_size`
if self.results_buffer_size is not None:
for _ in range(self.results_buffer_size - len(self.materializations)):
best_effort_step, is_final_task = _best_effort_next_step(stage_id, self.child_plan)
if best_effort_step is None:
break
elif is_final_task:
assert isinstance(best_effort_step, SingleOutputPartitionTask)
self.materializations.append(best_effort_step)
num_final_yielded += 1
logger.debug(
"[plan-%s] YIELDING final task to replace done materialized task (%s so far)",
stage_id,
num_final_yielded,
)
else:
num_intermediate_yielded += 1
logger.debug(
"[plan-%s] YIELDING an intermediate task to replace done materialized task (%s so far)",
stage_id,
num_intermediate_yielded,
)
yield best_effort_step

# Yield the task that was done
num_materialized_yielded += 1
logger.debug("[plan-%s] YIELDING a materialized task (%s so far)", stage_id, num_materialized_yielded)
yield done_task.result()

# If the buffer has too many results already, we yield None until some are completed
if self.results_buffer_size is not None and len(self.materializations) >= self.results_buffer_size:
logger.debug(
"[plan-%s] YIELDING none, waiting on tasks in buffer to complete: %s in buffer, but maximum is %s",
stage_id,
len(self.materializations),
self.results_buffer_size,
)
yield None

yield step
# Important: start again at the top and drain materialized results
# Otherwise it may lead to a weird corner-case where the plan has ended (raising StopIteration)
# but some of the completed materializations haven't been drained from the buffer.
continue

except StopIteration:
if len(materializations) > 0:
logger.debug("materialize blocked on completion of all sources: %s", materializations)
yield None
else:
return
# Materialize a single dependency.
try:
step = next(self.child_plan)
if isinstance(step, PartitionTaskBuilder):
step = step.finalize_partition_task_single_output(stage_id=stage_id)
self.materializations.append(step)
num_final_yielded += 1
logger.debug("[plan-%s] YIELDING final task (%s so far)", stage_id, num_final_yielded)
elif isinstance(step, PartitionTask):
num_intermediate_yielded += 1
logger.debug(
"[plan-%s] YIELDING an intermediate task (%s so far)", stage_id, num_intermediate_yielded
)

assert isinstance(step, (PartitionTask, type(None)))
yield step

except StopIteration:
if len(self.materializations) > 0:
logger.debug(
"[plan-%s] YIELDING none, iterator completed but materialize is blocked on completion of all sources: %s",
stage_id,
self.materializations,
)
yield None
else:
return


def enumerate_open_executions(
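
For context on how the values yielded by the `Materialize` generator above are meant to be consumed, here is a toy consumer sketch (toy types only; Daft's real `PartitionTask` and `MaterializedResult` classes, and the PyRunner loop further below, are more involved):

```python
# Toy sketch of a runner-style consumer of a materialized plan. The plan
# yields three kinds of values: None ("blocked, wait for in-flight work"),
# an intermediate task to dispatch, or a final materialized result.
from typing import Iterator, Optional, Union

class ToyTask:
    def __init__(self, name: str) -> None:
        self.name = name

class ToyResult:
    def __init__(self, value: int) -> None:
        self.value = value

def consume(plan: Iterator[Optional[Union[ToyTask, ToyResult]]]) -> list[int]:
    outputs: list[int] = []
    for item in plan:
        if item is None:
            continue  # a real runner would wait on dispatched tasks here
        elif isinstance(item, ToyResult):
            outputs.append(item.value)  # part of the final result
        else:
            print(f"dispatching {item.name}")  # intermediate task to run
    return outputs

print(consume(iter([ToyTask("scan-0"), None, ToyResult(1), ToyResult(2)])))
```
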
6 changes: 4 additions & 2 deletions daft/plan_scheduler/physical_plan_scheduler.py
@@ -45,8 +45,10 @@ def pretty_print(self, simple: bool = False) -> str:
def __repr__(self) -> str:
return self._scheduler.repr_ascii(simple=False)

def to_partition_tasks(self, psets: dict[str, list[PartitionT]]) -> physical_plan.MaterializedPhysicalPlan:
return physical_plan.materialize(self._scheduler.to_partition_tasks(psets))
def to_partition_tasks(
self, psets: dict[str, list[PartitionT]], results_buffer_size: int | None
) -> physical_plan.MaterializedPhysicalPlan:
return iter(physical_plan.Materialize(self._scheduler.to_partition_tasks(psets), results_buffer_size))


class AdaptivePhysicalPlanScheduler:
15 changes: 11 additions & 4 deletions daft/runners/pyrunner.py
@@ -152,7 +152,6 @@ def run(self, builder: LogicalPlanBuilder) -> PartitionCacheEntry:
def run_iter(
self,
builder: LogicalPlanBuilder,
# NOTE: PyRunner does not run any async execution, so it ignores `results_buffer_size` which is essentially 0
results_buffer_size: int | None = None,
) -> Iterator[PyMaterializedResult]:
# NOTE: Freeze and use this same execution config for the entire execution
@@ -167,7 +166,8 @@ def run_iter(
source_id, plan_scheduler = adaptive_planner.next()
# don't store partition sets in variable to avoid reference
tasks = plan_scheduler.to_partition_tasks(
{k: v.values() for k, v in self._part_set_cache.get_all_partition_sets().items()}
{k: v.values() for k, v in self._part_set_cache.get_all_partition_sets().items()},
results_buffer_size,
)
del plan_scheduler
results_gen = self._physical_plan_to_partitions(tasks)
@@ -198,7 +198,7 @@ def run_iter(
plan_scheduler = builder.to_physical_plan_scheduler(daft_execution_config)
psets = {k: v.values() for k, v in self._part_set_cache.get_all_partition_sets().items()}
# Get executable tasks from planner.
tasks = plan_scheduler.to_partition_tasks(psets)
tasks = plan_scheduler.to_partition_tasks(psets, results_buffer_size)
del psets
with profiler("profile_PyRunner.run_{datetime.now().isoformat()}.json"):
results_gen = self._physical_plan_to_partitions(tasks)
@@ -226,12 +226,14 @@ def _physical_plan_to_partitions(
while True:
if next_step is None:
# Blocked on already dispatched tasks; await some tasks.
logger.debug("Skipping to wait on dispatched tasks: plan waiting on work")
break

elif isinstance(next_step, MaterializedResult):
assert isinstance(next_step, PyMaterializedResult)

# A final result.
logger.debug("Yielding completed step")
yield next_step
next_step = next(plan)
continue
@@ -240,6 +242,7 @@ def _physical_plan_to_partitions(
next_step.resource_request,
):
# Insufficient resources; await some tasks.
logger.debug("Skipping to wait on dispatched tasks: insufficient resources")
break

else:
@@ -294,7 +297,11 @@ def _physical_plan_to_partitions(
next_step = next(plan)

# Await at least one task and process the results.
assert len(future_to_task) > 0, "Scheduler deadlocked! This should never happen. Please file an issue."
if not len(future_to_task) > 0:
raise RuntimeError(
f"Scheduler deadlocked! This should never happen. Please file an issue. Current step: {type(next_step)}"
)

done_set, _ = futures.wait(list(future_to_task.keys()), return_when=futures.FIRST_COMPLETED)
for done_future in done_set:
done_id = future_to_task.pop(done_future)
14 changes: 12 additions & 2 deletions daft/runners/ray_runner.py
@@ -440,6 +440,7 @@ def __init__(self, max_task_backlog: int | None, use_ray_tqdm: bool) -> None:
self.threads_by_df: dict[str, threading.Thread] = dict()
self.results_by_df: dict[str, Queue] = {}
self.active_by_df: dict[str, bool] = dict()
self.results_buffer_size_by_df: dict[str, int | None] = dict()

self.use_ray_tqdm = use_ray_tqdm

@@ -467,8 +468,9 @@ def start_plan(
results_buffer_size: int | None = None,
) -> None:
self.execution_configs_objref_by_df[result_uuid] = ray.put(daft_execution_config)
self.results_by_df[result_uuid] = Queue(maxsize=results_buffer_size or -1)
self.results_by_df[result_uuid] = Queue(maxsize=1 if results_buffer_size is not None else -1)
self.active_by_df[result_uuid] = True
self.results_buffer_size_by_df[result_uuid] = results_buffer_size

t = threading.Thread(
target=self._run_plan,
@@ -495,6 +497,7 @@ def stop_plan(self, result_uuid: str) -> None:
del self.threads_by_df[result_uuid]
del self.active_by_df[result_uuid]
del self.results_by_df[result_uuid]
del self.results_buffer_size_by_df[result_uuid]

def _run_plan(
self,
@@ -503,7 +506,14 @@ def _run_plan(
result_uuid: str,
) -> None:
# Get executable tasks from plan scheduler.
tasks = plan_scheduler.to_partition_tasks(psets)
results_buffer_size = self.results_buffer_size_by_df[result_uuid]
tasks = plan_scheduler.to_partition_tasks(
psets,
# Attempt to subtract 1 from results_buffer_size because the return Queue size is already 1
# If results_buffer_size=1 though, we can't do much and the total buffer size actually has to be >= 2
# because we have two buffers (the Queue and the buffer inside the `materialize` generator)
None if results_buffer_size is None else max(results_buffer_size - 1, 1),
)

daft_execution_config = self.execution_configs_objref_by_df[result_uuid]
inflight_tasks: dict[str, PartitionTask[ray.ObjectRef]] = dict()
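
A worked sketch of the buffer-size arithmetic used in `_run_plan` above (the helper name here is hypothetical; only the `max(results_buffer_size - 1, 1)` adjustment mirrors the change):

```python
# Worked example of how the requested results_buffer_size is split between
# the result Queue (which already buffers 1 item) and the plan's own
# materialization buffer. The helper name is hypothetical.
def plan_buffer_size(results_buffer_size: int | None) -> int | None:
    # The Queue holds one result, so the plan's buffer gets the remainder,
    # but never less than 1 -- a request of 1 still means a total of 2.
    return None if results_buffer_size is None else max(results_buffer_size - 1, 1)

assert plan_buffer_size(None) is None  # unbounded buffering
assert plan_buffer_size(1) == 1        # effective total: 1 (Queue) + 1 (plan) = 2
assert plan_buffer_size(4) == 3        # effective total: 1 (Queue) + 3 (plan) = 4
```
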
Empty file added tests/physical_plan/__init__.py
