[air] Add annotation for Tune module. (ray-project#27060)
Co-authored-by: Kai Fricke <[email protected]>
Signed-off-by: Stefan van der Kleij <[email protected]>
2 people authored and Stefan van der Kleij committed Aug 18, 2022
1 parent 8ba407f commit 5f25a98
Showing 51 changed files with 267 additions and 273 deletions.
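For context, Ray marks its user-facing symbols with decorators from `ray.util.annotations`, and anything meant to stay internal gets a leading underscore so the annotation lint ignores it. A minimal sketch of the convention this commit rolls out across Tune (the function names below are illustrative, not taken from the diff):

```python
from ray.util.annotations import DeveloperAPI, PublicAPI


@PublicAPI
def stable_entry_point(config: dict) -> None:
    """Documented, stable API that end users may rely on."""


@DeveloperAPI
def integration_hook(spec: dict) -> None:
    """Exposed for advanced users and integrations; may change between releases."""


def _internal_helper(spec: dict) -> None:
    """Leading underscore: private helper, exempt from the annotation check."""
```

Most of the diff below is the second half of that convention: internal helpers such as `find_newest_experiment_checkpoint` and `make_parser` gain an underscore prefix, and their call sites are updated to match.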
1 change: 1 addition & 0 deletions ci/lint/check_api_annotations.py
@@ -98,6 +98,7 @@ def verify(symbol, scanned, ok, output, prefix=None, ignore=None):
verify(ray.rllib, set(), ok, output)
verify(ray.air, set(), ok, output)
verify(ray.train, set(), ok, output)
verify(ray.tune, set(), ok, output)
verify(ray, set(), ok, output, ignore=["ray.workflow", "ray.tune", "ray.serve"])
verify(ray.serve, set(), ok, output)
assert len(ok) >= 500, len(ok)
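The check walks a module's public attributes and flags any class or function that is neither underscore-prefixed nor carrying an API annotation, so the new `verify(ray.tune, ...)` line forces every symbol reachable from `ray.tune` to declare its status. A rough, illustrative sketch of that kind of traversal (not the actual implementation; the `_annotated` marker is an assumption about how Ray's decorators tag objects):

```python
import inspect


def check_module(module, failures, prefix=None):
    """Illustrative sketch of an API-annotation check, not Ray's real verify()."""
    prefix = prefix or module.__name__ + "."
    for name in dir(module):
        if name.startswith("_"):
            continue  # private names are exempt
        obj = getattr(module, name)
        if inspect.isclass(obj) or inspect.isfunction(obj):
            # Assumption: @PublicAPI / @DeveloperAPI leave a marker attribute
            # on the decorated object.
            if not getattr(obj, "_annotated", None):
                failures.append(prefix + name)
```

With the extra `verify` call in place, an unannotated public helper anywhere under `ray.tune` now fails this lint job, which is why the rest of the commit renames so many helpers to underscore-prefixed names.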
3 changes: 0 additions & 3 deletions doc/source/tune/api_docs/env.rst
@@ -6,9 +6,6 @@ Environment variables
Some of Ray Tune's behavior can be configured using environment variables.
These are the environment variables Ray Tune currently considers:

* **TUNE_CLUSTER_SSH_KEY**: SSH key used by the Tune driver process to connect
to remote cluster machines for checkpoint syncing. If this is not set,
``~/ray_bootstrap_key.pem`` will be used.
* **TUNE_DISABLE_AUTO_CALLBACK_LOGGERS**: Ray Tune automatically adds a CSV and
JSON logger callback if they haven't been passed. Setting this variable to
`1` disables this automatic creation. Please note that this will most likely
10 changes: 5 additions & 5 deletions python/ray/tune/analysis/experiment_analysis.py
@@ -31,8 +31,8 @@
)
from ray.tune.experiment import Trial
from ray.tune.execution.trial_runner import (
find_newest_experiment_checkpoint,
load_trial_from_checkpoint,
_find_newest_experiment_checkpoint,
_load_trial_from_checkpoint,
)
from ray.tune.trainable.util import TrainableUtil
from ray.tune.utils.util import unflattened_lookup
@@ -154,7 +154,7 @@ def _load_checkpoints_from_latest(self, latest_checkpoint: List[str]) -> None:
def _get_latest_checkpoint(self, experiment_checkpoint_path: Path) -> List[str]:
# Case 1: Dir specified, find latest checkpoint.
if experiment_checkpoint_path.is_dir():
latest_checkpoint = find_newest_experiment_checkpoint(
latest_checkpoint = _find_newest_experiment_checkpoint(
str(experiment_checkpoint_path)
)
# If no checkpoint in this folder the sub-directory is searched.
@@ -165,7 +165,7 @@ def _get_latest_checkpoint(self, experiment_checkpoint_path: Path) -> List[str]:
latest_checkpoint = []
for fname in experiment_checkpoint_path.iterdir():
fname = experiment_checkpoint_path.joinpath(fname)
latest_checkpoint_subdir = find_newest_experiment_checkpoint(
latest_checkpoint_subdir = _find_newest_experiment_checkpoint(
str(fname)
)
if latest_checkpoint_subdir:
@@ -801,7 +801,7 @@ def _get_trial_paths(self) -> List[str]:
_trial_paths = []
for checkpoint, path in self._checkpoints_and_paths:
try:
trial = load_trial_from_checkpoint(
trial = _load_trial_from_checkpoint(
checkpoint, stub=True, local_dir=str(path)
)
except Exception:
10 changes: 5 additions & 5 deletions python/ray/tune/automl/search_policy.py
@@ -4,9 +4,9 @@

from ray.tune.experiment import Trial
from ray.tune.search import SearchAlgorithm
from ray.tune.experiment import convert_to_experiment_list
from ray.tune.experiment import _convert_to_experiment_list
from ray.tune.search.variant_generator import generate_variants
from ray.tune.experiment.config_parser import make_parser, create_trial_from_spec
from ray.tune.experiment.config_parser import _make_parser, _create_trial_from_spec

logger = logging.getLogger(__name__)

@@ -54,7 +54,7 @@ def __init__(self, search_space, reward_attr):
self.experiment_list = []
self.best_trial = None
self._is_finished = False
self._parser = make_parser()
self._parser = _make_parser()
self._unfinished_count = 0
self._running_trials = {}
self._completed_trials = {}
@@ -66,7 +66,7 @@ def __init__(self, search_space, reward_attr):
self._start_ts = 0

def add_configurations(self, experiments):
self.experiment_list = convert_to_experiment_list(experiments)
self.experiment_list = _convert_to_experiment_list(experiments)

def get_best_trial(self):
"""Returns the Trial object with the best reward_attr"""
@@ -107,7 +107,7 @@ def _generate_next_trials(self):
tag += "%s=%s-" % (path.split(".")[-1], value)
deep_insert(path.split("."), value, new_spec["config"])

trial = create_trial_from_spec(
trial = _create_trial_from_spec(
new_spec, exp.dir_name, self._parser, experiment_tag=tag
)

23 changes: 1 addition & 22 deletions python/ray/tune/execution/cluster_info.py
@@ -1,33 +1,12 @@
from functools import lru_cache
import getpass
import os


@lru_cache()
def is_ray_cluster():
def _is_ray_cluster():
"""Checks if the bootstrap config file exists.
This will always exist if using an autoscaling cluster/started
with the ray cluster launcher.
"""
return os.path.exists(os.path.expanduser("~/ray_bootstrap_config.yaml"))


def get_ssh_user():
"""Returns ssh username for connecting to cluster workers."""

return getpass.getuser()


def get_ssh_key():
"""Returns ssh key to connecting to cluster workers.
If the env var TUNE_CLUSTER_SSH_KEY is provided, then this key
will be used for syncing across different nodes.
"""
path = os.environ.get(
"TUNE_CLUSTER_SSH_KEY", os.path.expanduser("~/ray_bootstrap_key.pem")
)
if os.path.exists(path):
return path
return None
8 changes: 4 additions & 4 deletions python/ray/tune/execution/insufficient_resources_manager.py
@@ -5,7 +5,7 @@
import time
from typing import Dict

from ray.tune.execution.cluster_info import is_ray_cluster
from ray.tune.execution.cluster_info import _is_ray_cluster
from ray.tune.experiment import Trial

logger = logging.getLogger(__name__)
@@ -39,7 +39,7 @@ def _can_fulfill_no_autoscaler(trial: Trial) -> bool:

@lru_cache()
def _get_insufficient_resources_warning_threshold() -> float:
if is_ray_cluster():
if _is_ray_cluster():
return float(
os.environ.get(
"TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S_AUTOSCALER", "60"
@@ -65,7 +65,7 @@ def _get_insufficient_resources_warning_msg() -> str:
f"(possibly via `resources_per_trial` or via `num_workers` for rllib) "
f"and/or add more resources to your Ray runtime."
)
if is_ray_cluster():
if _is_ray_cluster():
return "Ignore this message if the cluster is autoscaling. " + msg
else:
return msg
@@ -115,7 +115,7 @@ def on_no_available_trials(self, all_trials):
time.monotonic() - self._no_running_trials_since
> _get_insufficient_resources_warning_threshold()
):
if not is_ray_cluster(): # autoscaler not enabled
if not _is_ray_cluster(): # autoscaler not enabled
# If any of the pending trial cannot be fulfilled,
# that's a good enough hint of trial resources not enough.
for trial in all_trials:
4 changes: 3 additions & 1 deletion python/ray/tune/execution/placement_groups.py
@@ -28,7 +28,7 @@
_tune_pg_prefix = None


def get_tune_pg_prefix():
def _get_tune_pg_prefix():
"""Get the tune placement group name prefix.
This will store the prefix in a global variable so that subsequent runs
@@ -260,7 +260,9 @@ def __repr__(self) -> str:
)


@DeveloperAPI
def resource_dict_to_pg_factory(spec: Optional[Dict[str, float]]):
"""Translates resource dict into PlacementGroupFactory."""
spec = spec or {"cpu": 1}

if isinstance(spec, Resources):
16 changes: 8 additions & 8 deletions python/ray/tune/execution/ray_trial_executor.py
@@ -28,7 +28,7 @@
from ray.tune.utils import warn_if_slow
from ray.tune.execution.placement_groups import (
_PlacementGroupManager,
get_tune_pg_prefix,
_get_tune_pg_prefix,
)
from ray.tune.utils.resource_updater import _ResourceUpdater
from ray.tune.trainable.util import TrainableUtil
@@ -85,7 +85,7 @@ def unwrap(self):
return self._result


def post_stop_cleanup(future, pg):
def _post_stop_cleanup(future, pg):
"""Things to be done after a trial is stopped."""
assert isinstance(pg, PlacementGroup)
try:
@@ -137,7 +137,7 @@ def is_empty(self):
return len(self._future_to_insert_time) == 0


def noop_logger_creator(config, logdir):
def _noop_logger_creator(config, logdir):
# Set the working dir in the remote process, for user file writes
os.makedirs(logdir, exist_ok=True)
if not ray._private.worker._mode() == ray._private.worker.LOCAL_MODE:
@@ -219,7 +219,7 @@ def __init__(
self._reuse_actors = reuse_actors
# The maxlen will be updated when `set_max_pending_trials()` is called
self._cached_actor_pg = deque(maxlen=1)
self._pg_manager = _PlacementGroupManager(prefix=get_tune_pg_prefix())
self._pg_manager = _PlacementGroupManager(prefix=_get_tune_pg_prefix())
self._staged_trials = set()
self._trial_just_finished = False
self._trial_just_finished_before = False
@@ -323,7 +323,7 @@ def _setup_remote_runner(self, trial):
trial.init_logdir()
# We checkpoint metadata here to try mitigating logdir duplication
self._trials_to_cache.add(trial)
logger_creator = partial(noop_logger_creator, logdir=trial.logdir)
logger_creator = partial(_noop_logger_creator, logdir=trial.logdir)

if len(self._cached_actor_pg) > 0:
assert self._reuse_actors
@@ -708,7 +708,7 @@ def _do_force_trial_cleanup(self) -> None:
break
if next_future_to_clean in self._futures.keys():
_, pg = self._futures.pop(next_future_to_clean)
post_stop_cleanup(next_future_to_clean, pg)
_post_stop_cleanup(next_future_to_clean, pg)
else:
# This just means that before the deadline reaches,
# the future is already cleaned up.
@@ -838,7 +838,7 @@ def cleanup(self, trials: List[Trial]) -> None:
continue
event_type, trial_or_pg = self._futures.pop(ready[0])
if event_type == _ExecutorEventType.STOP_RESULT:
post_stop_cleanup(ready[0], trial_or_pg)
_post_stop_cleanup(ready[0], trial_or_pg)

self._pg_manager.reconcile_placement_groups(trials)
self._pg_manager.cleanup(force=True)
@@ -981,7 +981,7 @@ def get_next_executor_event(
result_type, trial_or_pg = self._futures.pop(ready_future)
if result_type == _ExecutorEventType.STOP_RESULT:
pg = trial_or_pg
post_stop_cleanup(ready_future, pg)
_post_stop_cleanup(ready_future, pg)
else:
trial = trial_or_pg
assert isinstance(trial, Trial)
18 changes: 10 additions & 8 deletions python/ray/tune/execution/trial_runner.py
@@ -53,7 +53,7 @@
logger = logging.getLogger(__name__)


def find_newest_experiment_checkpoint(ckpt_dir) -> Optional[str]:
def _find_newest_experiment_checkpoint(ckpt_dir) -> Optional[str]:
"""Returns path to most recently modified checkpoint."""
full_paths = [
os.path.join(ckpt_dir, fname)
@@ -65,15 +65,15 @@ def find_newest_experiment_checkpoint(ckpt_dir) -> Optional[str]:
return max(full_paths)


def load_trial_from_checkpoint(trial_cp: dict, stub: bool = False, **kwargs):
def _load_trial_from_checkpoint(trial_cp: dict, stub: bool = False, **kwargs):
new_trial = Trial(
trial_cp["trainable_name"], stub=stub, _setup_default_resource=False, **kwargs
)
new_trial.__setstate__(trial_cp)
return new_trial


def load_trials_from_experiment_checkpoint(
def _load_trials_from_experiment_checkpoint(
experiment_checkpoint: Mapping[str, Any], stub: bool = False
) -> List[Trial]:
"""Create trial objects from experiment checkpoint.
@@ -87,7 +87,7 @@

trials = []
for trial_cp in checkpoints:
trials.append(load_trial_from_checkpoint(trial_cp, stub=stub))
trials.append(_load_trial_from_checkpoint(trial_cp, stub=stub))

return trials

@@ -713,7 +713,9 @@ def resume(
Requires user to manually re-register their objects. Also stops
all ongoing trials.
"""
newest_ckpt_path = find_newest_experiment_checkpoint(self._local_checkpoint_dir)
newest_ckpt_path = _find_newest_experiment_checkpoint(
self._local_checkpoint_dir
)

if not newest_ckpt_path:
raise ValueError(
@@ -742,7 +744,7 @@
if self._search_alg.has_checkpoint(self._local_checkpoint_dir):
self._search_alg.restore_from_dir(self._local_checkpoint_dir)

trials = load_trials_from_experiment_checkpoint(runner_state)
trials = _load_trials_from_experiment_checkpoint(runner_state)
for trial in sorted(trials, key=lambda t: t.last_update_time, reverse=True):
trial_to_add = trial
if trial.status == Trial.ERROR:
@@ -1011,14 +1013,14 @@ def add_trial(self, trial: Trial):
self.trial_executor.mark_trial_to_checkpoint(trial)

def debug_string(self, delim="\n"):
from ray.tune.progress_reporter import trial_progress_str
from ray.tune.progress_reporter import _trial_progress_str

result_keys = [list(t.last_result) for t in self.get_trials() if t.last_result]
metrics = set().union(*result_keys)
messages = [
self._scheduler_alg.debug_string(),
self.trial_executor.debug_string(),
trial_progress_str(self.get_trials(), metrics, force_table=True),
_trial_progress_str(self.get_trials(), metrics, force_table=True),
]
return delim.join(messages)

4 changes: 2 additions & 2 deletions python/ray/tune/experiment/__init__.py
@@ -1,4 +1,4 @@
from ray.tune.experiment.experiment import Experiment, convert_to_experiment_list
from ray.tune.experiment.experiment import Experiment, _convert_to_experiment_list
from ray.tune.experiment.trial import Trial

__all__ = ["Experiment", "convert_to_experiment_list", "Trial"]
__all__ = ["Experiment", "_convert_to_experiment_list", "Trial"]
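Note that the package `__init__` keeps re-exporting the helper under its new private name, so internal callers such as `search_policy.py` above only change the identifier they import. A small hypothetical sketch of the resulting import surface (the experiment spec and trainable name are made up and assume a registered trainable):

```python
from ray.tune.experiment import Experiment, _convert_to_experiment_list

# Public classes keep their names; only the conversion helper is private now.
experiments = _convert_to_experiment_list(
    {"demo_exp": {"run": "my_trainable", "config": {"lr": 0.01}}}  # hypothetical spec
)
assert len(experiments) == 1 and isinstance(experiments[0], Experiment)
```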
8 changes: 4 additions & 4 deletions python/ray/tune/experiment/config_parser.py
@@ -14,7 +14,7 @@
from ray.tune.utils.util import SafeFallbackEncoder


def make_parser(parser_creator=None, **kwargs):
def _make_parser(parser_creator=None, **kwargs):
"""Returns a base argument parser for the ray.tune tool.
Args:
@@ -145,7 +145,7 @@ def make_parser(parser_creator=None, **kwargs):
return parser


def to_argv(config):
def _to_argv(config):
"""Converts configuration to a command line argument format."""
argv = []
for k, v in config.items():
@@ -169,7 +169,7 @@ def to_argv(config):
_cached_pgf = {}


def create_trial_from_spec(
def _create_trial_from_spec(
spec: dict, output_path: str, parser: argparse.ArgumentParser, **trial_kwargs
):
"""Creates a Trial object from parsing the spec.
@@ -193,7 +193,7 @@ def create_trial_from_spec(
resources = spec.pop("resources_per_trial", None)

try:
args, _ = parser.parse_known_args(to_argv(spec))
args, _ = parser.parse_known_args(_to_argv(spec))
except SystemExit:
raise TuneError("Error parsing args, see above message", spec)

6 changes: 3 additions & 3 deletions python/ray/tune/experiment/experiment.py
@@ -14,7 +14,7 @@
from ray.tune.result import DEFAULT_RESULTS_DIR
from ray.tune.stopper import CombinedStopper, FunctionStopper, Stopper, TimeoutStopper
from ray.tune.syncer import SyncConfig
from ray.tune.utils import date_str, detect_checkpoint_function
from ray.tune.utils import date_str, _detect_checkpoint_function

from ray.util.annotations import DeveloperAPI

@@ -144,7 +144,7 @@ def __init__(
if (
callable(run)
and not inspect.isclass(run)
and detect_checkpoint_function(run)
and _detect_checkpoint_function(run)
):
if checkpoint_at_end:
raise ValueError(
@@ -424,7 +424,7 @@ def public_spec(self) -> Dict[str, Any]:
return {k: v for k, v in self.spec.items() if k in self.PUBLIC_KEYS}


def convert_to_experiment_list(experiments: Union[Experiment, List[Experiment], Dict]):
def _convert_to_experiment_list(experiments: Union[Experiment, List[Experiment], Dict]):
"""Produces a list of Experiment objects.
Converts input from dict, single experiment, or list of