[docs] Editing pass over Dataset docs (ray-project#26935)
Signed-off-by: Stefan van der Kleij <[email protected]>
ericl authored and Stefan van der Kleij committed Aug 18, 2022
1 parent 9938e14 commit 9fb1436
Showing 18 changed files with 701 additions and 1,181 deletions.
2 changes: 2 additions & 0 deletions doc/source/_toc.yml
@@ -57,6 +57,8 @@ parts:
title: Large-scale ML Ingest
- file: data/examples/ocr_example
title: Scaling OCR with Ray Datasets
- file: data/advanced-pipelines
- file: data/random-access
- file: data/faq
- file: data/package-ref
- file: data/integrations
196 changes: 0 additions & 196 deletions doc/source/data/accessing-datasets.rst

This file was deleted.

132 changes: 4 additions & 128 deletions doc/source/data/advanced-pipelines.rst
@@ -1,64 +1,13 @@
.. _data_pipeline_usage:

-----------------------
Advanced Pipeline Usage
-----------------------
--------------------------
Advanced Pipeline Examples
--------------------------

Handling Epochs
===============

It's common in ML training to want to divide data ingest into epochs, or repetitions over the original source dataset.
DatasetPipeline provides a convenient ``.iter_epochs()`` method that can be used to split up the pipeline into epoch-delimited pipeline segments.
Epochs are defined by the last call to ``.repeat()`` in a pipeline, for example:

.. code-block:: python

    import ray

    pipe = ray.data.from_items([0, 1, 2, 3, 4]) \
        .repeat(3) \
        .random_shuffle_each_window()

    for i, epoch in enumerate(pipe.iter_epochs()):
        print(f"Epoch {i}")
        for row in epoch.iter_rows():
            print(row)
    # ->
    # Epoch 0
    # 2
    # 1
    # 3
    # 4
    # 0
    # Epoch 1
    # 3
    # 4
    # 0
    # 2
    # 1
    # Epoch 2
    # 3
    # 2
    # 4
    # 1
    # 0
Note that while epochs commonly consist of a single window, they can also contain multiple windows if ``.window()`` is used or there are multiple ``.repeat()`` calls.
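
For illustration, here is a minimal sketch of epochs that span multiple windows
(the dataset size and ``blocks_per_window`` value are arbitrary placeholders):

.. code-block:: python

    import ray

    # Windowing the dataset before repeating it means each epoch
    # consists of multiple windows rather than a single one.
    pipe = ray.data.range(1000) \
        .window(blocks_per_window=2) \
        .repeat(2)

    for i, epoch in enumerate(pipe.iter_epochs()):
        # Each epoch still yields the full dataset; count rows by iterating.
        num_rows = sum(1 for _ in epoch.iter_rows())
        print("Epoch", i, "->", num_rows, "rows")
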
This page covers more advanced examples for dataset pipelines.

.. _dataset-pipeline-per-epoch-shuffle:

Per-Epoch Shuffle Pipeline
==========================
.. tip::

    If you are interested in distributed ingest for deep learning, we
    recommend using Ray Datasets in conjunction with :ref:`Ray Train <train-docs>`.
    See the :ref:`example below <dataset-pipeline-ray-train>` for more info.

..
  https://docs.google.com/drawings/d/1vWQ-Zfxy2_Gthq8l3KmNsJ7nOCuYUQS9QMZpj5GHYx0/edit
The other method of creating a pipeline is calling ``.repeat()`` on an existing Dataset.
This creates a DatasetPipeline over an infinite sequence of the same original Dataset.
Readers pulling batches from the pipeline will see the same data blocks repeatedly, which is useful for distributed training.
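
As a minimal sketch of this pattern (the in-memory items stand in for a real
training dataset):

.. code-block:: python

    import ray

    # Repeat the source dataset indefinitely; each window of the resulting
    # DatasetPipeline is a freshly shuffled copy of the original data.
    pipe = ray.data.from_items([{"x": i} for i in range(100)]) \
        .repeat() \
        .random_shuffle_each_window()

    # Readers pull from the (infinite) pipeline; stop after a few batches here.
    for i, batch in enumerate(pipe.iter_batches(batch_size=32)):
        if i >= 3:
            break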

Pre-repeat vs post-repeat transforms
====================================

@@ -103,79 +52,6 @@ For example, in the following pipeline, the ``map(func)`` transformation only oc

Result caching only applies if there are *transformation* stages prior to the pipelining operation. If you ``repeat()`` or ``window()`` a Dataset right after the read call (e.g., ``ray.data.read_parquet(...).repeat()``), then the read will still be re-executed on each repetition. This optimization saves memory, at the cost of repeated reads from the datasource. To force result caching in all cases, use ``.fully_executed().repeat()``.
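
For instance, a minimal sketch contrasting the two cases (the S3 path is a placeholder):

.. code-block:: python

    import ray

    # Repeating right after the read: the Parquet read is re-executed on
    # every repetition, trading repeated reads for lower memory usage.
    pipe = ray.data.read_parquet("s3://bucket/dir").repeat()

    # Forcing result caching: fully execute the read first so that every
    # repetition reuses the in-memory blocks instead of re-reading the source.
    cached_pipe = ray.data.read_parquet("s3://bucket/dir").fully_executed().repeat()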

Splitting pipelines for distributed ingest
==========================================

Similar to how you can ``.split()`` a Dataset, you can also split a DatasetPipeline with the same method call. This returns a number of DatasetPipeline shards that share a common parent pipeline. Each shard can be passed to a remote task or actor.

**Code**:

.. code-block:: python

    from typing import List

    import ray
    from ray.data import DatasetPipeline

    # Create a pipeline that loops over its source dataset indefinitely.
    pipe: DatasetPipeline = ray.data \
        .read_parquet("s3://bucket/dir") \
        .repeat() \
        .random_shuffle_each_window()

    @ray.remote(num_gpus=1)
    class TrainingWorker:
        def __init__(self, rank: int, shard: DatasetPipeline):
            self.rank = rank
            self.shard = shard
        ...

    shards: List[DatasetPipeline] = pipe.split(n=3)
    workers = [TrainingWorker.remote(rank, s) for rank, s in enumerate(shards)]
    ...
**Pipeline**:

.. image:: images/dataset-repeat-2.svg

.. _dataset-pipeline-ray-train:

Distributed Ingest with Ray Train
=================================

Ray Datasets integrates with :ref:`Ray Train <train-docs>`, further simplifying your distributed ingest pipeline.

Ray Train is a lightweight library for scalable deep learning on Ray.

1. It allows you to focus on the training logic and automatically handles distributed setup for your framework of choice (PyTorch, TensorFlow, or Horovod).
2. It has out-of-the-box fault tolerance and elastic training.
3. It comes with support for standard ML tools and features that practitioners love, such as checkpointing and logging.

**Code**

.. code-block:: python

    import ray
    from ray.data import DatasetPipeline
    from ray.train import Trainer

    def train_func():
        # This is a dummy train function just iterating over the dataset shard.
        # You should replace this with your training logic.
        shard = ray.train.get_dataset_shard()
        for row in shard.iter_rows():
            print(row)

    # Create a pipeline that loops over its source dataset indefinitely.
    pipe: DatasetPipeline = ray.data \
        .read_parquet(...) \
        .repeat() \
        .random_shuffle_each_window()

    # Pass the pipeline to the Trainer.
    # The Trainer will automatically split the DatasetPipeline for you.
    trainer = Trainer(num_workers=8, backend="torch")
    result = trainer.run(
        train_func,
        config={"worker_batch_size": 64, "num_epochs": 2},
        dataset=pipe)
Ray Train is responsible for the orchestration of the training workers and will automatically split the Dataset for you.
See :ref:`the Train User Guide <train-dataset-pipeline>` for more details.

Changing Pipeline Structure
===========================

