Skip to content

Commit

Permalink
Fix case
Browse files Browse the repository at this point in the history
  • Loading branch information
wjsi committed Nov 2, 2023
1 parent e9b79da commit b55771d
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 34 deletions.
2 changes: 1 addition & 1 deletion mars/dataframe/base/bloom_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ def _build_dataframe_filter(cls, in_data: pd.DataFrame, op: "DataFrameBloomFilte
def _convert_to_hashable_dtypes(cls, dtypes: pd.Series):
dtypes = dict(
(name, dtype) if np.issubdtype(dtype, int) else (name, str)
for name, dtype in dtypes.iteritems()
for name, dtype in dtypes.items()
)
return dtypes

Expand Down
41 changes: 29 additions & 12 deletions mars/dataframe/reduction/aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,15 @@ def _calc_result_shape(self, df):
self.output_types = [OutputType.scalar]
return np.array(result_df).dtype, None, 0

def _get_reduced_dim_unit(self, in_ndim, out_ndim):
"""
If rows can be reduced into multiple columns, return nan,
otherwise returns 1
"""
if not isinstance(self.raw_func, str) and isinstance(self.raw_func, Iterable):
return 1
return 1 if in_ndim != out_ndim else np.nan

def __call__(self, df, output_type=None, dtypes=None, index=None):
self._output_types = df.op.output_types
normalize_reduction_funcs(self, ndim=df.ndim)
Expand All @@ -154,9 +163,7 @@ def __call__(self, df, output_type=None, dtypes=None, index=None):
else:
out_ndim = 0

reduced_len = (
1 if df.ndim != out_ndim or isinstance(self.raw_func, list) else np.nan
)
reduced_len = self._get_reduced_dim_unit(df.ndim, out_ndim)
if self.output_types[0] == OutputType.dataframe:
if self.axis == 0:
new_shape = (len(index) * reduced_len, len(dtypes))
Expand Down Expand Up @@ -213,7 +220,7 @@ def _safe_append(d, key, val):
@classmethod
def _gen_map_chunks(
cls,
op,
op: "DataFrameAggregate",
in_df,
out_df,
func_infos: List[ReductionSteps],
Expand All @@ -232,9 +239,7 @@ def _gen_map_chunks(

agg_chunks = np.empty(agg_chunks_shape, dtype=object)
dtypes_cache = dict()
reduced_len = (
1 if in_df.ndim != out_df.ndim or isinstance(op.raw_func, list) else np.nan
)
reduced_len = op._get_reduced_dim_unit(in_df.ndim, out_df.ndim)
for chunk in in_df.chunks:
input_index = chunk.index[1 - axis] if len(chunk.index) > 1 else 0
if input_index not in input_index_to_output:
Expand Down Expand Up @@ -438,9 +443,7 @@ def _tile_tree(cls, op: "DataFrameAggregate"):
chunks = cls._gen_map_chunks(
op, in_df, out_df, axis_func_infos, input_index_to_output
)
reduced_len = (
1 if in_df.ndim != out_df.ndim or isinstance(op.raw_func, list) else np.nan
)
reduced_len = op._get_reduced_dim_unit(in_df.ndim, out_df.ndim)
while chunks.shape[axis] > combine_size:
if axis == 0:
new_chunks_shape = (
Expand Down Expand Up @@ -715,6 +718,8 @@ def _do_predefined_agg(cls, op: "DataFrameAggregate", input_obj, func_name, kwds
raise NotImplementedError("numeric_only not implemented under cudf")
if isinstance(input_obj, pd.Index):
kwds.pop("skipna", None)
if getattr(input_obj, "ndim", 0) > 1:
kwds["axis"] = op.axis
return getattr(input_obj, func_name)(**kwds)

@classmethod
Expand Down Expand Up @@ -865,7 +870,19 @@ def _execute_agg(cls, ctx, op: "DataFrameAggregate"):
ser_index = None
if agg_series_ndim < out.ndim:
ser_index = [func_name]
aggs.append(cls._wrap_df(op, agg_series, index=ser_index))
if (
isinstance(agg_series, np.ndarray)
and getattr(func_inputs[0], "ndim", 0) >= 1
and hasattr(func_inputs[0], "index")
):
agg_series = cls._wrap_df(op, agg_series, index=ser_index)
if op.axis == 0:
agg_series.columns = func_inputs[0].index
else:
agg_series.index = func_inputs[0].index
else:
agg_series = cls._wrap_df(op, agg_series, index=ser_index)
aggs.append(agg_series)

# concatenate to produce final result
concat_df = xdf.concat(aggs, axis=axis)
Expand Down Expand Up @@ -931,7 +948,7 @@ def execute(cls, ctx, op: "DataFrameAggregate"):
):
result = op.func[0](in_data)
else:
result = in_data.agg(op.raw_func, axis=op.axis)
result = in_data.agg(op.raw_func, axis=op.axis, **op.raw_func_kw)
if op.outputs[0].ndim == 1:
result = result.astype(op.outputs[0].dtype, copy=False)

Expand Down
14 changes: 9 additions & 5 deletions mars/dataframe/reduction/mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,20 @@ def __init__(
self._numeric_only = numeric_only
self._dropna = dropna

@staticmethod
def _explode_dict_series(s: pd.Series) -> pd.DataFrame:
exploded = s.apply(pd.Series)
# if exploded.columns.hasnans:
return exploded

def pre(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ
xdf = cudf if self.is_gpu() else pd
if isinstance(in_data, xdf.Series):
return in_data.value_counts(dropna=self._dropna)
else:
if self._axis == 0:
data = dict()
for d, v in in_data.iteritems():
for d, v in in_data.items():
data[d] = [v.value_counts(dropna=self._dropna).to_dict()]
df = xdf.DataFrame(data)
else:
Expand All @@ -64,7 +70,7 @@ def agg(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ
else:
if self._axis == 0:
data = dict()
for d, v in in_data.iteritems():
for d, v in in_data.items():
data[d] = [v.apply(pd.Series).sum().to_dict()]
df = xdf.DataFrame(data)
else:
Expand All @@ -85,9 +91,7 @@ def _handle_series(s):
if isinstance(in_data, xdf.Series):
return _handle_series(in_data)
else:
in_data_iter = (
in_data.iteritems() if self._axis == 0 else in_data.iterrows()
)
in_data_iter = in_data.items() if self._axis == 0 else in_data.iterrows()
s_list = []
for d, v in in_data_iter:
if isinstance(v.dtype, ArrowListDtype):
Expand Down
8 changes: 3 additions & 5 deletions mars/dataframe/reduction/nunique.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def pre(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ
else:
if self._axis == 0:
data = dict()
for d, v in in_data.iteritems():
for d, v in in_data.items():
if not self._use_arrow_dtype or xdf is cudf:
data[d] = [v.drop_duplicates().to_list()]
else:
Expand All @@ -82,7 +82,7 @@ def agg(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ
else:
if self._axis == 0:
data = dict()
for d, v in in_data.iteritems():
for d, v in in_data.items():
if not self._use_arrow_dtype or xdf is cudf:
data[d] = [v.explode().drop_duplicates().to_list()]
else:
Expand All @@ -103,9 +103,7 @@ def post(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ
if isinstance(in_data, xdf.Series):
return in_data.explode().nunique(dropna=self._dropna)
else:
in_data_iter = (
in_data.iteritems() if self._axis == 0 else in_data.iterrows()
)
in_data_iter = in_data.items() if self._axis == 0 else in_data.iterrows()
data = dict()
for d, v in in_data_iter:
if isinstance(v.dtype, ArrowListDtype):
Expand Down
35 changes: 28 additions & 7 deletions mars/dataframe/reduction/tests/test_reduction_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
cp = lazy_import("cupy", rename="cp")
_agg_size_as_series = pd_release_version >= (1, 3)
_support_kw_agg = pd_release_version >= (1, 1)
_drop_level_reduction = pd_release_version >= (2, 0)


@pytest.fixture
Expand Down Expand Up @@ -119,6 +120,9 @@ def compute(data, **kwargs):
np.testing.assert_equal(r.execute().fetch(), compute(data))


@pytest.mark.skipif(
_drop_level_reduction, reason="Level reduction not supported for pandas>=2.0"
)
@pytest.mark.parametrize("func_name,func_opts", reduction_functions)
def test_series_level_reduction(setup, func_name, func_opts: FunctionOptions):
def compute(data, **kwargs):
Expand Down Expand Up @@ -162,6 +166,9 @@ def compute(data, **kwargs):
)


@pytest.mark.skipif(
_drop_level_reduction, reason="Level reduction not supported for pandas>=2.0"
)
@pytest.mark.parametrize("func_name,func_opts", reduction_functions)
def test_dataframe_reduction(
setup, check_ref_counts, func_name, func_opts: FunctionOptions
Expand Down Expand Up @@ -255,6 +262,9 @@ def compute(data, **kwargs):
)


@pytest.mark.skipif(
_drop_level_reduction, reason="Level reduction not supported for pandas>=2.0"
)
@pytest.mark.parametrize("func_name,func_opts", reduction_functions)
def test_dataframe_level_reduction(
setup, check_ref_counts, func_name, func_opts: FunctionOptions
Expand Down Expand Up @@ -403,6 +413,9 @@ def compute(data, **kwargs):
assert r.execute().fetch() is True


@pytest.mark.skipif(
_drop_level_reduction, reason="Level reduction not supported for pandas>=2.0"
)
@pytest.mark.parametrize("func_name", bool_reduction_functions)
def test_series_bool_level_reduction(setup, check_ref_counts, func_name):
def compute(data, **kwargs):
Expand Down Expand Up @@ -510,6 +523,9 @@ def compute(data, **kwargs):
)


@pytest.mark.skipif(
_drop_level_reduction, reason="Level reduction not supported for pandas>=2.0"
)
@pytest.mark.parametrize("func_name", bool_reduction_functions)
def test_dataframe_bool_level_reduction(setup, check_ref_counts, func_name):
def compute(data, **kwargs):
Expand Down Expand Up @@ -685,18 +701,20 @@ def test_mode(setup, check_ref_counts):
config_kw = {
"extra_config": {
"check_shape": False,
"check_dtypes": False,
"check_columns_value": False,
"check_index_value": False,
}
}
data1 = pd.Series(np.random.randint(0, 5, size=(20,)))

series = md.Series(data1)
result = series.mode().execute().fetch()
result = series.mode().execute(**config_kw).fetch(**config_kw)
expected = data1.mode()
pd.testing.assert_series_equal(result, expected)

series = md.Series(data1, chunk_size=6)
result = series.mode().execute().fetch()
result = series.mode().execute(**config_kw).fetch(**config_kw)
expected = data1.mode()
pd.testing.assert_series_equal(result, expected)

Expand All @@ -705,7 +723,7 @@ def test_mode(setup, check_ref_counts):
data2[[2, 9, 18]] = np.nan

series = md.Series(data2)
result = series.mode().execute().fetch()
result = series.mode().execute(**config_kw).fetch(**config_kw)
expected = data2.mode()
pd.testing.assert_series_equal(result, expected)

Expand All @@ -720,7 +738,7 @@ def test_mode(setup, check_ref_counts):
columns=["c" + str(i) for i in range(20)],
)
df = md.DataFrame(data1)
result = df.mode().execute().fetch()
result = df.mode().execute(**config_kw).fetch(**config_kw)
expected = data1.mode()
pd.testing.assert_frame_equal(result, expected)

Expand All @@ -730,7 +748,7 @@ def test_mode(setup, check_ref_counts):
pd.testing.assert_frame_equal(result, expected)

df = md.DataFrame(data1)
result = df.mode(axis=1).execute().fetch()
result = df.mode(axis=1).execute(**config_kw).fetch(**config_kw)
expected = data1.mode(axis=1)
pd.testing.assert_frame_equal(result, expected)

Expand All @@ -744,7 +762,7 @@ def test_mode(setup, check_ref_counts):
data2.iloc[[2, 9, 18], [2, 9, 18]] = np.nan

df = md.DataFrame(data2)
result = df.mode().execute().fetch()
result = df.mode().execute(**config_kw).fetch(**config_kw)
expected = data2.mode()
pd.testing.assert_frame_equal(result, expected)

Expand Down Expand Up @@ -1008,7 +1026,10 @@ def test_dataframe_aggregate(setup, check_ref_counts):
mean_9=NamedAgg(9, "mean"),
)
result = df.agg(**agg_kw)
pd.testing.assert_frame_equal(result.execute().fetch(), data.agg(**agg_kw))
pd.testing.assert_frame_equal(
result.execute().fetch(extra_config={"check_shape": False}),
data.agg(**agg_kw),
)


def test_series_aggregate(setup, check_ref_counts):
Expand Down
2 changes: 2 additions & 0 deletions mars/deploy/oscar/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import asyncio
import copy
import itertools
import logging
import json
Expand Down Expand Up @@ -355,6 +356,7 @@ async def _run_in_background(
async def execute(self, *tileables, **kwargs) -> ExecutionInfo:
if self._closed:
raise RuntimeError("Session closed already")
kwargs = copy.deepcopy(kwargs)
fuse_enabled: bool = kwargs.pop("fuse_enabled", None)
extra_config: dict = kwargs.pop("extra_config", None)
warn_duplicated_execution: bool = kwargs.pop("warn_duplicated_execution", False)
Expand Down
2 changes: 2 additions & 0 deletions mars/deploy/oscar/tests/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import asyncio
import copy
import inspect
import os
import uuid
Expand Down Expand Up @@ -62,6 +63,7 @@ def _process_result(self, tileable, result):
return super()._process_result(tileable, result)

async def fetch(self, *tileables, **kwargs):
kwargs = copy.deepcopy(kwargs)
extra_config = kwargs.pop("extra_config", dict())
if kwargs:
unexpected_keys = ", ".join(list(kwargs.keys()))
Expand Down
6 changes: 2 additions & 4 deletions mars/tests/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ def test_iter(setup):
raw_data = pd.DataFrame(np.random.randint(1000, size=(20, 10)))
df = md.DataFrame(raw_data, chunk_size=5)

for col, series in df.iteritems():
for col, series in df.items():
pd.testing.assert_series_equal(series.execute().fetch(), raw_data[col])

for i, batch in enumerate(df.iterbatch(batch_size=15)):
Expand Down Expand Up @@ -331,9 +331,7 @@ def test_iter(setup):
pd.testing.assert_series_equal(batch, raw_data.iloc[i * 15 : (i + 1) * 15])

i = 0
for result_item, expect_item in zip(
s.iteritems(batch_size=15), raw_data.iteritems()
):
for result_item, expect_item in zip(s.items(batch_size=15), raw_data.items()):
assert result_item[0] == expect_item[0]
assert result_item[1] == expect_item[1]
i += 1
Expand Down

0 comments on commit b55771d

Please sign in to comment.