diff --git a/python/ray/_private/runtime_env/py_modules.py b/python/ray/_private/runtime_env/py_modules.py index e8fa2bd8f754..947c08b6062f 100644 --- a/python/ray/_private/runtime_env/py_modules.py +++ b/python/ray/_private/runtime_env/py_modules.py @@ -2,6 +2,7 @@ import os from types import ModuleType from typing import Any, Dict, Optional +from pathlib import Path from ray.experimental.internal_kv import _internal_kv_initialized from ray._private.runtime_env.context import RuntimeEnvContext @@ -47,6 +48,8 @@ def upload_py_modules_if_needed( if isinstance(module, str): # module_path is a local path or a URI. module_path = module + elif isinstance(module, Path): + module_path = str(module) elif isinstance(module, ModuleType): # NOTE(edoakes): Python allows some installed Python packages to # be split into multiple directories. We could probably handle diff --git a/python/ray/_private/runtime_env/working_dir.py b/python/ray/_private/runtime_env/working_dir.py index d134264c0932..140cadaa4b14 100644 --- a/python/ray/_private/runtime_env/working_dir.py +++ b/python/ray/_private/runtime_env/working_dir.py @@ -1,6 +1,7 @@ import logging import os from typing import Any, Dict, Optional +from pathlib import Path from ray._private.runtime_env.utils import RuntimeEnv from ray.experimental.internal_kv import _internal_kv_initialized @@ -24,10 +25,13 @@ def upload_working_dir_if_needed( if working_dir is None: return runtime_env - if not isinstance(working_dir, str): + if not isinstance(working_dir, str) and not isinstance(working_dir, Path): raise TypeError( - "working_dir must be a string (either a local path or remote " - f"URI), got {type(working_dir)}.") + "working_dir must be a string or Path (either a local path " + f"or remote URI), got {type(working_dir)}.") + + if isinstance(working_dir, Path): + working_dir = str(working_dir) # working_dir is already a URI -- just pass it through. try: diff --git a/python/ray/serve/tests/test_cli.py b/python/ray/serve/tests/test_cli.py index fb46271d93bd..df7fca226bf5 100644 --- a/python/ray/serve/tests/test_cli.py +++ b/python/ray/serve/tests/test_cli.py @@ -8,7 +8,7 @@ import requests from ray import serve -from ray.tests.test_runtime_env_working_dir import tmp_working_dir # noqa: F401, E501 +from ray.tests.conftest import tmp_working_dir # noqa: F401, E501 @pytest.fixture diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index 126939427ae4..148b830c74bb 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -249,7 +249,8 @@ py_test_module_list( files = [ "test_runtime_env.py", "test_runtime_env_working_dir.py", - "test_runtime_env_working_dir_2.py" + "test_runtime_env_working_dir_2.py", + "test_runtime_env_working_dir_remote_uri.py" ], size = "large", extra_srcs = SRCS, diff --git a/python/ray/tests/conftest.py b/python/ray/tests/conftest.py index 843534d044f3..063762771170 100644 --- a/python/ray/tests/conftest.py +++ b/python/ray/tests/conftest.py @@ -4,9 +4,11 @@ import os from contextlib import contextmanager import pytest +import tempfile import subprocess import json import time +from pathlib import Path import ray from ray.cluster_utils import (Cluster, AutoscalingCluster, @@ -271,6 +273,30 @@ def start_cluster(ray_start_cluster, request): yield cluster, address +@pytest.fixture(scope="function") +def tmp_working_dir(): + with tempfile.TemporaryDirectory() as tmp_dir: + path = Path(tmp_dir) + + hello_file = path / "hello" + with hello_file.open(mode="w") as f: + f.write("world") + + module_path = path / "test_module" + module_path.mkdir(parents=True) + + test_file = module_path / "test.py" + with test_file.open(mode="w") as f: + f.write("def one():\n") + f.write(" return 1\n") + + init_file = module_path / "__init__.py" + with init_file.open(mode="w") as f: + f.write("from test_module.test import one\n") + + yield tmp_dir + + @pytest.fixture def enable_pickle_debug(): os.environ["RAY_PICKLE_VERBOSE_DEBUG"] = "1" diff --git a/python/ray/tests/test_runtime_env_working_dir.py b/python/ray/tests/test_runtime_env_working_dir.py index 350ec14d0f25..058cb7eb81c1 100644 --- a/python/ray/tests/test_runtime_env_working_dir.py +++ b/python/ray/tests/test_runtime_env_working_dir.py @@ -5,7 +5,6 @@ import tempfile import pytest -from pytest_lazyfixture import lazy_fixture import ray @@ -19,34 +18,12 @@ "test_module/archive/HEAD.zip") S3_PACKAGE_URI = "s3://runtime-env-test/test_runtime_env.zip" GS_PACKAGE_URI = "gs://public-runtime-env-test/test_module.zip" -REMOTE_URIS = [HTTPS_PACKAGE_URI, S3_PACKAGE_URI] -@pytest.fixture(scope="function") -def tmp_working_dir(): - with tempfile.TemporaryDirectory() as tmp_dir: - path = Path(tmp_dir) - - hello_file = path / "hello" - with hello_file.open(mode="w") as f: - f.write("world") - - module_path = path / "test_module" - module_path.mkdir(parents=True) - - test_file = module_path / "test.py" - with test_file.open(mode="w") as f: - f.write("def one():\n") - f.write(" return 1\n") - - init_file = module_path / "__init__.py" - with init_file.open(mode="w") as f: - f.write("from test_module.test import one\n") - - yield tmp_dir - - -@pytest.mark.parametrize("option", ["failure", "working_dir", "py_modules"]) +@pytest.mark.parametrize("option", [ + "failure", "working_dir", "py_modules", "py_modules_path", + "working_dir_path" +]) @pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.") def test_lazy_reads(start_cluster, tmp_working_dir, option: str): """Tests the case where we lazily read files or import inside a task/actor. @@ -62,12 +39,21 @@ def call_ray_init(): ray.init(address) elif option == "working_dir": ray.init(address, runtime_env={"working_dir": tmp_working_dir}) + elif option == "working_dir_path": + ray.init( + address, runtime_env={"working_dir": Path(tmp_working_dir)}) elif option == "py_modules": ray.init( address, runtime_env={ "py_modules": [str(Path(tmp_working_dir) / "test_module")] }) + elif option == "py_modules_path": + ray.init( + address, + runtime_env={ + "py_modules": [Path(tmp_working_dir) / "test_module"] + }) call_ray_init() @@ -253,93 +239,6 @@ def test_input_validation(start_cluster, option: str): ray.init(address, runtime_env={"py_modules": "."}) -@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.") -@pytest.mark.parametrize("remote_uri", REMOTE_URIS) -@pytest.mark.parametrize("option", ["failure", "working_dir", "py_modules"]) -@pytest.mark.parametrize("per_task_actor", [True, False]) -def test_remote_package_uri(start_cluster, remote_uri, option, per_task_actor): - """Tests the case where we lazily read files or import inside a task/actor. - - In this case, the files come from a remote location. - - This tests both that this fails *without* the working_dir and that it - passes with it. - """ - cluster, address = start_cluster - - if option == "working_dir": - env = {"working_dir": remote_uri} - elif option == "py_modules": - env = {"py_modules": [remote_uri]} - - if option == "failure" or per_task_actor: - ray.init(address) - else: - ray.init(address, runtime_env=env) - - @ray.remote - def test_import(): - import test_module - return test_module.one() - - if option != "failure" and per_task_actor: - test_import = test_import.options(runtime_env=env) - - if option == "failure": - with pytest.raises(ImportError): - ray.get(test_import.remote()) - else: - assert ray.get(test_import.remote()) == 2 - - @ray.remote - class Actor: - def test_import(self): - import test_module - return test_module.one() - - if option != "failure" and per_task_actor: - Actor = Actor.options(runtime_env=env) - - a = Actor.remote() - if option == "failure": - with pytest.raises(ImportError): - assert ray.get(a.test_import.remote()) == 2 - else: - assert ray.get(a.test_import.remote()) == 2 - - -@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.") -@pytest.mark.parametrize("option", ["working_dir", "py_modules"]) -@pytest.mark.parametrize( - "source", [*REMOTE_URIS, lazy_fixture("tmp_working_dir")]) -def test_multi_node(start_cluster, option: str, source: str): - """Tests that the working_dir is propagated across multi-node clusters.""" - NUM_NODES = 3 - cluster, address = start_cluster - for i in range(NUM_NODES - 1): # Head node already added. - cluster.add_node( - num_cpus=1, runtime_env_dir_name=f"node_{i}_runtime_resources") - - if option == "working_dir": - ray.init(address, runtime_env={"working_dir": source}) - elif option == "py_modules": - if source not in REMOTE_URIS: - source = str(Path(source) / "test_module") - ray.init(address, runtime_env={"py_modules": [source]}) - - @ray.remote(num_cpus=1) - class A: - def check_and_get_node_id(self): - import test_module - test_module.one() - return ray.get_runtime_context().node_id - - num_cpus = int(ray.available_resources()["CPU"]) - actors = [A.remote() for _ in range(num_cpus)] - object_refs = [a.check_and_get_node_id.remote() for a in actors] - assert len(set(ray.get(object_refs))) == NUM_NODES - - @pytest.mark.parametrize("option", ["working_dir", "py_modules"]) @pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.") def test_exclusion(start_cluster, tmp_working_dir, option): @@ -492,39 +391,6 @@ def get_all(): ] -@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.") -@pytest.mark.parametrize( - "working_dir", - [*REMOTE_URIS, lazy_fixture("tmp_working_dir")]) -def test_runtime_context(start_cluster, working_dir): - """Tests that the working_dir is propagated in the runtime_context.""" - cluster, address = start_cluster - ray.init(runtime_env={"working_dir": working_dir}) - - def check(): - wd = ray.get_runtime_context().runtime_env["working_dir"] - if working_dir in REMOTE_URIS: - assert wd == working_dir - else: - assert wd.startswith("gcs://_ray_pkg_") - - check() - - @ray.remote - def task(): - check() - - ray.get(task.remote()) - - @ray.remote - class Actor: - def check(self): - check() - - a = Actor.remote() - ray.get(a.check.remote()) - - def test_override_failure(shutdown_only): """Tests invalid override behaviors.""" ray.init() diff --git a/python/ray/tests/test_runtime_env_working_dir_2.py b/python/ray/tests/test_runtime_env_working_dir_2.py index c19a07849b42..c8d32c0afc60 100644 --- a/python/ray/tests/test_runtime_env_working_dir_2.py +++ b/python/ray/tests/test_runtime_env_working_dir_2.py @@ -9,8 +9,6 @@ import ray import ray.experimental.internal_kv as kv -from ray.tests.test_runtime_env_working_dir \ - import tmp_working_dir # noqa: F401 from ray._private.test_utils import wait_for_condition, chdir from ray._private.runtime_env import RAY_WORKER_DEV_EXCLUDES from ray._private.runtime_env.packaging import GCS_STORAGE_MAX_SIZE diff --git a/python/ray/tests/test_runtime_env_working_dir_remote_uri.py b/python/ray/tests/test_runtime_env_working_dir_remote_uri.py new file mode 100644 index 000000000000..1186e61b967e --- /dev/null +++ b/python/ray/tests/test_runtime_env_working_dir_remote_uri.py @@ -0,0 +1,143 @@ +from pathlib import Path +import sys + +import pytest +from pytest_lazyfixture import lazy_fixture + +import ray + +# This test requires you have AWS credentials set up (any AWS credentials will +# do, this test only accesses a public bucket). + +# This package contains a subdirectory called `test_module`. +# Calling `test_module.one()` should return `2`. +# If you find that confusing, take it up with @jiaodong... +HTTPS_PACKAGE_URI = ("https://github.com/shrekris-anyscale/" + "test_module/archive/HEAD.zip") +S3_PACKAGE_URI = "s3://runtime-env-test/test_runtime_env.zip" +GS_PACKAGE_URI = "gs://public-runtime-env-test/test_module.zip" +REMOTE_URIS = [HTTPS_PACKAGE_URI, S3_PACKAGE_URI] + + +@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.") +@pytest.mark.parametrize("remote_uri", REMOTE_URIS) +@pytest.mark.parametrize("option", ["failure", "working_dir", "py_modules"]) +@pytest.mark.parametrize("per_task_actor", [True, False]) +def test_remote_package_uri(start_cluster, remote_uri, option, per_task_actor): + """Tests the case where we lazily read files or import inside a task/actor. + + In this case, the files come from a remote location. + + This tests both that this fails *without* the working_dir and that it + passes with it. + """ + cluster, address = start_cluster + + if option == "working_dir": + env = {"working_dir": remote_uri} + elif option == "py_modules": + env = {"py_modules": [remote_uri]} + + if option == "failure" or per_task_actor: + ray.init(address) + else: + ray.init(address, runtime_env=env) + + @ray.remote + def test_import(): + import test_module + return test_module.one() + + if option != "failure" and per_task_actor: + test_import = test_import.options(runtime_env=env) + + if option == "failure": + with pytest.raises(ImportError): + ray.get(test_import.remote()) + else: + assert ray.get(test_import.remote()) == 2 + + @ray.remote + class Actor: + def test_import(self): + import test_module + return test_module.one() + + if option != "failure" and per_task_actor: + Actor = Actor.options(runtime_env=env) + + a = Actor.remote() + if option == "failure": + with pytest.raises(ImportError): + assert ray.get(a.test_import.remote()) == 2 + else: + assert ray.get(a.test_import.remote()) == 2 + + +@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.") +@pytest.mark.parametrize("option", ["working_dir", "py_modules"]) +@pytest.mark.parametrize( + "source", [*REMOTE_URIS, lazy_fixture("tmp_working_dir")]) +def test_multi_node(start_cluster, option: str, source: str): + """Tests that the working_dir is propagated across multi-node clusters.""" + NUM_NODES = 3 + cluster, address = start_cluster + for i in range(NUM_NODES - 1): # Head node already added. + cluster.add_node( + num_cpus=1, runtime_env_dir_name=f"node_{i}_runtime_resources") + + if option == "working_dir": + ray.init(address, runtime_env={"working_dir": source}) + elif option == "py_modules": + if source not in REMOTE_URIS: + source = str(Path(source) / "test_module") + ray.init(address, runtime_env={"py_modules": [source]}) + + @ray.remote(num_cpus=1) + class A: + def check_and_get_node_id(self): + import test_module + test_module.one() + return ray.get_runtime_context().node_id + + num_cpus = int(ray.available_resources()["CPU"]) + actors = [A.remote() for _ in range(num_cpus)] + object_refs = [a.check_and_get_node_id.remote() for a in actors] + assert len(set(ray.get(object_refs))) == NUM_NODES + + +@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.") +@pytest.mark.parametrize( + "working_dir", + [*REMOTE_URIS, lazy_fixture("tmp_working_dir")]) +def test_runtime_context(start_cluster, working_dir): + """Tests that the working_dir is propagated in the runtime_context.""" + cluster, address = start_cluster + ray.init(runtime_env={"working_dir": working_dir}) + + def check(): + wd = ray.get_runtime_context().runtime_env["working_dir"] + if working_dir in REMOTE_URIS: + assert wd == working_dir + else: + assert wd.startswith("gcs://_ray_pkg_") + + check() + + @ray.remote + def task(): + check() + + ray.get(task.remote()) + + @ray.remote + class Actor: + def check(self): + check() + + a = Actor.remote() + ray.get(a.check.remote()) + + +if __name__ == "__main__": + sys.exit(pytest.main(["-sv", __file__]))