From ff218e7289707ac368979dbaba7e9eab7145be05 Mon Sep 17 00:00:00 2001 From: Jay Chia <17691182+jaychia@users.noreply.github.com> Date: Tue, 21 Nov 2023 20:50:24 -0800 Subject: [PATCH] [PERF] Remove calls to `remote_len_partition` (#1660) This PR refactors `PartitionSet.len_of_partitions` to avoid usage of our `remote_len_partition` Ray remote function, which has been observed to cause problems when run on dataframes with large amounts of spilling. A few refactors had to be performed to achieve this: 1. The `RayPartitionSet` was refactored to hold `RayMaterializedResult` objects instead of just raw `ray.ObjectRef[Table]`. - This allows us to access the `.metadata()` method which holds the length of each partition. - To access the `ray.ObjectRef[Table]`, we can use the `.partition()` method which holds the partition 2. As part of (1), `PartitionSet.set_partition` had to be refactored to take as input a `MaterializedResult` instead of a plain `PartitionT` 3. On the execution end, we refactored the code mainly around `MaterializedPhysicalPlan`, which now yields `MaterializedResult[PartitionT]` instead of just `PartitionT`, when indicating "done" tasks. --------- Co-authored-by: Jay Chia --- daft/daft.pyi | 2 +- daft/dataframe/dataframe.py | 5 +- daft/execution/execution_step.py | 51 +++++-------------- daft/execution/physical_plan.py | 13 +++-- daft/execution/rust_physical_plan_shim.py | 6 +-- daft/runners/partitioning.py | 40 +++++++++++++-- daft/runners/pyrunner.py | 36 ++++++------- daft/runners/ray_runner.py | 62 ++++++++++------------- daft/runners/runner.py | 10 ++-- daft/runners/runner_io.py | 4 +- 10 files changed, 114 insertions(+), 115 deletions(-) diff --git a/daft/daft.pyi b/daft/daft.pyi index 0108566999..8960b54d25 100644 --- a/daft/daft.pyi +++ b/daft/daft.pyi @@ -934,7 +934,7 @@ class PhysicalPlanScheduler: def num_partitions(self) -> int: ... def to_partition_tasks( self, psets: dict[str, list[PartitionT]], is_ray_runner: bool - ) -> physical_plan.MaterializedPhysicalPlan: ... + ) -> physical_plan.InProgressPhysicalPlan: ... class LogicalPlanBuilder: """ diff --git a/daft/dataframe/dataframe.py b/daft/dataframe/dataframe.py index 557f6d3f4f..6661b2a1a6 100644 --- a/daft/dataframe/dataframe.py +++ b/daft/dataframe/dataframe.py @@ -198,8 +198,9 @@ def iter_partitions(self) -> Iterator[Union[Table, "RayObjectRef"]]: else: # Execute the dataframe in a streaming fashion. context = get_context() - partitions_iter = context.runner().run_iter(self._builder) - yield from partitions_iter + results_iter = context.runner().run_iter(self._builder) + for result in results_iter: + yield result.partition() @DataframePublicAPI def __repr__(self) -> str: diff --git a/daft/execution/execution_step.py b/daft/execution/execution_step.py index 87795a9571..a41ec19da4 100644 --- a/daft/execution/execution_step.py +++ b/daft/execution/execution_step.py @@ -4,7 +4,7 @@ import pathlib import sys from dataclasses import dataclass, field -from typing import Generic, TypeVar +from typing import Generic if sys.version_info < (3, 8): from typing_extensions import Protocol @@ -26,14 +26,15 @@ from daft.logical.map_partition_ops import MapPartitionOp from daft.logical.schema import Schema from daft.runners.partitioning import ( + MaterializedResult, PartialPartitionMetadata, PartitionMetadata, + PartitionT, TableParseCSVOptions, TableReadOptions, ) from daft.table import Table, table_io -PartitionT = TypeVar("PartitionT") ID_GEN = itertools.count() @@ -175,28 +176,29 @@ def set_result(self, result: list[MaterializedResult[PartitionT]]) -> None: def done(self) -> bool: return self._result is not None + def result(self) -> MaterializedResult[PartitionT]: + assert self._result is not None, "Cannot call .result() on a PartitionTask that is not done" + return self._result + def cancel(self) -> None: # Currently only implemented for Ray tasks. - if self._result is not None: - self._result.cancel() + if self.done(): + self.result().cancel() def partition(self) -> PartitionT: """Get the PartitionT resulting from running this PartitionTask.""" - assert self._result is not None - return self._result.partition() + return self.result().partition() def partition_metadata(self) -> PartitionMetadata: """Get the metadata of the result partition. (Avoids retrieving the actual partition itself if possible.) """ - assert self._result is not None - return self._result.metadata() + return self.result().metadata() def vpartition(self) -> Table: """Get the raw vPartition of the result.""" - assert self._result is not None - return self._result.vpartition() + return self.result().vpartition() def __str__(self) -> str: return super().__str__() @@ -251,35 +253,6 @@ def __repr__(self) -> str: return super().__str__() -class MaterializedResult(Protocol[PartitionT]): - """A protocol for accessing the result partition of a PartitionTask. - - Different Runners can fill in their own implementation here. - """ - - def partition(self) -> PartitionT: - """Get the partition of this result.""" - ... - - def vpartition(self) -> Table: - """Get the vPartition of this result.""" - ... - - def metadata(self) -> PartitionMetadata: - """Get the metadata of the partition in this result.""" - ... - - def cancel(self) -> None: - """If possible, cancel execution of this PartitionTask.""" - ... - - def _noop(self, _: PartitionT) -> None: - """Implement this as a no-op. - https://peps.python.org/pep-0544/#overriding-inferred-variance-of-protocol-classes - """ - ... - - class Instruction(Protocol): """An instruction is a function to run over a list of partitions. diff --git a/daft/execution/physical_plan.py b/daft/execution/physical_plan.py index 8d527ac6ee..a25f3511b4 100644 --- a/daft/execution/physical_plan.py +++ b/daft/execution/physical_plan.py @@ -38,11 +38,14 @@ ) from daft.expressions import ExpressionsProjection from daft.logical.schema import Schema -from daft.runners.partitioning import PartialPartitionMetadata +from daft.runners.partitioning import ( + MaterializedResult, + PartialPartitionMetadata, + PartitionT, +) logger = logging.getLogger(__name__) -PartitionT = TypeVar("PartitionT") T = TypeVar("T") @@ -50,7 +53,7 @@ InProgressPhysicalPlan = Iterator[Union[None, PartitionTask[PartitionT], PartitionTaskBuilder[PartitionT]]] # A PhysicalPlan that is complete and will only yield PartitionTasks or final PartitionTs. -MaterializedPhysicalPlan = Iterator[Union[None, PartitionTask[PartitionT], PartitionT]] +MaterializedPhysicalPlan = Iterator[Union[None, PartitionTask[PartitionT], MaterializedResult[PartitionT]]] def _stage_id_counter(): @@ -108,7 +111,7 @@ def file_read( for i in range(len(vpartition)): file_read_step = PartitionTaskBuilder[PartitionT]( inputs=[done_task.partition()], - partial_metadatas=[done_task.partition_metadata()], + partial_metadatas=None, # Child's metadata doesn't really matter for a file read ).add_instruction( instruction=execution_step.ReadFile( index=i, @@ -738,7 +741,7 @@ def materialize( # Check if any inputs finished executing. while len(materializations) > 0 and materializations[0].done(): done_task = materializations.popleft() - yield done_task.partition() + yield done_task.result() # Materialize a single dependency. try: diff --git a/daft/execution/rust_physical_plan_shim.py b/daft/execution/rust_physical_plan_shim.py index b232f9c4a9..aab94c339e 100644 --- a/daft/execution/rust_physical_plan_shim.py +++ b/daft/execution/rust_physical_plan_shim.py @@ -1,7 +1,7 @@ from __future__ import annotations from dataclasses import dataclass -from typing import Iterator, TypeVar, cast +from typing import Iterator, cast from daft.daft import ( FileFormat, @@ -19,11 +19,9 @@ from daft.expressions import Expression, ExpressionsProjection from daft.logical.map_partition_ops import MapPartitionOp from daft.logical.schema import Schema -from daft.runners.partitioning import PartialPartitionMetadata +from daft.runners.partitioning import PartialPartitionMetadata, PartitionT from daft.table import Table -PartitionT = TypeVar("PartitionT") - def scan_with_tasks( scan_tasks: list[ScanTask], diff --git a/daft/runners/partitioning.py b/daft/runners/partitioning.py index 6da1085d66..e11c3b1f72 100644 --- a/daft/runners/partitioning.py +++ b/daft/runners/partitioning.py @@ -92,6 +92,40 @@ def from_table(cls, table: Table) -> PartitionMetadata: PartitionT = TypeVar("PartitionT") +class MaterializedResult(Generic[PartitionT]): + """A protocol for accessing the result partition of a PartitionTask. + + Different Runners can fill in their own implementation here. + """ + + @abstractmethod + def partition(self) -> PartitionT: + """Get the partition of this result.""" + ... + + @abstractmethod + def vpartition(self) -> Table: + """Get the vPartition of this result.""" + ... + + @abstractmethod + def metadata(self) -> PartitionMetadata: + """Get the metadata of the partition in this result.""" + ... + + @abstractmethod + def cancel(self) -> None: + """If possible, cancel execution of this PartitionTask.""" + ... + + @abstractmethod + def _noop(self, _: PartitionT) -> None: + """Implement this as a no-op. + https://peps.python.org/pep-0544/#overriding-inferred-variance-of-protocol-classes + """ + ... + + class PartitionSet(Generic[PartitionT]): def _get_merged_vpartition(self) -> Table: raise NotImplementedError() @@ -126,7 +160,7 @@ def get_partition(self, idx: PartID) -> PartitionT: raise NotImplementedError() @abstractmethod - def set_partition(self, idx: PartID, part: PartitionT) -> None: + def set_partition(self, idx: PartID, part: MaterializedResult[PartitionT]) -> None: raise NotImplementedError() @abstractmethod @@ -139,10 +173,6 @@ def has_partition(self, idx: PartID) -> bool: @abstractmethod def __len__(self) -> int: - return sum(self.len_of_partitions()) - - @abstractmethod - def len_of_partitions(self) -> list[int]: raise NotImplementedError() @abstractmethod diff --git a/daft/runners/pyrunner.py b/daft/runners/pyrunner.py index 1e950ea767..7c87ea82c8 100644 --- a/daft/runners/pyrunner.py +++ b/daft/runners/pyrunner.py @@ -16,13 +16,14 @@ StorageConfig, ) from daft.execution import physical_plan -from daft.execution.execution_step import Instruction, MaterializedResult, PartitionTask +from daft.execution.execution_step import Instruction, PartitionTask from daft.filesystem import glob_path_with_stats from daft.internal.gpu import cuda_device_count from daft.logical.builder import LogicalPlanBuilder from daft.logical.schema import Schema from daft.runners import runner_io from daft.runners.partitioning import ( + MaterializedResult, PartID, PartitionCacheEntry, PartitionMetadata, @@ -52,8 +53,8 @@ def _get_merged_vpartition(self) -> Table: def get_partition(self, idx: PartID) -> Table: return self._partitions[idx] - def set_partition(self, idx: PartID, part: Table) -> None: - self._partitions[idx] = part + def set_partition(self, idx: PartID, part: MaterializedResult[Table]) -> None: + self._partitions[idx] = part.partition() def delete_partition(self, idx: PartID) -> None: del self._partitions[idx] @@ -62,11 +63,7 @@ def has_partition(self, idx: PartID) -> bool: return idx in self._partitions def __len__(self) -> int: - return sum(self.len_of_partitions()) - - def len_of_partitions(self) -> list[int]: - partition_ids = sorted(list(self._partitions.keys())) - return [len(self._partitions[pid]) for pid in partition_ids] + return sum([len(self._partitions[pid]) for pid in self._partitions]) def num_partitions(self) -> int: return len(self._partitions) @@ -119,11 +116,11 @@ def runner_io(self) -> PyRunnerIO: return PyRunnerIO() def run(self, builder: LogicalPlanBuilder) -> PartitionCacheEntry: - partitions = list(self.run_iter(builder)) + results = list(self.run_iter(builder)) result_pset = LocalPartitionSet({}) - for i, partition in enumerate(partitions): - result_pset.set_partition(i, partition) + for i, result in enumerate(results): + result_pset.set_partition(i, result) pset_entry = self.put_partition_set_into_cache(result_pset) return pset_entry @@ -133,7 +130,7 @@ def run_iter( builder: LogicalPlanBuilder, # NOTE: PyRunner does not run any async execution, so it ignores `results_buffer_size` which is essentially 0 results_buffer_size: int | None = None, - ) -> Iterator[Table]: + ) -> Iterator[PyMaterializedResult]: # Optimize the logical plan. builder = builder.optimize() # Finalize the logical plan and get a physical plan scheduler for translating the @@ -147,13 +144,16 @@ def run_iter( # Get executable tasks from planner. tasks = plan_scheduler.to_partition_tasks(psets, is_ray_runner=False) with profiler("profile_PyRunner.run_{datetime.now().isoformat()}.json"): - partitions_gen = self._physical_plan_to_partitions(tasks) - yield from partitions_gen + results_gen = self._physical_plan_to_partitions(tasks) + yield from results_gen def run_iter_tables(self, builder: LogicalPlanBuilder, results_buffer_size: int | None = None) -> Iterator[Table]: - return self.run_iter(builder, results_buffer_size=results_buffer_size) + for result in self.run_iter(builder, results_buffer_size=results_buffer_size): + yield result.partition() - def _physical_plan_to_partitions(self, plan: physical_plan.MaterializedPhysicalPlan) -> Iterator[Table]: + def _physical_plan_to_partitions( + self, plan: physical_plan.MaterializedPhysicalPlan[Table] + ) -> Iterator[PyMaterializedResult]: inflight_tasks: dict[str, PartitionTask] = dict() inflight_tasks_resources: dict[str, ResourceRequest] = dict() future_to_task: dict[futures.Future, str] = dict() @@ -171,7 +171,9 @@ def _physical_plan_to_partitions(self, plan: physical_plan.MaterializedPhysicalP # Blocked on already dispatched tasks; await some tasks. break - elif isinstance(next_step, Table): + elif isinstance(next_step, MaterializedResult): + assert isinstance(next_step, PyMaterializedResult) + # A final result. yield next_step next_step = next(plan) diff --git a/daft/runners/ray_runner.py b/daft/runners/ray_runner.py index af14ac6de9..fae345eb2e 100644 --- a/daft/runners/ray_runner.py +++ b/daft/runners/ray_runner.py @@ -36,7 +36,6 @@ from daft.execution.execution_step import ( FanoutInstruction, Instruction, - MaterializedResult, MultiOutputPartitionTask, PartitionTask, ReduceInstruction, @@ -45,6 +44,7 @@ from daft.filesystem import glob_path_with_stats from daft.runners import runner_io from daft.runners.partitioning import ( + MaterializedResult, PartID, PartitionCacheEntry, PartitionMetadata, @@ -120,11 +120,6 @@ def _to_pandas_ref(df: pd.DataFrame | ray.ObjectRef[pd.DataFrame]) -> ray.Object raise ValueError("Expected a Ray object ref or a Pandas DataFrame, " f"got {type(df)}") -@ray.remote -def remote_len_partition(p: Table) -> int: - return len(p) - - @ray.remote def sample_schema_from_filepath( first_file_path: str, @@ -138,10 +133,10 @@ def sample_schema_from_filepath( @dataclass class RayPartitionSet(PartitionSet[ray.ObjectRef]): - _partitions: dict[PartID, ray.ObjectRef] + _results: dict[PartID, RayMaterializedResult] def items(self) -> list[tuple[PartID, ray.ObjectRef]]: - return sorted(self._partitions.items()) + return [(pid, result.partition()) for pid, result in sorted(self._results.items())] def _get_merged_vpartition(self) -> Table: ids_and_partitions = self.items() @@ -156,7 +151,7 @@ def to_ray_dataset(self) -> RayDataset: "Unable to import `ray.data.from_arrow_refs`. Please ensure that you have a compatible version of Ray >= 1.10 installed." ) - blocks = [_make_ray_block_from_vpartition.remote(self._partitions[k]) for k in self._partitions.keys()] + blocks = [_make_ray_block_from_vpartition.remote(self._results[k].partition()) for k in self._results.keys()] # NOTE: although the Ray method is called `from_arrow_refs`, this method works also when the blocks are List[T] types # instead of Arrow tables as the codepath for Dataset creation is the same. return from_arrow_refs(blocks) @@ -176,36 +171,31 @@ def _make_dask_dataframe_partition_from_vpartition(partition: Table) -> pd.DataF return partition.to_pandas() ddf_parts = [ - _make_dask_dataframe_partition_from_vpartition(self._partitions[k]) for k in self._partitions.keys() + _make_dask_dataframe_partition_from_vpartition(self._results[k].partition()) for k in self._results.keys() ] return dd.from_delayed(ddf_parts, meta=meta) def get_partition(self, idx: PartID) -> ray.ObjectRef: - return self._partitions[idx] + return self._results[idx].partition() - def set_partition(self, idx: PartID, part: ray.ObjectRef) -> None: - self._partitions[idx] = part + def set_partition(self, idx: PartID, result: MaterializedResult[ray.ObjectRef]) -> None: + assert isinstance(result, RayMaterializedResult) + self._results[idx] = result def delete_partition(self, idx: PartID) -> None: - del self._partitions[idx] + del self._results[idx] def has_partition(self, idx: PartID) -> bool: - return idx in self._partitions + return idx in self._results def __len__(self) -> int: - return sum(self.len_of_partitions()) - - def len_of_partitions(self) -> list[int]: - partition_ids = sorted(list(self._partitions.keys())) - - result: list[int] = ray.get([remote_len_partition.remote(self._partitions[pid]) for pid in partition_ids]) - return result + return sum([self._results[pid].metadata().num_rows for pid in self._results]) def num_partitions(self) -> int: - return len(self._partitions) + return len(self._results) def wait(self) -> None: - deduped_object_refs = set(self._partitions.values()) + deduped_object_refs = {r.partition() for r in self._results.values()} ray.wait(list(deduped_object_refs)) @@ -274,7 +264,7 @@ def partition_set_from_ray_dataset( daft_vpartitions = [ _make_daft_partition_from_ray_dataset_blocks.remote(block, daft_schema) for block in block_refs ] - return RayPartitionSet(dict(enumerate(daft_vpartitions))), daft_schema + return RayPartitionSet({i: RayMaterializedResult(obj) for i, obj in enumerate(daft_vpartitions)}), daft_schema def partition_set_from_dask_dataframe( self, @@ -296,7 +286,7 @@ def partition_set_from_dask_dataframe( "Can't convert a Dask DataFrame with inconsistent schemas across partitions to a Daft DataFrame:", schemas, ) - return RayPartitionSet(dict(enumerate(daft_vpartitions))), schemas[0] + return RayPartitionSet({i: RayMaterializedResult(obj) for i, obj in enumerate(daft_vpartitions)}), schemas[0] def _get_ray_task_options(resource_request: ResourceRequest) -> dict[str, Any]: @@ -406,7 +396,7 @@ def __init__(self, max_task_backlog: int | None, use_ray_tqdm: bool) -> None: self.use_ray_tqdm = use_ray_tqdm - def next(self, result_uuid: str) -> ray.ObjectRef | StopIteration: + def next(self, result_uuid: str) -> RayMaterializedResult | StopIteration: # Case: thread is terminated and no longer exists. # Should only be hit for repeated calls to next() after StopIteration. if result_uuid not in self.threads_by_df: @@ -495,7 +485,7 @@ def _run_plan( # Blocked on already dispatched tasks; await some tasks. break - elif isinstance(next_step, ray.ObjectRef): + elif isinstance(next_step, MaterializedResult): # A final result. self.results_by_df[result_uuid].put(next_step) next_step = next(tasks) @@ -650,7 +640,9 @@ def active_plans(self) -> list[str]: else: return self.scheduler.active_plans() - def run_iter(self, builder: LogicalPlanBuilder, results_buffer_size: int | None = None) -> Iterator[ray.ObjectRef]: + def run_iter( + self, builder: LogicalPlanBuilder, results_buffer_size: int | None = None + ) -> Iterator[RayMaterializedResult]: # Optimize the logical plan. builder = builder.optimize() # Finalize the logical plan and get a physical plan scheduler for translating the @@ -701,16 +693,16 @@ def run_iter(self, builder: LogicalPlanBuilder, results_buffer_size: int | None self.scheduler.stop_plan(result_uuid) def run_iter_tables(self, builder: LogicalPlanBuilder, results_buffer_size: int | None = None) -> Iterator[Table]: - for ref in self.run_iter(builder, results_buffer_size=results_buffer_size): - yield ray.get(ref) + for result in self.run_iter(builder, results_buffer_size=results_buffer_size): + yield ray.get(result.partition()) def run(self, builder: LogicalPlanBuilder) -> PartitionCacheEntry: result_pset = RayPartitionSet({}) - partitions_iter = self.run_iter(builder) + results_iter = self.run_iter(builder) - for i, partition in enumerate(partitions_iter): - result_pset.set_partition(i, partition) + for i, result in enumerate(results_iter): + result_pset.set_partition(i, result) pset_entry = self._part_set_cache.put_partition_set(result_pset) @@ -718,7 +710,7 @@ def run(self, builder: LogicalPlanBuilder) -> PartitionCacheEntry: def put_partition_set_into_cache(self, pset: PartitionSet) -> PartitionCacheEntry: if isinstance(pset, LocalPartitionSet): - pset = RayPartitionSet({pid: ray.put(val) for pid, val in pset._partitions.items()}) + pset = RayPartitionSet({pid: RayMaterializedResult(ray.put(val)) for pid, val in pset._partitions.items()}) return self._part_set_cache.put_partition_set(pset=pset) diff --git a/daft/runners/runner.py b/daft/runners/runner.py index 528f3b3337..e969ce5ebc 100644 --- a/daft/runners/runner.py +++ b/daft/runners/runner.py @@ -1,19 +1,19 @@ from __future__ import annotations from abc import abstractmethod -from typing import Generic, Iterator, TypeVar +from typing import Generic, Iterator from daft.logical.builder import LogicalPlanBuilder from daft.runners.partitioning import ( + MaterializedResult, PartitionCacheEntry, PartitionSet, PartitionSetCache, + PartitionT, ) from daft.runners.runner_io import RunnerIO from daft.table import Table -PartitionT = TypeVar("PartitionT") - class Runner(Generic[PartitionT]): def __init__(self) -> None: @@ -34,7 +34,9 @@ def run(self, builder: LogicalPlanBuilder) -> PartitionCacheEntry: ... @abstractmethod - def run_iter(self, builder: LogicalPlanBuilder, results_buffer_size: int | None = None) -> Iterator[PartitionT]: + def run_iter( + self, builder: LogicalPlanBuilder, results_buffer_size: int | None = None + ) -> Iterator[MaterializedResult[PartitionT]]: """Similar to run(), but yield the individual partitions as they are completed. Args: diff --git a/daft/runners/runner_io.py b/daft/runners/runner_io.py index 59f0832889..c03c7fcac8 100644 --- a/daft/runners/runner_io.py +++ b/daft/runners/runner_io.py @@ -1,7 +1,7 @@ from __future__ import annotations from abc import abstractmethod -from typing import TYPE_CHECKING, TypeVar +from typing import TYPE_CHECKING from daft.daft import ( CsvSourceConfig, @@ -20,8 +20,6 @@ if TYPE_CHECKING: pass -PartitionT = TypeVar("PartitionT") - class RunnerIO: """Reading and writing data from the Runner.