FEAT-modin-project#4931: Create a query compiler that can connect to a service

Signed-off-by: Devin Petersohn <[email protected]>

Fixes to pass CI + docs for io.py

Update implementation

Signed-off-by: Devin Petersohn <[email protected]>

Fix some things

Signed-off-by: Devin Petersohn <[email protected]>

Lint fixes

Fix put

Signed-off-by: Devin Petersohn <[email protected]>

Clean up and add new details

Signed-off-by: Devin Petersohn <[email protected]>

Use fsspec to get full path and allow URLs

Signed-off-by: Devin Petersohn <[email protected]>

Add lazy loc

Signed-off-by: Devin Petersohn <[email protected]>

fixes for tests

porting more tests

more fixes

moar fixes

Raise exception

Signed-off-by: Devin Petersohn <[email protected]>

Lint fixes

Return Python as the default modin engine

Handle indexing case for client qc

Call fast path for __getitem__ if not lazy

Remove user warning for Python-engine fall back

Add init

Signed-off-by: Devin Petersohn <[email protected]>

Implement free as a no-op

Signed-off-by: Devin Petersohn <[email protected]>

Add support for replace - client side

Fix a couple of issues with Client

Signed-off-by: Devin Petersohn <[email protected]>

Throw errors on to_pandas

Signed-off-by: Devin Petersohn <[email protected]>

Do not default to pandas for str_repeat

Add support for 18 datetime functions/properties

Fix columns caching when renaming columns

Fix test_query: put backticks back for col names

Add support for astype -- client side

hard coded changes for functions

Client support for str_(en/de)code, to_datetime

Add all missing query compiler methods.

Signed-off-by: mvashishtha <[email protected]>

Fix getitem_column_array and take_2d.

Signed-off-by: mvashishtha <[email protected]>

Fix getitem_column_array and take_2d.

Signed-off-by: mvashishtha <[email protected]>

Fix again.

Signed-off-by: mvashishtha <[email protected]>

Fix more bugs.

Signed-off-by: mvashishtha <[email protected]>

More fixes.

Signed-off-by: mvashishtha <[email protected]>

Fix more bugs-- pushdown tests test_dates and test_pivot still broken due to service bugs.

Signed-off-by: mvashishtha <[email protected]>

Fix typo. Note: drop() is broken because the service requires you to specify both arguments, and the client QC at the base of this PR uses default Nones.

Signed-off-by: mvashishtha <[email protected]>

Add query compiler class.

Signed-off-by: mvashishtha <[email protected]>

Testing a commit

Initial changes for adding support for Expanding

FEAT Support for rolling.sem

FEAT support for Expanding sum, min, max, mean, var, std, count, sem

Removing extraneous comment

REFACTOR: Remove defaults to pandas at API layer and add some corresponding client QC methods.

Signed-off-by: mvashishtha <[email protected]>

Add more methods.

Signed-off-by: mvashishtha <[email protected]>

Fix expanding.

Signed-off-by: mvashishtha <[email protected]>

Add ewm.

Signed-off-by: mvashishtha <[email protected]>

Revert whitespace.

Signed-off-by: mvashishtha <[email protected]>

Fix to_numpy by making it like to_pandas.

Signed-off-by: mvashishtha <[email protected]>

Remove extra to_numpy.

Signed-off-by: mvashishtha <[email protected]>

Pass kwargs

Signed-off-by: mvashishtha <[email protected]>

Fix DataFrame import for isin.

Signed-off-by: mvashishtha <[email protected]>

Fix again.

Signed-off-by: mvashishtha <[email protected]>

Remove breakpoint

Signed-off-by: mvashishtha <[email protected]>

Tell if series.

Signed-off-by: mvashishtha <[email protected]>

Fix client qc.

Signed-off-by: mvashishtha <[email protected]>

Add self_is_series.

Signed-off-by: mvashishtha <[email protected]>

FIX: Set numeric_only to True in groupby quantile

Add some comments

Fix str_cat/fullmatch/removeprefix/removesuffix/translate/wrap (modin-project#44)

* Fix str_cat/fullmatch/removeprefix/removesuffix/translate/wrap

* Update modin/core/storage_formats/base/query_compiler.py

Co-authored-by: Mahesh Vashishtha <[email protected]>

* Update modin/pandas/series_utils.py

Co-authored-by: Mahesh Vashishtha <[email protected]>

* Update modin/core/storage_formats/base/query_compiler.py

Co-authored-by: Mahesh Vashishtha <[email protected]>

Co-authored-by: Mahesh Vashishtha <[email protected]>

FEAT Support expanding.aggregate (modin-project#45)

Fix at_time and between_time. (modin-project#43)

Signed-off-by: mvashishtha <[email protected]>

Signed-off-by: mvashishtha <[email protected]>

Add QC method for groupby.sem (modin-project#47)

* FEAT: Add partial support for groupby.sem()

* Add sem changes to groupby

Fix nlargest and nsmallest Series support (modin-project#46)

* Fix nlargest and smallest support

Signed-off-by: Naren Krishna <[email protected]>

Remove client query compiler's columnarize. (modin-project#48)

Signed-off-by: mvashishtha <[email protected]>

Signed-off-by: mvashishtha <[email protected]>

Fix info and set memory_usage=False. (modin-project#49)

Signed-off-by: mvashishtha <[email protected]>

Signed-off-by: mvashishtha <[email protected]>

POND-815 fixes for 21 column dataset (modin-project#50)

* POND-815 fixes for 21 column dataset

* Update modin/pandas/base.py

Co-authored-by: helmeleegy <[email protected]>

---------

Co-authored-by: helmeleegy <[email protected]>

Bring in upstream series binary operation fix 6d5545f… (modin-project#52)

* Bring in upstream series binary operation fix 6d5545f.

Signed-off-by: mvashishtha <[email protected]>

* Update modin/pandas/series.py

Co-authored-by: Karthik Velayutham <[email protected]>

---------

Signed-off-by: mvashishtha <[email protected]>
Co-authored-by: Karthik Velayutham <[email protected]>

Support groupby first/last (modin-project#53)

Signed-off-by: Naren Krishna <[email protected]>

FEAT: Add initial partial support for groupby.cumcount() (modin-project#54)

* FEAT: Add partial support for cumcount

* Remove the set_index_name

* Squeeze the result

* Write cumcount name to None

* Can't set dtype to int64

Fix resample sum, prod, size (modin-project#56)

Signed-off-by: Naren Krishna <[email protected]>

POND-184: fix describe and simplify query compiler interface (modin-project#55)

* Fix describe

Signed-off-by: mvashishtha <[email protected]>

* Pass datetime_is_numeric.

Signed-off-by: mvashishtha <[email protected]>

---------

Signed-off-by: mvashishtha <[email protected]>

Fix dt_day_of_week/day_of_year, str_cat/extract/partition/replace/rpartition (modin-project#51)

* Fix dt_day_of_week/day_of_year, str_partition/replace/rpartition

* Fix str_extract

Revert "Fix dt_day_of_week/day_of_year, str_cat/extract/partition/replace/rpartition (modin-project#51)" (modin-project#58)

This reverts commit f7a31ab.

Revert "Revert "Fix dt_day_of_week/day_of_year, str_cat/extract/partition/replace/rpartition (modin-project#51)" (modin-project#58)" (modin-project#60)

This reverts commit ad9231d.

Add query compiler method for groupby.prod() (modin-project#57)

Signed-off-by: Naren Krishna <[email protected]>

FEAT: Add support for groupby.head and groupby.tail (modin-project#61)

* FEAT: Add support for groupby.head and groupby.tail

* Change _change_index

FEAT: Add partial support for groupby.nth (modin-project#62)

FIX: Push first and last down to query compiler. (modin-project#64)

* FIX: Push first and last down to query compiler.

Signed-off-by: mvashishtha <[email protected]>

* Fix last.

Signed-off-by: mvashishtha <[email protected]>

---------

Signed-off-by: mvashishtha <[email protected]>

FEAT: Add partial support for groupby.ngroup (modin-project#65)

* FEAT: Add partial support for groupby.ngroup

* Name of result should be none for now

Add client support for SeriesGroupby unique, nsmallest, nlargest (modin-project#63)

* Add client support for SeriesGroupby unique, nsmallest, nlargest

Signed-off-by: Naren Krishna <[email protected]>

---------

Signed-off-by: Naren Krishna <[email protected]>

Push memory_usage entirely to query compiler [change is not to be upstreamed to Modin] (modin-project#66)

* Fix dataframe memory usage.

Signed-off-by: mvashishtha <[email protected]>

* Fix series memory_usage() the same way.

Signed-off-by: mvashishtha <[email protected]>

---------

Signed-off-by: mvashishtha <[email protected]>

FIX: allow updating backend query compilers in place. (modin-project#67)

* FIX: Mutate client query compiler columns and index in the service.

Motivation: Align axis update semantics across query compilers. In the base
query compiler and even our service's query compiler, you can update the index
and columns in place. However, the service gives no way to update axes of a
query compiler.

Right now, for inplace updates, the service exposes an extra method rename(),
and the client query compiler uses this to get the ID of a new compiler with
updated axes, and then updates its own ID to that of the new query compiler.

This change might be the first to make the service present a mutable interface
for a backend query compiler. That seems safe to me, except I had to make
copy() get a new query compiler copied from the old query compiler, because we
can't let updates to the new query compiler change the original (or vice versa).
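
The copy() requirement described above can be illustrated with a minimal sketch. Everything here is hypothetical: `FakeService` and `ClientQC` are stand-ins invented for illustration and are not the service or the client query compiler from this PR. The sketch assumes only what the message states, namely that the client holds an ID, that axis updates mutate the backend object in place, and that copy() must hand back a separate backend object.

```python
import itertools


class FakeService:
    """Hypothetical stand-in for the remote service; stores column labels by id."""

    def __init__(self):
        self._columns = {}
        self._ids = itertools.count()

    def create(self, columns):
        qc_id = next(self._ids)
        self._columns[qc_id] = list(columns)
        return qc_id

    def get_columns(self, qc_id):
        return list(self._columns[qc_id])

    def set_columns(self, qc_id, columns):
        # Mutates the backend object in place; the id stays the same.
        self._columns[qc_id] = list(columns)

    def copy(self, qc_id):
        # A copy gets its own backend object and therefore its own id.
        return self.create(self._columns[qc_id])


class ClientQC:
    """Hypothetical client-side handle that only holds a service id."""

    def __init__(self, service, qc_id):
        self._service = service
        self._id = qc_id

    @property
    def columns(self):
        return self._service.get_columns(self._id)

    @columns.setter
    def columns(self, value):
        self._service.set_columns(self._id, value)

    def copy(self):
        return ClientQC(self._service, self._service.copy(self._id))


service = FakeService()
original = ClientQC(service, service.create(["a", "b"]))
duplicate = original.copy()
duplicate.columns = ["x", "y"]  # must not leak into `original`
print(original.columns, duplicate.columns)
```

If copy() instead returned a handle with the same ID, the assignment to `duplicate.columns` would also change `original`, which is the aliasing problem the message is guarding against.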

Signed-off-by: mvashishtha <[email protected]>

* Add a comment.

Signed-off-by: mvashishtha <[email protected]>

---------

Signed-off-by: mvashishtha <[email protected]>

FEAT replace groupby.fillna with a simpler logic (modin-project#68)

* FEAT Support expanding.aggregate

* Replaced groupby.fillna logic with a simpler one

* Fix in groupby.fillna. Work object was causing problems.

* Only need to change _check_index_name to _check_index

* Removed commented out code.
devin-petersohn committed Feb 15, 2023
1 parent 0a2c0de commit 20653a4
Showing 26 changed files with 2,230 additions and 589 deletions.
33 changes: 33 additions & 0 deletions check_client_query_compiler_methods.py
@@ -0,0 +1,33 @@
import inspect

from modin.core.execution.client.query_compiler import ClientQueryCompiler
from modin.core.storage_formats.base import BaseQueryCompiler


KNOWN_MISSING = frozenset(
    [
        # conj no longer exists in pandas
        "conj",
        # no need to make service implement get_axis, which I think
        # is used internally in a few places.
        "get_axis",
        # we don't need to forward these two either
        "get_index_name",
        "get_index_names",
        # Base QC can do this for us
        "has_multiindex",
        # Base QC can do this for us
        "is_series_like",
        # Let Base QC call drop() for us
        "delitem",
    ]
)

print(
    "base query compiler methods that are not in client query compiler, but should be:"
)
for name, f in inspect.getmembers(ClientQueryCompiler, predicate=inspect.isfunction):
    if name in KNOWN_MISSING:
        continue
    if f == getattr(BaseQueryCompiler, name, None):
        print(name)
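
The check script above relies on a subclass method that is not overridden being the very same function object as the one on the base class. A minimal sketch of that idea with toy classes follows; `Base`, `Client`, and `inherited_unchanged` are hypothetical names used only for illustration, not part of Modin.

```python
import inspect


class Base:
    def sum(self):
        return "base sum"

    def mean(self):
        return "base mean"


class Client(Base):
    def sum(self):  # overridden, so this is a different function object
        return "client sum"


def inherited_unchanged(child, parent, known_missing=frozenset()):
    """Return names of functions on `child` that are inherited as-is from `parent`."""
    names = []
    for name, func in inspect.getmembers(child, predicate=inspect.isfunction):
        if name in known_missing:
            continue
        # An inherited, non-overridden method resolves to the parent's function.
        if func is getattr(parent, name, None):
            names.append(name)
    return names


missing = inherited_unchanged(Client, Base)
print(missing)
```

Here `mean` is reported because `Client` never overrides it, while `sum` is not. Passing `known_missing=frozenset({"mean"})` suppresses the report, which mirrors the role of `KNOWN_MISSING` in the script.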
20 changes: 20 additions & 0 deletions modin/_compat/pandas_api/abc/window.py
@@ -56,3 +56,23 @@ def _init(self, dataframe, rolling_args, axis):
             The axis to build Rolling against.
         """
         pass
+
+class BaseCompatibleExpanding(ClassLogger):
+    """Interface for class compatibility layer for Expanding."""
+
+    def _init(self, dataframe, expanding_args, axis):
+        """
+        Initialize the Expanding object for real.
+
+        Utilize translated potentially pandas-specific arguments.
+
+        Parameters
+        ----------
+        dataframe : DataFrame
+            The dataframe object to apply expanding functions against.
+        expanding_args : sequence
+            The arguments to be passed to .Expanding() except dataframe.
+        axis : {0, 1}
+            The axis to build Expanding against.
+        """
+        pass
2 changes: 2 additions & 0 deletions modin/_compat/pandas_api/classes.py
@@ -35,6 +35,7 @@
 from .latest import LatestCompatibleSeriesGroupBy as SeriesGroupByCompat
 from .latest import LatestCompatibleWindow as WindowCompat
 from .latest import LatestCompatibleRolling as RollingCompat
+from .latest import LatestCompatibleExpanding as ExpandingCompat

 __all__ = [
     "BasePandasDatasetCompat",
@@ -44,4 +45,5 @@
     "SeriesGroupByCompat",
     "WindowCompat",
     "RollingCompat",
+    "ExpandingCompat"
 ]
3 changes: 2 additions & 1 deletion modin/_compat/pandas_api/latest/__init__.py
@@ -15,7 +15,7 @@
 from .dataframe import LatestCompatibleDataFrame
 from .series import LatestCompatibleSeries
 from .groupby import LatestCompatibleDataFrameGroupBy, LatestCompatibleSeriesGroupBy
-from .window import LatestCompatibleWindow, LatestCompatibleRolling
+from .window import LatestCompatibleWindow, LatestCompatibleRolling, LatestCompatibleExpanding

 __all__ = [
     "LatestCompatibleBasePandasDataset",
@@ -25,4 +25,5 @@
     "LatestCompatibleSeriesGroupBy",
     "LatestCompatibleWindow",
     "LatestCompatibleRolling",
+    "LatestCompatibleExpanding"
 ]
9 changes: 5 additions & 4 deletions modin/_compat/pandas_api/latest/series.py
@@ -17,6 +17,7 @@

 import numpy as np
 import pandas
+from pandas.io.formats.info import SeriesInfo
 from pandas.util._validators import validate_bool_kwarg
 from pandas._libs.lib import no_default, NoDefault
 from pandas._typing import Axis
@@ -44,12 +45,12 @@ def info(
         memory_usage: "bool | str | None" = None,
         show_counts: "bool" = True,
     ):
-        return self._default_to_pandas(
-            pandas.Series.info,
-            verbose=verbose,
+        # Can't do memory_usage yet
+        memory_usage = False
+        return SeriesInfo(self, memory_usage).render(
             buf=buf,
             max_cols=max_cols,
-            memory_usage=memory_usage,
+            verbose=verbose,
             show_counts=show_counts,
         )

24 changes: 23 additions & 1 deletion modin/_compat/pandas_api/latest/window.py
@@ -15,7 +15,7 @@

 import pandas.core.window.rolling

-from ..abc.window import BaseCompatibleWindow, BaseCompatibleRolling
+from ..abc.window import BaseCompatibleWindow, BaseCompatibleRolling, BaseCompatibleExpanding
 from modin.utils import _inherit_docstrings, append_to_docstring


@@ -79,3 +79,25 @@ def __init__(
             ],
             axis,
         )
+
+@append_to_docstring("Compatibility layer for 'latest pandas' for Expanding.")
+@_inherit_docstrings(pandas.core.window.expanding.Expanding)
+class LatestCompatibleExpanding(BaseCompatibleExpanding):
+    def __init__(
+        self,
+        dataframe,
+        min_periods=1,
+        center=None,
+        axis=0,
+        method="single"
+    ):
+        self._init(
+            dataframe,
+            [
+                min_periods,
+                center,
+                axis,
+                method,
+            ],
+            axis
+        )
11 changes: 6 additions & 5 deletions modin/config/envvars.py
@@ -75,7 +75,7 @@ class Engine(EnvironmentVariable, type=str):
     """Distribution engine to run queries by."""

     varname = "MODIN_ENGINE"
-    choices = ("Ray", "Dask", "Python", "Native")
+    choices = ("Ray", "Dask", "Python", "Native", "Client")

     @classmethod
     def _get_default(cls) -> str:
@@ -131,17 +131,18 @@ def _get_default(cls) -> str:
             pass
         else:
             return "Native"
-        raise ImportError(
-            "Please refer to installation documentation page to install an engine"
-        )
+
+        # If we can't import any other engines we should go ahead and default to Python being
+        # the default backend engine.
+        return "Python"


 class StorageFormat(EnvironmentVariable, type=str):
     """Engine to run on a single node of distribution."""

     varname = "MODIN_STORAGE_FORMAT"
     default = "Pandas"
-    choices = ("Pandas", "Hdk", "Pyarrow", "Cudf")
+    choices = ("Pandas", "Hdk", "Pyarrow", "Cudf", "")


 class IsExperimental(EnvironmentVariable, type=bool):
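
The change to `_get_default` in modin/config/envvars.py replaces the final `ImportError` with a fall-back to "Python". A simplified sketch of that pattern follows. It is not the Modin implementation: `pick_default_engine` and the candidate list are hypothetical, and it uses `importlib.util.find_spec` where the real code uses try/except imports.

```python
import importlib.util


def pick_default_engine(candidates=(("Ray", "ray"), ("Dask", "distributed"))):
    """Return the first engine whose top-level module is importable, else "Python".

    `candidates` is a hypothetical list of (engine name, top-level module name)
    pairs; only top-level module names are supported by this sketch.
    """
    for engine, module in candidates:
        if importlib.util.find_spec(module) is not None:
            return engine
    # Nothing importable: fall back instead of raising ImportError.
    return "Python"


print(pick_default_engine())
```

The design trade-off is that a missing engine no longer fails loudly at import time, so users may end up on the slower fall-back without noticing.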
Empty file.
124 changes: 124 additions & 0 deletions modin/core/execution/client/io.py
@@ -0,0 +1,124 @@
# Licensed to Modin Development Team under one or more contributor license agreements.
# See the NOTICE file distributed with this work for additional information regarding
# copyright ownership.  The Modin Development Team licenses this file to you under the
# Apache License, Version 2.0 (the "License"); you may not use this file except in
# compliance with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

"""The module holds the factory which performs I/O using pandas on a Client."""

from modin.core.io.io import BaseIO
import fsspec
from .query_compiler import ClientQueryCompiler


class ClientIO(BaseIO):
    """Factory providing methods for performing I/O operations using a given Client as the execution engine."""

    _server_conn = None
    _data_conn = None
    query_compiler_cls = ClientQueryCompiler

    @classmethod
    def set_server_connection(cls, conn):
        """
        Set the server connection for the I/O object.

        Parameters
        ----------
        conn : Any
            Connection object that implements various methods.
        """
        cls._server_conn = conn

    @classmethod
    def set_data_connection(cls, conn):
        """
        Set the data connection for the I/O object.

        Parameters
        ----------
        conn : Any
            Connection object that is implementation specific.
        """
        cls._data_conn = conn

    @classmethod
    def read_csv(cls, filepath_or_buffer, **kwargs):
        """
        Read CSV data from given filepath or buffer.

        Parameters
        ----------
        filepath_or_buffer : str, path object or file-like object
            `filepath_or_buffer` parameter of read functions.
        **kwargs : dict
            Parameters of ``read_csv`` function.

        Returns
        -------
        ClientQueryCompiler
            Query compiler with CSV data read in.
        """
        if isinstance(filepath_or_buffer, str):
            filepath_or_buffer = fsspec.open(filepath_or_buffer).full_name
            if filepath_or_buffer.startswith("file://"):
                # We will do this so that the backend can know whether this
                # is a path or a URL.
                filepath_or_buffer = filepath_or_buffer[7:]
        else:
            raise NotImplementedError("Only filepaths are supported for read_csv")
        if cls._server_conn is None:
            raise ConnectionError(
                "Missing server connection, did you initialize the connection?"
            )
        return cls.query_compiler_cls(
            cls._server_conn.read_csv(cls._data_conn, filepath_or_buffer, **kwargs)
        )

    @classmethod
    def read_sql(cls, sql, con, **kwargs):
        """
        Read data from a SQL connection.

        Parameters
        ----------
        sql : str or SQLAlchemy Selectable (select or text object)
            SQL query to be executed or a table name.
        con : SQLAlchemy connectable, str, or sqlite3 connection
            Using SQLAlchemy makes it possible to use any DB supported by that
            library. If a DBAPI2 object, only sqlite3 is supported. The user is responsible
            for engine disposal and connection closure for the SQLAlchemy
            connectable; str connections are closed automatically. See
            `here <https://docs.sqlalchemy.org/en/13/core/connections.html>`_.
        **kwargs : dict
            Parameters of ``read_sql`` function.

        Returns
        -------
        ClientQueryCompiler
            Query compiler with data read in from SQL connection.
        """
        if isinstance(con, str) and con.lower() == "auto" and cls._data_conn is None:
            raise ConnectionError(
                "Cannot connect with parameter 'auto' because connection is not set. Did you initialize it?"
            )
        if cls._data_conn is None:
            cls._data_conn = con
        if cls._server_conn is None:
            raise ConnectionError(
                "Missing server connection, did you initialize the connection?"
            )
        return cls.query_compiler_cls(
            cls._server_conn.read_sql(sql, cls._data_conn, **kwargs)
        )

    @classmethod
    def to_sql(cls, qc, **kwargs):
        qc.to_sql(**kwargs)
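
To make the delegation pattern in `ClientIO.read_csv` concrete, here is a self-contained sketch that runs without Modin or fsspec. `RecordingConnection` and `MiniClientIO` are hypothetical stand-ins; the real class resolves the path through fsspec and wraps the result in `ClientQueryCompiler`, and both of those steps are left out here. The "qc-id-0" return value is likewise made up.

```python
class RecordingConnection:
    """Hypothetical server connection that records calls instead of doing I/O."""

    def __init__(self):
        self.calls = []

    def read_csv(self, data_conn, path, **kwargs):
        self.calls.append(("read_csv", data_conn, path, kwargs))
        return "qc-id-0"  # placeholder for whatever handle a service returns


class MiniClientIO:
    """Simplified stand-in for the class-level connection pattern in the diff."""

    _server_conn = None
    _data_conn = None

    @classmethod
    def set_server_connection(cls, conn):
        cls._server_conn = conn

    @classmethod
    def read_csv(cls, path, **kwargs):
        if not isinstance(path, str):
            raise NotImplementedError("Only filepaths are supported for read_csv")
        if path.startswith("file://"):
            # Strip the scheme so the backend can tell a local path from a URL.
            path = path[len("file://"):]
        if cls._server_conn is None:
            raise ConnectionError("Missing server connection")
        return cls._server_conn.read_csv(cls._data_conn, path, **kwargs)


conn = RecordingConnection()
MiniClientIO.set_server_connection(conn)
handle = MiniClientIO.read_csv("file:///tmp/data.csv", sep=";")
print(handle, conn.calls)
```

Because the connections are stored on the class rather than on an instance, setting them once affects every later read in the process, which is convenient for a single service but worth keeping in mind if more than one backend is ever needed.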