Make execution plan/blocklist aware of the memory ownership and who runs the plan #26650
Conversation
This seems a lot more natural, though I'm wondering if we can improve the naming/concept a little more. The proposal seems OK to me. What do others think?
My experiments (the script) showed that dataset.split_at_indices() with SPREAD tasks has more predictable performance. Concretely: on 10 m5.4xlarge nodes with 5000 IOPS disks, calling ds.split_at_indices(81) on a 200GB dataset with 400 blocks takes 7-19 seconds without this PR and 7-12 seconds with SPREAD (see the sketch after this commit list). Signed-off-by: Ubuntu <[email protected]>
…on (ray-project#26634) Signed-off-by: Ubuntu <[email protected]>
…ject#26458) Signed-off-by: Ubuntu <[email protected]>
Co-authored-by: Kourosh Hakhamaneshi <[email protected]> Signed-off-by: Ubuntu <[email protected]>
…ject#26094) Co-authored-by: Eric Liang <[email protected]> Co-authored-by: matthewdeng <[email protected]> Co-authored-by: Matthew Deng <[email protected]> Co-authored-by: Richard Liaw <[email protected]> Signed-off-by: Ubuntu <[email protected]>
…y-project#26643) Signed-off-by: Ubuntu <[email protected]>
…project#26637) Signed-off-by: Ubuntu <[email protected]>
…roject#26640) Signed-off-by: Ubuntu <[email protected]>
…ints (ray-project#26641) This PR replaces the dataset.split(..., equal=True) implementation with dataset.split_at_indices(). My experiments (the script) showed that dataset.split_at_indices() has more predictable performance than dataset.split(...). Concretely: on 10 m5.4xlarge nodes with 5000 IOPS disks, calling ds.split(81) on a 200GB dataset with 400 blocks takes 20-40 seconds, while split_at_indices takes ~12 seconds; calling ds.split(163) on the same dataset takes 40-100 seconds, while split_at_indices takes ~24 seconds. I don't have much insight into the dataset.split implementation, but with dataset.split_at_indices() we are just issuing SPREAD tasks for the num_split_at_indices splits, which yields much more stable performance (see the sketch after this commit list). Note: the usage of the experimental locality_hints is cleaned up in ray-project#26647. Signed-off-by: Ubuntu <[email protected]>
Why are these changes needed? Since locality_hints is an experimental feature, we stop promoting it in the docs and don't enable it in AIR. See ray-project#26641 for more context. Signed-off-by: Ubuntu <[email protected]>
…project#26521) Signed-off-by: Ubuntu <[email protected]>
- Stop using the dot command to run the ci.sh script: it doesn't fail the build when the command fails on Windows, and it is generally dangerous since it can make unexpected changes to the current shell. - Fix the Windows build issues this uncovered. Signed-off-by: Ubuntu <[email protected]>
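As a hedged illustration of the two split commits above, here is a minimal sketch of expressing equal splits via split_at_indices; the dataset size and split count are assumptions, not the benchmark setup:

import ray

ray.init()

# Toy dataset; the benchmarks above used a 200GB dataset with 400 blocks.
ds = ray.data.range(200_000)

# Derive cut points for num_splits (near-)equal pieces, which is what
# ds.split(n, equal=True) has to produce.
num_splits = 4
rows_per_split = ds.count() // num_splits
indices = [rows_per_split * i for i in range(1, num_splits)]

# split_at_indices partitions the dataset at the given row indices into
# len(indices) + 1 datasets; per these commits, the underlying per-split
# tasks are scheduled with the SPREAD strategy for more stable performance.
splits = ds.split_at_indices(indices)
assert len(splits) == num_splits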
@ericl @clarkzinzow For the "run_by_consumer" flag that's currently attached to each stage, I think it makes more sense to attach it to the ExecutionPlan instead, since that's the actual unit run by the consumer. Each individual stage can derive it from the ExecutionPlan it belongs to. I'll make that change if this sounds good to you.
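A minimal sketch of that shape, using simplified stand-in classes (not Ray's actual ExecutionPlan/Stage code):

from typing import List, Optional

class Stage:
    def __init__(self, name: str):
        self.name = name
        # Back-reference filled in when the stage is added to a plan.
        self.plan: Optional["ExecutionPlan"] = None

    @property
    def run_by_consumer(self) -> bool:
        # Derived from the owning plan instead of stored per stage.
        assert self.plan is not None, "stage not attached to a plan"
        return self.plan.run_by_consumer

class ExecutionPlan:
    def __init__(self, run_by_consumer: bool):
        # Stored once, on the unit that the consumer actually runs.
        self.run_by_consumer = run_by_consumer
        self.stages: List[Stage] = []

    def add_stage(self, stage: Stage) -> None:
        stage.plan = self
        self.stages.append(stage)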
…is arg in ExecutionPlan instead of Stage
assert len(blocks) == len(metadata), (blocks, metadata)
self._blocks: List[ObjectRef[Block]] = blocks
self._num_blocks = len(self._blocks)
self._metadata: List[BlockMetadata] = metadata
# Whether the block list is owned by consuming APIs, and if so it can be
# eagerly deleted after being read by the consumer.
self._owned_by_consumer = owned_by_consumer
Shall we clearly define what a consumer is in one place, e.g. dataset.py?
Done.
blocks: List[ObjectRef[Block]],
metadata: List[BlockMetadata],
*,
owned_by_consumer: bool,
Is the idea that if the blocks are owned_by_consumer, then we can eagerly clean them up?
Yep, once it's consumed, the blocklist can be cleared.
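A self-contained sketch of that contract (a stand-in class, not Ray's BlockList):

from typing import Any, List

class SimpleBlockList:
    def __init__(self, blocks: List[Any], *, owned_by_consumer: bool):
        self._blocks = blocks
        self._owned_by_consumer = owned_by_consumer

    def clear(self) -> None:
        self._blocks = []

def consume(block_list: SimpleBlockList) -> int:
    # "Read" the blocks, then eagerly free them if the consumer owns them:
    # nothing else can reference consumer-owned blocks after the read.
    n = len(block_list._blocks)
    if block_list._owned_by_consumer:
        block_list.clear()
    return n

eager = SimpleBlockList(["block0", "block1"], owned_by_consumer=True)
assert consume(eager) == 2 and eager._blocks == []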
@@ -1058,16 +1058,20 @@ def equalize(splits: List[Dataset[T]], num_splits: int) -> List[Dataset[T]]:

block_refs, metadata = zip(*blocks.get_blocks_with_metadata())
metadata_mapping = {b: m for b, m in zip(block_refs, metadata)}
owned_by_consumer = blocks._owned_by_consumer
Hmm, I don't see iter_batches() set run_by_consumer, nor do any other consumer APIs (such as show(), take(), write_xxx). Am I missing anything?
We cannot do this for Dataset yet; for now it's unchanged.
I am wondering in which cases we actually need eager execution for now. Can we remove support for eager execution post 2.0 and support lazy execution only? That would make the code structure much simpler, I guess. In that case, only the input and output blocks of the execution plan need to be preserved, and all intermediate blocks can be cleaned up. We could introduce another dataset
Currently in all cases (except read) it's eager. I think lazy-only (not just lazy by default) may make sense, just like DatasetPipeline. In that case, we can remove "run_by_consumer", because the plan is always run by the consumer.
…uns the plan (ray-project#26650) Having an indicator of who runs each stage and who created a block list enables eager memory releasing. This is an alternative to ray-project#26196 with better abstraction. Note: this doesn't work for Dataset.split() yet; that will come in a follow-up PR. Signed-off-by: Rohan138 <[email protected]>
…uns the plan (ray-project#26650) Having an indicator of who runs each stage and who created a block list enables eager memory releasing. This is an alternative to ray-project#26196 with better abstraction. Note: this doesn't work for Dataset.split() yet; that will come in a follow-up PR. Signed-off-by: Stefan van der Kleij <[email protected]>
Why are these changes needed?
Having an indicator of who runs each stage and who created a block list enables eager memory releasing.
This is an alternative to #26196 with better abstraction.
Note: this doesn't work for Dataset.split() yet; that will come in a follow-up PR.
Related issue number
#25249
Checks
I've run scripts/format.sh to lint the changes in this PR.