REFACTOR-modin-project#2642: Change the way of ip getting
Signed-off-by: Igoshev, Yaroslav <[email protected]>
YarShev committed Feb 18, 2021
1 parent dd20058 commit 9bcfe55
Showing 4 changed files with 42 additions and 40 deletions.
modin/engines/dask/pandas_on_dask/frame/axis_partition.py (7 additions, 9 deletions)
@@ -16,7 +16,6 @@

 from distributed.client import get_client
 from distributed import Future
-from distributed.utils import get_ip
 import pandas


@@ -56,7 +55,7 @@ def deploy_axis_func(
         # get futures for each.
         return [
             client.submit(lambda l: l[i], axis_result, pure=False)
-            for i in range(result_num_splits * 4)
+            for i in range(result_num_splits * 3)
         ]

     @classmethod
@@ -80,13 +79,13 @@ def deploy_func_between_two_axis_partitions(
         # get futures for each.
         return [
             client.submit(lambda l: l[i], axis_result, pure=False)
-            for i in range(num_splits * 4)
+            for i in range(num_splits * 3)
         ]

     def _wrap_partitions(self, partitions):
         return [
-            self.partition_type(future, length, width, ip)
-            for (future, length, width, ip) in zip(*[iter(partitions)] * 4)
+            self.partition_type(future, length, width)
+            for (future, length, width) in zip(*[iter(partitions)] * 3)
         ]


@@ -122,10 +121,9 @@ def deploy_dask_func(func, *args):
     The result of the function `func`.
     """
     result = func(*args)
-    ip = get_ip()
     if isinstance(result, pandas.DataFrame):
-        return result, len(result), len(result.columns), ip
+        return result, len(result), len(result.columns)
     elif all(isinstance(r, pandas.DataFrame) for r in result):
-        return [i for r in result for i in [r, len(r), len(r.columns), ip]]
+        return [i for r in result for i in [r, len(r), len(r.columns)]]
     else:
-        return [i for r in result for i in [r, None, None, ip]]
+        return [i for r in result for i in [r, None, None]]
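
The flat list returned by deploy_dask_func now carries three items per split (the DataFrame, its length, its width) instead of four, and `_wrap_partitions` regroups that flat sequence with the `zip(*[iter(partitions)] * 3)` idiom. A self-contained sketch of the idiom, with placeholder strings standing in for Dask futures:

    # Flat layout produced per split: [data, length, width, data, length, width, ...]
    flat = ["df0", 10, 4, "df1", 12, 4, "df2", 8, 4]

    # `[iter(flat)] * 3` is one iterator referenced three times, so each zip
    # step consumes three consecutive items and yields one (data, length, width) triple.
    triples = list(zip(*[iter(flat)] * 3))
    assert triples == [("df0", 10, 4), ("df1", 12, 4), ("df2", 8, 4)]

This is also why the futures above are built over `range(result_num_splits * 3)`: the number of futures must match the flattened length exactly.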
modin/engines/dask/pandas_on_dask/frame/partition.py (13 additions, 11 deletions)
@@ -16,9 +16,8 @@
 from modin.data_management.utils import length_fn_pandas, width_fn_pandas
 from modin.engines.base.frame.partition import BaseFramePartition

-from distributed.client import get_client
+from distributed.client import get_client, wait
 from distributed import Future
-from distributed.utils import get_ip
 import cloudpickle as pkl


@@ -27,7 +26,7 @@ def apply_list_of_funcs(funcs, df):
         if isinstance(func, bytes):
             func = pkl.loads(func)
         df = func(df, **kwargs)
-    return df, get_ip()
+    return df


 class PandasOnDaskFramePartition(BaseFramePartition):
@@ -84,8 +83,7 @@ def apply(self, func, **kwargs):
         future = get_client().submit(
             apply_list_of_funcs, call_queue, self.future, pure=False
         )
-        futures = [get_client().submit(lambda l: l[i], future) for i in range(2)]
-        return PandasOnDaskFramePartition(futures[0], ip=futures[1])
+        return PandasOnDaskFramePartition(future)

     def add_to_apply_calls(self, func, **kwargs):
         return PandasOnDaskFramePartition(
@@ -95,9 +93,7 @@
     def drain_call_queue(self):
         if len(self.call_queue) == 0:
             return
-        new_partition = self.apply(lambda x: x)
-        self.future = new_partition.future
-        self._ip_cache = new_partition._ip_cache
+        self.future = self.apply(lambda x: x).future
         self.call_queue = []

     def mask(self, row_indices, col_indices):
@@ -210,9 +206,15 @@ def width(self):

     def ip(self):
         if self._ip_cache is None:
-            self._ip_cache = self.apply(lambda df: df)._ip_cache
-        if isinstance(self._ip_cache, Future):
-            self._ip_cache = self._ip_cache.result()
+            if len(self.call_queue):
+                self.drain_call_queue()
+            # Since `wait` returns a named tuple of sets of completed and not
+            # completed futures, we need to retrieve the single completed
+            # future from the `done` set via `iter` and `next`.
+            future = next(iter(wait([self.future]).done))
+            ip = get_client().who_has(future)[future.key][0]
+            # Since dask reports worker addresses like `tcp://<ip>:<port>`,
+            # we need to retrieve only the `<ip>` part.
+            self._ip_cache = ip[6:].split(":")[0]
         return self._ip_cache

     @classmethod
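
For illustration, a minimal standalone sketch of the new lookup in `ip()`, assuming a local `distributed.Client`; it mirrors the calls the commit uses but is not itself part of the change:

    from distributed import Client, wait

    client = Client()  # local cluster, for illustration only

    future = client.submit(sum, [1, 2, 3])

    # `wait` returns a named tuple whose `done` field is a set of finished futures.
    done = next(iter(wait([future]).done))

    # `who_has` maps a future's key to the addresses of the workers holding its
    # result, e.g. {"sum-abc123": ("tcp://127.0.0.1:36675",)} (key illustrative).
    address = client.who_has(done)[done.key][0]

    # Strip the "tcp://" scheme (6 characters) and the ":<port>" suffix.
    print(address[6:].split(":")[0])  # e.g. "127.0.0.1"

Note that the `[6:]` slice assumes a `tcp://` scheme; parsing the address with `urllib.parse.urlparse` would be a scheme-agnostic alternative.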
modin/engines/ray/pandas_on_ray/frame/axis_partition.py (7 additions, 9 deletions)
@@ -17,7 +17,6 @@
 from .partition import PandasOnRayFramePartition

 import ray
-from ray.services import get_node_ip_address


 class PandasOnRayFrameAxisPartition(PandasFrameAxisPartition):
@@ -47,7 +46,7 @@ def deploy_axis_func(
                 maintain_partitioning,
             )
             + tuple(partitions),
-            num_returns=num_splits * 4 if lengths is None else len(lengths) * 4,
+            num_returns=num_splits * 3 if lengths is None else len(lengths) * 3,
         )

     @classmethod
@@ -65,13 +64,13 @@ def deploy_func_between_two_axis_partitions(
                 kwargs,
             )
             + tuple(partitions),
-            num_returns=num_splits * 4,
+            num_returns=num_splits * 3,
         )

     def _wrap_partitions(self, partitions):
         return [
-            self.partition_type(object_id, length, width, ip)
-            for (object_id, length, width, ip) in zip(*[iter(partitions)] * 4)
+            self.partition_type(object_id, length, width)
+            for (object_id, length, width) in zip(*[iter(partitions)] * 3)
         ]


@@ -112,10 +111,9 @@ def deploy_ray_func(func, *args):  # pragma: no cover
     Ray functions are not detected by codecov (thus pragma: no cover)
     """
     result = func(*args)
-    ip = get_node_ip_address()
     if isinstance(result, pandas.DataFrame):
-        return result, len(result), len(result.columns), ip
+        return result, len(result), len(result.columns)
     elif all(isinstance(r, pandas.DataFrame) for r in result):
-        return [i for r in result for i in [r, len(r), len(r.columns), ip]]
+        return [i for r in result for i in [r, len(r), len(r.columns)]]
     else:
-        return [i for r in result for i in [r, None, None, ip]]
+        return [i for r in result for i in [r, None, None]]
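
The `num_returns` multiplier must match the flattened return value exactly: a Ray task declared with `num_returns=n` hands back `n` separate object refs, one per element of the returned sequence. A short sketch with a hypothetical task (not from the commit):

    import ray

    ray.init()

    # Two splits, three values per split, flattened the same way as in
    # deploy_ray_func: [data, length, width, data, length, width] -> 6 refs.
    @ray.remote(num_returns=6)
    def two_splits():
        splits = [("df0", 10, 4), ("df1", 12, 4)]
        return [item for split in splits for item in split]

    refs = two_splits.remote()           # a list of 6 ObjectRefs
    flat = ray.get(refs)                 # the 6 underlying values
    print(list(zip(*[iter(flat)] * 3)))  # regrouped as in _wrap_partitions

Dropping the per-split IP from the payload is what shrinks the multiplier from 4 to 3 on both the Ray and Dask engines.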
modin/engines/ray/pandas_on_ray/frame/partition.py (15 additions, 11 deletions)
@@ -62,8 +62,8 @@ def apply(self, func, **kwargs):
         """
         oid = self.oid
         call_queue = self.call_queue + [(func, kwargs)]
-        result, length, width, ip = deploy_ray_func.remote(call_queue, oid)
-        return PandasOnRayFramePartition(result, length, width, ip)
+        result, length, width = deploy_ray_func.remote(call_queue, oid)
+        return PandasOnRayFramePartition(result, length, width)

     def add_to_apply_calls(self, func, **kwargs):
         return PandasOnRayFramePartition(
@@ -79,7 +79,6 @@ def drain_call_queue(self):
             self.oid,
             self._length_cache,
             self._width_cache,
-            self._ip_cache,
         ) = deploy_ray_func.remote(call_queue, oid)
         self.call_queue = []

@@ -197,13 +196,19 @@ def ip(self):
         if self._ip_cache is None:
-            self._ip_cache = self.apply(lambda df: df)._ip_cache
-        if isinstance(self._ip_cache, ray.ObjectID):
-            try:
-                self._ip_cache = ray.get(self._ip_cache)
-            except RayTaskError as e:
-                handle_ray_task_error(e)
+            if len(self.call_queue):
+                self.drain_call_queue()
+            # Since `ray.wait` returns a tuple of lists of completed and not
+            # completed object refs, we need to retrieve the completed object
+            # ref via `[0][0]` indexing.
+            oid = ray.wait([self.oid])[0][0]
+            locations = ray.objects(oid.hex()).get("Locations", None)
+            # `oid` is stored in the plasma store
+            if locations:
+                for node in ray.nodes():
+                    if locations[0] == node["NodeID"]:
+                        self._ip_cache = node["NodeManagerAddress"]
+                        break
+            # `oid` is stored in the in-process store
+            else:
+                self._ip_cache = get_node_ip_address()
         return self._ip_cache

     @classmethod
@@ -224,7 +229,7 @@ def get_index_and_columns(df):
     return len(df.index), len(df.columns)


-@ray.remote(num_returns=4)
+@ray.remote(num_returns=3)
 def deploy_ray_func(call_queue, partition):  # pragma: no cover
     def deserialize(obj):
         if isinstance(obj, ray.ObjectRef):
@@ -253,5 +258,4 @@ def deserialize(obj):
         result,
         len(result) if hasattr(result, "__len__") else 0,
         len(result.columns) if hasattr(result, "columns") else 0,
-        get_node_ip_address(),
     )
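
For illustration, a standalone sketch of the new `ip()` lookup, using the same Ray state APIs the commit relies on (`ray.wait`, `ray.objects`, `ray.nodes`; `ray.objects` belongs to the Ray versions Modin targeted in early 2021 and has since been removed from newer Ray releases):

    import pandas
    import ray
    from ray.services import get_node_ip_address

    ray.init()

    @ray.remote
    def make_df():
        return pandas.DataFrame({"a": [1, 2, 3]})

    ref = make_df.remote()
    # Block until the object exists somewhere in the cluster.
    oid = ray.wait([ref])[0][0]

    # Map the object's location (a node ID) to that node's IP address.
    locations = ray.objects(oid.hex()).get("Locations", None)
    if locations:
        ip = next(
            node["NodeManagerAddress"]
            for node in ray.nodes()
            if node["NodeID"] == locations[0]
        )
    else:
        # Objects kept in the owner's in-process store never left this node.
        ip = get_node_ip_address()
    print(ip)

The plasma branch covers objects promoted to the shared object store; results that stay in the in-process store live with the calling process, which is why falling back to the local `get_node_ip_address()` is sound there.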
