Skip to content

Commit

Permalink
REFACTOR-modin-project#2059: Migrate MODIN_CPUS
Browse files Browse the repository at this point in the history
Signed-off-by: Vasilij Litvinov <[email protected]>
  • Loading branch information
vnlitvinov committed Oct 12, 2020
1 parent a09fca3 commit c7481cf
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 10 deletions.
6 changes: 6 additions & 0 deletions modin/config/envvars.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,12 @@ class CpuCount(EnvironmentVariable, type=int):

varname = "MODIN_CPUS"

@classmethod
def _get_default(cls):
import multiprocessing

return multiprocessing.cpu_count()


class Memory(EnvironmentVariable, type=int):
"""
Expand Down
6 changes: 2 additions & 4 deletions modin/engines/ray/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@
import threading
import os
import sys
import multiprocessing

from modin.config import IsRayCluster, RayRedisAddress
from modin.config import IsRayCluster, RayRedisAddress, CpuCount


def handle_ray_task_error(e):
Expand Down Expand Up @@ -105,7 +104,6 @@ def initialize_ray(
logging_level=100,
)
else:
num_cpus = os.environ.get("MODIN_CPUS", None) or multiprocessing.cpu_count()
object_store_memory = os.environ.get("MODIN_MEMORY", None)
plasma_directory = os.environ.get("MODIN_ON_RAY_PLASMA_DIR", None)
if os.environ.get("MODIN_OUT_OF_CORE", "False").title() == "True":
Expand Down Expand Up @@ -133,7 +131,7 @@ def initialize_ray(
else:
object_store_memory = int(object_store_memory)
ray.init(
num_cpus=int(num_cpus),
num_cpus=CpuCount.get(),
include_dashboard=False,
ignore_reinit_error=True,
_plasma_directory=plasma_directory,
Expand Down
9 changes: 3 additions & 6 deletions modin/pandas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@
import os
import multiprocessing

from modin.config import Engine, Backend, Parameter
from modin.config import Engine, Backend, Parameter, CpuCount

# Set this so that Pandas doesn't try to multithread by itself
os.environ["OMP_NUM_THREADS"] = "1"
Expand All @@ -112,7 +112,7 @@ def _update_engine(publisher: Parameter):
# With OmniSci backend there is only a single worker per node
# and we allow it to work on all cores.
if Backend.get() == "Omnisci":
os.environ["MODIN_CPUS"] = "1"
CpuCount.put(1)
os.environ["OMP_NUM_THREADS"] = str(multiprocessing.cpu_count())
if _is_first_update.get("Ray", True):
initialize_ray()
Expand All @@ -132,10 +132,7 @@ def _update_engine(publisher: Parameter):
except ValueError:
from distributed import Client

num_cpus = (
os.environ.get("MODIN_CPUS", None) or multiprocessing.cpu_count()
)
dask_client = Client(n_workers=int(num_cpus))
dask_client = Client(n_workers=CpuCount.get())

elif publisher.get() == "Cloudray":
from modin.experimental.cloud import get_connection
Expand Down

0 comments on commit c7481cf

Please sign in to comment.