From fadf917b2834de95fca8dbbc77d113e7dfe79ca2 Mon Sep 17 00:00:00 2001 From: Clark Zinzow Date: Tue, 3 May 2022 21:53:19 +0000 Subject: [PATCH 01/10] Better tensor support. --- python/ray/data/block.py | 18 ++- python/ray/data/dataset.py | 42 ++++-- python/ray/data/datasource/datasource.py | 12 +- .../ray/data/datasource/numpy_datasource.py | 3 +- python/ray/data/impl/arrow_block.py | 83 ++++++++---- python/ray/data/impl/block_batching.py | 26 ++-- .../ray/data/impl/delegating_block_builder.py | 5 + python/ray/data/impl/pandas_block.py | 56 +++++--- python/ray/data/impl/simple_block.py | 8 +- python/ray/data/impl/table_block.py | 13 +- python/ray/data/read_api.py | 15 +-- python/ray/data/tests/test_dataset.py | 124 ++++++++++++++++-- python/ray/data/tests/test_dataset_formats.py | 43 ++++-- 13 files changed, 343 insertions(+), 105 deletions(-) diff --git a/python/ray/data/block.py b/python/ray/data/block.py index 225baf4cf446..161fec6b1374 100644 --- a/python/ray/data/block.py +++ b/python/ray/data/block.py @@ -3,6 +3,7 @@ from typing import ( TypeVar, List, + Dict, Generic, Iterator, Tuple, @@ -210,11 +211,13 @@ def to_pandas(self) -> "pandas.DataFrame": """Convert this block into a Pandas dataframe.""" raise NotImplementedError - def to_numpy(self, column: str = None) -> np.ndarray: - """Convert this block (or column of block) into a NumPy ndarray. + def to_numpy( + self, columns: Optional[Union[str, List[str]]] = None + ) -> Union[np.ndarray, Dict[str, np.ndarray]]: + """Convert this block (or columns of block) into a NumPy ndarray. Args: - column: Name of column to convert, or None. + columns: Name of columns to convert, or None if converting all columns. """ raise NotImplementedError @@ -274,7 +277,16 @@ def for_block(block: Block) -> "BlockAccessor[T]": from ray.data.impl.arrow_block import ArrowBlockAccessor return ArrowBlockAccessor.from_bytes(block) + elif isinstance(block, np.ndarray): + from ray.data.impl.arrow_block import ArrowBlockAccessor + + return ArrowBlockAccessor.from_numpy(block) elif isinstance(block, list): + if block and all(isinstance(item, np.ndarray) for item in block): + from ray.data.impl.arrow_block import ArrowBlockAccessor + + return ArrowBlockAccessor.from_numpy(block) + from ray.data.impl.simple_block import SimpleBlockAccessor return SimpleBlockAccessor(block) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 417fb503d37e..24c5f4dbb011 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -68,6 +68,7 @@ from ray.data.row import TableRow from ray.data.aggregate import AggregateFn, Sum, Max, Min, Mean, Std from ray.data.random_access_dataset import RandomAccessDataset +from ray.data.impl.table_block import TENSOR_COL_NAME from ray.data.impl.remote_fn import cached_remote_fn from ray.data.impl.block_batching import batch_blocks, BatchType from ray.data.impl.plan import ExecutionPlan, OneToOneStage, AllToAllStage @@ -198,8 +199,8 @@ def map( def transform(block: Block) -> Iterable[Block]: DatasetContext._set_current(context) - block = BlockAccessor.for_block(block) output_buffer = BlockOutputBuffer(None, context.target_max_block_size) + block = BlockAccessor.for_block(block) for row in block.iter_rows(): output_buffer.add(fn(row)) if output_buffer.has_next(): @@ -224,6 +225,9 @@ def map_batches( ) -> "Dataset[Any]": """Apply the given function to batches of records of this dataset. 
+ The format of the data batch provided to ``fn`` can be controlled via the + ``batch_format`` argument, and the output of the UDF can be any batch type. + This is a blocking operation. Examples: @@ -270,10 +274,10 @@ def map_batches( blocks as batches. Defaults to a system-chosen batch size. compute: The compute strategy, either "tasks" (default) to use Ray tasks, or ActorPoolStrategy(min, max) to use an autoscaling actor pool. - batch_format: Specify "native" to use the native block format - (promotes Arrow to pandas), "pandas" to select - ``pandas.DataFrame`` as the batch format, - or "pyarrow" to select ``pyarrow.Table``. + batch_format: Specify "native" to use the native block format (promotes + tabular Arrow to Pandas), "pandas" to select ``pandas.DataFrame``, + "numpy" to select ``numpy.ndarray``, or "pyarrow" to select + ``pyarrow.Table``. ray_remote_args: Additional resource requirements to request from ray (e.g., num_gpus=1 to request GPUs for the map tasks). """ @@ -302,13 +306,21 @@ def transform(block: Block) -> Iterable[Block]: # bug where we include the entire base view on serialization. view = block.slice(start, end, copy=batch_size is not None) if batch_format == "native": - # Always promote Arrow blocks to pandas for consistency. - if isinstance(view, pa.Table) or isinstance(view, bytes): + if isinstance(view, pa.Table): + if view.column_names == [TENSOR_COL_NAME]: + view = BlockAccessor.for_block(view).to_numpy() + else: + # Always promote non-tensor Arrow blocks to pandas for + # consistency. + view = BlockAccessor.for_block(view).to_pandas() + elif isinstance(view, bytes): view = BlockAccessor.for_block(view).to_pandas() elif batch_format == "pandas": view = BlockAccessor.for_block(view).to_pandas() elif batch_format == "pyarrow": view = BlockAccessor.for_block(view).to_arrow() + elif batch_format == "numpy": + view = BlockAccessor.for_block(view).to_numpy() else: raise ValueError( "The batch format must be one of 'native', 'pandas', " @@ -319,6 +331,7 @@ def transform(block: Block) -> Iterable[Block]: if not ( isinstance(applied, list) or isinstance(applied, pa.Table) + or isinstance(applied, np.ndarray) or isinstance(applied, pd.core.frame.DataFrame) ): raise ValueError( @@ -2037,7 +2050,7 @@ def write_numpy( self, path: str, *, - column: str = "value", + column: str = TENSOR_COL_NAME, filesystem: Optional["pyarrow.fs.FileSystem"] = None, try_create_dir: bool = True, arrow_open_stream_args: Optional[Dict[str, Any]] = None, @@ -2065,7 +2078,9 @@ def write_numpy( path: The path to the destination root directory, where npy files will be written to. column: The name of the table column that contains the tensor to - be written. This defaults to "value". + be written. The default is the column name that Datasets uses for + storing tensors in single-column tables. See + ``ray.data.impl.arrow_block.TENSOR_COL_NAME`` for the exact name. filesystem: The filesystem implementation to write to. try_create_dir: Try to create all directories in destination path if True. Does nothing if all directories already exist. @@ -2213,8 +2228,8 @@ def iter_batches( batch_size: Record batch size, or None to let the system pick. batch_format: The format in which to return each batch. Specify "native" to use the current block format (promoting - Arrow to pandas automatically), "pandas" to - select ``pandas.DataFrame`` or "pyarrow" to select + Arrow to pandas automatically), "pandas" to select ``pandas.DataFrame``, + "numpy" to select ``numpy.ndarray``, or "pyarrow" to select ``pyarrow.Table``. 
Default is "native". drop_last: Whether to drop the last batch if it's incomplete. @@ -2737,8 +2752,9 @@ def to_numpy_refs( Time complexity: O(dataset size / parallelism) Args: - column: The name of the column to convert to numpy, or None to - specify the entire row. Required for Arrow tables. + column: The name of the column to convert to numpy, or None to specify the + entire row. If not specified for Arrow or Pandas blocks, each returned + future will represent a dict of column ndarrays. Returns: A list of remote NumPy ndarrays created from this dataset. diff --git a/python/ray/data/datasource/datasource.py b/python/ray/data/datasource/datasource.py index 15b3471d3581..02dea96bb78e 100644 --- a/python/ray/data/datasource/datasource.py +++ b/python/ray/data/datasource/datasource.py @@ -16,6 +16,7 @@ ) from ray.data.context import DatasetContext from ray.data.impl.arrow_block import ArrowRow +from ray.data.impl.table_block import TENSOR_COL_NAME from ray.data.impl.delegating_block_builder import DelegatingBlockBuilder from ray.data.impl.util import _check_pyarrow_version from ray.util.annotations import DeveloperAPI @@ -144,11 +145,14 @@ def __call__(self) -> MaybeBlockPartition: if context.block_splitting_enabled: partition: BlockPartition = [] for block in result: - metadata = BlockAccessor.for_block(block).get_metadata( + accessor = BlockAccessor.for_block(block) + metadata = accessor.get_metadata( input_files=self._metadata.input_files, exec_stats=None ) # No exec stats for the block splits. assert context.block_owner - partition.append((ray.put(block, _owner=context.block_owner), metadata)) + partition.append( + (ray.put(accessor.to_block(), _owner=context.block_owner), metadata) + ) if len(partition) == 0: raise ValueError("Read task must return non-empty list.") return partition @@ -199,7 +203,7 @@ def make_block(start: int, count: int) -> Block: tuple(range(1, 1 + len(tensor_shape))), ) ) - return pa.Table.from_pydict({"value": tensor}) + return pa.Table.from_pydict({TENSOR_COL_NAME: tensor}) else: return list(builtins.range(start, start + count)) @@ -222,7 +226,7 @@ def make_block(start: int, count: int) -> Block: np.arange(0, 10), tuple(range(1, 1 + len(tensor_shape))) ) ) - schema = pa.Table.from_pydict({"value": tensor}).schema + schema = pa.Table.from_pydict({TENSOR_COL_NAME: tensor}).schema elif block_format == "list": schema = int else: diff --git a/python/ray/data/datasource/numpy_datasource.py b/python/ray/data/datasource/numpy_datasource.py index b4883246dfe9..e3d556261152 100644 --- a/python/ray/data/datasource/numpy_datasource.py +++ b/python/ray/data/datasource/numpy_datasource.py @@ -8,6 +8,7 @@ from ray.data.block import BlockAccessor from ray.data.datasource.file_based_datasource import FileBasedDatasource +from ray.data.impl.table_block import TENSOR_COL_NAME class NumpyDatasource(FileBasedDatasource): @@ -34,7 +35,7 @@ def _read_file(self, f: "pyarrow.NativeFile", path: str, **reader_args): buf.write(data) buf.seek(0) return pa.Table.from_pydict( - {"value": TensorArray(np.load(buf, allow_pickle=True))} + {TENSOR_COL_NAME: TensorArray(np.load(buf, allow_pickle=True))} ) def _write_block( diff --git a/python/ray/data/impl/arrow_block.py b/python/ray/data/impl/arrow_block.py index 76b986bcce9f..cfb77dbe571c 100644 --- a/python/ray/data/impl/arrow_block.py +++ b/python/ray/data/impl/arrow_block.py @@ -6,6 +6,7 @@ Dict, List, Tuple, + Union, Iterator, Any, TypeVar, @@ -30,7 +31,11 @@ KeyType, ) from ray.data.row import TableRow -from ray.data.impl.table_block import 
TableBlockAccessor, TableBlockBuilder +from ray.data.impl.table_block import ( + TableBlockAccessor, + TableBlockBuilder, + TENSOR_COL_NAME, +) from ray.data.aggregate import AggregateFn if TYPE_CHECKING: @@ -73,6 +78,13 @@ def __init__(self): super().__init__(pyarrow.Table) def _table_from_pydict(self, columns: Dict[str, List[Any]]) -> Block: + for col_name, col in columns.items(): + if col_name == TENSOR_COL_NAME or isinstance( + next(iter(col), None), np.ndarray + ): + from ray.data.extensions.tensor_extension import ArrowTensorArray + + columns[col_name] = ArrowTensorArray.from_numpy(col) return pyarrow.Table.from_pydict(columns) def _concat_tables(self, tables: List[Block]) -> Block: @@ -89,14 +101,27 @@ def __init__(self, table: "pyarrow.Table"): raise ImportError("Run `pip install pyarrow` for Arrow support") super().__init__(table) - def _create_table_row(self, row: "pyarrow.Table") -> ArrowRow: - return ArrowRow(row) + def _create_table_row(self, row: "pyarrow.Table") -> Union[ArrowRow, np.ndarray]: + if row.column_names == [TENSOR_COL_NAME]: + return row.column(TENSOR_COL_NAME)[0] + else: + return ArrowRow(row) @classmethod def from_bytes(cls, data: bytes): reader = pyarrow.ipc.open_stream(data) return cls(reader.read_all()) + @classmethod + def from_numpy(cls, data: Union[np.ndarray, List[np.ndarray]]): + import pyarrow as pa + from ray.data.extensions.tensor_extension import ArrowTensorArray + + table = pa.Table.from_pydict( + {TENSOR_COL_NAME: ArrowTensorArray.from_numpy(data)} + ) + return cls(table) + def slice(self, start: int, end: int, copy: bool) -> "pyarrow.Table": view = self._table.slice(start, end - start) if copy: @@ -113,26 +138,40 @@ def schema(self) -> "pyarrow.lib.Schema": def to_pandas(self) -> "pandas.DataFrame": return self._table.to_pandas() - def to_numpy(self, column: str = None) -> np.ndarray: - if column is None: - raise ValueError( - "`column` must be specified when calling .to_numpy() " - "on Arrow blocks." - ) - if column not in self._table.column_names: - raise ValueError( - f"Cannot find column {column}, available columns: " - f"{self._table.column_names}" - ) - array = self._table[column] - if array.num_chunks > 1: - # TODO(ekl) combine fails since we can't concat - # ArrowTensorType? - array = array.combine_chunks() + def to_numpy( + self, columns: Optional[Union[str, List[str]]] = None + ) -> Union[np.ndarray, Dict[str, np.ndarray]]: + if columns is None: + columns = self._table.column_names + if not isinstance(columns, list): + columns = [columns] + for column in columns: + if column not in self._table.column_names: + raise ValueError( + f"Cannot find column {column}, available columns: " + f"{self._table.column_names}" + ) + arrays = [] + for column in columns: + array = self._table[column] + if array.num_chunks == 0: + array = pyarrow.array([], type=array.type) + elif array.num_chunks == 1: + array = array.chunk(0) + elif isinstance(array.chunk(0), pyarrow.ExtensionArray): + # If an extension array, we manually concatenate the underlying storage + # arrays. 
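+                # ``ChunkedArray.combine_chunks()`` fails for extension types such as
+                # ``ArrowTensorType`` (see the TODO removed above), so we rebuild the
+                # extension array from the concatenated storage chunks instead.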
+ chunk = array.chunk(0) + array = type(chunk).from_storage( + chunk.type, + pyarrow.concat_arrays([chunk.storage for chunk in array.chunks]), + ) + arrays.append(array.to_numpy(zero_copy_only=False)) + if len(arrays) == 1: + arrays = arrays[0] else: - assert array.num_chunks == 1, array - array = array.chunk(0) - return array.to_numpy(zero_copy_only=False) + arrays = dict(zip(columns, arrays)) + return arrays def to_arrow(self) -> "pyarrow.Table": return self._table diff --git a/python/ray/data/impl/block_batching.py b/python/ray/data/impl/block_batching.py index 702e7927ebf6..2f76c713a96b 100644 --- a/python/ray/data/impl/block_batching.py +++ b/python/ray/data/impl/block_batching.py @@ -14,6 +14,7 @@ from ray.data.block import Block, BlockAccessor from ray.data.context import DatasetContext from ray.data.impl.batcher import Batcher +from ray.data.impl.table_block import TENSOR_COL_NAME from ray.data.impl.stats import DatasetStats, DatasetPipelineStats from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy @@ -93,26 +94,35 @@ def batch_block(block: ObjectRef[Block]): def _format_batch(batch: Block, batch_format: str) -> BatchType: + import pandas as pd import pyarrow as pa if batch_format == "native": # Always promote Arrow blocks to pandas for consistency, since # we lazily convert pandas->Arrow internally for efficiency. - if isinstance(batch, pa.Table) or isinstance(batch, bytes): - batch = BlockAccessor.for_block(batch) - batch = batch.to_pandas() - return batch + if isinstance(batch, pa.Table): + if batch.column_names == [TENSOR_COL_NAME]: + batch = BlockAccessor.for_block(batch).to_numpy() + else: + batch = BlockAccessor.for_block(batch).to_pandas() + elif isinstance(batch, pd.DataFrame) and batch.columns.tolist() == [ + TENSOR_COL_NAME + ]: + batch = BlockAccessor.for_block(batch).to_numpy() + elif isinstance(batch, bytes): + batch = BlockAccessor.for_block(batch).to_pandas() elif batch_format == "pandas": - batch = BlockAccessor.for_block(batch) - return batch.to_pandas() + batch = BlockAccessor.for_block(batch).to_pandas() elif batch_format == "pyarrow": - batch = BlockAccessor.for_block(batch) - return batch.to_arrow() + batch = BlockAccessor.for_block(batch).to_arrow() + elif batch_format == "numpy": + batch = BlockAccessor.for_block(batch).to_numpy() else: raise ValueError( f"The given batch format: {batch_format} " f"is invalid. Supported batch type: {BatchType}" ) + return batch def _sliding_window(iterable: Iterable, n: int): diff --git a/python/ray/data/impl/delegating_block_builder.py b/python/ray/data/impl/delegating_block_builder.py index b66598b601e7..b35838136032 100644 --- a/python/ray/data/impl/delegating_block_builder.py +++ b/python/ray/data/impl/delegating_block_builder.py @@ -1,5 +1,7 @@ from typing import Any +import numpy as np + from ray.data.block import Block, T, BlockAccessor from ray.data.impl.block_builder import BlockBuilder from ray.data.impl.simple_block import SimpleBlockBuilder @@ -26,6 +28,8 @@ def add(self, item: Any) -> None: self._builder = ArrowBlockBuilder() except (TypeError, pyarrow.lib.ArrowInvalid): self._builder = SimpleBlockBuilder() + elif isinstance(item, np.ndarray): + self._builder = ArrowBlockBuilder() elif isinstance(item, PandasRow): self._builder = PandasBlockBuilder() else: @@ -34,6 +38,7 @@ def add(self, item: Any) -> None: def add_block(self, block: Block) -> None: accessor = BlockAccessor.for_block(block) + block = accessor.to_block() if accessor.num_rows() == 0: # Don't infer types of empty lists. 
Store the block and use it if no # other data is added. https://github.com/ray-project/ray/issues/20290 diff --git a/python/ray/data/impl/pandas_block.py b/python/ray/data/impl/pandas_block.py index 07bbdc1eb50a..11520e021bce 100644 --- a/python/ray/data/impl/pandas_block.py +++ b/python/ray/data/impl/pandas_block.py @@ -3,6 +3,7 @@ Dict, List, Tuple, + Union, Iterator, Any, TypeVar, @@ -14,8 +15,13 @@ import numpy as np from ray.data.block import BlockAccessor, BlockMetadata, KeyFn, U +from ray.data.extensions.tensor_extension import TensorArray from ray.data.row import TableRow -from ray.data.impl.table_block import TableBlockAccessor, TableBlockBuilder +from ray.data.impl.table_block import ( + TableBlockAccessor, + TableBlockBuilder, + TENSOR_COL_NAME, +) from ray.data.impl.arrow_block import ArrowBlockAccessor from ray.data.aggregate import AggregateFn @@ -71,6 +77,13 @@ def __init__(self): def _table_from_pydict(self, columns: Dict[str, List[Any]]) -> "pandas.DataFrame": pandas = lazy_import_pandas() + for key, value in columns.items(): + if key == TENSOR_COL_NAME or isinstance( + next(iter(value), None), np.ndarray + ): + if len(value) == 1: + value = value[0] + columns[key] = TensorArray(value) return pandas.DataFrame(columns) def _concat_tables(self, tables: List["pandas.DataFrame"]) -> "pandas.DataFrame": @@ -92,8 +105,13 @@ class PandasBlockAccessor(TableBlockAccessor): def __init__(self, table: "pandas.DataFrame"): super().__init__(table) - def _create_table_row(self, row: "pandas.DataFrame") -> PandasRow: - return PandasRow(row) + def _create_table_row( + self, row: "pandas.DataFrame" + ) -> Union[PandasRow, np.ndarray]: + if row.columns.tolist() == [TENSOR_COL_NAME]: + return row[TENSOR_COL_NAME][0] + else: + return PandasRow(row) def slice(self, start: int, end: int, copy: bool) -> "pandas.DataFrame": view = self._table[start:end] @@ -122,19 +140,27 @@ def schema(self) -> PandasBlockSchema: def to_pandas(self) -> "pandas.DataFrame": return self._table - def to_numpy(self, column: str = None) -> np.ndarray: - if not column: - raise ValueError( - "`column` must be specified when calling .to_numpy() " - "on Pandas blocks." 
- ) - if column not in self._table.columns: - raise ValueError( - "Cannot find column {}, available columns: {}".format( - column, self._table.columns.tolist() + def to_numpy( + self, columns: Optional[Union[str, List[str]]] = None + ) -> Union[np.ndarray, Dict[str, np.ndarray]]: + if columns is None: + columns = self._table.columns.tolist() + if not isinstance(columns, list): + columns = [columns] + for column in columns: + if column not in self._table.columns: + raise ValueError( + f"Cannot find column {column}, available columns: " + f"{self._table.columns.tolist()}" ) - ) - return self._table[column].to_numpy() + arrays = [] + for column in columns: + arrays.append(self._table[column].to_numpy()) + if len(arrays) == 1: + arrays = arrays[0] + else: + arrays = dict(zip(columns, arrays)) + return arrays def to_arrow(self) -> "pyarrow.Table": import pyarrow diff --git a/python/ray/data/impl/simple_block.py b/python/ray/data/impl/simple_block.py index d952583a3cd2..05acbf0fc00c 100644 --- a/python/ray/data/impl/simple_block.py +++ b/python/ray/data/impl/simple_block.py @@ -1,7 +1,7 @@ import random import sys import heapq -from typing import Callable, Iterator, List, Tuple, Any, Optional, TYPE_CHECKING +from typing import Union, Callable, Iterator, List, Tuple, Any, Optional, TYPE_CHECKING import numpy as np @@ -84,9 +84,9 @@ def to_pandas(self) -> "pandas.DataFrame": return pandas.DataFrame({"value": self._items}) - def to_numpy(self, column: str = None) -> np.ndarray: - if column: - raise ValueError("`column` arg not supported for list block") + def to_numpy(self, columns: Optional[Union[str, List[str]]] = None) -> np.ndarray: + if columns: + raise ValueError("`columns` arg is not supported for list block.") return np.array(self._items) def to_arrow(self) -> "pyarrow.Table": diff --git a/python/ray/data/impl/table_block.py b/python/ray/data/impl/table_block.py index ad760de8527d..c5135c9a21fd 100644 --- a/python/ray/data/impl/table_block.py +++ b/python/ray/data/impl/table_block.py @@ -1,7 +1,8 @@ import collections - from typing import Dict, Iterator, List, Union, Any, TypeVar, TYPE_CHECKING +import numpy as np + from ray.data.block import Block, BlockAccessor from ray.data.row import TableRow from ray.data.impl.block_builder import BlockBuilder @@ -10,6 +11,9 @@ if TYPE_CHECKING: from ray.data.impl.sort import SortKeyT + +TENSOR_COL_NAME = "__RAY_TC__" + T = TypeVar("T") # The max size of Python tuples to buffer before compacting them into a @@ -30,9 +34,11 @@ def __init__(self, block_type): self._num_compactions = 0 self._block_type = block_type - def add(self, item: Union[dict, TableRow]) -> None: + def add(self, item: Union[dict, TableRow, np.ndarray]) -> None: if isinstance(item, TableRow): item = item.as_pydict() + elif isinstance(item, np.ndarray): + item = {TENSOR_COL_NAME: item} if not isinstance(item, dict): raise ValueError( "Returned elements of an TableBlock must be of type `dict`, " @@ -53,6 +59,7 @@ def add_block(self, block: Any) -> None: f"{block}" ) accessor = BlockAccessor.for_block(block) + block = accessor.to_block() self._tables.append(block) self._tables_size_bytes += accessor.size_bytes() self._num_rows += accessor.num_rows() @@ -109,7 +116,7 @@ def _create_table_row(self, row: Any) -> TableRow: def to_block(self) -> Block: return self._table - def iter_rows(self) -> Iterator[TableRow]: + def iter_rows(self) -> Iterator[Union[TableRow, np.ndarray]]: outer = self class Iter: diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index 
a2a67c49344d..160edac63e69 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -173,10 +173,10 @@ def range_tensor( >>> import ray >>> ds = ray.data.range_tensor(1000, shape=(3, 10)) # doctest: +SKIP >>> ds.map_batches( # doctest: +SKIP - ... lambda arr: arr * 2, batch_format="pandas").show() + ... lambda arr: arr * 2, batch_format="numpy").show() This is similar to range_table(), but uses the ArrowTensorArray extension - type. The dataset elements take the form {"value": array(N, shape=shape)}. + type. The dataset elements take the form {TENSOR_COL_NAME: array(N, shape=shape)}. Args: n: The upper bound of the range of integer records. @@ -1019,15 +1019,10 @@ def _df_to_block(df: "pandas.DataFrame") -> Block[ArrowRow]: def _ndarray_to_block(ndarray: np.ndarray) -> Block[np.ndarray]: stats = BlockExecStats.builder() - import pyarrow as pa - from ray.data.extensions import TensorArray - - table = pa.Table.from_pydict({"value": TensorArray(ndarray)}) + accessor = BlockAccessor.for_block(ndarray) return ( - table, - BlockAccessor.for_block(table).get_metadata( - input_files=None, exec_stats=stats.build() - ), + accessor.to_block(), + accessor.get_metadata(input_files=None, exec_stats=stats.build()), ) diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index 78f5928e4d69..451495c51d20 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -22,6 +22,7 @@ from ray.data.impl.block_builder import BlockBuilder from ray.data.impl.lazy_block_list import LazyBlockList from ray.data.impl.pandas_block import PandasRow +from ray.data.impl.table_block import TENSOR_COL_NAME from ray.data.aggregate import AggregateFn, Count, Sum, Min, Max, Mean, Std from ray.data.extensions.tensor_extension import ( TensorArray, @@ -443,21 +444,122 @@ def test_range_table(ray_start_regular_shared): assert ds.take() == [{"value": i} for i in range(10)] -def test_tensors(ray_start_regular_shared): +def test_tensors_basic(ray_start_regular_shared): # Create directly. - ds = ray.data.range_tensor(5, shape=(3, 5)) + tensor_shape = (3, 5) + ds = ray.data.range_tensor(6, shape=tensor_shape) assert str(ds) == ( - "Dataset(num_blocks=5, num_rows=5, " - "schema={value: })" + "Dataset(num_blocks=6, num_rows=6, " + f"schema={{{TENSOR_COL_NAME}: }})" ) + # Test row iterator yields tensors. + for tensor in ds.iter_rows(): + assert isinstance(tensor, np.ndarray) + assert tensor.shape == tensor_shape + + # Test batch iterator yields tensors. + for tensor in ds.iter_batches(batch_size=2): + assert isinstance(tensor, np.ndarray) + assert tensor.shape == (2,) + tensor_shape + + # Native format. + def np_mapper(arr): + assert isinstance(arr, np.ndarray) + return arr + 1 + + res = ray.data.range_tensor(2, shape=(2, 2)).map(np_mapper).take() + np.testing.assert_equal(res, [np.ones((2, 2)), 2 * np.ones((2, 2))]) + + # Explicit NumPy format. + res = ( + ray.data.range_tensor(2, shape=(2, 2)) + .map_batches(np_mapper, batch_format="numpy") + .take() + ) + np.testing.assert_equal(res, [np.ones((2, 2)), 2 * np.ones((2, 2))]) + # Pandas conversion. + def pd_mapper(df): + assert isinstance(df, pd.DataFrame) + return df + 2 + + res = ray.data.range_tensor(2).map_batches(pd_mapper, batch_format="pandas").take() + np.testing.assert_equal(res, [np.array([2]), np.array([3])]) + + # Arrow columns in NumPy format. 
+ def mapper(col_arrs): + assert all(isinstance(col_arr, np.ndarray) for col_arr in col_arrs) + return pa.table({"a": col_arrs[0] + 1, "b": col_arrs[1] + 1}) + + t = pa.table({"a": [1, 2, 3], "b": [4.0, 5.0, 6.0]}) + res = ( + ray.data.from_arrow(t) + .map_batches(mapper, batch_size=2, batch_format="numpy") + .take() + ) + assert res == [{"a": 2, "b": 5.0}, {"a": 3, "b": 6.0}, {"a": 4, "b": 7.0}] + + # Pandas columns in NumPy format. + def mapper(col_arrs): + assert all(isinstance(col_arr, np.ndarray) for col_arr in col_arrs) + return pd.DataFrame({"a": col_arrs[0] + 1, "b": col_arrs[1] + 1}) + + df = pd.DataFrame({"a": [1, 2, 3], "b": [4.0, 5.0, 6.0]}) + res = ( + ray.data.from_pandas(df) + .map_batches(mapper, batch_size=2, batch_format="numpy") + .take() + ) + assert res == [{"a": 2, "b": 5.0}, {"a": 3, "b": 6.0}, {"a": 4, "b": 7.0}] + + # Simple dataset in NumPy format. + def mapper(arr): + arr = np_mapper(arr) + return arr.tolist() + res = ( - ray.data.range_tensor(10) - .map_batches(lambda t: t + 2, batch_format="pandas") - .take(2) + ray.data.range(10, parallelism=2) + .map_batches(mapper, batch_format="numpy") + .take() + ) + assert res == list(range(1, 11)) + + +def test_tensors_inferred_from_map(ray_start_regular_shared): + # Test map. + ds = ray.data.range(10).map(lambda _: np.ones((4, 4))) + assert str(ds) == ( + "Dataset(num_blocks=10, num_rows=10, " + f"schema={{{TENSOR_COL_NAME}: }})" + ) + + # Test map_batches. + + # - Test top-level ndarray. + ds = ray.data.range(16, parallelism=4).map_batches( + lambda _: np.ones((3, 4, 4)), batch_size=2 + ) + assert str(ds) == ( + "Dataset(num_blocks=4, num_rows=24, " + f"schema={{{TENSOR_COL_NAME}: }})" + ) + + # - Test list of ndarrays. + ds = ray.data.range(16, parallelism=4).map_batches( + lambda _: [np.ones((4, 4)), np.ones((4, 4))], batch_size=2 + ) + assert str(ds) == ( + "Dataset(num_blocks=4, num_rows=16, " + f"schema={{{TENSOR_COL_NAME}: }})" + ) + + # Test flat_map. + ds = ray.data.range(10).flat_map(lambda _: [np.ones((4, 4)), np.ones((4, 4))]) + assert str(ds) == ( + "Dataset(num_blocks=10, num_rows=20, " + f"schema={{{TENSOR_COL_NAME}: }})" ) - assert str(res) == "[{'value': array([2])}, {'value': array([3])}]" def test_tensor_array_ops(ray_start_regular_shared): @@ -1466,6 +1568,12 @@ def test_iter_batches_basic(ray_start_regular_shared): assert isinstance(batch, pa.Table) assert batch.equals(pa.Table.from_pandas(df)) + # NumPy format. + for batch, df in zip(ds.iter_batches(batch_format="numpy"), dfs): + assert isinstance(batch, list) + assert all(isinstance(col, np.ndarray) for col in batch) + np.testing.assert_equal(batch, [col.to_numpy() for _, col in df.items()]) + # blocks format. 
for batch, df in zip(ds.iter_batches(batch_format="native"), dfs): assert BlockAccessor.for_block(batch).to_pandas().equals(df) diff --git a/python/ray/data/tests/test_dataset_formats.py b/python/ray/data/tests/test_dataset_formats.py index 9d4097605585..66c555f0c427 100644 --- a/python/ray/data/tests/test_dataset_formats.py +++ b/python/ray/data/tests/test_dataset_formats.py @@ -33,6 +33,7 @@ WriteResult, ) from ray.data.impl.arrow_block import ArrowRow +from ray.data.impl.table_block import TENSOR_COL_NAME from ray.data.datasource.file_based_datasource import _unwrap_protocol from ray.data.datasource.parquet_datasource import PARALLELIZE_META_FETCH_THRESHOLD from ray.data.tests.conftest import * # noqa @@ -121,7 +122,7 @@ def test_from_numpy(ray_start_regular_shared, from_ref): ds = ray.data.from_numpy_refs([ray.put(arr) for arr in arrs]) else: ds = ray.data.from_numpy(arrs) - values = np.stack([x["value"] for x in ds.take(8)]) + values = np.stack(ds.take(8)) np.testing.assert_array_equal(values, np.concatenate((arr1, arr2))) # Test from single NumPy ndarray. @@ -129,7 +130,7 @@ def test_from_numpy(ray_start_regular_shared, from_ref): ds = ray.data.from_numpy_refs(ray.put(arr1)) else: ds = ray.data.from_numpy(arr1) - values = np.stack([x["value"] for x in ds.take(4)]) + values = np.stack(ds.take(4)) np.testing.assert_array_equal(values, arr1) @@ -197,14 +198,28 @@ def test_to_numpy_refs(ray_start_regular_shared): # Tensor Dataset ds = ray.data.range_tensor(10, parallelism=2) - arr = np.concatenate(ray.get(ds.to_numpy_refs(column="value"))) + arr = np.concatenate(ray.get(ds.to_numpy_refs())) np.testing.assert_equal(arr, np.expand_dims(np.arange(0, 10), 1)) # Table Dataset ds = ray.data.range_table(10) - arr = np.concatenate(ray.get(ds.to_numpy_refs(column="value"))) + arr = np.concatenate(ray.get(ds.to_numpy_refs())) np.testing.assert_equal(arr, np.arange(0, 10)) + # Test multi-column Arrow dataset. + ds = ray.data.from_arrow(pa.table({"a": [1, 2, 3], "b": [4, 5, 6]})) + arrs = ray.get(ds.to_numpy_refs()) + np.testing.assert_equal( + arrs, [{"a": np.array([1, 2, 3]), "b": np.array([4, 5, 6])}] + ) + + # Test multi-column Pandas dataset. 
+ ds = ray.data.from_pandas(pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})) + arrs = ray.get(ds.to_numpy_refs()) + np.testing.assert_equal( + arrs, [{"a": np.array([1, 2, 3]), "b": np.array([4, 5, 6])}] + ) + def test_to_arrow_refs(ray_start_regular_shared): n = 5 @@ -998,9 +1013,9 @@ def test_numpy_roundtrip(ray_start_regular_shared, fs, data_path): ds = ray.data.read_numpy(data_path, filesystem=fs) assert str(ds) == ( "Dataset(num_blocks=2, num_rows=None, " - "schema={value: })" + f"schema={{{TENSOR_COL_NAME}: }})" ) - assert str(ds.take(2)) == "[{'value': array([0])}, {'value': array([1])}]" + np.testing.assert_equal(ds.take(2), [np.array([0]), np.array([1])]) def test_numpy_read(ray_start_regular_shared, tmp_path): @@ -1010,9 +1025,9 @@ def test_numpy_read(ray_start_regular_shared, tmp_path): ds = ray.data.read_numpy(path) assert str(ds) == ( "Dataset(num_blocks=1, num_rows=10, " - "schema={value: })" + f"schema={{{TENSOR_COL_NAME}: }})" ) - assert str(ds.take(2)) == "[{'value': array([0])}, {'value': array([1])}]" + np.testing.assert_equal(ds.take(2), [np.array([0]), np.array([1])]) def test_numpy_read_meta_provider(ray_start_regular_shared, tmp_path): @@ -1023,9 +1038,9 @@ def test_numpy_read_meta_provider(ray_start_regular_shared, tmp_path): ds = ray.data.read_numpy(path, meta_provider=FastFileMetadataProvider()) assert str(ds) == ( "Dataset(num_blocks=1, num_rows=10, " - "schema={value: })" + f"schema={{{TENSOR_COL_NAME}: }})" ) - assert str(ds.take(2)) == "[{'value': array([0])}, {'value': array([1])}]" + np.testing.assert_equal(ds.take(2), [np.array([0]), np.array([1])]) with pytest.raises(NotImplementedError): ray.data.read_binary_files( @@ -1077,10 +1092,10 @@ def skip_unpartitioned(kv_dict): ds = ray.data.read_numpy(base_dir, partition_filter=partition_path_filter) vals = [[1, 0], [1, 1], [1, 2], [3, 3], [3, 4], [3, 5]] - val_str = "".join([f"{{'value': array({v}, dtype=int8)}}, " for v in vals])[:-2] + val_str = "".join(f"array({v}, dtype=int8), " for v in vals)[:-2] assert_base_partitioned_ds( ds, - schema="{value: }", + schema=f"{{{TENSOR_COL_NAME}: }}", sorted_values=f"[[{val_str}]]", ds_take_transform_fn=lambda taken: [taken], sorted_values_transform_fn=lambda sorted_values: str(sorted_values), @@ -1119,7 +1134,7 @@ def test_numpy_write(ray_start_regular_shared, fs, data_path, endpoint_url): assert len(arr2) == 5 assert arr1.sum() == 10 assert arr2.sum() == 35 - assert str(ds.take(1)) == "[{'value': array([0])}]" + np.testing.assert_equal(ds.take(1), [np.array([0])]) @pytest.mark.parametrize( @@ -1158,7 +1173,7 @@ def test_numpy_write_block_path_provider( assert len(arr2) == 5 assert arr1.sum() == 10 assert arr2.sum() == 35 - assert str(ds.take(1)) == "[{'value': array([0])}]" + np.testing.assert_equal(ds.take(1), [np.array([0])]) def test_read_text(ray_start_regular_shared, tmp_path): From 94ecc45bef156fe8639416d9eaff300e614da7a3 Mon Sep 17 00:00:00 2001 From: Clark Zinzow Date: Tue, 17 May 2022 00:26:44 +0000 Subject: [PATCH 02/10] __RAY_TC__ --> __value__; refactor native format path. 
--- python/ray/data/block.py | 4 +++ python/ray/data/dataset.py | 30 +++++++---------- python/ray/data/datasource/datasource.py | 6 ++-- .../ray/data/datasource/numpy_datasource.py | 4 +-- python/ray/data/impl/arrow_block.py | 15 ++++----- python/ray/data/impl/block_batching.py | 18 +--------- python/ray/data/impl/pandas_block.py | 17 ++++------ python/ray/data/impl/table_block.py | 33 +++++++++++++++---- python/ray/data/random_access_dataset.py | 2 +- python/ray/data/read_api.py | 2 +- python/ray/data/tests/test_dataset.py | 12 +++---- python/ray/data/tests/test_dataset_formats.py | 10 +++--- 12 files changed, 73 insertions(+), 80 deletions(-) diff --git a/python/ray/data/block.py b/python/ray/data/block.py index 161fec6b1374..d96879147cf9 100644 --- a/python/ray/data/block.py +++ b/python/ray/data/block.py @@ -229,6 +229,10 @@ def to_block(self) -> Block: """Return the base block that this accessor wraps.""" raise NotImplementedError + def to_native(self) -> Block: + """Return the native data format for this accessor.""" + return self.to_block() + def size_bytes(self) -> int: """Return the approximate size in bytes of this block.""" raise NotImplementedError diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 24c5f4dbb011..6b5c2772f998 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -68,7 +68,7 @@ from ray.data.row import TableRow from ray.data.aggregate import AggregateFn, Sum, Max, Min, Mean, Std from ray.data.random_access_dataset import RandomAccessDataset -from ray.data.impl.table_block import TENSOR_COL_NAME +from ray.data.impl.table_block import VALUE_COL_NAME from ray.data.impl.remote_fn import cached_remote_fn from ray.data.impl.block_batching import batch_blocks, BatchType from ray.data.impl.plan import ExecutionPlan, OneToOneStage, AllToAllStage @@ -275,9 +275,9 @@ def map_batches( compute: The compute strategy, either "tasks" (default) to use Ray tasks, or ActorPoolStrategy(min, max) to use an autoscaling actor pool. batch_format: Specify "native" to use the native block format (promotes - tabular Arrow to Pandas), "pandas" to select ``pandas.DataFrame``, - "numpy" to select ``numpy.ndarray``, or "pyarrow" to select - ``pyarrow.Table``. + tables to Pandas and tensors to NumPy), "pandas" to select + ``pandas.DataFrame``, "numpy" to select ``numpy.ndarray``, + or "pyarrow" to select `pyarrow.Table``. ray_remote_args: Additional resource requirements to request from ray (e.g., num_gpus=1 to request GPUs for the map tasks). """ @@ -306,15 +306,7 @@ def transform(block: Block) -> Iterable[Block]: # bug where we include the entire base view on serialization. view = block.slice(start, end, copy=batch_size is not None) if batch_format == "native": - if isinstance(view, pa.Table): - if view.column_names == [TENSOR_COL_NAME]: - view = BlockAccessor.for_block(view).to_numpy() - else: - # Always promote non-tensor Arrow blocks to pandas for - # consistency. 
- view = BlockAccessor.for_block(view).to_pandas() - elif isinstance(view, bytes): - view = BlockAccessor.for_block(view).to_pandas() + view = BlockAccessor.for_block(view).to_native() elif batch_format == "pandas": view = BlockAccessor.for_block(view).to_pandas() elif batch_format == "pyarrow": @@ -2050,7 +2042,7 @@ def write_numpy( self, path: str, *, - column: str = TENSOR_COL_NAME, + column: str = VALUE_COL_NAME, filesystem: Optional["pyarrow.fs.FileSystem"] = None, try_create_dir: bool = True, arrow_open_stream_args: Optional[Dict[str, Any]] = None, @@ -2080,7 +2072,7 @@ def write_numpy( column: The name of the table column that contains the tensor to be written. The default is the column name that Datasets uses for storing tensors in single-column tables. See - ``ray.data.impl.arrow_block.TENSOR_COL_NAME`` for the exact name. + ``ray.data.impl.arrow_block.VALUE_COL_NAME`` for the exact name. filesystem: The filesystem implementation to write to. try_create_dir: Try to create all directories in destination path if True. Does nothing if all directories already exist. @@ -2227,10 +2219,10 @@ def iter_batches( current block during the scan. batch_size: Record batch size, or None to let the system pick. batch_format: The format in which to return each batch. - Specify "native" to use the current block format (promoting - Arrow to pandas automatically), "pandas" to select ``pandas.DataFrame``, - "numpy" to select ``numpy.ndarray``, or "pyarrow" to select - ``pyarrow.Table``. Default is "native". + Specify "native" to use the native block format (promoting + tables to Pandas and tensors to NumPy), "pandas" to select + ``pandas.DataFrame``, "numpy" to select ``numpy.ndarray``, or "pyarrow" + to select ``pyarrow.Table``. Default is "native". drop_last: Whether to drop the last batch if it's incomplete. 
Returns: diff --git a/python/ray/data/datasource/datasource.py b/python/ray/data/datasource/datasource.py index 02dea96bb78e..3e904c6f6d7f 100644 --- a/python/ray/data/datasource/datasource.py +++ b/python/ray/data/datasource/datasource.py @@ -16,7 +16,7 @@ ) from ray.data.context import DatasetContext from ray.data.impl.arrow_block import ArrowRow -from ray.data.impl.table_block import TENSOR_COL_NAME +from ray.data.impl.table_block import VALUE_COL_NAME from ray.data.impl.delegating_block_builder import DelegatingBlockBuilder from ray.data.impl.util import _check_pyarrow_version from ray.util.annotations import DeveloperAPI @@ -203,7 +203,7 @@ def make_block(start: int, count: int) -> Block: tuple(range(1, 1 + len(tensor_shape))), ) ) - return pa.Table.from_pydict({TENSOR_COL_NAME: tensor}) + return pa.Table.from_pydict({VALUE_COL_NAME: tensor}) else: return list(builtins.range(start, start + count)) @@ -226,7 +226,7 @@ def make_block(start: int, count: int) -> Block: np.arange(0, 10), tuple(range(1, 1 + len(tensor_shape))) ) ) - schema = pa.Table.from_pydict({TENSOR_COL_NAME: tensor}).schema + schema = pa.Table.from_pydict({VALUE_COL_NAME: tensor}).schema elif block_format == "list": schema = int else: diff --git a/python/ray/data/datasource/numpy_datasource.py b/python/ray/data/datasource/numpy_datasource.py index e3d556261152..e1fd9ba5fa7f 100644 --- a/python/ray/data/datasource/numpy_datasource.py +++ b/python/ray/data/datasource/numpy_datasource.py @@ -8,7 +8,7 @@ from ray.data.block import BlockAccessor from ray.data.datasource.file_based_datasource import FileBasedDatasource -from ray.data.impl.table_block import TENSOR_COL_NAME +from ray.data.impl.table_block import VALUE_COL_NAME class NumpyDatasource(FileBasedDatasource): @@ -35,7 +35,7 @@ def _read_file(self, f: "pyarrow.NativeFile", path: str, **reader_args): buf.write(data) buf.seek(0) return pa.Table.from_pydict( - {TENSOR_COL_NAME: TensorArray(np.load(buf, allow_pickle=True))} + {VALUE_COL_NAME: TensorArray(np.load(buf, allow_pickle=True))} ) def _write_block( diff --git a/python/ray/data/impl/arrow_block.py b/python/ray/data/impl/arrow_block.py index cfb77dbe571c..ca5021a6605d 100644 --- a/python/ray/data/impl/arrow_block.py +++ b/python/ray/data/impl/arrow_block.py @@ -34,7 +34,7 @@ from ray.data.impl.table_block import ( TableBlockAccessor, TableBlockBuilder, - TENSOR_COL_NAME, + VALUE_COL_NAME, ) from ray.data.aggregate import AggregateFn @@ -79,7 +79,7 @@ def __init__(self): def _table_from_pydict(self, columns: Dict[str, List[Any]]) -> Block: for col_name, col in columns.items(): - if col_name == TENSOR_COL_NAME or isinstance( + if col_name == VALUE_COL_NAME or isinstance( next(iter(col), None), np.ndarray ): from ray.data.extensions.tensor_extension import ArrowTensorArray @@ -96,16 +96,15 @@ def _empty_table() -> "pyarrow.Table": class ArrowBlockAccessor(TableBlockAccessor): + ROW_TYPE = ArrowRow + def __init__(self, table: "pyarrow.Table"): if pyarrow is None: raise ImportError("Run `pip install pyarrow` for Arrow support") super().__init__(table) - def _create_table_row(self, row: "pyarrow.Table") -> Union[ArrowRow, np.ndarray]: - if row.column_names == [TENSOR_COL_NAME]: - return row.column(TENSOR_COL_NAME)[0] - else: - return ArrowRow(row) + def column_names(self) -> List[str]: + return self._table.column_names @classmethod def from_bytes(cls, data: bytes): @@ -118,7 +117,7 @@ def from_numpy(cls, data: Union[np.ndarray, List[np.ndarray]]): from ray.data.extensions.tensor_extension import ArrowTensorArray table 
= pa.Table.from_pydict( - {TENSOR_COL_NAME: ArrowTensorArray.from_numpy(data)} + {VALUE_COL_NAME: ArrowTensorArray.from_numpy(data)} ) return cls(table) diff --git a/python/ray/data/impl/block_batching.py b/python/ray/data/impl/block_batching.py index 2f76c713a96b..4cd3660c99ab 100644 --- a/python/ray/data/impl/block_batching.py +++ b/python/ray/data/impl/block_batching.py @@ -14,7 +14,6 @@ from ray.data.block import Block, BlockAccessor from ray.data.context import DatasetContext from ray.data.impl.batcher import Batcher -from ray.data.impl.table_block import TENSOR_COL_NAME from ray.data.impl.stats import DatasetStats, DatasetPipelineStats from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy @@ -94,23 +93,8 @@ def batch_block(block: ObjectRef[Block]): def _format_batch(batch: Block, batch_format: str) -> BatchType: - import pandas as pd - import pyarrow as pa - if batch_format == "native": - # Always promote Arrow blocks to pandas for consistency, since - # we lazily convert pandas->Arrow internally for efficiency. - if isinstance(batch, pa.Table): - if batch.column_names == [TENSOR_COL_NAME]: - batch = BlockAccessor.for_block(batch).to_numpy() - else: - batch = BlockAccessor.for_block(batch).to_pandas() - elif isinstance(batch, pd.DataFrame) and batch.columns.tolist() == [ - TENSOR_COL_NAME - ]: - batch = BlockAccessor.for_block(batch).to_numpy() - elif isinstance(batch, bytes): - batch = BlockAccessor.for_block(batch).to_pandas() + batch = BlockAccessor.for_block(batch).to_native() elif batch_format == "pandas": batch = BlockAccessor.for_block(batch).to_pandas() elif batch_format == "pyarrow": diff --git a/python/ray/data/impl/pandas_block.py b/python/ray/data/impl/pandas_block.py index 11520e021bce..cf311798e6bb 100644 --- a/python/ray/data/impl/pandas_block.py +++ b/python/ray/data/impl/pandas_block.py @@ -20,7 +20,7 @@ from ray.data.impl.table_block import ( TableBlockAccessor, TableBlockBuilder, - TENSOR_COL_NAME, + VALUE_COL_NAME, ) from ray.data.impl.arrow_block import ArrowBlockAccessor from ray.data.aggregate import AggregateFn @@ -78,9 +78,7 @@ def __init__(self): def _table_from_pydict(self, columns: Dict[str, List[Any]]) -> "pandas.DataFrame": pandas = lazy_import_pandas() for key, value in columns.items(): - if key == TENSOR_COL_NAME or isinstance( - next(iter(value), None), np.ndarray - ): + if key == VALUE_COL_NAME or isinstance(next(iter(value), None), np.ndarray): if len(value) == 1: value = value[0] columns[key] = TensorArray(value) @@ -102,16 +100,13 @@ def _empty_table() -> "pandas.DataFrame": class PandasBlockAccessor(TableBlockAccessor): + ROW_TYPE = PandasRow + def __init__(self, table: "pandas.DataFrame"): super().__init__(table) - def _create_table_row( - self, row: "pandas.DataFrame" - ) -> Union[PandasRow, np.ndarray]: - if row.columns.tolist() == [TENSOR_COL_NAME]: - return row[TENSOR_COL_NAME][0] - else: - return PandasRow(row) + def column_names(self) -> List[str]: + return self._table.columns.tolist() def slice(self, start: int, end: int, copy: bool) -> "pandas.DataFrame": view = self._table[start:end] diff --git a/python/ray/data/impl/table_block.py b/python/ray/data/impl/table_block.py index c5135c9a21fd..155d816669cd 100644 --- a/python/ray/data/impl/table_block.py +++ b/python/ray/data/impl/table_block.py @@ -12,7 +12,7 @@ from ray.data.impl.sort import SortKeyT -TENSOR_COL_NAME = "__RAY_TC__" +VALUE_COL_NAME = "__value__" T = TypeVar("T") @@ -38,7 +38,7 @@ def add(self, item: Union[dict, TableRow, np.ndarray]) -> None: if 
isinstance(item, TableRow): item = item.as_pydict() elif isinstance(item, np.ndarray): - item = {TENSOR_COL_NAME: item} + item = {VALUE_COL_NAME: item} if not isinstance(item, dict): raise ValueError( "Returned elements of an TableBlock must be of type `dict`, " @@ -107,15 +107,37 @@ def _compact_if_needed(self) -> None: class TableBlockAccessor(BlockAccessor): + ROW_TYPE: TableRow = TableRow + def __init__(self, table: Any): self._table = table - def _create_table_row(self, row: Any) -> TableRow: + def _get_row(self, index: int, copy: bool = False) -> Union[TableRow, np.ndarray]: + row = self.slice(index, index + 1, copy=copy) + if self.is_tensor_wrapper(): + row = row[VALUE_COL_NAME][0] + else: + row = self.ROW_TYPE(row) + return row + + def to_native(self) -> Block: + if self.is_tensor_wrapper(): + native = self.to_numpy() + else: + # Always promote Arrow blocks to pandas for consistency, since + # we lazily convert pandas->Arrow internally for efficiency. + native = self.to_pandas() + return native + + def column_names(self) -> List[str]: raise NotImplementedError def to_block(self) -> Block: return self._table + def is_tensor_wrapper(self) -> bool: + return self.column_names() == [VALUE_COL_NAME] + def iter_rows(self) -> Iterator[Union[TableRow, np.ndarray]]: outer = self @@ -129,10 +151,7 @@ def __iter__(self): def __next__(self): self._cur += 1 if self._cur < outer.num_rows(): - row = outer._create_table_row( - outer.slice(self._cur, self._cur + 1, copy=False) - ) - return row + return outer._get_row(self._cur) raise StopIteration return Iter() diff --git a/python/ray/data/random_access_dataset.py b/python/ray/data/random_access_dataset.py index 018f69485ca4..82339f5663f5 100644 --- a/python/ray/data/random_access_dataset.py +++ b/python/ray/data/random_access_dataset.py @@ -254,7 +254,7 @@ def _get(self, block_index, key): if i is None: return None acc = BlockAccessor.for_block(block) - return acc._create_table_row(acc.slice(i, i + 1, copy=True)) + return acc._get_row(i, copy=True) def _binary_search_find(column, x): diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index 160edac63e69..8213db4faa36 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -176,7 +176,7 @@ def range_tensor( ... lambda arr: arr * 2, batch_format="numpy").show() This is similar to range_table(), but uses the ArrowTensorArray extension - type. The dataset elements take the form {TENSOR_COL_NAME: array(N, shape=shape)}. + type. The dataset elements take the form {VALUE_COL_NAME: array(N, shape=shape)}. Args: n: The upper bound of the range of integer records. 
diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index 451495c51d20..a5c83bd47491 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -22,7 +22,7 @@ from ray.data.impl.block_builder import BlockBuilder from ray.data.impl.lazy_block_list import LazyBlockList from ray.data.impl.pandas_block import PandasRow -from ray.data.impl.table_block import TENSOR_COL_NAME +from ray.data.impl.table_block import VALUE_COL_NAME from ray.data.aggregate import AggregateFn, Count, Sum, Min, Max, Mean, Std from ray.data.extensions.tensor_extension import ( TensorArray, @@ -450,7 +450,7 @@ def test_tensors_basic(ray_start_regular_shared): ds = ray.data.range_tensor(6, shape=tensor_shape) assert str(ds) == ( "Dataset(num_blocks=6, num_rows=6, " - f"schema={{{TENSOR_COL_NAME}: }})" + f"schema={{{VALUE_COL_NAME}: }})" ) # Test row iterator yields tensors. @@ -531,7 +531,7 @@ def test_tensors_inferred_from_map(ray_start_regular_shared): ds = ray.data.range(10).map(lambda _: np.ones((4, 4))) assert str(ds) == ( "Dataset(num_blocks=10, num_rows=10, " - f"schema={{{TENSOR_COL_NAME}: }})" + f"schema={{{VALUE_COL_NAME}: }})" ) # Test map_batches. @@ -542,7 +542,7 @@ def test_tensors_inferred_from_map(ray_start_regular_shared): ) assert str(ds) == ( "Dataset(num_blocks=4, num_rows=24, " - f"schema={{{TENSOR_COL_NAME}: }})" + f"schema={{{VALUE_COL_NAME}: }})" ) # - Test list of ndarrays. @@ -551,14 +551,14 @@ def test_tensors_inferred_from_map(ray_start_regular_shared): ) assert str(ds) == ( "Dataset(num_blocks=4, num_rows=16, " - f"schema={{{TENSOR_COL_NAME}: }})" + f"schema={{{VALUE_COL_NAME}: }})" ) # Test flat_map. ds = ray.data.range(10).flat_map(lambda _: [np.ones((4, 4)), np.ones((4, 4))]) assert str(ds) == ( "Dataset(num_blocks=10, num_rows=20, " - f"schema={{{TENSOR_COL_NAME}: }})" + f"schema={{{VALUE_COL_NAME}: }})" ) diff --git a/python/ray/data/tests/test_dataset_formats.py b/python/ray/data/tests/test_dataset_formats.py index 66c555f0c427..eef12552e99a 100644 --- a/python/ray/data/tests/test_dataset_formats.py +++ b/python/ray/data/tests/test_dataset_formats.py @@ -33,7 +33,7 @@ WriteResult, ) from ray.data.impl.arrow_block import ArrowRow -from ray.data.impl.table_block import TENSOR_COL_NAME +from ray.data.impl.table_block import VALUE_COL_NAME from ray.data.datasource.file_based_datasource import _unwrap_protocol from ray.data.datasource.parquet_datasource import PARALLELIZE_META_FETCH_THRESHOLD from ray.data.tests.conftest import * # noqa @@ -1013,7 +1013,7 @@ def test_numpy_roundtrip(ray_start_regular_shared, fs, data_path): ds = ray.data.read_numpy(data_path, filesystem=fs) assert str(ds) == ( "Dataset(num_blocks=2, num_rows=None, " - f"schema={{{TENSOR_COL_NAME}: }})" + f"schema={{{VALUE_COL_NAME}: }})" ) np.testing.assert_equal(ds.take(2), [np.array([0]), np.array([1])]) @@ -1025,7 +1025,7 @@ def test_numpy_read(ray_start_regular_shared, tmp_path): ds = ray.data.read_numpy(path) assert str(ds) == ( "Dataset(num_blocks=1, num_rows=10, " - f"schema={{{TENSOR_COL_NAME}: }})" + f"schema={{{VALUE_COL_NAME}: }})" ) np.testing.assert_equal(ds.take(2), [np.array([0]), np.array([1])]) @@ -1038,7 +1038,7 @@ def test_numpy_read_meta_provider(ray_start_regular_shared, tmp_path): ds = ray.data.read_numpy(path, meta_provider=FastFileMetadataProvider()) assert str(ds) == ( "Dataset(num_blocks=1, num_rows=10, " - f"schema={{{TENSOR_COL_NAME}: }})" + f"schema={{{VALUE_COL_NAME}: }})" ) np.testing.assert_equal(ds.take(2), 
[np.array([0]), np.array([1])]) @@ -1095,7 +1095,7 @@ def skip_unpartitioned(kv_dict): val_str = "".join(f"array({v}, dtype=int8), " for v in vals)[:-2] assert_base_partitioned_ds( ds, - schema=f"{{{TENSOR_COL_NAME}: }}", + schema=f"{{{VALUE_COL_NAME}: }}", sorted_values=f"[[{val_str}]]", ds_take_transform_fn=lambda taken: [taken], sorted_values_transform_fn=lambda sorted_values: str(sorted_values), From 50dfa534f229c9d488f977af464d5b5e76e20d48 Mon Sep 17 00:00:00 2001 From: Clark Zinzow Date: Tue, 17 May 2022 00:36:17 +0000 Subject: [PATCH 03/10] Remove numpy format. --- python/ray/data/dataset.py | 9 ++--- python/ray/data/read_api.py | 2 +- python/ray/data/tests/test_dataset.py | 52 --------------------------- 3 files changed, 4 insertions(+), 59 deletions(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 6b5c2772f998..6c6b953df7d9 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -276,8 +276,7 @@ def map_batches( tasks, or ActorPoolStrategy(min, max) to use an autoscaling actor pool. batch_format: Specify "native" to use the native block format (promotes tables to Pandas and tensors to NumPy), "pandas" to select - ``pandas.DataFrame``, "numpy" to select ``numpy.ndarray``, - or "pyarrow" to select `pyarrow.Table``. + ``pandas.DataFrame``, or "pyarrow" to select `pyarrow.Table``. ray_remote_args: Additional resource requirements to request from ray (e.g., num_gpus=1 to request GPUs for the map tasks). """ @@ -311,8 +310,6 @@ def transform(block: Block) -> Iterable[Block]: view = BlockAccessor.for_block(view).to_pandas() elif batch_format == "pyarrow": view = BlockAccessor.for_block(view).to_arrow() - elif batch_format == "numpy": - view = BlockAccessor.for_block(view).to_numpy() else: raise ValueError( "The batch format must be one of 'native', 'pandas', " @@ -2221,8 +2218,8 @@ def iter_batches( batch_format: The format in which to return each batch. Specify "native" to use the native block format (promoting tables to Pandas and tensors to NumPy), "pandas" to select - ``pandas.DataFrame``, "numpy" to select ``numpy.ndarray``, or "pyarrow" - to select ``pyarrow.Table``. Default is "native". + ``pandas.DataFrame``, or "pyarrow" to select ``pyarrow.Table``. Default + is "native". drop_last: Whether to drop the last batch if it's incomplete. Returns: diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index 8213db4faa36..52342eaf9c74 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -173,7 +173,7 @@ def range_tensor( >>> import ray >>> ds = ray.data.range_tensor(1000, shape=(3, 10)) # doctest: +SKIP >>> ds.map_batches( # doctest: +SKIP - ... lambda arr: arr * 2, batch_format="numpy").show() + ... lambda arr: arr * 2).show() This is similar to range_table(), but uses the ArrowTensorArray extension type. The dataset elements take the form {VALUE_COL_NAME: array(N, shape=shape)}. diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index a5c83bd47491..298b89912834 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -471,14 +471,6 @@ def np_mapper(arr): res = ray.data.range_tensor(2, shape=(2, 2)).map(np_mapper).take() np.testing.assert_equal(res, [np.ones((2, 2)), 2 * np.ones((2, 2))]) - # Explicit NumPy format. 
- res = ( - ray.data.range_tensor(2, shape=(2, 2)) - .map_batches(np_mapper, batch_format="numpy") - .take() - ) - np.testing.assert_equal(res, [np.ones((2, 2)), 2 * np.ones((2, 2))]) - # Pandas conversion. def pd_mapper(df): assert isinstance(df, pd.DataFrame) @@ -487,44 +479,6 @@ def pd_mapper(df): res = ray.data.range_tensor(2).map_batches(pd_mapper, batch_format="pandas").take() np.testing.assert_equal(res, [np.array([2]), np.array([3])]) - # Arrow columns in NumPy format. - def mapper(col_arrs): - assert all(isinstance(col_arr, np.ndarray) for col_arr in col_arrs) - return pa.table({"a": col_arrs[0] + 1, "b": col_arrs[1] + 1}) - - t = pa.table({"a": [1, 2, 3], "b": [4.0, 5.0, 6.0]}) - res = ( - ray.data.from_arrow(t) - .map_batches(mapper, batch_size=2, batch_format="numpy") - .take() - ) - assert res == [{"a": 2, "b": 5.0}, {"a": 3, "b": 6.0}, {"a": 4, "b": 7.0}] - - # Pandas columns in NumPy format. - def mapper(col_arrs): - assert all(isinstance(col_arr, np.ndarray) for col_arr in col_arrs) - return pd.DataFrame({"a": col_arrs[0] + 1, "b": col_arrs[1] + 1}) - - df = pd.DataFrame({"a": [1, 2, 3], "b": [4.0, 5.0, 6.0]}) - res = ( - ray.data.from_pandas(df) - .map_batches(mapper, batch_size=2, batch_format="numpy") - .take() - ) - assert res == [{"a": 2, "b": 5.0}, {"a": 3, "b": 6.0}, {"a": 4, "b": 7.0}] - - # Simple dataset in NumPy format. - def mapper(arr): - arr = np_mapper(arr) - return arr.tolist() - - res = ( - ray.data.range(10, parallelism=2) - .map_batches(mapper, batch_format="numpy") - .take() - ) - assert res == list(range(1, 11)) - def test_tensors_inferred_from_map(ray_start_regular_shared): # Test map. @@ -1568,12 +1522,6 @@ def test_iter_batches_basic(ray_start_regular_shared): assert isinstance(batch, pa.Table) assert batch.equals(pa.Table.from_pandas(df)) - # NumPy format. - for batch, df in zip(ds.iter_batches(batch_format="numpy"), dfs): - assert isinstance(batch, list) - assert all(isinstance(col, np.ndarray) for col in batch) - np.testing.assert_equal(batch, [col.to_numpy() for _, col in df.items()]) - # blocks format. for batch, df in zip(ds.iter_batches(batch_format="native"), dfs): assert BlockAccessor.for_block(batch).to_pandas().equals(df) From daa3d15bcaa5f1b5de731db7625d981dee86eb1d Mon Sep 17 00:00:00 2001 From: Clark Zinzow Date: Tue, 17 May 2022 01:38:58 +0000 Subject: [PATCH 04/10] Update docs. --- doc/source/data/dataset-tensor-support.rst | 57 +++++++++++++++++----- 1 file changed, 46 insertions(+), 11 deletions(-) diff --git a/doc/source/data/dataset-tensor-support.rst b/doc/source/data/dataset-tensor-support.rst index 51be49a0047e..1357bc6c00a2 100644 --- a/doc/source/data/dataset-tensor-support.rst +++ b/doc/source/data/dataset-tensor-support.rst @@ -15,22 +15,57 @@ Automatic conversion between the Pandas and Arrow extension types/arrays keeps t Single-column tensor datasets ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -The most basic case is when a dataset only has a single column, which is of tensor type. This kind of dataset can be created with ``.range_tensor()``, and can be read from and written to ``.npy`` files. Here are some examples: +The most basic case is when a dataset only has a single column, which is of tensor +type. This kind of dataset can be: -.. code-block:: python +* created with :func:`range_tensor() ` + or :func:`from_numpy() `, +* transformed with NumPy UDFs via + :meth:`ds.map_batches() `, +* consumed with :meth:`ds.iter_rows() ` and + :meth:`ds.iter_batches() `, and +* can be read from and written to ``.npy`` files. 
- # Create a Dataset of tensor-typed values. - ds = ray.data.range_tensor(10000, shape=(3, 5)) - # -> Dataset(num_blocks=200, num_rows=10000, - # schema={value: }) +Here is an end-to-end example: - # Save to storage. - ds.write_numpy("/tmp/tensor_out", column="value") +.. code-block:: python - # Read from storage. + # Create a synthetic pure-tensor Dataset. + ds = ray.data.range_tensor(10, shape=(3, 5)) + # -> Dataset(num_blocks=10, num_rows=10, + # schema={__value__: }) + + # Create a pure-tensor Dataset from an existing NumPy ndarray. + arr = np.arange(10 * 3 * 5).reshape((10, 3, 5)) + ds = ray.data.from_numpy(arr) + # -> Dataset(num_blocks=1, num_rows=10, + # schema={__value__: }) + + # Transform the tensors. Datasets will automatically unpack the single-column Arrow + # table into a NumPy ndarray, provide that ndarray to your UDF, and then repack it + # into a single-column Arrow table; this will be a zero-copy conversion in both + # cases. + ds = ds.map_batches(lambda arr: arr / arr.max()) + # -> Dataset(num_blocks=1, num_rows=10, + # schema={__value__: }) + + # Consume the tensor. This will yield the underlying (3, 5) ndarrays. + for arr in ds.iter_rows(): + assert isinstance(arr, np.ndarray) + assert arr.shape == (3, 5) + + # Consume the tensor in batches. + for arr in ds.iter_batches(batch_size=2): + assert isinstance(arr, np.ndarray) + assert arr.shape == (2, 3, 5) + + # Save to storage. This will write out the blocks of the tensor column as NPY files. + ds.write_numpy("/tmp/tensor_out") + + # Read back from storage. ray.data.read_numpy("/tmp/tensor_out") - # -> Dataset(num_blocks=200, num_rows=?, - # schema={value: }) + # -> Dataset(num_blocks=1, num_rows=?, + # schema={__value__: }) Reading existing serialized tensor columns ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ From 4bf27a01abde0a6c2b82b016bd7c72190b14aa67 Mon Sep 17 00:00:00 2001 From: Clark Zinzow Date: Tue, 17 May 2022 02:09:32 +0000 Subject: [PATCH 05/10] PR feedback: use __value__ directly in docs and tests; add comment for VALUE_COL_NAME. --- python/ray/data/dataset.py | 5 ++--- python/ray/data/impl/table_block.py | 2 ++ python/ray/data/tests/test_dataset.py | 11 +++++------ python/ray/data/tests/test_dataset_formats.py | 9 ++++----- 4 files changed, 13 insertions(+), 14 deletions(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 6c6b953df7d9..43597ceacf1f 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -2067,9 +2067,8 @@ def write_numpy( path: The path to the destination root directory, where npy files will be written to. column: The name of the table column that contains the tensor to - be written. The default is the column name that Datasets uses for - storing tensors in single-column tables. See - ``ray.data.impl.arrow_block.VALUE_COL_NAME`` for the exact name. + be written. The default is ``"__value__"``, the column name that + Datasets uses for storing tensors in single-column tables. filesystem: The filesystem implementation to write to. try_create_dir: Try to create all directories in destination path if True. Does nothing if all directories already exist. diff --git a/python/ray/data/impl/table_block.py b/python/ray/data/impl/table_block.py index 155d816669cd..f0c2adfb2a36 100644 --- a/python/ray/data/impl/table_block.py +++ b/python/ray/data/impl/table_block.py @@ -12,6 +12,8 @@ from ray.data.impl.sort import SortKeyT +# The internal column name used for pure-tensor datasets, represented as +# single-tensor-column tables. 
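+# This name is user-visible: it appears in dataset schemas for pure-tensor datasets
+# and is the default ``column`` argument for ``Dataset.write_numpy()``.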
VALUE_COL_NAME = "__value__" T = TypeVar("T") diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index 298b89912834..d2cf8b2fdd94 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -22,7 +22,6 @@ from ray.data.impl.block_builder import BlockBuilder from ray.data.impl.lazy_block_list import LazyBlockList from ray.data.impl.pandas_block import PandasRow -from ray.data.impl.table_block import VALUE_COL_NAME from ray.data.aggregate import AggregateFn, Count, Sum, Min, Max, Mean, Std from ray.data.extensions.tensor_extension import ( TensorArray, @@ -450,7 +449,7 @@ def test_tensors_basic(ray_start_regular_shared): ds = ray.data.range_tensor(6, shape=tensor_shape) assert str(ds) == ( "Dataset(num_blocks=6, num_rows=6, " - f"schema={{{VALUE_COL_NAME}: }})" + "schema={__value__: })" ) # Test row iterator yields tensors. @@ -485,7 +484,7 @@ def test_tensors_inferred_from_map(ray_start_regular_shared): ds = ray.data.range(10).map(lambda _: np.ones((4, 4))) assert str(ds) == ( "Dataset(num_blocks=10, num_rows=10, " - f"schema={{{VALUE_COL_NAME}: }})" + "schema={__value__: })" ) # Test map_batches. @@ -496,7 +495,7 @@ def test_tensors_inferred_from_map(ray_start_regular_shared): ) assert str(ds) == ( "Dataset(num_blocks=4, num_rows=24, " - f"schema={{{VALUE_COL_NAME}: }})" + "schema={__value__: })" ) # - Test list of ndarrays. @@ -505,14 +504,14 @@ def test_tensors_inferred_from_map(ray_start_regular_shared): ) assert str(ds) == ( "Dataset(num_blocks=4, num_rows=16, " - f"schema={{{VALUE_COL_NAME}: }})" + "schema={__value__: })" ) # Test flat_map. ds = ray.data.range(10).flat_map(lambda _: [np.ones((4, 4)), np.ones((4, 4))]) assert str(ds) == ( "Dataset(num_blocks=10, num_rows=20, " - f"schema={{{VALUE_COL_NAME}: }})" + "schema={__value__: })" ) diff --git a/python/ray/data/tests/test_dataset_formats.py b/python/ray/data/tests/test_dataset_formats.py index eef12552e99a..fe43a36ce8c5 100644 --- a/python/ray/data/tests/test_dataset_formats.py +++ b/python/ray/data/tests/test_dataset_formats.py @@ -33,7 +33,6 @@ WriteResult, ) from ray.data.impl.arrow_block import ArrowRow -from ray.data.impl.table_block import VALUE_COL_NAME from ray.data.datasource.file_based_datasource import _unwrap_protocol from ray.data.datasource.parquet_datasource import PARALLELIZE_META_FETCH_THRESHOLD from ray.data.tests.conftest import * # noqa @@ -1013,7 +1012,7 @@ def test_numpy_roundtrip(ray_start_regular_shared, fs, data_path): ds = ray.data.read_numpy(data_path, filesystem=fs) assert str(ds) == ( "Dataset(num_blocks=2, num_rows=None, " - f"schema={{{VALUE_COL_NAME}: }})" + "schema={__value__: })" ) np.testing.assert_equal(ds.take(2), [np.array([0]), np.array([1])]) @@ -1025,7 +1024,7 @@ def test_numpy_read(ray_start_regular_shared, tmp_path): ds = ray.data.read_numpy(path) assert str(ds) == ( "Dataset(num_blocks=1, num_rows=10, " - f"schema={{{VALUE_COL_NAME}: }})" + "schema={__value__: })" ) np.testing.assert_equal(ds.take(2), [np.array([0]), np.array([1])]) @@ -1038,7 +1037,7 @@ def test_numpy_read_meta_provider(ray_start_regular_shared, tmp_path): ds = ray.data.read_numpy(path, meta_provider=FastFileMetadataProvider()) assert str(ds) == ( "Dataset(num_blocks=1, num_rows=10, " - f"schema={{{VALUE_COL_NAME}: }})" + "schema={__value__: })" ) np.testing.assert_equal(ds.take(2), [np.array([0]), np.array([1])]) @@ -1095,7 +1094,7 @@ def skip_unpartitioned(kv_dict): val_str = "".join(f"array({v}, dtype=int8), " for v in vals)[:-2] 
assert_base_partitioned_ds( ds, - schema=f"{{{VALUE_COL_NAME}: }}", + schema="{__value__: }", sorted_values=f"[[{val_str}]]", ds_take_transform_fn=lambda taken: [taken], sorted_values_transform_fn=lambda sorted_values: str(sorted_values), From f3a2ac4bd256c1d6b44b08cbd4ebf50da11fae86 Mon Sep 17 00:00:00 2001 From: Clark Zinzow Date: Tue, 17 May 2022 18:57:09 +0000 Subject: [PATCH 06/10] Fix shuffling and sorting of tensor columns, fix random access dataset. --- python/ray/data/impl/arrow_block.py | 100 ++++++++++++++++++----- python/ray/data/impl/pandas_block.py | 6 ++ python/ray/data/impl/table_block.py | 6 +- python/ray/data/random_access_dataset.py | 4 +- python/ray/data/tests/test_dataset.py | 73 +++++++++++++++++ 5 files changed, 164 insertions(+), 25 deletions(-) diff --git a/python/ray/data/impl/arrow_block.py b/python/ray/data/impl/arrow_block.py index ca5021a6605d..613e010b680b 100644 --- a/python/ray/data/impl/arrow_block.py +++ b/python/ray/data/impl/arrow_block.py @@ -121,6 +121,11 @@ def from_numpy(cls, data: Union[np.ndarray, List[np.ndarray]]): ) return cls(table) + @staticmethod + def _build_tensor_row(row: ArrowRow) -> np.ndarray: + # Getting an item in a tensor column automatically does a NumPy conversion. + return row[VALUE_COL_NAME][0] + def slice(self, start: int, end: int, copy: bool) -> "pyarrow.Table": view = self._table.slice(start, end - start) if copy: @@ -129,7 +134,7 @@ def slice(self, start: int, end: int, copy: bool) -> "pyarrow.Table": def random_shuffle(self, random_seed: Optional[int]) -> "pyarrow.Table": random = np.random.RandomState(random_seed) - return self._table.take(random.permutation(self.num_rows())) + return self.take(random.permutation(self.num_rows())) def schema(self) -> "pyarrow.lib.Schema": return self._table.schema @@ -155,16 +160,10 @@ def to_numpy( array = self._table[column] if array.num_chunks == 0: array = pyarrow.array([], type=array.type) - elif array.num_chunks == 1: - array = array.chunk(0) - elif isinstance(array.chunk(0), pyarrow.ExtensionArray): - # If an extension array, we manually concatenate the underlying storage - # arrays. - chunk = array.chunk(0) - array = type(chunk).from_storage( - chunk.type, - pyarrow.concat_arrays([chunk.storage for chunk in array.chunks]), - ) + elif _is_column_extension_type(array): + array = _concatenate_extension_column(array) + else: + array = array.combine_chunks() arrays.append(array.to_numpy(zero_copy_only=False)) if len(arrays) == 1: arrays = arrays[0] @@ -207,9 +206,45 @@ def builder() -> ArrowBlockBuilder[T]: def _empty_table() -> "pyarrow.Table": return ArrowBlockBuilder._empty_table() + @staticmethod + def take_table( + table: "pyarrow.Table", + indices: Union[List[int], "pyarrow.Array", "pyarrow.ChunkedArray"], + ) -> "pyarrow.Table": + """Select rows from the table. + + This method is an alternative to pyarrow.Table.take(), which breaks for + extension arrays. This is exposed as a static method for easier use on + intermediate tables, not underlying an ArrowBlockAccessor. + """ + if any(_is_column_extension_type(col) for col in table.columns): + new_cols = [] + for col in table.columns: + if _is_column_extension_type(col): + # .take() will concatenate internally, which currently breaks for + # extension arrays. 
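+                    # (See https://issues.apache.org/jira/browse/ARROW-16503.)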
+ col = _concatenate_extension_column(col) + new_cols.append(col.take(indices)) + table = pyarrow.Table.from_arrays(new_cols, schema=table.schema) + else: + table = table.take(indices) + return table + + def take( + self, + indices: Union[List[int], "pyarrow.Array", "pyarrow.ChunkedArray"], + ) -> "pyarrow.Table": + """Select rows from the underlying table. + + This method is an alternative to pyarrow.Table.take(), which breaks for + extension arrays. + """ + return self.take_table(self._table, indices) + def _sample(self, n_samples: int, key: "SortKeyT") -> "pyarrow.Table": indices = random.sample(range(self._table.num_rows), n_samples) - return self._table.select([k[0] for k in key]).take(indices) + table = self._table.select([k[0] for k in key]) + return self.take_table(table, indices) def count(self, on: KeyFn) -> Optional[U]: """Count the number of non-null values in the provided column.""" @@ -306,7 +341,7 @@ def sort_and_partition( import pyarrow.compute as pac indices = pac.sort_indices(self._table, sort_keys=key) - table = self._table.take(indices) + table = self.take(indices) if len(boundaries) == 0: return [table] @@ -431,7 +466,7 @@ def merge_sorted_blocks( else: ret = pyarrow.concat_tables(blocks, promote=True) indices = pyarrow.compute.sort_indices(ret, sort_keys=key) - ret = ret.take(indices) + ret = ArrowBlockAccessor.take_table(ret, indices) return ret, ArrowBlockAccessor(ret).get_metadata(None, exec_stats=stats.build()) @staticmethod @@ -527,6 +562,33 @@ def gen(): return ret, ArrowBlockAccessor(ret).get_metadata(None, exec_stats=stats.build()) +def _is_column_extension_type(ca: "pyarrow.ChunkedArray") -> bool: + """Whether the provided Arrow Table column is an extension array, using an Arrow + extension type. + """ + return isinstance(ca.type, pyarrow.ExtensionType) + + +def _concatenate_extension_column(ca: "pyarrow.ChunkedArray") -> "pyarrow.Array": + """Concatenate chunks of an extension column into a contiguous array. + + This concatenation is required for creating copies and for .take() to work on + extension arrays. + See https://issues.apache.org/jira/browse/ARROW-16503. + """ + if not _is_column_extension_type(ca): + raise ValueError("Chunked array isn't an extension array: {ca}") + + if ca.num_chunks == 0: + # No-op for no-chunk chunked arrays, since there's nothing to concatenate. + return ca + + chunk = ca.chunk(0) + return type(chunk).from_storage( + chunk.type, pyarrow.concat_arrays([c.storage for c in ca.chunks]) + ) + + def _copy_table(table: "pyarrow.Table") -> "pyarrow.Table": """Copy the provided Arrow table.""" import pyarrow as pa @@ -536,14 +598,10 @@ def _copy_table(table: "pyarrow.Table") -> "pyarrow.Table": cols = table.columns new_cols = [] for col in cols: - if col.num_chunks > 0 and isinstance(col.chunk(0), pa.ExtensionArray): - # If an extension array, we copy the underlying storage arrays. - chunk = col.chunk(0) - arr = type(chunk).from_storage( - chunk.type, pa.concat_arrays([c.storage for c in col.chunks]) - ) + if _is_column_extension_type(col): + # Extension arrays don't support concatenation. + arr = _concatenate_extension_column(col) else: - # Otherwise, we copy the top-level chunk arrays. 
arr = col.combine_chunks() new_cols.append(arr) return pa.Table.from_arrays(new_cols, schema=table.schema) diff --git a/python/ray/data/impl/pandas_block.py b/python/ray/data/impl/pandas_block.py index cf311798e6bb..5f389e4e00bc 100644 --- a/python/ray/data/impl/pandas_block.py +++ b/python/ray/data/impl/pandas_block.py @@ -108,6 +108,12 @@ def __init__(self, table: "pandas.DataFrame"): def column_names(self) -> List[str]: return self._table.columns.tolist() + @staticmethod + def _build_tensor_row(row: PandasRow) -> np.ndarray: + # Getting an item in a Pandas tensor column returns a TensorArrayElement, which + # we have to convert to an ndarray. + return row[VALUE_COL_NAME].iloc[0].to_numpy() + def slice(self, start: int, end: int, copy: bool) -> "pandas.DataFrame": view = self._table[start:end] if copy: diff --git a/python/ray/data/impl/table_block.py b/python/ray/data/impl/table_block.py index f0c2adfb2a36..0b9060ce24ca 100644 --- a/python/ray/data/impl/table_block.py +++ b/python/ray/data/impl/table_block.py @@ -117,11 +117,15 @@ def __init__(self, table: Any): def _get_row(self, index: int, copy: bool = False) -> Union[TableRow, np.ndarray]: row = self.slice(index, index + 1, copy=copy) if self.is_tensor_wrapper(): - row = row[VALUE_COL_NAME][0] + row = self._build_tensor_row(row) else: row = self.ROW_TYPE(row) return row + @staticmethod + def _build_tensor_row(row: TableRow) -> np.ndarray: + raise NotImplementedError + def to_native(self) -> Block: if self.is_tensor_wrapper(): native = self.to_numpy() diff --git a/python/ray/data/random_access_dataset.py b/python/ray/data/random_access_dataset.py index 82339f5663f5..54ae33993ee8 100644 --- a/python/ray/data/random_access_dataset.py +++ b/python/ray/data/random_access_dataset.py @@ -223,9 +223,7 @@ def multiget(self, block_indices, keys): col = block[self.key_field] indices = np.searchsorted(col, keys) acc = BlockAccessor.for_block(block) - result = [ - acc._create_table_row(acc.slice(i, i + 1, copy=True)) for i in indices - ] + result = [acc._get_row(i, copy=True) for i in indices] # assert result == [self._get(i, k) for i, k in zip(block_indices, keys)] else: result = [self._get(i, k) for i, k in zip(block_indices, keys)] diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index d2cf8b2fdd94..aa228c62ecbb 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -479,6 +479,79 @@ def pd_mapper(df): np.testing.assert_equal(res, [np.array([2]), np.array([3])]) +def test_tensors_shuffle(ray_start_regular_shared): + # Test Arrow table representation. + tensor_shape = (3, 5) + ds = ray.data.range_tensor(6, shape=tensor_shape) + shuffled_ds = ds.random_shuffle() + shuffled = shuffled_ds.take() + base = ds.take() + np.testing.assert_raises( + AssertionError, + np.testing.assert_equal, + shuffled, + base, + ) + np.testing.assert_equal( + sorted(shuffled, key=lambda arr: arr.min()), + sorted(base, key=lambda arr: arr.min()), + ) + + # Test Pandas table representation. 
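+    # (An identity map_batches() with batch_format="pandas" below converts the
+    # blocks to the Pandas representation before shuffling.)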
+ tensor_shape = (3, 5) + ds = ray.data.range_tensor(6, shape=tensor_shape) + ds = ds.map_batches(lambda df: df, batch_format="pandas") + shuffled_ds = ds.random_shuffle() + shuffled = shuffled_ds.take() + base = ds.take() + np.testing.assert_raises( + AssertionError, + np.testing.assert_equal, + shuffled, + base, + ) + np.testing.assert_equal( + sorted(shuffled, key=lambda arr: arr.min()), + sorted(base, key=lambda arr: arr.min()), + ) + + +def test_tensors_sort(ray_start_regular_shared): + # Test Arrow table representation. + t = pa.table({"a": TensorArray(np.arange(32).reshape((2, 4, 4))), "b": [1, 2]}) + ds = ray.data.from_arrow(t) + sorted_ds = ds.sort(key="b", descending=True) + sorted_arrs = [row["a"] for row in sorted_ds.take()] + base = [row["a"] for row in ds.take()] + np.testing.assert_raises( + AssertionError, + np.testing.assert_equal, + sorted_arrs, + base, + ) + np.testing.assert_equal( + sorted_arrs, + sorted(base, key=lambda arr: -arr.min()), + ) + + # Test Pandas table representation. + df = pd.DataFrame({"a": TensorArray(np.arange(32).reshape((2, 4, 4))), "b": [1, 2]}) + ds = ray.data.from_pandas(df) + sorted_ds = ds.sort(key="b", descending=True) + sorted_arrs = [np.asarray(row["a"]) for row in sorted_ds.take()] + base = [np.asarray(row["a"]) for row in ds.take()] + np.testing.assert_raises( + AssertionError, + np.testing.assert_equal, + sorted_arrs, + base, + ) + np.testing.assert_equal( + sorted_arrs, + sorted(base, key=lambda arr: -arr.min()), + ) + + def test_tensors_inferred_from_map(ray_start_regular_shared): # Test map. ds = ray.data.range(10).map(lambda _: np.ones((4, 4))) From e36bd181148f7d9df379fcd89c62438cccb87647 Mon Sep 17 00:00:00 2001 From: Clark Zinzow Date: Thu, 19 May 2022 15:30:46 +0000 Subject: [PATCH 07/10] PR feedback --- python/ray/data/block.py | 22 +++++++++------- python/ray/data/dataset.py | 2 +- python/ray/data/datasource/datasource.py | 26 ++++++------------- .../ray/data/datasource/numpy_datasource.py | 8 +----- python/ray/data/impl/arrow_block.py | 11 ++++---- .../ray/data/impl/delegating_block_builder.py | 15 ++++++++--- python/ray/data/impl/output_buffer.py | 7 ++++- python/ray/data/impl/table_block.py | 1 - python/ray/data/tests/test_dataset.py | 11 -------- 9 files changed, 45 insertions(+), 58 deletions(-) diff --git a/python/ray/data/block.py b/python/ray/data/block.py index d96879147cf9..b396d17c2632 100644 --- a/python/ray/data/block.py +++ b/python/ray/data/block.py @@ -83,6 +83,10 @@ def _validate_key_fn(ds: "Dataset", key: KeyFn) -> None: # ``SimpleBlockAccessor`` and ``ArrowBlockAccessor``. Block = Union[List[T], "pyarrow.Table", "pandas.DataFrame", bytes] +# User-facing data batch type. This is the data type for data that is supplied to and +# returned from batch UDFs. +DataBatch = Union[Block, np.ndarray] + # A list of block references pending computation by a single task. For example, # this may be the output of a task reading a file. 
BlockPartition = List[Tuple[ObjectRef[Block], "BlockMetadata"]] @@ -262,6 +266,15 @@ def builder() -> "BlockBuilder[T]": """Create a builder for this block type.""" raise NotImplementedError + @staticmethod + def batch_to_block(batch: DataBatch) -> Block: + """Create a block from user-facing data formats.""" + if isinstance(batch, np.ndarray): + from ray.data.impl.arrow_block import ArrowBlockAccessor + + return ArrowBlockAccessor.numpy_to_block(batch) + return batch + @staticmethod def for_block(block: Block) -> "BlockAccessor[T]": """Create a block accessor for the given block.""" @@ -281,16 +294,7 @@ def for_block(block: Block) -> "BlockAccessor[T]": from ray.data.impl.arrow_block import ArrowBlockAccessor return ArrowBlockAccessor.from_bytes(block) - elif isinstance(block, np.ndarray): - from ray.data.impl.arrow_block import ArrowBlockAccessor - - return ArrowBlockAccessor.from_numpy(block) elif isinstance(block, list): - if block and all(isinstance(item, np.ndarray) for item in block): - from ray.data.impl.arrow_block import ArrowBlockAccessor - - return ArrowBlockAccessor.from_numpy(block) - from ray.data.impl.simple_block import SimpleBlockAccessor return SimpleBlockAccessor(block) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 43597ceacf1f..a2dbe8b1bc4b 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -330,7 +330,7 @@ def transform(block: Block) -> Iterable[Block]: "The return type must be either list, " "pandas.DataFrame, or pyarrow.Table" ) - output_buffer.add_block(applied) + output_buffer.add_batch(applied) if output_buffer.has_next(): yield output_buffer.next() diff --git a/python/ray/data/datasource/datasource.py b/python/ray/data/datasource/datasource.py index 3e904c6f6d7f..c093c215ed98 100644 --- a/python/ray/data/datasource/datasource.py +++ b/python/ray/data/datasource/datasource.py @@ -16,7 +16,6 @@ ) from ray.data.context import DatasetContext from ray.data.impl.arrow_block import ArrowRow -from ray.data.impl.table_block import VALUE_COL_NAME from ray.data.impl.delegating_block_builder import DelegatingBlockBuilder from ray.data.impl.util import _check_pyarrow_version from ray.util.annotations import DeveloperAPI @@ -150,9 +149,7 @@ def __call__(self) -> MaybeBlockPartition: input_files=self._metadata.input_files, exec_stats=None ) # No exec stats for the block splits. 
assert context.block_owner - partition.append( - (ray.put(accessor.to_block(), _owner=context.block_owner), metadata) - ) + partition.append((ray.put(block, _owner=context.block_owner), metadata)) if len(partition) == 0: raise ValueError("Read task must return non-empty list.") return partition @@ -196,14 +193,11 @@ def make_block(start: int, count: int) -> Block: elif block_format == "tensor": import pyarrow as pa - tensor = TensorArray( - np.ones(tensor_shape, dtype=np.int64) - * np.expand_dims( - np.arange(start, start + count), - tuple(range(1, 1 + len(tensor_shape))), - ) + tensor = np.ones(tensor_shape, dtype=np.int64) * np.expand_dims( + np.arange(start, start + count), + tuple(range(1, 1 + len(tensor_shape))), ) - return pa.Table.from_pydict({VALUE_COL_NAME: tensor}) + return BlockAccessor.batch_to_block(tensor) else: return list(builtins.range(start, start + count)) @@ -217,16 +211,12 @@ def make_block(start: int, count: int) -> Block: schema = pa.Table.from_pydict({"value": [0]}).schema elif block_format == "tensor": _check_pyarrow_version() - from ray.data.extensions import TensorArray import pyarrow as pa - tensor = TensorArray( - np.ones(tensor_shape, dtype=np.int64) - * np.expand_dims( - np.arange(0, 10), tuple(range(1, 1 + len(tensor_shape))) - ) + tensor = np.ones(tensor_shape, dtype=np.int64) * np.expand_dims( + np.arange(0, 10), tuple(range(1, 1 + len(tensor_shape))) ) - schema = pa.Table.from_pydict({VALUE_COL_NAME: tensor}).schema + schema = BlockAccessor.batch_to_block(tensor).schema elif block_format == "list": schema = int else: diff --git a/python/ray/data/datasource/numpy_datasource.py b/python/ray/data/datasource/numpy_datasource.py index e1fd9ba5fa7f..7b65d01e1e26 100644 --- a/python/ray/data/datasource/numpy_datasource.py +++ b/python/ray/data/datasource/numpy_datasource.py @@ -8,7 +8,6 @@ from ray.data.block import BlockAccessor from ray.data.datasource.file_based_datasource import FileBasedDatasource -from ray.data.impl.table_block import VALUE_COL_NAME class NumpyDatasource(FileBasedDatasource): @@ -25,18 +24,13 @@ class NumpyDatasource(FileBasedDatasource): """ def _read_file(self, f: "pyarrow.NativeFile", path: str, **reader_args): - from ray.data.extensions import TensorArray - import pyarrow as pa - # TODO(ekl) Ideally numpy can read directly from the file, but it # seems like it requires the file to be seekable. 
buf = BytesIO() data = f.readall() buf.write(data) buf.seek(0) - return pa.Table.from_pydict( - {VALUE_COL_NAME: TensorArray(np.load(buf, allow_pickle=True))} - ) + return BlockAccessor.batch_to_block(np.load(buf, allow_pickle=True)) def _write_block( self, diff --git a/python/ray/data/impl/arrow_block.py b/python/ray/data/impl/arrow_block.py index 613e010b680b..72692723501a 100644 --- a/python/ray/data/impl/arrow_block.py +++ b/python/ray/data/impl/arrow_block.py @@ -107,19 +107,18 @@ def column_names(self) -> List[str]: return self._table.column_names @classmethod - def from_bytes(cls, data: bytes): + def from_bytes(cls, data: bytes) -> "ArrowBlockAccessor": reader = pyarrow.ipc.open_stream(data) return cls(reader.read_all()) - @classmethod - def from_numpy(cls, data: Union[np.ndarray, List[np.ndarray]]): + @staticmethod + def numpy_to_block(batch: np.ndarray) -> "pyarrow.Table": import pyarrow as pa from ray.data.extensions.tensor_extension import ArrowTensorArray - table = pa.Table.from_pydict( - {VALUE_COL_NAME: ArrowTensorArray.from_numpy(data)} + return pa.Table.from_pydict( + {VALUE_COL_NAME: ArrowTensorArray.from_numpy(batch)} ) - return cls(table) @staticmethod def _build_tensor_row(row: ArrowRow) -> np.ndarray: diff --git a/python/ray/data/impl/delegating_block_builder.py b/python/ray/data/impl/delegating_block_builder.py index b35838136032..d88fac07f682 100644 --- a/python/ray/data/impl/delegating_block_builder.py +++ b/python/ray/data/impl/delegating_block_builder.py @@ -2,7 +2,7 @@ import numpy as np -from ray.data.block import Block, T, BlockAccessor +from ray.data.block import Block, DataBatch, T, BlockAccessor from ray.data.impl.block_builder import BlockBuilder from ray.data.impl.simple_block import SimpleBlockBuilder from ray.data.impl.arrow_block import ArrowRow, ArrowBlockBuilder @@ -15,7 +15,6 @@ def __init__(self): self._empty_block = None def add(self, item: Any) -> None: - if self._builder is None: # TODO (kfstorm): Maybe we can use Pandas block format for dict. if isinstance(item, dict) or isinstance(item, ArrowRow): @@ -36,9 +35,17 @@ def add(self, item: Any) -> None: self._builder = SimpleBlockBuilder() self._builder.add(item) - def add_block(self, block: Block) -> None: + def add_batch(self, batch: DataBatch): + """Add a user-facing data batch to the builder. + + This data batch will be converted to an internal block and then added to the + underlying builder. + """ + block = BlockAccessor.batch_to_block(batch) + return self.add_block(block) + + def add_block(self, block: Block): accessor = BlockAccessor.for_block(block) - block = accessor.to_block() if accessor.num_rows() == 0: # Don't infer types of empty lists. Store the block and use it if no # other data is added. 
https://github.com/ray-project/ray/issues/20290 diff --git a/python/ray/data/impl/output_buffer.py b/python/ray/data/impl/output_buffer.py index bc7ea61cb2fd..b5a07cbb6ad9 100644 --- a/python/ray/data/impl/output_buffer.py +++ b/python/ray/data/impl/output_buffer.py @@ -1,6 +1,6 @@ from typing import Callable, Any, Optional -from ray.data.block import Block, BlockAccessor +from ray.data.block import Block, DataBatch, BlockAccessor from ray.data.impl.delegating_block_builder import DelegatingBlockBuilder @@ -44,6 +44,11 @@ def add(self, item: Any) -> None: assert not self._finalized self._buffer.add(item) + def add_batch(self, batch: DataBatch) -> None: + """Add a data batch to this output buffer.""" + assert not self._finalized + self._buffer.add_batch(batch) + def add_block(self, block: Block) -> None: """Add a data block to this output buffer.""" assert not self._finalized diff --git a/python/ray/data/impl/table_block.py b/python/ray/data/impl/table_block.py index 0b9060ce24ca..8b92843908d7 100644 --- a/python/ray/data/impl/table_block.py +++ b/python/ray/data/impl/table_block.py @@ -61,7 +61,6 @@ def add_block(self, block: Any) -> None: f"{block}" ) accessor = BlockAccessor.for_block(block) - block = accessor.to_block() self._tables.append(block) self._tables_size_bytes += accessor.size_bytes() self._num_rows += accessor.num_rows() diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index aa228c62ecbb..ffee72f24e58 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -561,8 +561,6 @@ def test_tensors_inferred_from_map(ray_start_regular_shared): ) # Test map_batches. - - # - Test top-level ndarray. ds = ray.data.range(16, parallelism=4).map_batches( lambda _: np.ones((3, 4, 4)), batch_size=2 ) @@ -571,15 +569,6 @@ def test_tensors_inferred_from_map(ray_start_regular_shared): "schema={__value__: })" ) - # - Test list of ndarrays. - ds = ray.data.range(16, parallelism=4).map_batches( - lambda _: [np.ones((4, 4)), np.ones((4, 4))], batch_size=2 - ) - assert str(ds) == ( - "Dataset(num_blocks=4, num_rows=16, " - "schema={__value__: })" - ) - # Test flat_map. ds = ray.data.range(10).flat_map(lambda _: [np.ones((4, 4)), np.ones((4, 4))]) assert str(ds) == ( From 9a819f0ad23089a724007ed5b89f5407aced980c Mon Sep 17 00:00:00 2001 From: Clark Zinzow Date: Thu, 19 May 2022 18:31:07 +0000 Subject: [PATCH 08/10] Fix from_numpy(). --- python/ray/data/datasource/datasource.py | 3 +-- python/ray/data/read_api.py | 8 ++++---- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/python/ray/data/datasource/datasource.py b/python/ray/data/datasource/datasource.py index c093c215ed98..5bb3ffab6dc4 100644 --- a/python/ray/data/datasource/datasource.py +++ b/python/ray/data/datasource/datasource.py @@ -144,8 +144,7 @@ def __call__(self) -> MaybeBlockPartition: if context.block_splitting_enabled: partition: BlockPartition = [] for block in result: - accessor = BlockAccessor.for_block(block) - metadata = accessor.get_metadata( + metadata = BlockAccessor.for_block(block).get_metadata( input_files=self._metadata.input_files, exec_stats=None ) # No exec stats for the block splits. 
assert context.block_owner diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index 52342eaf9c74..a30664dace32 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -1019,11 +1019,11 @@ def _df_to_block(df: "pandas.DataFrame") -> Block[ArrowRow]: def _ndarray_to_block(ndarray: np.ndarray) -> Block[np.ndarray]: stats = BlockExecStats.builder() - accessor = BlockAccessor.for_block(ndarray) - return ( - accessor.to_block(), - accessor.get_metadata(input_files=None, exec_stats=stats.build()), + block = BlockAccessor.batch_to_block(ndarray) + metadata = BlockAccessor.for_block(block).get_metadata( + input_files=None, exec_stats=stats.build() ) + return block, metadata def _get_metadata(table: Union["pyarrow.Table", "pandas.DataFrame"]) -> BlockMetadata: From cb3e761c358671050c344eb85084f64a5d173548 Mon Sep 17 00:00:00 2001 From: Clark Zinzow Date: Thu, 19 May 2022 19:48:10 +0000 Subject: [PATCH 09/10] Change to dynamic TensorArray import. --- python/ray/data/impl/pandas_block.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/ray/data/impl/pandas_block.py b/python/ray/data/impl/pandas_block.py index 5f389e4e00bc..70302d9f0fa3 100644 --- a/python/ray/data/impl/pandas_block.py +++ b/python/ray/data/impl/pandas_block.py @@ -15,7 +15,6 @@ import numpy as np from ray.data.block import BlockAccessor, BlockMetadata, KeyFn, U -from ray.data.extensions.tensor_extension import TensorArray from ray.data.row import TableRow from ray.data.impl.table_block import ( TableBlockAccessor, @@ -79,6 +78,8 @@ def _table_from_pydict(self, columns: Dict[str, List[Any]]) -> "pandas.DataFrame pandas = lazy_import_pandas() for key, value in columns.items(): if key == VALUE_COL_NAME or isinstance(next(iter(value), None), np.ndarray): + from ray.data.extensions.tensor_extension import TensorArray + if len(value) == 1: value = value[0] columns[key] = TensorArray(value) From 72f3dc10bd4a64005707450466cb6ef592fa6afb Mon Sep 17 00:00:00 2001 From: Clark Zinzow Date: Thu, 19 May 2022 21:10:40 +0000 Subject: [PATCH 10/10] Fix random_sample() to support ndarrays. --- python/ray/data/dataset.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index a2dbe8b1bc4b..92b1044a917b 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -669,6 +669,8 @@ def process_batch(batch): ) if isinstance(batch, pd.DataFrame): return batch.sample(frac=fraction) + if isinstance(batch, np.ndarray): + return np.array([row for row in batch if random.random() <= fraction]) raise ValueError(f"Unsupported batch type: {type(batch)}") return self.map_batches(process_batch)
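For illustration, a minimal usage sketch of what the final commit enables: ``random_sample()`` on a
pure-tensor dataset, whose batches reach the sampling UDF as NumPy ndarrays. This sketch is not part
of the patch and assumes only the ``range_tensor()``, ``random_sample()``, and ``take()`` APIs
already shown above.

.. code-block:: python

    import ray

    # Pure-tensor dataset; its blocks are single-tensor-column Arrow tables, so
    # batch-based operations such as random_sample() receive ndarray batches.
    ds = ray.data.range_tensor(100, shape=(2, 2))

    # Keep each row with probability ~0.5, so the sampled size is approximate.
    sampled = ds.random_sample(0.5)
    assert all(arr.shape == (2, 2) for arr in sampled.take())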