Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ray-data] Add alias parameters to the aggregate function, and add quantile fn #34358

Merged
merged 7 commits into from
Apr 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 136 additions & 11 deletions python/ray/data/aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,17 @@ def __init__(self):
class Sum(_AggregateOnKeyBase):
"""Defines sum aggregation."""

def __init__(self, on: Optional[KeyFn] = None, ignore_nulls: bool = True):
def __init__(
self,
on: Optional[KeyFn] = None,
ignore_nulls: bool = True,
alias_name: Optional[KeyFn] = None,
):
self._set_key_fn(on)
if alias_name:
self._rs_name = alias_name
else:
self._rs_name = f"sum({str(on)})"

null_merge = _null_wrap_merge(ignore_nulls, lambda a1, a2: a1 + a2)

Expand All @@ -127,16 +136,25 @@ def __init__(self, on: Optional[KeyFn] = None, ignore_nulls: bool = True):
null_merge,
),
finalize=_null_wrap_finalize(lambda a: a),
name=(f"sum({str(on)})"),
name=(self._rs_name),
)


@PublicAPI
class Min(_AggregateOnKeyBase):
"""Defines min aggregation."""

def __init__(self, on: Optional[KeyFn] = None, ignore_nulls: bool = True):
def __init__(
self,
on: Optional[KeyFn] = None,
ignore_nulls: bool = True,
alias_name: Optional[KeyFn] = None,
):
self._set_key_fn(on)
if alias_name:
self._rs_name = alias_name
else:
self._rs_name = f"min({str(on)})"

null_merge = _null_wrap_merge(ignore_nulls, min)

Expand All @@ -149,16 +167,25 @@ def __init__(self, on: Optional[KeyFn] = None, ignore_nulls: bool = True):
null_merge,
),
finalize=_null_wrap_finalize(lambda a: a),
name=(f"min({str(on)})"),
name=(self._rs_name),
)


@PublicAPI
class Max(_AggregateOnKeyBase):
"""Defines max aggregation."""

def __init__(self, on: Optional[KeyFn] = None, ignore_nulls: bool = True):
def __init__(
self,
on: Optional[KeyFn] = None,
ignore_nulls: bool = True,
alias_name: Optional[KeyFn] = None,
):
self._set_key_fn(on)
if alias_name:
self._rs_name = alias_name
else:
self._rs_name = f"max({str(on)})"

null_merge = _null_wrap_merge(ignore_nulls, max)

Expand All @@ -171,16 +198,25 @@ def __init__(self, on: Optional[KeyFn] = None, ignore_nulls: bool = True):
null_merge,
),
finalize=_null_wrap_finalize(lambda a: a),
name=(f"max({str(on)})"),
name=(self._rs_name),
)


@PublicAPI
class Mean(_AggregateOnKeyBase):
"""Defines mean aggregation."""

def __init__(self, on: Optional[KeyFn] = None, ignore_nulls: bool = True):
def __init__(
self,
on: Optional[KeyFn] = None,
ignore_nulls: bool = True,
alias_name: Optional[KeyFn] = None,
):
self._set_key_fn(on)
if alias_name:
self._rs_name = alias_name
else:
self._rs_name = f"mean({str(on)})"

null_merge = _null_wrap_merge(
ignore_nulls, lambda a1, a2: [a1[0] + a2[0], a1[1] + a2[1]]
Expand All @@ -207,7 +243,7 @@ def vectorized_mean(block: Block[T]) -> AggType:
null_merge,
),
finalize=_null_wrap_finalize(lambda a: a[0] / a[1]),
name=(f"mean({str(on)})"),
name=(self._rs_name),
)


Expand All @@ -229,8 +265,13 @@ def __init__(
on: Optional[KeyFn] = None,
ddof: int = 1,
ignore_nulls: bool = True,
alias_name: Optional[KeyFn] = None,
):
self._set_key_fn(on)
if alias_name:
self._rs_name = alias_name
else:
self._rs_name = f"std({str(on)})"

def merge(a: List[float], b: List[float]):
# Merges two accumulations into one.
Expand Down Expand Up @@ -282,17 +323,26 @@ def finalize(a: List[float]):
null_merge,
),
finalize=_null_wrap_finalize(finalize),
name=(f"std({str(on)})"),
name=(self._rs_name),
)


@PublicAPI
class AbsMax(_AggregateOnKeyBase):
"""Defines absolute max aggregation."""

def __init__(self, on: Optional[KeyFn] = None, ignore_nulls: bool = True):
def __init__(
self,
on: Optional[KeyFn] = None,
ignore_nulls: bool = True,
alias_name: Optional[KeyFn] = None,
):
self._set_key_fn(on)
on_fn = _to_on_fn(on)
if alias_name:
self._rs_name = alias_name
else:
self._rs_name = f"abs_max({str(on)})"

super().__init__(
init=_null_wrap_init(lambda k: 0),
Expand All @@ -301,7 +351,7 @@ def __init__(self, on: Optional[KeyFn] = None, ignore_nulls: bool = True):
ignore_nulls, on_fn, lambda a, r: max(a, abs(r))
),
finalize=_null_wrap_finalize(lambda a: a),
name=(f"abs_max({str(on)})"),
name=(self._rs_name),
)


Expand All @@ -312,3 +362,78 @@ def _to_on_fn(on: Optional[KeyFn]):
return lambda r: r[on]
else:
return on


@PublicAPI
class Quantile(_AggregateOnKeyBase):
    """Defines quantile aggregation.

    Collects all values of the target column into a single list, then
    computes the ``q``-th quantile with linear interpolation between the two
    closest ranks (the interpolated result is rounded to 5 decimal places).
    """

    def __init__(
        self,
        on: Optional[KeyFn] = None,
        q: float = 0.5,
        ignore_nulls: bool = True,
        alias_name: Optional[KeyFn] = None,
    ):
        """
        Args:
            on: The column (or row callable) to compute the quantile of.
            q: The quantile to compute; must lie in ``[0, 1]``. Defaults to
                0.5 (the median).
            ignore_nulls: Whether to ignore null values in the input.
            alias_name: Optional name for the output column. Defaults to
                ``"quantile(<on>)"``.
        """
        import math

        self._set_key_fn(on)
        if not 0.0 <= q <= 1.0:
            # Fail fast: an out-of-range q would otherwise surface as an
            # IndexError deep inside `percentile` during finalization.
            raise ValueError(f"`q` must be between 0 and 1, got: {q}")
        self._q = q
        self._rs_name = alias_name if alias_name else f"quantile({str(on)})"

        def merge(a: List[int], b: List[int]):
            # Merge two partial accumulations into one flat list of values.
            # Either side may be a list (already-accumulated values) or a
            # scalar; None and "" are treated as "no value" and dropped.
            # NOTE: isinstance checks use the builtin `list`, not
            # `typing.List`, which is deprecated as an isinstance target.
            if isinstance(a, list):
                if isinstance(b, list):
                    a.extend(b)
                elif b is not None and b != "":
                    a.append(b)
                return a
            if isinstance(b, list):
                if a is not None and a != "":
                    b.append(a)
                return b
            # Both scalars: keep whichever values are present, `a` first.
            return [v for v in (a, b) if v is not None and v != ""]

        null_merge = _null_wrap_merge(ignore_nulls, merge)

        def block_row_ls(block: Block[T]) -> AggType:
            # Materialize the target column of one block as a Python list.
            block_acc = BlockAccessor.for_block(block)
            return [row.get(on) for row in block_acc.iter_rows()]

        def percentile(input_values, key=lambda x: x):
            # Linear-interpolation quantile over all collected values.
            if not input_values:
                return None
            input_values = sorted(input_values)
            k = (len(input_values) - 1) * self._q
            f = math.floor(k)
            c = math.ceil(k)
            if f == c:
                # k is integral: the quantile is an exact element.
                return key(input_values[int(k)])
            d0 = key(input_values[int(f)]) * (c - k)
            d1 = key(input_values[int(c)]) * (k - f)
            return round(d0 + d1, 5)

        super().__init__(
            # NOTE(review): the `[0]` init mirrors the other built-in
            # aggregations; it is assumed to be handled by the null-wrapping
            # protocol rather than merged into the value list -- confirm.
            init=_null_wrap_init(lambda k: [0]),
            merge=null_merge,
            accumulate_block=_null_wrap_accumulate_block(
                ignore_nulls,
                block_row_ls,
                null_merge,
            ),
            finalize=_null_wrap_finalize(percentile),
            name=(self._rs_name),
        )
67 changes: 64 additions & 3 deletions python/ray/data/tests/test_all_to_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import pytest

import ray
from ray.data.aggregate import AggregateFn, Count, Max, Mean, Min, Std, Sum
from ray.data.aggregate import AggregateFn, Count, Max, Mean, Min, Std, Sum, Quantile
from ray.data.context import DataContext
from ray.data.tests.conftest import * # noqa
from ray.tests.conftest import * # noqa
Expand Down Expand Up @@ -818,13 +818,14 @@ def test_groupby_arrow_multi_agg(ray_start_regular_shared, num_parts):
Max("B"),
Mean("B"),
Std("B"),
Quantile("B"),
)
)
assert agg_ds.count() == 3
agg_df = agg_ds.to_pandas()
expected_grouped = df.groupby("A")["B"]
np.testing.assert_array_equal(agg_df["count()"].to_numpy(), [34, 33, 33])
for agg in ["sum", "min", "max", "mean", "std"]:
for agg in ["sum", "min", "max", "mean", "quantile", "std"]:
result = agg_df[f"{agg}(B)"].to_numpy()
expected = getattr(expected_grouped, agg)().to_numpy()
if agg == "std":
Expand All @@ -843,9 +844,10 @@ def test_groupby_arrow_multi_agg(ray_start_regular_shared, num_parts):
Max("A"),
Mean("A"),
Std("A"),
Quantile("A"),
)
)
for agg in ["sum", "min", "max", "mean", "std"]:
for agg in ["sum", "min", "max", "mean", "quantile", "std"]:
result = result_row[f"{agg}(A)"]
expected = getattr(df["A"], agg)()
if agg == "std":
Expand All @@ -854,6 +856,65 @@ def test_groupby_arrow_multi_agg(ray_start_regular_shared, num_parts):
assert result == expected


@pytest.mark.parametrize("num_parts", [1, 30])
def test_groupby_arrow_multi_agg_alias(ray_start_regular_shared, num_parts):
    """Checks that `alias_name` renames the output column of every built-in
    aggregation, for both grouped and global aggregation, against pandas."""
    # Log the seed so a failing shuffle order can be reproduced.
    seed = int(time.time())
    print(f"Seeding RNG for test_groupby_arrow_multi_agg_alias with: {seed}")
    random.seed(seed)
    xs = list(range(100))
    random.shuffle(xs)
    df = pd.DataFrame({"A": [x % 3 for x in xs], "B": xs})
    # Grouped aggregation: every built-in aggregation with a custom alias.
    agg_ds = (
        ray.data.from_pandas(df)
        .repartition(num_parts)
        .groupby("A")
        .aggregate(
            Sum("B", alias_name="sum_b"),
            Min("B", alias_name="min_b"),
            Max("B", alias_name="max_b"),
            Mean("B", alias_name="mean_b"),
            Std("B", alias_name="std_b"),
            Quantile("B", alias_name="quantile_b"),
        )
    )

    agg_df = agg_ds.to_pandas()
    expected_grouped = df.groupby("A")["B"]
    for agg in ["sum", "min", "max", "mean", "quantile", "std"]:
        result = agg_df[f"{agg}_b"].to_numpy()
        expected = getattr(expected_grouped, agg)().to_numpy()
        if agg == "std":
            # std is accumulated incrementally, so allow float round-off.
            np.testing.assert_array_almost_equal(result, expected)
        else:
            np.testing.assert_array_equal(result, expected)
    # Global (non-grouped) aggregation with aliases.
    df = pd.DataFrame({"A": xs})
    result_row = (
        ray.data.from_pandas(df)
        .repartition(num_parts)
        .aggregate(
            Sum("A", alias_name="sum_a"),
            Min("A", alias_name="min_a"),
            Max("A", alias_name="max_a"),
            Mean("A", alias_name="mean_a"),
            Std("A", alias_name="std_a"),
            Quantile("A", alias_name="quantile_a"),
        )
    )
    for agg in ["sum", "min", "max", "mean", "quantile", "std"]:
        result = result_row[f"{agg}_a"]
        expected = getattr(df["A"], agg)()
        if agg == "std":
            assert math.isclose(result, expected)
        else:
            assert result == expected


def test_groupby_simple(ray_start_regular_shared):
seed = int(time.time())
print(f"Seeding RNG for test_groupby_simple with: {seed}")
Expand Down