Merge branch 'releases/2.4.0' of https://github.com/ray-project/ray into releases/2.4.0
jianoaix committed Apr 12, 2023
2 parents 0838993 + 127e007 commit a9e83ad
Showing 87 changed files with 1,128 additions and 449 deletions.
4 changes: 4 additions & 0 deletions dashboard/client/src/pages/metrics/Metrics.tsx
@@ -119,6 +119,10 @@ const METRICS_CONFIG: MetricsSectionConfig[] = [
title: "Active Actors by Name",
pathParams: "orgId=1&theme=light&panelId=36",
},
{
title: "Out of Memory Failures by Name",
pathParams: "orgId=1&theme=light&panelId=44",
},
],
},
{
12 changes: 12 additions & 0 deletions dashboard/modules/metrics/dashboards/default_dashboard_panels.py
@@ -239,6 +239,18 @@ def max_plus_pending(max_resource, pending_resource):
),
],
),
Panel(
id=44,
title="Node Out of Memory Failures by Name",
description="The number of tasks and actors killed by the Ray Out of Memory killer due to high memory pressure. Metrics are broken down by IP and the name. https://docs.ray.io/en/master/ray-core/scheduling/ray-oom-prevention.html.",
unit="failures",
targets=[
Target(
expr='ray_memory_manager_worker_eviction_total{{instance=~"$Instance",{global_filters}}}',
legend="OOM Killed: {{Name}}, {{instance}}",
),
],
),
Panel(
id=34,
title="Node Memory by Component",
16 changes: 14 additions & 2 deletions doc/source/data/api/dataset.rst
@@ -103,6 +103,7 @@ I/O and Conversion
Dataset.write_csv
Dataset.write_numpy
Dataset.write_tfrecords
Dataset.write_webdataset
Dataset.write_mongo
Dataset.write_datasource
Dataset.to_torch
@@ -138,8 +139,7 @@ Execution
.. autosummary::
:toctree: doc/

Dataset.cache
Dataset.is_cached
Dataset.materialize

Serialization
-------------
Expand All @@ -150,3 +150,15 @@ Serialization
Dataset.has_serializable_lineage
Dataset.serialize_lineage
Dataset.deserialize_lineage

Internals
---------

.. autosummary::
:toctree: doc/

Dataset.__init__
Dataset.dataset_format
Dataset.fully_executed
Dataset.is_fully_executed
Dataset.lazy
2 changes: 2 additions & 0 deletions doc/source/data/api/execution_options.rst
@@ -10,6 +10,7 @@ Constructor

.. autosummary::
:toctree: doc/
:template: autosummary/class_without_autosummary.rst

ExecutionOptions

@@ -18,5 +19,6 @@ Resource Options

.. autosummary::
:toctree: doc/
:template: autosummary/class_without_autosummary.rst

ExecutionResources
2 changes: 1 addition & 1 deletion doc/source/data/creating-datasets.rst
@@ -884,4 +884,4 @@ inspection functions like :meth:`ds.schema() <ray.data.Dataset.schema>` and
:meth:`ds.show() <ray.data.Dataset.show>` will trigger execution of only one or some
tasks, instead of all tasks. This allows metadata to be inspected right away. Execution
of all read tasks can be triggered manually using the
:meth:`ds.cache() <ray.data.Dataset.cache>` API.
:meth:`ds.materialize() <ray.data.Dataset.materialize>` API.
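
As a brief sketch of this behavior (the example file ships with Ray; outputs are omitted):

.. code-block::

    import ray

    ds = ray.data.read_parquet("example://iris.parquet")  # Lazy: only metadata is fetched.
    ds.schema()            # Triggers execution of only one or some read tasks.
    ds = ds.materialize()  # Triggers execution of all read tasks.
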
133 changes: 131 additions & 2 deletions doc/source/data/dataset-internals.rst
@@ -64,7 +64,20 @@ This should be considered for advanced use cases to improve performance predicta
Execution
=========

This section covers the Datasets execution model and performance considerations.
By default, Datasets execution is:

- **Lazy**: This means that transformations on Dataset are not executed until a
consumption operation (e.g. :meth:`ds.iter_batches() <ray.data.Dataset.iter_batches>`)
or :meth:`Dataset.materialize() <ray.data.Dataset.materialize>` is called. This creates
opportunities for optimizing the execution plan (e.g. :ref:`stage fusion <datasets_stage_fusion>`).
- **Pipelined**: This means that Dataset transformations are executed in a
  streaming fashion, incrementally on the base data rather than all at once,
  with the execution of different operations overlapped. This can be used for streaming
data loading into ML training to overlap the data preprocessing and model training,
or to execute batch transformations on large datasets without needing to load the
entire dataset into cluster memory.

.. _datasets_lazy_execution:

Lazy Execution
~~~~~~~~~~~~~~
@@ -75,7 +88,7 @@ to stage fusion optimizations and aggressive garbage collection of intermediate
Dataset creation and transformation APIs are lazy, with execution only triggered via "sink"
APIs, such as consuming (:meth:`ds.iter_batches() <ray.data.Dataset.iter_batches>`),
writing (:meth:`ds.write_parquet() <ray.data.Dataset.write_parquet>`), or manually triggering via
:meth:`ds.cache() <ray.data.Dataset.cache>`. There are a few
:meth:`ds.materialize() <ray.data.Dataset.materialize>`. There are a few
exceptions to this rule, where transformations such as :meth:`ds.union()
<ray.data.Dataset.union>` and
:meth:`ds.limit() <ray.data.Dataset.limit>` trigger execution; we plan to make these
@@ -85,6 +98,122 @@ Check the API docs for Datasets methods to see if they
trigger execution. Those that do trigger execution will have a ``Note`` indicating as
much.
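
For illustration, here is a minimal sketch of this behavior (the dataset and transformation are arbitrary placeholders):

.. code-block::

    import ray

    ds = ray.data.range(1000)                 # Lazy: no read tasks execute yet.
    ds = ds.map_batches(lambda batch: batch)  # Lazy: the transformation is only recorded.

    # A "sink" triggers execution: consuming, writing, or materializing.
    ds = ds.materialize()                     # Now the read and the map_batches run.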

.. _datasets_streaming_execution:

Streaming Execution
~~~~~~~~~~~~~~~~~~~

The following code is a hello-world example that triggers execution by consuming the dataset with
:meth:`ds.iter_batches() <ray.data.Dataset.iter_batches>`. We also enable verbose progress reporting, which shows per-operator progress in addition to overall progress.

.. code-block::

    import ray
    import time

    # Enable verbose reporting. This can also be toggled on by setting
    # the environment variable RAY_DATA_VERBOSE_PROGRESS=1.
    ctx = ray.data.DatasetContext.get_current()
    ctx.execution_options.verbose_progress = True

    def sleep(x):
        time.sleep(0.1)
        return x

    for _ in (
        ray.data.range_tensor(5000, shape=(80, 80, 3), parallelism=200)
        .map_batches(sleep, num_cpus=2)
        .map_batches(sleep, compute=ray.data.ActorPoolStrategy(2, 4))
        .map_batches(sleep, num_cpus=1)
        .iter_batches()
    ):
        pass

This launches a simple 4-stage pipeline. We use different compute args for each stage, which forces them to be run as separate operators instead of getting fused together. You should see a log message indicating streaming execution is being used:

.. code-block::

    2023-03-30 16:40:10,076 INFO streaming_executor.py:83 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange] -> TaskPoolMapOperator[MapBatches(sleep)] -> ActorPoolMapOperator[MapBatches(sleep)] -> TaskPoolMapOperator[MapBatches(sleep)]

The next few lines will show execution progress. Here is how to interpret the output:

.. code-block::

    Running: 7.0/16.0 CPU, 0.0/0.0 GPU, 76.91 MiB/2.25 GiB object_store_memory 65%|██▊ | 130/200 [00:08<00:02, 22.52it/s]

This line tells you how many resources are currently being used by the streaming executor out of the limits, as well as the number of completed output blocks. The streaming executor will attempt to keep resource usage under the printed limits by throttling task executions.

.. code-block::

    ReadRange: 2 active, 37 queued, 7.32 MiB objects 1: 80%|████████▊ | 161/200 [00:08<00:02, 17.81it/s]
    MapBatches(sleep): 5 active, 5 queued, 18.31 MiB objects 2: 76%|██▎| 151/200 [00:08<00:02, 19.93it/s]
    MapBatches(sleep): 7 active, 2 queued, 25.64 MiB objects, 2 actors [all objects local] 3: 71%|▋| 142/
    MapBatches(sleep): 2 active, 0 queued, 7.32 MiB objects 4: 70%|██▊ | 139/200 [00:08<00:02, 23.16it/s]

These lines are only shown when verbose progress reporting is enabled. The ``active`` count indicates the number of running tasks for the operator. The ``queued`` count is the number of input blocks for the operator that are computed but are not yet submitted for execution. For operators that use actor-pool execution, the number of running actors is shown as ``actors``.

.. tip::

    Avoid returning large outputs from the final operation of a pipeline you are iterating over, since the consumer process will be a serial bottleneck.

Configuring Resources and Locality
----------------------------------

By default, the CPU and GPU limits are set to the cluster size, and the object store memory limit is conservatively set to 1/4 of the total object store size to avoid the possibility of disk spilling.

You may want to customize these limits in the following scenarios:

- If running multiple concurrent jobs on the cluster, setting lower limits can avoid resource contention between the jobs.
- If you want to fine-tune the memory limit to maximize performance.
- For data loading into training jobs, you may want to set the object store memory to a low value (e.g., 2GB) to limit resource usage.

Execution options can be configured via the global DatasetContext. The options will be applied for future jobs launched in the process:

.. code-block::

    import ray

    ctx = ray.data.DatasetContext.get_current()
    ctx.execution_options.resource_limits.cpu = 10
    ctx.execution_options.resource_limits.gpu = 5
    ctx.execution_options.resource_limits.object_store_memory = 10e9

Deterministic Execution
-----------------------

.. code-block::

    ctx = ray.data.DatasetContext.get_current()

    # By default, this is set to False.
    ctx.execution_options.preserve_order = True

To enable deterministic execution, set ``preserve_order`` to True (it defaults to False). This may decrease performance, but ensures that block ordering is preserved through execution.

Actor Locality Optimization (ML inference use case)
---------------------------------------------------

.. code-block::

    ctx = ray.data.DatasetContext.get_current()

    # By default, this is set to True already.
    ctx.execution_options.actor_locality_enabled = True

The actor locality optimization (applicable when you're using actor pools) preferentially schedules tasks onto actors whose node already holds the input object locally. This reduces network traffic across nodes. When actor locality is enabled, the progress output reports the hit rate:

.. code-block::

    MapBatches(Model): 0 active, 0 queued, 0 actors [992 locality hits, 8 misses]: 100%|██████████| 1000/1000 [00:59<00:00, 16.84it/s]

Locality with Output (ML ingest use case)
-----------------------------------------

.. code-block::

    ctx = ray.data.DatasetContext.get_current()
    ctx.execution_options.locality_with_output = True

Setting this to True tells Datasets to prefer placing operator tasks onto the consumer node in the cluster, rather than spreading them evenly across the cluster. This can be useful if you know you'll be consuming the output data directly on the consumer node (e.g., for ML training ingest). However, this may incur a performance penalty for other use cases.

Scalability
-----------

We expect the data streaming backend to scale to tens of thousands of files / blocks and up to hundreds of terabytes of data. If you experience performance degradation at these scales, please report it; we would be very interested to investigate!

.. _datasets_stage_fusion:

Stage Fusion Optimization
~~~~~~~~~~~~~~~~~~~~~~~~~

2 changes: 1 addition & 1 deletion doc/source/data/doc_code/creating_datasets.py
@@ -298,7 +298,7 @@
"example://iris.parquet",
columns=["sepal.length", "variety"],
filter=pa.dataset.field("sepal.length") > 5.0,
).cache() # Force a full read of the file.
).materialize() # Force a full read of the file.
# -> Dataset(num_blocks=1, num_rows=118, schema={sepal.length: double, variety: string})

ds.show(2)
12 changes: 6 additions & 6 deletions doc/source/data/doc_code/tensor.py
@@ -52,7 +52,7 @@ def single_col_udf(batch: pd.DataFrame) -> pd.DataFrame:


ds.map_batches(single_col_udf)
ds.cache()
ds.materialize()
# -> Dataset(num_blocks=17, num_rows=1000,
# schema={__value__: TensorDtype(shape=(128, 128, 3), dtype=int64)})
# __create_pandas_end__
@@ -74,7 +74,7 @@ def multi_col_udf(batch: pd.DataFrame) -> pd.DataFrame:


ds.map_batches(multi_col_udf)
ds.cache()
ds.materialize()
# -> Dataset(num_blocks=17, num_rows=1000,
# schema={image: TensorDtype(shape=(128, 128, 3), dtype=int64),
# embed: TensorDtype(shape=(256,), dtype=uint8)})
@@ -156,7 +156,7 @@ def multi_col_udf(batch: pd.DataFrame) -> pd.DataFrame:
# two: extension<arrow.py_extension_type<ArrowTensorType>>
# __create_parquet_2_end__

ds.cache()
ds.materialize()
shutil.rmtree(path)

# __create_parquet_3_begin__
@@ -193,7 +193,7 @@ def cast_udf(block: pa.Table) -> pa.Table:
# -> one: int64
# two: extension<arrow.py_extension_type<ArrowTensorType>>
# __create_parquet_3_end__
ds.cache()
ds.materialize()

# __create_images_begin__
ds = ray.data.read_images("example://image-datasets/simple")
@@ -449,7 +449,7 @@ def add_one(batch: Dict[str, Any]) -> Dict[str, Any]:
# __consume_numpy_2_end__


ds.cache()
ds.materialize()
shutil.rmtree("/tmp/some_path")

# __write_1_begin__
@@ -468,7 +468,7 @@ def add_one(batch: Dict[str, Any]) -> Dict[str, Any]:
# label: string
# __write_1_end__

read_ds.cache()
read_ds.materialize()
shutil.rmtree("/tmp/some_path")

# __write_2_begin__
4 changes: 2 additions & 2 deletions doc/source/data/examples/advanced-pipelines.rst
@@ -11,7 +11,7 @@ This page covers more advanced examples for dataset pipelines.
Pre-repeat vs post-repeat transforms
====================================

Transformations prior to the call to ``.repeat()`` will be cached. However, note that the initial read will not be cached unless there is a subsequent transformation or ``.cache()`` call. Transformations made to the DatasetPipeline after the repeat will always be executed once for each repetition of the Dataset.
Transformations prior to the call to ``.repeat()`` will be cached. However, note that the initial read will not be cached unless there is a subsequent transformation or ``.materialize()`` call. Transformations made to the DatasetPipeline after the repeat will always be executed once for each repetition of the Dataset.

For example, in the following pipeline, the ``map(func)`` transformation only occurs once. However, the random shuffle is applied to each repetition in the pipeline. If we omitted the map transformation, the pipeline would instead re-read from the base data on each repetition.

@@ -50,7 +50,7 @@ For example, in the following pipeline, the ``map(func)`` transformation only oc

.. important::

Result caching only applies if there are *transformation* stages prior to the pipelining operation. If you ``repeat()`` or ``window()`` a Dataset right after the read call (e.g., ``ray.data.read_parquet(...).repeat()``), then the read will still be re-executed on each repetition. This optimization saves memory, at the cost of repeated reads from the datasource. To force result caching in all cases, use ``.cache().repeat()``.
Result caching only applies if there are *transformation* stages prior to the pipelining operation. If you ``repeat()`` or ``window()`` a Dataset right after the read call (e.g., ``ray.data.read_parquet(...).repeat()``), then the read will still be re-executed on each repetition. This optimization saves memory, at the cost of repeated reads from the datasource. To force result caching in all cases, use ``.materialize().repeat()``.
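
As a sketch of the difference (the example file path is illustrative):

.. code-block::

    import ray

    # The read is re-executed on each repetition of the pipeline.
    pipe = ray.data.read_parquet("example://iris.parquet").repeat(2)

    # Materializing first caches the read result, so each repetition reuses it.
    pipe = ray.data.read_parquet("example://iris.parquet").materialize().repeat(2)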

Changing Pipeline Structure
===========================
4 changes: 2 additions & 2 deletions doc/source/data/examples/nyc_taxi_basic_processing.ipynb
@@ -317,7 +317,7 @@
}
],
"source": [
"ds.cache().size_bytes()"
"ds.materialize().size_bytes()"
]
},
{
@@ -654,7 +654,7 @@
")\n",
"\n",
"# Force full execution of both of the file reads.\n",
"pushdown_ds = pushdown_ds.cache()\n",
"pushdown_ds = pushdown_ds.materialize()\n",
"pushdown_ds"
]
},
5 changes: 4 additions & 1 deletion doc/source/data/key-concepts.rst
@@ -92,7 +92,10 @@ Execution mode
==============

Most transformations are lazy. They don't execute until you consume a dataset or call
:meth:`Dataset.cache() <ray.data.Dataset.cache>`.
:meth:`Dataset.materialize() <ray.data.Dataset.materialize>`.

The transformations are executed in a streaming way, incrementally on the data and
with operators processed in parallel; see :ref:`Streaming Execution <datasets_streaming_execution>`.

For an in-depth guide on Datasets execution, read :ref:`Execution <datasets_execution>`.

7 changes: 7 additions & 0 deletions doc/source/data/pipelining-compute.rst
@@ -1,5 +1,12 @@
.. _pipelining_datasets:

.. note::

The DatasetPipeline is expected to be deprecated in Ray 2.5. If your use case doesn't
need per-window shuffle, we recommend using just plain Datasets, which support
streaming execution by default in Ray 2.4. For more detail, see
:ref:`Streaming Execution <datasets_streaming_execution>`.

==================
Pipelining Compute
==================
4 changes: 2 additions & 2 deletions doc/source/data/transforming-datasets.rst
@@ -473,7 +473,7 @@ aggregation has been computed.
for x in range(10)])
# Group by the A column and calculate the per-group mean for B and C columns.
agg_ds: ray.data.Dataset = ds.groupby("A").mean(["B", "C"]).cache()
agg_ds: ray.data.Dataset = ds.groupby("A").mean(["B", "C"]).materialize()
# -> Sort Sample: 100%|███████████████████████████████████████| 10/10 [00:01<00:00, 9.04it/s]
# -> GroupBy Map: 100%|███████████████████████████████████████| 10/10 [00:00<00:00, 23.66it/s]
# -> GroupBy Reduce: 100%|████████████████████████████████████| 10/10 [00:00<00:00, 937.21it/s]
@@ -542,7 +542,7 @@ with calculated column means.
return df
ds = ds.map_batches(batch_standard_scaler, batch_format="pandas")
ds.cache()
ds.materialize()
# -> Map Progress: 100%|██████████████████████████████████████| 10/10 [00:00<00:00, 144.79it/s]
# -> Dataset(num_blocks=10, num_rows=10, schema={A: int64, B: double, C: double})
2 changes: 1 addition & 1 deletion doc/source/ray-air/check-ingest.rst
@@ -417,7 +417,7 @@ Performance Tips
Dataset Sharing
~~~~~~~~~~~~~~~

When you pass Datasets to a Tuner, Datasets are executed independently per-trial. This could potentially duplicate data reads in the cluster. To share Dataset blocks between trials, call ``ds = ds.cache()`` prior to passing the Dataset to the Tuner. This ensures that the initial read operation will not be repeated per trial.
When you pass Datasets to a Tuner, Datasets are executed independently per-trial. This could potentially duplicate data reads in the cluster. To share Dataset blocks between trials, call ``ds = ds.materialize()`` prior to passing the Dataset to the Tuner. This ensures that the initial read operation will not be repeated per trial.
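
As a minimal sketch (the trainer type, training loop, and scaling options below are placeholders for your own setup):

.. code-block::

    import ray
    from ray.air.config import ScalingConfig
    from ray.train.torch import TorchTrainer
    from ray.tune import Tuner

    # Execute the read once; the materialized blocks are shared across trials.
    ds = ray.data.read_parquet("example://iris.parquet").materialize()

    trainer = TorchTrainer(
        train_loop_per_worker=lambda: None,  # Placeholder training loop.
        scaling_config=ScalingConfig(num_workers=2),
        datasets={"train": ds},
    )
    Tuner(trainer).fit()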


FAQ