Skip to content

Commit

Permalink
FEAT-modin-project#2606: Support creating DataFrame from remote parti…
Browse files Browse the repository at this point in the history
…tions

Signed-off-by: Igoshev, Yaroslav <[email protected]>
  • Loading branch information
YarShev committed Jan 20, 2021
1 parent 2f880c1 commit 0ec20aa
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 6 deletions.
4 changes: 2 additions & 2 deletions modin/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

from .partition_api import unwrap_partitions
from .partition_api import unwrap_partitions, create_df_from_partitions

__all__ = ["unwrap_partitions"]
__all__ = ["unwrap_partitions", "create_df_from_partitions"]
118 changes: 114 additions & 4 deletions modin/api/partition_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

import numpy as np

from modin.pandas.dataframe import DataFrame
from modin.backends.pandas.query_compiler import PandasQueryCompiler


def unwrap_partitions(api_layer_object, axis=None, bind_ip=False):
"""
Expand Down Expand Up @@ -47,15 +52,13 @@ def unwrap_partitions(api_layer_object, axis=None, bind_ip=False):
def _unwrap_partitions(oid):
if bind_ip:
return [
(partition.ip, getattr(partition, oid))
[(partition.ip, getattr(partition, oid)) for partition in row]
for row in api_layer_object._query_compiler._modin_frame._partitions
for partition in row
]
else:
return [
getattr(partition, oid)
[getattr(partition, oid) for partition in row]
for row in api_layer_object._query_compiler._modin_frame._partitions
for partition in row
]

actual_engine = type(
Expand All @@ -78,3 +81,110 @@ def _unwrap_partitions(oid):
part.coalesce(bind_ip=bind_ip).unwrap(squeeze=True, bind_ip=bind_ip)
for part in partitions
]


def create_df_from_partitions(partitions, axis, engine):
"""
Create DataFrame from remote partitions.
Parameters
----------
partitions : list
List of Ray.ObjectRef/Dask.Future referencing to partitions in depend of the engine used.
Or list containing tuples of Ray.ObjectRef/Dask.Future referencing to ip addresses of partitions
and partitions itself in depend of the engine used.
axis : None, 0 or 1
The `axis` parameter is used to identify what is the partitions passed.
You have to set:
- `axis` to 0 if you want to create DataFrame from row partitions.
- `axis` to 1 if you want to create DataFrame from column partitions.
- `axis` to None if you want to create DataFrame from 2D list of partitions.
engine : int
The parameter is used to choose the engine for remote partitions which DataFrame will be created from.
Possible variants: ("PandasOnRayFramePartition", "PandasOnDaskFramePartition").
You have to specify index of the set: 0 - PandasOnRayFramePartition, 1 - PandasOnDaskFramePartition.
Returns
-------
DataFrame
DataFrame instance created from remote partitions.
"""
if engine == 0:
from modin.engines.ray.pandas_on_ray.frame.partition import (
PandasOnRayFramePartition,
)
from modin.engines.ray.pandas_on_ray.frame.partition_manager import (
PandasOnRayFrameManager,
)
from modin.engines.ray.pandas_on_ray.frame.partition_manager import (
PandasOnRayFrame,
)

partition_class = PandasOnRayFramePartition
partition_frame_class = PandasOnRayFrame
partition_mgr_class = PandasOnRayFrameManager
elif engine == 1:
from modin.engines.dask.pandas_on_dask.frame.partition import (
PandasOnDaskFramePartition,
)
from modin.engines.dask.pandas_on_dask.frame.partition_manager import (
DaskFrameManager,
)
from modin.engines.dask.pandas_on_dask.frame.data import PandasOnDaskFrame

partition_class = PandasOnDaskFramePartition
partition_frame_class = PandasOnDaskFrame
partition_mgr_class = DaskFrameManager
else:
raise ValueError(
f"Got unacceptable index {engine} of the engines' list. Possible variants are {0} or {1}."
)

if axis is None:
if isinstance(partitions[0][0], tuple):
parts = np.array(
[
[partition_class(partition, ip=ip) for ip, partition in row]
for row in partitions
]
)
else:
parts = np.array(
[
[partition_class(partition) for partition in row]
for row in partitions
]
)
else:
if axis == 0:
if isinstance(partitions[0], tuple):
parts = np.array(
[
[partition_class(partition, ip=ip)]
for ip, partition in partitions
]
)
else:
parts = np.array(
[[partition_class(partition)] for partition in partitions]
)
elif axis == 1:
if isinstance(partitions[0], tuple):
parts = np.array(
[
[
partition_class(partition, ip=ip)
for ip, partition in partitions
]
]
)
else:
parts = np.array(
[[partition_class(partition) for partition in partitions]]
)

index = partition_mgr_class.get_indices(0, parts, lambda df: df.axes[0])
columns = partition_mgr_class.get_indices(1, parts, lambda df: df.axes[1])
return DataFrame(
query_compiler=PandasQueryCompiler(partition_frame_class(parts, index, columns))
)

0 comments on commit 0ec20aa

Please sign in to comment.