Use shuffle when nunique is calculated
wjsi committed May 7, 2022
1 parent f833a7b commit 5c67580
Showing 4 changed files with 70 additions and 38 deletions.
62 changes: 52 additions & 10 deletions mars/dataframe/groupby/aggregation.py
@@ -25,7 +25,7 @@
 from ... import opcodes as OperandDef
 from ...config import options
 from ...core.custom_log import redirect_custom_log
-from ...core import ENTITY_TYPE, OutputType
+from ...core import ENTITY_TYPE, OutputType, recursive_tile
 from ...core.context import get_context
 from ...core.operand import OperandStage
 from ...serialization.serializables import (
@@ -64,6 +64,8 @@
 
 _support_get_group_without_as_index = pd_release_version[:2] > (1, 0)
 
+_FUNCS_PREFER_SHUFFLE = {"nunique"}
+
 
 class SizeRecorder:
     def __init__(self):
@@ -163,6 +165,8 @@ class DataFrameGroupByAgg(DataFrameOperand, DataFrameOperandMixin):
     method = StringField("method")
     use_inf_as_na = BoolField("use_inf_as_na")
 
+    map_on_shuffle = AnyField("map_on_shuffle")
+
     # for chunk
     combine_size = Int32Field("combine_size")
     chunk_store_limit = Int64Field("chunk_store_limit")
@@ -421,10 +425,29 @@ def _tile_with_shuffle(
         in_df: TileableType,
         out_df: TileableType,
         func_infos: ReductionSteps,
+        agg_chunks: List[ChunkType] = None,
     ):
-        # First, perform groupby and aggregation on each chunk.
-        agg_chunks = cls._gen_map_chunks(op, in_df.chunks, out_df, func_infos)
-        return cls._perform_shuffle(op, agg_chunks, in_df, out_df, func_infos)
+        if op.map_on_shuffle is None:
+            op.map_on_shuffle = all(
+                agg_fun.custom_reduction is None for agg_fun in func_infos.agg_funcs
+            )
+
+        if not op.map_on_shuffle:
+            groupby_params = op.groupby_params.copy()
+            selection = groupby_params.pop("selection", None)
+            groupby = in_df.groupby(**groupby_params)
+            if selection:
+                groupby = groupby[selection]
+            result = groupby.transform(
+                op.raw_func, _call_agg=True, index=out_df.index_value
+            )
+            return (yield from recursive_tile(result))
+        else:
+            # First, perform groupby and aggregation on each chunk.
+            agg_chunks = agg_chunks or cls._gen_map_chunks(
+                op, in_df.chunks, out_df, func_infos
+            )
+            return cls._perform_shuffle(op, agg_chunks, in_df, out_df, func_infos)
 
     @classmethod
     def _perform_shuffle(
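Note: the effect of the new branch is that map_on_shuffle defaults to pre-aggregating on the map stage only when every requested aggregation is a plain reduction. Once any function carries a custom reduction (as nunique does, judging from the CustomReduction subclass in nunique.py below), the operand re-tiles itself as groupby.transform(raw_func, _call_agg=True) followed by recursive_tile, deferring all aggregation until after the shuffle.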
@@ -624,8 +647,10 @@ def _tile_auto(
         else:
             # otherwise, use shuffle
             logger.debug("Choose shuffle method for groupby operand %s", op)
-            return cls._perform_shuffle(
-                op, chunks + left_chunks, in_df, out_df, func_infos
+            return (
+                yield from cls._tile_with_shuffle(
+                    op, in_df, out_df, func_infos, chunks + left_chunks
+                )
             )
 
     @classmethod
@@ -638,12 +663,16 @@ def tile(cls, op: "DataFrameGroupByAgg"):
         func_infos = cls._compile_funcs(op, in_df)
 
         if op.method == "auto":
-            if len(in_df.chunks) <= op.combine_size:
+            if set(op.func) & _FUNCS_PREFER_SHUFFLE:
+                return (
+                    yield from cls._tile_with_shuffle(op, in_df, out_df, func_infos)
+                )
+            elif len(in_df.chunks) <= op.combine_size:
                 return cls._tile_with_tree(op, in_df, out_df, func_infos)
             else:
                 return (yield from cls._tile_auto(op, in_df, out_df, func_infos))
         if op.method == "shuffle":
-            return cls._tile_with_shuffle(op, in_df, out_df, func_infos)
+            return (yield from cls._tile_with_shuffle(op, in_df, out_df, func_infos))
         elif op.method == "tree":
             return cls._tile_with_tree(op, in_df, out_df, func_infos)
         else:  # pragma: no cover
@@ -1075,7 +1104,15 @@ def execute(cls, ctx, op: "DataFrameGroupByAgg"):
             pd.reset_option("mode.use_inf_as_na")
 
 
-def agg(groupby, func=None, method="auto", combine_size=None, *args, **kwargs):
+def agg(
+    groupby,
+    func=None,
+    method="auto",
+    combine_size=None,
+    map_on_shuffle=None,
+    *args,
+    **kwargs,
+):
     """
     Aggregate using one or more operations on grouped data.
@@ -1091,7 +1128,11 @@ def agg(groupby, func=None, method="auto", combine_size=None, *args, **kwargs):
         in distributed mode and use 'tree' in local mode.
     combine_size : int
         The number of chunks to combine when method is 'tree'
-
+    map_on_shuffle : bool
+        Whether to perform aggregation on the map stage of the shuffle. When
+        not specified, the decision is made automatically: aggregation runs on
+        the map stage unless one of the aggregation functions carries a custom
+        reduction.
 
     Returns
     -------
@@ -1138,5 +1179,6 @@ def agg(groupby, func=None, method="auto", combine_size=None, *args, **kwargs):
         combine_size=combine_size or options.combine_size,
         chunk_store_limit=options.chunk_store_limit,
         use_inf_as_na=use_inf_as_na,
+        map_on_shuffle=map_on_shuffle,
     )
     return agg_op(groupby)
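For illustration, a minimal sketch of how the new argument can be exercised; the toy frame and column names are invented for the example, not taken from the commit:

import pandas as pd
import mars.dataframe as md

raw = pd.DataFrame({"key": [1, 1, 2, 2], "val": [10, 10, 20, 30]})
mdf = md.DataFrame(raw, chunk_size=2)

# "nunique" is now routed to the shuffle path even under method="auto"
r1 = mdf.groupby("key").agg("nunique")

# force shuffle and skip map-stage pre-aggregation explicitly
r2 = mdf.groupby("key").agg("nunique", method="shuffle", map_on_shuffle=False)

print(r1.execute())
print(r2.execute())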
21 changes: 0 additions & 21 deletions mars/dataframe/groupby/tests/test_groupby.py
@@ -476,24 +476,3 @@ def test_groupby_fill():
     assert len(r.chunks) == 4
     assert r.shape == (len(s1),)
     assert r.chunks[0].shape == (np.nan,)
-
-
-def test_groupby_nunique():
-    df1 = pd.DataFrame(
-        [
-            [1, 1, 10],
-            [1, 1, np.nan],
-            [1, 1, np.nan],
-            [1, 2, np.nan],
-            [1, 2, 20],
-            [1, 2, np.nan],
-            [1, 3, np.nan],
-            [1, 3, np.nan],
-        ],
-        columns=["one", "two", "three"],
-    )
-    mdf = md.DataFrame(df1, chunk_size=3)
-
-    r = tile(mdf.groupby(["one", "two"]).nunique())
-    assert len(r.chunks) == 1
-    assert isinstance(r.chunks[0].op, DataFrameGroupByAgg)
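The removed test pinned the old tree-based tiling, asserting that nunique collapses to a single combined chunk whose op is DataFrameGroupByAgg. With nunique now listed in _FUNCS_PREFER_SHUFFLE and, because of its custom reduction, re-tiled through groupby.transform, that single-chunk assertion can no longer hold, which is presumably why the test is dropped rather than rewritten.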
5 changes: 4 additions & 1 deletion mars/dataframe/merge/concat.py
@@ -324,7 +324,10 @@ def _auto_concat_dataframe_chunks(chunk, inputs):
     )
 
     if chunk.op.axis is not None:
-        return xdf.concat(inputs, axis=op.axis)
+        try:
+            return xdf.concat(inputs, axis=op.axis)
+        except:
+            raise
 
     # auto generated concat when executing a DataFrame
     if len(inputs) == 1:
if len(inputs) == 1:
Expand Down
20 changes: 14 additions & 6 deletions mars/dataframe/reduction/nunique.py
@@ -25,7 +25,7 @@
 from ...config import options
 from ...serialization.serializables import BoolField
 from ...utils import lazy_import
-from ..arrays import ArrowListArray, ArrowListDtype
+from ..arrays import ArrowListArray
 from .core import DataFrameReductionOperand, DataFrameReductionMixin, CustomReduction
 
 cp = lazy_import("cupy", globals=globals(), rename="cp")
@@ -52,18 +52,24 @@ def _get_modules(self):
 
     def _drop_duplicates(self, value, explode=False, agg=False):
         xp, xdf = self._get_modules()
+        use_arrow_dtype = self._use_arrow_dtype and xp is not cp
         if self._use_arrow_dtype and xp is not cp and hasattr(value, "to_numpy"):
             value = value.to_numpy()
         else:
             value = value.values
 
         if explode:
+            if len(value) == 0:
+                if not use_arrow_dtype:
+                    return [xp.array([], dtype=object)]
+                else:
+                    return [ArrowListArray([])]
             value = xp.concatenate(value)
 
         value = xdf.unique(value)
 
         if not agg:
-            if not self._use_arrow_dtype or xp is cp:
+            if not use_arrow_dtype:
                 return [value]
             else:
                 try:
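The new length guard protects the explode path: xp.concatenate refuses an empty sequence, so an empty chunk would crash before xdf.unique is ever reached. A quick stand-alone illustration with plain NumPy:

import numpy as np

try:
    np.concatenate([])
except ValueError as exc:
    # NumPy refuses to concatenate an empty sequence,
    # hence the len(value) == 0 early return above
    print(exc)  # need at least one array to concatenate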
@@ -78,15 +84,16 @@
 
     def pre(self, in_data):  # noqa: W0221  # pylint: disable=arguments-differ
         xp, xdf = self._get_modules()
+        out_dtype = object if not self._use_arrow_dtype or xp is cp else None
         if isinstance(in_data, xdf.Series):
             unique_values = self._drop_duplicates(in_data)
-            return xdf.Series(unique_values, name=in_data.name, dtype=object)
+            return xdf.Series(unique_values, name=in_data.name, dtype=out_dtype)
         else:
             if self._axis == 0:
                 data = dict()
                 for d, v in in_data.iteritems():
                     data[d] = self._drop_duplicates(v)
-                df = xdf.DataFrame(data, copy=False, dtype=object)
+                df = xdf.DataFrame(data, copy=False, dtype=out_dtype)
             else:
                 df = xdf.DataFrame(columns=[0])
                 for d, v in in_data.iterrows():
@@ -95,15 +102,16 @@
 
     def agg(self, in_data):  # noqa: W0221  # pylint: disable=arguments-differ
         xp, xdf = self._get_modules()
+        out_dtype = object if not self._use_arrow_dtype or xp is cp else None
         if isinstance(in_data, xdf.Series):
             unique_values = self._drop_duplicates(in_data, explode=True)
-            return xdf.Series(unique_values, name=in_data.name, dtype=object)
+            return xdf.Series(unique_values, name=in_data.name, dtype=out_dtype)
         else:
             if self._axis == 0:
                 data = dict()
                 for d, v in in_data.iteritems():
                     data[d] = self._drop_duplicates(v, explode=True)
-                df = xdf.DataFrame(data, copy=False, dtype=object)
+                df = xdf.DataFrame(data, copy=False, dtype=out_dtype)
             else:
                 df = xdf.DataFrame(columns=[0])
                 for d, v in in_data.iterrows():
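The out_dtype switch above stops arrow-backed results from being forced down to object: when arrow dtypes are active, dtype=None lets pandas keep (or infer) the natural dtype. A minimal pandas-only sketch of the difference, with ArrowListArray itself left out for brevity:

import pandas as pd

s_obj = pd.Series([1, 2, 3], dtype=object)  # pinned to plain Python objects
s_inf = pd.Series([1, 2, 3], dtype=None)    # pandas infers int64
print(s_obj.dtype, s_inf.dtype)  # object int64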
