Skip to content

Commit

Permalink
FIX-#2322: add aligning partition' blocks (#2367)
Browse files Browse the repository at this point in the history
Signed-off-by: Anatoly Myachev <[email protected]>
  • Loading branch information
anmyachev authored Nov 9, 2020
1 parent fc34852 commit a13384c
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 32 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ jobs:
run: python -m pytest modin/config/test
- shell: bash -l {0}
run: python -m pytest modin/test/test_envvar_catcher.py
- shell: bash -l {0}
run: python -m pytest modin/test/backends/pandas/test_internals.py

test-defaults:
needs: [lint-commit, lint-flake8, lint-black, test-api, test-headers]
Expand Down
35 changes: 35 additions & 0 deletions .github/workflows/push.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,41 @@ jobs:
architecture: "x64"
- run: pip install "ray>=1.0.0"

test-internals:
needs: prepare-cache
runs-on: ubuntu-latest
name: test-internals
steps:
- uses: actions/checkout@v2
with:
fetch-depth: 1
- name: Cache pip
uses: actions/cache@v1
with:
path: ~/.cache/pip
key: ${{ runner.os }}-python-3.6-pip-${{ github.run_id }}-${{ hashFiles('environment.yml') }}
- uses: goanpeca/[email protected]
with:
activate-environment: modin
environment-file: environment.yml
python-version: 3.6
channel-priority: strict
use-only-tar-bz2: true # IMPORTANT: This needs to be set for caching to work properly!
- name: Conda environment
shell: bash -l {0}
run: |
conda info
conda list
- name: Internals tests
shell: bash -l {0}
run: python -m pytest modin/data_management/factories/test/test_dispatcher.py modin/experimental/cloud/test/test_cloud.py
- shell: bash -l {0}
run: python -m pytest modin/config/test
- shell: bash -l {0}
run: python -m pytest modin/test/test_envvar_catcher.py
- shell: bash -l {0}
run: python -m pytest modin/test/backends/pandas/test_internals.py

test-defaults:
needs: prepare-cache
runs-on: ubuntu-latest
Expand Down
44 changes: 30 additions & 14 deletions modin/engines/base/frame/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -1646,6 +1646,8 @@ def _copartition(self, axis, other, how, sort, force_repartition=False):
"""
Copartition two dataframes.
Perform aligning of partitions, index and partition blocks.
Parameters
----------
axis : 0 or 1
Expand Down Expand Up @@ -1694,39 +1696,53 @@ def _copartition(self, axis, other, how, sort, force_repartition=False):
[self._simple_shuffle(axis, o) for o in other],
self.axes[axis].copy(),
)

index_other_obj = [o.axes[axis] for o in other]
joined_index = self._join_index_objects(axis, index_other_obj, how, sort)
# We have to set these because otherwise when we perform the functions it may
# end up serializing this entire object.
left_old_idx = self.axes[axis]
right_old_idxes = index_other_obj

is_avoid_reindex = len(joined_index) != len(joined_index.unique()) and axis == 0
def make_map_func():
if not joined_index.is_unique and axis == 0:
return lambda df: df
return lambda df: df.reindex(joined_index, axis=axis)

# Start with this and we'll repartition the first time, and then not again.
if (
not is_aligning_applied
and not is_avoid_reindex
and (force_repartition or not left_old_idx.equals(joined_index))
if is_aligning_applied or (
not force_repartition and left_old_idx.equals(joined_index)
):
reindexed_self = self._partitions
else:
reindexed_self = self._frame_mgr_cls.map_axis_partitions(
axis, self._partitions, lambda df: df.reindex(joined_index, axis=axis)
axis,
self._partitions,
make_map_func(),
)
else:
reindexed_self = self._partitions
reindexed_other_list = []

def get_column_widths(partitions):
if len(partitions) > 0:
return [obj.width() for obj in partitions[0]]

def get_row_lengths(partitions):
if len(partitions.T) > 0:
return [obj.length() for obj in partitions.T[0]]

reindexed_other_list = []
for i in range(len(other)):
if (
is_aligning_applied
or is_avoid_reindex
or (not force_repartition and right_old_idxes[i].equals(joined_index))
if is_aligning_applied or (
not force_repartition and right_old_idxes[i].equals(joined_index)
):
reindexed_other = other[i]._partitions
else:
reindexed_other = other[i]._frame_mgr_cls.map_axis_partitions(
axis,
other[i]._partitions,
lambda df: df.reindex(joined_index, axis=axis),
make_map_func(),
lengths=get_row_lengths(reindexed_self)
if axis == 0
else get_column_widths(reindexed_self),
)
reindexed_other_list.append(reindexed_other)
return reindexed_self, reindexed_other_list, joined_index
Expand Down
48 changes: 31 additions & 17 deletions modin/engines/base/frame/partition_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,22 +214,25 @@ def broadcast_axis_partitions(
left,
right,
keep_partitioning=False,
lengths=None,
):
"""
Broadcast the right partitions to left and apply a function along full axis.
Parameters
----------
axis : The axis to apply and broadcast over.
apply_func : The function to apply.
left : The left partitions.
right : The right partitions.
keep_partitioning : boolean. Default is False
The flag to keep partitions for Modin Frame.
axis : The axis to apply and broadcast over.
apply_func : The function to apply.
left : The left partitions.
right : The right partitions.
keep_partitioning : boolean. Default is False
The flag to keep partitions for Modin Frame.
lengths : list(int)
The list of lengths to shuffle the object.
Returns
-------
A new `np.array` of partition objects.
A new `np.array` of partition objects.
"""
# Since we are already splitting the DataFrame back up after an
# operation, we will just use this time to compute the number of
Expand All @@ -245,12 +248,19 @@ def broadcast_axis_partitions(
# may want to line to partitioning up with another BlockPartitions object. Since
# we don't need to maintain the partitioning, this gives us the opportunity to
# load-balance the data as well.
kw = {
"num_splits": num_splits,
"other_axis_partition": right_partitions,
}
if lengths:
kw["_lengths"] = lengths
kw["manual_partition"] = True

result_blocks = np.array(
[
part.apply(
preprocessed_map_func,
num_splits=num_splits,
other_axis_partition=right_partitions,
**kw,
)
for part in left_partitions
]
Expand Down Expand Up @@ -295,20 +305,23 @@ def map_axis_partitions(
partitions,
map_func,
keep_partitioning=False,
lengths=None,
):
"""
Applies `map_func` to every partition.
Parameters
----------
axis : 0 or 1
The axis to perform the map across (0 - index, 1 - columns).
partitions : NumPy array
The partitions of Modin Frame.
map_func : callable
The function to apply.
keep_partitioning : boolean. Default is False
The flag to keep partitions for Modin Frame.
axis : 0 or 1
The axis to perform the map across (0 - index, 1 - columns).
partitions : NumPy array
The partitions of Modin Frame.
map_func : callable
The function to apply.
keep_partitioning : bool. Default is False
The flag to keep partitions for Modin Frame.
lengths : list(int)
The list of lengths to shuffle the object.
Returns
-------
Expand All @@ -326,6 +339,7 @@ def map_axis_partitions(
apply_func=map_func,
keep_partitioning=keep_partitioning,
right=None,
lengths=lengths,
)

@classmethod
Expand Down
3 changes: 2 additions & 1 deletion modin/engines/ray/pandas_on_ray/frame/axis_partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def __init__(self, list_of_blocks):
def deploy_axis_func(
cls, axis, func, num_splits, kwargs, maintain_partitioning, *partitions
):
lengths = kwargs.get("_lengths", None)
return deploy_ray_func._remote(
args=(
PandasFrameAxisPartition.deploy_axis_func,
Expand All @@ -43,7 +44,7 @@ def deploy_axis_func(
maintain_partitioning,
)
+ tuple(partitions),
num_returns=num_splits * 3,
num_returns=num_splits * 3 if lengths is None else len(lengths) * 3,
)

@classmethod
Expand Down
40 changes: 40 additions & 0 deletions modin/test/backends/pandas/test_internals.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Licensed to Modin Development Team under one or more contributor license agreements.
# See the NOTICE file distributed with this work for additional information regarding
# copyright ownership. The Modin Development Team licenses this file to you under the
# Apache License, Version 2.0 (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

import modin.pandas as pd


def test_aligning_blocks():
# Test problem when modin frames have the same number of rows, but different
# blocks (partition.list_of_blocks). See #2322 for details
accm = pd.DataFrame(["-22\n"] * 162)
accm = accm.iloc[2:, :]
accm.reset_index(drop=True, inplace=True)
accm["T"] = pd.Series(["24.67\n"] * 145)

# see #2322 for details
repr(accm)


def test_aligning_blocks_with_duplicated_index():
# Same problem as in `test_aligning_blocks` but with duplicated values in index.
data11 = [0, 1]
data12 = [2, 3]

data21 = [0]
data22 = [1, 2, 3]

df1 = pd.DataFrame(data11).append(pd.DataFrame(data12))
df2 = pd.DataFrame(data21).append(pd.DataFrame(data22))

repr(df1 - df2)

0 comments on commit a13384c

Please sign in to comment.