Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DataFrame] Refactor GroupBy Methods and Implement Reindex #2101

Merged
merged 23 commits into from
May 22, 2018
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ matrix:
# - python -m pytest python/ray/dataframe/test/test_dataframe.py
- python -m pytest python/ray/dataframe/test/test_concat.py
- python -m pytest python/ray/dataframe/test/test_io.py
# - python -m pytest python/ray/dataframe/test/test_groupby.py

# ray tune tests
# - python python/ray/tune/test/dependency_test.py
Expand Down Expand Up @@ -199,6 +200,7 @@ script:
- python -m pytest python/ray/dataframe/test/test_dataframe.py
- python -m pytest python/ray/dataframe/test/test_concat.py
- python -m pytest python/ray/dataframe/test/test_io.py
- python -m pytest python/ray/dataframe/test/test_groupby.py

# ray tune tests
- python python/ray/tune/test/dependency_test.py
Expand Down
6 changes: 3 additions & 3 deletions python/ray/dataframe/concat.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ def concat(objs, axis=0, join='outer', join_axes=None, ignore_index=False,
all_series = all(isinstance(obj, pandas.Series)
for obj in objs)
if all_series:
return pandas.concat(objs, axis, join, join_axes,
ignore_index, keys, levels, names,
verify_integrity, copy)
return DataFrame(pandas.concat(objs, axis, join, join_axes,
ignore_index, keys, levels, names,
verify_integrity, copy))

if isinstance(objs, dict):
raise NotImplementedError(
Expand Down
166 changes: 114 additions & 52 deletions python/ray/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@
import sys
import re

from .groupby import DataFrameGroupBy
from .utils import (
_deploy_func,
_deploy_generic_func,
_map_partitions,
_partition_pandas_dataframe,
to_pandas,
create_blocks_helper,
_blocks_to_col,
_blocks_to_row,
_create_block_partitions,
Expand All @@ -43,7 +44,8 @@
_co_op_helper,
_match_partitioning,
_concat_index,
_correct_column_dtypes)
_correct_column_dtypes,
fix_blocks_dimensions)
from . import get_npartitions
from .index_metadata import _IndexMetadata
from .iterator import PartitionIterator
Expand Down Expand Up @@ -119,9 +121,12 @@ def __init__(self, data=None, index=None, columns=None, dtype=None,
"for internal DataFrame creations"

if block_partitions is not None:
axis = 0
# put in numpy array here to make accesses easier since it's 2D
self._block_partitions = np.array(block_partitions)
axis = 0
self._block_partitions = \
fix_blocks_dimensions(self._block_partitions, axis)

else:
if row_partitions is not None:
axis = 0
Expand All @@ -144,13 +149,6 @@ def __init__(self, data=None, index=None, columns=None, dtype=None,
_create_block_partitions(partitions, axis=axis,
length=axis_length)

# Sometimes we only get a single column or row, which is
# problematic for building blocks from the partitions, so we
# add whatever dimension we're missing from the input.
if self._block_partitions.ndim < 2:
self._block_partitions = np.expand_dims(self._block_partitions,
axis=axis ^ 1)

assert self._block_partitions.ndim == 2, "Block Partitions must be 2D."

# Create the row and column index objects for using our partitioning.
Expand Down Expand Up @@ -674,6 +672,7 @@ def groupby(self, by=None, axis=0, level=None, as_index=True, sort=True,
elif mismatch:
raise KeyError(next(x for x in by if x not in self))

from .groupby import DataFrameGroupBy
return DataFrameGroupBy(self, by, axis, level, as_index, sort,
group_keys, squeeze, **kwargs)

Expand Down Expand Up @@ -984,36 +983,23 @@ def _string_function(self, func, *args, **kwargs):
raise ValueError("{} is an unknown string function".format(func))

def _callable_function(self, func, axis, *args, **kwargs):
if axis == 0:
partitions = self._col_partitions
else:
partitions = self._row_partitions

if axis == 1:
kwargs['axis'] = axis
kwargs['temp_columns'] = self.columns
else:
kwargs['temp_index'] = self.index
kwargs['axis'] = axis

def agg_helper(df, arg, *args, **kwargs):
if 'temp_index' in kwargs:
df.index = kwargs.pop('temp_index', None)
else:
df.columns = kwargs.pop('temp_columns', None)
def agg_helper(df, arg, index, columns, *args, **kwargs):
df.index = index
df.columns = columns
is_transform = kwargs.pop('is_transform', False)
new_df = df.agg(arg, *args, **kwargs)

is_series = False
index = None
columns = None

if isinstance(new_df, pd.Series):
is_series = True
index = None
columns = None
else:
index = new_df.index \
if not isinstance(new_df.index, pd.RangeIndex) \
else None
columns = new_df.columns
index = new_df.index
new_df.columns = pd.RangeIndex(0, len(new_df.columns))
new_df.reset_index(drop=True, inplace=True)

Expand All @@ -1024,13 +1010,37 @@ def agg_helper(df, arg, *args, **kwargs):

return is_series, new_df, index, columns

remote_result = \
[_deploy_func._submit(args=(lambda df: agg_helper(df,
func,
*args,
**kwargs),
part), num_return_vals=4)
for part in partitions]
if axis == 0:
index = self.index
columns = [self._col_metadata.partition_series(i).index
for i in range(len(self._col_partitions))]

remote_result = \
[_deploy_func._submit(args=(
lambda df: agg_helper(df,
func,
index,
cols,
*args,
**kwargs),
part), num_return_vals=4)
for cols, part in zip(columns, self._col_partitions)]

if axis == 1:
indexes = [self._row_metadata.partition_series(i).index
for i in range(len(self._row_partitions))]
columns = self.columns

remote_result = \
[_deploy_func._submit(args=(
lambda df: agg_helper(df,
func,
index,
columns,
*args,
**kwargs),
part), num_return_vals=4)
for index, part in zip(indexes, self._row_partitions)]

# This magic transposes the list comprehension returned from remote
is_series, new_parts, index, columns = \
Expand All @@ -1053,21 +1063,22 @@ def agg_helper(df, arg, *args, **kwargs):
# remote objects. We build a Ray DataFrame from the Pandas partitions.
elif axis == 0:
new_index = ray.get(index[0])
columns = ray.get(columns)
columns = columns[0].append(columns[1:])
# This does not handle the Multi Index case
new_columns = ray.get(columns)
new_columns = new_columns[0].append(new_columns[1:])

return DataFrame(col_partitions=new_parts,
columns=columns,
index=self.index if new_index is None
else new_index)
columns=new_columns,
index=new_index)
else:
new_index = ray.get(index[0])
columns = ray.get(columns)
columns = columns[0].append(columns[1:])
new_columns = ray.get(columns[0])
# This does not handle the Multi Index case
new_index = ray.get(index)
new_index = new_index[0].append(new_index[1:])

return DataFrame(row_partitions=new_parts,
columns=columns,
index=self.index if new_index is None
else new_index)
columns=new_columns,
index=new_index)

def align(self, other, join='outer', axis=None, level=None, copy=True,
fill_value=None, method=None, limit=None, fill_axis=0,
Expand Down Expand Up @@ -1234,7 +1245,7 @@ def as_matrix(self, columns=None):
Returns:
values: ndarray
"""
# TODO this is very inneficient, also see __array__
# TODO this is very inefficient, also see __array__
return to_pandas(self).as_matrix(columns)

def asfreq(self, freq, method=None, how=None, normalize=False,
Expand Down Expand Up @@ -3323,9 +3334,60 @@ def rdiv(self, other, axis='columns', level=None, fill_value=None):
def reindex(self, labels=None, index=None, columns=None, axis=None,
method=None, copy=True, level=None, fill_value=np.nan,
limit=None, tolerance=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
if level is not None:
raise NotImplementedError(
"Multilevel Index not Implemented. "
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")

axis = pd.DataFrame()._get_axis_number(axis) if axis is not None \
else 0
if axis == 0 and labels is not None:
index = labels
elif labels is not None:
columns = labels

def reindex_helper(old_index, new_index, axis, npartitions, *df):
df = pd.concat(df, axis=axis ^ 1)
if axis == 1:
df.index = old_index
else:
df.columns = old_index

df = df.reindex(new_index, copy=False, axis=axis ^ 1,
method=method, fill_value=fill_value,
limit=limit, tolerance=tolerance)
return create_blocks_helper(df, npartitions, axis)

new_blocks = self._block_partitions
if index is not None:
old_index = self.index
new_blocks = np.array([_deploy_generic_func._submit(
args=(tuple([reindex_helper, old_index, index, 1,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the tuple([...] + block.tolist()) you can just do (...) + tuple(block.tolist()). I think it reads more clearly this way.

len(new_blocks)] + block.tolist())),
num_return_vals=len(new_blocks))
for block in new_blocks.T]).T
else:
index = self.index

if columns is not None:
old_columns = self.columns
new_blocks = np.array([_deploy_generic_func._submit(
args=tuple([reindex_helper, old_columns, columns, 0,
new_blocks.shape[1]] + block.tolist()),
num_return_vals=new_blocks.shape[1])
for block in new_blocks])
else:
columns = self.columns

if copy:
return DataFrame(block_partitions=new_blocks,
index=index,
columns=columns)

self._update_inplace(block_partitions=new_blocks,
index=index,
columns=columns)

def reindex_axis(self, labels, axis=0, method=None, level=None, copy=True,
limit=None, fill_value=np.nan):
Expand Down
Loading