[Datasets] [WIP] Prototype wrapping operation-based lazy compute model. #22268

Conversation

clarkzinzow (Contributor)

This PR contains a rough prototype of a lazy compute model for Datasets, implemented by wrapping the existing eagerly-computed Datasets abstractions with lazy proxies. After switching to lazy mode (via a ds.lazy() call or a lazy=True arg to any of the dataset-creating APIs), all subsequent operations are lazy, building up an operation graph that won't be executed until .compute() or a consuming method (e.g. .show(), .iter_batches(), .to_torch(), etc.) is called.

import ray

ds = ray.data.range(100)
ds = ds.lazy()  # Comment me out.
# ds is now a LazyDataset

print("# Map")
# It exposes the same API as Dataset.
ds = ds.map(lambda x: x + 1)
print("# Shuffle")
ds = ds.random_shuffle()
print("# Show")
# A consuming function like .show() triggers execution.
ds.show()

I'm pushing this PR up for reference; we will almost surely be going with the operation-based lazy compute model in #22233 since it provides a clearer path to implementing (1) move semantics for blocks within operations, and (2) optimizations such as task fusion.

The big pros of this PR are:

  1. The lazy compute model is implemented as a wrapper of Dataset, DatasetPipeline, and GroupedDataset; the only change to those abstractions is the addition of the .lazy() API (see the first sketch after this list). The fact that this prototype doesn't touch the core Datasets abstractions makes it very easy to add and later remove.
  2. .split() and .split_at_indices() are still lazy, and support automatic caching of the materialized pre-split dataset (pipeline) and triggering of execution by any of the consumers. This is done by parking the pre-split dataset (pipeline) in an actor: every downstream operation on a split references a split fetch from that actor, the first such fetch triggers execution of the dataset on the actor, and the materialized result is then cached. The lifetime of this actor is tied to the lifetime of the underlying dataset, so the cached dataset should be released once it's no longer needed by any of the splits (see the second sketch after this list).
  3. The materialized caching is also exposed to the user via a ds.cache() method, which can be used before a branching computation, even if the branches are then passed to Ray tasks.
  4. Easy, low-code addition of fully lazy dataset creation to all read/creation APIs.
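
As a rough illustration of the wrapping approach in (1), here is a hypothetical, stripped-down sketch (names are illustrative, not the PR's actual classes): a proxy records operations instead of executing them, and replays them against the eager Dataset when a consuming call arrives. The real prototype wraps Dataset, DatasetPipeline, and GroupedDataset; this sketch only handles a simple operation chain.

import ray

# Hypothetical sketch: a proxy that records (method, args) pairs instead of
# executing them, then replays them on the eager Dataset when a consuming
# method (here, .show()) is called.
class SketchLazyDataset:
    def __init__(self, factory, ops=None):
        self._factory = factory  # Zero-arg callable producing the eager Dataset.
        self._ops = ops or []    # Recorded chain of deferred operations.

    def map(self, fn):
        return SketchLazyDataset(self._factory, self._ops + [("map", (fn,))])

    def random_shuffle(self):
        return SketchLazyDataset(self._factory, self._ops + [("random_shuffle", ())])

    def _execute(self):
        ds = self._factory()
        for method, args in self._ops:
            ds = getattr(ds, method)(*args)
        return ds

    def show(self, limit=20):
        # Consuming methods trigger execution of the recorded chain.
        self._execute().show(limit)

ds = SketchLazyDataset(lambda: ray.data.range(100))
ds = ds.map(lambda x: x + 1).random_shuffle()
ds.show()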
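
And a hypothetical sketch of the actor-based caching behind (2) and (3), assuming that Dataset splits can be returned from an actor (again, names are illustrative): the pre-split dataset is parked in an actor, the first split fetch triggers execution, and later fetches reuse the cached splits. The real prototype additionally ties the actor's lifetime to the underlying dataset so the cache is released when no split needs it anymore.

import ray

# Hypothetical sketch: park the pre-split dataset in an actor so that the
# first split fetch triggers execution and later fetches reuse the cache.
@ray.remote
class SketchDatasetHolder:
    def __init__(self, factory, n):
        self._factory = factory  # Zero-arg callable producing the eager Dataset.
        self._n = n              # Number of splits.
        self._splits = None      # Cached, materialized splits.

    def get_split(self, i):
        if self._splits is None:
            # First fetch materializes the dataset and caches the splits.
            self._splits = self._factory().split(n=self._n)
        return self._splits[i]

holder = SketchDatasetHolder.remote(lambda: ray.data.range(100), 2)
left, right = ray.get([holder.get_split.remote(0), holder.get_split.remote(1)])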

Hopefully we can pull a subset of these pros that makes sense into the other prototype.

clarkzinzow (Contributor, Author)

cc @ericl for reference

Inline review comment on the lazy-arg docstring injection helper:

wrapper = textwrap.TextWrapper(
    width=80, initial_indent=indent, subsequent_indent=indent + " "
)
lines.insert(i, wrapper.fill(lazy_arg_doc))
return "\n".join(lines)
clarkzinzow (Contributor, Author)
Haven't tested this arg docstring injection, but the general approach seems doable.
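
For reference, a self-contained, hypothetical sketch of what such an injection helper might look like around the fragment above (the LAZY_ARG_DOC text, the helper name, and the exact indentation are illustrative, not the PR's actual code):

import textwrap

# Hypothetical sketch: find the "Args:" section of an existing docstring and
# insert a wrapped entry documenting a new `lazy` argument.
LAZY_ARG_DOC = (
    "lazy: Whether to create the dataset lazily, deferring execution until a "
    "consuming method is called."
)

def inject_lazy_arg_doc(docstring: str) -> str:
    lines = docstring.split("\n")
    for i, line in enumerate(lines):
        if line.strip() == "Args:":
            indent = " " * (len(line) - len(line.lstrip()) + 4)
            wrapper = textwrap.TextWrapper(
                width=80, initial_indent=indent, subsequent_indent=indent + "  "
            )
            # Insert the wrapped entry right after the "Args:" header.
            lines.insert(i + 1, wrapper.fill(LAZY_ARG_DOC))
            break
    return "\n".join(lines)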

clarkzinzow added the do-not-merge ("Do not merge this PR!") label on Feb 10, 2022