
[data] randomize_block_order() not compatible with stage fusion #26057

Closed
ericl opened this issue Jun 24, 2022 · 10 comments · Fixed by #26090
Assignees
Labels
bug Something that is supposed to be working; but isn't data Ray Data-related issues P1 Issue that should be fixed within a few weeks

Comments

@ericl
Contributor

ericl commented Jun 24, 2022

What happened + What you expected to happen

The randomize_block_order() operation breaks stage fusion, per #25870.

Versions / Dependencies

master

Reproduction script

ray.data.range(10).randomize_block_order().map(fn)

In the above script, you'd expect read->map to be fused, but the randomize stage breaks this.

Issue Severity

No response

@ericl ericl added bug Something that is supposed to be working; but isn't P1 Issue that should be fixed within a few weeks data Ray Data-related issues labels Jun 24, 2022
@ericl
Contributor Author

ericl commented Jun 24, 2022

I'm not sure about the best way to fix this; one approach would be to define a new kind of stage that both OneToOne and AllToAll stages could fuse with. Fusing with this stage would just require the stage to call block_list.randomize_block_order(seed) prior to applying the real data transforms of the stage.
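One way to picture this proposed stage kind is the following sketch. The class and method names (`RandomizableStage`, `fuse_randomize`) are illustrative, not actual Ray internals: fusing `randomize_block_order()` into a stage just records a seed, and the stage shuffles its input block list before applying its real transform.

```python
import random

class RandomizableStage:
    """Sketch of a stage that fusion could target: it optionally shuffles
    the incoming block list before running its real transform."""

    def __init__(self, transform, randomize_seed=None):
        self.transform = transform
        self.randomize_seed = randomize_seed

    def fuse_randomize(self, seed):
        # Fusing randomize_block_order() into this stage just records the seed.
        return RandomizableStage(self.transform, randomize_seed=seed)

    def __call__(self, block_list):
        if self.randomize_seed is not None:
            block_list = list(block_list)
            random.Random(self.randomize_seed).shuffle(block_list)
        return [self.transform(block) for block in block_list]

# A one-to-one stage (add 1 to each row) with randomization fused in.
stage = RandomizableStage(lambda block: [r + 1 for r in block]).fuse_randomize(seed=0)
out = stage([[1, 2], [3, 4], [5, 6]])
# Same blocks come out (transformed), just in a shuffled order.
assert sorted(out) == [[2, 3], [4, 5], [6, 7]]
```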

cc @clarkzinzow @jianoaix for any thoughts.

@jianoaix
Contributor

If the goal of using randomize_block_order() is to approximate a row-level shuffle at lower cost, we could switch their order during optimization: e.g., ds.randomize_block_order().map_batches() is equivalent to ds.map_batches().randomize_block_order() (they commute), enabling the OneToOne ops to fuse.
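The commutativity claim can be illustrated with plain Python lists standing in for blocks (a sketch; `blocks`, `fn`, and the helper functions are illustrative, not Ray APIs):

```python
import random

# Blocks of a dataset, each block a list of rows.
blocks = [[0, 1, 2], [3, 4, 5], [6, 7, 8]]

def fn(row):
    return row * 2

def map_blocks(blocks):
    # A one-to-one stage: transforms each block independently.
    return [[fn(r) for r in block] for block in blocks]

def randomize_block_order(blocks, seed):
    shuffled = list(blocks)
    random.Random(seed).shuffle(shuffled)
    return shuffled

# randomize-then-map produces the same result as map-then-randomize,
# because a per-block map commutes with a permutation of the blocks.
a = map_blocks(randomize_block_order(blocks, seed=42))
b = randomize_block_order(map_blocks(blocks), seed=42)
assert a == b
```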

If the goal is to randomize the placement of blocks onto nodes for some operation (e.g. read), it seems it has to come before that operation. But in this case:

  • Randomizing placement can also be achieved by scheduling, so this seems not to be a main use case of randomize_block_order().
  • A dataset is an ordered list of records, and randomizing blocks breaks that order, so again it looks better to address this via scheduling.

@ericl
Contributor Author

ericl commented Jun 24, 2022

It's relevant for pipelining too, so ideally you push the reorder as early as possible (all the way into the read stage if possible). Otherwise, the pipeline stages won't access blocks in random order. I don't think it's efficient/correct to move it to a later stage.

@ericl
Contributor Author

ericl commented Jun 24, 2022

Another option is to mutate the dataset BlockList by reordering the blocks in place. That would be the simplest, but it breaks the model a little.

@clarkzinzow
Contributor

clarkzinzow commented Jun 24, 2022

Agreed with @jianoaix in general.

It's relevant for pipelining too, so ideally you push the reorder as early as possible (all the way into the read stage if possible). Otherwise, the pipeline stages won't access blocks in random order. I don't think it's efficient/correct to move it to a later stage.

@ericl If using the API for an efficient block-level random shuffle (not for avoiding hotspots), then pushing the randomize_block_order() operation later in the chain would be more efficient. E.g. for a chain of operations like ray.data.read().map_batches().randomize_block_order().map_batches(), the most efficient reorder would be ray.data.read().map_batches().map_batches().randomize_block_order(), since the two maps can be fused into the read, resulting in a single round of tasks + a block list reorder.

For the hotspot avoidance use case, I still think that pushing the randomize_block_order() to the end of the surrounding one-to-one stage chain is the optimal behavior, since we're getting the most aggressive stage fusion while still randomizing the block order of the stage returns. But this is contingent on these one-to-one stages being able to be fused.
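The reordering described here can be sketched as a simple optimizer pass over a list of stage names. This is a toy model (the stage names and `push_randomize_right` helper are illustrative, not Ray's actual planner): a single left-to-right pass moves the randomize stage past every adjacent one-to-one stage, leaving those stages contiguous so they can fuse.

```python
# Stage kinds that a randomize stage may commute past.
ONE_TO_ONE = {"read", "map", "map_batches"}

def push_randomize_right(stages):
    """Push 'randomize_block_order' past adjacent one-to-one stages."""
    stages = list(stages)
    for i in range(len(stages) - 1):
        if stages[i] == "randomize_block_order" and stages[i + 1] in ONE_TO_ONE:
            # Swap: the one-to-one stage moves left, randomize moves right.
            stages[i], stages[i + 1] = stages[i + 1], stages[i]
    return stages

plan = ["read", "map_batches", "randomize_block_order", "map_batches"]
optimized = push_randomize_right(plan)
# All one-to-one stages are now adjacent and can fuse into one task round.
assert optimized == ["read", "map_batches", "map_batches", "randomize_block_order"]
```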

@ericl
Contributor Author

ericl commented Jun 24, 2022

It makes sense. I just want to make sure it also works for read().randomize().map_batches().window(), where the randomize has to occur anywhere before the window.

@clarkzinzow
Contributor

A few options come to mind:

  1. General solution: Introduce stage commutativity, where we let the randomize_block_order() stage commute with all one-to-one stages but not all-to-all stages, and stage priority, where the randomize_block_order() will prefer to be towards the end of the chain.
  2. One-off: Instead of randomize_block_order() creating a new stage, have it mutate the last added stage with a _randomize_blocks=True attribute, propagate this attribute properly when fusing one-to-one stages, and have stages randomize the block order of their returned blocks if this _randomize_blocks attribute is set.
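Option 2 can be sketched in miniature as follows. The `Stage` class and `fuse` helper are hypothetical stand-ins for Ray's plan structures: the `_randomize_blocks` flag survives fusion, and the fused stage shuffles its output block list at the end.

```python
import random

class Stage:
    """Toy stage: a per-block transform plus an optional randomize flag."""

    def __init__(self, fn, _randomize_blocks=False, seed=None):
        self.fn = fn
        self._randomize_blocks = _randomize_blocks
        self.seed = seed

    def execute(self, blocks):
        out = [self.fn(block) for block in blocks]
        if self._randomize_blocks:
            random.Random(self.seed).shuffle(out)
        return out

def fuse(a, b):
    # The fused stage composes both transforms and keeps the randomize flag.
    return Stage(lambda blk: b.fn(a.fn(blk)),
                 _randomize_blocks=a._randomize_blocks or b._randomize_blocks,
                 seed=b.seed if b._randomize_blocks else a.seed)

read = Stage(lambda blk: blk)
mapped = Stage(lambda blk: [r * 2 for r in blk], _randomize_blocks=True, seed=0)
fused = fuse(read, mapped)
result = fused.execute([[1], [2], [3]])
# The fused stage still randomizes block order on its returned blocks.
assert sorted(result) == [[2], [4], [6]]
```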

@clarkzinzow
Contributor

clarkzinzow commented Jun 24, 2022

It makes sense. I just want to make sure it also works for read().randomize().map_batches().window(), where the randomize has to occur anywhere before the window.

Hmm, shouldn't that happen automatically? The source blocks for the windowed pipeline will either be materialized or consist only of a read stage, so read().randomize().map_batches().window() would turn into read().map_batches().randomize().window().

ray/python/ray/data/dataset.py

Lines 3146 to 3153 in 95fe327

if self._plan.is_read_stage() and ctx.optimize_fuse_read_stages:
    blocks, _, _ = self._plan._get_source_blocks_and_stages()
    blocks.clear()
    blocks, outer_stats, read_stage = _rewrite_read_stage(blocks)
else:
    blocks = self._plan.execute()
    outer_stats = self._plan.stats()
    read_stage = None

@jianoaix
Contributor

Right, for now window()/repeat() will materialize the dataset, so no op (except read) will move across the boundary between dataset transforms and pipeline transforms.
In the future, as we combine plans, we still cannot move AllToAll operators across that boundary: this applies to ds.sort().window() as well as ds.randomize_block_order().window().
So in either case this should work (no violation of correctness).

@ericl
Contributor Author

ericl commented Jun 24, 2022

It makes sense, but note that window does not materialize the dataset: that would defeat the purpose. Window can operate over lazy block lists.

@ericl ericl self-assigned this Jun 24, 2022
ericl added a commit that referenced this issue Jun 30, 2022
Why are these changes needed?
Per the discussion in #26057, fix the stage fusion issue by re-ordering the randomize stage past any 1-1 stages.

Closes #26057