From 7937132b8bb67dc0ea0b9f3a9ac4d5191ff64a9c Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 8 Oct 2021 00:19:42 -0700 Subject: [PATCH 01/17] wip --- python/ray/data/dataset.py | 60 +++++++++++++++++++---------- python/ray/data/dataset_pipeline.py | 35 +++++++++++++++++ python/ray/data/read_api.py | 10 ++--- 3 files changed, 80 insertions(+), 25 deletions(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 264db2661d8f..85935b85b44f 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -63,7 +63,7 @@ class Dataset(Generic[T]): and simple repartition, but currently not aggregations and joins. """ - def __init__(self, blocks: BlockList[T]): + def __init__(self, blocks: BlockList[T], epoch: int): """Construct a Dataset (internal API). The constructor is not part of the Dataset API. Use the ``ray.data.*`` @@ -71,6 +71,7 @@ def __init__(self, blocks: BlockList[T]): """ self._blocks: BlockList[T] = blocks self._uuid = uuid4().hex + self._epoch = epoch assert isinstance(self._blocks, BlockList), self._blocks def map(self, @@ -125,7 +126,9 @@ def transform(block: Block) -> Block: compute = get_compute(compute) - return Dataset(compute.apply(transform, ray_remote_args, self._blocks)) + return Dataset( + compute.apply(transform, ray_remote_args, self._blocks), + self._epoch) def map_batches(self, fn: Union[CallableClass, Callable[[BatchType], BatchType]], @@ -219,7 +222,9 @@ def transform(block: Block) -> Block: compute = get_compute(compute) - return Dataset(compute.apply(transform, ray_remote_args, self._blocks)) + return Dataset( + compute.apply(transform, ray_remote_args, self._blocks), + self._epoch) def flat_map(self, fn: Union[CallableClass, Callable[[T], Iterable[U]]], @@ -257,7 +262,9 @@ def transform(block: Block) -> Block: compute = get_compute(compute) - return Dataset(compute.apply(transform, ray_remote_args, self._blocks)) + return Dataset( + compute.apply(transform, ray_remote_args, self._blocks), + self._epoch) def filter(self, fn: Union[CallableClass, Callable[[T], bool]], @@ -295,7 +302,9 @@ def transform(block: Block) -> Block: compute = get_compute(compute) - return Dataset(compute.apply(transform, ray_remote_args, self._blocks)) + return Dataset( + compute.apply(transform, ray_remote_args, self._blocks), + self._epoch) def repartition(self, num_blocks: int) -> "Dataset[T]": """Repartition the dataset into exactly this number of blocks. 
@@ -316,7 +325,7 @@ def repartition(self, num_blocks: int) -> "Dataset[T]": """ new_blocks = simple_shuffle(self._blocks, num_blocks) - return Dataset(new_blocks) + return Dataset(new_blocks, self._epoch) def random_shuffle( self, @@ -360,7 +369,7 @@ def random_shuffle( random_shuffle=True, random_seed=seed, _spread_resource_prefix=_spread_resource_prefix) - return Dataset(new_blocks) + return Dataset(new_blocks, self._epoch) def _move_blocks(self): blocks = self._blocks.copy() @@ -557,7 +566,8 @@ def equalize(splits: List[Dataset[T]], return equalize([ Dataset( BlockList( - list(blocks), [metadata_mapping[b] for b in blocks])) + list(blocks), [metadata_mapping[b] + for b in blocks]), self._epoch) for blocks in np.array_split(block_refs, n) if not equal or len(blocks) > 0 ], n) @@ -657,7 +667,7 @@ def build_node_id_by_actor(actors: List[Any]) -> Dict[Any, str]: BlockList( allocation_per_actor[actor], [metadata_mapping[b] - for b in allocation_per_actor[actor]])) + for b in allocation_per_actor[actor]]), self._epoch) for actor in locality_hints ], n) @@ -733,7 +743,7 @@ def union(self, *other: List["Dataset[T]"]) -> "Dataset[T]": metadata.extend(bl._metadata) blocks.extend(bl._blocks) - return Dataset(LazyBlockList(calls, metadata, blocks)) + return Dataset(LazyBlockList(calls, metadata, blocks), self._epoch) def sort(self, key: Union[None, str, List[str], Callable[[T], Any]] = None, @@ -772,7 +782,7 @@ def sort(self, # Handle empty dataset. if self.num_blocks() == 0: return self - return Dataset(sort_impl(self._blocks, key, descending)) + return Dataset(sort_impl(self._blocks, key, descending), self._epoch) def zip(self, other: "Dataset[U]") -> "Dataset[(T, U)]": """Zip this dataset with the elements of another. @@ -823,7 +833,7 @@ def do_zip(block1: Block, block2: Block) -> (Block, BlockMetadata): # TODO(ekl) it might be nice to have a progress bar here. metadata = ray.get(metadata) - return Dataset(BlockList(blocks, metadata)) + return Dataset(BlockList(blocks, metadata), self._epoch) def limit(self, limit: int) -> "Dataset[T]": """Limit the dataset to the first number of records specified. 
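The hunks above all follow one invariant: a derived Dataset inherits ``self._epoch`` from the Dataset that produced it. A minimal standalone sketch of that invariant (``MiniDataset`` is illustrative only, not part of this patch or of Ray's internals):

    # Sketch of the epoch-propagation invariant: every transformation
    # copies the parent's epoch onto the result, so epoch tags survive
    # arbitrary chains of map/filter/sort/zip-style operations.
    class MiniDataset:
        def __init__(self, rows, epoch: int = 0):
            self._rows = list(rows)
            self._epoch = epoch  # which repetition of the source data this is

        def map(self, fn):
            # Mirrors Dataset.map above: build new data, carry self._epoch through.
            return MiniDataset([fn(r) for r in self._rows], self._epoch)

    ds = MiniDataset(range(3), epoch=2)
    assert ds.map(lambda x: x * 2)._epoch == 2  # epoch survives the transform
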
@@ -1621,6 +1631,7 @@ def __init__(self, ds: "Dataset[T]"): def __next__(self) -> "Dataset[T]": if times and self._i >= times: raise StopIteration + self._ds._set_epoch(self._i) self._i += 1 return lambda: self._ds @@ -1687,8 +1698,9 @@ def window(self, *, blocks_per_window: int = 10) -> "DatasetPipeline[T]": from ray.data.dataset_pipeline import DatasetPipeline class Iterator: - def __init__(self, splits): + def __init__(self, splits, epoch): self._splits = splits.copy() + self._epoch = epoch def __next__(self) -> "Dataset[T]": if not self._splits: @@ -1697,18 +1709,19 @@ def __next__(self) -> "Dataset[T]": blocks = self._splits.pop(0) def gen(): - return Dataset(blocks) + return Dataset(blocks, self._epoch) return gen class Iterable: - def __init__(self, blocks): + def __init__(self, blocks, epoch): self._splits = blocks.split(split_size=blocks_per_window) + self._epoch = epoch def __iter__(self): - return Iterator(self._splits) + return Iterator(self._splits, self._epoch) - it = Iterable(self._blocks) + it = Iterable(self._blocks, self._epoch) return DatasetPipeline(it, length=len(it._splits)) @DeveloperAPI @@ -1759,16 +1772,17 @@ def _split(self, index: int, right_metadata.append(ray.get(m1)) count += num_rows - left = Dataset(BlockList(left_blocks, left_metadata)) + left = Dataset(BlockList(left_blocks, left_metadata), self._epoch) if return_right_half: - right = Dataset(BlockList(right_blocks, right_metadata)) + right = Dataset( + BlockList(right_blocks, right_metadata), self._epoch) else: right = None return left, right def _divide(self, block_idx: int) -> ("Dataset[T]", "Dataset[T]"): left, right = self._blocks.divide(block_idx) - return Dataset(left), Dataset(right) + return Dataset(left, self._epoch), Dataset(right, self._epoch) def __repr__(self) -> str: schema = self.schema() @@ -1808,6 +1822,12 @@ def _get_uuid(self) -> str: def _set_uuid(self, uuid: str) -> None: self._uuid = uuid + def _get_epoch(self) -> int: + return self._epoch + + def _set_epoch(self, epoch: int) -> None: + self._epoch = epoch + def _get_num_rows(block: Block) -> int: block = BlockAccessor.for_block(block) diff --git a/python/ray/data/dataset_pipeline.py b/python/ray/data/dataset_pipeline.py index 962961105f89..50e07fe3d837 100644 --- a/python/ray/data/dataset_pipeline.py +++ b/python/ray/data/dataset_pipeline.py @@ -1,4 +1,5 @@ import functools +import more_itertools import time from typing import Any, Callable, List, Iterator, Iterable, Generic, Union, \ Optional, TYPE_CHECKING @@ -338,6 +339,7 @@ def __next__(self) -> Dataset[T]: # Going through a repeat of the pipeline. 
if self._i < self._max_i: res = self._results[self._i % len(self._results)] + res._set_epoch(1 + self._i // len(self._results)) self._i += 1 return lambda: res else: @@ -420,6 +422,39 @@ def show_windows(self, limit_per_dataset: int = 10) -> None: print("=== Window {} ===".format(i)) ds.show(limit_per_dataset) + def iter_epochs(self) -> Iterator["DatasetPipeline[T]"]: + class SingleEpochIterator: + def __init__(self, peekable_iter: Iterator[Dataset[T]], + epoch: int): + self._iter = peekable_iter + self._epoch = epoch + + def __next__(self) -> Dataset[T]: + if self._iter.peek()._get_epoch() > self._epoch: + raise StopIteration + ds = next(self._iter) + return lambda: ds + + def __iter__(self): + return self + + class EpochDelimitedIterator: + def __init__(self, pipe): + self._iter = more_itertools.peekable(pipe.iter_datasets()) + self._epoch = 0 + + def __next__(self) -> "DatasetPipeline[T]": + self._iter.peek() # Raises StopIteration on end of data. + epoch_pipe = DatasetPipeline.from_iterable( + SingleEpochIterator(self._iter, self._epoch)) + self._epoch += 1 + return epoch_pipe + + def __iter__(self): + return self + + return EpochDelimitedIterator(self) + @DeveloperAPI def iter_datasets(self) -> Iterator[Dataset[T]]: """Iterate over the output datasets of this pipeline. diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index 8d1b66d04c04..0cdfab22a993 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -61,7 +61,7 @@ def from_items(items: List[Any], *, parallelism: int = 200) -> Dataset[Any]: BlockAccessor.for_block(block).get_metadata(input_files=None)) i += block_size - return Dataset(BlockList(blocks, metadata)) + return Dataset(BlockList(blocks, metadata), 0) @PublicAPI(stability="beta") @@ -202,7 +202,7 @@ def remote_read(task: ReadTask) -> Block: input_files=metadata[0].input_files, )) - return Dataset(block_list) + return Dataset(block_list, 0) @PublicAPI(stability="beta") @@ -541,7 +541,7 @@ def from_pandas_refs( res = [df_to_block.remote(df) for df in dfs] blocks, metadata = zip(*res) - return Dataset(BlockList(blocks, ray.get(list(metadata)))) + return Dataset(BlockList(blocks, ray.get(list(metadata))), 0) def from_numpy(ndarrays: List[ObjectRef[np.ndarray]]) -> Dataset[ArrowRow]: @@ -557,7 +557,7 @@ def from_numpy(ndarrays: List[ObjectRef[np.ndarray]]) -> Dataset[ArrowRow]: res = [ndarray_to_block.remote(ndarray) for ndarray in ndarrays] blocks, metadata = zip(*res) - return Dataset(BlockList(blocks, ray.get(list(metadata)))) + return Dataset(BlockList(blocks, ray.get(list(metadata))), 0) @PublicAPI(stability="beta") @@ -589,7 +589,7 @@ def from_arrow_refs(tables: List[ObjectRef[Union["pyarrow.Table", bytes]]] """ get_metadata = cached_remote_fn(_get_metadata) metadata = [get_metadata.remote(t) for t in tables] - return Dataset(BlockList(tables, ray.get(metadata))) + return Dataset(BlockList(tables, ray.get(metadata)), 0) @PublicAPI(stability="beta") From 48204b4352163d5a9aad26b3dce333cbbf1f0286 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 8 Oct 2021 00:30:49 -0700 Subject: [PATCH 02/17] fix it --- python/ray/data/dataset.py | 3 ++- python/ray/data/dataset_pipeline.py | 13 ++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 85935b85b44f..f669062fb75f 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -743,7 +743,8 @@ def union(self, *other: List["Dataset[T]"]) -> "Dataset[T]": metadata.extend(bl._metadata) 
blocks.extend(bl._blocks) - return Dataset(LazyBlockList(calls, metadata, blocks), self._epoch) + max_epoch = max(*[ds._get_epoch() for ds in datasets]) + return Dataset(LazyBlockList(calls, metadata, blocks), max_epoch) def sort(self, key: Union[None, str, List[str], Callable[[T], Any]] = None, diff --git a/python/ray/data/dataset_pipeline.py b/python/ray/data/dataset_pipeline.py index 50e07fe3d837..d621062e55f4 100644 --- a/python/ray/data/dataset_pipeline.py +++ b/python/ray/data/dataset_pipeline.py @@ -424,15 +424,16 @@ def show_windows(self, limit_per_dataset: int = 10) -> None: def iter_epochs(self) -> Iterator["DatasetPipeline[T]"]: class SingleEpochIterator: - def __init__(self, peekable_iter: Iterator[Dataset[T]], - epoch: int): + def __init__(self, peekable_iter: Iterator[Dataset[T]]): self._iter = peekable_iter - self._epoch = epoch + self._epoch = None def __next__(self) -> Dataset[T]: - if self._iter.peek()._get_epoch() > self._epoch: + if (self._epoch is not None + and self._iter.peek()._get_epoch() != self._epoch): raise StopIteration ds = next(self._iter) + self._epoch = ds._get_epoch() return lambda: ds def __iter__(self): @@ -441,13 +442,11 @@ def __iter__(self): class EpochDelimitedIterator: def __init__(self, pipe): self._iter = more_itertools.peekable(pipe.iter_datasets()) - self._epoch = 0 def __next__(self) -> "DatasetPipeline[T]": self._iter.peek() # Raises StopIteration on end of data. epoch_pipe = DatasetPipeline.from_iterable( - SingleEpochIterator(self._iter, self._epoch)) - self._epoch += 1 + SingleEpochIterator(self._iter)) return epoch_pipe def __iter__(self): From ce3a69a292b5a68f1b23d6fa52756206bea79e13 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 8 Oct 2021 10:30:58 -0700 Subject: [PATCH 03/17] doc --- python/ray/data/dataset.py | 3 +++ python/ray/data/dataset_pipeline.py | 23 +++++++++++++++++++++++ 2 files changed, 26 insertions(+) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index f669062fb75f..a8a5fb8755cb 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -1602,6 +1602,9 @@ def repeat(self, times: int = None) -> "DatasetPipeline[T]": Transformations done on the returned pipeline are evaluated on each loop of the pipeline over the base dataset. + Note that every repeat of the dataset is considered an "epoch" for + the purposes of ``DatasetPipeline.iter_epochs()``. + Examples: >>> # Infinite pipeline of numbers [0, 5) >>> ray.data.range(5).repeat().take() diff --git a/python/ray/data/dataset_pipeline.py b/python/ray/data/dataset_pipeline.py index d621062e55f4..11a2eaa684a1 100644 --- a/python/ray/data/dataset_pipeline.py +++ b/python/ray/data/dataset_pipeline.py @@ -304,6 +304,10 @@ def repeat(self, times: int = None) -> "DatasetPipeline[T]": Transformations done on the repeated pipeline are evaluated on each loop of the pipeline over the base pipeline. + Note that every repeat of the pipeline is considered an "epoch" for + the purposes of ``iter_epochs()``. If there are multiple repeat calls, + the latest repeat takes precedence for the purpose of defining epochs. + Args: times: The number of times to loop over this pipeline, or None to repeat indefinitely. @@ -423,6 +427,25 @@ def show_windows(self, limit_per_dataset: int = 10) -> None: ds.show(limit_per_dataset) def iter_epochs(self) -> Iterator["DatasetPipeline[T]"]: + """Split this pipeline up by epoch. + + This allows reading of data per-epoch for repeated Datasets, which is + useful for ML training. 
For example, ``ray.data.range(10).repeat(50)`` + generates a pipeline with 500 rows total split across 50 epochs. This + method allows iterating over the data individually per epoch + (repetition) of the original data. + + Examples: + >>> epochs = ray.data.range(10).repeat(50).iter_epochs() + >>> for i, epoch in enumerate(epochs): + ... print("Epoch", i) + ... for row in epoch.iter_rows(): + ... print(row) + + Returns: + Iterator over epochs objects, where each epoch is a DatasetPipeline + containing data from that epoch only. + """ class SingleEpochIterator: def __init__(self, peekable_iter: Iterator[Dataset[T]]): self._iter = peekable_iter From f3089804e0f9dae30b17b31e694ef7ebf961a29c Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 8 Oct 2021 10:31:07 -0700 Subject: [PATCH 04/17] update --- python/ray/data/dataset_pipeline.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/ray/data/dataset_pipeline.py b/python/ray/data/dataset_pipeline.py index 11a2eaa684a1..5e54418a2073 100644 --- a/python/ray/data/dataset_pipeline.py +++ b/python/ray/data/dataset_pipeline.py @@ -446,6 +446,7 @@ def iter_epochs(self) -> Iterator["DatasetPipeline[T]"]: Iterator over epochs objects, where each epoch is a DatasetPipeline containing data from that epoch only. """ + class SingleEpochIterator: def __init__(self, peekable_iter: Iterator[Dataset[T]]): self._iter = peekable_iter From a7cea60ea560fa19e3bef41d476dbbdce5a38e6e Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 8 Oct 2021 11:34:23 -0700 Subject: [PATCH 05/17] update --- python/ray/data/tests/test_dataset_pipeline.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/python/ray/data/tests/test_dataset_pipeline.py b/python/ray/data/tests/test_dataset_pipeline.py index cffb378f3686..ef9100932914 100644 --- a/python/ray/data/tests/test_dataset_pipeline.py +++ b/python/ray/data/tests/test_dataset_pipeline.py @@ -35,6 +35,19 @@ def block_on_ones(x: int) -> int: assert pipe.take(1) == [0] +def test_epoch(ray_start_regular_shared): + # Test dataset repeat. + pipe = ray.data.range(5).map(lambda x: x * 2).repeat(3).map( + lambda x: x * 2) + results = [p.take() for p in pipe.iter_epochs()] + assert results == [[0, 4, 8, 12, 16], [0, 4, 8, 12, 16], [0, 4, 8, 12, 16]] + + # Test dataset pipeline repeat. + pipe = ray.data.range(3).window(blocks_per_window=3).repeat(3) + results = [p.take() for p in pipe.iter_epochs()] + assert results == [[0, 1, 2], [0, 1, 2], [0, 1, 2]] + + def test_cannot_read_twice(ray_start_regular_shared): ds = ray.data.range(10) pipe = ds.window(blocks_per_window=1) From 16ebbccbef9d06c3a6508806e6ae8d0351b3b2f4 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 8 Oct 2021 11:34:33 -0700 Subject: [PATCH 06/17] update --- python/ray/data/tests/test_dataset_pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/data/tests/test_dataset_pipeline.py b/python/ray/data/tests/test_dataset_pipeline.py index ef9100932914..d1e62f098220 100644 --- a/python/ray/data/tests/test_dataset_pipeline.py +++ b/python/ray/data/tests/test_dataset_pipeline.py @@ -43,7 +43,7 @@ def test_epoch(ray_start_regular_shared): assert results == [[0, 4, 8, 12, 16], [0, 4, 8, 12, 16], [0, 4, 8, 12, 16]] # Test dataset pipeline repeat. 
- pipe = ray.data.range(3).window(blocks_per_window=3).repeat(3) + pipe = ray.data.range(3).window(blocks_per_window=2).repeat(3) results = [p.take() for p in pipe.iter_epochs()] assert results == [[0, 1, 2], [0, 1, 2], [0, 1, 2]] From 3a3b5277c0c6b04690f9c7c2df4794e26e2411d8 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 8 Oct 2021 11:37:29 -0700 Subject: [PATCH 07/17] add example --- doc/source/data/dataset-pipeline.rst | 32 ++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/doc/source/data/dataset-pipeline.rst b/doc/source/data/dataset-pipeline.rst index d954df8051eb..ee3f0b1077d0 100644 --- a/doc/source/data/dataset-pipeline.rst +++ b/doc/source/data/dataset-pipeline.rst @@ -112,6 +112,38 @@ You can also apply arbitrary transformations to each window using ``DatasetPipel # 3 # 1 +Handling Epochs +~~~~~~~~~~~~~~~ + +It's common in ML training to want to divide data ingest into epochs, or repetitions over the original source dataset. DatasetPipeline provides a convenient ``.iter_epochs()`` method that can be used to split up the pipeline into epoch-delimited pipeline segments. Epochs are defined by the last call to ``.repeat()`` in a pipeline, for example: + +.. code-block:: python + + pipe = ray.data.range(5).repeat(3) + for i, epoch in enumerate(pipe.iter_epochs()): + print("Epoch {}", i) + for row in epoch.iter_items(): + print(row) + # -> + # Epoch 0 + # 0 + # 1 + # 2 + # 3 + # 4 + # Epoch 1 + # 0 + # 1 + # 2 + # 3 + # 4 + # Epoch 2 + # 0 + # 1 + # 2 + # 3 + # 4 + Example: Pipelined Batch Inference ---------------------------------- From ff6d40cefca7ce8b2adfcee5fd4e057a45f14954 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 8 Oct 2021 12:48:09 -0700 Subject: [PATCH 08/17] fix --- python/ray/data/tests/test_dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index 91aa91e5c2eb..457d65c42a00 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -197,7 +197,7 @@ def _test_equal_split_balanced(block_sizes, num_splits): metadata.append(BlockAccessor.for_block(block).get_metadata(None)) total_rows += block_size block_list = BlockList(blocks, metadata) - ds = Dataset(block_list) + ds = Dataset(block_list, 0) splits = ds.split(num_splits, equal=True) split_counts = [split.count() for split in splits] From 7a76e0be9785245eba052bfed3811b812a9aca17 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 8 Oct 2021 14:30:08 -0700 Subject: [PATCH 09/17] fix --- python/ray/data/dataset_pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/data/dataset_pipeline.py b/python/ray/data/dataset_pipeline.py index 5e54418a2073..6dd3335c9740 100644 --- a/python/ray/data/dataset_pipeline.py +++ b/python/ray/data/dataset_pipeline.py @@ -443,7 +443,7 @@ def iter_epochs(self) -> Iterator["DatasetPipeline[T]"]: ... print(row) Returns: - Iterator over epochs objects, where each epoch is a DatasetPipeline + Iterator over epoch objects, where each epoch is a DatasetPipeline containing data from that epoch only. 
""" From dd45c139f77c966af28079f8f0f8c493a9c34450 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 8 Oct 2021 16:08:27 -0700 Subject: [PATCH 10/17] update --- doc/source/data/dataset-pipeline.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/doc/source/data/dataset-pipeline.rst b/doc/source/data/dataset-pipeline.rst index ee3f0b1077d0..1079114f6441 100644 --- a/doc/source/data/dataset-pipeline.rst +++ b/doc/source/data/dataset-pipeline.rst @@ -144,6 +144,8 @@ It's common in ML training to want to divide data ingest into epochs, or repetit # 3 # 4 +Note that while epochs commonly consist of a single window, they can also contain multiple windows if ``.rewindow()`` is used after the repeat call or there are multiple ``.repeat()`` calls. + Example: Pipelined Batch Inference ---------------------------------- From 948b677e2527d0a8359299d5ad6e337fb6e8a318 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 8 Oct 2021 16:10:29 -0700 Subject: [PATCH 11/17] update --- doc/source/data/dataset-pipeline.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/doc/source/data/dataset-pipeline.rst b/doc/source/data/dataset-pipeline.rst index 1079114f6441..6a3983fdcb08 100644 --- a/doc/source/data/dataset-pipeline.rst +++ b/doc/source/data/dataset-pipeline.rst @@ -119,7 +119,7 @@ It's common in ML training to want to divide data ingest into epochs, or repetit .. code-block:: python - pipe = ray.data.range(5).repeat(3) + pipe = ray.data.range(5).window(blocks_per_window=2).repeat(3) for i, epoch in enumerate(pipe.iter_epochs()): print("Epoch {}", i) for row in epoch.iter_items(): @@ -144,7 +144,7 @@ It's common in ML training to want to divide data ingest into epochs, or repetit # 3 # 4 -Note that while epochs commonly consist of a single window, they can also contain multiple windows if ``.rewindow()`` is used after the repeat call or there are multiple ``.repeat()`` calls. +Note that while epochs commonly consist of a single window, they can also contain multiple windows if ``.window()`` is used or there are multiple ``.repeat()`` calls. Example: Pipelined Batch Inference ---------------------------------- From 13deebab4a2ded538da8f6521edf98cbfaf9babe Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 8 Oct 2021 16:22:49 -0700 Subject: [PATCH 12/17] update docs --- doc/source/data/dataset-pipeline.rst | 80 ++++++++++++++++------------ python/ray/data/dataset_pipeline.py | 5 ++ 2 files changed, 50 insertions(+), 35 deletions(-) diff --git a/doc/source/data/dataset-pipeline.rst b/doc/source/data/dataset-pipeline.rst index 6a3983fdcb08..56d406f60d95 100644 --- a/doc/source/data/dataset-pipeline.rst +++ b/doc/source/data/dataset-pipeline.rst @@ -69,6 +69,40 @@ You can also create a DatasetPipeline from a custom iterator over dataset creato splits = ray.data.range(1000, parallelism=200).split(20) pipe = DatasetPipeline.from_iterable([lambda s=s: s for s in splits]) +Handling Epochs +~~~~~~~~~~~~~~~ + +It's common in ML training to want to divide data ingest into epochs, or repetitions over the original source dataset. DatasetPipeline provides a convenient ``.iter_epochs()`` method that can be used to split up the pipeline into epoch-delimited pipeline segments. Epochs are defined by the last call to ``.repeat()`` in a pipeline, for example: + +.. 
code-block:: python + + pipe = ray.data.range(5).repeat(3) + for i, epoch in enumerate(pipe.iter_epochs()): + print("Epoch {}", i) + for row in epoch.iter_items(): + print(row) + # -> + # Epoch 0 + # 0 + # 1 + # 2 + # 3 + # 4 + # Epoch 1 + # 0 + # 1 + # 2 + # 3 + # 4 + # Epoch 2 + # 0 + # 1 + # 2 + # 3 + # 4 + +Note that while epochs commonly consist of a single window, they can also contain multiple windows if ``.window()`` is used or there are multiple ``.repeat()`` calls. + Per-Window Transformations ~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -79,12 +113,14 @@ While most Dataset operations are per-row (e.g., map, filter), some operations a # Example of randomly shuffling each window of a pipeline. ray.data.range(5).repeat(2).random_shuffle_each_window().show_windows() # -> + # ----- Epoch 0 ------ # === Window 0 === # 4 # 3 # 1 # 0 # 2 + # ----- Epoch 1 ------ # === Window 1 === # 2 # 1 @@ -99,12 +135,14 @@ You can also apply arbitrary transformations to each window using ``DatasetPipel # Equivalent transformation using .foreach_window() ray.data.range(5).repeat(2).foreach_window(lambda w: w.random_shuffle()).show_windows() # -> + # ----- Epoch 0 ------ # === Window 0 === # 1 # 0 # 4 # 2 # 3 + # ----- Epoch 1 ------ # === Window 1 === # 4 # 2 @@ -112,40 +150,6 @@ You can also apply arbitrary transformations to each window using ``DatasetPipel # 3 # 1 -Handling Epochs -~~~~~~~~~~~~~~~ - -It's common in ML training to want to divide data ingest into epochs, or repetitions over the original source dataset. DatasetPipeline provides a convenient ``.iter_epochs()`` method that can be used to split up the pipeline into epoch-delimited pipeline segments. Epochs are defined by the last call to ``.repeat()`` in a pipeline, for example: - -.. code-block:: python - - pipe = ray.data.range(5).window(blocks_per_window=2).repeat(3) - for i, epoch in enumerate(pipe.iter_epochs()): - print("Epoch {}", i) - for row in epoch.iter_items(): - print(row) - # -> - # Epoch 0 - # 0 - # 1 - # 2 - # 3 - # 4 - # Epoch 1 - # 0 - # 1 - # 2 - # 3 - # 4 - # Epoch 2 - # 0 - # 1 - # 2 - # 3 - # 4 - -Note that while epochs commonly consist of a single window, they can also contain multiple windows if ``.window()`` is used or there are multiple ``.repeat()`` calls. - Example: Pipelined Batch Inference ---------------------------------- @@ -290,6 +294,7 @@ Sometimes, you may want to change the structure of an existing pipeline. For exa .repeat(2) \ .show_windows() # -> + # ------ Epoch 0 ------ # === Window 0 === # 0 # 1 @@ -298,6 +303,7 @@ Sometimes, you may want to change the structure of an existing pipeline. For exa # 3 # === Window 2 === # 4 + # ------ Epoch 1 ------ # === Window 3 === # 0 # 1 @@ -307,18 +313,22 @@ Sometimes, you may want to change the structure of an existing pipeline. For exa # === Window 5 === # 4 - # Repeat followed by window. + # Repeat followed by window. Note that epoch 1 contains some leftover + # data from the tail end of epoch 0, since re-windowing can merge windows + # across epochs. 
ray.data.range(5) \ .repeat(2) \ .rewindow(blocks_per_window=2) \ .show_windows() # -> + # ------ Epoch 0 ------ # === Window 0 === # 0 # 1 # === Window 1 === # 2 # 3 + # ------ Epoch 1 ------ # === Window 2 === # 4 # 0 diff --git a/python/ray/data/dataset_pipeline.py b/python/ray/data/dataset_pipeline.py index 6dd3335c9740..ffca20532306 100644 --- a/python/ray/data/dataset_pipeline.py +++ b/python/ray/data/dataset_pipeline.py @@ -331,6 +331,7 @@ def __next__(self) -> Dataset[T]: if self._original_iter: try: res = next(self._original_iter) + res._set_epoch(0) self._results.append(res) return lambda: res except StopIteration: @@ -422,7 +423,11 @@ def show_windows(self, limit_per_dataset: int = 10) -> None: Args: limit_per_dataset: Rows to print per window/dataset. """ + epoch = None for i, ds in enumerate(self.iter_datasets()): + if ds._get_epoch() != epoch: + epoch = ds._get_epoch() + print("------ Epoch {} ------".format(epoch)) print("=== Window {} ===".format(i)) ds.show(limit_per_dataset) From e58923e50a47a56961233cce3823c90471eea9a7 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 8 Oct 2021 16:27:01 -0700 Subject: [PATCH 13/17] test --- python/ray/data/tests/test_dataset_pipeline.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/python/ray/data/tests/test_dataset_pipeline.py b/python/ray/data/tests/test_dataset_pipeline.py index d1e62f098220..ceb3b3b59778 100644 --- a/python/ray/data/tests/test_dataset_pipeline.py +++ b/python/ray/data/tests/test_dataset_pipeline.py @@ -47,6 +47,12 @@ def test_epoch(ray_start_regular_shared): results = [p.take() for p in pipe.iter_epochs()] assert results == [[0, 1, 2], [0, 1, 2], [0, 1, 2]] + # Test nested repeat. + pipe = ray.data.range(5).repeat(2).repeat(2) + results = [p.take() for p in pipe.iter_epochs()] + assert results == [[0, 1, 2, 3, 4, 0, 1, 2, 3, 4], + [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]] + def test_cannot_read_twice(ray_start_regular_shared): ds = ray.data.range(10) From fb4ed9ec6f628b66fc993f239cec42b55ed091ea Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 8 Oct 2021 17:56:42 -0700 Subject: [PATCH 14/17] update --- python/ray/data/dataset.py | 14 +++++++++++++- python/ray/data/dataset_pipeline.py | 2 ++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index d609a50b4ee8..6e3db0a6ba7b 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -44,6 +44,9 @@ logger = logging.getLogger(__name__) +# Whether we have warned of Datasets containing multiple epochs of data. +_epoch_warned = False + @PublicAPI(stability="beta") class Dataset(Generic[T]): @@ -743,7 +746,16 @@ def union(self, *other: List["Dataset[T]"]) -> "Dataset[T]": metadata.extend(bl._metadata) blocks.extend(bl._blocks) - max_epoch = max(*[ds._get_epoch() for ds in datasets]) + epochs = [ds._get_epoch() for ds in datasets] + max_epoch = max(*epochs) + if len(set(epochs)) > 1: + global _epoch_warned + if not _epoch_warned: + logger.warning( + "Dataset contains data from multiple epochs: {}, " + "using the higher epoch number {}. 
This warning will not " + "be shown again.".format(set(epochs), max_epoch)) + _epoch_warned = True return Dataset(LazyBlockList(calls, metadata, blocks), max_epoch) def sort(self, diff --git a/python/ray/data/dataset_pipeline.py b/python/ray/data/dataset_pipeline.py index ffca20532306..d58c0bd91cd8 100644 --- a/python/ray/data/dataset_pipeline.py +++ b/python/ray/data/dataset_pipeline.py @@ -273,6 +273,8 @@ def __next__(self) -> Dataset[T]: # Slice off the left-most chunk and return it. res, self._buffer = self._buffer._divide(blocks_per_window) assert res.num_blocks() <= blocks_per_window, res + if self._buffer.num_blocks() == 0: + self._buffer = None return lambda: res except StopIteration: # Return the left-over data as a single window. From 05c9441811d84208c6aa8ddc2e645e7118c41a32 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 8 Oct 2021 17:57:54 -0700 Subject: [PATCH 15/17] update --- python/ray/data/dataset.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 6e3db0a6ba7b..230b52117d97 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -753,7 +753,8 @@ def union(self, *other: List["Dataset[T]"]) -> "Dataset[T]": if not _epoch_warned: logger.warning( "Dataset contains data from multiple epochs: {}, " - "using the higher epoch number {}. This warning will not " + "likely due to a `rewindow()` call. The higher epoch " + "number {} will be used. This warning will not " "be shown again.".format(set(epochs), max_epoch)) _epoch_warned = True return Dataset(LazyBlockList(calls, metadata, blocks), max_epoch) From 635ea093f15e74d18d83c1d42cdf7c6a23e8a18c Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Mon, 11 Oct 2021 14:09:54 -0700 Subject: [PATCH 16/17] remove dependency on more_itertools --- python/ray/data/dataset_pipeline.py | 30 +++++++++++++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/python/ray/data/dataset_pipeline.py b/python/ray/data/dataset_pipeline.py index d58c0bd91cd8..babd253aea98 100644 --- a/python/ray/data/dataset_pipeline.py +++ b/python/ray/data/dataset_pipeline.py @@ -1,5 +1,4 @@ import functools -import more_itertools import time from typing import Any, Callable, List, Iterator, Iterable, Generic, Union, \ Optional, TYPE_CHECKING @@ -454,6 +453,33 @@ def iter_epochs(self) -> Iterator["DatasetPipeline[T]"]: containing data from that epoch only. """ + class Peekable: + def __init__(self, base_iter: Iterator[T]): + self._iter = base_iter + self._buffer = None + + def _fill_buffer_if_possible(self): + if self._buffer is None: + try: + self._buffer = next(self._iter) + assert self._buffer is not None + except StopIteration: + pass + + def peek(self) -> T: + self._fill_buffer_if_possible() + if self._buffer is None: + raise StopIteration + return self._buffer + + def __next__(self) -> T: + self._fill_buffer_if_possible() + if self._buffer is None: + raise StopIteration + item = self._buffer + self._buffer = None + return item + class SingleEpochIterator: def __init__(self, peekable_iter: Iterator[Dataset[T]]): self._iter = peekable_iter @@ -472,7 +498,7 @@ def __iter__(self): class EpochDelimitedIterator: def __init__(self, pipe): - self._iter = more_itertools.peekable(pipe.iter_datasets()) + self._iter = Peekable(pipe.iter_datasets()) def __next__(self) -> "DatasetPipeline[T]": self._iter.peek() # Raises StopIteration on end of data. 
From 55700d8819dd8f4bca5a56d4e78258dd44e64584 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Mon, 11 Oct 2021 18:49:15 -0700 Subject: [PATCH 17/17] fix --- doc/source/data/dataset-pipeline.rst | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/doc/source/data/dataset-pipeline.rst b/doc/source/data/dataset-pipeline.rst index 56d406f60d95..87407e670a84 100644 --- a/doc/source/data/dataset-pipeline.rst +++ b/doc/source/data/dataset-pipeline.rst @@ -76,30 +76,30 @@ It's common in ML training to want to divide data ingest into epochs, or repetit .. code-block:: python - pipe = ray.data.range(5).repeat(3) + pipe = ray.data.range(5).repeat(3).random_shuffle_each_window() for i, epoch in enumerate(pipe.iter_epochs()): print("Epoch {}", i) - for row in epoch.iter_items(): + for row in epoch.iter_rows(): print(row) # -> # Epoch 0 - # 0 - # 1 # 2 + # 1 # 3 # 4 - # Epoch 1 # 0 - # 1 - # 2 + # Epoch 1 # 3 # 4 - # Epoch 2 # 0 - # 1 # 2 + # 1 + # Epoch 2 # 3 + # 2 # 4 + # 1 + # 0 Note that while epochs commonly consist of a single window, they can also contain multiple windows if ``.window()`` is used or there are multiple ``.repeat()`` calls.