[AIR - Datasets] Cast tensor extension type to opaque object dtype in .to_pandas(), .to_dask(), etc. #29417

Merged
8 changes: 7 additions & 1 deletion python/ray/data/_internal/arrow_block.py
@@ -189,7 +189,13 @@ def schema(self) -> "pyarrow.lib.Schema":
         return self._table.schema
 
     def to_pandas(self) -> "pandas.DataFrame":
-        return self._table.to_pandas()
+        from ray.air.util.data_batch_conversion import _cast_tensor_columns_to_ndarrays
+
+        df = self._table.to_pandas()
+        ctx = DatasetContext.get_current()
+        if ctx.enable_tensor_extension_casting:
+            df = _cast_tensor_columns_to_ndarrays(df)
+        return df
 
     def to_numpy(
         self, columns: Optional[Union[str, List[str]]] = None
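To make the new behavior concrete, here is a minimal usage sketch distilled from the tests added further down in this PR (the column name and array shapes are illustrative; the `enable_tensor_extension_casting` flag and conversion APIs are taken from the diff and tests):

```python
import numpy as np
import pandas as pd
import ray
from ray.data.context import DatasetContext
from ray.data.extensions import TensorArray

# Build a Dataset backed by a tensor extension column.
in_df = pd.DataFrame({"image": TensorArray(np.zeros((3, 8, 8)))})
ds = ray.data.from_pandas(in_df)

# With casting enabled, to_pandas() downgrades the tensor extension column
# to an opaque object-dtype column holding plain ndarrays.
DatasetContext.get_current().enable_tensor_extension_casting = True
out_df = ds.to_pandas()
assert out_df["image"].dtype.type is np.object_
assert isinstance(out_df["image"][0], np.ndarray)
```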
35 changes: 29 additions & 6 deletions python/ray/data/dataset.py
@@ -2998,27 +2998,50 @@ def to_dask(
 
         @dask.delayed
         def block_to_df(block: Block):
-            block = BlockAccessor.for_block(block)
             if isinstance(block, (ray.ObjectRef, ClientObjectRef)):
                 raise ValueError(
                     "Dataset.to_dask() must be used with Dask-on-Ray, please "
                     "set the Dask scheduler to ray_dask_get (located in "
                     "ray.util.dask)."
                 )
-            return block.to_pandas()
+            return _block_to_df(block)
 
         if meta is None:
+            from ray.data.extensions import TensorDtype
+
             # Infer Dask metadata from Datasets schema.
             schema = self.schema(fetch_if_missing=True)
             if isinstance(schema, PandasBlockSchema):
                 meta = pd.DataFrame(
                     {
-                        col: pd.Series(dtype=dtype)
+                        col: pd.Series(
+                            dtype=(
+                                dtype
+                                if not isinstance(dtype, TensorDtype)
+                                else np.object_
+                            )
+                        )
                         for col, dtype in zip(schema.names, schema.types)
                     }
                 )
             elif pa is not None and isinstance(schema, pa.Schema):
-                meta = schema.empty_table().to_pandas()
+                from ray.data.extensions import ArrowTensorType
+
+                if any(isinstance(type_, ArrowTensorType) for type_ in schema.types):
+                    meta = pd.DataFrame(
+                        {
+                            col: pd.Series(
+                                dtype=(
+                                    dtype.to_pandas_dtype()
+                                    if not isinstance(dtype, ArrowTensorType)
+                                    else np.object_
+                                )
+                            )
+                            for col, dtype in zip(schema.names, schema.types)
+                        }
+                    )
+                else:
+                    meta = schema.empty_table().to_pandas()
 
         ddf = dd.from_delayed(
             [block_to_df(block) for block in self.get_internal_block_refs()],
@@ -3112,7 +3135,6 @@ def to_pandas(self, limit: int = 100000) -> "pandas.DataFrame":
             A Pandas DataFrame created from this dataset, containing a limited
             number of records.
         """
-
        count = self.count()
         if count > limit:
             raise ValueError(
@@ -3125,7 +3147,8 @@ def to_pandas(self, limit: int = 100000) -> "pandas.DataFrame":
         output = DelegatingBlockBuilder()
         for block in blocks:
             output.add_block(ray.get(block))
-        return BlockAccessor.for_block(output.build()).to_pandas()
+        block = output.build()
+        return _block_to_df(block)
 
     def to_pandas_refs(self) -> List[ObjectRef["pandas.DataFrame"]]:
         """Convert this dataset into a distributed set of Pandas dataframes.
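The meta-inference branch above is the subtle part: Dask checks computed partitions against the advertised metadata, so tensor extension dtypes must be mapped to `np.object_` up front to agree with what the delayed `block_to_df()` tasks now produce. A standalone sketch of that mapping, with hypothetical helper names that are not Ray API:

```python
import numpy as np
import pandas as pd

def infer_dask_meta(names, types, is_tensor_dtype):
    # Hypothetical distillation of the logic above: any tensor extension
    # dtype is advertised to Dask as an opaque object column, matching the
    # object-dtype ndarray columns the computed partitions will contain.
    return pd.DataFrame(
        {
            col: pd.Series(dtype=np.object_ if is_tensor_dtype(dtype) else dtype)
            for col, dtype in zip(names, types)
        }
    )

# Example: a float column stays float, a tensor column becomes object dtype.
meta = infer_dask_meta(
    ["score", "image"],
    [np.float64, "tensor"],
    is_tensor_dtype=lambda t: t == "tensor",
)
assert meta["image"].dtype.type is np.object_
```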
32 changes: 9 additions & 23 deletions python/ray/data/tests/test_batch_mapper.py
@@ -8,7 +8,6 @@
 
 import ray
 from ray.data.preprocessors import BatchMapper
-from ray.data.extensions import TensorArray
 from ray.air.constants import TENSOR_COLUMN_NAME
 from ray.air.util.tensor_extensions.arrow import ArrowTensorArray
 
@@ -183,9 +182,11 @@ def add_and_modify_udf_numpy(data: Union[np.ndarray, Dict[str, np.ndarray]]):
             ),
             pd.DataFrame(
                 {
-                    # Single column pandas automatically converts `TENSOR_COLUMN_NAME`
-                    # In UDFs
-                    TENSOR_COLUMN_NAME: TensorArray(np.arange(1, 13).reshape((3, 2, 2)))
+                    TENSOR_COLUMN_NAME: [
+                        [[1, 2], [3, 4]],
+                        [[5, 6], [7, 8]],
+                        [[9, 10], [11, 12]],
+                    ]
                 }
             ),
         ),
@@ -257,7 +258,7 @@ def add_and_modify_udf_numpy(data: Union[np.ndarray, Dict[str, np.ndarray]]):
 
 
 @pytest.mark.parametrize(
-    "ds_with_expected_pandas_numpy_df",
+    "ds,expected_df",
     [
         (
             ds_numpy_single_column_tensor_format(),
@@ -272,13 +273,6 @@ def add_and_modify_udf_numpy(data: Union[np.ndarray, Dict[str, np.ndarray]]):
                         ]
                     }
                 ),
-                pd.DataFrame(
-                    {
-                        # Single column pandas automatically converts `TENSOR_COLUMN_NAME`
-                        # In UDFs
-                        TENSOR_COLUMN_NAME: TensorArray(np.arange(1, 13).reshape((3, 2, 2)))
-                    }
-                ),
             ),
             (
                 ds_numpy_list_of_ndarray_tensor_format(),
@@ -296,24 +290,16 @@ def add_and_modify_udf_numpy(data: Union[np.ndarray, Dict[str, np.ndarray]]):
                         ]
                     }
                 ),
-                pd.DataFrame(
-                    {
-                        # Single column pandas automatically converts `TENSOR_COLUMN_NAME`
-                        # In UDFs
-                        TENSOR_COLUMN_NAME: TensorArray(np.arange(1, 25).reshape((6, 2, 2)))
-                    }
-                ),
             ),
         ),
     ],
 )
-def test_batch_mapper_numpy_data_format(ds_with_expected_pandas_numpy_df):
+def test_batch_mapper_numpy_data_format(ds, expected_df):
     """Tests batch mapper functionality for numpy data format.
 
     Note:
         For single column pandas dataframes, we automatically convert it to
         single column tensor with column name as `__value__`.
     """
-    ds, expected_df, expected_numpy_df = ds_with_expected_pandas_numpy_df
 
     def add_and_modify_udf_pandas(df: "pd.DataFrame"):
         col_name = list(df.columns)[0]
@@ -331,7 +317,7 @@ def add_and_modify_udf_numpy(data: Union[np.ndarray, Dict[str, np.ndarray]]):
 
     transformed_ds = ds.map_batches(add_and_modify_udf_numpy, batch_format="numpy")
     out_df_map_batches = transformed_ds.to_pandas()
-    assert_frame_equal(out_df_map_batches, expected_numpy_df)
+    assert_frame_equal(out_df_map_batches, expected_df)
 
     # Test BatchMapper
     batch_mapper = BatchMapper(fn=add_and_modify_udf_pandas, batch_format="pandas")
@@ -344,7 +330,7 @@ def add_and_modify_udf_numpy(data: Union[np.ndarray, Dict[str, np.ndarray]]):
     batch_mapper.fit(ds)
     transformed_ds = batch_mapper.transform(ds)
     out_df = transformed_ds.to_pandas()
-    assert_frame_equal(out_df, expected_numpy_df)
+    assert_frame_equal(out_df, expected_df)
 
 
 if __name__ == "__main__":
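For readers unfamiliar with the preprocessor under test, a minimal usage sketch mirroring the call sequence exercised above (the dataset and UDF are illustrative; `BatchMapper(fn=..., batch_format="pandas")`, `fit`, and `transform` are taken from the test, and the `__value__` single-column convention from its docstring):

```python
import numpy as np
import pandas as pd
import ray
from ray.data.preprocessors import BatchMapper

# A single-tensor-column dataset: pandas batches arrive as a DataFrame whose
# one column is named __value__ (TENSOR_COLUMN_NAME).
ds = ray.data.from_numpy(np.arange(12).reshape((3, 2, 2)))

def add_one(df: pd.DataFrame) -> pd.DataFrame:
    col = list(df.columns)[0]
    df[col] = df[col] + 1
    return df

batch_mapper = BatchMapper(fn=add_one, batch_format="pandas")
batch_mapper.fit(ds)  # stateless, but mirrors the call sequence in the test
out_df = batch_mapper.transform(ds).to_pandas()
```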
41 changes: 41 additions & 0 deletions python/ray/data/tests/test_dataset.py
@@ -2740,6 +2740,47 @@ def test_to_dask(ray_start_regular_shared, ds_format):
     assert df.equals(ddf.compute())
 
 
+def test_to_dask_tensor_column_cast_pandas(ray_start_regular_shared):
+    # Check that tensor column casting occurs when converting a Dataset to a Dask
+    # DataFrame.
+    data = np.arange(12).reshape((3, 2, 2))
+    ctx = ray.data.context.DatasetContext.get_current()
+    original = ctx.enable_tensor_extension_casting
+    try:
+        ctx.enable_tensor_extension_casting = True
+        in_df = pd.DataFrame({"a": TensorArray(data)})
+        ds = ray.data.from_pandas(in_df)
+        dtypes = ds.schema().types
+        assert len(dtypes) == 1
+        assert isinstance(dtypes[0], TensorDtype)
+        out_df = ds.to_dask().compute()
+        assert out_df["a"].dtype.type is np.object_
+        expected_df = pd.DataFrame({"a": list(data)})
+        pd.testing.assert_frame_equal(out_df, expected_df)
+    finally:
+        ctx.enable_tensor_extension_casting = original
+
+
+def test_to_dask_tensor_column_cast_arrow(ray_start_regular_shared):
+    # Check that tensor column casting occurs when converting a Dataset to a Dask
+    # DataFrame.
+    data = np.arange(12).reshape((3, 2, 2))
+    ctx = ray.data.context.DatasetContext.get_current()
+    original = ctx.enable_tensor_extension_casting
+    try:
+        ctx.enable_tensor_extension_casting = True
+        in_table = pa.table({"a": ArrowTensorArray.from_numpy(data)})
+        ds = ray.data.from_arrow(in_table)
+        dtype = ds.schema().field(0).type
+        assert isinstance(dtype, ArrowTensorType)
+        out_df = ds.to_dask().compute()
+        assert out_df["a"].dtype.type is np.object_
+        expected_df = pd.DataFrame({"a": list(data)})
+        pd.testing.assert_frame_equal(out_df, expected_df)
+    finally:
+        ctx.enable_tensor_extension_casting = original
+
+
 def test_from_modin(ray_start_regular_shared):
     import modin.pandas as mopd
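The try/finally dance around the context flag repeats in every test in this PR; if it spreads further, a tiny context manager (a hypothetical test helper, not part of Ray) would keep the flag from leaking state between tests:

```python
import contextlib
from ray.data.context import DatasetContext

@contextlib.contextmanager
def tensor_extension_casting(enabled: bool):
    # Scope enable_tensor_extension_casting exactly the way the try/finally
    # blocks above do, restoring the prior value on exit.
    ctx = DatasetContext.get_current()
    original = ctx.enable_tensor_extension_casting
    ctx.enable_tensor_extension_casting = enabled
    try:
        yield ctx
    finally:
        ctx.enable_tensor_extension_casting = original

# Usage:
# with tensor_extension_casting(True):
#     out_df = ds.to_dask().compute()
```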
5 changes: 4 additions & 1 deletion python/ray/data/tests/test_dataset_image.py
@@ -93,7 +93,10 @@ def test_partitioning(
 
         df = ds.to_pandas()
         assert sorted(df["label"]) == ["cat", "cat", "dog"]
-        assert all(tensor.numpy_shape == (32, 32, 3) for tensor in df["image"])
+        if enable_automatic_tensor_extension_cast:
+            assert all(tensor.shape == (32, 32, 3) for tensor in df["image"])
+        else:
+            assert all(tensor.numpy_shape == (32, 32, 3) for tensor in df["image"])
 
     def test_e2e_prediction(self, ray_start_regular_shared):
         from ray.train.torch import TorchCheckpoint, TorchPredictor
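The branch exists because the two modes hand back different element types: with casting enabled the column holds plain `np.ndarray`s (which have `.shape`), while without it the elements are tensor extension scalars exposing `numpy_shape`. A quick sketch of the distinction; the `TensorArrayElement` behavior (`numpy_shape` and `__array__` conversion) is assumed from the assertion above:

```python
import numpy as np
import pandas as pd
from ray.data.extensions import TensorArray

col = pd.Series(TensorArray(np.zeros((2, 32, 32, 3))))
elem = col[0]
# Without casting: a tensor extension element, inspected via numpy_shape.
assert elem.numpy_shape == (32, 32, 3)
# The cast converts such elements to plain ndarrays, inspected via shape.
assert np.asarray(elem).shape == (32, 32, 3)
```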
48 changes: 48 additions & 0 deletions python/ray/data/tests/test_dataset_pandas.py
@@ -1,9 +1,16 @@
 import pytest
 import pandas as pd
 import pyarrow as pa
+import numpy as np
 
 import ray
 
+from ray.data.extensions import (
+    TensorDtype,
+    TensorArray,
+    ArrowTensorType,
+    ArrowTensorArray,
+)
 from ray.data.tests.conftest import *  # noqa
 from ray.data.tests.mock_http_server import *  # noqa
 from ray.tests.conftest import *  # noqa
@@ -97,6 +104,47 @@ def test_pandas_roundtrip(ray_start_regular_shared, tmp_path):
     assert pd.concat([df1, df2], ignore_index=True).equals(dfds)
 
 
+def test_to_pandas_tensor_column_cast_pandas(ray_start_regular_shared):
+    # Check that tensor column casting occurs when converting a Dataset to a Pandas
+    # DataFrame.
+    data = np.arange(12).reshape((3, 2, 2))
+    ctx = ray.data.context.DatasetContext.get_current()
+    original = ctx.enable_tensor_extension_casting
+    try:
+        ctx.enable_tensor_extension_casting = True
+        in_df = pd.DataFrame({"a": TensorArray(data)})
+        ds = ray.data.from_pandas(in_df)
+        dtypes = ds.schema().types
+        assert len(dtypes) == 1
+        assert isinstance(dtypes[0], TensorDtype)
+        out_df = ds.to_pandas()
+        assert out_df["a"].dtype.type is np.object_
+        expected_df = pd.DataFrame({"a": list(data)})
+        pd.testing.assert_frame_equal(out_df, expected_df)
+    finally:
+        ctx.enable_tensor_extension_casting = original
+
+
+def test_to_pandas_tensor_column_cast_arrow(ray_start_regular_shared):
+    # Check that tensor column casting occurs when converting a Dataset to a Pandas
+    # DataFrame.
+    data = np.arange(12).reshape((3, 2, 2))
+    ctx = ray.data.context.DatasetContext.get_current()
+    original = ctx.enable_tensor_extension_casting
+    try:
+        ctx.enable_tensor_extension_casting = True
+        in_table = pa.table({"a": ArrowTensorArray.from_numpy(data)})
+        ds = ray.data.from_arrow(in_table)
+        dtype = ds.schema().field(0).type
+        assert isinstance(dtype, ArrowTensorType)
+        out_df = ds.to_pandas()
+        assert out_df["a"].dtype.type is np.object_
+        expected_df = pd.DataFrame({"a": list(data)})
+        pd.testing.assert_frame_equal(out_df, expected_df)
+    finally:
+        ctx.enable_tensor_extension_casting = original
+
+
 def test_read_pandas_data_array_column(ray_start_regular_shared):
     df = pd.DataFrame(
         {
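As a counterpart to these tests, disabling the flag should leave the extension dtype intact. A hedged sketch of that inverse case, which is an assumption based on the flag's semantics rather than a test in this PR:

```python
import numpy as np
import pandas as pd
import ray
from ray.data.context import DatasetContext
from ray.data.extensions import TensorArray, TensorDtype

ctx = DatasetContext.get_current()
original = ctx.enable_tensor_extension_casting
try:
    ctx.enable_tensor_extension_casting = False
    in_df = pd.DataFrame({"a": TensorArray(np.arange(12).reshape((3, 2, 2)))})
    ds = ray.data.from_pandas(in_df)
    out_df = ds.to_pandas()
    # No casting: the tensor extension dtype round-trips unchanged.
    assert isinstance(out_df["a"].dtype, TensorDtype)
finally:
    ctx.enable_tensor_extension_casting = original
```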