diff --git a/doc/source/data/doc_code/transforming_datasets.py b/doc/source/data/doc_code/transforming_datasets.py
index cc5151e175f8..946ba5300874 100644
--- a/doc/source/data/doc_code/transforming_datasets.py
+++ b/doc/source/data/doc_code/transforming_datasets.py
@@ -77,29 +77,80 @@ def transform_batch(df: pandas.DataFrame) -> pandas.DataFrame:
 # fmt: on
 
 # fmt: off
-# __writing_native_udfs_begin__
+# __writing_default_udfs_tabular_begin__
 import ray
 import pandas as pd
 
 # Load dataset.
 ds = ray.data.read_csv("example://iris.csv")
+print(ds.default_batch_format())
+# <class 'pandas.core.frame.DataFrame'>
 
 # UDF as a function on Pandas DataFrame batches.
-def pandas_transform(df: pd.DataFrame) -> pd.DataFrame:
+def pandas_transform(df_batch: pd.DataFrame) -> pd.DataFrame:
     # Filter rows.
-    df = df[df["variety"] == "Versicolor"]
+    df_batch = df_batch[df_batch["variety"] == "Versicolor"]
     # Add derived column.
-    df["normalized.sepal.length"] = df["sepal.length"] / df["sepal.length"].max()
+    # Notice that `df_batch["sepal.length"].max()` is only the max value of the
+    # column within a given batch (not the global max)!
+    df_batch.loc[:, "normalized.sepal.length"] = df_batch["sepal.length"] / df_batch["sepal.length"].max()
     # Drop column.
-    df = df.drop(columns=["sepal.length"])
-    return df
+    df_batch = df_batch.drop(columns=["sepal.length"])
+    return df_batch
 
 ds.map_batches(pandas_transform).show(2)
 # -> {'sepal.width': 3.2, 'petal.length': 4.7, 'petal.width': 1.4,
 #     'variety': 'Versicolor', 'normalized.sepal.length': 1.0}
 # -> {'sepal.width': 3.2, 'petal.length': 4.5, 'petal.width': 1.5,
 #     'variety': 'Versicolor', 'normalized.sepal.length': 0.9142857142857144}
-# __writing_native_udfs_end__
+# __writing_default_udfs_tabular_end__
+# fmt: on
+
+# fmt: off
+# __writing_default_udfs_tensor_begin__
+import ray
+import numpy as np
+
+# Load dataset.
+ds = ray.data.range_tensor(1000, shape=(2, 2))
+print(ds.default_batch_format())
+# <class 'numpy.ndarray'>
+
+# UDF as a function on NumPy ndarray batches.
+def tensor_transform(arr: np.ndarray) -> np.ndarray:
+    # Notice that the ndarray is of shape (batch_size, 2, 2).
+    # Multiply each element in the ndarray by a factor of 2.
+    return arr * 2
+
+ds.map_batches(tensor_transform).show(2)
+# [array([[0, 0],
+#         [0, 0]]),
+#  array([[2, 2],
+#         [2, 2]])]
+
+# __writing_default_udfs_tensor_end__
+# fmt: on
+
+# fmt: off
+# __writing_default_udfs_list_begin__
+import ray
+
+# Load dataset.
+ds = ray.data.range(1000)
+print(ds.default_batch_format())
+# <class 'list'>
+
+# UDF as a function on Python list batches.
+def list_transform(batch: list) -> list:
+    # Notice that the list is of length batch_size.
+    # Multiply each element in the list by a factor of 2.
+    return [x * 2 for x in batch]
+
+ds.map_batches(list_transform).show(2)
+# 0
+# 2
+
+# __writing_default_udfs_list_end__
 # fmt: on
 
 # fmt: off
@@ -115,7 +166,7 @@ def pandas_transform(df: pd.DataFrame) -> pd.DataFrame:
     # Filter rows.
     df = df[df["variety"] == "Versicolor"]
     # Add derived column.
-    df["normalized.sepal.length"] = df["sepal.length"] / df["sepal.length"].max()
+    df.loc[:, "normalized.sepal.length"] = df["sepal.length"] / df["sepal.length"].max()
     # Drop column.
     df = df.drop(columns=["sepal.length"])
     return df
diff --git a/doc/source/data/transforming-datasets.rst b/doc/source/data/transforming-datasets.rst
index a90d3c79252a..eaad24fbc102 100644
--- a/doc/source/data/transforming-datasets.rst
+++ b/doc/source/data/transforming-datasets.rst
@@ -76,9 +76,9 @@ the Iris dataset.
 
 .. _transform_datasets_writing_udfs:
 
-------------------------------
-Writing User-defined Functions
-------------------------------
+-------------------------------------
+Writing User-defined Functions (UDFs)
+-------------------------------------
 
 User-defined functions (UDFs) are routines that apply on one row (e.g.
 :meth:`.map() <ray.data.Dataset.map>`) or a batch of rows (e.g.
@@ -130,16 +130,26 @@ Here is an overview of the available batch formats:
       This may incur a conversion cost if the underlying Dataset block is not
      zero-copy convertible from an Arrow table.
 
+      .. literalinclude:: ./doc_code/transforming_datasets.py
+        :language: python
+        :start-after: __writing_default_udfs_tabular_begin__
+        :end-before: __writing_default_udfs_tabular_end__
+
    * **Tensor Datasets** (single-column): Each batch will be a single
      `numpy.ndarray <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`__
      containing the single tensor column for this batch.
 
+      .. literalinclude:: ./doc_code/transforming_datasets.py
+        :language: python
+        :start-after: __writing_default_udfs_tensor_begin__
+        :end-before: __writing_default_udfs_tensor_end__
+
    * **Simple Datasets**: Each batch will be a Python list.
 
-  .. literalinclude:: ./doc_code/transforming_datasets.py
-    :language: python
-    :start-after: __writing_native_udfs_begin__
-    :end-before: __writing_native_udfs_end__
+      .. literalinclude:: ./doc_code/transforming_datasets.py
+        :language: python
+        :start-after: __writing_default_udfs_list_begin__
+        :end-before: __writing_default_udfs_list_end__
 
 .. tabbed:: "pandas"
 
diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py
index 4ef2d317ed75..2d17e4c5e44e 100644
--- a/python/ray/data/dataset.py
+++ b/python/ray/data/dataset.py
@@ -341,6 +341,12 @@ def map_batches(
             consider using :class:`~ray.data.preprocessors.BatchMapper`. It's more
             performant and easier to use.
 
+        .. tip::
+
+            For some standard operations like imputing, encoding, or normalization,
+            one may find directly using
+            :py:class:`~ray.data.preprocessors.Preprocessor` to be more convenient.
+
         Examples:
 
             >>> import pandas as pd
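
The comment added in the tabular example deliberately flags that `df_batch["sepal.length"].max()` is a per-batch statistic. A minimal sketch of normalizing against the dataset-wide max instead, assuming `Dataset.max()` returns a scalar when given a single column name (the helper name `pandas_transform_global` is illustrative, not part of this patch):

import ray
import pandas as pd

ds = ray.data.read_csv("example://iris.csv")

# Compute the global aggregate once, up front. (Assumption: ds.max("col")
# returns a plain scalar for a single tabular column.)
global_max = ds.max("sepal.length")

def pandas_transform_global(df_batch: pd.DataFrame) -> pd.DataFrame:
    df_batch = df_batch[df_batch["variety"] == "Versicolor"]
    # Normalize against the dataset-wide max, not the per-batch max.
    df_batch.loc[:, "normalized.sepal.length"] = (
        df_batch["sepal.length"] / global_max
    )
    return df_batch

ds.map_batches(pandas_transform_global).show(2)

Computing the aggregate before the map keeps the UDF itself stateless; the captured scalar is shipped to each map task along with the function.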
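
The new tip in `dataset.py` points at :py:class:`~ray.data.preprocessors.Preprocessor` for standard operations. A hedged sketch of that alternative using `MinMaxScaler` from `ray.data.preprocessors`, assuming its AIR-era `columns=` constructor argument and `fit_transform()` API:

import ray
from ray.data.preprocessors import MinMaxScaler

ds = ray.data.read_csv("example://iris.csv")

# MinMaxScaler fits the global per-column min and max, then rescales each
# value to (x - min) / (max - min) in place -- so the per-batch pitfall
# called out in the map_batches example does not arise here.
scaler = MinMaxScaler(columns=["sepal.length"])
ds = scaler.fit_transform(ds)
ds.show(2)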