diff --git a/daft/dataframe/dataframe.py b/daft/dataframe/dataframe.py
index 418b03bd97..d9ed72e1f5 100644
--- a/daft/dataframe/dataframe.py
+++ b/daft/dataframe/dataframe.py
@@ -2460,14 +2460,11 @@ def __contains__(self, col_name: str) -> bool:
         return col_name in self.column_names
 
     @DataframePublicAPI
-    def to_pandas(
-        self, cast_tensors_to_ray_tensor_dtype: bool = False, coerce_temporal_nanoseconds: bool = False
-    ) -> "pandas.DataFrame":
+    def to_pandas(self, coerce_temporal_nanoseconds: bool = False) -> "pandas.DataFrame":
         """Converts the current DataFrame to a `pandas DataFrame <https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html>`__.
 
         If results have not been computed yet, collect will be called.
 
         Args:
-            cast_tensors_to_ray_tensor_dtype (bool): Whether to cast tensors to Ray tensor dtype. Defaults to False.
             coerce_temporal_nanoseconds (bool): Whether to coerce temporal columns to nanoseconds. Only applicable to pandas version >= 2.0 and pyarrow version >= 13.0.0. Defaults to False. See `pyarrow.Table.to_pandas <https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.to_pandas>`__ for more information.
 
         Returns:
@@ -2482,13 +2479,12 @@ def to_pandas(
 
         pd_df = result.to_pandas(
             schema=self._builder.schema(),
-            cast_tensors_to_ray_tensor_dtype=cast_tensors_to_ray_tensor_dtype,
             coerce_temporal_nanoseconds=coerce_temporal_nanoseconds,
         )
         return pd_df
 
     @DataframePublicAPI
-    def to_arrow(self, cast_tensors_to_ray_tensor_dtype: bool = False) -> "pyarrow.Table":
+    def to_arrow(self) -> "pyarrow.Table":
         """Converts the current DataFrame to a `pyarrow Table <https://arrow.apache.org/docs/python/generated/pyarrow.Table.html>`__.
 
         If results have not been computed yet, collect will be called.
@@ -2498,11 +2494,17 @@ def to_arrow(self, cast_tensors_to_ray_tensor_dtype: bool = False) -> "pyarrow.T
         .. NOTE::
             This call is **blocking** and will execute the DataFrame when called
         """
+        for name in self.schema().column_names():
+            if self.schema()[name].dtype._is_python_type():
+                raise ValueError(
+                    f"Cannot convert column {name} to Arrow type, found Python type: {self.schema()[name].dtype}"
+                )
+
         self.collect()
         result = self._result
         assert result is not None
-        return result.to_arrow(cast_tensors_to_ray_tensor_dtype)
+        return result.to_arrow()
 
     @DataframePublicAPI
     def to_pydict(self) -> Dict[str, List[Any]]:
diff --git a/daft/datatype.py b/daft/datatype.py
index 741bf438d1..6e19c60c4a 100644
--- a/daft/datatype.py
+++ b/daft/datatype.py
@@ -462,8 +462,8 @@ def from_numpy_dtype(cls, np_type: np.dtype) -> DataType:
         arrow_type = pa.from_numpy_dtype(np_type)
         return cls.from_arrow_type(arrow_type)
 
-    def to_arrow_dtype(self, cast_tensor_to_ray_type: builtins.bool = False) -> pa.DataType:
-        return self._dtype.to_arrow(cast_tensor_to_ray_type)
+    def to_arrow_dtype(self) -> pa.DataType:
+        return self._dtype.to_arrow()
 
     @classmethod
     def python(cls) -> DataType:
diff --git a/daft/runners/partitioning.py b/daft/runners/partitioning.py
index 0cf68e4cbe..6f4269d27c 100644
--- a/daft/runners/partitioning.py
+++ b/daft/runners/partitioning.py
@@ -216,19 +216,17 @@ def to_pydict(self) -> dict[str, list[Any]]:
     def to_pandas(
         self,
         schema: Schema | None = None,
-        cast_tensors_to_ray_tensor_dtype: bool = False,
         coerce_temporal_nanoseconds: bool = False,
     ) -> pd.DataFrame:
         merged_partition = self._get_merged_micropartition()
         return merged_partition.to_pandas(
             schema=schema,
-            cast_tensors_to_ray_tensor_dtype=cast_tensors_to_ray_tensor_dtype,
             coerce_temporal_nanoseconds=coerce_temporal_nanoseconds,
         )
 
-    def to_arrow(self, cast_tensors_to_ray_tensor_dtype: bool = False) -> pa.Table:
+    def to_arrow(self) -> pa.Table:
         merged_partition = self._get_merged_micropartition()
-        return merged_partition.to_arrow(cast_tensors_to_ray_tensor_dtype)
+        return merged_partition.to_arrow()
 
     def items(self) -> list[tuple[PartID, MaterializedResult[PartitionT]]]:
         """
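Note: with the Ray-specific flag gone, `DataFrame.to_arrow()` now fails fast on Python-typed columns instead of silently degrading them to pylists. A minimal sketch of the new behavior (the `MyObj` class is a hypothetical stand-in for any value Arrow cannot represent):

```python
import daft
from daft import DataType

class MyObj:
    pass

# A column of arbitrary Python objects is inferred as the Python dtype...
df = daft.from_pydict({"obj": [MyObj(), MyObj()]})
assert df.schema()["obj"].dtype == DataType.python()

# ...so converting to Arrow now raises up front instead of returning a lossy column.
try:
    df.to_arrow()
except ValueError as err:
    print(err)  # Cannot convert column obj to Arrow type, found Python type: Python
```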
diff --git a/daft/runners/ray_runner.py b/daft/runners/ray_runner.py
index 9ef83195ae..30c0eeda91 100644
--- a/daft/runners/ray_runner.py
+++ b/daft/runners/ray_runner.py
@@ -74,6 +74,12 @@
 
 RAY_VERSION = tuple(int(s) for s in ray.__version__.split(".")[0:3])
 
+_RAY_DATA_ARROW_TENSOR_TYPE_AVAILABLE = True
+try:
+    from ray.data.extensions import ArrowTensorArray, ArrowTensorType
+except ImportError:
+    _RAY_DATA_ARROW_TENSOR_TYPE_AVAILABLE = False
+
 
 @ray.remote
 def _glob_path_into_file_infos(
@@ -95,7 +101,36 @@ def _glob_path_into_file_infos(
 @ray.remote
 def _make_ray_block_from_micropartition(partition: MicroPartition) -> RayDatasetBlock:
     try:
-        return partition.to_arrow(cast_tensors_to_ray_tensor_dtype=True)
+        daft_schema = partition.schema()
+        arrow_tbl = partition.to_arrow()
+
+        # Convert arrays to Ray Data's native ArrowTensorType arrays
+        new_arrs = {}
+        for idx, field in enumerate(arrow_tbl.schema):
+            if daft_schema[field.name].dtype._is_fixed_shape_tensor_type():
+                assert isinstance(field.type, pa.FixedShapeTensorType)
+                new_dtype = ArrowTensorType(field.type.shape, field.type.value_type)
+                arrow_arr = arrow_tbl[field.name].combine_chunks()
+                storage_arr = arrow_arr.storage
+                list_size = storage_arr.type.list_size
+                new_storage_arr = pa.ListArray.from_arrays(
+                    pa.array(
+                        list(range(0, (len(arrow_arr) + 1) * list_size, list_size)),
+                        pa.int32(),
+                    ),
+                    storage_arr.values,
+                )
+                new_arrs[idx] = (
+                    field.name,
+                    pa.ExtensionArray.from_storage(new_dtype, new_storage_arr),
+                )
+            elif daft_schema[field.name].dtype._is_tensor_type():
+                assert isinstance(field.type, pa.ExtensionType)
+                new_arrs[idx] = (field.name, ArrowTensorArray.from_numpy(partition.get_column(field.name).to_pylist()))
+        for idx, (field_name, arr) in new_arrs.items():
+            arrow_tbl = arrow_tbl.set_column(idx, pa.field(field_name, arr.type), arr)
+
+        return arrow_tbl
     except pa.ArrowInvalid:
         return partition.to_pylist()
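The Ray runner now rebuilds Ray's tensor extension arrays itself: a fixed-shape tensor column keeps its Arrow storage, which is re-wrapped as a variable-size `ListArray` because that is what Ray's `ArrowTensorType` uses underneath. A standalone sketch of that offsets trick using plain pyarrow (the shape and values here are illustrative):

```python
import pyarrow as pa

# Storage of a fixed-shape (2, 2) tensor column: fixed_size_list<int64>[4], 3 rows.
fixed = pa.FixedSizeListArray.from_arrays(pa.array(range(12), pa.int64()), 4)

# Ray's ArrowTensorType is backed by a list array with explicit offsets, so
# synthesize them: [0, 4, 8, 12] for list_size=4 and len(fixed)=3.
list_size = fixed.type.list_size
offsets = pa.array(range(0, (len(fixed) + 1) * list_size, list_size), pa.int32())
as_list = pa.ListArray.from_arrays(offsets, fixed.values)

print(as_list.type)  # list<item: int64>
# The runner then wraps this storage via
# pa.ExtensionArray.from_storage(ArrowTensorType(shape, value_type), as_list).
```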
""" dtype = self.datatype() - if cast_tensors_to_ray_tensor_dtype and (dtype._is_tensor_type() or dtype._is_fixed_shape_tensor_type()): - if not _RAY_DATA_EXTENSIONS_AVAILABLE: - raise ValueError("Trying to convert tensors to Ray tensor dtypes, but Ray is not installed.") - pyarrow_dtype = dtype.to_arrow_dtype(cast_tensor_to_ray_type=True) - if isinstance(pyarrow_dtype, ArrowTensorType): - assert dtype._is_fixed_shape_tensor_type() - arrow_series = self._series.to_arrow() - storage = arrow_series.storage - list_size = storage.type.list_size - storage = pa.ListArray.from_arrays( - pa.array( - list(range(0, (len(arrow_series) + 1) * list_size, list_size)), - pa.int32(), - ), - storage.values, - ) - return pa.ExtensionArray.from_storage(pyarrow_dtype, storage) - else: - # Variable-shaped tensor columns can't be converted directly to Ray's variable-shaped tensor extension - # type since it expects all tensor elements to have the same number of dimensions, which Daft does not enforce. - # TODO(Clark): Convert directly to Ray's variable-shaped tensor extension type when all tensor - # elements have the same number of dimensions, without going through pylist roundtrip. - return ArrowTensorArray.from_numpy(self.to_pylist()) - elif dtype._is_fixed_shape_tensor_type() and pyarrow_supports_fixed_shape_tensor(): - pyarrow_dtype = dtype.to_arrow_dtype(cast_tensor_to_ray_type=False) + arrow_arr = self._series.to_arrow() + + # Special-case for PyArrow FixedShapeTensor if it is supported by the version of PyArrow + # TODO: Push this down into self._series.to_arrow()? + if dtype._is_fixed_shape_tensor_type() and pyarrow_supports_fixed_shape_tensor(): + pyarrow_dtype = dtype.to_arrow_dtype() arrow_series = self._series.to_arrow() return pa.ExtensionArray.from_storage(pyarrow_dtype, arrow_series.storage) - else: - return self._series.to_arrow() + + return arrow_arr def to_pylist(self) -> list: """ diff --git a/daft/table/micropartition.py b/daft/table/micropartition.py index 66d1f8dd99..0646c8c7ef 100644 --- a/daft/table/micropartition.py +++ b/daft/table/micropartition.py @@ -139,10 +139,8 @@ def slice(self, start: int, end: int) -> MicroPartition: def to_table(self) -> Table: return Table._from_pytable(self._micropartition.to_table()) - def to_arrow(self, cast_tensors_to_ray_tensor_dtype: bool = False, convert_large_arrays: bool = False) -> pa.Table: - return self.to_table().to_arrow( - cast_tensors_to_ray_tensor_dtype=cast_tensors_to_ray_tensor_dtype, convert_large_arrays=convert_large_arrays - ) + def to_arrow(self) -> pa.Table: + return self.to_table().to_arrow() def to_pydict(self) -> dict[str, list]: return self.to_table().to_pydict() @@ -153,12 +151,10 @@ def to_pylist(self) -> list[dict[str, Any]]: def to_pandas( self, schema: Schema | None = None, - cast_tensors_to_ray_tensor_dtype: bool = False, coerce_temporal_nanoseconds: bool = False, ) -> pd.DataFrame: return self.to_table().to_pandas( schema=schema, - cast_tensors_to_ray_tensor_dtype=cast_tensors_to_ray_tensor_dtype, coerce_temporal_nanoseconds=coerce_temporal_nanoseconds, ) diff --git a/daft/table/table.py b/daft/table/table.py index c8f6d70dea..81b845ba5b 100644 --- a/daft/table/table.py +++ b/daft/table/table.py @@ -29,12 +29,6 @@ from daft.logical.schema import Schema from daft.series import Series, item_to_series -_NUMPY_AVAILABLE = True -try: - import numpy as np -except ImportError: - _NUMPY_AVAILABLE = False - _PANDAS_AVAILABLE = True try: import pandas as pd @@ -42,7 +36,6 @@ _PANDAS_AVAILABLE = False if TYPE_CHECKING: - import numpy 
diff --git a/daft/table/table.py b/daft/table/table.py
index c8f6d70dea..81b845ba5b 100644
--- a/daft/table/table.py
+++ b/daft/table/table.py
@@ -29,12 +29,6 @@
 from daft.logical.schema import Schema
 from daft.series import Series, item_to_series
 
-_NUMPY_AVAILABLE = True
-try:
-    import numpy as np
-except ImportError:
-    _NUMPY_AVAILABLE = False
-
 _PANDAS_AVAILABLE = True
 try:
     import pandas as pd
@@ -42,7 +36,6 @@
     _PANDAS_AVAILABLE = False
 
 if TYPE_CHECKING:
-    import numpy as np
     import pandas as pd
     import pyarrow as pa
 
@@ -176,36 +169,9 @@ def to_table(self) -> Table:
         """For compatibility with MicroPartition"""
         return self
 
-    def to_arrow(self, cast_tensors_to_ray_tensor_dtype: bool = False, convert_large_arrays: bool = False) -> pa.Table:
-        python_fields = set()
-        tensor_fields = set()
-        for field in self.schema():
-            if field.dtype._is_python_type():
-                python_fields.add(field.name)
-            elif field.dtype._is_tensor_type() or field.dtype._is_fixed_shape_tensor_type():
-                tensor_fields.add(field.name)
-        if python_fields or tensor_fields:
-            table = {}
-            for colname in self.column_names():
-                column_series = self.get_column(colname)
-                if colname in python_fields:
-                    column = column_series.to_pylist()
-                else:
-                    column = column_series.to_arrow(cast_tensors_to_ray_tensor_dtype)
-                table[colname] = column
-
-            tab = pa.Table.from_pydict(table)
-        else:
-            tab = pa.Table.from_batches([self._table.to_arrow_record_batch()])
-
-        if not convert_large_arrays:
-            return tab
-
-        new_columns = []
-        for col in tab.columns:
-            new_columns.append(_trim_pyarrow_large_arrays(col))
-
-        return pa.Table.from_arrays(new_columns, names=tab.column_names)
+    def to_arrow(self) -> pa.Table:
+        tab = pa.Table.from_pydict({colname: self.get_column(colname).to_arrow() for colname in self.column_names()})
+        return tab
 
     def to_pydict(self) -> dict[str, list]:
         return {colname: self.get_column(colname).to_pylist() for colname in self.column_names()}
@@ -220,13 +186,13 @@ def to_pylist(self) -> list[dict[str, Any]]:
     def to_pandas(
         self,
         schema: Schema | None = None,
-        cast_tensors_to_ray_tensor_dtype: bool = False,
         coerce_temporal_nanoseconds: bool = False,
     ) -> pd.DataFrame:
         from packaging.version import parse
 
         if not _PANDAS_AVAILABLE:
             raise ImportError("Unable to import Pandas - please ensure that it is installed.")
+
         python_fields = set()
         tensor_fields = set()
         for field in self.schema():
@@ -234,16 +200,17 @@ def to_pandas(
                 python_fields.add(field.name)
             elif field.dtype._is_tensor_type() or field.dtype._is_fixed_shape_tensor_type():
                 tensor_fields.add(field.name)
+
         if python_fields or tensor_fields:
-            # Use Python list representation for Python typed columns.
             table = {}
             for colname in self.column_names():
                 column_series = self.get_column(colname)
-                if colname in python_fields or (colname in tensor_fields and not cast_tensors_to_ray_tensor_dtype):
+                # Use Python list representation for Python typed columns or tensor columns (return as numpy)
+                if colname in python_fields or colname in tensor_fields:
                     column = column_series.to_pylist()
                 else:
                     # Arrow-native field, so provide column as Arrow array.
-                    column_arrow = column_series.to_arrow(cast_tensors_to_ray_tensor_dtype)
+                    column_arrow = column_series.to_arrow()
                     if parse(pa.__version__) < parse("13.0.0"):
                         column = column_arrow.to_pandas()
                     else:
@@ -252,7 +219,7 @@ def to_pandas(
 
             return pd.DataFrame.from_dict(table)
         else:
-            arrow_table = self.to_arrow(cast_tensors_to_ray_tensor_dtype)
+            arrow_table = self.to_arrow()
             if parse(pa.__version__) < parse("13.0.0"):
                 return arrow_table.to_pandas()
             else:
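On the pandas side, removing the flag means tensor columns always take the `to_pylist()` route and land in pandas as object-dtype columns of numpy arrays. A usage sketch (assuming Daft infers a tensor dtype for numpy-array elements, as the removed Ray-specific tests exercised):

```python
import daft
import numpy as np

df = daft.from_pydict({"t": [np.ones((2, 2)), np.zeros((2, 2))]})
pd_df = df.to_pandas()

# Tensor columns come back as plain numpy arrays in an object column,
# never as a Ray tensor extension dtype.
assert pd_df["t"].dtype == np.dtype("object")
assert isinstance(pd_df["t"].iloc[0], np.ndarray)
```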
@@ -559,30 +526,6 @@ def read_json(
     )
 
 
-def _trim_pyarrow_large_arrays(arr: pa.ChunkedArray) -> pa.ChunkedArray:
-    if pa.types.is_large_binary(arr.type) or pa.types.is_large_string(arr.type):
-        if pa.types.is_large_binary(arr.type):
-            target_type = pa.binary()
-        else:
-            target_type = pa.string()
-
-        all_chunks = []
-        for chunk in arr.chunks:
-            if len(chunk) == 0:
-                continue
-            offsets = np.frombuffer(chunk.buffers()[1], dtype=np.int64)
-            if offsets[-1] < 2**31:
-                all_chunks.append(chunk.cast(target_type))
-            else:
-                raise ValueError(
-                    f"Can not convert {arr.type} into {target_type} due to the offset array being too large: {offsets[-1]}. Maximum: {2**31}"
-                )
-
-        return pa.chunked_array(all_chunks, type=target_type)
-    else:
-        return arr
-
-
 def read_parquet_into_pyarrow(
     path: str,
     columns: list[str] | None = None,
diff --git a/src/daft-core/src/python/datatype.rs b/src/daft-core/src/python/datatype.rs
index c553a78eb0..e50608b05d 100644
--- a/src/daft-core/src/python/datatype.rs
+++ b/src/daft-core/src/python/datatype.rs
@@ -308,15 +308,10 @@ impl PyDataType {
         Ok(DataType::Python.into())
     }
 
-    pub fn to_arrow(
-        &self,
-        py: Python,
-        cast_tensor_type_for_ray: Option<bool>,
-    ) -> PyResult<PyObject> {
+    pub fn to_arrow(&self, py: Python) -> PyResult<PyObject> {
         let pyarrow = py.import(pyo3::intern!(py, "pyarrow"))?;
-        let cast_tensor_to_ray_type = cast_tensor_type_for_ray.unwrap_or(false);
-        match (&self.dtype, cast_tensor_to_ray_type) {
-            (DataType::FixedShapeTensor(dtype, shape), false) => Ok(
+        match &self.dtype {
+            DataType::FixedShapeTensor(dtype, shape) => Ok(
                 if py
                     .import(pyo3::intern!(py, "daft.utils"))?
                     .getattr(pyo3::intern!(py, "pyarrow_supports_fixed_shape_tensor"))?
@@ -329,7 +324,7 @@ impl PyDataType {
                         Self {
                             dtype: *dtype.clone(),
                         }
-                        .to_arrow(py, None)?,
+                        .to_arrow(py)?,
                         pyo3::types::PyTuple::new(py, shape.clone()),
                     ))?
                     .to_object(py)
@@ -339,18 +334,7 @@ impl PyDataType {
                     ffi::to_py_schema(&self.dtype.to_arrow()?, py, pyarrow)?
                 },
             ),
-            (DataType::FixedShapeTensor(dtype, shape), true) => Ok(py
-                .import(pyo3::intern!(py, "ray.data.extensions"))?
-                .getattr(pyo3::intern!(py, "ArrowTensorType"))?
-                .call1((
-                    pyo3::types::PyTuple::new(py, shape.clone()),
-                    Self {
-                        dtype: *dtype.clone(),
-                    }
-                    .to_arrow(py, None)?,
-                ))?
-                .to_object(py)),
-            (_, _) => ffi::to_py_schema(&self.dtype.to_arrow()?, py, pyarrow)?
+            _ => ffi::to_py_schema(&self.dtype.to_arrow()?, py, pyarrow)?
                 .getattr(py, pyo3::intern!(py, "type")),
         }
     }
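With the Rust `PyDataType::to_arrow` reduced to a single code path, the Python-facing `DataType.to_arrow_dtype()` is now argument-free. A small usage sketch (assumes pyarrow >= 12, so `daft.utils.pyarrow_supports_fixed_shape_tensor` is true and the canonical extension type is returned):

```python
import pyarrow as pa
from daft import DataType

dt = DataType.tensor(DataType.float64(), (2, 2))

# No more cast_tensor_to_ray_type switch: one call, one answer.
arrow_dtype = dt.to_arrow_dtype()
assert isinstance(arrow_dtype, pa.FixedShapeTensorType)
```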
diff --git a/tests/dataframe/test_creation.py b/tests/dataframe/test_creation.py
index 929c18ccf4..d8f76feff2 100644
--- a/tests/dataframe/test_creation.py
+++ b/tests/dataframe/test_creation.py
@@ -13,7 +13,6 @@
 import pyarrow as pa
 import pyarrow.parquet as papq
 import pytest
-from ray.data.extensions import ArrowTensorArray, TensorArray
 
 import daft
 from daft.api_annotations import APITypeError
@@ -189,25 +188,6 @@ def test_create_dataframe_arrow(valid_data: list[dict[str, float]], multiple) ->
         assert df.to_arrow() == expected
 
 
-def test_create_dataframe_arrow_tensor_ray(valid_data: list[dict[str, float]]) -> None:
-    pydict = {k: [item[k] for item in valid_data] for k in valid_data[0].keys()}
-    shape = (2, 2)
-    arr = np.ones((len(valid_data),) + shape)
-    ata = ArrowTensorArray.from_numpy(arr)
-    pydict["tensor"] = ata
-    t = pa.Table.from_pydict(pydict)
-    df = daft.from_arrow(t)
-    assert set(df.column_names) == set(t.column_names)
-    # Tensor type should be inferred.
-    expected_tensor_dtype = DataType.tensor(DataType.float64(), shape)
-    assert df.schema()["tensor"].dtype == expected_tensor_dtype
-    casted_variety = t.schema.field("variety").with_type(pa.large_string())
-    schema = t.schema.set(t.schema.get_field_index("variety"), casted_variety)
-    expected = t.cast(schema)
-    # Check roundtrip.
-    assert df.to_arrow(True) == expected
-
-
 @pytest.mark.skipif(
     not pyarrow_supports_fixed_shape_tensor(),
     reason=f"Arrow version {ARROW_VERSION} doesn't support the canonical tensor extension type.",
@@ -274,10 +254,10 @@ def test_create_dataframe_arrow_unsupported_dtype(valid_data: list[dict[str, flo
     assert set(df.column_names) == set(t.column_names)
     # Type not natively supported, so should have Python object dtype.
     assert df.schema()["obj"].dtype == DataType.python()
-    casted_field = t.schema.field("variety").with_type(pa.large_string())
-    expected = t.cast(t.schema.set(t.schema.get_field_index("variety"), casted_field))
-    # Check roundtrip.
-    assert df.to_arrow() == expected
+
+    # Assert that it raises an error when trying to convert back to arrow
+    with pytest.raises(ValueError):
+        df.to_arrow()
 
 
 ###
@@ -312,18 +292,6 @@ def test_create_dataframe_pandas_py_object(valid_data: list[dict[str, float]]) -
     pd.testing.assert_frame_equal(df.to_pandas(), pd_df)
 
 
-def test_create_dataframe_pandas_tensor(valid_data: list[dict[str, float]]) -> None:
-    pydict = {k: [item[k] for item in valid_data] for k in valid_data[0].keys()}
-    shape = (2, 2)
-    pydict["tensor"] = TensorArray(np.ones((len(valid_data),) + shape))
-    pd_df = pd.DataFrame(pydict)
-    df = daft.from_pandas(pd_df)
-    assert df.schema()["tensor"].dtype == DataType.tensor(DataType.float64(), shape)
-    assert set(df.column_names) == set(pd_df.columns)
-    # Check roundtrip.
-    pd.testing.assert_frame_equal(df.to_pandas(cast_tensors_to_ray_tensor_dtype=True), pd_df)
-
-
 @pytest.mark.parametrize(
     ["data", "expected_dtype"],
     [