-
Notifications
You must be signed in to change notification settings - Fork 5.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[DataFrame] Refactor GroupBy Methods and Implement Reindex #2101
[DataFrame] Refactor GroupBy Methods and Implement Reindex #2101
Conversation
Test PASSed. |
Test PASSed. |
Test PASSed. |
python/ray/dataframe/dataframe.py
Outdated
# Sometimes we only get a single column or row, which is | ||
# problematic for building blocks from the partitions, so we | ||
# add whatever dimension we're missing from the input. | ||
if self._block_partitions.ndim < 2: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make _block_partitions
a property and move the check to there.
Test PASSed. |
Test PASSed. |
python/ray/dataframe/dataframe.py
Outdated
@@ -656,7 +657,10 @@ def groupby(self, by=None, axis=0, level=None, as_index=True, sort=True, | |||
Returns: | |||
A new DataFrame resulting from the groupby. | |||
""" | |||
from .groupby import DataFrameGroupBy |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move to just before the DataFrameGroupBy
object is used.
python/ray/dataframe/dataframe.py
Outdated
@@ -656,7 +657,10 @@ def groupby(self, by=None, axis=0, level=None, as_index=True, sort=True, | |||
Returns: | |||
A new DataFrame resulting from the groupby. | |||
""" | |||
from .groupby import DataFrameGroupBy | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove
python/ray/dataframe/dataframe.py
Outdated
axis = pd.DataFrame()._get_axis_number(axis) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove
python/ray/dataframe/groupby.py
Outdated
new_df.index = [k for k, v in self._iter] | ||
else: | ||
new_df = concat(result) | ||
new_df = new_df.reindex(self._index, axis=0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
prefer axis=self._axis
python/ray/dataframe/groupby.py
Outdated
new_df.index = self._index | ||
else: | ||
new_df = concat(result, axis=1) | ||
new_df = new_df.reindex(self._columns, axis=1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can use utils._reindex_helper
to more efficiently reorder the columns/rows. Just make sure you reassign new_df.index
or new_df.columns
depending on the correct reassignment.
python/ray/dataframe/groupby.py
Outdated
|
||
from .concat import concat | ||
if self._axis == 0: | ||
new_df = new_df.reindex(self._index, axis=0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here for utils._reindex_helper
@pytest.fixture | ||
def ray_df_equals_pandas(ray_df, pandas_df): | ||
assert isinstance(ray_df, pd.DataFrame) | ||
assert to_pandas(ray_df).sort_index().equals(pandas_df.sort_index()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove sort_index()
from this file on checks
return np.array(x) if axis == 0 else np.array(x).T | ||
blocks = np.array(x) if axis == 0 else np.array(x).T | ||
|
||
# Sometimes we only get a single column or row, which is |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move this next part to a utils function and call from within the _block_partitions
property.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can't move it to a property because it depends on axis
, but I have moved it to a utils function.
Test PASSed. |
Test PASSed. |
1a05681
to
ec18852
Compare
Test PASSed. |
Test PASSed. |
Test PASSed. |
python/ray/dataframe/utils.py
Outdated
@@ -205,6 +205,20 @@ def _deploy_func(func, dataframe, *args): | |||
return func(dataframe, *args) | |||
|
|||
|
|||
@ray.remote | |||
def _deploy_generic_func(func, *args): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know that we need this. I see how you're using it, but for now I would just prefer _deploy_func like everything else and pass in a row/column partition.
python/ray/dataframe/dataframe.py
Outdated
if index is not None: | ||
old_index = self.index | ||
new_blocks = np.array([_deploy_generic_func._submit( | ||
args=(tuple([reindex_helper, old_index, index, 1, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For the tuple([...] + block.tolist())
you can just do (...) + tuple(block.tolist())
. I think it seems more clear this way.
Test PASSed. |
Passes on private-travis. Thanks @kunalgosar! |
* master: [DataFrame] Refactor GroupBy Methods and Implement Reindex (ray-project#2101) Initial Support for Airspeed Velocity (ray-project#2113) Use automatic memory management in Redis modules. (ray-project#1797) [DataFrame] Test bugfixes (ray-project#2111) [DataFrame] Update initializations of IndexMetadata which use outdated APIs (ray-project#2103)
* master: Prototype named actors. (ray-project#2129) Update arrow to latest master (ray-project#2100) [DataFrame] Speed up dtypes (ray-project#2118) do not fetch from dead Plasma Manager (ray-project#2116) [DataFrame] Refactor GroupBy Methods and Implement Reindex (ray-project#2101) Initial Support for Airspeed Velocity (ray-project#2113) Use automatic memory management in Redis modules. (ray-project#1797) [DataFrame] Test bugfixes (ray-project#2111) [DataFrame] Update initializations of IndexMetadata which use outdated APIs (ray-project#2103)
Some of the changes in this PR are:
_block_partitions
was 1Ddf.apply
anddf.agg