[air] Add annotation for Tune module. #27060

Merged · 15 commits · Jul 27, 2022
1 change: 1 addition & 0 deletions ci/lint/check_api_annotations.py
@@ -98,6 +98,7 @@ def verify(symbol, scanned, ok, output, prefix=None, ignore=None):
verify(ray.rllib, set(), ok, output)
verify(ray.air, set(), ok, output)
verify(ray.train, set(), ok, output)
verify(ray.tune, set(), ok, output)
verify(ray, set(), ok, output, ignore=["ray.workflow", "ray.tune", "ray.serve"])
verify(ray.serve, set(), ok, output)
assert len(ok) >= 500, len(ok)
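
The lint script above walks each top-level module and records which public symbols carry an API-stability annotation; adding `verify(ray.tune, ...)` and dropping `ray.tune` from the ignore list means Tune symbols now have to be either annotated or renamed with a leading underscore, which is what the rest of this PR does. A minimal sketch of the two options, using the real decorators from `ray.util.annotations` but hypothetical function names:

```python
from ray.util.annotations import DeveloperAPI, PublicAPI


@PublicAPI(stability="beta")
def my_public_api(config: dict) -> None:
    """Annotated symbols are counted by check_api_annotations.py."""


@DeveloperAPI
def my_developer_hook() -> None:
    """Advanced-user APIs pass the check with the DeveloperAPI tag."""


def _my_internal_helper() -> None:
    """A leading underscore marks the symbol private, so the check skips it."""
```
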
3 changes: 0 additions & 3 deletions doc/source/tune/api_docs/env.rst
@@ -6,9 +6,6 @@ Environment variables
Some of Ray Tune's behavior can be configured using environment variables.
These are the environment variables Ray Tune currently considers:

* **TUNE_CLUSTER_SSH_KEY**: SSH key used by the Tune driver process to connect
to remote cluster machines for checkpoint syncing. If this is not set,
``~/ray_bootstrap_key.pem`` will be used.
* **TUNE_DISABLE_AUTO_CALLBACK_LOGGERS**: Ray Tune automatically adds a CSV and
JSON logger callback if they haven't been passed. Setting this variable to
`1` disables this automatic creation. Please note that this will most likely
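
Since this PR also deletes the SSH helpers that consumed `TUNE_CLUSTER_SSH_KEY` (see `cluster_info.py` below), the docs entry goes with them. The surviving variables are read straight from the process environment, so they need to be set before Tune starts; a small sketch under the classic `tune.run`/`tune.report` function API, with a made-up trainable purely for illustration:

```python
import os

# Must be set before Tune builds its default callbacks for the run.
os.environ["TUNE_DISABLE_AUTO_CALLBACK_LOGGERS"] = "1"

from ray import tune


def trainable(config):
    # Report one dummy metric so the trial produces a result.
    tune.report(score=config["lr"] * 2)


# With the variable set, Tune skips adding the automatic CSV/JSON logger callbacks.
tune.run(trainable, config={"lr": tune.grid_search([0.01, 0.1])})
```
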
10 changes: 5 additions & 5 deletions python/ray/tune/analysis/experiment_analysis.py
@@ -31,8 +31,8 @@
)
from ray.tune.experiment import Trial
from ray.tune.execution.trial_runner import (
find_newest_experiment_checkpoint,
load_trial_from_checkpoint,
_find_newest_experiment_checkpoint,
_load_trial_from_checkpoint,
)
from ray.tune.trainable.util import TrainableUtil
from ray.tune.utils.util import unflattened_lookup
@@ -154,7 +154,7 @@ def _load_checkpoints_from_latest(self, latest_checkpoint: List[str]) -> None:
def _get_latest_checkpoint(self, experiment_checkpoint_path: Path) -> List[str]:
# Case 1: Dir specified, find latest checkpoint.
if experiment_checkpoint_path.is_dir():
latest_checkpoint = find_newest_experiment_checkpoint(
latest_checkpoint = _find_newest_experiment_checkpoint(
str(experiment_checkpoint_path)
)
# If no checkpoint in this folder the sub-directory is searched.
@@ -165,7 +165,7 @@ def _get_latest_checkpoint(self, experiment_checkpoint_path: Path) -> List[str]:
latest_checkpoint = []
for fname in experiment_checkpoint_path.iterdir():
fname = experiment_checkpoint_path.joinpath(fname)
latest_checkpoint_subdir = find_newest_experiment_checkpoint(
latest_checkpoint_subdir = _find_newest_experiment_checkpoint(
str(fname)
)
if latest_checkpoint_subdir:
@@ -801,7 +801,7 @@ def _get_trial_paths(self) -> List[str]:
_trial_paths = []
for checkpoint, path in self._checkpoints_and_paths:
try:
trial = load_trial_from_checkpoint(
trial = _load_trial_from_checkpoint(
checkpoint, stub=True, local_dir=str(path)
)
except Exception:
10 changes: 5 additions & 5 deletions python/ray/tune/automl/search_policy.py
@@ -4,9 +4,9 @@

from ray.tune.experiment import Trial
from ray.tune.search import SearchAlgorithm
from ray.tune.experiment import convert_to_experiment_list
from ray.tune.experiment import _convert_to_experiment_list
from ray.tune.search.variant_generator import generate_variants
from ray.tune.experiment.config_parser import make_parser, create_trial_from_spec
from ray.tune.experiment.config_parser import _make_parser, _create_trial_from_spec

logger = logging.getLogger(__name__)

@@ -54,7 +54,7 @@ def __init__(self, search_space, reward_attr):
self.experiment_list = []
self.best_trial = None
self._is_finished = False
self._parser = make_parser()
self._parser = _make_parser()
self._unfinished_count = 0
self._running_trials = {}
self._completed_trials = {}
@@ -66,7 +66,7 @@ def __init__(self, search_space, reward_attr):
self._start_ts = 0

def add_configurations(self, experiments):
self.experiment_list = convert_to_experiment_list(experiments)
self.experiment_list = _convert_to_experiment_list(experiments)

def get_best_trial(self):
"""Returns the Trial object with the best reward_attr"""
@@ -107,7 +107,7 @@ def _generate_next_trials(self):
tag += "%s=%s-" % (path.split(".")[-1], value)
deep_insert(path.split("."), value, new_spec["config"])

trial = create_trial_from_spec(
trial = _create_trial_from_spec(
new_spec, exp.dir_name, self._parser, experiment_tag=tag
)

23 changes: 1 addition & 22 deletions python/ray/tune/execution/cluster_info.py
@@ -1,33 +1,12 @@
from functools import lru_cache
import getpass
import os


@lru_cache()
def is_ray_cluster():
def _is_ray_cluster():
"""Checks if the bootstrap config file exists.

This will always exist if using an autoscaling cluster/started
with the ray cluster launcher.
"""
return os.path.exists(os.path.expanduser("~/ray_bootstrap_config.yaml"))


def get_ssh_user():
"""Returns ssh username for connecting to cluster workers."""

return getpass.getuser()


def get_ssh_key():
"""Returns ssh key to connecting to cluster workers.

If the env var TUNE_CLUSTER_SSH_KEY is provided, then this key
will be used for syncing across different nodes.
"""
path = os.environ.get(
"TUNE_CLUSTER_SSH_KEY", os.path.expanduser("~/ray_bootstrap_key.pem")
)
if os.path.exists(path):
return path
return None
8 changes: 4 additions & 4 deletions python/ray/tune/execution/insufficient_resources_manager.py
@@ -5,7 +5,7 @@
import time
from typing import Dict

from ray.tune.execution.cluster_info import is_ray_cluster
from ray.tune.execution.cluster_info import _is_ray_cluster
from ray.tune.experiment import Trial

logger = logging.getLogger(__name__)
@@ -39,7 +39,7 @@ def _can_fulfill_no_autoscaler(trial: Trial) -> bool:

@lru_cache()
def _get_insufficient_resources_warning_threshold() -> float:
if is_ray_cluster():
if _is_ray_cluster():
return float(
os.environ.get(
"TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S_AUTOSCALER", "60"
@@ -65,7 +65,7 @@ def _get_insufficient_resources_warning_msg() -> str:
f"(possibly via `resources_per_trial` or via `num_workers` for rllib) "
f"and/or add more resources to your Ray runtime."
)
if is_ray_cluster():
if _is_ray_cluster():
return "Ignore this message if the cluster is autoscaling. " + msg
else:
return msg
@@ -115,7 +115,7 @@ def on_no_available_trials(self, all_trials):
time.monotonic() - self._no_running_trials_since
> _get_insufficient_resources_warning_threshold()
):
if not is_ray_cluster(): # autoscaler not enabled
if not _is_ray_cluster(): # autoscaler not enabled
# If any of the pending trial cannot be fulfilled,
# that's a good enough hint of trial resources not enough.
for trial in all_trials:
4 changes: 3 additions & 1 deletion python/ray/tune/execution/placement_groups.py
@@ -28,7 +28,7 @@
_tune_pg_prefix = None


def get_tune_pg_prefix():
def _get_tune_pg_prefix():
"""Get the tune placement group name prefix.

This will store the prefix in a global variable so that subsequent runs
@@ -260,7 +260,9 @@ def __repr__(self) -> str:
)


@DeveloperAPI
def resource_dict_to_pg_factory(spec: Optional[Dict[str, float]]):
"""Translates resource dict into PlacementGroupFactory."""
spec = spec or {"cpu": 1}

if isinstance(spec, Resources):
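
`resource_dict_to_pg_factory` is the symbol in this hunk that gets a `@DeveloperAPI` tag instead of an underscore, so it remains importable. A rough usage sketch — the lowercase-key input matches the `spec = spec or {"cpu": 1}` default above, while the exact bundle layout of the returned factory is an assumption here:

```python
from ray.tune.execution.placement_groups import (
    PlacementGroupFactory,
    resource_dict_to_pg_factory,
)

# Translate a plain resource dict into a PlacementGroupFactory.
pgf = resource_dict_to_pg_factory({"cpu": 2, "gpu": 1})
assert isinstance(pgf, PlacementGroupFactory)

# Assumed to be roughly equivalent to building the factory by hand:
manual = PlacementGroupFactory([{"CPU": 2, "GPU": 1}])
```
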
16 changes: 8 additions & 8 deletions python/ray/tune/execution/ray_trial_executor.py
@@ -28,7 +28,7 @@
from ray.tune.utils import warn_if_slow
from ray.tune.execution.placement_groups import (
_PlacementGroupManager,
get_tune_pg_prefix,
_get_tune_pg_prefix,
)
from ray.tune.utils.resource_updater import _ResourceUpdater
from ray.tune.trainable.util import TrainableUtil
@@ -85,7 +85,7 @@ def unwrap(self):
return self._result


def post_stop_cleanup(future, pg):
def _post_stop_cleanup(future, pg):
"""Things to be done after a trial is stopped."""
assert isinstance(pg, PlacementGroup)
try:
@@ -137,7 +137,7 @@ def is_empty(self):
return len(self._future_to_insert_time) == 0


def noop_logger_creator(config, logdir):
def _noop_logger_creator(config, logdir):
# Set the working dir in the remote process, for user file writes
os.makedirs(logdir, exist_ok=True)
if not ray._private.worker._mode() == ray._private.worker.LOCAL_MODE:
@@ -219,7 +219,7 @@ def __init__(
self._reuse_actors = reuse_actors
# The maxlen will be updated when `set_max_pending_trials()` is called
self._cached_actor_pg = deque(maxlen=1)
self._pg_manager = _PlacementGroupManager(prefix=get_tune_pg_prefix())
self._pg_manager = _PlacementGroupManager(prefix=_get_tune_pg_prefix())
self._staged_trials = set()
self._trial_just_finished = False
self._trial_just_finished_before = False
@@ -323,7 +323,7 @@ def _setup_remote_runner(self, trial):
trial.init_logdir()
# We checkpoint metadata here to try mitigating logdir duplication
self._trials_to_cache.add(trial)
logger_creator = partial(noop_logger_creator, logdir=trial.logdir)
logger_creator = partial(_noop_logger_creator, logdir=trial.logdir)

if len(self._cached_actor_pg) > 0:
assert self._reuse_actors
@@ -708,7 +708,7 @@ def _do_force_trial_cleanup(self) -> None:
break
if next_future_to_clean in self._futures.keys():
_, pg = self._futures.pop(next_future_to_clean)
post_stop_cleanup(next_future_to_clean, pg)
_post_stop_cleanup(next_future_to_clean, pg)
else:
# This just means that before the deadline reaches,
# the future is already cleaned up.
@@ -838,7 +838,7 @@ def cleanup(self, trials: List[Trial]) -> None:
continue
event_type, trial_or_pg = self._futures.pop(ready[0])
if event_type == _ExecutorEventType.STOP_RESULT:
post_stop_cleanup(ready[0], trial_or_pg)
_post_stop_cleanup(ready[0], trial_or_pg)

self._pg_manager.reconcile_placement_groups(trials)
self._pg_manager.cleanup(force=True)
@@ -981,7 +981,7 @@ def get_next_executor_event(
result_type, trial_or_pg = self._futures.pop(ready_future)
if result_type == _ExecutorEventType.STOP_RESULT:
pg = trial_or_pg
post_stop_cleanup(ready_future, pg)
_post_stop_cleanup(ready_future, pg)
else:
trial = trial_or_pg
assert isinstance(trial, Trial)
18 changes: 10 additions & 8 deletions python/ray/tune/execution/trial_runner.py
@@ -53,7 +53,7 @@
logger = logging.getLogger(__name__)


def find_newest_experiment_checkpoint(ckpt_dir) -> Optional[str]:
def _find_newest_experiment_checkpoint(ckpt_dir) -> Optional[str]:
"""Returns path to most recently modified checkpoint."""
full_paths = [
os.path.join(ckpt_dir, fname)
@@ -65,15 +65,15 @@ def find_newest_experiment_checkpoint(ckpt_dir) -> Optional[str]:
return max(full_paths)


def load_trial_from_checkpoint(trial_cp: dict, stub: bool = False, **kwargs):
def _load_trial_from_checkpoint(trial_cp: dict, stub: bool = False, **kwargs):
new_trial = Trial(
trial_cp["trainable_name"], stub=stub, _setup_default_resource=False, **kwargs
)
new_trial.__setstate__(trial_cp)
return new_trial


def load_trials_from_experiment_checkpoint(
def _load_trials_from_experiment_checkpoint(
experiment_checkpoint: Mapping[str, Any], stub: bool = False
) -> List[Trial]:
"""Create trial objects from experiment checkpoint.
@@ -87,7 +87,7 @@

trials = []
for trial_cp in checkpoints:
trials.append(load_trial_from_checkpoint(trial_cp, stub=stub))
trials.append(_load_trial_from_checkpoint(trial_cp, stub=stub))

return trials

@@ -713,7 +713,9 @@ def resume(
Requires user to manually re-register their objects. Also stops
all ongoing trials.
"""
newest_ckpt_path = find_newest_experiment_checkpoint(self._local_checkpoint_dir)
newest_ckpt_path = _find_newest_experiment_checkpoint(
self._local_checkpoint_dir
)

if not newest_ckpt_path:
raise ValueError(
@@ -742,7 +744,7 @@
if self._search_alg.has_checkpoint(self._local_checkpoint_dir):
self._search_alg.restore_from_dir(self._local_checkpoint_dir)

trials = load_trials_from_experiment_checkpoint(runner_state)
trials = _load_trials_from_experiment_checkpoint(runner_state)
for trial in sorted(trials, key=lambda t: t.last_update_time, reverse=True):
trial_to_add = trial
if trial.status == Trial.ERROR:
@@ -1011,14 +1013,14 @@ def add_trial(self, trial: Trial):
self.trial_executor.mark_trial_to_checkpoint(trial)

def debug_string(self, delim="\n"):
from ray.tune.progress_reporter import trial_progress_str
from ray.tune.progress_reporter import _trial_progress_str

result_keys = [list(t.last_result) for t in self.get_trials() if t.last_result]
metrics = set().union(*result_keys)
messages = [
self._scheduler_alg.debug_string(),
self.trial_executor.debug_string(),
trial_progress_str(self.get_trials(), metrics, force_table=True),
_trial_progress_str(self.get_trials(), metrics, force_table=True),
]
return delim.join(messages)

4 changes: 2 additions & 2 deletions python/ray/tune/experiment/__init__.py
@@ -1,4 +1,4 @@
from ray.tune.experiment.experiment import Experiment, convert_to_experiment_list
from ray.tune.experiment.experiment import Experiment, _convert_to_experiment_list
from ray.tune.experiment.trial import Trial

__all__ = ["Experiment", "convert_to_experiment_list", "Trial"]
__all__ = ["Experiment", "_convert_to_experiment_list", "Trial"]
8 changes: 4 additions & 4 deletions python/ray/tune/experiment/config_parser.py
@@ -14,7 +14,7 @@
from ray.tune.utils.util import SafeFallbackEncoder


def make_parser(parser_creator=None, **kwargs):
def _make_parser(parser_creator=None, **kwargs):
"""Returns a base argument parser for the ray.tune tool.

Args:
@@ -145,7 +145,7 @@ def make_parser(parser_creator=None, **kwargs):
return parser


def to_argv(config):
def _to_argv(config):
"""Converts configuration to a command line argument format."""
argv = []
for k, v in config.items():
@@ -169,7 +169,7 @@
_cached_pgf = {}


def create_trial_from_spec(
def _create_trial_from_spec(
spec: dict, output_path: str, parser: argparse.ArgumentParser, **trial_kwargs
):
"""Creates a Trial object from parsing the spec.
@@ -193,7 +193,7 @@ def create_trial_from_spec(
resources = spec.pop("resources_per_trial", None)

try:
args, _ = parser.parse_known_args(to_argv(spec))
args, _ = parser.parse_known_args(_to_argv(spec))
except SystemExit:
raise TuneError("Error parsing args, see above message", spec)

6 changes: 3 additions & 3 deletions python/ray/tune/experiment/experiment.py
@@ -14,7 +14,7 @@
from ray.tune.result import DEFAULT_RESULTS_DIR
from ray.tune.stopper import CombinedStopper, FunctionStopper, Stopper, TimeoutStopper
from ray.tune.syncer import SyncConfig
from ray.tune.utils import date_str, detect_checkpoint_function
from ray.tune.utils import date_str, _detect_checkpoint_function

from ray.util.annotations import DeveloperAPI

@@ -144,7 +144,7 @@ def __init__(
if (
callable(run)
and not inspect.isclass(run)
and detect_checkpoint_function(run)
and _detect_checkpoint_function(run)
):
if checkpoint_at_end:
raise ValueError(
@@ -424,7 +424,7 @@ def public_spec(self) -> Dict[str, Any]:
return {k: v for k, v in self.spec.items() if k in self.PUBLIC_KEYS}


def convert_to_experiment_list(experiments: Union[Experiment, List[Experiment], Dict]):
def _convert_to_experiment_list(experiments: Union[Experiment, List[Experiment], Dict]):
"""Produces a list of Experiment objects.

Converts input from dict, single experiment, or list of
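
The diff view is cut off here, but for reference the docstring above describes `_convert_to_experiment_list` accepting a single Experiment, a list of them, or a dict of experiment specs. A brief sketch of those three shapes, using a hypothetical function trainable; the dict-spec keys are assumed to mirror what `tune.run_experiments` accepts:

```python
from ray.tune.experiment import Experiment, _convert_to_experiment_list


def trainable(config):
    # A function trainable that reports a single final result.
    return {"score": config["lr"]}


exp = Experiment(name="demo", run=trainable, config={"lr": 0.01})

# All three input shapes are normalized to a list of Experiment objects.
from_single = _convert_to_experiment_list(exp)
from_list = _convert_to_experiment_list([exp])
from_dict = _convert_to_experiment_list({"demo": {"run": trainable, "config": {"lr": 0.01}}})
```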