[Datasets] Add metadata override and inference in Dataset.to_dask(). #28625

Merged
46 changes: 42 additions & 4 deletions python/ray/data/dataset.py
@@ -2895,7 +2895,17 @@ def make_generator():

return dataset

def to_dask(self) -> "dask.DataFrame":
def to_dask(
self,
meta: Union[
"pandas.DataFrame",
"pandas.Series",
Dict[str, Any],
Iterable[Any],
Tuple[Any],
None,
] = None,
) -> "dask.DataFrame":
"""Convert this dataset into a Dask DataFrame.

This is only supported for datasets convertible to Arrow records.
@@ -2905,12 +2915,25 @@ def to_dask(self) -> "dask.DataFrame":

Time complexity: O(dataset size / parallelism)

Args:
meta: An empty pandas DataFrame or Series that matches the dtypes and column
names of the Dataset. By default, this will be inferred from the
underlying Dataset schema, with this argument supplying an optional
override.

Contributor: Can the last 4 types be dropped from the Union then?
Also, if it's a Series, how does it correspond to multiple columns of the Dataset? Is it performing a column projection when meta is a subset of the columns?

Contributor: I think Clark was following the Dask type here - meta: pd.DataFrame, pd.Series, dict, iterable, tuple, optional in https://docs.dask.org/en/stable/generated/dask.dataframe.from_delayed.html.

Contributor: It doesn't quite match the type annotation, so we should fix one or the other.

Contributor: Yeah, agreed - the comment should be updated.

Returns:
A Dask DataFrame created from this dataset.
"""
import dask
import dask.dataframe as dd
import pandas as pd

try:
import pyarrow as pa
except Exception:
pa = None

from ray.data._internal.pandas_block import PandasBlockSchema
from ray.util.client.common import ClientObjectRef
from ray.util.dask import ray_dask_get

@@ -2927,10 +2950,25 @@ def block_to_df(block: Block):
)
return block.to_pandas()

# TODO(Clark): Give Dask a Pandas-esque schema via the Pyarrow schema,
# once that's implemented.
if meta is None:
# Infer Dask metadata from Datasets schema.
schema = self.schema(fetch_if_missing=True)
if isinstance(schema, PandasBlockSchema):
meta = pd.DataFrame(
{
col: pd.Series(dtype=dtype)
for col, dtype in zip(schema.names, schema.types)
}
)
elif pa is not None and isinstance(schema, pa.Schema):
meta = schema.empty_table().to_pandas()
else:
# Simple dataset or schema not available.
meta = None
Contributor: nit: this branch seems unnecessary, and it could also be that pyarrow is not installed.


ddf = dd.from_delayed(
[block_to_df(block) for block in self.get_internal_block_refs()]
[block_to_df(block) for block in self.get_internal_block_refs()],
meta=meta,
)
return ddf

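For context, here is a standalone sketch (outside of Ray) of the two inference paths the diff above takes when meta is not supplied; the column names and dtypes are illustrative, not taken from the PR:

import pandas as pd
import pyarrow as pa

# Arrow-backed datasets: an empty table with the same schema converts to an
# empty pandas DataFrame that Dask can use as metadata (mirrors the
# schema.empty_table().to_pandas() branch above).
arrow_schema = pa.schema([("one", pa.int64()), ("two", pa.string())])
meta_from_arrow = arrow_schema.empty_table().to_pandas()
assert meta_from_arrow.empty
assert list(meta_from_arrow.columns) == ["one", "two"]

# Pandas-backed datasets: build an empty DataFrame from the block schema's
# column names and dtypes (mirrors the PandasBlockSchema branch above).
names, dtypes = ["one", "two"], ["int64", "object"]
meta_from_pandas = pd.DataFrame(
    {col: pd.Series(dtype=dtype) for col, dtype in zip(names, dtypes)}
)
assert meta_from_pandas.empty
assert list(meta_from_pandas.dtypes) == list(meta_from_arrow.dtypes)

Either way, the result is an empty frame whose columns and dtypes describe the eventual Dask partitions, which is exactly what dd.from_delayed(meta=...) expects.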
34 changes: 33 additions & 1 deletion python/ray/data/tests/test_dataset.py
@@ -2524,14 +2524,46 @@ def test_from_dask(ray_start_regular_shared):
assert df.equals(dfds)


def test_to_dask(ray_start_regular_shared):
@pytest.mark.parametrize("ds_format", ["pandas", "arrow"])
def test_to_dask(ray_start_regular_shared, ds_format):
from ray.util.dask import ray_dask_get

df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]})
df = pd.concat([df1, df2])
ds = ray.data.from_pandas([df1, df2])
if ds_format == "arrow":
ds = ds.map_batches(lambda df: df, batch_format="pyarrow", batch_size=None)
ddf = ds.to_dask()
meta = ddf._meta
# Check metadata.
assert isinstance(meta, pd.DataFrame)
assert meta.empty
assert list(meta.columns) == ["one", "two"]
assert list(meta.dtypes) == [np.int64, object]
# Explicit Dask-on-Ray
assert df.equals(ddf.compute(scheduler=ray_dask_get))
# Implicit Dask-on-Ray.
assert df.equals(ddf.compute())

# Explicit metadata.
df1["two"] = df1["two"].astype(pd.StringDtype())
df2["two"] = df2["two"].astype(pd.StringDtype())
df = pd.concat([df1, df2])
ds = ray.data.from_pandas([df1, df2])
if ds_format == "arrow":
ds = ds.map_batches(lambda df: df, batch_format="pyarrow", batch_size=None)
ddf = ds.to_dask(
meta=pd.DataFrame(
{"one": pd.Series(dtype=np.int16), "two": pd.Series(dtype=pd.StringDtype())}
),
)
meta = ddf._meta
# Check metadata.
assert isinstance(meta, pd.DataFrame)
assert meta.empty
assert list(meta.columns) == ["one", "two"]
assert list(meta.dtypes) == [np.int16, pd.StringDtype()]
# Explicit Dask-on-Ray
assert df.equals(ddf.compute(scheduler=ray_dask_get))
# Implicit Dask-on-Ray.
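Taken together with the tests above, a minimal usage sketch of the behavior this PR adds (the dataset and the dtype override are illustrative; assumes ray, dask, and pandas are installed):

import numpy as np
import pandas as pd
import ray

ds = ray.data.from_pandas(pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]}))

# Default: Dask metadata is inferred from the Dataset schema.
ddf = ds.to_dask()
print(ddf._meta.dtypes)  # one: int64, two: object

# Override: supply an empty DataFrame with the desired column dtypes.
ddf = ds.to_dask(
    meta=pd.DataFrame(
        {"one": pd.Series(dtype=np.int16), "two": pd.Series(dtype=pd.StringDtype())}
    )
)
print(ddf._meta.dtypes)  # one: int16, two: string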