Commit
FIX-#2596: Update pandas version to 1.2.1 (#2600)
Co-authored-by: Alexey Prutskov <[email protected]>
Co-authored-by: Devin Petersohn <[email protected]>
Co-authored-by: Dmitry Chigarev <[email protected]>
Co-authored-by: Devin Petersohn <[email protected]>
Signed-off-by: Igoshev, Yaroslav <[email protected]>
5 people authored Feb 9, 2021
1 parent 8a50c4a commit 5cb3283
Showing 26 changed files with 512 additions and 257 deletions.
4 changes: 2 additions & 2 deletions environment-dev.yml
@@ -2,7 +2,7 @@ name: modin
 channels:
   - conda-forge
 dependencies:
-  - pandas==1.1.5
+  - pandas==1.2.1
   - numpy>=1.16.5,<1.20 # pandas gh-39513
   - pyarrow>=1.0.0
   - dask[complete]>=2.12.0,<=2.19.0
@@ -16,7 +16,7 @@ dependencies:
   - feather-format
   - lxml
   - openpyxl
-  - xlrd<=1.2.0
+  - xlrd
   - matplotlib<=3.2.2
   - sqlalchemy
   - pandas-gbq
11 changes: 5 additions & 6 deletions modin/backends/pandas/parsers.py
@@ -253,8 +253,8 @@ def parse(fname, **kwargs):
         from openpyxl.worksheet.worksheet import Worksheet
         from pandas.core.dtypes.common import is_list_like
         from pandas.io.excel._util import (
-            _fill_mi_header,
-            _maybe_convert_usecols,
+            fill_mi_header,
+            maybe_convert_usecols,
         )
         from pandas.io.parsers import TextParser
         import re
@@ -308,7 +308,7 @@ def update_row_nums(match):
         # Attach cells to worksheet object
         reader.bind_cells()
         data = PandasExcelParser.get_sheet_data(ws, kwargs.pop("convert_float", True))
-        usecols = _maybe_convert_usecols(kwargs.pop("usecols", None))
+        usecols = maybe_convert_usecols(kwargs.pop("usecols", None))
         header = kwargs.pop("header", 0)
         index_col = kwargs.pop("index_col", None)
         # skiprows is handled externally
@@ -321,7 +321,7 @@ def update_row_nums(match):
             control_row = [True] * len(data[0])

             for row in header:
-                data[row], control_row = _fill_mi_header(data[row], control_row)
+                data[row], control_row = fill_mi_header(data[row], control_row)
             # Handle MultiIndex for row Index if necessary
             if is_list_like(index_col):
                 # Forward fill values for MultiIndex index.
@@ -339,7 +339,6 @@ def update_row_nums(match):
                         data[row][col] = last
                     else:
                         last = data[row][col]
-
         parser = TextParser(
             data,
             header=header,
@@ -352,7 +351,7 @@ def update_row_nums(match):
         # In excel if you create a row with only a border (no values), this parser will
         # interpret that as a row of NaN values. Pandas discards these values, so we
         # also must discard these values.
-        pandas_df = parser.read().dropna(how="all")
+        pandas_df = parser.read()
         # Since we know the number of rows that occur before this partition, we can
         # correctly assign the index in cases of RangeIndex. If it is not a RangeIndex,
         # the index is already correct because it came from the data.
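Note: pandas 1.2 removed the leading underscore from these private Excel helpers, which is what forces the import renames above. A minimal compatibility sketch, assuming code that must run against both pandas versions (this shim is hypothetical and not part of the commit):

    try:  # pandas >= 1.2
        from pandas.io.excel._util import fill_mi_header, maybe_convert_usecols
    except ImportError:  # pandas < 1.2
        from pandas.io.excel._util import (
            _fill_mi_header as fill_mi_header,
            _maybe_convert_usecols as maybe_convert_usecols,
        )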
47 changes: 40 additions & 7 deletions modin/backends/pandas/query_compiler.py
@@ -650,8 +650,6 @@ def is_monotonic_increasing(df):
         return self.default_to_pandas(is_monotonic_increasing)

     count = MapReduceFunction.register(pandas.DataFrame.count, pandas.DataFrame.sum)
-    max = MapReduceFunction.register(pandas.DataFrame.max)
-    min = MapReduceFunction.register(pandas.DataFrame.min)
     sum = MapReduceFunction.register(pandas.DataFrame.sum)
     prod = MapReduceFunction.register(pandas.DataFrame.prod)
     any = MapReduceFunction.register(pandas.DataFrame.any, pandas.DataFrame.any)
@@ -662,6 +660,34 @@ def is_monotonic_increasing(df):
         axis=0,
     )

+    def max(self, axis, **kwargs):
+        def map_func(df, **kwargs):
+            return pandas.DataFrame.max(df, **kwargs)
+
+        def reduce_func(df, **kwargs):
+            if kwargs.get("numeric_only", False):
+                kwargs = kwargs.copy()
+                kwargs["numeric_only"] = False
+            return pandas.DataFrame.max(df, **kwargs)
+
+        return MapReduceFunction.register(map_func, reduce_func)(
+            self, axis=axis, **kwargs
+        )
+
+    def min(self, axis, **kwargs):
+        def map_func(df, **kwargs):
+            return pandas.DataFrame.min(df, **kwargs)
+
+        def reduce_func(df, **kwargs):
+            if kwargs.get("numeric_only", False):
+                kwargs = kwargs.copy()
+                kwargs["numeric_only"] = False
+            return pandas.DataFrame.min(df, **kwargs)
+
+        return MapReduceFunction.register(map_func, reduce_func)(
+            self, axis=axis, **kwargs
+        )
+
     def mean(self, axis, **kwargs):
         if kwargs.get("level") is not None:
             return self.default_to_pandas(pandas.DataFrame.mean, axis=axis, **kwargs)
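Note: max and min are re-registered as custom map-reduce functions because the map phase already applies any numeric_only filter per partition, and the concatenated partial results may no longer carry numeric dtypes; filtering again in the reduce phase could then drop columns that should survive. A standalone sketch of that failure mode (illustrative only, not Modin's API):

    import pandas as pd

    part = pd.DataFrame({"a": [1, 2], "b": ["x", "y"]})
    mapped = pd.DataFrame([part.max(numeric_only=True)])  # partial maxima: column "a" only
    intermediate = mapped.astype(object)  # dtype degradation, e.g. after concatenation
    print(intermediate.max(numeric_only=True).empty)  # True: column "a" would be lost
    print(intermediate.max(numeric_only=False))  # a    2, what the reducer should return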
@@ -2530,11 +2556,18 @@ def compute_groupby(df, drop=False, partition_idx=0):
                     for x in df[internal_by_cols].dtypes
                 )

-                cols_to_insert = (
-                    internal_by_cols.intersection(result_cols)
-                    if keep_index_levels
-                    else internal_by_cols.difference(result_cols)
-                )
+                if internal_by_cols.nlevels != result_cols.nlevels:
+                    cols_to_insert = (
+                        pandas.Index([])
+                        if keep_index_levels
+                        else internal_by_cols.copy()
+                    )
+                else:
+                    cols_to_insert = (
+                        internal_by_cols.intersection(result_cols)
+                        if keep_index_levels
+                        else internal_by_cols.difference(result_cols)
+                    )

                 if keep_index_levels:
                     result.drop(
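Note: the added nlevels guard handles the case where the by-columns form a flat Index while the result columns form a MultiIndex (or vice versa); intersection and difference between indexes with different level counts are not meaningful here, so the code falls back to all-or-nothing. A minimal restatement with illustrative values (not Modin's actual inputs):

    import pandas as pd

    internal_by_cols = pd.Index(["a"])
    result_cols = pd.MultiIndex.from_tuples([("a", "sum")])
    keep_index_levels = False

    if internal_by_cols.nlevels != result_cols.nlevels:
        cols_to_insert = pd.Index([]) if keep_index_levels else internal_by_cols.copy()
    else:
        cols_to_insert = (
            internal_by_cols.intersection(result_cols)
            if keep_index_levels
            else internal_by_cols.difference(result_cols)
        )
    print(cols_to_insert)  # Index(['a'], dtype='object')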
3 changes: 1 addition & 2 deletions modin/data_management/functions/binary_function.py
@@ -54,12 +54,11 @@ def caller(query_compiler, other, *args, **kwargs):
                 )
             else:
                 if isinstance(other, (list, np.ndarray, pandas.Series)):
-                    new_columns = query_compiler.columns
                     new_modin_frame = query_compiler._modin_frame._apply_full_axis(
                         axis,
                         lambda df: func(df, other, *args, **kwargs),
                         new_index=query_compiler.index,
-                        new_columns=new_columns,
+                        new_columns=query_compiler.columns,
                     )
                 else:
                     new_modin_frame = query_compiler._modin_frame._map(
5 changes: 4 additions & 1 deletion modin/data_management/functions/groupby_function.py
@@ -77,7 +77,10 @@ def reduce(
             drop=False,
             **kwargs,
         ):
-            by_part = list(df.index.names)
+            # Wrapping names into an Index should be unnecessary, however
+            # there is a bug in pandas with intersection that forces us to do so:
+            # https://github.com/pandas-dev/pandas/issues/39699
+            by_part = pandas.Index(df.index.names)
             if drop and len(df.columns.intersection(by_part)) > 0:
                 df.drop(columns=by_part, errors="ignore", inplace=True)

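Note: as the new comment says, the wrapping works around a pandas intersection bug (pandas-dev/pandas#39699) triggered when the raw list of index names is passed directly. A small sketch of the workaround pattern with illustrative values:

    import pandas as pd

    df = pd.DataFrame({"a": [1]}, index=pd.Index([0], name="a"))
    by_part = pd.Index(df.index.names)  # wrap the FrozenList of names into an Index
    print(df.columns.intersection(by_part))  # Index(['a'], dtype='object')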
29 changes: 23 additions & 6 deletions modin/engines/base/io/io.py
@@ -35,7 +35,7 @@ def from_arrow(cls, at):
         return cls.query_compiler_cls.from_arrow(at, cls.frame_cls)

     @classmethod
-    def read_parquet(cls, path, engine, columns, **kwargs):
+    def read_parquet(cls, path, engine, columns, use_nullable_dtypes, **kwargs):
         """Load a parquet object from the file path, returning a Modin DataFrame.

         Modin only supports pyarrow engine for now.
@@ -51,7 +51,9 @@ def read_parquet(cls, path, engine, columns, **kwargs):
             https://arrow.apache.org/docs/python/parquet.html
         """
         ErrorMessage.default_to_pandas("`read_parquet`")
-        return cls.from_pandas(pandas.read_parquet(path, engine, columns, **kwargs))
+        return cls.from_pandas(
+            pandas.read_parquet(path, engine, columns, use_nullable_dtypes, **kwargs)
+        )

     @classmethod
     def read_csv(
@@ -105,6 +107,7 @@ def read_csv(
         low_memory=True,
         memory_map=False,
         float_precision=None,
+        storage_options=None,
     ):
         kwargs = {
             "filepath_or_buffer": filepath_or_buffer,
@@ -156,6 +159,7 @@ def read_csv(
             "low_memory": low_memory,
             "memory_map": memory_map,
             "float_precision": float_precision,
+            "storage_options": storage_options,
         }
         ErrorMessage.default_to_pandas("`read_csv`")
         return cls._read(**kwargs)
@@ -199,6 +203,7 @@ def read_json(
         chunksize=None,
         compression="infer",
         nrows: Optional[int] = None,
+        storage_options=None,
     ):
         ErrorMessage.default_to_pandas("`read_json`")
         kwargs = {
@@ -217,6 +222,7 @@ def read_json(
             "chunksize": chunksize,
             "compression": compression,
             "nrows": nrows,
+            "storage_options": storage_options,
         }
         return cls.from_pandas(pandas.read_json(**kwargs))

@@ -407,10 +413,15 @@ def read_hdf(
         )

     @classmethod
-    def read_feather(cls, path, columns=None, use_threads=True):
+    def read_feather(cls, path, columns=None, use_threads=True, storage_options=None):
         ErrorMessage.default_to_pandas("`read_feather`")
         return cls.from_pandas(
-            pandas.read_feather(path, columns=columns, use_threads=use_threads)
+            pandas.read_feather(
+                path,
+                columns=columns,
+                use_threads=use_threads,
+                storage_options=storage_options,
+            )
         )

     @classmethod
@@ -426,6 +437,7 @@ def read_stata(
         order_categoricals=True,
         chunksize=None,
         iterator=False,
+        storage_options=None,
     ):
         ErrorMessage.default_to_pandas("`read_stata`")
         kwargs = {
@@ -439,6 +451,7 @@ def read_stata(
             "order_categoricals": order_categoricals,
             "chunksize": chunksize,
             "iterator": iterator,
+            "storage_options": storage_options,
         }
         return cls.from_pandas(pandas.read_stata(**kwargs))

@@ -465,10 +478,14 @@ def read_sas(
         )

     @classmethod
-    def read_pickle(cls, filepath_or_buffer, compression="infer"):
+    def read_pickle(cls, filepath_or_buffer, compression="infer", storage_options=None):
         ErrorMessage.default_to_pandas("`read_pickle`")
         return cls.from_pandas(
-            pandas.read_pickle(filepath_or_buffer, compression=compression)
+            pandas.read_pickle(
+                filepath_or_buffer,
+                compression=compression,
+                storage_options=storage_options,
+            )
         )

     @classmethod
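Note: pandas 1.2 added a storage_options parameter to most readers so that fsspec-backed paths (s3://, gcs://, and so on) can receive credentials and connection settings; the changes above simply thread it through Modin's default-to-pandas I/O layer. A usage sketch, where the bucket path is a placeholder:

    import modin.pandas as pd

    df = pd.read_csv(
        "s3://my-bucket/data.csv",  # hypothetical fsspec path
        storage_options={"anon": True},  # forwarded to s3fs via fsspec
    )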
19 changes: 17 additions & 2 deletions modin/engines/base/io/text/excel_dispatcher.py
@@ -141,7 +141,7 @@ def _read(cls, io, **kwargs):
         kwargs["fname"] = io
         # Skiprows will be used to inform a partition how many rows come before it.
         kwargs["skiprows"] = 0
-        row_count = 0
+        rows_to_skip = 0
         data_ids = []
         index_ids = []
         dtypes_ids = []
@@ -168,7 +168,7 @@ def _read(cls, io, **kwargs):

         while f.tell() < total_bytes:
             args = kwargs
-            args["skiprows"] = row_count + args["skiprows"]
+            args["skiprows"] = rows_to_skip
             args["start"] = f.tell()
             chunk = f.read(chunk_size)
             # This edge case can happen when we have reached the end of the data
@@ -190,6 +190,21 @@ def _read(cls, io, **kwargs):
             # If there is no data, exit before triggering computation.
             if b"</row>" not in chunk and b"</sheetData>" in chunk:
                 break
+            # We need to make sure we include all rows, even those that have no
+            # data. Getting the number of the last row will turn into the number of
+            # skipped rows, so if there are any rows missing between the last row
+            # seen here and the first row the next partition reads, the parser will
+            # have to include those rows in that specific partition to match the
+            # expected behavior. We subtract 1 here because the header is included
+            # in the skip values, and we do not want to skip the header.
+            rows_to_skip = (
+                int(
+                    chunk[: last_index + len(row_close_tag)]
+                    .split(b'<row r="')[-1]
+                    .split(b'"')[0]
+                )
+                - 1
+            )
             remote_results_list = cls.deploy(cls.parse, num_splits + 2, args)
             data_ids.append(remote_results_list[:-2])
             index_ids.append(remote_results_list[-2])
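Note: the new rows_to_skip computation reads the r attribute of the last complete <row> element in the chunk, so rows that exist in the sheet but carry no data are still accounted for across partitions. A standalone sketch of the extraction (last_index and row_close_tag mirror the names this dispatcher already uses):

    chunk = b'<row r="1"><c/></row><row r="5"><c/></row>'
    row_close_tag = b"</row>"
    last_index = chunk.rindex(row_close_tag)
    rows_to_skip = (
        int(chunk[: last_index + len(row_close_tag)].split(b'<row r="')[-1].split(b'"')[0])
        - 1
    )
    print(rows_to_skip)  # 4, so the next partition accounts for the empty rows 2-4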
modin/experimental/engines/omnisci_on_ray/frame/calcite_builder.py
@@ -44,7 +44,7 @@
 )

 from collections import abc
-from pandas.core.dtypes.common import _get_dtype
+from pandas.core.dtypes.common import get_dtype


 class CalciteBuilder:
@@ -94,7 +94,7 @@ def gen_agg_exprs(self):

     def gen_reduce_expr(self):
         count_expr = self._builder._ref(self._arg.modin_frame, self._count_name)
-        count_expr._dtype = _get_dtype(int)
+        count_expr._dtype = get_dtype(int)
         sum_expr = self._builder._ref(self._arg.modin_frame, self._sum_name)
         sum_expr._dtype = self._sum_dtype
         qsum_expr = self._builder._ref(self._arg.modin_frame, self._quad_sum_name)
@@ -161,7 +161,7 @@ def gen_agg_exprs(self):

     def gen_reduce_expr(self):
         count_expr = self._builder._ref(self._arg.modin_frame, self._count_name)
-        count_expr._dtype = _get_dtype(int)
+        count_expr._dtype = get_dtype(int)
         sum_expr = self._builder._ref(self._arg.modin_frame, self._sum_name)
         sum_expr._dtype = self._sum_dtype
         qsum_expr = self._builder._ref(self._arg.modin_frame, self._quad_sum_name)
@@ -473,7 +473,7 @@ def _process_join(self, op):
         """ Join, only equal-join supported """
         cmps = [self._ref(left, c).eq(self._ref(right, c)) for c in op.on]
         if len(cmps) > 1:
-            condition = OpExpr("AND", cmps, _get_dtype(bool))
+            condition = OpExpr("AND", cmps, get_dtype(bool))
         else:
             condition = cmps[0]
         node = CalciteJoinNode(
14 changes: 7 additions & 7 deletions modin/experimental/engines/omnisci_on_ray/frame/data.py
@@ -16,7 +16,7 @@
 from .partition_manager import OmnisciOnRayFrameManager

 from pandas.core.index import ensure_index, Index, MultiIndex, RangeIndex
-from pandas.core.dtypes.common import _get_dtype, is_list_like, is_bool_dtype
+from pandas.core.dtypes.common import get_dtype, is_list_like, is_bool_dtype
 from modin.error_message import ErrorMessage
 import pandas as pd

@@ -143,7 +143,7 @@ def __init__(
     def id_str(self):
         return f"frame${self.id}"

-    def _get_dtype(self, col):
+    def get_dtype(self, col):
         # If we search for an index column type in a MultiIndex then we need to
         # extend index column names to tuples.
         if isinstance(self._dtypes, MultiIndex) and not isinstance(col, tuple):
@@ -152,8 +152,8 @@ def _get_dtype(self, col):

     def ref(self, col):
         if col == "__rowid__":
-            return InputRefExpr(self, col, _get_dtype(int))
-        return InputRefExpr(self, col, self._get_dtype(col))
+            return InputRefExpr(self, col, get_dtype(int))
+        return InputRefExpr(self, col, self.get_dtype(col))

     def mask(
         self,
@@ -604,7 +604,7 @@ def _union_all(
                 assert index_width == 1, "unexpected index width"
                 aligned_index = ["__index__"]
                 exprs["__index__"] = frame.ref("__rowid__")
-                aligned_index_dtypes = [_get_dtype(int)]
+                aligned_index_dtypes = [get_dtype(int)]
                 uses_rowid = True
                 aligned_dtypes = aligned_index_dtypes + new_dtypes
             else:
@@ -781,10 +781,10 @@ def cat_codes(self):
         col = self.columns[-1]
         exprs = self._index_exprs()
         col_expr = self.ref(col)
-        code_expr = OpExpr("KEY_FOR_STRING", [col_expr], _get_dtype("int32"))
+        code_expr = OpExpr("KEY_FOR_STRING", [col_expr], get_dtype("int32"))
         null_val = LiteralExpr(np.int32(-1))
         exprs[col] = build_if_then_else(
-            col_expr.is_null(), null_val, code_expr, _get_dtype("int32")
+            col_expr.is_null(), null_val, code_expr, get_dtype("int32")
         )

         return self.__constructor__(
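Note: pandas 1.2 also dropped the underscore from the private helper _get_dtype, and this frame class renames its own _get_dtype method to get_dtype so the naming stays consistent with the imported helper used alongside it. A compatibility sketch for dual-version support (hypothetical, not part of this commit):

    try:  # pandas >= 1.2
        from pandas.core.dtypes.common import get_dtype
    except ImportError:  # pandas < 1.2
        from pandas.core.dtypes.common import _get_dtype as get_dtype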
(Diff truncated: the remaining 17 changed files are not shown.)