[CHORE] Remove user-facing arguments for casting to Ray's tensor type #2802

Merged · 5 commits · Sep 7, 2024
16 changes: 9 additions & 7 deletions daft/dataframe/dataframe.py
@@ -2460,14 +2460,11 @@ def __contains__(self, col_name: str) -> bool:
return col_name in self.column_names

@DataframePublicAPI
def to_pandas(
self, cast_tensors_to_ray_tensor_dtype: bool = False, coerce_temporal_nanoseconds: bool = False
) -> "pandas.DataFrame":
def to_pandas(self, coerce_temporal_nanoseconds: bool = False) -> "pandas.DataFrame":
"""Converts the current DataFrame to a `pandas DataFrame <https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html>`__.
If results have not computed yet, collect will be called.

Args:
cast_tensors_to_ray_tensor_dtype (bool): Whether to cast tensors to Ray tensor dtype. Defaults to False.
coerce_temporal_nanoseconds (bool): Whether to coerce temporal columns to nanoseconds. Only applicable to pandas version >= 2.0 and pyarrow version >= 13.0.0. Defaults to False. See `pyarrow.Table.to_pandas <https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.to_pandas>`__ for more information.

Returns:
@@ -2482,13 +2479,12 @@ def to_pandas(

pd_df = result.to_pandas(
schema=self._builder.schema(),
cast_tensors_to_ray_tensor_dtype=cast_tensors_to_ray_tensor_dtype,
coerce_temporal_nanoseconds=coerce_temporal_nanoseconds,
)
return pd_df

@DataframePublicAPI
def to_arrow(self, cast_tensors_to_ray_tensor_dtype: bool = False) -> "pyarrow.Table":
def to_arrow(self) -> "pyarrow.Table":
"""Converts the current DataFrame to a `pyarrow Table <https://arrow.apache.org/docs/python/generated/pyarrow.Table.html>`__.
If results have not computed yet, collect will be called.

@@ -2498,11 +2494,17 @@ def to_arrow(self, cast_tensors_to_ray_tensor_dtype: bool = False) -> "pyarrow.T
.. NOTE::
This call is **blocking** and will execute the DataFrame when called
"""
for name in self.schema().column_names():
if self.schema()[name].dtype._is_python_type():
raise ValueError(
f"Cannot convert column {name} to Arrow type, found Python type: {self.schema()[name].dtype}"
)

self.collect()
result = self._result
assert result is not None

return result.to_arrow(cast_tensors_to_ray_tensor_dtype)
return result.to_arrow()

@DataframePublicAPI
def to_pydict(self) -> Dict[str, List[Any]]:
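For context on the new guard in to_arrow, a minimal sketch of the user-facing behaviour; the column names, the lambda, and the use of Expression.apply with DataType.python() are illustrative assumptions, not part of this diff:

import daft

df = daft.from_pydict({"x": [1, 2, 3]})
# A Python-object column has no Arrow representation.
df = df.with_column(
    "obj", df["x"].apply(lambda v: {"raw": v}, return_dtype=daft.DataType.python())
)

df.to_pandas()  # still works: Python-typed columns fall back to Python lists
df.to_arrow()   # now raises ValueError: "Cannot convert column obj to Arrow type, ..."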
4 changes: 2 additions & 2 deletions daft/datatype.py
@@ -462,8 +462,8 @@ def from_numpy_dtype(cls, np_type: np.dtype) -> DataType:
arrow_type = pa.from_numpy_dtype(np_type)
return cls.from_arrow_type(arrow_type)

def to_arrow_dtype(self, cast_tensor_to_ray_type: builtins.bool = False) -> pa.DataType:
return self._dtype.to_arrow(cast_tensor_to_ray_type)
def to_arrow_dtype(self) -> pa.DataType:
return self._dtype.to_arrow()

@classmethod
def python(cls) -> DataType:
6 changes: 2 additions & 4 deletions daft/runners/partitioning.py
@@ -216,19 +216,17 @@ def to_pydict(self) -> dict[str, list[Any]]:
def to_pandas(
self,
schema: Schema | None = None,
cast_tensors_to_ray_tensor_dtype: bool = False,
coerce_temporal_nanoseconds: bool = False,
) -> pd.DataFrame:
merged_partition = self._get_merged_micropartition()
return merged_partition.to_pandas(
schema=schema,
cast_tensors_to_ray_tensor_dtype=cast_tensors_to_ray_tensor_dtype,
coerce_temporal_nanoseconds=coerce_temporal_nanoseconds,
)

def to_arrow(self, cast_tensors_to_ray_tensor_dtype: bool = False) -> pa.Table:
def to_arrow(self) -> pa.Table:
merged_partition = self._get_merged_micropartition()
return merged_partition.to_arrow(cast_tensors_to_ray_tensor_dtype)
return merged_partition.to_arrow()

def items(self) -> list[tuple[PartID, MaterializedResult[PartitionT]]]:
"""
37 changes: 36 additions & 1 deletion daft/runners/ray_runner.py
@@ -74,6 +74,12 @@

RAY_VERSION = tuple(int(s) for s in ray.__version__.split(".")[0:3])

_RAY_DATA_ARROW_TENSOR_TYPE_AVAILABLE = True
try:
from ray.data.extensions import ArrowTensorArray, ArrowTensorType
except ImportError:
_RAY_DATA_ARROW_TENSOR_TYPE_AVAILABLE = False

Codecov (codecov/patch) warning on daft/runners/ray_runner.py#L80-L81: added lines #L80 - L81 were not covered by tests.

@ray.remote
def _glob_path_into_file_infos(
@@ -95,7 +101,36 @@
@ray.remote
def _make_ray_block_from_micropartition(partition: MicroPartition) -> RayDatasetBlock:
try:
return partition.to_arrow(cast_tensors_to_ray_tensor_dtype=True)
daft_schema = partition.schema()
arrow_tbl = partition.to_arrow()

# Convert arrays to Ray Data's native ArrowTensorType arrays
new_arrs = {}
for idx, field in enumerate(arrow_tbl.schema):
if daft_schema[field.name].dtype._is_fixed_shape_tensor_type():
assert isinstance(field.type, pa.FixedShapeTensorType)
new_dtype = ArrowTensorType(field.type.shape, field.type.value_type)
arrow_arr = arrow_tbl[field.name].combine_chunks()
storage_arr = arrow_arr.storage
list_size = storage_arr.type.list_size
new_storage_arr = pa.ListArray.from_arrays(
pa.array(
list(range(0, (len(arrow_arr) + 1) * list_size, list_size)),
pa.int32(),
),
storage_arr.values,
)
new_arrs[idx] = (
field.name,
pa.ExtensionArray.from_storage(new_dtype, new_storage_arr),
)
elif daft_schema[field.name].dtype._is_tensor_type():
assert isinstance(field.type, pa.ExtensionType)
new_arrs[idx] = (field.name, ArrowTensorArray.from_numpy(partition.get_column(field.name).to_pylist()))
for idx, (field_name, arr) in new_arrs.items():
arrow_tbl = arrow_tbl.set_column(idx, pa.field(field_name, arr.type), arr)

return arrow_tbl
except pa.ArrowInvalid:
return partition.to_pylist()

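As a sanity check on the offsets arithmetic above, a standalone sketch assuming a fixed-shape (2, 2) tensor column with 3 rows (so list_size = 4); the values are illustrative, not taken from the PR:

import pyarrow as pa

num_rows, list_size = 3, 4                 # 3 tensors of shape (2, 2)
values = pa.array(range(num_rows * list_size), pa.int64())
offsets = pa.array(
    list(range(0, (num_rows + 1) * list_size, list_size)),  # [0, 4, 8, 12]
    pa.int32(),
)
list_storage = pa.ListArray.from_arrays(offsets, values)
# Ray Data's ArrowTensorType((2, 2), pa.int64()) then wraps list_storage via
# pa.ExtensionArray.from_storage, as done in _make_ray_block_from_micropartition.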
38 changes: 9 additions & 29 deletions daft/series.py
@@ -12,7 +12,6 @@
_RAY_DATA_EXTENSIONS_AVAILABLE = True
try:
from ray.data.extensions import (
ArrowTensorArray,
ArrowTensorType,
ArrowVariableShapedTensorType,
)
@@ -242,40 +241,21 @@ def rename(self, name: str) -> Series:
def datatype(self) -> DataType:
return DataType._from_pydatatype(self._series.data_type())

def to_arrow(self, cast_tensors_to_ray_tensor_dtype: bool = False) -> pa.Array:
def to_arrow(self) -> pa.Array:
"""
Convert this Series to an pyarrow array.
"""
dtype = self.datatype()
if cast_tensors_to_ray_tensor_dtype and (dtype._is_tensor_type() or dtype._is_fixed_shape_tensor_type()):
if not _RAY_DATA_EXTENSIONS_AVAILABLE:
raise ValueError("Trying to convert tensors to Ray tensor dtypes, but Ray is not installed.")
pyarrow_dtype = dtype.to_arrow_dtype(cast_tensor_to_ray_type=True)
if isinstance(pyarrow_dtype, ArrowTensorType):
assert dtype._is_fixed_shape_tensor_type()
arrow_series = self._series.to_arrow()
storage = arrow_series.storage
list_size = storage.type.list_size
storage = pa.ListArray.from_arrays(
pa.array(
list(range(0, (len(arrow_series) + 1) * list_size, list_size)),
pa.int32(),
),
storage.values,
)
return pa.ExtensionArray.from_storage(pyarrow_dtype, storage)
else:
# Variable-shaped tensor columns can't be converted directly to Ray's variable-shaped tensor extension
# type since it expects all tensor elements to have the same number of dimensions, which Daft does not enforce.
# TODO(Clark): Convert directly to Ray's variable-shaped tensor extension type when all tensor
# elements have the same number of dimensions, without going through pylist roundtrip.
return ArrowTensorArray.from_numpy(self.to_pylist())
Contributor Author: I omitted this logic in this refactor because I have no idea what this is doing. Also there aren't any tests to help me understand so 🤷

Contributor Author: Actually, added this back in to pass tests.

elif dtype._is_fixed_shape_tensor_type() and pyarrow_supports_fixed_shape_tensor():
pyarrow_dtype = dtype.to_arrow_dtype(cast_tensor_to_ray_type=False)
arrow_arr = self._series.to_arrow()

# Special-case for PyArrow FixedShapeTensor if it is supported by the version of PyArrow
# TODO: Push this down into self._series.to_arrow()?
if dtype._is_fixed_shape_tensor_type() and pyarrow_supports_fixed_shape_tensor():
pyarrow_dtype = dtype.to_arrow_dtype()
arrow_series = self._series.to_arrow()
return pa.ExtensionArray.from_storage(pyarrow_dtype, arrow_series.storage)
else:
return self._series.to_arrow()

return arrow_arr

def to_pylist(self) -> list:
"""
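For reference, a minimal sketch of the PyArrow fixed-shape tensor path taken above, assuming pyarrow >= 12 (which introduced pa.fixed_shape_tensor); the shape and values are illustrative:

import pyarrow as pa

tensor_type = pa.fixed_shape_tensor(pa.int64(), (2, 2))
# Storage for a fixed-shape tensor array is a fixed-size list array.
storage = pa.array([[1, 2, 3, 4], [5, 6, 7, 8]], pa.list_(pa.int64(), 4))
ext_arr = pa.ExtensionArray.from_storage(tensor_type, storage)
# ext_arr is an extension array backed by fixed_size_list<int64>[4] storage,
# mirroring what Series.to_arrow() returns for fixed-shape tensor columns.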
8 changes: 2 additions & 6 deletions daft/table/micropartition.py
@@ -139,10 +139,8 @@ def slice(self, start: int, end: int) -> MicroPartition:
def to_table(self) -> Table:
return Table._from_pytable(self._micropartition.to_table())

def to_arrow(self, cast_tensors_to_ray_tensor_dtype: bool = False, convert_large_arrays: bool = False) -> pa.Table:
return self.to_table().to_arrow(
cast_tensors_to_ray_tensor_dtype=cast_tensors_to_ray_tensor_dtype, convert_large_arrays=convert_large_arrays
)
def to_arrow(self) -> pa.Table:
return self.to_table().to_arrow()

def to_pydict(self) -> dict[str, list]:
return self.to_table().to_pydict()
@@ -153,12 +151,10 @@ def to_pylist(self) -> list[dict[str, Any]]:
def to_pandas(
self,
schema: Schema | None = None,
cast_tensors_to_ray_tensor_dtype: bool = False,
coerce_temporal_nanoseconds: bool = False,
) -> pd.DataFrame:
return self.to_table().to_pandas(
schema=schema,
cast_tensors_to_ray_tensor_dtype=cast_tensors_to_ray_tensor_dtype,
coerce_temporal_nanoseconds=coerce_temporal_nanoseconds,
)

75 changes: 9 additions & 66 deletions daft/table/table.py
@@ -29,20 +29,13 @@
from daft.logical.schema import Schema
from daft.series import Series, item_to_series

_NUMPY_AVAILABLE = True
try:
import numpy as np
except ImportError:
_NUMPY_AVAILABLE = False

_PANDAS_AVAILABLE = True
try:
import pandas as pd
except ImportError:
_PANDAS_AVAILABLE = False

if TYPE_CHECKING:
import numpy as np
import pandas as pd
import pyarrow as pa

@@ -176,36 +169,9 @@ def to_table(self) -> Table:
"""For compatibility with MicroPartition"""
return self

def to_arrow(self, cast_tensors_to_ray_tensor_dtype: bool = False, convert_large_arrays: bool = False) -> pa.Table:
python_fields = set()
tensor_fields = set()
for field in self.schema():
if field.dtype._is_python_type():
python_fields.add(field.name)
elif field.dtype._is_tensor_type() or field.dtype._is_fixed_shape_tensor_type():
tensor_fields.add(field.name)
if python_fields or tensor_fields:
table = {}
for colname in self.column_names():
column_series = self.get_column(colname)
if colname in python_fields:
column = column_series.to_pylist()
else:
column = column_series.to_arrow(cast_tensors_to_ray_tensor_dtype)
table[colname] = column

tab = pa.Table.from_pydict(table)
else:
tab = pa.Table.from_batches([self._table.to_arrow_record_batch()])

if not convert_large_arrays:
return tab

new_columns = []
for col in tab.columns:
new_columns.append(_trim_pyarrow_large_arrays(col))

return pa.Table.from_arrays(new_columns, names=tab.column_names)
def to_arrow(self) -> pa.Table:
tab = pa.Table.from_pydict({colname: self.get_column(colname).to_arrow() for colname in self.column_names()})
return tab

def to_pydict(self) -> dict[str, list]:
return {colname: self.get_column(colname).to_pylist() for colname in self.column_names()}
@@ -220,30 +186,31 @@ def to_pylist(self) -> list[dict[str, Any]]:
def to_pandas(
self,
schema: Schema | None = None,
cast_tensors_to_ray_tensor_dtype: bool = False,
coerce_temporal_nanoseconds: bool = False,
) -> pd.DataFrame:
from packaging.version import parse

if not _PANDAS_AVAILABLE:
raise ImportError("Unable to import Pandas - please ensure that it is installed.")

python_fields = set()
tensor_fields = set()
for field in self.schema():
if field.dtype._is_python_type():
python_fields.add(field.name)
elif field.dtype._is_tensor_type() or field.dtype._is_fixed_shape_tensor_type():
tensor_fields.add(field.name)

if python_fields or tensor_fields:
# Use Python list representation for Python typed columns.
table = {}
for colname in self.column_names():
column_series = self.get_column(colname)
if colname in python_fields or (colname in tensor_fields and not cast_tensors_to_ray_tensor_dtype):
# Use Python list representation for Python typed columns or tensor columns (return as numpy)
if colname in python_fields or colname in tensor_fields:
column = column_series.to_pylist()
else:
# Arrow-native field, so provide column as Arrow array.
column_arrow = column_series.to_arrow(cast_tensors_to_ray_tensor_dtype)
column_arrow = column_series.to_arrow()
if parse(pa.__version__) < parse("13.0.0"):
column = column_arrow.to_pandas()
else:
Expand All @@ -252,7 +219,7 @@ def to_pandas(

return pd.DataFrame.from_dict(table)
else:
arrow_table = self.to_arrow(cast_tensors_to_ray_tensor_dtype)
arrow_table = self.to_arrow()
if parse(pa.__version__) < parse("13.0.0"):
return arrow_table.to_pandas()
else:
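For reference, a minimal sketch of the version gate used in to_pandas above (the table and values are illustrative); pyarrow 13.0.0 added the coerce_temporal_nanoseconds flag to to_pandas():

import pyarrow as pa
from packaging.version import parse

tbl = pa.table({"ts": pa.array([1, 2, 3], pa.timestamp("us"))})
if parse(pa.__version__) < parse("13.0.0"):
    pdf = tbl.to_pandas()  # flag not available on older pyarrow
else:
    pdf = tbl.to_pandas(coerce_temporal_nanoseconds=True)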
@@ -559,30 +526,6 @@ def read_json(
)


def _trim_pyarrow_large_arrays(arr: pa.ChunkedArray) -> pa.ChunkedArray:
if pa.types.is_large_binary(arr.type) or pa.types.is_large_string(arr.type):
if pa.types.is_large_binary(arr.type):
target_type = pa.binary()
else:
target_type = pa.string()

all_chunks = []
for chunk in arr.chunks:
if len(chunk) == 0:
continue
offsets = np.frombuffer(chunk.buffers()[1], dtype=np.int64)
if offsets[-1] < 2**31:
all_chunks.append(chunk.cast(target_type))
else:
raise ValueError(
f"Can not convert {arr.type} into {target_type} due to the offset array being too large: {offsets[-1]}. Maximum: {2**31}"
)

return pa.chunked_array(all_chunks, type=target_type)
else:
return arr


def read_parquet_into_pyarrow(
path: str,
columns: list[str] | None = None,