[BUG] Fix into_partitions to use a more naive approach without materialization (#3080)

The previous implementation of `into_partitions()` fully materialized the
results in order to split partitions evenly. However, this can be very costly
when all a user wants is to split the partitions into smaller chunks for more
memory-efficient processing.
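
As a rough sketch of the trade-off (illustrative only, not code from this commit): splitting evenly requires knowing every input partition's row count up front, which is what forced materialization, whereas the naive approach can decide how many slices each input partition gets from the partition counts alone.

# Sketch: assigning output slice counts from partition counts only
# (no row counts, hence no materialization required).
def naive_slice_counts(num_inputs: int, num_outputs: int) -> list[int]:
    base, extra = divmod(num_outputs, num_inputs)
    return [base + 1 if i < extra else base for i in range(num_inputs)]

print(naive_slice_counts(2, 3))  # [2, 1] -- only the first partition is split further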

---------

Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
jaychia and Jay Chia authored Oct 21, 2024
1 parent 16665f2 commit 2f09cae
Showing 3 changed files with 47 additions and 48 deletions.
4 changes: 2 additions & 2 deletions daft/dataframe/dataframe.py
@@ -1659,8 +1659,8 @@ def repartition(self, num: Optional[int], *partition_by: ColumnInputType) -> "Da
    def into_partitions(self, num: int) -> "DataFrame":
        """Splits or coalesces DataFrame to ``num`` partitions. Order is preserved.
        No rebalancing is done; the minimum number of splits or merges are applied.
        (i.e. if there are 2 partitions, and change it into 3, this function will just split the bigger one)
        This will naively greedily split partitions in a round-robin fashion to hit the targeted number of partitions.
        The number of rows/size in a given partition is not taken into account during the splitting.
        Example:
            >>> import daft
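
For context on the API documented here, a hedged usage sketch (it assumes daft.from_pydict and DataFrame.num_partitions behave as in current Daft releases; the exact slice sizes depend on the input layout):

import daft

# Hypothetical data: 100 rows arriving as a single partition.
df = daft.from_pydict({"x": list(range(100))})

# Request 4 partitions: the single input is naively sliced into 4 chunks of ~25 rows.
df = df.into_partitions(4)
print(df.num_partitions())  # expected: 4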
30 changes: 30 additions & 0 deletions daft/execution/execution_step.py
@@ -1044,3 +1044,33 @@ def run_partial_metadata(self, input_metadatas: list[PartialPartitionMetadata])
)

return results


@dataclass(frozen=True)
class FanoutEvenSlices(FanoutInstruction):
    def run(self, inputs: list[MicroPartition]) -> list[MicroPartition]:
        [input] = inputs
        results = []

        input_length = len(input)
        num_outputs = self.num_outputs()

        chunk_size, remainder = divmod(input_length, num_outputs)
        ptr = 0
        for output_idx in range(self.num_outputs()):
            end = ptr + chunk_size + 1 if output_idx < remainder else ptr + chunk_size
            results.append(input.slice(ptr, end))
            ptr = end
        assert ptr == input_length

        return results

    def run_partial_metadata(self, input_metadatas: list[PartialPartitionMetadata]) -> list[PartialPartitionMetadata]:
        # TODO: Derive this based on the ratios of num rows
        return [
            PartialPartitionMetadata(
                num_rows=None,
                size_bytes=None,
            )
            for _ in range(self._num_outputs)
        ]
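
For reference, a standalone sketch of the boundary arithmetic used in FanoutEvenSlices.run above, with plain index pairs standing in for MicroPartition.slice:

def even_slice_bounds(input_length: int, num_outputs: int) -> list[tuple[int, int]]:
    chunk_size, remainder = divmod(input_length, num_outputs)
    bounds = []
    ptr = 0
    for output_idx in range(num_outputs):
        # The first `remainder` slices each receive one extra row.
        end = ptr + chunk_size + 1 if output_idx < remainder else ptr + chunk_size
        bounds.append((ptr, end))
        ptr = end
    assert ptr == input_length
    return bounds

print(even_slice_bounds(10, 3))  # [(0, 4), (4, 7), (7, 10)]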
61 changes: 15 additions & 46 deletions daft/execution/physical_plan.py
@@ -1351,61 +1351,30 @@ def split(
num_input_partitions: int,
num_output_partitions: int,
) -> InProgressPhysicalPlan[PartitionT]:
"""Repartition the child_plan into more partitions by splitting partitions only. Preserves order."""
"""Repartition the child_plan into more partitions by splitting partitions only. Preserves order.
This performs a naive split, which might lead to data skews but does not require a full materialization of
input partitions when performing the split.
"""
assert (
num_output_partitions >= num_input_partitions
), f"Cannot split from {num_input_partitions} to {num_output_partitions}."

# Materialize the input partitions so we can see the number of rows and try to split evenly.
# Splitting evenly is fairly important if this operation is to be used for parallelism.
# (optimization TODO: don't materialize if num_rows is already available in physical plan metadata.)
materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()
stage_id = next(stage_id_counter)
base_splits_per_partition, num_partitions_with_extra_output = divmod(num_output_partitions, num_input_partitions)

input_partition_idx = 0
for step in child_plan:
if isinstance(step, PartitionTaskBuilder):
step = step.finalize_partition_task_single_output(stage_id=stage_id)
materializations.append(step)
yield step

while any(not _.done() for _ in materializations):
logger.debug("split_to blocked on completion of all sources: %s", materializations)
yield None

splits_per_partition = deque([1 for _ in materializations])
num_splits_to_apply = num_output_partitions - num_input_partitions

# Split by rows for now.
# In the future, maybe parameterize to allow alternatively splitting by size.
rows_by_partitions = [task.partition_metadata().num_rows for task in materializations]

# Calculate how to spread the required splits across all the partitions.
# Iteratively apply a split and update how many rows would be in the resulting partitions.
# After this loop, splits_per_partition has the final number of splits to apply to each partition.
rows_after_splitting = [float(_) for _ in rows_by_partitions]
for _ in range(num_splits_to_apply):
_, split_at = max((rows, index) for (index, rows) in enumerate(rows_after_splitting))
splits_per_partition[split_at] += 1
rows_after_splitting[split_at] = float(rows_by_partitions[split_at] / splits_per_partition[split_at])

# Emit the split partitions.
for task, num_out, num_rows in zip(consume_deque(materializations), splits_per_partition, rows_by_partitions):
if num_out == 1:
yield PartitionTaskBuilder[PartitionT](
inputs=[task.partition()],
partial_metadatas=[task.partition_metadata()],
resource_request=ResourceRequest(memory_bytes=task.partition_metadata().size_bytes),
num_out = (
base_splits_per_partition + 1
if input_partition_idx < num_partitions_with_extra_output
else base_splits_per_partition
)
step = step.add_instruction(instruction=execution_step.FanoutEvenSlices(_num_outputs=num_out))
input_partition_idx += 1
yield step
else:
boundaries = [math.ceil(num_rows * i / num_out) for i in range(num_out + 1)]
starts, ends = boundaries[:-1], boundaries[1:]
yield PartitionTaskBuilder[PartitionT](
inputs=[task.partition()],
partial_metadatas=[task.partition_metadata()],
resource_request=ResourceRequest(memory_bytes=task.partition_metadata().size_bytes),
).add_instruction(
instruction=execution_step.FanoutSlices(_num_outputs=num_out, slices=list(zip(starts, ends)))
)
yield step
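
To make the new docstring's skew caveat concrete, a small worked example with hypothetical row counts: two input partitions of 100 and 10 rows split into 3 outputs.

# divmod(3, 2) gives base=1, remainder=1, so the inputs receive [2, 1] slices.
rows_per_input = [100, 10]
slices_per_input = [2, 1]
out_rows = []
for rows, n in zip(rows_per_input, slices_per_input):
    chunk, rem = divmod(rows, n)
    out_rows.extend(chunk + 1 if i < rem else chunk for i in range(n))
print(out_rows)  # [50, 50, 10] -- skewed, but computed without materializing either input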


def coalesce(
