[FEA] Support for Dask groupby cumulative sum, count #10296

Closed
beckernick opened this issue Feb 15, 2022 · 3 comments · Fixed by #10889
Labels: dask (Dask issue), feature request (New feature or request), Python (Affects Python cuDF API)

Comments


beckernick commented Feb 15, 2022

In cuDF, we support `groupby.cumcount` like pandas. Dask supports groupby cumulative count on CPUs but not on GPUs. From the traceback, it looks like Dask is passing a `Grouper` object, and we go down a codepath where the `cudf.Grouper` fails the instance check `if isinstance(by, cudf.Grouper) and by.freq` (perhaps because it is wrapped in a list or tuple).

EDIT: Generalizing, as this appears to happen for both groupby cumulative count and sum operations.
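
The suspected mechanism is sketched below (illustrative only; `pd.Grouper` stands in for the `cudf.Grouper` seen in the traceback, and the list-wrapping is taken from Dask's `_groupby_raise_unaligned` frame further down): `isinstance` does not look inside containers, so a `Grouper` wrapped in a list fails the check and falls through to the generic label-handling codepath.

import pandas as pd

by = pd.Grouper(key="a")  # stand-in for the Grouper in the traceback
wrapped = [by]            # dask/dataframe/groupby.py does by=list(by)

print(isinstance(by, pd.Grouper))       # True
print(isinstance(wrapped, pd.Grouper))  # False: the check in
# DataFrame.groupby never matches, and the Grouper later reaches code
# that expects a column label or an array-like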

import dask.dataframe as dd
import pandas as pd
import dask_cudf
import cudf

df = pd.DataFrame({
    "a": [0, 0, 1, 0, 0, 1, 1, 1, 1, 1],
    "b": [0, 0.3, 0, -4, 0, -3, 1, 1, 11, 1],
    "c": [19, 0, 30, 0, 0, 1, 41, 1, 1, 1],
})
gdf = cudf.from_pandas(df)
ddf = dd.from_pandas(df, 2)
gddf = dask_cudf.from_dask_dataframe(ddf)

print(gdf.groupby("a").cumcount())
print(gddf.groupby("a").cumcount().compute())
0    0
1    1
2    0
3    2
4    3
5    1
6    2
7    3
8    4
9    5
dtype: int32
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/cudf/core/groupby/groupby.py:1517, in _Grouping._handle_by_or_level(self, by, level)
   1516 try:
-> 1517     self._handle_label(by)
   1518 except (KeyError, TypeError):

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/cudf/core/groupby/groupby.py:1572, in _Grouping._handle_label(self, by)
   1571 def _handle_label(self, by):
-> 1572     self._key_columns.append(self._obj._data[by])
   1573     self.names.append(by)

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/cudf/core/column_accessor.py:155, in ColumnAccessor.__getitem__(self, key)
    154 def __getitem__(self, key: Any) -> ColumnBase:
--> 155     return self._data[key]

KeyError: Grouper(key='a', axis=0, sort=False)

During handling of the above exception, another exception occurred:

TypeError                                 Traceback (most recent call last)
File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/cudf/core/column/column.py:1999, in as_column(arbitrary, nan_as_null, dtype, length)
   1997 try:
   1998     data = as_column(
-> 1999         memoryview(arbitrary), dtype=dtype, nan_as_null=nan_as_null
   2000     )
   2001 except TypeError:

TypeError: memoryview: a bytes-like object is required, not 'Grouper'

During handling of the above exception, another exception occurred:

TypeError                                 Traceback (most recent call last)
File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/cudf/core/column/column.py:2091, in as_column(arbitrary, nan_as_null, dtype, length)
   2089         pa_type = np_to_pa_dtype(np.dtype(dtype))
   2090     data = as_column(
-> 2091         pa.array(
   2092             arbitrary,
   2093             type=pa_type,
   2094             from_pandas=True
   2095             if nan_as_null is None
   2096             else nan_as_null,
   2097         ),
   2098         dtype=dtype,
   2099         nan_as_null=nan_as_null,
   2100     )
   2101 except (pa.ArrowInvalid, pa.ArrowTypeError, TypeError):

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/pyarrow/array.pxi:315, in pyarrow.lib.array()

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/pyarrow/array.pxi:39, in pyarrow.lib._sequence_to_array()

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/pyarrow/error.pxi:143, in pyarrow.lib.pyarrow_internal_check_status()

TypeError: 'Grouper' object is not iterable

During handling of the above exception, another exception occurred:

ValueError                                Traceback (most recent call last)
File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/cudf/core/column/column.py:2139, in _construct_array(arbitrary, dtype)
   2138     dtype = dtype if dtype is None else cudf.dtype(dtype)
-> 2139     arbitrary = cupy.asarray(arbitrary, dtype=dtype)
   2140 except (TypeError, ValueError):

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/cupy/_creation/from_data.py:66, in asarray(a, dtype, order)
     45 """Converts an object to array.
     46 
     47 This is equivalent to ``array(a, dtype, copy=False)``.
   (...)
     64 
     65 """
---> 66 return _core.array(a, dtype, False, order)

File cupy/_core/core.pyx:2165, in cupy._core.core.array()

File cupy/_core/core.pyx:2244, in cupy._core.core.array()

File cupy/_core/core.pyx:2316, in cupy._core.core._send_object_to_gpu()

ValueError: Unsupported dtype object

During handling of the above exception, another exception occurred:

TypeError                                 Traceback (most recent call last)
Input In [144], in <module>
     13 gddf = dask_cudf.from_dask_dataframe(ddf)
     15 print(gdf.groupby("a").cumcount())
---> 16 print(gddf.groupby("a").cumcount().compute())

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/base.py:288, in DaskMethodsMixin.compute(self, **kwargs)
    264 def compute(self, **kwargs):
    265     """Compute this dask collection
    266 
    267     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    286     dask.base.compute
    287     """
--> 288     (result,) = compute(self, traverse=False, **kwargs)
    289     return result

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/base.py:571, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    568     keys.append(x.__dask_keys__())
    569     postcomputes.append(x.__dask_postcompute__())
--> 571 results = schedule(dsk, keys, **kwargs)
    572 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/local.py:553, in get_sync(dsk, keys, **kwargs)
    548 """A naive synchronous version of get_async
    549 
    550 Can be useful for debugging.
    551 """
    552 kwargs.pop("num_workers", None)  # if num_workers present, remove it
--> 553 return get_async(
    554     synchronous_executor.submit,
    555     synchronous_executor._max_workers,
    556     dsk,
    557     keys,
    558     **kwargs,
    559 )

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/local.py:496, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    494 while state["waiting"] or state["ready"] or state["running"]:
    495     fire_tasks(chunksize)
--> 496     for key, res_info, failed in queue_get(queue).result():
    497         if failed:
    498             exc, tb = loads(res_info)

File ~/conda/envs/rapids-22.04/lib/python3.8/concurrent/futures/_base.py:437, in Future.result(self, timeout)
    435     raise CancelledError()
    436 elif self._state == FINISHED:
--> 437     return self.__get_result()
    439 self._condition.wait(timeout)
    441 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:

File ~/conda/envs/rapids-22.04/lib/python3.8/concurrent/futures/_base.py:389, in Future.__get_result(self)
    387 if self._exception:
    388     try:
--> 389         raise self._exception
    390     finally:
    391         # Break a reference cycle with the exception in self._exception
    392         self = None

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/local.py:538, in SynchronousExecutor.submit(self, fn, *args, **kwargs)
    536 fut = Future()
    537 try:
--> 538     fut.set_result(fn(*args, **kwargs))
    539 except BaseException as e:
    540     fut.set_exception(e)

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/local.py:234, in batch_execute_tasks(it)
    230 def batch_execute_tasks(it):
    231     """
    232     Batch computing of multiple tasks with `execute_task`
    233     """
--> 234     return [execute_task(*a) for a in it]

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/local.py:234, in <listcomp>(.0)
    230 def batch_execute_tasks(it):
    231     """
    232     Batch computing of multiple tasks with `execute_task`
    233     """
--> 234     return [execute_task(*a) for a in it]

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/local.py:225, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    223     failed = False
    224 except BaseException as e:
--> 225     result = pack_exception(e, dumps)
    226     failed = True
    227 return key, result, failed

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/local.py:220, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    218 try:
    219     task, data = loads(task_info)
--> 220     result = _execute_task(task, data)
    221     id = get_id()
    222     result = dumps((result, id))

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/optimization.py:969, in SubgraphCallable.__call__(self, *args)
    967 if not len(args) == len(self.inkeys):
    968     raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 969 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/core.py:149, in get(dsk, out, cache)
    147 for key in toposort(dsk):
    148     task = dsk[key]
--> 149     result = _execute_task(task, cache)
    150     cache[key] = result
    151 result = _execute_task(out, cache)

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/utils.py:39, in apply(func, args, kwargs)
     37 def apply(func, args, kwargs=None):
     38     if kwargs:
---> 39         return func(*args, **kwargs)
     40     else:
     41         return func(*args)

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/dataframe/core.py:6148, in apply_and_enforce(*args, **kwargs)
   6146 func = kwargs.pop("_func")
   6147 meta = kwargs.pop("_meta")
-> 6148 df = func(*args, **kwargs)
   6149 if is_dataframe_like(df) or is_series_like(df) or is_index_like(df):
   6150     if not len(df):

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/dataframe/groupby.py:303, in _apply_chunk(df, dropna, observed, *by, **kwargs)
    300 dropna = {"dropna": dropna} if dropna is not None else {}
    301 observed = {"observed": observed} if observed is not None else {}
--> 303 g = _groupby_raise_unaligned(df, by=by, **observed, **dropna)
    304 if is_series_like(df) or columns is None:
    305     return func(g, **kwargs)

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/dataframe/groupby.py:157, in _groupby_raise_unaligned(df, **kwargs)
    155         by = [by]
    156     kwargs.update(by=list(by))
--> 157 return df.groupby(**kwargs)

File ~/conda/envs/rapids-22.04/lib/python3.8/contextlib.py:75, in ContextDecorator.__call__.<locals>.inner(*args, **kwds)
     72 @wraps(func)
     73 def inner(*args, **kwds):
     74     with self._recreate_cm():
---> 75         return func(*args, **kwds)

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/cudf/core/dataframe.py:3652, in DataFrame.groupby(self, by, axis, level, as_index, sort, group_keys, squeeze, observed, dropna)
   3644 if by is None and level is None:
   3645     raise TypeError(
   3646         "groupby() requires either by or level to be specified."
   3647     )
   3649 return (
   3650     DataFrameResampler(self, by=by)
   3651     if isinstance(by, cudf.Grouper) and by.freq
-> 3652     else DataFrameGroupBy(
   3653         self,
   3654         by=by,
   3655         level=level,
   3656         as_index=as_index,
   3657         dropna=dropna,
   3658         sort=sort,
   3659     )
   3660 )

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/cudf/core/groupby/groupby.py:83, in GroupBy.__init__(self, obj, by, level, sort, as_index, dropna)
     81     self.grouping = by
     82 else:
---> 83     self.grouping = _Grouping(obj, by, level)

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/cudf/core/groupby/groupby.py:1489, in _Grouping.__init__(self, obj, by, level)
   1486 # Need to keep track of named key columns
   1487 # to support `as_index=False` correctly
   1488 self._named_columns = []
-> 1489 self._handle_by_or_level(by, level)
   1491 if len(obj) and not len(self._key_columns):
   1492     raise ValueError("No group keys passed")

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/cudf/core/groupby/groupby.py:1519, in _Grouping._handle_by_or_level(self, by, level)
   1517     self._handle_label(by)
   1518 except (KeyError, TypeError):
-> 1519     self._handle_misc(by)

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/cudf/core/groupby/groupby.py:1593, in _Grouping._handle_misc(self, by)
   1592 def _handle_misc(self, by):
-> 1593     by = cudf.core.column.as_column(by)
   1594     if len(by) != len(self._obj):
   1595         raise ValueError("Grouper and object must have same length")

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/cudf/core/column/column.py:2124, in as_column(arbitrary, nan_as_null, dtype, length)
   2119                 return cudf.core.column.ListColumn.from_sequences(
   2120                     arbitrary
   2121                 )
   2122             else:
   2123                 data = as_column(
-> 2124                     _construct_array(arbitrary, dtype),
   2125                     dtype=dtype,
   2126                     nan_as_null=nan_as_null,
   2127                 )
   2128 return data

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/cudf/core/column/column.py:2145, in _construct_array(arbitrary, dtype)
   2140 except (TypeError, ValueError):
   2141     native_dtype = dtype
   2142     if (
   2143         dtype is None
   2144         and not cudf._lib.scalar._is_null_host_scalar(arbitrary)
-> 2145         and infer_dtype(arbitrary) in ("mixed", "mixed-integer",)
   2146     ):
   2147         native_dtype = "object"
   2148     arbitrary = np.asarray(
   2149         arbitrary,
   2150         dtype=native_dtype
   2151         if native_dtype is None
   2152         else np.dtype(native_dtype),
   2153     )

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/pandas/_libs/lib.pyx:1441, in pandas._libs.lib.infer_dtype()

TypeError: 'Grouper' object is not iterable
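
For the record, one possible direction is sketched below. This is a hypothetical helper only, not the fix that actually landed via #10889: resolve a non-frequency `Grouper` back to the column key it names (recursing into lists/tuples) before the grouping machinery tries to treat it as a label.

import pandas as pd

# Hypothetical sketch, not cuDF's actual fix: map Grouper(key="a") back
# to the plain label "a" so downstream label handling can succeed.
def _resolve_by(by):
    if isinstance(by, (list, tuple)):
        return [_resolve_by(b) for b in by]
    if isinstance(by, pd.Grouper) and by.key is not None and by.freq is None:
        return by.key
    return by

print(_resolve_by([pd.Grouper(key="a")]))  # ['a']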
beckernick added the feature request (New feature or request), Python (Affects Python cuDF API), and dask (Dask issue) labels on Feb 15, 2022
beckernick changed the title from "[FEA] Support for Dask groupby cumulative count (cumcount)" to "[FEA] Support for Dask groupby cumulative sum, count" on Feb 15, 2022
@github-actions

This issue has been labeled inactive-30d due to no recent activity in the past 30 days. Please close this issue if no further response or action is needed. Otherwise, please respond with a comment indicating any updates or changes to the original issue and/or confirm this issue still needs to be addressed. This issue will be labeled inactive-90d if there is no activity in the next 60 days.

@brandon-b-miller
Contributor

Ran across this today; happy to pick this up.

@github-actions

github-actions bot commented Jun 2, 2022

This issue has been labeled inactive-30d due to no recent activity in the past 30 days. Please close this issue if no further response or action is needed. Otherwise, please respond with a comment indicating any updates or changes to the original issue and/or confirm this issue still needs to be addressed. This issue will be labeled inactive-90d if there is no activity in the next 60 days.

rapids-bot bot pushed a commit that referenced this issue Jun 7, 2022
This PR came up as part of solving #10296, which has to go through the `reindex` codepath with a `fill_value`. It does a number of things:

- Aligns our `reindex` signature with pandas
- Moves our `_reindex` helper from `DataFrame` to `IndexedFrame`; previously, `Series` promoted itself to a frame and called the `DataFrame` implementation
- Provides support for `fill_value` (see the sketch after this list)
- Refactors the relatively old tests for this functionality to better exercise `fill_value` and reduce code overall
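
A minimal illustration of the `fill_value` behavior this enables (hypothetical data; per this PR, cuDF's `reindex` follows the pandas signature):

import cudf

ser = cudf.Series([1, 2, 3], index=["a", "b", "c"])
# "d" is absent from the original index; with fill_value it becomes 0
# instead of <NA>:
print(ser.reindex(["a", "b", "d"], fill_value=0))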

Authors:
  - https://github.com/brandon-b-miller

Approvers:
  - Michael Wang (https://github.com/isVoid)
  - Ashwin Srinath (https://github.com/shwina)

URL: #10815
rapids-bot bot closed this as completed in 4d2211e on Jul 27, 2022