Revert "Revert "[Dataset] [DataFrame 2/n] Add pandas block format implementation (partial) (#20988) (#21661)" #21894

Merged: 8 commits merged on Jan 31, 2022
11 changes: 7 additions & 4 deletions doc/source/ray-core/_examples/datasets_train/datasets_train.py
@@ -271,6 +271,7 @@ def inference(
model_cls,
compute="actors",
batch_size=batch_size,
batch_format="pandas",
num_gpus=num_gpus,
num_cpus=0,
).write_parquet(result_path)
@@ -584,8 +585,8 @@ def train_func(config):
)

num_columns = len(train_dataset.schema().names)
- # remove label column and internal Arrow column.
- num_features = num_columns - 2
+ # remove label column.
+ num_features = num_columns - 1

NUM_EPOCHS = 2
BATCH_SIZE = 512
@@ -688,8 +689,10 @@ def __init__(self, load_model_func):
self.model = load_model_func().to(self.device)

def __call__(self, batch) -> "pd.DataFrame":
- tensor = torch.FloatTensor(batch.to_pandas().values).to(self.device)
- return pd.DataFrame(self.model(tensor).cpu().detach().numpy())
+ tensor = torch.FloatTensor(batch.values).to(self.device)
+ return pd.DataFrame(
+     self.model(tensor).cpu().detach().numpy(), columns=["value"]
+ )

inference_dataset = preprocessor.preprocess_inference_data(
read_dataset(inference_path)
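With batch_format="pandas", the map_batches UDF now receives a pandas.DataFrame directly, which is why the predictor above drops the batch.to_pandas() call. A minimal sketch of that pattern follows; the toy dataset and model are illustrative and not part of this PR:

    import pandas as pd
    import ray
    import torch

    ds = ray.data.from_pandas([pd.DataFrame({"feature": [0.0, 1.0, 2.0]})])

    class Predictor:
        def __init__(self):
            # Stand-in for load_model_func() in the example above.
            self.model = torch.nn.Linear(1, 1)

        def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
            # The batch arrives as a pandas.DataFrame, so no to_pandas() is needed.
            tensor = torch.FloatTensor(batch.values)
            return pd.DataFrame(
                self.model(tensor).detach().numpy(), columns=["value"]
            )

    ds.map_batches(
        Predictor, compute="actors", batch_size=2, batch_format="pandas"
    ).show()

With this change the returned DataFrame is kept as-is rather than being converted to an Arrow table (see the dataset.py hunk below).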
7 changes: 6 additions & 1 deletion python/ray/data/block.py
@@ -79,7 +79,7 @@ def _validate_key_fn(ds: "Dataset", key: KeyFn) -> None:
#
# Block data can be accessed in a uniform way via ``BlockAccessors`` such as
# ``SimpleBlockAccessor`` and ``ArrowBlockAccessor``.
- Block = Union[List[T], "pyarrow.Table", bytes]
+ Block = Union[List[T], "pyarrow.Table", "pandas.DataFrame", bytes]

# A list of block references pending computation by a single task. For example,
# this may be the output of a task reading a file.
@@ -260,11 +260,16 @@ def for_block(block: Block) -> "BlockAccessor[T]":
"""Create a block accessor for the given block."""
_check_pyarrow_version()
import pyarrow
import pandas

if isinstance(block, pyarrow.Table):
from ray.data.impl.arrow_block import ArrowBlockAccessor

return ArrowBlockAccessor(block)
elif isinstance(block, pandas.DataFrame):
from ray.data.impl.pandas_block import PandasBlockAccessor

return PandasBlockAccessor(block)
elif isinstance(block, bytes):
from ray.data.impl.arrow_block import ArrowBlockAccessor

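BlockAccessor.for_block now also dispatches pandas DataFrames to the new accessor. A rough developer-facing sketch, assuming only the interface shown in this file (num_rows() is the usual BlockAccessor method):

    import pandas as pd
    from ray.data.block import BlockAccessor

    block = pd.DataFrame({"a": [1, 2, 3]})
    accessor = BlockAccessor.for_block(block)  # now a PandasBlockAccessor
    print(accessor.num_rows())  # 3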
7 changes: 7 additions & 0 deletions python/ray/data/context.py
@@ -14,6 +14,10 @@
# Whether block splitting is on by default
DEFAULT_BLOCK_SPLITTING_ENABLED = False

# Whether pandas block format is enabled.
# TODO (kfstorm): Remove this once stable.
DEFAULT_ENABLE_PANDAS_BLOCK = True


@DeveloperAPI
class DatasetContext:
@@ -28,11 +32,13 @@ def __init__(
block_owner: ray.actor.ActorHandle,
block_splitting_enabled: bool,
target_max_block_size: int,
enable_pandas_block: bool,
):
"""Private constructor (use get_current() instead)."""
self.block_owner = block_owner
self.block_splitting_enabled = block_splitting_enabled
self.target_max_block_size = target_max_block_size
self.enable_pandas_block = enable_pandas_block

@staticmethod
def get_current() -> "DatasetContext":
@@ -50,6 +56,7 @@ def get_current() -> "DatasetContext":
block_owner=None,
block_splitting_enabled=DEFAULT_BLOCK_SPLITTING_ENABLED,
target_max_block_size=DEFAULT_TARGET_MAX_BLOCK_SIZE,
enable_pandas_block=DEFAULT_ENABLE_PANDAS_BLOCK,
)

if _default_context.block_owner is None:
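The flag defaults to on. A small sketch of opting out while the pandas block format stabilizes, using the DatasetContext accessor shown above:

    from ray.data.context import DatasetContext

    ctx = DatasetContext.get_current()
    ctx.enable_pandas_block = False  # opt out of the pandas block format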
58 changes: 34 additions & 24 deletions python/ray/data/dataset.py
@@ -68,6 +68,7 @@
from ray.data.impl.sort import sort_impl
from ray.data.impl.block_list import BlockList
from ray.data.impl.lazy_block_list import LazyBlockList
from ray.data.impl.table_block import TableRow
from ray.data.impl.delegating_block_builder import DelegatingBlockBuilder

# An output type of iter_batches() determined by the batch_format parameter.
@@ -257,11 +258,11 @@ def transform(block: Block) -> Iterable[Block]:
)

applied = fn(view)
- if isinstance(applied, list) or isinstance(applied, pa.Table):
-     applied = applied
- elif isinstance(applied, pd.core.frame.DataFrame):
-     applied = pa.Table.from_pandas(applied)
- else:
+ if not (
+     isinstance(applied, list)
+     or isinstance(applied, pa.Table)
+     or isinstance(applied, pd.core.frame.DataFrame)
+ ):
raise ValueError(
"The map batches UDF returned the value "
f"{applied}, which is not allowed. "
@@ -434,12 +435,15 @@ def repartition(self, num_blocks: int, *, shuffle: bool = False) -> "Dataset[T]"
# Handle empty blocks.
if len(new_blocks) < num_blocks:
from ray.data.impl.arrow_block import ArrowBlockBuilder
from ray.data.impl.pandas_block import PandasBlockBuilder
from ray.data.impl.simple_block import SimpleBlockBuilder

num_empties = num_blocks - len(new_blocks)
dataset_format = self._dataset_format()
if dataset_format == "arrow":
builder = ArrowBlockBuilder()
elif dataset_format == "pandas":
builder = PandasBlockBuilder()
else:
builder = SimpleBlockBuilder()
empty_block = builder.build()
@@ -1024,10 +1028,8 @@ def sum(self, on: Union[KeyFn, List[KeyFn]] = None) -> U:
ret = self._aggregate_on(Sum, on)
if ret is None:
return 0
- elif len(ret) == 1:
-     return ret[0]
- else:
-     return ret
+ return self._aggregate_result(ret)

def min(self, on: Union[KeyFn, List[KeyFn]] = None) -> U:
"""Compute minimum over entire dataset.
@@ -1079,10 +1081,8 @@ def min(self, on: Union[KeyFn, List[KeyFn]] = None) -> U:
ret = self._aggregate_on(Min, on)
if ret is None:
raise ValueError("Cannot compute min on an empty dataset")
- elif len(ret) == 1:
-     return ret[0]
- else:
-     return ret
+ return self._aggregate_result(ret)

def max(self, on: Union[KeyFn, List[KeyFn]] = None) -> U:
"""Compute maximum over entire dataset.
@@ -1134,10 +1134,8 @@ def max(self, on: Union[KeyFn, List[KeyFn]] = None) -> U:
ret = self._aggregate_on(Max, on)
if ret is None:
raise ValueError("Cannot compute max on an empty dataset")
- elif len(ret) == 1:
-     return ret[0]
- else:
-     return ret
+ return self._aggregate_result(ret)

def mean(self, on: Union[KeyFn, List[KeyFn]] = None) -> U:
"""Compute mean over entire dataset.
@@ -1189,10 +1187,8 @@ def mean(self, on: Union[KeyFn, List[KeyFn]] = None) -> U:
ret = self._aggregate_on(Mean, on)
if ret is None:
raise ValueError("Cannot compute mean on an empty dataset")
- elif len(ret) == 1:
-     return ret[0]
- else:
-     return ret
+ return self._aggregate_result(ret)

def std(self, on: Union[KeyFn, List[KeyFn]] = None, ddof: int = 1) -> U:
"""Compute standard deviation over entire dataset.
@@ -1254,10 +1250,8 @@ def std(self, on: Union[KeyFn, List[KeyFn]] = None, ddof: int = 1) -> U:
ret = self._aggregate_on(Std, on, ddof=ddof)
if ret is None:
raise ValueError("Cannot compute std on an empty dataset")
- elif len(ret) == 1:
-     return ret[0]
- else:
-     return ret
+ return self._aggregate_result(ret)

def sort(self, key: KeyFn = None, descending: bool = False) -> "Dataset[T]":
"""Sort the dataset by the specified key column or key function.
@@ -2235,10 +2229,10 @@ def to_spark(self, spark: "pyspark.sql.SparkSession") -> "pyspark.sql.DataFrame"
def to_pandas(self, limit: int = 100000) -> "pandas.DataFrame":
"""Convert this dataset into a single Pandas DataFrame.

- This is only supported for datasets convertible to Arrow records. An
- error is raised if the number of records exceeds the provided limit.
- Note that you can use ``.limit()`` on the dataset beforehand to
- truncate the dataset manually.
+ This is only supported for datasets convertible to Arrow or Pandas
+ records. An error is raised if the number of records exceeds the
+ provided limit. Note that you can use ``.limit()`` on the dataset
+ beforehand to truncate the dataset manually.

Time complexity: O(dataset size)

@@ -2567,6 +2561,10 @@ def _dataset_format(self) -> str:
return "arrow"
except ModuleNotFoundError:
pass
from ray.data.impl.pandas_block import PandasBlockSchema

if isinstance(schema, PandasBlockSchema):
return "pandas"
return "simple"

def _aggregate_on(
Expand Down Expand Up @@ -2613,6 +2611,18 @@ def _build_multicolumn_aggs(
on = [on]
return [agg_cls(on_, *args, **kwargs) for on_ in on]

def _aggregate_result(self, result: Union[Tuple, TableRow]) -> U:
if len(result) == 1:
if isinstance(result, tuple):
return result[0]
else:
# NOTE (kfstorm): We cannot call `result[0]` directly on
# `PandasRow` because indexing a column with position is not
# supported by pandas.
return list(result.values())[0]
else:
return result

def __repr__(self) -> str:
schema = self.schema()
if schema is None:
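The aggregation methods above now funnel through _aggregate_result, which unwraps single-element results regardless of whether the underlying block is Arrow or pandas. A short sketch of the user-visible behavior this preserves; the column names are illustrative:

    import pandas as pd
    import ray

    ds = ray.data.from_pandas([pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})])
    print(ds.sum("a"))         # single column: unwrapped scalar, 6
    print(ds.sum(["a", "b"]))  # multiple columns: one result per column

    # Per the updated to_pandas() docstring, this works for Arrow- or
    # pandas-backed datasets alike.
    df = ds.to_pandas()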
2 changes: 1 addition & 1 deletion python/ray/data/impl/arrow_block.py
@@ -79,7 +79,7 @@ def slice(self, start: int, end: int, copy: bool) -> "pyarrow.Table":
view = _copy_table(view)
return view

- def random_shuffle(self, random_seed: Optional[int]) -> List[T]:
+ def random_shuffle(self, random_seed: Optional[int]) -> "pyarrow.Table":
random = np.random.RandomState(random_seed)
return self._table.take(random.permutation(self.num_rows()))

4 changes: 4 additions & 0 deletions python/ray/data/impl/delegating_block_builder.py
@@ -4,6 +4,7 @@
from ray.data.impl.block_builder import BlockBuilder
from ray.data.impl.simple_block import SimpleBlockBuilder
from ray.data.impl.arrow_block import ArrowRow, ArrowBlockBuilder
from ray.data.impl.pandas_block import PandasRow, PandasBlockBuilder


class DelegatingBlockBuilder(BlockBuilder[T]):
@@ -13,6 +14,7 @@ def __init__(self):
def add(self, item: Any) -> None:

if self._builder is None:
# TODO (kfstorm): Maybe we can use Pandas block format for dict.
if isinstance(item, dict) or isinstance(item, ArrowRow):
import pyarrow

@@ -23,6 +25,8 @@ def add(self, item: Any) -> None:
self._builder = ArrowBlockBuilder()
except (TypeError, pyarrow.lib.ArrowInvalid):
self._builder = SimpleBlockBuilder()
elif isinstance(item, PandasRow):
self._builder = PandasBlockBuilder()
else:
self._builder = SimpleBlockBuilder()
self._builder.add(item)
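DelegatingBlockBuilder chooses a concrete builder from the first item it sees, and PandasRow items now select PandasBlockBuilder. A rough sketch of the delegation using the internal API (subject to change); per the TODO above, plain dicts still take the Arrow path:

    from ray.data.impl.delegating_block_builder import DelegatingBlockBuilder

    builder = DelegatingBlockBuilder()
    builder.add({"a": 1})    # first item is a dict, so an ArrowBlockBuilder is chosen
    builder.add({"a": 2})
    block = builder.build()  # a pyarrow.Table for dict items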