FIX-modin-project#2642: Use partition IPs as an experimental feature
Signed-off-by: Igoshev, Yaroslav <[email protected]>
YarShev committed Feb 10, 2021
1 parent 5cb3283 commit 8bc8ce7
Showing 9 changed files with 234 additions and 52 deletions.
1 change: 1 addition & 0 deletions docs/index.rst
@@ -128,6 +128,7 @@ nature, you get a fast DataFrame at 1MB and 1TB+.
    using_modin
    out_of_core
    modin_xgboost
+   partition_ips

 .. toctree::
    :caption: Examples
64 changes: 64 additions & 0 deletions docs/partition_ips.rst
@@ -0,0 +1,64 @@
Partition IPs in Modin (experimental)
======================================

If you are working with a Modin DataFrame and would like to unwrap its remote partitions
for your own needs (for instance, to pass them to a function that will run on a specific node of the cluster),
you can use the IPs of the remote partitions. You can then pass the partitions that have the needed IPs
to your function, which can help minimize data movement between nodes. Note, however,
that for Modin on the ``Ray`` engine with the ``pandas`` backend, the IPs of the remote partitions may not match
their actual locations if the partitions are smaller than 100 kB. Ray saves such objects (<= 100 kB, by default) in the in-process store
of the calling process, and we can't get IPs for such objects while maintaining good performance. Keep this in mind
when unwrapping remote partitions with their IPs. Several options for handling this case are described in the
section on objects smaller than 100 kB below. There is no such issue for Modin on the ``Dask`` engine
with the ``pandas`` backend, because ``Dask`` saves any object in the worker process that processes a function.
Please let us know what you think!

Install Modin Partition IPs
----------------------------

Modin now comes with all the dependencies for the partition IPs functionality by default! See
the `installation page`_ for more information on installing Modin.

Starting Modin with Partition IPs enabled
------------------------------------------

The partition IPs feature is enabled by an environment variable set in bash.

.. code-block:: bash

   export MODIN_ENABLE_PARTITIONS_API=true

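The same flag can also be set from Python, as long as this happens before Modin is imported. The following is a minimal sketch, assuming the variable name given in the text above; the helper ``enable_partition_ips`` is hypothetical and not part of Modin.

```python
import os


def enable_partition_ips(environ=os.environ):
    """Set the flag named in the text above (hypothetical helper, not a Modin API)."""
    environ["MODIN_ENABLE_PARTITIONS_API"] = "true"
    return environ["MODIN_ENABLE_PARTITIONS_API"]


enable_partition_ips()
# import modin.pandas as pd  # import Modin only after the flag is set
```
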
How to handle objects that are smaller than 100 kB
--------------------------------------------------

* If you are sure that each of the unwrapped remote partitions is larger than 100 kB, you can just import Modin or call ``ray.init()`` manually.

* If you don't know the partition sizes, you can pass the option ``_system_config={"max_direct_call_object_size": <nbytes>,}`` to ``ray.init()``, where ``nbytes`` is the threshold for objects that will be stored in the in-process store, or export the following environment variable:

.. code-block:: bash

   export MODIN_ON_RAY_PARTITION_THRESHOLD=<nbytes>

When ``nbytes`` is set to 0, all objects are saved to the shared-memory object store (plasma).

* You can also start Ray as follows: ``ray start --head --system-config='{"max_direct_call_object_size":<nbytes>}'``.

Note that specifying the threshold may change the performance of some Modin operations.
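
The relationship between the environment variable and the ``ray.init()`` option can be sketched as follows. This is an illustration only, not how Modin itself wires the variable: the helper ``ray_init_kwargs`` is hypothetical, and the actual ``ray.init()`` call is left commented out so the snippet runs without Ray installed.

```python
import os


def ray_init_kwargs(environ=os.environ):
    """Build keyword arguments for ray.init() from the threshold variable.

    Hypothetical helper: returns {} when the variable is unset, so Ray keeps
    its default threshold.
    """
    raw = environ.get("MODIN_ON_RAY_PARTITION_THRESHOLD")
    if raw is None:
        return {}
    nbytes = int(raw)
    if nbytes < 0:
        raise ValueError("threshold must be a non-negative number of bytes")
    return {"_system_config": {"max_direct_call_object_size": nbytes}}


# A threshold of 0 means no object is small enough for the in-process store.
kwargs = ray_init_kwargs({"MODIN_ON_RAY_PARTITION_THRESHOLD": "0"})
print(kwargs)
# import ray; ray.init(**kwargs)  # requires Ray to be installed
```
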

Running an example with Partition IPs
--------------------------------------

Before you run this, please make sure you follow the instructions listed above.

.. code-block:: python

   import modin.pandas as pd
   from modin.api import unwrap_partitions, create_df_from_partitions

   df = pd.read_csv("/path/to/your/file")
   partitions = unwrap_partitions(df, axis=0, bind_ip=True)
   print(partitions)
   # Also, you can create Modin DataFrame from remote partitions including their IPs
   new_df = create_df_from_partitions(partitions, 0)
   print(new_df)

.. _`installation page`: installation.rst
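
Once partitions are paired with IPs, a natural next step is to group them by node so that each group can be handed to a function scheduled on that node. The sketch below assumes a flat list of ``(ip, partition)`` pairs with plain-string IPs; with a real engine the elements may be remote references that have to be resolved first. The helper ``group_by_ip`` and the sample values are hypothetical.

```python
from collections import defaultdict


def group_by_ip(ip_partition_pairs):
    """Group partition handles by the IP they are bound to."""
    groups = defaultdict(list)
    for ip, partition in ip_partition_pairs:
        groups[ip].append(partition)
    return dict(groups)


# Stand-in data: strings instead of real remote references.
pairs = [("10.0.0.1", "part-0"), ("10.0.0.2", "part-1"), ("10.0.0.1", "part-2")]
by_ip = group_by_ip(pairs)
print(by_ip)
```
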
20 changes: 20 additions & 0 deletions modin/config/envvars.py
@@ -155,6 +155,26 @@ def _get_default(cls):
         return CpuCount.get()


+class EnablePartitionIPs(EnvironmentVariable, type=bool):
+    """
+    Whether to enable use of IPs for remote partitions or not
+    """
+
+    varname = "MODIN_ENABLE_PARTITIONS_API"
+
+    @classmethod
+    def _get_default(cls):
+        return False
+
+
+class RayPartitionThreshold(EnvironmentVariable, type=int):
+    """
+    What objects can be saved in in-process store (in bytes)
+    """
+
+    varname = "MODIN_ON_RAY_PARTITION_THRESHOLD"
+
+
 class RayPlasmaDir(EnvironmentVariable, type=ExactStr):
     """
     Path to Plasma storage for Ray
15 changes: 13 additions & 2 deletions modin/engines/base/frame/axis_partition.py
@@ -14,6 +14,7 @@
 from abc import ABC
 import pandas
 import numpy as np
+from modin.config import EnablePartitionIPs
 from modin.data_management.utils import split_result_of_axis_func_pandas


@@ -136,12 +137,22 @@ def unwrap(self, squeeze=False, bind_ip=False):
         """
         if squeeze and len(self.list_of_blocks) == 1:
             if bind_ip:
-                return self.list_of_ips[0], self.list_of_blocks[0]
+                if EnablePartitionIPs.get():
+                    return self.list_of_ips[0], self.list_of_blocks[0]
+                else:
+                    raise ValueError(
+                        "Passed `bind_ip=True` but `MODIN_ENABLE_PARTITIONS_API` env var was not exported."
+                    )
             else:
                 return self.list_of_blocks[0]
         else:
             if bind_ip:
-                return list(zip(self.list_of_ips, self.list_of_blocks))
+                if EnablePartitionIPs.get():
+                    return list(zip(self.list_of_ips, self.list_of_blocks))
+                else:
+                    raise ValueError(
+                        "Passed `bind_ip=True` but `MODIN_ENABLE_PARTITIONS_API` env var was not exported."
+                    )
             else:
                 return self.list_of_blocks

48 changes: 35 additions & 13 deletions modin/engines/dask/pandas_on_dask/frame/axis_partition.py
@@ -11,6 +11,7 @@
 # ANY KIND, either express or implied. See the License for the specific language
 # governing permissions and limitations under the License.

+from modin.config import EnablePartitionIPs
 from modin.engines.base.frame.axis_partition import PandasFrameAxisPartition
 from .partition import PandasOnDaskFramePartition

@@ -27,7 +28,12 @@ def __init__(self, list_of_blocks, bind_ip=False):
             obj.drain_call_queue()
         self.list_of_blocks = [obj.future for obj in list_of_blocks]
         if bind_ip:
-            self.list_of_ips = [obj.ip for obj in list_of_blocks]
+            if EnablePartitionIPs.get():
+                self.list_of_ips = [obj.ip for obj in list_of_blocks]
+            else:
+                raise ValueError(
+                    "Passed `bind_ip=True` but `MODIN_ENABLE_PARTITIONS_API` env var was not exported."
+                )

     partition_type = PandasOnDaskFramePartition
     instance_type = Future
@@ -51,12 +57,13 @@ def deploy_axis_func(

         lengths = kwargs.get("_lengths", None)
         result_num_splits = len(lengths) if lengths else num_splits
+        factor = 4 if EnablePartitionIPs.get() else 3

         # We have to do this to split it back up. It is already split, but we need to
         # get futures for each.
         return [
             client.submit(lambda l: l[i], axis_result, pure=False)
-            for i in range(result_num_splits * 4)
+            for i in range(result_num_splits * factor)
         ]

     @classmethod
@@ -76,18 +83,25 @@ def deploy_func_between_two_axis_partitions(
             *partitions,
             pure=False,
         )
+        factor = 4 if EnablePartitionIPs.get() else 3
         # We have to do this to split it back up. It is already split, but we need to
         # get futures for each.
         return [
             client.submit(lambda l: l[i], axis_result, pure=False)
-            for i in range(num_splits * 4)
+            for i in range(num_splits * factor)
         ]

     def _wrap_partitions(self, partitions):
-        return [
-            self.partition_type(future, length, width, ip)
-            for (future, length, width, ip) in zip(*[iter(partitions)] * 4)
-        ]
+        if EnablePartitionIPs.get():
+            return [
+                self.partition_type(future, length, width, ip)
+                for (future, length, width, ip) in zip(*[iter(partitions)] * 4)
+            ]
+        else:
+            return [
+                self.partition_type(future, length, width)
+                for (future, length, width) in zip(*[iter(partitions)] * 3)
+            ]


 class PandasOnDaskFrameColumnPartition(PandasOnDaskFrameAxisPartition):
@@ -122,10 +136,18 @@ def deploy_dask_func(func, *args):
         The result of the function `func`.
     """
     result = func(*args)
-    ip = get_ip()
-    if isinstance(result, pandas.DataFrame):
-        return result, len(result), len(result.columns), ip
-    elif all(isinstance(r, pandas.DataFrame) for r in result):
-        return [i for r in result for i in [r, len(r), len(r.columns), ip]]
+    if EnablePartitionIPs.get():
+        ip = get_ip()
+        if isinstance(result, pandas.DataFrame):
+            return result, len(result), len(result.columns), ip
+        elif all(isinstance(r, pandas.DataFrame) for r in result):
+            return [i for r in result for i in [r, len(r), len(r.columns), ip]]
+        else:
+            return [i for r in result for i in [r, None, None, ip]]
     else:
-        return [i for r in result for i in [r, None, None, ip]]
+        if isinstance(result, pandas.DataFrame):
+            return result, len(result), len(result.columns)
+        elif all(isinstance(r, pandas.DataFrame) for r in result):
+            return [i for r in result for i in [r, len(r), len(r.columns)]]
+        else:
+            return [i for r in result for i in [r, None, None]]
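
The remote function in this file returns one flat list and `_wrap_partitions` regroups it with `zip(*[iter(partitions)] * factor)`, where the factor is 4 with IPs and 3 without. A minimal sketch of that regrouping idiom, using stand-in values instead of futures (the helper `chunk` and the sample data are hypothetical):

```python
def chunk(flat, factor):
    """Regroup a flat list into tuples of `factor` consecutive items."""
    # The same iterator object is repeated `factor` times, so zip pulls
    # `factor` consecutive items from it for each output tuple.
    return list(zip(*[iter(flat)] * factor))


# Stand-in values; real entries would be futures or object references.
with_ips = ["df0", 2, 3, "10.0.0.1", "df1", 2, 3, "10.0.0.2"]
without_ips = ["df0", 2, 3, "df1", 2, 3]

print(chunk(with_ips, 4))
print(chunk(without_ips, 3))
```

A mismatched factor does not raise; it silently misaligns the groups and drops the leftover items, which is why the producer and the consumer both have to branch on the same flag.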
19 changes: 13 additions & 6 deletions modin/engines/dask/pandas_on_dask/frame/partition.py
@@ -13,8 +13,9 @@

 import pandas

-from modin.engines.base.frame.partition import BaseFramePartition
+from modin.config import EnablePartitionIPs
 from modin.data_management.utils import length_fn_pandas, width_fn_pandas
+from modin.engines.base.frame.partition import BaseFramePartition

 from distributed.client import get_client
 from distributed.utils import get_ip
@@ -26,7 +27,7 @@ def apply_list_of_funcs(funcs, df):
         if isinstance(func, bytes):
             func = pkl.loads(func)
         df = func(df, **kwargs)
-    return df, get_ip()
+    return (df, get_ip()) if EnablePartitionIPs.get() else df


 class PandasOnDaskFramePartition(BaseFramePartition):
@@ -83,8 +84,11 @@ def apply(self, func, **kwargs):
         future = get_client().submit(
             apply_list_of_funcs, call_queue, self.future, pure=False
         )
-        futures = [get_client().submit(lambda l: l[i], future) for i in range(2)]
-        return PandasOnDaskFramePartition(futures[0], ip=futures[1])
+        if EnablePartitionIPs.get():
+            futures = [get_client().submit(lambda l: l[i], future) for i in range(2)]
+            return PandasOnDaskFramePartition(futures[0], ip=futures[1])
+        else:
+            return PandasOnDaskFramePartition(future)

     def add_to_apply_calls(self, func, **kwargs):
         return PandasOnDaskFramePartition(
@@ -95,8 +99,11 @@ def drain_call_queue(self):
         if len(self.call_queue) == 0:
             return
         new_partition = self.apply(lambda x: x)
-        self.future = new_partition.future
-        self.ip = new_partition.ip
+        if EnablePartitionIPs.get():
+            self.future = new_partition.future
+            self.ip = new_partition.ip
+        else:
+            self.future = new_partition.future
         self.call_queue = []

     def mask(self, row_indices, col_indices):
55 changes: 41 additions & 14 deletions modin/engines/ray/pandas_on_ray/frame/axis_partition.py
@@ -13,11 +13,15 @@

 import pandas

+from modin.config import EnablePartitionIPs
 from modin.engines.base.frame.axis_partition import PandasFrameAxisPartition
 from .partition import PandasOnRayFramePartition

 import ray
-from ray.services import get_node_ip_address
+
+
+if EnablePartitionIPs.get():
+    from ray.services import get_node_ip_address


 class PandasOnRayFrameAxisPartition(PandasFrameAxisPartition):
@@ -27,7 +31,12 @@ def __init__(self, list_of_blocks, bind_ip=False):
             obj.drain_call_queue()
         self.list_of_blocks = [obj.oid for obj in list_of_blocks]
         if bind_ip:
-            self.list_of_ips = [obj.ip for obj in list_of_blocks]
+            if EnablePartitionIPs.get():
+                self.list_of_ips = [obj.ip for obj in list_of_blocks]
+            else:
+                raise ValueError(
+                    "Passed `bind_ip=True` but `MODIN_ENABLE_PARTITIONS_API` env var was not exported."
+                )

     partition_type = PandasOnRayFramePartition
     instance_type = ray.ObjectRef
@@ -37,6 +46,7 @@ def deploy_axis_func(
         cls, axis, func, num_splits, kwargs, maintain_partitioning, *partitions
     ):
         lengths = kwargs.get("_lengths", None)
+        factor = 4 if EnablePartitionIPs.get() else 3
         return deploy_ray_func._remote(
             args=(
                 PandasFrameAxisPartition.deploy_axis_func,
@@ -47,13 +57,16 @@ def deploy_axis_func(
                 maintain_partitioning,
             )
             + tuple(partitions),
-            num_returns=num_splits * 4 if lengths is None else len(lengths) * 4,
+            num_returns=num_splits * factor
+            if lengths is None
+            else len(lengths) * factor,
         )

     @classmethod
     def deploy_func_between_two_axis_partitions(
         cls, axis, func, num_splits, len_of_left, other_shape, kwargs, *partitions
     ):
+        factor = 4 if EnablePartitionIPs.get() else 3
         return deploy_ray_func._remote(
             args=(
                 PandasFrameAxisPartition.deploy_func_between_two_axis_partitions,
@@ -65,14 +78,20 @@ def deploy_func_between_two_axis_partitions(
                 kwargs,
             )
             + tuple(partitions),
-            num_returns=num_splits * 4,
+            num_returns=num_splits * factor,
         )

     def _wrap_partitions(self, partitions):
-        return [
-            self.partition_type(object_id, length, width, ip)
-            for (object_id, length, width, ip) in zip(*[iter(partitions)] * 4)
-        ]
+        if EnablePartitionIPs.get():
+            return [
+                self.partition_type(object_id, length, width, ip)
+                for (object_id, length, width, ip) in zip(*[iter(partitions)] * 4)
+            ]
+        else:
+            return [
+                self.partition_type(object_id, length, width)
+                for (object_id, length, width) in zip(*[iter(partitions)] * 3)
+            ]


 class PandasOnRayFrameColumnPartition(PandasOnRayFrameAxisPartition):
@@ -112,10 +131,18 @@ def deploy_ray_func(func, *args): # pragma: no cover
         Ray functions are not detected by codecov (thus pragma: no cover)
     """
     result = func(*args)
-    ip = get_node_ip_address()
-    if isinstance(result, pandas.DataFrame):
-        return result, len(result), len(result.columns), ip
-    elif all(isinstance(r, pandas.DataFrame) for r in result):
-        return [i for r in result for i in [r, len(r), len(r.columns), ip]]
+    if EnablePartitionIPs.get():
+        ip = get_node_ip_address()
+        if isinstance(result, pandas.DataFrame):
+            return result, len(result), len(result.columns), ip
+        elif all(isinstance(r, pandas.DataFrame) for r in result):
+            return [i for r in result for i in [r, len(r), len(r.columns), ip]]
+        else:
+            return [i for r in result for i in [r, None, None, ip]]
     else:
-        return [i for r in result for i in [r, None, None, ip]]
+        if isinstance(result, pandas.DataFrame):
+            return result, len(result), len(result.columns)
+        elif all(isinstance(r, pandas.DataFrame) for r in result):
+            return [i for r in result for i in [r, len(r), len(r.columns)]]
+        else:
+            return [i for r in result for i in [r, None, None]]