Rewrite loc (#20)
* Rewrite the rewrite

Finish implementing loc/iloc

Remove debug lines, fix typo

Removing unused imports

* Removing dead code

* Renaming clone

* Formatting and removing dead code

* Moving imports to match pandas
devin-petersohn authored Sep 17, 2018
1 parent a3e7f3e commit fd24d0f
Showing 12 changed files with 491 additions and 748 deletions.
294 changes: 209 additions & 85 deletions modin/data_management/data_manager.py

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions modin/data_management/partitioning/README.md
@@ -0,0 +1,13 @@
## Implementation Note

### Object Hierarchy

- `remote_partition.py` contains `RemotePartition` interface and its implementations.
- `partition_collections.py` contains `BlockPartitions` interface and its implementations.
  - `BlockPartitions` manages a 2D array of `RemotePartition` objects.
- `axis_partition.py` contains `AxisPartition` with the following hierarchy:
```
AxisPartition -> RayAxisPartition -> {RayColumnPartition, RayRowPartition}
```
- `AxisPartition` is a high-level view onto `BlockPartitions` data. Some
  operations are more convenient to perform on an `AxisPartition`.
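To make the hierarchy concrete, here is a minimal sketch of how the three layers relate; the class and method bodies are illustrative stand-ins, not the real interfaces from this diff:

```python
import numpy as np

class RemotePartition:
    """One block of the frame (remote_partition.py)."""
    def apply(self, func, **kwargs):
        raise NotImplementedError

class BlockPartitions:
    """Manages a 2D array of RemotePartition objects (partition_collections.py)."""
    def __init__(self, partitions):
        # partitions: np.ndarray with shape (n_row_blocks, n_col_blocks)
        self.partitions = np.asarray(partitions)

class AxisPartition:
    """A view over one full row or column of blocks (axis_partition.py)."""
    def __init__(self, blocks):
        self.blocks = list(blocks)
```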
94 changes: 93 additions & 1 deletion modin/data_management/partitioning/partition_collections.py
@@ -2,13 +2,15 @@
from __future__ import division
from __future__ import print_function

from typing import Tuple

import numpy as np
import ray
import pandas

from .remote_partition import RayRemotePartition
from .axis_partition import RayColumnPartition, RayRowPartition
from .utils import compute_chunksize
from .utils import compute_chunksize, _get_nan_block_id


class BlockPartitions(object):
@@ -120,6 +122,10 @@ def block_widths(self):
        self._widths_cache = [obj.width().get() for obj in self.partitions[0]]
        return self._widths_cache

    @property
    def shape(self) -> Tuple[int, int]:
        return int(np.sum(self.block_lengths)), int(np.sum(self.block_widths))

    def full_reduce(self, map_func, reduce_func, axis):
        """Perform a full reduce on the data.
@@ -173,6 +179,15 @@ def map_across_blocks(self, map_func):
        new_partitions = np.array([[part.apply(preprocessed_map_func) for part in row_of_parts] for row_of_parts in self.partitions])
        return cls(new_partitions)

    def lazy_map_across_blocks(self, map_func, kwargs):
        """Queue `map_func` (with `kwargs`) on every block.

        Nothing runs until `apply` is next called on each partition.
        """
        cls = type(self)
        preprocessed_map_func = self.preprocess_func(map_func)
        new_partitions = np.array(
            [[part.add_to_apply_calls(preprocessed_map_func, **kwargs)
              for part in row_of_parts]
             for row_of_parts in self.partitions])
        return cls(new_partitions)

    def map_across_full_axis(self, axis, map_func):
        """Applies `map_func` to every partition.
@@ -633,6 +648,65 @@ def apply_func_to_select_indices_along_full_axis(self, axis, func, indices, keep

        return cls(result.T) if not axis else cls(result)

    def apply_func_to_indices_both_axis(self, func, row_indices, col_indices,
                                        lazy=False, keep_remaining=True,
                                        mutate=False, item_to_distribute=None):
        """Apply a function to the given row and column indices at once.

        Important: for `func` to operate directly on the indices provided,
        it must accept `row_internal_indices` and `col_internal_indices` as
        keyword arguments.
        """
        cls = type(self)

        if not mutate:
            partition_copy = self.partitions.copy()
        else:
            partition_copy = self.partitions

        operation_mask = np.full(self.partitions.shape, False)

        row_position_counter = 0
        for row_blk_idx, row_internal_idx in self._get_dict_of_block_index(1, row_indices).items():
            col_position_counter = 0
            for col_blk_idx, col_internal_idx in self._get_dict_of_block_index(0, col_indices).items():
                remote_part = partition_copy[row_blk_idx, col_blk_idx]

                if item_to_distribute is not None:
                    # Slice out the sub-array of `item_to_distribute` that
                    # lands in this block.
                    item = item_to_distribute[
                        row_position_counter:row_position_counter + len(row_internal_idx),
                        col_position_counter:col_position_counter + len(col_internal_idx)
                    ]
                    item = {'item': item}
                else:
                    item = dict()

                if lazy:
                    result = remote_part.add_to_apply_calls(
                        func,
                        row_internal_indices=row_internal_idx,
                        col_internal_indices=col_internal_idx,
                        **item)
                else:
                    result = remote_part.apply(
                        func,
                        row_internal_indices=row_internal_idx,
                        col_internal_indices=col_internal_idx,
                        **item)

                partition_copy[row_blk_idx, col_blk_idx] = result
                operation_mask[row_blk_idx, col_blk_idx] = True

                col_position_counter += len(col_internal_idx)
            row_position_counter += len(row_internal_idx)

        column_idx = np.where(np.any(operation_mask, axis=0))[0]
        row_idx = np.where(np.any(operation_mask, axis=1))[0]
        if not keep_remaining:
            partition_copy = partition_copy[row_idx][:, column_idx]

        return cls(partition_copy)

    def inter_data_operation(self, axis, func, other):
        """Apply a function that requires two BlockPartitions objects.
@@ -686,6 +760,24 @@ def __getitem__(self, key):
    def __len__(self):
        return sum(self.block_lengths)

    def enlarge_partitions(self, n_rows=None, n_cols=None):
        """Pad this frame with `n_rows` of NaNs and/or `n_cols` of NaNs."""
        data = self.partitions
        block_partitions_cls = type(self)

        if n_rows:
            n_cols_lst = self.block_widths
            nan_oids_lst = [
                self._partition_class(
                    _get_nan_block_id(self._partition_class, n_rows, n_cols_))
                for n_cols_ in n_cols_lst
            ]
            new_chunk = block_partitions_cls(np.array([nan_oids_lst]))
            data = self.concat(axis=0, other_blocks=new_chunk)

        if n_cols:
            n_rows_lst = self.block_lengths
            nan_oids_lst = [
                self._partition_class(
                    _get_nan_block_id(self._partition_class, n_rows_, n_cols))
                for n_rows_ in n_rows_lst
            ]
            new_chunk = block_partitions_cls(np.array([nan_oids_lst]).T)
            data = self.concat(axis=1, other_blocks=new_chunk)

        return data


class RayBlockPartitions(BlockPartitions):
    """This class implements the interface in `BlockPartitions`."""
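To ground the `row_internal_indices`/`col_internal_indices` contract of `apply_func_to_indices_both_axis`, here is a hedged sketch of a setter written against it; `write_item` and the commented call site are illustrative, not code from this commit:

```python
import pandas

def write_item(df, row_internal_indices, col_internal_indices, item):
    """Write `item` into one pandas block at block-local positions.

    Matches the keyword contract above: the caller has already translated
    global positions into per-block internal indices and sliced `item`
    down to this block's sub-array.
    """
    df = df.copy()  # never mutate the stored block in place
    df.iloc[row_internal_indices, col_internal_indices] = item
    return df

# Local demonstration on a single block:
block = pandas.DataFrame([[1, 2], [3, 4]])
print(write_item(block, [0], [1], 9))  # top-right cell becomes 9

# Hypothetical call site (names assumed, not from this diff):
# new_blocks = block_partitions.apply_func_to_indices_both_axis(
#     write_item, row_indices, col_indices,
#     keep_remaining=True, item_to_distribute=item)
```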
50 changes: 48 additions & 2 deletions modin/data_management/partitioning/remote_partition.py
@@ -49,6 +49,14 @@ def apply(self, func, **kwargs):
"""
raise NotImplementedError("Must be implemented in child class")

def add_to_apply_calls(self, func, **kwargs):
"""Add the function to the apply function call stack.
This function will be executed when apply is called. It will be executed
in the order inserted; apply's func operates the last and return
"""
raise NotImplementedError("Must be implemented in child class")

def to_pandas(self):
"""Convert the object stored in this partition to a Pandas DataFrame.
@@ -128,20 +136,28 @@ def width(self):
        self._width_cache = self.apply(preprocessed_func)
        return self._width_cache

    @classmethod
    def empty(cls):
        raise NotImplementedError("To be implemented in the child class!")


class RayRemotePartition(RemotePartition):

    def __init__(self, object_id):
        assert type(object_id) is ray.ObjectID

        self.oid = object_id
        self.call_queue = []

    def get(self):
        """Gets the object out of the plasma store.

        Returns:
            The object from the plasma store.
        """
        if len(self.call_queue):
            # Flush any queued calls first so the result reflects them.
            return self.apply(lambda x: x).get()

        return ray.get(self.oid)

    def apply(self, func, **kwargs):
@@ -157,8 +173,34 @@ def apply(self, func, **kwargs):
        Returns:
            A RayRemotePartition object.
        """
        new_oid = deploy_ray_func.remote(func, self.oid, kwargs)
        return RayRemotePartition(new_oid)
        oid = self.oid
        self.call_queue.append((func, kwargs))

        def call_queue_closure(oid_obj, call_queues):
            # Replay every queued (func, kwargs) pair in insertion order,
            # fetching any function or kwargs still in the object store.
            for func, kwargs in call_queues:
                if isinstance(func, ray.ObjectID):
                    func = ray.get(func)
                if isinstance(kwargs, ray.ObjectID):
                    kwargs = ray.get(kwargs)

                oid_obj = func(oid_obj, **kwargs)

            return oid_obj

        oid = deploy_ray_func.remote(
            call_queue_closure, oid, kwargs={'call_queues': self.call_queue})
        self.call_queue = []

        return RayRemotePartition(oid)

    def add_to_apply_calls(self, func, **kwargs):
        self.call_queue.append((func, kwargs))
        return self

    def __copy__(self):
        return RayRemotePartition(object_id=self.oid)

    def to_pandas(self):
        """Convert the object stored in this partition to a Pandas DataFrame.
@@ -203,6 +245,10 @@ def length_extraction_fn(cls):
    def width_extraction_fn(cls):
        return width_fn_pandas

    @classmethod
    def empty(cls):
        return cls.put(pandas.DataFrame())


def length_fn_pandas(df):
    assert isinstance(df, (pandas.DataFrame, pandas.Series))
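The call-queue mechanics above are easiest to see without Ray in the picture. Below is a minimal, Ray-free model of the same semantics (this `LazyPartition` is an illustration, not a class from this commit): functions queued by `add_to_apply_calls` run in insertion order when `apply` is next called, `apply`'s own `func` runs last, and the queue is drained exactly once.

```python
class LazyPartition:
    """Toy model of RayRemotePartition's call queue, on plain local data."""

    def __init__(self, data):
        self.data = data
        self.call_queue = []  # (func, kwargs) pairs, oldest first

    def add_to_apply_calls(self, func, **kwargs):
        # Defer: nothing executes until apply() is called.
        self.call_queue.append((func, kwargs))
        return self

    def apply(self, func, **kwargs):
        # apply's own func joins the end of the queue, then everything runs.
        self.call_queue.append((func, kwargs))
        result = self.data
        for queued_func, queued_kwargs in self.call_queue:
            result = queued_func(result, **queued_kwargs)
        self.call_queue = []  # drained; a later apply() starts fresh
        return LazyPartition(result)


part = LazyPartition(10)
part.add_to_apply_calls(lambda x, n: x + n, n=5)  # queued, not yet run
assert part.apply(lambda x: x * 2).data == 30     # (10 + 5) * 2
```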
26 changes: 26 additions & 0 deletions modin/data_management/partitioning/utils.py
@@ -2,8 +2,34 @@
from __future__ import division
from __future__ import print_function

import numpy as np
import pandas

# Module-level cache of one NaN block per shape; see _get_nan_block_id.
_NAN_BLOCKS = {}


def compute_chunksize(length, num_splits):
    # Round up when the split is uneven to avoid zero-length chunks and an
    # extremely large last partition.
    return length // num_splits if length % num_splits == 0 \
        else length // num_splits + 1


def _get_nan_block_id(partition_class, n_row=1, n_col=1, transpose=False):
    """A memory-efficient way to get a block of NaNs.

    Args:
        partition_class (RemotePartition): The class to use to put the object
            in the remote format.
        n_row (int): The number of rows.
        n_col (int): The number of columns.
        transpose (bool): If true, swap rows and columns.

    Returns:
        ObjectID of the NaN block.
    """
    global _NAN_BLOCKS
    if transpose:
        n_row, n_col = n_col, n_row
    shape = (n_row, n_col)
    if shape not in _NAN_BLOCKS:
        arr = np.tile(np.array(np.NaN), shape)
        # TODO: Use something more general than pandas.DataFrame here.
        _NAN_BLOCKS[shape] = partition_class.put(pandas.DataFrame(data=arr))
    return _NAN_BLOCKS[shape]
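Two details worth making concrete: `compute_chunksize` rounds up whenever the split is uneven, so no split is empty and the last partition is never disproportionately large, and `_get_nan_block_id` memoizes one NaN block per shape, so padding operations such as `enlarge_partitions` reuse a single stored object instead of allocating a fresh NaN frame each time. A quick illustration of the chunking arithmetic (the asserts are examples, not tests from this commit):

```python
def compute_chunksize(length, num_splits):
    # Same rule as above: round up on uneven splits.
    return length // num_splits if length % num_splits == 0 \
        else length // num_splits + 1

assert compute_chunksize(12, 4) == 3  # even: four chunks of 3
assert compute_chunksize(10, 4) == 3  # uneven: chunks of 3, 3, 3, 1
assert compute_chunksize(3, 4) == 1   # more splits than rows, still no zeros
```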
29 changes: 8 additions & 21 deletions modin/pandas/__init__.py
@@ -14,6 +14,14 @@
import ray

from .. import __git_revision__, __version__
from .concat import concat
from .dataframe import DataFrame
from .datetimes import to_datetime
from .io import (
    read_csv, read_parquet, read_json, read_html, read_clipboard, read_excel,
    read_hdf, read_feather, read_msgpack, read_stata, read_sas, read_pickle,
    read_sql)
from .reshape import get_dummies

try:
    if threading.current_thread().name == "MainThread":
@@ -30,27 +38,6 @@
num_cpus = ray.global_state.cluster_resources()['CPU']
DEFAULT_NPARTITIONS = int(num_cpus)


def set_npartition_default(n):
    global DEFAULT_NPARTITIONS
    DEFAULT_NPARTITIONS = n


def get_npartitions():
    return DEFAULT_NPARTITIONS


# We import these files after the two functions above
# because they depend on npartitions.
from .concat import concat  # noqa: 402
from .dataframe import DataFrame  # noqa: 402
from .datetimes import to_datetime  # noqa: 402
from .io import (  # noqa: 402
    read_csv, read_parquet, read_json, read_html, read_clipboard, read_excel,
    read_hdf, read_feather, read_msgpack, read_stata, read_sas, read_pickle,
    read_sql)
from .reshape import get_dummies  # noqa: 402

__all__ = [
"DataFrame", "Series", "read_csv", "read_parquet", "read_json",
"read_html", "read_clipboard", "read_excel", "read_hdf", "read_feather",
8 changes: 4 additions & 4 deletions modin/pandas/dataframe.py
@@ -4396,8 +4396,8 @@ def loc(self):
        We currently support: single label, list array, slice object
        We do not support: boolean array, callable
        """
        from .indexing import _Loc_Indexer
        return _Loc_Indexer(self)
        from .indexing import _LocIndexer
        return _LocIndexer(self)

    @property
    def is_copy(self):
@@ -4422,8 +4422,8 @@ def iloc(self):
        We currently support: single label, list array, slice object
        We do not support: boolean array, callable
        """
        from .indexing import _iLoc_Indexer
        return _iLoc_Indexer(self)
        from .indexing import _iLocIndexer
        return _iLocIndexer(self)

    def _create_dataframe_from_manager(self, new_manager, inplace=False):
        """Returns or updates a DataFrame given a new data_manager."""
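Per the docstrings above, both indexers accept a single label, a list/array of labels, or a slice; boolean arrays and callables are not supported yet. A usage sketch against the public API (the DataFrame here is an arbitrary example, not from this diff):

```python
import modin.pandas as pd

df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}, index=["x", "y", "z"])

df.loc["y"]               # single label
df.loc[["x", "z"], "a"]   # list of labels plus a column label
df.iloc[0:2, 1]           # slice plus an integer position
df.iloc[[0, 2], [0]] = 0  # assignment; per the commit message, the
                          # loc/iloc write path is finished in this change
```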
4 changes: 0 additions & 4 deletions modin/pandas/index_metadata.py

This file was deleted.

