diff --git a/doc/source/_toc.yml b/doc/source/_toc.yml index 475430e48406..ac6bc9b3cb89 100644 --- a/doc/source/_toc.yml +++ b/doc/source/_toc.yml @@ -57,6 +57,8 @@ parts: title: Large-scale ML Ingest - file: data/examples/ocr_example title: Scaling OCR with Ray Datasets + - file: data/advanced-pipelines + - file: data/random-access - file: data/faq - file: data/package-ref - file: data/integrations diff --git a/doc/source/data/accessing-datasets.rst b/doc/source/data/accessing-datasets.rst deleted file mode 100644 index 1c356f9d3537..000000000000 --- a/doc/source/data/accessing-datasets.rst +++ /dev/null @@ -1,196 +0,0 @@ -.. _accessing_datasets: - -=================== -Accessing Datasets -=================== - -The data underlying a ``Dataset`` can be accessed in several ways: - -* Retrieving a limited prefix of rows. -* Iterating over rows and batches. -* Converting into a Torch dataset or a TensorFlow dataset. -* Converting into a RandomAccessDataset for random access (experimental). - -Retrieving limited set of rows -============================== - -A limited set of rows can be retried from a ``Dataset`` via the -:meth:`ds.take() ` API, along with its sibling helper APIs -:meth:`ds.take_all() `, for retrieving **all** rows, and -:meth:`ds.show() `, for printing a limited set of rows. These -methods are convenient for quickly inspecting a subset (prefix) of rows. They have the -benefit that, if used right after reading, they will only trigger more files to be -read if needed to retrieve rows from that file; if inspecting a small prefix of rows, -often only the first file will need to be read. - -.. literalinclude:: ./doc_code/accessing_datasets.py - :language: python - :start-after: __take_begin__ - :end-before: __take_end__ - -Iterating over Datasets -======================= - -Datasets can be consumed a row at a time using the -:meth:`ds.iter_rows() ` API - -.. literalinclude:: ./doc_code/accessing_datasets.py - :language: python - :start-after: __iter_rows_begin__ - :end-before: __iter_rows_end__ - -or a batch at a time using the -:meth:`ds.iter_batches() ` API, where you can specify -batch size as well as the desired batch format. By default, the batch format is -``"native"``, which means that the batch format that's native to the data type will be -returned. For tabular data, the native format is a Pandas DataFrame; for Python objects, -it's a list. - -.. literalinclude:: ./doc_code/accessing_datasets.py - :language: python - :start-after: __iter_batches_begin__ - :end-before: __iter_batches_end__ - - -Datasets can be passed to Ray tasks or actors and accessed by these iteration methods. -This does not incur a copy, since the blocks of the Dataset are passed by reference as Ray objects: - -.. literalinclude:: ./doc_code/accessing_datasets.py - :language: python - :start-after: __remote_iterators_begin__ - :end-before: __remote_iterators_end__ - -Converting to Torch dataset -=========================== - -For ingestion into one or more Torch trainers, Datasets offers a :meth:`ds.to_torch() -` API that returns a -`Torch IterableDataset `__ -that the Torch trainers can consume. This API takes care of both batching and converting -the underlying Datasets data to Torch tensors, building on top of the -:meth:`ds.iter_batches() ` API. - -.. note:: - - The returned ``torch.utils.data.IterableDataset`` instance should be consumed directly - in your training loop directly; it should **not** be used with the Torch data loader. 
- Using Torch's data loader isn't necessary because upstream Ray Datasets preprocessing - operations in conjunction with :meth:`ds.to_torch() ` - implements the data loader functionality (shuffling, batching, prefetching, etc.). If - you use the Torch data loader with this ``IterableDataset``, it will perform - inefficient unbatching and rebatching without adding any value. - -.. literalinclude:: ./doc_code/accessing_datasets.py - :language: python - :start-after: __torch_begin__ - :end-before: __torch_end__ - -When performing supervised learning, we'll have both feature columns and a label column -that we may want to split into separate tensors. By informing ``ds.to_torch()`` of the -label column, it will yield ``(features, label)`` tensor pairs for each batch. - -.. note:: - - We set ``unsqueeze_label_tensor=False`` in order to remove a redundant unit column - dimension. E.g., with ``batch_size=2`` and ``unsqueeze_label_tensor=True``, you would - get ``(2, 1)``-shaped label tensor batches instead of the desired ``(2,)`` shape. - -.. literalinclude:: ./doc_code/accessing_datasets.py - :language: python - :start-after: __torch_with_label_begin__ - :end-before: __torch_with_label_end__ - -The types of the label and feature columns will be inferred from the data by default; -these can be overridden with the ``label_column_dtype`` and ``feature_column_dtypes`` -args. - -By default, all feature columns will be concatenated into a single tensor; however, -depending on the structure of the ``feature_columns`` argument, you can also get feature -column batches as a list of tensors or a dict of tensors (with one or more column in -each tensor). See the :meth:`.to_torch() API docs ` for -details. - -.. note:: - - If we have tensor feature columns (where each item in the column is an multi-dimensional - tensor) and any of the feature columns are different shapes, these columns are - incompatible and we will not be able to stack the column tensors into a single tensor. - Instead, we will need to group the columns by compatibility in the ``feature_columns`` - argument. - - Check out the :ref:`tensor data feature guide ` for more - information on how to handle this. - -Converting to TensorFlow dataset -================================ - -For ingestion into one or more TensorFlow trainers, Datasets offers a :meth:`ds.to_tf() -` API that returns a -`tf.data.Dataset `__ -that the TensorFlow trainers can consume. This API takes care of both batching and converting -the underlying Datasets data to TensorFlow tensors, building on top of the -:meth:`ds.iter_batches() ` API. - -.. literalinclude:: ./doc_code/accessing_datasets.py - :language: python - :start-after: __tf_begin__ - :end-before: __tf_end__ - -When performing supervised learning, we'll have both feature columns and a label column -that we may want to split into separate tensors. By informing ``ds.to_tf()`` of the -label column, it will yield ``(features, label)`` tensor pairs for each batch. - -.. literalinclude:: ./doc_code/accessing_datasets.py - :language: python - :start-after: __tf_with_label_begin__ - :end-before: __tf_with_label_end__ - -The types of the label and feature columns will be inferred from the data by default; -these can be overridden with the ``label_column_dtype`` and ``feature_column_dtypes`` -args. 
- -By default, all feature columns will be concatenated into a single tensor; however, -depending on the structure of the ``feature_columns`` argument, you can also get feature -column batches as a list of tensors or a dict of tensors (with one or more column in -each tensor). See the :meth:`.to_tf() API docs ` for -details. - -.. note:: - - If we have tensor feature columns (where each item in the column is an multi-dimensional - tensor) and any of the feature columns are different shapes, these columns are - incompatible and we will not be able to stack the column tensors into a single tensor. - Instead, we will need to group the columns by compatibility in the ``feature_columns`` - argument. - - Check out the :ref:`tensor data feature guide ` for more - information on how to handle this. - -Splitting Into and Consuming Shards -=================================== - -Datasets can be split up into disjoint sub-datasets, or shards. -Locality-aware splitting is supported if you pass in a list of actor handles to the -:meth:`ds.split() ` function along with the number of desired splits. -This is a common pattern useful for loading and sharding data between distributed training actors: - -.. note:: - - If using :ref:`Ray Train ` for distributed training, you do not need to split the dataset; Ray - Train will automatically do locality-aware splitting into per-trainer shards for you! - -.. literalinclude:: ./doc_code/accessing_datasets.py - :language: python - :start-after: __split_begin__ - :end-before: __split_end__ - -Random Access Datasets (Experimental) -===================================== - -Datasets can be converted to a format that supports efficient random access with -:meth:`ds.to_random_access_dataset() API `, -which partitions the dataset on a sort key and provides random access via distributed -binary search. - -See the :ref:`random access feature guide ` for more -information. diff --git a/doc/source/data/advanced-pipelines.rst b/doc/source/data/advanced-pipelines.rst index 4057ae7d208b..b20b3681023b 100644 --- a/doc/source/data/advanced-pipelines.rst +++ b/doc/source/data/advanced-pipelines.rst @@ -1,64 +1,13 @@ .. _data_pipeline_usage: ------------------------ -Advanced Pipeline Usage ------------------------ +-------------------------- +Advanced Pipeline Examples +-------------------------- -Handling Epochs -=============== - -It's common in ML training to want to divide data ingest into epochs, or repetitions over the original source dataset. -DatasetPipeline provides a convenient ``.iter_epochs()`` method that can be used to split up the pipeline into epoch-delimited pipeline segments. -Epochs are defined by the last call to ``.repeat()`` in a pipeline, for example: - -.. code-block:: python - - pipe = ray.data.from_items([0, 1, 2, 3, 4]) \ - .repeat(3) \ - .random_shuffle_each_window() - for i, epoch in enumerate(pipe.iter_epochs()): - print("Epoch {}", i) - for row in epoch.iter_rows(): - print(row) - # -> - # Epoch 0 - # 2 - # 1 - # 3 - # 4 - # 0 - # Epoch 1 - # 3 - # 4 - # 0 - # 2 - # 1 - # Epoch 2 - # 3 - # 2 - # 4 - # 1 - # 0 - -Note that while epochs commonly consist of a single window, they can also contain multiple windows if ``.window()`` is used or there are multiple ``.repeat()`` calls. +This page covers more advanced examples for dataset pipelines. .. _dataset-pipeline-per-epoch-shuffle: -Per-Epoch Shuffle Pipeline -========================== -.. 
tip:: - - If you interested in distributed ingest for deep learning, it is - recommended to use Ray Datasets in conjunction with :ref:`Ray Train `. - See the :ref:`example below` for more info. - -.. - https://docs.google.com/drawings/d/1vWQ-Zfxy2_Gthq8l3KmNsJ7nOCuYUQS9QMZpj5GHYx0/edit - -The other method of creating a pipeline is calling ``.repeat()`` on an existing Dataset. -This creates a DatasetPipeline over an infinite sequence of the same original Dataset. -Readers pulling batches from the pipeline will see the same data blocks repeatedly, which is useful for distributed training. - Pre-repeat vs post-repeat transforms ==================================== @@ -103,79 +52,6 @@ For example, in the following pipeline, the ``map(func)`` transformation only oc Result caching only applies if there are *transformation* stages prior to the pipelining operation. If you ``repeat()`` or ``window()`` a Dataset right after the read call (e.g., ``ray.data.read_parquet(...).repeat()``), then the read will still be re-executed on each repetition. This optimization saves memory, at the cost of repeated reads from the datasource. To force result caching in all cases, use ``.fully_executed().repeat()``. -Splitting pipelines for distributed ingest -========================================== - -Similar to how you can ``.split()`` a Dataset, you can also split a DatasetPipeline with the same method call. This returns a number of DatasetPipeline shards that share a common parent pipeline. Each shard can be passed to a remote task or actor. - -**Code**: - -.. code-block:: python - - # Create a pipeline that loops over its source dataset indefinitely. - pipe: DatasetPipeline = ray.data \ - .read_parquet("s3://bucket/dir") \ - .repeat() \ - .random_shuffle_each_window() - - @ray.remote(num_gpus=1) - class TrainingWorker: - def __init__(self, rank: int, shard: DatasetPipeline): - self.rank = rank - self.shard = shard - ... - - shards: List[DatasetPipeline] = pipe.split(n=3) - workers = [TrainingWorker.remote(rank, s) for rank, s in enumerate(shards)] - ... - - -**Pipeline**: - -.. image:: images/dataset-repeat-2.svg - -.. _dataset-pipeline-ray-train: - -Distributed Ingest with Ray Train -================================= - -Ray Datasets integrates with :ref:`Ray Train `, further simplifying your distributed ingest pipeline. - -Ray Train is a lightweight library for scalable deep learning on Ray. - -1. It allows you to focus on the training logic and automatically handles distributed setup for your framework of choice (PyTorch, Tensorflow, or Horovod). -2. It has out of the box fault-tolerance and elastic training -3. And it comes with support for standard ML tools and features that practitioners love such as checkpointing and logging. - -**Code** - -.. code-block:: python - - def train_func(): - # This is a dummy train function just iterating over the dataset shard. - # You should replace this with your training logic. - shard = ray.train.get_dataset_shard() - for row in shard.iter_rows(): - print(row) - - # Create a pipeline that loops over its source dataset indefinitely. - pipe: DatasetPipeline = ray.data \ - .read_parquet(...) \ - .repeat() \ - .random_shuffle_each_window() - - - # Pass in the pipeline to the Trainer. - # The Trainer will automatically split the DatasetPipeline for you. 
- trainer = Trainer(num_workers=8, backend="torch") - result = trainer.run( - train_func, - config={"worker_batch_size": 64, "num_epochs": 2}, - dataset=pipe) - -Ray Train is responsible for the orchestration of the training workers and will automatically split the Dataset for you. -See :ref:`the Train User Guide ` for more details. - Changing Pipeline Structure =========================== diff --git a/doc/source/data/consuming-datasets.rst b/doc/source/data/consuming-datasets.rst new file mode 100644 index 000000000000..0b6fb1337be0 --- /dev/null +++ b/doc/source/data/consuming-datasets.rst @@ -0,0 +1,118 @@ +.. _consuming_datasets: + +================== +Consuming Datasets +================== + +The data underlying a ``Dataset`` can be consumed in several ways: + +* Retrieving a limited prefix of rows. +* Iterating over rows and batches. +* Saving to files. + +Retrieving a limited set of rows +================================ + +A limited set of rows can be retried from a ``Dataset`` via the +:meth:`ds.take() ` API, along with its sibling helper APIs +:meth:`ds.take_all() `, for retrieving **all** rows, and +:meth:`ds.show() `, for printing a limited set of rows. These +methods are convenient for quickly inspecting a subset (prefix) of rows. They have the +benefit that, if used right after reading, they will only trigger more files to be +read if needed to retrieve rows from that file; if inspecting a small prefix of rows, +often only the first file will need to be read. + +.. literalinclude:: ./doc_code/accessing_datasets.py + :language: python + :start-after: __take_begin__ + :end-before: __take_end__ + +Iterating over Datasets +======================= + +Datasets can be consumed a row at a time using the +:meth:`ds.iter_rows() ` API + +.. literalinclude:: ./doc_code/accessing_datasets.py + :language: python + :start-after: __iter_rows_begin__ + :end-before: __iter_rows_end__ + +or a batch at a time using the +:meth:`ds.iter_batches() ` API, where you can specify +batch size as well as the desired batch format. By default, the batch format is +``"native"``, which means that the batch format that's native to the data type will be +returned. For tabular data, the native format is a Pandas DataFrame; for Python objects, +it's a list. + +.. literalinclude:: ./doc_code/accessing_datasets.py + :language: python + :start-after: __iter_batches_begin__ + :end-before: __iter_batches_end__ + + +Datasets can be passed to Ray tasks or actors and accessed by these iteration methods. +This does not incur a copy, since the blocks of the Dataset are passed by reference as Ray objects: + +.. literalinclude:: ./doc_code/accessing_datasets.py + :language: python + :start-after: __remote_iterators_begin__ + :end-before: __remote_iterators_end__ + + +Splitting Into and Consuming Shards +=================================== + +Datasets can be split up into disjoint sub-datasets, or shards. +Locality-aware splitting is supported if you pass in a list of actor handles to the +:meth:`ds.split() ` function along with the number of desired splits. +This is a common pattern useful for loading and sharding data between distributed training actors: + +.. note:: + + If using :ref:`Ray Train ` for distributed training, you do not need to split the dataset; Ray + Train will automatically do locality-aware splitting into per-trainer shards for you! + +.. literalinclude:: ./doc_code/accessing_datasets.py + :language: python + :start-after: __split_begin__ + :end-before: __split_end__ + +.. 
_saving_datasets: + +=============== +Saving Datasets +=============== + +Datasets can be written to local or remote storage in the desired data format. +The supported formats include Parquet, CSV, JSON, NumPy. To control the number +of output files, you may use :meth:`ds.repartition() ` +to repartition the Dataset before writing out. + +.. tabbed:: Parquet + + .. literalinclude:: ./doc_code/saving_datasets.py + :language: python + :start-after: __write_parquet_begin__ + :end-before: __write_parquet_end__ + +.. tabbed:: CSV + + .. literalinclude:: ./doc_code/saving_datasets.py + :language: python + :start-after: __write_csv_begin__ + :end-before: __write_csv_end__ + +.. tabbed:: JSON + + .. literalinclude:: ./doc_code/saving_datasets.py + :language: python + :start-after: __write_json_begin__ + :end-before: __write_json_end__ + +.. tabbed:: NumPy + + .. literalinclude:: ./doc_code/saving_datasets.py + :language: python + :start-after: __write_numpy_begin__ + :end-before: __write_numpy_end__ diff --git a/doc/source/data/creating-datasets.rst b/doc/source/data/creating-datasets.rst index 54be748d7b82..650be332a11b 100644 --- a/doc/source/data/creating-datasets.rst +++ b/doc/source/data/creating-datasets.rst @@ -4,49 +4,12 @@ Creating Datasets ================= -A :class:`Dataset ` can be created from: +Ray :class:`Datasets ` can be created from: * generated synthetic data, * local and distributed in-memory data, and * local and external storage systems (local disk, cloud storage, HDFS, etc.). -Creation from existing in-memory data is enabled via Datasets' integrations -with familiar single-node data libraries (`Pandas `__, -`NumPy `__, `Arrow `__) and distributed -data processing frameworks (:ref:`Dask `, :ref:`Spark `, -:ref:`Modin `, :ref:`Mars `). Creating datasets from -persistent storage is enabled by Datasets' support for reading many common file -formats (Parquet, CSV, JSON, NPY, text, binary). - -A :class:`Dataset ` can hold plain Python objects (simple datasets), -Arrow records (Arrow datasets), or Pandas records (Pandas datasets). These records are -grouped into one or more data **blocks**, and these blocks can be spread across -multiple Ray nodes. Simple datasets are represented by simple blocks (lists of Python -objects), Arrow datasets are represented by Arrow blocks ( -`Arrow Tables `__ -), and Pandas -datasets are represented by Pandas blocks ( -`Pandas DataFrames `__ -). - -The method of -creating the dataset will determine the format of its internal block representation. - -.. dropdown:: See more about Datasets' internal block representation - - The following details the block representation for each creation method: - - * Reading tabular files (Parquet, CSV, JSON) and converting directly from Arrow produces Arrow datasets. - * Converting from Pandas, Dask, Modin, and Mars produces Pandas datasets. - * Reading NumPy files or converting from NumPy ndarrays produces Arrow datasets. - * Reading text and raw binary files produces simple datasets. - - The following figure visualizes a ``Dataset`` that has three - `Arrow Table `__ - blocks, each block holding 1000 rows: - - .. image:: images/dataset-arch.svg - This guide surveys the many ways to create a ``Dataset``. If none of these meet your needs, please reach out on `Discourse `__ or open a feature request on the `Ray GitHub repo `__, and check out @@ -59,12 +22,6 @@ if you're interested in rolling your own integration! 
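+For a quick taste, the sketch below shows a few common ways to create a ``Dataset``;
+the CSV path is a placeholder for your own data.
+
+.. code-block:: python
+
+    import ray
+
+    # Generate a small synthetic dataset of integers.
+    ds = ray.data.range(10000)
+
+    # Create a dataset from in-memory Python items (tabular records).
+    ds = ray.data.from_items([{"col1": i, "col2": str(i)} for i in range(10000)])
+
+    # Read tabular data from local or remote storage (placeholder path).
+    ds = ray.data.read_csv("s3://my-bucket/path/to/data.csv")
+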
Generating Synthetic Data ------------------------- -Using Datasets with small, generated data is a great way to test out some of Datasets' -features. Each of these synthetic data generators will generate data in Ray tasks that -will execute in parallel and will be load-balanced across your Ray cluster; the -``Dataset`` will hold a set of futures representing the return values of those tasks, -serving as pointers to a collection of distributed data blocks. - .. tabbed:: Int Range Create a ``Dataset`` from a range of integers. @@ -94,181 +51,6 @@ serving as pointers to a collection of distributed data blocks. :start-after: __gen_synth_tensor_range_begin__ :end-before: __gen_synth_tensor_range_end__ -.. _dataset_from_in_memory_data: - ------------------------------------------ -From Local and Distributed In-Memory Data ------------------------------------------ - -Datasets can be constructed from existing in-memory data. In addition to being able to -construct a ``Dataset`` from plain Python objects, Datasets also interoperates with popular -single-node libraries (`Pandas `__, -`NumPy `__, `Arrow `__) as well as -distributed frameworks (:ref:`Dask `, :ref:`Spark `, -:ref:`Modin `, :ref:`Mars `). - -.. _dataset_from_in_memory_data_single_node: - -Integration with Single-Node Data Libraries -=========================================== - -In this section, we demonstrate creating a ``Dataset`` from single-node in-memory data. - -.. tabbed:: Pandas - - Create a ``Dataset`` from a Pandas DataFrame. This constructs a ``Dataset`` - backed by a single Pandas DataFrame block. - - .. literalinclude:: ./doc_code/creating_datasets.py - :language: python - :start-after: __from_pandas_begin__ - :end-before: __from_pandas_end__ - - We can also build a ``Dataset`` from more than one Pandas DataFrame, where each said - DataFrame will become a block in the ``Dataset``. - - .. literalinclude:: ./doc_code/creating_datasets.py - :language: python - :start-after: __from_pandas_mult_begin__ - :end-before: __from_pandas_mult_end__ - -.. tabbed:: NumPy - - Create a ``Dataset`` from a NumPy ndarray. This constructs a ``Dataset`` - backed by a single-column Arrow table block; the outer dimension of the ndarray - will be treated as the row dimension, and the column will have name ``"__value__"``. - - .. literalinclude:: ./doc_code/creating_datasets.py - :language: python - :start-after: __from_numpy_begin__ - :end-before: __from_numpy_end__ - - We can also build a ``Dataset`` from more than one NumPy ndarray, where each said - ndarray will become a single-column Arrow table block in the ``Dataset``. - - .. literalinclude:: ./doc_code/creating_datasets.py - :language: python - :start-after: __from_numpy_mult_begin__ - :end-before: __from_numpy_mult_end__ - -.. tabbed:: Arrow - - Create a ``Dataset`` from an - `Arrow Table `__. - This constructs a ``Dataset`` backed by a single Arrow ``Table`` block. - - .. literalinclude:: ./doc_code/creating_datasets.py - :language: python - :start-after: __from_arrow_begin__ - :end-before: __from_arrow_end__ - - We can also build a ``Dataset`` from more than one Arrow Table, where each said - ``Table`` will become a block in the ``Dataset``. - - .. literalinclude:: ./doc_code/creating_datasets.py - :language: python - :start-after: __from_arrow_mult_begin__ - :end-before: __from_arrow_mult_end__ - -.. 
tabbed:: Python Objects - - Create a ``Dataset`` from a list of Python objects; since each object in this - particular list is a dictionary, Datasets will treat this list as a list of tabular - records, and will construct an Arrow ``Dataset``. - - .. literalinclude:: ./doc_code/creating_datasets.py - :language: python - :start-after: __from_items_begin__ - :end-before: __from_items_end__ - -.. _dataset_from_in_memory_data_distributed: - -Integration with Distributed Data Processing Frameworks -======================================================= - -In addition to working with single-node in-memory data, Datasets can be constructed from -distributed (multi-node) in-memory data, interoperating with popular distributed -data processing frameworks such as :ref:`Dask `, :ref:`Spark `, -:ref:`Modin `, and :ref:`Mars `. - -The common paradigm used by -these conversions is to send out Ray tasks converting each Dask/Spark/Modin/Mars -data partition to a format that Datasets can understand (if needed), and using the -futures representing the return value of those conversion tasks as the ``Dataset`` block -futures. If the upstream framework's data partitions are already in a format that -Datasets understands (e.g. Arrow or Pandas), Datasets will elide the conversion task and -will instead reinterpret those data partitions directly as its blocks. - -.. note:: - - These data processing frameworks must be running on Ray in order for these Datasets - integrations to work. See how these frameworks can be run on Ray in our - :ref:`data processing integrations docs `. - -.. tabbed:: Dask - - Create a ``Dataset`` from a - `Dask DataFrame `__. This constructs a - ``Dataset`` backed by the distributed Pandas DataFrame partitions that underly the - Dask DataFrame. - - .. note:: - - This conversion should have near-zero overhead: it involves zero data copying and - zero data movement. Datasets simply reinterprets the existing Dask DataFrame partitions - as Ray Datasets partitions without touching the underlying data. - - .. literalinclude:: ./doc_code/creating_datasets.py - :language: python - :start-after: __from_dask_begin__ - :end-before: __from_dask_end__ - -.. tabbed:: Spark - - Create a ``Dataset`` from a `Spark DataFrame - `__. - This constructs a ``Dataset`` backed by the distributed Spark DataFrame partitions - that underly the Spark DataFrame. When this conversion happens, Spark-on-Ray (RayDP) - will save the Spark DataFrame partitions to Ray's object store in the Arrow format, - which Datasets will then interpret as its blocks. - - .. literalinclude:: ./doc_code/creating_datasets.py - :language: python - :start-after: __from_spark_begin__ - :end-before: __from_spark_end__ - -.. tabbed:: Modin - - Create a ``Dataset`` from a Modin DataFrame. This constructs a ``Dataset`` - backed by the distributed Pandas DataFrame partitions that underly the Modin DataFrame. - - .. note:: - - This conversion should have near-zero overhead: it involves zero data copying and - zero data movement. Datasets simply reinterprets the existing Modin DataFrame partitions - as Ray Datasets partitions without touching the underlying data. - - .. literalinclude:: ./doc_code/creating_datasets.py - :language: python - :start-after: __from_modin_begin__ - :end-before: __from_modin_end__ - -.. tabbed:: Mars - - Create a ``Dataset`` from a Mars DataFrame. This constructs a ``Dataset`` - backed by the distributed Pandas DataFrame partitions that underly the Mars DataFrame. - - .. 
note:: - - This conversion should have near-zero overhead: it involves zero data copying and - zero data movement. Datasets simply reinterprets the existing Mars DataFrame partitions - as Ray Datasets partitions without touching the underlying data. - - .. literalinclude:: ./doc_code/creating_datasets.py - :language: python - :start-after: __from_mars_begin__ - :end-before: __from_mars_end__ - .. _dataset_reading_from_storage: -------------------------- @@ -281,48 +63,10 @@ or remote storage system such as S3, GCS, Azure Blob Storage, or HDFS. Any files can be used to specify file locations, and many common file formats are supported: Parquet, CSV, JSON, NPY, text, binary. -Parallel + Distributed Reading -============================== - Each of these APIs take a path or list of paths to files or directories. Any directories provided will be walked in order to obtain concrete file paths, at which point all files will be read in parallel. -Datasets automatically selects the read ``parallelism`` according to the following procedure: -1. The number of available CPUs is estimated. If in a placement group, the number of CPUs in the cluster is scaled by the size of the placement group compared to the cluster size. If not in a placement group, this is the number of CPUs in the cluster. -2. The parallelism is set to the estimated number of CPUs multiplied by 2. If the parallelism is less than 8, it is set to 8. -3. The in-memory data size is estimated. If the parallelism would create in-memory blocks that are larger on average than the target block size (512MiB), the parallelism is increased until the blocks are < 512MiB in size. -4. The parallelism is truncated to ``min(num_files, parallelism)``. - -To perform the read, ``parallelism`` parallel read tasks will be -launched, each reading one or more files and each creating a single block of data. -When reading from remote datasources, these parallel read tasks will be spread across -the nodes in your Ray cluster, creating the distributed collection of blocks that makes -up a distributed Ray Dataset. - -.. image:: images/dataset-read.svg - :width: 650px - :align: center - -This default parallelism can be overridden via the ``parallelism`` argument; see the -:ref:`performance guide ` for tips on how to tune this read -parallelism. - -.. _dataset_deferred_reading: - -Deferred Read Task Execution -============================ - -Datasets created via the ``ray.data.read_*()`` APIs are semi-lazy: initially, only the -first read task will be executed. This avoids blocking Dataset creation on the reading -of all data files, enabling inspection functions like -:meth:`ds.schema() ` and -:meth:`ds.show() ` to be used right away. Executing further -transformations on the Dataset will trigger execution of all read tasks, and execution -of all read tasks can be triggered manually using the -:meth:`ds.fully_executed() ` API. - - .. _dataset_supported_file_formats: Supported File Formats @@ -473,11 +217,6 @@ are supported for each of these storage systems. `S3FileSystem `__ instance to :func:`read_parquet() `. - .. note:: - - This example is not runnable as-is; to run it on your own private S3 data, add in a - path to your private bucket and specify your S3 credentials. - .. literalinclude:: ./doc_code/creating_datasets.py :language: python :start-after: __read_parquet_s3_with_fs_begin__ @@ -503,11 +242,6 @@ are supported for each of these storage systems. `__ instance to :func:`read_parquet() `. - .. 
note:: - - This example is not runnable as-is; you'll need to point it at your HDFS - cluster/data. - .. literalinclude:: ./doc_code/creating_datasets.py :language: python :start-after: __read_parquet_hdfs_with_fs_begin__ @@ -539,6 +273,178 @@ are supported for each of these storage systems. :start-after: __read_parquet_az_begin__ :end-before: __read_parquet_az_end__ +.. _dataset_from_in_memory_data: + +------------------- +From In-Memory Data +------------------- + +Datasets can be constructed from existing in-memory data. In addition to being able to +construct a ``Dataset`` from plain Python objects, Datasets also interoperates with popular +single-node libraries (`Pandas `__, +`NumPy `__, `Arrow `__) as well as +distributed frameworks (:ref:`Dask `, :ref:`Spark `, +:ref:`Modin `, :ref:`Mars `). + +.. _dataset_from_in_memory_data_single_node: + +From Single-Node Data Libraries +=============================== + +In this section, we demonstrate creating a ``Dataset`` from single-node in-memory data. + +.. tabbed:: Pandas + + Create a ``Dataset`` from a Pandas DataFrame. This constructs a ``Dataset`` + backed by a single Pandas DataFrame block. + + .. literalinclude:: ./doc_code/creating_datasets.py + :language: python + :start-after: __from_pandas_begin__ + :end-before: __from_pandas_end__ + + We can also build a ``Dataset`` from more than one Pandas DataFrame, where each said + DataFrame will become a block in the ``Dataset``. + + .. literalinclude:: ./doc_code/creating_datasets.py + :language: python + :start-after: __from_pandas_mult_begin__ + :end-before: __from_pandas_mult_end__ + +.. tabbed:: NumPy + + Create a ``Dataset`` from a NumPy ndarray. This constructs a ``Dataset`` + backed by a single-column Arrow table block; the outer dimension of the ndarray + will be treated as the row dimension, and the column will have name ``"__value__"``. + + .. literalinclude:: ./doc_code/creating_datasets.py + :language: python + :start-after: __from_numpy_begin__ + :end-before: __from_numpy_end__ + + We can also build a ``Dataset`` from more than one NumPy ndarray, where each said + ndarray will become a single-column Arrow table block in the ``Dataset``. + + .. literalinclude:: ./doc_code/creating_datasets.py + :language: python + :start-after: __from_numpy_mult_begin__ + :end-before: __from_numpy_mult_end__ + +.. tabbed:: Arrow + + Create a ``Dataset`` from an + `Arrow Table `__. + This constructs a ``Dataset`` backed by a single Arrow ``Table`` block. + + .. literalinclude:: ./doc_code/creating_datasets.py + :language: python + :start-after: __from_arrow_begin__ + :end-before: __from_arrow_end__ + + We can also build a ``Dataset`` from more than one Arrow Table, where each said + ``Table`` will become a block in the ``Dataset``. + + .. literalinclude:: ./doc_code/creating_datasets.py + :language: python + :start-after: __from_arrow_mult_begin__ + :end-before: __from_arrow_mult_end__ + +.. tabbed:: Python Objects + + Create a ``Dataset`` from a list of Python objects; since each object in this + particular list is a dictionary, Datasets will treat this list as a list of tabular + records, and will construct an Arrow ``Dataset``. + + .. literalinclude:: ./doc_code/creating_datasets.py + :language: python + :start-after: __from_items_begin__ + :end-before: __from_items_end__ + +.. 
_dataset_from_in_memory_data_distributed: + +From Distributed Data Processing Frameworks +=========================================== + +In addition to working with single-node in-memory data, Datasets can be constructed from +distributed (multi-node) in-memory data, interoperating with popular distributed +data processing frameworks such as :ref:`Dask `, :ref:`Spark `, +:ref:`Modin `, and :ref:`Mars `. + +These conversions work by running Ray tasks converting each Dask/Spark/Modin/Mars +data partition to a block format supported by Datasets (copying data if needed), and using the +futures representing the return value of those conversion tasks as the ``Dataset`` block +futures. + +.. note:: + + These data processing frameworks must be running on Ray in order for these Datasets + integrations to work. See how these frameworks can be run on Ray in our + :ref:`data processing integrations docs `. + +.. tabbed:: Dask + + Create a ``Dataset`` from a + `Dask DataFrame `__. This constructs a + ``Dataset`` backed by the distributed Pandas DataFrame partitions that underly the + Dask DataFrame. + + .. note:: + + This conversion should have near-zero overhead: it involves zero data copying and + zero data movement. Datasets simply reinterprets the existing Dask DataFrame partitions + as Ray Datasets partitions without touching the underlying data. + + .. literalinclude:: ./doc_code/creating_datasets.py + :language: python + :start-after: __from_dask_begin__ + :end-before: __from_dask_end__ + +.. tabbed:: Spark + + Create a ``Dataset`` from a `Spark DataFrame + `__. + This constructs a ``Dataset`` backed by the distributed Spark DataFrame partitions + that underly the Spark DataFrame. When this conversion happens, Spark-on-Ray (RayDP) + will save the Spark DataFrame partitions to Ray's object store in the Arrow format, + which Datasets will then interpret as its blocks. + + .. literalinclude:: ./doc_code/creating_datasets.py + :language: python + :start-after: __from_spark_begin__ + :end-before: __from_spark_end__ + +.. tabbed:: Modin + + Create a ``Dataset`` from a Modin DataFrame. This constructs a ``Dataset`` + backed by the distributed Pandas DataFrame partitions that underly the Modin DataFrame. + + .. note:: + + This conversion should have near-zero overhead: it involves zero data copying and + zero data movement. Datasets simply reinterprets the existing Modin DataFrame partitions + as Ray Datasets partitions without touching the underlying data. + + .. literalinclude:: ./doc_code/creating_datasets.py + :language: python + :start-after: __from_modin_begin__ + :end-before: __from_modin_end__ + +.. tabbed:: Mars + + Create a ``Dataset`` from a Mars DataFrame. This constructs a ``Dataset`` + backed by the distributed Pandas DataFrame partitions that underly the Mars DataFrame. + + .. note:: + + This conversion should have near-zero overhead: it involves zero data copying and + zero data movement. Datasets simply reinterprets the existing Mars DataFrame partitions + as Ray Datasets partitions without touching the underlying data. + + .. literalinclude:: ./doc_code/creating_datasets.py + :language: python + :start-after: __from_mars_begin__ + :end-before: __from_mars_end__ + .. _dataset_from_torch_tf: ------------------------- @@ -620,3 +526,63 @@ converts it into a Ray Dataset directly. ray_datasets = ray.data.from_huggingface(hf_datasets) ray_datasets["train"].take(2) # [{'text': ''}, {'text': ' = Valkyria Chronicles III = \n'}] + +.. 
_datasets_custom_datasource: + +------------------ +Custom Datasources +------------------ + +Datasets can read and write in parallel to `custom datasources `__ defined in Python. +Once you have implemented `YourCustomDataSource`, you can use it like any other source in Ray Data: + +.. code-block:: python + + # Read from a custom datasource. + ds = ray.data.read_datasource(YourCustomDatasource(), **read_args) + + # Write to a custom datasource. + ds.write_datasource(YourCustomDatasource(), **write_args) + +-------------------------- +Performance Considerations +-------------------------- + +Read Parallelism +================ + +Datasets automatically selects the read ``parallelism`` according to the following procedure: + +1. The number of available CPUs is estimated. If in a placement group, the number of CPUs in the cluster is scaled by the size of the placement group compared to the cluster size. If not in a placement group, this is the number of CPUs in the cluster. +2. The parallelism is set to the estimated number of CPUs multiplied by 2. If the parallelism is less than 8, it is set to 8. +3. The in-memory data size is estimated. If the parallelism would create in-memory blocks that are larger on average than the target block size (512MiB), the parallelism is increased until the blocks are < 512MiB in size. +4. The parallelism is truncated to ``min(num_files, parallelism)``. + +To perform the read, ``parallelism`` parallel read tasks will be +launched, each reading one or more files and each creating a single block of data. +When reading from remote datasources, these parallel read tasks will be spread across +the nodes in your Ray cluster, creating the distributed collection of blocks that makes +up a distributed Ray Dataset. + +.. image:: images/dataset-read.svg + :width: 650px + :align: center + +This default parallelism can be overridden via the ``parallelism`` argument; see the +:ref:`performance guide ` for tips on how to tune this read +parallelism. + +.. _dataset_deferred_reading: + +Deferred Read Task Execution +============================ + +Datasets created via the ``ray.data.read_*()`` APIs are semi-lazy: initially, only the +first read task will be executed. This avoids blocking Dataset creation on the reading +of all data files, enabling inspection functions like +:meth:`ds.schema() ` and +:meth:`ds.show() ` to be used right away. Executing further +transformations on the Dataset will trigger execution of all read tasks, and execution +of all read tasks can be triggered manually using the +:meth:`ds.fully_executed() ` API. + diff --git a/doc/source/data/custom-data.rst b/doc/source/data/custom-data.rst deleted file mode 100644 index 094185294144..000000000000 --- a/doc/source/data/custom-data.rst +++ /dev/null @@ -1,16 +0,0 @@ -.. _datasets_custom_datasource: - ------------------------- -Using Custom Datasources ------------------------- - -Datasets can read and write in parallel to `custom datasources `__ defined in Python. -Once you have implemented `YourCustomDataSource`, you can use it like any other source in Ray Data: - -.. code-block:: python - - # Read from a custom datasource. - ds = ray.data.read_datasource(YourCustomDatasource(), **read_args) - - # Write to a custom datasource. - ds.write_datasource(YourCustomDatasource(), **write_args) diff --git a/doc/source/data/dataset-internals.rst b/doc/source/data/dataset-internals.rst new file mode 100644 index 000000000000..e6d4158e494f --- /dev/null +++ b/doc/source/data/dataset-internals.rst @@ -0,0 +1,143 @@ +.. 
_datasets_scheduling: + +============================================ +Scheduling, Execution, and Memory Management +============================================ + +Scheduling +========== + +Datasets uses Ray core for execution, and hence is subject to the same scheduling considerations as normal Ray tasks and actors. Datasets uses the following custom scheduling settings by default for improved performance: + +* The ``SPREAD`` scheduling strategy is used to ensure data blocks are evenly balanced across the cluster. +* Retries of application-level exceptions are enabled to handle transient errors from remote datasources. +* Dataset tasks ignore placement groups by default, see :ref:`Datasets and Placement Groups `. + +.. _datasets_tune: + +Datasets and Tune +~~~~~~~~~~~~~~~~~ + +When using Datasets in conjunction with :ref:`Ray Tune `, it is important to ensure there are enough free CPUs for Datasets to run on. By default, Tune will try to fully utilize cluster CPUs. This can prevent Datasets from scheduling tasks, reducing performance or causing workloads to hang. + +As an example, the following shows two ways to use Datasets together with Tune: + +.. tabbed:: Limiting Tune Concurrency + + By limiting the number of concurrent Tune trials, we ensure CPU resources are always available for Datasets execution. + This can be done using the ``max_concurrent_trials`` Tune option. + + .. literalinclude:: ./doc_code/key_concepts.py + :language: python + :start-after: __resource_allocation_1_begin__ + :end-before: __resource_allocation_1_end__ + +.. tabbed:: Reserving CPUs (Experimental) + + Alternatively, we can tell Tune to set aside CPU resources for other libraries. + This can be done by setting ``_max_cpu_fraction_per_node=0.8``, which reserves + 20% of node CPUs for Dataset execution. + + .. literalinclude:: ./doc_code/key_concepts.py + :language: python + :start-after: __resource_allocation_2_begin__ + :end-before: __resource_allocation_2_end__ + + .. warning:: + + This option is experimental and not currently recommended for use with + autoscaling clusters (scale-up will not trigger properly). + +.. _datasets_pg: + +Datasets and Placement Groups +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +By default, Datasets configures its tasks and actors to use the cluster-default scheduling strategy ("DEFAULT"). You can inspect this configuration variable here: +``ray.data.context.DatasetContext.get_current().scheduling_strategy``. This scheduling strategy will schedule these tasks and actors outside any present +placement group. If you want to force Datasets to schedule tasks within the current placement group (i.e., to use current placement group resources specifically for Datasets), you can set ``ray.data.context.DatasetContext.get_current().scheduling_strategy = None``. + +This should be considered for advanced use cases to improve performance predictability only. We generally recommend letting Datasets run outside placement groups as documented in the :ref:`Datasets and Other Libraries ` section. + +Execution +========= + +This section covers Dataset execution modes and performance considerations. + +Lazy Execution Mode +~~~~~~~~~~~~~~~~~~~ + +By default, most Datasets operations are eager, which provides a simpler iterative +development experience. Datasets also has a lazy execution mode that can offer +improved performance due to stage fusion optimizations. + +Lazy execution mode can be enabled by calling +:meth:`ds = ds.lazy() `, which +returns a Dataset whose all subsequent operations will be lazy. 
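+For example, a minimal sketch of enabling lazy mode (the ``map_batches`` call here is
+just a placeholder transformation):
+
+.. code-block:: python
+
+    import ray
+
+    # Enable lazy execution for all subsequent operations on this Dataset.
+    ds = ray.data.range(10000).lazy()
+
+    # Recorded, but not executed yet.
+    ds = ds.map_batches(lambda batch: [x * 2 for x in batch])
+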
These operations +won't be executed until the dataset is consumed or +:meth:`ds.fully_executed() ` is called to manually +trigger execution. + +Stage Fusion Optimization +~~~~~~~~~~~~~~~~~~~~~~~~~ + +In order to reduce memory usage and task overheads, Datasets will automatically fuse together +lazy operations that are compatible: + +* Same compute pattern: embarrassingly parallel map vs. all-to-all shuffle +* Same compute strategy: Ray tasks vs Ray actors +* Same resource specification, e.g. ``num_cpus`` or ``num_cpus`` requests + +Read stages and subsequent map-like transformations will usually be fused together. +All-to-all transformations such as +:meth:`ds.random_shuffle() ` can be fused with earlier +map-like stages, but not later stages. + +You can tell if stage fusion is enabled by checking the :ref:`Dataset stats ` and looking for fused stages (e.g., ``read->map_batches``). + +.. code-block:: + + Stage N read->map_batches->shuffle_map: N/N blocks executed in T + * Remote wall time: T min, T max, T mean, T total + * Remote cpu time: T min, T max, T mean, T total + * Output num rows: N min, N max, N mean, N total + +To avoid unnecessary data movement in the distributed setting, +:class:`DatasetPipelines ` will always use +lazy execution under the hood. + +Memory Management +================= + +This section describes how Datasets manages execution and object store memory. + +Execution Memory +~~~~~~~~~~~~~~~~ + +During execution, certain types of intermediate data must fit in memory. This includes the input block of a task, as well as at least one of the output blocks of the task (when a task has multiple output blocks, only one needs to fit in memory at any given time). The input block consumes object stored shared memory (and Python heap memory if conversion to non-Arrow format is needed). The output blocks consume Python heap memory (prior to putting in the object store) as well as object store memory (after being put in the object store). + +This means that large block sizes can lead to potential out-of-memory situations. To avoid these issues, make sure no single item in your Datasets is too large, and always call :meth:`ds.map_batches() ` with batch size small enough such that the output batch can comfortably fit into memory. + +Object Store Memory +~~~~~~~~~~~~~~~~~~~ + +Datasets uses the Ray object store to store data blocks, which means it inherits the memory management features of the Ray object store. This section discusses the relevant features: + +* Object Spilling: Since Datasets uses the Ray object store to store data blocks, any blocks that can't fit into object store memory are automatically spilled to disk. The objects are automatically reloaded when needed by downstream compute tasks: +* Locality Scheduling: Ray will preferentially schedule compute tasks on nodes that already have a local copy of the object, reducing the need to transfer objects between nodes in the cluster. +* Reference Counting: Dataset blocks are kept alive by object store reference counting as long as there is any Dataset that references them. To free memory, delete any Python references to the Dataset object. + +Block Data Formats +~~~~~~~~~~~~~~~~~~ + +In order to optimize conversion costs, Datasets can hold tabular data in-memory +as either `Arrow Tables `__ +or `Pandas DataFrames `__. + +Different ways of creating Datasets leads to a different starting internal format: +* Reading tabular files (Parquet, CSV, JSON) creates Arrow blocks initially. 
+* Converting from Pandas, Dask, Modin, and Mars creates Pandas blocks initially. +* Reading NumPy files or converting from NumPy ndarrays creaates Arrow blocks. + +However, this internal format is not exposed to the user. Datasets converts between formats +as needed internally depending on the specified ``batch_format`` of transformations. diff --git a/doc/source/data/dataset-tensor-support.rst b/doc/source/data/dataset-tensor-support.rst index cedad2bdfc8c..d2172e24caa8 100644 --- a/doc/source/data/dataset-tensor-support.rst +++ b/doc/source/data/dataset-tensor-support.rst @@ -1,7 +1,7 @@ .. _datasets_tensor_support: -Working with Tensors -==================== +ML Tensor Support +================= Tables with tensor columns ~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/doc/source/data/dataset.rst b/doc/source/data/dataset.rst index 328f9958362c..0466dca23f47 100644 --- a/doc/source/data/dataset.rst +++ b/doc/source/data/dataset.rst @@ -97,7 +97,7 @@ Advanced users can refer directly to the Ray Datasets :ref:`API reference `, :ref:`save datasets`, :ref:`transform datasets`, - :ref:`access and exchange datasets`, :ref:`pipeline + :ref:`access and exchange datasets`, :ref:`pipeline transformations`, :ref:`load and process data for ML`, work with :ref:`tensor data`, or :ref:`use pipelines`. diff --git a/doc/source/data/getting-started.rst b/doc/source/data/getting-started.rst index 67826caba04a..640a739a33a3 100644 --- a/doc/source/data/getting-started.rst +++ b/doc/source/data/getting-started.rst @@ -82,8 +82,8 @@ setup is expensive. See the :ref:`Transforming Datasets guide ` for an in-depth guide on transforming datasets. -Passing and accessing datasets ------------------------------- +Accessing and exchanging datasets +--------------------------------- Datasets can be passed to Ray tasks or actors and accessed with :meth:`.iter_batches() ` or @@ -108,5 +108,5 @@ training actors: :start-after: __dataset_split_begin__ :end-before: __dataset_split_end__ -See the :ref:`Accessing Datasets guide ` for an in-depth guide +See the :ref:`Consuming Datasets guide ` for an in-depth guide on accessing and exchanging datasets. diff --git a/doc/source/data/images/dataset-pipeline-2-mini.svg b/doc/source/data/images/dataset-pipeline-2-mini.svg new file mode 100644 index 000000000000..4855d82cecb4 --- /dev/null +++ b/doc/source/data/images/dataset-pipeline-2-mini.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/doc/source/data/key-concepts.rst b/doc/source/data/key-concepts.rst index f48a21ca1df4..d1c95c3b4745 100644 --- a/doc/source/data/key-concepts.rst +++ b/doc/source/data/key-concepts.rst @@ -13,7 +13,8 @@ Datasets A Dataset consists of a list of Ray object references to *blocks*. Each block holds a set of items in either `Arrow table format `__ or a Python list (for non-tabular data). -Having multiple blocks in a dataset allows for parallel transformation and ingest of the data. +For ML use cases, Datasets also natively supports :ref:`Tensor data `. +Having multiple blocks in a dataset allows for parallel transformation and ingest. The following figure visualizes a Dataset that has three Arrow table blocks, each block holding 1000 rows each: @@ -75,8 +76,8 @@ Datasets relies on :ref:`task-based fault tolerance ` in R There are a few cases that are not currently supported: - * If the original worker process that created the Dataset dies. This is because the creator stores the metadata for the :ref:`objects ` that comprise the Dataset. 
- * When ``compute=ActorPoolStrategy()`` is specified for transformations. +* If the original worker process that created the Dataset dies. This is because the creator stores the metadata for the :ref:`objects ` that comprise the Dataset. +* When ``compute=ActorPoolStrategy()`` is specified for transformations. .. _dataset_pipeline_concept: @@ -86,40 +87,9 @@ Dataset Pipelines Dataset pipelines allow Dataset transformations to be executed incrementally on *windows* of the base data, instead of on all of the data at once. This can be used for streaming data loading into ML training, or to execute batch transformations on large datasets without needing to load the entire dataset into cluster memory. -See the :ref:`Dataset Pipelines Guide ` for an in-depth guide on pipelining compute. - ----------------------------- -Datasets and Other Libraries ----------------------------- - -When using Datasets in conjunction with other Ray libraries, it is important to ensure there are enough free CPUs for Datasets to run on. Libraries such as Tune by default try to fully utilize cluster CPUs. This can prevent Datasets from scheduling tasks, reducing performance or causing workloads to hang. - -.. _datasets_tune: - -As an example, the following shows two ways to use Datasets together with the Ray Tune library: - -.. tabbed:: Limiting Tune Concurrency - - By limiting the number of concurrent Tune trials, we ensure CPU resources are always available for Datasets execution. - This can be done using the ``max_concurrent_trials`` Tune option. - - .. literalinclude:: ./doc_code/key_concepts.py - :language: python - :start-after: __resource_allocation_1_begin__ - :end-before: __resource_allocation_1_end__ - -.. tabbed:: Reserving CPUs (Experimental) - - Alternatively, we can tell Tune to set aside CPU resources for other libraries. - This can be done by setting ``_max_cpu_fraction_per_node=0.8``, which reserves - 20% of node CPUs for Dataset execution. - - .. literalinclude:: ./doc_code/key_concepts.py - :language: python - :start-after: __resource_allocation_2_begin__ - :end-before: __resource_allocation_2_end__ +.. + https://docs.google.com/drawings/d/1A_nWvignkdvs4GPRShCNYcnb1T--iQoSEeS4uWRVQ4k/edit - .. warning:: +.. image:: images/dataset-pipeline-2-mini.svg - This option is experimental and not currently recommended for use with - autoscaling clusters (scale-up will not trigger properly). +Dataset pipelines can be read in a streaming fashion by one consumer, or split into multiple sub-pipelines and read in parallel by multiple consumers for distributd training. See the :ref:`Dataset Pipelines Guide ` for an in-depth guide on pipelining compute. diff --git a/doc/source/data/memory-management.rst b/doc/source/data/memory-management.rst deleted file mode 100644 index 1c9a9fd0fa49..000000000000 --- a/doc/source/data/memory-management.rst +++ /dev/null @@ -1,161 +0,0 @@ -.. _data_advanced: - -Memory Management -================= - -This section deals with how Datasets manages execution and object store memory. - -Execution Memory -~~~~~~~~~~~~~~~~ - -During execution, certain types of intermediate data must fit in memory. This includes the input block of a task, as well as at least one of the output blocks of the task (when a task has multiple output blocks, only one needs to fit in memory at any given time). The input block consumes object stored shared memory (Python heap memory for non-Arrow data). 
The output blocks consume Python heap memory (prior to putting in the object store) as well as object store memory (after being put in the object store). - -This means that large block sizes can lead to potential out-of-memory situations. To -avoid OOM errors, Datasets can split blocks during map and read tasks into pieces -smaller than the target max block size. In some cases, this splitting is not possible -(e.g., if a single item in a block is extremely large, or the function given to -:meth:`ds.map_batches() ` returns a very large batch). To -avoid these issues, make sure no single item in your Datasets is too large, and always -call :meth:`ds.map_batches() ` with batch size small enough such that the output batch can comfortably fit into memory. - -.. note:: - - Block splitting is off by default. See the :ref:`performance section ` on how to enable block splitting (beta). - -Object Store Memory -~~~~~~~~~~~~~~~~~~~ - -Datasets uses the Ray object store to store data blocks, which means it inherits the memory management features of the Ray object store. This section discusses the relevant features: - -**Object Spilling**: Since Datasets uses the Ray object store to store data blocks, any blocks that can't fit into object store memory are automatically spilled to disk. The objects are automatically reloaded when needed by downstream compute tasks: - -.. image:: images/dataset-spill.svg - :width: 650px - :align: center - -.. - https://docs.google.com/drawings/d/1H_vDiaXgyLU16rVHKqM3rEl0hYdttECXfxCj8YPrbks/edit - -**Locality Scheduling**: Ray will preferentially schedule compute tasks on nodes that already have a local copy of the object, reducing the need to transfer objects between nodes in the cluster. - -**Reference Counting**: Dataset blocks are kept alive by object store reference counting as long as there is any Dataset that references them. To free memory, delete any Python references to the Dataset object. - -**Load Balancing**: Datasets uses Ray scheduling hints to spread read tasks out across the cluster to balance memory usage. - -Lazy Execution Mode -~~~~~~~~~~~~~~~~~~~ - -.. note:: - - Lazy execution mode is experimental. If you run into any issues, please reach - out on `Discourse `__ or open an issue on the - `Ray GitHub repo `__. - -By default, all Datasets operations are eager (except for data reading, which is -semi-lazy; see the :ref:`deferred reading docs `), executing -each stage synchronously. This provides a simpler iterative development and debugging -experience, allowing you to inspect up-to-date metadata (schema, row count, etc.) after -each operation, greatly improving the typical "Getting Started" experience. - -However, this eager execution mode can result in less optimal (i.e. slower) execution -and increased memory utilization compared to what's possible with a lazy execution mode. -That's why Datasets offers a lazy execution mode, which you can transition to after -you're done prototyping your Datasets pipeline. - -Lazy execution mode can be enabled by calling -:meth:`ds = ds.lazy() `, which -returns a dataset whose all subsequent operations will be **lazy**. These operations -won't be executed until the dataset is consumed (e.g. via -:meth:`ds.take() `, -:meth:`ds.iter_batches() `, -:meth:`ds.to_torch() `, etc.) or if -:meth:`ds.fully_executed() ` is called to manually -trigger execution. - -The big optimizations that lazy execution enables are **automatic stage fusion** and -**block move semantics**. 
- -Automatic Stage Fusion -~~~~~~~~~~~~~~~~~~~~~~ - -Automatic fusion of stages/operations can significantly lower the Ray task overhead of -Datasets workloads, since a chain of reading and many map-like transformations will be -condensed into a single stage of Ray tasks; this results in less data needing to be put -into Ray's object store and transferred across nodes, and therefore resulting in lower -memory utilization and faster task execution. - -Datasets will automatically fuse together lazy operations that are compatible: - -* Same compute pattern: embarrassingly parallel map vs. all-to-all shuffle -* Same compute strategy: Ray tasks vs Ray actors -* Same resource specification, e.g. ``num_cpus`` or ``num_cpus`` requests - -Read and subsequent map-like transformations -(e.g. :meth:`ds.map_batches() `, -:meth:`ds.filter() `, etc.) will usually be fused together. -All-to-all transformations such as -:meth:`ds.random_shuffle() ` can be fused with earlier -map-like stages, but not later stages. - -.. note:: - - For eager mode Datasets, reads are semi-lazy, so the transformation stage right after - the read stage (that triggers the full data read) will fuse with the read stage. Note - that this currently incurs re-reading of any already-read blocks (a fix for this is - currently in progress.) - - -You can tell if stage fusion is enabled by checking the :ref:`Dataset stats ` and looking for fused stages (e.g., ``read->map_batches``). - -.. code-block:: - - Stage N read->map_batches->shuffle_map: N/N blocks executed in T - * Remote wall time: T min, T max, T mean, T total - * Remote cpu time: T min, T max, T mean, T total - * Output num rows: N min, N max, N mean, N total - -Block Move Semantics -~~~~~~~~~~~~~~~~~~~~ - -In addition to fusing together stages, lazy execution mode further optimizes memory -utilization by eagerly releasing the data produced by intermediate operations in a -chain. - -For example, if you have a chain of ``read_parquet() -> map_batches() -> filter()`` operations: - -.. literalinclude:: ./doc_code/key_concepts.py - :language: python - :start-after: __block_move_begin__ - :end-before: __block_move_end__ - -that, for the sake of this example, aren't fused together, Datasets can eagerly release -the outputs of the ``read_parquet()`` stage and the ``map_batches()`` stage before the -subsequent stage (``map_batches()`` and ``filter()``, respectively) have finished. This -was not possible in eager mode, since every operation materialized the data and returned -the references back to the user. But in lazy execution mode, we know that the outputs of -the ``read_parquet()`` and ``map_batches()`` stages are only going to be used by the -downstream stages, so we can more aggressively release them. - -Dataset Pipelines and Stage Fusion -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -To avoid unnecessary data movement in the distributed setting, -:class:`DatasetPipelines ` will always use -these lazy execution optimizations (stage fusion and block move semantics) -under-the-hood. Because a ``DatasetPipeline`` doesn't support creating more than one -``DatasetPipeline`` from a ``DatasetPipeline`` (i.e. no fan-out), we can clear block -data extra aggressively. - -.. note:: - - When creating a pipeline (i.e. calling :meth:`ds.window() ` - or :meth:`ds.repeat() `) immediately after a read stage, any - already read data will be dropped, and the read stage will be absorbed into the - pipeline and be made fully lazy. 
This allows you to easily create ML ingest pipelines - that re-read data from storage on every epoch, as well as streaming batch inference - pipelines that window all the way down to the file reading. - -.. literalinclude:: ./doc_code/key_concepts.py - :language: python - :start-after: __dataset_pipelines_execution_begin__ - :end-before: __dataset_pipelines_execution_end__ diff --git a/doc/source/data/performance-tips.rst b/doc/source/data/performance-tips.rst index b719753a37cd..748055790edc 100644 --- a/doc/source/data/performance-tips.rst +++ b/doc/source/data/performance-tips.rst @@ -7,14 +7,7 @@ Debugging Statistics ~~~~~~~~~~~~~~~~~~~~ You can view debug stats for your Dataset and DatasetPipeline executions via ``ds.stats()``. -These stats can be used to understand the performance of your Datasets workload and can help you debug problematic bottlenecks. - -At a high level, execution stats for tasks (e.g., CPU time) are attached to block metadata objects. -Datasets have stats objects that hold references to these stats and parent dataset stats (this avoids stats holding references to parent datasets, allowing them to be garbage collected). -Similarly, DatasetPipelines hold stats from recently computed datasets. -In addition, we also collect statistics about iterator timings (time spent waiting / processing / in user code). -Here's a sample output of getting stats in one of the most advanced use cases, -namely iterating over a split of a dataset pipeline in a remote task: +These stats can be used to understand the performance of your Dataset workload and can help you debug problematic bottlenecks. Note that both execution and iterator statistics are available: .. code-block:: python @@ -117,38 +110,10 @@ By default, Datasets automatically selects the read parallelism based on the cur However, the number of read tasks can also be increased manually via the ``parallelism`` parameter. For example, use ``ray.data.read_parquet(path, parallelism=1000)`` to force up to 1000 read tasks to be created. -Tuning Max Block Size -~~~~~~~~~~~~~~~~~~~~~ - -Block splitting is off by default. To enable block splitting (beta), run ``ray.data.context.DatasetContext.get_current().block_splitting_enabled = True``. - -Once enabled, the max target block size can be adjusted via the Dataset context API. -For example, to configure a max target block size of 8GiB, run ``ray.data.context.DatasetContext.get_current().target_max_block_size = 8192 * 1024 * 1024`` prior to creating the Dataset. -Lower block sizes reduce the max amount of object store and Python heap memory required during execution. -However, having too many blocks may introduce task scheduling overheads. - -We do not recommend adjusting this value for most workloads. -However, if shuffling a large amount of data, increasing the block size limit reduces the number of intermediate blocks (as a rule of thumb, shuffle creates ``O(num_blocks**2)`` intermediate blocks). -Alternatively, you can ``.repartition()`` the dataset to reduce the number of blocks prior to shuffle/groupby operations. -If you're seeing out of memory errors during map tasks, reducing the max block size may also be worth trying. - -Note that the number of blocks a Dataset created from ``ray.data.read_*`` contains is not fully known until all read tasks are fully executed. -The number of blocks printed in the Dataset's string representation is initially set to the number of read tasks generated. 
-To view the actual number of blocks created after block splitting, use ``len(ds.get_internal_block_refs())``, which will block until all data has been read. - -Datasets and Placement Groups -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -By default, Datasets configures its tasks and actors to use the cluster-default scheduling strategy ("DEFAULT"). You can inspect this configuration variable here: -``ray.data.context.DatasetContext.get_current().scheduling_strategy``. This scheduling strategy will schedule these tasks and actors outside any present -placement group. If you want to force Datasets to schedule tasks within the current placement group (i.e., to use current placement group resources specifically for Datasets), you can set ``ray.data.context.DatasetContext.get_current().scheduling_strategy = None``. - -This should be considered for advanced use cases to improve performance predictability only. We generally recommend letting Datasets run outside placement groups as documented in the :ref:`Datasets and Other Libraries ` section. - .. _shuffle_performance_tips: -Improving shuffle performance -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Enabling Push-Based Shuffle +~~~~~~~~~~~~~~~~~~~~~~~~~~~ Some Dataset operations require a *shuffle* operation, meaning that data is shuffled from all of the input partitions to all of the output partitions. These operations include ``Dataset.random_shuffle``, ``Dataset.sort`` and ``Dataset.groupby``. diff --git a/doc/source/data/pipelining-compute.rst index 50759b9ba937..c436c5abe3aa 100644 --- a/doc/source/data/pipelining-compute.rst +++ b/doc/source/data/pipelining-compute.rst @@ -4,6 +4,10 @@ Pipelining Compute ================== +Dataset pipelines allow Dataset transformations to be executed incrementally on *windows* of the base data, instead of on all of the data at once. This can be used for streaming data loading into ML training, or to execute batch transformations on large datasets without needing to load the entire dataset into cluster memory. + +Dataset pipelines can be read in a streaming fashion by one consumer, or split into multiple sub-pipelines and read in parallel by multiple consumers for distributed training. + Creating a DatasetPipeline ========================== @@ -66,15 +70,10 @@ You can also create a DatasetPipeline from a custom iterator over dataset creato pipe = DatasetPipeline.from_iterable([lambda s=s: s for s in splits]) -Per-Window Transformations -========================== - -While most Dataset operations are per-row (e.g., map, filter), some operations apply to the Dataset as a whole (e.g., sort, shuffle). When applied to a pipeline, holistic transforms like shuffle are applied separately to each window in the pipeline: - -.. important:: +Transforming Pipeline Windows +============================= - Windowed shuffle or global shuffle are expensive operations. Use only if you really need them. - Alternatively, you may consider local shuffle after converting to_tf() or to_torch(), if simple shuffle is sufficient. +While most Dataset operations are per-row (e.g., map, filter), some operations apply to the Dataset as a whole (e.g., sort, shuffle). Per-row operations apply to rows in the pipeline independently in the same way they do in a normal Dataset. However, when used in a pipeline, holistic transforms like shuffle are applied separately to each window in the pipeline: .. code-block:: python @@ -161,7 +160,7 @@ Ignoring the output, the above script has three separate stages: loading, prepro ..
image:: images/dataset-pipeline-1.svg Enabling Pipelining =================== ~~~~~~~~~~~~~~~~~~~ We can optimize this by *pipelining* the execution of the dataset with the ``.window()`` call, which returns a DatasetPipeline instead of a Dataset object. The pipeline supports similar transformations to the original Dataset: @@ -187,7 +186,7 @@ Pipelined Writes When calling ``write_()`` on a pipeline, data is written separately for each window. This means that in the above example, JSON files will start being written as soon as the first window is finished, in an incremental / pipelined way. Tuning Parallelism -================== +~~~~~~~~~~~~~~~~~~ Tune the throughput vs latency of your pipeline with the ``blocks_per_window`` setting. As a rule of thumb, higher parallelism settings perform better, however ``blocks_per_window == num_blocks`` effectively disables pipelining, since the DatasetPipeline will only contain a single Dataset. The other extreme is setting ``blocks_per_window=1``, which minimizes the latency to initial output but only allows one concurrent transformation task per stage: @@ -202,3 +201,88 @@ You can also specify the size of each window using ``bytes_per_window``. In this .read_binary_files("s3://bucket/image-dir") \ .window(bytes_per_window=10e9) # -> INFO -- Created DatasetPipeline with 73 windows: 9120MiB min, 9431MiB max, 9287MiB mean + # -> INFO -- Blocks per window: 10 min, 16 max, 14 mean + # -> INFO -- ✔️ This pipeline's per-window parallelism is high enough to fully utilize the cluster. + # -> INFO -- ✔️ This pipeline's windows can each fit in object store memory without spilling. + +Datasets will warn you if the windows are too large or each window has insufficient parallelism (too few blocks). Check out the reported statistics for window size and blocks per window to ensure efficient pipeline execution. + +Pipelines for ML Ingest +======================= + +Dataset pipelines can also be used for streaming data loading into distributed training in Ray. + +.. note:: + + Ray Train is the standard library for distributed training in Ray. Train will automatically create + and split DatasetPipelines for you. See :ref:`Configuring Training Datasets ` + for the recommended way to get started with distributed training. + +Splitting pipelines for distributed ingest +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Similar to how you can ``.split()`` a Dataset, you can also split a DatasetPipeline with the same method call. This returns a number of DatasetPipeline shards that share a common parent pipeline. Each shard can be passed to a remote task or actor. + +**Code**: + +.. code-block:: python + + # Create a pipeline that loops over its source dataset indefinitely. + pipe: DatasetPipeline = ray.data \ + .read_parquet("s3://bucket/dir") \ + .repeat() \ + .random_shuffle_each_window() + + @ray.remote(num_gpus=1) + class TrainingWorker: + def __init__(self, rank: int, shard: DatasetPipeline): + self.rank = rank + self.shard = shard + ... + + shards: List[DatasetPipeline] = pipe.split(n=3) + workers = [TrainingWorker.remote(rank, s) for rank, s in enumerate(shards)] + ... + + +**Pipeline**: + +.. image:: images/dataset-repeat-2.svg + +Handling Epochs +~~~~~~~~~~~~~~~ + +It's common in ML training to want to divide data ingest into epochs, or repetitions over the original source dataset. +DatasetPipeline provides a convenient ``.iter_epochs()`` method that can be used to split up the pipeline into epoch-delimited pipeline segments.
+Epochs are defined by the last call to ``.repeat()`` in a pipeline, for example: + +.. code-block:: python + + pipe = ray.data.from_items([0, 1, 2, 3, 4]) \ + .repeat(3) \ + .random_shuffle_each_window() + for i, epoch in enumerate(pipe.iter_epochs()): + print("Epoch {}".format(i)) + for row in epoch.iter_rows(): + print(row) + # -> + # Epoch 0 + # 2 + # 1 + # 3 + # 4 + # 0 + # Epoch 1 + # 3 + # 4 + # 0 + # 2 + # 1 + # Epoch 2 + # 3 + # 2 + # 4 + # 1 + # 0 + +Note that while epochs commonly consist of a single window, they can also contain multiple windows if ``.window()`` is used or there are multiple ``.repeat()`` calls. diff --git a/doc/source/data/saving-datasets.rst deleted file mode 100644 index 590971e4cb06..000000000000 --- a/doc/source/data/saving-datasets.rst +++ /dev/null @@ -1,38 +0,0 @@ -.. _saving_datasets: - -=============== -Saving Datasets -=============== - -Datasets can be written to local or remote storage in the desired data format. -The supported formats include Parquet, CSV, JSON, NumPy. To control the number -of output files, you may use :meth:`ds.repartition() ` -to repartition the Dataset before writing out. - -.. tabbed:: Parquet - - .. literalinclude:: ./doc_code/saving_datasets.py - :language: python - :start-after: __write_parquet_begin__ - :end-before: __write_parquet_end__ - -.. tabbed:: CSV - - .. literalinclude:: ./doc_code/saving_datasets.py - :language: python - :start-after: __write_csv_begin__ - :end-before: __write_csv_end__ - -.. tabbed:: JSON - - .. literalinclude:: ./doc_code/saving_datasets.py - :language: python - :start-after: __write_json_begin__ - :end-before: __write_json_end__ - -.. tabbed:: NumPy - - .. literalinclude:: ./doc_code/saving_datasets.py - :language: python - :start-after: __write_numpy_begin__ - :end-before: __write_numpy_end__ diff --git a/doc/source/data/transforming-datasets.rst index 0eb48a8727f1..b920f2bccda8 100644 --- a/doc/source/data/transforming-datasets.rst +++ b/doc/source/data/transforming-datasets.rst @@ -4,12 +4,10 @@ Transforming Datasets ===================== -The Ray Datasets transformations take in datasets and produce new datasets. +Datasets transformations take in datasets and produce new datasets. For example, *map* is a transformation that applies a user-defined function (UDF) -on each row of input dataset and returns a new dataset as result. The Datasets -transformations are **composable**. Operations can be further applied on the result -dataset, forming a chain of transformations to express more complex computations. -Transformations are the core for expressing business logic in Datasets. +on each dataset record and returns a new dataset as the result. Datasets +transformations can be composed to express a chain of computations. .. _transform_datasets_transformations: @@ -17,14 +15,12 @@ Transformations --------------- -In general, we have two types of transformations: +There are two main types of transformations: -* **One-to-one transformations:** each input block will contribute to only one output - block, such as :meth:`ds.map_batches() `. In other - systems this may be called narrow transformations. -* **All-to-all transformations:** input blocks can contribute to multiple output blocks, - such as :meth:`ds.random_shuffle() `. In other - systems this may be called wide transformations.
+* One-to-one: each input block will contribute to only one output + block, such as :meth:`ds.map_batches() `. +* All-to-all: input blocks can contribute to multiple output blocks, + such as :meth:`ds.random_shuffle() `. Here is a table listing some common transformations supported by Ray Datasets. @@ -37,6 +33,12 @@ Here is a table listing some common transformations supported by Ray Datasets. * - :meth:`ds.map_batches() ` - One-to-one - Apply a given function to batches of records of this dataset. + * - :meth:`ds.add_column() ` + - One-to-one + - Apply a given function to batches of records to create a new column. + * - :meth:`ds.drop_columns() ` + - One-to-one + - Drop the given columns from the dataset. * - :meth:`ds.split() ` - One-to-one - | Split the dataset into N disjoint pieces. @@ -84,85 +86,54 @@ express your customized business logic in transformations. Here we will focus on :meth:`.map_batches() ` as it's the primary mapping API in Datasets. -A batch UDF can be a function or, if using the -:ref:`actor compute strategy `, a -:ref:`callable class `. -These UDFs have several :ref:`batch format options `, -which control the format of the batches that are passed to the provided batch UDF. -Depending on the underlying :ref:`dataset format `, -using a particular batch format may or may not incur a data conversion cost -(e.g. converting an Arrow Table to a Pandas DataFrame, or creating an Arrow Table from a -Python list, both of which would incur a full copy of the data). - -.. _transform_datasets_dataset_formats: - -Dataset Formats -=============== - -A **dataset format** refers to how Datasets represents data under-the-hood as data -**blocks**. - -* **Tabular (Arrow or Pandas) Datasets:** Represented under-the-hood as - `Arrow Tables `__ - or - `Pandas DataFrames `__. - Tabular datasets are created by reading on-disk tabular data (Parquet, CSV, JSON), - converting from in-memory tabular data (Pandas DataFrames, Arrow Tables, - dictionaries, Dask DataFrames, Modin DataFrames, Spark DataFrames, Mars DataFrames) - and by returning tabular data (Arrow, Pandas, dictionaries) from previous batch - UDFs. - - Datasets will minimize conversions between the Arrow and Pandas - representation: tabular data read from disk will always be read as Arrow Tables, but - in-memory conversions of Pandas DataFrame data to a ``Dataset`` (e.g. converting from - Pandas, Dask, Modin, Spark, and Mars) will use Pandas DataFrames as the internal data - representation in order to avoid extra data conversions/copies. - -* **Tensor Datasets:** Represented under-the-hood as a single-column - `Arrow Table `__ - or - `Pandas DataFrame `__, - using our :ref:`tensor extension type ` to embed the - tensor in these tables under a single ``"__value__"`` column. Tensor datasets are - created by reading on-disk tensor data (NPY), converting from in-memory tensor data - (NumPy), and by returning tensor data (NumPy) from previous batch UDFs. - - Note that converting the underlying tabular representation to a NumPy ndarray is - zero-copy, including converting the entire column to an ndarray and converting just - a single column element to an ndarray. - -* **Simple Datasets:** Represented under-the-hood as plain Python lists. Simple - datasets are created by reading on-disk binary and plain-text data, converting from - in-memory simple data (Python lists), and by returning simple data (Python lists) - from previou batch UDFs. 
- - Simple datasets are mostly used as an escape hatch for data that's not cleanly - representable in Arrow Tables and Pandas DataFrames. +Here are the basics that you need to know about UDFs: + +* A UDF can be either a function, or if using the :ref:`actor compute strategy `, a :ref:`callable class `. +* Select the UDF input :ref:`batch format ` using the ``batch_format`` argument. +* The UDF output type determines the Dataset schema of the transformation result. + +.. _transform_datasets_callable_classes: + +Callable Class UDFs +=================== + +When using the actor compute strategy, per-row and per-batch UDFs can also be +*callable classes*, i.e. classes that implement the ``__call__`` magic method. The +constructor of the class can be used for stateful setup, and will be only invoked once +per worker actor. + +.. note:: + These transformation APIs take the uninstantiated callable class as an argument, + not an instance of the class. + +.. literalinclude:: ./doc_code/transforming_datasets.py + :language: python + :start-after: __writing_callable_classes_udfs_begin__ + :end-before: __writing_callable_classes_udfs_end__ .. _transform_datasets_batch_formats: -Batch Formats -============= +UDF Input Batch Format +====================== -The **batch format** is the format of the data that's given to batch UDFs, e.g. -Pandas DataFrames, Arrow Tables, NumPy ndarrays, or Python lists. The -:meth:`.map_batches() ` API has a ``batch_format: str`` -parameter that allows the user to dictate the batch format; we dig into the details of -each ``batch_format`` option below: +Choose the *batch format* of the data given to UDFs +by setting the ``batch_format`` option of :meth:`.map_batches() `. +Here is an overview of the available batch formats: .. tabbed:: "native" (default) - The ``"native"`` batch format presents batches in the canonical format for each dataset - type: + The "native" batch format presents data as follows for each Dataset type: - * **Tabular (Arrow or Pandas) Datasets:** Each batch will be a + * **Tabular Datasets**: Each batch will be a `pandas.DataFrame `__. - If the dataset is represented as Arrow Tables under-the-hood, the conversion to the - Pandas DataFrame batch format will incur a conversion cost (i.e., a full copy of - each batch will be created in order to convert it). - * **Tensor Datasets:** Each batch will be a - `numpy.ndarray `__. - * **Simple Datasets:** Each batch will be a Python list. + This may incur a conversion cost if the underlying Dataset block is not + zero-copy convertible from an Arrow table. + + * **Tensor Datasets** (single-column): Each batch will be a single + `numpy.ndarray `__ + containing the single tensor column for this batch. + + * **Simple Datasets**: Each batch will be a Python list. .. literalinclude:: ./doc_code/transforming_datasets.py :language: python @@ -171,13 +142,10 @@ each ``batch_format`` option below: .. tabbed:: "pandas" - The ``"pandas"`` batch format provides batches in a + The ``"pandas"`` batch format presents batches in `pandas.DataFrame `__ format. If converting a simple dataset to Pandas DataFrame batches, a single-column - dataframe with the column ``"value"`` will be created. - - If the underlying datasets data is not already in Pandas DataFrame format, the - batch format conversion will incur a copy for each batch. + dataframe with the column ``"__value__"`` will be created. .. literalinclude:: ./doc_code/transforming_datasets.py :language: python @@ -186,13 +154,10 @@ each ``batch_format`` option below: .. 
tabbed:: "pyarrow" - The ``"pyarrow"`` batch format provides batches in a + The ``"pyarrow"`` batch format presents batches in `pyarrow.Table `__ format. If converting a simple dataset to Arrow Table batches, a single-column table - with the column ``"value"`` will be created. - - If the underlying datasets data is not already in Arrow Table format, the - batch format conversion will incur a copy for each batch. + with the column ``"__value__"`` will be created. .. literalinclude:: ./doc_code/transforming_datasets.py :language: python @@ -201,126 +166,46 @@ each ``batch_format`` option below: .. tabbed:: "numpy" - The ``"numpy"`` batch format provides batches in a + The ``"numpy"`` batch format presents batches in `numpy.ndarray `__ - format. The following details how each dataset format is converted into a NumPy batch: - - * **Tensor Datasets:** Each batch will be a single - `numpy.ndarray `__ - containing the single-tensor-column for this batch. + format as follows: - Note that this conversion should always be zero-copy. - - * **Tabular (Arrow or Pandas) Datasets:** Each batch will be a dictionary of NumPy + * **Tabular Datasets**: Each batch will be a dictionary of NumPy ndarrays (``Dict[str, np.ndarray]``), with each key-value pair representing a column in the table. - Note that this conversion should usually be zero-copy. + * **Tensor Datasets** (single-column): Each batch will be a single + `numpy.ndarray `__ + containing the single-tensor-column for this batch. - * **Simple Datasets:** Each batch will be a single NumPy ndarray, where Datasets will + * **Simple Datasets**: Each batch will be a single NumPy ndarray, where Datasets will attempt to convert each list-batch to an ndarray. - Note that this conversion will - incur a copy for primitive types that are representable as a NumPy dtype (since - NumPy will create a continguous ndarray in that case), and will not incur a copy if - the batch items aren't supported in the NumPy dtype system (since NumPy will create - an ndarray of ``np.object`` pointers to the batch items in that case). - .. literalinclude:: ./doc_code/transforming_datasets.py :language: python :start-after: __writing_numpy_udfs_begin__ :end-before: __writing_numpy_udfs_end__ -The following table summarizes the conversion costs from a particular dataset format to -a particular batch format: - -.. list-table:: Batch format conversion costs - is the conversion zero-copy? - :header-rows: 1 - :stub-columns: 1 - :widths: 40 15 15 15 15 - :align: center - - * - Dataset Format -> Batch Format - - ``"native"`` - - ``"pandas"`` - - ``"pyarrow"`` - - ``"numpy"`` - * - Tabular - Pandas - - Zero-Copy - - Zero-Copy - - Copy - - Zero-Copy - * - Tabular - Arrow - - Copy - - Copy - - Zero-Copy - - Zero-Copy - * - Tensor - Pandas - - Zero-Copy - - Zero-Copy - - Copy - - Zero-Copy - * - Tensor - Arrow - - Zero-Copy - - Copy - - Zero-Copy - - Zero-Copy - * - Simple - List - - Zero-Copy - - Copy - - Copy - - Copy - -You should reference the `pyarrow.Table APIs -`__, the -`pandas.DataFrame APIs `__, -or the `numpy.ndarray APIs `__ -when writing batch UDFs. - .. tip:: - Write your UDFs to leverage built-in vectorized operations on the ``pandas.DataFrame``, - ``pyarrow.Table``, and ``numpy.ndarray`` abstractions for better performance. For + Prefer using vectorized operations on the ``pandas.DataFrame``, + ``pyarrow.Table``, and ``numpy.ndarray`` types for better performance. 
For example, suppose you want to compute the sum of a column in ``pandas.DataFrame``: instead of iterating over each row of a batch and summing up values of that column, - you should use ``df_batch["col_foo"].sum()``. - -.. _transform_datasets_callable_classes: - -Callable Class UDFs -=================== - -When using the actor compute strategy, per-row and per-batch UDFs can also be -**callable classes**, i.e. classes that implement the ``__call__`` magic method. The -constructor of the class can be used for stateful setup, and will be only invoked once -per actor worker. - -.. note:: - These transformation APIs take the uninstantiated callable class as an argument, - **not** an instance of the class! - -.. literalinclude:: ./doc_code/transforming_datasets.py - :language: python - :start-after: __writing_callable_classes_udfs_begin__ - :end-before: __writing_callable_classes_udfs_end__ + use ``df_batch["col_foo"].sum()``. .. _transform_datasets_batch_output_types: Batch UDF Output Types ====================== -The return type of a batch UDF will determine the format of the resulting dataset. This -allows you to dynamically convert between data formats during batch transformations. - -The following details how each **batch** UDF (as used in -:meth:`ds.map_batches() `) return type will be interpreted -by Datasets when constructing its internal blocks: +The following output types are allowed for batch UDFs (e.g., +:meth:`ds.map_batches() `). The following describes +how they are interpreted to create the transformation result: .. tabbed:: pd.DataFrame - Tabular dataset containing - `Pandas DataFrame `__ - blocks. + Returning ``pd.DataFrame`` creates a Tabular dataset as the transformation result: .. literalinclude:: ./doc_code/transforming_datasets.py :language: python @@ -329,9 +214,7 @@ by Datasets when constructing its internal blocks: .. tabbed:: pa.Table - Tabular dataset containing - `pyarrow.Table `__ - blocks. + Returning ``pa.Table`` creates a Tabular dataset as the transformation result: .. literalinclude:: ./doc_code/transforming_datasets.py :language: python @@ -340,11 +223,7 @@ by Datasets when constructing its internal blocks: .. tabbed:: np.ndarray - Tensor dataset containing - `pyarrow.Table `__ - blocks, representing the tensor using our - :ref:`tensor extension type ` to embed the tensor in - these tables under a single ``"__value__"`` column. + Returning ``np.ndarray`` creates a single-column Tensor dataset as the transformation result: .. literalinclude:: ./doc_code/transforming_datasets.py :language: python @@ -353,13 +232,10 @@ by Datasets when constructing its internal blocks: .. tabbed:: Dict[str, np.ndarray] - Tabular dataset containing - `pyarrow.Table `__ - blocks, where each key-value pair treats the key as the column name as the value as - the column tensor. + Returning ``Dict[str, np.ndarray]`` creates a multi-column Tensor dataset as the transformation result. If a column tensor is 1-dimensional, then the native Arrow 1D list - type is used; if a column tensor has 2 or more dimensions, then we use our + type is used; if a column tensor has 2 or more dimensions, then the Dataset :ref:`tensor extension type ` to embed these n-dimensional tensors in the Arrow table. @@ -370,7 +246,7 @@ by Datasets when constructing its internal blocks: .. tabbed:: list - Simple dataset containing Python list blocks. + Returning ``list`` creates a simple Python object dataset as the transformation result: .. 
literalinclude:: ./doc_code/transforming_datasets.py :language: python @@ -382,92 +258,32 @@ by Datasets when constructing its internal blocks: Row UDF Output Types ==================== -The return type of a row UDF will determine the format of the resulting dataset. This -allows you to dynamically convert between data formats during row-based transformations. - -The following details how each **row** UDF (as used in -:meth:`ds.map() `) return type will be interpreted by Datasets -when constructing its internal blocks: +The following output types are allowed for per-row UDFs (e.g., +:meth:`ds.map() `): .. tabbed:: dict - Tabular dataset containing - `pyarrow.Table `__ - blocks, treating the keys as the column names and the values as the column elements. - - If Arrow Table construction fails due to a value type in the dictionary not - being supported by Arrow, then Datasets will fall back to a simple dataset containing - Python list blocks. + Returning a ``dict`` of Arrow-compatible data types creates a Tabular dataset + as the transformation result. If any dict values are not Arrow-compatible, then + a simple Python object dataset will be created: .. literalinclude:: ./doc_code/transforming_datasets.py :language: python :start-after: __writing_dict_out_row_udfs_begin__ :end-before: __writing_dict_out_row_udfs_end__ -.. tabbed:: PandasRow - - Tabular dataset containing - `Pandas DataFrame `__ - blocks. - - A ``PandasRow`` object is a zero-copy row view on an underlying Pandas DataFrame block - that Datasets provides to per-row UDFs (:meth:`ds.map() `) and - returns in the row iterators (:meth:`ds.iter_rows `). - This row view provides a dictionary interface and can be transparently used as a - dictionary. See the :class:`TableRow API ` for more information - on this row view object. - - Note that a ``PandasRow`` is immmutable, so this row mapping cannot be updated - in-place. If wanting to update the row, copy this zero-copy row view into a plain - Python dictionary with :meth:`TableRow.as_pydict() ` - and then mutate and return that dictionary. - - .. literalinclude:: ./doc_code/transforming_datasets.py - :language: python - :start-after: __writing_table_row_out_row_udfs_begin__ - :end-before: __writing_table_row_out_row_udfs_end__ - -.. tabbed:: ArrowRow - - Tabular dataset containing - `pyarrow.Table `__ - blocks. - - An ``ArrowRow`` object is a zero-copy row view on an underlying Arrow Table block - that Datasets provides to per-row UDFs (``ds.map()``) and returns in the row - that Datasets provides to per-row UDFs (:meth:`ds.map() `) and - returns in the row iterators (:meth:`ds.iter_rows `). - This row view provides a dictionary interface and can be transparently used as a - dictionary. See the :class:`TableRow API ` for more information - on this row view object. - - Note that an ``ArrowRow`` is immmutable, so this row mapping cannot be updated - in-place. If wanting to update the row, copy this zero-copy row view into a plain - Python dictionary with :meth:`TableRow.as_pydict() ` - and then mutate and return that dictionary. - - .. literalinclude:: ./doc_code/transforming_datasets.py - :language: python - :start-after: __writing_table_row_out_row_udfs_begin__ - :end-before: __writing_table_row_out_row_udfs_end__ - .. tabbed:: np.ndarray - Tensor dataset containing - `pyarrow.Table `__ - blocks, representing the tensor using our - :ref:`tensor extension type ` to embed the tensor in - these tables under a single ``"__value__"`` column. 
Each such ``ndarray`` will be - treated as a row in this column. + Returning ``np.ndarray`` creates a single-column Tensor dataset as the transformation result: .. literalinclude:: ./doc_code/transforming_datasets.py :language: python :start-after: __writing_numpy_out_row_udfs_begin__ :end-before: __writing_numpy_out_row_udfs_end__ -.. tabbed:: Any +.. tabbed:: object - All other return row types will result in a simple dataset containing list blocks. + Other return row types will create a simple Python object dataset as the transformation result: .. literalinclude:: ./doc_code/transforming_datasets.py :language: python @@ -486,7 +302,7 @@ used (with ``compute="tasks"``). For transformations that require expensive setu it's preferable to use Ray actors, which are stateful and allow setup to be reused for efficiency. You can specify ``compute=ray.data.ActorPoolStrategy(min, max)`` and Ray will use an autoscaling actor pool of ``min`` to ``max`` actors to execute your -transforms. For a fixed-size actor pool, just specify ``ActorPoolStrategy(n, n)``. +transforms. For a fixed-size actor pool, specify ``ActorPoolStrategy(n, n)``. The following is an example of using the Ray tasks and actors compute strategy for batch inference: diff --git a/doc/source/data/user-guide.rst index 1158f40209bf..674b7944efe4 100644 --- a/doc/source/data/user-guide.rst +++ b/doc/source/data/user-guide.rst @@ -5,26 +5,16 @@ User Guides =========== If you’re new to Ray Datasets, we recommend starting with the :ref:`Ray Datasets Quick Start `. -This user guide will help you navigate the Ray Datasets project and show you how achieve several tasks, for instance -you will learn - -- how to load data and preprocess it for machine learning applications, -- how to use Tensors with Ray Datasets, -- how to run Dataset Pipelines in common scenarios, -- and how to tune your Ray Datasets applications for performance. +This user guide will help you navigate the Ray Datasets project and show you how to achieve several tasks. .. toctree:: :maxdepth: 2 creating-datasets - saving-datasets transforming-datasets - accessing-datasets - pipelining-compute + consuming-datasets dataset-ml-preprocessing dataset-tensor-support - advanced-pipelines - random-access - custom-data - memory-management + pipelining-compute + dataset-internals performance-tips
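To make the actor compute strategy described in the transforming-datasets changes above concrete, here is a minimal sketch; the ``HypotheticalUDF`` class and the ``range(1000)`` dataset are illustrative placeholders rather than part of the Ray doc_code examples:

.. code-block:: python

    import ray
    from ray.data import ActorPoolStrategy

    # Hypothetical stateful UDF: the constructor runs once per actor worker,
    # so expensive setup (e.g., loading a model) can be reused across batches.
    class HypotheticalUDF:
        def __init__(self):
            self.ready = True  # stand-in for expensive setup

        def __call__(self, batch):
            # Pass the batch through unchanged; a real UDF would transform it.
            return batch

    ds = ray.data.range(1000)

    # Autoscaling pool of 2 to 8 actors; ActorPoolStrategy(4, 4) would pin a fixed size.
    # Note that the uninstantiated class is passed, not an instance.
    result = ds.map_batches(HypotheticalUDF, compute=ActorPoolStrategy(2, 8))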