FIX-modin-project#2642: Use partition IPs as an experimental feature
Signed-off-by: Igoshev, Yaroslav <[email protected]>
YarShev committed Jan 25, 2021
1 parent 2f880c1 commit 7f29166
Showing 11 changed files with 334 additions and 57 deletions.
2 changes: 1 addition & 1 deletion docs/conf.py
@@ -10,7 +10,7 @@
import modin

project = u"Modin"
copyright = u"2018-2020, Modin"
copyright = u"2018-2021, Modin"
author = u"Modin contributors"

# The short X.Y version
1 change: 1 addition & 0 deletions docs/index.rst
@@ -127,6 +127,7 @@ nature, you get a fast DataFrame at 1MB and 1TB+.

using_modin
out_of_core
partition_ips

.. toctree::
:caption: Examples
64 changes: 64 additions & 0 deletions docs/partition_ips.rst
@@ -0,0 +1,64 @@
Partition IPs in Modin (experimental)
======================================

If you are working with a Modin DataFrame and you want to unwrap its remote partitions
for your own needs (for example, to pass them to a function that will be processed on a concrete
node of the cluster), you can use the IPs of the remote partitions. In that case you can pass
the partitions having the needed IPs to your function, which helps minimize data movement
between nodes. However, it is worth noting that for Modin on the ``Ray`` engine with the
``pandas`` backend, the IPs of the remote partitions may not match their actual locations if the
partitions are smaller than 100 kB. By default, Ray saves such objects (<= 100 kB) in the
in-process store of the calling process, and we cannot get IPs for these objects without
sacrificing performance, so keep this in mind when unwrapping remote partitions with their IPs.
Several options for handling this case are provided in the
``How to handle objects that are smaller than 100 kB`` section. There is no such issue for Modin
on the ``Dask`` engine with the ``pandas`` backend, because ``Dask`` saves any object in the
worker process that executes the function.
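
For example, here is a minimal sketch of the idea, assuming the ``Ray`` engine; the grouping
helper below is purely illustrative and not part of the Modin API:

.. code-block:: python

   from collections import defaultdict

   import ray

   # `pairs` is the result of unwrap_partitions(df, axis=0, bind_ip=True):
   # a list of (ip, partition) tuples, where the IP itself may be a Ray
   # ObjectRef that needs materializing before it can serve as a dict key.
   def group_by_node(pairs):
       by_ip = defaultdict(list)
       for ip, ref in pairs:
           key = ray.get(ip) if isinstance(ip, ray.ObjectRef) else ip
           by_ip[key].append(ref)
       return by_ip
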
Please let us know what you think!

Install Modin Partition IPs
----------------------------

Modin now comes with all the dependencies for partition IPs functionality by default! See
the `installation page`_ for more information on installing Modin.

Starting Modin with Partition IPs enabled
------------------------------------------

Partition IPs support is enabled via an environment variable set in bash:

.. code-block:: bash

   export MODIN_ENABLE_PARTITIONS_API=true
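
Alternatively, a sketch of setting the same variable from Python; the assumption here is that
it must be set before Modin is imported for it to take effect:

.. code-block:: python

   import os

   # Must be exported before Modin reads its configuration.
   os.environ["MODIN_ENABLE_PARTITIONS_API"] = "true"

   import modin.pandas as pd
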
How to handle objects that are smaller than 100 kB
-------------------------------------------------------

* If you are sure that each of the unwrapped remote partitions is larger than 100 kB, you can simply import Modin or call ``ray.init()`` manually.

* If you do not know the partition sizes, you can pass the option
  ``_system_config={"max_direct_call_object_size": <nbytes>,}``, where ``nbytes`` is the
  threshold at or below which objects are stored in the in-process store, to ``ray.init()``,
  or export the following environment variable (a sketch of the ``ray.init()`` option is shown
  after this list):

  .. code-block:: bash

     export MODIN_ON_RAY_PARTITION_THRESHOLD=<nbytes>

  When ``nbytes`` is set to 0, all objects are saved to the shared-memory object store (Plasma).

* You can also start Ray as follows: ``ray start --head --system-config='{"max_direct_call_object_size":<nbytes>}'``.

Note that specifying this threshold may change the performance of some Modin operations.
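
A minimal sketch of the ``ray.init()`` option above; the value ``0`` is only an illustration
(it forces every object into Plasma), and ``ray.init()`` is assumed to run before Modin
initializes Ray itself:

.. code-block:: python

   import ray

   # Route all objects to the shared-memory object store (Plasma) by
   # setting the in-process store threshold to 0 bytes.
   ray.init(_system_config={"max_direct_call_object_size": 0})

   import modin.pandas as pd
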

Running an example with Partition IPs
--------------------------------------

Before you run this, please make sure you follow the instructions listed above.

.. code-block:: python

   import modin.pandas as pd
   from modin.api import unwrap_partitions, create_df_from_partitions

   df = pd.read_csv("/path/to/your/file")
   partitions = unwrap_partitions(df, axis=0, bind_ip=True)
   print(partitions)
   # You can also create a Modin DataFrame from remote partitions, including their IPs
   new_df = create_df_from_partitions(partitions, 0)
   print(new_df)

.. _`installation page`: installation.rst
103 changes: 99 additions & 4 deletions modin/api/partition_api.py
@@ -11,6 +11,12 @@
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

import numpy as np

from modin.backends.pandas.query_compiler import PandasQueryCompiler
from modin.config import EnablePartitionIPs
from modin.pandas.dataframe import DataFrame


def unwrap_partitions(api_layer_object, axis=None, bind_ip=False):
"""
@@ -42,20 +48,23 @@ def unwrap_partitions(api_layer_object, axis=None, bind_ip=False):
f"Only API Layer objects may be passed in here, got {type(api_layer_object)} instead."
)

    if bind_ip and EnablePartitionIPs.get() is False:
        raise ValueError(
            "Passed `bind_ip=True` but `MODIN_ENABLE_PARTITIONS_API` env var was not exported."
        )

if axis is None:

        def _unwrap_partitions(oid):
            if bind_ip:
                return [
-                   (partition.ip, getattr(partition, oid))
+                   [(partition.ip, getattr(partition, oid)) for partition in row]
                    for row in api_layer_object._query_compiler._modin_frame._partitions
-                   for partition in row
                ]
            else:
                return [
-                   getattr(partition, oid)
+                   [getattr(partition, oid) for partition in row]
                    for row in api_layer_object._query_compiler._modin_frame._partitions
-                   for partition in row
                ]

actual_engine = type(
@@ -78,3 +87,89 @@ def _unwrap_partitions(oid):
part.coalesce(bind_ip=bind_ip).unwrap(squeeze=True, bind_ip=bind_ip)
for part in partitions
]


def create_df_from_partitions(partitions, axis):
    """
    Create a DataFrame from remote partitions.

    Parameters
    ----------
    partitions : list
        List of Ray.ObjectRef/Dask.Future objects referencing partitions, depending on the
        engine used, or a list of tuples of Ray.ObjectRef/Dask.Future objects referencing
        the IP addresses of partitions and the partitions themselves.
    axis : None, 0 or 1
        The `axis` parameter identifies what kind of partitions are passed:
        - set `axis` to 0 to create a DataFrame from row partitions,
        - set `axis` to 1 to create a DataFrame from column partitions,
        - set `axis` to None to create a DataFrame from a 2D list of partitions.

    Returns
    -------
    DataFrame
        DataFrame instance created from the remote partitions.
    """
from modin.data_management.factories.dispatcher import EngineDispatcher

factory = EngineDispatcher.get_engine()

partition_class = factory.io_cls.frame_cls._frame_mgr_cls._partition_class
partition_frame_class = factory.io_cls.frame_cls
partition_mgr_class = factory.io_cls.frame_cls._frame_mgr_cls

# When collecting partitions to NumPy array they will be kept row-wise
if axis is None:
if isinstance(partitions[0][0], tuple):
if EnablePartitionIPs.get() is False:
raise ValueError(
"Passed `partitions` with IPs but `MODIN_ENABLE_PARTITIONS_API` env var was not exported."
)
parts = np.array(
[
[partition_class(partition, ip=ip) for ip, partition in row]
for row in partitions
]
)
else:
parts = np.array(
[
[partition_class(partition) for partition in row]
for row in partitions
]
)
# When collecting partitions to NumPy array they will be kept row-wise
elif axis == 0:
if isinstance(partitions[0], tuple):
if EnablePartitionIPs.get() is False:
raise ValueError(
"Passed `partitions` with IPs but `MODIN_ENABLE_PARTITIONS_API` env var was not exported."
)
parts = np.array(
[[partition_class(partition, ip=ip)] for ip, partition in partitions]
)
else:
parts = np.array([[partition_class(partition)] for partition in partitions])
# When collecting partitions to NumPy array they will be kept column-wise
elif axis == 1:
if isinstance(partitions[0], tuple):
if EnablePartitionIPs.get() is False:
raise ValueError(
"Passed `partitions` with IPs but `MODIN_ENABLE_PARTITIONS_API` env var was not exported."
)
parts = np.array(
[[partition_class(partition, ip=ip) for ip, partition in partitions]]
)
else:
parts = np.array([[partition_class(partition) for partition in partitions]])
else:
raise ValueError(
f"Got unacceptable value of axis {axis}. Possible values are {0}, {1} or {None}."
)

index = partition_mgr_class.get_indices(0, parts, lambda df: df.axes[0])
columns = partition_mgr_class.get_indices(1, parts, lambda df: df.axes[1])
return DataFrame(
query_compiler=PandasQueryCompiler(partition_frame_class(parts, index, columns))
)
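
For reference, a minimal usage sketch of the two functions above; this is an illustration that
assumes `MODIN_ENABLE_PARTITIONS_API` has been exported, with toy DataFrame contents:

    import modin.pandas as pd
    from modin.api import unwrap_partitions, create_df_from_partitions

    df = pd.DataFrame({"a": range(10)})
    # axis=None yields the partitions as a 2D, row-major list -- the same
    # shape the axis=None branch above accepts back.
    parts_2d = unwrap_partitions(df, axis=None)
    roundtripped = create_df_from_partitions(parts_2d, None)
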
20 changes: 20 additions & 0 deletions modin/config/envvars.py
@@ -155,6 +155,26 @@ def _get_default(cls):
return CpuCount.get()


class EnablePartitionIPs(EnvironmentVariable, type=bool):
    """
    Whether to enable use of IPs for remote partitions or not
    """

    varname = "MODIN_ENABLE_PARTITIONS_API"

    @classmethod
    def _get_default(cls):
        return False


class RayPartitionThreshold(EnvironmentVariable, type=int):
    """
    Maximum size (in bytes) of objects that are saved in the in-process store
    """

    varname = "MODIN_ON_RAY_PARTITION_THRESHOLD"


class RayPlasmaDir(EnvironmentVariable, type=ExactStr):
"""
Path to Plasma storage for Ray
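
For reference, a short sketch of reading the two new configuration classes, mirroring the call
sites elsewhere in this commit; `RayPartitionThreshold.get()` assumes the variable has been
exported, since no default is defined above:

    from modin.config import EnablePartitionIPs, RayPartitionThreshold

    # Defaults to False; export MODIN_ENABLE_PARTITIONS_API=true to enable.
    ips_enabled = EnablePartitionIPs.get()
    # Reads MODIN_ON_RAY_PARTITION_THRESHOLD as an int (bytes).
    threshold = RayPartitionThreshold.get()
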
15 changes: 13 additions & 2 deletions modin/engines/base/frame/axis_partition.py
@@ -14,6 +14,7 @@
from abc import ABC
import pandas
import numpy as np
from modin.config import EnablePartitionIPs
from modin.data_management.utils import split_result_of_axis_func_pandas


@@ -136,12 +137,22 @@ def unwrap(self, squeeze=False, bind_ip=False):
"""
        if squeeze and len(self.list_of_blocks) == 1:
            if bind_ip:
-               return self.list_of_ips[0], self.list_of_blocks[0]
+               if EnablePartitionIPs.get():
+                   return self.list_of_ips[0], self.list_of_blocks[0]
+               else:
+                   raise ValueError(
+                       "Passed `bind_ip=True` but `MODIN_ENABLE_PARTITIONS_API` env var was not exported."
+                   )
            else:
                return self.list_of_blocks[0]
        else:
            if bind_ip:
-               return list(zip(self.list_of_ips, self.list_of_blocks))
+               if EnablePartitionIPs.get():
+                   return list(zip(self.list_of_ips, self.list_of_blocks))
+               else:
+                   raise ValueError(
+                       "Passed `bind_ip=True` but `MODIN_ENABLE_PARTITIONS_API` env var was not exported."
+                   )
            else:
                return self.list_of_blocks

48 changes: 35 additions & 13 deletions modin/engines/dask/pandas_on_dask/frame/axis_partition.py
@@ -11,6 +11,7 @@
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

from modin.config import EnablePartitionIPs
from modin.engines.base.frame.axis_partition import PandasFrameAxisPartition
from .partition import PandasOnDaskFramePartition

@@ -27,7 +28,12 @@ def __init__(self, list_of_blocks, bind_ip=False):
obj.drain_call_queue()
self.list_of_blocks = [obj.future for obj in list_of_blocks]
        if bind_ip:
-           self.list_of_ips = [obj.ip for obj in list_of_blocks]
+           if EnablePartitionIPs.get():
+               self.list_of_ips = [obj.ip for obj in list_of_blocks]
+           else:
+               raise ValueError(
+                   "Passed `bind_ip=True` but `MODIN_ENABLE_PARTITIONS_API` env var was not exported."
+               )

partition_type = PandasOnDaskFramePartition
instance_type = Future
@@ -51,12 +57,13 @@ def deploy_axis_func(

lengths = kwargs.get("_lengths", None)
result_num_splits = len(lengths) if lengths else num_splits
factor = 4 if EnablePartitionIPs.get() else 3

# We have to do this to split it back up. It is already split, but we need to
# get futures for each.
return [
client.submit(lambda l: l[i], axis_result, pure=False)
-           for i in range(result_num_splits * 4)
+           for i in range(result_num_splits * factor)
]

@classmethod
@@ -76,18 +83,25 @@ def deploy_func_between_two_axis_partitions(
*partitions,
pure=False,
)
factor = 4 if EnablePartitionIPs.get() else 3
# We have to do this to split it back up. It is already split, but we need to
# get futures for each.
return [
client.submit(lambda l: l[i], axis_result, pure=False)
-           for i in range(num_splits * 4)
+           for i in range(num_splits * factor)
]

def _wrap_partitions(self, partitions):
-       return [
-           self.partition_type(future, length, width, ip)
-           for (future, length, width, ip) in zip(*[iter(partitions)] * 4)
-       ]
+       if EnablePartitionIPs.get():
+           return [
+               self.partition_type(future, length, width, ip)
+               for (future, length, width, ip) in zip(*[iter(partitions)] * 4)
+           ]
+       else:
+           return [
+               self.partition_type(future, length, width)
+               for (future, length, width) in zip(*[iter(partitions)] * 3)
+           ]


class PandasOnDaskFrameColumnPartition(PandasOnDaskFrameAxisPartition):
@@ -122,10 +136,18 @@ def deploy_dask_func(func, *args):
The result of the function `func`.
"""
result = func(*args)
-   ip = get_ip()
-   if isinstance(result, pandas.DataFrame):
-       return result, len(result), len(result.columns), ip
-   elif all(isinstance(r, pandas.DataFrame) for r in result):
-       return [i for r in result for i in [r, len(r), len(r.columns), ip]]
-   else:
-       return [i for r in result for i in [r, None, None, ip]]
+   if EnablePartitionIPs.get():
+       ip = get_ip()
+       if isinstance(result, pandas.DataFrame):
+           return result, len(result), len(result.columns), ip
+       elif all(isinstance(r, pandas.DataFrame) for r in result):
+           return [i for r in result for i in [r, len(r), len(r.columns), ip]]
+       else:
+           return [i for r in result for i in [r, None, None, ip]]
+   else:
+       if isinstance(result, pandas.DataFrame):
+           return result, len(result), len(result.columns)
+       elif all(isinstance(r, pandas.DataFrame) for r in result):
+           return [i for r in result for i in [r, len(r), len(r.columns)]]
+       else:
+           return [i for r in result for i in [r, None, None]]