
Lay the groundwork for lazy dataset optimization (no behavior changes) #22233

Merged · 20 commits merged into ray-project:master on Feb 14, 2022

Conversation

@ericl (Contributor) commented Feb 9, 2022

Why are these changes needed?

This PR refactors Dataset execution to enable lazy mode in the future, which can reduce memory usage in large-scale ingest pipelines. There should be no behavior changes in this PR. Many of the optimizations are also punted for future work.

Future work:

  • Run all dataset tests in lazy mode.
  • Extend laziness down to the read/datasource layer.
  • Think through intermediate copy materialization.
  • Enable lazy mode for DatasetPipeline by default.
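To make the direction of the refactor concrete, here is a minimal, self-contained sketch of the idea (illustrative names only; the classes in this PR differ in detail): transformations are recorded as stages and only run when the result is needed.

```python
from typing import Callable, List


class LazyPlan:
    """Sketch: record block-list -> block-list stages, run them only on execute()."""

    def __init__(self, source_blocks: List):
        self._source_blocks = source_blocks
        self._stages: List[Callable] = []

    def with_stage(self, stage: Callable) -> "LazyPlan":
        plan = LazyPlan(self._source_blocks)
        plan._stages = self._stages + [stage]
        return plan

    def execute(self) -> List:
        blocks = self._source_blocks
        for stage in self._stages:
            blocks = stage(blocks)  # each stage maps a block list to a block list
        return blocks


# Nothing runs until execute() is called.
plan = LazyPlan([[1, 2], [3, 4]])
plan = plan.with_stage(lambda blocks: [[x * 2 for x in b] for b in blocks])
print(plan.execute())  # [[2, 4], [6, 8]]
```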

@ericl ericl requested a review from scv119 as a code owner February 9, 2022 04:28
@zhe-thoughts (Collaborator)

Two high-level comments:

  1. Should lazy be something for the developer to control with an API? Should we just define certain (maybe most) transformations as lazy?
  2. groupBy is already lazy right? Should we consolidate how lazy operations are implemented?

@ericl (Contributor, Author) commented Feb 9, 2022

Should lazy be something for the developer to control with an API? Should we just define certain (maybe most) transformations as lazy?

We can debate it, but having lazy as an optimization only may be more usable. cc @clarkzinzow on thoughts.

groupBy is already lazy right? Should we consolidate how lazy operations are implemented?

It's not really related: groupBy only specifies half of an operation; the second half is the aggregation.
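A small example of that point, assuming the Datasets API of this era (groupby with a callable key on a simple dataset):

```python
import ray

ds = ray.data.range(1000)
grouped = ds.groupby(lambda x: x % 3)  # returns a GroupedDataset; no shuffle yet
counts = grouped.count()               # the aggregation completes the operation and runs it
print(counts.take(3))
```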

@clarkzinzow (Contributor)

A few high-level things:

  1. This execution plan model is still at the operation (stage) level rather than at the block task level; it's much, much simpler, which is a big pro, but I think it has the following cons:
    a. Our ability to optimize block lifetimes within an operation (such as the map and reduce phases within the shuffle) is limited. Not sure how important this is without a drill-down analysis on where these memory-hogging intermediate results are hanging out.
    b. The path to move semantics for block tasks is less clear to me. Don't we need to be able to "move" at the site of the Ray task submission, not when invoking a higher-level stage op on block lists?
    c. Optimizations such as task fusion, i.e. fusing chains of functions applied to a block into a single Ray task, don't seem possible. Not sure what the high-level operation fusing hinted at in the current PR would give us, since the fused operations would still result in multiple Ray tasks.
    If we pushed this execution plan down to the compute model (i.e. right before Ray tasks are launched), we could avoid these cons, but that would change our compute and data model more fundamentally.
  2. How will this work with dataset splits? Right now, it looks like the split triggers execution, but I don't know if that will be intuitive/desirable since the split might happen a while before training starts... If we think this is a problem, this can be solved by parking the (lazy) dataset in an actor upon splitting, caching computation of the dataset up to the split, and having the downstream .execute() calls on each split fetch the appropriate split from this actor. At least, this is what I implemented in my operation-level lazy prototype.
  3. How will this work with branching dataset computations? Right now, it looks like branching would cause repeated execution of the common subexpressions. Do we want to do any form of common subexpression elimination here? If so, how do we handle the case in which these branching computations are then passed into downstream Ray tasks and executed on different nodes? I ended up doing something similar to splitting, where the lazy dataset is parked in an actor and when downstream dataset operations need it, they trigger computation in the actor if it's not yet computed, and return the cached dataset otherwise.

What are your thoughts on these, especially (1)?

@ericl (Contributor, Author) commented Feb 9, 2022

Our ability to optimize the block lifetimes within an operation (such as the map and reduce phases within the shuffle) is limited. Not sure how important this is without a drill down analysis on where these memory-hogging intermediate results are hanging out.

That's right, it currently is limited, but there's nothing stopping us from optimizing this within the shuffle operation itself. We'd have to go and optimize these separately.

The path to move semantics for block tasks is less clear to me. Don't we need to be able to "move" at the site of the Ray task submission, not when invoking a higher-level stage op on block lists?

For OneToOneOp, we can implement this in the common compute layer. For AllToAllOp, we'd need to implement the moving within the specific op implementations (like do_shuffle()). Each op can assume it can take ownership of its input.
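A rough sketch of what "taking ownership" could mean in the shared compute path (hypothetical helper, not code from this PR): the op drops each input block reference as soon as the downstream task has been submitted, so the object store can reclaim it eagerly.

```python
import ray


@ray.remote
def transform_block(block, fn):
    # The task receives the block by value; the caller's reference is no longer needed.
    return fn(block)


def one_to_one_exec(block_refs, fn):
    """Move-semantics sketch: this op owns block_refs, so it consumes and drops them as it goes."""
    out_refs = []
    while block_refs:
        ref = block_refs.pop(0)  # consume inputs in order, dropping our reference as we go
        out_refs.append(transform_block.remote(ref, fn))
    return out_refs


# Usage sketch:
# refs = [ray.put([1, 2]), ray.put([3, 4])]
# out = one_to_one_exec(refs, lambda block: [x + 1 for x in block])
```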

Optimizations such as task fusion, i.e. fusing chains of functions applied to a block into a single Ray task, don't seem possible. Not sure what the high-level operation fusing that's indicated at in the current PR would give us, since they would still result in multiple Ray tasks.

You can do it in ExecutionPlan.execute(), for example, fusing OneToOneOps if they have compatible compute parameters. It's also possible to fuse OneToOneOp and AllToAllOps, if we extend the AllToAllOp API a bit.
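For illustration, a minimal sketch of such a fusion pass (names are placeholders; the actual pass is left as future work): adjacent one-to-one stages with compatible compute parameters collapse into a single stage whose block function is the composition of the two.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class Stage:
    # Illustrative stand-in for the plan's stage objects.
    block_fn: Callable  # block -> block
    compute: str        # e.g. "tasks" or "actors"
    one_to_one: bool = True

    def fuse_with(self, other: "Stage") -> "Stage":
        return Stage(
            block_fn=lambda block, f=self.block_fn, g=other.block_fn: g(f(block)),
            compute=self.compute,
        )


def maybe_fuse(stages: List[Stage]) -> List[Stage]:
    """Fuse adjacent one-to-one stages that share compatible compute parameters."""
    fused: List[Stage] = []
    for stage in stages:
        prev = fused[-1] if fused else None
        if prev and prev.one_to_one and stage.one_to_one and prev.compute == stage.compute:
            fused[-1] = prev.fuse_with(stage)  # one task would now run both block fns
        else:
            fused.append(stage)
    return fused
```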

How will this work with dataset splits? Right now, it looks like the split triggers execution, but I don't know if that will be intuitive/desirable since the split might happen a while before training starts...

Actually, the split is still lazy if done immediately on the base data. We could also keep it lazy if the plan only contains OneToOneOps as an optimization.

How will this work with branching dataset computations? Right now, it looks like branching would cause repeated execution of the common subexpressions.

You can call ensure_executed() to materialize the intermediate copy, though yeah it would be a little bit of a manual operation.
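Roughly, for the branching case (a hypothetical usage sketch; the exact name and placement of ensure_executed() may differ in the final API):

```python
import ray

# Materialize the shared prefix once so both branches below reuse the same blocks
# instead of re-running the read + map.
base = ray.data.range(1000).map(lambda x: x * 2)
base = base.ensure_executed()             # assumed materialization call, per the discussion above

evens = base.filter(lambda x: x % 2 == 0)  # branch 1 reuses the cached blocks
total = base.sum()                         # branch 2 reuses the same cached blocks
```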

@clarkzinzow (Contributor)

That's right, it currently is limited, but there's nothing stopping us from optimizing this within the shuffle operation itself. We'd have to go and optimize these separately.

So we'd do this manually for those ops, got it. 👍

For OneToOneOp, we can implement this in the common compute layer. For AllToAllOp, we'd need to implement the moving within the specific op implementations (like do_shuffle()).

Doing this manually for AllToAllOps seems fine since there aren't that many, sounds good. 👍

Each op can assume it can take ownership of its input.

Yep that makes sense. 👍

You can do it in ExecutionPlan.execute(), for example, fusing OneToOneOps if they have compatible compute parameters. It's also possible to fuse OneToOneOp and AllToAllOps, if we extend the AllToAllOp API a bit.

Ah, so this would be done at the block_list --> block_list level, walking the op graph, eagerly fusing op chains that have compatible compute parameters AND that don't change the number of blocks into a single new op, then applying that fused _block_fn via the chosen compute model... Will this support our most important task fusion use cases?

The common use case that I run through is read --> transform --> shuffle (map --> reduce) --> trainer, where ideally read + transform + shuffle map could be fused, and I'm having a hard time seeing how the shuffle map phase, which is internal to the shuffle op, could be fused with the reading and transforming stages under this op execution model. 🤔 What kind of AllToAllOp API extension would enable that? Would that just be surfacing the map and reduce phases as sub-ops?

Actually, the split is still lazy if done immediately on the base data. We could also keep it lazy if the plan only contains OneToOneOps as an optimization.

I'm assuming that by "base data" you mean right after reading. If we make splitting lazy and splits are passed to multiple trainers, we'll need an actor to coordinate the execution of the dataset up to the split, right?

You can call ensure_executed() to materialize the intermediate copy, though yeah it would be a little bit of a manual operation.

This can be done automatically if the executed dataset is cached and somehow attached to the stage, but the big issue there is that you're then indefinitely holding on to a Dataset reference, which defeats the purpose of lazy execution. I haven't thought of a way around this other than manual reference counting, where the cached datasets are deleted when the reference count goes to 0. I think that we can kick this can down the road and ask users to do this manually for now.

For both branching and splitting, if we want to allow execution to still be lazy, I think that we'll need an actor to coordinate execution up to the branch/split in case the branches/splits are passed to Ray tasks/actors and attempt to trigger execution. We can force materialization for each for now, but I think that might result in poor UX.
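For reference, the park-it-in-an-actor approach could look roughly like this (a sketch of the idea only, not the actual prototype; SplitCoordinator and compute_fn are made-up names):

```python
import ray


@ray.remote
class SplitCoordinator:
    """Sketch: compute the shared prefix once, cache the splits, serve them to callers."""

    def __init__(self, compute_fn, n_splits):
        self._compute_fn = compute_fn  # lazily produces the materialized dataset
        self._n_splits = n_splits
        self._splits = None

    def get_split(self, i):
        if self._splits is None:
            ds = self._compute_fn()                  # first caller triggers execution
            self._splits = ds.split(self._n_splits)  # cached for all later callers
        return self._splits[i]


# Usage sketch: each trainer fetches its split; execution happens at most once.
coordinator = SplitCoordinator.remote(lambda: ray.data.range(1000), 2)
shard0 = ray.get(coordinator.get_split.remote(0))
shard1 = ray.get(coordinator.get_split.remote(1))
```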

@ericl (Contributor, Author) commented Feb 10, 2022

The common use case that I run through is read --> transform --> shuffle (map --> reduce) --> trainer, where ideally read + transform + shuffle map could be fused, and I'm having a hard time seeing how the shuffle map phase, which is internal to the shuffle op, could be fused with the reading and transforming stages under this op execution model. 🤔 What kind of AllToAllOp API extension would enable that? Would that just be surfacing the map and reduce phases as sub-ops?

Yep this one is tricky. Basically I think the AllToAll API would look like __call__(blocks, block_mapping_fn = None), where block_mapping_fn can be the fused collection of all OneToOne transform ops prior to the shuffle. The shuffle implementation can then fuse the block mapping fn with the map stage.
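In other words, something like the following sketch (illustrative only, not code from this PR): the fused upstream one-to-one transforms are handed to the all-to-all op, which applies them inside its map phase so that read/transform/shuffle-map can run as a single task per block.

```python
from typing import Callable, List, Optional


class ShuffleOp:
    """Sketch of the proposed AllToAllOp extension."""

    def __call__(
        self,
        blocks: List[list],
        block_mapping_fn: Optional[Callable[[list], list]] = None,
    ) -> List[list]:
        def map_phase(block: list) -> list:
            if block_mapping_fn is not None:
                block = block_mapping_fn(block)  # fused upstream OneToOne ops
            return sorted(block)                 # stand-in for the real shuffle map work

        mapped = [map_phase(b) for b in blocks]
        # ... reduce phase would follow here (omitted) ...
        return mapped
```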

I'm assuming that by "base data" you mean right after reading. If we make splitting lazy and splits are passed to multiple trainers, we'll need an actor to coordinate the execution of the dataset up to the split, right?

You'll need an actor if you want to avoid calling .cache() explicitly prior to the split point (to do the coordination at runtime instead of ahead of time). Though this does materialize it.
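For completeness, the ahead-of-time alternative would look something like this (hypothetical usage; .cache() stands in for whatever the explicit materialization call ends up being named):

```python
import ray

# Materialize before the split point, so no coordination actor is needed at runtime.
ds = ray.data.range(1000).map(lambda x: x * 2)
ds = ds.cache()        # assumed materialization call; execution happens here, ahead of training
shards = ds.split(2)   # each shard references already-materialized blocks
```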

Anyway, it's solved with DatasetPipeline which does have a coordinator actor, so maybe this isn't too critical?

@clarkzinzow (Contributor)

Yep this one is tricky. Basically I think the AllToAll API would look like __call__(blocks, block_mapping_fn = None), where block_mapping_fn can be the fused collection of all OneToOne transform ops prior to the shuffle. The shuffle implementation can then fuse the block mapping fn with the map stage.

So to summarize, the general paradigm here is to support automatic block lifecycle optimization, move semantics, and task fusion for OneToOneOps, and to require individual AllToAllOps to each do some work to do the same for their internal block processing. And we're ok with this leaky abstraction since (1) there are relatively few AllToAllOps, and (2) it saves us a lot of complexity compared to pushing the execution plan down to the individual block task level (i.e. a graph of block tasks that's built up by each operation). That tradeoff seems good to me for now. 👍

You'll need an actor if you want to avoid calling .cache() explicitly prior to the split point (to do the coordination at runtime instead of ahead of time). Though this does materialize it.

Anyway, it's solved with DatasetPipeline which does have a coordinator actor, so maybe this isn't too critical?

Sure, as long as you think there's a sensible path forward if this does become an issue, we can consider the primary splitting use case, pipelined ML ingest, to be covered by DatasetPipeline's execution coordination.

@zhe-thoughts (Collaborator)

Great work and I look forward to seeing it merged! Just one remaining comment :)

Should lazy be something for the developer to control with an API? Should we just define certain (maybe most) transformations as lazy?

We can debate it, but having lazy as an optimization only may be more usable. cc @clarkzinzow on thoughts.

Should we just do a regular #api-changes review?

@ericl (Contributor, Author) commented Feb 10, 2022

@zhe-thoughts that's only for core APIs

@ericl ericl changed the title [WIP] Prototype lazy block evaluation Lay the groundwork for lazy dataset optimization (no behavior changes) Feb 11, 2022
@ericl (Contributor, Author) commented Feb 11, 2022

@clarkzinzow this is ready for review (almost all tests are passing now)

@clarkzinzow clarkzinzow self-assigned this Feb 11, 2022
@clarkzinzow (Contributor) left a review comment


LGTM overall, a few nits, and a few questions around supporting lazy fan-in operations in follow-up work.

Review threads (all resolved): python/ray/data/dataset.py, python/ray/data/impl/plan.py
@ericl ericl merged commit 35a1579 into ray-project:master Feb 14, 2022
simonsays1980 pushed a commit to simonsays1980/ray that referenced this pull request Feb 27, 2022