From 14b9d753342f7c0125e0db0351608c2e2cf75e4f Mon Sep 17 00:00:00 2001 From: rongjianmin1 Date: Thu, 13 Apr 2023 11:36:47 +0800 Subject: [PATCH 1/6] =?UTF-8?q?add=20agg=20fn=20alias=20arg=20and=20add=20?= =?UTF-8?q?quantile=E3=80=81mode=20fn?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- python/ray/data/aggregate.py | 179 ++++++++++++++++++++++++++++++++--- 1 file changed, 168 insertions(+), 11 deletions(-) diff --git a/python/ray/data/aggregate.py b/python/ray/data/aggregate.py index 0d8ee4117226..82c60137fc3e 100644 --- a/python/ray/data/aggregate.py +++ b/python/ray/data/aggregate.py @@ -113,11 +113,16 @@ def __init__(self): class Sum(_AggregateOnKeyBase): """Defines sum aggregation.""" - def __init__(self, on: Optional[KeyFn] = None, ignore_nulls: bool = True): + def __init__(self, on: Optional[KeyFn] = None, ignore_nulls: bool = True, alias_name: Optional[KeyFn] = None): self._set_key_fn(on) null_merge = _null_wrap_merge(ignore_nulls, lambda a1, a2: a1 + a2) + if alias_name: + rs_name = alias_name + else: + rs_name = f"sum({str(on)})" + super().__init__( init=_null_wrap_init(lambda k: 0), merge=null_merge, @@ -127,7 +132,7 @@ def __init__(self, on: Optional[KeyFn] = None, ignore_nulls: bool = True): null_merge, ), finalize=_null_wrap_finalize(lambda a: a), - name=(f"sum({str(on)})"), + name=(rs_name), ) @@ -135,11 +140,16 @@ def __init__(self, on: Optional[KeyFn] = None, ignore_nulls: bool = True): class Min(_AggregateOnKeyBase): """Defines min aggregation.""" - def __init__(self, on: Optional[KeyFn] = None, ignore_nulls: bool = True): + def __init__(self, on: Optional[KeyFn] = None, ignore_nulls: bool = True, alias_name: Optional[KeyFn] = None): self._set_key_fn(on) null_merge = _null_wrap_merge(ignore_nulls, min) + if alias_name: + rs_name = alias_name + else: + rs_name = f"min({str(on)})" + super().__init__( init=_null_wrap_init(lambda k: float("inf")), merge=null_merge, @@ -149,7 +159,7 @@ def __init__(self, on: Optional[KeyFn] = None, ignore_nulls: bool = True): null_merge, ), finalize=_null_wrap_finalize(lambda a: a), - name=(f"min({str(on)})"), + name=(rs_name), ) @@ -157,11 +167,16 @@ def __init__(self, on: Optional[KeyFn] = None, ignore_nulls: bool = True): class Max(_AggregateOnKeyBase): """Defines max aggregation.""" - def __init__(self, on: Optional[KeyFn] = None, ignore_nulls: bool = True): + def __init__(self, on: Optional[KeyFn] = None, ignore_nulls: bool = True, alias_name: Optional[KeyFn] = None): self._set_key_fn(on) null_merge = _null_wrap_merge(ignore_nulls, max) + if alias_name: + rs_name = alias_name + else: + rs_name = f"max({str(on)})" + super().__init__( init=_null_wrap_init(lambda k: float("-inf")), merge=null_merge, @@ -171,7 +186,7 @@ def __init__(self, on: Optional[KeyFn] = None, ignore_nulls: bool = True): null_merge, ), finalize=_null_wrap_finalize(lambda a: a), - name=(f"max({str(on)})"), + name=(rs_name), ) @@ -179,7 +194,7 @@ def __init__(self, on: Optional[KeyFn] = None, ignore_nulls: bool = True): class Mean(_AggregateOnKeyBase): """Defines mean aggregation.""" - def __init__(self, on: Optional[KeyFn] = None, ignore_nulls: bool = True): + def __init__(self, on: Optional[KeyFn] = None, ignore_nulls: bool = True, alias_name: Optional[KeyFn] = None): self._set_key_fn(on) null_merge = _null_wrap_merge( @@ -198,6 +213,11 @@ def vectorized_mean(block: Block[T]) -> AggType: return None return [sum_, count] + if alias_name: + rs_name = alias_name + else: + rs_name = f"mean({str(on)})" + super().__init__( init=_null_wrap_init(lambda k: [0, 0]), merge=null_merge, @@ -207,7 +227,7 @@ def vectorized_mean(block: Block[T]) -> AggType: null_merge, ), finalize=_null_wrap_finalize(lambda a: a[0] / a[1]), - name=(f"mean({str(on)})"), + name=(rs_name), ) @@ -229,6 +249,7 @@ def __init__( on: Optional[KeyFn] = None, ddof: int = 1, ignore_nulls: bool = True, + alias_name: Optional[KeyFn] = None ): self._set_key_fn(on) @@ -273,6 +294,11 @@ def finalize(a: List[float]): return 0.0 return math.sqrt(M2 / (count - ddof)) + if alias_name: + rs_name = alias_name + else: + rs_name = f"std({str(on)})" + super().__init__( init=_null_wrap_init(lambda k: [0, 0, 0]), merge=null_merge, @@ -282,7 +308,7 @@ def finalize(a: List[float]): null_merge, ), finalize=_null_wrap_finalize(finalize), - name=(f"std({str(on)})"), + name=(rs_name), ) @@ -290,10 +316,15 @@ def finalize(a: List[float]): class AbsMax(_AggregateOnKeyBase): """Defines absolute max aggregation.""" - def __init__(self, on: Optional[KeyFn] = None, ignore_nulls: bool = True): + def __init__(self, on: Optional[KeyFn] = None, ignore_nulls: bool = True, alias_name: Optional[KeyFn] = None): self._set_key_fn(on) on_fn = _to_on_fn(on) + if alias_name: + rs_name = alias_name + else: + rs_name = f"abs_max({str(on)})" + super().__init__( init=_null_wrap_init(lambda k: 0), merge=_null_wrap_merge(ignore_nulls, max), @@ -301,7 +332,7 @@ def __init__(self, on: Optional[KeyFn] = None, ignore_nulls: bool = True): ignore_nulls, on_fn, lambda a, r: max(a, abs(r)) ), finalize=_null_wrap_finalize(lambda a: a), - name=(f"abs_max({str(on)})"), + name=(rs_name), ) @@ -312,3 +343,129 @@ def _to_on_fn(on: Optional[KeyFn]): return lambda r: r[on] else: return on + + +@PublicAPI +class Quantile(_AggregateOnKeyBase): + """Defines Quantile aggregation.""" + + def __init__(self, on: Optional[KeyFn] = None, percent: float = 0.5, ignore_nulls: bool = True, + alias_name: Optional[KeyFn] = None): + self._set_key_fn(on) + self._percent = percent + + def merge(a: List[int], b: List[int]): + if isinstance(a, List) and isinstance(b, List): + a.extend(b) + return a + if isinstance(a, List) and (not isinstance(b, List)): + if b is not None and b != '': + a.append(b) + return a + if isinstance(b, List) and (not isinstance(a, List)): + if a is not None and a != '': + b.append(a) + return b + + ls = [] + if a is not None and a != '': + ls.append(a) + if b is not None and b != '': + ls.append(b) + return ls + + null_merge = _null_wrap_merge(ignore_nulls, merge) + + def block_row_ls(block: Block[T]) -> AggType: + block_acc = BlockAccessor.for_block(block) + ls = [] + for row in block_acc.iter_rows(): + ls.append(row.get(on)) + return ls + + if alias_name: + rs_name = alias_name + else: + rs_name = (f"Quantile({str(on)})") + + import math + def percentile(N, key=lambda x: x): + if not N: + return None + N = sorted(N) + k = (len(N) - 1) * self._percent + f = math.floor(k) + c = math.ceil(k) + if f == c: + return key(N[int(k)]) + d0 = key(N[int(f)]) * (c - k) + d1 = key(N[int(c)]) * (k - f) + return round(d0 + d1, 5) + + super().__init__( + init=_null_wrap_init(lambda k: [0]), + merge=null_merge, + accumulate_block=_null_wrap_accumulate_block( + ignore_nulls, + block_row_ls, + null_merge, + ), + finalize=_null_wrap_finalize(percentile), + name=(rs_name), + ) + + +@PublicAPI +class Mode(_AggregateOnKeyBase): + """Defines Mode aggregation.""" + + def __init__(self, on: Optional[KeyFn] = None, ignore_nulls: bool = True, + alias_name: Optional[KeyFn] = None): + self._set_key_fn(on) + + def merge(a: dict, b: dict): + for key in b: + if a.__contains__(key): + a[key] = a[key] + b[key] + else: + a[key] = b[key] + return a + + null_merge = _null_wrap_merge(ignore_nulls, merge) + + def block_row_dicts(block: Block[T]) -> AggType: + block_acc = BlockAccessor.for_block(block) + elem_dict = {} + for row in block_acc.iter_rows(): + value = row.get(on) + if value is not None: + if elem_dict.__contains__(value): + elem_dict[value] = elem_dict[value] + 1 + else: + elem_dict[value] = 1 + return elem_dict + + if alias_name: + rs_name = alias_name + else: + rs_name = (f"Mode({str(on)})") + + def mode_value(v: dict): + sorted_v = sorted(v.items(), key=lambda x: x[1], reverse=True) + return sorted_v[0][0] + + super().__init__( + init=_null_wrap_init(lambda k: [0]), + merge=null_merge, + accumulate_block=_null_wrap_accumulate_block( + ignore_nulls, + block_row_dicts, + null_merge, + ), + finalize=_null_wrap_finalize(mode_value), + name=(rs_name), + ) + + + + From 37a539c52ec5ac9c56ee9c2ea89b5cb037b41dbc Mon Sep 17 00:00:00 2001 From: rongjianmin1 Date: Thu, 13 Apr 2023 14:45:29 +0800 Subject: [PATCH 2/6] update aggregate.py --- python/ray/data/aggregate.py | 50 ------------------------------------ 1 file changed, 50 deletions(-) diff --git a/python/ray/data/aggregate.py b/python/ray/data/aggregate.py index 82c60137fc3e..38f3e76bba34 100644 --- a/python/ray/data/aggregate.py +++ b/python/ray/data/aggregate.py @@ -415,56 +415,6 @@ def percentile(N, key=lambda x: x): ) -@PublicAPI -class Mode(_AggregateOnKeyBase): - """Defines Mode aggregation.""" - - def __init__(self, on: Optional[KeyFn] = None, ignore_nulls: bool = True, - alias_name: Optional[KeyFn] = None): - self._set_key_fn(on) - - def merge(a: dict, b: dict): - for key in b: - if a.__contains__(key): - a[key] = a[key] + b[key] - else: - a[key] = b[key] - return a - - null_merge = _null_wrap_merge(ignore_nulls, merge) - - def block_row_dicts(block: Block[T]) -> AggType: - block_acc = BlockAccessor.for_block(block) - elem_dict = {} - for row in block_acc.iter_rows(): - value = row.get(on) - if value is not None: - if elem_dict.__contains__(value): - elem_dict[value] = elem_dict[value] + 1 - else: - elem_dict[value] = 1 - return elem_dict - - if alias_name: - rs_name = alias_name - else: - rs_name = (f"Mode({str(on)})") - - def mode_value(v: dict): - sorted_v = sorted(v.items(), key=lambda x: x[1], reverse=True) - return sorted_v[0][0] - - super().__init__( - init=_null_wrap_init(lambda k: [0]), - merge=null_merge, - accumulate_block=_null_wrap_accumulate_block( - ignore_nulls, - block_row_dicts, - null_merge, - ), - finalize=_null_wrap_finalize(mode_value), - name=(rs_name), - ) From a5846d5b376f59864839b644c12b19f293fb12b3 Mon Sep 17 00:00:00 2001 From: rongjianmin1 Date: Fri, 14 Apr 2023 14:48:53 +0800 Subject: [PATCH 3/6] add alias test case and fix quantile name --- python/ray/data/__init__.py | 10 +-- python/ray/data/_internal/compute.py | 2 +- python/ray/data/aggregate.py | 63 +++++++++++------ .../ray/data/tests/test_dataset_all_to_all.py | 67 ++++++++++++++++++- 4 files changed, 114 insertions(+), 28 deletions(-) diff --git a/python/ray/data/__init__.py b/python/ray/data/__init__.py index 3b72e4084932..64b48a45c84f 100644 --- a/python/ray/data/__init__.py +++ b/python/ray/data/__init__.py @@ -6,16 +6,16 @@ if sys.version_info >= (3, 7): import pandas # noqa -from ray.data._internal.compute import ActorPoolStrategy +from python.ray.data._internal.compute import ActorPoolStrategy from ray.data._internal.progress_bar import set_progress_bars from ray.data._internal.execution.interfaces import ExecutionOptions, ExecutionResources -from ray.data.dataset import Dataset, Datastream -from ray.data.context import DatasetContext, DataContext -from ray.data.dataset_iterator import DatasetIterator, DataIterator +from python.ray.data.dataset import Dataset, Datastream +from python.ray.data.context import DatasetContext, DataContext +from python.ray.data.dataset_iterator import DatasetIterator, DataIterator from ray.data.dataset_pipeline import DatasetPipeline from ray.data.datasource import Datasource, ReadTask from ray.data.preprocessor import Preprocessor -from ray.data.read_api import ( # noqa: F401 +from python.ray.data.read_api import ( # noqa: F401 from_arrow, from_arrow_refs, from_dask, diff --git a/python/ray/data/_internal/compute.py b/python/ray/data/_internal/compute.py index 745556f4b023..0eed13ba59ae 100644 --- a/python/ray/data/_internal/compute.py +++ b/python/ray/data/_internal/compute.py @@ -19,7 +19,7 @@ CallableClass, RowUDF, ) -from ray.data.context import DEFAULT_SCHEDULING_STRATEGY, DataContext +from python.ray.data.context import DEFAULT_SCHEDULING_STRATEGY, DataContext from ray.types import ObjectRef from ray.util.annotations import DeveloperAPI, PublicAPI diff --git a/python/ray/data/aggregate.py b/python/ray/data/aggregate.py index 38f3e76bba34..fa3fe6183704 100644 --- a/python/ray/data/aggregate.py +++ b/python/ray/data/aggregate.py @@ -113,7 +113,12 @@ def __init__(self): class Sum(_AggregateOnKeyBase): """Defines sum aggregation.""" - def __init__(self, on: Optional[KeyFn] = None, ignore_nulls: bool = True, alias_name: Optional[KeyFn] = None): + def __init__( + self, + on: Optional[KeyFn] = None, + ignore_nulls: bool = True, + alias_name: Optional[KeyFn] = None, + ): self._set_key_fn(on) null_merge = _null_wrap_merge(ignore_nulls, lambda a1, a2: a1 + a2) @@ -140,7 +145,12 @@ def __init__(self, on: Optional[KeyFn] = None, ignore_nulls: bool = True, alias_ class Min(_AggregateOnKeyBase): """Defines min aggregation.""" - def __init__(self, on: Optional[KeyFn] = None, ignore_nulls: bool = True, alias_name: Optional[KeyFn] = None): + def __init__( + self, + on: Optional[KeyFn] = None, + ignore_nulls: bool = True, + alias_name: Optional[KeyFn] = None, + ): self._set_key_fn(on) null_merge = _null_wrap_merge(ignore_nulls, min) @@ -167,7 +177,12 @@ def __init__(self, on: Optional[KeyFn] = None, ignore_nulls: bool = True, alias_ class Max(_AggregateOnKeyBase): """Defines max aggregation.""" - def __init__(self, on: Optional[KeyFn] = None, ignore_nulls: bool = True, alias_name: Optional[KeyFn] = None): + def __init__( + self, + on: Optional[KeyFn] = None, + ignore_nulls: bool = True, + alias_name: Optional[KeyFn] = None, + ): self._set_key_fn(on) null_merge = _null_wrap_merge(ignore_nulls, max) @@ -194,7 +209,12 @@ def __init__(self, on: Optional[KeyFn] = None, ignore_nulls: bool = True, alias_ class Mean(_AggregateOnKeyBase): """Defines mean aggregation.""" - def __init__(self, on: Optional[KeyFn] = None, ignore_nulls: bool = True, alias_name: Optional[KeyFn] = None): + def __init__( + self, + on: Optional[KeyFn] = None, + ignore_nulls: bool = True, + alias_name: Optional[KeyFn] = None, + ): self._set_key_fn(on) null_merge = _null_wrap_merge( @@ -249,7 +269,7 @@ def __init__( on: Optional[KeyFn] = None, ddof: int = 1, ignore_nulls: bool = True, - alias_name: Optional[KeyFn] = None + alias_name: Optional[KeyFn] = None, ): self._set_key_fn(on) @@ -316,7 +336,12 @@ def finalize(a: List[float]): class AbsMax(_AggregateOnKeyBase): """Defines absolute max aggregation.""" - def __init__(self, on: Optional[KeyFn] = None, ignore_nulls: bool = True, alias_name: Optional[KeyFn] = None): + def __init__( + self, + on: Optional[KeyFn] = None, + ignore_nulls: bool = True, + alias_name: Optional[KeyFn] = None, + ): self._set_key_fn(on) on_fn = _to_on_fn(on) @@ -349,8 +374,13 @@ def _to_on_fn(on: Optional[KeyFn]): class Quantile(_AggregateOnKeyBase): """Defines Quantile aggregation.""" - def __init__(self, on: Optional[KeyFn] = None, percent: float = 0.5, ignore_nulls: bool = True, - alias_name: Optional[KeyFn] = None): + def __init__( + self, + on: Optional[KeyFn] = None, + percent: float = 0.5, + ignore_nulls: bool = True, + alias_name: Optional[KeyFn] = None, + ): self._set_key_fn(on) self._percent = percent @@ -359,18 +389,18 @@ def merge(a: List[int], b: List[int]): a.extend(b) return a if isinstance(a, List) and (not isinstance(b, List)): - if b is not None and b != '': + if b is not None and b != "": a.append(b) return a if isinstance(b, List) and (not isinstance(a, List)): - if a is not None and a != '': + if a is not None and a != "": b.append(a) return b ls = [] - if a is not None and a != '': + if a is not None and a != "": ls.append(a) - if b is not None and b != '': + if b is not None and b != "": ls.append(b) return ls @@ -386,9 +416,10 @@ def block_row_ls(block: Block[T]) -> AggType: if alias_name: rs_name = alias_name else: - rs_name = (f"Quantile({str(on)})") + rs_name = f"quantile({str(on)})" import math + def percentile(N, key=lambda x: x): if not N: return None @@ -413,9 +444,3 @@ def percentile(N, key=lambda x: x): finalize=_null_wrap_finalize(percentile), name=(rs_name), ) - - - - - - diff --git a/python/ray/data/tests/test_dataset_all_to_all.py b/python/ray/data/tests/test_dataset_all_to_all.py index f7120da3dbfc..c81623a3acc9 100644 --- a/python/ray/data/tests/test_dataset_all_to_all.py +++ b/python/ray/data/tests/test_dataset_all_to_all.py @@ -9,7 +9,7 @@ import pytest import ray -from ray.data.aggregate import AggregateFn, Count, Max, Mean, Min, Std, Sum +from ray.data.aggregate import AggregateFn, Count, Max, Mean, Min, Std, Sum, Quantile from ray.data.context import DataContext from ray.data.tests.conftest import * # noqa from ray.tests.conftest import * # noqa @@ -818,13 +818,14 @@ def test_groupby_arrow_multi_agg(ray_start_regular_shared, num_parts): Max("B"), Mean("B"), Std("B"), + Quantile("B"), ) ) assert agg_ds.count() == 3 agg_df = agg_ds.to_pandas() expected_grouped = df.groupby("A")["B"] np.testing.assert_array_equal(agg_df["count()"].to_numpy(), [34, 33, 33]) - for agg in ["sum", "min", "max", "mean", "std"]: + for agg in ["sum", "min", "max", "mean", "quantile", "std"]: result = agg_df[f"{agg}(B)"].to_numpy() expected = getattr(expected_grouped, agg)().to_numpy() if agg == "std": @@ -843,9 +844,10 @@ def test_groupby_arrow_multi_agg(ray_start_regular_shared, num_parts): Max("A"), Mean("A"), Std("A"), + Quantile("A"), ) ) - for agg in ["sum", "min", "max", "mean", "std"]: + for agg in ["sum", "min", "max", "mean", "quantile", "std"]: result = result_row[f"{agg}(A)"] expected = getattr(df["A"], agg)() if agg == "std": @@ -854,6 +856,65 @@ def test_groupby_arrow_multi_agg(ray_start_regular_shared, num_parts): assert result == expected +@pytest.mark.parametrize("num_parts", [1, 30]) +def test_groupby_arrow_multi_agg_alias(ray_start_regular_shared, num_parts): + seed = int(time.time()) + print(f"Seeding RNG for test_groupby_arrow_multi_agg with: {seed}") + random.seed(seed) + xs = list(range(100)) + random.shuffle(xs) + df = pd.DataFrame({"A": [x % 3 for x in xs], "B": xs}) + agg_ds = ( + ray.data.from_pandas(df) + .repartition(num_parts) + .groupby("A") + .aggregate( + Sum("B", alias_name="sum_b"), + Min("B", alias_name="min_b"), + Max("B", alias_name="max_b"), + Mean("B", alias_name="mean_b"), + Std("B", alias_name="std_b"), + Quantile("B", alias_name="quantile_b"), + ) + ) + + agg_df = agg_ds.to_pandas() + expected_grouped = df.groupby("A")["B"] + for agg in ["sum", "min", "max", "mean", "quantile", "std"]: + result = agg_df[f"{agg}_b"].to_numpy() + print(agg) + print(result) + expected = getattr(expected_grouped, agg)().to_numpy() + print(expected) + if agg == "std": + np.testing.assert_array_almost_equal(result, expected) + else: + np.testing.assert_array_equal(result, expected) + # Test built-in global std aggregation + df = pd.DataFrame({"A": xs}) + result_row = ( + ray.data.from_pandas(df) + .repartition(num_parts) + .aggregate( + Sum("A", alias_name="sum_b"), + Min("A", alias_name="min_b"), + Max("A", alias_name="max_b"), + Mean("A", alias_name="mean_b"), + Std("A", alias_name="std_b"), + Quantile("A", alias_name="quantile_b"), + ) + ) + for agg in ["sum", "min", "max", "mean", "quantile", "std"]: + result = result_row[f"{agg}_b"] + print(result) + expected = getattr(df["A"], agg)() + print(expected) + if agg == "std": + assert math.isclose(result, expected) + else: + assert result == expected + + def test_groupby_simple(ray_start_regular_shared): seed = int(time.time()) print(f"Seeding RNG for test_groupby_simple with: {seed}") From 1b916067394a3098bc9b83bf9cca011b58205860 Mon Sep 17 00:00:00 2001 From: rongjianmin1 Date: Fri, 14 Apr 2023 14:53:29 +0800 Subject: [PATCH 4/6] Restore Mismodifications --- python/ray/data/__init__.py | 10 +++++----- python/ray/data/_internal/compute.py | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/python/ray/data/__init__.py b/python/ray/data/__init__.py index 64b48a45c84f..3b72e4084932 100644 --- a/python/ray/data/__init__.py +++ b/python/ray/data/__init__.py @@ -6,16 +6,16 @@ if sys.version_info >= (3, 7): import pandas # noqa -from python.ray.data._internal.compute import ActorPoolStrategy +from ray.data._internal.compute import ActorPoolStrategy from ray.data._internal.progress_bar import set_progress_bars from ray.data._internal.execution.interfaces import ExecutionOptions, ExecutionResources -from python.ray.data.dataset import Dataset, Datastream -from python.ray.data.context import DatasetContext, DataContext -from python.ray.data.dataset_iterator import DatasetIterator, DataIterator +from ray.data.dataset import Dataset, Datastream +from ray.data.context import DatasetContext, DataContext +from ray.data.dataset_iterator import DatasetIterator, DataIterator from ray.data.dataset_pipeline import DatasetPipeline from ray.data.datasource import Datasource, ReadTask from ray.data.preprocessor import Preprocessor -from python.ray.data.read_api import ( # noqa: F401 +from ray.data.read_api import ( # noqa: F401 from_arrow, from_arrow_refs, from_dask, diff --git a/python/ray/data/_internal/compute.py b/python/ray/data/_internal/compute.py index 0eed13ba59ae..745556f4b023 100644 --- a/python/ray/data/_internal/compute.py +++ b/python/ray/data/_internal/compute.py @@ -19,7 +19,7 @@ CallableClass, RowUDF, ) -from python.ray.data.context import DEFAULT_SCHEDULING_STRATEGY, DataContext +from ray.data.context import DEFAULT_SCHEDULING_STRATEGY, DataContext from ray.types import ObjectRef from ray.util.annotations import DeveloperAPI, PublicAPI From a717a957f9fad4af4b30977071641a723d284dfd Mon Sep 17 00:00:00 2001 From: rongjianmin1 Date: Sat, 15 Apr 2023 21:37:20 +0800 Subject: [PATCH 5/6] move alias name variable to one place and update no objection variable names --- python/ray/data/aggregate.py | 87 +++++++++++++++++------------------- 1 file changed, 40 insertions(+), 47 deletions(-) diff --git a/python/ray/data/aggregate.py b/python/ray/data/aggregate.py index fa3fe6183704..ef678d77f026 100644 --- a/python/ray/data/aggregate.py +++ b/python/ray/data/aggregate.py @@ -120,13 +120,12 @@ def __init__( alias_name: Optional[KeyFn] = None, ): self._set_key_fn(on) - - null_merge = _null_wrap_merge(ignore_nulls, lambda a1, a2: a1 + a2) - if alias_name: - rs_name = alias_name + self._rs_name = alias_name else: - rs_name = f"sum({str(on)})" + self._rs_name = f"sum({str(on)})" + + null_merge = _null_wrap_merge(ignore_nulls, lambda a1, a2: a1 + a2) super().__init__( init=_null_wrap_init(lambda k: 0), @@ -137,7 +136,7 @@ def __init__( null_merge, ), finalize=_null_wrap_finalize(lambda a: a), - name=(rs_name), + name=(self._rs_name), ) @@ -152,13 +151,12 @@ def __init__( alias_name: Optional[KeyFn] = None, ): self._set_key_fn(on) - - null_merge = _null_wrap_merge(ignore_nulls, min) - if alias_name: - rs_name = alias_name + self._rs_name = alias_name else: - rs_name = f"min({str(on)})" + self._rs_name = f"min({str(on)})" + + null_merge = _null_wrap_merge(ignore_nulls, min) super().__init__( init=_null_wrap_init(lambda k: float("inf")), @@ -169,7 +167,7 @@ def __init__( null_merge, ), finalize=_null_wrap_finalize(lambda a: a), - name=(rs_name), + name=(self._rs_name), ) @@ -184,13 +182,12 @@ def __init__( alias_name: Optional[KeyFn] = None, ): self._set_key_fn(on) - - null_merge = _null_wrap_merge(ignore_nulls, max) - if alias_name: - rs_name = alias_name + self._rs_name = alias_name else: - rs_name = f"max({str(on)})" + self._rs_name = f"max({str(on)})" + + null_merge = _null_wrap_merge(ignore_nulls, max) super().__init__( init=_null_wrap_init(lambda k: float("-inf")), @@ -201,7 +198,7 @@ def __init__( null_merge, ), finalize=_null_wrap_finalize(lambda a: a), - name=(rs_name), + name=(self._rs_name), ) @@ -216,6 +213,10 @@ def __init__( alias_name: Optional[KeyFn] = None, ): self._set_key_fn(on) + if alias_name: + self._rs_name = alias_name + else: + self._rs_name = f"mean({str(on)})" null_merge = _null_wrap_merge( ignore_nulls, lambda a1, a2: [a1[0] + a2[0], a1[1] + a2[1]] @@ -233,11 +234,6 @@ def vectorized_mean(block: Block[T]) -> AggType: return None return [sum_, count] - if alias_name: - rs_name = alias_name - else: - rs_name = f"mean({str(on)})" - super().__init__( init=_null_wrap_init(lambda k: [0, 0]), merge=null_merge, @@ -247,7 +243,7 @@ def vectorized_mean(block: Block[T]) -> AggType: null_merge, ), finalize=_null_wrap_finalize(lambda a: a[0] / a[1]), - name=(rs_name), + name=(self._rs_name), ) @@ -272,6 +268,10 @@ def __init__( alias_name: Optional[KeyFn] = None, ): self._set_key_fn(on) + if alias_name: + self._rs_name = alias_name + else: + self._rs_name = f"std({str(on)})" def merge(a: List[float], b: List[float]): # Merges two accumulations into one. @@ -314,11 +314,6 @@ def finalize(a: List[float]): return 0.0 return math.sqrt(M2 / (count - ddof)) - if alias_name: - rs_name = alias_name - else: - rs_name = f"std({str(on)})" - super().__init__( init=_null_wrap_init(lambda k: [0, 0, 0]), merge=null_merge, @@ -328,7 +323,7 @@ def finalize(a: List[float]): null_merge, ), finalize=_null_wrap_finalize(finalize), - name=(rs_name), + name=(self._rs_name), ) @@ -344,11 +339,10 @@ def __init__( ): self._set_key_fn(on) on_fn = _to_on_fn(on) - if alias_name: - rs_name = alias_name + self._rs_name = alias_name else: - rs_name = f"abs_max({str(on)})" + self._rs_name = f"abs_max({str(on)})" super().__init__( init=_null_wrap_init(lambda k: 0), @@ -357,7 +351,7 @@ def __init__( ignore_nulls, on_fn, lambda a, r: max(a, abs(r)) ), finalize=_null_wrap_finalize(lambda a: a), - name=(rs_name), + name=(self._rs_name), ) @@ -383,6 +377,10 @@ def __init__( ): self._set_key_fn(on) self._percent = percent + if alias_name: + self._rs_name = alias_name + else: + self._rs_name = f"quantile({str(on)})" def merge(a: List[int], b: List[int]): if isinstance(a, List) and isinstance(b, List): @@ -413,24 +411,19 @@ def block_row_ls(block: Block[T]) -> AggType: ls.append(row.get(on)) return ls - if alias_name: - rs_name = alias_name - else: - rs_name = f"quantile({str(on)})" - import math - def percentile(N, key=lambda x: x): - if not N: + def percentile(input_values, key=lambda x: x): + if not input_values: return None - N = sorted(N) - k = (len(N) - 1) * self._percent + input_values = sorted(input_values) + k = (len(input_values) - 1) * self._percent f = math.floor(k) c = math.ceil(k) if f == c: - return key(N[int(k)]) - d0 = key(N[int(f)]) * (c - k) - d1 = key(N[int(c)]) * (k - f) + return key(input_values[int(k)]) + d0 = key(input_values[int(f)]) * (c - k) + d1 = key(input_values[int(c)]) * (k - f) return round(d0 + d1, 5) super().__init__( @@ -442,5 +435,5 @@ def percentile(N, key=lambda x: x): null_merge, ), finalize=_null_wrap_finalize(percentile), - name=(rs_name), + name=(self._rs_name), ) From 4bc890598d792bdead64bde27fdac9cc01b50c93 Mon Sep 17 00:00:00 2001 From: rongjianmin1 Date: Wed, 19 Apr 2023 14:37:12 +0800 Subject: [PATCH 6/6] update percent parameter to q --- python/ray/data/aggregate.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/ray/data/aggregate.py b/python/ray/data/aggregate.py index ef678d77f026..ac827bf762f4 100644 --- a/python/ray/data/aggregate.py +++ b/python/ray/data/aggregate.py @@ -371,12 +371,12 @@ class Quantile(_AggregateOnKeyBase): def __init__( self, on: Optional[KeyFn] = None, - percent: float = 0.5, + q: float = 0.5, ignore_nulls: bool = True, alias_name: Optional[KeyFn] = None, ): self._set_key_fn(on) - self._percent = percent + self._q = q if alias_name: self._rs_name = alias_name else: @@ -417,7 +417,7 @@ def percentile(input_values, key=lambda x: x): if not input_values: return None input_values = sorted(input_values) - k = (len(input_values) - 1) * self._percent + k = (len(input_values) - 1) * self._q f = math.floor(k) c = math.ceil(k) if f == c: