Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[runtime env] Add garbage collection for conda envs #20072

Merged
merged 16 commits into from
Nov 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions dashboard/modules/runtime_env/runtime_env_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,13 @@ async def DeleteURIs(self, request, context):
elif plugin == "py_modules":
if not self._py_modules_manager.delete_uri(uri):
failed_uris.append(uri)
elif plugin == "conda":
if not self._conda_manager.delete_uri(uri):
failed_uris.append(uri)
else:
raise ValueError(
"RuntimeEnvAgent received DeleteURI request "
f"for unsupported plugin {plugin}. URI: {uri}")

if failed_uris:
return runtime_env_agent_pb2.DeleteURIsReply(
Expand Down
129 changes: 104 additions & 25 deletions python/ray/_private/runtime_env/conda.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import sys
import json
import logging
import yaml
import hashlib
Expand All @@ -8,15 +9,16 @@
import shutil

from filelock import FileLock
from typing import Optional, List, Dict, Any
from typing import Optional, List, Dict, Any, Set
from pathlib import Path

import ray
from ray._private.runtime_env.conda_utils import (get_conda_activate_commands,
get_or_create_conda_env)
from ray._private.runtime_env.conda_utils import (
get_conda_activate_commands, create_conda_env, delete_conda_env)
from ray._private.runtime_env.context import RuntimeEnvContext
from ray._private.utils import (get_wheel_filename, get_master_wheel_url,
get_release_wheel_url, try_to_create_directory)
from ray._private.runtime_env.packaging import Protocol, parse_uri

default_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -65,7 +67,7 @@ def _current_py_version():
return ".".join(map(str, sys.version_info[:3])) # like 3.6.10


def get_conda_dict(runtime_env, runtime_env_dir) -> Optional[Dict[Any, Any]]:
def get_conda_dict(runtime_env, resources_dir) -> Optional[Dict[Any, Any]]:
""" Construct a conda dependencies dict from a runtime env.

This function does not inject Ray or Python into the conda dict.
Expand All @@ -85,18 +87,17 @@ def get_conda_dict(runtime_env, runtime_env_dir) -> Optional[Dict[Any, Any]]:
pip_hash = hashlib.sha1(requirements_txt.encode("utf-8")).hexdigest()
pip_hash_str = f"pip-generated-{pip_hash}"

conda_dir = os.path.join(runtime_env_dir, "conda")
requirements_txt_path = os.path.join(
conda_dir, f"requirements-{pip_hash_str}.txt")
resources_dir, f"requirements-{pip_hash_str}.txt")
Comment on lines -90 to +91
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we clean this file up btw?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently no, but we should

conda_dict = {
"name": pip_hash_str,
"dependencies": ["pip", {
"pip": [f"-r {requirements_txt_path}"]
}]
}
file_lock_name = f"ray-{pip_hash_str}.lock"
with FileLock(os.path.join(runtime_env_dir, file_lock_name)):
try_to_create_directory(conda_dir)
with FileLock(os.path.join(resources_dir, file_lock_name)):
try_to_create_directory(resources_dir)
with open(requirements_txt_path, "w") as file:
file.write(requirements_txt)
return conda_dict
Expand Down Expand Up @@ -189,23 +190,97 @@ def inject_dependencies(
return conda_dict


def _get_conda_env_hash(conda_dict: Dict) -> str:
# Set `sort_keys=True` so that different orderings yield the same hash.
serialized_conda_spec = json.dumps(conda_dict, sort_keys=True)
hash = hashlib.sha1(serialized_conda_spec.encode("utf-8")).hexdigest()
return hash


def _get_pip_hash(pip_list: List[str]) -> str:
serialized_pip_spec = json.dumps(pip_list)
hash = hashlib.sha1(serialized_pip_spec.encode("utf-8")).hexdigest()
return hash


def get_uri(runtime_env: Dict) -> Optional[str]:
"""Return `"conda://<hashed_dependencies>"`, or None if no GC required."""
conda = runtime_env.get("conda")
pip = runtime_env.get("pip")
if conda is not None:
if isinstance(conda, str):
# User-preinstalled conda env. We don't garbage collect these, so
# we don't track them with URIs.
uri = None
elif isinstance(conda, dict):
uri = "conda://" + _get_conda_env_hash(conda_dict=conda)
else:
raise TypeError("conda field received by RuntimeEnvAgent must be "
f"str or dict, not {type(conda).__name__}.")
elif pip is not None:
if isinstance(pip, list):
uri = "conda://" + _get_pip_hash(pip_list=pip)
else:
raise TypeError("pip field received by RuntimeEnvAgent must be "
f"list, not {type(pip).__name__}.")
else:
uri = None
return uri


class CondaManager:
def __init__(self, resources_dir: str):
self._resources_dir = resources_dir
self._resources_dir = os.path.join(resources_dir, "conda")
if not os.path.isdir(self._resources_dir):
os.makedirs(self._resources_dir)
self._created_envs: Set[str] = set()

def _get_path_from_hash(self, hash: str) -> str:
"""Generate a path from the hash of a conda or pip spec.

The output path also functions as the name of the conda environment
when using the `--prefix` option to `conda create` and `conda remove`.

Example output:
/tmp/ray/session_2021-11-03_16-33-59_356303_41018/runtime_resources
/conda/ray-9a7972c3a75f55e976e620484f58410c920db091
"""
return os.path.join(self._resources_dir, hash)

def delete_uri(self,
uri: str,
logger: Optional[logging.Logger] = default_logger) -> bool:
logger.error(f"Got request to delete URI {uri}")
protocol, hash = parse_uri(uri)
if protocol != Protocol.CONDA:
raise ValueError(
"CondaManager can only delete URIs with protocol "
f"conda. Received protocol {protocol}, URI {uri}")

conda_env_path = self._get_path_from_hash(hash)
self._created_envs.remove(conda_env_path)
successful = delete_conda_env(prefix=conda_env_path, logger=logger)
if not successful:
logger.debug(f"Error when deleting conda env {conda_env_path}. ")
return successful

def setup(self,
runtime_env: dict,
runtime_env: Dict,
context: RuntimeEnvContext,
logger: Optional[logging.Logger] = default_logger):
if not runtime_env.get("conda") and not runtime_env.get("pip"):
return

logger.debug(f"Setting up conda or pip for runtime_env: {runtime_env}")
conda_dict = get_conda_dict(runtime_env, self._resources_dir)

if isinstance(runtime_env.get("conda"), str):
conda_env_name = runtime_env["conda"]
else:
conda_dict = get_conda_dict(runtime_env, self._resources_dir)
protocol, hash = parse_uri(get_uri(runtime_env))
conda_env_name = self._get_path_from_hash(hash)
assert conda_dict is not None

ray_pip = current_ray_pip_specifier(logger=logger)
if ray_pip:
extra_pip_dependencies = [ray_pip, "ray[default]"]
Expand All @@ -216,27 +291,31 @@ def setup(self,
extra_pip_dependencies = []
conda_dict = inject_dependencies(conda_dict, _current_py_version(),
extra_pip_dependencies)

logger.info(f"Setting up conda environment with {runtime_env}")
# It is not safe for multiple processes to install conda envs
# concurrently, even if the envs are different, so use a global
# lock for all conda installs.
# See https://github.com/ray-project/ray/issues/17086
file_lock_name = "ray-conda-install.lock"
with FileLock(os.path.join(self._resources_dir, file_lock_name)):
conda_dir = os.path.join(self._resources_dir, "conda")
try_to_create_directory(conda_dir)
conda_yaml_path = os.path.join(conda_dir, "environment.yml")
with open(conda_yaml_path, "w") as file:
# Sort keys because we hash based on the file contents,
# and we don't want the hash to depend on the order
# of the dependencies.
yaml.dump(conda_dict, file, sort_keys=True)
conda_env_name = get_or_create_conda_env(
conda_yaml_path, conda_dir, logger=logger)

if runtime_env.get("_inject_current_ray"):
conda_path = os.path.join(conda_dir, conda_env_name)
_inject_ray_to_conda_site(conda_path, logger=logger)
conda_yaml_file = os.path.join(self._resources_dir,
"environment.yml")
with open(conda_yaml_file, "w") as file:
yaml.dump(conda_dict, file)

if conda_env_name in self._created_envs:
logger.debug(f"Conda env {conda_env_name} already "
"created, skipping creation.")
else:
create_conda_env(
conda_yaml_file, prefix=conda_env_name, logger=logger)
self._created_envs.add(conda_env_name)
os.remove(conda_yaml_file)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in general we should have these cleanups in a finally block so they run even if the creation fails

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, makes sense


if runtime_env.get("_inject_current_ray"):
_inject_ray_to_conda_site(
conda_path=conda_env_name, logger=logger)

context.py_executable = "python"
context.command_prefix += get_conda_activate_commands(conda_env_name)
Expand Down
72 changes: 35 additions & 37 deletions python/ray/_private/runtime_env/conda_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,28 +58,24 @@ def get_conda_bin_executable(executable_name: str) -> str:


def _get_conda_env_name(conda_env_path: str) -> str:
conda_env_contents = open(conda_env_path).read() if conda_env_path else ""
conda_env_contents = open(conda_env_path).read()
return "ray-%s" % hashlib.sha1(
conda_env_contents.encode("utf-8")).hexdigest()


def get_or_create_conda_env(conda_env_path: str,
base_dir: Optional[str] = None,
logger: Optional[logging.Logger] = None) -> str:
def create_conda_env(conda_yaml_file: str,
prefix: str,
logger: Optional[logging.Logger] = None) -> None:
"""
Given a conda YAML, creates a conda environment containing the required
dependencies if such a conda environment doesn't already exist. Returns the
name of the conda environment, which is based on a hash of the YAML.
Given a conda YAML file and a path, creates a conda environment containing
the required dependencies.

Args:
conda_env_path: Path to a conda environment YAML file.
base_dir (str, optional): Directory to install the environment into via
the --prefix option to conda create. If not specified, will
install into the default conda directory (e.g. ~/anaconda3/envs)
Returns:
The name of the env, or the path to the env if base_dir is specified.
In either case, the return value should be valid to pass in to
`conda activate`.
conda_yaml_file (str): The path to a conda `environment.yml` file.
prefix (str): Directory to install the environment into via
the `--prefix` option to conda create. This also becomes the name
of the conda env; i.e. it can be passed into `conda activate` and
`conda remove`.
"""
if logger is None:
logger = logging.getLogger(__name__)
Expand All @@ -95,35 +91,37 @@ def get_or_create_conda_env(conda_env_path: str,
"You can also configure Ray to look for a specific "
f"Conda executable by setting the {RAY_CONDA_HOME} "
"environment variable to the path of the Conda executable.")
_, stdout, _ = exec_cmd([conda_path, "env", "list", "--json"])
envs = json.loads(stdout)["envs"]

create_cmd = None
env_name = _get_conda_env_name(conda_env_path)
if base_dir:
env_name = f"{base_dir}/{env_name}"
if env_name not in envs:
create_cmd = [
conda_path, "env", "create", "--file", conda_env_path,
"--prefix", env_name
]
else:
env_names = [os.path.basename(env) for env in envs]
if env_name not in env_names:
create_cmd = [
conda_path, "env", "create", "-n", env_name, "--file",
conda_env_path
]
create_cmd = [
conda_path, "env", "create", "--file", conda_yaml_file, "--prefix",
prefix
]

if create_cmd is not None:
logger.info(f"Creating conda environment {env_name}")
logger.info(f"Creating conda environment {prefix}")
exit_code, output = exec_cmd_stream_to_logger(create_cmd, logger)
if exit_code != 0:
shutil.rmtree(env_name)
shutil.rmtree(prefix)
raise RuntimeError(
f"Failed to install conda environment:\n{output}")
f"Failed to install conda environment {prefix}:\n{output}")


def delete_conda_env(prefix: str,
logger: Optional[logging.Logger] = None) -> bool:
if logger is None:
logger = logging.getLogger(__name__)

logger.info(f"Deleting conda environment {prefix}")

conda_path = get_conda_bin_executable("conda")
delete_cmd = [conda_path, "remove", "-p", prefix, "--all", "-y"]
exit_code, output = exec_cmd_stream_to_logger(delete_cmd, logger)
Comment on lines +117 to +118
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

out of curiosity, could we also just do rm -f the directory? or this there some special cleanup that this does?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I took a look at the source https://github.com/conda/conda/blob/master/conda/cli/main_remove.py and yeah, it looks like the main ingredient is rm rf. It looks like it edits some metadata too though, but not sure how important that is


if exit_code != 0:
logger.debug(f"Failed to delete conda environment {prefix}:\n{output}")
return False

return env_name
return True


def get_conda_env_list() -> list:
Expand Down
1 change: 1 addition & 0 deletions python/ray/_private/runtime_env/packaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def __new__(cls, value, doc=None):

GCS = "gcs", "For packages dynamically uploaded and managed by the GCS."
S3 = "s3", "Remote s3 path, assumes everything packed in one zip file."
CONDA = "conda", "For conda environments installed locally on each node."


def _xor_bytes(left: bytes, right: bytes) -> bytes:
Expand Down
8 changes: 7 additions & 1 deletion python/ray/_private/runtime_env/validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import ray
from ray._private.runtime_env.plugin import RuntimeEnvPlugin
from ray._private.utils import import_attr
from ray._private.runtime_env import conda

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -79,7 +80,7 @@ def parse_and_validate_conda(conda: Union[str, dict]) -> Union[str, dict]:

Conda can be one of three cases:
1) A dictionary describing the env. This is passed through directly.
2) A string referring to a preinstalled conda env.
2) A string referring to the name of a preinstalled conda env.
3) A string pointing to a local conda YAML file. This is detected
by looking for a '.yaml' or '.yml' suffix. In this case, the file
will be read as YAML and passed through as a dictionary.
Expand Down Expand Up @@ -354,6 +355,11 @@ def get_uris(self) -> List[str]:
if "py_modules" in self:
for uri in self["py_modules"]:
plugin_uris.append(_encode_plugin_uri("py_modules", uri))
if "conda" or "pip" in self:
uri = conda.get_uri(self)
if uri is not None:
plugin_uris.append(_encode_plugin_uri("conda", uri))

return plugin_uris

@classmethod
Expand Down
10 changes: 10 additions & 0 deletions python/ray/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,16 @@ py_test_module_list(
deps = ["//:ray_lib"],
)

py_test_module_list(
files = [
"test_runtime_env_conda.py"
],
size = "large",
extra_srcs = SRCS,
tags = ["exclusive", "post_wheel_build", "team:serve"],
deps = ["//:ray_lib"],
)

py_test(
name = "test_actor_group",
size = "medium",
Expand Down
4 changes: 2 additions & 2 deletions python/ray/tests/test_runtime_env_complicated.py
Original file line number Diff line number Diff line change
Expand Up @@ -620,8 +620,8 @@ def f():
with pytest.raises(ModuleNotFoundError):
# Ensure pip-install-test is not installed on the test machine
import pip_install_test # noqa
assert ray.get(f_pip.remote())
assert ray.get(f_conda.remote())
assert ray.get(f_pip.remote()), str(runtime_env)
assert ray.get(f_conda.remote()), str(runtime_env)


install_env_script = """
Expand Down
Loading