diff --git a/mars/dataframe/groupby/aggregation.py b/mars/dataframe/groupby/aggregation.py index 7039e517b8..e173929ba2 100644 --- a/mars/dataframe/groupby/aggregation.py +++ b/mars/dataframe/groupby/aggregation.py @@ -701,34 +701,36 @@ def _do_custom_agg(op, custom_reduction, *input_objs): out = op.outputs[0] for group_key in input_objs[0].groups.keys(): group_objs = [o.get_group(group_key) for o in input_objs] + agg_done = False if op.stage == OperandStage.map: - result = custom_reduction.pre(group_objs[0]) + res_tuple = custom_reduction.pre(group_objs[0]) agg_done = custom_reduction.pre_with_agg - if not isinstance(result, tuple): - result = (result,) + if not isinstance(res_tuple, tuple): + res_tuple = (res_tuple,) else: - result = group_objs + res_tuple = group_objs if not agg_done: - result = custom_reduction.agg(*result) - if not isinstance(result, tuple): - result = (result,) + res_tuple = custom_reduction.agg(*res_tuple) + if not isinstance(res_tuple, tuple): + res_tuple = (res_tuple,) if op.stage == OperandStage.agg: - result = custom_reduction.post(*result) - if not isinstance(result, tuple): - result = (result,) - - if out.ndim == 2: - if result[0].ndim == 1: - result = tuple(r.to_frame().T for r in result) - if op.stage == OperandStage.agg: - result = tuple(r.astype(out.dtypes) for r in result) - else: - result = tuple(xdf.Series(r) for r in result) + res_tuple = custom_reduction.post(*res_tuple) + if not isinstance(res_tuple, tuple): + res_tuple = (res_tuple,) + + new_res_list = [] + for r in res_tuple: + if out.ndim == 2 and r.ndim == 1: + r = r.to_frame().T + elif out.ndim < 2: + if getattr(r, "ndim", 0) == 2: + r = r.iloc[0, :] + else: + r = xdf.Series(r) - for r in result: if len(input_objs[0].grouper.names) == 1: r.index = xdf.Index( [group_key], name=input_objs[0].grouper.names[0] @@ -737,7 +739,21 @@ def _do_custom_agg(op, custom_reduction, *input_objs): r.index = xdf.MultiIndex.from_tuples( [group_key], names=input_objs[0].grouper.names ) - results.append(result) + + if op.groupby_params.get("selection"): + # correct columns for groupby-selection-agg paradigms + selection = op.groupby_params["selection"] + r.columns = [selection] if input_objs[0].ndim == 1 else selection + + if out.ndim == 2 and op.stage == OperandStage.agg: + dtype_cols = set(out.dtypes.index) & set(r.columns) + conv_dtypes = { + k: v for k, v in out.dtypes.items() if k in dtype_cols + } + r = r.astype(conv_dtypes) + new_res_list.append(r) + + results.append(tuple(new_res_list)) if not results and op.stage == OperandStage.agg: empty_df = pd.DataFrame( [], columns=out.dtypes.index, index=out.index_value.to_pandas()[:0] diff --git a/mars/dataframe/groupby/tests/test_groupby.py b/mars/dataframe/groupby/tests/test_groupby.py index 336564d4ae..83960da53f 100644 --- a/mars/dataframe/groupby/tests/test_groupby.py +++ b/mars/dataframe/groupby/tests/test_groupby.py @@ -396,28 +396,28 @@ def test_groupby_fill(): ) mdf = md.DataFrame(df1, chunk_size=3) - r = tile(getattr(mdf.groupby(["one", "two"]), "ffill")()) + r = tile(mdf.groupby(["one", "two"]).ffill()) assert r.op.output_types[0] == OutputType.dataframe assert r.shape == (len(df1), 1) assert len(r.chunks) == 3 assert r.chunks[0].shape == (np.nan, 1) assert r.dtypes.index.tolist() == ["three"] - r = tile(getattr(mdf.groupby(["two"]), "bfill")()) + r = tile(mdf.groupby(["two"]).bfill()) assert r.op.output_types[0] == OutputType.dataframe assert r.shape == (len(df1), 2) assert len(r.chunks) == 3 assert r.chunks[0].shape == (np.nan, 2) assert r.dtypes.index.tolist() == ["one", "three"] - r = tile(getattr(mdf.groupby(["two"]), "backfill")()) + r = tile(mdf.groupby(["two"]).backfill()) assert r.op.output_types[0] == OutputType.dataframe assert r.shape == (len(df1), 2) assert len(r.chunks) == 3 assert r.chunks[0].shape == (np.nan, 2) assert r.dtypes.index.tolist() == ["one", "three"] - r = tile(getattr(mdf.groupby(["one"]), "fillna")(5)) + r = tile(mdf.groupby(["one"]).fillna(5)) assert r.op.output_types[0] == OutputType.dataframe assert r.shape == (len(df1), 2) assert len(r.chunks) == 3 @@ -426,25 +426,25 @@ def test_groupby_fill(): s1 = pd.Series([4, 3, 9, np.nan, np.nan, 7, 10, 8, 1, 6]) ms1 = md.Series(s1, chunk_size=3) - r = tile(getattr(ms1.groupby(lambda x: x % 2), "ffill")()) + r = tile(ms1.groupby(lambda x: x % 2).ffill()) assert r.op.output_types[0] == OutputType.series assert len(r.chunks) == 4 assert r.shape == (len(s1),) assert r.chunks[0].shape == (np.nan,) - r = tile(getattr(ms1.groupby(lambda x: x % 2), "bfill")()) + r = tile(ms1.groupby(lambda x: x % 2).bfill()) assert r.op.output_types[0] == OutputType.series assert len(r.chunks) == 4 assert r.shape == (len(s1),) assert r.chunks[0].shape == (np.nan,) - r = tile(getattr(ms1.groupby(lambda x: x % 2), "backfill")()) + r = tile(ms1.groupby(lambda x: x % 2).backfill()) assert r.op.output_types[0] == OutputType.series assert len(r.chunks) == 4 assert r.shape == (len(s1),) assert r.chunks[0].shape == (np.nan,) - r = tile(getattr(ms1.groupby(lambda x: x % 2), "fillna")(5)) + r = tile(ms1.groupby(lambda x: x % 2).fillna(5)) assert r.op.output_types[0] == OutputType.series assert len(r.chunks) == 4 assert r.shape == (len(s1),) @@ -453,26 +453,47 @@ def test_groupby_fill(): s1 = pd.Series([4, 3, 9, np.nan, np.nan, 7, 10, 8, 1, 6]) ms1 = md.Series(s1, chunk_size=3) - r = tile(getattr(ms1.groupby(lambda x: x % 2), "ffill")()) + r = tile(ms1.groupby(lambda x: x % 2).ffill()) assert r.op.output_types[0] == OutputType.series assert len(r.chunks) == 4 assert r.shape == (len(s1),) assert r.chunks[0].shape == (np.nan,) - r = tile(getattr(ms1.groupby(lambda x: x % 2), "bfill")()) + r = tile(ms1.groupby(lambda x: x % 2).bfill()) assert r.op.output_types[0] == OutputType.series assert len(r.chunks) == 4 assert r.shape == (len(s1),) assert r.chunks[0].shape == (np.nan,) - r = tile(getattr(ms1.groupby(lambda x: x % 2), "backfill")()) + r = tile(ms1.groupby(lambda x: x % 2).backfill()) assert r.op.output_types[0] == OutputType.series assert len(r.chunks) == 4 assert r.shape == (len(s1),) assert r.chunks[0].shape == (np.nan,) - r = tile(getattr(ms1.groupby(lambda x: x % 2), "fillna")(5)) + r = tile(ms1.groupby(lambda x: x % 2).fillna(5)) assert r.op.output_types[0] == OutputType.series assert len(r.chunks) == 4 assert r.shape == (len(s1),) assert r.chunks[0].shape == (np.nan,) + + +def test_groupby_nunique(): + df1 = pd.DataFrame( + [ + [1, 1, 10], + [1, 1, np.nan], + [1, 1, np.nan], + [1, 2, np.nan], + [1, 2, 20], + [1, 2, np.nan], + [1, 3, np.nan], + [1, 3, np.nan], + ], + columns=["one", "two", "three"], + ) + mdf = md.DataFrame(df1, chunk_size=3) + + r = tile(mdf.groupby(["one", "two"]).nunique()) + assert len(r.chunks) == 1 + assert isinstance(r.chunks[0].op, DataFrameGroupByAgg) diff --git a/mars/dataframe/groupby/tests/test_groupby_execution.py b/mars/dataframe/groupby/tests/test_groupby_execution.py index c1208e2194..2cace472b9 100644 --- a/mars/dataframe/groupby/tests/test_groupby_execution.py +++ b/mars/dataframe/groupby/tests/test_groupby_execution.py @@ -1241,13 +1241,16 @@ def test_groupby_nunique(setup): # test with as_index=False mdf = md.DataFrame(df1, chunk_size=13) if _agg_size_as_frame: + res = mdf.groupby("b", as_index=False)["a"].nunique().execute().fetch() + expected = df1.groupby("b", as_index=False)["a"].nunique() pd.testing.assert_frame_equal( - mdf.groupby("b", as_index=False)["a"] - .nunique() - .execute() - .fetch() - .sort_values(by="b", ignore_index=True), - df1.groupby("b", as_index=False)["a"] - .nunique() - .sort_values(by="b", ignore_index=True), + res.sort_values(by="b", ignore_index=True), + expected.sort_values(by="b", ignore_index=True), + ) + + res = mdf.groupby("b", as_index=False)[["a", "c"]].nunique().execute().fetch() + expected = df1.groupby("b", as_index=False)[["a", "c"]].nunique() + pd.testing.assert_frame_equal( + res.sort_values(by="b", ignore_index=True), + expected.sort_values(by="b", ignore_index=True), ) diff --git a/mars/dataframe/reduction/nunique.py b/mars/dataframe/reduction/nunique.py index 07d6abf243..94f0d7d0bc 100644 --- a/mars/dataframe/reduction/nunique.py +++ b/mars/dataframe/reduction/nunique.py @@ -41,61 +41,53 @@ def __init__( self._dropna = dropna self._use_arrow_dtype = use_arrow_dtype - @staticmethod - def _drop_duplicates_to_arrow(v, explode=False): + def _drop_duplicates(self, xdf, value, explode=False): if explode: - v = v.explode() - try: - return ArrowListArray([v.drop_duplicates().to_numpy()]) - except pa.ArrowInvalid: - # fallback due to diverse dtypes - return [v.drop_duplicates().to_list()] + value = value.explode() + + if not self._use_arrow_dtype or xdf is cudf: + return [value.drop_duplicates().to_list()] + else: + try: + return ArrowListArray([value.drop_duplicates().to_numpy()]) + except pa.ArrowInvalid: + # fallback due to diverse dtypes + return [value.drop_duplicates().to_list()] def pre(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ xdf = cudf if self.is_gpu() else pd if isinstance(in_data, xdf.Series): - unique_values = in_data.drop_duplicates() + unique_values = self._drop_duplicates(xdf, in_data) return xdf.Series(unique_values, name=in_data.name) else: if self._axis == 0: data = dict() for d, v in in_data.iteritems(): - if not self._use_arrow_dtype or xdf is cudf: - data[d] = [v.drop_duplicates().to_list()] - else: - data[d] = self._drop_duplicates_to_arrow(v) + data[d] = self._drop_duplicates(xdf, v) df = xdf.DataFrame(data) else: df = xdf.DataFrame(columns=[0]) for d, v in in_data.iterrows(): - if not self._use_arrow_dtype or xdf is cudf: - df.loc[d] = [v.drop_duplicates().to_list()] - else: - df.loc[d] = self._drop_duplicates_to_arrow(v) + df.loc[d] = self._drop_duplicates(xdf, v) return df def agg(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ xdf = cudf if self.is_gpu() else pd if isinstance(in_data, xdf.Series): - unique_values = in_data.explode().drop_duplicates() + unique_values = self._drop_duplicates(xdf, in_data, explode=True) return xdf.Series(unique_values, name=in_data.name) else: if self._axis == 0: data = dict() for d, v in in_data.iteritems(): - if not self._use_arrow_dtype or xdf is cudf: - data[d] = [v.explode().drop_duplicates().to_list()] - else: + if self._use_arrow_dtype and xdf is not cudf: v = pd.Series(v.to_numpy()) - data[d] = self._drop_duplicates_to_arrow(v, explode=True) + data[d] = self._drop_duplicates(xdf, v, explode=True) df = xdf.DataFrame(data) else: df = xdf.DataFrame(columns=[0]) for d, v in in_data.iterrows(): - if not self._use_arrow_dtype or xdf is cudf: - df.loc[d] = [v.explode().drop_duplicates().to_list()] - else: - df.loc[d] = self._drop_duplicates_to_arrow(v, explode=True) + df.loc[d] = self._drop_duplicates(xdf, v, explode=True) return df def post(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ diff --git a/mars/dataframe/reduction/tests/test_reduction_execution.py b/mars/dataframe/reduction/tests/test_reduction_execution.py index b976475dad..be6ca29815 100644 --- a/mars/dataframe/reduction/tests/test_reduction_execution.py +++ b/mars/dataframe/reduction/tests/test_reduction_execution.py @@ -679,7 +679,7 @@ def test_nunique(setup, check_ref_counts): @pytest.mark.skipif(pa is None, reason="pyarrow not installed") -def test_use_arrow_dtype_n_unique(setup, check_ref_counts): +def test_use_arrow_dtype_nunique(setup, check_ref_counts): with option_context({"dataframe.use_arrow_dtype": True, "combine_size": 2}): rs = np.random.RandomState(0) data1 = pd.DataFrame( diff --git a/mars/oscar/backends/message.pyx b/mars/oscar/backends/message.pyx index 99f5e0d305..ab68fe3328 100644 --- a/mars/oscar/backends/message.pyx +++ b/mars/oscar/backends/message.pyx @@ -556,7 +556,6 @@ cpdef reset_random_seed(): global _rnd_is_seed_set seed_bytes = getrandbits(64).to_bytes(8, "little") - # memcpy(&seed, seed_bytes, 8) _rnd_gen.seed((seed_bytes)[0]) _rnd_is_seed_set = True