diff --git a/ci/lint/check_api_annotations.py b/ci/lint/check_api_annotations.py index 979999b6fd2d..2445afc1a72d 100755 --- a/ci/lint/check_api_annotations.py +++ b/ci/lint/check_api_annotations.py @@ -98,6 +98,7 @@ def verify(symbol, scanned, ok, output, prefix=None, ignore=None): verify(ray.rllib, set(), ok, output) verify(ray.air, set(), ok, output) verify(ray.train, set(), ok, output) + verify(ray.tune, set(), ok, output) verify(ray, set(), ok, output, ignore=["ray.workflow", "ray.tune", "ray.serve"]) verify(ray.serve, set(), ok, output) assert len(ok) >= 500, len(ok) diff --git a/doc/source/tune/api_docs/env.rst b/doc/source/tune/api_docs/env.rst index 05e5a33c971d..4e736537f5d2 100644 --- a/doc/source/tune/api_docs/env.rst +++ b/doc/source/tune/api_docs/env.rst @@ -6,9 +6,6 @@ Environment variables Some of Ray Tune's behavior can be configured using environment variables. These are the environment variables Ray Tune currently considers: -* **TUNE_CLUSTER_SSH_KEY**: SSH key used by the Tune driver process to connect - to remote cluster machines for checkpoint syncing. If this is not set, - ``~/ray_bootstrap_key.pem`` will be used. * **TUNE_DISABLE_AUTO_CALLBACK_LOGGERS**: Ray Tune automatically adds a CSV and JSON logger callback if they haven't been passed. Setting this variable to `1` disables this automatic creation. Please note that this will most likely diff --git a/python/ray/tune/analysis/experiment_analysis.py b/python/ray/tune/analysis/experiment_analysis.py index caeb6f66b793..d82a26c97484 100644 --- a/python/ray/tune/analysis/experiment_analysis.py +++ b/python/ray/tune/analysis/experiment_analysis.py @@ -31,8 +31,8 @@ ) from ray.tune.experiment import Trial from ray.tune.execution.trial_runner import ( - find_newest_experiment_checkpoint, - load_trial_from_checkpoint, + _find_newest_experiment_checkpoint, + _load_trial_from_checkpoint, ) from ray.tune.trainable.util import TrainableUtil from ray.tune.utils.util import unflattened_lookup @@ -154,7 +154,7 @@ def _load_checkpoints_from_latest(self, latest_checkpoint: List[str]) -> None: def _get_latest_checkpoint(self, experiment_checkpoint_path: Path) -> List[str]: # Case 1: Dir specified, find latest checkpoint. if experiment_checkpoint_path.is_dir(): - latest_checkpoint = find_newest_experiment_checkpoint( + latest_checkpoint = _find_newest_experiment_checkpoint( str(experiment_checkpoint_path) ) # If no checkpoint in this folder the sub-directory is searched. 
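# Editorial sketch (not part of the patch): the kind of annotation discipline the
# ci/lint/check_api_annotations.py change above now enforces for ray.tune. Public
# symbols carry @PublicAPI/@DeveloperAPI, while internals get a leading underscore so
# the checker skips them. The function names below are placeholders, not real Tune APIs.
from ray.util.annotations import DeveloperAPI, PublicAPI

@PublicAPI(stability="alpha")
def create_thing(name, **kwargs):
    """Public factory, part of the documented surface scanned by verify()."""
    ...

@DeveloperAPI
def get_internal_cls(name):
    """Semi-public helper intended for advanced users and libraries."""
    ...

def _private_helper():
    """Underscore-prefixed: ignored by the annotation lint."""
    ...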
@@ -165,7 +165,7 @@ def _get_latest_checkpoint(self, experiment_checkpoint_path: Path) -> List[str]: latest_checkpoint = [] for fname in experiment_checkpoint_path.iterdir(): fname = experiment_checkpoint_path.joinpath(fname) - latest_checkpoint_subdir = find_newest_experiment_checkpoint( + latest_checkpoint_subdir = _find_newest_experiment_checkpoint( str(fname) ) if latest_checkpoint_subdir: @@ -801,7 +801,7 @@ def _get_trial_paths(self) -> List[str]: _trial_paths = [] for checkpoint, path in self._checkpoints_and_paths: try: - trial = load_trial_from_checkpoint( + trial = _load_trial_from_checkpoint( checkpoint, stub=True, local_dir=str(path) ) except Exception: diff --git a/python/ray/tune/automl/search_policy.py b/python/ray/tune/automl/search_policy.py index 942ad36ff8eb..deb46da332ef 100644 --- a/python/ray/tune/automl/search_policy.py +++ b/python/ray/tune/automl/search_policy.py @@ -4,9 +4,9 @@ from ray.tune.experiment import Trial from ray.tune.search import SearchAlgorithm -from ray.tune.experiment import convert_to_experiment_list +from ray.tune.experiment import _convert_to_experiment_list from ray.tune.search.variant_generator import generate_variants -from ray.tune.experiment.config_parser import make_parser, create_trial_from_spec +from ray.tune.experiment.config_parser import _make_parser, _create_trial_from_spec logger = logging.getLogger(__name__) @@ -54,7 +54,7 @@ def __init__(self, search_space, reward_attr): self.experiment_list = [] self.best_trial = None self._is_finished = False - self._parser = make_parser() + self._parser = _make_parser() self._unfinished_count = 0 self._running_trials = {} self._completed_trials = {} @@ -66,7 +66,7 @@ def __init__(self, search_space, reward_attr): self._start_ts = 0 def add_configurations(self, experiments): - self.experiment_list = convert_to_experiment_list(experiments) + self.experiment_list = _convert_to_experiment_list(experiments) def get_best_trial(self): """Returns the Trial object with the best reward_attr""" @@ -107,7 +107,7 @@ def _generate_next_trials(self): tag += "%s=%s-" % (path.split(".")[-1], value) deep_insert(path.split("."), value, new_spec["config"]) - trial = create_trial_from_spec( + trial = _create_trial_from_spec( new_spec, exp.dir_name, self._parser, experiment_tag=tag ) diff --git a/python/ray/tune/execution/cluster_info.py b/python/ray/tune/execution/cluster_info.py index 476470c0cb50..54fe636b0763 100644 --- a/python/ray/tune/execution/cluster_info.py +++ b/python/ray/tune/execution/cluster_info.py @@ -1,33 +1,12 @@ from functools import lru_cache -import getpass import os @lru_cache() -def is_ray_cluster(): +def _is_ray_cluster(): """Checks if the bootstrap config file exists. This will always exist if using an autoscaling cluster/started with the ray cluster launcher. """ return os.path.exists(os.path.expanduser("~/ray_bootstrap_config.yaml")) - - -def get_ssh_user(): - """Returns ssh username for connecting to cluster workers.""" - - return getpass.getuser() - - -def get_ssh_key(): - """Returns ssh key to connecting to cluster workers. - - If the env var TUNE_CLUSTER_SSH_KEY is provided, then this key - will be used for syncing across different nodes. 
- """ - path = os.environ.get( - "TUNE_CLUSTER_SSH_KEY", os.path.expanduser("~/ray_bootstrap_key.pem") - ) - if os.path.exists(path): - return path - return None diff --git a/python/ray/tune/execution/insufficient_resources_manager.py b/python/ray/tune/execution/insufficient_resources_manager.py index 1dd8d94f54cb..cc255ed9b267 100644 --- a/python/ray/tune/execution/insufficient_resources_manager.py +++ b/python/ray/tune/execution/insufficient_resources_manager.py @@ -5,7 +5,7 @@ import time from typing import Dict -from ray.tune.execution.cluster_info import is_ray_cluster +from ray.tune.execution.cluster_info import _is_ray_cluster from ray.tune.experiment import Trial logger = logging.getLogger(__name__) @@ -39,7 +39,7 @@ def _can_fulfill_no_autoscaler(trial: Trial) -> bool: @lru_cache() def _get_insufficient_resources_warning_threshold() -> float: - if is_ray_cluster(): + if _is_ray_cluster(): return float( os.environ.get( "TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S_AUTOSCALER", "60" @@ -65,7 +65,7 @@ def _get_insufficient_resources_warning_msg() -> str: f"(possibly via `resources_per_trial` or via `num_workers` for rllib) " f"and/or add more resources to your Ray runtime." ) - if is_ray_cluster(): + if _is_ray_cluster(): return "Ignore this message if the cluster is autoscaling. " + msg else: return msg @@ -115,7 +115,7 @@ def on_no_available_trials(self, all_trials): time.monotonic() - self._no_running_trials_since > _get_insufficient_resources_warning_threshold() ): - if not is_ray_cluster(): # autoscaler not enabled + if not _is_ray_cluster(): # autoscaler not enabled # If any of the pending trial cannot be fulfilled, # that's a good enough hint of trial resources not enough. for trial in all_trials: diff --git a/python/ray/tune/execution/placement_groups.py b/python/ray/tune/execution/placement_groups.py index 750b44782b0b..e2be28984b87 100644 --- a/python/ray/tune/execution/placement_groups.py +++ b/python/ray/tune/execution/placement_groups.py @@ -28,7 +28,7 @@ _tune_pg_prefix = None -def get_tune_pg_prefix(): +def _get_tune_pg_prefix(): """Get the tune placement group name prefix. 
This will store the prefix in a global variable so that subsequent runs @@ -260,7 +260,9 @@ def __repr__(self) -> str: ) +@DeveloperAPI def resource_dict_to_pg_factory(spec: Optional[Dict[str, float]]): + """Translates resource dict into PlacementGroupFactory.""" spec = spec or {"cpu": 1} if isinstance(spec, Resources): diff --git a/python/ray/tune/execution/ray_trial_executor.py b/python/ray/tune/execution/ray_trial_executor.py index f041abc6507e..fe6f886e304a 100644 --- a/python/ray/tune/execution/ray_trial_executor.py +++ b/python/ray/tune/execution/ray_trial_executor.py @@ -28,7 +28,7 @@ from ray.tune.utils import warn_if_slow from ray.tune.execution.placement_groups import ( _PlacementGroupManager, - get_tune_pg_prefix, + _get_tune_pg_prefix, ) from ray.tune.utils.resource_updater import _ResourceUpdater from ray.tune.trainable.util import TrainableUtil @@ -85,7 +85,7 @@ def unwrap(self): return self._result -def post_stop_cleanup(future, pg): +def _post_stop_cleanup(future, pg): """Things to be done after a trial is stopped.""" assert isinstance(pg, PlacementGroup) try: @@ -137,7 +137,7 @@ def is_empty(self): return len(self._future_to_insert_time) == 0 -def noop_logger_creator(config, logdir): +def _noop_logger_creator(config, logdir): # Set the working dir in the remote process, for user file writes os.makedirs(logdir, exist_ok=True) if not ray._private.worker._mode() == ray._private.worker.LOCAL_MODE: @@ -219,7 +219,7 @@ def __init__( self._reuse_actors = reuse_actors # The maxlen will be updated when `set_max_pending_trials()` is called self._cached_actor_pg = deque(maxlen=1) - self._pg_manager = _PlacementGroupManager(prefix=get_tune_pg_prefix()) + self._pg_manager = _PlacementGroupManager(prefix=_get_tune_pg_prefix()) self._staged_trials = set() self._trial_just_finished = False self._trial_just_finished_before = False @@ -323,7 +323,7 @@ def _setup_remote_runner(self, trial): trial.init_logdir() # We checkpoint metadata here to try mitigating logdir duplication self._trials_to_cache.add(trial) - logger_creator = partial(noop_logger_creator, logdir=trial.logdir) + logger_creator = partial(_noop_logger_creator, logdir=trial.logdir) if len(self._cached_actor_pg) > 0: assert self._reuse_actors @@ -708,7 +708,7 @@ def _do_force_trial_cleanup(self) -> None: break if next_future_to_clean in self._futures.keys(): _, pg = self._futures.pop(next_future_to_clean) - post_stop_cleanup(next_future_to_clean, pg) + _post_stop_cleanup(next_future_to_clean, pg) else: # This just means that before the deadline reaches, # the future is already cleaned up. 
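# Illustrative only: resource_dict_to_pg_factory (annotated @DeveloperAPI above) maps a
# plain resource dict such as {"cpu": 1, "gpu": 1} onto a PlacementGroupFactory. The
# hand-built equivalent, under that assumption, would look roughly like this.
from ray.tune.execution.placement_groups import PlacementGroupFactory

pgf = PlacementGroupFactory([{"CPU": 1, "GPU": 1}])
# pgf can then be passed as a trial's resource request in place of a raw dict.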
@@ -838,7 +838,7 @@ def cleanup(self, trials: List[Trial]) -> None: continue event_type, trial_or_pg = self._futures.pop(ready[0]) if event_type == _ExecutorEventType.STOP_RESULT: - post_stop_cleanup(ready[0], trial_or_pg) + _post_stop_cleanup(ready[0], trial_or_pg) self._pg_manager.reconcile_placement_groups(trials) self._pg_manager.cleanup(force=True) @@ -981,7 +981,7 @@ def get_next_executor_event( result_type, trial_or_pg = self._futures.pop(ready_future) if result_type == _ExecutorEventType.STOP_RESULT: pg = trial_or_pg - post_stop_cleanup(ready_future, pg) + _post_stop_cleanup(ready_future, pg) else: trial = trial_or_pg assert isinstance(trial, Trial) diff --git a/python/ray/tune/execution/trial_runner.py b/python/ray/tune/execution/trial_runner.py index ddc5b439351b..31b321c9d4fb 100644 --- a/python/ray/tune/execution/trial_runner.py +++ b/python/ray/tune/execution/trial_runner.py @@ -53,7 +53,7 @@ logger = logging.getLogger(__name__) -def find_newest_experiment_checkpoint(ckpt_dir) -> Optional[str]: +def _find_newest_experiment_checkpoint(ckpt_dir) -> Optional[str]: """Returns path to most recently modified checkpoint.""" full_paths = [ os.path.join(ckpt_dir, fname) @@ -65,7 +65,7 @@ def find_newest_experiment_checkpoint(ckpt_dir) -> Optional[str]: return max(full_paths) -def load_trial_from_checkpoint(trial_cp: dict, stub: bool = False, **kwargs): +def _load_trial_from_checkpoint(trial_cp: dict, stub: bool = False, **kwargs): new_trial = Trial( trial_cp["trainable_name"], stub=stub, _setup_default_resource=False, **kwargs ) @@ -73,7 +73,7 @@ def load_trial_from_checkpoint(trial_cp: dict, stub: bool = False, **kwargs): return new_trial -def load_trials_from_experiment_checkpoint( +def _load_trials_from_experiment_checkpoint( experiment_checkpoint: Mapping[str, Any], stub: bool = False ) -> List[Trial]: """Create trial objects from experiment checkpoint. @@ -87,7 +87,7 @@ def load_trials_from_experiment_checkpoint( trials = [] for trial_cp in checkpoints: - trials.append(load_trial_from_checkpoint(trial_cp, stub=stub)) + trials.append(_load_trial_from_checkpoint(trial_cp, stub=stub)) return trials @@ -713,7 +713,9 @@ def resume( Requires user to manually re-register their objects. Also stops all ongoing trials. 
""" - newest_ckpt_path = find_newest_experiment_checkpoint(self._local_checkpoint_dir) + newest_ckpt_path = _find_newest_experiment_checkpoint( + self._local_checkpoint_dir + ) if not newest_ckpt_path: raise ValueError( @@ -742,7 +744,7 @@ def resume( if self._search_alg.has_checkpoint(self._local_checkpoint_dir): self._search_alg.restore_from_dir(self._local_checkpoint_dir) - trials = load_trials_from_experiment_checkpoint(runner_state) + trials = _load_trials_from_experiment_checkpoint(runner_state) for trial in sorted(trials, key=lambda t: t.last_update_time, reverse=True): trial_to_add = trial if trial.status == Trial.ERROR: @@ -1011,14 +1013,14 @@ def add_trial(self, trial: Trial): self.trial_executor.mark_trial_to_checkpoint(trial) def debug_string(self, delim="\n"): - from ray.tune.progress_reporter import trial_progress_str + from ray.tune.progress_reporter import _trial_progress_str result_keys = [list(t.last_result) for t in self.get_trials() if t.last_result] metrics = set().union(*result_keys) messages = [ self._scheduler_alg.debug_string(), self.trial_executor.debug_string(), - trial_progress_str(self.get_trials(), metrics, force_table=True), + _trial_progress_str(self.get_trials(), metrics, force_table=True), ] return delim.join(messages) diff --git a/python/ray/tune/experiment/__init__.py b/python/ray/tune/experiment/__init__.py index ba32cbe4f9c2..39a20fc56e8e 100644 --- a/python/ray/tune/experiment/__init__.py +++ b/python/ray/tune/experiment/__init__.py @@ -1,4 +1,4 @@ -from ray.tune.experiment.experiment import Experiment, convert_to_experiment_list +from ray.tune.experiment.experiment import Experiment, _convert_to_experiment_list from ray.tune.experiment.trial import Trial -__all__ = ["Experiment", "convert_to_experiment_list", "Trial"] +__all__ = ["Experiment", "_convert_to_experiment_list", "Trial"] diff --git a/python/ray/tune/experiment/config_parser.py b/python/ray/tune/experiment/config_parser.py index a698bb04dcdc..f598d8c52763 100644 --- a/python/ray/tune/experiment/config_parser.py +++ b/python/ray/tune/experiment/config_parser.py @@ -14,7 +14,7 @@ from ray.tune.utils.util import SafeFallbackEncoder -def make_parser(parser_creator=None, **kwargs): +def _make_parser(parser_creator=None, **kwargs): """Returns a base argument parser for the ray.tune tool. Args: @@ -145,7 +145,7 @@ def make_parser(parser_creator=None, **kwargs): return parser -def to_argv(config): +def _to_argv(config): """Converts configuration to a command line argument format.""" argv = [] for k, v in config.items(): @@ -169,7 +169,7 @@ def to_argv(config): _cached_pgf = {} -def create_trial_from_spec( +def _create_trial_from_spec( spec: dict, output_path: str, parser: argparse.ArgumentParser, **trial_kwargs ): """Creates a Trial object from parsing the spec. 
@@ -193,7 +193,7 @@ def create_trial_from_spec( resources = spec.pop("resources_per_trial", None) try: - args, _ = parser.parse_known_args(to_argv(spec)) + args, _ = parser.parse_known_args(_to_argv(spec)) except SystemExit: raise TuneError("Error parsing args, see above message", spec) diff --git a/python/ray/tune/experiment/experiment.py b/python/ray/tune/experiment/experiment.py index da958f6a44bf..b3b9a3783738 100644 --- a/python/ray/tune/experiment/experiment.py +++ b/python/ray/tune/experiment/experiment.py @@ -14,7 +14,7 @@ from ray.tune.result import DEFAULT_RESULTS_DIR from ray.tune.stopper import CombinedStopper, FunctionStopper, Stopper, TimeoutStopper from ray.tune.syncer import SyncConfig -from ray.tune.utils import date_str, detect_checkpoint_function +from ray.tune.utils import date_str, _detect_checkpoint_function from ray.util.annotations import DeveloperAPI @@ -144,7 +144,7 @@ def __init__( if ( callable(run) and not inspect.isclass(run) - and detect_checkpoint_function(run) + and _detect_checkpoint_function(run) ): if checkpoint_at_end: raise ValueError( @@ -424,7 +424,7 @@ def public_spec(self) -> Dict[str, Any]: return {k: v for k, v in self.spec.items() if k in self.PUBLIC_KEYS} -def convert_to_experiment_list(experiments: Union[Experiment, List[Experiment], Dict]): +def _convert_to_experiment_list(experiments: Union[Experiment, List[Experiment], Dict]): """Produces a list of Experiment objects. Converts input from dict, single experiment, or list of diff --git a/python/ray/tune/experiment/trial.py b/python/ray/tune/experiment/trial.py index aa3be23ff69a..468d41672824 100644 --- a/python/ray/tune/experiment/trial.py +++ b/python/ray/tune/experiment/trial.py @@ -167,7 +167,7 @@ def trial_resources(self, new_resources: Union[Resources, PlacementGroupFactory] self._trial_resources = new_resources -def create_unique_logdir_name(root: str, relative_logdir: str) -> str: +def _create_unique_logdir_name(root: str, relative_logdir: str) -> str: candidate = Path(root).expanduser().joinpath(relative_logdir) if candidate.exists(): relative_logdir_old = relative_logdir @@ -559,7 +559,7 @@ def reset(self): def init_logdir(self): """Init logdir.""" if not self.relative_logdir: - self.relative_logdir = create_unique_logdir_name( + self.relative_logdir = _create_unique_logdir_name( self.local_dir, self._generate_dirname() ) assert self.logdir diff --git a/python/ray/tune/logger/logger.py b/python/ray/tune/logger/logger.py index 397b23ba72f3..3998beb44a3d 100644 --- a/python/ray/tune/logger/logger.py +++ b/python/ray/tune/logger/logger.py @@ -212,6 +212,7 @@ def represent_sequence(self, tag, sequence, flow_style=None): return super().represent_sequence(tag, sequence, flow_style=flow_style) +@DeveloperAPI def pretty_print(result): result = result.copy() result.update(config=None) # drop config from pretty print diff --git a/python/ray/tune/progress_reporter.py b/python/ray/tune/progress_reporter.py index 941d675fdccd..b039f849c6ca 100644 --- a/python/ray/tune/progress_reporter.py +++ b/python/ray/tune/progress_reporter.py @@ -348,8 +348,8 @@ def _progress_str( self._metric_columns.update(user_metrics) messages = [ "== Status ==", - time_passed_str(self._start_time, time.time()), - memory_debug_str(), + _time_passed_str(self._start_time, time.time()), + _memory_debug_str(), *sys_info, ] if done: @@ -362,13 +362,13 @@ def _progress_str( current_best_trial, metric = self._current_best_trial(trials) if current_best_trial: messages.append( - best_trial_str(current_best_trial, metric, 
self._parameter_columns) + _best_trial_str(current_best_trial, metric, self._parameter_columns) ) if has_verbosity(Verbosity.V1_EXPERIMENT): # Will filter the table in `trial_progress_str` messages.append( - trial_progress_str( + _trial_progress_str( trials, metric_columns=self._metric_columns, parameter_columns=self._parameter_columns, @@ -383,7 +383,7 @@ def _progress_str( sort_by_metric=self._sort_by_metric, ) ) - messages.append(trial_errors_str(trials, fmt=fmt, max_rows=max_error)) + messages.append(_trial_errors_str(trials, fmt=fmt, max_rows=max_error)) return delim.join(messages) + delim @@ -641,7 +641,7 @@ def report(self, trials: List[Trial], done: bool, *sys_info: Dict): print(self._progress_str(trials, done, *sys_info)) -def memory_debug_str(): +def _memory_debug_str(): try: import ray # noqa F401 @@ -667,7 +667,7 @@ def memory_debug_str(): return "Unknown memory usage. Please run `pip install psutil` to resolve)" -def time_passed_str(start_time: float, current_time: float): +def _time_passed_str(start_time: float, current_time: float): current_time_dt = datetime.datetime.fromtimestamp(current_time) start_time_dt = datetime.datetime.fromtimestamp(start_time) delta: datetime.timedelta = current_time_dt - start_time_dt @@ -703,7 +703,7 @@ def _get_trials_by_state(trials: List[Trial]): return trials_by_state -def trial_progress_str( +def _trial_progress_str( trials: List[Trial], metric_columns: Union[List[str], Dict[str, str]], parameter_columns: Optional[Union[List[str], Dict[str, str]]] = None, @@ -776,7 +776,7 @@ def trial_progress_str( ) if force_table or (has_verbosity(Verbosity.V2_TRIAL_NORM) and done): - messages += trial_progress_table( + messages += _trial_progress_table( trials=trials, metric_columns=metric_columns, parameter_columns=parameter_columns, @@ -819,7 +819,7 @@ def _max_len(value: Any, max_len: int = 20, add_addr: bool = False) -> Any: return result -def trial_progress_table( +def _trial_progress_table( trials: List[Trial], metric_columns: Union[List[str], Dict[str, str]], parameter_columns: Optional[Union[List[str], Dict[str, str]]] = None, @@ -939,7 +939,7 @@ def trial_progress_table( return messages -def trial_errors_str( +def _trial_errors_str( trials: List[Trial], fmt: str = "psql", max_rows: Optional[int] = None ): """Returns a readable message regarding trial errors. @@ -973,7 +973,7 @@ def trial_errors_str( return delim.join(messages) -def best_trial_str( +def _best_trial_str( trial: Trial, metric: str, parameter_columns: Optional[Union[List[str], Dict[str, str]]] = None, @@ -1218,7 +1218,7 @@ def _print_result(self, result: Dict): return print_result_str -def detect_reporter(**kwargs) -> TuneReporterBase: +def _detect_reporter(**kwargs) -> TuneReporterBase: """Detect progress reporter class. 
Will return a :class:`JupyterNotebookReporter` if a IPython/Jupyter-like @@ -1234,7 +1234,7 @@ def detect_reporter(**kwargs) -> TuneReporterBase: return progress_reporter -def detect_progress_metrics( +def _detect_progress_metrics( trainable: Optional[Union["Trainable", Callable]] ) -> Optional[List[str]]: """Detect progress metrics to report.""" diff --git a/python/ray/tune/registry.py b/python/ray/tune/registry.py index af941ef7fdab..e9e520c18664 100644 --- a/python/ray/tune/registry.py +++ b/python/ray/tune/registry.py @@ -12,6 +12,7 @@ _internal_kv_put, ) from ray.tune.error import TuneError +from ray.util.annotations import DeveloperAPI, PublicAPI TRAINABLE_CLASS = "trainable_class" ENV_CREATOR = "env_creator" @@ -35,25 +36,28 @@ logger = logging.getLogger(__name__) -def has_trainable(trainable_name): +def _has_trainable(trainable_name): return _global_registry.contains(TRAINABLE_CLASS, trainable_name) +@DeveloperAPI def get_trainable_cls(trainable_name): validate_trainable(trainable_name) return _global_registry.get(TRAINABLE_CLASS, trainable_name) +@DeveloperAPI def validate_trainable(trainable_name): - if not has_trainable(trainable_name): + if not _has_trainable(trainable_name): # Make sure everything rllib-related is registered. from ray.rllib import _register_all _register_all() - if not has_trainable(trainable_name): + if not _has_trainable(trainable_name): raise TuneError("Unknown trainable: " + trainable_name) +@DeveloperAPI def is_function_trainable(trainable: Union[str, Callable, Type]) -> bool: """Check if a given trainable is a function trainable.""" if isinstance(trainable, str): @@ -66,6 +70,7 @@ def is_function_trainable(trainable: Union[str, Callable, Type]) -> bool: ) +@PublicAPI(stability="alpha") def register_trainable(name: str, trainable: Union[Callable, Type], warn: bool = True): """Register a trainable function or class. @@ -96,6 +101,7 @@ def register_trainable(name: str, trainable: Union[Callable, Type], warn: bool = _global_registry.register(TRAINABLE_CLASS, name, trainable) +@PublicAPI(stability="alpha") def register_env(name: str, env_creator: Callable): """Register a custom environment for use with RLlib. @@ -112,6 +118,7 @@ def register_env(name: str, env_creator: Callable): _global_registry.register(ENV_CREATOR, name, env_creator) +@PublicAPI(stability="alpha") def register_input(name: str, input_creator: Callable): """Register a custom input api for RLlib. 
@@ -125,15 +132,17 @@ def register_input(name: str, input_creator: Callable): _global_registry.register(RLLIB_INPUT, name, input_creator) +@DeveloperAPI def registry_contains_input(name: str) -> bool: return _global_registry.contains(RLLIB_INPUT, name) +@DeveloperAPI def registry_get_input(name: str) -> Callable: return _global_registry.get(RLLIB_INPUT, name) -def check_serializability(key, value): +def _check_serializability(key, value): _global_registry.register(TEST, key, value) diff --git a/python/ray/tune/resources.py b/python/ray/tune/resources.py index 6de29d624a7d..01a6dc8a6cd2 100644 --- a/python/ray/tune/resources.py +++ b/python/ray/tune/resources.py @@ -6,7 +6,7 @@ # For compatibility under py2 to consider unicode as str from typing import Optional -from ray.util.annotations import Deprecated +from ray.util.annotations import Deprecated, DeveloperAPI from six import string_types from ray._private.resource_spec import NODE_ID_PREFIX @@ -227,6 +227,7 @@ def to_json(self): return resources_to_json(self) +@DeveloperAPI def json_to_resources(data: Optional[str]): if data is None or data == "null": return None @@ -259,6 +260,7 @@ def json_to_resources(data: Optional[str]): ) +@DeveloperAPI def resources_to_json(resources: Optional[Resources]): if resources is None: return None diff --git a/python/ray/tune/schedulers/__init__.py b/python/ray/tune/schedulers/__init__.py index f6d93bdef048..6ca250b509be 100644 --- a/python/ray/tune/schedulers/__init__.py +++ b/python/ray/tune/schedulers/__init__.py @@ -11,6 +11,7 @@ PopulationBasedTrainingReplay, ) from ray.tune.schedulers.resource_changing_scheduler import ResourceChangingScheduler +from ray.util import PublicAPI def _pb2_importer(): @@ -39,6 +40,7 @@ def _pb2_importer(): } +@PublicAPI(stability="alpha") def create_scheduler( scheduler, **kwargs, diff --git a/python/ray/tune/schedulers/util.py b/python/ray/tune/schedulers/util.py index 729327efa6f5..0d0e012a8367 100644 --- a/python/ray/tune/schedulers/util.py +++ b/python/ray/tune/schedulers/util.py @@ -4,7 +4,7 @@ logger = logging.getLogger(__name__) -def set_search_properties_backwards_compatible( +def _set_search_properties_backwards_compatible( set_search_properties_func, metric: Optional[str], mode: Optional[str], **spec ) -> bool: """Wraps around set_search_properties() so that it is backward compatible. 
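# Hedged sketch of the registry entry points promoted above to @PublicAPI(stability="alpha")
# and @DeveloperAPI; "my_trainable" and its body are placeholders, not from the patch.
from ray.tune.registry import get_trainable_cls, register_trainable

def my_trainable(config):
    ...

register_trainable("my_trainable", my_trainable)
trainable_cls = get_trainable_cls("my_trainable")  # validates and resolves the registration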
diff --git a/python/ray/tune/search/__init__.py b/python/ray/tune/search/__init__.py index 8139a45dfba1..af074911ed96 100644 --- a/python/ray/tune/search/__init__.py +++ b/python/ray/tune/search/__init__.py @@ -8,6 +8,7 @@ from ray.tune.search.search_generator import SearchGenerator from ray._private.utils import get_function_args +from ray.util import PublicAPI def _import_variant_generator(): @@ -111,6 +112,7 @@ def _import_hebo_search(): } +@PublicAPI(stability="alpha") def create_searcher( search_alg, **kwargs, diff --git a/python/ray/tune/search/basic_variant.py b/python/ray/tune/search/basic_variant.py index ff34a08c05cf..70bdaaf0c873 100644 --- a/python/ray/tune/search/basic_variant.py +++ b/python/ray/tune/search/basic_variant.py @@ -8,18 +8,18 @@ import numpy as np from ray.tune.error import TuneError -from ray.tune.experiment.config_parser import make_parser, create_trial_from_spec +from ray.tune.experiment.config_parser import _make_parser, _create_trial_from_spec from ray.tune.search.sample import np_random_generator, _BackwardsCompatibleNumpyRng from ray.tune.search.variant_generator import ( - count_variants, - count_spec_samples, + _count_variants, + _count_spec_samples, generate_variants, format_vars, - flatten_resolved_vars, - get_preset_variants, + _flatten_resolved_vars, + _get_preset_variants, ) from ray.tune.search.search_algorithm import SearchAlgorithm -from ray.tune.utils.util import atomic_save, load_newest_checkpoint +from ray.tune.utils.util import _atomic_save, _load_newest_checkpoint from ray.util import PublicAPI if TYPE_CHECKING: @@ -103,7 +103,7 @@ def __init__( Union[int, "np_random_generator", np.random.RandomState] ] = None, ): - self.parser = make_parser() + self.parser = _make_parser() self.num_samples = num_samples self.uuid_prefix = uuid_prefix self.num_samples_left = num_samples @@ -124,11 +124,11 @@ def create_trial(self, resolved_vars, spec): if resolved_vars: experiment_tag += "_{}".format(format_vars(resolved_vars)) self.counter += 1 - return create_trial_from_spec( + return _create_trial_from_spec( spec, self.output_path, self.parser, - evaluated_params=flatten_resolved_vars(resolved_vars), + evaluated_params=_flatten_resolved_vars(resolved_vars), trial_id=trial_id, experiment_tag=experiment_tag, ) @@ -158,7 +158,7 @@ def __next__(self): config = self.points_to_evaluate.pop(0) self.num_samples_left -= 1 self.variants = _VariantIterator( - get_preset_variants( + _get_preset_variants( self.unresolved_spec, config, constant_grid_search=self.constant_grid_search, @@ -327,12 +327,12 @@ def add_configurations( Arguments: experiments: Experiments to run. 
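# Assumed usage of the factory functions annotated @PublicAPI(stability="alpha") above;
# "random" and "async_hyperband" are long-standing registry keys, not introduced here.
from ray.tune.schedulers import create_scheduler
from ray.tune.search import create_searcher

searcher = create_searcher("random")  # resolves to the basic variant generator
scheduler = create_scheduler("async_hyperband", metric="loss", mode="min")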
""" - from ray.tune.experiment import convert_to_experiment_list + from ray.tune.experiment import _convert_to_experiment_list - experiment_list = convert_to_experiment_list(experiments) + experiment_list = _convert_to_experiment_list(experiments) for experiment in experiment_list: - grid_vals = count_spec_samples(experiment.spec, num_samples=1) + grid_vals = _count_spec_samples(experiment.spec, num_samples=1) lazy_eval = grid_vals > SERIALIZATION_THRESHOLD if lazy_eval: warnings.warn( @@ -345,7 +345,7 @@ def add_configurations( previous_samples = self._total_samples points_to_evaluate = copy.deepcopy(self._points_to_evaluate) - self._total_samples += count_variants(experiment.spec, points_to_evaluate) + self._total_samples += _count_variants(experiment.spec, points_to_evaluate) iterator = _TrialIterator( uuid_prefix=self._uuid_prefix, num_samples=experiment.spec.get("num_samples", 1), @@ -404,7 +404,7 @@ def save_to_dir(self, dirpath, session_str): if any(iterator.lazy_eval for iterator in self._iterators): return False state_dict = self.get_state() - atomic_save( + _atomic_save( state=state_dict, checkpoint_dir=dirpath, file_name=self.CKPT_FILE_TMPL.format(session_str), @@ -417,7 +417,7 @@ def has_checkpoint(self, dirpath: str): def restore_from_dir(self, dirpath: str): """Restores self + searcher + search wrappers from dirpath.""" - state_dict = load_newest_checkpoint(dirpath, self.CKPT_FILE_TMPL.format("*")) + state_dict = _load_newest_checkpoint(dirpath, self.CKPT_FILE_TMPL.format("*")) if not state_dict: raise RuntimeError("Unable to find checkpoint in {}.".format(dirpath)) self.set_state(state_dict) diff --git a/python/ray/tune/search/concurrency_limiter.py b/python/ray/tune/search/concurrency_limiter.py index 54987398e123..383296ffa049 100644 --- a/python/ray/tune/search/concurrency_limiter.py +++ b/python/ray/tune/search/concurrency_limiter.py @@ -3,7 +3,7 @@ from typing import Dict, Optional, List from ray.tune.search.searcher import Searcher -from ray.tune.search.util import set_search_properties_backwards_compatible +from ray.tune.search.util import _set_search_properties_backwards_compatible from ray.util.annotations import PublicAPI @@ -85,7 +85,7 @@ def set_search_properties( self, metric: Optional[str], mode: Optional[str], config: Dict, **spec ) -> bool: self._set_searcher_max_concurrency() - return set_search_properties_backwards_compatible( + return _set_search_properties_backwards_compatible( self.searcher.set_search_properties, metric, mode, config, **spec ) diff --git a/python/ray/tune/search/repeater.py b/python/ray/tune/search/repeater.py index 41e477b11838..4d8331b20ad8 100644 --- a/python/ray/tune/search/repeater.py +++ b/python/ray/tune/search/repeater.py @@ -5,7 +5,7 @@ import numpy as np from ray.tune.search import Searcher -from ray.tune.search.util import set_search_properties_backwards_compatible +from ray.tune.search.util import _set_search_properties_backwards_compatible from ray.util import PublicAPI logger = logging.getLogger(__name__) @@ -188,6 +188,6 @@ def set_state(self, state: Dict): def set_search_properties( self, metric: Optional[str], mode: Optional[str], config: Dict, **spec ) -> bool: - return set_search_properties_backwards_compatible( + return _set_search_properties_backwards_compatible( self.searcher.set_search_properties, metric, mode, config, **spec ) diff --git a/python/ray/tune/search/search_generator.py b/python/ray/tune/search/search_generator.py index ebe4412c7834..a287dd52c27a 100644 --- a/python/ray/tune/search/search_generator.py 
+++ b/python/ray/tune/search/search_generator.py @@ -3,18 +3,18 @@ from typing import Dict, List, Optional, Union from ray.tune.error import TuneError -from ray.tune.experiment import Experiment, convert_to_experiment_list -from ray.tune.experiment.config_parser import make_parser, create_trial_from_spec +from ray.tune.experiment import Experiment, _convert_to_experiment_list +from ray.tune.experiment.config_parser import _make_parser, _create_trial_from_spec from ray.tune.search.search_algorithm import SearchAlgorithm from ray.tune.search import Searcher -from ray.tune.search.util import set_search_properties_backwards_compatible -from ray.tune.search.variant_generator import format_vars, resolve_nested_dict +from ray.tune.search.util import _set_search_properties_backwards_compatible +from ray.tune.search.variant_generator import format_vars, _resolve_nested_dict from ray.tune.experiment import Trial from ray.tune.utils.util import ( flatten_dict, merge_dicts, - atomic_save, - load_newest_checkpoint, + _atomic_save, + _load_newest_checkpoint, ) from ray.util.annotations import DeveloperAPI @@ -47,7 +47,7 @@ def __init__(self, searcher: Searcher): type(searcher), Searcher ), "Searcher should be subclassing Searcher." self.searcher = searcher - self._parser = make_parser() + self._parser = _make_parser() self._experiment = None self._counter = 0 # Keeps track of number of trials created. self._total_samples = 0 # int: total samples to evaluate. @@ -60,7 +60,7 @@ def metric(self): def set_search_properties( self, metric: Optional[str], mode: Optional[str], config: Dict, **spec ) -> bool: - return set_search_properties_backwards_compatible( + return _set_search_properties_backwards_compatible( self.searcher.set_search_properties, metric, mode, config, **spec ) @@ -78,7 +78,7 @@ def add_configurations( """ assert not self._experiment logger.debug("added configurations") - experiment_list = convert_to_experiment_list(experiments) + experiment_list = _convert_to_experiment_list(experiments) assert ( len(experiment_list) == 1 ), "SearchAlgorithms can only support 1 experiment at a time." @@ -119,10 +119,10 @@ def create_trial_if_possible( spec["config"] = merge_dicts(spec["config"], copy.deepcopy(suggested_config)) # Create a new trial_id if duplicate trial is created - flattened_config = resolve_nested_dict(spec["config"]) + flattened_config = _resolve_nested_dict(spec["config"]) self._counter += 1 tag = "{0}_{1}".format(str(self._counter), format_vars(flattened_config)) - trial = create_trial_from_spec( + trial = _create_trial_from_spec( spec, output_path, self._parser, @@ -159,7 +159,7 @@ def set_state(self, state: Dict): self._experiment = state["experiment"] def has_checkpoint(self, dirpath: str): - return bool(load_newest_checkpoint(dirpath, self.CKPT_FILE_TMPL.format("*"))) + return bool(_load_newest_checkpoint(dirpath, self.CKPT_FILE_TMPL.format("*"))) def save_to_dir(self, dirpath: str, session_str: str): """Saves self + searcher to dir. @@ -190,7 +190,7 @@ def save_to_dir(self, dirpath: str, session_str: str): # We save the base searcher separately for users to easily # separate the searcher. 
base_searcher.save_to_dir(dirpath, session_str) - atomic_save( + _atomic_save( state=search_alg_state, checkpoint_dir=dirpath, file_name=self.CKPT_FILE_TMPL.format(session_str), @@ -201,7 +201,7 @@ def restore_from_dir(self, dirpath: str): """Restores self + searcher + search wrappers from dirpath.""" searcher = self.searcher - search_alg_state = load_newest_checkpoint( + search_alg_state = _load_newest_checkpoint( dirpath, self.CKPT_FILE_TMPL.format("*") ) if not search_alg_state: diff --git a/python/ray/tune/search/searcher.py b/python/ray/tune/search/searcher.py index 5651372de4cd..7cec8bb5d23c 100644 --- a/python/ray/tune/search/searcher.py +++ b/python/ray/tune/search/searcher.py @@ -5,7 +5,7 @@ import warnings from typing import Dict, Optional, List, Union, Any, TYPE_CHECKING -from ray.tune.search.util import set_search_properties_backwards_compatible +from ray.tune.search.util import _set_search_properties_backwards_compatible from ray.util.annotations import DeveloperAPI, PublicAPI from ray.util.debug import log_once @@ -501,7 +501,7 @@ def set_search_properties( self, metric: Optional[str], mode: Optional[str], config: Dict, **spec ) -> bool: self._set_searcher_max_concurrency() - return set_search_properties_backwards_compatible( + return _set_search_properties_backwards_compatible( self.searcher.set_search_properties, metric, mode, config, **spec ) diff --git a/python/ray/tune/search/util.py b/python/ray/tune/search/util.py index 5f973b1c4064..9358bd32af63 100644 --- a/python/ray/tune/search/util.py +++ b/python/ray/tune/search/util.py @@ -4,7 +4,7 @@ logger = logging.getLogger(__name__) -def set_search_properties_backwards_compatible( +def _set_search_properties_backwards_compatible( set_search_properties_func, metric: Optional[str], mode: Optional[str], diff --git a/python/ray/tune/search/variant_generator.py b/python/ray/tune/search/variant_generator.py index 8760d3bc44df..4491a7b5ee86 100644 --- a/python/ray/tune/search/variant_generator.py +++ b/python/ray/tune/search/variant_generator.py @@ -8,11 +8,12 @@ import random from ray.tune.search.sample import Categorical, Domain, Function, RandomState -from ray.util.annotations import DeveloperAPI +from ray.util.annotations import DeveloperAPI, PublicAPI logger = logging.getLogger(__name__) +@DeveloperAPI def generate_variants( unresolved_spec: Dict, constant_grid_search: bool = False, @@ -48,7 +49,7 @@ def generate_variants( Yields: (Dict of resolved variables, Spec object) """ - for resolved_vars, spec in _generate_variants( + for resolved_vars, spec in _generate_variants_internal( unresolved_spec, constant_grid_search=constant_grid_search, random_state=random_state, @@ -57,6 +58,7 @@ def generate_variants( yield resolved_vars, spec +@PublicAPI(stability="alpha") def grid_search(values: Iterable) -> Dict[str, List]: """Convenience method for specifying grid search over a value. @@ -74,7 +76,7 @@ def grid_search(values: Iterable) -> Dict[str, List]: _MAX_RESOLUTION_PASSES = 20 -def resolve_nested_dict(nested_dict: Dict) -> Dict[Tuple, Any]: +def _resolve_nested_dict(nested_dict: Dict) -> Dict[Tuple, Any]: """Flattens a nested dict by joining keys into tuple of paths. Can then be passed into `format_vars`. 
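# Minimal example of the grid_search helper now marked @PublicAPI(stability="alpha");
# the "lr" key and values are hypothetical.
from ray import tune

config = {"lr": tune.grid_search([1e-3, 1e-2, 1e-1])}
# This is just sugar for {"lr": {"grid_search": [0.001, 0.01, 0.1]}}, which
# generate_variants() expands into one variant per listed value.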
@@ -82,13 +84,14 @@ def resolve_nested_dict(nested_dict: Dict) -> Dict[Tuple, Any]: res = {} for k, v in nested_dict.items(): if isinstance(v, dict): - for k_, v_ in resolve_nested_dict(v).items(): + for k_, v_ in _resolve_nested_dict(v).items(): res[(k,) + k_] = v_ else: res[(k,)] = v return res +@DeveloperAPI def format_vars(resolved_vars: Dict) -> str: """Format variables to be used as experiment tags. @@ -120,7 +123,7 @@ def format_vars(resolved_vars: Dict) -> str: ) -def flatten_resolved_vars(resolved_vars: Dict) -> Dict: +def _flatten_resolved_vars(resolved_vars: Dict) -> Dict: """Formats the resolved variable dict into a mapping of (str -> value).""" flattened_resolved_vars_dict = {} for pieces, value in resolved_vars.items(): @@ -142,6 +145,7 @@ def _clean_value(value: Any) -> str: return re.sub(invalid_alphabet, "_", str(value)).strip("_") +@DeveloperAPI def parse_spec_vars( spec: Dict, ) -> Tuple[List[Tuple[Tuple, Any]], List[Tuple[Tuple, Any]], List[Tuple[Tuple, Any]]]: @@ -163,7 +167,7 @@ def parse_spec_vars( return resolved_vars, domain_vars, grid_vars -def count_spec_samples(spec: Dict, num_samples=1) -> int: +def _count_spec_samples(spec: Dict, num_samples=1) -> int: """Count samples for a specific spec""" _, domain_vars, grid_vars = parse_spec_vars(spec) grid_count = 1 @@ -172,7 +176,7 @@ def count_spec_samples(spec: Dict, num_samples=1) -> int: return num_samples * grid_count -def count_variants(spec: Dict, presets: Optional[List[Dict]] = None) -> int: +def _count_variants(spec: Dict, presets: Optional[List[Dict]] = None) -> int: # Helper function: Deep update dictionary def deep_update(d, u): for k, v in u.items(): @@ -189,16 +193,16 @@ def deep_update(d, u): for preset in presets: preset_spec = copy.deepcopy(spec) deep_update(preset_spec["config"], preset) - total_samples += count_spec_samples(preset_spec, 1) + total_samples += _count_spec_samples(preset_spec, 1) total_num_samples -= 1 # Add the remaining samples if total_num_samples > 0: - total_samples += count_spec_samples(spec, total_num_samples) + total_samples += _count_spec_samples(spec, total_num_samples) return total_samples -def _generate_variants( +def _generate_variants_internal( spec: Dict, constant_grid_search: bool = False, random_state: "RandomState" = None ) -> Tuple[Dict, Dict]: spec = copy.deepcopy(spec) @@ -231,7 +235,7 @@ def _generate_variants( resolved_spec, to_resolve, random_state=random_state ) - for resolved, spec in _generate_variants( + for resolved, spec in _generate_variants_internal( resolved_spec, constant_grid_search=constant_grid_search, random_state=random_state, @@ -253,7 +257,7 @@ def _generate_variants( yield resolved_vars, spec -def get_preset_variants( +def _get_preset_variants( spec: Dict, config: Dict, constant_grid_search: bool = False, @@ -304,11 +308,12 @@ def get_preset_variants( ) assign_value(spec["config"], path, val) - return _generate_variants( + return _generate_variants_internal( spec, constant_grid_search=constant_grid_search, random_state=random_state ) +@DeveloperAPI def assign_value(spec: Dict, path: Tuple, value: Any): for k in path[:-1]: spec = spec[k] @@ -447,7 +452,7 @@ def _unresolved_values(spec: Dict) -> Dict[Tuple, Any]: return _split_resolved_unresolved_values(spec)[1] -def has_unresolved_values(spec: Dict) -> bool: +def _has_unresolved_values(spec: Dict) -> bool: return True if _unresolved_values(spec) else False diff --git a/python/ray/tune/tests/test_api.py b/python/ray/tune/tests/test_api.py index 7c7001ade46c..cbc64207e388 100644 --- 
a/python/ray/tune/tests/test_api.py +++ b/python/ray/tune/tests/test_api.py @@ -30,7 +30,7 @@ from ray.tune.experiment import Experiment from ray.tune.trainable import wrap_function from ray.tune.logger import Logger, LegacyLoggerCallback -from ray.tune.execution.ray_trial_executor import noop_logger_creator +from ray.tune.execution.ray_trial_executor import _noop_logger_creator from ray.tune.resources import Resources from ray.tune.result import ( TIMESTEPS_TOTAL, @@ -960,7 +960,9 @@ def _testDurableTrainable(self, trainable, function=False, cleanup=True): remote_checkpoint_dir = "memory:///unit-test/bucket" _ensure_directory(remote_checkpoint_dir) - log_creator = partial(noop_logger_creator, logdir="~/tmp/ray_results/exp/trial") + log_creator = partial( + _noop_logger_creator, logdir="~/tmp/ray_results/exp/trial" + ) test_trainable = trainable( logger_creator=log_creator, remote_checkpoint_dir=remote_checkpoint_dir ) @@ -981,7 +983,7 @@ def _testDurableTrainable(self, trainable, function=False, cleanup=True): self.assertEqual(test_trainable.state["hi"], 1) else: # Cannot re-use function trainable, create new - tune.trainable.session.shutdown() + tune.trainable.session._shutdown() test_trainable = trainable( logger_creator=log_creator, remote_checkpoint_dir=remote_checkpoint_dir, diff --git a/python/ray/tune/tests/test_experiment.py b/python/ray/tune/tests/test_experiment.py index 9b4c98d95c8c..815ac70c4af9 100644 --- a/python/ray/tune/tests/test_experiment.py +++ b/python/ray/tune/tests/test_experiment.py @@ -4,7 +4,7 @@ import ray from ray.rllib import _register_all from ray.tune import register_trainable -from ray.tune.experiment import Experiment, convert_to_experiment_list +from ray.tune.experiment import Experiment, _convert_to_experiment_list from ray.tune.error import TuneError from ray.tune.utils import diagnose_serialization @@ -25,12 +25,12 @@ def testConvertExperimentFromExperiment(self): exp1 = Experiment( **{"name": "foo", "run": "f1", "config": {"script_min_iter_time_s": 0}} ) - result = convert_to_experiment_list(exp1) + result = _convert_to_experiment_list(exp1) self.assertEqual(len(result), 1) self.assertEqual(type(result), list) def testConvertExperimentNone(self): - result = convert_to_experiment_list(None) + result = _convert_to_experiment_list(None) self.assertEqual(len(result), 0) self.assertEqual(type(result), list) @@ -38,7 +38,7 @@ def testConvertExperimentList(self): exp1 = Experiment( **{"name": "foo", "run": "f1", "config": {"script_min_iter_time_s": 0}} ) - result = convert_to_experiment_list([exp1, exp1]) + result = _convert_to_experiment_list([exp1, exp1]) self.assertEqual(len(result), 2) self.assertEqual(type(result), list) @@ -47,12 +47,12 @@ def testConvertExperimentJSON(self): "name": {"run": "f1", "config": {"script_min_iter_time_s": 0}}, "named": {"run": "f1", "config": {"script_min_iter_time_s": 0}}, } - result = convert_to_experiment_list(experiment) + result = _convert_to_experiment_list(experiment) self.assertEqual(len(result), 2) self.assertEqual(type(result), list) def testConvertExperimentIncorrect(self): - self.assertRaises(TuneError, lambda: convert_to_experiment_list("hi")) + self.assertRaises(TuneError, lambda: _convert_to_experiment_list("hi")) class ValidateUtilTest(unittest.TestCase): diff --git a/python/ray/tune/tests/test_legacy_import.py b/python/ray/tune/tests/test_legacy_import.py index 00e42595c7d6..012874caadec 100644 --- a/python/ray/tune/tests/test_legacy_import.py +++ b/python/ray/tune/tests/test_legacy_import.py @@ -58,7 
+58,10 @@ def test_import_module_raises_warnings(module): def test_import_experiment_experiment(logging_setup): # No warning - original imports still work - from ray.tune.experiment import Experiment, convert_to_experiment_list # noqa: F401 + from ray.tune.experiment import ( # noqa: F401 + Experiment, # noqa: F401 + _convert_to_experiment_list, # noqa: F401 + ) # noqa: F401 def test_import_logger_all(logging_setup): diff --git a/python/ray/tune/tests/test_progress_reporter.py b/python/ray/tune/tests/test_progress_reporter.py index 5cba2af06ede..433058c726c6 100644 --- a/python/ray/tune/tests/test_progress_reporter.py +++ b/python/ray/tune/tests/test_progress_reporter.py @@ -12,10 +12,10 @@ JupyterNotebookReporter, ProgressReporter, _fair_filter_trials, - best_trial_str, - detect_reporter, - time_passed_str, - trial_progress_str, + _best_trial_str, + _detect_reporter, + _time_passed_str, + _trial_progress_str, TuneReporterBase, ) from ray.tune.result import AUTO_RESULT_KEYS @@ -430,21 +430,21 @@ def testProgressStr(self): t.__str__ = lambda self: self.trial_id trials.append(t) # One metric, two parameters - prog1 = trial_progress_str( + prog1 = _trial_progress_str( trials, ["metric_1"], ["a", "b"], fmt="psql", max_rows=3, force_table=True ) print(prog1) assert prog1 == EXPECTED_RESULT_1 # No metric, all parameters - prog2 = trial_progress_str( + prog2 = _trial_progress_str( trials, [], None, fmt="psql", max_rows=None, force_table=True ) print(prog2) assert prog2 == EXPECTED_RESULT_2 # Two metrics, one parameter, all with custom representation - prog3 = trial_progress_str( + prog3 = _trial_progress_str( trials, {"nested/sub": "NestSub", "metric_2": "Metric 2"}, {"a": "A"}, @@ -456,7 +456,7 @@ def testProgressStr(self): assert prog3 == EXPECTED_RESULT_3 # Current best trial - best1 = best_trial_str(trials[1], "metric_1") + best1 = _best_trial_str(trials[1], "metric_1") assert best1 == EXPECTED_BEST_1 def testBestTrialStr(self): @@ -466,10 +466,10 @@ def testBestTrialStr(self): trial = Trial("", config=config, stub=True) trial.last_result = {"metric": 1, "config": config} - result = best_trial_str(trial, "metric") + result = _best_trial_str(trial, "metric") self.assertIn("nested_value", result) - result = best_trial_str(trial, "metric", parameter_columns=["nested/conf"]) + result = _best_trial_str(trial, "metric", parameter_columns=["nested/conf"]) self.assertIn("nested_value", result) def testBestTrialZero(self): @@ -499,12 +499,12 @@ def testTimeElapsed(self): # Local timezone output can be tricky, so we don't check the # day and the hour in this test. 
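# With detect_reporter() now private (_detect_reporter), code that wants a specific
# reporter can still construct one explicitly; the arguments shown are standard
# CLIReporter parameters, not something added by this patch.
from ray.tune import CLIReporter

reporter = CLIReporter(metric_columns=["loss"], max_progress_rows=10)
# tune.run(..., progress_reporter=reporter)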
- output = time_passed_str(time_start, time_now) + output = _time_passed_str(time_start, time_now) self.assertIn("Current time: 2016-02-", output) self.assertIn(":50:02 (running for 01:31:22.00)", output) time_now += 2 * 60 * 60 * 24 # plus two days - output = time_passed_str(time_start, time_now) + output = _time_passed_str(time_start, time_now) self.assertIn("Current time: 2016-02-", output) self.assertIn(":50:02 (running for 2 days, 01:31:22.00)", output) @@ -700,12 +700,12 @@ def testVerboseReporting(self): def testReporterDetection(self): """Test if correct reporter is returned from ``detect_reporter()``""" - reporter = detect_reporter() + reporter = _detect_reporter() self.assertTrue(isinstance(reporter, CLIReporter)) self.assertFalse(isinstance(reporter, JupyterNotebookReporter)) with patch("ray.tune.progress_reporter.IS_NOTEBOOK", True): - reporter = detect_reporter() + reporter = _detect_reporter() self.assertFalse(isinstance(reporter, CLIReporter)) self.assertTrue(isinstance(reporter, JupyterNotebookReporter)) @@ -733,7 +733,7 @@ def testMaxLen(self): t.__str__ = lambda self: self.trial_id trials.append(t) - progress_str = trial_progress_str( + progress_str = _trial_progress_str( trials, metric_columns=["some_metric"], force_table=True ) assert any(len(row) <= 90 for row in progress_str.split("\n")) diff --git a/python/ray/tune/tests/test_syncer_callback.py b/python/ray/tune/tests/test_syncer_callback.py index 76184e7b5fb5..9c72002b62c1 100644 --- a/python/ray/tune/tests/test_syncer_callback.py +++ b/python/ray/tune/tests/test_syncer_callback.py @@ -17,7 +17,7 @@ SyncerCallback, _BackgroundProcess, ) -from ray.tune.utils.callback import create_default_callbacks +from ray.tune.utils.callback import _create_default_callbacks from ray.tune.utils.file_transfer import sync_dir_between_nodes @@ -118,7 +118,7 @@ def wait(self): def test_syncer_callback_disabled(): """Check that syncer=None disables callback""" - callbacks = create_default_callbacks( + callbacks = _create_default_callbacks( callbacks=[], sync_config=SyncConfig(syncer=None) ) syncer_callback = None @@ -147,7 +147,7 @@ def test_syncer_callback_disabled(): def test_syncer_callback_noop_on_trial_cloud_checkpointing(): """Check that trial using cloud checkpointing disables sync to driver""" - callbacks = create_default_callbacks(callbacks=[], sync_config=SyncConfig()) + callbacks = _create_default_callbacks(callbacks=[], sync_config=SyncConfig()) syncer_callback = None for cb in callbacks: if isinstance(cb, SyncerCallback): @@ -174,7 +174,7 @@ def test_syncer_callback_noop_on_trial_cloud_checkpointing(): def test_syncer_callback_op_on_no_cloud_checkpointing(): """Check that without cloud checkpointing sync to driver is enabled""" - callbacks = create_default_callbacks(callbacks=[], sync_config=SyncConfig()) + callbacks = _create_default_callbacks(callbacks=[], sync_config=SyncConfig()) syncer_callback = None for cb in callbacks: if isinstance(cb, SyncerCallback): diff --git a/python/ray/tune/tests/test_trial_runner_callbacks.py b/python/ray/tune/tests/test_trial_runner_callbacks.py index 100963ed49bb..0a226e5a0933 100644 --- a/python/ray/tune/tests/test_trial_runner_callbacks.py +++ b/python/ray/tune/tests/test_trial_runner_callbacks.py @@ -24,7 +24,7 @@ from ray.tune.experiment import Trial from ray.tune.execution.trial_runner import TrialRunner from ray.tune import Callback -from ray.tune.utils.callback import create_default_callbacks +from ray.tune.utils.callback import _create_default_callbacks from ray.tune.experiment 
import Experiment @@ -276,17 +276,17 @@ def get_positions(callbacks): return first_logger_pos, last_logger_pos, syncer_pos # Auto creation of loggers, no callbacks, no syncer - callbacks = create_default_callbacks(None, SyncConfig(), None) + callbacks = _create_default_callbacks(None, SyncConfig(), None) first_logger_pos, last_logger_pos, syncer_pos = get_positions(callbacks) self.assertLess(last_logger_pos, syncer_pos) # Auto creation of loggers with callbacks - callbacks = create_default_callbacks([Callback()], SyncConfig(), None) + callbacks = _create_default_callbacks([Callback()], SyncConfig(), None) first_logger_pos, last_logger_pos, syncer_pos = get_positions(callbacks) self.assertLess(last_logger_pos, syncer_pos) # Auto creation of loggers with existing logger (but no CSV/JSON) - callbacks = create_default_callbacks([LoggerCallback()], SyncConfig(), None) + callbacks = _create_default_callbacks([LoggerCallback()], SyncConfig(), None) first_logger_pos, last_logger_pos, syncer_pos = get_positions(callbacks) self.assertLess(last_logger_pos, syncer_pos) @@ -294,7 +294,7 @@ def get_positions(callbacks): [mc1, mc2, mc3] = [Callback(), Callback(), Callback()] # Has to be legacy logger to avoid logger callback creation lc = LegacyLoggerCallback(logger_classes=DEFAULT_LOGGERS) - callbacks = create_default_callbacks([mc1, mc2, lc, mc3], SyncConfig(), None) + callbacks = _create_default_callbacks([mc1, mc2, lc, mc3], SyncConfig(), None) first_logger_pos, last_logger_pos, syncer_pos = get_positions(callbacks) self.assertLess(last_logger_pos, syncer_pos) self.assertLess(callbacks.index(mc1), callbacks.index(mc2)) diff --git a/python/ray/tune/tests/test_tuner_restore.py b/python/ray/tune/tests/test_tuner_restore.py index 6f11e9113099..108331b650e4 100644 --- a/python/ray/tune/tests/test_tuner_restore.py +++ b/python/ray/tune/tests/test_tuner_restore.py @@ -7,7 +7,7 @@ from ray.air import RunConfig, Checkpoint, session, FailureConfig from ray.air._internal.remote_storage import download_from_uri from ray.tune import Callback -from ray.tune.execution.trial_runner import find_newest_experiment_checkpoint +from ray.tune.execution.trial_runner import _find_newest_experiment_checkpoint from ray.tune.experiment import Trial from ray.tune.tune_config import TuneConfig from ray.tune.tuner import Tuner @@ -322,7 +322,7 @@ def test_tuner_restore_from_cloud(ray_start_2_cpus, tmpdir): assert "tuner.pkl" in remote_contents assert "trainable.pkl" in remote_contents - prev_cp = find_newest_experiment_checkpoint(str(check_path / "exp_dir")) + prev_cp = _find_newest_experiment_checkpoint(str(check_path / "exp_dir")) prev_lstat = os.lstat(prev_cp) (tmpdir / "ray_results").remove(ignore_errors=True) @@ -335,7 +335,7 @@ def test_tuner_restore_from_cloud(ray_start_2_cpus, tmpdir): assert "tuner.pkl" in local_contents assert "trainable.pkl" in local_contents - after_cp = find_newest_experiment_checkpoint( + after_cp = _find_newest_experiment_checkpoint( str(tmpdir / "ray_results" / "exp_dir") ) after_lstat = os.lstat(after_cp) diff --git a/python/ray/tune/tests/test_var.py b/python/ray/tune/tests/test_var.py index d4d6621b1d95..ce08852ea3da 100644 --- a/python/ray/tune/tests/test_var.py +++ b/python/ray/tune/tests/test_var.py @@ -11,7 +11,7 @@ from ray.tune.search import grid_search, BasicVariantGenerator from ray.tune.search.variant_generator import ( RecursiveDependencyError, - resolve_nested_dict, + _resolve_nested_dict, ) @@ -337,7 +337,7 @@ def test_resolve_dict(self): }, "b": {"a": 3}, } - resolved = 
resolve_nested_dict(config) + resolved = _resolve_nested_dict(config) for k, v in [(("a", "b"), 1), (("a", "c"), 2), (("b", "a"), 3)]: self.assertEqual(resolved.get(k), v) diff --git a/python/ray/tune/trainable/function_trainable.py b/python/ray/tune/trainable/function_trainable.py index dc6d1d4c7270..20a3e76137f8 100644 --- a/python/ray/tune/trainable/function_trainable.py +++ b/python/ray/tune/trainable/function_trainable.py @@ -26,9 +26,9 @@ ) from ray.tune.trainable import Trainable, TrainableUtil from ray.tune.utils import ( - detect_checkpoint_function, - detect_config_single, - detect_reporter, + _detect_checkpoint_function, + _detect_config_single, + _detect_reporter, ) from ray.util.annotations import DeveloperAPI from ray.util.debug import log_once @@ -347,7 +347,7 @@ def setup(self, config): ) self._last_result = {} - session.init(self._status_reporter) + session._init(self._status_reporter) self._runner = None self._restore_tmpdir = None self.temp_checkpoint_dir = None @@ -551,7 +551,7 @@ def cleanup(self): # Check for any errors that might have been missed. self._report_thread_runner_error() - session.shutdown() + session._shutdown() if self.temp_checkpoint_dir is not None and os.path.exists( self.temp_checkpoint_dir @@ -591,6 +591,7 @@ def _report_thread_runner_error(self, block=False): pass +@DeveloperAPI def wrap_function( train_func: Callable[[Any], Any], warn: bool = True, name: Optional[str] = None ) -> Type["FunctionTrainable"]: @@ -600,9 +601,9 @@ def wrap_function( inherit_from = train_func.__mixins__ + inherit_from func_args = inspect.getfullargspec(train_func).args - use_checkpoint = detect_checkpoint_function(train_func) - use_config_single = detect_config_single(train_func) - use_reporter = detect_reporter(train_func) + use_checkpoint = _detect_checkpoint_function(train_func) + use_config_single = _detect_config_single(train_func) + use_reporter = _detect_reporter(train_func) if not any([use_checkpoint, use_config_single, use_reporter]): # use_reporter is hidden diff --git a/python/ray/tune/trainable/session.py b/python/ray/tune/trainable/session.py index bd777b3befa3..d286d087f123 100644 --- a/python/ray/tune/trainable/session.py +++ b/python/ray/tune/trainable/session.py @@ -89,7 +89,7 @@ def get_session(): return _session -def init(reporter, ignore_reinit_error=True): +def _init(reporter, ignore_reinit_error=True): """Initializes the global trial context for this process.""" global _session global _session_v2 @@ -122,8 +122,8 @@ def init(reporter, ignore_reinit_error=True): from ray import actor, remote_function if "TUNE_DISABLE_RESOURCE_CHECKS" not in os.environ: - actor._actor_launch_hook = tune_task_and_actor_launch_hook - remote_function._task_launch_hook = tune_task_and_actor_launch_hook + actor._actor_launch_hook = _tune_task_and_actor_launch_hook + remote_function._task_launch_hook = _tune_task_and_actor_launch_hook _session = reporter _session_v2 = _TuneSessionImpl(status_reporter=reporter) @@ -133,7 +133,7 @@ def init(reporter, ignore_reinit_error=True): _checked_resources: Set[frozenset] = set() -def tune_task_and_actor_launch_hook( +def _tune_task_and_actor_launch_hook( fn, resources: Dict[str, float], strategy: Optional[SchedulingStrategyT] ): """Launch hook to catch nested tasks that can't fit in the placement group. 
@@ -197,7 +197,7 @@ def tune_task_and_actor_launch_hook(
     )
 
 
-def shutdown():
+def _shutdown():
     """Cleans up the trial and removes it from the global context."""
 
     global _session
diff --git a/python/ray/tune/trainable/trainable.py b/python/ray/tune/trainable/trainable.py
index ed3af247be9d..5a21dd7b457f 100644
--- a/python/ray/tune/trainable/trainable.py
+++ b/python/ray/tune/trainable/trainable.py
@@ -46,8 +46,8 @@
 from ray.tune.trainable.util import TrainableUtil
 from ray.tune.utils.util import (
     Tee,
-    delete_external_checkpoint,
-    get_checkpoint_from_remote_node,
+    _delete_external_checkpoint,
+    _get_checkpoint_from_remote_node,
     retry_fn,
 )
 from ray.util.annotations import PublicAPI
@@ -621,7 +621,7 @@ def restore(
             # And the source IP is different to the current IP
             and checkpoint_node_ip != ray.util.get_node_ip_address()
         ):
-            checkpoint = get_checkpoint_from_remote_node(
+            checkpoint = _get_checkpoint_from_remote_node(
                 checkpoint_path, checkpoint_node_ip
             )
             if checkpoint:
@@ -715,7 +715,7 @@ def delete_checkpoint(self, checkpoint_path: Union[str, Checkpoint]):
         else:
             checkpoint_uri = self._storage_path(checkpoint_dir)
             retry_fn(
-                lambda: delete_external_checkpoint(checkpoint_uri),
+                lambda: _delete_external_checkpoint(checkpoint_uri),
                 subprocess.CalledProcessError,
                 num_retries=3,
                 sleep_time=1,
diff --git a/python/ray/tune/trainable/util.py b/python/ray/tune/trainable/util.py
index 73ec3b0ca6ee..12e1ed642532 100644
--- a/python/ray/tune/trainable/util.py
+++ b/python/ray/tune/trainable/util.py
@@ -15,7 +15,7 @@
 )
 from ray.tune.registry import _ParameterRegistry
 from ray.tune.resources import Resources
-from ray.tune.utils import detect_checkpoint_function
+from ray.tune.utils import _detect_checkpoint_function
 from ray.util import placement_group
 from ray.util.annotations import DeveloperAPI, PublicAPI
@@ -342,7 +342,7 @@ def setup(self, config):
         return _Inner
     else:
         # Function trainable
-        use_checkpoint = detect_checkpoint_function(trainable, partial=True)
+        use_checkpoint = _detect_checkpoint_function(trainable, partial=True)
         keys = list(kwargs.keys())
 
         def inner(config, checkpoint_dir=None):
diff --git a/python/ray/tune/tune.py b/python/ray/tune/tune.py
index f30142a1afbd..f1e56a8fa3e2 100644
--- a/python/ray/tune/tune.py
+++ b/python/ray/tune/tune.py
@@ -12,12 +12,12 @@
 from ray.tune.analysis import ExperimentAnalysis
 from ray.tune.callback import Callback
 from ray.tune.error import TuneError
-from ray.tune.experiment import Experiment, convert_to_experiment_list
+from ray.tune.experiment import Experiment, _convert_to_experiment_list
 from ray.tune.progress_reporter import (
     ProgressReporter,
     RemoteReporterMixin,
-    detect_reporter,
-    detect_progress_metrics,
+    _detect_reporter,
+    _detect_progress_metrics,
 )
 from ray.tune.execution.ray_trial_executor import RayTrialExecutor
 from ray.tune.registry import get_trainable_cls, is_function_trainable
@@ -31,7 +31,7 @@
     TrialScheduler,
 )
 from ray.tune.schedulers.util import (
-    set_search_properties_backwards_compatible as scheduler_set_search_props,
+    _set_search_properties_backwards_compatible as scheduler_set_search_props,
 )
 from ray.tune.stopper import Stopper
 from ray.tune.search import (
@@ -43,16 +43,16 @@
     create_searcher,
 )
 from ray.tune.search.util import (
-    set_search_properties_backwards_compatible as searcher_set_search_props,
+    _set_search_properties_backwards_compatible as searcher_set_search_props,
 )
-from ray.tune.search.variant_generator import has_unresolved_values
+from ray.tune.search.variant_generator import _has_unresolved_values
 from ray.tune.syncer import SyncConfig, SyncerCallback, _validate_upload_dir
 from ray.tune.trainable import Trainable
 from ray.tune.experiment import Trial
 from ray.tune.execution.trial_runner import TrialRunner
-from ray.tune.utils.callback import create_default_callbacks
+from ray.tune.utils.callback import _create_default_callbacks
 from ray.tune.utils.log import Verbosity, has_verbosity, set_verbosity
-from ray.tune.utils.node import force_on_current_node
+from ray.tune.utils.node import _force_on_current_node
 from ray.tune.execution.placement_groups import PlacementGroupFactory
 from ray.util.annotations import PublicAPI
 from ray.util.queue import Empty, Queue
@@ -377,10 +377,10 @@ class and registered trainables.
         remote_run = ray.remote(num_cpus=0)(run)
 
         # Make sure tune.run is called on the sever node.
-        remote_run = force_on_current_node(remote_run)
+        remote_run = _force_on_current_node(remote_run)
 
         set_verbosity(verbose)
-        progress_reporter = progress_reporter or detect_reporter()
+        progress_reporter = progress_reporter or _detect_reporter()
 
         # JupyterNotebooks don't work with remote tune runs out of the box
         # (e.g. via Ray client) as they don't have access to the main
@@ -388,7 +388,7 @@ class and registered trainables.
         # strings, which will then be displayed on the driver side.
         if isinstance(progress_reporter, RemoteReporterMixin):
             string_queue = Queue(
-                actor_options={"num_cpus": 0, **force_on_current_node(None)}
+                actor_options={"num_cpus": 0, **_force_on_current_node(None)}
             )
             progress_reporter.output_queue = string_queue
 
@@ -602,7 +602,7 @@ def _handle_string_queue():
             config,
             **experiments[0].public_spec,
         ):
-            if has_unresolved_values(config):
+            if _has_unresolved_values(config):
                 raise ValueError(
                     "You passed a `config` parameter to `tune.run()` with "
                     "unresolved parameters, but the search algorithm was already "
@@ -621,12 +621,10 @@ def _handle_string_queue():
             "from your scheduler or from your call to `tune.run()`"
         )
 
-    progress_metrics = detect_progress_metrics(_get_trainable(run_or_experiment))
+    progress_metrics = _detect_progress_metrics(_get_trainable(run_or_experiment))
 
     # Create syncer callbacks
-    callbacks = create_default_callbacks(
-        callbacks, sync_config, metric=metric, progress_metrics=progress_metrics
-    )
+    callbacks = _create_default_callbacks(callbacks, sync_config, metric=metric)
 
     runner = TrialRunner(
         search_alg=search_alg,
@@ -710,7 +708,7 @@ def signal_interrupt_tune_run(sig: int, frame):
     if hasattr(signal, "SIGUSR1"):
         signal.signal(signal.SIGUSR1, signal_interrupt_tune_run)
 
-    progress_reporter = progress_reporter or detect_reporter()
+    progress_reporter = progress_reporter or _detect_reporter()
 
     tune_start = time.time()
 
@@ -822,7 +820,7 @@ def run_experiments(
         remote_run = ray.remote(num_cpus=0)(run_experiments)
 
         # Make sure tune.run_experiments is run on the server node.
-        remote_run = force_on_current_node(remote_run)
+        remote_run = _force_on_current_node(remote_run)
 
         return ray.get(
             remote_run.remote(
@@ -844,7 +842,7 @@ def run_experiments(
     # This is important to do this here
     # because it schematize the experiments
     # and it conducts the implicit registration.
-    experiments = convert_to_experiment_list(experiments)
+    experiments = _convert_to_experiment_list(experiments)
 
     if concurrent:
         return run(
diff --git a/python/ray/tune/tuner.py b/python/ray/tune/tuner.py
index 414cbf1e2075..6c72529bd94d 100644
--- a/python/ray/tune/tuner.py
+++ b/python/ray/tune/tuner.py
@@ -9,7 +9,7 @@
 from ray.tune.trainable import Trainable
 from ray.tune.impl.tuner_internal import TunerInternal
 from ray.tune.tune_config import TuneConfig
-from ray.tune.utils.node import force_on_current_node
+from ray.tune.utils.node import _force_on_current_node
 from ray.util import PublicAPI
 
 if TYPE_CHECKING:
@@ -143,7 +143,7 @@ def __init__(
         if not self._is_ray_client:
             self._local_tuner = TunerInternal(**kwargs)
         else:
-            self._remote_tuner = force_on_current_node(
+            self._remote_tuner = _force_on_current_node(
                 ray.remote(num_cpus=0)(TunerInternal)
             ).remote(**kwargs)
 
@@ -204,7 +204,7 @@ def restore(
             )
             return Tuner(_tuner_internal=tuner_internal)
         else:
-            tuner_internal = force_on_current_node(
+            tuner_internal = _force_on_current_node(
                 ray.remote(num_cpus=0)(TunerInternal)
             ).remote(restore_path=path, resume_config=resume_config)
             return Tuner(_tuner_internal=tuner_internal)
diff --git a/python/ray/tune/utils/__init__.py b/python/ray/tune/utils/__init__.py
index a1159134f0dc..d7df5b272563 100644
--- a/python/ray/tune/utils/__init__.py
+++ b/python/ray/tune/utils/__init__.py
@@ -8,9 +8,9 @@
     validate_save_restore,
     warn_if_slow,
     diagnose_serialization,
-    detect_checkpoint_function,
-    detect_reporter,
-    detect_config_single,
+    _detect_checkpoint_function,
+    _detect_reporter,
+    _detect_config_single,
     wait_for_gpu,
 )
 
@@ -24,8 +24,8 @@
     "validate_save_restore",
     "warn_if_slow",
     "diagnose_serialization",
-    "detect_checkpoint_function",
-    "detect_reporter",
-    "detect_config_single",
+    "_detect_checkpoint_function",
+    "_detect_reporter",
+    "_detect_config_single",
     "wait_for_gpu",
 ]
diff --git a/python/ray/tune/utils/callback.py b/python/ray/tune/utils/callback.py
index d5b3f2e63cd8..6bf9025ab895 100644
--- a/python/ray/tune/utils/callback.py
+++ b/python/ray/tune/utils/callback.py
@@ -21,7 +21,7 @@
 logger = logging.getLogger(__name__)
 
 
-def create_default_callbacks(
+def _create_default_callbacks(
     callbacks: Optional[List[Callback]],
     sync_config: SyncConfig,
     metric: Optional[str] = None,
diff --git a/python/ray/tune/utils/file_transfer.py b/python/ray/tune/utils/file_transfer.py
index b2dd4ddc8868..91041ab06c6c 100644
--- a/python/ray/tune/utils/file_transfer.py
+++ b/python/ray/tune/utils/file_transfer.py
@@ -6,6 +6,7 @@
 from typing import Optional, Tuple, Dict, Generator, Union
 
 import ray
+from ray.util.annotations import DeveloperAPI
 from ray.air._internal.filelock import TempFileLock
 
 
@@ -13,6 +14,7 @@
 _DEFAULT_MAX_SIZE_BYTES = 1 * 1024 * 1024 * 1024  # 1 GiB
 
 
+@DeveloperAPI
 def sync_dir_between_nodes(
     source_ip: str,
     source_path: str,
@@ -174,6 +176,7 @@ def _sync_dir_between_different_nodes(
     return ray.get(unpack_future)
 
 
+@DeveloperAPI
 def delete_on_node(
     node_ip: str, path: str, return_future: bool = False
 ) -> Union[bool, ray.ObjectRef]:
diff --git a/python/ray/tune/utils/log.py b/python/ray/tune/utils/log.py
index ab11939d453e..44489f591f1d 100644
--- a/python/ray/tune/utils/log.py
+++ b/python/ray/tune/utils/log.py
@@ -2,6 +2,7 @@
 from typing import Union
 
 from ray.util import PublicAPI
+from ray.util.annotations import DeveloperAPI
 
 
 @PublicAPI
@@ -18,6 +19,7 @@ def __int__(self):
 verbosity: Union[int, Verbosity] = Verbosity.V3_TRIAL_DETAILS
 
 
+@DeveloperAPI
 def set_verbosity(level: Union[int, Verbosity]):
     global verbosity
@@ -27,6 +29,7 @@ def set_verbosity(level: Union[int, Verbosity]):
     verbosity = level
 
 
+@DeveloperAPI
 def has_verbosity(level: Union[int, Verbosity]) -> bool:
     """Return True if passed level exceeds global verbosity level."""
     global verbosity
@@ -37,6 +40,7 @@ def has_verbosity(level: Union[int, Verbosity]) -> bool:
     return verbosity_level >= log_level
 
 
+@DeveloperAPI
 def disable_ipython():
     """Disable output of IPython HTML objects."""
     try:
diff --git a/python/ray/tune/utils/node.py b/python/ray/tune/utils/node.py
index ef16c6b29f2b..418b16649453 100644
--- a/python/ray/tune/utils/node.py
+++ b/python/ray/tune/utils/node.py
@@ -20,7 +20,7 @@ def _get_current_node_resource_key() -> str:
         raise ValueError("Cannot found the node dictionary for current node.")
 
 
-def force_on_current_node(task_or_actor=None):
+def _force_on_current_node(task_or_actor=None):
     """Given a task or actor, place it on the current node.
 
     If using Ray Client, the current node is the client server node.
 
     Args:
diff --git a/python/ray/tune/utils/util.py b/python/ray/tune/utils/util.py
index 6eb0c1233779..c706e1cb1910 100644
--- a/python/ray/tune/utils/util.py
+++ b/python/ray/tune/utils/util.py
@@ -5,18 +5,17 @@
 import os
 import threading
 import time
-import uuid
 from collections import defaultdict
 from datetime import datetime
 from threading import Thread
-from typing import Dict, List, Union, Type, Callable, Any
-from typing import Optional
+from typing import Dict, List, Union, Type, Callable, Any, Optional
 
 import numpy as np
 import psutil
 
 import ray
 from ray.air.checkpoint import Checkpoint
 from ray.air._internal.remote_storage import delete_at_uri
+from ray.util.annotations import DeveloperAPI, PublicAPI
 from ray.air._internal.json import SafeFallbackEncoder  # noqa
 from ray.air._internal.util import (  # noqa: F401
     is_nan,
@@ -47,6 +46,7 @@ def _import_gputil():
 START_OF_TIME = time.time()
 
 
+@DeveloperAPI
 class UtilMonitor(Thread):
     """Class for system usage utilization monitoring.
 
@@ -121,6 +121,7 @@ def stop(self):
         self.stopped = True
 
 
+@DeveloperAPI
 def retry_fn(
     fn: Callable[[], Any],
     exception_type: Type[Exception],
@@ -143,7 +144,7 @@ def _serialize_checkpoint(checkpoint_path) -> bytes:
     return checkpoint.to_bytes()
 
 
-def get_checkpoint_from_remote_node(
+def _get_checkpoint_from_remote_node(
     checkpoint_path: str, node_ip: str, timeout: float = 300.0
 ) -> Optional[Checkpoint]:
     if not any(
@@ -169,10 +170,11 @@ def get_checkpoint_from_remote_node(
     return Checkpoint.from_bytes(checkpoint_data)
 
 
-def delete_external_checkpoint(checkpoint_uri: str):
+def _delete_external_checkpoint(checkpoint_uri: str):
     delete_at_uri(checkpoint_uri)
 
 
+@DeveloperAPI
 class warn_if_slow:
     """Prints a warning if a given operation is slower than 500ms.
 
@@ -216,6 +218,7 @@ def __exit__(self, type, value, traceback):
             logger.warning(self.message.format(name=self.name, duration=duration))
 
 
+@DeveloperAPI
 class Tee(object):
     def __init__(self, stream1, stream2):
         self.stream1 = stream1
@@ -264,6 +267,7 @@ def tell(self, *args, **kwargs):
         raise NotImplementedError
 
 
+@DeveloperAPI
 def date_str():
     return datetime.today().strftime("%Y-%m-%d_%H-%M-%S")
 
@@ -284,6 +288,7 @@ def _from_pinnable(obj):
     return obj[0]
 
 
+@PublicAPI(stability="alpha")
 def diagnose_serialization(trainable: Callable):
     """Utility for detecting why your trainable function isn't serializing.
@@ -318,13 +323,13 @@ def test():
         assert diagnose_serialization(test) is True
 
     """
-    from ray.tune.registry import register_trainable, check_serializability
+    from ray.tune.registry import register_trainable, _check_serializability
 
     def check_variables(objects, failure_set, printer):
         for var_name, variable in objects.items():
             msg = None
             try:
-                check_serializability(var_name, variable)
+                _check_serializability(var_name, variable)
                 status = "PASSED"
             except Exception as e:
                 status = "FAILED"
@@ -379,7 +384,7 @@ def check_variables(objects, failure_set, printer):
     return failure_set
 
 
-def atomic_save(state: Dict, checkpoint_dir: str, file_name: str, tmp_file_name: str):
+def _atomic_save(state: Dict, checkpoint_dir: str, file_name: str, tmp_file_name: str):
     """Atomically saves the state object to the checkpoint directory.
 
     This is automatically used by Tuner().fit during a Tune job.
@@ -399,7 +404,7 @@ def atomic_save(state: Dict, checkpoint_dir: str, file_name: str, tmp_file_name:
     os.replace(tmp_search_ckpt_path, os.path.join(checkpoint_dir, file_name))
 
 
-def load_newest_checkpoint(dirpath: str, ckpt_pattern: str) -> dict:
+def _load_newest_checkpoint(dirpath: str, ckpt_pattern: str) -> Optional[Dict]:
     """Returns the most recently modified checkpoint.
 
     Assumes files are saved with an ordered name, most likely by
@@ -424,6 +429,7 @@ def load_newest_checkpoint(dirpath: str, ckpt_pattern: str) -> dict:
     return checkpoint_state
 
 
+@PublicAPI(stability="alpha")
 def wait_for_gpu(
     gpu_id: Optional[Union[int, str]] = None,
     target_util: float = 0.01,
@@ -524,6 +530,7 @@ def gpu_id_fn(g):
         raise RuntimeError("GPU memory was not freed.")
 
 
+@PublicAPI(stability="alpha")
 def validate_save_restore(
     trainable_cls: Type,
     config: Optional[Dict] = None,
@@ -571,7 +578,7 @@ def validate_save_restore(
     return True
 
 
-def detect_checkpoint_function(train_func, abort=False, partial=False):
+def _detect_checkpoint_function(train_func, abort=False, partial=False):
     """Use checkpointing if any arg has "checkpoint_dir" and args = 2"""
     func_sig = inspect.signature(train_func)
     validated = True
@@ -595,7 +602,7 @@ def detect_checkpoint_function(train_func, abort=False, partial=False):
     return validated
 
 
-def detect_reporter(func):
+def _detect_reporter(func):
     """Use reporter if any arg has "reporter" and args = 2"""
     func_sig = inspect.signature(func)
     use_reporter = True
@@ -607,7 +614,7 @@ def detect_reporter(func):
     return use_reporter
 
 
-def detect_config_single(func):
+def _detect_config_single(func):
     """Check if func({}) works."""
     func_sig = inspect.signature(func)
     use_config_single = True
@@ -619,33 +626,7 @@ def detect_config_single(func):
     return use_config_single
 
 
-def create_logdir(dirname: str, local_dir: str):
-    """Create an empty logdir with name `dirname` in `local_dir`.
-
-    If `local_dir`/`dirname` already exists, a unique string is appended
-    to the dirname.
-
-    Args:
-        dirname: Dirname to create in `local_dir`
-        local_dir: Root directory for the log dir
-
-    Returns:
-        full path to the newly created logdir.
-    """
-    local_dir = os.path.expanduser(local_dir)
-    logdir = os.path.join(local_dir, dirname)
-    if os.path.exists(logdir):
-        old_dirname = dirname
-        dirname += "_" + uuid.uuid4().hex[:4]
-        logger.info(
-            f"Creating a new dirname {dirname} because "
-            f"trial dirname '{old_dirname}' already exists."
-        )
-        logdir = os.path.join(local_dir, dirname)
-    os.makedirs(logdir, exist_ok=True)
-    return logdir
-
-
+@PublicAPI()
 def validate_warmstart(
     parameter_names: List[str],
     points_to_evaluate: List[Union[List, Dict]],
diff --git a/release/golden_notebook_tests/workloads/torch_tune_serve_test.py b/release/golden_notebook_tests/workloads/torch_tune_serve_test.py
index 28eb96651c8a..3ede1ee3177f 100644
--- a/release/golden_notebook_tests/workloads/torch_tune_serve_test.py
+++ b/release/golden_notebook_tests/workloads/torch_tune_serve_test.py
@@ -13,7 +13,7 @@
 from filelock import FileLock
 from ray import serve, tune, train
 from ray.train import Trainer
-from ray.tune.utils.node import force_on_current_node
+from ray.tune.utils.node import _force_on_current_node
 from torch.utils.data import DataLoader, Subset
 from torchvision.datasets import MNIST
 from torchvision.models import resnet18
@@ -118,7 +118,7 @@ def train_mnist(test_mode=False, num_workers=1, use_gpu=False):
 def get_remote_model(remote_model_checkpoint_path):
     if ray.util.client.ray.is_connected():
         remote_load = ray.remote(get_model)
-        remote_load = force_on_current_node(remote_load)
+        remote_load = _force_on_current_node(remote_load)
         return ray.get(remote_load.remote(remote_model_checkpoint_path))
     else:
         get_best_model_remote = ray.remote(get_model)
diff --git a/release/tune_tests/cloud_tests/workloads/run_cloud_test.py b/release/tune_tests/cloud_tests/workloads/run_cloud_test.py
index f56704e3210a..4eb194707e50 100644
--- a/release/tune_tests/cloud_tests/workloads/run_cloud_test.py
+++ b/release/tune_tests/cloud_tests/workloads/run_cloud_test.py
@@ -45,7 +45,7 @@
 import ray
 import ray.cloudpickle as pickle
-from ray.tune.execution.trial_runner import find_newest_experiment_checkpoint
+from ray.tune.execution.trial_runner import _find_newest_experiment_checkpoint
 from ray.tune.utils.serialization import TuneFunctionDecoder
 
 TUNE_SCRIPT = os.path.join(os.path.dirname(__file__), "_tune_script.py")
@@ -544,7 +544,7 @@ def fetch_bucket_contents_to_tmp_dir(bucket: str) -> str:
 def load_experiment_checkpoint_from_state_file(
     experiment_dir: str,
 ) -> ExperimentStateCheckpoint:
-    newest_ckpt_path = find_newest_experiment_checkpoint(experiment_dir)
+    newest_ckpt_path = _find_newest_experiment_checkpoint(experiment_dir)
     with open(newest_ckpt_path, "r") as f:
         runner_state = json.load(f, cls=TuneFunctionDecoder)
diff --git a/release/tune_tests/scalability_tests/workloads/test_result_throughput_cluster.py b/release/tune_tests/scalability_tests/workloads/test_result_throughput_cluster.py
index ce1ca92303df..83ce6c067cf6 100644
--- a/release/tune_tests/scalability_tests/workloads/test_result_throughput_cluster.py
+++ b/release/tune_tests/scalability_tests/workloads/test_result_throughput_cluster.py
@@ -16,7 +16,7 @@
 import ray
 from ray import tune
-from ray.tune.execution.cluster_info import is_ray_cluster
+from ray.tune.execution.cluster_info import _is_ray_cluster
 from ray.tune.utils.release_test_util import timed_tune_run
 
 
@@ -33,7 +33,7 @@ def main():
     max_runtime = 130
 
-    if is_ray_cluster():
+    if _is_ray_cluster():
         # Add constant overhead for SSH connection
         max_runtime = 130
 
diff --git a/rllib/train.py b/rllib/train.py
index bdbc4e4e267c..4f66d2507205 100755
--- a/rllib/train.py
+++ b/rllib/train.py
@@ -6,7 +6,7 @@
 import yaml
 
 import ray
-from ray.tune.experiment.config_parser import make_parser
+from ray.tune.experiment.config_parser import _make_parser
 from ray.tune.result import DEFAULT_RESULTS_DIR
 from ray.tune.resources import resources_to_json
 from ray.tune.tune import run_experiments
@@ -33,7 +33,7 @@ def create_parser(parser_creator=None):
-    parser = make_parser(
+    parser = _make_parser(
         parser_creator=parser_creator,
         formatter_class=argparse.RawDescriptionHelpFormatter,
         description="Train a reinforcement learning agent.",