
Commit

[Dataset] [DataFrame 2/n] Add pandas block format implementation (partial) (#20988)

This PR adds pandas block format support by implementing `PandasRow`, `PandasBlockBuilder`, and `PandasBlockAccessor`.

Note that `sort_and_partition`, `combine`, `merge_sorted_blocks`, and `aggregate_combined_blocks` in `PandasBlockAccessor` redirect to the Arrow block format implementation for now; they'll be implemented natively in a later PR.
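
For illustration, a minimal usage sketch of the batch format this PR targets (not part of the diff; it assumes `ray.data.range_arrow` and `Dataset.map_batches` with `batch_format="pandas"` are available in this Ray version):

```python
import pandas as pd
import ray

# Tabular dataset with a single "value" column (0..99).
ds = ray.data.range_arrow(100)

# With batch_format="pandas", the UDF receives (and may return) a pandas.DataFrame.
def add_one(batch: pd.DataFrame) -> pd.DataFrame:
    return batch + 1

print(ds.map_batches(add_one, batch_format="pandas").take(3))
```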

Co-authored-by: Clark Zinzow <[email protected]>
Co-authored-by: Eric Liang <[email protected]>
3 people authored Jan 15, 2022
1 parent 26057c4 commit 4a55d10
Showing 10 changed files with 383 additions and 96 deletions.
11 changes: 6 additions & 5 deletions doc/examples/datasets_train/datasets_train.py
@@ -273,6 +273,7 @@ def inference(dataset, model_cls: type, batch_size: int, result_path: str,
model_cls,
compute="actors",
batch_size=batch_size,
batch_format="pandas",
num_gpus=num_gpus,
num_cpus=0) \
.write_parquet(result_path)
@@ -578,8 +579,8 @@ def train_func(config):
read_dataset(data_path))

num_columns = len(train_dataset.schema().names)
# remove label column and internal Arrow column.
num_features = num_columns - 2
# remove label column.
num_features = num_columns - 1

NUM_EPOCHS = 2
BATCH_SIZE = 512
@@ -681,9 +682,9 @@ def __init__(self, load_model_func):
self.model = load_model_func().to(self.device)

def __call__(self, batch) -> "pd.DataFrame":
tensor = torch.FloatTensor(batch.to_pandas().values).to(
self.device)
return pd.DataFrame(self.model(tensor).cpu().detach().numpy())
tensor = torch.FloatTensor(batch.values).to(self.device)
return pd.DataFrame(
self.model(tensor).cpu().detach().numpy(), columns=["value"])

inference_dataset = preprocessor.preprocess_inference_data(
read_dataset(inference_path))
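The change above drops `batch.to_pandas()` because, with `batch_format="pandas"`, the callable now receives a `pandas.DataFrame` directly. A condensed, hypothetical sketch of that pattern (names like `SquareModel` are illustrative, not from the repository):

```python
import pandas as pd
import ray


class SquareModel:
    """Stateful batch UDF; instantiated once per actor when compute="actors"."""

    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        # The batch arrives as a DataFrame; no manual conversion needed.
        return batch ** 2


ds = ray.data.range_arrow(16)
out = ds.map_batches(
    SquareModel, compute="actors", batch_size=4, batch_format="pandas")
print(out.take(3))
```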
7 changes: 6 additions & 1 deletion python/ray/data/block.py
@@ -25,7 +25,7 @@
#
# Block data can be accessed in a uniform way via ``BlockAccessors`` such as
# ``SimpleBlockAccessor`` and ``ArrowBlockAccessor``.
Block = Union[List[T], "pyarrow.Table", bytes]
Block = Union[List[T], "pyarrow.Table", "pandas.DataFrame", bytes]

# A list of block references pending computation by a single task. For example,
# this may be the output of a task reading a file.
@@ -196,11 +196,16 @@ def for_block(block: Block) -> "BlockAccessor[T]":
"""Create a block accessor for the given block."""
_check_pyarrow_version()
import pyarrow
import pandas

if isinstance(block, pyarrow.Table):
from ray.data.impl.arrow_block import \
ArrowBlockAccessor
return ArrowBlockAccessor(block)
elif isinstance(block, pandas.DataFrame):
from ray.data.impl.pandas_block import \
PandasBlockAccessor
return PandasBlockAccessor(block)
elif isinstance(block, bytes):
from ray.data.impl.arrow_block import \
ArrowBlockAccessor
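A quick sketch of the dispatch added above: handing a `pandas.DataFrame` to `BlockAccessor.for_block` should now yield a `PandasBlockAccessor` (internal API, so treat this as illustrative):

```python
import pandas as pd
from ray.data.block import BlockAccessor

df = pd.DataFrame({"a": [1, 2, 3], "b": [4.0, 5.0, 6.0]})
accessor = BlockAccessor.for_block(df)
print(type(accessor).__name__)  # expected: PandasBlockAccessor
print(accessor.num_rows())      # expected: 3
```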
18 changes: 15 additions & 3 deletions python/ray/data/context.py
@@ -14,6 +14,10 @@
# Whether block splitting is on by default
DEFAULT_BLOCK_SPLITTING_ENABLED = False

# Whether pandas block format is enabled.
# TODO (kfstorm): Remove this once stable.
DEFAULT_ENABLE_PANDAS_BLOCK = True


@DeveloperAPI
class DatasetContext:
@@ -23,12 +27,18 @@ class DatasetContext:
from the driver and remote workers via DatasetContext.get_current().
"""

def __init__(self, block_owner: ray.actor.ActorHandle,
block_splitting_enabled: bool, target_max_block_size: int):
def __init__(
self,
block_owner: ray.actor.ActorHandle,
block_splitting_enabled: bool,
target_max_block_size: int,
enable_pandas_block: bool,
):
"""Private constructor (use get_current() instead)."""
self.block_owner = block_owner
self.block_splitting_enabled = block_splitting_enabled
self.target_max_block_size = target_max_block_size
self.enable_pandas_block = enable_pandas_block

@staticmethod
def get_current() -> "DatasetContext":
@@ -45,7 +55,9 @@ def get_current() -> "DatasetContext":
_default_context = DatasetContext(
block_owner=None,
block_splitting_enabled=DEFAULT_BLOCK_SPLITTING_ENABLED,
target_max_block_size=DEFAULT_TARGET_MAX_BLOCK_SIZE)
target_max_block_size=DEFAULT_TARGET_MAX_BLOCK_SIZE,
enable_pandas_block=DEFAULT_ENABLE_PANDAS_BLOCK,
)

if _default_context.block_owner is None:
owner = _DesignatedBlockOwner.options(
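The new `enable_pandas_block` flag can be flipped through `DatasetContext`; a minimal sketch (this is a DeveloperAPI, so subject to change):

```python
import ray
from ray.data.context import DatasetContext

ray.init()
ctx = DatasetContext.get_current()
print(ctx.enable_pandas_block)   # True by default (DEFAULT_ENABLE_PANDAS_BLOCK)
ctx.enable_pandas_block = False  # opt out and keep using the Arrow block format
```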
85 changes: 46 additions & 39 deletions python/ray/data/dataset.py
@@ -44,6 +44,7 @@
from ray.data.impl.sort import sort_impl
from ray.data.impl.block_list import BlockList
from ray.data.impl.lazy_block_list import LazyBlockList
from ray.data.impl.table_block import TableRow
from ray.data.impl.delegating_block_builder import DelegatingBlockBuilder

# An output type of iter_batches() determined by the batch_format parameter.
@@ -230,11 +231,9 @@ def transform(block: Block) -> Iterable[Block]:
"or 'pyarrow', got: {}".format(batch_format))

applied = fn(view)
if isinstance(applied, list) or isinstance(applied, pa.Table):
applied = applied
elif isinstance(applied, pd.core.frame.DataFrame):
applied = pa.Table.from_pandas(applied)
else:
if not (isinstance(applied, list)
or isinstance(applied, pa.Table)
or isinstance(applied, pd.core.frame.DataFrame)):
raise ValueError("The map batches UDF returned the value "
f"{applied}, which is not allowed. "
"The return type must be either list, "
@@ -403,12 +402,15 @@ def repartition(self, num_blocks: int, *,
# Handle empty blocks.
if len(new_blocks) < num_blocks:
from ray.data.impl.arrow_block import ArrowBlockBuilder
from ray.data.impl.pandas_block import PandasBlockBuilder
from ray.data.impl.simple_block import SimpleBlockBuilder

num_empties = num_blocks - len(new_blocks)
dataset_format = self._dataset_format()
if dataset_format == "arrow":
builder = ArrowBlockBuilder()
elif dataset_format == "pandas":
builder = PandasBlockBuilder()
else:
builder = SimpleBlockBuilder()
empty_block = builder.build()
@@ -938,7 +940,7 @@ def _check_and_normalize_agg_on(self,
# Dataset is empty/cleared, let downstream ops handle this.
return on

if dataset_format == "arrow":
if dataset_format == "arrow" or dataset_format == "pandas":
# This should be cached from the ._dataset_format() check, so we
# don't fetch and we assert that the schema is not None.
schema = self.schema(fetch_if_missing=False)
@@ -971,32 +973,35 @@ def _check_and_normalize_agg_on(self,
and isinstance(on[0], str)):
raise ValueError(
"Can't aggregate on a column when using a simple Dataset; "
"use a callable `on` argument or use an Arrow Dataset "
"instead of a simple Dataset.")
"use a callable `on` argument or use an Arrow or Pandas"
" Dataset instead of a simple Dataset.")
return on

def _dataset_format(self) -> str:
"""Determine the format of the dataset. Possible values are: "arrow",
"simple".
"pandas", "simple".
This may block; if the schema is unknown, this will synchronously fetch
the schema for the first block.
"""
# We need schema to properly validate, so synchronously
# fetch it if necessary.
schema = self.schema(fetch_if_missing=True)
if schema is None:
raise ValueError(
"Dataset is empty or cleared, can't determine the format of "
"the dataset.")

try:
import pyarrow as pa
except ModuleNotFoundError:
return "simple"
else:
# We need schema to properly validate, so synchronously
# fetch it if necessary.
schema = self.schema(fetch_if_missing=True)
if schema is None:
raise ValueError(
"Dataset is empty or cleared, can't determine the format"
" of the dataset")
if isinstance(schema, pa.Schema):
return "arrow"
return "simple"
except ModuleNotFoundError:
pass
from ray.data.impl.pandas_block import PandasBlockSchema
if isinstance(schema, PandasBlockSchema):
return "pandas"
return "simple"

def _aggregate_on(self, agg_cls: type, on: Optional["AggregateOnTs"],
*args, **kwargs):
@@ -1025,6 +1030,18 @@ def _build_multicolumn_aggs(self,
on = [on]
return [agg_cls(on_, *args, **kwargs) for on_ in on]

def _aggregate_result(self, result: Union[Tuple, TableRow]) -> U:
if len(result) == 1:
if isinstance(result, tuple):
return result[0]
else:
# NOTE (kfstorm): We cannot call `result[0]` directly on
# `PandasRow` because indexing a column with position is not
# supported by pandas.
return list(result.values())[0]
else:
return result

def sum(self, on: Optional["AggregateOnTs"] = None) -> U:
"""Compute sum over entire dataset.
@@ -1075,10 +1092,8 @@ def sum(self, on: Optional["AggregateOnTs"] = None) -> U:
ret = self._aggregate_on(Sum, on)
if ret is None:
return 0
elif len(ret) == 1:
return ret[0]
else:
return ret
return self._aggregate_result(ret)

def min(self, on: Optional["AggregateOnTs"] = None) -> U:
"""Compute minimum over entire dataset.
@@ -1130,10 +1145,8 @@ def min(self, on: Optional["AggregateOnTs"] = None) -> U:
ret = self._aggregate_on(Min, on)
if ret is None:
raise ValueError("Cannot compute min on an empty dataset")
elif len(ret) == 1:
return ret[0]
else:
return ret
return self._aggregate_result(ret)

def max(self, on: Optional["AggregateOnTs"] = None) -> U:
"""Compute maximum over entire dataset.
@@ -1185,10 +1198,8 @@ def max(self, on: Optional["AggregateOnTs"] = None) -> U:
ret = self._aggregate_on(Max, on)
if ret is None:
raise ValueError("Cannot compute max on an empty dataset")
elif len(ret) == 1:
return ret[0]
else:
return ret
return self._aggregate_result(ret)

def mean(self, on: Optional["AggregateOnTs"] = None) -> U:
"""Compute mean over entire dataset.
@@ -1240,10 +1251,8 @@ def mean(self, on: Optional["AggregateOnTs"] = None) -> U:
ret = self._aggregate_on(Mean, on)
if ret is None:
raise ValueError("Cannot compute mean on an empty dataset")
elif len(ret) == 1:
return ret[0]
else:
return ret
return self._aggregate_result(ret)

def std(self, on: Optional["AggregateOnTs"] = None, ddof: int = 1) -> U:
"""Compute standard deviation over entire dataset.
@@ -1305,10 +1314,8 @@ def std(self, on: Optional["AggregateOnTs"] = None, ddof: int = 1) -> U:
ret = self._aggregate_on(Std, on, ddof=ddof)
if ret is None:
raise ValueError("Cannot compute std on an empty dataset")
elif len(ret) == 1:
return ret[0]
else:
return ret
return self._aggregate_result(ret)

def sort(self,
key: Union[None, str, List[str], Callable[[T], Any]] = None,
@@ -2263,10 +2270,10 @@ def to_spark(self,
def to_pandas(self, limit: int = 100000) -> "pandas.DataFrame":
"""Convert this dataset into a single Pandas DataFrame.
This is only supported for datasets convertible to Arrow records. An
error is raised if the number of records exceeds the provided limit.
Note that you can use ``.limit()`` on the dataset beforehand to
truncate the dataset manually.
This is only supported for datasets convertible to Arrow or Pandas
records. An error is raised if the number of records exceeds the
provided limit. Note that you can use ``.limit()`` on the dataset
beforehand to truncate the dataset manually.
Time complexity: O(dataset size)
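For context on the `_aggregate_result` helper above: a single aggregation still returns a scalar regardless of whether the underlying row type is a tuple or a `PandasRow`. A hedged usage sketch (assuming `ray.data.range_arrow` exists in this Ray version):

```python
import ray

ds = ray.data.range_arrow(100)   # tabular dataset with a "value" column
print(ds.sum("value"))           # 4950 -- a scalar, unwrapped by _aggregate_result
print(ds.mean("value"))          # 49.5
print(ds.std("value", ddof=1))   # ~29.01
```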
2 changes: 1 addition & 1 deletion python/ray/data/impl/arrow_block.py
@@ -79,7 +79,7 @@ def slice(self, start: int, end: int, copy: bool) -> "pyarrow.Table":
view = _copy_table(view)
return view

def random_shuffle(self, random_seed: Optional[int]) -> List[T]:
def random_shuffle(self, random_seed: Optional[int]) -> "pyarrow.Table":
random = np.random.RandomState(random_seed)
return self._table.take(random.permutation(self.num_rows()))

4 changes: 4 additions & 0 deletions python/ray/data/impl/delegating_block_builder.py
@@ -4,6 +4,7 @@
from ray.data.impl.block_builder import BlockBuilder
from ray.data.impl.simple_block import SimpleBlockBuilder
from ray.data.impl.arrow_block import ArrowRow, ArrowBlockBuilder
from ray.data.impl.pandas_block import PandasRow, PandasBlockBuilder


class DelegatingBlockBuilder(BlockBuilder[T]):
@@ -13,6 +14,7 @@ def __init__(self):
def add(self, item: Any) -> None:

if self._builder is None:
# TODO (kfstorm): Maybe we can use Pandas block format for dict.
if isinstance(item, dict) or isinstance(item, ArrowRow):
import pyarrow
try:
Expand All @@ -22,6 +24,8 @@ def add(self, item: Any) -> None:
self._builder = ArrowBlockBuilder()
except (TypeError, pyarrow.lib.ArrowInvalid):
self._builder = SimpleBlockBuilder()
elif isinstance(item, PandasRow):
self._builder = PandasBlockBuilder()
else:
self._builder = SimpleBlockBuilder()
self._builder.add(item)
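A sketch of the delegation behavior touched above: the first item added picks the concrete builder, and a plain dict still routes to the Arrow builder (per the TODO, pandas may be considered for dicts later). Internal API, illustrative only:

```python
from ray.data.impl.delegating_block_builder import DelegatingBlockBuilder

builder = DelegatingBlockBuilder()
builder.add({"a": 1, "b": 2.0})  # first item: dict -> ArrowBlockBuilder
builder.add({"a": 3, "b": 4.0})
block = builder.build()
print(type(block))               # expected: <class 'pyarrow.lib.Table'>
```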
