Skip to content

Commit

Permalink
REFACTOR-#2580: Move automatic engine init to after data ingestion (#…
Browse files Browse the repository at this point in the history
…2581)

* REFACTOR-#2580: Move automatic engine init to after data ingestion

* Resolves #2580

Instead of automatically starting the engine when Modin is imported,
we start it after the first time the user reads or creates a dataframe.
This is intended to help downstream libraries not need the engine to
check for typing, as well as clear up some transient errors that can
occur with certain engines on large machines.

I have also added a warning message that tells the user how to avoid
the warning. We will likely need a way to suppress these warnings, because
many users will not care about them and may want to silence them.
We will probably also want to add a benchmarking page on best practices
for benchmarking because this change can give the impression of a
performance degradation on data ingestion even though nothing is
changing from that perspective.

Signed-off-by: Devin Petersohn <[email protected]>

* REFACTOR-#2580: Add to experimental API

Signed-off-by: Devin Petersohn <[email protected]>

* REFACTOR-#2580: Add `read_feather` and `read_clipboard`

Signed-off-by: Devin Petersohn <[email protected]>

* REFACTOR-#2580: Remove redundant error message

Signed-off-by: Devin Petersohn <[email protected]>
  • Loading branch information
devin-petersohn authored Jan 12, 2021
1 parent bcab1cc commit 9a6695d
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 39 deletions.
35 changes: 35 additions & 0 deletions modin/engines/dask/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Licensed to Modin Development Team under one or more contributor license agreements.
# See the NOTICE file distributed with this work for additional information regarding
# copyright ownership. The Modin Development Team licenses this file to you under the
# Apache License, Version 2.0 (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

from modin.config import CpuCount
from modin.error_message import ErrorMessage


def initialize_dask():
    """
    Make sure a Dask distributed client exists, creating one if necessary.

    ``get_client()`` is tried first. If it raises ``ValueError`` (handled here
    as "no client has been created yet"), a warning explaining how to
    initialize Dask ahead of time is emitted through
    ``ErrorMessage.not_initialized`` and a new ``Client`` is started with
    ``CpuCount.get()`` workers. If ``get_client()`` succeeds, nothing else is done.
    """
    from distributed.client import get_client

    try:
        get_client()
    except ValueError:
        from distributed import Client

        # The snippet is intentionally indented so that it stands out as code in
        # the warning text. The indentation is spelled out with explicit escapes
        # so that re-indenting this function cannot silently change the message.
        ErrorMessage.not_initialized(
            "Dask",
            "\n    from distributed import Client\n    client = Client()\n",
        )
        Client(n_workers=CpuCount.get())
29 changes: 12 additions & 17 deletions modin/engines/ray/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
# governing permissions and limitations under the License.

import builtins
import threading
import os
import sys

Expand Down Expand Up @@ -94,7 +93,7 @@ def initialize_ray(
"""
import ray

if threading.current_thread().name == "MainThread" or override_is_cluster:
if not ray.is_initialized() or override_is_cluster:
import secrets

cluster = override_is_cluster or IsRayCluster.get()
Expand All @@ -111,6 +110,17 @@ def initialize_ray(
logging_level=100,
)
else:
from modin.error_message import ErrorMessage

# This string is intentionally formatted this way. We want it indented in
# the warning message.
ErrorMessage.not_initialized(
"Ray",
"""
import ray
ray.init()
""",
)
object_store_memory = Memory.get()
plasma_directory = RayPlasmaDir.get()
if IsOutOfCore.get():
Expand Down Expand Up @@ -148,21 +158,6 @@ def initialize_ray(
_memory=object_store_memory,
_lru_evict=True,
)

global_node = ray.worker._global_node
# Check only for head node
if global_node.head:
import psutil
from modin.error_message import ErrorMessage

ray_session_dir = os.path.dirname(global_node._session_dir)
ray_free_space_GB = psutil.disk_usage(ray_session_dir).free // 10 ** 9
ErrorMessage.single_warning(
f"Modin Ray engine was started with {ray_free_space_GB} GB free space avaliable, "
"if it is not enough for your application, please set environment variable "
"MODIN_ON_RAY_PLASMA_DIR=/directory/without/space/limiting"
)

_move_stdlib_ahead_of_site_packages()
ray.worker.global_worker.run_function_on_all_workers(
_move_stdlib_ahead_of_site_packages
Expand Down
8 changes: 8 additions & 0 deletions modin/error_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,11 @@ def missmatch_with_pandas(cls, operation, message):
cls.single_warning(
f"`{operation}` implementation has mismatches with pandas:\n{message}."
)

@classmethod
def not_initialized(cls, engine, code):
    """
    Warn that the execution engine is being initialized automatically.

    Parameters
    ----------
    engine : str
        Name of the engine, placed at the start of the warning text.
    code : str
        Python snippet shown to the user as the way to initialize the engine
        themselves; appended verbatim to the end of the warning text.
    """
    headline = f"{engine} execution environment not yet initialized. Initializing...\n"
    advice = "To remove this warning, run the following python code before doing dataframe operations:\n"
    warnings.warn(f"{headline}{advice}{code}")
4 changes: 3 additions & 1 deletion modin/experimental/pandas/io_exp.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@

from . import DataFrame
from modin.data_management.factories.dispatcher import EngineDispatcher
from modin.config import IsExperimental
from modin.config import IsExperimental, Engine
from ...pandas import _update_engine


def read_sql(
Expand Down Expand Up @@ -63,6 +64,7 @@ def read_sql(
Returns:
Pandas Dataframe
"""
Engine.subscribe(_update_engine)
assert IsExperimental.get(), "This only works in experimental mode"
_, _, _, kwargs = inspect.getargvalues(inspect.currentframe())
return DataFrame(query_compiler=EngineDispatcher.read_sql(**kwargs))
23 changes: 5 additions & 18 deletions modin/pandas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@
NamedAgg,
NA,
)
import threading
import os
import multiprocessing

Expand Down Expand Up @@ -118,24 +117,14 @@ def _update_engine(publisher: Parameter):
if _is_first_update.get("Ray", True):
initialize_ray()
num_cpus = ray.cluster_resources()["CPU"]
elif publisher.get() == "Dask": # pragma: no cover
elif publisher.get() == "Dask":
from distributed.client import get_client

if threading.current_thread().name == "MainThread" and _is_first_update.get(
"Dask", True
):
import warnings
if _is_first_update.get("Dask", True):
from modin.engines.dask.utils import initialize_dask

warnings.warn("The Dask Engine for Modin is experimental.")

try:
dask_client = get_client()
except ValueError:
from distributed import Client

dask_client = Client(n_workers=CpuCount.get())

num_cpus = len(dask_client.ncores())
initialize_dask()
num_cpus = len(get_client().ncores())

elif publisher.get() == "Cloudray":
from modin.experimental.cloud import get_connection
Expand Down Expand Up @@ -177,8 +166,6 @@ def init_remote_ray(partition):
DEFAULT_NPARTITIONS = max(4, int(num_cpus))


Engine.subscribe(_update_engine)

from .. import __version__
from .dataframe import DataFrame
from .io import (
Expand Down
4 changes: 3 additions & 1 deletion modin/pandas/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,12 @@

from modin.error_message import ErrorMessage
from modin.utils import _inherit_docstrings, to_pandas, hashable
from modin.config import IsExperimental
from modin.config import Engine, IsExperimental
from .utils import (
from_pandas,
from_non_pandas,
)
from . import _update_engine
from .iterator import PartitionIterator
from .series import Series
from .base import BasePandasDataset, _ATTRS_NO_LOOKUP
Expand Down Expand Up @@ -87,6 +88,7 @@ def __init__(
query_compiler: query_compiler
A query compiler object to manage distributed computation.
"""
Engine.subscribe(_update_engine)
if isinstance(data, (DataFrame, Series)):
self._query_compiler = data._query_compiler.copy()
if index is not None and any(i not in data.index for i in index):
Expand Down
23 changes: 22 additions & 1 deletion modin/pandas/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@

from modin.error_message import ErrorMessage
from .dataframe import DataFrame
from modin.utils import _inherit_func_docstring, _inherit_docstrings
from modin.utils import _inherit_func_docstring, _inherit_docstrings, Engine
from . import _update_engine

PQ_INDEX_REGEX = re.compile(r"__index_level_\d+__")

Expand Down Expand Up @@ -130,6 +131,7 @@ def _read(**kwargs):
"""
from modin.data_management.factories.dispatcher import EngineDispatcher

Engine.subscribe(_update_engine)
pd_obj = EngineDispatcher.read_csv(**kwargs)
# This happens when `read_csv` returns a TextFileReader object for iterating through
if isinstance(pd_obj, pandas.io.parsers.TextFileReader):
Expand All @@ -149,6 +151,7 @@ def _read(**kwargs):
def read_parquet(path, engine: str = "auto", columns=None, **kwargs):
from modin.data_management.factories.dispatcher import EngineDispatcher

Engine.subscribe(_update_engine)
return DataFrame(
query_compiler=EngineDispatcher.read_parquet(
path=path, columns=columns, engine=engine, **kwargs
Expand Down Expand Up @@ -178,6 +181,7 @@ def read_json(

from modin.data_management.factories.dispatcher import EngineDispatcher

Engine.subscribe(_update_engine)
return DataFrame(query_compiler=EngineDispatcher.read_json(**kwargs))


Expand All @@ -204,6 +208,7 @@ def read_gbq(

from modin.data_management.factories.dispatcher import EngineDispatcher

Engine.subscribe(_update_engine)
return DataFrame(query_compiler=EngineDispatcher.read_gbq(**kwargs))


Expand All @@ -229,6 +234,7 @@ def read_html(

from modin.data_management.factories.dispatcher import EngineDispatcher

Engine.subscribe(_update_engine)
return DataFrame(query_compiler=EngineDispatcher.read_html(**kwargs))


Expand All @@ -239,6 +245,7 @@ def read_clipboard(sep=r"\s+", **kwargs): # pragma: no cover

from modin.data_management.factories.dispatcher import EngineDispatcher

Engine.subscribe(_update_engine)
return DataFrame(query_compiler=EngineDispatcher.read_clipboard(**kwargs))


Expand Down Expand Up @@ -274,6 +281,7 @@ def read_excel(

from modin.data_management.factories.dispatcher import EngineDispatcher

Engine.subscribe(_update_engine)
intermediate = EngineDispatcher.read_excel(**kwargs)
if isinstance(intermediate, (OrderedDict, dict)):
parsed = type(intermediate)()
Expand Down Expand Up @@ -303,6 +311,7 @@ def read_hdf(

from modin.data_management.factories.dispatcher import EngineDispatcher

Engine.subscribe(_update_engine)
return DataFrame(query_compiler=EngineDispatcher.read_hdf(**kwargs))


Expand All @@ -312,6 +321,7 @@ def read_feather(path, columns=None, use_threads: bool = True):

from modin.data_management.factories.dispatcher import EngineDispatcher

Engine.subscribe(_update_engine)
return DataFrame(query_compiler=EngineDispatcher.read_feather(**kwargs))


Expand All @@ -332,6 +342,7 @@ def read_stata(

from modin.data_management.factories.dispatcher import EngineDispatcher

Engine.subscribe(_update_engine)
return DataFrame(query_compiler=EngineDispatcher.read_stata(**kwargs))


Expand All @@ -348,6 +359,7 @@ def read_sas(

from modin.data_management.factories.dispatcher import EngineDispatcher

Engine.subscribe(_update_engine)
return DataFrame(query_compiler=EngineDispatcher.read_sas(**kwargs))


Expand All @@ -359,6 +371,7 @@ def read_pickle(

from modin.data_management.factories.dispatcher import EngineDispatcher

Engine.subscribe(_update_engine)
return DataFrame(query_compiler=EngineDispatcher.read_pickle(**kwargs))


Expand All @@ -377,6 +390,7 @@ def read_sql(

from modin.data_management.factories.dispatcher import EngineDispatcher

Engine.subscribe(_update_engine)
if kwargs.get("chunksize") is not None:
ErrorMessage.default_to_pandas("Parameters provided [chunksize]")
df_gen = pandas.read_sql(**kwargs)
Expand All @@ -396,6 +410,7 @@ def read_fwf(
):
from modin.data_management.factories.dispatcher import EngineDispatcher

Engine.subscribe(_update_engine)
_, _, _, kwargs = inspect.getargvalues(inspect.currentframe())
kwargs.update(kwargs.pop("kwds", {}))
pd_obj = EngineDispatcher.read_fwf(**kwargs)
Expand Down Expand Up @@ -424,6 +439,7 @@ def read_sql_table(

from modin.data_management.factories.dispatcher import EngineDispatcher

Engine.subscribe(_update_engine)
return DataFrame(query_compiler=EngineDispatcher.read_sql_table(**kwargs))


Expand All @@ -441,6 +457,7 @@ def read_sql_query(

from modin.data_management.factories.dispatcher import EngineDispatcher

Engine.subscribe(_update_engine)
return DataFrame(query_compiler=EngineDispatcher.read_sql_query(**kwargs))


Expand All @@ -452,6 +469,7 @@ def read_spss(
):
from modin.data_management.factories.dispatcher import EngineDispatcher

Engine.subscribe(_update_engine)
return DataFrame(
query_compiler=EngineDispatcher.read_spss(path, usecols, convert_categoricals)
)
Expand All @@ -466,6 +484,7 @@ def to_pickle(
):
from modin.data_management.factories.dispatcher import EngineDispatcher

Engine.subscribe(_update_engine)
if isinstance(obj, DataFrame):
obj = obj._query_compiler
return EngineDispatcher.to_pickle(
Expand All @@ -485,6 +504,7 @@ def json_normalize(
max_level: Optional[int] = None,
) -> DataFrame:
ErrorMessage.default_to_pandas("json_normalize")
Engine.subscribe(_update_engine)
return DataFrame(
pandas.json_normalize(
data, record_path, meta, meta_prefix, record_prefix, errors, sep, max_level
Expand All @@ -497,6 +517,7 @@ def read_orc(
path: FilePathOrBuffer, columns: Optional[List[str]] = None, **kwargs
) -> DataFrame:
ErrorMessage.default_to_pandas("read_orc")
Engine.subscribe(_update_engine)
return DataFrame(pandas.read_orc(path, columns, **kwargs))


Expand Down
4 changes: 3 additions & 1 deletion modin/pandas/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@
from typing import Union, Optional
import warnings

from modin.utils import _inherit_docstrings, to_pandas
from modin.utils import _inherit_docstrings, to_pandas, Engine
from modin.config import IsExperimental
from .base import BasePandasDataset, _ATTRS_NO_LOOKUP
from .iterator import PartitionIterator
from .utils import from_pandas, is_scalar
from .accessor import CachedAccessor, SparseAccessor
from . import _update_engine


@_inherit_docstrings(pandas.Series, excluded=[pandas.Series.__init__])
Expand Down Expand Up @@ -77,6 +78,7 @@ def __init__(
query_compiler: query_compiler
A query compiler object to create the Series from.
"""
Engine.subscribe(_update_engine)
if isinstance(data, type(self)):
query_compiler = data._query_compiler.copy()
if index is not None:
Expand Down

0 comments on commit 9a6695d

Please sign in to comment.