Skip to content

Commit

Permalink
FIX-modin-project#2616: Address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Devin Petersohn <[email protected]>
  • Loading branch information
devin-petersohn committed Jan 20, 2021
1 parent 16881dc commit 78f7da7
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 12 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ jobs:
run: python -m pytest modin/test/backends/base/test_internals.py
- shell: bash -l {0}
run: python -m pytest modin/test/backends/pandas/test_internals.py
- shell: bash -l {0}
run: python -m pytest modin/test/test_envvar_npartitions.py

test-defaults:
needs: [lint-commit, lint-flake8, lint-black, test-api, test-headers]
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/push.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ jobs:
run: python -m pytest modin/test/test_envvar_catcher.py
- shell: bash -l {0}
run: python -m pytest modin/test/backends/pandas/test_internals.py
- shell: bash -l {0}
run: python -m pytest modin/test/test_envvar_npartitions.py

test-defaults:
runs-on: ubuntu-latest
Expand Down
13 changes: 1 addition & 12 deletions modin/pandas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,6 @@
# Set this so that Pandas doesn't try to multithread by itself
os.environ["OMP_NUM_THREADS"] = "1"

num_cpus = 1

_is_first_update = {}
dask_client = None
_NOINIT_ENGINES = {
Expand All @@ -101,11 +99,10 @@


def _update_engine(publisher: Parameter):
global dask_client, num_cpus
global dask_client
from modin.config import Backend, CpuCount

if publisher.get() == "Ray":
import ray
from modin.engines.ray.utils import initialize_ray

# With OmniSci backend there is only a single worker per node
Expand All @@ -115,21 +112,15 @@ def _update_engine(publisher: Parameter):
os.environ["OMP_NUM_THREADS"] = str(multiprocessing.cpu_count())
if _is_first_update.get("Ray", True):
initialize_ray()
num_cpus = ray.cluster_resources()["CPU"]
elif publisher.get() == "Dask":
from distributed.client import get_client

if _is_first_update.get("Dask", True):
from modin.engines.dask.utils import initialize_dask

initialize_dask()
num_cpus = len(get_client().ncores())

elif publisher.get() == "Cloudray":
from modin.experimental.cloud import get_connection

conn = get_connection()
remote_ray = conn.modules["ray"]
if _is_first_update.get("Cloudray", True):

@conn.teleport
Expand All @@ -151,8 +142,6 @@ def init_remote_ray(partition):
import modin.data_management.factories.dispatcher # noqa: F401
else:
get_connection().modules["modin"].set_backends("Ray", Backend.get())

num_cpus = remote_ray.cluster_resources()["CPU"]
elif publisher.get() == "Cloudpython":
from modin.experimental.cloud import get_connection

Expand Down

0 comments on commit 78f7da7

Please sign in to comment.