From 008eecfbff20063887417e9b985675894b451939 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Sun, 24 Jul 2022 09:59:29 -0700 Subject: [PATCH] [docs] Update the AIR data ingest guide (#26909) --- doc/source/ray-air/check-ingest.rst | 297 ++++++++++------------ doc/source/ray-air/doc_code/air_ingest.py | 86 ++++++- doc/source/ray-air/images/ingest.svg | 2 +- 3 files changed, 221 insertions(+), 164 deletions(-) diff --git a/doc/source/ray-air/check-ingest.rst b/doc/source/ray-air/check-ingest.rst index 12f4f212017d..128c0f734954 100644 --- a/doc/source/ray-air/check-ingest.rst +++ b/doc/source/ray-air/check-ingest.rst @@ -3,19 +3,12 @@ Configuring Training Datasets ============================= -This page is a guide for setting up, understanding, and diagnosing data ingest problems in Ray AIR. +AIR builds its training data pipeline on :ref:`Ray Datasets `, which is a scalable, framework-agnostic data loading and preprocessing library. Datasets enables AIR to seamlessly load data for local and distributed training with Train. -Data ingest is the process of loading data from storage, apply preprocessing steps, and feeding the data into Trainers in AIR. -For datasets that are small, ingest is usually not an issue. However, ingest can be tricky to set up when datasets grow -large enough so that they may not fit fully in memory on a single node, or even in aggregate cluster memory. +This page describes how to setup and configure these datasets in Train under different scenarios and scales. -AIR builds its ingest pipeline on :ref:`Ray Datasets `, which is a framework-agnostic distributed data loading library. If you have -an existing ingest pipeline (e.g., based on TF data), there is some upfront effort porting your loading code to Datasets. -In return, AIR provides portability across ML frameworks as well as advanced capabilities such as global random shuffles, -which are not possible in less general ML data preprocessing libraries. - -Ingest Basics -------------- +Overview +-------- .. _ingest_basics: @@ -29,175 +22,84 @@ user-defined function to preprocess batches of data, and (3) runs an AIR Trainer Let's walk through the stages of what happens when ``Trainer.fit()`` is called. -**Read**: First, AIR will read the Dataset into the Ray object store by calling ``ds.fully_executed()`` on the datasets -that you pass to the Trainer. Dataset blocks that don't fit into memory will be spilled to disk. Note that when you create -the dataset initially, typically only the first block and block metadata is read into memory. The rest of the blocks are -not loaded until ``fit`` is called. - -**Preprocessing**: Next, if a preprocessor is defined, AIR will by default ``fit`` the preprocessor (e.g., compute statistics) on the +**Preprocessing**: First, AIR will ``fit`` the preprocessor (e.g., compute statistics) on the ``"train"`` dataset, and then ``transform`` all given datasets with the fitted preprocessor. This is done by calling ``prep.fit_transform()`` -on the train dataset passed to the Trainer, followed by ``prep.transform()`` on remaining datasets. Preprocessors use Dataset APIs to execute -preprocessing in a parallelized way across the cluster. Both read and preprocessing stages use Ray tasks under the hood. - -**Training**: Finally, AIR passes a reference to the preprocessed dataset to Train workers (Ray actors) launched by the Trainer. Each worker then -typically calls ``iter_batches``, ``to_tf``, or ``to_torch`` to iterate over the dataset reader retrieved by ``get_dataset_shard``. -These read methods load blocks of the dataset into the local worker's memory in a streaming fashion, only fetching / prefetching a -limited number of blocks at once. Workers loop over the dataset blocks repeatedly until training completes. - -Configuring Ingest Per-Dataset -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -It is common to customize processing per-dataset. For example, you may want to enable sharding -on a validation dataset, disable preprocessing of an auxiliary dataset, or adjust ingest strategy per dataset. - -Each DataParallelTrainer has a default per-dataset config given by a ``Trainer._dataset_config`` class field. It is a mapping -from dataset names to ``DatasetConfig`` objects, and implements the default behavior described in :ref:`Ingest Basics `: - -.. code:: python - - # The default DataParallelTrainer dataset config, which is inherited - # by sub-classes such as TorchTrainer, HorovodTrainer, etc. - _dataset_config = { - # Fit preprocessors on the train dataset only. Split the dataset - # across workers if scaling_config["num_workers"] > 1. - "train": DatasetConfig(fit=True, split=True), - # For all other datasets, use the defaults (don't fit, don't split). - # The datasets will be transformed by the fitted preprocessor. - "*": DatasetConfig(), - } - -These configs can be overriden via the ``dataset_config`` kwarg, which is recursively merged with the Trainer defaults. -Here are some examples of configuring Dataset ingest options and what they do: - -.. tabbed:: Split All - - This example shows overriding the split config for the "valid" and "test" datasets. This means that - both the valid and test datasets here will be ``.split()`` across the training workers. - - .. literalinclude:: doc_code/air_ingest.py - :language: python - :start-after: __config_1__ - :end-before: __config_1_end__ - -.. tabbed:: Disable Transform - - This example shows overriding the transform config for the "side" dataset. This means that - the original dataset will be returned by ``.get_dataset_shard("side")``. - - .. literalinclude:: doc_code/air_ingest.py - :language: python - :start-after: __config_2__ - :end-before: __config_2_end__ - - -Bulk vs Streaming Reads ------------------------ +on the train dataset passed to the Trainer, followed by ``prep.transform()`` on remaining datasets. -Bulk Ingest -~~~~~~~~~~~ +**Training**: Then, AIR passes the preprocessed dataset to Train workers (Ray actors) launched by the Trainer. Each worker calls ``get_dataset_shard`` to get a handle to its assigned data shard, and then calls one of ``iter_batches``, ``iter_torch_batches``, or ``iter_tf_batches`` to loop over the data. -By default, AIR loads all Dataset blocks into the object store at the start of training. This provides the best performance if the -cluster has enough aggregate memory to fit all the data blocks in object store memory, or if your preprocessing step is expensive -and you don't want it to be re-run on each epoch. Note that data often requires more space -when loaded uncompressed in memory than when resident in storage. +Getting Started +--------------- -If there is insufficient object store memory, blocks may be spilled to disk during reads or preprocessing. Ray will print log messages -if spilling is occuring, and you can check this as well with the ``ray memory --stats-only`` utility. If spilling is happening, take -care to ensure the cluster has enough disk space to handle the spilled blocks. Alternatively, consider using machine with more memory / -more machines to avoid spilling. - -In short, use bulk ingest when: - * you have enough memory to fit data blocks in cluster object store; - * your preprocessing step is expensive per each epoch; and - * you want best performance when both or either the above conditions are met. - -Streaming Ingest (experimental) -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -AIR also supports streaming ingest via the DatasetPipeline feature. Streaming ingest is preferable when you are using large datasets -that don't fit into memory, and prefer to read *windows* of data from storage to minimize the active memory required for data ingest. -Note that streaming ingest will re-execute preprocessing on each pass over the data. If preprocessing is a bottleneck, consider -using bulk ingest instead for better performance. - -To enable streaming ingest, set ``use_stream_api=True`` in the dataset config. By default, this will configure streaming ingest with a window -size of 1GiB, which means AIR will load ~1 GiB of data at a time from the datasource. -Performance can be increased with larger window sizes, which can be adjusted using the ``stream_window_size`` config. -A reasonable stream window size is something like 20% of available object store memory. Note that the data may be larger -once deserialized in memory, or if individual files are larger than the window size. - -If the window size is set to -1, then an infinite window size will be used. This case is equivalent to using bulk loading -(including the performance advantages of caching preprocessed blocks), but still exposing a DatasetPipeline reader. - -In short, use streaming ingest when: - * you have large datasets that don't fit into memory; - * you want to process small chunks or blocks per window; - * you can use small windows with small data blocks minimizing or avoiding memory starvation or OOM errors; and - * your preprocessing step is not a bottleneck or not an expensive operation since it's re-executed on each pass over the data. - -.. warning:: - - Streaming ingest only applies to preprocessor transform, not preprocessor fitting. - This means that the preprocessor will be initially fit in bulk, after which data will be transformed - as it is loaded in a streaming manner. - -Reading Data -~~~~~~~~~~~~ - -The ``get_dataset_shard`` method returns a reader object that is either a ``Dataset`` or ``DatasetPipeline``, depending on whether the ``use_stream_api`` -option is set. The former is a finite set of records, and the latter represents an infinite stream of records. -See the following examples for clarification: +The following is a simple example of how to configure ingest for a dummy ``TorchTrainer``. Below, we are passing a small tensor dataset to the Trainer via the ``datasets`` argument. In the Trainer's ``train_loop_per_worker``, we access the preprocessed dataset using ``get_dataset_shard()``. .. tabbed:: Bulk Ingest - This example shows bulk ingest (the default). Data is bulk loaded and made available - directly via a ``Dataset`` object that can be looped over manually. + By default, AIR loads all datasets into the Ray object store at the start of training. + This provides the best performance if the cluster can fit the datasets + entirely in memory, or if the preprocessing step is expensive to run more than once. .. literalinclude:: doc_code/air_ingest.py :language: python :start-after: __config_4__ :end-before: __config_4_end__ -.. tabbed:: Streaming Ingest + You should use bulk ingest when: + + * you have enough memory to fit data blocks in cluster object store; + * your preprocessing step is expensive per each epoch; and + * you want best performance when both or either the above conditions are met. + +.. tabbed:: Streaming Ingest (experimental) + + In streaming ingest mode, ``get_dataset_shard`` returns a ``DatasetPipeline`` pipeline that + can be used to read data in a streaming way. + To enable streaming ingest, set ``use_stream_api=True`` in the dataset config. - This example shows enabling streaming ingest for the "train" dataset with a *N-byte* window. - This means that AIR will only load *N* bytes of data from the datasource at a time (the data - may be larger once deserialized in memory or if individual files are larger than the window). + By default, this will tell AIR to load *windows* of 1GiB of data into memory at a time. + Performance can be increased with larger window sizes, which can be adjusted using the + ``stream_window_size`` config. + A reasonable stream window size is something like 20% of available object store memory. .. literalinclude:: doc_code/air_ingest.py :language: python :start-after: __config_5__ :end-before: __config_5_end__ + Use streaming ingest when: + + * you have large datasets that don't fit into memory; + * you want to process small chunks or blocks per window; + * you can use small windows with small data blocks minimizing or avoiding memory starvation or OOM errors; and + * your preprocessing step is not a bottleneck or not an expensive operation since it's re-executed on each pass over the data. + Shuffling Data ~~~~~~~~~~~~~~ -AIR offers several options for per-epoch shuffling, including *local -(per-shard) shuffling* and *global (whole-dataset) shuffling*. +Shuffling or data randomization is important for training high-quality models. By default, AIR will randomize the order the data files (blocks) are read from. AIR also offers options for further randomizing data records within each file: .. tabbed:: Local Shuffling + + Local shuffling is the recommended approach for randomizing data order. To use local shuffle, + simply specify a non-zero ``local_shuffle_buffer_size`` as an argument to ``iter_batches()``. + The iterator will then use a local buffer of the given size to randomize record order. The + larger the buffer size, the more randomization will be applied, but it will also use more + memory. - Local shuffling is an in-iterator shuffle that fills a trainer-local in-memory shuffle - buffer with records and then pops random samples as batches, keeping the buffer above a - user-provided threshold to ensure samples are mixed throughout the entirety of the - trainer's shard. This local shuffle doesn't mix samples across trainer shards between - epochs as the global shuffle does, and will therefore be a lower-quality shuffle; - however, since this shuffle only involves a local in-memory buffer, it is much less - expensive. - - For configuring the size of the in-memory shuffle buffer, it is recommended to - allocate as large of a buffer as the trainer's CPU memory constraints will allow; - note that the ceiling of the CPU memory usage on a given node will be - ``(# of trainers on node) * max(memory used by prefetching, memory used by shuffle - buffer)``, where the aggressiveness of the prefetching is controlled by the - ``prefetch_blocks`` argument. See - :meth:`ds.iter_batches() ` for details. + See :meth:`ds.iter_batches() ` for more details. .. literalinclude:: doc_code/air_ingest.py :language: python :start-after: __local_shuffling_start__ :end-before: __local_shuffling_end__ -.. tabbed:: Global Shuffling + You should use local shuffling when: + + * a small in-memory buffer provides enough randomization; or + * you want the highest possible ingest performance; or + * your model is not overly sensitive to shuffle quality + +.. tabbed:: Global Shuffling (slower) Global shuffling provides more uniformly random (decorrelated) samples and is carried out via a distributed map-reduce operation. This higher quality shuffle can often lead @@ -214,28 +116,95 @@ AIR offers several options for per-epoch shuffling, including *local :start-after: __global_shuffling_start__ :end-before: __global_shuffling_end__ -Ingest and Ray Tune -------------------- + You should use global shuffling when: -.. note:: + * you suspect high-quality shuffles may significantly improve model quality; and + * absolute ingest performance is less of a concern - Train always uses Tune as the execution backend under the hood, even when running just ``Trainer.fit()`` directly (this is treated as a single-trial experiment). This ensures consistency of execution. +Configuring Ingest +~~~~~~~~~~~~~~~~~~ -Placement Group Behavior -~~~~~~~~~~~~~~~~~~~~~~~~ +You can use the ``DatasetConfig`` object to configure how Datasets are preprocessed and split across training workers. Each ``DataParallelTrainer`` has a default ``_dataset_config`` class field. It is a mapping +from dataset names to ``DatasetConfig`` objects, and implements the default behavior described in the :ref:`overview `: -Tune typically creates a placement group reserving resource for each of its trials. These placement groups only reserve resources for the Train actors, however. By default, Dataset preprocessing tasks run using "spare" CPU resources in the cluster, which enables better autoscaling and utilization of resources. It is also possible to set aside node CPUs for Dataset tasks using the ``_max_cpu_fraction_per_node`` option of DatasetConfig (Experimental). +.. code:: python -.. warning:: + # The default DataParallelTrainer dataset config, which is inherited + # by sub-classes such as TorchTrainer, HorovodTrainer, etc. + _dataset_config = { + # Fit preprocessors on the train dataset only. Split the dataset + # across workers if scaling_config["num_workers"] > 1. + "train": DatasetConfig(fit=True, split=True), + # For all other datasets, use the defaults (don't fit, don't split). + # The datasets will be transformed by the fitted preprocessor. + "*": DatasetConfig(), + } - If trial placement groups reserve all the CPUs in the cluster, then it may be that no CPUs are left for Datasets to use, and trials can hang. This can easily happen when using CPU-only trainers. For example, if you can change the above ingest example to use ``ray.init(num_cpus=2)``, such a hang will happen. +These configs can be overriden via the ``dataset_config`` constructor argument. +Here are some examples of configuring Dataset ingest options and what they do: -Refer to the :ref:`Datasets in Tune Example ` to understand how to configure your Trainer. We recommend starting with the default of allowing tasks to run using spare cluster resources, and only changing this if you encounter hangs or want more performance predictability. +.. tabbed:: Example: Split All Datasets -Dataset Sharing -~~~~~~~~~~~~~~~ + This example shows overriding the split config for the "valid" and "test" datasets. This means that + both the valid and test datasets here will be ``.split()`` across the training workers. -When you pass Datasets to a Tuner, it is important to understand that the Datasets are executed independently per-trial. This could potentially duplicate data reads in the cluster. To share Dataset blocks between trials, call `ds = ds.fully_executed()` prior to passing the Dataset to the Tuner. This ensures that the initial read operation will not be repeated per trial. + .. literalinclude:: doc_code/air_ingest.py + :language: python + :start-after: __config_1__ + :end-before: __config_1_end__ + +.. tabbed:: Example: Disable Transform on Aux Dataset + + This example shows overriding the transform config for the "side" dataset. This means that + the original dataset will be returned by ``.get_dataset_shard("side")``. + + .. literalinclude:: doc_code/air_ingest.py + :language: python + :start-after: __config_2__ + :end-before: __config_2_end__ + +Dataset Resources +~~~~~~~~~~~~~~~~~ + +Datasets uses Ray tasks to execute data processing operations. These tasks use CPU resources in the cluster during execution, which may compete with resources needed for Training. + +.. tabbed:: Unreserved CPUs + + By default, Dataset tasks use cluster CPU resources for execution. This can sometimes + conflict with Trainer resource requests. For example, if Trainers allocate all CPU resources + in the cluster, then no Datasets tasks can run. + + .. literalinclude:: ./doc_code/air_ingest.py + :language: python + :start-after: __resource_allocation_1_begin__ + :end-before: __resource_allocation_1_end__ + + Unreserved CPUs work well when: + + * you are running only one Trainer and the cluster has enough CPUs; or + * your Trainers are configured to use GPUs and not CPUs + +.. tabbed:: Using Reserved CPUs (experimental) + + The ``_max_cpu_fraction_per_node`` option can be used to exclude CPUs from placement + group scheduling. In the below example, setting this parameter to ``0.8`` enables Tune + trials to run smoothly without risk of deadlock by reserving 20% of node CPUs for + Dataset execution. + + .. literalinclude:: ./doc_code/air_ingest.py + :language: python + :start-after: __resource_allocation_2_begin__ + :end-before: __resource_allocation_2_end__ + + You should use reserved CPUs when: + + * you are running multiple concurrent CPU Trainers using Tune; or + * you want to ensure predictable Datasets performance + + .. warning:: + + ``_max_cpu_fraction_per_node`` is experimental and not currently recommended for use with + autoscaling clusters (scale-up will not trigger properly). Debugging Ingest with the ``DummyTrainer`` ------------------------------------------ @@ -370,3 +339,9 @@ Performance Tips **Autoscaling**: We generally recommend first trying out AIR training with a fixed size cluster. This makes it easier to understand and debug issues. Autoscaling can be enabled after you are happy with performance to autoscale experiment sweeps with Tune, etc. We also recommend starting with autoscaling with a single node type. Autoscaling with hetereogeneous clusters can optimize costs, but may complicate performance debugging. **Partitioning**: By default, Datasets will create up to 200 blocks per Dataset, or less if there are fewer base files for the Dataset. If you run into out-of-memory errors during preprocessing, consider increasing the number of blocks to reduce their size. To increase the max number of partitions, set the ``parallelism`` option when calling ``ray.data.read_*()``. To change the number of partitions at runtime, use ``ds.repartition(N)``. As a rule of thumb, blocks should be not more than 1-2GiB each. + +Dataset Sharing +~~~~~~~~~~~~~~~ + +When you pass Datasets to a Tuner, Datasets are executed independently per-trial. This could potentially duplicate data reads in the cluster. To share Dataset blocks between trials, call ``ds = ds.fully_executed()`` prior to passing the Dataset to the Tuner. This ensures that the initial read operation will not be repeated per trial. + diff --git a/doc/source/ray-air/doc_code/air_ingest.py b/doc/source/ray-air/doc_code/air_ingest.py index 6b180fe67719..091b4b8f0ec3 100644 --- a/doc/source/ray-air/doc_code/air_ingest.py +++ b/doc/source/ray-air/doc_code/air_ingest.py @@ -92,9 +92,12 @@ from ray.train.torch import TorchTrainer from ray.air.config import ScalingConfig +# A simple preprocessor that just scales all values by 2.0. +preprocessor = BatchMapper(lambda df: df * 2) + def train_loop_per_worker(): - # By default, bulk loading is used and returns a Dataset object. + # Get a handle to the worker's assigned Dataset shard. data_shard: Dataset = session.get_dataset_shard("train") # Manually iterate over the data 10 times (10 epochs). @@ -102,7 +105,7 @@ def train_loop_per_worker(): for batch in data_shard.iter_batches(): print("Do some training on batch", batch) - # View the stats for performance debugging. + # Print the stats for performance debugging. print(data_shard.stats()) @@ -112,6 +115,7 @@ def train_loop_per_worker(): datasets={ "train": ray.data.range_tensor(1000), }, + preprocessor=preprocessor, ) my_trainer.fit() # __config_4_end__ @@ -123,6 +127,9 @@ def train_loop_per_worker(): from ray.train.torch import TorchTrainer from ray.air.config import ScalingConfig, DatasetConfig +# A simple preprocessor that just scales all values by 2.0. +preprocessor = BatchMapper(lambda df: df * 2) + def train_loop_per_worker(): # A DatasetPipeline object is returned when `use_stream_api` is set. @@ -149,6 +156,7 @@ def train_loop_per_worker(): dataset_config={ "train": DatasetConfig(use_stream_api=True, stream_window_size=N), }, + preprocessor=preprocessor, ) my_trainer.fit() # __config_5_end__ @@ -223,3 +231,77 @@ def train_loop_per_worker(): # -> {'train': DatasetConfig(fit=True, split=True, global_shuffle=False, ...)} my_trainer.fit() # __local_shuffling_end__ + +ray.shutdown() + +# __resource_allocation_1_begin__ +import ray +from ray.air import session +from ray.data.preprocessors import BatchMapper +from ray.train.torch import TorchTrainer +from ray.air.config import ScalingConfig + +# Create a cluster with 4 CPU slots available. +ray.init(num_cpus=4) + +# A simple example training loop. +def train_loop_per_worker(): + data_shard = session.get_dataset_shard("train") + for _ in range(10): + for batch in data_shard.iter_batches(): + print("Do some training on batch", batch) + + +# A simple preprocessor that just scales all values by 2.0. +preprocessor = BatchMapper(lambda df: df * 2) + +my_trainer = TorchTrainer( + train_loop_per_worker, + # This will hang if you set num_workers=4, since the + # Trainer will reserve all 4 CPUs for workers, leaving + # none left for Datasets execution. + scaling_config=ScalingConfig(num_workers=2), + datasets={ + "train": ray.data.range_tensor(1000), + }, + preprocessor=preprocessor, +) +my_trainer.fit() +# __resource_allocation_1_end__ + +ray.shutdown() + +# __resource_allocation_2_begin__ +import ray +from ray.air import session +from ray.data.preprocessors import BatchMapper +from ray.train.torch import TorchTrainer +from ray.air.config import ScalingConfig + +# Create a cluster with 4 CPU slots available. +ray.init(num_cpus=4) + +# A simple example training loop. +def train_loop_per_worker(): + data_shard = session.get_dataset_shard("train") + for _ in range(10): + for batch in data_shard.iter_batches(): + print("Do some training on batch", batch) + + +# A simple preprocessor that just scales all values by 2.0. +preprocessor = BatchMapper(lambda df: df * 2) + +my_trainer = TorchTrainer( + train_loop_per_worker, + # This will hang if you set num_workers=4, since the + # Trainer will reserve all 4 CPUs for workers, leaving + # none left for Datasets execution. + scaling_config=ScalingConfig(num_workers=2), + datasets={ + "train": ray.data.range_tensor(1000), + }, + preprocessor=preprocessor, +) +my_trainer.fit() +# __resource_allocation_2_end__ diff --git a/doc/source/ray-air/images/ingest.svg b/doc/source/ray-air/images/ingest.svg index 136a71fdfa28..a17d82251d63 100644 --- a/doc/source/ray-air/images/ingest.svg +++ b/doc/source/ray-air/images/ingest.svg @@ -1 +1 @@ - \ No newline at end of file + \ No newline at end of file