Commit

REFACTOR-#2648: Correct uses of MapReduceFunction and metadata manipulation (#2655)

* REFACTOR-#2648: Correct uses of MapReduceFunction and metadata manipulation

Resolves #2648

Removes code that was problematic for performance. The APIs for modifying
external and internal metadata mixed several distinct use cases, and some
components of those APIs could hide bugs. The implementation has been
updated to ensure that such bugs do not resurface.

Previously, the internal and external indices were compared and then
updated according to arguments that were passed in. This does not scale,
because collecting the indices is expensive. The bugs hidden by this
design could also be very difficult to detect: the internal or external
indices were updated implicitly, based on a somewhat cryptic string
pattern combined with a boolean flag. Another significant issue was that
the external indices were sometimes updated from the partition-lengths
metadata, likely a workaround for improper use of the APIs.

That implementation has been removed and replaced with something more
explicit. If the internal indices need to be updated, they are updated
explicitly via existing APIs; likewise, if the external indices need to
be updated, they are updated with a different, dedicated API, as in the
sketch below.
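The distinction, as a rough sketch (every name below is hypothetical and
for illustration only, not Modin's actual internals):

```python
# Hypothetical sketch -- not Modin's real API. One explicit operation per
# metadata target replaces a single implicit entry point that guessed
# which side to update from a string pattern and a boolean flag.
import pandas


class ToyFrame:
    """Toy frame with user-visible ("external") labels and per-partition
    ("internal") row labels that must be kept consistent."""

    def __init__(self, partitions, index):
        self.partitions = partitions  # list of pandas.DataFrame pieces
        self.index = pandas.Index(index)  # external, user-visible labels

    def set_external_index(self, new_index):
        # Update only the cached, user-visible labels; cheap, no collection.
        self.index = pandas.Index(new_index)

    def set_internal_index(self, new_index):
        # Push labels down into the partitions themselves, explicitly.
        new_index = pandas.Index(new_index)
        offset = 0
        for part in self.partitions:
            part.index = new_index[offset : offset + len(part)]
            offset += len(part)
```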

Several QueryCompiler APIs had to be reverted because they misused
ReductionFunction or MapReduceFunction, which is what created the need
for the implicit metadata modification in the first place. Once that
implicit modification was removed, these APIs no longer worked, so they
have been reverted until they can be reimplemented using the correct
APIs (see the sketch following the list). The following APIs were
reverted as part of this commit:

* `is_monotonic_increasing`
* `is_monotonic_decreasing`
* `value_counts`
* `searchsorted`
* `dt_tz`
* `dt_freq`
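
To see why, e.g., `value_counts` is not a well-behaved MapReduce over row
partitions, consider this pure-pandas sketch (illustrative only; `p0`/`p1`
stand in for partitions):

```python
import pandas

s = pandas.Series(["a", "b", "a", "c", "a", "b"])
p0, p1 = s.iloc[:3], s.iloc[3:]  # stand-ins for two row partitions

# Map step: per-partition counts. Each result is indexed by *values*,
# not by row labels, and its length varies from partition to partition.
m0, m1 = p0.value_counts(), p1.value_counts()

# Reduce step: partial counts must be re-aggregated across partitions.
reduced = pandas.concat([m0, m1]).groupby(level=0).sum()

# Neither the length nor the index of `reduced` is knowable from the
# input's metadata, so labels have to be recomputed rather than
# "preserved" -- the gap the removed implicit machinery papered over.
assert reduced.equals(s.value_counts().sort_index())
```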

Signed-off-by: Devin Petersohn <[email protected]>

* REFACTOR-#2648: Remove debug code

Signed-off-by: Devin Petersohn <[email protected]>

* REFACTOR-#2648: Fix explicit rename

Signed-off-by: Devin Petersohn <[email protected]>
devin-petersohn authored Jan 28, 2021
1 parent 4f26fc1 commit 09d7c18
Showing 6 changed files with 144 additions and 380 deletions.
4 changes: 2 additions & 2 deletions modin/backends/base/query_compiler.py
@@ -493,14 +493,14 @@ def reset_index(self, **kwargs):
     # we will implement a Distributed Series, and this will be returned
     # instead.
 
-    def is_monotonic(self):
+    def is_monotonic_increasing(self):
         """Return boolean if values in the object are monotonic_increasing.
         Returns
         -------
         bool
         """
-        return SeriesDefault.register(pandas.Series.is_monotonic)(self)
+        return SeriesDefault.register(pandas.Series.is_monotonic_increasing)(self)
 
     def is_monotonic_decreasing(self):
         """Return boolean if values in the object are monotonic_decreasing.
234 changes: 41 additions & 193 deletions modin/backends/pandas/query_compiler.py
@@ -19,10 +19,8 @@
     is_list_like,
     is_numeric_dtype,
     is_datetime_or_timedelta_dtype,
-    is_scalar,
 )
 from pandas.core.base import DataError
-from typing import Type, Callable
 from collections.abc import Iterable, Container
 import warnings

@@ -31,7 +29,6 @@
 from modin.error_message import ErrorMessage
 from modin.utils import try_cast_to_pandas, wrap_udf_function, hashable
 from modin.data_management.functions import (
-    Function,
     FoldFunction,
     MapFunction,
     MapReduceFunction,
@@ -155,35 +152,6 @@ def caller(df, *args, **kwargs):
     return caller
 
 
-def _numeric_only_reduce_fn(applier: Type[Function], *funcs) -> Callable:
-    """
-    Build reduce function for statistic operations with `numeric_only` parameter.
-    Parameters
-    ----------
-    applier: Callable
-        Function object to register `funcs`
-    *funcs: list
-        List of functions to register in `applier`
-    Returns
-    -------
-    callable
-        A callable function to be applied in the partitions
-    """
-
-    def caller(self, *args, **kwargs):
-        # If `numeric_only` is None and the frame contains non-numeric columns,
-        # then we don't know what columns/indices will be dropped at the result
-        # of reduction function, and so can't preserve labels
-        preserve_index = kwargs.get("numeric_only", None) is not None
-        return applier.register(*funcs, preserve_index=preserve_index)(
-            self, *args, **kwargs
-        )
-
-    return caller
-
-
 class PandasQueryCompiler(BaseQueryCompiler):
     """This class implements the logic necessary for operating on partitions
     with a Pandas backend. This logic is specific to Pandas."""
@@ -625,46 +593,23 @@ def is_series_like(self):
 
     # MapReduce operations
 
-    def _is_monotonic(self, func_type=None):
-        funcs = {
-            "increasing": lambda df: df.is_monotonic_increasing,
-            "decreasing": lambda df: df.is_monotonic_decreasing,
-        }
-
-        monotonic_fn = funcs.get(func_type, funcs["increasing"])
-
-        def is_monotonic_map(df):
-            df = df.squeeze(axis=1)
-            return [monotonic_fn(df), df.iloc[0], df.iloc[len(df) - 1]]
-
-        def is_monotonic_reduce(df):
-            df = df.squeeze(axis=1)
-
-            common_case = df[0].all()
-            left_edges = df[1]
-            right_edges = df[2]
-
-            edges_list = []
-            for i in range(len(left_edges)):
-                edges_list.extend([left_edges.iloc[i], right_edges.iloc[i]])
-
-            edge_case = monotonic_fn(pandas.Series(edges_list))
-            return [common_case and edge_case]
-
-        return MapReduceFunction.register(
-            is_monotonic_map, is_monotonic_reduce, axis=0
-        )(self)
-
-    def is_monotonic_decreasing(self):
-        return self._is_monotonic(func_type="decreasing")
-
-    is_monotonic = _is_monotonic
+    def is_monotonic_decreasing(self):
+        def is_monotonic_decreasing(df):
+            return pandas.DataFrame([df.squeeze(axis=1).is_monotonic_decreasing])
+
+        return self.default_to_pandas(is_monotonic_decreasing)
+
+    def is_monotonic_increasing(self):
+        def is_monotonic_increasing(df):
+            return pandas.DataFrame([df.squeeze(axis=1).is_monotonic_increasing])
+
+        return self.default_to_pandas(is_monotonic_increasing)
 
     count = MapReduceFunction.register(pandas.DataFrame.count, pandas.DataFrame.sum)
-    max = _numeric_only_reduce_fn(MapReduceFunction, pandas.DataFrame.max)
-    min = _numeric_only_reduce_fn(MapReduceFunction, pandas.DataFrame.min)
-    sum = _numeric_only_reduce_fn(MapReduceFunction, pandas.DataFrame.sum)
-    prod = _numeric_only_reduce_fn(MapReduceFunction, pandas.DataFrame.prod)
+    max = MapReduceFunction.register(pandas.DataFrame.max)
+    min = MapReduceFunction.register(pandas.DataFrame.min)
+    sum = MapReduceFunction.register(pandas.DataFrame.sum)
+    prod = MapReduceFunction.register(pandas.DataFrame.prod)
     any = MapReduceFunction.register(pandas.DataFrame.any, pandas.DataFrame.any)
     all = MapReduceFunction.register(pandas.DataFrame.all, pandas.DataFrame.all)
     memory_usage = MapReduceFunction.register(
@@ -716,60 +661,26 @@ def value_counts(self, **kwargs):
         -------
         PandasQueryCompiler
         """
-        if kwargs.get("bins", None) is not None:
-            new_modin_frame = self._modin_frame._apply_full_axis(
-                0, lambda df: df.squeeze(axis=1).value_counts(**kwargs)
-            )
-            return self.__constructor__(new_modin_frame)
-
-        def map_func(df, *args, **kwargs):
+        def value_counts(df):
             return df.squeeze(axis=1).value_counts(**kwargs).to_frame()
 
-        def reduce_func(df, *args, **kwargs):
-            normalize = kwargs.get("normalize", False)
-            sort = kwargs.get("sort", True)
-            ascending = kwargs.get("ascending", False)
-            dropna = kwargs.get("dropna", True)
-
-            try:
-                result = (
-                    df.squeeze(axis=1)
-                    .groupby(df.index, sort=False, dropna=dropna)
-                    .sum()
-                )
-            # This will happen with Arrow buffer read-only errors. We don't want to copy
-            # all the time, so this will try to fast-path the code first.
-            except (ValueError):
-                result = (
-                    df.copy()
-                    .squeeze(axis=1)
-                    .groupby(df.index, sort=False, dropna=dropna)
-                    .sum()
-                )
-
-            if normalize:
-                result = result / df.squeeze(axis=1).sum()
-
-            return result.sort_values(ascending=ascending) if sort else result
-
-        return MapReduceFunction.register(
-            map_func, reduce_func, axis=0, preserve_index=False
-        )(self, **kwargs)
+        return self.default_to_pandas(value_counts)
 
     # END MapReduce operations
 
     # Reduction operations
     idxmax = ReductionFunction.register(pandas.DataFrame.idxmax)
     idxmin = ReductionFunction.register(pandas.DataFrame.idxmin)
-    median = _numeric_only_reduce_fn(ReductionFunction, pandas.DataFrame.median)
+    median = ReductionFunction.register(pandas.DataFrame.median)
     nunique = ReductionFunction.register(pandas.DataFrame.nunique)
-    skew = _numeric_only_reduce_fn(ReductionFunction, pandas.DataFrame.skew)
-    kurt = _numeric_only_reduce_fn(ReductionFunction, pandas.DataFrame.kurt)
-    sem = _numeric_only_reduce_fn(ReductionFunction, pandas.DataFrame.sem)
-    std = _numeric_only_reduce_fn(ReductionFunction, pandas.DataFrame.std)
-    var = _numeric_only_reduce_fn(ReductionFunction, pandas.DataFrame.var)
-    sum_min_count = _numeric_only_reduce_fn(ReductionFunction, pandas.DataFrame.sum)
-    prod_min_count = _numeric_only_reduce_fn(ReductionFunction, pandas.DataFrame.prod)
+    skew = ReductionFunction.register(pandas.DataFrame.skew)
+    kurt = ReductionFunction.register(pandas.DataFrame.kurt)
+    sem = ReductionFunction.register(pandas.DataFrame.sem)
+    std = ReductionFunction.register(pandas.DataFrame.std)
+    var = ReductionFunction.register(pandas.DataFrame.var)
+    sum_min_count = ReductionFunction.register(pandas.DataFrame.sum)
+    prod_min_count = ReductionFunction.register(pandas.DataFrame.prod)
     quantile_for_single_value = ReductionFunction.register(pandas.DataFrame.quantile)
     mad = ReductionFunction.register(pandas.DataFrame.mad)
     to_datetime = ReductionFunction.register(
@@ -1350,83 +1261,13 @@ def searchsorted(self, **kwargs):
         PandasQueryCompiler
         """
 
-        def map_func(part, *args, **kwargs):
-
-            elements_number = len(part.index)
-            assert elements_number > 0, "Wrong mapping behaviour of MapReduce"
-
-            # unify value type
-            value = kwargs.pop("value")
-            value = np.array([value]) if is_scalar(value) else value
-
-            if elements_number == 1:
-                part = part[part.columns[0]]
-            else:
-                part = part.squeeze()
-
-            part_index_start = part.index.start
-            part_index_stop = part.index.stop
-
-            result = part.searchsorted(value=value, *args, **kwargs)
-
-            processed_results = {}
-            value_number = 0
-            for value_result in result:
-                value_result += part_index_start
-
-                if value_result > part_index_start and value_result < part_index_stop:
-                    processed_results[f"value{value_number}"] = {
-                        "relative_location": "current_partition",
-                        "index": value_result,
-                    }
-                elif value_result <= part_index_start:
-                    processed_results[f"value{value_number}"] = {
-                        "relative_location": "previoius_partitions",
-                        "index": part_index_start,
-                    }
-                else:
-                    processed_results[f"value{value_number}"] = {
-                        "relative_location": "next_partitions",
-                        "index": part_index_stop,
-                    }
-
-                value_number += 1
-
-            return pandas.DataFrame(processed_results)
-
-        def reduce_func(map_results, *args, **kwargs):
-            def get_value_index(value_result):
-                value_result_grouped = value_result.groupby(level=0)
-                rel_location = value_result_grouped.get_group("relative_location")
-                ind = value_result_grouped.get_group("index")
-                # executes if result is inside of the mapped part
-                if "current_partition" in rel_location.values:
-                    assert (
-                        rel_location[rel_location == "current_partition"].count() == 1
-                    ), "Each value should have single result"
-                    return ind[rel_location.values == "current_partition"]
-                # executes if result is between mapped parts
-                elif rel_location.nunique(dropna=False) > 1:
-                    return ind[rel_location.values == "previoius_partitions"][0]
-                # executes if result is outside of the mapped part
-                else:
-                    if "next_partitions" in rel_location.values:
-                        return ind[-1]
-                    else:
-                        return ind[0]
-
-            map_results_parsed = map_results.apply(
-                lambda ser: get_value_index(ser)
-            ).squeeze()
-
-            if isinstance(map_results_parsed, pandas.Series):
-                map_results_parsed = map_results_parsed.to_list()
-
-            return pandas.Series(map_results_parsed)
+        def searchsorted(df):
+            result = df.squeeze(axis=1).searchsorted(**kwargs)
+            if not is_list_like(result):
+                result = [result]
+            return pandas.DataFrame(result)
 
-        return MapReduceFunction.register(map_func, reduce_func, preserve_index=False)(
-            self, **kwargs
-        )
+        return self.default_to_pandas(searchsorted)
 
     # Dt map partitions operations
 
@@ -1456,12 +1297,19 @@ def get_value_index(value_result):
     dt_is_leap_year = MapFunction.register(_dt_prop_map("is_leap_year"))
     dt_daysinmonth = MapFunction.register(_dt_prop_map("daysinmonth"))
     dt_days_in_month = MapFunction.register(_dt_prop_map("days_in_month"))
-    dt_tz = MapReduceFunction.register(
-        _dt_prop_map("tz"), lambda df: pandas.DataFrame(df.iloc[0]), axis=0
-    )
-    dt_freq = MapReduceFunction.register(
-        _dt_prop_map("freq"), lambda df: pandas.DataFrame(df.iloc[0]), axis=0
-    )
+
+    def dt_tz(self):
+        def datetime_tz(df):
+            return pandas.DataFrame([df.squeeze(axis=1).dt.tz])
+
+        return self.default_to_pandas(datetime_tz)
+
+    def dt_freq(self):
+        def datetime_freq(df):
+            return pandas.DataFrame([df.squeeze(axis=1).dt.freq])
+
+        return self.default_to_pandas(datetime_freq)
 
     dt_to_period = MapFunction.register(_dt_func_map("to_period"))
     dt_to_pydatetime = MapFunction.register(_dt_func_map("to_pydatetime"))
     dt_tz_localize = MapFunction.register(_dt_func_map("tz_localize"))
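
For reference, the reverted implementations above route through
`default_to_pandas`, which conceptually materializes the distributed frame,
applies the operation in plain pandas, and rewraps the result. A minimal
sketch, assuming hypothetical `to_pandas`/`from_pandas` helpers rather than
the literal Modin code:

```python
import pandas


class QueryCompilerSketch:
    """Toy stand-in for a query compiler; only the fallback path is shown."""

    def __init__(self, df: pandas.DataFrame):
        self._df = df  # pretend this is a distributed frame

    def to_pandas(self) -> pandas.DataFrame:
        return self._df  # a real engine would gather all partitions here

    @classmethod
    def from_pandas(cls, pandas_obj) -> "QueryCompilerSketch":
        return cls(pandas.DataFrame(pandas_obj))

    def default_to_pandas(self, pandas_op, *args, **kwargs):
        # Materialize, apply at full pandas fidelity (slow but correct),
        # then rewrap so callers see a query compiler again.
        result = pandas_op(self.to_pandas(), *args, **kwargs)
        return self.from_pandas(result)
```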