From 60a082cb100225d4b03b733d734e78566c36eba5 Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Wed, 20 Mar 2024 00:17:25 -0700 Subject: [PATCH] [Data] [Docs] Consolidate shuffling-related information into `Shuffling Data` page (#44098) (#44171) Cherry-pick #44098. Docs-only change. Consolidate shuffling-related information spread out across Ray Data docs into a new Shuffling Data page. Signed-off-by: Scott Lee --- doc/source/data/iterating-over-data.rst | 2 +- doc/source/data/performance-tips.rst | 130 --------------- doc/source/data/shuffling-data.rst | 212 ++++++++++++++++++++++++ doc/source/data/transforming-data.rst | 89 +--------- doc/source/data/user-guide.rst | 1 + 5 files changed, 215 insertions(+), 219 deletions(-) create mode 100644 doc/source/data/shuffling-data.rst diff --git a/doc/source/data/iterating-over-data.rst b/doc/source/data/iterating-over-data.rst index f7564c0d7a39..52af861eae43 100644 --- a/doc/source/data/iterating-over-data.rst +++ b/doc/source/data/iterating-over-data.rst @@ -155,7 +155,7 @@ shuffles all rows. If a full global shuffle isn't required, you can shuffle a su rows up to a provided buffer size during iteration by specifying ``local_shuffle_buffer_size``. While this isn't a true global shuffle like ``random_shuffle``, it's more performant because it doesn't require excessive data -movement. +movement. For more details about these options, see :doc:`Shuffling Data `. .. tip:: diff --git a/doc/source/data/performance-tips.rst b/doc/source/data/performance-tips.rst index 9fea21264b4e..ecc81d8a17ca 100644 --- a/doc/source/data/performance-tips.rst +++ b/doc/source/data/performance-tips.rst @@ -393,136 +393,6 @@ To illustrate these, the following code uses both strategies to coalesce the 10 ... * Output num rows: 10 min, 10 max, 10 mean, 10 total - -.. _optimizing_shuffles: - -Optimizing shuffles -------------------- - -*Shuffle* operations are all-to-all operations where the entire Dataset must be materialized in memory before execution can proceed. -Currently, these are: - -* :meth:`Dataset.groupby ` -* :meth:`Dataset.random_shuffle ` -* :meth:`Dataset.repartition ` -* :meth:`Dataset.sort ` - -.. note:: This is an active area of development. If your Dataset uses a shuffle operation and you are having trouble configuring shuffle, `file a Ray Data issue on GitHub`_ - -When should you use global per-epoch shuffling? -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -Use global per-epoch shuffling only if your model is sensitive to the -randomness of the training data. Based on a -`theoretical foundation `__ all -gradient-descent-based model trainers benefit from improved (global) shuffle quality. -In practice, the benefit is particularly pronounced for tabular data/models. -However, the more global the shuffle is, the more expensive the shuffling operation. -The increase compounds with distributed data-parallel training on a multi-node cluster due -to data transfer costs. This cost can be prohibitive when using very large datasets. - -The best route for determining the best tradeoff between preprocessing time and cost and -per-epoch shuffle quality is to measure the precision gain per training step for your -particular model under different shuffling policies: - -* no shuffling, -* local (per-shard) limited-memory shuffle buffer, -* local (per-shard) shuffling, -* windowed (pseudo-global) shuffling, and -* fully global shuffling. - -As long as your data loading and shuffling throughput is higher than your training throughput, your GPU should -be saturated. 
If you have shuffle-sensitive models, push the -shuffle quality higher until this threshold is hit. - -.. _shuffle_performance_tips: - -Enabling push-based shuffle -~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -Some Dataset operations require a *shuffle* operation, meaning that data is shuffled from all of the input partitions to all of the output partitions. -These operations include :meth:`Dataset.random_shuffle `, -:meth:`Dataset.sort ` and :meth:`Dataset.groupby `. -Shuffle can be challenging to scale to large data sizes and clusters, especially when the total dataset size can't fit into memory. - -Datasets provides an alternative shuffle implementation known as push-based shuffle for improving large-scale performance. -Try this out if your dataset has more than 1000 blocks or is larger than 1 TB in size. - -To try this out locally or on a cluster, you can start with the `nightly release test `_ that Ray runs for :meth:`Dataset.random_shuffle ` and :meth:`Dataset.sort `. -To get an idea of the performance you can expect, here are some run time results for :meth:`Dataset.random_shuffle ` on 1-10 TB of data on 20 machines (m5.4xlarge instances on AWS EC2, each with 16 vCPUs, 64 GB RAM). - -.. image:: https://docs.google.com/spreadsheets/d/e/2PACX-1vQvBWpdxHsW0-loasJsBpdarAixb7rjoo-lTgikghfCeKPQtjQDDo2fY51Yc1B6k_S4bnYEoChmFrH2/pubchart?oid=598567373&format=image - :align: center - -To try out push-based shuffle, set the environment variable ``RAY_DATA_PUSH_BASED_SHUFFLE=1`` when running your application: - -.. code-block:: bash - - $ wget https://raw.githubusercontent.com/ray-project/ray/master/release/nightly_tests/dataset/sort.py - $ RAY_DATA_PUSH_BASED_SHUFFLE=1 python sort.py --num-partitions=10 --partition-size=1e7 - - # Dataset size: 10 partitions, 0.01GB partition size, 0.1GB total - # [dataset]: Run `pip install tqdm` to enable progress reporting. - # 2022-05-04 17:30:28,806 INFO push_based_shuffle.py:118 -- Using experimental push-based shuffle. - # Finished in 9.571171760559082 - # ... - -You can also specify the shuffle implementation during program execution by -setting the ``DataContext.use_push_based_shuffle`` flag: - -.. testcode:: - :hide: - - import ray - ray.shutdown() - -.. testcode:: - - import ray - - ctx = ray.data.DataContext.get_current() - ctx.use_push_based_shuffle = True - - ds = ( - ray.data.range(1000) - .random_shuffle() - ) - -Large-scale shuffles can take a while to finish. -For debugging purposes, shuffle operations support executing only part of the shuffle, so that you can collect an execution profile more quickly. -Here is an example that shows how to limit a random shuffle operation to two output blocks: - -.. testcode:: - :hide: - - import ray - ray.shutdown() - -.. testcode:: - - import ray - - ctx = ray.data.DataContext.get_current() - ctx.set_config( - "debug_limit_shuffle_execution_to_num_blocks", 2 - ) - - ds = ( - ray.data.range(1000, override_num_blocks=10) - .random_shuffle() - .materialize() - ) - print(ds.stats()) - -.. testoutput:: - :options: +MOCK - - Operator 1 ReadRange->RandomShuffle: executed in 0.08s - - Suboperator 0 ReadRange->RandomShuffleMap: 2/2 blocks executed - ... - - Configuring execution --------------------- diff --git a/doc/source/data/shuffling-data.rst b/doc/source/data/shuffling-data.rst new file mode 100644 index 000000000000..464151f381d9 --- /dev/null +++ b/doc/source/data/shuffling-data.rst @@ -0,0 +1,212 @@ +.. 
_shuffling_data:
+
+==============
+Shuffling Data
+==============
+
+When consuming or iterating over Ray :class:`Datasets `, it can be useful to
+shuffle or randomize the order of data (for example, randomizing data ingest order during ML training).
+This guide shows several different methods of shuffling data with Ray Data and their respective trade-offs.
+
+Types of shuffling
+==================
+
+Ray Data provides several different options for shuffling data, trading off the granularity of shuffle
+control against memory consumption and runtime. The options below are listed in increasing order of
+resource consumption and runtime; choose the most appropriate method for your use case.
+
+.. _shuffling_file_order:
+
+Shuffle the ordering of files
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+To randomly shuffle the ordering of input files before reading, call a :ref:`read function ` that supports shuffling, such as
+:func:`~ray.data.read_images`, and use the ``shuffle="files"`` parameter. This randomly assigns
+input files to workers for reading.
+
+This is the fastest shuffle option because it's a purely metadata operation. This
+option doesn't shuffle the actual rows inside files, so the randomness might be
+poor if each file has many rows.
+
+.. testcode::
+
+    import ray
+
+    ds = ray.data.read_images(
+        "s3://anonymous@ray-example-data/image-datasets/simple",
+        shuffle="files",
+    )
+
+.. _local_shuffle_buffer:
+
+Local shuffle when iterating over batches
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+To locally shuffle a subset of rows using iteration methods, such as :meth:`~ray.data.Dataset.iter_batches`,
+:meth:`~ray.data.Dataset.iter_torch_batches`, and :meth:`~ray.data.Dataset.iter_tf_batches`,
+specify ``local_shuffle_buffer_size``. This shuffles the rows up to a provided buffer
+size during iteration. See more details in
+:ref:`Iterating over batches with shuffling `.
+
+This is slower than shuffling the ordering of files, and shuffles rows locally without
+network transfer. You can use this local shuffle buffer together with shuffling the
+ordering of files; see :ref:`Shuffle the ordering of files ` and the combined sketch in the advanced section below.
+
+.. testcode::
+
+    import ray
+
+    ds = ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
+
+    for batch in ds.iter_batches(
+        batch_size=2,
+        batch_format="numpy",
+        local_shuffle_buffer_size=250,
+    ):
+        print(batch)
+
+.. tip::
+
+    If you observe reduced throughput when using ``local_shuffle_buffer_size``,
+    check the total time spent in batch creation by
+    examining the ``ds.stats()`` output (``In batch formatting``, under
+    ``Batch iteration time breakdown``). If this time is significantly larger than the
+    time spent in other steps, decrease ``local_shuffle_buffer_size`` or turn off the local
+    shuffle buffer altogether and only :ref:`shuffle the ordering of files `.
+
+Shuffle all rows
+~~~~~~~~~~~~~~~~
+
+To randomly shuffle all rows globally, call :meth:`~ray.data.Dataset.random_shuffle`.
+This is the slowest shuffle option because it requires transferring data across the
+network between workers. This option achieves the best randomness among all options.
+
+.. testcode::
+
+    import ray
+
+    ds = (
+        ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
+        .random_shuffle()
+    )
+
+.. _optimizing_shuffles:
+
+Advanced: Optimizing shuffles
+=============================
+.. note:: This is an active area of development. If your Dataset uses a shuffle operation and you are having trouble configuring shuffle,
+    `file a Ray Data issue on GitHub `_.
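+
+A practical middle ground before reaching for a full global shuffle is to combine the two
+lighter-weight options above: shuffle the ordering of files at read time and add a local
+shuffle buffer during iteration. The following is a minimal sketch of that combination; the
+example dataset path, batch size, and buffer size are illustrative, so tune them for your own workload.
+
+.. testcode::
+
+    import ray
+
+    # Shuffle which files each read task picks up (a purely metadata operation).
+    ds = ray.data.read_images(
+        "s3://anonymous@ray-example-data/image-datasets/simple",
+        shuffle="files",
+    )
+
+    # Additionally shuffle rows within a bounded buffer during iteration
+    # (local to each worker, no network transfer).
+    for batch in ds.iter_batches(
+        batch_size=2,
+        batch_format="numpy",
+        local_shuffle_buffer_size=250,
+    ):
+        print(batch)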
+
+When should you use global per-epoch shuffling?
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Use global per-epoch shuffling only if your model is sensitive to the
+randomness of the training data. Based on a
+`theoretical foundation `__, all
+gradient-descent-based model trainers benefit from improved (global) shuffle quality.
+In practice, the benefit is particularly pronounced for tabular data/models.
+However, the more global the shuffle, the more expensive the shuffling operation becomes.
+The increase compounds with distributed data-parallel training on a multi-node cluster due
+to data transfer costs, which can be prohibitive for very large datasets.
+
+The best way to determine the right tradeoff between preprocessing time and cost on one hand,
+and per-epoch shuffle quality on the other, is to measure the precision gain per training step for your
+particular model under different shuffling policies:
+
+* no shuffling,
+* local (per-shard) limited-memory shuffle buffer,
+* local (per-shard) shuffling,
+* windowed (pseudo-global) shuffling, and
+* fully global shuffling.
+
+As long as your data loading and shuffling throughput is higher than your training throughput, your GPU should
+be saturated. If you have shuffle-sensitive models, push the
+shuffle quality higher until this threshold is hit.
+
+.. _shuffle_performance_tips:
+
+Enabling push-based shuffle
+~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Some Dataset operations require a *shuffle* operation, meaning that data is shuffled from all of the input partitions to all of the output partitions.
+These operations include :meth:`Dataset.random_shuffle `,
+:meth:`Dataset.sort `, and :meth:`Dataset.groupby `.
+For example, during a sort operation, data is reordered between blocks, which requires shuffling across partitions.
+Shuffling can be challenging to scale to large data sizes and clusters, especially when the total dataset size can't fit into memory.
+
+Ray Data provides an alternative shuffle implementation known as push-based shuffle for improving large-scale performance.
+Try this out if your dataset has more than 1000 blocks or is larger than 1 TB in size.
+
+To try this out locally or on a cluster, you can start with the `nightly release test `_ that Ray runs for :meth:`Dataset.random_shuffle ` and :meth:`Dataset.sort `.
+To get an idea of the performance you can expect, here are some run time results for :meth:`Dataset.random_shuffle ` on 1-10 TB of data on 20 machines (m5.4xlarge instances on AWS EC2, each with 16 vCPUs, 64 GB RAM).
+
+.. image:: https://docs.google.com/spreadsheets/d/e/2PACX-1vQvBWpdxHsW0-loasJsBpdarAixb7rjoo-lTgikghfCeKPQtjQDDo2fY51Yc1B6k_S4bnYEoChmFrH2/pubchart?oid=598567373&format=image
+    :align: center
+
+To try out push-based shuffle, set the environment variable ``RAY_DATA_PUSH_BASED_SHUFFLE=1`` when running your application:
+
+.. code-block:: bash
+
+    $ wget https://raw.githubusercontent.com/ray-project/ray/master/release/nightly_tests/dataset/sort.py
+    $ RAY_DATA_PUSH_BASED_SHUFFLE=1 python sort.py --num-partitions=10 --partition-size=1e7
+
+    # Dataset size: 10 partitions, 0.01GB partition size, 0.1GB total
+    # [dataset]: Run `pip install tqdm` to enable progress reporting.
+    # 2022-05-04 17:30:28,806 INFO push_based_shuffle.py:118 -- Using experimental push-based shuffle.
+    # Finished in 9.571171760559082
+    # ...
+
+You can also specify the shuffle implementation during program execution by
+setting the ``DataContext.use_push_based_shuffle`` flag:
+
+..
testcode:: + :hide: + + import ray + ray.shutdown() + +.. testcode:: + + import ray + + ctx = ray.data.DataContext.get_current() + ctx.use_push_based_shuffle = True + + ds = ( + ray.data.range(1000) + .random_shuffle() + ) + +Large-scale shuffles can take a while to finish. +For debugging purposes, shuffle operations support executing only part of the shuffle, so that you can collect an execution profile more quickly. +Here is an example that shows how to limit a random shuffle operation to two output blocks: + +.. testcode:: + :hide: + + import ray + ray.shutdown() + +.. testcode:: + + import ray + + ctx = ray.data.DataContext.get_current() + ctx.set_config( + "debug_limit_shuffle_execution_to_num_blocks", 2 + ) + + ds = ( + ray.data.range(1000, override_num_blocks=10) + .random_shuffle() + .materialize() + ) + print(ds.stats()) + +.. testoutput:: + :options: +MOCK + + Operator 1 ReadRange->RandomShuffle: executed in 0.08s + + Suboperator 0 ReadRange->RandomShuffleMap: 2/2 blocks executed + ... \ No newline at end of file diff --git a/doc/source/data/transforming-data.rst b/doc/source/data/transforming-data.rst index 228780b8627b..599e30b27b8a 100644 --- a/doc/source/data/transforming-data.rst +++ b/doc/source/data/transforming-data.rst @@ -16,7 +16,6 @@ This guide shows you how to: * :ref:`Transform batches ` * :ref:`Stateful transforms ` * :ref:`Groupby and transform groups ` -* :ref:`Shuffle data ` * :ref:`Repartition data ` .. _transforming_rows: @@ -162,7 +161,7 @@ program might run out of memory. If you encounter an out-of-memory error, decrea .. _stateful_transforms: Stateful Transforms -============================== +=================== If your transform requires expensive setup such as downloading model weights, use a callable Python class instead of a function to make the transform stateful. When a Python class @@ -305,92 +304,6 @@ To transform groups, call :meth:`~ray.data.Dataset.groupby` to group rows. Then, .map_groups(normalize_features) ) -.. _shuffling_data: - -Shuffling data -============== - -.. _shuffling_file_order: - -Shuffle the ordering of files -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -To randomly shuffle the ordering of input files before reading, call a function like -:func:`~ray.data.read_images` and specify ``shuffle="files"``. This randomly assigns -input files to workers for reading. - -.. testcode:: - - import ray - - ds = ray.data.read_images( - "s3://anonymous@ray-example-data/image-datasets/simple", - shuffle="files", - ) - -.. tip:: - - This is the fastest option for shuffle, and is a purely metadata operation. This - option doesn't shuffle the actual rows inside files, so the randomness might be - poor if each file has many rows. - -.. _local_shuffle_buffer: - -Local shuffle when iterating over batches -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -To locally shuffle a subset of rows, call a function like :meth:`~ray.data.Dataset.iter_batches` -and specify `local_shuffle_buffer_size`. This shuffles the rows up to a provided buffer -size during iteration. See more details in -:ref:`Iterating over batches with shuffling `. - -.. testcode:: - - import ray - - ds = ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple") - - for batch in ds.iter_batches( - batch_size=2, - batch_format="numpy", - local_shuffle_buffer_size=250, - ): - print(batch) - -.. tip:: - - This is slower than shuffling ordering of files, and shuffles rows locally without - network transfer. 
This local shuffle buffer can be used together with shuffling - ordering of files; see :ref:`Shuffle the ordering of files `. - - If you observe reduced throughput when using ``local_shuffle_buffer_size``; - one way to diagnose this is to check the total time spent in batch creation by - examining the ``ds.stats()`` output (``In batch formatting``, under - ``Batch iteration time breakdown``). - - If this time is significantly larger than the - time spent in other steps, one way to improve performance is to decrease - ``local_shuffle_buffer_size`` or turn off the local shuffle buffer altogether and only :ref:`shuffle the ordering of files `. - -Shuffle all rows -~~~~~~~~~~~~~~~~ - -To randomly shuffle all rows globally, call :meth:`~ray.data.Dataset.random_shuffle`. - -.. testcode:: - - import ray - - ds = ( - ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple") - .random_shuffle() - ) - -.. tip:: - - This is the slowest option for shuffle, and requires transferring data across - network between workers. This option achieves the best randomness among all options. - .. _repartitioning_data: Repartitioning data diff --git a/doc/source/data/user-guide.rst b/doc/source/data/user-guide.rst index d2052e75484c..61910c8e3299 100644 --- a/doc/source/data/user-guide.rst +++ b/doc/source/data/user-guide.rst @@ -16,6 +16,7 @@ show you how achieve several tasks. transforming-data inspecting-data iterating-over-data + shuffling-data saving-data working-with-images working-with-text