Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Datasets] [WIP] Prototype wrapping operation-based lazy compute model. #22268

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions python/ray/data/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import tensorflow as tf
from ray.data.dataset_pipeline import DatasetPipeline
from ray.data.grouped_dataset import GroupedDataset
from ray.data.lazy_dataset import LazyDataset

import collections
import itertools
Expand Down Expand Up @@ -2493,6 +2494,19 @@ def __iter__(self):
it = Iterable(self._blocks, self._epoch)
return DatasetPipeline(it, length=len(it._splits))

def lazy(self) -> "LazyDataset":
    """Wrap this Dataset in a LazyDataset.

    Once wrapped, every subsequent operation is deferred: nothing runs
    until .compute() or a consuming function (such as .iter_batches(),
    .to_torch(), etc.) is called.

    Returns:
        A LazyDataset.
    """
    # Imported locally to avoid a circular import at module load time.
    from ray.data.lazy_dataset import LazyDataset

    def _produce_dataset():
        # LazyDataset defers execution by holding a zero-arg factory
        # that yields this (already materialized) dataset on demand.
        return self

    return LazyDataset(_produce_dataset)

@DeveloperAPI
def get_internal_block_refs(self) -> List[ObjectRef[Block]]:
"""Get a list of references to the underlying blocks of this dataset.
Expand Down
14 changes: 14 additions & 0 deletions python/ray/data/dataset_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

if TYPE_CHECKING:
import pyarrow
from ray.data.lazy_dataset import LazyDatasetPipeline

# Operations that can be naively applied per dataset row in the pipeline.
PER_DATASET_OPS = ["map", "map_batches", "flat_map", "filter"]
Expand Down Expand Up @@ -563,6 +564,19 @@ def __iter__(self):

return EpochDelimitedIterator(self)

def lazy(self) -> "LazyDatasetPipeline":
    """Wrap this DatasetPipeline in a LazyDatasetPipeline.

    Once wrapped, every subsequent operation is deferred: nothing runs
    until .compute() or a consuming function (such as .iter_batches(),
    .to_torch(), etc.) is called.

    Returns:
        A LazyDatasetPipeline.
    """
    # Imported locally to avoid a circular import at module load time.
    from ray.data.lazy_dataset import LazyDatasetPipeline

    def _produce_pipeline():
        # LazyDatasetPipeline defers execution by holding a zero-arg
        # factory that yields this pipeline on demand.
        return self

    return LazyDatasetPipeline(_produce_pipeline)

@DeveloperAPI
def iter_datasets(self) -> Iterator[Dataset[T]]:
"""Iterate over the output datasets of this pipeline.
Expand Down
Loading