Skip to content

Commit

Permalink
PERF/BUG: use masked algo in groupby cummin and cummax (pandas-dev#40651)
Browse files Browse the repository at this point in the history
  • Loading branch information
mzeitlin11 authored and yeshsurya committed May 6, 2021
1 parent a37e5d2 commit de19779
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 37 deletions.
28 changes: 28 additions & 0 deletions asv_bench/benchmarks/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,34 @@ def time_frame_agg(self, dtype, method):
self.df.groupby("key").agg(method)


class CumminMax:
    """ASV benchmark: groupby cumulative min/max on plain and nullable dtypes."""

    param_names = ["dtype", "method"]
    params = [
        ["float64", "int64", "Float64", "Int64"],
        ["cummin", "cummax"],
    ]

    def setup(self, dtype, method):
        n_rows = 500_000
        raw = np.random.randint(-10, 10, (n_rows, 5))

        # Float copy with NaNs injected on every 2nd and every 3rd row, so the
        # "many nulls" variant exercises the missing-value handling heavily.
        with_nulls = raw.astype(float, copy=True)
        with_nulls[::2, :] = np.nan
        with_nulls[::3, :] = np.nan

        frame = DataFrame(raw, columns=list("abcde"), dtype=dtype)
        null_frame = DataFrame(with_nulls, columns=list("abcde"), dtype=dtype)

        # Same grouping keys for both frames (~100 groups).
        group_keys = np.random.randint(0, 100, size=n_rows)
        frame["key"] = group_keys
        null_frame["key"] = group_keys

        self.df = frame
        self.null_df = null_frame

    def time_frame_transform(self, dtype, method):
        self.df.groupby("key").transform(method)

    def time_frame_transform_many_nulls(self, dtype, method):
        self.null_df.groupby("key").transform(method)


class RankWithTies:
# GH 21237
param_names = ["dtype", "tie_method"]
Expand Down
3 changes: 3 additions & 0 deletions doc/source/whatsnew/v1.3.0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,8 @@ Performance improvements
- Performance improvement in :meth:`core.window.ewm.ExponentialMovingWindow.mean` with ``times`` (:issue:`39784`)
- Performance improvement in :meth:`.GroupBy.apply` when requiring the python fallback implementation (:issue:`40176`)
- Performance improvement for concatenation of data with type :class:`CategoricalDtype` (:issue:`40193`)
- Performance improvement in :meth:`.GroupBy.cummin` and :meth:`.GroupBy.cummax` with nullable data types (:issue:`37493`)
-

.. ---------------------------------------------------------------------------
Expand Down Expand Up @@ -839,6 +841,7 @@ Groupby/resample/rolling
- Bug in :meth:`.GroupBy.any` and :meth:`.GroupBy.all` raising ``ValueError`` when using with nullable type columns holding ``NA`` even with ``skipna=True`` (:issue:`40585`)
- Bug in :meth:`GroupBy.cummin` and :meth:`GroupBy.cummax` incorrectly rounding integer values near the ``int64`` implementations bounds (:issue:`40767`)
- Bug in :meth:`.GroupBy.rank` with nullable dtypes incorrectly raising ``TypeError`` (:issue:`41010`)
- Bug in :meth:`.GroupBy.cummin` and :meth:`.GroupBy.cummax` computing wrong result with nullable data types too large to roundtrip when casting to float (:issue:`37493`)

Reshaping
^^^^^^^^^
Expand Down
61 changes: 52 additions & 9 deletions pandas/_libs/groupby.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1277,6 +1277,7 @@ def group_min(groupby_t[:, ::1] out,
@cython.wraparound(False)
cdef group_cummin_max(groupby_t[:, ::1] out,
ndarray[groupby_t, ndim=2] values,
uint8_t[:, ::1] mask,
const intp_t[:] labels,
int ngroups,
bint is_datetimelike,
Expand All @@ -1290,6 +1291,9 @@ cdef group_cummin_max(groupby_t[:, ::1] out,
Array to store cummin/max in.
values : np.ndarray[groupby_t, ndim=2]
Values to take cummin/max of.
mask : np.ndarray[bool] or None
If not None, indices represent missing values,
otherwise the mask will not be used
labels : np.ndarray[np.intp]
Labels to group by.
ngroups : int
Expand All @@ -1307,11 +1311,14 @@ cdef group_cummin_max(groupby_t[:, ::1] out,
cdef:
Py_ssize_t i, j, N, K, size
groupby_t val, mval
ndarray[groupby_t, ndim=2] accum
groupby_t[:, ::1] accum
intp_t lab
bint val_is_nan, use_mask

use_mask = mask is not None

N, K = (<object>values).shape
accum = np.empty((ngroups, K), dtype=np.asarray(values).dtype)
accum = np.empty((ngroups, K), dtype=values.dtype)
if groupby_t is int64_t:
accum[:] = -_int64_max if compute_max else _int64_max
elif groupby_t is uint64_t:
Expand All @@ -1326,11 +1333,29 @@ cdef group_cummin_max(groupby_t[:, ::1] out,
if lab < 0:
continue
for j in range(K):
val = values[i, j]
val_is_nan = False

if use_mask:
if mask[i, j]:

# `out` does not need to be set since it
# will be masked anyway
val_is_nan = True
else:

# If using the mask, we can avoid grabbing the
# value unless necessary
val = values[i, j]

if _treat_as_na(val, is_datetimelike):
out[i, j] = val
# Otherwise, `out` must be set accordingly if the
# value is missing
else:
val = values[i, j]
if _treat_as_na(val, is_datetimelike):
val_is_nan = True
out[i, j] = val

if not val_is_nan:
mval = accum[lab, j]
if compute_max:
if val > mval:
Expand All @@ -1347,9 +1372,18 @@ def group_cummin(groupby_t[:, ::1] out,
ndarray[groupby_t, ndim=2] values,
const intp_t[:] labels,
int ngroups,
bint is_datetimelike) -> None:
bint is_datetimelike,
uint8_t[:, ::1] mask=None) -> None:
"""See group_cummin_max.__doc__"""
group_cummin_max(out, values, labels, ngroups, is_datetimelike, compute_max=False)
group_cummin_max(
out,
values,
mask,
labels,
ngroups,
is_datetimelike,
compute_max=False
)


@cython.boundscheck(False)
Expand All @@ -1358,6 +1392,15 @@ def group_cummax(groupby_t[:, ::1] out,
ndarray[groupby_t, ndim=2] values,
const intp_t[:] labels,
int ngroups,
bint is_datetimelike) -> None:
bint is_datetimelike,
uint8_t[:, ::1] mask=None) -> None:
"""See group_cummin_max.__doc__"""
group_cummin_max(out, values, labels, ngroups, is_datetimelike, compute_max=True)
group_cummin_max(
out,
values,
mask,
labels,
ngroups,
is_datetimelike,
compute_max=True
)
76 changes: 71 additions & 5 deletions pandas/core/groupby/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@
)

from pandas.core.arrays import ExtensionArray
from pandas.core.arrays.masked import (
BaseMaskedArray,
BaseMaskedDtype,
)
import pandas.core.common as com
from pandas.core.frame import DataFrame
from pandas.core.generic import NDFrame
Expand Down Expand Up @@ -124,6 +128,8 @@ def __init__(self, kind: str, how: str):
},
}

_MASKED_CYTHON_FUNCTIONS = {"cummin", "cummax"}

_cython_arity = {"ohlc": 4} # OHLC

# Note: we make this a classmethod and pass kind+how so that caching
Expand Down Expand Up @@ -256,6 +262,9 @@ def get_out_dtype(self, dtype: np.dtype) -> np.dtype:
out_dtype = "object"
return np.dtype(out_dtype)

def uses_mask(self) -> bool:
return self.how in self._MASKED_CYTHON_FUNCTIONS


class BaseGrouper:
"""
Expand Down Expand Up @@ -619,9 +628,45 @@ def _ea_wrap_cython_operation(
f"function is not implemented for this dtype: {values.dtype}"
)

@final
def _masked_ea_wrap_cython_operation(
self,
kind: str,
values: BaseMaskedArray,
how: str,
axis: int,
min_count: int = -1,
**kwargs,
) -> BaseMaskedArray:
"""
Equivalent of `_ea_wrap_cython_operation`, but optimized for masked EA's
and cython algorithms which accept a mask.
"""
orig_values = values

# Copy to ensure input and result masks don't end up shared
mask = values._mask.copy()
arr = values._data

res_values = self._cython_operation(
kind, arr, how, axis, min_count, mask=mask, **kwargs
)
dtype = maybe_cast_result_dtype(orig_values.dtype, how)
assert isinstance(dtype, BaseMaskedDtype)
cls = dtype.construct_array_type()

return cls(res_values.astype(dtype.type, copy=False), mask)

@final
def _cython_operation(
self, kind: str, values, how: str, axis: int, min_count: int = -1, **kwargs
self,
kind: str,
values,
how: str,
axis: int,
min_count: int = -1,
mask: np.ndarray | None = None,
**kwargs,
) -> ArrayLike:
"""
Returns the values of a cython operation.
Expand All @@ -645,10 +690,16 @@ def _cython_operation(
# if not raise NotImplementedError
cy_op.disallow_invalid_ops(dtype, is_numeric)

func_uses_mask = cy_op.uses_mask()
if is_extension_array_dtype(dtype):
return self._ea_wrap_cython_operation(
kind, values, how, axis, min_count, **kwargs
)
if isinstance(values, BaseMaskedArray) and func_uses_mask:
return self._masked_ea_wrap_cython_operation(
kind, values, how, axis, min_count, **kwargs
)
else:
return self._ea_wrap_cython_operation(
kind, values, how, axis, min_count, **kwargs
)

elif values.ndim == 1:
# expand to 2d, dispatch, then squeeze if appropriate
Expand All @@ -659,6 +710,7 @@ def _cython_operation(
how=how,
axis=1,
min_count=min_count,
mask=mask,
**kwargs,
)
if res.shape[0] == 1:
Expand Down Expand Up @@ -688,6 +740,9 @@ def _cython_operation(
assert axis == 1
values = values.T

if mask is not None:
mask = mask.reshape(values.shape, order="C")

out_shape = cy_op.get_output_shape(ngroups, values)
func, values = cy_op.get_cython_func_and_vals(values, is_numeric)
out_dtype = cy_op.get_out_dtype(values.dtype)
Expand All @@ -708,7 +763,18 @@ def _cython_operation(
func(result, counts, values, comp_ids, min_count)
elif kind == "transform":
# TODO: min_count
func(result, values, comp_ids, ngroups, is_datetimelike, **kwargs)
if func_uses_mask:
func(
result,
values,
comp_ids,
ngroups,
is_datetimelike,
mask=mask,
**kwargs,
)
else:
func(result, values, comp_ids, ngroups, is_datetimelike, **kwargs)

if kind == "aggregate":
# i.e. counts is defined. Locations where count<min_count
Expand Down
Loading

0 comments on commit de19779

Please sign in to comment.