From 2f09caeed4e493ad1b5dfb970c53ce6b60ad596c Mon Sep 17 00:00:00 2001
From: Jay Chia <17691182+jaychia@users.noreply.github.com>
Date: Mon, 21 Oct 2024 13:49:39 -0700
Subject: [PATCH] [BUG] Fix into_partitions to use a more naive approach
 without materialization (#3080)

The previous implementation of `into_partitions()` would fully materialize
its input partitions in order to split them evenly. However, this can be
very costly when all a user wants is to split partitions into smaller
chunks for more memory-efficient processing.

---------

Co-authored-by: Jay Chia
---
 daft/dataframe/dataframe.py      |  4 +--
 daft/execution/execution_step.py | 30 ++++++++++++++++
 daft/execution/physical_plan.py  | 61 ++++++++------------------
 3 files changed, 47 insertions(+), 48 deletions(-)

diff --git a/daft/dataframe/dataframe.py b/daft/dataframe/dataframe.py
index cd96a9a480..8dbb33111e 100644
--- a/daft/dataframe/dataframe.py
+++ b/daft/dataframe/dataframe.py
@@ -1659,8 +1659,8 @@ def repartition(self, num: Optional[int], *partition_by: ColumnInputType) -> "Da
     def into_partitions(self, num: int) -> "DataFrame":
         """Splits or coalesces DataFrame to ``num`` partitions. Order is preserved.
 
-        No rebalancing is done; the minimum number of splits or merges are applied.
-        (i.e. if there are 2 partitions, and change it into 3, this function will just split the bigger one)
+        This will naively greedily split partitions in a round-robin fashion to hit the targeted number of partitions.
+        The number of rows/size in a given partition is not taken into account during the splitting.
 
         Example:
             >>> import daft
diff --git a/daft/execution/execution_step.py b/daft/execution/execution_step.py
index daa9afa289..ea490073ea 100644
--- a/daft/execution/execution_step.py
+++ b/daft/execution/execution_step.py
@@ -1044,3 +1044,33 @@ def run_partial_metadata(self, input_metadatas: list[PartialPartitionMetadata])
             )
 
         return results
+
+
+@dataclass(frozen=True)
+class FanoutEvenSlices(FanoutInstruction):
+    def run(self, inputs: list[MicroPartition]) -> list[MicroPartition]:
+        [input] = inputs
+        results = []
+
+        input_length = len(input)
+        num_outputs = self.num_outputs()
+
+        chunk_size, remainder = divmod(input_length, num_outputs)
+        ptr = 0
+        for output_idx in range(self.num_outputs()):
+            end = ptr + chunk_size + 1 if output_idx < remainder else ptr + chunk_size
+            results.append(input.slice(ptr, end))
+            ptr = end
+        assert ptr == input_length
+
+        return results
+
+    def run_partial_metadata(self, input_metadatas: list[PartialPartitionMetadata]) -> list[PartialPartitionMetadata]:
+        # TODO: Derive this based on the ratios of num rows
+        return [
+            PartialPartitionMetadata(
+                num_rows=None,
+                size_bytes=None,
+            )
+            for _ in range(self._num_outputs)
+        ]
diff --git a/daft/execution/physical_plan.py b/daft/execution/physical_plan.py
index 2b65a35f21..230ef493aa 100644
--- a/daft/execution/physical_plan.py
+++ b/daft/execution/physical_plan.py
@@ -1351,61 +1351,30 @@ def split(
     num_input_partitions: int,
     num_output_partitions: int,
 ) -> InProgressPhysicalPlan[PartitionT]:
-    """Repartition the child_plan into more partitions by splitting partitions only. Preserves order."""
+    """Repartition the child_plan into more partitions by splitting partitions only. Preserves order.
+    This performs a naive split, which might lead to data skews but does not require a full materialization of
+    input partitions when performing the split.
+ """ assert ( num_output_partitions >= num_input_partitions ), f"Cannot split from {num_input_partitions} to {num_output_partitions}." - # Materialize the input partitions so we can see the number of rows and try to split evenly. - # Splitting evenly is fairly important if this operation is to be used for parallelism. - # (optimization TODO: don't materialize if num_rows is already available in physical plan metadata.) - materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque() - stage_id = next(stage_id_counter) + base_splits_per_partition, num_partitions_with_extra_output = divmod(num_output_partitions, num_input_partitions) + + input_partition_idx = 0 for step in child_plan: if isinstance(step, PartitionTaskBuilder): - step = step.finalize_partition_task_single_output(stage_id=stage_id) - materializations.append(step) - yield step - - while any(not _.done() for _ in materializations): - logger.debug("split_to blocked on completion of all sources: %s", materializations) - yield None - - splits_per_partition = deque([1 for _ in materializations]) - num_splits_to_apply = num_output_partitions - num_input_partitions - - # Split by rows for now. - # In the future, maybe parameterize to allow alternatively splitting by size. - rows_by_partitions = [task.partition_metadata().num_rows for task in materializations] - - # Calculate how to spread the required splits across all the partitions. - # Iteratively apply a split and update how many rows would be in the resulting partitions. - # After this loop, splits_per_partition has the final number of splits to apply to each partition. - rows_after_splitting = [float(_) for _ in rows_by_partitions] - for _ in range(num_splits_to_apply): - _, split_at = max((rows, index) for (index, rows) in enumerate(rows_after_splitting)) - splits_per_partition[split_at] += 1 - rows_after_splitting[split_at] = float(rows_by_partitions[split_at] / splits_per_partition[split_at]) - - # Emit the split partitions. - for task, num_out, num_rows in zip(consume_deque(materializations), splits_per_partition, rows_by_partitions): - if num_out == 1: - yield PartitionTaskBuilder[PartitionT]( - inputs=[task.partition()], - partial_metadatas=[task.partition_metadata()], - resource_request=ResourceRequest(memory_bytes=task.partition_metadata().size_bytes), + num_out = ( + base_splits_per_partition + 1 + if input_partition_idx < num_partitions_with_extra_output + else base_splits_per_partition ) + step = step.add_instruction(instruction=execution_step.FanoutEvenSlices(_num_outputs=num_out)) + input_partition_idx += 1 + yield step else: - boundaries = [math.ceil(num_rows * i / num_out) for i in range(num_out + 1)] - starts, ends = boundaries[:-1], boundaries[1:] - yield PartitionTaskBuilder[PartitionT]( - inputs=[task.partition()], - partial_metadatas=[task.partition_metadata()], - resource_request=ResourceRequest(memory_bytes=task.partition_metadata().size_bytes), - ).add_instruction( - instruction=execution_step.FanoutSlices(_num_outputs=num_out, slices=list(zip(starts, ends))) - ) + yield step def coalesce(