From 9a6695d3b36cb6c21e835b10d62f26a474583f15 Mon Sep 17 00:00:00 2001 From: Devin Petersohn Date: Tue, 12 Jan 2021 12:08:06 -0600 Subject: [PATCH] REFACTOR-#2580: Move automatic engine init to after data ingestion (#2581) * REFACTOR-#2580: Move automatic engine init to after data ingestion * Resolves #2580 Instead of automatically starting the engine when Modin is imported, we start it after the first time the user reads or creates a dataframe. This is intended to let downstream libraries perform type checks without needing the engine, as well as clear up some transient errors that can occur with certain engines on large machines. I have also added a warning message that informs the user how to clear the message. We will likely need a way to suppress these warnings, because many users will not care about them and may want to suppress them. We will probably also want to add a benchmarking page on best practices for benchmarking because this change can give the impression of a performance degradation on data ingestion even though nothing is changing from that perspective. 
Signed-off-by: Devin Petersohn * REFACTOR-#2580: Add to experimental API Signed-off-by: Devin Petersohn * REFACTOR-#2580: Add `read_feather` and `read_clipboard` Signed-off-by: Devin Petersohn * REFACTOR-#2580: Remove redundant error message Signed-off-by: Devin Petersohn --- modin/engines/dask/utils.py | 35 +++++++++++++++++++++++++++++ modin/engines/ray/utils.py | 29 ++++++++++-------------- modin/error_message.py | 8 +++++++ modin/experimental/pandas/io_exp.py | 4 +++- modin/pandas/__init__.py | 23 +++++-------------- modin/pandas/dataframe.py | 4 +++- modin/pandas/io.py | 23 ++++++++++++++++++- modin/pandas/series.py | 4 +++- 8 files changed, 91 insertions(+), 39 deletions(-) create mode 100644 modin/engines/dask/utils.py diff --git a/modin/engines/dask/utils.py b/modin/engines/dask/utils.py new file mode 100644 index 00000000000..ae0c6db4974 --- /dev/null +++ b/modin/engines/dask/utils.py @@ -0,0 +1,35 @@ +# Licensed to Modin Development Team under one or more contributor license agreements. +# See the NOTICE file distributed with this work for additional information regarding +# copyright ownership. The Modin Development Team licenses this file to you under the +# Apache License, Version 2.0 (the "License"); you may not use this file except in +# compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under +# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific language +# governing permissions and limitations under the License. 
+ +from modin.config import CpuCount +from modin.error_message import ErrorMessage + + +def initialize_dask(): + from distributed.client import get_client + + try: + get_client() + except ValueError: + from distributed import Client + + # The indentation here is intentional, we want the code to be indented. + ErrorMessage.not_initialized( + "Dask", + """ + from distributed import Client + + client = Client() +""", + ) + Client(n_workers=CpuCount.get()) diff --git a/modin/engines/ray/utils.py b/modin/engines/ray/utils.py index 2454464f7b7..4641fa5d31b 100644 --- a/modin/engines/ray/utils.py +++ b/modin/engines/ray/utils.py @@ -12,7 +12,6 @@ # governing permissions and limitations under the License. import builtins -import threading import os import sys @@ -94,7 +93,7 @@ def initialize_ray( """ import ray - if threading.current_thread().name == "MainThread" or override_is_cluster: + if not ray.is_initialized() or override_is_cluster: import secrets cluster = override_is_cluster or IsRayCluster.get() @@ -111,6 +110,17 @@ def initialize_ray( logging_level=100, ) else: + from modin.error_message import ErrorMessage + + # This string is intentionally formatted this way. We want it indented in + # the warning message. 
+ ErrorMessage.not_initialized( + "Ray", + """ + import ray + ray.init() +""", + ) object_store_memory = Memory.get() plasma_directory = RayPlasmaDir.get() if IsOutOfCore.get(): @@ -148,21 +158,6 @@ def initialize_ray( _memory=object_store_memory, _lru_evict=True, ) - - global_node = ray.worker._global_node - # Check only for head node - if global_node.head: - import psutil - from modin.error_message import ErrorMessage - - ray_session_dir = os.path.dirname(global_node._session_dir) - ray_free_space_GB = psutil.disk_usage(ray_session_dir).free // 10 ** 9 - ErrorMessage.single_warning( - f"Modin Ray engine was started with {ray_free_space_GB} GB free space avaliable, " - "if it is not enough for your application, please set environment variable " - "MODIN_ON_RAY_PLASMA_DIR=/directory/without/space/limiting" - ) - _move_stdlib_ahead_of_site_packages() ray.worker.global_worker.run_function_on_all_workers( _move_stdlib_ahead_of_site_packages diff --git a/modin/error_message.py b/modin/error_message.py index 32c3b231848..59694ad9699 100644 --- a/modin/error_message.py +++ b/modin/error_message.py @@ -75,3 +75,11 @@ def missmatch_with_pandas(cls, operation, message): cls.single_warning( f"`{operation}` implementation has mismatches with pandas:\n{message}." ) + + @classmethod + def not_initialized(cls, engine, code): + warnings.warn( + "{} execution environment not yet initialized. Initializing...\n" + "To remove this warning, run the following python code before doing dataframe operations:\n" + "{}".format(engine, code) + ) diff --git a/modin/experimental/pandas/io_exp.py b/modin/experimental/pandas/io_exp.py index ccf73460235..f29aab362b3 100644 --- a/modin/experimental/pandas/io_exp.py +++ b/modin/experimental/pandas/io_exp.py @@ -15,7 +15,8 @@ from . 
import DataFrame from modin.data_management.factories.dispatcher import EngineDispatcher -from modin.config import IsExperimental +from modin.config import IsExperimental, Engine +from ...pandas import _update_engine def read_sql( @@ -63,6 +64,7 @@ def read_sql( Returns: Pandas Dataframe """ + Engine.subscribe(_update_engine) assert IsExperimental.get(), "This only works in experimental mode" _, _, _, kwargs = inspect.getargvalues(inspect.currentframe()) return DataFrame(query_compiler=EngineDispatcher.read_sql(**kwargs)) diff --git a/modin/pandas/__init__.py b/modin/pandas/__init__.py index 6efd16142d7..1b5a18f7654 100644 --- a/modin/pandas/__init__.py +++ b/modin/pandas/__init__.py @@ -83,7 +83,6 @@ NamedAgg, NA, ) -import threading import os import multiprocessing @@ -118,24 +117,14 @@ def _update_engine(publisher: Parameter): if _is_first_update.get("Ray", True): initialize_ray() num_cpus = ray.cluster_resources()["CPU"] - elif publisher.get() == "Dask": # pragma: no cover + elif publisher.get() == "Dask": from distributed.client import get_client - if threading.current_thread().name == "MainThread" and _is_first_update.get( - "Dask", True - ): - import warnings + if _is_first_update.get("Dask", True): + from modin.engines.dask.utils import initialize_dask - warnings.warn("The Dask Engine for Modin is experimental.") - - try: - dask_client = get_client() - except ValueError: - from distributed import Client - - dask_client = Client(n_workers=CpuCount.get()) - - num_cpus = len(dask_client.ncores()) + initialize_dask() + num_cpus = len(get_client().ncores()) elif publisher.get() == "Cloudray": from modin.experimental.cloud import get_connection @@ -177,8 +166,6 @@ def init_remote_ray(partition): DEFAULT_NPARTITIONS = max(4, int(num_cpus)) -Engine.subscribe(_update_engine) - from .. 
import __version__ from .dataframe import DataFrame from .io import ( diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py index 67f5f4c035d..68a67036c8a 100644 --- a/modin/pandas/dataframe.py +++ b/modin/pandas/dataframe.py @@ -45,11 +45,12 @@ from modin.error_message import ErrorMessage from modin.utils import _inherit_docstrings, to_pandas, hashable -from modin.config import IsExperimental +from modin.config import Engine, IsExperimental from .utils import ( from_pandas, from_non_pandas, ) +from . import _update_engine from .iterator import PartitionIterator from .series import Series from .base import BasePandasDataset, _ATTRS_NO_LOOKUP @@ -87,6 +88,7 @@ def __init__( query_compiler: query_compiler A query compiler object to manage distributed computation. """ + Engine.subscribe(_update_engine) if isinstance(data, (DataFrame, Series)): self._query_compiler = data._query_compiler.copy() if index is not None and any(i not in data.index for i in index): diff --git a/modin/pandas/io.py b/modin/pandas/io.py index 8e2c1bdd85d..bc8bb30b849 100644 --- a/modin/pandas/io.py +++ b/modin/pandas/io.py @@ -32,7 +32,8 @@ from modin.error_message import ErrorMessage from .dataframe import DataFrame -from modin.utils import _inherit_func_docstring, _inherit_docstrings +from modin.utils import _inherit_func_docstring, _inherit_docstrings, Engine +from . 
import _update_engine PQ_INDEX_REGEX = re.compile(r"__index_level_\d+__") @@ -130,6 +131,7 @@ def _read(**kwargs): """ from modin.data_management.factories.dispatcher import EngineDispatcher + Engine.subscribe(_update_engine) pd_obj = EngineDispatcher.read_csv(**kwargs) # This happens when `read_csv` returns a TextFileReader object for iterating through if isinstance(pd_obj, pandas.io.parsers.TextFileReader): @@ -149,6 +151,7 @@ def _read(**kwargs): def read_parquet(path, engine: str = "auto", columns=None, **kwargs): from modin.data_management.factories.dispatcher import EngineDispatcher + Engine.subscribe(_update_engine) return DataFrame( query_compiler=EngineDispatcher.read_parquet( path=path, columns=columns, engine=engine, **kwargs @@ -178,6 +181,7 @@ def read_json( from modin.data_management.factories.dispatcher import EngineDispatcher + Engine.subscribe(_update_engine) return DataFrame(query_compiler=EngineDispatcher.read_json(**kwargs)) @@ -204,6 +208,7 @@ def read_gbq( from modin.data_management.factories.dispatcher import EngineDispatcher + Engine.subscribe(_update_engine) return DataFrame(query_compiler=EngineDispatcher.read_gbq(**kwargs)) @@ -229,6 +234,7 @@ def read_html( from modin.data_management.factories.dispatcher import EngineDispatcher + Engine.subscribe(_update_engine) return DataFrame(query_compiler=EngineDispatcher.read_html(**kwargs)) @@ -239,6 +245,7 @@ def read_clipboard(sep=r"\s+", **kwargs): # pragma: no cover from modin.data_management.factories.dispatcher import EngineDispatcher + Engine.subscribe(_update_engine) return DataFrame(query_compiler=EngineDispatcher.read_clipboard(**kwargs)) @@ -274,6 +281,7 @@ def read_excel( from modin.data_management.factories.dispatcher import EngineDispatcher + Engine.subscribe(_update_engine) intermediate = EngineDispatcher.read_excel(**kwargs) if isinstance(intermediate, (OrderedDict, dict)): parsed = type(intermediate)() @@ -303,6 +311,7 @@ def read_hdf( from 
modin.data_management.factories.dispatcher import EngineDispatcher + Engine.subscribe(_update_engine) return DataFrame(query_compiler=EngineDispatcher.read_hdf(**kwargs)) @@ -312,6 +321,7 @@ def read_feather(path, columns=None, use_threads: bool = True): from modin.data_management.factories.dispatcher import EngineDispatcher + Engine.subscribe(_update_engine) return DataFrame(query_compiler=EngineDispatcher.read_feather(**kwargs)) @@ -332,6 +342,7 @@ def read_stata( from modin.data_management.factories.dispatcher import EngineDispatcher + Engine.subscribe(_update_engine) return DataFrame(query_compiler=EngineDispatcher.read_stata(**kwargs)) @@ -348,6 +359,7 @@ def read_sas( from modin.data_management.factories.dispatcher import EngineDispatcher + Engine.subscribe(_update_engine) return DataFrame(query_compiler=EngineDispatcher.read_sas(**kwargs)) @@ -359,6 +371,7 @@ def read_pickle( from modin.data_management.factories.dispatcher import EngineDispatcher + Engine.subscribe(_update_engine) return DataFrame(query_compiler=EngineDispatcher.read_pickle(**kwargs)) @@ -377,6 +390,7 @@ def read_sql( from modin.data_management.factories.dispatcher import EngineDispatcher + Engine.subscribe(_update_engine) if kwargs.get("chunksize") is not None: ErrorMessage.default_to_pandas("Parameters provided [chunksize]") df_gen = pandas.read_sql(**kwargs) @@ -396,6 +410,7 @@ def read_fwf( ): from modin.data_management.factories.dispatcher import EngineDispatcher + Engine.subscribe(_update_engine) _, _, _, kwargs = inspect.getargvalues(inspect.currentframe()) kwargs.update(kwargs.pop("kwds", {})) pd_obj = EngineDispatcher.read_fwf(**kwargs) @@ -424,6 +439,7 @@ def read_sql_table( from modin.data_management.factories.dispatcher import EngineDispatcher + Engine.subscribe(_update_engine) return DataFrame(query_compiler=EngineDispatcher.read_sql_table(**kwargs)) @@ -441,6 +457,7 @@ def read_sql_query( from modin.data_management.factories.dispatcher import EngineDispatcher + 
Engine.subscribe(_update_engine) return DataFrame(query_compiler=EngineDispatcher.read_sql_query(**kwargs)) @@ -452,6 +469,7 @@ def read_spss( ): from modin.data_management.factories.dispatcher import EngineDispatcher + Engine.subscribe(_update_engine) return DataFrame( query_compiler=EngineDispatcher.read_spss(path, usecols, convert_categoricals) ) @@ -466,6 +484,7 @@ def to_pickle( ): from modin.data_management.factories.dispatcher import EngineDispatcher + Engine.subscribe(_update_engine) if isinstance(obj, DataFrame): obj = obj._query_compiler return EngineDispatcher.to_pickle( @@ -485,6 +504,7 @@ def json_normalize( max_level: Optional[int] = None, ) -> DataFrame: ErrorMessage.default_to_pandas("json_normalize") + Engine.subscribe(_update_engine) return DataFrame( pandas.json_normalize( data, record_path, meta, meta_prefix, record_prefix, errors, sep, max_level @@ -497,6 +517,7 @@ def read_orc( path: FilePathOrBuffer, columns: Optional[List[str]] = None, **kwargs ) -> DataFrame: ErrorMessage.default_to_pandas("read_orc") + Engine.subscribe(_update_engine) return DataFrame(pandas.read_orc(path, columns, **kwargs)) diff --git a/modin/pandas/series.py b/modin/pandas/series.py index 48e2c274824..b465a14e337 100644 --- a/modin/pandas/series.py +++ b/modin/pandas/series.py @@ -36,12 +36,13 @@ from typing import Union, Optional import warnings -from modin.utils import _inherit_docstrings, to_pandas +from modin.utils import _inherit_docstrings, to_pandas, Engine from modin.config import IsExperimental from .base import BasePandasDataset, _ATTRS_NO_LOOKUP from .iterator import PartitionIterator from .utils import from_pandas, is_scalar from .accessor import CachedAccessor, SparseAccessor +from . import _update_engine @_inherit_docstrings(pandas.Series, excluded=[pandas.Series.__init__]) @@ -77,6 +78,7 @@ def __init__( query_compiler: query_compiler A query compiler object to create the Series from. 
""" + Engine.subscribe(_update_engine) if isinstance(data, type(self)): query_compiler = data._query_compiler.copy() if index is not None: