REFACTOR-#2642: Refactor partition API #2643

Merged · 15 commits · Mar 4, 2021
68 changes: 68 additions & 0 deletions docs/developer/pandas/partition_api.rst
@@ -0,0 +1,68 @@
Partition API in Modin
======================

When you are working with a Modin DataFrame, you can unwrap its remote partitions
to get the raw futures objects compatible with the execution engine (e.g. ``ray.ObjectRef`` for Ray).
In addition to unwrapping the remote partitions, Modin also provides an API to construct a ``modin.pandas.DataFrame``
from raw futures objects.

Partition IPs
-------------
For finer-grained placement control, Modin also provides an API to get the IP addresses of the nodes that hold each partition.
You can pass partitions that reside on specific nodes to your function, which helps minimize data movement between nodes.
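As an illustration of what can be done with the ``(ip, partition)`` pairs returned when ``get_ip=True``, here is a minimal sketch that groups partitions by the node holding them; the IP strings and partition names are placeholders standing in for real Ray.ObjectRef/Dask.Future objects, not actual Modin output:

```python
from collections import defaultdict

# Placeholder (ip, partition) pairs, shaped like the output of
# unwrap_partitions(df, axis=0, get_ip=True); real entries would be
# Ray.ObjectRef/Dask.Future objects rather than plain strings.
partitions = [
    ("10.0.0.1", "partition_0"),
    ("10.0.0.2", "partition_1"),
    ("10.0.0.1", "partition_2"),
]

# Group partitions by the node that holds them, so work can be sent
# to the data instead of moving the data between nodes.
by_node = defaultdict(list)
for ip, partition in partitions:
    by_node[ip].append(partition)

print(dict(by_node))
# {'10.0.0.1': ['partition_0', 'partition_2'], '10.0.0.2': ['partition_1']}
```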

unwrap_partitions
-----------------

.. automodule:: modin.distributed.dataframe.pandas
:noindex:
:members: unwrap_partitions

from_partitions
---------------

.. automodule:: modin.distributed.dataframe.pandas
:noindex:
:members: from_partitions

Example
-------

.. code-block:: python

import modin.pandas as pd
from modin.distributed.dataframe.pandas import unwrap_partitions, from_partitions
import numpy as np
data = np.random.randint(0, 100, size=(2 ** 10, 2 ** 8))
df = pd.DataFrame(data)
partitions = unwrap_partitions(df, axis=0, get_ip=True)
print(partitions)
new_df = from_partitions(partitions, axis=0)
print(new_df)

Ray engine
----------
However, it is worth noting that for Modin on the ``Ray`` engine with the ``pandas`` backend, the IPs of the remote partitions may not match
their actual locations if the partitions are smaller than 100 kB. By default, Ray saves such objects (<= 100 kB) in the in-process store
of the calling process (please refer to the `Ray documentation`_ for more information). We can't get IPs for such objects while maintaining good performance.
Keep this in mind when unwrapping remote partitions with their IPs. Several options for handling this case are provided in the
``How to handle Ray objects that are smaller than 100 kB`` section.

Dask engine
-----------
The issue mentioned above does not occur for Modin on the ``Dask`` engine with the ``pandas`` backend, because ``Dask`` saves objects
in the worker process that executes the function (please refer to the `Dask documentation`_ for more information).

How to handle Ray objects that are smaller than 100 kB
------------------------------------------------------

* If you are sure that each of the remote partitions being unwrapped is larger than 100 kB, you can simply import Modin or call ``ray.init()`` manually.

* If you don't know the partition sizes, you can pass the option ``_system_config={"max_direct_call_object_size": <nbytes>}``, where ``nbytes`` is the threshold below which objects will be stored in the in-process store, to ``ray.init()``.

* You can also start Ray as follows: ``ray start --head --system-config='{"max_direct_call_object_size":<nbytes>}'``.

Note that when specifying the threshold, the performance of some Modin operations may change.
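As a sketch, initializing Ray with a lowered threshold might look like the following; the 1024-byte value is an arbitrary illustration, and ``_system_config`` is an internal Ray setting whose keys may change between Ray versions:

```python
import ray

# Store objects larger than 1 kB in the shared object store, so that
# their node IPs can be resolved; 1024 is an arbitrary example value,
# and this key is an internal Ray setting.
ray.init(_system_config={"max_direct_call_object_size": 1024})

import modin.pandas as pd  # import Modin after Ray is initialized
```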

.. _`Ray documentation`: https://docs.ray.io/en/master/index.html#
.. _`Dask documentation`: https://distributed.dask.org/en/latest/index.html
1 change: 1 addition & 0 deletions docs/index.rst
@@ -156,6 +156,7 @@ nature, you get a fast DataFrame at 1MB and 1TB+.

contributing
developer/architecture
developer/pandas/partition_api

.. toctree::
:caption: Engines, Backends, and APIs
45 changes: 26 additions & 19 deletions modin/distributed/dataframe/pandas/partitions.py
@@ -13,34 +13,35 @@

import numpy as np

from modin.pandas.dataframe import DataFrame
from modin.backends.pandas.query_compiler import PandasQueryCompiler
from modin.pandas.dataframe import DataFrame


def unwrap_partitions(api_layer_object, axis=None, bind_ip=False):
def unwrap_partitions(api_layer_object, axis=None, get_ip=False):
"""
Unwrap partitions of the `api_layer_object`.
Unwrap partitions of the ``api_layer_object``.

Parameters
----------
api_layer_object : DataFrame or Series
The API layer object.
axis : None, 0 or 1. Default is None
The axis to unwrap partitions for (0 - row partitions, 1 - column partitions).
If axis is None, all the partitions of the API layer object are unwrapped.
bind_ip : boolean. Default is False
Whether to bind node ip address to each partition or not.
If ``axis is None``, the partitions are unwrapped as they are currently stored.
get_ip : boolean. Default is False
Whether to get the node ip address of each partition or not.

Returns
-------
list
A list of Ray.ObjectRef/Dask.Future to partitions of the `api_layer_object`
A list of Ray.ObjectRef/Dask.Future to partitions of the ``api_layer_object``
if Ray/Dask is used as an engine.

Notes
-----
In case bind_ip=True, a list containing tuples of Ray.ObjectRef/Dask.Future to node ip addresses
and partitions of the `api_layer_object`, respectively, is returned if Ray/Dask is used as an engine.
If ``get_ip=True``, a list of tuples of Ray.ObjectRef/Dask.Future to node ip addresses and
partitions of the ``api_layer_object``, respectively, is returned if Ray/Dask is used as an engine
(i.e. ``[(Ray.ObjectRef/Dask.Future, Ray.ObjectRef/Dask.Future), ...]``).
"""
if not hasattr(api_layer_object, "_query_compiler"):
raise ValueError(
@@ -50,9 +51,12 @@ def unwrap_partitions(api_layer_object, axis=None, bind_ip=False):
if axis is None:

def _unwrap_partitions(oid):
if bind_ip:
if get_ip:
return [
[(partition.ip, getattr(partition, oid)) for partition in row]
[
(partition._ip_cache, getattr(partition, oid))
for partition in row
]
for row in api_layer_object._query_compiler._modin_frame._partitions
]
else:
@@ -78,7 +82,9 @@ def _unwrap_partitions(oid):
)
)
return [
part.coalesce(bind_ip=bind_ip).unwrap(squeeze=True, bind_ip=bind_ip)
part.force_materialization(get_ip=get_ip).unwrap(
squeeze=True, get_ip=get_ip
)
for part in partitions
]

@@ -90,15 +96,16 @@ def from_partitions(partitions, axis):
Parameters
----------
partitions : list
List of Ray.ObjectRef/Dask.Future referencing to partitions in depend of the engine used.
Or list containing tuples of Ray.ObjectRef/Dask.Future referencing to ip addresses of partitions
and partitions itself in depend of the engine used.
A list of Ray.ObjectRef/Dask.Future to partitions depending on the engine used.
Or a list of tuples of Ray.ObjectRef/Dask.Future to node ip addresses and partitions
depending on the engine used (i.e. ``[(Ray.ObjectRef/Dask.Future, Ray.ObjectRef/Dask.Future), ...]``).
axis : None, 0 or 1
The `axis` parameter is used to identify what are the partitions passed.
The ``axis`` parameter is used to identify what kind of partitions are passed.
You have to set:
- `axis` to 0 if you want to create DataFrame from row partitions.
- `axis` to 1 if you want to create DataFrame from column partitions.
- `axis` to None if you want to create DataFrame from 2D list of partitions.

* ``axis=0`` if you want to create DataFrame from row partitions
* ``axis=1`` if you want to create DataFrame from column partitions
* ``axis=None`` if you want to create DataFrame from 2D list of partitions

Returns
-------
34 changes: 18 additions & 16 deletions modin/engines/base/frame/axis_partition.py
@@ -95,52 +95,54 @@ def shuffle(self, func, lengths, **kwargs):
def _wrap_partitions(self, partitions):
return [self.partition_type(obj) for obj in partitions]

def coalesce(self, bind_ip=False):
def force_materialization(self, get_ip=False):
"""
Coalesce the axis partitions into a single partition.
Materialize axis partitions into a single partition.

Parameters
----------
bind_ip : boolean, default False
Whether to bind node ip address to a single partition or not.
get_ip : boolean, default False
Whether to get the node ip address of the single partition or not.

Returns
-------
BaseFrameAxisPartition
An axis partition containing only a single coalesced partition.
An axis partition containing only a single materialized partition.
"""
coalesced = self.apply(lambda x: x, num_splits=1, maintain_partitioning=False)
return type(self)(coalesced, bind_ip=bind_ip)
materialized = self.apply(
lambda x: x, num_splits=1, maintain_partitioning=False
)
return type(self)(materialized, get_ip=get_ip)

def unwrap(self, squeeze=False, bind_ip=False):
def unwrap(self, squeeze=False, get_ip=False):
"""
Unwrap partitions from axis partition.

Parameters
----------
squeeze : boolean, default False
The flag used to unwrap only one partition.
bind_ip : boolean, default False
Whether to bind node ip address to each partition or not.
get_ip : boolean, default False
Whether to get the node ip address of each partition or not.

Returns
-------
list
List of partitions from axis partition.
A list of partitions from axis partition.

Notes
-----
In case bind_ip=True, list containing tuples of Ray.ObjectRef/Dask.Future
to node ip addresses and unwrapped partitions, respectively, is returned
if Ray/Dask is used as an engine.
If ``get_ip=True``, a list of tuples of Ray.ObjectRef/Dask.Future to node ip addresses and
unwrapped partitions, respectively, is returned if Ray/Dask is used as an engine
(i.e. ``[(Ray.ObjectRef/Dask.Future, Ray.ObjectRef/Dask.Future), ...]``).
"""
if squeeze and len(self.list_of_blocks) == 1:
if bind_ip:
if get_ip:
return self.list_of_ips[0], self.list_of_blocks[0]
else:
return self.list_of_blocks[0]
else:
if bind_ip:
if get_ip:
return list(zip(self.list_of_ips, self.list_of_blocks))
else:
return self.list_of_blocks
6 changes: 3 additions & 3 deletions modin/engines/dask/pandas_on_dask/frame/axis_partition.py
@@ -21,13 +21,13 @@


class PandasOnDaskFrameAxisPartition(PandasFrameAxisPartition):
def __init__(self, list_of_blocks, bind_ip=False):
def __init__(self, list_of_blocks, get_ip=False):
# Unwrap from BaseFramePartition object for ease of use
for obj in list_of_blocks:
obj.drain_call_queue()
self.list_of_blocks = [obj.future for obj in list_of_blocks]
if bind_ip:
self.list_of_ips = [obj.ip for obj in list_of_blocks]
if get_ip:
self.list_of_ips = [obj._ip_cache for obj in list_of_blocks]

partition_type = PandasOnDaskFramePartition
instance_type = Future
24 changes: 18 additions & 6 deletions modin/engines/dask/pandas_on_dask/frame/partition.py
@@ -13,10 +13,11 @@

import pandas

from modin.engines.base.frame.partition import BaseFramePartition
from modin.data_management.utils import length_fn_pandas, width_fn_pandas
from modin.engines.base.frame.partition import BaseFramePartition

from distributed.client import get_client
from distributed import Future
from distributed.utils import get_ip
import cloudpickle as pkl

@@ -48,7 +49,7 @@ def __init__(self, future, length=None, width=None, ip=None, call_queue=None):
self.call_queue = call_queue
self._length_cache = length
self._width_cache = width
self.ip = ip
self._ip_cache = ip

def get(self):
"""Flushes the call_queue and returns the data.
@@ -96,7 +97,7 @@ def drain_call_queue(self):
return
new_partition = self.apply(lambda x: x)
self.future = new_partition.future
self.ip = new_partition.ip
self._ip_cache = new_partition._ip_cache
self.call_queue = []

def mask(self, row_indices, col_indices):
@@ -111,7 +112,11 @@ def mask(self, row_indices, col_indices):

def __copy__(self):
return PandasOnDaskFramePartition(
self.future, self._length_cache, self._width_cache
self.future,
length=self._length_cache,
width=self._width_cache,
ip=self._ip_cache,
call_queue=self.call_queue,
)

def to_pandas(self):
@@ -192,17 +197,24 @@ def width_extraction_fn(cls):
def length(self):
if self._length_cache is None:
self._length_cache = self.apply(lambda df: len(df)).future
if isinstance(self._length_cache, type(self.future)):
if isinstance(self._length_cache, Future):
self._length_cache = self._length_cache.result()
return self._length_cache

def width(self):
if self._width_cache is None:
self._width_cache = self.apply(lambda df: len(df.columns)).future
if isinstance(self._width_cache, type(self.future)):
if isinstance(self._width_cache, Future):
self._width_cache = self._width_cache.result()
return self._width_cache

def ip(self):
if self._ip_cache is None:
self._ip_cache = self.apply(lambda df: df)._ip_cache
if isinstance(self._ip_cache, Future):
self._ip_cache = self._ip_cache.result()
return self._ip_cache

@classmethod
def empty(cls):
return cls(pandas.DataFrame(), 0, 0)
6 changes: 3 additions & 3 deletions modin/engines/ray/pandas_on_ray/frame/axis_partition.py
@@ -21,13 +21,13 @@


class PandasOnRayFrameAxisPartition(PandasFrameAxisPartition):
def __init__(self, list_of_blocks, bind_ip=False):
def __init__(self, list_of_blocks, get_ip=False):
# Unwrap from BaseFramePartition object for ease of use
for obj in list_of_blocks:
obj.drain_call_queue()
self.list_of_blocks = [obj.oid for obj in list_of_blocks]
if bind_ip:
self.list_of_ips = [obj.ip for obj in list_of_blocks]
if get_ip:
self.list_of_ips = [obj._ip_cache for obj in list_of_blocks]

partition_type = PandasOnRayFramePartition
instance_type = ray.ObjectRef