From dcb37aa8ba5bcbb188ab9d502cd604369d8c5061 Mon Sep 17 00:00:00 2001 From: Clark Zinzow Date: Mon, 19 Sep 2022 21:55:59 +0000 Subject: [PATCH 1/2] Add metadata override and inference in Dataset.to_dask(). --- python/ray/data/dataset.py | 46 ++++++++++++++++++++++++--- python/ray/data/tests/test_dataset.py | 34 +++++++++++++++++++- 2 files changed, 75 insertions(+), 5 deletions(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index e214bca8585d..06e4aa153dba 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -2895,7 +2895,17 @@ def make_generator(): return dataset - def to_dask(self) -> "dask.DataFrame": + def to_dask( + self, + meta: Union[ + "pandas.DataFrame", + "pandas.Series", + Dict[str, Any], + Iterable[Any], + Tuple[Any], + None, + ] = None, + ) -> "dask.DataFrame": """Convert this dataset into a Dask DataFrame. This is only supported for datasets convertible to Arrow records. @@ -2905,12 +2915,25 @@ def to_dask(self) -> "dask.DataFrame": Time complexity: O(dataset size / parallelism) + Args: + meta: An empty pandas DataFrame or Series that matches the dtypes and column + names of the Dataset. By default, this will be inferred from the + underlying Dataset schema, with this argument supplying an optional + override. + Returns: A Dask DataFrame created from this dataset. """ import dask import dask.dataframe as dd + import pandas as pd + try: + import pyarrow as pa + except Exception: + pa = None + + from ray.data._internal.pandas_block import PandasBlockSchema from ray.util.client.common import ClientObjectRef from ray.util.dask import ray_dask_get @@ -2927,10 +2950,25 @@ def block_to_df(block: Block): ) return block.to_pandas() - # TODO(Clark): Give Dask a Pandas-esque schema via the Pyarrow schema, - # once that's implemented. + if meta is None: + # Infer Dask metadata from Datasets schema. + schema = self.schema(fetch_if_missing=True) + if isinstance(schema, PandasBlockSchema): + meta = pd.DataFrame( + { + col: pd.Series(dtype=dtype) + for col, dtype in zip(schema.names, schema.types) + } + ) + elif pa is not None and isinstance(schema, pa.Schema): + meta = schema.empty_table().to_pandas() + else: + # Simple dataset or schema not available. + meta = None + ddf = dd.from_delayed( - [block_to_df(block) for block in self.get_internal_block_refs()] + [block_to_df(block) for block in self.get_internal_block_refs()], + meta=meta, ) return ddf diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index 8bbbfd407b3c..5f19de2ac009 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -2524,14 +2524,46 @@ def test_from_dask(ray_start_regular_shared): assert df.equals(dfds) -def test_to_dask(ray_start_regular_shared): +@pytest.mark.parametrize("ds_format", ["pandas", "arrow"]) +def test_to_dask(ray_start_regular_shared, ds_format): from ray.util.dask import ray_dask_get df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]}) df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]}) df = pd.concat([df1, df2]) ds = ray.data.from_pandas([df1, df2]) + if ds_format == "arrow": + ds = ds.map_batches(lambda df: df, batch_format="pyarrow", batch_size=None) ddf = ds.to_dask() + meta = ddf._meta + # Check metadata. 
+ assert isinstance(meta, pd.DataFrame) + assert meta.empty + assert list(meta.columns) == ["one", "two"] + assert list(meta.dtypes) == [np.int64, object] + # Explicit Dask-on-Ray + assert df.equals(ddf.compute(scheduler=ray_dask_get)) + # Implicit Dask-on-Ray. + assert df.equals(ddf.compute()) + + # Explicit metadata. + df1["two"] = df1["two"].astype(pd.StringDtype()) + df2["two"] = df2["two"].astype(pd.StringDtype()) + df = pd.concat([df1, df2]) + ds = ray.data.from_pandas([df1, df2]) + if ds_format == "arrow": + ds = ds.map_batches(lambda df: df, batch_format="pyarrow", batch_size=None) + ddf = ds.to_dask( + meta=pd.DataFrame( + {"one": pd.Series(dtype=np.int16), "two": pd.Series(dtype=pd.StringDtype())} + ), + ) + meta = ddf._meta + # Check metadata. + assert isinstance(meta, pd.DataFrame) + assert meta.empty + assert list(meta.columns) == ["one", "two"] + assert list(meta.dtypes) == [np.int16, pd.StringDtype()] # Explicit Dask-on-Ray assert df.equals(ddf.compute(scheduler=ray_dask_get)) # Implicit Dask-on-Ray. From e216150493e305816b314748829d69d66cab01b6 Mon Sep 17 00:00:00 2001 From: Clark Zinzow Date: Thu, 22 Sep 2022 21:08:02 +0000 Subject: [PATCH 2/2] Update meta arg docstring to detail other format options. --- python/ray/data/dataset.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 06e4aa153dba..421c0f32b95b 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -2917,9 +2917,14 @@ def to_dask( Args: meta: An empty pandas DataFrame or Series that matches the dtypes and column - names of the Dataset. By default, this will be inferred from the - underlying Dataset schema, with this argument supplying an optional - override. + names of the Dataset. This metadata is necessary for many algorithms in + dask dataframe to work. For ease of use, some alternative inputs are + also available. Instead of a DataFrame, a dict of ``{name: dtype}`` or + iterable of ``(name, dtype)`` can be provided (note that the order of + the names should match the order of the columns). Instead of a series, a + tuple of ``(name, dtype)`` can be used. + By default, this will be inferred from the underlying Dataset schema, + with this argument supplying an optional override. Returns: A Dask DataFrame created from this dataset. @@ -2962,9 +2967,6 @@ def block_to_df(block: Block): ) elif pa is not None and isinstance(schema, pa.Schema): meta = schema.empty_table().to_pandas() - else: - # Simple dataset or schema not available. - meta = None ddf = dd.from_delayed( [block_to_df(block) for block in self.get_internal_block_refs()],
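
Usage sketch (reviewer note, not part of the patches): the snippet below mirrors the test above and shows the new ``meta`` argument in both its inferred and explicit forms. The data, column names, and dtypes are illustrative only, copied from the test case.

    import numpy as np
    import pandas as pd
    import ray
    from ray.util.dask import ray_dask_get

    df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
    df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]})
    ds = ray.data.from_pandas([df1, df2])

    # With no explicit `meta`, Dask metadata is inferred from the Dataset
    # schema, so `_meta` carries the expected column names and dtypes.
    ddf = ds.to_dask()
    print(ddf._meta.dtypes)  # one: int64, two: object (per the test above)

    # Explicit override, as described in the docstring: an empty DataFrame
    # (a {name: dtype} dict or (name, dtype) pairs also work) supplies the
    # metadata directly.
    ddf = ds.to_dask(
        meta=pd.DataFrame(
            {"one": pd.Series(dtype=np.int64), "two": pd.Series(dtype=object)}
        )
    )
    assert pd.concat([df1, df2]).equals(ddf.compute(scheduler=ray_dask_get))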