diff --git a/modin/experimental/engines/omnisci_on_ray/frame/data.py b/modin/experimental/engines/omnisci_on_ray/frame/data.py index e1329d9f377..dc526ac3efc 100644 --- a/modin/experimental/engines/omnisci_on_ray/frame/data.py +++ b/modin/experimental/engines/omnisci_on_ray/frame/data.py @@ -61,6 +61,7 @@ def __init__( dtypes=None, op=None, index_cols=None, + uses_rowid=False, ): assert dtypes is not None @@ -93,6 +94,8 @@ def __init__( if partitions is not None: self._filter_empties() + self._uses_rowid = uses_rowid + def id_str(self): return f"frame${self.id}" @@ -412,6 +415,7 @@ def _union_all( for frame in [self] + other_modin_frames: aligned_index = None exprs = OrderedDict() + uses_rowid = False if not ignore_index: if frame._index_cols: @@ -425,6 +429,7 @@ def _union_all( aligned_index = ["__index__"] exprs["__index__"] = frame.ref("__rowid__") aligned_index_dtypes = [_get_dtype(int)] + uses_rowid = True aligned_dtypes = aligned_index_dtypes + new_dtypes else: aligned_dtypes = new_dtypes @@ -442,6 +447,7 @@ def _union_all( dtypes=aligned_dtypes, op=aligned_frame_op, index_cols=aligned_index, + uses_rowid=uses_rowid, ) ) @@ -628,7 +634,7 @@ def _execute(self): if isinstance(self._op, FrameNode): return - # MaskNode requires rowid which is available for executed frames only. + # Some frames require rowid which is available for executed frames only. # Also there is a common pattern when MaskNode is executed to print # frame. If we run the whole tree then any following frame usage will # require re-compute. So we just execute MaskNode's operands. @@ -640,12 +646,18 @@ def _execute(self): self._partitions = new_partitions self._op = FrameNode(self) + def _require_executed_base(self): + if isinstance(self._op, MaskNode): + return True + return self._uses_rowid + def _run_sub_queries(self): if isinstance(self._op, FrameNode): return - if isinstance(self._op, MaskNode): - self._op.input[0]._execute() + if self._require_executed_base(): + for op in self._op.input: + op._execute() else: for frame in self._op.input: frame._run_sub_queries() diff --git a/modin/experimental/engines/omnisci_on_ray/test/test_dataframe.py b/modin/experimental/engines/omnisci_on_ray/test/test_dataframe.py index e242f8b5b53..6d1be2b9d4a 100644 --- a/modin/experimental/engines/omnisci_on_ray/test/test_dataframe.py +++ b/modin/experimental/engines/omnisci_on_ray/test/test_dataframe.py @@ -149,6 +149,20 @@ def test_concat_many(self): df_equals(ref, exp) + def test_concat_agg(self): + def concat(lib, df1, df2): + df1 = df1.groupby("a", as_index=False).agg( + {"b": "sum", "d": "sum", "e": "sum"} + ) + df2 = df2.groupby("a", as_index=False).agg( + {"c": "sum", "b": "sum", "f": "sum"} + ) + return lib.concat([df1, df2]) + + run_and_compare( + concat, data=self.data, data2=self.data2, + ) + class TestGroupby: data = {