Commit

Misc. GA P0s.
clarkzinzow committed May 18, 2022
1 parent 4444150 commit 1cafa4f
Showing 3 changed files with 172 additions and 14 deletions.
8 changes: 8 additions & 0 deletions doc/source/data/advanced-pipelines.rst
@@ -140,6 +140,14 @@ Transformations made to the Dataset prior to the call to ``.repeat()`` are

For example, in the following pipeline, the ``map(func)`` transformation only occurs once. However, the random shuffle is applied to each repetition in the pipeline.

.. note::
  Global per-epoch shuffling is an expensive operation: it will slow down your ML
  ingest pipeline, prevent you from using a fully streaming ML ingest pipeline, and
  can cause large increases in memory utilization and spilling to disk. Only use
  global per-epoch shuffling if your model benefits from it! If your model doesn't
  benefit from global per-epoch shuffling, or if you run into performance or stability
  issues, try windowed or local per-epoch shuffling instead (see the sketch after the
  code example below).

**Code**:

.. code-block:: python
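As a rough illustration of the windowed alternative mentioned in the note above, here is
a minimal sketch; the Parquet path, window size, and epoch count are hypothetical
placeholders, not part of the original example:

.. code-block:: python

    import ray

    # Windowed per-epoch shuffling: each window of blocks is shuffled
    # independently, trading global shuffle quality for a streaming,
    # memory-friendly pipeline.
    pipe = (
        ray.data.read_parquet("s3://bucket/path")  # hypothetical path
        .window(blocks_per_window=10)              # hypothetical window size
        .repeat(2)                                 # two epochs
        .random_shuffle_each_window()
    )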
2 changes: 2 additions & 0 deletions doc/source/data/creating-datasets.rst
@@ -303,6 +303,8 @@ This default parallelism can be overridden via the ``parallelism`` argument; see
:ref:`performance guide <data_performance_tips>` for tips on how to tune this read
parallelism.

.. _dataset_deferred_reading:

Deferred Read Task Execution
============================

176 changes: 162 additions & 14 deletions doc/source/data/key-concepts.rst
@@ -57,6 +57,122 @@ Dataset Execution Model

This page provides an overview of the execution model of Datasets, which may be useful for understanding and tuning performance.

By default, all Datasets operations are eager (except for data reading, which is
semi-lazy; see the :ref:`deferred reading docs <dataset_deferred_reading>`), executing
each stage synchronously. This provides a simpler iterative development and debugging
experience, allowing you to inspect up-to-date metadata (schema, row count, etc.) after
each operation, greatly improving the typical "Getting Started" experience.
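For instance, a minimal sketch of this eager behavior, using a toy range dataset of our
own choosing:

.. code-block:: python

    import ray

    ds = ray.data.range(10)  # executes eagerly
    print(ds.count())        # row count is available immediately: 10
    print(ds.schema())       # up-to-date schema after every operation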

However, this eager execution mode can result in less optimal (i.e. slower) execution
and increased memory utilization compared to what's possible with a lazy execution mode.
That's why Datasets offers a lazy execution mode, which you can transition to after
you're done prototyping your Datasets pipeline.

Lazy Execution Mode
===================

Lazy execution mode can be enabled by calling ``ds = ds.experimental_lazy()``, which
returns a dataset on which all subsequent operations will be **lazy**. These operations
won't be executed until the dataset is consumed (e.g. via
:meth:`ds.take() <ray.data.Dataset.take>`,
:meth:`ds.iter_batches() <ray.data.Dataset.iter_batches>`,
:meth:`ds.to_torch() <ray.data.Dataset.to_torch>`, etc.) or if
:meth:`ds.fully_executed() <ray.data.Dataset.fully_executed>` is called to manually
trigger execution.
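A minimal sketch of this flow (the toy dataset and UDF are ours, not from the original
docs):

.. code-block:: python

    import ray

    ds = ray.data.range(100)      # eager (the read itself is semi-lazy)
    ds = ds.experimental_lazy()   # everything from here on is lazy
    ds = ds.map(lambda x: x * 2)  # recorded, but not executed yet

    # Execution is triggered on consumption...
    print(ds.take(5))             # [0, 2, 4, 6, 8]

    # ...or can be forced explicitly.
    ds = ds.fully_executed()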

The big optimizations that lazy execution enables are **automatic stage fusion** and
**block move semantics**.

Automatic Stage Fusion
~~~~~~~~~~~~~~~~~~~~~~

Automatic fusion of stages/operations can significantly lower the Ray task overhead of
Datasets workloads, since a chain of reading and many map-like transformations will be
condensed into a single stage of Ray tasks; this means less data needs to be put into
Ray's object store and transferred across nodes, resulting in lower memory utilization
and faster task execution.

Datasets will automatically fuse together lazy operations that are compatible:

* Same compute pattern: embarrassingly parallel map vs. all-to-all shuffle
* Same compute strategy: Ray tasks vs Ray actors
* Same resource specification, e.g. the same ``num_cpus`` or ``num_gpus`` requests

Read and subsequent map-like transformations
(e.g. :meth:`ds.map_batches() <ray.data.Dataset.map_batches>`,
:meth:`ds.filter() <ray.data.Dataset.filter>`, etc.) will usually be fused together.
All-to-all transformations such as
:meth:`ds.random_shuffle() <ray.data.Dataset.random_shuffle>` can be fused with earlier
map-like stages, but not later stages.
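To make this concrete, here is a hedged sketch; the toy data and UDFs are ours, and the
exact fused stage names may vary:

.. code-block:: python

    import ray

    ds = ray.data.range(1000).experimental_lazy()

    # Map-like stages with the same compute strategy and resource requests
    # are candidates for fusion with the read stage, e.g. into a single
    # ``read->map_batches->map`` stage.
    ds = ds.map_batches(lambda batch: [x * 2 for x in batch])
    ds = ds.map(lambda x: x + 1)

    # An all-to-all stage can fuse with the map stages before it, but any
    # stage added after the shuffle starts a new stage.
    ds = ds.random_shuffle()

    ds = ds.fully_executed()
    print(ds.stats())  # look for fused stage names in the stats output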

.. note::

  For eager mode Datasets, reads are semi-lazy, so the transformation stage right after
  the read stage (the one that triggers the full data read) will fuse with the read
  stage. Note that this currently incurs re-reading of any already-read blocks (a fix
  for this is currently in progress).


You can tell if stage fusion is enabled by checking the :ref:`Dataset stats <data_performance_tips>` and looking for fused stages (e.g., ``read->map_batches``).

.. code-block::

    Stage N read->map_batches->shuffle_map: N/N blocks executed in T
    * Remote wall time: T min, T max, T mean, T total
    * Remote cpu time: T min, T max, T mean, T total
    * Output num rows: N min, N max, N mean, N total

Block Move Semantics
~~~~~~~~~~~~~~~~~~~~

In addition to fusing together stages, lazy execution mode further optimizes memory
utilization by eagerly releasing the data produced by intermediate operations in a
chain.

For example, if you have a chain of ``read() -> map() -> filter()`` operations:

.. code-block:: python

    ds = ray.data.read_parquet().experimental_lazy().map(udf).filter(filter_udf)

that, for the sake of this example, aren't fused together, Datasets can eagerly release
the outputs of the ``read()`` stage and the ``map()`` stage before the subsequent stages
(``map()`` and ``filter()``, respectively) have finished. This was not possible in eager
mode, since every operation materialized the data and returned the references back to
the user. But in lazy execution mode, we know that the outputs of the ``read()`` and
``map()`` stages are only going to be used by the downstream stages, so we can more
aggressively release them.


Dataset Pipelines Execution Model
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

To avoid unnecessary data movement in the distributed setting,
:class:`DatasetPipelines <ray.data.dataset_pipeline.DatasetPipeline>` will always use
these lazy execution optimizations (stage fusion and block move semantics)
under-the-hood. Because you cannot create more than one ``DatasetPipeline`` from an
existing ``DatasetPipeline`` (i.e. there is no fan-out), we can clear block data more
aggressively.

.. note::

  When creating a pipeline (i.e. calling :meth:`ds.window() <ray.data.Dataset.window>`
  or :meth:`ds.repeat() <ray.data.Dataset.repeat>`) immediately after a read stage, any
  already-read data will be dropped, and the read stage will be absorbed into the
  pipeline and be made fully lazy. This allows you to easily create ML ingest pipelines
  that re-read data from storage on every epoch, as well as streaming batch inference
  pipelines that window all the way down to the file reading.

.. code-block:: python

    # ML ingest re-reading from storage on every epoch.
    ray.data.read_parquet().repeat().random_shuffle_each_window().to_torch()

    # Streaming batch inference pipeline that pipelines the transforming of a single
    # file with the reading of a single file (at most 2 files' worth of data in-flight
    # at a time).
    ray.data.read_parquet().window(blocks_per_window=1).map_batches(udf)

Reading Data
============

@@ -75,6 +191,8 @@ In the common case, each read task produces a single output block. Read tasks ma

Block splitting is off by default. See the :ref:`performance section <data_performance_tips>` on how to enable block splitting (beta).

.. _dataset_deferred_reading:

Deferred Read Task Execution
~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Expand Down Expand Up @@ -107,6 +225,50 @@ You can also change the partitioning of a Dataset using ``.random_shuffle`` or `
..
    https://docs.google.com/drawings/d/132jhE3KXZsf29ho1yUdPrCHB9uheHBWHJhDQMXqIVPA/edit

Resource Allocation Model
=========================

Unlike other libraries in Ray's ML ecosystem, such as Tune and Train, Datasets does not
natively use placement groups to allocate resources for Datasets workloads (tasks and
actor pools). Instead, Datasets makes plain CPU/GPU resource requests to the cluster,
and, in order not to compete with Tune/Train for resources within those libraries'
placement groups, Datasets **escapes placement groups by default**. Any Datasets
tasks launched from within a placement group will be executed outside of that placement
group by default. This can be thought of as Datasets requesting resources from the
margins of the cluster, outside of those ML library placement groups.
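As a sketch of this default escape behavior: the cluster size, placement group, and
``train_func`` below are hypothetical illustrations of our own, and the example assumes
spare CPUs exist outside the placement group for the escaped tasks to run on:

.. code-block:: python

    import ray
    from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

    # Three CPUs total: one reserved by the placement group, two left free.
    ray.init(num_cpus=3)

    pg = ray.util.placement_group([{"CPU": 1}])
    ray.get(pg.ready())

    @ray.remote(num_cpus=1)
    def train_func():
        # This task runs inside the placement group, but the read and map
        # tasks that Datasets launches escape the group by default and run
        # on the two free CPUs at the margins of the cluster.
        return ray.data.range(100, parallelism=2).map(lambda x: x + 1).take_all()

    result = ray.get(
        train_func.options(
            scheduling_strategy=PlacementGroupSchedulingStrategy(placement_group=pg)
        ).remote()
    )
    assert result == list(range(1, 101))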

Although this is the default behavior, you can force all Datasets workloads to be
scheduled within a particular placement group by specifying that placement group as the
global scheduling strategy for all Datasets tasks/actors, using the global
:class:`DatasetContext <ray.data.DatasetContext>`:

.. code-block:: python

    import ray
    from ray.data.context import DatasetContext
    from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

    # Create a single-CPU local cluster.
    ray.init(num_cpus=1)
    ctx = DatasetContext.get_current()

    # Create a placement group that takes up the single core on the cluster.
    placement_group = ray.util.placement_group(
        name="core_hog",
        strategy="SPREAD",
        bundles=[
            {"CPU": 1},
        ],
    )
    ray.get(placement_group.ready())

    # Tell Datasets to use the placement group for all Datasets tasks.
    ctx.scheduling_strategy = PlacementGroupSchedulingStrategy(placement_group)

    # This Dataset workload will use that placement group for all read and map tasks.
    ds = ray.data.range(100, parallelism=2).map(lambda x: x + 1)
    assert ds.take_all() == list(range(1, 101))

Memory Management
=================

@@ -142,17 +304,3 @@ Datasets uses the Ray object store to store data blocks, which means it inherits
**Reference Counting**: Dataset blocks are kept alive by object store reference counting as long as there is any Dataset that references them. To free memory, delete any Python references to the Dataset object.

**Load Balancing**: Datasets uses Ray scheduling hints to spread read tasks out across the cluster to balance memory usage.
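For example, a minimal sketch of the reference-counting behavior described above (the
toy dataset is our own illustration):

.. code-block:: python

    import ray

    ds = ray.data.range(1000)
    ds = ds.map(lambda x: x + 1)  # produces new blocks in the object store

    # The blocks stay pinned while any Dataset references them.
    del ds  # dropping the last reference lets Ray reclaim the block memory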

Stage Fusion Optimization
=========================

To avoid unnecessary data movement in the distributed setting, Dataset pipelines will *fuse* compatible stages (i.e., stages with the same compute strategy and resource specifications). Read and map-like stages are always fused if possible. All-to-all dataset transformations such as ``random_shuffle`` can be fused with earlier map-like stages, but not later stages. For Datasets, only read stages are fused. This is because non-pipelined Datasets are eagerly executed, except for their initial read stage.

You can tell if stage fusion is enabled by checking the :ref:`Dataset stats <data_performance_tips>` and looking for fused stages (e.g., ``read->map_batches``).

.. code-block::

    Stage N read->map_batches->shuffle_map: N/N blocks executed in T
    * Remote wall time: T min, T max, T mean, T total
    * Remote cpu time: T min, T max, T mean, T total
    * Output num rows: N min, N max, N mean, N total
