Skip to content

Commit

Permalink
FEAT-#2282: support DataFrame.[count|max|min|sum] for OmniSci backend (#2283)
Browse files Browse the repository at this point in the history

Signed-off-by: ienkovich <[email protected]>
  • Loading branch information
ienkovich authored Oct 22, 2020
1 parent 64b94f5 commit 6697e05
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 26 deletions.
42 changes: 26 additions & 16 deletions modin/experimental/backends/omnisci/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,7 @@ def groupby_size(
else:
shape_hint = None
new_frame = new_frame._set_columns(list(new_frame.columns)[:-1] + ["size"])
new_qc = self.__constructor__(new_frame, shape_hint=shape_hint)
if groupby_args["squeeze"]:
new_qc = new_qc.squeeze()
return new_qc
return self.__constructor__(new_frame, shape_hint=shape_hint)

def groupby_sum(self, by, axis, groupby_args, map_args, **kwargs):
"""Groupby with sum aggregation.
Expand All @@ -234,10 +231,7 @@ def groupby_sum(self, by, axis, groupby_args, map_args, **kwargs):
new_frame = self._modin_frame.groupby_agg(
by, axis, "sum", groupby_args, **kwargs
)
new_qc = self.__constructor__(new_frame)
if groupby_args["squeeze"]:
new_qc = new_qc.squeeze()
return new_qc
return self.__constructor__(new_frame)

def groupby_count(self, by, axis, groupby_args, map_args, **kwargs):
"""Perform a groupby count.
Expand Down Expand Up @@ -266,10 +260,7 @@ def groupby_count(self, by, axis, groupby_args, map_args, **kwargs):
new_frame = self._modin_frame.groupby_agg(
by, axis, "count", groupby_args, **kwargs
)
new_qc = self.__constructor__(new_frame)
if groupby_args["squeeze"]:
new_qc = new_qc.squeeze()
return new_qc
return self.__constructor__(new_frame)

def groupby_dict_agg(self, by, func_dict, groupby_args, agg_args, drop=False):
"""Apply aggregation functions to a grouped dataframe per-column.
Expand All @@ -296,10 +287,29 @@ def groupby_dict_agg(self, by, func_dict, groupby_args, agg_args, drop=False):
new_frame = self._modin_frame.groupby_agg(
by, 0, func_dict, groupby_args, **agg_args
)
new_qc = self.__constructor__(new_frame)
if groupby_args["squeeze"]:
new_qc = new_qc.squeeze()
return new_qc
return self.__constructor__(new_frame)

def count(self, axis=0, level=None, **kwargs):
    """Run the ``"count"`` aggregation through ``_agg``.

    Parameters
    ----------
    axis : default 0
        Forwarded to ``_agg``.
    level : default None
        Forwarded to ``_agg``.
    **kwargs
        Forwarded unchanged to ``_agg``.

    Returns
    -------
    Whatever ``self._agg`` returns.
    """
    # Forward every argument: ``_agg`` decides between its own path and the
    # parent-class implementation based on ``axis`` and ``level``, so
    # dropping them here would silently compute the axis=0 result.
    return self._agg("count", axis=axis, level=level, **kwargs)

def max(self, axis=0, level=None, **kwargs):
    """Run the ``"max"`` aggregation through ``_agg``.

    Parameters
    ----------
    axis : default 0
        Forwarded to ``_agg``.
    level : default None
        Forwarded to ``_agg``.
    **kwargs
        Forwarded unchanged to ``_agg``.

    Returns
    -------
    Whatever ``self._agg`` returns.
    """
    # Forward every argument: ``_agg`` decides between its own path and the
    # parent-class implementation based on ``axis`` and ``level``, so
    # dropping them here would silently compute the axis=0 result.
    return self._agg("max", axis=axis, level=level, **kwargs)

def min(self, axis=0, level=None, **kwargs):
    """Run the ``"min"`` aggregation through ``_agg``.

    Parameters
    ----------
    axis : default 0
        Forwarded to ``_agg``.
    level : default None
        Forwarded to ``_agg``.
    **kwargs
        Forwarded unchanged to ``_agg``.

    Returns
    -------
    Whatever ``self._agg`` returns.
    """
    # Forward every argument: ``_agg`` decides between its own path and the
    # parent-class implementation based on ``axis`` and ``level``, so
    # dropping them here would silently compute the axis=0 result.
    return self._agg("min", axis=axis, level=level, **kwargs)

def sum(self, axis=0, level=None, **kwargs):
    """Run the ``"sum"`` aggregation through ``_agg``.

    Parameters
    ----------
    axis : default 0
        Forwarded to ``_agg``.
    level : default None
        Forwarded to ``_agg``.
    **kwargs
        Forwarded unchanged to ``_agg``.

    Returns
    -------
    Whatever ``self._agg`` returns.
    """
    # Forward every argument: ``_agg`` decides between its own path and the
    # parent-class implementation based on ``axis`` and ``level``, so
    # dropping them here would silently compute the axis=0 result.
    return self._agg("sum", axis=axis, level=level, **kwargs)

def _agg(self, agg, axis=0, level=None, **kwargs):
    """Apply the aggregation named ``agg`` and wrap the result.

    Parameters
    ----------
    agg : str
        Name of the aggregation; also the name of the parent-class method
        used on the fallback path.
    axis : default 0
    level : default None
    **kwargs
        Only used on the fallback path, where they are passed through.

    Returns
    -------
    The parent-class result when ``level`` is set or ``axis`` is not 0;
    otherwise a new object built with ``self.__constructor__`` and
    ``shape_hint="row"``.
    """
    handled_here = level is None and axis == 0
    if not handled_here:
        # Anything other than a plain axis-0 reduction is delegated to the
        # parent class method with the same name.
        parent_impl = getattr(super(), agg)
        return parent_impl(axis=axis, level=level, **kwargs)

    aggregated = self._modin_frame.agg(agg)
    # The aggregated frame gets a one-element object-dtype index holding
    # the "__reduced__" label.
    reduced_index = pandas.Index.__new__(
        pandas.Index, data=["__reduced__"], dtype="O"
    )
    aggregated = aggregated._set_index(reduced_index)
    return self.__constructor__(aggregated, shape_hint="row")

def _get_index(self):
if self._modin_frame._has_unsupported_data:
Expand Down
78 changes: 68 additions & 10 deletions modin/experimental/engines/omnisci_on_ray/frame/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,21 @@ def groupby_agg(self, by, axis, agg, groupby_args, **kwargs):

return new_frame

def agg(self, agg):
    """Build a new frame that applies one aggregation to every column.

    Parameters
    ----------
    agg : str
        Aggregation name handed to ``AggregateExpr`` for each column.

    Returns
    -------
    A frame created via ``self.__constructor__`` whose operation is a
    ``GroupbyAggNode`` over this frame with an empty list of group-by keys
    and ``{"sort": False}`` as its arguments.
    """
    assert isinstance(agg, str)

    # One aggregate expression per column, in column order.
    per_column_exprs = OrderedDict(
        (name, AggregateExpr(agg, self.ref(name))) for name in self.columns
    )

    result_columns = self.columns
    result_dtypes = self._dtypes_for_exprs(per_column_exprs)
    agg_node = GroupbyAggNode(self, [], per_column_exprs, {"sort": False})

    return self.__constructor__(
        columns=result_columns,
        dtypes=result_dtypes,
        op=agg_node,
        index_cols=None,
        force_execution_mode=self._force_execution_mode,
    )

def fillna(
self,
value=None,
Expand Down Expand Up @@ -1062,7 +1077,38 @@ def _get_index(self):
return self._index_cache

def _set_index(self, new_index):
    """Return a new frame holding this frame's data columns under ``new_index``.

    The frame is executed first, then its single Arrow table is rebuilt
    with the index columns placed in front of the data columns.

    Parameters
    ----------
    new_index : Index or MultiIndex
        Index to attach. A copy is materialized into columns via
        ``reset_index``.

    Returns
    -------
    The frame produced by ``self.from_arrow`` for the rebuilt table.

    Raises
    ------
    NotImplementedError
        If ``new_index`` is not an ``Index``/``MultiIndex``, or if the
        executed partition holds a pandas DataFrame instead of an Arrow
        table.
    """
    # NOTE(review): the rendered diff also carried the pre-change body (an
    # unconditional ``raise NotImplementedError``) as the first statement,
    # which would make everything below unreachable; it is dropped here.
    if not isinstance(new_index, (Index, MultiIndex)):
        raise NotImplementedError(
            "OmnisciOnRayFrame._set_index is not yet suported"
        )

    self._execute()

    # Only the single-partition case is handled.
    assert self._partitions.size == 1
    obj = self._partitions[0][0].get()
    if isinstance(obj, pd.DataFrame):
        raise NotImplementedError(
            "OmnisciOnRayFrame._set_index is not yet suported"
        )
    assert isinstance(obj, pyarrow.Table)

    # Drop the current index columns so only data columns are carried over.
    at = obj
    if self._index_cols:
        at = at.drop(self._index_cols)

    # Turn the new index into ordinary columns of an otherwise empty table.
    index_df = pd.DataFrame(data={}, index=new_index.copy())
    index_df = index_df.reset_index()

    index_at = pyarrow.Table.from_pandas(index_df)

    # Append the data columns after the index columns, preserving order.
    for i, field in enumerate(at.schema):
        index_at = index_at.append_column(field, at.column(i))

    # Rename so index columns carry their mangled names, followed by this
    # frame's column labels.
    index_names = self._mangle_index_names(new_index.names)
    index_at = index_at.rename_columns(index_names + list(self.columns))

    return self.from_arrow(index_at, index_names, new_index)

def reset_index(self, drop):
if drop:
Expand Down Expand Up @@ -1119,7 +1165,7 @@ def _get_columns(self):
return super(OmnisciOnRayFrame, self)._get_columns()

columns = property(_get_columns)
index = property(_get_index, _set_index)
index = property(_get_index)

def has_multiindex(self):
if self._index_cache is not None:
Expand Down Expand Up @@ -1196,10 +1242,7 @@ def from_pandas(cls, df):
orig_index_names = df.index.names
orig_df = df

index_cols = [
f"__index__{i}_{'__None__' if n is None else n}"
for i, n in enumerate(df.index.names)
]
index_cols = cls._mangle_index_names(df.index.names)
df.index.names = index_cols
df = df.reset_index()

Expand Down Expand Up @@ -1232,19 +1275,33 @@ def from_pandas(cls, df):
)

@classmethod
def from_arrow(cls, at):
def _mangle_index_names(cls, names):
    """Build the internal column name for each index level.

    Parameters
    ----------
    names : iterable
        Index level names; an entry may be ``None``.

    Returns
    -------
    list of str
        One ``__index__<position>_<name>`` string per entry, with
        ``__None__`` standing in for a ``None`` name.
    """
    mangled = []
    for i, n in enumerate(names):
        # The position keeps names unique even when several levels share a
        # name or are unnamed.
        mangled.append(f"__index__{i}_{'__None__' if n is None else n}")
    return mangled

@classmethod
def from_arrow(cls, at, index_cols=None, index=None):
(
new_frame,
new_lengths,
new_widths,
unsupported_cols,
) = cls._frame_mgr_cls.from_arrow(at, return_dims=True)

new_columns = pd.Index(data=at.column_names, dtype="O")
new_index = pd.RangeIndex(at.num_rows)
if index_cols:
data_cols = [col for col in at.column_names if col not in index_cols]
new_index = index
else:
data_cols = at.column_names
assert index is None
new_index = pd.RangeIndex(at.num_rows)

new_columns = pd.Index(data=data_cols, dtype="O")
new_dtypes = pd.Series(
[cls._arrow_type_to_dtype(col.type) for col in at.columns],
index=new_columns,
index=at.column_names,
)

if len(unsupported_cols) > 0:
Expand All @@ -1260,6 +1317,7 @@ def from_arrow(cls, at):
row_lengths=new_lengths,
column_widths=new_widths,
dtypes=new_dtypes,
index_cols=index_cols,
has_unsupported_data=len(unsupported_cols) > 0,
)

Expand Down
16 changes: 16 additions & 0 deletions modin/experimental/engines/omnisci_on_ray/test/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -844,6 +844,22 @@ def groupby(df, **kwargs):
run_and_compare(groupby, data=self.data)


class TestAgg:
    """Tests for whole-frame aggregations (count / max / min / sum)."""

    # Columns "a", "b" and "c" contain missing values; "d" has none.
    data = {
        "a": [1, 2, None, None, 5, None],
        "b": [10, 20, None, 40, 50, None],
        "c": [None, 200, None, 400, 500, 600],
        "d": [11, 22, 33, 44, 55, 66],
    }

    @pytest.mark.parametrize("agg", ["count", "max", "min", "sum"])
    def test_simple_agg(self, agg):
        """Call the aggregation named ``agg`` on a frame built from ``data``."""

        # The helper must not be named ``agg``: a nested ``def agg`` would
        # rebind the parametrized string, and the function object itself
        # would then be passed as the ``agg=`` keyword below instead of the
        # aggregation name.
        def apply(df, agg, **kwargs):
            return getattr(df, agg)()

        run_and_compare(apply, data=self.data, agg=agg, force_lazy=False)


class TestMerge:
data = {
"a": [1, 2, 3],
Expand Down

0 comments on commit 6697e05

Please sign in to comment.