diff --git a/docs/partition_ips.rst b/docs/partition_ips.rst
index 3fa73839722..46ad67a849f 100644
--- a/docs/partition_ips.rst
+++ b/docs/partition_ips.rst
@@ -5,10 +5,10 @@ If you are working with Modin DataFrame and would like to unwrap its remote part
 for your needs (pass them to another function that will be processed on a concrete node of the cluster,
 for instance), you can use IPs of the remote partitions. In that case you can pass the partitions
 having needed IPs to your function. It can help with minimazing of data movement between nodes. However,
-it worth noticing that for Modin on ``Ray`` engine with ``pandas`` backend IPs of the remote partitions may not match
-actual locations if the partitions are lower 100 kB. Ray saves such objects (<= 100 kB, by default) in in-process store
-of the calling process. We can't get IPs for such objects with saving good performance. So, you should keep in mind this
-when unwrapping of the remote partitions with their IPs. Several options are provided to handle the case in
+it is worth noting that for Modin on ``Ray`` engine with ``pandas`` backend, IPs of the remote partitions may not match
+actual locations if the partitions are lower than 100 kB. Ray saves such objects (<= 100 kB, by default) in the in-process store
+of the calling process. We can't get IPs for such objects while maintaining good performance, so you should keep this in mind
+when unwrapping remote partitions with their IPs. Several options are provided to handle the case in the
 ``How to handle objects that are lower 100 kB`` section. Wherein, there is no such issue for Modin on ``Dask`` engine
 with ``pandas`` backend because ``Dask`` saves any objects in the worker process that processes a function.
 Please let us know what you think!
@@ -22,24 +22,24 @@ the `installation page`_ for more information on installing Modin.
 
 Starting Modin with Partition IPs enabled
 ------------------------------------------
 
-Partition IPs is detected from an environment variable set in bash.
+Partition IPs support is detected from an environment variable.
 
 .. code-block:: bash
 
   export MODIN_ENABLE_PARTITIONS_API=true
 
-How to handle objects that are lower 100 kB
+How to handle objects that are lower than 100 kB
 -------------------------------------------
 
-* If you are sure that each of the remote partitions unwrapped is higher 100 kB, you can just import Modin or perform ``ray.init()`` manually.
+* If you are sure that each of the remote partitions being unwrapped is higher than 100 kB, you can just import Modin or perform ``ray.init()`` manually.
 
-* If you don't know partitions size you can pass the option ``_system_config={"max_direct_call_object_size": <nbytes>,}``, where ``nbytes`` is threshold for objects that will be stored in in-process store, to ``ray.init()`` or export the following environment variable:
+* If you don't know partition sizes you can pass the option ``_system_config={"max_direct_call_object_size": <nbytes>,}``, where ``nbytes`` is the threshold for objects that will be stored in the in-process store, to ``ray.init()`` or export the following environment variable:
 
 .. code-block:: bash
 
   export MODIN_ON_RAY_PARTITION_THRESHOLD=<nbytes>
 
-When specifying ``nbytes`` is equal to 0, all the objects will be saved to shared-memory object store (plasma).
+When ``nbytes`` is equal to 0, all the objects will be saved to the shared-memory object store (plasma).
 
 * You can also start Ray as follows: ``ray start --head --system-config='{"max_direct_call_object_size": <nbytes>}'``.
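Note: below is a minimal end-to-end sketch of the workflow described in the docs above. It assumes Ray as the
engine and that ``unwrap_partitions``/``create_df_from_partitions`` are re-exported from ``modin.api`` (otherwise
import them from ``modin.api.partition_api``); names and the exact environment variable may differ between Modin
versions.

.. code-block:: python

    import os

    # This patch's config class reads MODIN_ENABLE_PARTITIONS_IPS;
    # the docs section above spells it MODIN_ENABLE_PARTITIONS_API.
    os.environ["MODIN_ENABLE_PARTITIONS_IPS"] = "true"

    import ray

    # A threshold of 0 forces every object into the shared-memory store (plasma),
    # so partition IPs always reflect the node that actually holds the data.
    ray.init(_system_config={"max_direct_call_object_size": 0})

    import modin.pandas as pd
    from modin.api import unwrap_partitions, create_df_from_partitions

    df = pd.DataFrame({"a": range(1000), "b": range(1000)})

    # Unwrap row partitions together with the IP of the node holding each one.
    ip_and_parts = unwrap_partitions(df, axis=0, bind_ip=True)
    for ip, part in ip_and_parts:
        print(ip, part)

    # Rebuild a Modin DataFrame from the (ip, partition) pairs.
    new_df = create_df_from_partitions(ip_and_parts, axis=0)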
diff --git a/modin/api/partition_api.py b/modin/api/partition_api.py
index 2dcaec3e0b0..9236f0152ef 100644
--- a/modin/api/partition_api.py
+++ b/modin/api/partition_api.py
@@ -48,10 +48,8 @@ def unwrap_partitions(api_layer_object, axis=None, bind_ip=False):
             f"Only API Layer objects may be passed in here, got {type(api_layer_object)} instead."
         )
 
-    if bind_ip and EnablePartitionIPs.get() is False:
-        ValueError(
-            "Passed `bind_ip=True` but `MODIN_ENABLE_PARTITIONS_API` env var was not exported."
-        )
+    if bind_ip and not EnablePartitionIPs.get():
+        raise ValueError("Passed `bind_ip=True` but partition IPs API was not enabled.")
 
     if axis is None:
@@ -96,9 +94,9 @@ def create_df_from_partitions(partitions, axis):
     Parameters
     ----------
     partitions : list
-        List of Ray.ObjectRef/Dask.Future referencing to partitions in depend of the engine used.
-        Or list containing tuples of Ray.ObjectRef/Dask.Future referencing to ip addresses of partitions
-        and partitions itself in depend of the engine used.
+        List of Ray.ObjectRef/Dask.Future referencing partitions depending on the engine used.
+        Or list of tuples of Ray.ObjectRef/Dask.Future referencing ip addresses of partitions
+        and partitions themselves depending on the engine used.
     axis : None, 0 or 1
         The `axis` parameter is used to identify what are the partitions passed.
         You have to set:
@@ -122,9 +120,9 @@
     # When collecting partitions to NumPy array they will be kept row-wise
     if axis is None:
         if isinstance(partitions[0][0], tuple):
-            if EnablePartitionIPs.get() is False:
+            if not EnablePartitionIPs.get():
                 raise ValueError(
-                    "Passed `partitions` with IPs but `MODIN_ENABLE_PARTITIONS_API` env var was not exported."
+                    "Passed `partitions` with IPs but partition IPs API was not enabled."
                 )
             parts = np.array(
                 [
@@ -142,9 +140,9 @@
     # When collecting partitions to NumPy array they will be kept row-wise
     elif axis == 0:
         if isinstance(partitions[0], tuple):
-            if EnablePartitionIPs.get() is False:
+            if not EnablePartitionIPs.get():
                 raise ValueError(
-                    "Passed `partitions` with IPs but `MODIN_ENABLE_PARTITIONS_API` env var was not exported."
+                    "Passed `partitions` with IPs but partition IPs API was not enabled."
                 )
             parts = np.array(
                 [[partition_class(partition, ip=ip)] for ip, partition in partitions]
@@ -154,9 +152,9 @@
     # When collecting partitions to NumPy array they will be kept column-wise
     elif axis == 1:
         if isinstance(partitions[0], tuple):
-            if EnablePartitionIPs.get() is False:
+            if not EnablePartitionIPs.get():
                 raise ValueError(
-                    "Passed `partitions` with IPs but `MODIN_ENABLE_PARTITIONS_API` env var was not exported."
+                    "Passed `partitions` with IPs but partition IPs API was not enabled."
                 )
             parts = np.array(
                 [[partition_class(partition, ip=ip) for ip, partition in partitions]]
diff --git a/modin/config/envvars.py b/modin/config/envvars.py
index 6c01fdb17d9..af637acf36f 100644
--- a/modin/config/envvars.py
+++ b/modin/config/envvars.py
@@ -161,10 +161,7 @@ class EnablePartitionIPs(EnvironmentVariable, type=bool):
     """
 
     varname = "MODIN_ENABLE_PARTITIONS_IPS"
-
-    @classmethod
-    def _get_default(cls):
-        return False
+    default = False
 
 
 class RayPartitionThreshold(EnvironmentVariable, type=int):
diff --git a/modin/engines/base/frame/axis_partition.py b/modin/engines/base/frame/axis_partition.py
index b493d071cb7..42abf2c0da1 100644
--- a/modin/engines/base/frame/axis_partition.py
+++ b/modin/engines/base/frame/axis_partition.py
@@ -141,7 +141,7 @@ def unwrap(self, squeeze=False, bind_ip=False):
                     return self.list_of_ips[0], self.list_of_blocks[0]
                 else:
                     raise ValueError(
-                        "Passed `bind_ip=True` but `MODIN_ENABLE_PARTITIONS_API` env var was not exported."
+                        "Passed `bind_ip=True` but partition IPs API was not enabled."
                     )
             else:
                 return self.list_of_blocks[0]
@@ -151,7 +151,7 @@ def unwrap(self, squeeze=False, bind_ip=False):
                     return list(zip(self.list_of_ips, self.list_of_blocks))
                 else:
                     raise ValueError(
-                        "Passed `bind_ip=True` but `MODIN_ENABLE_PARTITIONS_API` env var was not exported."
+                        "Passed `bind_ip=True` but partition IPs API was not enabled."
                     )
             else:
                 return self.list_of_blocks
diff --git a/modin/engines/dask/pandas_on_dask/frame/axis_partition.py b/modin/engines/dask/pandas_on_dask/frame/axis_partition.py
index 4f8c29c12d4..fae7651014b 100644
--- a/modin/engines/dask/pandas_on_dask/frame/axis_partition.py
+++ b/modin/engines/dask/pandas_on_dask/frame/axis_partition.py
@@ -32,7 +32,7 @@ def __init__(self, list_of_blocks, bind_ip=False):
                 self.list_of_ips = [obj.ip for obj in list_of_blocks]
             else:
                 raise ValueError(
-                    "Passed `bind_ip=True` but `MODIN_ENABLE_PARTITIONS_API` env var was not exported."
+                    "Passed `bind_ip=True` but partition IPs API was not enabled."
                 )
 
     partition_type = PandasOnDaskFramePartition
diff --git a/modin/engines/dask/pandas_on_dask/frame/partition.py b/modin/engines/dask/pandas_on_dask/frame/partition.py
index 8a9cf4768b5..c1663c78666 100644
--- a/modin/engines/dask/pandas_on_dask/frame/partition.py
+++ b/modin/engines/dask/pandas_on_dask/frame/partition.py
@@ -27,7 +27,10 @@ def apply_list_of_funcs(funcs, df):
         if isinstance(func, bytes):
             func = pkl.loads(func)
         df = func(df, **kwargs)
-    return df, get_ip() if EnablePartitionIPs.get() else df
+    if EnablePartitionIPs.get():
+        return df, get_ip()
+    else:
+        return df
 
 
 class PandasOnDaskFramePartition(BaseFramePartition):
diff --git a/modin/engines/ray/pandas_on_ray/frame/axis_partition.py b/modin/engines/ray/pandas_on_ray/frame/axis_partition.py
index fcabcfe1d40..9f3ff613d9b 100644
--- a/modin/engines/ray/pandas_on_ray/frame/axis_partition.py
+++ b/modin/engines/ray/pandas_on_ray/frame/axis_partition.py
@@ -18,10 +18,7 @@
 from .partition import PandasOnRayFramePartition
 
 import ray
-
-
-if EnablePartitionIPs.get():
-    from ray.services import get_node_ip_address
+from ray.services import get_node_ip_address
 
 
 class PandasOnRayFrameAxisPartition(PandasFrameAxisPartition):
@@ -35,7 +32,7 @@ def __init__(self, list_of_blocks, bind_ip=False):
                 self.list_of_ips = [obj.ip for obj in list_of_blocks]
             else:
                 raise ValueError(
-                    "Passed `bind_ip=True` but `MODIN_ENABLE_PARTITIONS_API` env var was not exported."
+                    "Passed `bind_ip=True` but partition IPs API was not enabled."
                 )
 
     partition_type = PandasOnRayFramePartition
diff --git a/modin/engines/ray/pandas_on_ray/frame/partition.py b/modin/engines/ray/pandas_on_ray/frame/partition.py
index a06294b929e..d69c49266a0 100644
--- a/modin/engines/ray/pandas_on_ray/frame/partition.py
+++ b/modin/engines/ray/pandas_on_ray/frame/partition.py
@@ -20,10 +20,7 @@
 import ray
 from ray.worker import RayTaskError
-
-
-if EnablePartitionIPs.get():
-    from ray.services import get_node_ip_address
+from ray.services import get_node_ip_address
 
 
 class PandasOnRayFramePartition(BaseFramePartition):
@@ -66,11 +63,24 @@ def apply(self, func, **kwargs):
         """
         oid = self.oid
         call_queue = self.call_queue + [(func, kwargs)]
+        num_returns = 4 if EnablePartitionIPs.get() else 3
         if EnablePartitionIPs.get():
-            result, length, width, ip = deploy_ray_func.remote(call_queue, oid)
+            result, length, width, ip = deploy_ray_func._remote(
+                args=(
+                    call_queue,
+                    oid,
+                ),
+                num_returns=num_returns,
+            )
             return PandasOnRayFramePartition(result, length, width, ip)
         else:
-            result, length, width = deploy_ray_func.remote(call_queue, oid)
+            result, length, width = deploy_ray_func._remote(
+                args=(
+                    call_queue,
+                    oid,
+                ),
+                num_returns=num_returns,
+            )
             return PandasOnRayFramePartition(result, length, width)
 
     def add_to_apply_calls(self, func, **kwargs):
@@ -83,19 +93,32 @@ def drain_call_queue(self):
             return
         oid = self.oid
         call_queue = self.call_queue
+        num_returns = 4 if EnablePartitionIPs.get() else 3
         if EnablePartitionIPs.get():
             (
                 self.oid,
                 self._length_cache,
                 self._width_cache,
                 self.ip,
-            ) = deploy_ray_func.remote(call_queue, oid)
+            ) = deploy_ray_func._remote(
+                args=(
+                    call_queue,
+                    oid,
+                ),
+                num_returns=num_returns,
+            )
         else:
             (
                 self.oid,
                 self._length_cache,
                 self._width_cache,
-            ) = deploy_ray_func.remote(call_queue, oid)
+            ) = deploy_ray_func._remote(
+                args=(
+                    call_queue,
+                    oid,
+                ),
+                num_returns=num_returns,
+            )
         self.call_queue = []
 
     def __copy__(self):
@@ -222,7 +245,7 @@ def get_index_and_columns(df):
     return len(df.index), len(df.columns)
 
 
-@ray.remote(num_returns=4 if EnablePartitionIPs.get() else 3)
+@ray.remote
 def deploy_ray_func(call_queue, partition):  # pragma: no cover
     def deserialize(obj):
         if isinstance(obj, ray.ObjectID):
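Note: a standalone sketch of the per-call ``num_returns`` pattern introduced above, where the number of values a
Ray task returns is chosen at call time instead of being baked into the ``@ray.remote`` decorator. The task and
variable names here are illustrative, not part of Modin; ``_remote(args=..., num_returns=...)`` is the form this
patch uses, and ``.options(num_returns=...).remote(...)`` is the equivalent public spelling in newer Ray releases.

.. code-block:: python

    import ray
    from ray.services import get_node_ip_address

    ray.init(ignore_reinit_error=True)


    @ray.remote
    def describe(seq, with_ip):
        # Return one extra element (the worker node's IP) only when asked to,
        # mirroring what deploy_ray_func does when partition IPs are enabled.
        if with_ip:
            return seq, len(seq), get_node_ip_address()
        return seq, len(seq)


    with_ip = True
    num_returns = 3 if with_ip else 2

    # The number of object refs is chosen per call rather than at decoration time.
    refs = describe._remote(args=([1, 2, 3], with_ip), num_returns=num_returns)
    print(ray.get(list(refs)))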