Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX-#2322: add aligning of partitions' blocks #2367

Merged
merged 9 commits into from
Nov 9, 2020
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ jobs:
run: python -m pytest modin/config/test
- shell: bash -l {0}
run: python -m pytest modin/test/test_envvar_catcher.py
- shell: bash -l {0}
run: python -m pytest modin/test/backends/pandas/test_internals.py

test-defaults:
needs: [lint-commit, lint-flake8, lint-black, test-api, test-headers]
Expand Down
44 changes: 30 additions & 14 deletions modin/engines/base/frame/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -1646,6 +1646,8 @@ def _copartition(self, axis, other, how, sort, force_repartition=False):
"""
Copartition two dataframes.
Perform aligning of partitions, index and partition blocks.
Parameters
----------
axis : 0 or 1
Expand Down Expand Up @@ -1694,39 +1696,53 @@ def _copartition(self, axis, other, how, sort, force_repartition=False):
[self._simple_shuffle(axis, o) for o in other],
self.axes[axis].copy(),
)

index_other_obj = [o.axes[axis] for o in other]
joined_index = self._join_index_objects(axis, index_other_obj, how, sort)
# We have to set these because otherwise when we perform the functions it may
# end up serializing this entire object.
left_old_idx = self.axes[axis]
right_old_idxes = index_other_obj

is_avoid_reindex = len(joined_index) != len(joined_index.unique()) and axis == 0
def make_map_func():
anmyachev marked this conversation as resolved.
Show resolved Hide resolved
if not joined_index.is_unique and axis == 0:
return lambda df: df
return lambda df: df.reindex(joined_index, axis=axis)

# Start with this and we'll repartition the first time, and then not again.
if (
not is_aligning_applied
and not is_avoid_reindex
and (force_repartition or not left_old_idx.equals(joined_index))
if is_aligning_applied or (
not force_repartition and left_old_idx.equals(joined_index)
):
reindexed_self = self._partitions
else:
reindexed_self = self._frame_mgr_cls.map_axis_partitions(
axis, self._partitions, lambda df: df.reindex(joined_index, axis=axis)
axis,
self._partitions,
make_map_func(),
)
else:
reindexed_self = self._partitions
reindexed_other_list = []

def get_column_widths(partitions):
anmyachev marked this conversation as resolved.
Show resolved Hide resolved
if len(partitions) > 0:
return [obj.width() for obj in partitions[0]]

def get_row_lengths(partitions):
if len(partitions.T) > 0:
return [obj.length() for obj in partitions.T[0]]

reindexed_other_list = []
for i in range(len(other)):
if (
is_aligning_applied
or is_avoid_reindex
or (not force_repartition and right_old_idxes[i].equals(joined_index))
if is_aligning_applied or (
not force_repartition and right_old_idxes[i].equals(joined_index)
):
reindexed_other = other[i]._partitions
else:
reindexed_other = other[i]._frame_mgr_cls.map_axis_partitions(
axis,
other[i]._partitions,
lambda df: df.reindex(joined_index, axis=axis),
make_map_func(),
lengths=get_row_lengths(reindexed_self)
if axis == 0
else get_column_widths(reindexed_self),
)
reindexed_other_list.append(reindexed_other)
return reindexed_self, reindexed_other_list, joined_index
Expand Down
48 changes: 31 additions & 17 deletions modin/engines/base/frame/partition_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,22 +214,25 @@ def broadcast_axis_partitions(
left,
right,
keep_partitioning=False,
lengths=None,
):
"""
Broadcast the right partitions to left and apply a function along full axis.
Parameters
----------
axis : The axis to apply and broadcast over.
apply_func : The function to apply.
left : The left partitions.
right : The right partitions.
keep_partitioning : boolean. Default is False
The flag to keep partitions for Modin Frame.
axis : The axis to apply and broadcast over.
apply_func : The function to apply.
left : The left partitions.
right : The right partitions.
keep_partitioning : boolean. Default is False
The flag to keep partitions for Modin Frame.
lengths : list(int)
The list of lengths to shuffle the object.
Returns
-------
A new `np.array` of partition objects.
A new `np.array` of partition objects.
"""
# Since we are already splitting the DataFrame back up after an
# operation, we will just use this time to compute the number of
Expand All @@ -245,12 +248,19 @@ def broadcast_axis_partitions(
# may want to line the partitioning up with another BlockPartitions object. Since
# we don't need to maintain the partitioning, this gives us the opportunity to
# load-balance the data as well.
kw = {
"num_splits": num_splits,
"other_axis_partition": right_partitions,
}
if lengths:
kw["_lengths"] = lengths
kw["manual_partition"] = True

result_blocks = np.array(
[
part.apply(
preprocessed_map_func,
num_splits=num_splits,
other_axis_partition=right_partitions,
**kw,
)
for part in left_partitions
]
Expand Down Expand Up @@ -295,20 +305,23 @@ def map_axis_partitions(
partitions,
map_func,
keep_partitioning=False,
lengths=None,
):
"""
Applies `map_func` to every partition.
Parameters
----------
axis : 0 or 1
The axis to perform the map across (0 - index, 1 - columns).
partitions : NumPy array
The partitions of Modin Frame.
map_func : callable
The function to apply.
keep_partitioning : boolean. Default is False
The flag to keep partitions for Modin Frame.
axis : 0 or 1
The axis to perform the map across (0 - index, 1 - columns).
partitions : NumPy array
The partitions of Modin Frame.
map_func : callable
The function to apply.
keep_partitioning : bool. Default is False
The flag to keep partitions for Modin Frame.
lengths : list(int)
The list of lengths to shuffle the object.
Returns
-------
Expand All @@ -326,6 +339,7 @@ def map_axis_partitions(
apply_func=map_func,
keep_partitioning=keep_partitioning,
right=None,
lengths=lengths,
)

@classmethod
Expand Down
3 changes: 2 additions & 1 deletion modin/engines/ray/pandas_on_ray/frame/axis_partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def __init__(self, list_of_blocks):
def deploy_axis_func(
cls, axis, func, num_splits, kwargs, maintain_partitioning, *partitions
):
lengths = kwargs.get("_lengths", None)
return deploy_ray_func._remote(
args=(
PandasFrameAxisPartition.deploy_axis_func,
Expand All @@ -43,7 +44,7 @@ def deploy_axis_func(
maintain_partitioning,
)
+ tuple(partitions),
num_returns=num_splits * 3,
num_returns=num_splits * 3 if lengths is None else len(lengths) * 3,
anmyachev marked this conversation as resolved.
Show resolved Hide resolved
)

@classmethod
Expand Down
40 changes: 40 additions & 0 deletions modin/test/backends/pandas/test_internals.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Licensed to Modin Development Team under one or more contributor license agreements.
# See the NOTICE file distributed with this work for additional information regarding
# copyright ownership. The Modin Development Team licenses this file to you under the
# Apache License, Version 2.0 (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

import modin.pandas as pd


def test_aligning_blocks():
    """Regression test for #2322.

    Covers the case where Modin frames have the same number of rows but
    different blocks (``partition.list_of_blocks``).
    """
    # Build a 162-row frame, drop its first two rows and renumber the index
    # in place, leaving 160 rows labelled from zero.
    frame = pd.DataFrame(["-22\n"] * 162).iloc[2:, :]
    frame.reset_index(drop=True, inplace=True)

    # Assign a column from a shorter (145-element) Series.
    frame["T"] = pd.Series(["24.67\n"] * 145)

    # The result is only rendered, never compared; see #2322 for details.
    repr(frame)


def test_aligning_blocks_with_duplicated_index():
    """Same problem as in `test_aligning_blocks`, but with duplicated index values.

    Each operand is assembled from two smaller frames, so both end up with
    four rows whose default integer labels repeat, while the split between
    the pieces differs (2 + 2 rows versus 1 + 3 rows).
    """
    data11 = [0, 1]
    data12 = [2, 3]

    data21 = [0]
    data22 = [1, 2, 3]

    # `pd.concat` is used instead of `DataFrame.append`, which was deprecated
    # in pandas 1.4 and removed in pandas 2.0. Like `append`, `concat` keeps
    # the original labels by default, so the duplicated index is preserved.
    df1 = pd.concat([pd.DataFrame(data11), pd.DataFrame(data12)])
    df2 = pd.concat([pd.DataFrame(data21), pd.DataFrame(data22)])

    # The result is only rendered, never compared.
    repr(df1 - df2)