From 0b6505e8c6b26e2027edfef66f842a898f0f48f0 Mon Sep 17 00:00:00 2001 From: Clark Zinzow Date: Wed, 18 May 2022 16:17:48 -0700 Subject: [PATCH] [Datasets] Miscellaneous GA docs P0s. (#24891) This PR knocks off a few miscellaneous GA docs P0s given in our docs tracker. Namely: - Documents Datasets resource allocation model. - De-emphasizes global/windowed shuffling. - Documents lazy execution mode, and expands our execution model docs in general. --- doc/source/data/advanced-pipelines.rst | 8 ++ doc/source/data/creating-datasets.rst | 2 + doc/source/data/doc_code/key_concepts.py | 69 +++++++++ doc/source/data/key-concepts.rst | 171 +++++++++++++++++++++-- 4 files changed, 237 insertions(+), 13 deletions(-) create mode 100644 doc/source/data/doc_code/key_concepts.py diff --git a/doc/source/data/advanced-pipelines.rst b/doc/source/data/advanced-pipelines.rst index 7b5f21ce4474..591fc604f245 100644 --- a/doc/source/data/advanced-pipelines.rst +++ b/doc/source/data/advanced-pipelines.rst @@ -140,6 +140,14 @@ Transformations made prior to the Dataset prior to the call to ``.repeat()`` are For example, in the following pipeline, the ``map(func)`` transformation only occurs once. However, the random shuffle is applied to each repetition in the pipeline. +.. note:: + Global per-epoch shuffling is an expensive operation that will slow down your ML + ingest pipeline, prevents you from using a fully-streaming ML ingest pipeline, and + can cause large increases in memory utilization and spilling to disk; only use + global per-epoch shuffling if your model benefits from it! If your model doesn't + benefit from global per-epoch shuffling and/or you run into performance or stability + issues, you should try out windowed or local per-epoch shuffling. + **Code**: .. code-block:: python diff --git a/doc/source/data/creating-datasets.rst b/doc/source/data/creating-datasets.rst index 9f774b3fd7f5..eab8b5dbf890 100644 --- a/doc/source/data/creating-datasets.rst +++ b/doc/source/data/creating-datasets.rst @@ -303,6 +303,8 @@ This default parallelism can be overridden via the ``parallelism`` argument; see :ref:`performance guide ` for tips on how to tune this read parallelism. +.. _dataset_deferred_reading: + Deferred Read Task Execution ============================ diff --git a/doc/source/data/doc_code/key_concepts.py b/doc/source/data/doc_code/key_concepts.py new file mode 100644 index 000000000000..8b95c485a7ec --- /dev/null +++ b/doc/source/data/doc_code/key_concepts.py @@ -0,0 +1,69 @@ +# flake8: noqa + +# fmt: off +# __resource_allocation_begin__ +import ray +from ray.data.context import DatasetContext +from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy + +# Create a single-CPU local cluster. +ray.init(num_cpus=1) +ctx = DatasetContext.get_current() +# Create a placement group that takes up the single core on the cluster. +placement_group = ray.util.placement_group( + name="core_hog", + strategy="SPREAD", + bundles=[ + {"CPU": 1}, + ], +) +ray.get(placement_group.ready()) + +# Tell Datasets to use the placement group for all Datasets tasks. +ctx.scheduling_strategy = PlacementGroupSchedulingStrategy(placement_group) +# This Dataset workload will use that placement group for all read and map tasks. +ds = ray.data.range(100, parallelism=2) \ + .map(lambda x: x + 1) + +assert ds.take_all() == list(range(1, 101)) +# __resource_allocation_end__ +# fmt: on + +# fmt: off +# __block_move_begin__ +import ray +from ray.data.context import DatasetContext + +ctx = DatasetContext.get_current() +ctx.optimize_fuse_stages = False + +def map_udf(df): + df["sepal.area"] = df["sepal.length"] * df["sepal.width"] + return df + +ds = ray.data.read_parquet("example://iris.parquet") \ + .experimental_lazy() \ + .map_batches(map_df) \ + .filter(lambda row: row["sepal.area"] > 15) +# __block_move_end__ +# fmt: on + +# fmt: off +# __dataset_pipelines_execution_begin__ +import ray + +# ML ingest re-reading from storage on every epoch. +torch_ds = ray.data.read_parquet("example://iris.parquet") \ + .repeat() \ + .random_shuffle() \ + .to_torch() + +# Streaming batch inference pipeline that pipelines the transforming of a single +# file with the reading of a single file (at most 2 file's worth of data in-flight +# at a time). +infer_ds = ray.data.read_binary_files("example://mniset_subset_partitioned/") \ + .window(blocks_per_window=1) \ + .map(lambda bytes_: np.asarray(PIL.Image.open(BytesIO(bytes_)).convert("L"))) \ + .map_batches(lambda imgs: [img.mean() > 0.5 for img in imgs]) +# __dataset_pipelines_execution_end__ +# fmt: on diff --git a/doc/source/data/key-concepts.rst b/doc/source/data/key-concepts.rst index 25681554de4e..6190d5383ddd 100644 --- a/doc/source/data/key-concepts.rst +++ b/doc/source/data/key-concepts.rst @@ -36,7 +36,8 @@ This flexibility is a unique characteristic of Ray Datasets. Compared to `Spark RDDs `__ and `Dask Bags `__, Ray Datasets offers a more basic set of features, and executes operations eagerly for simplicity. -It is intended that users cast Datasets into more feature-rich dataframe types (e.g., ``ds.to_dask()``) for advanced operations. +It is intended that users cast Datasets into more feature-rich dataframe types (e.g., +:meth:`ds.to_dask() `) for advanced operations. .. _dataset_pipeline_concept: @@ -51,9 +52,9 @@ A DatasetPipeline is an unified iterator over a (potentially infinite) sequence .. _dataset_execution_concept: ------------------------ -Dataset Execution Model ------------------------ +------------------------ +Datasets Execution Model +------------------------ This page overviews the execution model of Datasets, which may be useful for understanding and tuning performance. @@ -75,16 +76,24 @@ In the common case, each read task produces a single output block. Read tasks ma Block splitting is off by default. See the :ref:`performance section ` on how to enable block splitting (beta). +.. _dataset_defeferred_reading: + Deferred Read Task Execution ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -When a Dataset is created using ``ray.data.read_*``, only the first read task will be executed initially. This avoids blocking Dataset creation on the reading of all data files, enabling inspection functions like ``ds.schema()`` and ``ds.show()`` to be used right away. Executing further transformations on the Dataset will trigger execution of all read tasks. - +When a Dataset is created using ``ray.data.read_*``, only the first read task will be +executed initially. This avoids blocking Dataset creation on the reading of all data +files, enabling inspection functions like :meth:`ds.schema() `` +and :meth:`ds.show() ` to be used right away. Executing further +transformations on the Dataset will trigger execution of all read tasks. Dataset Transforms ================== -Datasets use either Ray tasks or Ray actors to transform datasets (i.e., for ``.map``, ``.flat_map``, or ``.map_batches``). By default, tasks are used (``compute="tasks"``). Actors can be specified with ``compute="actors"``, in which case an autoscaling pool of Ray actors will be used to apply transformations. Using actors allows for expensive state initialization (e.g., for GPU-based tasks) to be re-used. Whichever compute strategy is used, each map task generally takes in one block and produces one or more output blocks. The output block splitting rule is the same as for file reads (blocks are split after hitting the target max block size of 2GiB): +Datasets use either Ray tasks or Ray actors to transform datasets (i.e., for +:meth:`ds.map_batches() `, +:meth:`ds.map() `, or +:meth:`ds.flat_map() `). By default, tasks are used (``compute="tasks"``). Actors can be specified with ``compute="actors"``, in which case an autoscaling pool of Ray actors will be used to apply transformations. Using actors allows for expensive state initialization (e.g., for GPU-based tasks) to be re-used. Whichever compute strategy is used, each map task generally takes in one block and produces one or more output blocks. The output block splitting rule is the same as for file reads (blocks are split after hitting the target max block size of 2GiB): .. image:: images/dataset-map.svg :width: 650px @@ -96,9 +105,12 @@ Datasets use either Ray tasks or Ray actors to transform datasets (i.e., for ``. Shuffling Data ============== -Certain operations like ``.sort`` and ``.groupby`` require data blocks to be partitioned by value. Datasets executes this in three phases. First, a wave of sampling tasks determines suitable partition boundaries based on a random sample of data. Second, map tasks divide each input block into a number of output blocks equal to the number of reduce tasks. Third, reduce tasks take assigned output blocks from each map task and combines them into one block. Overall, this strategy generates ``O(n^2)`` intermediate objects where ``n`` is the number of input blocks. +Certain operations like :meth:`ds.sort() ` and +:meth:`ds.groupby() ` require data blocks to be partitioned by value. Datasets executes this in three phases. First, a wave of sampling tasks determines suitable partition boundaries based on a random sample of data. Second, map tasks divide each input block into a number of output blocks equal to the number of reduce tasks. Third, reduce tasks take assigned output blocks from each map task and combines them into one block. Overall, this strategy generates ``O(n^2)`` intermediate objects where ``n`` is the number of input blocks. -You can also change the partitioning of a Dataset using ``.random_shuffle`` or ``.repartition``. The former should be used if you want to randomize the order of elements in the dataset. The second should be used if you only want to equalize the size of the Dataset blocks (e.g., after a read or transformation that may skew the distribution of block sizes). Note that repartition has two modes, ``shuffle=False``, which performs the minimal data movement needed to equalize block sizes, and ``shuffle=True``, which performs a full (non-random) distributed shuffle: +You can also change the partitioning of a Dataset using :meth:`ds.random_shuffle() +` or +:meth:`ds.repartition() `. The former should be used if you want to randomize the order of elements in the dataset. The second should be used if you only want to equalize the size of the Dataset blocks (e.g., after a read or transformation that may skew the distribution of block sizes). Note that repartition has two modes, ``shuffle=False``, which performs the minimal data movement needed to equalize block sizes, and ``shuffle=True``, which performs a full (non-random) distributed shuffle: .. image:: images/dataset-shuffle.svg :width: 650px @@ -107,6 +119,29 @@ You can also change the partitioning of a Dataset using ``.random_shuffle`` or ` .. https://docs.google.com/drawings/d/132jhE3KXZsf29ho1yUdPrCHB9uheHBWHJhDQMXqIVPA/edit +Resource Allocation Model +========================= + +Unlike other libraries in Ray's ML ecosystem, such as Tune and Train, Datasets does not +natively use placement groups to allocate resources for Datasets workloads (tasks and +actor pools). Instead, Datasets makes plain CPU/GPU resource requests to the cluster, +and in order to not compete with Tune/Train for resources within those library's +placement groups, Datasets **escapes placement groups by default**. Any Datasets +tasks launched from within a placement group will be executed outside of that placement +group by default. This can be thought of as Datasets requesting resources from the +margins of the cluster, outside of those ML library placement groups. + +Although this is the default behavior, you can force all Datasets workloads to be +schedule without a placement group by specifying a placement group as the global +scheduling strategy for all Datasets tasks/actors, using the global +:class:`DatasetContext `: + + +.. literalinclude:: ./doc_code/key_concepts.py + :language: python + :start-after: __resource_allocation_begin__ + :end-before: __resource_allocation_end__ + Memory Management ================= @@ -117,7 +152,13 @@ Execution Memory During execution, certain types of intermediate data must fit in memory. This includes the input block of a task, as well as at least one of the output blocks of the task (when a task has multiple output blocks, only one needs to fit in memory at any given time). The input block consumes object stored shared memory (Python heap memory for non-Arrow data). The output blocks consume Python heap memory (prior to putting in the object store) as well as object store memory (after being put in the object store). -This means that large block sizes can lead to potential out-of-memory situations. To avoid OOM errors, Datasets can split blocks during map and read tasks into pieces smaller than the target max block size. In some cases, this splitting is not possible (e.g., if a single item in a block is extremely large, or the function given to ``.map_batches`` returns a very large batch). To avoid these issues, make sure no single item in your Datasets is too large, and always call ``.map_batches`` with batch size small enough such that the output batch can comfortably fit into memory. +This means that large block sizes can lead to potential out-of-memory situations. To +avoid OOM errors, Datasets can split blocks during map and read tasks into pieces +smaller than the target max block size. In some cases, this splitting is not possible +(e.g., if a single item in a block is extremely large, or the function given to +:meth:`ds.map_batches() ` returns a very large batch). To +avoid these issues, make sure no single item in your Datasets is too large, and always +call :meth:`ds.map_batches() ` with batch size small enough such that the output batch can comfortably fit into memory. .. note:: @@ -143,10 +184,68 @@ Datasets uses the Ray object store to store data blocks, which means it inherits **Load Balancing**: Datasets uses Ray scheduling hints to spread read tasks out across the cluster to balance memory usage. -Stage Fusion Optimization -========================= +Lazy Execution Mode +=================== + +.. note:: + + Lazy execution mode is experimental. If you run into any issues, please reach + out on `Discourse `__ or open an issue on the + `Ray GitHub repo `__. + +By default, all Datasets operations are eager (except for data reading, which is +semi-lazy; see the :ref:`deferred reading docs `), executing +each stage synchronously. This provides a simpler iterative development and debugging +experience, allowing you to inspect up-to-date metadata (schema, row count, etc.) after +each operation, greatly improving the typical "Getting Started" experience. + +However, this eager execution mode can result in less optimal (i.e. slower) execution +and increased memory utilization compared to what's possible with a lazy execution mode. +That's why Datasets offers a lazy execution mode, which you can transition to after +you're done prototyping your Datasets pipeline. + +Lazy execution mode can be enabled by calling +:meth:`ds = ds.experimental_lazy() `, which +returns a dataset whose all subsequent operations will be **lazy**. These operations +won't be executed until the dataset is consumed (e.g. via +:meth:`ds.take() `, +:meth:`ds.iter_batches() `, +:meth:`ds.to_torch() `, etc.) or if +:meth:`ds.fully_executed() ` is called to manually +trigger execution. + +The big optimizations that lazy execution enables are **automatic stage fusion** and +**block move semantics**. + +Automatic Stage Fusion +~~~~~~~~~~~~~~~~~~~~~~ + +Automatic fusion of stages/operations can significantly lower the Ray task overhead of +Datasets workloads, since a chain of reading and many map-like transformations will be +condensed into a single stage of Ray tasks; this results in less data needing to be put +into Ray's object store and transferred across nodes, and therefore resulting in lower +memory utilization and faster task execution. + +Datasets will automatically fuse together lazy operations that are compatible: + +* Same compute pattern: embarrassingly parallel map vs. all-to-all shuffle +* Same compute strategy: Ray tasks vs Ray actors +* Same resource specification, e.g. ``num_cpus`` or ``num_cpus`` requests + +Read and subsequent map-like transformations +(e.g. :meth:`ds.map_batches() `, +:meth:`ds.filter() `, etc.) will usually be fused together. +All-to-all transformations such as +:meth:`ds.random_shuffle() ` can be fused with earlier +map-like stages, but not later stages. + +.. note:: + + For eager mode Datasets, reads are semi-lazy, so the transformation stage right after + the read stage (that triggers the full data read) will fuse with the read stage. Note + that this currently incurs re-reading of any already-read blocks (a fix for this is + currently in progress.) -To avoid unnecessary data movement in the distributed setting, Dataset pipelines will *fuse* compatible stages (i.e., stages with the same compute strategy and resource specifications). Read and map-like stages are always fused if possible. All-to-all dataset transformations such as ``random_shuffle`` can be fused with earlier map-like stages, but not later stages. For Datasets, only read stages are fused. This is since non-pipelined Datasets are eagerly executed except for their initial read stage. You can tell if stage fusion is enabled by checking the :ref:`Dataset stats ` and looking for fused stages (e.g., ``read->map_batches``). @@ -156,3 +255,49 @@ You can tell if stage fusion is enabled by checking the :ref:`Dataset stats map_batches() -> filter()`` operations: + +.. literalinclude:: ./doc_code/key_concepts.py + :language: python + :start-after: __block_move_begin__ + :end-before: __block_move_end__ + +that, for the sake of this example, aren't fused together, Datasets can eagerly release +the outputs of the ``read_parquet()`` stage and the ``map_batches()`` stage before the +subsequent stage (``map_batches()`` and ``filter()``, respectively) have finished. This +was not possible in eager mode, since every operation materialized the data and returned +the references back to the user. But in lazy execution mode, we know that the outputs of +the ``read_parquet()`` and ``map_batches()`` stages are only going to be used by the +downstream stages, so we can more aggressively release them. + +Dataset Pipelines Execution Model +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +To avoid unnecessary data movement in the distributed setting, +:class:`DatasetPipelines ` will always use +these lazy execution optimizations (stage fusion and block move semantics) +under-the-hood. Because a ``DatasetPipeline`` doesn't support creating more than one +``DatasetPipeline`` from a ``DatasetPipeline`` (i.e. no fan-out), we can clear block +data extra aggressively. + +.. note:: + + When creating a pipeline (i.e. calling :meth:`ds.window() ` + or :meth:`ds.repeat() `) immediately after a read stage, any + already read data will be dropped, and the read stage will be absorbed into the + pipeline and be made fully lazy. This allows you to easily create ML ingest pipelines + that re-read data from storage on every epoch, as well as streaming batch inference + pipelines that window all the way down to the file reading. + +.. literalinclude:: ./doc_code/key_concepts.py + :language: python + :start-after: __dataset_pipelines_execution_begin__ + :end-before: __dataset_pipelines_execution_end__