Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] [New Query Planner] [2/N] Push partition spec into physical plan, remove Coalesce logical op. #1540

Merged
merged 10 commits into from
Oct 30, 2023

Conversation

clarkzinzow
Copy link
Contributor

@clarkzinzow clarkzinzow commented Oct 27, 2023

This PR pushes the partition spec concept into the physical plan; this is required in order to defer partition information gathering (e.g. globbing) until plan translation, and also helps unify some past duplication of partitioning-based logic between the logical plan and logical -> physical translation.

In addition to push all partition spec logic to plan translation and the physical plan, the following major changes were made to facilitate this:

  • The Coalesce logical op is absorbed into the Repartition logical op; the switching between a Split or Coalesce is now made at logical -> physical translation time.
  • df.repartition() now takes an optional num_partitions arg, and num_partitions is now optional for the logical Repartition op; this is needed to support the "hash repartition by these columns to the same number of partitions as the input" use case, e.g. for writing out partitioned CSV/JSON datasets.
  • Support for df.num_partitions() has been preserved, by triggering optimization + plan translation to fulfill the query; we can look at caching this in the future, so we don't have to redo globbing once we've moved globbing to plan translation time. df.num_partitions() API is removed, since this can't be determined based on just the logical plan alone. We could add this back and trigger plan optimization + translation to get this answer, although this will end up triggering nontrivial computation for the globbing backend, so we may want to (1) only trigger enough execution in order to get the num_partitions answer, and (2) cache that execution.

TODOs Before Merging

  • Add DropRepartitions optimizations as a split between a logical optimization rule (for non-partition spec logic) and a plan translation-time optimization (for partition spec logic).
  • Add old logical Project unit tests around partition spec munging to physical Project operator

@clarkzinzow clarkzinzow changed the title [FEAT] [New Query Planner] Push partition spec into physical plan, remove Coalesce logical op. [FEAT] [New Query Planner] [2/N] Push partition spec into physical plan, remove Coalesce logical op. Oct 27, 2023
@github-actions github-actions bot added the enhancement New feature or request label Oct 27, 2023
@jaychia
Copy link
Contributor

jaychia commented Oct 27, 2023

The "major changes" sound good to me

The Coalesce logical op is absorbed into the Repartition logical op; the switching between a Split or Coalesce is now made at logical -> physical translation time.

Sounds like then split/coalesce is a physical-only concept, and the logical plan only knows the concept of "repartition"?

df.num_partitions() API is removed

I do think we will want to add this soon and do "trigger plan optimization + translation to get this answer". Before we have AQE/dynamic repartitioning at runtime, the number of partitions is pretty important to know for performance reasons (e.g. if I have 20,000 partitions, as a user I should be aware and run a repartition).

I'm ok throwing a NotImplementedError for now though!

@clarkzinzow
Copy link
Contributor Author

Sounds like then split/coalesce is a physical-only concept, and the logical plan only knows the concept of "repartition"?

Yep, that's correct!

I do think we will want to add this soon and do "trigger plan optimization + translation to get this answer"

I'll take a look at the minimal work to support this. In terms of redundant work, I think this shouldn't amount to much more than a repeated optimization + plan translation, which is currently very cheap. Once we move globbing to be at planning time, we could find a way to cache that globbing, if needed.

@clarkzinzow
Copy link
Contributor Author

Actually, we already have optimization + plan translation cached with the Rust-side physical plan scheduler, so we should just be able to expose another num_partitions() API on that physical plan scheduler, and if ever want to cache the optimization + plan translation, that should be Python-only changes around the runner APIs; could be very straightforward!

@codecov
Copy link

codecov bot commented Oct 27, 2023

Codecov Report

Merging #1540 (824d5b5) into main (6bd8f51) will increase coverage by 0.00%.
Report is 4 commits behind head on main.
The diff coverage is 94.44%.

Additional details and impacted files

Impacted file tree graph

@@           Coverage Diff           @@
##             main    #1540   +/-   ##
=======================================
  Coverage   85.20%   85.21%           
=======================================
  Files          54       54           
  Lines        5131     5120   -11     
=======================================
- Hits         4372     4363    -9     
+ Misses        759      757    -2     
Files Coverage Δ
daft/io/file_path.py 100.00% <100.00%> (ø)
daft/logical/builder.py 88.88% <100.00%> (-0.19%) ⬇️
daft/plan_scheduler/physical_plan_scheduler.py 100.00% <100.00%> (ø)
daft/runners/partitioning.py 81.03% <100.00%> (+0.33%) ⬆️
daft/dataframe/dataframe.py 89.58% <90.00%> (+0.12%) ⬆️

daft/dataframe/dataframe.py Outdated Show resolved Hide resolved
daft/dataframe/dataframe.py Show resolved Hide resolved
src/daft-plan/src/logical_ops/repartition.rs Outdated Show resolved Hide resolved
@clarkzinzow clarkzinzow merged commit fa59676 into main Oct 30, 2023
37 checks passed
@clarkzinzow clarkzinzow deleted the clark/push-partitioning-to-physical branch October 30, 2023 22:10
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants