[FEAT] Enable buffered iteration on plans #2566

Merged · 6 commits · Jul 30, 2024
Changes from all commits
19 changes: 14 additions & 5 deletions daft/dataframe/dataframe.py
@@ -210,7 +210,7 @@ def __iter__(self) -> Iterator[Dict[str, Any]]:
else:
# Execute the dataframe in a streaming fashion.
context = get_context()
partitions_iter = context.runner().run_iter_tables(self._builder)
partitions_iter = context.runner().run_iter_tables(self._builder, results_buffer_size=1)

# Iterate through partitions.
for partition in partitions_iter:
@@ -222,15 +222,24 @@ def __iter__(self) -> Iterator[Dict[str, Any]]:
yield row

@DataframePublicAPI
def iter_partitions(self) -> Iterator[Union[MicroPartition, "ray.ObjectRef[MicroPartition]"]]:
def iter_partitions(
self, results_buffer_size: Optional[int] = 1
) -> Iterator[Union[MicroPartition, "ray.ObjectRef[MicroPartition]"]]:
"""Begin executing this dataframe and return an iterator over the partitions.

Each partition will be returned as a daft.Table object (if using Python runner backend)
or a ray ObjectRef (if using Ray runner backend).

.. WARNING::
This method is experimental and may change in future versions.
Args:
results_buffer_size: how many partitions to allow in the results buffer (defaults to 1).
Setting this value will buffer results up to the provided size and provide backpressure
to dataframe execution based on the rate of consumption from the returned iterator. Setting this to
`None` will result in a buffer of unbounded size, causing the dataframe to run asynchronously
to completion.
"""
if results_buffer_size is not None and not results_buffer_size > 0:
raise ValueError(f"Provided `results_buffer_size` value must be > 0, received: {results_buffer_size}")

if self._result is not None:
# If the dataframe has already finished executing,
# use the precomputed results.
@@ -240,7 +249,7 @@ def iter_partitions(self) -> Iterator[Union[MicroPartition, "ray.ObjectRef[Micro
else:
# Execute the dataframe in a streaming fashion.
context = get_context()
results_iter = context.runner().run_iter(self._builder)
results_iter = context.runner().run_iter(self._builder, results_buffer_size=results_buffer_size)
for result in results_iter:
yield result.partition()

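As a usage note for the API change above, here is a minimal sketch of calling `iter_partitions` with the new `results_buffer_size` argument. The `daft.from_pydict` data is purely illustrative; any DataFrame works.

```python
import daft

# Illustrative data only.
df = daft.from_pydict({"x": list(range(1_000))})

# Buffer at most 4 partitions ahead of the consumer; execution is paced
# (backpressure) by how quickly the loop body consumes partitions.
for partition in df.iter_partitions(results_buffer_size=4):
    print(partition)

# With results_buffer_size=None the buffer is unbounded, so the plan runs
# to completion asynchronously regardless of consumption rate.
for partition in df.iter_partitions(results_buffer_size=None):
    print(partition)
```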
145 changes: 121 additions & 24 deletions daft/execution/physical_plan.py
@@ -1513,39 +1513,136 @@ def fanout_random(child_plan: InProgressPhysicalPlan[PartitionT], num_partitions
seed += 1


def materialize(
child_plan: InProgressPhysicalPlan[PartitionT],
) -> MaterializedPhysicalPlan:
def _best_effort_next_step(
stage_id: int, child_plan: InProgressPhysicalPlan[PartitionT]
) -> tuple[PartitionTask[PartitionT] | None, bool]:
"""Performs a best-effort attempt at retrieving the next step from a child plan

Returns None in cases where there is nothing to run, or the plan has been exhausted.

Returns:
step: the step (potentially None) to run
is_final_task: a boolean indicating whether or not this step was a final step
"""
try:
step = next(child_plan)
except StopIteration:
return (None, False)
else:
if isinstance(step, PartitionTaskBuilder):
step = step.finalize_partition_task_single_output(stage_id=stage_id)
return (step, True)
elif isinstance(step, PartitionTask):
return (step, False)
else:
return (None, False)


class Materialize:
"""Materialize the child plan.

Repeatedly yields either a PartitionTask (to produce an intermediate partition)
or a PartitionT (which is part of the final result).
"""

materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()
stage_id = next(stage_id_counter)
while True:
# Check if any inputs finished executing.
while len(materializations) > 0 and materializations[0].done():
done_task = materializations.popleft()
yield done_task.result()
def __init__(
self,
child_plan: InProgressPhysicalPlan[PartitionT],
results_buffer_size: int | None,
):
self.child_plan = child_plan
self.materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()
self.results_buffer_size = results_buffer_size

def __iter__(self) -> MaterializedPhysicalPlan:
num_materialized_yielded = 0
num_intermediate_yielded = 0
num_final_yielded = 0
stage_id = next(stage_id_counter)

logger.debug(
"[plan-%s] Starting to emit tasks from `materialize` with results_buffer_size=%s",
stage_id,
self.results_buffer_size,
)

# Materialize a single dependency.
try:
step = next(child_plan)
if isinstance(step, PartitionTaskBuilder):
step = step.finalize_partition_task_single_output(stage_id=stage_id)
materializations.append(step)
assert isinstance(step, (PartitionTask, type(None)))
while True:
# If any inputs have finished executing, we want to drain the `materializations` buffer
while len(self.materializations) > 0 and self.materializations[0].done():
# Make space on buffer by popping the task that was done
done_task = self.materializations.popleft()

# Best-effort attempt to yield new work and fill up the buffer to the desired `results_buffer_size`
if self.results_buffer_size is not None:
for _ in range(self.results_buffer_size - len(self.materializations)):
best_effort_step, is_final_task = _best_effort_next_step(stage_id, self.child_plan)
if best_effort_step is None:
break
elif is_final_task:
assert isinstance(best_effort_step, SingleOutputPartitionTask)
self.materializations.append(best_effort_step)
num_final_yielded += 1
logger.debug(
"[plan-%s] YIELDING final task to replace done materialized task (%s so far)",
stage_id,
num_final_yielded,
)
else:
num_intermediate_yielded += 1
logger.debug(
"[plan-%s] YIELDING an intermediate task to replace done materialized task (%s so far)",
stage_id,
num_intermediate_yielded,
)
yield best_effort_step

# Yield the task that was done
num_materialized_yielded += 1
logger.debug("[plan-%s] YIELDING a materialized task (%s so far)", stage_id, num_materialized_yielded)
yield done_task.result()

# If the buffer has too many results already, we yield None until some are completed
if self.results_buffer_size is not None and len(self.materializations) >= self.results_buffer_size:
logger.debug(
"[plan-%s] YIELDING none, waiting on tasks in buffer to complete: %s in buffer, but maximum is %s",
stage_id,
len(self.materializations),
self.results_buffer_size,
)
yield None

yield step
# Important: start again at the top and drain materialized results
# Otherwise it may lead to a weird corner-case where the plan has ended (raising StopIteration)
# but some of the completed materializations haven't been drained from the buffer.
continue

except StopIteration:
if len(materializations) > 0:
logger.debug("materialize blocked on completion of all sources: %s", materializations)
yield None
else:
return
# Materialize a single dependency.
try:
step = next(self.child_plan)
if isinstance(step, PartitionTaskBuilder):
step = step.finalize_partition_task_single_output(stage_id=stage_id)
self.materializations.append(step)
num_final_yielded += 1
logger.debug("[plan-%s] YIELDING final task (%s so far)", stage_id, num_final_yielded)
elif isinstance(step, PartitionTask):
num_intermediate_yielded += 1
logger.debug(
"[plan-%s] YIELDING an intermediate task (%s so far)", stage_id, num_intermediate_yielded
)

assert isinstance(step, (PartitionTask, type(None)))
yield step

except StopIteration:
if len(self.materializations) > 0:
logger.debug(
"[plan-%s] YIELDING none, iterator completed but materialize is blocked on completion of all sources: %s",
stage_id,
self.materializations,
)
yield None
else:
return


def enumerate_open_executions(
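To make the control flow of the new `Materialize` iterator easier to follow, below is a condensed, self-contained sketch of the same buffering/backpressure protocol: yield a task for the scheduler to dispatch, yield a completed result, or yield `None` to ask the scheduler to wait. The `Task` class is a stand-in for Daft's `PartitionTask`/`MaterializedResult` machinery, not the real API.

```python
from collections import deque
from typing import Iterator, Optional, Union

class Task:
    """Stand-in for a dispatched partition task with a done()/result() handle."""

    def __init__(self, payload: int) -> None:
        self.payload = payload
        self.finished = False

    def done(self) -> bool:
        return self.finished

    def result(self) -> int:
        return self.payload


def buffered_materialize(
    child_plan: Iterator[Task], buffer_size: Optional[int]
) -> Iterator[Union[Task, int, None]]:
    """Yield tasks to dispatch, completed results, or None (meaning: wait)."""
    buffer: deque[Task] = deque()
    while True:
        # 1. Drain completed tasks from the front of the buffer as results.
        while buffer and buffer[0].done():
            yield buffer.popleft().result()

        # 2. Backpressure: if the buffer is full, ask the caller to wait on
        #    in-flight tasks instead of accepting new work.
        if buffer_size is not None and len(buffer) >= buffer_size:
            yield None
            continue

        # 3. Otherwise pull the next task from the child plan and hand it to
        #    the caller for dispatch, keeping a handle in the buffer.
        try:
            task = next(child_plan)
        except StopIteration:
            if buffer:
                yield None  # plan exhausted but results still outstanding
            else:
                return
        else:
            buffer.append(task)
            yield task
```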
6 changes: 4 additions & 2 deletions daft/plan_scheduler/physical_plan_scheduler.py
@@ -45,8 +45,10 @@ def pretty_print(self, simple: bool = False) -> str:
def __repr__(self) -> str:
return self._scheduler.repr_ascii(simple=False)

def to_partition_tasks(self, psets: dict[str, list[PartitionT]]) -> physical_plan.MaterializedPhysicalPlan:
return physical_plan.materialize(self._scheduler.to_partition_tasks(psets))
def to_partition_tasks(
self, psets: dict[str, list[PartitionT]], results_buffer_size: int | None
) -> physical_plan.MaterializedPhysicalPlan:
return iter(physical_plan.Materialize(self._scheduler.to_partition_tasks(psets), results_buffer_size))


class AdaptivePhysicalPlanScheduler:
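A side note on the `materialize(...)` to `iter(Materialize(...))` change above: making `__iter__` a generator method keeps the iteration semantics identical, and one plausible motivation for the class-based form is that state such as the materialization buffer lives on the instance rather than inside a generator frame, where it stays inspectable from outside. A tiny sketch of the pattern (names are illustrative):

```python
from typing import Iterator

class Counting:
    """Generator-style __iter__; state stays visible on the instance."""

    def __init__(self, n: int) -> None:
        self.n = n
        self.emitted = 0  # externally visible progress, analogous to Materialize's buffer

    def __iter__(self) -> Iterator[int]:
        for i in range(self.n):
            self.emitted += 1
            yield i

c = Counting(3)
print(list(iter(c)))  # [0, 1, 2]; iter() returns the generator, as in to_partition_tasks
print(c.emitted)      # 3
```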
15 changes: 11 additions & 4 deletions daft/runners/pyrunner.py
@@ -152,7 +152,6 @@ def run(self, builder: LogicalPlanBuilder) -> PartitionCacheEntry:
def run_iter(
self,
builder: LogicalPlanBuilder,
# NOTE: PyRunner does not run any async execution, so it ignores `results_buffer_size` which is essentially 0
results_buffer_size: int | None = None,
) -> Iterator[PyMaterializedResult]:
# NOTE: Freeze and use this same execution config for the entire execution
@@ -167,7 +166,8 @@ def run_iter(
source_id, plan_scheduler = adaptive_planner.next()
# don't store partition sets in variable to avoid reference
tasks = plan_scheduler.to_partition_tasks(
{k: v.values() for k, v in self._part_set_cache.get_all_partition_sets().items()}
{k: v.values() for k, v in self._part_set_cache.get_all_partition_sets().items()},
results_buffer_size,
)
del plan_scheduler
results_gen = self._physical_plan_to_partitions(tasks)
@@ -198,7 +198,7 @@ def run_iter(
plan_scheduler = builder.to_physical_plan_scheduler(daft_execution_config)
psets = {k: v.values() for k, v in self._part_set_cache.get_all_partition_sets().items()}
# Get executable tasks from planner.
tasks = plan_scheduler.to_partition_tasks(psets)
tasks = plan_scheduler.to_partition_tasks(psets, results_buffer_size)
del psets
with profiler("profile_PyRunner.run_{datetime.now().isoformat()}.json"):
results_gen = self._physical_plan_to_partitions(tasks)
@@ -226,12 +226,14 @@ def _physical_plan_to_partitions(
while True:
if next_step is None:
# Blocked on already dispatched tasks; await some tasks.
logger.debug("Skipping to wait on dispatched tasks: plan waiting on work")
break

elif isinstance(next_step, MaterializedResult):
assert isinstance(next_step, PyMaterializedResult)

# A final result.
logger.debug("Yielding completed step")
yield next_step
next_step = next(plan)
continue
@@ -240,6 +242,7 @@
next_step.resource_request,
):
# Insufficient resources; await some tasks.
logger.debug("Skipping to wait on dispatched tasks: insufficient resources")
break

else:
@@ -294,7 +297,11 @@ def _physical_plan_to_partitions(
next_step = next(plan)

# Await at least one task and process the results.
assert len(future_to_task) > 0, "Scheduler deadlocked! This should never happen. Please file an issue."
if not len(future_to_task) > 0:
raise RuntimeError(
f"Scheduler deadlocked! This should never happen. Please file an issue. Current step: {type(next_step)}"
)

done_set, _ = futures.wait(list(future_to_task.keys()), return_when=futures.FIRST_COMPLETED)
for done_future in done_set:
done_id = future_to_task.pop(done_future)
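For reference, the dispatch loop modified above follows the standard `concurrent.futures` wait-on-first-completed pattern. A minimal stand-alone sketch of that pattern, with illustrative names rather than Daft internals:

```python
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

def fake_task(i: int) -> int:
    time.sleep(0.01)
    return i * i

with ThreadPoolExecutor(max_workers=4) as pool:
    future_to_task = {pool.submit(fake_task, i): i for i in range(8)}
    while future_to_task:
        # The runner raises a RuntimeError before this point if nothing is in
        # flight, since waiting on an empty set would hang forever (the
        # "scheduler deadlocked" case in the diff).
        done, _ = wait(list(future_to_task), return_when=FIRST_COMPLETED)
        for fut in done:
            task_id = future_to_task.pop(fut)
            print(f"task {task_id} -> {fut.result()}")
```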
14 changes: 12 additions & 2 deletions daft/runners/ray_runner.py
@@ -440,6 +440,7 @@ def __init__(self, max_task_backlog: int | None, use_ray_tqdm: bool) -> None:
self.threads_by_df: dict[str, threading.Thread] = dict()
self.results_by_df: dict[str, Queue] = {}
self.active_by_df: dict[str, bool] = dict()
self.results_buffer_size_by_df: dict[str, int | None] = dict()

self.use_ray_tqdm = use_ray_tqdm

@@ -467,8 +468,9 @@ def start_plan(
results_buffer_size: int | None = None,
) -> None:
self.execution_configs_objref_by_df[result_uuid] = ray.put(daft_execution_config)
self.results_by_df[result_uuid] = Queue(maxsize=results_buffer_size or -1)
self.results_by_df[result_uuid] = Queue(maxsize=1 if results_buffer_size is not None else -1)
self.active_by_df[result_uuid] = True
self.results_buffer_size_by_df[result_uuid] = results_buffer_size

t = threading.Thread(
target=self._run_plan,
@@ -495,6 +497,7 @@ def stop_plan(self, result_uuid: str) -> None:
del self.threads_by_df[result_uuid]
del self.active_by_df[result_uuid]
del self.results_by_df[result_uuid]
del self.results_buffer_size_by_df[result_uuid]

def _run_plan(
self,
@@ -503,7 +506,14 @@ def _run_plan(
result_uuid: str,
) -> None:
# Get executable tasks from plan scheduler.
tasks = plan_scheduler.to_partition_tasks(psets)
results_buffer_size = self.results_buffer_size_by_df[result_uuid]
tasks = plan_scheduler.to_partition_tasks(
psets,
# Attempt to subtract 1 from results_buffer_size because the return Queue size is already 1
# If results_buffer_size=1 though, we can't do much and the total buffer size actually has to be >= 2
# because we have two buffers (the Queue and the buffer inside the `materialize` generator)
None if results_buffer_size is None else max(results_buffer_size - 1, 1),
)

daft_execution_config = self.execution_configs_objref_by_df[result_uuid]
inflight_tasks: dict[str, PartitionTask[ray.ObjectRef]] = dict()
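The buffer-splitting arithmetic in the comment above can be summarized with a small helper (hypothetical, for illustration only): the cross-thread result Queue always holds one item, so the plan-side buffer gets the remainder with a floor of one, meaning the effective total is at least two whenever a size is requested.

```python
from typing import Optional

def plan_side_buffer(results_buffer_size: Optional[int]) -> Optional[int]:
    """Mirrors the expression passed to to_partition_tasks above."""
    if results_buffer_size is None:
        return None  # unbounded buffering
    return max(results_buffer_size - 1, 1)

assert plan_side_buffer(None) is None
assert plan_side_buffer(1) == 1   # floor of 1; effective total is 2 (Queue + plan buffer)
assert plan_side_buffer(4) == 3   # 1 slot in the Queue + 3 in the plan buffer
```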
Empty file added tests/physical_plan/__init__.py