Skip to content

Commit

Permalink
FEAT-#2091: add distributed dataframe compare (#2579)
Browse files Browse the repository at this point in the history
Signed-off-by: Khang Vu <[email protected]>
  • Loading branch information
kvu35 authored Jan 25, 2021
1 parent 2f880c1 commit ad55231
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 11 deletions.
2 changes: 1 addition & 1 deletion docs/supported_apis/dataframe_supported.rst
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ default to pandas.
+----------------------------+---------------------------+------------------------+----------------------------------------------------+
| ``combine_first`` | `combine_first`_ | Y | |
+----------------------------+---------------------------+------------------------+----------------------------------------------------+
| ``compare`` | `compare`_ | D | |
| ``compare`` | `compare`_ | Y | |
+----------------------------+---------------------------+------------------------+----------------------------------------------------+
| ``copy`` | `copy`_ | Y | |
+----------------------------+---------------------------+------------------------+----------------------------------------------------+
Expand Down
2 changes: 1 addition & 1 deletion docs/supported_apis/series_supported.rst
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ the related section on `Defaulting to pandas`_.
+-----------------------------+---------------------------------+----------------------------------------------------+
| ``combine_first`` | Y | |
+-----------------------------+---------------------------------+----------------------------------------------------+
| ``compare`` | D | |
| ``compare`` | Y | |
+-----------------------------+---------------------------------+----------------------------------------------------+
| ``compress`` | D | |
+-----------------------------+---------------------------------+----------------------------------------------------+
Expand Down
1 change: 1 addition & 0 deletions modin/backends/base/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1884,5 +1884,6 @@ def set_index_names(self, names, axis=0):
kurt = DataFrameDefault.register(pandas.DataFrame.kurt)
sum_min_count = DataFrameDefault.register(pandas.DataFrame.sum)
prod_min_count = DataFrameDefault.register(pandas.DataFrame.prod)
compare = DataFrameDefault.register(pandas.DataFrame.compare)

# End of DataFrame methods
9 changes: 9 additions & 0 deletions modin/backends/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3023,3 +3023,12 @@ def cat_codes(self):
return self.default_to_pandas(lambda df: df[df.columns[0]].cat.codes)

# END Cat operations

def compare(self, other, **kwargs):
return self.__constructor__(
self._modin_frame.broadcast_apply_full_axis(
0,
lambda l, r: pandas.DataFrame.compare(l, r, **kwargs),
other._modin_frame,
)
)
4 changes: 4 additions & 0 deletions modin/pandas/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ def _validate_other(
numeric_or_time_only=False,
numeric_or_object_only=False,
comparison_dtypes_only=False,
compare_index=False,
):
"""
Help to check validity of other in inter-df operations.
Expand Down Expand Up @@ -260,6 +261,9 @@ def _validate_other(
else len(self._query_compiler.columns)
)
]
if compare_index:
if not self.index.equals(other.index):
raise TypeError("Cannot perform operation with non-equal index")
# Do dtype checking.
if numeric_only:
if not all(
Expand Down
16 changes: 10 additions & 6 deletions modin/pandas/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -566,12 +566,16 @@ def compare(
keep_shape: bool = False,
keep_equal: bool = False,
) -> "DataFrame":
return self._default_to_pandas(
pandas.DataFrame.compare,
other=other,
align_axis=align_axis,
keep_shape=keep_shape,
keep_equal=keep_equal,
if not isinstance(other, DataFrame):
raise TypeError(f"Cannot compare DataFrame to {type(other)}")
other = self._validate_other(other, 0, compare_index=True)
return self.__constructor__(
query_compiler=self._query_compiler.compare(
other,
align_axis=align_axis,
keep_shape=keep_shape,
keep_equal=keep_equal,
)
)

def corr(self, method="pearson", min_periods=1):
Expand Down
14 changes: 11 additions & 3 deletions modin/pandas/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,13 +482,21 @@ def compare(
keep_shape: bool = False,
keep_equal: bool = False,
):
return self._default_to_pandas(
pandas.Series.compare,
other=other,
if not isinstance(other, Series):
raise TypeError(f"Cannot compare Series to {type(other)}")
result = self.to_frame().compare(
other.to_frame(),
align_axis=align_axis,
keep_shape=keep_shape,
keep_equal=keep_equal,
)
if align_axis == "columns" or align_axis == 1:
# Pandas.DataFrame.Compare returns a dataframe with a multidimensional index object as the
# columns so we have to change column object back.
result.columns = pandas.Index(["self", "other"])
else:
result = result.squeeze().rename(None)
return result

def corr(self, other, method="pearson", min_periods=None):
if method == "pearson":
Expand Down
40 changes: 40 additions & 0 deletions modin/pandas/test/dataframe/test_join_sort.py
Original file line number Diff line number Diff line change
Expand Up @@ -511,3 +511,43 @@ def test_where():
pandas_result = pandas_df.where(pandas_df < 2, True)
modin_result = modin_df.where(modin_df < 2, True)
assert all((to_pandas(modin_result) == pandas_result).all())


@pytest.mark.parametrize("align_axis", ["index", "columns"])
@pytest.mark.parametrize("keep_shape", [False, True])
@pytest.mark.parametrize("keep_equal", [False, True])
def test_compare(align_axis, keep_shape, keep_equal):
kwargs = {
"align_axis": align_axis,
"keep_shape": keep_shape,
"keep_equal": keep_equal,
}
frame_data1 = random_state.randn(100, 10)
frame_data2 = random_state.randn(100, 10)
pandas_df = pandas.DataFrame(frame_data1, columns=list("abcdefghij"))
pandas_df2 = pandas.DataFrame(frame_data2, columns=list("abcdefghij"))
modin_df = pd.DataFrame(frame_data1, columns=list("abcdefghij"))
modin_df2 = pd.DataFrame(frame_data2, columns=list("abcdefghij"))

modin_result = modin_df.compare(modin_df2, **kwargs)
pandas_result = pandas_df.compare(pandas_df2, **kwargs)
assert to_pandas(modin_result).equals(pandas_result)

modin_result = modin_df2.compare(modin_df, **kwargs)
pandas_result = pandas_df2.compare(pandas_df, **kwargs)
assert to_pandas(modin_result).equals(pandas_result)

series_data1 = ["a", "b", "c", "d", "e"]
series_data2 = ["a", "a", "c", "b", "e"]
pandas_series1 = pandas.Series(series_data1)
pandas_series2 = pandas.Series(series_data2)
modin_series1 = pd.Series(series_data1)
modin_series2 = pd.Series(series_data2)

modin_result = modin_series1.compare(modin_series2, **kwargs)
pandas_result = pandas_series1.compare(pandas_series2, **kwargs)
assert to_pandas(modin_result).equals(pandas_result)

modin_result = modin_series2.compare(modin_series1, **kwargs)
pandas_result = pandas_series2.compare(pandas_series1, **kwargs)
assert to_pandas(modin_result).equals(pandas_result)

0 comments on commit ad55231

Please sign in to comment.