Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Revert "[runtime env] Allow working_dir and py_module to be Path type"" #20853

Merged
merged 10 commits into from
Dec 8, 2021
3 changes: 3 additions & 0 deletions python/ray/_private/runtime_env/py_modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os
from types import ModuleType
from typing import Any, Dict, Optional
from pathlib import Path

from ray.experimental.internal_kv import _internal_kv_initialized
from ray._private.runtime_env.context import RuntimeEnvContext
Expand Down Expand Up @@ -47,6 +48,8 @@ def upload_py_modules_if_needed(
if isinstance(module, str):
# module_path is a local path or a URI.
module_path = module
elif isinstance(module, Path):
module_path = str(module)
elif isinstance(module, ModuleType):
# NOTE(edoakes): Python allows some installed Python packages to
# be split into multiple directories. We could probably handle
Expand Down
10 changes: 7 additions & 3 deletions python/ray/_private/runtime_env/working_dir.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import os
from typing import Any, Dict, Optional
from pathlib import Path

from ray._private.runtime_env.utils import RuntimeEnv
from ray.experimental.internal_kv import _internal_kv_initialized
Expand All @@ -24,10 +25,13 @@ def upload_working_dir_if_needed(
if working_dir is None:
return runtime_env

if not isinstance(working_dir, str):
if not isinstance(working_dir, str) and not isinstance(working_dir, Path):
raise TypeError(
"working_dir must be a string (either a local path or remote "
f"URI), got {type(working_dir)}.")
"working_dir must be a string or Path (either a local path "
f"or remote URI), got {type(working_dir)}.")

if isinstance(working_dir, Path):
working_dir = str(working_dir)

# working_dir is already a URI -- just pass it through.
try:
Expand Down
2 changes: 1 addition & 1 deletion python/ray/serve/tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import requests

from ray import serve
from ray.tests.test_runtime_env_working_dir import tmp_working_dir # noqa: F401, E501
from ray.tests.conftest import tmp_working_dir # noqa: F401, E501


@pytest.fixture
Expand Down
3 changes: 2 additions & 1 deletion python/ray/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,8 @@ py_test_module_list(
files = [
"test_runtime_env.py",
"test_runtime_env_working_dir.py",
"test_runtime_env_working_dir_2.py"
"test_runtime_env_working_dir_2.py",
"test_runtime_env_working_dir_remote_uri.py"
],
size = "large",
extra_srcs = SRCS,
Expand Down
26 changes: 26 additions & 0 deletions python/ray/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
import os
from contextlib import contextmanager
import pytest
import tempfile
import subprocess
import json
import time
from pathlib import Path

import ray
from ray.cluster_utils import (Cluster, AutoscalingCluster,
Expand Down Expand Up @@ -271,6 +273,30 @@ def start_cluster(ray_start_cluster, request):
yield cluster, address


@pytest.fixture(scope="function")
def tmp_working_dir():
    """Yield a temporary directory populated with sample runtime_env content.

    Layout created under the temporary directory:

    * ``hello`` -- a text file containing ``world``.
    * ``test_module/test.py`` -- defines ``one()``, which returns 1.
    * ``test_module/__init__.py`` -- re-exports ``one`` from the package.

    The yielded value is the directory path as a ``str``. The directory is
    deleted when the fixture is torn down.
    """
    with tempfile.TemporaryDirectory() as tmp_dir:
        root = Path(tmp_dir)

        # Plain data file, for tests that read from the working directory.
        (root / "hello").write_text("world")

        # Minimal importable package, for tests that import from it.
        package_dir = root / "test_module"
        package_dir.mkdir(parents=True)
        (package_dir / "test.py").write_text("def one():\n return 1\n")
        (package_dir / "__init__.py").write_text(
            "from test_module.test import one\n")

        yield tmp_dir


@pytest.fixture
def enable_pickle_debug():
os.environ["RAY_PICKLE_VERBOSE_DEBUG"] = "1"
Expand Down
160 changes: 13 additions & 147 deletions python/ray/tests/test_runtime_env_working_dir.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import tempfile

import pytest
from pytest_lazyfixture import lazy_fixture

import ray

Expand All @@ -19,34 +18,12 @@
"test_module/archive/HEAD.zip")
S3_PACKAGE_URI = "s3://runtime-env-test/test_runtime_env.zip"
GS_PACKAGE_URI = "gs://public-runtime-env-test/test_module.zip"
REMOTE_URIS = [HTTPS_PACKAGE_URI, S3_PACKAGE_URI]


@pytest.fixture(scope="function")
def tmp_working_dir():
    """Create a throwaway working directory and yield its path as a string.

    The directory contains a ``hello`` file (content ``world``) and a
    ``test_module`` package whose ``one()`` function returns 1. Everything
    is removed once the consuming test finishes.
    """
    # Relative file name -> file content, for everything the tests expect.
    layout = {
        "hello": "world",
        "test_module/test.py": "def one():\n return 1\n",
        "test_module/__init__.py": "from test_module.test import one\n",
    }

    with tempfile.TemporaryDirectory() as tmp_dir:
        base = Path(tmp_dir)
        # The package directory must exist before its files are written.
        (base / "test_module").mkdir(parents=True)

        for relative_name, content in layout.items():
            with (base / relative_name).open(mode="w") as f:
                f.write(content)

        yield tmp_dir


@pytest.mark.parametrize("option", ["failure", "working_dir", "py_modules"])
@pytest.mark.parametrize("option", [
"failure", "working_dir", "py_modules", "py_modules_path",
"working_dir_path"
])
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
def test_lazy_reads(start_cluster, tmp_working_dir, option: str):
"""Tests the case where we lazily read files or import inside a task/actor.
Expand All @@ -62,12 +39,21 @@ def call_ray_init():
ray.init(address)
elif option == "working_dir":
ray.init(address, runtime_env={"working_dir": tmp_working_dir})
elif option == "working_dir_path":
ray.init(
address, runtime_env={"working_dir": Path(tmp_working_dir)})
elif option == "py_modules":
ray.init(
address,
runtime_env={
"py_modules": [str(Path(tmp_working_dir) / "test_module")]
})
elif option == "py_modules_path":
ray.init(
address,
runtime_env={
"py_modules": [Path(tmp_working_dir) / "test_module"]
})

call_ray_init()

Expand Down Expand Up @@ -253,93 +239,6 @@ def test_input_validation(start_cluster, option: str):
ray.init(address, runtime_env={"py_modules": "."})


@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
@pytest.mark.parametrize("remote_uri", REMOTE_URIS)
@pytest.mark.parametrize("option", ["failure", "working_dir", "py_modules"])
@pytest.mark.parametrize("per_task_actor", [True, False])
def test_remote_package_uri(start_cluster, remote_uri, option, per_task_actor):
    """Check that ``test_module`` can be imported from a remote package URI.

    With ``option == "failure"`` no runtime_env is supplied and the import
    inside the task/actor is expected to raise ``ImportError``. Otherwise the
    URI is supplied as ``working_dir`` or ``py_modules``: per task/actor via
    ``.options`` when ``per_task_actor`` is true, or job-wide via
    ``ray.init`` when it is false.
    """
    cluster, address = start_cluster

    expect_failure = option == "failure"
    # Attach the env to each task/actor instead of to the whole job.
    use_per_call_env = per_task_actor and not expect_failure

    # ``env`` is deliberately left unbound for the "failure" option; every
    # use below is guarded so that it is never read in that case.
    if option == "working_dir":
        env = {"working_dir": remote_uri}
    elif option == "py_modules":
        env = {"py_modules": [remote_uri]}

    if expect_failure or per_task_actor:
        ray.init(address)
    else:
        ray.init(address, runtime_env=env)

    @ray.remote
    def test_import():
        import test_module
        return test_module.one()

    if use_per_call_env:
        test_import = test_import.options(runtime_env=env)

    if expect_failure:
        with pytest.raises(ImportError):
            ray.get(test_import.remote())
    else:
        assert ray.get(test_import.remote()) == 2

    @ray.remote
    class Actor:
        def test_import(self):
            import test_module
            return test_module.one()

    if use_per_call_env:
        Actor = Actor.options(runtime_env=env)

    actor_handle = Actor.remote()
    if expect_failure:
        with pytest.raises(ImportError):
            assert ray.get(actor_handle.test_import.remote()) == 2
    else:
        assert ray.get(actor_handle.test_import.remote()) == 2


@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
@pytest.mark.parametrize("option", ["working_dir", "py_modules"])
@pytest.mark.parametrize(
    "source", [*REMOTE_URIS, lazy_fixture("tmp_working_dir")])
def test_multi_node(start_cluster, option: str, source: str):
    """Check that the runtime_env reaches every node of a 3-node cluster.

    One single-CPU actor is created per available CPU. Each actor imports
    ``test_module`` and reports its node id; the test passes when the
    reported ids cover all ``NUM_NODES`` nodes.
    """
    NUM_NODES = 3
    cluster, address = start_cluster
    # The head node already exists, so only the remaining nodes are added.
    for node_index in range(NUM_NODES - 1):
        cluster.add_node(
            num_cpus=1,
            runtime_env_dir_name=f"node_{node_index}_runtime_resources")

    if option == "working_dir":
        runtime_env = {"working_dir": source}
    elif option == "py_modules":
        # A local source is a directory containing the package; py_modules
        # needs the path of the package directory itself.
        if source not in REMOTE_URIS:
            source = str(Path(source) / "test_module")
        runtime_env = {"py_modules": [source]}
    else:
        runtime_env = None

    # Only connect for the two recognised options.
    if runtime_env is not None:
        ray.init(address, runtime_env=runtime_env)

    @ray.remote(num_cpus=1)
    class A:
        def check_and_get_node_id(self):
            import test_module
            test_module.one()
            return ray.get_runtime_context().node_id

    cpu_count = int(ray.available_resources()["CPU"])
    # Keep the handles in a list so they stay referenced while in use.
    actors = [A.remote() for _ in range(cpu_count)]
    node_id_refs = [actor.check_and_get_node_id.remote() for actor in actors]
    distinct_node_ids = set(ray.get(node_id_refs))
    assert len(distinct_node_ids) == NUM_NODES


@pytest.mark.parametrize("option", ["working_dir", "py_modules"])
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
def test_exclusion(start_cluster, tmp_working_dir, option):
Expand Down Expand Up @@ -492,39 +391,6 @@ def get_all():
]


@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
@pytest.mark.parametrize(
    "working_dir",
    [*REMOTE_URIS, lazy_fixture("tmp_working_dir")])
def test_runtime_context(start_cluster, working_dir):
    """Check that ``working_dir`` is visible through the runtime context.

    The same check runs in the driver, in a task and in an actor. A remote
    URI must be reported unchanged; any other ``working_dir`` must be
    reported as a URI starting with ``gcs://_ray_pkg_``.
    """
    cluster, address = start_cluster
    # NOTE(review): ``address`` is unpacked but not passed to ray.init
    # here -- confirm whether connecting to ``start_cluster`` was intended.
    ray.init(runtime_env={"working_dir": working_dir})

    def verify_working_dir():
        reported = ray.get_runtime_context().runtime_env["working_dir"]
        if working_dir not in REMOTE_URIS:
            assert reported.startswith("gcs://_ray_pkg_")
        else:
            assert reported == working_dir

    # Driver process.
    verify_working_dir()

    # Remote task.
    @ray.remote
    def task():
        verify_working_dir()

    ray.get(task.remote())

    # Actor method.
    @ray.remote
    class Actor:
        def check(self):
            verify_working_dir()

    actor_handle = Actor.remote()
    ray.get(actor_handle.check.remote())


def test_override_failure(shutdown_only):
"""Tests invalid override behaviors."""
ray.init()
Expand Down
2 changes: 0 additions & 2 deletions python/ray/tests/test_runtime_env_working_dir_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@

import ray
import ray.experimental.internal_kv as kv
from ray.tests.test_runtime_env_working_dir \
import tmp_working_dir # noqa: F401
from ray._private.test_utils import wait_for_condition, chdir
from ray._private.runtime_env import RAY_WORKER_DEV_EXCLUDES
from ray._private.runtime_env.packaging import GCS_STORAGE_MAX_SIZE
Expand Down
Loading