[Data] Normalize block types before internal multi-block operations #43764

Merged · 4 commits · Mar 8, 2024
8 changes: 8 additions & 0 deletions python/ray/data/BUILD
@@ -538,6 +538,14 @@ py_test(
deps = ["//:ray_lib", ":conftest"],
)

py_test(
name = "test_zip",
size = "small",
srcs = ["tests/test_zip.py"],
tags = ["team:data", "exclusive"],
deps = ["//:ray_lib", ":conftest"],
)

py_test(
name = "test_backpressure_policies",
size = "medium",
5 changes: 5 additions & 0 deletions python/ray/data/_internal/arrow_block.py
@@ -560,6 +560,8 @@ def merge_sorted_blocks(
if len(blocks) == 0:
ret = ArrowBlockAccessor._empty_table()
else:
# Handle blocks of different types.
blocks = TableBlockAccessor.normalize_block_types(blocks, "arrow")
concat_and_sort = get_concat_and_sort_transform(DataContext.get_current())
ret = concat_and_sort(blocks, sort_key)
return ret, ArrowBlockAccessor(ret).get_metadata(None, exec_stats=stats.build())
@@ -600,6 +602,9 @@ def aggregate_combined_blocks(
else (lambda r: (0,))
)

# Handle blocks of different types.
blocks = TableBlockAccessor.normalize_block_types(blocks, "arrow")

iter = heapq.merge(
*[
ArrowBlockAccessor(block).iter_rows(public_row_format=False)
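The normalization step added here converts any pandas blocks to Arrow before the merge. A minimal standalone sketch of the idea, using pyarrow and pandas directly rather than Ray's internal accessors (normalize_to_arrow is an illustrative helper, not Ray API):

import pandas as pd
import pyarrow as pa

def normalize_to_arrow(blocks):
    # Mirror the "normalize to Arrow" step: convert pandas blocks so all
    # blocks share one table type before the concat-and-sort.
    return [
        pa.Table.from_pandas(b) if isinstance(b, pd.DataFrame) else b
        for b in blocks
    ]

blocks = [pa.table({"x": [3, 1]}), pd.DataFrame({"x": [2, 0]})]
merged = pa.concat_tables(normalize_to_arrow(blocks)).sort_by("x")
assert merged.column("x").to_pylist() == [0, 1, 2, 3]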
5 changes: 5 additions & 0 deletions python/ray/data/_internal/pandas_block.py
@@ -486,6 +486,8 @@ def merge_sorted_blocks(
if len(blocks) == 0:
ret = PandasBlockAccessor._empty_table()
else:
# Handle blocks of different types.
blocks = TableBlockAccessor.normalize_block_types(blocks, "pandas")
ret = pd.concat(blocks, ignore_index=True)
columns, ascending = sort_key.to_pandas_sort_args()
ret = ret.sort_values(by=columns, ascending=ascending)
@@ -528,6 +530,9 @@ def aggregate_combined_blocks(
else (lambda r: (0,))
)

# Handle blocks of different types.
blocks = TableBlockAccessor.normalize_block_types(blocks, "pandas")

iter = heapq.merge(
*[
PandasBlockAccessor(block).iter_rows(public_row_format=False)
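The pandas path is symmetric: Arrow tables are converted with to_pandas() so that pd.concat sees a uniform list. A sketch under the same assumptions (normalize_to_pandas is illustrative, not Ray API):

import pandas as pd
import pyarrow as pa

def normalize_to_pandas(blocks):
    # Convert any Arrow tables so every block is a pandas.DataFrame.
    return [b.to_pandas() if isinstance(b, pa.Table) else b for b in blocks]

blocks = [pd.DataFrame({"x": [2, 0]}), pa.table({"x": [3, 1]})]
df = pd.concat(normalize_to_pandas(blocks), ignore_index=True)
df = df.sort_values(by="x")
assert df["x"].tolist() == [0, 1, 2, 3]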
77 changes: 73 additions & 4 deletions python/ray/data/_internal/table_block.py
@@ -1,5 +1,15 @@
import collections
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Mapping, TypeVar, Union
from typing import (
TYPE_CHECKING,
Any,
Dict,
Iterator,
List,
Mapping,
Optional,
TypeVar,
Union,
)

import numpy as np

@@ -209,9 +219,21 @@ def _zip(self, acc: BlockAccessor) -> "Block":
def zip(self, other: "Block") -> "Block":
acc = BlockAccessor.for_block(other)
if not isinstance(acc, type(self)):
raise ValueError(
"Cannot zip {} with block of type {}".format(type(self), type(other))
)
if isinstance(self, TableBlockAccessor) and isinstance(
acc, TableBlockAccessor
):
# If block types are different, but still both of TableBlock type, try
# converting both to default block type before zipping.
self_norm, other_norm = TableBlockAccessor.normalize_block_types(
[self._table, other],
)
return BlockAccessor.for_block(self_norm).zip(other_norm)
else:
raise ValueError(
"Cannot zip {} with block of type {}".format(
type(self), type(other)
)
)
if acc.num_rows() != self.num_rows():
raise ValueError(
"Cannot zip self (length {}) with block of length {}".format(
Expand All @@ -238,3 +260,50 @@ def sample(self, n_samples: int, sort_key: "SortKey") -> Any:
return self._empty_table()
k = min(n_samples, self.num_rows())
return self._sample(k, sort_key)

@classmethod
def normalize_block_types(
cls,
blocks: List[Block],
normalize_type: Optional[str] = None,
) -> List[Block]:
"""Normalize input blocks to the specified `normalize_type`. If the blocks
are already all of the same type, returns the original blocks.

Args:
blocks: A list of TableBlocks to be normalized.
normalize_type: The type to normalize the blocks to. If None,
the default block type (Arrow) is used.

Returns:
A list of blocks of the same type.
"""
seen_types = set()
for block in blocks:
acc = BlockAccessor.for_block(block)
if not isinstance(acc, TableBlockAccessor):
raise ValueError(
"Block type normalization is only supported for TableBlock, "
f"but received block of type: {type(block)}."
)
seen_types.add(type(block))

# Return original blocks if they are all of the same type.
if len(seen_types) <= 1:
return blocks

if normalize_type == "arrow":
results = [BlockAccessor.for_block(block).to_arrow() for block in blocks]
elif normalize_type == "pandas":
results = [BlockAccessor.for_block(block).to_pandas() for block in blocks]
else:
results = [BlockAccessor.for_block(block).to_default() for block in blocks]

if any(not isinstance(block, type(results[0])) for block in results):
raise ValueError(
"Expected all blocks to be of the same type after normalization, but "
f"got different types: {[type(b) for b in results]}. "
"Try using blocks of the same type to avoid the issue "
"with block normalization."
)
return results
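
For reference, a direct exercise of the new classmethod might look like the sketch below (internal API, subject to change; plain pyarrow and pandas objects stand in for Ray blocks, since BlockAccessor.for_block dispatches on both):

import pandas as pd
import pyarrow as pa

from ray.data._internal.table_block import TableBlockAccessor

mixed = [pa.table({"x": [1]}), pd.DataFrame({"x": [2]})]

# With mixed input types, every block comes back as an Arrow table; a
# homogeneous input list would be returned unchanged.
arrow_blocks = TableBlockAccessor.normalize_block_types(mixed, "arrow")
assert all(isinstance(b, pa.Table) for b in arrow_blocks)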
148 changes: 25 additions & 123 deletions python/ray/data/tests/test_all_to_all.py
@@ -1,4 +1,3 @@
import itertools
import math
import random
import time
Expand All @@ -13,133 +12,12 @@
from ray.data.aggregate import AggregateFn, Count, Max, Mean, Min, Quantile, Std, Sum
from ray.data.context import DataContext
from ray.data.tests.conftest import * # noqa
from ray.data.tests.util import column_udf, named_values
from ray.data.tests.util import named_values
from ray.tests.conftest import * # noqa

RANDOM_SEED = 123


def test_zip(ray_start_regular_shared):
ds1 = ray.data.range(5, override_num_blocks=5)
ds2 = ray.data.range(5, override_num_blocks=5).map(
column_udf("id", lambda x: x + 1)
)
ds = ds1.zip(ds2)
assert ds.schema().names == ["id", "id_1"]
assert ds.take() == named_values(
["id", "id_1"], [(0, 1), (1, 2), (2, 3), (3, 4), (4, 5)]
)
with pytest.raises(ValueError):
ds.zip(ray.data.range(3)).materialize()


@pytest.mark.parametrize(
"num_blocks1,num_blocks2",
list(itertools.combinations_with_replacement(range(1, 12), 2)),
)
def test_zip_different_num_blocks_combinations(
ray_start_regular_shared, num_blocks1, num_blocks2
):
n = 12
ds1 = ray.data.range(n, override_num_blocks=num_blocks1)
ds2 = ray.data.range(n, override_num_blocks=num_blocks2).map(
column_udf("id", lambda x: x + 1)
)
ds = ds1.zip(ds2)
assert ds.schema().names == ["id", "id_1"]
assert ds.take() == named_values(
["id", "id_1"], list(zip(range(n), range(1, n + 1)))
)


@pytest.mark.parametrize(
"num_cols1,num_cols2,should_invert",
[
(1, 1, False),
(4, 1, False),
(1, 4, True),
(1, 10, True),
(10, 10, False),
],
)
def test_zip_different_num_blocks_split_smallest(
ray_start_regular_shared,
num_cols1,
num_cols2,
should_invert,
):
n = 12
num_blocks1 = 4
num_blocks2 = 2
ds1 = ray.data.from_items(
[{str(i): i for i in range(num_cols1)}] * n, override_num_blocks=num_blocks1
)
ds2 = ray.data.from_items(
[{str(i): i for i in range(num_cols1, num_cols1 + num_cols2)}] * n,
override_num_blocks=num_blocks2,
)
ds = ds1.zip(ds2).materialize()
num_blocks = ds._plan._snapshot_blocks.executed_num_blocks()
assert ds.take() == [{str(i): i for i in range(num_cols1 + num_cols2)}] * n
if should_invert:
assert num_blocks == num_blocks2
else:
assert num_blocks == num_blocks1


def test_zip_pandas(ray_start_regular_shared):
ds1 = ray.data.from_pandas(pd.DataFrame({"col1": [1, 2], "col2": [4, 5]}))
ds2 = ray.data.from_pandas(pd.DataFrame({"col3": ["a", "b"], "col4": ["d", "e"]}))
ds = ds1.zip(ds2)
assert ds.count() == 2
assert "{col1: int64, col2: int64, col3: object, col4: object}" in str(ds)
result = list(ds.take())
assert result[0] == {"col1": 1, "col2": 4, "col3": "a", "col4": "d"}

ds3 = ray.data.from_pandas(pd.DataFrame({"col2": ["a", "b"], "col4": ["d", "e"]}))
ds = ds1.zip(ds3)
assert ds.count() == 2
assert "{col1: int64, col2: int64, col2_1: object, col4: object}" in str(ds)
result = list(ds.take())
assert result[0] == {"col1": 1, "col2": 4, "col2_1": "a", "col4": "d"}


def test_zip_arrow(ray_start_regular_shared):
ds1 = ray.data.range(5).map(lambda r: {"id": r["id"]})
ds2 = ray.data.range(5).map(lambda r: {"a": r["id"] + 1, "b": r["id"] + 2})
ds = ds1.zip(ds2)
assert ds.count() == 5
assert "{id: int64, a: int64, b: int64}" in str(ds)
result = list(ds.take())
assert result[0] == {"id": 0, "a": 1, "b": 2}

# Test duplicate column names.
ds = ds1.zip(ds1).zip(ds1)
assert ds.count() == 5
assert "{id: int64, id_1: int64, id_2: int64}" in str(ds)
result = list(ds.take())
assert result[0] == {"id": 0, "id_1": 0, "id_2": 0}


def test_zip_preserve_order(ray_start_regular_shared):
def foo(x):
import time

if x["item"] < 5:
time.sleep(1)
return x

num_items = 10
items = list(range(num_items))
ds1 = ray.data.from_items(items, override_num_blocks=num_items)
ds2 = ray.data.from_items(items, override_num_blocks=num_items)
ds2 = ds2.map_batches(foo, batch_size=1)
result = ds1.zip(ds2).take_all()
assert result == named_values(
["item", "item_1"], list(zip(range(num_items), range(num_items)))
), result


def test_empty_shuffle(ray_start_regular_shared):
ds = ray.data.range(100, override_num_blocks=100)
ds = ds.filter(lambda x: x)
@@ -1064,6 +942,30 @@ def func(group):
assert sorted([x["out"] for x in ds.take()]) == [1, 3]


@pytest.mark.parametrize("num_parts", [1, 30])
def test_groupby_map_groups_multiple_batch_formats(ray_start_regular_shared, num_parts):
# Reproduces https://github.com/ray-project/ray/issues/39206
def identity(batch):
return batch

xs = list(range(100))
ds = ray.data.from_items([{"A": (x % 3), "B": x} for x in xs]).repartition(
num_parts
)
grouped_ds = (
ds.groupby("A")
.map_groups(identity)
.map_batches(identity, batch_format="pandas")
)
agg_ds = grouped_ds.groupby("A").max("B")
assert agg_ds.count() == 3
assert list(agg_ds.sort("A").iter_rows()) == [
{"A": 0, "max(B)": 99},
{"A": 1, "max(B)": 97},
{"A": 2, "max(B)": 98},
]


def test_groupby_map_groups_extra_args(ray_start_regular_shared):
ds = ray.data.from_items(
[