-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[data] Stage fusion optimizations, off by default #22373
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall paradigm is looking good, making sure that read-time load balancing is preserved when given _spread_resource_prefix="node:"
is the only big blocker that I see.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
python/ray/data/tests/test_stats.py
Outdated
@@ -167,6 +167,7 @@ def test_dataset_pipeline_stats_basic(ray_start_regular_shared): | |||
for batch in pipe.iter_batches(): | |||
pass | |||
stats = canonicalize(pipe.stats()) | |||
print(stats) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
print(stats) |
This PR adds the following stage fusion optimizations (off by default). In a later PR, I plan to enable this by default for DatasetPipelines. - Stage fusion: Whether to fuse compatible OneToOne stages. - Read stage fusion: Whether to fuse read stages into downstream OneToOne stages. This is accomplished by rewriting the read stage (LazyBlockList) into a transformation over a collection of read tasks (BlockList -> MapBatches(do_read)). - Shuffle stage fusion: Whether to fuse compatible OneToOne stages into shuffle stages that support specifying a map-side block UDF. Stages are considered compatible if their compute strategy is the same ("tasks" vs "actors"), and they have the same Ray remote args. Currently, the PR is ignoring the remote args of read tasks, but this will be fixed as a followup (I didn't want to change the read tasks default here).
Why are these changes needed?
This PR adds the following stage fusion optimizations (off by default). In a later PR, I plan to enable this by default for DatasetPipelines.
Stages are considered compatible if their compute strategy is the same ("tasks" vs "actors"), and they have the same Ray remote args. Currently, the PR is ignoring the remote args of read tasks, but this will be fixed as a followup (I didn't want to change the read tasks default here).
I've experimented with this locally and the memory reduction is ~50%, mostly from clearing input blocks. In the distributed setting I'd expect more savings especially under memory pressure, since fusion suppresses excess spilling / transfer of blocks.
Related issue number
Towards #18791