diff --git a/doc/source/data/dataset-pipeline.rst b/doc/source/data/dataset-pipeline.rst
index d954df8051eb..87407e670a84 100644
--- a/doc/source/data/dataset-pipeline.rst
+++ b/doc/source/data/dataset-pipeline.rst
@@ -69,6 +69,40 @@ You can also create a DatasetPipeline from a custom iterator over dataset creato
     splits = ray.data.range(1000, parallelism=200).split(20)
     pipe = DatasetPipeline.from_iterable([lambda s=s: s for s in splits])
 
+Handling Epochs
+~~~~~~~~~~~~~~~
+
+In ML training it is common to divide data ingest into epochs, i.e., repetitions over the original source dataset. DatasetPipeline provides a convenient ``.iter_epochs()`` method that splits the pipeline into epoch-delimited pipeline segments. Epochs are defined by the last call to ``.repeat()`` in a pipeline, for example:
+
+.. code-block:: python
+
+    pipe = ray.data.range(5).repeat(3).random_shuffle_each_window()
+    for i, epoch in enumerate(pipe.iter_epochs()):
+        print("Epoch", i)
+        for row in epoch.iter_rows():
+            print(row)
+    # ->
+    # Epoch 0
+    # 2
+    # 1
+    # 3
+    # 4
+    # 0
+    # Epoch 1
+    # 3
+    # 4
+    # 0
+    # 2
+    # 1
+    # Epoch 2
+    # 3
+    # 2
+    # 4
+    # 1
+    # 0
+
+Note that while epochs commonly consist of a single window, they can also contain multiple windows if ``.window()`` is used or there are multiple ``.repeat()`` calls.
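+
+For example, in the following sketch each epoch spans multiple windows, since the dataset is windowed before the repeat (illustrative only; the exact rows per window depend on how the dataset's blocks are split):
+
+.. code-block:: python
+
+    pipe = ray.data.range(6).window(blocks_per_window=2).repeat(2)
+    for i, epoch in enumerate(pipe.iter_epochs()):
+        print("Epoch", i)
+        for row in epoch.iter_rows():
+            print(row)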
""" - def __init__(self, blocks: BlockList[T]): + def __init__(self, blocks: BlockList[T], epoch: int): """Construct a Dataset (internal API). The constructor is not part of the Dataset API. Use the ``ray.data.*`` @@ -71,6 +74,7 @@ def __init__(self, blocks: BlockList[T]): """ self._blocks: BlockList[T] = blocks self._uuid = uuid4().hex + self._epoch = epoch assert isinstance(self._blocks, BlockList), self._blocks def map(self, @@ -125,7 +129,9 @@ def transform(block: Block) -> Block: compute = get_compute(compute) - return Dataset(compute.apply(transform, ray_remote_args, self._blocks)) + return Dataset( + compute.apply(transform, ray_remote_args, self._blocks), + self._epoch) def map_batches(self, fn: Union[CallableClass, Callable[[BatchType], BatchType]], @@ -219,7 +225,9 @@ def transform(block: Block) -> Block: compute = get_compute(compute) - return Dataset(compute.apply(transform, ray_remote_args, self._blocks)) + return Dataset( + compute.apply(transform, ray_remote_args, self._blocks), + self._epoch) def flat_map(self, fn: Union[CallableClass, Callable[[T], Iterable[U]]], @@ -257,7 +265,9 @@ def transform(block: Block) -> Block: compute = get_compute(compute) - return Dataset(compute.apply(transform, ray_remote_args, self._blocks)) + return Dataset( + compute.apply(transform, ray_remote_args, self._blocks), + self._epoch) def filter(self, fn: Union[CallableClass, Callable[[T], bool]], @@ -295,7 +305,9 @@ def transform(block: Block) -> Block: compute = get_compute(compute) - return Dataset(compute.apply(transform, ray_remote_args, self._blocks)) + return Dataset( + compute.apply(transform, ray_remote_args, self._blocks), + self._epoch) def repartition(self, num_blocks: int) -> "Dataset[T]": """Repartition the dataset into exactly this number of blocks. @@ -316,7 +328,7 @@ def repartition(self, num_blocks: int) -> "Dataset[T]": """ new_blocks = simple_shuffle(self._blocks, num_blocks) - return Dataset(new_blocks) + return Dataset(new_blocks, self._epoch) def random_shuffle( self, @@ -360,7 +372,7 @@ def random_shuffle( random_shuffle=True, random_seed=seed, _spread_resource_prefix=_spread_resource_prefix) - return Dataset(new_blocks) + return Dataset(new_blocks, self._epoch) def _move_blocks(self): blocks = self._blocks.copy() @@ -557,7 +569,8 @@ def equalize(splits: List[Dataset[T]], return equalize([ Dataset( BlockList( - list(blocks), [metadata_mapping[b] for b in blocks])) + list(blocks), [metadata_mapping[b] + for b in blocks]), self._epoch) for blocks in np.array_split(block_refs, n) if not equal or len(blocks) > 0 ], n) @@ -657,7 +670,7 @@ def build_node_id_by_actor(actors: List[Any]) -> Dict[Any, str]: BlockList( allocation_per_actor[actor], [metadata_mapping[b] - for b in allocation_per_actor[actor]])) + for b in allocation_per_actor[actor]]), self._epoch) for actor in locality_hints ], n) @@ -733,7 +746,18 @@ def union(self, *other: List["Dataset[T]"]) -> "Dataset[T]": metadata.extend(bl._metadata) blocks.extend(bl._blocks) - return Dataset(LazyBlockList(calls, metadata, blocks)) + epochs = [ds._get_epoch() for ds in datasets] + max_epoch = max(*epochs) + if len(set(epochs)) > 1: + global _epoch_warned + if not _epoch_warned: + logger.warning( + "Dataset contains data from multiple epochs: {}, " + "likely due to a `rewindow()` call. The higher epoch " + "number {} will be used. 
This warning will not " + "be shown again.".format(set(epochs), max_epoch)) + _epoch_warned = True + return Dataset(LazyBlockList(calls, metadata, blocks), max_epoch) def sort(self, key: Union[None, str, List[str], Callable[[T], Any]] = None, @@ -772,7 +796,7 @@ def sort(self, # Handle empty dataset. if self.num_blocks() == 0: return self - return Dataset(sort_impl(self._blocks, key, descending)) + return Dataset(sort_impl(self._blocks, key, descending), self._epoch) def zip(self, other: "Dataset[U]") -> "Dataset[(T, U)]": """Zip this dataset with the elements of another. @@ -823,7 +847,7 @@ def do_zip(block1: Block, block2: Block) -> (Block, BlockMetadata): # TODO(ekl) it might be nice to have a progress bar here. metadata = ray.get(metadata) - return Dataset(BlockList(blocks, metadata)) + return Dataset(BlockList(blocks, metadata), self._epoch) def limit(self, limit: int) -> "Dataset[T]": """Limit the dataset to the first number of records specified. @@ -1623,6 +1647,9 @@ def repeat(self, times: int = None) -> "DatasetPipeline[T]": Transformations done on the returned pipeline are evaluated on each loop of the pipeline over the base dataset. + Note that every repeat of the dataset is considered an "epoch" for + the purposes of ``DatasetPipeline.iter_epochs()``. + Examples: >>> # Infinite pipeline of numbers [0, 5) >>> ray.data.range(5).repeat().take() @@ -1653,6 +1680,7 @@ def __init__(self, ds: "Dataset[T]"): def __next__(self) -> "Dataset[T]": if times and self._i >= times: raise StopIteration + self._ds._set_epoch(self._i) self._i += 1 return lambda: self._ds @@ -1719,8 +1747,9 @@ def window(self, *, blocks_per_window: int = 10) -> "DatasetPipeline[T]": from ray.data.dataset_pipeline import DatasetPipeline class Iterator: - def __init__(self, splits): + def __init__(self, splits, epoch): self._splits = splits.copy() + self._epoch = epoch def __next__(self) -> "Dataset[T]": if not self._splits: @@ -1729,18 +1758,19 @@ def __next__(self) -> "Dataset[T]": blocks = self._splits.pop(0) def gen(): - return Dataset(blocks) + return Dataset(blocks, self._epoch) return gen class Iterable: - def __init__(self, blocks): + def __init__(self, blocks, epoch): self._splits = blocks.split(split_size=blocks_per_window) + self._epoch = epoch def __iter__(self): - return Iterator(self._splits) + return Iterator(self._splits, self._epoch) - it = Iterable(self._blocks) + it = Iterable(self._blocks, self._epoch) return DatasetPipeline(it, length=len(it._splits)) @DeveloperAPI @@ -1791,16 +1821,17 @@ def _split(self, index: int, right_metadata.append(ray.get(m1)) count += num_rows - left = Dataset(BlockList(left_blocks, left_metadata)) + left = Dataset(BlockList(left_blocks, left_metadata), self._epoch) if return_right_half: - right = Dataset(BlockList(right_blocks, right_metadata)) + right = Dataset( + BlockList(right_blocks, right_metadata), self._epoch) else: right = None return left, right def _divide(self, block_idx: int) -> ("Dataset[T]", "Dataset[T]"): left, right = self._blocks.divide(block_idx) - return Dataset(left), Dataset(right) + return Dataset(left, self._epoch), Dataset(right, self._epoch) def __repr__(self) -> str: schema = self.schema() @@ -1840,6 +1871,12 @@ def _get_uuid(self) -> str: def _set_uuid(self, uuid: str) -> None: self._uuid = uuid + def _get_epoch(self) -> int: + return self._epoch + + def _set_epoch(self, epoch: int) -> None: + self._epoch = epoch + def _get_num_rows(block: Block) -> int: block = BlockAccessor.for_block(block) diff --git a/python/ray/data/dataset_pipeline.py 
diff --git a/python/ray/data/dataset_pipeline.py b/python/ray/data/dataset_pipeline.py
index 962961105f89..babd253aea98 100644
--- a/python/ray/data/dataset_pipeline.py
+++ b/python/ray/data/dataset_pipeline.py
@@ -272,6 +272,8 @@ def __next__(self) -> Dataset[T]:
                     # Slice off the left-most chunk and return it.
                     res, self._buffer = self._buffer._divide(blocks_per_window)
                     assert res.num_blocks() <= blocks_per_window, res
+                    if self._buffer.num_blocks() == 0:
+                        self._buffer = None
                     return lambda: res
                 except StopIteration:
                     # Return the left-over data as a single window.
@@ -303,6 +305,10 @@ def repeat(self, times: int = None) -> "DatasetPipeline[T]":
         Transformations done on the repeated pipeline are evaluated on each
         loop of the pipeline over the base pipeline.
 
+        Note that every repeat of the pipeline is considered an "epoch" for
+        the purposes of ``iter_epochs()``. If there are multiple repeat
+        calls, the latest repeat takes precedence when defining epochs.
+
         Args:
             times: The number of times to loop over this pipeline, or None
                 to repeat indefinitely.
@@ -326,6 +332,7 @@ def __next__(self) -> Dataset[T]:
                 if self._original_iter:
                     try:
                         res = next(self._original_iter)
+                        res._set_epoch(0)
                         self._results.append(res)
                         return lambda: res
                     except StopIteration:
@@ -338,6 +345,7 @@ def __next__(self) -> Dataset[T]:
                 # Going through a repeat of the pipeline.
                 if self._i < self._max_i:
                     res = self._results[self._i % len(self._results)]
+                    res._set_epoch(1 + self._i // len(self._results))
                     self._i += 1
                     return lambda: res
                 else:
@@ -416,10 +424,93 @@ def show_windows(self, limit_per_dataset: int = 10) -> None:
         Args:
             limit_per_dataset: Rows to print per window/dataset.
         """
+        epoch = None
         for i, ds in enumerate(self.iter_datasets()):
+            if ds._get_epoch() != epoch:
+                epoch = ds._get_epoch()
+                print("------ Epoch {} ------".format(epoch))
             print("=== Window {} ===".format(i))
             ds.show(limit_per_dataset)
 
+    def iter_epochs(self) -> Iterator["DatasetPipeline[T]"]:
+        """Split this pipeline up by epoch.
+
+        This allows reading of data per-epoch for repeated Datasets, which is
+        useful for ML training. For example, ``ray.data.range(10).repeat(50)``
+        generates a pipeline with 500 rows total split across 50 epochs. This
+        method allows iterating over the data individually per epoch
+        (repetition) of the original data.
+
+        Examples:
+            >>> epochs = ray.data.range(10).repeat(50).iter_epochs()
+            >>> for i, epoch in enumerate(epochs):
+            ...     print("Epoch", i)
+            ...     for row in epoch.iter_rows():
+            ...         print(row)
+
+        Returns:
+            Iterator over epoch objects, where each epoch is a DatasetPipeline
+            containing data from that epoch only.
+        """
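+
+        # How this works: the window iterator is wrapped in a one-item
+        # lookahead (Peekable below) so the epoch tag of the next window can
+        # be inspected without consuming it. SingleEpochIterator then yields
+        # windows until that tag changes, and EpochDelimitedIterator wraps
+        # each such run of windows in its own DatasetPipeline.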
+ """ + + class Peekable: + def __init__(self, base_iter: Iterator[T]): + self._iter = base_iter + self._buffer = None + + def _fill_buffer_if_possible(self): + if self._buffer is None: + try: + self._buffer = next(self._iter) + assert self._buffer is not None + except StopIteration: + pass + + def peek(self) -> T: + self._fill_buffer_if_possible() + if self._buffer is None: + raise StopIteration + return self._buffer + + def __next__(self) -> T: + self._fill_buffer_if_possible() + if self._buffer is None: + raise StopIteration + item = self._buffer + self._buffer = None + return item + + class SingleEpochIterator: + def __init__(self, peekable_iter: Iterator[Dataset[T]]): + self._iter = peekable_iter + self._epoch = None + + def __next__(self) -> Dataset[T]: + if (self._epoch is not None + and self._iter.peek()._get_epoch() != self._epoch): + raise StopIteration + ds = next(self._iter) + self._epoch = ds._get_epoch() + return lambda: ds + + def __iter__(self): + return self + + class EpochDelimitedIterator: + def __init__(self, pipe): + self._iter = Peekable(pipe.iter_datasets()) + + def __next__(self) -> "DatasetPipeline[T]": + self._iter.peek() # Raises StopIteration on end of data. + epoch_pipe = DatasetPipeline.from_iterable( + SingleEpochIterator(self._iter)) + return epoch_pipe + + def __iter__(self): + return self + + return EpochDelimitedIterator(self) + @DeveloperAPI def iter_datasets(self) -> Iterator[Dataset[T]]: """Iterate over the output datasets of this pipeline. diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index fb9856148998..e9f88d4f0bd9 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -61,7 +61,7 @@ def from_items(items: List[Any], *, parallelism: int = 200) -> Dataset[Any]: BlockAccessor.for_block(block).get_metadata(input_files=None)) i += block_size - return Dataset(BlockList(blocks, metadata)) + return Dataset(BlockList(blocks, metadata), 0) @PublicAPI(stability="beta") @@ -202,7 +202,7 @@ def remote_read(task: ReadTask) -> Block: input_files=metadata[0].input_files, )) - return Dataset(block_list) + return Dataset(block_list, 0) @PublicAPI(stability="beta") @@ -563,7 +563,7 @@ def from_pandas_refs( res = [df_to_block.remote(df) for df in dfs] blocks, metadata = zip(*res) - return Dataset(BlockList(blocks, ray.get(list(metadata)))) + return Dataset(BlockList(blocks, ray.get(list(metadata))), 0) def from_numpy(ndarrays: List[ObjectRef[np.ndarray]]) -> Dataset[ArrowRow]: @@ -579,7 +579,7 @@ def from_numpy(ndarrays: List[ObjectRef[np.ndarray]]) -> Dataset[ArrowRow]: res = [ndarray_to_block.remote(ndarray) for ndarray in ndarrays] blocks, metadata = zip(*res) - return Dataset(BlockList(blocks, ray.get(list(metadata)))) + return Dataset(BlockList(blocks, ray.get(list(metadata))), 0) @PublicAPI(stability="beta") @@ -611,7 +611,7 @@ def from_arrow_refs(tables: List[ObjectRef[Union["pyarrow.Table", bytes]]] """ get_metadata = cached_remote_fn(_get_metadata) metadata = [get_metadata.remote(t) for t in tables] - return Dataset(BlockList(tables, ray.get(metadata))) + return Dataset(BlockList(tables, ray.get(metadata)), 0) @PublicAPI(stability="beta") diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index 91aa91e5c2eb..457d65c42a00 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -197,7 +197,7 @@ def _test_equal_split_balanced(block_sizes, num_splits): metadata.append(BlockAccessor.for_block(block).get_metadata(None)) total_rows += 
     block_list = BlockList(blocks, metadata)
-    ds = Dataset(block_list)
+    ds = Dataset(block_list, 0)
 
     splits = ds.split(num_splits, equal=True)
     split_counts = [split.count() for split in splits]
diff --git a/python/ray/data/tests/test_dataset_pipeline.py b/python/ray/data/tests/test_dataset_pipeline.py
index cffb378f3686..ceb3b3b59778 100644
--- a/python/ray/data/tests/test_dataset_pipeline.py
+++ b/python/ray/data/tests/test_dataset_pipeline.py
@@ -35,6 +35,25 @@ def block_on_ones(x: int) -> int:
     assert pipe.take(1) == [0]
 
 
+def test_epoch(ray_start_regular_shared):
+    # Test dataset repeat.
+    pipe = ray.data.range(5).map(lambda x: x * 2).repeat(3).map(
+        lambda x: x * 2)
+    results = [p.take() for p in pipe.iter_epochs()]
+    assert results == [[0, 4, 8, 12, 16], [0, 4, 8, 12, 16], [0, 4, 8, 12, 16]]
+
+    # Test dataset pipeline repeat.
+    pipe = ray.data.range(3).window(blocks_per_window=2).repeat(3)
+    results = [p.take() for p in pipe.iter_epochs()]
+    assert results == [[0, 1, 2], [0, 1, 2], [0, 1, 2]]
+
+    # Test nested repeat.
+    pipe = ray.data.range(5).repeat(2).repeat(2)
+    results = [p.take() for p in pipe.iter_epochs()]
+    assert results == [[0, 1, 2, 3, 4, 0, 1, 2, 3, 4],
+                       [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
+
+
 def test_cannot_read_twice(ray_start_regular_shared):
     ds = ray.data.range(10)
     pipe = ds.window(blocks_per_window=1)