Skip to content

Commit

Permalink
FEAT-#2664: Add to_labels algebraic operator (#2666)
Browse files Browse the repository at this point in the history
Resolves #2664

This add the algebraic operator for `to_labels`, which enables Modin to
better optimize the movement of data to metadata. See more in the paper
about the algebraic operator:
http://www.vldb.org/pvldb/vol13/p2033-petersohn.pdf

Co-authored-by: William Ma <[email protected]>

Signed-off-by: Devin Petersohn <[email protected]>
  • Loading branch information
devin-petersohn authored Feb 2, 2021
1 parent e99b629 commit 5ad5fa3
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 52 deletions.
25 changes: 25 additions & 0 deletions modin/backends/base/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import pandas.core.resample
import pandas
import numpy as np
from typing import List, Hashable


def _get_axis(axis):
Expand Down Expand Up @@ -484,6 +485,30 @@ def reset_index(self, **kwargs):
"""
return DataFrameDefault.register(pandas.DataFrame.reset_index)(self, **kwargs)

def set_index_from_columns(
self, keys: List[Hashable], drop: bool = True, append: bool = False
):
"""Create new row labels from a list of columns.
Parameters
----------
keys : list of hashable
The list of column names that will become the new index.
drop : boolean
Whether or not to drop the columns provided in the `keys` argument.
append : boolean
Whether or not to add the columns in `keys` as new levels appended to the
existing index.
Returns
-------
PandasQueryCompiler
A new QueryCompiler with updated index.
"""
return DataFrameDefault.register(pandas.DataFrame.set_index)(
self, keys=keys, drop=drop, append=append
)

# END Abstract reindex/reset_index

# Full Reduce operations
Expand Down
52 changes: 52 additions & 0 deletions modin/backends/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
import pandas
from pandas.core.common import is_bool_indexer
from pandas.core.indexing import check_bool_indexer
from pandas.core.indexes.api import ensure_index_from_sequences
from pandas.core.dtypes.common import (
is_list_like,
is_numeric_dtype,
is_datetime_or_timedelta_dtype,
)
from pandas.core.base import DataError
from collections.abc import Iterable, Container
from typing import List, Hashable
import warnings


Expand Down Expand Up @@ -538,6 +540,56 @@ def reset_index(self, **kwargs):
new_self.index = pandas.RangeIndex(len(new_self.index))
return new_self

def set_index_from_columns(
self, keys: List[Hashable], drop: bool = True, append: bool = False
):
"""Create new row labels from a list of columns.
Parameters
----------
keys : list of hashable
The list of column names that will become the new index.
drop : boolean
Whether or not to drop the columns provided in the `keys` argument
append : boolean
Whether or not to add the columns in `keys` as new levels appended to the
existing index.
Returns
-------
PandasQueryCompiler
A new QueryCompiler with updated index.
"""
new_modin_frame = self._modin_frame.to_labels(keys)
if append:
arrays = []
# Appending keeps the original order of the index levels, then appends the
# new index objects.
names = list(self.index.names)
if isinstance(self.index, pandas.MultiIndex):
for i in range(self.index.nlevels):
arrays.append(self.index._get_level_values(i))
else:
arrays.append(self.index)

# Add the names in the correct order.
names.extend(new_modin_frame.index.names)
if isinstance(new_modin_frame.index, pandas.MultiIndex):
for i in range(new_modin_frame.index.nlevels):
arrays.append(new_modin_frame.index._get_level_values(i))
else:
arrays.append(new_modin_frame.index)
new_modin_frame.index = ensure_index_from_sequences(arrays, names)
if not drop:
# The algebraic operator for this operation always drops the column, but we
# can copy the data in this object and just use the index from the result of
# the query compiler call.
result = self._modin_frame.copy()
result.index = new_modin_frame.index
else:
result = new_modin_frame
return self.__constructor__(result)

# END Reindex/reset_index

# Transpose
Expand Down
24 changes: 24 additions & 0 deletions modin/engines/base/frame/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import pandas
from pandas.core.indexes.api import ensure_index, Index, RangeIndex
from pandas.core.dtypes.common import is_numeric_dtype
from typing import List, Hashable

from modin.backends.pandas.query_compiler import PandasQueryCompiler
from modin.error_message import ErrorMessage
Expand Down Expand Up @@ -597,6 +598,29 @@ def from_labels_executor(df, **kwargs):
result._apply_index_objs(0)
return result

def to_labels(self, column_list: List[Hashable]) -> "BasePandasFrame":
"""Move one or more columns into the row labels. Previous labels are dropped.
Parameters
----------
column_list : list of hashable
The list of column names to place as the new row labels.
Returns
-------
A new BasePandasFrame that has the updated labels.
"""
extracted_columns = self.mask(col_indices=column_list).to_pandas()
if len(column_list) == 1:
new_labels = pandas.Index(extracted_columns.squeeze(axis=1))
else:
new_labels = pandas.MultiIndex.from_frame(extracted_columns)
result = self.mask(
col_indices=[i for i in self.columns if i not in column_list]
)
result.index = new_labels
return result

def reorder_labels(self, row_numeric_idx=None, col_numeric_idx=None):
"""Reorder the column and or rows in this DataFrame.
Expand Down
95 changes: 43 additions & 52 deletions modin/pandas/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
is_list_like,
is_numeric_dtype,
)
from pandas.core.indexes.api import ensure_index_from_sequences
from pandas.util._validators import validate_bool_kwarg
from pandas.io.formats.printing import pprint_thing
from pandas._libs.lib import no_default
Expand All @@ -40,7 +39,7 @@
import functools
import numpy as np
import sys
from typing import Optional, Sequence, Tuple, Union, Mapping
from typing import Optional, Sequence, Tuple, Union, Mapping, Iterator
import warnings

from modin.error_message import ErrorMessage
Expand Down Expand Up @@ -1555,59 +1554,51 @@ def set_index(
inplace = validate_bool_kwarg(inplace, "inplace")
if not isinstance(keys, list):
keys = [keys]
if inplace:
frame = self
else:
frame = self.copy()

arrays = []
names = []
if append:
names = [x for x in self.index.names]
if self._query_compiler.has_multiindex():
for i in range(self.index.nlevels):
arrays.append(self.index._get_level_values(i))

if any(
isinstance(col, (pandas.Index, Series, np.ndarray, list, Iterator))
for col in keys
):
# The current implementation cannot mix a list column labels and list like
# objects.
if not all(
isinstance(col, (pandas.Index, Series, np.ndarray, list, Iterator))
for col in keys
):
return self._default_to_pandas(
"set_index",
keys,
drop=drop,
append=append,
inplace=inplace,
verify_integrity=verify_integrity,
)
if inplace:
frame = self
else:
arrays.append(self.index)
to_remove = []
for col in keys:
if isinstance(col, pandas.MultiIndex):
# append all but the last column so we don't have to modify
# the end of this loop
for n in range(col.nlevels - 1):
arrays.append(col._get_level_values(n))

level = col._get_level_values(col.nlevels - 1)
names.extend(col.names)
elif isinstance(col, pandas.Series):
level = col._values
names.append(col.name)
elif isinstance(col, pandas.Index):
level = col
names.append(col.name)
elif isinstance(col, (list, np.ndarray, pandas.Index)):
level = col
names.append(None)
frame = self.copy()
# These are single-threaded objects, so we might as well let pandas do the
# calculation so that it matches.
frame.index = (
pandas.DataFrame(index=self.index)
.set_index(keys, append=append, verify_integrity=verify_integrity)
.index
)
if not inplace:
return frame
else:
level = frame[col]._to_pandas()._values
names.append(col)
if drop:
to_remove.append(col)
arrays.append(level)
index = ensure_index_from_sequences(arrays, names)

if verify_integrity and not index.is_unique:
duplicates = index.get_duplicates()
raise ValueError("Index has duplicate keys: %s" % duplicates)

for c in to_remove:
del frame[c]
# clear up memory usage
index._cleanup()
frame.index = index
return
new_query_compiler = self._query_compiler.set_index_from_columns(
keys, drop=drop, append=append
)

if not inplace:
return frame
if verify_integrity and not new_query_compiler.index.is_unique:
duplicates = new_query_compiler.index[
new_query_compiler.index.duplicated()
].unique()
raise ValueError(f"Index has duplicate keys: {duplicates}")

return self._create_or_update_from_compiler(new_query_compiler, inplace=inplace)

sparse = CachedAccessor("sparse", SparseFrameAccessor)

Expand Down

0 comments on commit 5ad5fa3

Please sign in to comment.