From 20653a4220d91c5f138b309f3edc3305da86f0ca Mon Sep 17 00:00:00 2001
From: Devin Petersohn
Date: Tue, 6 Sep 2022 09:18:19 -0500
Subject: [PATCH] FEAT-#4931: Create a query compiler that can connect to a
 service
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Devin Petersohn

Fixes to pass CI + docs for io.py

Update implementation

Signed-off-by: Devin Petersohn

Fix some things

Signed-off-by: Devin Petersohn

Lint fixes

Fix put

Signed-off-by: Devin Petersohn

Clean up and add new details

Signed-off-by: Devin Petersohn

Use fsspec to get full path and allow URLs

Signed-off-by: Devin Petersohn

Add lazy loc

Signed-off-by: Devin Petersohn

fixes for tests

porting more tests

more fixes

moar fixes

Raise exception

Signed-off-by: Devin Petersohn

Lint fixes

Return Python as the default modin engine

Handle indexing case for client qc

Call fast path for __getitem__ if not lazy

Remove user warning for Python-engine fall back

Add init

Signed-off-by: Devin Petersohn

Implement free as a no-op

Signed-off-by: Devin Petersohn

Add support for replace - client side

Fix a couple of issues with Client

Signed-off-by: Devin Petersohn

Throw errors on to_pandas

Signed-off-by: Devin Petersohn

Do not default to pandas for str_repeat

Add support for 18 datetime functions/properties

Fix columns caching when renaming columns

Fix test_query: put backticks back for col names

Add support for astype -- client side

hard-coded changes for functions

Client support for str_(en/de)code, to_datetime

Add all missing query compiler methods.

Signed-off-by: mvashishtha

Fix getitem_column_array and take_2d.

Signed-off-by: mvashishtha

Fix getitem_column_array and take_2d.

Signed-off-by: mvashishtha

Fix again.

Signed-off-by: mvashishtha

Fix more bugs.

Signed-off-by: mvashishtha

More fixes.

Signed-off-by: mvashishtha

Fix more bugs -- pushdown tests test_dates and test_pivot still broken due to
service bugs.

Signed-off-by: mvashishtha

Fix typo. Note: drop() is broken because the service requires you to specify
both arguments, and the client QC at the base of this PR uses default Nones.

Signed-off-by: mvashishtha

Add query compiler class.

Signed-off-by: mvashishtha

Testing a commit

Initial changes for adding support for Expanding

FEAT Support for rolling.sem

FEAT support for Expanding sum, min, max, mean, var, std, count, sem

Removing extraneous comment

REFACTOR: Remove defaults to pandas at API layer and add some corresponding
client QC methods.

Signed-off-by: mvashishtha

Add more methods.

Signed-off-by: mvashishtha

Fix expanding.

Signed-off-by: mvashishtha

Add ewm.

Signed-off-by: mvashishtha

Revert whitespace.

Signed-off-by: mvashishtha

Fix to_numpy by making it like to_pandas.

Signed-off-by: mvashishtha

Remove extra to_numpy.

Signed-off-by: mvashishtha

Pass kwargs

Signed-off-by: mvashishtha

Fix DataFrame import for isin.

Signed-off-by: mvashishtha

Fix again.

Signed-off-by: mvashishtha

Remove breakpoint

Signed-off-by: mvashishtha

Tell if series.

Signed-off-by: mvashishtha

Fix client qc.

Signed-off-by: mvashishtha

Add self_is_series.

Signed-off-by: mvashishtha

FIX: Set numeric_only to True in groupby quantile

Add some comments

Fix str_cat/fullmatch/removeprefix/removesuffix/translate/wrap (#44)

* Fix str_cat/fullmatch/removeprefix/removesuffix/translate/wrap
* Update modin/core/storage_formats/base/query_compiler.py
* Update modin/pandas/series_utils.py
* Update modin/core/storage_formats/base/query_compiler.py

Co-authored-by: Mahesh Vashishtha

FEAT Support expanding.aggregate (#45)

Fix at_time and between_time. (#43)

Signed-off-by: mvashishtha
Signed-off-by: mvashishtha

Add QC method for groupby.sem (#47)

* FEAT: Add partial support for groupby.sem()
* Add sem changes to groupby

Fix nlargest and nsmallest Series support (#46)

* Fix nlargest and smallest support

Signed-off-by: Naren Krishna

Remove client query compiler's columnarize. (#48)

Signed-off-by: mvashishtha
Signed-off-by: mvashishtha

Fix info and set memory_usage=False. (#49)

Signed-off-by: mvashishtha
Signed-off-by: mvashishtha

POND-815 fixes for 21 column dataset (#50)

* POND-815 fixes for 21 column dataset
* Update modin/pandas/base.py

---------

Co-authored-by: helmeleegy <40042062+helmeleegy@users.noreply.github.com>

Bring in upstream series binary operation fix 6d5545f4a132f0efce02db6… (#52)

* Bring in upstream series binary operation fix
  6d5545f4a132f0efce02db66a6f5d515d4000812.
* Update modin/pandas/series.py

---------

Signed-off-by: mvashishtha
Co-authored-by: Karthik Velayutham

Support groupby first/last (#53)

Signed-off-by: Naren Krishna

FEAT: Add initial partial support for groupby.cumcount() (#54)

* FEAT: Add partial support for cumcount
* Remove the set_index_name
* Squeeze the result
* Write cumcount name to None
* Can't set dtype to int64

Fix resample sum, prod, size (#56)

Signed-off-by: Naren Krishna

POND-184: fix describe and simplify query compiler interface (#55)

* Fix describe
* Pass datetime_is_numeric.

---------

Signed-off-by: mvashishtha

Fix dt_day_of_week/day_of_year, str_cat/extract/partition/replace/rpartition (#51)

* Fix dt_day_of_week/day_of_year, str_partition/replace/rpartition
* Fix str_extract

Revert "Fix dt_day_of_week/day_of_year, str_cat/extract/partition/replace/rpartition (#51)" (#58)

This reverts commit f7a31abe48a47fe1c0642b4c4f59653f96308262.

Revert "Revert "Fix dt_day_of_week/day_of_year, str_cat/extract/partition/replace/rpartition (#51)" (#58)" (#60)

This reverts commit ad9231d66eefc0bb9eea79e9d08af1c8f312b79b.

Add query compiler method for groupby.prod() (#57)

Signed-off-by: Naren Krishna

FEAT: Add support for groupby.head and groupby.tail (#61)

* FEAT: Add support for groupby.head and groupby.tail
* Change _change_index

FEAT: Add partial support for groupby.nth (#62)

FIX: Push first and last down to query compiler. (#64)

* FIX: Push first and last down to query compiler.
* Fix last.

---------

Signed-off-by: mvashishtha

FEAT: Add partial support for groupby.ngroup (#65)

* FEAT: Add partial support for groupby.ngroup
* Name of result should be none for now

Add client support for SeriesGroupby unique, nsmallest, nlargest (#63)

* Add client support for SeriesGroupby unique, nsmallest, nlargest

---------

Signed-off-by: Naren Krishna

Push memory_usage entirely to query compiler [change is not to be upstreamed
to Modin] (#66)

* Fix dataframe memory usage.
* Fix series memory_usage() the same way.

---------

Signed-off-by: mvashishtha

FIX: allow updating backend query compilers in place. (#67)

* FIX: Mutate client query compiler columns and index in the service.

Motivation: Align axis update semantics across query compilers. In the base
query compiler and even our service's query compiler, you can update the index
and columns in place. However, the service gives no way to update the axes of
a query compiler. Right now, for in-place updates, the service exposes an
extra method rename(), and the client query compiler uses it to get the ID of
a new compiler with the updated axis, and then updates its own ID to the ID of
the new query compiler.

This change might be the first to make the service present a mutable interface
for a backend query compiler. That seems safe to me, except I had to make
copy() get a new query compiler copied from the old query compiler, because we
can't let updates to the new query compiler change the original (or vice
versa).

Signed-off-by: mvashishtha

* Add a comment.

---------

Signed-off-by: mvashishtha

FEAT replace groupby.fillna with a simpler logic (#68)

* FEAT Support expanding.aggregate
* Replaced groupby.fillna logic with a simpler one
* Fix in groupby.fillna. The Work object was causing problems.
* Only need to change _check_index_name to _check_index
* Removed commented out code.
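As a quick orientation for reviewers, here is a hypothetical sketch of how the
pieces added by this patch fit together. The connection object and its
constructor are made up; any object implementing the methods the client
forwards would do:

    import modin.config as cfg
    cfg.Engine.put("Client")  # new Engine choice added by this patch

    from modin.core.execution.client.io import ClientIO
    from modin.core.execution.client.query_compiler import ClientQueryCompiler

    conn = MyServiceConnection("localhost:8000")  # hypothetical
    ClientIO.set_server_connection(conn)
    ClientQueryCompiler.set_server_connection(conn)
    ClientIO.set_data_connection(my_data_conn)  # implementation-specific

    import modin.pandas as pd
    df = pd.read_csv("data.csv")  # forwarded to the service as a QC id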
---
 check_client_query_compiler_methods.py        |  33 +
 modin/_compat/pandas_api/abc/window.py        |  20 +
 modin/_compat/pandas_api/classes.py           |   2 +
 modin/_compat/pandas_api/latest/__init__.py   |   3 +-
 modin/_compat/pandas_api/latest/series.py     |   9 +-
 modin/_compat/pandas_api/latest/window.py     |  24 +-
 modin/config/envvars.py                       |  11 +-
 modin/core/execution/client/__init__.py       |   0
 modin/core/execution/client/io.py             | 124 +++
 modin/core/execution/client/query_compiler.py | 979 ++++++++++++++++++
 .../dispatching/factories/factories.py        |  10 +
 .../storage_formats/base/query_compiler.py    | 284 +++++
 .../storage_formats/pandas/query_compiler.py  |  53 +
 modin/pandas/__init__.py                      |   8 +-
 modin/pandas/base.py                          | 407 ++++----
 modin/pandas/dataframe.py                     | 222 +---
 modin/pandas/ewm.py                           |  68 ++
 modin/pandas/general.py                       |  32 +-
 modin/pandas/groupby.py                       | 125 ++-
 modin/pandas/indexing.py                      |  10 +-
 modin/pandas/io.py                            |   6 -
 modin/pandas/resample.py                      |  34 +-
 modin/pandas/series.py                        | 171 +--
 modin/pandas/series_utils.py                  |  88 +-
 modin/pandas/window.py                        |  94 +-
 modin/utils.py                                |   2 +-
 26 files changed, 2230 insertions(+), 589 deletions(-)
 create mode 100644 check_client_query_compiler_methods.py
 create mode 100644 modin/core/execution/client/__init__.py
 create mode 100644 modin/core/execution/client/io.py
 create mode 100644 modin/core/execution/client/query_compiler.py
 create mode 100644 modin/pandas/ewm.py

diff --git a/check_client_query_compiler_methods.py b/check_client_query_compiler_methods.py
new file mode 100644
index 00000000000..2d0e9870603
--- /dev/null
+++ b/check_client_query_compiler_methods.py
@@ -0,0 +1,33 @@
+import inspect
+
+from modin.core.execution.client.query_compiler import ClientQueryCompiler
+from modin.core.storage_formats.base import BaseQueryCompiler
+
+
+KNOWN_MISSING = frozenset(
+    [
+        # conj no longer exists in pandas
+        "conj",
+        # no need to make the service implement get_axis, which I think
+        # is used internally in a few places.
+        "get_axis",
+        # we don't need to forward these two either
+        "get_index_name",
+        "get_index_names",
+        # Base QC can do this for us
+        "has_multiindex",
+        # Base QC can do this for us
+        "is_series_like",
+        # Let Base QC call drop() for us
+        "delitem",
+    ]
+)
+
+print(
+    "base query compiler methods that are not in client query compiler, but should be:"
+)
+for name, f in inspect.getmembers(ClientQueryCompiler, predicate=inspect.isfunction):
+    if name in KNOWN_MISSING:
+        continue
+    if f == getattr(BaseQueryCompiler, name, None):
+        print(name)
diff --git a/modin/_compat/pandas_api/abc/window.py b/modin/_compat/pandas_api/abc/window.py
index 51699702646..72bacc744b7 100644
--- a/modin/_compat/pandas_api/abc/window.py
+++ b/modin/_compat/pandas_api/abc/window.py
@@ -56,3 +56,23 @@ def _init(self, dataframe, rolling_args, axis):
             The axis to build Rolling against.
         """
         pass
+
+class BaseCompatibleExpanding(ClassLogger):
+    """Interface for class compatibility layer for Expanding."""
+
+    def _init(self, dataframe, expanding_args, axis):
+        """
+        Initialize the Expanding object for real.
+
+        Utilize translated potentially pandas-specific arguments.
+
+        Parameters
+        ----------
+        dataframe : DataFrame
+            The dataframe object to apply expanding functions against.
+        expanding_args : sequence
+            The arguments to be passed to .expanding() except dataframe.
+        axis : {0, 1}
+            The axis to build Expanding against.
+ """ + pass diff --git a/modin/_compat/pandas_api/classes.py b/modin/_compat/pandas_api/classes.py index 63e59d9ccaa..b3786addca7 100644 --- a/modin/_compat/pandas_api/classes.py +++ b/modin/_compat/pandas_api/classes.py @@ -35,6 +35,7 @@ from .latest import LatestCompatibleSeriesGroupBy as SeriesGroupByCompat from .latest import LatestCompatibleWindow as WindowCompat from .latest import LatestCompatibleRolling as RollingCompat + from .latest import LatestCompatibleExpanding as ExpandingCompat __all__ = [ "BasePandasDatasetCompat", @@ -44,4 +45,5 @@ "SeriesGroupByCompat", "WindowCompat", "RollingCompat", + "ExpandingCompat" ] diff --git a/modin/_compat/pandas_api/latest/__init__.py b/modin/_compat/pandas_api/latest/__init__.py index 8e80a367c3d..a18d0623cc4 100644 --- a/modin/_compat/pandas_api/latest/__init__.py +++ b/modin/_compat/pandas_api/latest/__init__.py @@ -15,7 +15,7 @@ from .dataframe import LatestCompatibleDataFrame from .series import LatestCompatibleSeries from .groupby import LatestCompatibleDataFrameGroupBy, LatestCompatibleSeriesGroupBy -from .window import LatestCompatibleWindow, LatestCompatibleRolling +from .window import LatestCompatibleWindow, LatestCompatibleRolling, LatestCompatibleExpanding __all__ = [ "LatestCompatibleBasePandasDataset", @@ -25,4 +25,5 @@ "LatestCompatibleSeriesGroupBy", "LatestCompatibleWindow", "LatestCompatibleRolling", + "LatestCompatibleExpanding" ] diff --git a/modin/_compat/pandas_api/latest/series.py b/modin/_compat/pandas_api/latest/series.py index 816771624cc..f15b5e4cbdf 100644 --- a/modin/_compat/pandas_api/latest/series.py +++ b/modin/_compat/pandas_api/latest/series.py @@ -17,6 +17,7 @@ import numpy as np import pandas +from pandas.io.formats.info import SeriesInfo from pandas.util._validators import validate_bool_kwarg from pandas._libs.lib import no_default, NoDefault from pandas._typing import Axis @@ -44,12 +45,12 @@ def info( memory_usage: "bool | str | None" = None, show_counts: "bool" = True, ): - return self._default_to_pandas( - pandas.Series.info, - verbose=verbose, + # Can't do memory_usage yet + memory_usage = False + return SeriesInfo(self, memory_usage).render( buf=buf, max_cols=max_cols, - memory_usage=memory_usage, + verbose=verbose, show_counts=show_counts, ) diff --git a/modin/_compat/pandas_api/latest/window.py b/modin/_compat/pandas_api/latest/window.py index a1cb3721f9c..fbc81c9fab8 100644 --- a/modin/_compat/pandas_api/latest/window.py +++ b/modin/_compat/pandas_api/latest/window.py @@ -15,7 +15,7 @@ import pandas.core.window.rolling -from ..abc.window import BaseCompatibleWindow, BaseCompatibleRolling +from ..abc.window import BaseCompatibleWindow, BaseCompatibleRolling, BaseCompatibleExpanding from modin.utils import _inherit_docstrings, append_to_docstring @@ -79,3 +79,25 @@ def __init__( ], axis, ) + +@append_to_docstring("Compatibility layer for 'latest pandas' for Expanding.") +@_inherit_docstrings(pandas.core.window.expanding.Expanding) +class LatestCompatibleExpanding(BaseCompatibleExpanding): + def __init__( + self, + dataframe, + min_periods=1, + center=None, + axis=0, + method="single" + ): + self._init( + dataframe, + [ + min_periods, + center, + axis, + method, + ], + axis + ) diff --git a/modin/config/envvars.py b/modin/config/envvars.py index 72c74258d94..c77792cc61a 100644 --- a/modin/config/envvars.py +++ b/modin/config/envvars.py @@ -75,7 +75,7 @@ class Engine(EnvironmentVariable, type=str): """Distribution engine to run queries by.""" varname = "MODIN_ENGINE" - choices = ("Ray", "Dask", 
"Python", "Native") + choices = ("Ray", "Dask", "Python", "Native", "Client") @classmethod def _get_default(cls) -> str: @@ -131,9 +131,10 @@ def _get_default(cls) -> str: pass else: return "Native" - raise ImportError( - "Please refer to installation documentation page to install an engine" - ) + + # If we can't import any other engines we should go ahead and default to Python being + # the default backend engine. + return "Python" class StorageFormat(EnvironmentVariable, type=str): @@ -141,7 +142,7 @@ class StorageFormat(EnvironmentVariable, type=str): varname = "MODIN_STORAGE_FORMAT" default = "Pandas" - choices = ("Pandas", "Hdk", "Pyarrow", "Cudf") + choices = ("Pandas", "Hdk", "Pyarrow", "Cudf", "") class IsExperimental(EnvironmentVariable, type=bool): diff --git a/modin/core/execution/client/__init__.py b/modin/core/execution/client/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/modin/core/execution/client/io.py b/modin/core/execution/client/io.py new file mode 100644 index 00000000000..248af394c93 --- /dev/null +++ b/modin/core/execution/client/io.py @@ -0,0 +1,124 @@ +# Licensed to Modin Development Team under one or more contributor license agreements. +# See the NOTICE file distributed with this work for additional information regarding +# copyright ownership. The Modin Development Team licenses this file to you under the +# Apache License, Version 2.0 (the "License"); you may not use this file except in +# compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under +# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific language +# governing permissions and limitations under the License. + +"""The module holds the factory which performs I/O using pandas on a Client.""" + +from modin.core.io.io import BaseIO +import fsspec +from .query_compiler import ClientQueryCompiler + + +class ClientIO(BaseIO): + """Factory providing methods for performing I/O operations using a given Client as the execution engine.""" + + _server_conn = None + _data_conn = None + query_compiler_cls = ClientQueryCompiler + + @classmethod + def set_server_connection(cls, conn): + """ + Set the server connection for the I/O object. + + Parameters + ---------- + conn : Any + Connection object that implements various methods. + """ + cls._server_conn = conn + + @classmethod + def set_data_connection(cls, conn): + """ + Set the data connection for the I/O object. + + Parameters + ---------- + conn : Any + Connection object that is implementation specific. + """ + cls._data_conn = conn + + @classmethod + def read_csv(cls, filepath_or_buffer, **kwargs): + """ + Read CSV data from given filepath or buffer. + + Parameters + ---------- + filepath_or_buffer : str, path object or file-like object + `filepath_or_buffer` parameter of read functions. + **kwargs : dict + Parameters of ``read_csv`` function. + + Returns + ------- + ClientQueryCompiler + Query compiler with CSV data read in. + """ + if isinstance(filepath_or_buffer, str): + filepath_or_buffer = fsspec.open(filepath_or_buffer).full_name + if filepath_or_buffer.startswith("file://"): + # We will do this so that the backend can know whether this + # is a path or a URL. 
+ filepath_or_buffer = filepath_or_buffer[7:] + else: + raise NotImplementedError("Only filepaths are supported for read_csv") + if cls._server_conn is None: + raise ConnectionError( + "Missing server connection, did you initialize the connection?" + ) + return cls.query_compiler_cls( + cls._server_conn.read_csv(cls._data_conn, filepath_or_buffer, **kwargs) + ) + + @classmethod + def read_sql(cls, sql, con, **kwargs): + """ + Read data from a SQL connection. + + Parameters + ---------- + sql : str or SQLAlchemy Selectable (select or text object) + SQL query to be executed or a table name. + con : SQLAlchemy connectable, str, or sqlite3 connection + Using SQLAlchemy makes it possible to use any DB supported by that + library. If a DBAPI2 object, only sqlite3 is supported. The user is responsible + for engine disposal and connection closure for the SQLAlchemy + connectable; str connections are closed automatically. See + `here `_. + **kwargs : dict + Parameters of ``read_sql`` function. + + Returns + ------- + ClientQueryCompiler + Query compiler with data read in from SQL connection. + """ + if isinstance(con, str) and con.lower() == "auto" and cls._data_conn is None: + raise ConnectionError( + "Cannot connect with parameter 'auto' because connection is not set. Did you initialize it?" + ) + if cls._data_conn is None: + cls._data_conn = con + if cls._server_conn is None: + raise ConnectionError( + "Missing server connection, did you initialize the connection?" + ) + return cls.query_compiler_cls( + cls._server_conn.read_sql(sql, cls._data_conn, **kwargs) + ) + + @classmethod + def to_sql(cls, qc, **kwargs): + qc.to_sql(**kwargs) diff --git a/modin/core/execution/client/query_compiler.py b/modin/core/execution/client/query_compiler.py new file mode 100644 index 00000000000..588e2c81bed --- /dev/null +++ b/modin/core/execution/client/query_compiler.py @@ -0,0 +1,979 @@ +# Licensed to Modin Development Team under one or more contributor license agreements. +# See the NOTICE file distributed with this work for additional information regarding +# copyright ownership. The Modin Development Team licenses this file to you under the +# Apache License, Version 2.0 (the "License"); you may not use this file except in +# compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under +# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific language +# governing permissions and limitations under the License. + +from modin.core.storage_formats.base.query_compiler import BaseQueryCompiler +import numpy as np +import inspect +from pandas._libs.lib import no_default, NoDefault +from pandas.api.types import is_list_like +from pandas.core.computation.parsing import tokenize_string + +from typing import Any + + +class ClientQueryCompiler(BaseQueryCompiler): + @classmethod + def set_server_connection(cls, conn): + cls._service = conn + + def __init__(self, id): + assert ( + id is not None + ), "Make sure the client is properly connected and returns and ID" + if isinstance(id, Exception): + raise id + self._id = id + + def _set_columns(self, new_columns): + # N.B. almost every query compiler method creates a new compiler, but index and + # columns are mutable properties of every query compiler, including the one + # we're using in our service now. 
The service interface allows updating columns + # and index in place with set_columns() and set_index(). Maybe in the future + # we'd want to cache service results by query compiler ID, in which case + # mutating a backend query compiler would not be allowed. + self._service.set_columns(self._id, new_columns) + self._columns_cache = self._service.columns(self._id) + + def _get_columns(self): + if self._columns_cache is None: + self._columns_cache = self._service.columns(self._id) + return self._columns_cache + + def _set_index(self, new_index): + self._service.set_index(self._id, new_index) + + def _get_index(self): + return self._service.index(self._id) + + columns = property(_get_columns, _set_columns) + _columns_cache = None + index = property(_get_index, _set_index) + _dtypes_cache = None + + @property + def dtypes(self): + if self._dtypes_cache is None: + self._dtypes_cache = self._service.dtypes(self._id) + return self._dtypes_cache + + @classmethod + def from_pandas(cls, df, data_cls): + raise NotImplementedError + + def to_sql( + self, + name, + con, + schema=None, + if_exists="fail", + index=True, + index_label=None, + chunksize=None, + dtype=None, + method=None, + ): + return self._service.to_sql( + self._id, + name, + con, + schema, + if_exists, + index, + index_label, + chunksize, + dtype, + method, + ) + + def to_pandas(self): + value = self._service.to_pandas(self._id) + if isinstance(value, Exception): + raise value + return value + + def to_numpy(self, **kwargs): + value = self._service.to_numpy(self._id, **kwargs) + if isinstance(value, Exception): + raise value + return value + + def default_to_pandas(self, pandas_op, *args, **kwargs): + raise NotImplementedError + + def copy(self): + return self.__constructor__(self._service.copy(self._id)) + + def insert(self, loc, column, value): + if isinstance(value, ClientQueryCompiler): + value = value._id + is_qc = True + else: + is_qc = False + return self.__constructor__( + self._service.insert(self._id, loc, column, value, is_qc) + ) + + def insert_item(self, axis, loc, value, how="inner", replace=False): + value_is_qc = isinstance(value, ClientQueryCompiler) + if value_is_qc: + value = value._id + return self._service.insert_item( + self._id, axis, loc, value, how, replace, value_is_qc + ) + + def setitem(self, axis, key, value): + if isinstance(value, ClientQueryCompiler): + value = value._id + is_qc = True + else: + is_qc = False + return self.__constructor__( + self._service.setitem(self._id, axis, key, value, is_qc) + ) + + def getitem_array(self, key): + if isinstance(key, ClientQueryCompiler): + key = key._id + is_qc = True + else: + is_qc = False + return self.__constructor__(self._service.getitem_array(self._id, key, is_qc)) + + def replace( + self, + to_replace=None, + value=no_default, + inplace=False, + limit=None, + regex=False, + method: "str | NoDefault" = no_default, + ): + if isinstance(to_replace, ClientQueryCompiler): + is_to_replace_qc = True + else: + is_to_replace_qc = False + if isinstance(regex, ClientQueryCompiler): + is_regex_qc = True + else: + is_regex_qc = False + return self.__constructor__( + self._service.replace( + self._id, + to_replace, + value, + inplace, + limit, + regex, + method, + is_to_replace_qc, + is_regex_qc, + ) + ) + + def fillna( + self, + squeeze_self, + squeeze_value, + value=None, + method=None, + axis=None, + inplace=False, + limit=None, + downcast=None, + ): + if isinstance(value, ClientQueryCompiler): + is_qc = True + else: + is_qc = False + return self.__constructor__( + 
            self._service.fillna(
+                self._id,
+                squeeze_self,
+                squeeze_value,
+                value,
+                method,
+                axis,
+                inplace,
+                limit,
+                downcast,
+                is_qc,
+            )
+        )
+
+    def concat(self, axis, other, **kwargs):
+        if is_list_like(other):
+            other = [o._id for o in other]
+        else:
+            other = [other._id]
+        return self.__constructor__(
+            self._service.concat(self._id, axis, other, **kwargs)
+        )
+
+    def sort_rows_by_column_values(
+        self, columns, ascending=True, handle_duplicates=None, **kwargs
+    ):
+        return self.__constructor__(
+            self._service.sort_rows_by_column_values(
+                self._id,
+                columns,
+                ascending=ascending,
+                handle_duplicates=handle_duplicates,
+                **kwargs,
+            )
+        )
+
+    def merge(self, right, **kwargs):
+        return self.__constructor__(self._service.merge(self._id, right._id, **kwargs))
+
+    def merge_asof(self, right, **kwargs):
+        return self.__constructor__(
+            self._service.merge_asof(self._id, right._id, **kwargs)
+        )
+
+    def groupby_mean(
+        self,
+        by,
+        axis,
+        groupby_kwargs,
+        agg_args,
+        agg_kwargs,
+        drop=False,
+    ):
+        return self.__constructor__(
+            self._service.groupby_mean(
+                self._id, by._id, axis, groupby_kwargs, agg_args, agg_kwargs, drop
+            )
+        )
+
+    def groupby_count(
+        self,
+        by,
+        axis,
+        groupby_kwargs,
+        agg_args,
+        agg_kwargs,
+        drop=False,
+    ):
+        return self.__constructor__(
+            self._service.groupby_count(
+                self._id, by._id, axis, groupby_kwargs, agg_args, agg_kwargs, drop
+            )
+        )
+
+    def groupby_prod(
+        self,
+        by,
+        axis,
+        groupby_kwargs,
+        agg_args,
+        agg_kwargs,
+        drop=False,
+    ):
+        return self.__constructor__(
+            self._service.groupby_prod(
+                self._id, by._id, axis, groupby_kwargs, agg_args, agg_kwargs, drop
+            )
+        )
+
+    def groupby_max(
+        self,
+        by,
+        axis,
+        groupby_kwargs,
+        agg_args,
+        agg_kwargs,
+        drop=False,
+    ):
+        return self.__constructor__(
+            self._service.groupby_max(
+                self._id, by._id, axis, groupby_kwargs, agg_args, agg_kwargs, drop
+            )
+        )
+
+    def groupby_min(
+        self,
+        by,
+        axis,
+        groupby_kwargs,
+        agg_args,
+        agg_kwargs,
+        drop=False,
+    ):
+        return self.__constructor__(
+            self._service.groupby_min(
+                self._id, by._id, axis, groupby_kwargs, agg_args, agg_kwargs, drop
+            )
+        )
+
+    def groupby_sum(
+        self,
+        by,
+        axis,
+        groupby_kwargs,
+        agg_args,
+        agg_kwargs,
+        drop=False,
+    ):
+        return self.__constructor__(
+            self._service.groupby_sum(
+                self._id, by._id, axis, groupby_kwargs, agg_args, agg_kwargs, drop
+            )
+        )
+
+    def groupby_agg(
+        self,
+        by,
+        agg_func,
+        axis,
+        groupby_kwargs,
+        agg_args,
+        agg_kwargs,
+        how="axis_wise",
+        drop=False,
+    ):
+        return self.__constructor__(
+            self._service.groupby_agg(
+                self._id,
+                by._id,
+                agg_func,
+                axis,
+                groupby_kwargs,
+                agg_args,
+                agg_kwargs,
+                how,
+                drop,
+            )
+        )
+
+    def to_datetime(self, *args, **kwargs):
+        return self.__constructor__(
+            self._service.to_datetime(self._id, *args, **kwargs)
+        )
+
+    def finalize(self):
+        raise NotImplementedError
+
+    def free(self):
+        return
+
+    @classmethod
+    def from_arrow(cls, at, data_cls):
+        raise NotImplementedError
+
+    @classmethod
+    def from_dataframe(cls, df, data_cls):
+        raise NotImplementedError
+
+    def to_dataframe(self, nan_as_null: bool = False, allow_copy: bool = True):
+        raise NotImplementedError
+
+    def clip(self, lower, upper, **kwargs):
+        lower_is_qc = isinstance(lower, type(self))
+        upper_is_qc = isinstance(upper, type(self))
+        if lower_is_qc:
+            lower = lower._id
+        if upper_is_qc:
+            upper = upper._id
+        # The service-side signature assumed here mirrors where()/mask().
+        return self.__constructor__(
+            self._service.clip(
+                self._id, lower, lower_is_qc, upper, upper_is_qc, **kwargs
+            )
+        )
+
+    def isin(self, values, values_is_series: bool, self_is_series: bool):
+        # isin is unusual because it passes API layer objects to the query
+        # compiler instead of converting them to query compiler objects
+        # (Modin issue #3106)
+        from modin.pandas import DataFrame, Series
+
+        is_qc = isinstance(values, (DataFrame, Series))
+        if is_qc:
+            values = values._query_compiler._id
+        return self.__constructor__(
+            self._service.isin(
+                self._id, values, is_qc, values_is_series, self_is_series
+            )
+        )
+
+    def where(self, cond, other, **kwargs):
+        cond_is_qc = isinstance(cond, type(self))
+        other_is_qc = isinstance(other, type(self))
+        if cond_is_qc:
+            cond = cond._id
+        if other_is_qc:
+            other = other._id
+        return self.__constructor__(
+            self._service.where(
+                self._id, cond, cond_is_qc, other, other_is_qc, **kwargs
+            )
+        )
+
+    # take_2d is special because the service still uses `view`, but modin
+    # calls `take_2d`
+    def take_2d(self, index=None, columns=None):
+        return self.__constructor__(self._service.view(self._id, index, columns))
+
+    # The service should define the same default of numeric=False, but it doesn't,
+    # so we do it here. If we don't define the default here, the service complains
+    # because it never gets the `numeric` param.
+    def getitem_column_array(self, key, numeric=False):
+        return self.__constructor__(
+            self._service.getitem_column_array(self._id, key, numeric)
+        )
+
+    # BUG: cumulative functions are wrong in the service and need special
+    # treatment here. The service signature is
+    # def exposed_cumsum(self, id, axis, skipna, *args, **kwargs):
+    # and the query compiler container signature is
+    # def cumsum(self, id, fold_axis, skipna, *args, **kwargs):
+    # whereas this can take both fold_axis and axis. I think we're actually
+    # passing axis=fold_axis, skipna=axis, and skipna as a *arg to the service.
+    def cummax(self, fold_axis, axis, skipna, *args, **kwargs):
+        return self.__constructor__(
+            self._service.cummax(self._id, fold_axis, axis, skipna, *args, **kwargs)
+        )
+
+    def cummin(self, fold_axis, axis, skipna, *args, **kwargs):
+        return self.__constructor__(
+            self._service.cummin(self._id, fold_axis, axis, skipna, *args, **kwargs)
+        )
+
+    def cumsum(self, fold_axis, axis, skipna, *args, **kwargs):
+        return self.__constructor__(
+            self._service.cumsum(self._id, fold_axis, axis, skipna, *args, **kwargs)
+        )
+
+    def cumprod(self, fold_axis, axis, skipna, *args, **kwargs):
+        return self.__constructor__(
+            self._service.cumprod(self._id, fold_axis, axis, skipna, *args, **kwargs)
+        )
+
+    # Use this buggy sub which calls rsub because the service expects the bug:
+    # https://github.com/ponder-org/soda/blob/5aca5483ec24b0fc0bb00a3dcab410da297598b1/pushdown_service/test/snowflake/arithmetic/test_numeric.py#L26-L37
+    # We need to fix the test in the service.
+    def sub(self, other, **kwargs):
+        if isinstance(other, ClientQueryCompiler):
+            other = other._id
+            is_qc = True
+        else:
+            is_qc = False
+        return self.__constructor__(
+            self._service.rsub(self._id, other, is_qc, **kwargs)
+        )
+
+    def query(self, expr, **kwargs):
+        # TODO: We don't need all this; the API layer passes local and global
+        # variables in local_dict and global_dict. But the service is buggy
+        # and doesn't use those dicts, so we need to keep this for now.
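+        # Illustration of the rewrite below (not from the original patch):
+        # given a local variable cutoff = [1, 2], the expression
+        # "a > @cutoff" is rewritten to "`a` > (1, 2)": column names are
+        # wrapped in backticks, and @-variables are looked up in the
+        # caller's stack frames and inlined as literals.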
+ is_variable = False + variable_list = [] + for k, v in tokenize_string(expr): + if v == "" or v == " ": + continue + if is_variable: + frame = inspect.currentframe() + identified = False + while frame: + if v in frame.f_locals: + value = frame.f_locals[v] + if isinstance(value, list): + value = tuple(value) + variable_list.append(str(value)) + identified = True + break + frame = frame.f_back + if not identified: + # TODO this error does not quite match pandas + raise ValueError(f"{v} not found") + is_variable = False + elif v == "@": + is_variable = True + continue + else: + if v in self.columns: + v = f"`{v}`" + variable_list.append(v) + expr = " ".join(variable_list) + return self.__constructor__(self._service.query(self._id, expr, **kwargs)) + + def ewm_cov(self, other=None, *args, **kwargs): + other_is_qc = isinstance(other, type(self)) + if other_is_qc: + other = other._id + return self.__constructor__( + self._service.ewm_cov(self._id, other, other_is_qc, *args, **kwargs) + ) + + def ewm_corr(self, other=None, *args, **kwargs): + other_is_qc = isinstance(other, type(self)) + if other_is_qc: + other = other._id + return self.__constructor__( + self._service.ewm_corr(self._id, other, other_is_qc, *args, **kwargs) + ) + + def expanding_cov(self, other=None, *args, **kwargs): + other_is_qc = isinstance(other, type(self)) + if other_is_qc: + other = other._id + return self.__constructor__( + self._service.expanding_cov(self._id, other, other_is_qc, *args, **kwargs) + ) + + def expanding_corr(self, other=None, *args, **kwargs): + other_is_qc = isinstance(other, type(self)) + if other_is_qc: + other = other._id + return self.__constructor__( + self._service.expanding_corr(self._id, other, other_is_qc, *args, **kwargs) + ) + + def mask(self, cond, other=np.nan, *args, **kwargs): + cond_is_qc = isinstance(cond, type(self)) + if cond_is_qc: + cond = cond._id + other_is_qc = isinstance(other, type(self)) + if other_is_qc: + other = other._id + return self.__constructor__( + self._service.mask( + self._id, cond, cond_is_qc, other, other_is_qc, *args, **kwargs + ) + ) + + +def _set_forwarding_method_for_binary_function(method_name: str) -> None: + """ + Define a binary method that forwards arguments to the service. + Parameters + ---------- + method_name : str + """ + + def forwarding_method( + self: ClientQueryCompiler, + other: Any, + **kwargs, + ): + other_is_qc = isinstance(other, type(self)) + if other_is_qc: + other = other._id + return self.__constructor__( + getattr(self._service, method_name)(self._id, other, other_is_qc, **kwargs) + ) + + setattr(ClientQueryCompiler, method_name, forwarding_method) + + +def _set_forwarding_method_for_single_id(method_name: str) -> None: + """ + Define a method that forwards arguments to the service. + Parameters + ---------- + method_name : str + """ + + def forwarding_method( + self: ClientQueryCompiler, + *args, + **kwargs, + ): + return self.__constructor__( + getattr(self._service, method_name)(self._id, *args, **kwargs) + ) + + setattr(ClientQueryCompiler, method_name, forwarding_method) + + +def _set_forwarding_groupby_method(method_name: str): + """ + Define a groupby method that forwards arguments to the service. 
+    Parameters
+    ----------
+    method_name : str
+    """
+
+    def forwarding_method(self, by, *args, **kwargs):
+        if not isinstance(by, type(self)):
+            raise NotImplementedError("Must always GroupBy another modin.pandas object")
+        return self.__constructor__(
+            getattr(self._service, method_name)(self._id, by._id, *args, **kwargs)
+        )
+
+    setattr(ClientQueryCompiler, method_name, forwarding_method)
+
+
+_BINARY_FORWARDING_METHODS = frozenset(
+    {
+        "eq",
+        "lt",
+        "le",
+        "gt",
+        "ge",
+        "ne",
+        "__and__",
+        "__or__",
+        "add",
+        "radd",
+        "truediv",
+        "rtruediv",
+        "mod",
+        "rmod",
+        "rsub",
+        "mul",
+        "rmul",
+        "floordiv",
+        "rfloordiv",
+        "__rand__",
+        "__ror__",
+        "__xor__",
+        "__rxor__",
+        "pow",
+        "rpow",
+        "combine",
+        "combine_first",
+        "compare",
+        "df_update",
+        "dot",
+        "join",
+        "series_update",
+        "align",
+        "series_corr",
+        "divmod",
+        "reindex_like",
+        "rdivmod",
+        "corrwith",
+        "merge_ordered",
+    }
+)
+
+_SINGLE_ID_FORWARDING_METHODS = frozenset(
+    {
+        "abs",
+        "asfreq",
+        "transpose",
+        "getitem_row_array",
+        "getitem_row_labels_array",
+        "pivot",
+        "get_dummies",
+        "drop",
+        "isna",
+        "notna",
+        "add_prefix",
+        "add_suffix",
+        "astype",
+        "dropna",
+        "sum",
+        "prod",
+        "count",
+        "mean",
+        "median",
+        "std",
+        "min",
+        "max",
+        "any",
+        "all",
+        "quantile_for_single_value",
+        "quantile_for_list_of_values",
+        "describe",
+        "set_index_from_columns",
+        "reset_index",
+        "sort_rows_by_column_values",
+        "sort_index",
+        "dt_nanosecond",
+        "dt_microsecond",
+        "dt_second",
+        "dt_minute",
+        "dt_hour",
+        "dt_day",
+        "dt_day_of_week",
+        "dt_dayofweek",
+        "dt_weekday",
+        "dt_day_name",
+        "dt_day_of_year",
+        "dt_dayofyear",
+        "dt_week",
+        "dt_weekofyear",
+        "dt_month",
+        "dt_month_name",
+        "dt_quarter",
+        "dt_year",
+        "dt_ceil",
+        "dt_components",
+        "dt_date",
+        "dt_days",
+        "dt_days_in_month",
+        "dt_daysinmonth",
+        "dt_end_time",
+        "dt_floor",
+        "dt_freq",
+        "dt_is_leap_year",
+        "dt_is_month_end",
+        "dt_is_month_start",
+        "dt_is_quarter_end",
+        "dt_is_quarter_start",
+        "dt_is_year_end",
+        "dt_is_year_start",
+        "dt_microseconds",
+        "dt_nanoseconds",
+        "dt_normalize",
+        "dt_qyear",
+        "dt_round",
+        "dt_seconds",
+        "dt_start_time",
+        "dt_strftime",
+        "dt_time",
+        "dt_timetz",
+        "dt_to_period",
+        "dt_to_pydatetime",
+        "dt_to_pytimedelta",
+        "dt_to_timestamp",
+        "dt_total_seconds",
+        "dt_tz",
+        "dt_tz_convert",
+        "dt_tz_localize",
+        "str_capitalize",
+        "str_isalnum",
+        "str_isalpha",
+        "str_isdecimal",
+        "str_isdigit",
+        "str_islower",
+        "str_isnumeric",
+        "str_isspace",
+        "str_istitle",
+        "str_isupper",
+        "str_len",
+        "str_lower",
+        "str_title",
+        "str_upper",
+        "str_cat",
+        "str_center",
+        "str_contains",
+        "str_count",
+        "str_decode",
+        "str_encode",
+        "str_endswith",
+        "str_find",
+        "str_index",
+        "str_rfind",
+        "str_findall",
+        "str_fullmatch",
+        "str_get",
+        "str_join",
+        "str_lstrip",
+        "str_ljust",
+        "str_rjust",
+        "str_match",
+        "str_pad",
+        "str_removeprefix",
+        "str_removesuffix",
+        "str_repeat",
+        "str_split",
+        "str_rsplit",
+        "str_rstrip",
+        "str_slice",
+        "str_slice_replace",
+        "str_startswith",
+        "str_strip",
+        "str_zfill",
+        "str_casefold",
+        "str_getdummies",
+        "str_extract",
+        "str_extractall",
+        "is_monotonic_increasing",
+        "is_monotonic_decreasing",
+        "idxmax",
+        "idxmin",
+        "apply",
+        "apply_on_series",
+        "applymap",
+        "cat_codes",
+        "convert_dtypes",
+        "corr",
+        "cov",
+        "diff",
+        "eval",
+        "expanding_sum",
+        "expanding_min",
+        "expanding_max",
+        "expanding_mean",
+        "expanding_var",
+        "expanding_std",
+        "expanding_count",
+        "expanding_sem",
"expanding_median", + "expanding_var", + "expanding_skew", + "expanding_kurt", + "expanding_apply", + "expanding_aggregate", + "expanding_quantile", + "expanding_rank", + "explode", + "first_valid_index", + "infer_objects", + "invert", + "kurt", + "last_valid_index", + "mad", + "melt", + "memory_usage", + "mode", + "negative", + "nlargest", + "nsmallest", + "nunique", + "pivot_table", + "rank", + "reindex", + "repeat", + "resample_agg_df", + "resample_agg_ser", + "resample_app_df", + "resample_app_ser", + "resample_asfreq", + "resample_backfill", + "resample_bfill", + "resample_count", + "resample_ffill", + "resample_fillna", + "resample_first", + "resample_get_group", + "resample_interpolate", + "resample_last", + "resample_max", + "resample_mean", + "resample_median", + "resample_min", + "resample_nearest", + "resample_nunique", + "resample_ohlc_df", + "resample_ohlc_ser", + "resample_pad", + "resample_pipe", + "resample_prod", + "resample_quantile", + "resample_sem", + "resample_size", + "resample_std", + "resample_sum", + "resample_transform", + "resample_var", + "rolling_aggregate", + "rolling_apply", + "rolling_corr", + "rolling_count", + "rolling_cov", + "rolling_kurt", + "rolling_max", + "rolling_mean", + "rolling_median", + "rolling_min", + "rolling_quantile", + "rolling_skew", + "rolling_std", + "rolling_sem", + "rolling_sum", + "rolling_var", + "window_mean", + "window_std", + "window_sum", + "window_var", + "round", + "searchsorted", + "series_view", + "sem", + "skew", + "sort_columns_by_row_values", + "stack", + "str___getitem__", + "str_normalize", + "str_partition", + "str_replace", + "str_rindex", + "str_rpartition", + "str_swapcase", + "str_translate", + "str_wrap", + "to_numeric", + "unique", + "unstack", + "var", + "write_items", + "set_index_name", + "set_index_names", + "ewm_mean", + "ewm_sum", + "ewm_std", + "ewm_var", + "pct_change", + "sizeof", + "argsort", + "between", + "factorize", + "dataframe_hist", + "series_hist", + "interpolate", + "nlargest", + "nsmallest", + "swaplevel", + "dataframe_to_dict", + "series_to_dict", + "to_list", + "truncate", + "lookup", + "wide_to_long", + "between_time", + "last", + "first", + } +) + +_GROUPBY_FORWARDING_METHODS = frozenset( + { + "mean", + "count", + "max", + "min", + "sum", + "agg", + "all", + "any", + "size", + "skew", + "cumcount", + "cumsum", + "cummax", + "cummin", + "cumprod", + "head", + "tail", + "nth", + "ngroup", + "std", + "sem", + "rank", + "unique", + "nunique", + "median", + "quantile", + "fillna", + "dtypes", + "shift", + "prod", + "var", + "first", + "last", + "nlargest", + "nsmallest", + } +) + +for method in _BINARY_FORWARDING_METHODS: + _set_forwarding_method_for_binary_function(method) + +for method in _SINGLE_ID_FORWARDING_METHODS: + _set_forwarding_method_for_single_id(method) + +for method in _GROUPBY_FORWARDING_METHODS: + _set_forwarding_groupby_method("groupby_" + method) + +ClientQueryCompiler.prod_min_count = ClientQueryCompiler.prod +ClientQueryCompiler.sum_min_count = ClientQueryCompiler.sum diff --git a/modin/core/execution/dispatching/factories/factories.py b/modin/core/execution/dispatching/factories/factories.py index 686832fead0..8c2aa2c5798 100644 --- a/modin/core/execution/dispatching/factories/factories.py +++ b/modin/core/execution/dispatching/factories/factories.py @@ -469,6 +469,16 @@ def prepare(cls): cls.io_cls = PandasOnDaskIO +@doc(_doc_factory_class, execution_name="Client") +class ClientFactory(BaseFactory): + @classmethod + @doc(_doc_factory_prepare_method, 
io_module_name="`Client`") + def prepare(cls): + from modin.core.execution.client.io import ClientIO + + cls.io_cls = ClientIO + + @doc(_doc_abstract_factory_class, role="experimental") class ExperimentalBaseFactory(BaseFactory): @classmethod diff --git a/modin/core/storage_formats/base/query_compiler.py b/modin/core/storage_formats/base/query_compiler.py index b34bfa7eef7..342493aaebe 100644 --- a/modin/core/storage_formats/base/query_compiler.py +++ b/modin/core/storage_formats/base/query_compiler.py @@ -2596,6 +2596,30 @@ def groupby_skew( drop=drop, ) + @doc_utils.doc_groupby_method( + action="compute cumulative count", + result="count of all the previous values", + refer_to="cumcount", + ) + def groupby_cumcount( + self, + by, + axis, + groupby_kwargs, + agg_args, + agg_kwargs, + drop=False, + ): + return self.groupby_agg( + by=by, + agg_func="cumcount", + axis=axis, + groupby_kwargs=groupby_kwargs, + agg_args=agg_args, + agg_kwargs=agg_kwargs, + drop=drop, + ) + @doc_utils.doc_groupby_method( action="compute cumulative sum", result="sum of all the previous values", @@ -2714,6 +2738,28 @@ def groupby_std( drop=drop, ) + @doc_utils.doc_groupby_method( + action="compute standard error", result="standard error", refer_to="sem" + ) + def groupby_sem( + self, + by, + axis, + groupby_kwargs, + agg_args, + agg_kwargs, + drop=False, + ): + return self.groupby_agg( + by=by, + agg_func="sem", + axis=axis, + groupby_kwargs=groupby_kwargs, + agg_args=agg_args, + agg_kwargs=agg_kwargs, + drop=drop, + ) + @doc_utils.doc_groupby_method( action="compute numerical rank", result="numerical rank", refer_to="rank" ) @@ -2898,6 +2944,150 @@ def groupby_shift( drop=drop, ) + @doc_utils.doc_groupby_method( + action="get first value in group", + result="first value", + refer_to="first", + ) + def groupby_first( + self, + by, + axis, + groupby_kwargs, + agg_args, + agg_kwargs, + drop=False, + ): + return self.groupby_agg( + by=by, + agg_func="first", + axis=axis, + groupby_kwargs=groupby_kwargs, + agg_args=agg_args, + agg_kwargs=agg_kwargs, + drop=drop, + ) + + @doc_utils.doc_groupby_method( + action="get last value in group", + result="last value", + refer_to="last", + ) + def groupby_last( + self, + by, + axis, + groupby_kwargs, + agg_args, + agg_kwargs, + drop=False, + ): + return self.groupby_agg( + by=by, + agg_func="last", + axis=axis, + groupby_kwargs=groupby_kwargs, + agg_args=agg_args, + agg_kwargs=agg_kwargs, + drop=drop, + ) + + @doc_utils.doc_groupby_method( + action="get first n values of a group", + result="first n values of a group", + refer_to="head", + ) + def groupby_head( + self, + by, + axis, + groupby_kwargs, + agg_args, + agg_kwargs, + drop=False, + ): + return self.groupby_agg( + by=by, + agg_func="head", + axis=axis, + groupby_kwargs=groupby_kwargs, + agg_args=agg_args, + agg_kwargs=agg_kwargs, + drop=drop, + ) + + @doc_utils.doc_groupby_method( + action="get last n values in group", + result="last n values", + refer_to="tail", + ) + def groupby_tail( + self, + by, + axis, + groupby_kwargs, + agg_args, + agg_kwargs, + drop=False, + ): + return self.groupby_agg( + by=by, + agg_func="tail", + axis=axis, + groupby_kwargs=groupby_kwargs, + agg_args=agg_args, + agg_kwargs=agg_kwargs, + drop=drop, + ) + + @doc_utils.doc_groupby_method( + action="get nth value in group", + result="nth value", + refer_to="nth", + ) + def groupby_nth( + self, + by, + axis, + groupby_kwargs, + agg_args, + agg_kwargs, + drop=False, + ): + return self.groupby_agg( + by=by, + agg_func="nth", + axis=axis, + 
groupby_kwargs=groupby_kwargs, + agg_args=agg_args, + agg_kwargs=agg_kwargs, + drop=drop, + ) + + @doc_utils.doc_groupby_method( + action="get group number of each value", + result="group number of each value", + refer_to="ngroup", + ) + def groupby_ngroup( + self, + by, + axis, + groupby_kwargs, + agg_args, + agg_kwargs, + drop=False, + ): + return self.groupby_agg( + by=by, + agg_func="ngroup", + axis=axis, + groupby_kwargs=groupby_kwargs, + agg_args=agg_args, + agg_kwargs=agg_kwargs, + drop=drop, + ) + # END Manual Partitioning methods @doc_utils.add_refer_to("DataFrame.unstack") @@ -4060,12 +4250,29 @@ def resample_var(self, resample_kwargs, ddof, *args, **kwargs): # End of Resample methods + @doc_utils.doc_str_method(refer_to="capitalize", params="") + def str_capitalize(self): + return StrDefault.register(pandas.Series.str.capitalize)(self) + # Str methods @doc_utils.doc_str_method(refer_to="capitalize", params="") def str_capitalize(self): return StrDefault.register(pandas.Series.str.capitalize)(self) + @doc_utils.doc_str_method( + refer_to="cat", + params=""" + others : Series, Index, DataFrame, np.ndarray or list-like + sep : str + na_sep : str + join : {'left', 'right', 'outer', 'inner'}, default: 'left'""", + ) + def str_cat(self, others=None, sep=None, na_rep=None, join="left"): + return StrDefault.register(pandas.Series.str.cat)( + self, others, sep, na_rep, join + ) + @doc_utils.doc_str_method( refer_to="center", params=""" @@ -4130,6 +4337,19 @@ def str_findall(self, pat, flags=0, **kwargs): self, pat, flags, **kwargs ) + @doc_utils.doc_str_method( + refer_to="fullmatch", + params=""" + pat : str + case : bool, default: True + flags : int, default: 0 + na : object, default: None""", + ) + def str_fullmatch(self, pat, case=True, flags=0, na=None): + return StrDefault.register(pandas.Series.str.fullmatch)( + self, pat, case, flags, na + ) + @doc_utils.doc_str_method(refer_to="get", params="i : int") def str_get(self, i): return StrDefault.register(pandas.Series.str.get)(self, i) @@ -4241,6 +4461,14 @@ def str_pad(self, width, side="left", fillchar=" "): def str_partition(self, sep=" ", expand=True): return StrDefault.register(pandas.Series.str.partition)(self, sep, expand) + @doc_utils.doc_str_method(refer_to="removeprefix", params="prefix : str") + def str_removeprefix(self, prefix): + return StrDefault.register(pandas.Series.str.removeprefix)(self, prefix) + + @doc_utils.doc_str_method(refer_to="removesuffix", params="suffix : str") + def str_removesuffix(self, suffix): + return StrDefault.register(pandas.Series.str.removesuffix)(self, suffix) + @doc_utils.doc_str_method(refer_to="repeat", params="repeats : int") def str_repeat(self, repeats): return StrDefault.register(pandas.Series.str.repeat)(self, repeats) @@ -4584,6 +4812,18 @@ def rolling_sum(self, fold_axis, rolling_args, *args, **kwargs): self, rolling_args, *args, **kwargs ) + @doc_utils.doc_window_method( + result="sem", + refer_to="sem", + params=""" + *args : iterable + **kwargs : dict""", + ) + def rolling_sem(self, fold_axis, rolling_args, *args, **kwargs): + return RollingDefault.register(pandas.core.window.rolling.Rolling.sem)( + self, rolling_args, *args, **kwargs + ) + @doc_utils.doc_window_method( result="variance", refer_to="var", @@ -4599,6 +4839,50 @@ def rolling_var(self, fold_axis, rolling_args, ddof=1, *args, **kwargs): # End of Rolling methods + # Begin Expanding methods + + def expanding_sum(self, fold_axis, expanding_args, *args, **kwargs): + return 
RollingDefault.register(pandas.core.window.expanding.Expanding.sum)( + self, expanding_args, *args, **kwargs + ) + + def expanding_min(self, fold_axis, expanding_args, *args, **kwargs): + return RollingDefault.register(pandas.core.window.expanding.Expanding.min)( + self, expanding_args, *args, **kwargs + ) + + def expanding_max(self, fold_axis, expanding_args, *args, **kwargs): + return RollingDefault.register(pandas.core.window.expanding.Expanding.max)( + self, expanding_args, *args, **kwargs + ) + + def expanding_mean(self, fold_axis, expanding_args, *args, **kwargs): + return RollingDefault.register(pandas.core.window.expanding.Expanding.mean)( + self, expanding_args, *args, **kwargs + ) + + def expanding_var(self, fold_axis, expanding_args, *args, **kwargs): + return RollingDefault.register(pandas.core.window.expanding.Expanding.var)( + self, expanding_args, *args, **kwargs + ) + + def expanding_std(self, fold_axis, expanding_args, *args, **kwargs): + return RollingDefault.register(pandas.core.window.expanding.Expanding.std)( + self, expanding_args, *args, **kwargs + ) + + def expanding_count(self, fold_axis, expanding_args, *args, **kwargs): + return RollingDefault.register(pandas.core.window.expanding.Expanding.count)( + self, expanding_args, *args, **kwargs + ) + + def expanding_sem(self, fold_axis, expanding_args, *args, **kwargs): + return RollingDefault.register(pandas.core.window.expanding.Expanding.sem)( + self, expanding_args, *args, **kwargs + ) + + # End of Expanding methods + # Window methods @doc_utils.doc_window_method( diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index c56e96b3444..11001da4ee2 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -1054,6 +1054,54 @@ def resample_var(self, resample_kwargs, ddof, *args, **kwargs): def resample_quantile(self, resample_kwargs, q, **kwargs): return self._resample_func(resample_kwargs, "quantile", q=q, **kwargs) + expanding_sum = Fold.register( + lambda df, expanding_args, *args, **kwargs: pandas.DataFrame( + df.expanding(*expanding_args).sum(*args, **kwargs) + ) + ) + + expanding_min = Fold.register( + lambda df, expanding_args, *args, **kwargs: pandas.DataFrame( + df.expanding(*expanding_args).min(*args, **kwargs) + ) + ) + + expanding_max = Fold.register( + lambda df, expanding_args, *args, **kwargs: pandas.DataFrame( + df.expanding(*expanding_args).max(*args, **kwargs) + ) + ) + + expanding_mean = Fold.register( + lambda df, expanding_args, *args, **kwargs: pandas.DataFrame( + df.expanding(*expanding_args).mean(*args, **kwargs) + ) + ) + + expanding_var = Fold.register( + lambda df, expanding_args, *args, **kwargs: pandas.DataFrame( + df.expanding(*expanding_args).var(*args, **kwargs) + ) + ) + + expanding_std = Fold.register( + lambda df, expanding_args, *args, **kwargs: pandas.DataFrame( + df.expanding(*expanding_args).std(*args, **kwargs) + ) + ) + + expanding_count = Fold.register( + lambda df, expanding_args, *args, **kwargs: pandas.DataFrame( + df.expanding(*expanding_args).count(*args, **kwargs) + ) + ) + + expanding_sem = Fold.register( + lambda df, expanding_args, *args, **kwargs: pandas.DataFrame( + df.expanding(*expanding_args).sem(*args, **kwargs) + ) + ) + window_mean = Fold.register( lambda df, rolling_args, *args, **kwargs: pandas.DataFrame( df.rolling(*rolling_args).mean(*args, **kwargs) @@ -1082,6 +1130,11 @@ def resample_quantile(self, resample_kwargs, q, 
**kwargs): df.rolling(*rolling_args).sum(*args, **kwargs) ) ) + rolling_sem = Fold.register( + lambda df, rolling_args, *args, **kwargs: pandas.DataFrame( + df.rolling(*rolling_args).sem(*args, **kwargs) + ) + ) rolling_mean = Fold.register( lambda df, rolling_args, *args, **kwargs: pandas.DataFrame( df.rolling(*rolling_args).mean(*args, **kwargs) diff --git a/modin/pandas/__init__.py b/modin/pandas/__init__.py index 3965e34ae9e..45b85429504 100644 --- a/modin/pandas/__init__.py +++ b/modin/pandas/__init__.py @@ -108,11 +108,12 @@ _is_first_update = {} _NOINIT_ENGINES = { "Python", + "Client", } # engines that don't require initialization, useful for unit tests def _update_engine(publisher: Parameter): - from modin.config import StorageFormat, CpuCount + from modin.config import StorageFormat, CpuCount, Engine from modin.config.envvars import IsExperimental from modin.config.pubsub import ValueSource @@ -130,6 +131,11 @@ def _update_engine(publisher: Parameter): else: is_hdk = False + if Engine.get() == "Client": + if publisher.get_value_source() == ValueSource.DEFAULT: + StorageFormat.put("") + return + if is_hdk and publisher.get_value_source() == ValueSource.DEFAULT: publisher.put("Native") IsExperimental.put(True) diff --git a/modin/pandas/base.py b/modin/pandas/base.py index c68e9dfd7a5..003dff520e9 100644 --- a/modin/pandas/base.py +++ b/modin/pandas/base.py @@ -10,13 +10,13 @@ # the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF # ANY KIND, either express or implied. See the License for the specific language # governing permissions and limitations under the License. - """Implement DataFrame/Series public API as pandas does.""" import numpy as np import pandas from pandas.compat import numpy as numpy_compat from pandas.core.common import count_not_none, pipe +from pandas.core.describe import refine_percentiles from pandas.core.dtypes.common import ( is_list_like, is_dict_like, @@ -31,12 +31,14 @@ import pandas.core.resample import pandas.core.generic from pandas.core.indexing import convert_to_index_sliceable -from pandas.util._validators import validate_percentile -from pandas._libs.lib import no_default +from pandas.util._validators import validate_percentile, validate_inclusive +from pandas._libs.lib import no_default, NoDefault +from pandas._libs.tslibs import to_offset from pandas._typing import ( IndexKeyFunc, TimedeltaConvertibleTypes, TimestampConvertibleTypes, + npt, ) import re from typing import Optional, Union, Sequence, Hashable @@ -50,6 +52,7 @@ from modin.config import IsExperimental from modin.logging import disable_logging from modin._compat.pandas_api.classes import BasePandasDatasetCompat +from .window import Expanding # Similar to pandas, sentinel value to use as kwarg in place of None when None has # special meaning and needs to be distinguished from a user explicitly passing None. @@ -149,9 +152,7 @@ def _build_repr_df(self, num_rows, num_cols): A pandas dataset with `num_rows` or fewer rows and `num_cols` or fewer columns. """ # Fast track for empty dataframe. 
- if len(self.index) == 0 or ( - hasattr(self, "columns") and len(self.columns) == 0 - ): + if len(self.index) == 0 or (len(self._query_compiler.columns) == 0): return pandas.DataFrame( index=self.index, columns=self.columns if hasattr(self, "columns") else None, @@ -175,26 +176,35 @@ def _build_repr_df(self, num_rows, num_cols): if num_rows_for_tail is not None else [] ) - if hasattr(self, "columns"): - if len(self.columns) <= num_cols: - col_indexer = slice(None) - else: - num_cols_for_front = num_cols // 2 + 1 - num_cols_for_back = ( - num_cols_for_front - if len(self.columns) > num_cols - else len(self.columns) - num_cols_for_front - if len(self.columns) - num_cols_for_front >= 0 - else None - ) - col_indexer = list(range(len(self.columns))[:num_cols_for_front]) + ( - list(range(len(self.columns))[-num_cols_for_back:]) - if num_cols_for_back is not None - else [] - ) - indexer = row_indexer, col_indexer + if len(self._query_compiler.columns) <= num_cols: + col_indexer = slice(None) else: - indexer = row_indexer + num_cols_for_front = num_cols // 2 + 1 + num_cols_for_back = ( + num_cols_for_front + if len(self.columns) > num_cols + else len(self.columns) - num_cols_for_front + if len(self.columns) - num_cols_for_front >= 0 + else None + ) + # Scenario: num_cols = 20, len(self.columns) = 21 + # num_cols_for_front works out to 11, num_cols_for_back works out to 11 + # this leads to over lap in the columns between the set from the first + # part of the dataframe and the set from the last part of the dataframe. + # The last column from the front is the same as the first column + # from the back. This leads to specifying the same column twice and + # relational databases don't like it. + if num_cols_for_back is not None: + if num_cols_for_front + num_cols_for_back > len(self.columns): + num_cols_for_back -= ( + num_cols_for_front + num_cols_for_back - len(self.columns) + ) + col_indexer = list(range(len(self.columns))[:num_cols_for_front]) + ( + list(range(len(self.columns))[-num_cols_for_back:]) + if num_cols_for_back is not None + else [] + ) + indexer = row_indexer, col_indexer return self.iloc[indexer]._query_compiler.to_pandas() def _update_inplace(self, new_query_compiler): @@ -369,12 +379,6 @@ def _binary_op(self, op, other, **kwargs): kwargs["axis"] = axis = 1 else: axis = 0 - if kwargs.get("level", None) is not None: - # Broadcast is an internally used argument - kwargs.pop("broadcast", None) - return self._default_to_pandas( - getattr(self._pandas_class, op), other, **kwargs - ) other = self._validate_other(other, axis, dtype_check=True) exclude_list = [ "__add__", @@ -598,10 +602,6 @@ def _aggregate(self, func, *args, **kwargs): if isinstance(func, str): kwargs.pop("is_transform", None) return self._string_function(func, *args, **kwargs) - - # Dictionaries have complex behavior because they can be renamed here. - elif func is None or isinstance(func, dict): - return self._default_to_pandas("agg", func, *args, **kwargs) kwargs.pop("is_transform", None) return self.apply(func, axis=_axis, args=args, **kwargs) @@ -635,7 +635,7 @@ def _string_function(self, func, *args, **kwargs): return f f = getattr(np, func, None) if f is not None: - return self._default_to_pandas("agg", func, *args, **kwargs) + return self.apply(f, *args, **kwargs) raise ValueError("{} is an unknown string function".format(func)) def _get_dtypes(self): @@ -669,18 +669,17 @@ def align( """ Align two objects on their axes with the specified join method. 
""" - return self._default_to_pandas( - "align", - other, - join=join, - axis=axis, - level=level, - copy=copy, - fill_value=fill_value, - method=method, - limit=limit, - fill_axis=fill_axis, - broadcast_axis=broadcast_axis, + return self._query_compiler.align( + other._query_compiler, + join, + axis, + level, + copy, + fill_value, + method, + limit, + fill_axis, + broadcast_axis, ) def all( @@ -864,14 +863,7 @@ def asfreq( """ Convert time series to specified frequency. """ - return self._default_to_pandas( - "asfreq", - freq, - method=method, - how=how, - normalize=normalize, - fill_value=fill_value, - ) + return self._query_compiler.asfreq(freq, method, how, normalize, fill_value) def asof(self, where, subset=None): # noqa: PR01, RT01, D200 """ @@ -947,19 +939,69 @@ def at_time(self, time, asof=False, axis=None): # noqa: PR01, RT01, D200 """ Select values at particular time of day (e.g., 9:30AM). """ - axis = self._get_axis_number(axis) - idx = self.index if axis == 0 else self.columns - indexer = pandas.Series(index=idx).at_time(time, asof=asof).index - return self.loc[indexer] if axis == 0 else self.loc[:, indexer] + # TODO: before merging with master, add a base query compiler at_time that + # does the old implementation of indexing with a pandas series. + if asof: + # pandas raises NotImplementedError for asof=True, so we do, too. + raise NotImplementedError("'asof' argument is not supported") + return self.between_time( + start_time=time, end_time=time, inclusive="both", axis=axis + ) @_inherit_docstrings( pandas.DataFrame.between_time, apilink="pandas.DataFrame.between_time" ) - def _between_time(self, **kwargs): - axis = self._get_axis_number(kwargs.pop("axis", None)) - idx = self.index if axis == 0 else self.columns - indexer = pandas.Series(index=idx).between_time(**kwargs).index - return self.loc[indexer] if axis == 0 else self.loc[:, indexer] + def _between_time( + self: "BasePandasDataset", + start_time, + end_time, + include_start: "bool_t | NoDefault", + include_end: "bool_t | NoDefault", + inclusive: "str | None", + axis, + ): + # TODO: before merging with master, add a base query compiler between_time that + # does the old implementation of indexing with a pandas series. + + old_include_arg_used = (include_start is not no_default) or ( + include_end is not no_default + ) + + if old_include_arg_used and inclusive is not None: + raise ValueError( + "Deprecated arguments `include_start` and `include_end` " + "cannot be passed if `inclusive` has been given." 
+ )
+ elif old_include_arg_used:
+ warnings.warn(
+ "`include_start` and `include_end` are deprecated in "
+ "favour of `inclusive`.",
+ FutureWarning,
+ stacklevel=2,
+ )
+ left = True if isinstance(include_start, NoDefault) else include_start
+ right = True if isinstance(include_end, NoDefault) else include_end
+
+ inc_dict = {
+ (True, True): "both",
+ (True, False): "left",
+ (False, True): "right",
+ (False, False): "neither",
+ }
+ inclusive = inc_dict[(left, right)]
+ elif inclusive is None:
+ # On arg removal inclusive can default to "both"
+ inclusive = "both"
+ left_inclusive, right_inclusive = validate_inclusive(inclusive)
+ return self._create_or_update_from_compiler(
+ self._query_compiler.between_time(
+ pandas.core.tools.times.to_time(start_time),
+ pandas.core.tools.times.to_time(end_time),
+ include_start=left_inclusive,
+ include_end=right_inclusive,
+ axis=self._get_axis_number(axis),
+ )
+ )

 def bfill(
 self, axis=None, inplace=False, limit=None, downcast=None
@@ -1125,54 +1167,33 @@ def describe(
 """
 Generate descriptive statistics.
 """
- if include is not None and (isinstance(include, np.dtype) or include != "all"):
- if not is_list_like(include):
- include = [include]
- include = [pandas_dtype(i) if i != np.number else i for i in include]
- if not any(
- (isinstance(inc, np.dtype) and inc == d)
- or (
- not isinstance(inc, np.dtype)
- and inc.__subclasscheck__(getattr(np, d.__str__()))
- )
- for d in self._get_dtypes()
- for inc in include
- ):
- # This is the error that pandas throws.
- raise ValueError("No objects to concatenate")
- if exclude is not None:
- if not is_list_like(exclude):
- exclude = [exclude]
- exclude = [pandas_dtype(e) if e != np.number else e for e in exclude]
- if all(
- (isinstance(exc, np.dtype) and exc == d)
- or (
- not isinstance(exc, np.dtype)
- and exc.__subclasscheck__(getattr(np, d.__str__()))
- )
- for d in self._get_dtypes()
- for exc in exclude
- ):
- # This is the error that pandas throws.
- raise ValueError("No objects to concatenate")
- if percentiles is not None:
- # explicit conversion of `percentiles` to list
- percentiles = list(percentiles)
-
- # get them all to be in [0, 1]
- validate_percentile(percentiles)
-
- # median should always be included
- if 0.5 not in percentiles:
- percentiles.append(0.5)
- percentiles = np.asarray(percentiles)
+ # copied from pandas.core.describe.describe_ndframe
+ percentiles = refine_percentiles(percentiles)
+ data = self
+ if (include is None) and (exclude is None):
+ # when some numerics are found, keep only numerics
+ default_include: list[npt.DTypeLike] = [np.number]
+ if datetime_is_numeric:
+ default_include.append("datetime")
+ data = self.select_dtypes(include=default_include)
+ if len(data.columns) == 0:
+ data = self
+ elif include == "all":
+ if exclude is not None:
+ msg = "exclude must be None when include is 'all'"
+ raise ValueError(msg)
+ data = self
 else:
- percentiles = np.array([0.25, 0.5, 0.75])
- return self.__constructor__(
- query_compiler=self._query_compiler.describe(
- percentiles=percentiles,
+ data = self.select_dtypes(
 include=include,
 exclude=exclude,
+ )
+ if data.empty:
+ # Match the pandas error from concatenating an empty list of series descriptions.
+ raise ValueError("No objects to concatenate")
+ return self.__constructor__(
+ query_compiler=data._query_compiler.describe(
+ percentiles=percentiles,
 datetime_is_numeric=datetime_is_numeric,
 )
 )
@@ -1202,18 +1223,6 @@ def drop(
 Drop specified labels from `BasePandasDataset`.
""" # TODO implement level - if level is not None: - return self._default_to_pandas( - "drop", - labels=labels, - axis=axis, - index=index, - columns=columns, - level=level, - inplace=inplace, - errors=errors, - ) - inplace = self._validate_bool_kwarg(inplace, "inplace") if labels is not None: if index is not None or columns is not None: @@ -1235,6 +1244,10 @@ def drop( elif axes[axis] is not None: if not is_list_like(axes[axis]): axes[axis] = [axes[axis]] + # In case of lazy execution we should bypass these error checking components + # because they can force the materialization of the row or column labels. + if self._query_compiler.lazy_execution: + continue if errors == "raise": non_existent = pandas.Index(axes[axis]).difference( getattr(self, axis) @@ -1344,13 +1357,25 @@ def explode(self, column, ignore_index: bool = False): # noqa: PR01, RT01, D200 @_inherit_docstrings(pandas.DataFrame.ewm, apilink="pandas.DataFrame.ewm") def _ewm(self, **kwargs): - return self._default_to_pandas("ewm", **kwargs) + from .ewm import ExponentialMovingWindow + + return ExponentialMovingWindow(self, kwargs) @_inherit_docstrings( pandas.DataFrame.expanding, apilink="pandas.DataFrame.expanding" ) - def _expanding(self, **kwargs): - return self._default_to_pandas("expanding", **kwargs) + def _expanding( + self, min_periods=1, center=None, axis=0, method="single", *args, **kwargs + ): + return Expanding( + self, + min_periods=min_periods, + center=center, + axis=axis, + method=method, + *args, + **kwargs, + ) def ffill( self, axis=None, inplace=False, limit=None, downcast=None @@ -1499,7 +1524,9 @@ def first(self, offset): # noqa: PR01, RT01, D200 """ Select initial periods of time series data based on a date offset. """ - return self.loc[pandas.Series(index=self.index).first(offset).index] + return self._create_or_update_from_compiler( + self._query_compiler.first(to_offset(offset)) + ) def first_valid_index(self): # noqa: RT01, D200 """ @@ -1606,8 +1633,18 @@ def isin(self, values): # noqa: PR01, RT01, D200 """ Whether elements in `BasePandasDataset` are contained in `values`. """ + # TODO: Use base._is_dataframe from upstream modin once it's available instead + # of doing this cylic import. + from .dataframe import Series + + # Query compiler needs to know whether values is series or df because column + # names matter for df but not for series. return self.__constructor__( - query_compiler=self._query_compiler.isin(values=values) + query_compiler=self._query_compiler.isin( + values=values, + values_is_series=isinstance(values, Series), + self_is_series=isinstance(self, Series), + ) ) def isna(self): # noqa: RT01, D200 @@ -1667,7 +1704,9 @@ def last(self, offset): # noqa: PR01, RT01, D200 """ Select final periods of time series data based on a date offset. 
""" - return self.loc[pandas.Series(index=self.index).last(offset).index] + return self._create_or_update_from_compiler( + self._query_compiler.last(to_offset(offset)) + ) def last_valid_index(self): # noqa: RT01, D200 """ @@ -1716,20 +1755,11 @@ def _mad(self, axis, skipna, level): @_inherit_docstrings(pandas.DataFrame.mask, apilink="pandas.DataFrame.mask") def _mask(self, *args, **kwargs): - return self._default_to_pandas("mask", *args, **kwargs) + return self.__constructor__(self._query_compiler.mask(*args, **kwargs)) @_inherit_docstrings(pandas.DataFrame.max, apilink="pandas.DataFrame.max") def _max(self, axis, skipna, level, numeric_only, **kwargs): self._validate_bool_kwarg(skipna, "skipna", none_allowed=False) - if level is not None: - return self._default_to_pandas( - "max", - axis=axis, - skipna=skipna, - level=level, - numeric_only=numeric_only, - **kwargs, - ) axis = self._get_axis_number(axis) data = self._validate_dtypes_min_max(axis, numeric_only) return data._reduce_dimension( @@ -1781,15 +1811,6 @@ def _stat_operation( """ axis = self._get_axis_number(axis) self._validate_bool_kwarg(skipna, "skipna", none_allowed=False) - if level is not None: - return self._default_to_pandas( - op_name, - axis=axis, - skipna=skipna, - level=level, - numeric_only=numeric_only, - **kwargs, - ) # If `numeric_only` is None, then we can do this precheck to whether or not # frame contains non-numeric columns, if it doesn't, then we can pass to a query compiler # `numeric_only=False` parameter and make its work easier in that case, rather than @@ -1837,15 +1858,6 @@ def _min( Return the minimum of the values over the requested axis. """ self._validate_bool_kwarg(skipna, "skipna", none_allowed=False) - if level is not None: - return self._default_to_pandas( - "min", - axis=axis, - skipna=skipna, - level=level, - numeric_only=numeric_only, - **kwargs, - ) axis = self._get_axis_number(axis) data = self._validate_dtypes_min_max(axis, numeric_only) return data._reduce_dimension( @@ -1920,13 +1932,8 @@ def pct_change( """ Percentage change between the current and a prior element. """ - return self._default_to_pandas( - "pct_change", - periods=periods, - fill_method=fill_method, - limit=limit, - freq=freq, - **kwargs, + return self._query_compiler.pct_change( + periods, fill_method, limit, freq, **kwargs ) def pipe(self, func, *args, **kwargs): # noqa: PR01, RT01, D200 @@ -2073,17 +2080,6 @@ def _reindex( """ Conform `BasePandasDataset` to new index with optional filling logic. """ - if ( - kwargs.get("level") is not None - or (index is not None and self._query_compiler.has_multiindex()) - or (columns is not None and self._query_compiler.has_multiindex(axis=1)) - ): - if index is not None: - kwargs["index"] = index - if columns is not None: - kwargs["columns"] = columns - return self._default_to_pandas("reindex", copy=copy, **kwargs) - new_query_compiler = None if index is not None: if not isinstance(index, pandas.Index): @@ -2116,14 +2112,7 @@ def reindex_like( """ Return an object with matching indices as `other` object. """ - return self._default_to_pandas( - "reindex_like", - other, - method=method, - copy=copy, - limit=limit, - tolerance=tolerance, - ) + return self._query_compiler.reindex_like(other, method, copy, limit, tolerance) def rename_axis( self, mapper=None, index=None, columns=None, axis=None, copy=True, inplace=False @@ -2445,20 +2434,6 @@ def _sample( raise ValueError( "A negative number of rows requested. Please provide positive value." 
 )
- if n == 0:
- # This returns an empty object, and since it is a weird edge case that
- # doesn't need to be distributed, we default to pandas for n=0.
- # We don't need frac to be set to anything since n is already 0.
- return self._default_to_pandas(
- "sample",
- n=n,
- frac=None,
- replace=replace,
- weights=weights,
- random_state=random_state,
- axis=axis,
- **kwargs,
- )
 if random_state is not None:
 # Get a random number generator depending on the type of
 # random_state that is passed in
@@ -2759,7 +2734,7 @@ def to_clipboard(
 return self._default_to_pandas("to_clipboard", excel=excel, sep=sep, **kwargs)

 def to_dict(self, orient="dict", into=dict): # pragma: no cover
- return self._default_to_pandas("to_dict", orient=orient, into=into)
+ return self._query_compiler.dataframe_to_dict(orient, into)

 def to_hdf(
 self, path_or_buf, key, format="table", **kwargs
@@ -2981,6 +2956,30 @@ def tz_localize(
 )
 return self.set_axis(labels=new_labels, axis=axis, inplace=not copy)

+ def interpolate(
+ self,
+ method="linear",
+ axis=0,
+ limit=None,
+ inplace=False,
+ limit_direction: Optional[str] = None,
+ limit_area=None,
+ downcast=None,
+ **kwargs,
+ ): # noqa: PR01, RT01, D200
+ return self.__constructor__(
+ query_compiler=self._query_compiler.interpolate(
+ method,
+ axis,
+ limit,
+ inplace,
+ limit_direction,
+ limit_area,
+ downcast,
+ **kwargs,
+ )
+ )
+
 # TODO: uncomment the following lines when #3331 issue will be closed
 # @prepend_to_notes(
 # """
@@ -3172,8 +3171,16 @@ def __getitem__(self, key):
 BasePandasDataset
 Located dataset.
 """
- if not self._query_compiler.lazy_execution and len(self) == 0:
- return self._default_to_pandas("__getitem__", key)
+ if not self._query_compiler.lazy_execution:
+ if len(self) == 0:
+ return self._default_to_pandas("__getitem__", key)
+ # fastpath for common case
+ if isinstance(key, str) and key in self._query_compiler.columns:
+ return self._getitem(key)
+ elif is_list_like(key) and all(
+ k in self._query_compiler.columns for k in key
+ ):
+ return self._getitem(key)
 # see if we can slice the rows
 # This lets us reuse code in pandas to error check
 indexer = None
@@ -3358,7 +3365,7 @@ def __sizeof__(self):
 -------
 int
 """
- return self._default_to_pandas("__sizeof__")
+ return self._query_compiler.sizeof()

 def __str__(self): # pragma: no cover
 """
diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py
index 172431d6717..07400d8e95d 100644
--- a/modin/pandas/dataframe.py
+++ b/modin/pandas/dataframe.py
@@ -23,7 +23,7 @@
 )
 from pandas.core.indexes.frozen import FrozenList
 from pandas.util._validators import validate_bool_kwarg
-from pandas.io.formats.printing import pprint_thing
+from pandas.io.formats.info import DataFrameInfo
 from pandas._libs.lib import no_default

 import re
@@ -704,10 +704,8 @@ def corrwith(
 """
 Compute pairwise correlation.
 """
- if isinstance(other, DataFrame):
- other = other._query_compiler.to_pandas()
- return self._default_to_pandas(
- pandas.DataFrame.corrwith, other, axis=axis, drop=drop, method=method
+ return self.__constructor__(
+ query_compiler=self._query_compiler.corrwith(other, axis, drop, method)
 )

 def cov(self, min_periods=None, ddof: Optional[int] = 1): # noqa: PR01, RT01, D200
@@ -979,22 +977,23 @@ def hist(
 """
 Make a histogram of the ``DataFrame``.
""" - return self._default_to_pandas( - pandas.DataFrame.hist, - column=column, - by=by, - grid=grid, - xlabelsize=xlabelsize, - xrot=xrot, - ylabelsize=ylabelsize, - yrot=yrot, - ax=ax, - sharex=sharex, - sharey=sharey, - figsize=figsize, - layout=layout, - bins=bins, - **kwds, + return self.__constructor__( + query_compiler=self._query_compiler.dataframe_hist( + column, + by, + grid, + xlabelsize, + xrot, + ylabelsize, + yrot, + ax, + sharex, + sharey, + figsize, + layout, + bins, + **kwds, + ) ) def _info( @@ -1009,133 +1008,28 @@ def _info( """ Print a concise summary of the ``DataFrame``. """ + # for ponder, set memory_Usage to false for now. + memory_usage = False - def put_str(src, output_len=None, spaces=2): - src = str(src) - return src.ljust(output_len if output_len else len(src)) + " " * spaces - - def format_size(num): - for x in ["bytes", "KB", "MB", "GB", "TB"]: - if num < 1024.0: - return f"{num:3.1f} {x}" - num /= 1024.0 - return f"{num:3.1f} PB" - - output = [] - - type_line = str(type(self)) - index_line = self.index._summary() - columns = self.columns - columns_len = len(columns) - dtypes = self.dtypes - dtypes_line = f"dtypes: {', '.join(['{}({})'.format(dtype, count) for dtype, count in dtypes.value_counts().items()])}" - - if max_cols is None: - max_cols = 100 - - exceeds_info_cols = columns_len > max_cols - - if buf is None: - buf = sys.stdout - - if null_counts is None: - null_counts = not exceeds_info_cols - - if verbose is None: - verbose = not exceeds_info_cols - - if null_counts and verbose: - # We're gonna take items from `non_null_count` in a loop, which - # works kinda slow with `Modin.Series`, that's why we call `_to_pandas()` here - # that will be faster. - non_null_count = self.count()._to_pandas() - - if memory_usage is None: - memory_usage = True - - def get_header(spaces=2): - output = [] - head_label = " # " - column_label = "Column" - null_label = "Non-Null Count" - dtype_label = "Dtype" - non_null_label = " non-null" - delimiter = "-" - - lengths = {} - lengths["head"] = max(len(head_label), len(pprint_thing(len(columns)))) - lengths["column"] = max( - len(column_label), max(len(pprint_thing(col)) for col in columns) - ) - lengths["dtype"] = len(dtype_label) - dtype_spaces = ( - max(lengths["dtype"], max(len(pprint_thing(dtype)) for dtype in dtypes)) - - lengths["dtype"] - ) - - header = put_str(head_label, lengths["head"]) + put_str( - column_label, lengths["column"] - ) - if null_counts: - lengths["null"] = max( - len(null_label), - max(len(pprint_thing(x)) for x in non_null_count) - + len(non_null_label), - ) - header += put_str(null_label, lengths["null"]) - header += put_str(dtype_label, lengths["dtype"], spaces=dtype_spaces) - - output.append(header) - - delimiters = put_str(delimiter * lengths["head"]) + put_str( - delimiter * lengths["column"] + if null_counts is not None: + if show_counts is not None: + raise ValueError("null_counts used with show_counts. Use show_counts.") + warnings.warn( + "null_counts is deprecated. 
+ "null_counts is deprecated. Use show_counts instead",
+ FutureWarning,
+ stacklevel=2,
 )
- if null_counts:
- delimiters += put_str(delimiter * lengths["null"])
- delimiters += put_str(delimiter * lengths["dtype"], spaces=dtype_spaces)
- output.append(delimiters)
-
- return output, lengths
-
- output.extend([type_line, index_line])
-
- def verbose_repr(output):
- columns_line = f"Data columns (total {len(columns)} columns):"
- header, lengths = get_header()
- output.extend([columns_line, *header])
- for i, col in enumerate(columns):
- i, col, dtype = map(pprint_thing, [i, col, dtypes[col]])
-
- to_append = put_str(" {}".format(i), lengths["head"]) + put_str(
- col, lengths["column"]
- )
- if null_counts:
- non_null = pprint_thing(non_null_count[col])
- to_append += put_str(
- "{} non-null".format(non_null), lengths["null"]
- )
- to_append += put_str(dtype, lengths["dtype"], spaces=0)
- output.append(to_append)
-
- def non_verbose_repr(output):
- output.append(columns._summary(name="Columns"))
-
- if verbose:
- verbose_repr(output)
- else:
- non_verbose_repr(output)
-
- output.append(dtypes_line)
-
- if memory_usage:
- deep = memory_usage == "deep"
- mem_usage_bytes = self.memory_usage(index=True, deep=deep).sum()
- mem_line = f"memory usage: {format_size(mem_usage_bytes)}"
-
- output.append(mem_line)
-
- output.append("")
- buf.write("\n".join(output))
+ show_counts = null_counts
+ info = DataFrameInfo(
+ data=self,
+ memory_usage=memory_usage,
+ )
+ info.render(
+ buf=buf,
+ max_cols=max_cols,
+ verbose=verbose,
+ show_counts=show_counts,
+ )

 def insert(self, loc, column, value, allow_duplicates=False): # noqa: PR01, D200
 """
@@ -1195,32 +1089,6 @@ def insert(self, loc, column, value, allow_duplicates=False): # noqa: PR01, D20
 self._update_inplace(new_query_compiler=new_query_compiler)

- def interpolate(
- self,
- method="linear",
- axis=0,
- limit=None,
- inplace=False,
- limit_direction: Optional[str] = None,
- limit_area=None,
- downcast=None,
- **kwargs,
- ): # noqa: PR01, RT01, D200
- """
- Fill NaN values using an interpolation method.
- """
- return self._default_to_pandas(
- pandas.DataFrame.interpolate,
- method=method,
- axis=axis,
- limit=limit,
- inplace=inplace,
- limit_direction=limit_direction,
- limit_area=limit_area,
- downcast=downcast,
- **kwargs,
- )
-
 def iterrows(self): # noqa: D200
 """
 Iterate over ``DataFrame`` rows as (index, ``Series``) pairs.
@@ -1331,7 +1199,7 @@ def lookup(self, row_labels, col_labels): # noqa: PR01, RT01, D200
 """
 Label-based "fancy indexing" function for ``DataFrame``.
 """
- return self._default_to_pandas(pandas.DataFrame.lookup, row_labels, col_labels)
+ return self.__constructor__(
+ query_compiler=self._query_compiler.lookup(row_labels, col_labels)
+ )

 def lt(self, other, axis="columns", level=None): # noqa: PR01, RT01, D200
 """
@@ -1377,12 +1245,6 @@ def memory_usage(self, index=True, deep=False): # noqa: PR01, RT01, D200
 """
 Return the memory usage of each column in bytes.
""" - if index: - result = self._reduce_dimension( - self._query_compiler.memory_usage(index=False, deep=deep) - ) - index_value = self.index.memory_usage(deep=deep) - return Series(index_value, index=["Index"]).append(result) return super(DataFrame, self).memory_usage(index=index, deep=deep) def merge( @@ -2555,7 +2417,7 @@ def __round__(self, decimals=0): ------- DataFrame """ - return self._default_to_pandas(pandas.DataFrame.__round__, decimals=decimals) + return self.round(decimals) def __delitem__(self, key): """ diff --git a/modin/pandas/ewm.py b/modin/pandas/ewm.py new file mode 100644 index 00000000000..3b416aee9aa --- /dev/null +++ b/modin/pandas/ewm.py @@ -0,0 +1,68 @@ +import pandas + +from modin.utils import _inherit_docstrings + + +@_inherit_docstrings(pandas.core.window.rolling.Window) +class ExponentialMovingWindow: + def _init(self, dataframe, ewm_kwargs): + self._dataframe = dataframe + self._query_compiler = dataframe._query_compiler + self.ewm_kwargs = ewm_kwargs + + def mean(self, *args, **kwargs): + return self._dataframe.__constructor__( + query_compiler=self._query_compiler.ewm_mean( + self.ewm_kwargs, *args, **kwargs + ) + ) + + def sum(self, *args, **kwargs): + return self._dataframe.__constructor__( + query_compiler=self._query_compiler.ewm_sum( + self.ewm_kwargs, *args, **kwargs + ) + ) + + def std(self, *args, **kwargs): + return self._dataframe.__constructor__( + query_compiler=self._query_compiler.ewm_std( + self.ewm_kwargs, *args, **kwargs + ) + ) + + def var(self, *args, **kwargs): + return self._dataframe.__constructor__( + query_compiler=self._query_compiler.ewm_var( + self.ewm_kwargs, *args, **kwargs + ) + ) + + # cor and cov get special treatment because they can take another dataframe or + # series, from which we have to extract the query compiler. 
+
+ def cov(self, other=None, *args, **kwargs):
+ from .dataframe import DataFrame
+ from .series import Series
+
+ if isinstance(other, (DataFrame, Series)):
+ other = other._query_compiler
+
+ return self._dataframe.__constructor__(
+ query_compiler=self._query_compiler.ewm_cov(
+ self.ewm_kwargs, other, *args, **kwargs
+ )
+ )
+
+ def corr(self, other=None, *args, **kwargs):
+ from .dataframe import DataFrame
+ from .series import Series
+
+ if isinstance(other, (DataFrame, Series)):
+ other = other._query_compiler
+
+ return self._dataframe.__constructor__(
+ query_compiler=self._query_compiler.ewm_corr(
+ self.ewm_kwargs, other, *args, **kwargs
+ )
+ )
diff --git a/modin/pandas/general.py b/modin/pandas/general.py
index 8cbc0bb5e41..23c9a5294d9 100644
--- a/modin/pandas/general.py
+++ b/modin/pandas/general.py
@@ -127,12 +127,8 @@ def merge_ordered(
 raise ValueError(
 "can not merge DataFrame with instance of type {}".format(type(right))
 )
- ErrorMessage.default_to_pandas("`merge_ordered`")
- if isinstance(right, DataFrame):
- right = to_pandas(right)
 return DataFrame(
- pandas.merge_ordered(
- to_pandas(left),
+ query_compiler=left._query_compiler.merge_ordered(
 right,
 on=on,
 left_on=left_on,
@@ -171,6 +167,23 @@ def merge_asof(
 raise ValueError(
 "can not merge DataFrame with instance of type {}".format(type(right))
 )
+ return DataFrame(
+ query_compiler=left._query_compiler.merge_asof(
+ right._query_compiler,
+ on=on,
+ left_on=left_on,
+ right_on=right_on,
+ left_index=left_index,
+ right_index=right_index,
+ by=by,
+ left_by=left_by,
+ right_by=right_by,
+ suffixes=suffixes,
+ tolerance=tolerance,
+ allow_exact_matches=allow_exact_matches,
+ direction=direction,
+ )
+ )

 ErrorMessage.default_to_pandas("`merge_asof`")
 # As of Pandas 1.2 these should raise an error; before that it did
@@ -724,9 +737,14 @@ def wide_to_long(
 raise ValueError(
 "can not wide_to_long with instance of type {}".format(type(df))
 )
- ErrorMessage.default_to_pandas("`wide_to_long`")
 return DataFrame(
- pandas.wide_to_long(to_pandas(df), stubnames, i, j, sep=sep, suffix=suffix)
+ query_compiler=df._query_compiler.wide_to_long(
+ stubnames,
+ i,
+ j,
+ sep,
+ suffix,
+ )
 )

diff --git a/modin/pandas/groupby.py b/modin/pandas/groupby.py
index c50a9bfaa54..80ba17f1cff 100644
--- a/modin/pandas/groupby.py
+++ b/modin/pandas/groupby.py
@@ -16,7 +16,7 @@
 import numpy as np
 import pandas
 import pandas.core.groupby
-from pandas.core.dtypes.common import is_list_like, is_numeric_dtype
+from pandas.core.dtypes.common import is_list_like, is_numeric_dtype, is_integer
 from pandas._libs.lib import no_default
 import pandas.core.common as com
 from types import BuiltinFunctionType
@@ -131,7 +131,11 @@ def ffill(self, limit=None):
 return self._default_to_pandas(lambda df: df.ffill(limit=limit))

 def sem(self, ddof=1):
- return self._default_to_pandas(lambda df: df.sem(ddof=ddof))
+ return self._wrap_aggregation(
+ type(self._query_compiler).groupby_sem,
+ agg_kwargs=dict(ddof=ddof),
+ numeric_only=True,
+ )

 def mean(self, numeric_only=None):
 return self._check_index(
@@ -277,7 +281,28 @@ def _shift(data, periods, freq, axis, fill_value, is_set_nan_rows=True):
 return result

 def nth(self, n, dropna=None):
- return self._default_to_pandas(lambda df: df.nth(n, dropna=dropna))
+ # TODO: what we really should do is create a GroupByNthSelector to mimic
+ # pandas behavior and then implement some of these methods there.
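+ # For example, df.groupby("a").nth(0) picks the first row of each group;
+ # with dropna="any" or dropna="all", rows containing NaNs are dropped
+ # before the pick (pandas semantics).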
+ # Adapted error checking from pandas + if dropna: + if not is_integer(n): + raise ValueError("dropna option only supported for an integer argument") + + if dropna not in ["any", "all"]: + # Note: when agg-ing picker doesn't raise this, just returns NaN + raise ValueError( + "For a DataFrame or Series groupby.nth, dropna must be " + "either None, 'any' or 'all', " + f"(was passed {dropna})." + ) + + return self._check_index( + self._wrap_aggregation( + type(self._query_compiler).groupby_nth, + numeric_only=False, + agg_kwargs=dict(n=n, dropna=dropna), + ) + ) def cumsum(self, axis=0, *args, **kwargs): return self._check_index_name( @@ -346,7 +371,11 @@ def dtypes(self): ) def first(self, **kwargs): - return self._default_to_pandas(lambda df: df.first(**kwargs)) + return self._wrap_aggregation( + type(self._query_compiler).groupby_first, + agg_kwargs=dict(**kwargs), + numeric_only=False, + ) def backfill(self, limit=None): return self.bfill(limit) @@ -545,10 +574,13 @@ def try_get_str_func(fn): kwargs = {} func = func_dict elif is_list_like(func): - return self._default_to_pandas( - lambda df, *args, **kwargs: df.aggregate(func, *args, **kwargs), - *args, - **kwargs, + return self._wrap_aggregation( + qc_method=type(self._query_compiler).groupby_agg, + numeric_only=False, + agg_func=func, + agg_args=args, + agg_kwargs=kwargs, + how="axis_wise", ) elif callable(func): return self._check_index( @@ -599,7 +631,11 @@ def try_get_str_func(fn): agg = aggregate def last(self, **kwargs): - return self._default_to_pandas(lambda df: df.last(**kwargs)) + return self._wrap_aggregation( + type(self._query_compiler).groupby_last, + agg_kwargs=dict(**kwargs), + numeric_only=False, + ) def mad(self, **kwargs): return self._default_to_pandas(lambda df: df.mad(**kwargs)) @@ -726,7 +762,17 @@ def boxplot( ) def ngroup(self, ascending=True): - return self._default_to_pandas(lambda df: df.ngroup(ascending)) + result = self._wrap_aggregation( + type(self._query_compiler).groupby_ngroup, + numeric_only=False, + agg_kwargs=dict(ascending=ascending), + ) + if not isinstance(result, Series): + # The result should always be a Series with name None and type int64 + result = result.squeeze(axis=1) + # TODO: this might not hold in the future + result.name = None + return result def nunique(self, dropna=True): return self._check_index( @@ -749,7 +795,13 @@ def median(self, numeric_only=None): ) def head(self, n=5): - return self._default_to_pandas(lambda df: df.head(n)) + return self._check_index( + self._wrap_aggregation( + type(self._query_compiler).groupby_head, + agg_kwargs=dict(n=n), + numeric_only=False, + ) + ) def cumprod(self, axis=0, *args, **kwargs): return self._check_index_name( @@ -794,7 +846,7 @@ def fillna(self, *args, **kwargs): squeeze=self._squeeze, **new_groupby_kwargs, ) - return work_object._check_index_name( + return work_object._check_index( work_object._wrap_aggregation( type(self._query_compiler).groupby_fillna, numeric_only=False, @@ -817,13 +869,25 @@ def pipe(self, func, *args, **kwargs): return com.pipe(self, func, *args, **kwargs) def cumcount(self, ascending=True): - result = self._default_to_pandas(lambda df: df.cumcount(ascending=ascending)) - # pandas does not name the index on cumcount - result._query_compiler.set_index_name(None) + result = self._wrap_aggregation( + type(self._query_compiler).groupby_cumcount, + numeric_only=False, + agg_kwargs=dict(ascending=ascending), + ) + if not isinstance(result, Series): + # The result should always be a Series with name None and type int64 + 
result = result.squeeze(axis=1) + result.name = None return result def tail(self, n=5): - return self._default_to_pandas(lambda df: df.tail(n)) + return self._check_index( + self._wrap_aggregation( + type(self._query_compiler).groupby_tail, + agg_kwargs=dict(n=n), + numeric_only=False, + ) + ) # expanding and rolling are unique cases and need to likely be handled # separately. They do not appear to be commonly used. @@ -837,6 +901,8 @@ def hist(self): return self._default_to_pandas(lambda df: df.hist()) def quantile(self, q=0.5, interpolation="linear"): + # TODO: pandas 1.5 now supports numeric_only as an argument + # TODO: handle list-like cases properly if is_list_like(q): return self._default_to_pandas( lambda df: df.quantile(q=q, interpolation=interpolation) @@ -845,7 +911,7 @@ def quantile(self, q=0.5, interpolation="linear"): return self._check_index( self._wrap_aggregation( type(self._query_compiler).groupby_quantile, - numeric_only=False, + numeric_only=True, agg_kwargs=dict(q=q, interpolation=interpolation), ) ) @@ -1241,6 +1307,31 @@ def _iter(self): for k in (sorted(group_ids) if self._sort else group_ids) ) + def unique(self): + return self._check_index( + self._wrap_aggregation( + type(self._query_compiler).groupby_unique, + numeric_only=False, + ) + ) + + def nlargest(self, n=5, keep="first"): + return self._check_index( + self._wrap_aggregation( + type(self._query_compiler).groupby_nlargest, + agg_kwargs=dict(n=n, keep=keep), + numeric_only=True, + ) + ) + + def nsmallest(self, n=5, keep="first"): + return self._check_index( + self._wrap_aggregation( + type(self._query_compiler).groupby_nsmallest, + agg_kwargs=dict(n=n, keep=keep), + numeric_only=True, + ) + ) if IsExperimental.get(): from modin.experimental.cloud.meta_magic import make_wrapped_class diff --git a/modin/pandas/indexing.py b/modin/pandas/indexing.py index b3a9d8a7caf..8b291cb969b 100644 --- a/modin/pandas/indexing.py +++ b/modin/pandas/indexing.py @@ -35,6 +35,7 @@ from pandas.api.types import is_list_like, is_bool from pandas.core.dtypes.common import is_integer, is_bool_dtype, is_integer_dtype from pandas.core.indexing import IndexingError +from modin.core.execution.client.query_compiler import ClientQueryCompiler from modin.error_message import ErrorMessage from modin.logging import ClassLogger @@ -677,7 +678,14 @@ def __getitem__(self, key): if isinstance(row_loc, Series) and is_boolean_array(row_loc): return self._handle_boolean_masking(row_loc, col_loc) - + if isinstance(self.qc, ClientQueryCompiler) and self.qc.lazy_execution: + # Since we don't know if the row labels are present or not in lazy evaluation, + # immediately hand off computation to the engine + return type(self.df)( + query_compiler=self.qc.getitem_row_labels_array( + row_loc + ).getitem_column_array(col_loc) + ) row_lookup, col_lookup = self._compute_lookup(row_loc, col_loc) result = self._getitem_positional( diff --git a/modin/pandas/io.py b/modin/pandas/io.py index 17d032d28db..feab9334563 100644 --- a/modin/pandas/io.py +++ b/modin/pandas/io.py @@ -162,12 +162,6 @@ def read_sql( Engine.subscribe(_update_engine) from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher - if kwargs.get("chunksize") is not None: - ErrorMessage.default_to_pandas("Parameters provided [chunksize]") - df_gen = pandas.read_sql(**kwargs) - return ( - DataFrame(query_compiler=FactoryDispatcher.from_pandas(df)) for df in df_gen - ) return DataFrame(query_compiler=FactoryDispatcher.read_sql(**kwargs)) diff --git a/modin/pandas/resample.py 
b/modin/pandas/resample.py index fc1a67e3304..30a0f58a651 100644 --- a/modin/pandas/resample.py +++ b/modin/pandas/resample.py @@ -383,18 +383,18 @@ def ohlc(self, _method="ohlc", *args, **kwargs): ) ) - def prod(self, _method="prod", min_count=0, *args, **kwargs): - if self.resample_kwargs["axis"] == 0: - result = self.__groups.prod(min_count=min_count, *args, **kwargs) - else: - result = self.__groups.prod(min_count=min_count, *args, **kwargs).T - return result - - def size(self): - from .series import Series + def prod(self, _method="prod", *args, **kwargs): + return self._dataframe.__constructor__( + query_compiler=self._query_compiler.resample_prod( + self.resample_kwargs, *args, **kwargs + ) + ) - return Series( - query_compiler=self._query_compiler.resample_size(self.resample_kwargs) + def size(self, _method="size"): + return self._dataframe.__constructor__( + query_compiler=self._query_compiler.resample_size( + self.resample_kwargs, None, None + ) ) def sem(self, _method="sem", *args, **kwargs): @@ -414,12 +414,12 @@ def std(self, ddof=1, *args, **kwargs): ) ) - def sum(self, _method="sum", min_count=0, *args, **kwargs): - if self.resample_kwargs["axis"] == 0: - result = self.__groups.sum(min_count=min_count, *args, **kwargs) - else: - result = self.__groups.sum(min_count=min_count, *args, **kwargs).T - return result + def sum(self, _method="sum", *args, **kwargs): + return self._dataframe.__constructor__( + query_compiler=self._query_compiler.resample_sum( + self.resample_kwargs, *args, **kwargs + ) + ) def var(self, ddof=1, *args, **kwargs): return self._dataframe.__constructor__( diff --git a/modin/pandas/series.py b/modin/pandas/series.py index 1e5bf4ed275..bf51b7fe57b 100644 --- a/modin/pandas/series.py +++ b/modin/pandas/series.py @@ -176,15 +176,11 @@ def __radd__(self, left): @_doc_binary_op(operation="union", bin_op="and", right="other") def __and__(self, other): - if isinstance(other, (list, np.ndarray, pandas.Series)): - return self._default_to_pandas(pandas.Series.__and__, other) new_self, new_other = self._prepare_inter_op(other) return super(Series, new_self).__and__(new_other) @_doc_binary_op(operation="union", bin_op="and", right="other") def __rand__(self, other): - if isinstance(other, (list, np.ndarray, pandas.Series)): - return self._default_to_pandas(pandas.Series.__rand__, other) new_self, new_other = self._prepare_inter_op(other) return super(Series, new_self).__rand__(new_other) @@ -310,7 +306,9 @@ def __getattr__(self, key): try: return object.__getattribute__(self, key) except AttributeError as err: - if key not in _ATTRS_NO_LOOKUP and key in self.index: + if not self._query_compiler.lazy_execution and ( + key not in _ATTRS_NO_LOOKUP and key in self.index + ): return self[key] raise err @@ -352,29 +350,21 @@ def __rmul__(self, left): @_doc_binary_op(operation="disjunction", bin_op="or", right="other") def __or__(self, other): - if isinstance(other, (list, np.ndarray, pandas.Series)): - return self._default_to_pandas(pandas.Series.__or__, other) new_self, new_other = self._prepare_inter_op(other) return super(Series, new_self).__or__(new_other) @_doc_binary_op(operation="disjunction", bin_op="or", right="other") def __ror__(self, other): - if isinstance(other, (list, np.ndarray, pandas.Series)): - return self._default_to_pandas(pandas.Series.__ror__, other) new_self, new_other = self._prepare_inter_op(other) return super(Series, new_self).__ror__(new_other) @_doc_binary_op(operation="exclusive or", bin_op="xor", right="other") def __xor__(self, other): - if 
isinstance(other, (list, np.ndarray, pandas.Series)):
- return self._default_to_pandas(pandas.Series.__xor__, other)
 new_self, new_other = self._prepare_inter_op(other)
 return super(Series, new_self).__xor__(new_other)

 @_doc_binary_op(operation="exclusive or", bin_op="xor", right="other")
 def __rxor__(self, other):
- if isinstance(other, (list, np.ndarray, pandas.Series)):
- return self._default_to_pandas(pandas.Series.__rxor__, other)
 new_self, new_other = self._prepare_inter_op(other)
 return super(Series, new_self).__rxor__(new_other)
@@ -717,8 +707,8 @@ def argsort(self, axis=0, kind="quicksort", order=None): # noqa: PR01, RT01, D2
 """
 Return the integer indices that would sort the Series values.
 """
- return self._default_to_pandas(
- pandas.Series.argsort, axis=axis, kind=kind, order=order
+ return self.__constructor__(
+ query_compiler=self._query_compiler.argsort(axis, kind, order)
 )

 def autocorr(self, lag=1): # noqa: PR01, RT01, D200
@@ -731,8 +721,8 @@ def _between(self, left, right, inclusive): # noqa: PR01, RT01, D200
 """
 Return boolean Series equivalent to left <= series <= right.
 """
- return self._default_to_pandas(
- pandas.Series.between, left, right, inclusive=inclusive
+ return self.__constructor__(
+ query_compiler=self._query_compiler.between(left, right, inclusive)
 )

 def combine(self, other, func, fill_value=None): # noqa: PR01, RT01, D200
@@ -816,12 +806,7 @@ def corr(self, other, method="pearson", min_periods=None): # noqa: PR01, RT01,
 return result[0]

 return self.__constructor__(
- query_compiler=self._query_compiler.default_to_pandas(
- pandas.Series.corr,
- other._query_compiler,
- method=method,
- min_periods=min_periods,
- )
+ query_compiler=self._query_compiler.series_corr(other, method, min_periods)
 )

 def count(self, level=None): # noqa: PR01, RT01, D200
@@ -890,8 +875,8 @@ def divmod(
 """
 Return integer division and modulo of series and `other`, element-wise (binary operator `divmod`).
 """
- return self._default_to_pandas(
- pandas.Series.divmod, other, level=level, fill_value=fill_value, axis=axis
+ return self.__constructor__(
+ query_compiler=self._query_compiler.divmod(other, level, fill_value, axis)
 )

 def dot(self, other): # noqa: PR01, RT01, D200
@@ -980,8 +965,8 @@ def factorize(self, sort=False, na_sentinel=-1): # noqa: PR01, RT01, D200
 """
 Encode the object as an enumerated type or categorical variable.
 """
- return self._default_to_pandas(
- pandas.Series.factorize, sort=sort, na_sentinel=na_sentinel
+ return self.__constructor__(
+ query_compiler=self._query_compiler.factorize(sort, na_sentinel)
 )

 def fillna(
@@ -1106,18 +1091,10 @@ def hist(
 """
 Draw histogram of the input series using matplotlib.
""" - return self._default_to_pandas( - pandas.Series.hist, - by=by, - ax=ax, - grid=grid, - xlabelsize=xlabelsize, - xrot=xrot, - ylabelsize=ylabelsize, - yrot=yrot, - figsize=figsize, - bins=bins, - **kwds, + return self.__constructor__( + query_compiler=self._query_compiler.series_hist( + by, ax, grid, xlabelsize, xrot, ylabelsize, yrot, figsize, bins, **kwds + ) ) def idxmax(self, axis=0, skipna=True, *args, **kwargs): # noqa: PR01, RT01, D200 @@ -1136,32 +1113,6 @@ def idxmin(self, axis=0, skipna=True, *args, **kwargs): # noqa: PR01, RT01, D20 skipna = True return super(Series, self).idxmin(axis=axis, skipna=skipna, *args, **kwargs) - def interpolate( - self, - method="linear", - axis=0, - limit=None, - inplace=False, - limit_direction: Optional[str] = None, - limit_area=None, - downcast=None, - **kwargs, - ): # noqa: PR01, RT01, D200 - """ - Fill NaN values using an interpolation method. - """ - return self._default_to_pandas( - pandas.Series.interpolate, - method=method, - axis=axis, - limit=limit, - inplace=inplace, - limit_direction=limit_direction, - limit_area=limit_area, - downcast=downcast, - **kwargs, - ) - def item(self): # noqa: RT01, D200 """ Return the first element of the underlying data as a Python scalar. @@ -1242,39 +1193,11 @@ def arg(s): ) ) - @_inherit_docstrings(pandas.Series.mask, apilink="pandas.Series.mask") - def _mask( - self, - cond, - other, - inplace, - axis, - level, - errors, - try_cast, - ): - return self._default_to_pandas( - pandas.Series.mask, - cond, - other=other, - inplace=inplace, - axis=axis, - level=level, - errors=errors, - try_cast=try_cast, - ) - def memory_usage(self, index=True, deep=False): # noqa: PR01, RT01, D200 """ Return the memory usage of the Series. """ - if index: - result = self._reduce_dimension( - self._query_compiler.memory_usage(index=False, deep=deep) - ) - index_value = self.index.memory_usage(deep=deep) - return result + index_value - return super(Series, self).memory_usage(index=index, deep=deep) + return super(Series, self).memory_usage(index=index, deep=deep).sum() def mod(self, other, level=None, fill_value=None, axis=0): # noqa: PR01, RT01, D200 """ @@ -1313,13 +1236,29 @@ def nlargest(self, n=5, keep="first"): # noqa: PR01, RT01, D200 """ Return the largest `n` elements. """ - return self._default_to_pandas(pandas.Series.nlargest, n=n, keep=keep) + if len(self._query_compiler.columns) == 0: + raise NotImplementedError( + "Series.nlargest is not implemented for empty Series." + ) + return Series( + query_compiler=self._query_compiler.nlargest( + n=n, columns=self.name, keep=keep + ) + ) def nsmallest(self, n=5, keep="first"): # noqa: PR01, RT01, D200 """ Return the smallest `n` elements. """ - return Series(query_compiler=self._query_compiler.nsmallest(n=n, keep=keep)) + if len(self._query_compiler.columns) == 0: + raise NotImplementedError( + "Series.nsmallest is not implemented for empty Series." + ) + return Series( + query_compiler=self._query_compiler.nsmallest( + n=n, columns=self.name, keep=keep + ) + ) def slice_shift(self, periods=1, axis=0): # noqa: PR01, RT01, D200 """ @@ -1581,8 +1520,8 @@ def rdivmod( """ Return integer division and modulo of series and `other`, element-wise (binary operator `rdivmod`). 
""" - return self._default_to_pandas( - pandas.Series.rdivmod, other, level=level, fill_value=fill_value, axis=axis + return self.__constructor__( + self._query_compiler.rdivmod(other, level, fill_value, axis) ) def rfloordiv( @@ -1684,14 +1623,6 @@ def searchsorted(self, value, side="left", sorter=None): # noqa: PR01, RT01, D2 Find indices where elements should be inserted to maintain order. """ searchsorted_qc = self._query_compiler - if sorter is not None: - # `iloc` method works slowly (https://github.com/modin-project/modin/issues/1903), - # so _default_to_pandas is used for now - # searchsorted_qc = self.iloc[sorter].reset_index(drop=True)._query_compiler - # sorter = None - return self._default_to_pandas( - pandas.Series.searchsorted, value, side=side, sorter=sorter - ) # searchsorted should return item number irrespective of Series index, so # Series.index is always set to pandas.RangeIndex, which can be easily processed # on the query_compiler level @@ -1833,7 +1764,7 @@ def swaplevel(self, i=-2, j=-1, copy=True): # noqa: PR01, RT01, D200 """ Swap levels `i` and `j` in a `MultiIndex`. """ - return self._default_to_pandas("swaplevel", i=i, j=j, copy=copy) + return self.__constructor__(self.__query_compiler__.swaplevel(i, j, copy)) def take(self, indices, axis=0, is_copy=None, **kwargs): # noqa: PR01, RT01, D200 """ @@ -1845,7 +1776,7 @@ def to_dict(self, into=dict): # pragma: no cover # noqa: PR01, RT01, D200 """ Convert Series to {label -> value} dict or dict-like object. """ - return self._default_to_pandas("to_dict", into=into) + return self.__query_compiler__.series_to_dict(into) def _to_frame(self, name: "Hashable") -> "DataFrame": # noqa: PR01, RT01, D200 """ @@ -1866,7 +1797,7 @@ def to_list(self): # noqa: RT01, D200 """ Return a list of the values. """ - return self._default_to_pandas(pandas.Series.to_list) + return self.__query_compiler__.to_list() def to_numpy( self, dtype=None, copy=False, na_value=no_default, **kwargs @@ -1958,8 +1889,8 @@ def truncate( """ Truncate a Series before and after some index value. """ - return self._default_to_pandas( - pandas.Series.truncate, before=before, after=after, axis=axis, copy=copy + return self.__constructor__( + self.__query_compiler__.truncate(before, after, axis, copy) ) def unique(self): # noqa: RT01, D200 @@ -2029,8 +1960,11 @@ def _where( """ Replace values where the condition is False. """ + # TODO: probably need to remove this conversion to pandas if isinstance(other, Series): other = to_pandas(other) + # TODO: add error checking like for dataframe where, then forward to + # same query compiler method return self._default_to_pandas( pandas.Series.where, cond, @@ -2396,12 +2330,15 @@ def _prepare_inter_op(self, other): Prepared `other`. """ if isinstance(other, Series): - # NB: deep=False is important for performance bc it retains obj.index._id - new_self = self.copy(deep=False) - new_other = other.copy(deep=False) - if self.name == other.name: - new_self.name = new_other.name = self.name - else: + names_different = self.name != other.name + # NB: if we don't need a rename, do the interaction with shallow + # copies so that we preserve obj.index._id. It's fine to work + # with shallow copies because we'll discard the copies but keep + # the result after the interaction operation. We can't do a rename + # on shallow copies because we'll mutate the original objects. 
+ new_self = self.copy(deep=names_different) + new_other = other.copy(deep=names_different) + if names_different: new_self.name = new_other.name = MODIN_UNNAMED_SERIES_LABEL else: new_self = self diff --git a/modin/pandas/series_utils.py b/modin/pandas/series_utils.py index ae01e884835..827f1f8933e 100644 --- a/modin/pandas/series_utils.py +++ b/modin/pandas/series_utils.py @@ -136,27 +136,35 @@ def __init__(self, series): self._query_compiler = series._query_compiler def casefold(self): - return self._default_to_pandas(pandas.Series.str.casefold) + return Series(query_compiler=self._query_compiler.str_casefold()) def cat(self, others=None, sep=None, na_rep=None, join=None): if isinstance(others, Series): others = others._to_pandas() - return self._default_to_pandas( - pandas.Series.str.cat, others=others, sep=sep, na_rep=na_rep, join=join + data = Series(query_compiler=self._query_compiler) + return data._reduce_dimension( + self._query_compiler.str_cat( + others=others, + sep=sep, + na_rep=na_rep, + join=join + ) ) def decode(self, encoding, errors="strict"): - return self._default_to_pandas( - pandas.Series.str.decode, encoding, errors=errors - ) + return Series(query_compiler=self._query_compiler.str_decode(encoding, errors)) def split(self, pat=None, n=-1, expand=False): + from .dataframe import DataFrame + if not pat and pat is not None: raise ValueError("split() requires a non-empty pattern match.") if expand: - return self._default_to_pandas( - pandas.Series.str.split, pat=pat, n=n, expand=expand + return DataFrame( + query_compiler=self._query_compiler.str_split( + pat=pat, n=n, expand=expand + ) ) else: return Series( @@ -166,12 +174,16 @@ def split(self, pat=None, n=-1, expand=False): ) def rsplit(self, pat=None, n=-1, expand=False): + from .dataframe import DataFrame + if not pat and pat is not None: raise ValueError("rsplit() requires a non-empty pattern match.") if expand: - return self._default_to_pandas( - pandas.Series.str.rsplit, pat=pat, n=n, expand=expand + return DataFrame( + query_compiler=self._query_compiler.str_rsplit( + pat=pat, n=n, expand=expand + ) ) else: return Series( @@ -189,7 +201,7 @@ def join(self, sep): return Series(query_compiler=self._query_compiler.str_join(sep)) def get_dummies(self, sep="|"): - return self._default_to_pandas(pandas.Series.str.get_dummies, sep=sep) + return Series(query_compiler=self._query_compiler.str_get_dummies(sep)) def contains(self, pat, case=True, flags=0, na=np.NaN, regex=True): if pat is None and not case: @@ -274,9 +286,7 @@ def startswith(self, pat, na=np.NaN): return Series(query_compiler=self._query_compiler.str_startswith(pat, na=na)) def encode(self, encoding, errors="strict"): - return self._default_to_pandas( - pandas.Series.str.encode, encoding, errors=errors - ) + return Series(query_compiler=self._query_compiler.str_encode(encoding, errors)) def endswith(self, pat, na=np.NaN): return Series(query_compiler=self._query_compiler.str_endswith(pat, na=na)) @@ -288,6 +298,13 @@ def findall(self, pat, flags=0, **kwargs): query_compiler=self._query_compiler.str_findall(pat, flags=flags, **kwargs) ) + def fullmatch(self, pat, case=True, flags=0, na=None): + if not isinstance(pat, (str, _pattern_type)): + raise TypeError("first argument must be string or compiled pattern") + return Series( + query_compiler=self._query_compiler.str_fullmatch(pat, flags=flags, na=na) + ) + def match(self, pat, case=True, flags=0, na=np.NaN): if not isinstance(pat, (str, _pattern_type)): raise TypeError("first argument must be string or 
compiled pattern") @@ -296,12 +313,19 @@ def match(self, pat, case=True, flags=0, na=np.NaN): ) def extract(self, pat, flags=0, expand=True): - return self._default_to_pandas( - pandas.Series.str.extract, pat, flags=flags, expand=expand + import re + n = re.compile(pat).groups + if expand or n > 1: + from .dataframe import DataFrame + return DataFrame( + query_compiler=self._query_compiler.str_extract(pat, flags, expand) + ) + return Series( + query_compiler=self._query_compiler.str_extract(pat, flags, expand) ) def extractall(self, pat, flags=0): - return self._default_to_pandas(pandas.Series.str.extractall, pat, flags=flags) + return Series(query_compiler=self._query_compiler.str_extractall(pat, flags)) def len(self): return Series(query_compiler=self._query_compiler.str_len()) @@ -320,8 +344,11 @@ def partition(self, sep=" ", expand=True): raise ValueError("empty separator") if expand: - return self._default_to_pandas( - pandas.Series.str.partition, sep=sep, expand=expand + from .dataframe import DataFrame + return DataFrame( + query_compiler=self._query_compiler.str_partition( + sep=sep, expand=expand + ) ) else: return Series( @@ -330,16 +357,25 @@ def partition(self, sep=" ", expand=True): ) ) + def removeprefix(self, prefix): + return Series(query_compiler=self._query_compiler.str_removeprefix(prefix)) + + def removesuffix(self, suffix): + return Series(query_compiler=self._query_compiler.str_removesuffix(suffix)) + def repeat(self, repeats): - return self._default_to_pandas(pandas.Series.str.repeat, repeats) + return Series(query_compiler=self._query_compiler.str_repeat(repeats)) def rpartition(self, sep=" ", expand=True): if sep is not None and len(sep) == 0: raise ValueError("empty separator") if expand: - return self._default_to_pandas( - pandas.Series.str.rpartition, sep=sep, expand=expand + from .dataframe import DataFrame + return DataFrame( + query_compiler=self._query_compiler.str_rpartition( + sep=sep, expand=expand + ) ) else: return Series( @@ -516,6 +552,10 @@ def week(self): def weekofyear(self): return Series(query_compiler=self._query_compiler.dt_weekofyear()) + @property + def day_of_week(self): + return Series(query_compiler=self._query_compiler.dt_day_of_week()) + @property def dayofweek(self): return Series(query_compiler=self._query_compiler.dt_dayofweek()) @@ -524,6 +564,10 @@ def dayofweek(self): def weekday(self): return Series(query_compiler=self._query_compiler.dt_weekday()) + @property + def day_of_year(self): + return Series(query_compiler=self._query_compiler.dt_day_of_year()) + @property def dayofyear(self): return Series(query_compiler=self._query_compiler.dt_dayofyear()) diff --git a/modin/pandas/window.py b/modin/pandas/window.py index fa7922df5c8..6189d334160 100644 --- a/modin/pandas/window.py +++ b/modin/pandas/window.py @@ -18,7 +18,7 @@ from pandas.core.dtypes.common import is_list_like from modin.utils import _inherit_docstrings -from modin._compat.pandas_api.classes import WindowCompat, RollingCompat +from modin._compat.pandas_api.classes import WindowCompat, RollingCompat, ExpandingCompat @_inherit_docstrings(pandas.core.window.rolling.Window) @@ -76,6 +76,13 @@ def count(self): ) ) + def sem(self, *args, **kwargs): + return self._dataframe.__constructor__( + query_compiler=self._query_compiler.rolling_sem( + self.axis, self.rolling_args, *args, **kwargs + ) + ) + def sum(self, *args, **kwargs): return self._dataframe.__constructor__( query_compiler=self._query_compiler.rolling_sum( @@ -129,10 +136,8 @@ def corr(self, other=None, 
pairwise=None, *args, **kwargs):
 from .dataframe import DataFrame
 from .series import Series

- if isinstance(other, DataFrame):
- other = other._query_compiler.to_pandas()
- elif isinstance(other, Series):
- other = other._query_compiler.to_pandas().squeeze()
+ if isinstance(other, (DataFrame, Series)):
+ other = other._query_compiler

 return self._dataframe.__constructor__(
 query_compiler=self._query_compiler.rolling_corr(
@@ -144,10 +149,8 @@ def cov(self, other=None, pairwise=None, ddof: Optional[int] = 1, **kwargs):
 from .dataframe import DataFrame
 from .series import Series

- if isinstance(other, DataFrame):
- other = other._query_compiler.to_pandas()
- elif isinstance(other, Series):
- other = other._query_compiler.to_pandas().squeeze()
+ if isinstance(other, (DataFrame, Series)):
+ other = other._query_compiler

 return self._dataframe.__constructor__(
 query_compiler=self._query_compiler.rolling_cov(
@@ -224,3 +227,76 @@ def quantile(self, quantile, interpolation="linear", **kwargs):
 self.axis, self.rolling_args, quantile, interpolation, **kwargs
 )
 )
+
+@_inherit_docstrings(
+ pandas.core.window.expanding.Expanding,
+ excluded=[pandas.core.window.expanding.Expanding.__init__],
+)
+class Expanding(ExpandingCompat):
+ def _init(self, dataframe, expanding_args, axis):
+ self._dataframe = dataframe
+ self._query_compiler = dataframe._query_compiler
+ self.expanding_args = expanding_args
+ self.axis = axis
+
+ def aggregate(self, *args, **kwargs):
+ return self._dataframe.__constructor__(
+ query_compiler=self._query_compiler.expanding_aggregate(
+ self.axis, self.expanding_args, *args, **kwargs
+ )
+ )
+
+ def sum(self, *args, **kwargs):
+ return self._dataframe.__constructor__(
+ query_compiler=self._query_compiler.expanding_sum(
+ self.axis, self.expanding_args, *args, **kwargs
+ )
+ )
+
+ def min(self, *args, **kwargs):
+ return self._dataframe.__constructor__(
+ query_compiler=self._query_compiler.expanding_min(
+ self.axis, self.expanding_args, *args, **kwargs
+ )
+ )
+
+ def max(self, *args, **kwargs):
+ return self._dataframe.__constructor__(
+ query_compiler=self._query_compiler.expanding_max(
+ self.axis, self.expanding_args, *args, **kwargs
+ )
+ )
+
+ def mean(self, *args, **kwargs):
+ return self._dataframe.__constructor__(
+ query_compiler=self._query_compiler.expanding_mean(
+ self.axis, self.expanding_args, *args, **kwargs
+ )
+ )
+
+ def var(self, *args, **kwargs):
+ return self._dataframe.__constructor__(
+ query_compiler=self._query_compiler.expanding_var(
+ self.axis, self.expanding_args, *args, **kwargs
+ )
+ )
+
+ def std(self, *args, **kwargs):
+ return self._dataframe.__constructor__(
+ query_compiler=self._query_compiler.expanding_std(
+ self.axis, self.expanding_args, *args, **kwargs
+ )
+ )
+
+ def count(self, *args, **kwargs):
+ return self._dataframe.__constructor__(
+ query_compiler=self._query_compiler.expanding_count(
+ self.axis, self.expanding_args, *args, **kwargs
+ )
+ )
+
+ def sem(self, *args, **kwargs):
+ return self._dataframe.__constructor__(
+ query_compiler=self._query_compiler.expanding_sem(
+ self.axis, self.expanding_args, *args, **kwargs
+ )
+ )
diff --git a/modin/utils.py b/modin/utils.py
index c7015130f86..c9e7374d0ec 100644
--- a/modin/utils.py
+++ b/modin/utils.py
@@ -539,7 +539,7 @@ def get_current_execution():
 str
 Returns On-like string.
""" - return f"{'Experimental' if IsExperimental.get() else ''}{StorageFormat.get()}On{Engine.get()}" + return f"{'Experimental' if IsExperimental.get() else ''}{StorageFormat.get()}{'On' if StorageFormat.get() != '' else ''}{Engine.get()}" def instancer(_class):