diff --git a/doc/source/data/advanced-pipelines.rst b/doc/source/data/advanced-pipelines.rst index 7b5f21ce4474..591fc604f245 100644 --- a/doc/source/data/advanced-pipelines.rst +++ b/doc/source/data/advanced-pipelines.rst @@ -140,6 +140,14 @@ Transformations made prior to the Dataset prior to the call to ``.repeat()`` are For example, in the following pipeline, the ``map(func)`` transformation only occurs once. However, the random shuffle is applied to each repetition in the pipeline. +.. note:: + Global per-epoch shuffling is an expensive operation that will slow down your ML + ingest pipeline, prevents you from using a fully-streaming ML ingest pipeline, and + can cause large increases in memory utilization and spilling to disk; only use + global per-epoch shuffling if your model benefits from it! If your model doesn't + benefit from global per-epoch shuffling and/or you run into performance or stability + issues, you should try out windowed or local per-epoch shuffling. + **Code**: .. code-block:: python diff --git a/doc/source/data/creating-datasets.rst b/doc/source/data/creating-datasets.rst index 9f774b3fd7f5..eab8b5dbf890 100644 --- a/doc/source/data/creating-datasets.rst +++ b/doc/source/data/creating-datasets.rst @@ -303,6 +303,8 @@ This default parallelism can be overridden via the ``parallelism`` argument; see :ref:`performance guide ` for tips on how to tune this read parallelism. +.. _dataset_deferred_reading: + Deferred Read Task Execution ============================ diff --git a/doc/source/data/key-concepts.rst b/doc/source/data/key-concepts.rst index 25681554de4e..e537fd626c43 100644 --- a/doc/source/data/key-concepts.rst +++ b/doc/source/data/key-concepts.rst @@ -57,6 +57,122 @@ Dataset Execution Model This page overviews the execution model of Datasets, which may be useful for understanding and tuning performance. +By default, all Datasets operations are eager (except for data reading, which is +semi-lazy; see the :ref:`deferred reading docs `), executing +each stage synchronously. This provides a simpler iterative development and debugging +experience, allowing you to inspect up-to-date metadata (schema, row count, etc.) after +each operation, greatly improving the typical "Getting Started" experience. + +However, this eager execution mode can result in less optimal (i.e. slower) execution +and increased memory utilization compared to what's possible with a lazy execution mode. +That's why Datasets offers a lazy execution mode, which you can transition to after +you're done prototyping your Datasets pipeline. + +Lazy Execution Mode +=================== + +Lazy execution mode can be enabled by calling ``ds = ds.experimental_lazy()``, which +returns a dataset whose all subsequent operations will be **lazy**. These operations +won't be executed until the dataset is consumed (e.g. via +:meth:`ds.take() `, +:meth:`ds.iter_batches() `, +:meth:`ds.to_torch() `, etc.) or if +:meth:`ds.fully_executed() ` is called to manually +trigger execution. + +The big optimizations that lazy execution enables are **automatic stage fusion** and +**block move semantics**. + +Automatic Stage Fusion +~~~~~~~~~~~~~~~~~~~~~~ + +Automatic fusion of stages/operations can significantly lower the Ray task overhead of +Datasets workloads, since a chain of reading and many map-like transformations will be +condensed into a single stage of Ray tasks; this results in less data needing to be put +into Ray's object store and transferred across nodes, and therefore resulting in lower +memory utilization and faster task execution. + +Datasets will automatically fuse together lazy operations that are compatible: + +* Same compute pattern: embarrassingly parallel map vs. all-to-all shuffle +* Same compute strategy: Ray tasks vs Ray actors +* Same resource specification, e.g. ``num_cpus`` or ``num_cpus`` requests + +Read and subsequent map-like transformations +(e.g. :meth:`ds.map_batches() `, +:meth:`ds.filter() `, etc.) will usually be fused together. +All-to-all transformations such as +:meth:`ds.random_shuffle() ` can be fused with earlier +map-like stages, but not later stages. + +.. note:: + + For eager mode Datasets, reads are semi-lazy, so the transformation stage right after + the read stage (that triggers the full data read) will fuse with the read stage. Note + that this currently incurs re-reading of any already-read blocks (a fix for this is + currently in progress.) + + +You can tell if stage fusion is enabled by checking the :ref:`Dataset stats ` and looking for fused stages (e.g., ``read->map_batches``). + +.. code-block:: + + Stage N read->map_batches->shuffle_map: N/N blocks executed in T + * Remote wall time: T min, T max, T mean, T total + * Remote cpu time: T min, T max, T mean, T total + * Output num rows: N min, N max, N mean, N total + +Block Move Semantics +~~~~~~~~~~~~~~~~~~~~ + +In addition to fusing together stages, lazy execution mode further optimizes memory +utilization by eagerly releasing the data produced by intermediate operations in a +chain. + +For example, if you have a chain of ``read() -> map() -> filter()`` operations: + +.. code-block:: + + ds = ds.read_parquet().experimental_lazy().map(udf).filter(filter_udf) + +that, for the sake of this example, aren't fused together, Datasets can eagerly release +the outputs of the ``read()`` stage and the ``map()`` stage before the subsequent stage +(``map()`` and ``filter()``, respectively) have finished. This was not possible in eager +mode, since every operation materialized the data and returned the references back to +the user. But in lazy execution mode, we know that the outputs of the ``read()`` and +``map()`` stages are only going to be used by the downstream stages, so we can more +aggressively release them. + + +Dataset Pipelines Execution Model +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +To avoid unnecessary data movement in the distributed setting, +:class:`DatasetPipelines ` will always use +these lazy execution optimizations (stage fusion and block move semantics) +under-the-hood. Because a ``DatasetPipeline`` doesn't support creating more than one +``DatasetPipeline`` from a ``DatasetPipeline`` (i.e. no fan-out), we can clear block +data extra aggressively. + +.. note:: + + When creating a pipeline (i.e. calling :meth:`ds.window() ` + or :meth:`ds.repeat() `) immediately after a read stage, any + already read data will be dropped, and the read stage will be absorbed into the + pipeline and be made fully lazy. This allows you to easily create ML ingest pipelines + that re-read data from storage on every epoch, as well as streaming batch inference + pipelines that window all the way down to the file reading. + + .. code-block:: + + # ML ingest re-reading from storage on every epoch. + ray.data.read_parquet().repeat().random_shuffle().to_torch() + + # Streaming batch inference pipeline that pipelines the transforming of a single + # file with the reading of a single file (at most 2 file's worth of data in-flight + # at a time). + ray.data.read_parquet().window(blocks_per_window=1).map_batches(udf) + Reading Data ============ @@ -75,6 +191,8 @@ In the common case, each read task produces a single output block. Read tasks ma Block splitting is off by default. See the :ref:`performance section ` on how to enable block splitting (beta). +.. _dataset_defeferred_reading: + Deferred Read Task Execution ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -107,6 +225,50 @@ You can also change the partitioning of a Dataset using ``.random_shuffle`` or ` .. https://docs.google.com/drawings/d/132jhE3KXZsf29ho1yUdPrCHB9uheHBWHJhDQMXqIVPA/edit +Resource Allocation Model +========================= + +Unlike other libraries in Ray's ML ecosystem, such as Tune and Train, Datasets does not +natively use placement groups to allocate resources for Datasets workloads (tasks and +actor pools). Instead, Datasets makes plain CPU/GPU resource requests to the cluster, +and in order to not compete with Tune/Train for resources within those library's +placement groups, Datasets **escapes placement groups by default**. Any Datasets +tasks launched from within a placement group will be executed outside of that placement +group by default. This can be thought of as Datasets requesting resources from the +margins of the cluster, outside of those ML library placement groups. + +Although this is the default behavior, you can force all Datasets workloads to be +schedule without a placement group by specifying a placement group as the global +scheduling strategy for all Datasets tasks/actors, using the global +:class:`DatasetContext `: + +.. code-block:: + + import ray + from ray.data.context import DatasetContext + from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy + + # Create a single-CPU local cluster. + ray.init(num_cpus=1) + ctx = DatasetContext.get_current() + # Create a placement group that takes up the single core on the cluster. + placement_group = ray.util.placement_group( + name="core_hog", + strategy="SPREAD", + bundles=[ + {"CPU": 1}, + ], + ) + ray.get(placement_group.ready()) + + # Tell Datasets to use the placement group for all Datasets tasks. + ctx.scheduling_strategy = PlacementGroupSchedulingStrategy(placement_group) + # This Dataset workload will use that placement group for all read and map tasks. + ds = ray.data.range(100, parallelism=2) \ + .map(lambda x: x + 1) + + assert ds.take_all() == list(range(1, 101)) + Memory Management ================= @@ -142,17 +304,3 @@ Datasets uses the Ray object store to store data blocks, which means it inherits **Reference Counting**: Dataset blocks are kept alive by object store reference counting as long as there is any Dataset that references them. To free memory, delete any Python references to the Dataset object. **Load Balancing**: Datasets uses Ray scheduling hints to spread read tasks out across the cluster to balance memory usage. - -Stage Fusion Optimization -========================= - -To avoid unnecessary data movement in the distributed setting, Dataset pipelines will *fuse* compatible stages (i.e., stages with the same compute strategy and resource specifications). Read and map-like stages are always fused if possible. All-to-all dataset transformations such as ``random_shuffle`` can be fused with earlier map-like stages, but not later stages. For Datasets, only read stages are fused. This is since non-pipelined Datasets are eagerly executed except for their initial read stage. - -You can tell if stage fusion is enabled by checking the :ref:`Dataset stats ` and looking for fused stages (e.g., ``read->map_batches``). - -.. code-block:: - - Stage N read->map_batches->shuffle_map: N/N blocks executed in T - * Remote wall time: T min, T max, T mean, T total - * Remote cpu time: T min, T max, T mean, T total - * Output num rows: N min, N max, N mean, N total