diff --git a/dashboard/modules/runtime_env/runtime_env_agent.py b/dashboard/modules/runtime_env/runtime_env_agent.py index 2ee74085d06c..c28342c6b1ac 100644 --- a/dashboard/modules/runtime_env/runtime_env_agent.py +++ b/dashboard/modules/runtime_env/runtime_env_agent.py @@ -212,6 +212,13 @@ async def DeleteURIs(self, request, context): elif plugin == "py_modules": if not self._py_modules_manager.delete_uri(uri): failed_uris.append(uri) + elif plugin == "conda": + if not self._conda_manager.delete_uri(uri): + failed_uris.append(uri) + else: + raise ValueError( + "RuntimeEnvAgent received DeleteURI request " + f"for unsupported plugin {plugin}. URI: {uri}") if failed_uris: return runtime_env_agent_pb2.DeleteURIsReply( diff --git a/python/ray/_private/runtime_env/conda.py b/python/ray/_private/runtime_env/conda.py index 92bc3d8cb113..3ff5d31f1f2c 100644 --- a/python/ray/_private/runtime_env/conda.py +++ b/python/ray/_private/runtime_env/conda.py @@ -1,5 +1,6 @@ import os import sys +import json import logging import yaml import hashlib @@ -8,15 +9,16 @@ import shutil from filelock import FileLock -from typing import Optional, List, Dict, Any +from typing import Optional, List, Dict, Any, Set from pathlib import Path import ray -from ray._private.runtime_env.conda_utils import (get_conda_activate_commands, - get_or_create_conda_env) +from ray._private.runtime_env.conda_utils import ( + get_conda_activate_commands, create_conda_env, delete_conda_env) from ray._private.runtime_env.context import RuntimeEnvContext from ray._private.utils import (get_wheel_filename, get_master_wheel_url, get_release_wheel_url, try_to_create_directory) +from ray._private.runtime_env.packaging import Protocol, parse_uri default_logger = logging.getLogger(__name__) @@ -65,7 +67,7 @@ def _current_py_version(): return ".".join(map(str, sys.version_info[:3])) # like 3.6.10 -def get_conda_dict(runtime_env, runtime_env_dir) -> Optional[Dict[Any, Any]]: +def get_conda_dict(runtime_env, 
resources_dir) -> Optional[Dict[Any, Any]]: """ Construct a conda dependencies dict from a runtime env. This function does not inject Ray or Python into the conda dict. @@ -85,9 +87,8 @@ def get_conda_dict(runtime_env, runtime_env_dir) -> Optional[Dict[Any, Any]]: pip_hash = hashlib.sha1(requirements_txt.encode("utf-8")).hexdigest() pip_hash_str = f"pip-generated-{pip_hash}" - conda_dir = os.path.join(runtime_env_dir, "conda") requirements_txt_path = os.path.join( - conda_dir, f"requirements-{pip_hash_str}.txt") + resources_dir, f"requirements-{pip_hash_str}.txt") conda_dict = { "name": pip_hash_str, "dependencies": ["pip", { @@ -95,8 +96,8 @@ def get_conda_dict(runtime_env, runtime_env_dir) -> Optional[Dict[Any, Any]]: }] } file_lock_name = f"ray-{pip_hash_str}.lock" - with FileLock(os.path.join(runtime_env_dir, file_lock_name)): - try_to_create_directory(conda_dir) + with FileLock(os.path.join(resources_dir, file_lock_name)): + try_to_create_directory(resources_dir) with open(requirements_txt_path, "w") as file: file.write(requirements_txt) return conda_dict @@ -189,23 +190,97 @@ def inject_dependencies( return conda_dict +def _get_conda_env_hash(conda_dict: Dict) -> str: + # Set `sort_keys=True` so that different orderings yield the same hash. + serialized_conda_spec = json.dumps(conda_dict, sort_keys=True) + hash = hashlib.sha1(serialized_conda_spec.encode("utf-8")).hexdigest() + return hash + + +def _get_pip_hash(pip_list: List[str]) -> str: + serialized_pip_spec = json.dumps(pip_list) + hash = hashlib.sha1(serialized_pip_spec.encode("utf-8")).hexdigest() + return hash + + +def get_uri(runtime_env: Dict) -> Optional[str]: + """Return `"conda://"`, or None if no GC required.""" + conda = runtime_env.get("conda") + pip = runtime_env.get("pip") + if conda is not None: + if isinstance(conda, str): + # User-preinstalled conda env. We don't garbage collect these, so + # we don't track them with URIs. 
def delete_uri(self,
               uri: str,
               logger: Optional[logging.Logger] = default_logger) -> bool:
    """Delete the conda environment that a conda URI refers to.

    Args:
        uri: URI to delete. It is split by `parse_uri` into a protocol and
            a hash; the hash is mapped to an env path with
            `_get_path_from_hash`.
        logger: Logger used for progress and failure messages.

    Returns:
        The result of `delete_conda_env` for the env path: True when the
        deletion succeeded, False otherwise.

    Raises:
        ValueError: If the URI's protocol is not `Protocol.CONDA`.
    """
    # A delete request is routine, so it is not logged as an error.
    logger.info(f"Got request to delete URI {uri}")
    protocol, env_hash = parse_uri(uri)
    if protocol != Protocol.CONDA:
        raise ValueError(
            "CondaManager can only delete URIs with protocol "
            f"conda. Received protocol {protocol}, URI {uri}")

    conda_env_path = self._get_path_from_hash(env_hash)
    # `discard` rather than `remove`: a URI whose env was never recorded by
    # this manager must not raise KeyError before the deletion is attempted.
    self._created_envs.discard(conda_env_path)
    successful = delete_conda_env(prefix=conda_env_path, logger=logger)
    if not successful:
        # A failed deletion leaves files behind, so make it visible.
        logger.warning(f"Error when deleting conda env {conda_env_path}. ")
    return successful
- yaml.dump(conda_dict, file, sort_keys=True) - conda_env_name = get_or_create_conda_env( - conda_yaml_path, conda_dir, logger=logger) - - if runtime_env.get("_inject_current_ray"): - conda_path = os.path.join(conda_dir, conda_env_name) - _inject_ray_to_conda_site(conda_path, logger=logger) + conda_yaml_file = os.path.join(self._resources_dir, + "environment.yml") + with open(conda_yaml_file, "w") as file: + yaml.dump(conda_dict, file) + + if conda_env_name in self._created_envs: + logger.debug(f"Conda env {conda_env_name} already " + "created, skipping creation.") + else: + create_conda_env( + conda_yaml_file, prefix=conda_env_name, logger=logger) + self._created_envs.add(conda_env_name) + os.remove(conda_yaml_file) + + if runtime_env.get("_inject_current_ray"): + _inject_ray_to_conda_site( + conda_path=conda_env_name, logger=logger) context.py_executable = "python" context.command_prefix += get_conda_activate_commands(conda_env_name) diff --git a/python/ray/_private/runtime_env/conda_utils.py b/python/ray/_private/runtime_env/conda_utils.py index 4b58944e6aa5..23dc45b7bbec 100644 --- a/python/ray/_private/runtime_env/conda_utils.py +++ b/python/ray/_private/runtime_env/conda_utils.py @@ -58,28 +58,24 @@ def get_conda_bin_executable(executable_name: str) -> str: def _get_conda_env_name(conda_env_path: str) -> str: - conda_env_contents = open(conda_env_path).read() if conda_env_path else "" + conda_env_contents = open(conda_env_path).read() return "ray-%s" % hashlib.sha1( conda_env_contents.encode("utf-8")).hexdigest() -def get_or_create_conda_env(conda_env_path: str, - base_dir: Optional[str] = None, - logger: Optional[logging.Logger] = None) -> str: +def create_conda_env(conda_yaml_file: str, + prefix: str, + logger: Optional[logging.Logger] = None) -> None: """ - Given a conda YAML, creates a conda environment containing the required - dependencies if such a conda environment doesn't already exist. 
Returns the - name of the conda environment, which is based on a hash of the YAML. + Given a conda YAML file and a path, creates a conda environment containing + the required dependencies. Args: - conda_env_path: Path to a conda environment YAML file. - base_dir (str, optional): Directory to install the environment into via - the --prefix option to conda create. If not specified, will - install into the default conda directory (e.g. ~/anaconda3/envs) - Returns: - The name of the env, or the path to the env if base_dir is specified. - In either case, the return value should be valid to pass in to - `conda activate`. + conda_yaml_file (str): The path to a conda `environment.yml` file. + prefix (str): Directory to install the environment into via + the `--prefix` option to conda create. This also becomes the name + of the conda env; i.e. it can be passed into `conda activate` and + `conda remove`. """ if logger is None: logger = logging.getLogger(__name__) @@ -95,35 +91,37 @@ def get_or_create_conda_env(conda_env_path: str, "You can also configure Ray to look for a specific " f"Conda executable by setting the {RAY_CONDA_HOME} " "environment variable to the path of the Conda executable.") - _, stdout, _ = exec_cmd([conda_path, "env", "list", "--json"]) - envs = json.loads(stdout)["envs"] - create_cmd = None - env_name = _get_conda_env_name(conda_env_path) - if base_dir: - env_name = f"{base_dir}/{env_name}" - if env_name not in envs: - create_cmd = [ - conda_path, "env", "create", "--file", conda_env_path, - "--prefix", env_name - ] - else: - env_names = [os.path.basename(env) for env in envs] - if env_name not in env_names: - create_cmd = [ - conda_path, "env", "create", "-n", env_name, "--file", - conda_env_path - ] + create_cmd = [ + conda_path, "env", "create", "--file", conda_yaml_file, "--prefix", + prefix + ] if create_cmd is not None: - logger.info(f"Creating conda environment {env_name}") + logger.info(f"Creating conda environment {prefix}") exit_code, output 
def delete_conda_env(prefix: str,
                     logger: Optional[logging.Logger] = None) -> bool:
    """Remove the conda environment installed at `prefix`.

    Runs `conda remove -p <prefix> --all -y` through
    `exec_cmd_stream_to_logger`.

    Args:
        prefix: Path of the conda environment, passed to conda's `-p` option.
        logger: Logger that receives progress output. Defaults to this
            module's logger when None.

    Returns:
        True if the command exited with code 0, False otherwise.
    """
    log = logger if logger is not None else logging.getLogger(__name__)
    log.info(f"Deleting conda environment {prefix}")

    conda_executable = get_conda_bin_executable("conda")
    command = [conda_executable, "remove", "-p", prefix, "--all", "-y"]
    exit_code, output = exec_cmd_stream_to_logger(command, log)

    succeeded = exit_code == 0
    if not succeeded:
        log.debug(f"Failed to delete conda environment {prefix}:\n{output}")
    return succeeded
def _xor_bytes(left: bytes, right: bytes) -> bytes: diff --git a/python/ray/_private/runtime_env/validation.py b/python/ray/_private/runtime_env/validation.py index 32c3cfa42dd4..ba3aa25facd0 100644 --- a/python/ray/_private/runtime_env/validation.py +++ b/python/ray/_private/runtime_env/validation.py @@ -10,6 +10,7 @@ import ray from ray._private.runtime_env.plugin import RuntimeEnvPlugin from ray._private.utils import import_attr +from ray._private.runtime_env import conda logger = logging.getLogger(__name__) @@ -79,7 +80,7 @@ def parse_and_validate_conda(conda: Union[str, dict]) -> Union[str, dict]: Conda can be one of three cases: 1) A dictionary describing the env. This is passed through directly. - 2) A string referring to a preinstalled conda env. + 2) A string referring to the name of a preinstalled conda env. 3) A string pointing to a local conda YAML file. This is detected by looking for a '.yaml' or '.yml' suffix. In this case, the file will be read as YAML and passed through as a dictionary. 
@@ -354,6 +355,11 @@ def get_uris(self) -> List[str]: if "py_modules" in self: for uri in self["py_modules"]: plugin_uris.append(_encode_plugin_uri("py_modules", uri)) + if "conda" or "pip" in self: + uri = conda.get_uri(self) + if uri is not None: + plugin_uris.append(_encode_plugin_uri("conda", uri)) + return plugin_uris @classmethod diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index 4db5d7e25ba0..55b6a6983f2e 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -251,6 +251,16 @@ py_test_module_list( deps = ["//:ray_lib"], ) +py_test_module_list( + files = [ + "test_runtime_env_conda.py" + ], + size = "large", + extra_srcs = SRCS, + tags = ["exclusive", "post_wheel_build", "team:serve"], + deps = ["//:ray_lib"], +) + py_test( name = "test_actor_group", size = "medium", diff --git a/python/ray/tests/test_runtime_env_complicated.py b/python/ray/tests/test_runtime_env_complicated.py index a7db638ba874..f3d54b30cdd8 100644 --- a/python/ray/tests/test_runtime_env_complicated.py +++ b/python/ray/tests/test_runtime_env_complicated.py @@ -620,8 +620,8 @@ def f(): with pytest.raises(ModuleNotFoundError): # Ensure pip-install-test is not installed on the test machine import pip_install_test # noqa - assert ray.get(f_pip.remote()) - assert ray.get(f_conda.remote()) + assert ray.get(f_pip.remote()), str(runtime_env) + assert ray.get(f_conda.remote()), str(runtime_env) install_env_script = """ diff --git a/python/ray/tests/test_runtime_env_conda.py b/python/ray/tests/test_runtime_env_conda.py new file mode 100644 index 000000000000..12b1675e930d --- /dev/null +++ b/python/ray/tests/test_runtime_env_conda.py @@ -0,0 +1,140 @@ +import os +import pytest +import sys +from ray._private.test_utils import wait_for_condition +import yaml + +import ray + +if not os.environ.get("CI"): + # This flags turns on the local development that link against current ray + # packages and fall back all the dependencies to current python's site. 
def check_local_files_gced(cluster):
    """Return True if every node's "conda" runtime-env directory is clean.

    A directory counts as clean when each entry in it ends with ".lock" or
    ".txt". If any other entry is found, the directory listing is printed
    and False is returned.

    Args:
        cluster: Object whose `list_all_nodes()` yields nodes that provide
            `get_runtime_env_dir_path()`.
    """
    # `str.endswith` takes alternative suffixes as ONE tuple argument; a
    # second positional argument is the `start` index and must be an int.
    ignored_suffixes = (".lock", ".txt")
    for node in cluster.list_all_nodes():
        for subdir in ["conda"]:
            all_files = os.listdir(
                os.path.join(node.get_runtime_env_dir_path(), subdir))
            # Check that there are no files remaining except for .lock files
            # and generated requirements.txt files.
            # TODO(architkulkarni): these files should get cleaned up too!
            leftovers = [
                f for f in all_files if not f.endswith(ignored_suffixes)
            ]
            if leftovers:
                print(str(all_files))
                return False

    return True
@pytest.mark.skipif(
    os.environ.get("CI") and sys.platform != "linux",
    reason="Requires PR wheels built in CI, so only run on linux CI machines.")
@pytest.mark.parametrize("field", ["conda", "pip"])
@pytest.mark.parametrize("spec_format", ["file", "python_object"])
def test_detached_actor_gc(start_cluster, field, spec_format, tmp_path):
    """Tests that a detached actor's conda env is GC'd only when it exits."""
    cluster, address = start_cluster

    # Connect with a job-level runtime env built from the parametrized
    # field ("conda" or "pip") and spec format ("file" or "python_object").
    ray.init(
        address,
        runtime_env=generate_runtime_env_dict(field, spec_format, tmp_path))

    @ray.remote
    class A:
        def test_import(self):
            import pip_install_test  # noqa: F401
            return True

    # Start a named, detached actor and wait for one method call to finish,
    # so the import inside the runtime env has actually run.
    a = A.options(name="test", lifetime="detached").remote()
    ray.get(a.test_import.remote())

    # While the job is connected, files are expected in the conda directory.
    assert not check_local_files_gced(cluster)

    # Disconnect this driver and reconnect without a runtime_env.
    ray.shutdown()
    ray.init(address, namespace="test")

    # The env files must still be present after the first driver has gone.
    assert not check_local_files_gced(cluster)

    # Look the actor up by name and confirm it can still import the package.
    a = ray.get_actor("test")
    assert ray.get(a.test_import.remote())

    ray.kill(a)

    # After the actor is killed, wait up to 30s for the conda directory to
    # contain only .lock/.txt files.
    wait_for_condition(lambda: check_local_files_gced(cluster), timeout=30)
a/src/ray/common/runtime_env_manager.cc b/src/ray/common/runtime_env_manager.cc index 3e2969455297..f04eb846fb49 100644 --- a/src/ray/common/runtime_env_manager.cc +++ b/src/ray/common/runtime_env_manager.cc @@ -23,6 +23,7 @@ void RuntimeEnvManager::AddURIReference(const std::string &hex_id, for (const auto &uri : uris) { uri_reference_[uri]++; id_to_uris_[hex_id].push_back(uri); + RAY_LOG(DEBUG) << "Added URI Reference " << uri; } } @@ -44,6 +45,7 @@ void RuntimeEnvManager::RemoveURIReference(const std::string &hex_id) { RAY_CHECK(ref_count >= 0); if (ref_count == 0) { uri_reference_.erase(uri); + RAY_LOG(DEBUG) << "Deleting URI Reference " << uri; deleter_(uri, [](bool success) {}); } }