Skip to content

Commit

Permalink
REFACTOR-modin-project#2642: Apply comment; Use apply instead of new …
Browse files Browse the repository at this point in the history
…func

Signed-off-by: Igoshev, Yaroslav <[email protected]>
  • Loading branch information
YarShev committed Feb 12, 2021
1 parent 07117d7 commit 259eb3e
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 67 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ jobs:
python -m pip install -e .[all]
MODIN_ENGINE=dask python -c "import modin.pandas as pd; print(pd.DataFrame([1,2,3]))"
MODIN_ENGINE=ray python -c "import modin.pandas as pd; print(pd.DataFrame([1,2,3]))"
test-clean-install-windows:
needs: [ lint-commit, lint-flake8, lint-black, test-api, test-headers ]
runs-on: windows-latest
Expand All @@ -154,6 +155,7 @@ jobs:
python -m pip install -e .[all]
MODIN_ENGINE=dask python -c "import modin.pandas as pd; print(pd.DataFrame([1,2,3]))"
MODIN_ENGINE=ray python -c "import modin.pandas as pd; print(pd.DataFrame([1,2,3]))"
test-internals:
needs: [lint-commit, lint-flake8, lint-black, test-api, test-headers]
runs-on: ubuntu-latest
Expand Down Expand Up @@ -312,6 +314,7 @@ jobs:
run: |
conda info
conda list
- name: Running benchmarks
shell: bash -l {0}
run: |
Expand Down
7 changes: 4 additions & 3 deletions modin/distributed/dataframe/pandas/partitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ def unwrap_partitions(api_layer_object, axis=None, get_ip=False):
Notes
-----
If ``get_ip=True``, a list containing tuples of node ip addresses and Ray.ObjectRef/Dask.Future to
partitions of the ``api_layer_object``, respectively, is returned if Ray/Dask is used as an engine.
If ``get_ip=True``, a list of tuples of node ip addresses and Ray.ObjectRef/Dask.Future to
partitions of the ``api_layer_object``, respectively, is returned if Ray/Dask is used as an engine
(i.e. [(str, Ray.ObjectRef/Dask.Future), ...]).
"""
if not hasattr(api_layer_object, "_query_compiler"):
raise ValueError(
Expand Down Expand Up @@ -92,7 +93,7 @@ def from_partitions(partitions, axis):
partitions : list
List of Ray.ObjectRef/Dask.Future to partitions depending on the engine used.
Or list of tuples of node ip addresses and Ray.ObjectRef/Dask.Future to partitions
depending on the engine used.
depending on the engine used (i.e. [(str, Ray.ObjectRef/Dask.Future), ...]).
axis : None, 0 or 1
The ``axis`` parameter is used to identify what are the partitions passed.
You have to set:
Expand Down
4 changes: 2 additions & 2 deletions modin/engines/base/frame/axis_partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,9 @@ def unwrap(self, squeeze=False, get_ip=False):
Notes
-----
In case get_ip=True, list containing tuples of node ip addresses and
If `get_ip=True`, list of tuples of node ip addresses and
Ray.ObjectRef/Dask.Future to unwrapped partitions, respectively, is returned
if Ray/Dask is used as an engine.
if Ray/Dask is used as an engine (i.e. [(str, Ray.ObjectRef/Dask.Future), ...]).
"""
if squeeze and len(self.list_of_blocks) == 1:
if get_ip:
Expand Down
25 changes: 0 additions & 25 deletions modin/engines/dask/pandas_on_dask/frame/axis_partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,31 +32,6 @@ def __init__(self, list_of_blocks, get_ip=False):
partition_type = PandasOnDaskFramePartition
instance_type = Future

def broadcast_apply(self, rt_axis_parts, axis, apply_func, other_name):
    """
    Apply `apply_func` to every block of this axis partition, broadcasting
    the right-hand axis partitions into each task.

    Inside each Dask task the right-hand blocks are concatenated along the
    opposite axis and handed to `apply_func` under the `other_name` keyword.
    Each deployed task produces four objects (see `deploy_dask_func`), which
    are split into individual futures and wrapped as partitions.
    """

    def concat_and_apply(df, others):
        # Rebuild the broadcast frame from its blocks, then delegate.
        joined = pandas.concat(others, axis=axis ^ 1)
        return apply_func(df, **{other_name: joined})

    client = get_client()
    futures = []
    for idx, block in enumerate(self.list_of_blocks):
        # When applying along `axis`, each left block pairs with the matching
        # right axis partition; otherwise all tasks share the same one.
        if axis:
            right_blocks = rt_axis_parts[idx].list_of_blocks
        else:
            right_blocks = rt_axis_parts.list_of_blocks
        futures.append(
            client.submit(
                deploy_dask_func,
                concat_and_apply,
                block,
                right_blocks,
                pure=False,
            )
        )
    # Fan each 4-element task result out into one future per element.
    # `pos` is bound as a default to avoid the late-binding closure pitfall.
    split_futures = [
        client.submit(lambda parts, pos=pos: parts[pos], future, pure=False)
        for future in futures
        for pos in range(4)
    ]
    return self._wrap_partitions(split_futures)

@classmethod
def deploy_axis_func(
cls, axis, func, num_splits, kwargs, maintain_partitioning, *partitions
Expand Down
2 changes: 1 addition & 1 deletion modin/engines/dask/pandas_on_dask/frame/partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from modin.engines.base.frame.partition import BaseFramePartition

from distributed.client import get_client
from distributed.client import Future
from distributed import Future
from distributed.utils import get_ip
import cloudpickle as pkl

Expand Down
28 changes: 20 additions & 8 deletions modin/engines/dask/pandas_on_dask/frame/partition_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
)
from .partition import PandasOnDaskFramePartition
from modin.error_message import ErrorMessage
import pandas

from distributed.client import _get_global_client

Expand All @@ -36,6 +37,7 @@ class DaskFrameManager(BaseFrameManager):
def get_indices(cls, axis, partitions, index_func):
"""
This gets the internal indices stored in the partitions.
Parameters
----------
axis : 0 or 1
Expand All @@ -44,10 +46,12 @@ def get_indices(cls, axis, partitions, index_func):
The array of partitions from which need to extract the labels.
index_func : callable
The function to be used to extract the function.
Returns
-------
Index
A Pandas Index object.
Notes
-----
These are the global indices of the object. This is mostly useful
Expand Down Expand Up @@ -75,16 +79,24 @@ def get_indices(cls, axis, partitions, index_func):

@classmethod
def broadcast_apply(cls, axis, apply_func, left, right, other_name="r"):
lt_axis_parts = cls.axis_partition(left, 1)
def map_func(df, others):
other = pandas.concat(others, axis=axis ^ 1)
return apply_func(df, **{other_name: other})

rt_axis_parts = cls.axis_partition(right, axis ^ 1)
return np.array(
[
lt_axis_part.broadcast_apply(
rt_axis_parts if axis else rt_axis_parts[i],
axis,
apply_func,
other_name,
)
for i, lt_axis_part in enumerate(lt_axis_parts)
[
part.apply(
map_func,
**{
"others": rt_axis_parts[col_idx].list_of_blocks
if axis
else rt_axis_parts[row_idx].list_of_blocks
},
)
for col_idx, part in enumerate(left[row_idx])
]
for row_idx in range(len(left))
]
)
19 changes: 0 additions & 19 deletions modin/engines/ray/pandas_on_ray/frame/axis_partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,25 +32,6 @@ def __init__(self, list_of_blocks, get_ip=False):
partition_type = PandasOnRayFramePartition
instance_type = ray.ObjectRef

def broadcast_apply(self, rt_axis_parts, axis, apply_func, other_name):
    """
    Apply `apply_func` to every block of this axis partition, broadcasting
    the right-hand axis partitions into each remote task.

    Inside each Ray task the right-hand blocks are concatenated along the
    opposite axis and handed to `apply_func` under the `other_name` keyword.
    Every remote call returns four ObjectRefs, which are collected and
    wrapped as partitions.
    """

    def concat_and_apply(df, *others):
        # Rebuild the broadcast frame from its blocks, then delegate.
        joined = pandas.concat(others, axis=axis ^ 1)
        return apply_func(df, **{other_name: joined})

    object_refs = []
    for idx, block in enumerate(self.list_of_blocks):
        # When applying along `axis`, each left block pairs with the matching
        # right axis partition; otherwise all tasks share the same one.
        right_blocks = (
            rt_axis_parts[idx].list_of_blocks
            if axis
            else rt_axis_parts.list_of_blocks
        )
        object_refs.extend(
            deploy_ray_func._remote(
                args=(concat_and_apply, block, *right_blocks),
                num_returns=4,
            )
        )
    return self._wrap_partitions(object_refs)

@classmethod
def deploy_axis_func(
cls, axis, func, num_splits, kwargs, maintain_partitioning, *partitions
Expand Down
1 change: 0 additions & 1 deletion modin/engines/ray/pandas_on_ray/frame/partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,6 @@ def deserialize(obj):
# we absolutely have to.
except ValueError:
result = func(partition.copy(), **kwargs)

return (
result,
len(result) if hasattr(result, "__len__") else 0,
Expand Down
28 changes: 20 additions & 8 deletions modin/engines/ray/pandas_on_ray/frame/partition_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
)
from .partition import PandasOnRayFramePartition
from modin.error_message import ErrorMessage
import pandas

import ray

Expand All @@ -36,6 +37,7 @@ class PandasOnRayFrameManager(RayFrameManager):
def get_indices(cls, axis, partitions, index_func=None):
"""
This gets the internal indices stored in the partitions.
Parameters
----------
axis : 0 or 1
Expand All @@ -44,10 +46,12 @@ def get_indices(cls, axis, partitions, index_func=None):
The array of partitions from which need to extract the labels.
index_func : callable
The function to be used to extract the function.
Returns
-------
Index
A Pandas Index object.
Notes
-----
These are the global indices of the object. This is mostly useful
Expand All @@ -74,16 +78,24 @@ def get_indices(cls, axis, partitions, index_func=None):

@classmethod
def broadcast_apply(cls, axis, apply_func, left, right, other_name="r"):
lt_axis_parts = cls.axis_partition(left, 1)
def map_func(df, others):
other = pandas.concat(ray.get(others), axis=axis ^ 1)
return apply_func(df, **{other_name: other})

rt_axis_parts = cls.axis_partition(right, axis ^ 1)
return np.array(
[
lt_axis_part.broadcast_apply(
rt_axis_parts if axis else rt_axis_parts[i],
axis,
apply_func,
other_name,
)
for i, lt_axis_part in enumerate(lt_axis_parts)
[
part.apply(
map_func,
**{
"others": rt_axis_parts[col_idx].list_of_blocks
if axis
else rt_axis_parts[row_idx].list_of_blocks
},
)
for col_idx, part in enumerate(left[row_idx])
]
for row_idx in range(len(left))
]
)

0 comments on commit 259eb3e

Please sign in to comment.