Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX-#2374: remove extra code; add pandas way to handle duplicate values in reindex func for binary operations #2378

Merged
merged 16 commits into from
Dec 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions asv_bench/benchmarks/benchmarks.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
JOIN_DATA_SIZE = MERGE_DATA_SIZE
ARITHMETIC_DATA_SIZE = GROUPBY_DATA_SIZE

CONCAT_DATA_SIZE = [(10_128, 100, 10_000, 128)]


class TimeGroupBy:
param_names = ["impl", "data_type", "data_size"]
Expand Down Expand Up @@ -111,6 +113,51 @@ def time_merge(self, impl, data_type, data_size, how, sort):
self.df1.merge(self.df2, on=self.df1.columns[0], how=how, sort=sort)


class TimeConcat:
    """Benchmark ``pd.concat`` of two frames over both axes."""

    param_names = ["data_type", "data_size", "how", "axis"]
    params = [
        ["int"],
        CONCAT_DATA_SIZE,
        ["inner"],
        [0, 1],
    ]

    def setup(self, data_type, data_size, how, axis):
        # data_size packs two shapes: (ncols1, nrows1, ncols2, nrows2).
        # generate_dataframe takes ncols first, then nrows — per the original
        # inline comment; TODO confirm against its signature.
        ncols1, nrows1, ncols2, nrows2 = data_size
        self.df1 = generate_dataframe(
            "modin", data_type, nrows1, ncols1, RAND_LOW, RAND_HIGH
        )
        self.df2 = generate_dataframe(
            "modin", data_type, nrows2, ncols2, RAND_LOW, RAND_HIGH
        )

    def time_concat(self, data_type, data_size, how, axis):
        pd.concat([self.df1, self.df2], axis=axis, join=how)


class TimeBinaryOp:
    """Benchmark an element-wise binary operation between two frames."""

    param_names = ["data_type", "data_size", "binary_op", "axis"]
    params = [
        ["int"],
        CONCAT_DATA_SIZE,
        ["mul"],
        [0, 1],
    ]

    def setup(self, data_type, data_size, binary_op, axis):
        # shape for generate_dataframe: first - ncols, second - nrows
        self.df1 = generate_dataframe(
            "modin", data_type, data_size[1], data_size[0], RAND_LOW, RAND_HIGH
        )
        self.df2 = generate_dataframe(
            "modin", data_type, data_size[3], data_size[2], RAND_LOW, RAND_HIGH
        )
        # Bound method for the requested op (e.g. df1.mul), resolved once in
        # setup so the timed section measures only the operation itself.
        self.op = getattr(self.df1, binary_op)

    def time_binary_op(self, data_type, data_size, binary_op, axis):
        # FIX: was named `time_concat` (copy-paste from TimeConcat). asv derives
        # the benchmark's reported name from this method name, so the binary-op
        # benchmark was previously published under a misleading "concat" name.
        self.op(self.df2, axis=axis)


class TimeArithmetic:
param_names = ["impl", "data_type", "data_size", "axis"]
params = [
Expand Down
221 changes: 124 additions & 97 deletions modin/engines/base/frame/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -967,7 +967,8 @@ def internal(block_idx, global_index):
]
return OrderedDict(partition_ids_with_indices)

def _join_index_objects(self, axis, other_index, how, sort):
@staticmethod
def _join_index_objects(axis, indexes, how, sort):
"""
Join the pair of index objects (columns or rows) by a given strategy.

Expand All @@ -976,37 +977,80 @@ def _join_index_objects(self, axis, other_index, how, sort):

Parameters
----------
axis : 0 or 1
The axis index object to join (0 - rows, 1 - columns).
other_index : Index
The other_index to join on.
how : {'left', 'right', 'inner', 'outer'}
The type of join to join to make.
sort : boolean
Whether or not to sort the joined index
axis : 0 or 1
The axis index object to join (0 - rows, 1 - columns).
indexes : list(Index)
The indexes to join on.
how : {'left', 'right', 'inner', 'outer'}
The type of join to join to make.
sort : boolean
Whether or not to sort the joined index

Returns
-------
Index
Joined indices.
(Index, func)
Joined index with make_reindexer func
"""
assert isinstance(indexes, list)

def merge_index(obj1, obj2):
# define helper functions
def merge(left_index, right_index):
if axis == 1 and how == "outer" and not sort:
return obj1.union(obj2, sort=False)
return left_index.union(right_index, sort=False)
else:
return obj1.join(obj2, how=how, sort=sort)

if isinstance(other_index, list):
joined_obj = self.columns if axis else self.index
# TODO: revisit for performance
for obj in other_index:
joined_obj = merge_index(joined_obj, obj)
return joined_obj
if axis:
return merge_index(self.columns, other_index)
return left_index.join(right_index, how=how, sort=sort)

# define condition for joining indexes
do_join_index = False
for index in indexes[1:]:
if not indexes[0].equals(index):
do_join_index = True
break

# define condition for joining indexes with getting indexers
is_duplicates = any(not index.is_unique for index in indexes) and axis == 0
indexers = []
if is_duplicates:
indexers = [None] * len(indexes)

# perform joining indexes
if do_join_index:
if len(indexes) == 2 and is_duplicates:
# in case of count of indexes > 2 we should perform joining all indexes
# after that get indexers
# in the fast path we can obtain joined_index and indexers in one call
joined_index, indexers[0], indexers[1] = indexes[0].join(
indexes[1], how=how, sort=sort, return_indexers=True
)
else:
joined_index = indexes[0]
# TODO: revisit for performance
for index in indexes[1:]:
joined_index = merge(joined_index, index)

if is_duplicates:
for i, index in enumerate(indexes):
indexers[i] = index.get_indexer_for(joined_index)
else:
return self.index.join(other_index, how=how, sort=sort)
joined_index = indexes[0].copy()

def make_reindexer(do_reindex: bool, frame_idx: int):
# the order of the frames must match the order of the indexes
if not do_reindex:
return lambda df: df

if is_duplicates:
assert indexers != []

return lambda df: df._reindex_with_indexers(
{0: [joined_index, indexers[frame_idx]]},
copy=True,
allow_dups=True,
)

return lambda df: df.reindex(joined_index, axis=axis)

return joined_index, make_reindexer

# Internal methods
# These methods are for building the correct answer in a modular way.
Expand Down Expand Up @@ -1697,19 +1741,19 @@ def _copartition(self, axis, other, how, sort, force_repartition=False):

Parameters
----------
axis : 0 or 1
The axis to copartition along (0 - rows, 1 - columns).
other : BasePandasFrame
The other dataframes(s) to copartition against.
how : str
How to manage joining the index object ("left", "right", etc.)
sort : boolean
Whether or not to sort the joined index.
force_repartition : boolean
Whether or not to force the repartitioning. By default,
this method will skip repartitioning if it is possible. This is because
reindexing is extremely inefficient. Because this method is used to
`join` or `append`, it is vital that the internal indices match.
axis : 0 or 1
The axis to copartition along (0 - rows, 1 - columns).
other : BasePandasFrame
The other dataframes(s) to copartition against.
how : str
How to manage joining the index object ("left", "right", etc.)
sort : boolean
Whether or not to sort the joined index.
force_repartition : bool, default False
Whether or not to force the repartitioning. By default,
this method will skip repartitioning if it is possible. This is because
reindexing is extremely inefficient. Because this method is used to
`join` or `append`, it is vital that the internal indices match.

Returns
-------
Expand All @@ -1719,79 +1763,62 @@ def _copartition(self, axis, other, how, sort, force_repartition=False):
if isinstance(other, type(self)):
other = [other]

is_aligning_applied = False
for i in range(len(other)):
if (
len(self._partitions) != len(other[i]._partitions)
and len(self.axes[0]) == len(other[i].axes[0])
and axis == 0
):
is_aligning_applied = True
self._partitions = self._frame_mgr_cls.map_axis_partitions(
axis, self._partitions, lambda df: df
)
other[i]._partitions = other[i]._frame_mgr_cls.map_axis_partitions(
axis, other[i]._partitions, lambda df: df
)
anmyachev marked this conversation as resolved.
Show resolved Hide resolved

if (
all(o.axes[axis].equals(self.axes[axis]) for o in other)
and not is_aligning_applied
):
return (
self._partitions,
[self._simple_shuffle(axis, o) for o in other],
self.axes[axis].copy(),
)
# define helper functions
def get_axis_lengths(partitions, axis):
if axis:
return [obj.width() for obj in partitions[0]]
return [obj.length() for obj in partitions.T[0]]

index_other_obj = [o.axes[axis] for o in other]
joined_index = self._join_index_objects(axis, index_other_obj, how, sort)
# We have to set these because otherwise when we perform the functions it may
# end up serializing this entire object.
left_old_idx = self.axes[axis]
right_old_idxes = index_other_obj
self_index = self.axes[axis]
others_index = [o.axes[axis] for o in other]
joined_index, make_reindexer = self._join_index_objects(
axis, [self_index] + others_index, how, sort
)

def make_map_func():
if not joined_index.is_unique and axis == 0:
return lambda df: df
return lambda df: df.reindex(joined_index, axis=axis)
# define conditions for reindexing and repartitioning `self` frame
do_reindex_self = not self_index.equals(joined_index)
do_repartition_self = force_repartition or do_reindex_self

# Start with this and we'll repartition the first time, and then not again.
if is_aligning_applied or (
not force_repartition and left_old_idx.equals(joined_index)
):
reindexed_self = self._partitions
else:
# perform repartitioning and reindexing for `self` frame if needed
if do_repartition_self:
reindexed_self = self._frame_mgr_cls.map_axis_partitions(
axis,
self._partitions,
make_map_func(),
# self frame has 0 idx
make_reindexer(do_reindex_self, 0),
)
else:
reindexed_self = self._partitions

def get_column_widths(partitions):
if len(partitions) > 0:
return [obj.width() for obj in partitions[0]]
# define length of `self` and `other` frames to aligning purpose
self_lengths = get_axis_lengths(reindexed_self, axis)
others_lengths = [o._axes_lengths[axis] for o in other]

def get_row_lengths(partitions):
if len(partitions.T) > 0:
return [obj.length() for obj in partitions.T[0]]
# define conditions for reindexing and repartitioning `other` frames
do_reindex_others = [not index.equals(joined_index) for index in others_index]

reindexed_other_list = []
do_repartition_others = [None] * len(other)
dchigarev marked this conversation as resolved.
Show resolved Hide resolved
for i in range(len(other)):
if is_aligning_applied or (
not force_repartition and right_old_idxes[i].equals(joined_index)
):
reindexed_other = other[i]._partitions
else:
reindexed_other = other[i]._frame_mgr_cls.map_axis_partitions(
do_repartition_others[i] = (
force_repartition
or do_reindex_others[i]
or others_lengths[i] != self_lengths
)

# perform repartitioning and reindexing for `other` frames if needed
reindexed_other_list = [None] * len(other)
for i in range(len(other)):
if do_repartition_others[i]:
reindexed_other_list[i] = other[i]._frame_mgr_cls.map_axis_partitions(
axis,
other[i]._partitions,
make_map_func(),
lengths=get_row_lengths(reindexed_self)
if axis == 0
else get_column_widths(reindexed_self),
# indices of others frame start from 1 (0 - self frame)
make_reindexer(do_reindex_others[i], 1 + i),
lengths=self_lengths,
)
reindexed_other_list.append(reindexed_other)
else:
reindexed_other_list[i] = other[i]._partitions

return reindexed_self, reindexed_other_list, joined_index

def _simple_shuffle(self, axis, other):
Expand Down Expand Up @@ -1900,7 +1927,7 @@ def _concat(self, axis, others, how, sort):
]
else:
left_parts, right_parts, joined_index = self._copartition(
axis ^ 1, others, how, sort, force_repartition=True
axis ^ 1, others, how, sort, force_repartition=False
)
new_lengths = None
new_widths = None
Expand Down
4 changes: 3 additions & 1 deletion modin/engines/base/frame/partition_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ def broadcast_axis_partitions(
right : The right partitions.
keep_partitioning : boolean. Default is False
The flag to keep partitions for Modin Frame.
lengths : list(int)
lengths : list(int), default None
The list of lengths to shuffle the object.

Returns
Expand All @@ -250,6 +250,8 @@ def broadcast_axis_partitions(
# partitions as best we can right now.
if keep_partitioning:
num_splits = len(left) if axis == 0 else len(left.T)
elif lengths:
anmyachev marked this conversation as resolved.
Show resolved Hide resolved
num_splits = len(lengths)
else:
num_splits = cls._compute_num_partitions()
preprocessed_map_func = cls.preprocess_func(apply_func)
Expand Down
14 changes: 14 additions & 0 deletions modin/test/backends/pandas/test_internals.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
# governing permissions and limitations under the License.

import modin.pandas as pd
from modin.pandas.test.utils import create_test_dfs

pd.DEFAULT_NPARTITIONS = 4
YarShev marked this conversation as resolved.
Show resolved Hide resolved


def test_aligning_blocks():
Expand All @@ -38,3 +41,14 @@ def test_aligning_blocks_with_duplicated_index():
df2 = pd.DataFrame(data21).append(pd.DataFrame(data22))

repr(df1 - df2)


def test_aligning_partitions():
    # Regression test: assigning a column taken from a frame with a different
    # row partitioning must realign partitions instead of raising.
    values = [0, 1, 2, 3, 4, 5]
    source_df, _ = create_test_dfs({"a": values, "b": values})
    head = source_df.loc[:2]

    stacked = head.append(head)

    stacked["c"] = source_df["b"]
    repr(stacked)