diff --git a/python/dask_cudf/dask_cudf/groupby.py b/python/dask_cudf/dask_cudf/groupby.py
index f5258e6cab8..54f8958c9eb 100644
--- a/python/dask_cudf/dask_cudf/groupby.py
+++ b/python/dask_cudf/dask_cudf/groupby.py
@@ -1,5 +1,6 @@
 # Copyright (c) 2020-2022, NVIDIA CORPORATION.
 
+from functools import wraps
 from typing import Set
 
 import numpy as np
@@ -16,12 +17,8 @@
 import cudf
 from cudf.utils.utils import _dask_cudf_nvtx_annotate
 
-CUMULATIVE_AGGS = (
-    "cumsum",
-    "cumcount",
-)
-
-AGGS = (
+# aggregations that are dask-cudf optimized
+OPTIMIZED_AGGS = (
     "count",
     "mean",
     "std",
@@ -34,19 +31,18 @@
     "last",
 )
 
-SUPPORTED_AGGS = (*AGGS, *CUMULATIVE_AGGS)
-
 
-def _check_groupby_supported(func):
+def _check_groupby_optimized(func):
     """
     Decorator for dask-cudf's groupby methods that returns the dask-cudf
-    method if the groupby object is supported, otherwise reverting to the
-    upstream Dask method
+    optimized method if the groupby object is supported, otherwise
+    reverting to the upstream Dask method
     """
 
+    @wraps(func)
     def wrapper(*args, **kwargs):
         gb = args[0]
-        if _groupby_supported(gb):
+        if _groupby_optimized(gb):
             return func(*args, **kwargs)
         # note that we use upstream Dask's default kwargs for this call if
         # none are specified; this shouldn't be an issue as those defaults are
@@ -94,7 +90,7 @@ def _make_groupby_method_aggs(self, agg_name):
         return {c: agg_name for c in self.obj.columns if c != self.by}
 
     @_dask_cudf_nvtx_annotate
-    @_check_groupby_supported
+    @_check_groupby_optimized
     def count(self, split_every=None, split_out=1):
         return groupby_agg(
             self.obj,
@@ -109,7 +105,7 @@ def count(self, split_every=None, split_out=1):
         )
 
     @_dask_cudf_nvtx_annotate
-    @_check_groupby_supported
+    @_check_groupby_optimized
     def mean(self, split_every=None, split_out=1):
         return groupby_agg(
             self.obj,
@@ -124,7 +120,7 @@ def mean(self, split_every=None, split_out=1):
         )
 
     @_dask_cudf_nvtx_annotate
-    @_check_groupby_supported
+    @_check_groupby_optimized
     def std(self, split_every=None, split_out=1):
         return groupby_agg(
             self.obj,
@@ -139,7 +135,7 @@ def std(self, split_every=None, split_out=1):
         )
 
     @_dask_cudf_nvtx_annotate
-    @_check_groupby_supported
+    @_check_groupby_optimized
     def var(self, split_every=None, split_out=1):
         return groupby_agg(
             self.obj,
@@ -154,7 +150,7 @@ def var(self, split_every=None, split_out=1):
         )
 
     @_dask_cudf_nvtx_annotate
-    @_check_groupby_supported
+    @_check_groupby_optimized
     def sum(self, split_every=None, split_out=1):
         return groupby_agg(
             self.obj,
@@ -169,7 +165,7 @@ def sum(self, split_every=None, split_out=1):
         )
 
     @_dask_cudf_nvtx_annotate
-    @_check_groupby_supported
+    @_check_groupby_optimized
     def min(self, split_every=None, split_out=1):
         return groupby_agg(
             self.obj,
@@ -184,7 +180,7 @@ def min(self, split_every=None, split_out=1):
         )
 
     @_dask_cudf_nvtx_annotate
-    @_check_groupby_supported
+    @_check_groupby_optimized
     def max(self, split_every=None, split_out=1):
         return groupby_agg(
             self.obj,
@@ -199,7 +195,7 @@ def max(self, split_every=None, split_out=1):
         )
 
     @_dask_cudf_nvtx_annotate
-    @_check_groupby_supported
+    @_check_groupby_optimized
     def collect(self, split_every=None, split_out=1):
         return groupby_agg(
             self.obj,
@@ -214,7 +210,7 @@ def collect(self, split_every=None, split_out=1):
         )
 
     @_dask_cudf_nvtx_annotate
-    @_check_groupby_supported
+    @_check_groupby_optimized
     def first(self, split_every=None, split_out=1):
         return groupby_agg(
             self.obj,
@@ -229,7 +225,7 @@ def first(self, split_every=None, split_out=1):
         )
 
     @_dask_cudf_nvtx_annotate
-    @_check_groupby_supported
+    @_check_groupby_optimized
     def last(self, split_every=None, split_out=1):
         return groupby_agg(
             self.obj,
@@ -250,7 +246,7 @@ def aggregate(self, arg, split_every=None, split_out=1, shuffle=None):
 
         arg = _redirect_aggs(arg)
 
-        if _groupby_supported(self) and _aggs_supported(arg, SUPPORTED_AGGS):
+        if _groupby_optimized(self) and _aggs_optimized(arg, OPTIMIZED_AGGS):
             if isinstance(self._meta.grouping.keys, cudf.MultiIndex):
                 keys = self._meta.grouping.keys.names
             else:
@@ -287,7 +283,7 @@ def __init__(self, *args, sort=None, **kwargs):
         super().__init__(*args, sort=sort, **kwargs)
 
     @_dask_cudf_nvtx_annotate
-    @_check_groupby_supported
+    @_check_groupby_optimized
     def count(self, split_every=None, split_out=1):
         return groupby_agg(
             self.obj,
@@ -302,7 +298,7 @@ def count(self, split_every=None, split_out=1):
         )[self._slice]
 
     @_dask_cudf_nvtx_annotate
-    @_check_groupby_supported
+    @_check_groupby_optimized
     def mean(self, split_every=None, split_out=1):
         return groupby_agg(
             self.obj,
@@ -317,7 +313,7 @@ def mean(self, split_every=None, split_out=1):
         )[self._slice]
 
     @_dask_cudf_nvtx_annotate
-    @_check_groupby_supported
+    @_check_groupby_optimized
     def std(self, split_every=None, split_out=1):
         return groupby_agg(
             self.obj,
@@ -332,7 +328,7 @@ def std(self, split_every=None, split_out=1):
         )[self._slice]
 
     @_dask_cudf_nvtx_annotate
-    @_check_groupby_supported
+    @_check_groupby_optimized
     def var(self, split_every=None, split_out=1):
         return groupby_agg(
             self.obj,
@@ -347,7 +343,7 @@ def var(self, split_every=None, split_out=1):
         )[self._slice]

     @_dask_cudf_nvtx_annotate
-    @_check_groupby_supported
+    @_check_groupby_optimized
     def sum(self, split_every=None, split_out=1):
         return groupby_agg(
             self.obj,
@@ -362,7 +358,7 @@ def sum(self, split_every=None, split_out=1):
         )[self._slice]
 
     @_dask_cudf_nvtx_annotate
-    @_check_groupby_supported
+    @_check_groupby_optimized
     def min(self, split_every=None, split_out=1):
         return groupby_agg(
             self.obj,
@@ -377,7 +373,7 @@ def min(self, split_every=None, split_out=1):
         )[self._slice]
 
     @_dask_cudf_nvtx_annotate
-    @_check_groupby_supported
+    @_check_groupby_optimized
     def max(self, split_every=None, split_out=1):
         return groupby_agg(
             self.obj,
@@ -392,7 +388,7 @@ def max(self, split_every=None, split_out=1):
         )[self._slice]
 
     @_dask_cudf_nvtx_annotate
-    @_check_groupby_supported
+    @_check_groupby_optimized
     def collect(self, split_every=None, split_out=1):
         return groupby_agg(
             self.obj,
@@ -407,7 +403,7 @@ def collect(self, split_every=None, split_out=1):
         )[self._slice]
 
     @_dask_cudf_nvtx_annotate
-    @_check_groupby_supported
+    @_check_groupby_optimized
     def first(self, split_every=None, split_out=1):
         return groupby_agg(
             self.obj,
@@ -422,7 +418,7 @@ def first(self, split_every=None, split_out=1):
         )[self._slice]
 
     @_dask_cudf_nvtx_annotate
-    @_check_groupby_supported
+    @_check_groupby_optimized
     def last(self, split_every=None, split_out=1):
         return groupby_agg(
             self.obj,
@@ -446,7 +442,7 @@ def aggregate(self, arg, split_every=None, split_out=1, shuffle=None):
         if not isinstance(arg, dict):
             arg = {self._slice: arg}
 
-        if _groupby_supported(self) and _aggs_supported(arg, SUPPORTED_AGGS):
+        if _groupby_optimized(self) and _aggs_optimized(arg, OPTIMIZED_AGGS):
             return groupby_agg(
                 self.obj,
                 self.by,
@@ -569,9 +565,9 @@ def groupby_agg(
     """
     # Assert that aggregations are supported
    aggs = _redirect_aggs(aggs_in)
-    if not _aggs_supported(aggs, SUPPORTED_AGGS):
+    if not _aggs_optimized(aggs, OPTIMIZED_AGGS):
         raise ValueError(
-            f"Supported aggs include {SUPPORTED_AGGS} for groupby_agg API. "
+            f"Supported aggs include {OPTIMIZED_AGGS} for groupby_agg API. "
             f"Aggregations must be specified with dict or list syntax."
         )
 
@@ -735,7 +731,7 @@ def _redirect_aggs(arg):
 
 
 @_dask_cudf_nvtx_annotate
-def _aggs_supported(arg, supported: set):
+def _aggs_optimized(arg, supported: set):
     """Check that aggregations in `arg` are a subset of `supported`"""
     if isinstance(arg, (list, dict)):
         if isinstance(arg, dict):
@@ -757,8 +753,8 @@ def _aggs_supported(arg, supported: set):
 
 
 @_dask_cudf_nvtx_annotate
-def _groupby_supported(gb):
-    """Check that groupby input is supported by dask-cudf"""
+def _groupby_optimized(gb):
+    """Check that groupby input can use dask-cudf optimized codepath"""
     return isinstance(gb.obj, DaskDataFrame) and (
         isinstance(gb.by, str)
         or (isinstance(gb.by, list) and all(isinstance(x, str) for x in gb.by))
@@ -830,7 +826,7 @@ def _tree_node_agg(df, gb_cols, dropna, sort, sep):
         agg = col.split(sep)[-1]
         if agg in ("count", "sum"):
             agg_dict[col] = ["sum"]
-        elif agg in SUPPORTED_AGGS:
+        elif agg in OPTIMIZED_AGGS:
             agg_dict[col] = [agg]
         else:
             raise ValueError(f"Unexpected aggregation: {agg}")
diff --git a/python/dask_cudf/dask_cudf/tests/test_groupby.py b/python/dask_cudf/dask_cudf/tests/test_groupby.py
index f2047c34684..e43fead0b63 100644
--- a/python/dask_cudf/dask_cudf/tests/test_groupby.py
+++ b/python/dask_cudf/dask_cudf/tests/test_groupby.py
@@ -6,16 +6,28 @@
 
 import dask
 from dask import dataframe as dd
+from dask.utils_test import hlg_layer
 
 import cudf
 from cudf.core._compat import PANDAS_GE_120
 
 import dask_cudf
-from dask_cudf.groupby import AGGS, CUMULATIVE_AGGS, _aggs_supported
+from dask_cudf.groupby import OPTIMIZED_AGGS, _aggs_optimized
 
 
-@pytest.fixture
-def pdf():
+def assert_cudf_groupby_layers(ddf):
+    for prefix in ("cudf-aggregate-chunk", "cudf-aggregate-agg"):
+        try:
+            hlg_layer(ddf.dask, prefix)
+        except KeyError:
+            raise AssertionError(
+                "Expected Dask dataframe to contain groupby layer with "
+                f"prefix {prefix}"
+            )
+
+
+@pytest.fixture(params=["non_null", "null"])
+def pdf(request):
     np.random.seed(0)
 
     # note that column name "x" is a substring of the groupby key;
@@ -27,13 +39,17 @@
             "y": np.random.normal(size=10000),
         }
     )
+
+    # insert nulls into dataframe at random
+    if request.param == "null":
+        pdf = pdf.mask(np.random.choice([True, False], size=pdf.shape))
+
     return pdf
 
 
-@pytest.mark.parametrize("aggregation", AGGS)
+@pytest.mark.parametrize("aggregation", OPTIMIZED_AGGS)
 @pytest.mark.parametrize("series", [False, True])
 def test_groupby_basic(series, aggregation, pdf):
-
     gdf = cudf.DataFrame.from_pandas(pdf)
     gdf_grouped = gdf.groupby("xx")
     ddf_grouped = dask_cudf.from_cudf(gdf, npartitions=5).groupby("xx")
@@ -42,30 +58,38 @@
         gdf_grouped = gdf_grouped.xx
         ddf_grouped = ddf_grouped.xx
 
-    a = getattr(gdf_grouped, aggregation)()
-    b = getattr(ddf_grouped, aggregation)().compute()
+    check_dtype = aggregation != "count"
 
-    if aggregation == "count":
-        dd.assert_eq(a, b, check_dtype=False)
-    else:
-        dd.assert_eq(a, b)
+    expect = getattr(gdf_grouped, aggregation)()
+    actual = getattr(ddf_grouped, aggregation)()
 
-    a = gdf_grouped.agg({"xx": aggregation})
-    b = ddf_grouped.agg({"xx": aggregation}).compute()
+    assert_cudf_groupby_layers(actual)
 
-    if aggregation == "count":
-        dd.assert_eq(a, b, check_dtype=False)
-    else:
-        dd.assert_eq(a, b)
+    dd.assert_eq(expect, actual, check_dtype=check_dtype)
+
+    expect = gdf_grouped.agg({"xx": aggregation})
+    actual = ddf_grouped.agg({"xx": aggregation})
+
+    assert_cudf_groupby_layers(actual)
+
+    dd.assert_eq(expect, actual, check_dtype=check_dtype)
 
 
+# TODO: explore adding support with `.agg()`
 @pytest.mark.parametrize("series", [True, False])
-@pytest.mark.parametrize("aggregation", CUMULATIVE_AGGS)
+@pytest.mark.parametrize("aggregation", ["cumsum", "cumcount"])
 def test_groupby_cumulative(aggregation, pdf, series):
     gdf = cudf.DataFrame.from_pandas(pdf)
     ddf = dask_cudf.from_cudf(gdf, npartitions=5)
 
-    gdf_grouped = gdf.groupby("xx")
+    if pdf.isna().sum().any():
+        with pytest.xfail(
+            reason="https://github.com/rapidsai/cudf/issues/12055"
+        ):
+            gdf_grouped = gdf.groupby("xx")
+    else:
+        gdf_grouped = gdf.groupby("xx")
+
     ddf_grouped = ddf.groupby("xx")
 
     if series:
@@ -73,7 +97,7 @@ def test_groupby_cumulative(aggregation, pdf, series):
         ddf_grouped = ddf_grouped.xx
 
     a = getattr(gdf_grouped, aggregation)()
-    b = getattr(ddf_grouped, aggregation)().compute()
+    b = getattr(ddf_grouped, aggregation)()
 
     if aggregation == "cumsum" and series:
         with pytest.xfail(reason="https://github.com/dask/dask/issues/9313"):
@@ -82,37 +106,31 @@
         dd.assert_eq(a, b)
 
 
+@pytest.mark.parametrize("aggregation", OPTIMIZED_AGGS)
 @pytest.mark.parametrize(
     "func",
     [
-        lambda df: df.groupby("x").agg({"y": "max"}),
-        lambda df: df.groupby("x").agg(["sum", "max"]),
-        lambda df: df.groupby("x").y.agg(["sum", "max"]),
-        lambda df: df.groupby("x").agg("sum"),
-        lambda df: df.groupby("x").y.agg("sum"),
+        lambda df, agg: df.groupby("xx").agg({"y": agg}),
+        lambda df, agg: df.groupby("xx").y.agg({"y": agg}),
+        lambda df, agg: df.groupby("xx").agg([agg]),
+        lambda df, agg: df.groupby("xx").y.agg([agg]),
+        lambda df, agg: df.groupby("xx").agg(agg),
+        lambda df, agg: df.groupby("xx").y.agg(agg),
     ],
 )
-def test_groupby_agg(func):
-    pdf = pd.DataFrame(
-        {
-            "x": np.random.randint(0, 5, size=10000),
-            "y": np.random.normal(size=10000),
-        }
-    )
-
+def test_groupby_agg(func, aggregation, pdf):
     gdf = cudf.DataFrame.from_pandas(pdf)
 
     ddf = dask_cudf.from_cudf(gdf, npartitions=5)
 
-    a = func(gdf).to_pandas()
-    b = func(ddf).compute().to_pandas()
+    actual = func(ddf, aggregation)
+    expect = func(gdf, aggregation)
 
-    a.index.name = None
-    a.name = None
-    b.index.name = None
-    b.name = None
+    check_dtype = aggregation != "count"
 
-    dd.assert_eq(a, b)
+    assert_cudf_groupby_layers(actual)
+
+    dd.assert_eq(expect, actual, check_names=False, check_dtype=check_dtype)
 
 
 @pytest.mark.parametrize("split_out", [1, 3])
@@ -136,28 +154,6 @@ def test_groupby_agg_empty_partition(tmpdir, split_out):
     dd.assert_eq(gb.compute().sort_index(), expect)
 
 
-@pytest.mark.parametrize(
-    "func",
-    [lambda df: df.groupby("x").std(), lambda df: df.groupby("x").y.std()],
-)
-def test_groupby_std(func):
-    pdf = pd.DataFrame(
-        {
-            "x": np.random.randint(0, 5, size=10000),
-            "y": np.random.normal(size=10000),
-        }
-    )
-
-    gdf = cudf.DataFrame.from_pandas(pdf)
-
-    ddf = dask_cudf.from_cudf(gdf, npartitions=5)
-
-    a = func(gdf).to_pandas()
-    b = func(ddf).compute().to_pandas()
-
-    dd.assert_eq(a, b)
-
-
 # reason gotattr in cudf
 @pytest.mark.parametrize(
     "func",
     [
@@ -710,7 +706,7 @@ def test_groupby_agg_redirect(aggregations):
     ],
 )
 def test_is_supported(arg, supported):
-    assert _aggs_supported(arg, AGGS) is supported
+    assert _aggs_optimized(arg, OPTIMIZED_AGGS) is supported
 
 
 def test_groupby_unique_lists():
@@ -746,22 +742,20 @@ def test_groupby_first_last(data, agg):
     gddf = dask_cudf.from_cudf(gdf, npartitions=2)
 
     dd.assert_eq(
-        ddf.groupby("a").agg(agg).compute(),
-        gddf.groupby("a").agg(agg).compute(),
+        ddf.groupby("a").agg(agg),
+        gddf.groupby("a").agg(agg),
     )
 
     dd.assert_eq(
-        getattr(ddf.groupby("a"), agg)().compute(),
-        getattr(gddf.groupby("a"), agg)().compute(),
+        getattr(ddf.groupby("a"), agg)(),
+        getattr(gddf.groupby("a"), agg)(),
     )
 
-    dd.assert_eq(
-        gdf.groupby("a").agg(agg), gddf.groupby("a").agg(agg).compute()
-    )
+    dd.assert_eq(gdf.groupby("a").agg(agg), gddf.groupby("a").agg(agg))
 
     dd.assert_eq(
         getattr(gdf.groupby("a"), agg)(),
-        getattr(gddf.groupby("a"), agg)().compute(),
+        getattr(gddf.groupby("a"), agg)(),
     )