FIX-modin-project#2642: Apply suggestions from code review
Co-authored-by: Vasily Litvinov <[email protected]>
Signed-off-by: Igoshev, Yaroslav <[email protected]>
YarShev and vnlitvinov committed Jan 25, 2021
1 parent 7f29166 commit 8282dd1
Showing 8 changed files with 62 additions and 44 deletions.
18 changes: 9 additions & 9 deletions docs/partition_ips.rst
@@ -5,10 +5,10 @@ If you are working with Modin DataFrame and would like to unwrap its remote partitions
for your needs (pass them to another function that will be processed on a concrete node of the cluster,
for instance), you can use IPs of the remote partitions. In that case you can pass the partitions
having needed IPs to your function. It can help with minimizing data movement between nodes. However,
-it worth noticing that for Modin on ``Ray`` engine with ``pandas`` backend IPs of the remote partitions may not match
-actual locations if the partitions are lower 100 kB. Ray saves such objects (<= 100 kB, by default) in in-process store
-of the calling process. We can't get IPs for such objects with saving good performance. So, you should keep in mind this
-when unwrapping of the remote partitions with their IPs. Several options are provided to handle the case in
+it is worth noting that for Modin on ``Ray`` engine with ``pandas`` backend IPs of the remote partitions may not match
+actual locations if the partitions are lower than 100 kB. Ray saves such objects (<= 100 kB, by default) in the in-process store
+of the calling process. We can't get IPs for such objects while maintaining good performance. So, you should keep this in mind
+when unwrapping the remote partitions with their IPs. Several options are provided to handle the case in
``How to handle objects that are lower than 100 kB`` section. Note that there is no such issue for Modin on ``Dask`` engine
with ``pandas`` backend because ``Dask`` saves any objects in the worker process that processes a function.
Please let us know what you think!
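
As a concrete illustration of the workflow this page describes, here is a minimal sketch. It assumes Modin on Ray with the feature enabled via ``MODIN_ENABLE_PARTITIONS_API``, and that ``unwrap_partitions`` is importable from the ``modin.api`` module touched by this commit; the data and the grouping loop are hypothetical, and the loop assumes each ip is a Ray ObjectRef, per the docstring in ``partition_api.py``.

.. code-block:: python

   import os

   # Assumption: the variable must be set before Modin is imported.
   os.environ["MODIN_ENABLE_PARTITIONS_API"] = "true"

   import ray
   import modin.pandas as pd
   from modin.api import unwrap_partitions

   df = pd.DataFrame({"a": range(1000), "b": range(1000)})

   # With bind_ip=True each element is an (ip, partition) pair.
   pairs = unwrap_partitions(df, axis=0, bind_ip=True)

   # Group partitions by the node holding them; the ip may itself be
   # a Ray ObjectRef, so materialize it first.
   by_node = {}
   for ip_ref, part in pairs:
       by_node.setdefault(ray.get(ip_ref), []).append(part)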
@@ -22,24 +22,24 @@ the `installation page`_ for more information on installing Modin.
Starting Modin with Partition IPs enabled
------------------------------------------

-Partition IPs is detected from an environment variable set in bash.
+Partition IPs support is detected from an environment variable.

.. code-block:: bash

   export MODIN_ENABLE_PARTITIONS_API=true

-How to handle objects that are lower 100 kB
+How to handle objects that are lower than 100 kB
-------------------------------------------

-* If you are sure that each of the remote partitions unwrapped is higher 100 kB, you can just import Modin or perform ``ray.init()`` manually.
+* If you are sure that each of the remote partitions being unwrapped is higher than 100 kB, you can just import Modin or perform ``ray.init()`` manually.

-* If you don't know partitions size you can pass the option ``_system_config={"max_direct_call_object_size": <nbytes>,}``, where ``nbytes`` is threshold for objects that will be stored in in-process store, to ``ray.init()`` or export the following environment variable:
+* If you don't know partition sizes you can pass the option ``_system_config={"max_direct_call_object_size": <nbytes>,}``, where ``nbytes`` is the threshold for objects that will be stored in the in-process store, to ``ray.init()`` or export the following environment variable:

.. code-block:: bash

   export MODIN_ON_RAY_PARTITION_THRESHOLD=<nbytes>

-When specifying ``nbytes`` is equal to 0, all the objects will be saved to shared-memory object store (plasma).
+When specifying ``nbytes`` equal to 0, all the objects will be saved to the shared-memory object store (plasma).

* You can also start Ray as follows: ``ray start --head --system-config='{"max_direct_call_object_size":<nbytes>}'``.
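
Combining the options above, a minimal sketch of forcing every object into the shared-memory store so that partition IPs stay accurate (``nbytes`` set to 0, as described above; ``_system_config`` is a private Ray option, so treat this as illustrative):

.. code-block:: python

   import ray

   # Threshold 0: no object qualifies for the in-process store, so
   # everything goes to plasma and IPs reflect actual locations.
   ray.init(_system_config={"max_direct_call_object_size": 0})

   import modin.pandas as pd  # import Modin only after Ray is initialized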

24 changes: 11 additions & 13 deletions modin/api/partition_api.py
@@ -48,10 +48,8 @@ def unwrap_partitions(api_layer_object, axis=None, bind_ip=False):
f"Only API Layer objects may be passed in here, got {type(api_layer_object)} instead."
)

-    if bind_ip and EnablePartitionIPs.get() is False:
-        ValueError(
-            "Passed `bind_ip=True` but `MODIN_ENABLE_PARTITIONS_API` env var was not exported."
-        )
+    if bind_ip and not EnablePartitionIPs.get():
+        raise ValueError("Passed `bind_ip=True` but partition IPs API was not enabled.")

if axis is None:

@@ -96,9 +94,9 @@ def create_df_from_partitions(partitions, axis):
Parameters
----------
partitions : list
-        List of Ray.ObjectRef/Dask.Future referencing to partitions in depend of the engine used.
-        Or list containing tuples of Ray.ObjectRef/Dask.Future referencing to ip addresses of partitions
-        and partitions itself in depend of the engine used.
+        List of Ray.ObjectRef/Dask.Future referencing partitions depending on the engine used.
+        Or list of tuples of Ray.ObjectRef/Dask.Future referencing ip addresses of partitions
+        and partitions themselves depending on the engine used.
axis : None, 0 or 1
The `axis` parameter is used to identify what kind of partitions are passed.
You have to set:
@@ -122,9 +120,9 @@ def create_df_from_partitions(partitions, axis):
# When collecting partitions to NumPy array they will be kept row-wise
if axis is None:
if isinstance(partitions[0][0], tuple):
-            if EnablePartitionIPs.get() is False:
+            if not EnablePartitionIPs.get():
raise ValueError(
"Passed `partitions` with IPs but `MODIN_ENABLE_PARTITIONS_API` env var was not exported."
"Passed `partitions` with IPs but partition IPs API was not enabled."
)
parts = np.array(
[
@@ -142,9 +140,9 @@ def create_df_from_partitions(partitions, axis):
# When collecting partitions to NumPy array they will be kept row-wise
elif axis == 0:
if isinstance(partitions[0], tuple):
-            if EnablePartitionIPs.get() is False:
+            if not EnablePartitionIPs.get():
raise ValueError(
"Passed `partitions` with IPs but `MODIN_ENABLE_PARTITIONS_API` env var was not exported."
"Passed `partitions` with IPs but partition IPs API was not enabled."
)
parts = np.array(
[[partition_class(partition, ip=ip)] for ip, partition in partitions]
@@ -154,9 +152,9 @@ def create_df_from_partitions(partitions, axis):
# When collecting partitions to NumPy array they will be kept column-wise
elif axis == 1:
if isinstance(partitions[0], tuple):
-            if EnablePartitionIPs.get() is False:
+            if not EnablePartitionIPs.get():
raise ValueError(
"Passed `partitions` with IPs but `MODIN_ENABLE_PARTITIONS_API` env var was not exported."
"Passed `partitions` with IPs but partition IPs API was not enabled."
)
parts = np.array(
[[partition_class(partition, ip=ip) for ip, partition in partitions]]
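
For context on the structures the functions in this file exchange, a round-trip sketch (hypothetical data; assumes both helpers are importable from ``modin.api`` and that the partition IPs API is enabled):

.. code-block:: python

   import modin.pandas as pd
   from modin.api import unwrap_partitions, create_df_from_partitions

   df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})

   # axis=0 unwraps row partitions; bind_ip=True yields (ip, partition) pairs.
   row_parts = unwrap_partitions(df, axis=0, bind_ip=True)

   # axis must describe how the partitions were unwrapped.
   df2 = create_df_from_partitions(row_parts, axis=0)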
5 changes: 1 addition & 4 deletions modin/config/envvars.py
@@ -161,10 +161,7 @@ class EnablePartitionIPs(EnvironmentVariable, type=bool):
"""

varname = "MODIN_ENABLE_PARTITIONS_IPS"

@classmethod
def _get_default(cls):
return False
default = False


class RayPartitionThreshold(EnvironmentVariable, type=int):
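
The hunk above swaps a ``_get_default`` classmethod for a plain ``default`` class attribute. A hedged sketch of why an attribute suffices: a simplified stand-in for Modin's config base class, not its actual implementation.

.. code-block:: python

   import os

   class EnvironmentVariable:
       """Simplified stand-in for Modin's config base class."""

       varname: str = ""
       default: bool = False

       @classmethod
       def get(cls) -> bool:
           raw = os.environ.get(cls.varname)
           if raw is None:
               return cls.default  # class attribute supplies the fallback
           return raw.strip().lower() in ("true", "1", "yes")

   class EnablePartitionIPs(EnvironmentVariable):
       varname = "MODIN_ENABLE_PARTITIONS_IPS"
       default = False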
4 changes: 2 additions & 2 deletions modin/engines/base/frame/axis_partition.py
@@ -141,7 +141,7 @@ def unwrap(self, squeeze=False, bind_ip=False):
return self.list_of_ips[0], self.list_of_blocks[0]
else:
raise ValueError(
"Passed `bind_ip=True` but `MODIN_ENABLE_PARTITIONS_API` env var was not exported."
"Passed `bind_ip=True` but partition IPs API was not enabled."
)
else:
return self.list_of_blocks[0]
@@ -151,7 +151,7 @@ def unwrap(self, squeeze=False, bind_ip=False):
return list(zip(self.list_of_ips, self.list_of_blocks))
else:
raise ValueError(
"Passed `bind_ip=True` but `MODIN_ENABLE_PARTITIONS_API` env var was not exported."
"Passed `bind_ip=True` but partition IPs API was not enabled."
)
else:
return self.list_of_blocks
2 changes: 1 addition & 1 deletion modin/engines/dask/pandas_on_dask/frame/axis_partition.py
@@ -32,7 +32,7 @@ def __init__(self, list_of_blocks, bind_ip=False):
self.list_of_ips = [obj.ip for obj in list_of_blocks]
else:
raise ValueError(
"Passed `bind_ip=True` but `MODIN_ENABLE_PARTITIONS_API` env var was not exported."
"Passed `bind_ip=True` but partition IPs API was not enabled."
)

partition_type = PandasOnDaskFramePartition
5 changes: 4 additions & 1 deletion modin/engines/dask/pandas_on_dask/frame/partition.py
@@ -27,7 +27,10 @@ def apply_list_of_funcs(funcs, df):
if isinstance(func, bytes):
func = pkl.loads(func)
df = func(df, **kwargs)
-    return df, get_ip() if EnablePartitionIPs.get() else df
+    if EnablePartitionIPs.get():
+        return df, get_ip()
+    else:
+        return df


class PandasOnDaskFramePartition(BaseFramePartition):
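
The removed one-liner in ``apply_list_of_funcs`` is a precedence pitfall worth spelling out: a conditional expression binds tighter than the tuple comma, so the old code always returned a 2-tuple. A small self-contained demonstration:

.. code-block:: python

   flag = False
   df, ip = "frame", "127.0.0.1"  # hypothetical stand-ins

   # Parses as (df, (ip if flag else df)): always a tuple.
   old = df, ip if flag else df
   assert old == ("frame", "frame")

   # The intended behavior, matching the fixed code.
   new = (df, ip) if flag else df
   assert new == "frame"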
7 changes: 2 additions & 5 deletions modin/engines/ray/pandas_on_ray/frame/axis_partition.py
@@ -18,10 +18,7 @@
from .partition import PandasOnRayFramePartition

import ray
-
-
-if EnablePartitionIPs.get():
-    from ray.services import get_node_ip_address
+from ray.services import get_node_ip_address


class PandasOnRayFrameAxisPartition(PandasFrameAxisPartition):
@@ -35,7 +32,7 @@ def __init__(self, list_of_blocks, bind_ip=False):
self.list_of_ips = [obj.ip for obj in list_of_blocks]
else:
raise ValueError(
"Passed `bind_ip=True` but `MODIN_ENABLE_PARTITIONS_API` env var was not exported."
"Passed `bind_ip=True` but partition IPs API was not enabled."
)

partition_type = PandasOnRayFramePartition
41 changes: 32 additions & 9 deletions modin/engines/ray/pandas_on_ray/frame/partition.py
@@ -20,10 +20,7 @@

import ray
from ray.worker import RayTaskError
-
-
-if EnablePartitionIPs.get():
-    from ray.services import get_node_ip_address
+from ray.services import get_node_ip_address


class PandasOnRayFramePartition(BaseFramePartition):
@@ -66,11 +63,24 @@ def apply(self, func, **kwargs):
"""
oid = self.oid
call_queue = self.call_queue + [(func, kwargs)]
+        num_returns = 4 if EnablePartitionIPs.get() else 3
if EnablePartitionIPs.get():
-            result, length, width, ip = deploy_ray_func.remote(call_queue, oid)
+            result, length, width, ip = deploy_ray_func._remote(
+                args=(
+                    call_queue,
+                    oid,
+                ),
+                num_returns=num_returns,
+            )
return PandasOnRayFramePartition(result, length, width, ip)
else:
-            result, length, width = deploy_ray_func.remote(call_queue, oid)
+            result, length, width = deploy_ray_func._remote(
+                args=(
+                    call_queue,
+                    oid,
+                ),
+                num_returns=num_returns,
+            )
return PandasOnRayFramePartition(result, length, width)

def add_to_apply_calls(self, func, **kwargs):
@@ -83,19 +93,32 @@ def drain_call_queue(self):
return
oid = self.oid
call_queue = self.call_queue
+        num_returns = 4 if EnablePartitionIPs.get() else 3
if EnablePartitionIPs.get():
(
self.oid,
self._length_cache,
self._width_cache,
self.ip,
-            ) = deploy_ray_func.remote(call_queue, oid)
+            ) = deploy_ray_func._remote(
+                args=(
+                    call_queue,
+                    oid,
+                ),
+                num_returns=num_returns,
+            )
else:
(
self.oid,
self._length_cache,
self._width_cache,
-            ) = deploy_ray_func.remote(call_queue, oid)
+            ) = deploy_ray_func._remote(
+                args=(
+                    call_queue,
+                    oid,
+                ),
+                num_returns=num_returns,
+            )
self.call_queue = []

def __copy__(self):
Expand Down Expand Up @@ -222,7 +245,7 @@ def get_index_and_columns(df):
return len(df.index), len(df.columns)


-@ray.remote(num_returns=4 if EnablePartitionIPs.get() else 3)
+@ray.remote
def deploy_ray_func(call_queue, partition): # pragma: no cover
def deserialize(obj):
if isinstance(obj, ray.ObjectID):
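
The recurring change in this file: ``num_returns`` can no longer be fixed at import time in the ``@ray.remote`` decorator, because it now depends on a config value read at call time, so the hunks switch to Ray's call-site ``._remote(args=..., num_returns=...)`` API. A toy sketch of that mechanism (illustrative function, not Modin's ``deploy_ray_func``):

.. code-block:: python

   import ray

   ray.init(ignore_reinit_error=True)

   @ray.remote
   def stats(x):
       return x, len(str(x)), str(x)[0]

   # Choose the number of ObjectRefs per call instead of per decorator.
   a, b, c = stats._remote(args=(12345,), num_returns=3)
   print(ray.get([a, b, c]))  # [12345, 5, '1']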
