From e7e78ceb504030b44713af0c2638a3aea2ff3b0b Mon Sep 17 00:00:00 2001 From: William Ma Date: Wed, 29 Aug 2018 16:54:40 -0700 Subject: [PATCH 01/14] Added type checking and changed how variables were read in from kwargs --- modin/data_management/data_manager.py | 137 +++++++++++++----- .../partitioning/partition_collections.py | 14 +- .../partitioning/remote_partition.py | 6 +- 3 files changed, 110 insertions(+), 47 deletions(-) diff --git a/modin/data_management/data_manager.py b/modin/data_management/data_manager.py index ef27403aa5f..44050697347 100644 --- a/modin/data_management/data_manager.py +++ b/modin/data_management/data_manager.py @@ -131,11 +131,12 @@ def _join_index_objects(self, axis, other_index, how, sort=True): return self.index.join(other_index, how=how, sort=sort) def concat(self, axis, other, **kwargs): + ignore_index = kwargs.get("ignore_index", default=False) if axis == 0: if isinstance(other, list): - return self._append_list_of_managers(other, kwargs["ignore_index"]) + return self._append_list_of_managers(other, ignore_index) else: - return self._append_data_manager(other, kwargs["ignore_index"]) + return self._append_data_manager(other, ignore_index) else: if isinstance(other, list): return self._join_list_of_managers(other, **kwargs) @@ -174,9 +175,17 @@ def _append_list_of_managers(self, others, ignore_index): return cls(new_data, new_index, joined_columns) def _join_data_manager(self, other, **kwargs): + assert isinstance(other, type(self)), \ + "This method is for data manager objects only" cls = type(self) - joined_index = self._join_index_objects(1, other.index, kwargs["how"], sort=kwargs["sort"]) + # Uses join's default value (though should not revert to default) + how = kwargs.get("how", default="left") + sort = kwargs.get("sort", default=False) + lsuffix = kwargs.get("lsuffix", default="") + rsuffix = kwargs.get("rsuffix", default="") + + joined_index = self._join_index_objects(1, other.index, how, sort=sort) to_join = other.reindex(0, joined_index).data new_self = self.reindex(0, joined_index).data @@ -187,14 +196,30 @@ def _join_data_manager(self, other, **kwargs): # suffixes. self_proxy = pandas.DataFrame(columns=self.columns) other_proxy = pandas.DataFrame(columns=other.columns) - new_columns = self_proxy.join(other_proxy, lsuffix=kwargs["lsuffix"], rsuffix=kwargs["rsuffix"]).columns + new_columns = self_proxy.join(other_proxy, lsuffix=lsuffix, rsuffix=rsuffix).columns return cls(new_data, joined_index, new_columns) def _join_list_of_managers(self, others, **kwargs): + assert isinstance(others, list), \ + "This method is for lists of DataManager objects only" + assert all(isinstance(other, type(self)) for other in others), \ + "Different Manager objects are being used. This is not allowed" + cls = type(self) + + # Uses join's default value (though should not revert to default) + how = kwargs.get("how", default="left") + sort = kwargs.get("sort", default=False) + lsuffix = kwargs.get("lsuffix", default="") + rsuffix = kwargs.get("rsuffix", default="") + + assert isinstance(others, list), \ + "This method is for lists of DataManager objects only" + assert all(isinstance(other, type(self)) for other in others), \ + "Different Manager objects are being used. 
This is not allowed" cls = type(self) - joined_index = self._join_index_objects(1, [other.index for other in others], kwargs["how"], sort=kwargs["sort"]) + joined_index = self._join_index_objects(1, [other.index for other in others], how, sort=sort) to_join = [other.reindex(0, joined_index).data for other in others] new_self = self.reindex(0, joined_index).data @@ -205,7 +230,7 @@ def _join_list_of_managers(self, others, **kwargs): # suffixes. self_proxy = pandas.DataFrame(columns=self.columns) others_proxy = [pandas.DataFrame(columns=other.columns) for other in others] - new_columns = self_proxy.join(others_proxy, lsuffix=kwargs["lsuffix"], rsuffix=kwargs["rsuffix"]).columns + new_columns = self_proxy.join(others_proxy, lsuffix=lsuffix, rsuffix=rsuffix).columns return cls(new_data, joined_index, new_columns) # END Append/Concat/Join (Not Merge) @@ -225,7 +250,7 @@ def inter_manager_operations(self, other, how_to_join, func): new_columns = self._join_index_objects(0, other.columns, how_to_join, sort=False) reindexed_other = other.reindex(0, joined_index).data - reindexed_self = other.reindex(0, joined_index).data + reindexed_self = self.reindex(0, joined_index).data # THere is an interesting serialization anomaly that happens if we do # not use the columns in `inter_data_op_builder` from here (e.g. if we @@ -291,7 +316,7 @@ def reindex_builer(df, axis, old_labels, new_labels, **kwargs): def reset_index(self, **kwargs): cls = type(self) - drop = kwargs["drop"] + drop = kwargs.get("drop", default=False) new_index = pandas.RangeIndex(len(self.index)) if not drop: @@ -350,31 +375,41 @@ def full_reduce(self, axis, map_func, reduce_func=None): return result def count(self, **kwargs): + axis = kwargs.get("axis", default=0) map_func = self._prepare_method(pandas.DataFrame.count, **kwargs) reduce_func = self._prepare_method(pandas.DataFrame.sum, **kwargs) - return self.full_reduce(kwargs["axis"], map_func, reduce_func) + return self.full_reduce(axis, map_func, reduce_func) def max(self, **kwargs): + # Pandas default is 0 (though not mentioned in docs) + axis = kwargs.get("axis", default=0) func = self._prepare_method(pandas.DataFrame.max, **kwargs) - return self.full_reduce(kwargs["axis"], func) + return self.full_reduce(axis, func) def mean(self, **kwargs): - axis = kwargs["axis"] + # Pandas default is 0 (though not mentioned in docs) + axis = kwargs.get("axis", default=0) length = len(self.index) if not axis else len(self.columns) return self.sum(**kwargs) / length def min(self, **kwargs): + # Pandas default is 0 (though not mentioned in docs) + axis = kwargs.get("axis", default=0) func = self._prepare_method(pandas.DataFrame.min, **kwargs) - return self.full_reduce(kwargs["axis"], func) + return self.full_reduce(axis, func) def prod(self, **kwargs): + # Pandas default is 0 (though not mentioned in docs) + axis = kwargs.get("axis", default=0) func = self._prepare_method(pandas.DataFrame.prod, **kwargs) - return self.full_reduce(kwargs["axis"], func) + return self.full_reduce(axis, func) def sum(self, **kwargs): + # Pandas default is 0 (though not mentioned in docs) + axis = kwargs.get("axis", default=0) func = self._prepare_method(pandas.DataFrame.sum, **kwargs) - return self.full_reduce(kwargs["axis"], func) + return self.full_reduce(axis, func) # END Full Reduce operations # Map partitions operations @@ -442,12 +477,14 @@ def _post_process_idx_ops(self, axis, intermediate_result): return result def all(self, **kwargs): + axis = kwargs.get("axis", default=0) func = 
self._prepare_method(pandas.DataFrame.all, **kwargs) - return self.full_axis_reduce(func, kwargs["axis"]) + return self.full_axis_reduce(func, axis) def any(self, **kwargs): + axis = kwargs.get("axis", default=0) func = self._prepare_method(pandas.DataFrame.any, **kwargs) - return self.full_axis_reduce(func, kwargs["axis"]) + return self.full_axis_reduce(func, axis) def idxmax(self, **kwargs): @@ -457,11 +494,12 @@ def idxmax_builder(df, **kwargs): df.index = pandas.RangeIndex(len(df.index)) return df.idxmax(**kwargs) + axis = kwargs.get("axis", default=0) func = self._prepare_method(idxmax_builder, **kwargs) - max_result = self.full_axis_reduce(func, kwargs["axis"]) + max_result = self.full_axis_reduce(func, axis) # Because our internal partitions don't track the external index, we # have to do a conversion. - return self._post_process_idx_ops(kwargs["axis"], max_result) + return self._post_process_idx_ops(axis, max_result) def idxmin(self, **kwargs): @@ -471,11 +509,12 @@ def idxmin_builder(df, **kwargs): df.index = pandas.RangeIndex(len(df.index)) return df.idxmin(**kwargs) + axis = kwargs.get("axis", default=0) func = self._prepare_method(idxmin_builder, **kwargs) - min_result = self.full_axis_reduce(func, kwargs["axis"]) + min_result = self.full_axis_reduce(func, axis) # Because our internal partitions don't track the external index, we # have to do a conversion. - return self._post_process_idx_ops(kwargs["axis"], min_result) + return self._post_process_idx_ops(axis, min_result) def first_valid_index(self): @@ -506,30 +545,43 @@ def last_valid_index_builder(df): return self.index[first_result.max()] def median(self, **kwargs): + # Pandas default is 0 (though not mentioned in docs) + axis = kwargs.get("axis", default=0) func = self._prepare_method(pandas.DataFrame.median, **kwargs) - return self.full_axis_reduce(func, kwargs["axis"]) + return self.full_axis_reduce(func, axis) def nunique(self, **kwargs): + axis = kwargs.get("axis", default=0) func = self._prepare_method(pandas.DataFrame.nunique, **kwargs) - return self.full_axis_reduce(func, kwargs["axis"]) + return self.full_axis_reduce(func, axis) def skew(self, **kwargs): + # Pandas default is 0 (though not mentioned in docs) + axis = kwargs.get("axis", default=0) func = self._prepare_method(pandas.DataFrame.skew, **kwargs) - return self.full_axis_reduce(func, kwargs["axis"]) + return self.full_axis_reduce(func, axis) def std(self, **kwargs): + # Pandas default is 0 (though not mentioned in docs) + axis = kwargs.get("axis", default=0) func = self._prepare_method(pandas.DataFrame.std, **kwargs) - return self.full_axis_reduce(func, kwargs["axis"]) + return self.full_axis_reduce(func, axis) def var(self, **kwargs): + # Pandas default is 0 (though not mentioned in docs) + axis = kwargs.get("axis", default=0) func = self._prepare_method(pandas.DataFrame.var, **kwargs) - return self.full_axis_reduce(func, kwargs["axis"]) + return self.full_axis_reduce(func, axis) def quantile_for_single_value(self, **kwargs): + axis = kwargs.get("axis", default=0) + q = kwargs.get("q", default=0.5) + assert type(q) is float + func = self._prepare_method(pandas.DataFrame.quantile, **kwargs) - result = self.full_axis_reduce(func, kwargs["axis"]) - result.name = kwargs["q"] + result = self.full_axis_reduce(func, axis) + result.name = q return result # END Column/Row partitions reduce operations @@ -560,19 +612,23 @@ def query_builder(df): def quantile_for_list_of_values(self, **kwargs): cls = type(self) - q = kwargs["q"] + axis = kwargs.get("axis", 
default=0) + q = kwargs.get("q", default=0.5) + assert isinstance(q, (pandas.Series, np.ndarray, pandas.Index, list)) + func = self._prepare_method(pandas.DataFrame.quantile, **kwargs) q_index = pandas.Float64Index(q) - new_data = self.map_across_full_axis(kwargs["axis"], func) - new_columns = self.columns if not kwargs["axis"] else self.index + new_data = self.map_across_full_axis(axis, func) + new_columns = self.columns if not axis else self.index return cls(new_data, q_index, new_columns) def _cumulative_builder(self, func, **kwargs): cls = type(self) + axis = kwargs.get("axis", default=0) func = self._prepare_method(func, **kwargs) - new_data = self.map_across_full_axis(kwargs["axis"], func) + new_data = self.map_across_full_axis(axis, func) return cls(new_data, self.index, self.columns) def cumsum(self, **kwargs): @@ -588,8 +644,10 @@ def cumprod(self, **kwargs): return self._cumulative_builder(pandas.DataFrame.cumprod, **kwargs) def dropna(self, **kwargs): - axis = kwargs["axis"] - subset = kwargs["subset"] + axis = kwargs.get("axis", default=0) + subset = kwargs.get("subset", default=None) + thresh = kwargs.get("thresh", default=None) + how = kwargs.get("how", default="any") # We need to subset the axis that we care about with `subset`. This # will be used to determine the number of values that are NA. if subset is not None: @@ -605,13 +663,12 @@ def dropna(self, **kwargs): # We are building this dictionary first to determine which columns # and rows to drop. This way we do not drop some columns before we # know which rows need to be dropped. - if kwargs["thresh"] is not None: + if thresh is not None: # Count the number of NA values and specify which are higher than # thresh. - thresh = kwargs["thresh"] drop_values = {ax ^ 1: compute_na.isna().sum(axis=ax ^ 1) > thresh for ax in axis} else: - drop_values = {ax ^ 1: getattr(compute_na.isna(), kwargs["how"])(axis=ax ^ 1) for ax in axis} + drop_values = {ax ^ 1: getattr(compute_na.isna(), how)(axis=ax ^ 1) for ax in axis} if 0 not in drop_values: drop_values[0] = None @@ -630,7 +687,7 @@ def dropna(self, **kwargs): def mode(self, **kwargs): cls = type(self) - axis = kwargs["axis"] + axis = kwargs.get("axis", default=0) func = self._prepare_method(pandas.DataFrame.mode, **kwargs) new_data = self.map_across_full_axis(axis, func) @@ -651,8 +708,8 @@ def mode(self, **kwargs): def fillna(self, **kwargs): cls = type(self) - axis = kwargs["axis"] - value = kwargs["value"] + axis = kwargs.get("axis", default=0) + value = kwargs.get("value", default=None) if isinstance(value, dict): return @@ -675,7 +732,7 @@ def describe(self, **kwargs): def rank(self, **kwargs): cls = type(self) - axis = kwargs["axis"] + axis = kwargs.get("axis", default=0) func = self._prepare_method(pandas.DataFrame.rank, **kwargs) new_data = self.map_across_full_axis(axis, func) if axis: diff --git a/modin/data_management/partitioning/partition_collections.py b/modin/data_management/partitioning/partition_collections.py index 48312d1171c..2af2a300148 100644 --- a/modin/data_management/partitioning/partition_collections.py +++ b/modin/data_management/partitioning/partition_collections.py @@ -332,8 +332,7 @@ def to_pandas(self, is_transposed=False): @classmethod def from_pandas(cls, dataframe, num_splits): if num_splits is None: - from ...pandas import DEFAULT_NPARTITIONS - num_splits = DEFAULT_NPARTITIONS + num_splits = cls._compute_num_partitions() pass @@ -353,8 +352,9 @@ def get_indices(self, axis=0, old_blocks=None): A Pandas Index object. 
""" if axis == 0: + func = self.preprocess_func(lambda df: df.index) # We grab the first column of blocks and extract the indices - new_indices = [idx.apply(lambda df: df.index).get() for idx in self.partitions.T[0]] + new_indices = [idx.apply(func).get() for idx in self.partitions.T[0]] # This is important because sometimes we have resized the data. The new # sizes will not be valid if we are trying to compute the index on a # new object that has a different length. @@ -363,7 +363,8 @@ def get_indices(self, axis=0, old_blocks=None): else: cumulative_block_lengths = np.array(self.block_lengths).cumsum() else: - new_indices = [idx.apply(lambda df: df.columns).get() for idx in self.partitions[0]] + func = self.preprocess_func(lambda df: df.columns) + new_indices = [idx.apply(func).get() for idx in self.partitions[0]] if old_blocks is not None: cumulative_block_lengths = np.array(old_blocks.block_widths).cumsum() @@ -538,6 +539,7 @@ def apply_func_to_select_indices_along_full_axis(self, axis, func, indices, keep indices = [indices] partitions_dict = self._get_dict_of_block_index(axis, indices) + preprocessed_func = self.preprocess_func(func) # Since we might be keeping the remaining blocks that are not modified, # we have to also keep the block_partitions object in the correct @@ -551,10 +553,10 @@ def apply_func_to_select_indices_along_full_axis(self, axis, func, indices, keep if not keep_remaining: # See notes in `apply_func_to_select_indices` - result = np.array([partitions_for_apply[i].apply(func, internal_indices=partitions_dict[i]) for i in partitions_dict]) + result = np.array([partitions_for_apply[i].apply(preprocessed_func, internal_indices=partitions_dict[i]) for i in partitions_dict]) else: # See notes in `apply_func_to_select_indices` - result = np.array([partitions_for_remaining[i] if i not in partitions_dict else partitions_for_apply[i].apply(func, internal_indices=partitions_dict[i]) for i in range(len(partitions_for_remaining))]) + result = np.array([partitions_for_remaining[i] if i not in partitions_dict else partitions_for_apply[i].apply(preprocessed_func, internal_indices=partitions_dict[i]) for i in range(len(partitions_for_remaining))]) return cls(result.T) if not axis else cls(result) diff --git a/modin/data_management/partitioning/remote_partition.py b/modin/data_management/partitioning/remote_partition.py index b65a6ca0197..3d86d918085 100644 --- a/modin/data_management/partitioning/remote_partition.py +++ b/modin/data_management/partitioning/remote_partition.py @@ -2,6 +2,7 @@ from __future__ import division from __future__ import print_function +import pandas import ray @@ -167,7 +168,10 @@ def to_pandas(self): Returns: A Pandas DataFrame. 
""" - return self.get() + dataframe = self.get() + assert type(dataframe) is pandas.DataFrame() + + return dataframe @classmethod def put(cls, obj): From cf9b05d472fbe87071c37705b2f3c49843aba614 Mon Sep 17 00:00:00 2001 From: William Ma Date: Thu, 30 Aug 2018 16:26:28 -0700 Subject: [PATCH 02/14] Updated sample to new architecture --- modin/pandas/dataframe.py | 41 +++++++++------------------------------ 1 file changed, 9 insertions(+), 32 deletions(-) diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py index 04920d8f609..57aafed47b2 100644 --- a/modin/pandas/dataframe.py +++ b/modin/pandas/dataframe.py @@ -3303,9 +3303,11 @@ def sample(self, else 0 if axis == 0: - axis_length = len(self._row_metadata) + axis_labels = self._data_manager.index + axis_length = len(axis_labels) else: - axis_length = len(self._col_metadata) + axis_labels = self._data_manager.column + axis_length = len(axis_labels) if weights is not None: @@ -3383,15 +3385,6 @@ def sample(self, columns=[] if axis == 1 else self.columns, index=self.index if axis == 1 else []) - if axis == 1: - axis_labels = self.columns - partition_metadata = self._col_metadata - partitions = self._col_partitions - else: - axis_labels = self.index - partition_metadata = self._row_metadata - partitions = self._row_partitions - if random_state is not None: # Get a random number generator depending on the type of # random_state that is passed in @@ -3407,35 +3400,19 @@ def sample(self, # choose random numbers and then get corresponding labels from # chosen axis sample_indices = random_num_gen.randint( - low=0, high=len(partition_metadata), size=n) + low=0, high=axis_length, size=n) samples = axis_labels[sample_indices] else: # randomly select labels from chosen axis samples = np.random.choice( a=axis_labels, size=n, replace=replace, p=weights) - # create an array of (partition, index_within_partition) tuples for - # each sample - part_ind_tuples = [partition_metadata[sample] for sample in samples] - if axis == 1: - # tup[0] refers to the partition number and tup[1] is the index - # within that partition - new_cols = [ - _deploy_func.remote(lambda df: df.iloc[:, [tup[1]]], - partitions[tup[0]]) - for tup in part_ind_tuples - ] - return DataFrame( - col_partitions=new_cols, columns=samples, index=self.index) + data_manager = self._data_manager.getitem_col_array(samples) + return DataFrame(data_manager=data_manager) else: - new_rows = [ - _deploy_func.remote(lambda df: df.loc[[tup[1]]], - partitions[tup[0]]) - for tup in part_ind_tuples - ] - return DataFrame( - row_partitions=new_rows, columns=self.columns, index=samples) + data_manager = self._data_manager.getitem_row_array(samples) + return DataFrame(data_manager=data_manager) def select(self, crit, axis=0): raise NotImplementedError( From 872495629790538a40e0423afabc0b8f501cf6f5 Mon Sep 17 00:00:00 2001 From: William Ma Date: Thu, 30 Aug 2018 18:56:22 -0700 Subject: [PATCH 03/14] Made test_sample more rigourous --- modin/pandas/test/test_dataframe.py | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/modin/pandas/test/test_dataframe.py b/modin/pandas/test/test_dataframe.py index 99db0b9903c..611cc322321 100644 --- a/modin/pandas/test/test_dataframe.py +++ b/modin/pandas/test/test_dataframe.py @@ -63,6 +63,7 @@ def test_int_dataframe(): filter_by = {'items': ['col1', 'col5'], 'regex': '4$|3$', 'like': 'col'} + test_sample(ray_df, pandas_df) test_filter(ray_df, pandas_df, filter_by) test_index(ray_df, pandas_df) test_size(ray_df, pandas_df) @@ 
-233,6 +234,7 @@ def test_float_dataframe(): filter_by = {'items': ['col1', 'col5'], 'regex': '4$|3$', 'like': 'col'} + test_sample(ray_df, pandas_df) test_filter(ray_df, pandas_df, filter_by) test_index(ray_df, pandas_df) test_size(ray_df, pandas_df) @@ -402,6 +404,7 @@ def test_mixed_dtype_dataframe(): filter_by = {'items': ['col1', 'col5'], 'regex': '4$|3$', 'like': 'col'} + test_sample(ray_df, pandas_df) test_filter(ray_df, pandas_df, filter_by) test_index(ray_df, pandas_df) test_size(ray_df, pandas_df) @@ -569,6 +572,7 @@ def test_nan_dataframe(): filter_by = {'items': ['col1', 'col5'], 'regex': '4$|3$', 'like': 'col'} + test_sample(ray_df, pandas_df) test_filter(ray_df, pandas_df, filter_by) test_index(ray_df, pandas_df) test_size(ray_df, pandas_df) @@ -2865,10 +2869,19 @@ def test_rtruediv(): test_inter_df_math_right_ops("rtruediv") -def test_sample(): - ray_df = create_test_dataframe() - assert len(ray_df.sample(n=4)) == 4 - assert len(ray_df.sample(frac=0.5)) == 2 +@pytest.fixture +def test_sample(ray_df, pd_df): + with pytest.raises(ValueError): + ray_df.sample(n=3, frac=0.4) + + assert ray_df_equals_pandas( + ray_df.sample(frac=0.5, random_state=42), + pd_df.sample(frac=0.5, random_state=42) + ) + assert ray_df_equals_pandas( + ray_df.sample(n=2, random_state=42), + pd_df.sample(n=2, random_state=42) + ) def test_select(): From 2ad1c3b3961f72f0338ed1b2f2fd7fc28fe1f9e7 Mon Sep 17 00:00:00 2001 From: William Ma Date: Thu, 30 Aug 2018 19:41:35 -0700 Subject: [PATCH 04/14] Removed 'default=' from kwargs.get's --- modin/data_management/data_manager.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modin/data_management/data_manager.py b/modin/data_management/data_manager.py index 6c80861fc19..43c4e922574 100644 --- a/modin/data_management/data_manager.py +++ b/modin/data_management/data_manager.py @@ -388,13 +388,13 @@ def count(self, **kwargs): def max(self, **kwargs): # Pandas default is 0 (though not mentioned in docs) - axis = kwargs.get("axis", default=0) + axis = kwargs.get("axis", 0) func = self._prepare_method(pandas.DataFrame.max, **kwargs) return self.full_reduce(axis, func) def mean(self, **kwargs): # Pandas default is 0 (though not mentioned in docs) - axis = kwargs.get("axis", default=0) + axis = kwargs.get("axis", 0) length = len(self.index) if not axis else len(self.columns) return self.sum(**kwargs) / length From 502bd0dd8d0252b2e9db4fe437dfb70d048a6e92 Mon Sep 17 00:00:00 2001 From: William Ma Date: Thu, 30 Aug 2018 20:44:25 -0700 Subject: [PATCH 05/14] Updated eval to the new backend --- modin/data_management/data_manager.py | 29 ++++++++++++++++++++++- modin/pandas/dataframe.py | 33 +++++---------------------- 2 files changed, 34 insertions(+), 28 deletions(-) diff --git a/modin/data_management/data_manager.py b/modin/data_management/data_manager.py index 43c4e922574..770399399f2 100644 --- a/modin/data_management/data_manager.py +++ b/modin/data_management/data_manager.py @@ -599,7 +599,7 @@ def query(self, expr, **kwargs): cls = type(self) columns = self.columns - def query_builder(df): + def query_builder(df, **kwargs): # This is required because of an Arrow limitation # TODO revisit for Arrow error df = df.copy() @@ -616,6 +616,33 @@ def query_builder(df): return cls(new_data, new_index, self.columns) + def eval(self, expr, **kwargs): + cls = type(self) + columns = self.columns + + def eval_builder(df, **kwargs): + df.columns = columns + result = df.eval(expr, inplace=False, **kwargs) + # If result is a series, expr was not an assignment 
expression. + if not isinstance(result, pandas.Series): + result.columns = pandas.RangeIndex(0, len(result.columns)) + return result + + func = self._prepare_method(eval_builder, **kwargs) + new_data = self.map_across_full_axis(1, func) + + # eval can update the columns, so we must update columns + columns_copy = pandas.DataFrame(columns=columns) + columns_copy = columns_copy.eval(expr, inplace=False, **kwargs) + if isinstance(columns_copy, pandas.Series): + # To create a data manager, we need the + # columns to be in a list-like + columns = list(columns_copy.name) + else: + columns = columns_copy.columns + + return cls(new_data, self.index, columns) + def quantile_for_list_of_values(self, **kwargs): cls = type(self) axis = kwargs.get("axis", 0) diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py index 6b983d99f9f..e65c890f7fc 100644 --- a/modin/pandas/dataframe.py +++ b/modin/pandas/dataframe.py @@ -1525,36 +1525,14 @@ def eval(self, expr, inplace=False, **kwargs): ndarray, numeric scalar, DataFrame, Series """ self._validate_eval_query(expr, **kwargs) - - columns = self.columns - - def eval_helper(df): - df.columns = columns - result = df.eval(expr, inplace=False, **kwargs) - # If result is a series, expr was not an assignment expression. - if not isinstance(result, pandas.Series): - result.columns = pandas.RangeIndex(0, len(result.columns)) - return result - inplace = validate_bool_kwarg(inplace, "inplace") - new_rows = _map_partitions(eval_helper, self._row_partitions) - result_type = ray.get( - _deploy_func.remote(lambda df: type(df), new_rows[0])) - if result_type is pandas.Series: - new_series = pandas.concat(ray.get(new_rows), axis=0, copy=False) - new_series.index = self.index - return new_series - - columns_copy = self._col_metadata._coord_df.copy().T - columns_copy.eval(expr, inplace=True, **kwargs) - columns = columns_copy.columns + data_manager = self._data_manager.eval(expr, **kwargs) if inplace: - self._update_inplace( - row_partitions=new_rows, columns=columns, index=self.index) + self._update_inplace(new_manager=data_manager) else: - return DataFrame(columns=columns, row_partitions=new_rows) + return DataFrame(data_manager=data_manager) def ewm(self, com=None, @@ -2877,6 +2855,7 @@ def query(self, expr, inplace=False, **kwargs): A new DataFrame if inplace=False """ self._validate_eval_query(expr, **kwargs) + inplace = validate_bool_kwarg(inplace, "inplace") new_manager = self._data_manager.query(expr, **kwargs) @@ -3320,8 +3299,8 @@ def sample(self, # choose random numbers and then get corresponding labels from # chosen axis - sample_indices = random_num_gen.randint( - low=0, high=axis_length, size=n) + sample_indices = random_num_gen.choice( + np.arange(0, axis_length), size=n, replace=replace) samples = axis_labels[sample_indices] else: # randomly select labels from chosen axis From 992a8e2ff8f8100240c26f99a8979beecd478eb4 Mon Sep 17 00:00:00 2001 From: William Ma Date: Thu, 30 Aug 2018 20:44:37 -0700 Subject: [PATCH 06/14] Added two more tests for eval --- modin/pandas/test/test_dataframe.py | 30 +++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/modin/pandas/test/test_dataframe.py b/modin/pandas/test/test_dataframe.py index 611cc322321..9ae6c2c7ced 100644 --- a/modin/pandas/test/test_dataframe.py +++ b/modin/pandas/test/test_dataframe.py @@ -1533,6 +1533,18 @@ def test_eval_df_use_case(): frame_data = {'a': np.random.randn(10), 'b': np.random.randn(10)} df = pandas.DataFrame(frame_data) ray_df = pd.DataFrame(frame_data) + + # 
Very hacky test to test eval while inplace is not working + tmp_pandas = df.eval( + "e = arctan2(sin(a), b)", + engine='python', + parser='pandas') + tmp_ray = ray_df.eval( + "e = arctan2(sin(a), b)", + engine='python', + parser='pandas') + assert ray_df_equals_pandas(tmp_ray, tmp_pandas) + df.eval( "e = arctan2(sin(a), b)", engine='python', @@ -1559,6 +1571,24 @@ def test_eval_df_arithmetic_subexpression(): assert ray_df_equals_pandas(ray_df, df) +def test_eval_df_series_result(): + frame_data = {'a': np.random.randn(10), 'b': np.random.randn(10)} + df = pandas.DataFrame(frame_data) + ray_df = pd.DataFrame(frame_data) + + # Very hacky test to test eval while inplace is not working + tmp_pandas = df.eval( + "arctan2(sin(a), b)", + engine='python', + parser='pandas') + tmp_ray = ray_df.eval( + "arctan2(sin(a), b)", + engine='python', + parser='pandas') + assert ray_df_equals_pandas(tmp_ray, tmp_pandas) + assert isinstance(to_pandas(tmp_ray), pandas.Series) + + def test_ewm(): ray_df = create_test_dataframe() From 7cbb17a800abff57e4d972fd5c54bccd89484ec2 Mon Sep 17 00:00:00 2001 From: William Ma Date: Fri, 31 Aug 2018 17:40:42 -0700 Subject: [PATCH 07/14] Updated memory_usage to new backend --- modin/pandas/dataframe.py | 4 ++-- modin/pandas/test/test_dataframe.py | 5 +++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py index e65c890f7fc..dea18dc4aef 100644 --- a/modin/pandas/dataframe.py +++ b/modin/pandas/dataframe.py @@ -2326,11 +2326,11 @@ def memory_usage(self, index=True, deep=False): def remote_func(df): return df.memory_usage(index=False, deep=deep) - result = self._map_reduce(axis=0, map_func=remote_func) + result = self._data_manager.full_reduce(axis=0, map_func=remote_func) result.index = self.columns if index: - index_value = self._row_metadata.index.memory_usage(deep=deep) + index_value = self.index.memory_usage(deep=deep) return pandas.Series(index_value, index=['Index']).append(result) return result diff --git a/modin/pandas/test/test_dataframe.py b/modin/pandas/test/test_dataframe.py index 9ae6c2c7ced..e4327b11b46 100644 --- a/modin/pandas/test/test_dataframe.py +++ b/modin/pandas/test/test_dataframe.py @@ -2279,8 +2279,9 @@ def test_melt(): ray_df.melt() -@pytest.fixture -def test_memory_usage(ray_df): +#@pytest.fixture +def test_memory_usage(): + ray_df = create_test_dataframe() assert type(ray_df.memory_usage()) is pandas.core.series.Series assert ray_df.memory_usage(index=True).at['Index'] is not None assert ray_df.memory_usage(deep=True).sum() >= \ From b144b3d853ca68709a46dd0c931a9cfa8ddb73ea Mon Sep 17 00:00:00 2001 From: William Ma Date: Sat, 1 Sep 2018 18:12:20 -0700 Subject: [PATCH 08/14] Updated info and memory_usage to the new backend --- modin/pandas/dataframe.py | 149 ++++++++++++++++++++++---------------- 1 file changed, 87 insertions(+), 62 deletions(-) diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py index dea18dc4aef..fce288f4883 100644 --- a/modin/pandas/dataframe.py +++ b/modin/pandas/dataframe.py @@ -1896,85 +1896,110 @@ def infer_objects(self): "To contribute to Pandas on Ray, please visit " "github.com/modin-project/modin.") - def info(self, - verbose=None, - buf=None, - max_cols=None, - memory_usage=None, - null_counts=None): - def info_helper(df): - output_buffer = io.StringIO() - df.info( - verbose=verbose, - buf=output_buffer, - max_cols=max_cols, - memory_usage=memory_usage, - null_counts=null_counts) - return output_buffer.getvalue() - - # Combine the 
per-partition info and split into lines
-        result = ''.join(
-            ray.get(_map_partitions(info_helper, self._col_partitions)))
-        lines = result.split('\n')
+    def info(self,
+             verbose=None,
+             buf=None,
+             max_cols=None,
+             memory_usage=None,
+             null_counts=None):
+        """Print a concise summary of a DataFrame, which includes the index
+        dtype and column dtypes, non-null values and memory usage.
+        Args:
+            verbose (bool, optional): Whether to print the full summary. Defaults
+                to True
+
+            buf (writable buffer): Where to send output. Defaults to sys.stdout
+
+            max_cols (int, optional): When to switch from verbose to truncated
+                output. By default, this is 100.
+
+            memory_usage (bool, str, optional): Specifies whether the total memory
+                usage of the DataFrame elements (including index) should be displayed.
+                True always shows memory usage. False never shows memory usage. A value
+                of 'deep' is equivalent to "True with deep introspection". Memory usage
+                is shown in human-readable units (base-2 representation). Without deep
+                introspection a memory estimation is made based on column dtype and number
+                of rows assuming values consume the same memory amount for corresponding
+                dtypes. With deep memory introspection, a real memory usage calculation is
+                performed at the cost of computational resources. Defaults to True.
+
+            null_counts (bool, optional): Whether to show the non-null counts. By default,
+                this is shown only when the frame is smaller than 100 columns and 1690785
+                rows. A value of True always shows the counts and False never shows the
+                counts.
+
+        Returns:
+            Prints the summary of a DataFrame and returns None.
+        """
+        index = self._data_manager.index
+        columns = self._data_manager.columns
+        dtypes = self.dtypes
+
+        # Set up default values
+        verbose = True if verbose is None else verbose
+        buf = sys.stdout if not buf else buf
+        max_cols = 100 if not max_cols else max_cols
+        memory_usage = True if memory_usage is None else memory_usage
+        if not null_counts:
+            if len(columns) < 100 and len(index) < 1690785:
+                null_counts = True
+            else:
+                null_counts = False
+
+        # Determine if actually verbose
+        actually_verbose = True if verbose and max_cols > len(columns) else False
+
+        if type(memory_usage) == str and memory_usage == 'deep':
+            memory_usage_deep = True
+        else:
+            memory_usage_deep = False
+
+        # Start putting together output
         # Class denoted in info() output
         class_string = '\n'
 
         # Create the Index info() string by parsing self.index
-        index_string = self.index.summary() + '\n'
-
-        # A column header is needed in the inf() output
-        col_header = 'Data columns (total {0} columns):\n' \
-            .format(len(self.columns))
-
-        # Parse the per-partition values to get the per-column details
-        # Find all the lines in the output that start with integers
-        prog = re.compile('^[0-9]+.+')
-        col_lines = [prog.match(line) for line in lines]
-        cols = [c.group(0) for c in col_lines if c is not None]
-        # replace the partition columns names with real column names
-        columns = [
-            "{0}\t{1}\n".format(self.columns[i], cols[i].split(" ", 1)[1])
-            for i in range(len(cols))
-        ]
-        col_string = ''.join(columns) + '\n'
+        index_string = index.summary() + '\n'
+
+        if actually_verbose:
+            if null_counts:
+                counts = self.count()
+            # Create string for verbose output
+            col_string = 'Data columns (total {0} columns):\n' \
+                .format(len(columns))
+            for col, dtype in zip(columns, dtypes):
+                col_string += '{0}\t'.format(col)
+                if null_counts:
+                    col_string += '{0} not-null '.format(counts[col])
+                col_string += '{0}\n'.format(dtype)
+        else:
+            #
Create string for not verbose output + col_string = 'Columns: {0} entries, {1} to {2}\n'\ + .format(len(columns), columns[0], columns[-1]) # A summary of the dtypes in the dataframe dtypes_string = "dtypes: " - for dtype, count in self.dtypes.value_counts().iteritems(): + for dtype, count in dtypes.value_counts().iteritems(): dtypes_string += "{0}({1}),".format(dtype, count) dtypes_string = dtypes_string[:-1] + '\n' - # Compute the memory usage by summing per-partitions return values - # Parse lines for memory usage number - prog = re.compile('^memory+.+') - mems = [prog.match(line) for line in lines] - mem_vals = [ - float(re.search(r'\d+', m.group(0)).group()) for m in mems - if m is not None - ] - - memory_string = "" - - if len(mem_vals) != 0: - # Sum memory usage from each partition - if memory_usage != 'deep': - memory_string = 'memory usage: {0}+ bytes' \ - .format(sum(mem_vals)) + # Create memory usage string + memory_string = '' + if memory_usage: + mem_data = self.memory_usage(deep=memory_usage_deep) + if memory_usage_deep: + memory_string = 'memory usage: {0} bytes'.format(mem_data.sum()) else: - memory_string = 'memory usage: {0} bytes'.format(sum(mem_vals)) + memory_string = 'memory usage: {0}+ bytes'.format(mem_data.sum()) # Combine all the components of the info() output result = ''.join([ - class_string, index_string, col_header, col_string, dtypes_string, - memory_string - ]) + class_string, index_string, col_string, dtypes_string, memory_string + ]) # Write to specified output buffer - if buf: - buf.write(result) - else: - sys.stdout.write(result) + buf.write(result) def insert(self, loc, column, value, allow_duplicates=False): """Insert column into DataFrame at specified location. From 4e7addaf6f4da182018e14e7db837563195f639d Mon Sep 17 00:00:00 2001 From: William Ma Date: Sat, 1 Sep 2018 18:12:49 -0700 Subject: [PATCH 09/14] Updated info and memory_usage to be standalone tests and updated the tests --- modin/pandas/test/test_dataframe.py | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/modin/pandas/test/test_dataframe.py b/modin/pandas/test/test_dataframe.py index e4327b11b46..933d975e0da 100644 --- a/modin/pandas/test/test_dataframe.py +++ b/modin/pandas/test/test_dataframe.py @@ -3,6 +3,7 @@ from __future__ import print_function import pytest +import io import numpy as np import pandas import pandas.util.testing as tm @@ -2091,12 +2092,20 @@ def test_infer_objects(): ray_df.infer_objects() -@pytest.fixture -def test_info(ray_df): - info_string = ray_df.info() - assert '\n' in info_string - info_string = ray_df.info(memory_usage=True) - assert 'memory_usage: ' in info_string +#@pytest.fixture +def test_info(): + ray_df = create_test_dataframe() + with io.StringIO() as buf: + ray_df.info(buf=buf) + info_string = buf.getvalue() + assert '\n' in info_string + assert 'memory usage: ' in info_string + assert 'Data columns (total 5 columns):' in info_string + with io.StringIO() as buf: + ray_df.info(buf=buf, verbose=False, memory_usage=False) + info_string = buf.getvalue() + assert 'memory usage: ' not in info_string + assert 'Columns: 5 entries, col1 to col5' in info_string @pytest.fixture From 8a69de5393f41b430f06b22cb7d1e13dbf2fc0fe Mon Sep 17 00:00:00 2001 From: William Ma Date: Sat, 1 Sep 2018 20:25:18 -0700 Subject: [PATCH 10/14] Updated info to do only one pass --- modin/pandas/dataframe.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py 
index fce288f4883..b497de43148 100644 --- a/modin/pandas/dataframe.py +++ b/modin/pandas/dataframe.py @@ -1962,12 +1962,22 @@ def info(self, # Create the Index info() string by parsing self.index index_string = index.summary() + '\n' - if actually_verbose: + # Package everything into one helper + def info_helper(df): + result = pandas.DataFrame() + if memory_usage: + result['memory'] = df.memory_usage(index=False, deep=memory_usage_deep) if null_counts: - counts = self.count() + result['count'] = df.count() + return result + helper_result = self._data_manager.full_reduce(axis=0, map_func=info_helper) + + if actually_verbose: # Create string for verbose output col_string = 'Data columns (total {0} columns):\n' \ .format(len(columns)) + if null_counts: + counts = helper_result['count'] for col, dtype in zip(columns, dtypes): col_string += '{0}\t'.format(col) if null_counts: @@ -1987,7 +1997,7 @@ def info(self, # Create memory usage string memory_string = '' if memory_usage: - mem_data = self.memory_usage(deep=memory_usage_deep) + mem_data = helper_result['memory'] if memory_usage_deep: memory_string = 'memory usage: {0} bytes'.format(mem_data.sum()) else: From 0ea925f99c7e7cff5038d7d9b24a5ee426863804 Mon Sep 17 00:00:00 2001 From: William Ma Date: Sat, 1 Sep 2018 22:08:51 -0700 Subject: [PATCH 11/14] Updated info to do everything in one run with DataFrame --- modin/pandas/dataframe.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py index b497de43148..9a257a32818 100644 --- a/modin/pandas/dataframe.py +++ b/modin/pandas/dataframe.py @@ -1968,7 +1968,7 @@ def info_helper(df): if memory_usage: result['memory'] = df.memory_usage(index=False, deep=memory_usage_deep) if null_counts: - result['count'] = df.count() + result['count'] = df.count(axis=0) return result helper_result = self._data_manager.full_reduce(axis=0, map_func=info_helper) From 8a7b320f36b8b75570a059c66ba1f0c549c99a42 Mon Sep 17 00:00:00 2001 From: William Ma Date: Sat, 1 Sep 2018 22:12:42 -0700 Subject: [PATCH 12/14] Update info to do everything in one run with Series --- modin/pandas/dataframe.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py index 9a257a32818..7675c028e86 100644 --- a/modin/pandas/dataframe.py +++ b/modin/pandas/dataframe.py @@ -1964,24 +1964,28 @@ def info(self, # Package everything into one helper def info_helper(df): - result = pandas.DataFrame() + result = pandas.Series() if memory_usage: - result['memory'] = df.memory_usage(index=False, deep=memory_usage_deep) + memory = df.memory_usage(index=False, deep=memory_usage_deep) + memory = memory.add_suffix("_memory") + result = result.append(memory) if null_counts: - result['count'] = df.count(axis=0) + count = df.count(axis=0) + count = count.add_suffix("_count") + result = result.append(count) return result helper_result = self._data_manager.full_reduce(axis=0, map_func=info_helper) + counts = helper_result.filter(regex='_count', axis=0) + mem_data = helper_result.filter(regex='_memory', axis=0) if actually_verbose: # Create string for verbose output col_string = 'Data columns (total {0} columns):\n' \ .format(len(columns)) - if null_counts: - counts = helper_result['count'] for col, dtype in zip(columns, dtypes): col_string += '{0}\t'.format(col) if null_counts: - col_string += '{0} not-null '.format(counts[col]) + col_string += '{0} not-null '.format(counts[col+"_count"]) col_string += 
'{0}\n'.format(dtype)
         else:
             # Create string for not verbose output
@@ -1997,7 +2001,6 @@ def info_helper(df):
         # Create memory usage string
         memory_string = ''
         if memory_usage:
-            mem_data = helper_result['memory']
             if memory_usage_deep:
                 memory_string = 'memory usage: {0} bytes'.format(mem_data.sum())
             else:

From e27328811230db8a7523a2206de1716a47cf8c0c Mon Sep 17 00:00:00 2001
From: William Ma
Date: Sat, 1 Sep 2018 23:34:13 -0700
Subject: [PATCH 14/14] Updated to get everything working and moved appropriate parts to DataManager

---
 modin/data_management/data_manager.py | 29 ++++++++++++++++
 modin/pandas/dataframe.py             | 48 +++++++++++++++++----------
 modin/pandas/test/test_dataframe.py   | 12 +++++--
 3 files changed, 69 insertions(+), 20 deletions(-)

diff --git a/modin/data_management/data_manager.py b/modin/data_management/data_manager.py
index 770399399f2..55dd28d5098 100644
--- a/modin/data_management/data_manager.py
+++ b/modin/data_management/data_manager.py
@@ -522,6 +522,27 @@ def idxmin_builder(df, **kwargs):
         # have to do a conversion.
return self._post_process_idx_ops(axis, min_result) + def info(self, **kwargs): + def info_builder(df, **kwargs): + result = pandas.DataFrame() + if memory_usage: + result['memory'] = df.memory_usage(index=False, deep=memory_usage_deep) + if null_counts: + result['count'] = df.count(axis=0) + return result + + memory_usage = kwargs.get('memory_usage', True) + null_counts = kwargs.get('null_counts', True) + + if type(memory_usage) == str and memory_usage == 'deep': + memory_usage_deep = True + else: + memory_usage_deep = False + + func = self._prepare_method(info_builder, **kwargs) + return self.full_axis_reduce(func, 0) + + def first_valid_index(self): # It may be possible to incrementally check each partition, but this @@ -556,6 +577,14 @@ def median(self, **kwargs): func = self._prepare_method(pandas.DataFrame.median, **kwargs) return self.full_axis_reduce(func, axis) + def memory_usage(self, **kwargs): + def memory_usage_builder(df, **kwargs): + return df.memory_usage(index=False, deep=deep) + + deep = kwargs.get('deep', False) + func = self._prepare_method(memory_usage_builder, **kwargs) + return self.full_axis_reduce(func, 0) + def nunique(self, **kwargs): axis = kwargs.get("axis", 0) func = self._prepare_method(pandas.DataFrame.nunique, **kwargs) diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py index 9a257a32818..11fb03209c2 100644 --- a/modin/pandas/dataframe.py +++ b/modin/pandas/dataframe.py @@ -1962,26 +1962,31 @@ def info(self, # Create the Index info() string by parsing self.index index_string = index.summary() + '\n' - # Package everything into one helper - def info_helper(df): - result = pandas.DataFrame() - if memory_usage: - result['memory'] = df.memory_usage(index=False, deep=memory_usage_deep) + if memory_usage or null_counts: + results_data = self._data_manager.info( + verbose=actually_verbose, + buf=buf, + max_cols=max_cols, + memory_usage=memory_usage, + null_counts=null_counts + ) if null_counts: - result['count'] = df.count(axis=0) - return result - helper_result = self._data_manager.full_reduce(axis=0, map_func=info_helper) + # For some reason, the counts table has a shape of (columns, columns) + counts = results_data['count'] + counts.columns = columns + if memory_usage: + # For some reason, the memory table has a shape of (columns, columns) + # but it doesn't matter because the cells not on the diagonal are NaN + memory_usage_data = results_data['memory'].sum() + index.memory_usage(deep=memory_usage_deep) if actually_verbose: # Create string for verbose output col_string = 'Data columns (total {0} columns):\n' \ .format(len(columns)) - if null_counts: - counts = helper_result['count'] for col, dtype in zip(columns, dtypes): col_string += '{0}\t'.format(col) if null_counts: - col_string += '{0} not-null '.format(counts[col]) + col_string += '{0} not-null '.format(counts.loc[col, col]) col_string += '{0}\n'.format(dtype) else: # Create string for not verbose output @@ -1997,11 +2002,10 @@ def info_helper(df): # Create memory usage string memory_string = '' if memory_usage: - mem_data = helper_result['memory'] if memory_usage_deep: - memory_string = 'memory usage: {0} bytes'.format(mem_data.sum()) + memory_string = 'memory usage: {0} bytes'.format(memory_usage_data) else: - memory_string = 'memory usage: {0}+ bytes'.format(mem_data.sum()) + memory_string = 'memory usage: {0}+ bytes'.format(memory_usage_data) # Combine all the components of the info() output result = ''.join([ @@ -2358,10 +2362,20 @@ def melt(self, "github.com/modin-project/modin.") 
     def memory_usage(self, index=True, deep=False):
+        """Returns the memory usage of each column in bytes
 
-        def remote_func(df):
-            return df.memory_usage(index=False, deep=deep)
+        Args:
+            index (bool): Whether to include the memory usage of the DataFrame's
+                index in returned Series. Defaults to True
+            deep (bool): If True, introspect the data deeply by interrogating
+                objects dtypes for system-level memory consumption. Defaults to False
 
-        result = self._data_manager.full_reduce(axis=0, map_func=remote_func)
+        Returns:
+            A Series whose index is the column names and whose values are
+            the memory usage of each of the columns in bytes. If `index=True`,
+            then the first value of the Series will be 'Index' with its memory usage.
+        """
+        result = self._data_manager.memory_usage(index=index, deep=deep)
         result.index = self.columns
 
         if index:
diff --git a/modin/pandas/test/test_dataframe.py b/modin/pandas/test/test_dataframe.py
index 933d975e0da..8fa5ca258e8 100644
--- a/modin/pandas/test/test_dataframe.py
+++ b/modin/pandas/test/test_dataframe.py
@@ -2094,18 +2094,24 @@ def test_infer_objects():
 
 #@pytest.fixture
 def test_info():
-    ray_df = create_test_dataframe()
+    ray_df = pd.DataFrame({
+        'col1': [1, 2, 3, np.nan],
+        'col2': [4, 5, np.nan, 7],
+        'col3': [8, np.nan, 10, 11],
+        'col4': [np.nan, 13, 14, 15]
+    })
+    ray_df.info(memory_usage='deep')
     with io.StringIO() as buf:
         ray_df.info(buf=buf)
         info_string = buf.getvalue()
         assert '\n' in info_string
         assert 'memory usage: ' in info_string
-        assert 'Data columns (total 5 columns):' in info_string
+        assert 'Data columns (total 4 columns):' in info_string
     with io.StringIO() as buf:
         ray_df.info(buf=buf, verbose=False, memory_usage=False)
         info_string = buf.getvalue()
         assert 'memory usage: ' not in info_string
-        assert 'Columns: 5 entries, col1 to col5' in info_string
+        assert 'Columns: 4 entries, col1 to col4' in info_string
 
 
 @pytest.fixture
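
A note on the kwargs pattern this series settles on: Python's dict.get takes
its fallback value positionally, so the kwargs.get("axis", default=0) spelling
introduced in PATCH 01 raises "TypeError: get() takes no keyword arguments" the
first time it runs, which is why PATCH 04 moves to the positional form. A
minimal, standalone sketch of the difference (plain Python, no Modin imports;
the kwargs contents below are illustrative only):

    kwargs = {"axis": 1}

    # Positional fallback: returns the stored value, or the fallback
    # when the key is absent.
    axis = kwargs.get("axis", 0)      # -> 1
    sort = kwargs.get("sort", False)  # -> False ("sort" is absent)

    # Keyword form: dict.get() accepts no keyword arguments.
    # kwargs.get("axis", default=0)   # raises TypeError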