From 1a9629a36e29b8a13158ea7d9636d93134795f12 Mon Sep 17 00:00:00 2001 From: Kai Fricke Date: Thu, 16 Mar 2023 19:32:31 -0700 Subject: [PATCH] [tune] Cleanup path-related properties in experiment classes (#33370) The naming of different path-related components in Ray Tune is currently messy. For instance, `Experiment.local_dir` refers to main results directory, e.g. `~/ray_results`, while `Trial.local_dir` refers to the experiment directory, e.g. `~/ray_results/experiment`. The same is true for properties, where it's unclear if it refers to the object's sub directory or to its parent directory. To disentangle this information, this PR introduces a new naming convention. - All entities receiving a "parent path" receive it in a unambiguous naming scheme. - For instance, `Experiment(storage_path)`, `TrialRunner(experiment_path)`, `Trial(experiment_path)` - Outputs are also normalized. E.g. `Trial.remote_experiment_path` and `Trial.local_experiment_path` - We keep existing arguments and properties for backwards compatibility Signed-off-by: Kai Fricke Signed-off-by: Jack He --- python/ray/_private/test_utils.py | 9 + python/ray/air/integrations/mlflow.py | 2 +- python/ray/air/integrations/wandb.py | 2 +- .../ray/air/tests/mocked_wandb_integration.py | 2 +- .../ray/air/tests/test_integration_mlflow.py | 2 +- .../ray/air/tests/test_integration_wandb.py | 2 +- .../ray/tune/analysis/experiment_analysis.py | 23 ++- python/ray/tune/callback.py | 2 +- .../ray/tune/execution/ray_trial_executor.py | 8 +- python/ray/tune/execution/trial_runner.py | 94 +++++---- python/ray/tune/experiment/config_parser.py | 5 +- python/ray/tune/experiment/experiment.py | 95 ++++++--- python/ray/tune/experiment/trial.py | 186 +++++++++++++----- python/ray/tune/logger/aim.py | 10 +- python/ray/tune/logger/csv.py | 4 +- python/ray/tune/logger/json.py | 8 +- python/ray/tune/logger/logger.py | 4 +- python/ray/tune/logger/tensorboardx.py | 4 +- python/ray/tune/progress_reporter.py | 4 +- python/ray/tune/result.py | 2 + python/ray/tune/result_grid.py | 4 +- python/ray/tune/schedulers/pbt.py | 8 +- python/ray/tune/syncer.py | 4 +- python/ray/tune/tests/test_actor_reuse.py | 26 ++- python/ray/tune/tests/test_api.py | 38 ++-- python/ray/tune/tests/test_cluster.py | 2 +- .../tune/tests/test_experiment_analysis.py | 4 +- .../test_integration_pytorch_lightning.py | 4 +- python/ray/tune/tests/test_logger.py | 25 ++- .../ray/tune/tests/test_progress_reporter.py | 8 +- python/ray/tune/tests/test_result_grid.py | 2 +- python/ray/tune/tests/test_run_experiment.py | 18 +- python/ray/tune/tests/test_syncer_callback.py | 1 + .../tune/tests/test_trial_relative_logdir.py | 14 +- python/ray/tune/tests/test_trial_runner.py | 2 +- python/ray/tune/tests/test_trial_runner_2.py | 8 +- python/ray/tune/tests/test_trial_runner_3.py | 6 +- python/ray/tune/tests/test_trial_scheduler.py | 14 +- .../ray/tune/tests/test_tune_save_restore.py | 2 +- python/ray/tune/trainable/trainable.py | 10 +- python/ray/tune/trainable/util.py | 7 +- python/ray/tune/tune.py | 4 +- .../cloud_tests/workloads/run_cloud_test.py | 10 +- 43 files changed, 443 insertions(+), 246 deletions(-) diff --git a/python/ray/_private/test_utils.py b/python/ray/_private/test_utils.py index dd78ab87ab57..8a73a0e60b26 100644 --- a/python/ray/_private/test_utils.py +++ b/python/ray/_private/test_utils.py @@ -1595,6 +1595,12 @@ def simulate_storage( elif storage_type == "s3": from moto.server import ThreadedMotoServer + old_env = os.environ + os.environ["AWS_ACCESS_KEY_ID"] = "testing" + os.environ["AWS_SECRET_ACCESS_KEY"] = "testing" + os.environ["AWS_SECURITY_TOKEN"] = "testing" + os.environ["AWS_SESSION_TOKEN"] = "testing" + root = root or uuid.uuid4().hex s3_server = f"http://localhost:{port}" server = ThreadedMotoServer(port=port) @@ -1602,6 +1608,9 @@ def simulate_storage( url = f"s3://{root}?region={region}&endpoint_override={s3_server}" yield url server.stop() + + os.environ = old_env + else: raise NotImplementedError(f"Unknown storage type: {storage_type}") diff --git a/python/ray/air/integrations/mlflow.py b/python/ray/air/integrations/mlflow.py index e4b3129797d1..eebd74c7ef1d 100644 --- a/python/ray/air/integrations/mlflow.py +++ b/python/ray/air/integrations/mlflow.py @@ -310,7 +310,7 @@ def log_trial_end(self, trial: "Trial", failed: bool = False): # Log the artifact if set_artifact is set to True. if self.should_save_artifact: - self.mlflow_util.save_artifacts(run_id=run_id, dir=trial.logdir) + self.mlflow_util.save_artifacts(run_id=run_id, dir=trial.local_path) # Stop the run once trial finishes. status = "FINISHED" if not failed else "FAILED" diff --git a/python/ray/air/integrations/wandb.py b/python/ray/air/integrations/wandb.py index 22b149c1fa36..e998a264785a 100644 --- a/python/ray/air/integrations/wandb.py +++ b/python/ray/air/integrations/wandb.py @@ -653,7 +653,7 @@ def _start_logging_actor( actor_options={"num_cpus": 0, **_force_on_current_node()} ) self._trial_logging_actors[trial] = self._remote_logger_class.remote( - logdir=trial.logdir, + logdir=trial.local_path, queue=self._trial_queues[trial], exclude=exclude_results, to_config=self.AUTO_CONFIG_KEYS, diff --git a/python/ray/air/tests/mocked_wandb_integration.py b/python/ray/air/tests/mocked_wandb_integration.py index 49b50f6d63db..75a301f3c8f2 100644 --- a/python/ray/air/tests/mocked_wandb_integration.py +++ b/python/ray/air/tests/mocked_wandb_integration.py @@ -20,7 +20,7 @@ class Trial( "trial_name", "experiment_dir_name", "placement_group_factory", - "logdir", + "local_path", ], ) ): diff --git a/python/ray/air/tests/test_integration_mlflow.py b/python/ray/air/tests/test_integration_mlflow.py index dc08b8255800..a8a266c35aad 100644 --- a/python/ray/air/tests/test_integration_mlflow.py +++ b/python/ray/air/tests/test_integration_mlflow.py @@ -19,7 +19,7 @@ class MockTrial( - namedtuple("MockTrial", ["config", "trial_name", "trial_id", "logdir"]) + namedtuple("MockTrial", ["config", "trial_name", "trial_id", "local_path"]) ): def __hash__(self): return hash(self.trial_id) diff --git a/python/ray/air/tests/test_integration_wandb.py b/python/ray/air/tests/test_integration_wandb.py index f62504a8c4ed..b82dddbd6ca8 100644 --- a/python/ray/air/tests/test_integration_wandb.py +++ b/python/ray/air/tests/test_integration_wandb.py @@ -334,7 +334,7 @@ def test_wandb_logger_exclude_config(self): trial_name="trial_0", experiment_dir_name="trainable", placement_group_factory=PlacementGroupFactory([{"CPU": 1}]), - logdir=tempfile.gettempdir(), + local_path=tempfile.gettempdir(), ) logger = WandbTestExperimentLogger( project="test_project", diff --git a/python/ray/tune/analysis/experiment_analysis.py b/python/ray/tune/analysis/experiment_analysis.py index da57fe46c949..f9ce5fa5e46d 100644 --- a/python/ray/tune/analysis/experiment_analysis.py +++ b/python/ray/tune/analysis/experiment_analysis.py @@ -100,8 +100,7 @@ def __init__( # If only a mode was passed, use anonymous metric self.default_metric = DEFAULT_METRIC - self._local_base_dir = self._checkpoints_and_paths[0][1].parent - + self._local_experiment_path = self._checkpoints_and_paths[0][1] if not pd: logger.warning( "pandas not installed. Run `pip install pandas` for " @@ -110,15 +109,19 @@ def __init__( else: self.fetch_trial_dataframes() - self._sync_config = sync_config + remote_storage_path = None + if sync_config and sync_config.upload_dir: + remote_storage_path = sync_config.upload_dir + + self._remote_storage_path = remote_storage_path def _parse_cloud_path(self, local_path: str): """Convert local path into cloud storage path""" - if not self._sync_config or not self._sync_config.upload_dir: + if not self._remote_storage_path: return None return local_path.replace( - str(self._local_base_dir), self._sync_config.upload_dir + str(Path(self._local_experiment_path).parent), self._remote_storage_path ) def _load_checkpoints(self, experiment_checkpoint_path: str) -> List[str]: @@ -686,7 +689,7 @@ def get_best_logdir( based on `mode`, and compare trials based on `mode=[min,max]`. """ best_trial = self.get_best_trial(metric, mode, scope) - return best_trial.logdir if best_trial else None + return best_trial.local_path if best_trial else None def get_last_checkpoint(self, trial=None, metric="training_iteration", mode="max"): """Gets the last persistent checkpoint path of the provided trial, @@ -783,8 +786,8 @@ def runner_data(self) -> Dict: def _get_trial_paths(self) -> List[str]: if self.trials: # We do not need to set the relative path here - # Maybe assert that t.logdir is in local_base_path? - _trial_paths = [str(t.logdir) for t in self.trials] + # Maybe assert that t.local_path is in local_base_path? + _trial_paths = [str(t.local_path) for t in self.trials] else: logger.info( "No `self.trials`. Drawing logdirs from checkpoint " @@ -795,7 +798,7 @@ def _get_trial_paths(self) -> List[str]: for trial_json_state, path in self._checkpoints_and_paths: try: trial = Trial.from_json_state(trial_json_state, stub=True) - trial.local_dir = str(path) + trial.local_experiment_path = str(path) except Exception: logger.warning( f"Could not load trials from experiment checkpoint. " @@ -808,7 +811,7 @@ def _get_trial_paths(self) -> List[str]: self.trials.append(trial) self.trials.sort(key=lambda trial: trial.trial_id) - _trial_paths = [str(trial.logdir) for trial in self.trials] + _trial_paths = [str(trial.local_path) for trial in self.trials] if not _trial_paths: raise TuneError("No trials found.") diff --git a/python/ray/tune/callback.py b/python/ray/tune/callback.py index 2961e89684df..da071f1c7db3 100644 --- a/python/ray/tune/callback.py +++ b/python/ray/tune/callback.py @@ -108,7 +108,7 @@ def train(config): """ # File templates for any artifacts written by this callback - # These files should live in the `trial.logdir` for each trial. + # These files should live in the `trial.local_path` for each trial. # TODO(ml-team): Make this more visible to users to override. Internal use for now. _SAVED_FILE_TEMPLATES = [] diff --git a/python/ray/tune/execution/ray_trial_executor.py b/python/ray/tune/execution/ray_trial_executor.py index a2a3598a02b3..301ab7aa8dfc 100644 --- a/python/ray/tune/execution/ray_trial_executor.py +++ b/python/ray/tune/execution/ray_trial_executor.py @@ -747,7 +747,7 @@ def reset_trial( trainable.reset.remote( extra_config, logger_creator=logger_creator, - remote_checkpoint_dir=trial.remote_checkpoint_dir, + remote_checkpoint_dir=trial.remote_path, ), timeout=DEFAULT_GET_TIMEOUT, ) @@ -938,8 +938,8 @@ def save( metrics=result, local_to_remote_path_fn=partial( TrainableUtil.get_remote_storage_path, - logdir=trial.logdir, - remote_checkpoint_dir=trial.remote_checkpoint_dir, + logdir=trial.local_path, + remote_checkpoint_dir=trial.remote_path, ) if trial.uses_cloud_checkpointing else None, @@ -1070,7 +1070,7 @@ def _change_working_directory(self, trial): if ray._private.worker._mode() == ray._private.worker.LOCAL_MODE: old_dir = os.getcwd() try: - os.chdir(trial.logdir) + os.chdir(trial.local_path) yield finally: os.chdir(old_dir) diff --git a/python/ray/tune/execution/trial_runner.py b/python/ray/tune/execution/trial_runner.py index 11224233260e..bc373f5dd07a 100644 --- a/python/ray/tune/execution/trial_runner.py +++ b/python/ray/tune/execution/trial_runner.py @@ -9,9 +9,9 @@ import warnings import ray +from ray.air._internal.uri_utils import URI from ray.air.config import CheckpointConfig from ray.air._internal.checkpoint_manager import CheckpointStorage -from ray.air._internal.uri_utils import URI from ray.exceptions import RayTaskError from ray.tune.error import _TuneStopTrialError, _TuneRestoreError from ray.tune.execution.experiment_state import ( @@ -83,7 +83,7 @@ class TrialRunner: search_alg: SearchAlgorithm for generating Trial objects. scheduler: Defaults to FIFOScheduler. - local_checkpoint_dir: Path where global experiment state checkpoints + experiment_path: Path where global experiment state checkpoints are saved and restored from. sync_config: See :class:`~ray.tune.syncer.SyncConfig`. Within sync config, the `upload_dir` specifies cloud storage, and @@ -122,7 +122,7 @@ def __init__( search_alg: Optional[SearchAlgorithm] = None, placeholder_resolvers: Optional[Dict[Tuple, Any]] = None, scheduler: Optional[TrialScheduler] = None, - local_checkpoint_dir: Optional[str] = None, + experiment_path: Optional[str] = None, sync_config: Optional[SyncConfig] = None, experiment_dir_name: Optional[str] = None, stopper: Optional[Stopper] = None, @@ -134,6 +134,8 @@ def __init__( callbacks: Optional[List[Callback]] = None, metric: Optional[str] = None, trial_checkpoint_config: Optional[CheckpointConfig] = None, + # Deprecated + local_checkpoint_dir: Optional[str] = None, ): self._search_alg = search_alg or BasicVariantGenerator() self._placeholder_resolvers = placeholder_resolvers @@ -147,6 +149,34 @@ def __init__( self._sync_config = sync_config or SyncConfig() + self._experiment_dir_name = experiment_dir_name + + if local_checkpoint_dir: + if experiment_path: + raise ValueError( + "Only one of `local_checkpoint_dir` or `experiment_path` " + "can be passed to `TrialRunner()`." + ) + + warnings.warn( + "The `local_checkpoint_dir` argument is deprecated and will be " + "removed in the future. Use `experiment_path` instead." + ) + + experiment_path = local_checkpoint_dir + + # Rename for better code readability + local_experiment_path = experiment_path + remote_experiment_path = None + + if self._sync_config.upload_dir and self._experiment_dir_name: + remote_experiment_path = str( + URI(self._sync_config.upload_dir) / self._experiment_dir_name + ) + + self._local_experiment_path = local_experiment_path + self._remote_experiment_path = remote_experiment_path + self.trial_executor.setup( max_pending_trials=self._max_pending_trials, # TODO(ml-team): Remove these in 2.6. @@ -193,12 +223,9 @@ def __init__( self._stop_queue = [] self._should_stop_experiment = False # used by TuneServer - self._local_checkpoint_dir = local_checkpoint_dir - if self._local_checkpoint_dir: - os.makedirs(self._local_checkpoint_dir, exist_ok=True) - - self._experiment_dir_name = experiment_dir_name + if self._local_experiment_path: + os.makedirs(self._local_experiment_path, exist_ok=True) self._stopper = stopper or NoopStopper() @@ -248,7 +275,9 @@ def experiment_state_file_name(self) -> str: @property def experiment_state_path(self) -> str: - return os.path.join(self._local_checkpoint_dir, self.experiment_state_file_name) + return os.path.join( + self._local_experiment_path, self.experiment_state_file_name + ) def setup_experiments( self, experiments: List[Experiment], total_num_samples: int @@ -274,8 +303,8 @@ def end_experiment_callbacks(self) -> None: def _create_checkpoint_manager(self): return _ExperimentCheckpointManager( - local_checkpoint_dir=self._local_checkpoint_dir, - remote_checkpoint_dir=self._remote_checkpoint_dir, + local_checkpoint_dir=self._local_experiment_path, + remote_checkpoint_dir=self._remote_experiment_path, checkpoint_period=self._checkpoint_period, sync_config=self._sync_config, sync_every_n_trial_checkpoints=self._trial_checkpoint_config.num_to_keep, @@ -293,12 +322,6 @@ def search_alg(self): def scheduler_alg(self): return self._scheduler_alg - @property - def _remote_checkpoint_dir(self): - if self._sync_config.upload_dir and self._experiment_dir_name: - return str(URI(self._sync_config.upload_dir) / self._experiment_dir_name) - return None - @classmethod def checkpoint_exists(cls, directory: str) -> bool: if not os.path.exists(directory): @@ -315,7 +338,7 @@ def save_to_dir(self, experiment_dir: Optional[str] = None): This method will save the trial runner state, the searcher state, and the callback states into the experiment directory. """ - experiment_dir = experiment_dir or self._local_checkpoint_dir + experiment_dir = experiment_dir or self._local_experiment_path # Get state from trial executor and runner runner_state = { @@ -341,10 +364,10 @@ def save_to_dir(self, experiment_dir: Optional[str] = None): ) self._search_alg.save_to_dir( - self._local_checkpoint_dir, session_str=self._session_str + self._local_experiment_path, session_str=self._session_str ) self._callbacks.save_to_dir( - self._local_checkpoint_dir, session_str=self._session_str + self._local_experiment_path, session_str=self._session_str ) def restore_from_dir(self, experiment_dir: Optional[str] = None) -> List[Trial]: @@ -357,20 +380,20 @@ def restore_from_dir(self, experiment_dir: Optional[str] = None) -> List[Trial]: and the callback states. It will then parse the trial states and return them as a list of Trial objects. """ - experiment_dir = experiment_dir or self._local_checkpoint_dir + experiment_dir = experiment_dir or self._local_experiment_path # Update local checkpoint dir - self._local_checkpoint_dir = experiment_dir + self._local_experiment_path = experiment_dir # Find newest state file newest_state_path = _find_newest_experiment_checkpoint( - self._local_checkpoint_dir + self._local_experiment_path ) if not newest_state_path: raise ValueError( - f"Tried to resume from checkpoint dir " - f"`{self._local_checkpoint_dir}`, but no " + f"Tried to resume experiment from directory " + f"`{self._local_experiment_path}`, but no " f"experiment checkpoint data was found." ) @@ -380,7 +403,7 @@ def restore_from_dir(self, experiment_dir: Optional[str] = None) -> List[Trial]: ) logger.warning( - f"Attempting to resume experiment from {self._local_checkpoint_dir}. " + f"Attempting to resume experiment from {self._local_experiment_path}. " "This will ignore any new changes to the specification." ) @@ -392,11 +415,11 @@ def restore_from_dir(self, experiment_dir: Optional[str] = None) -> List[Trial]: self.__setstate__(runner_state["runner_data"]) # 2. Restore search algorithm and callback state - if self._search_alg.has_checkpoint(self._local_checkpoint_dir): - self._search_alg.restore_from_dir(self._local_checkpoint_dir) + if self._search_alg.has_checkpoint(self._local_experiment_path): + self._search_alg.restore_from_dir(self._local_experiment_path) - if self._callbacks.can_restore(self._local_checkpoint_dir): - self._callbacks.restore_from_dir(self._local_checkpoint_dir) + if self._callbacks.can_restore(self._local_experiment_path): + self._callbacks.restore_from_dir(self._local_experiment_path) # 3. Load trials trials = [] @@ -405,7 +428,9 @@ def restore_from_dir(self, experiment_dir: Optional[str] = None) -> List[Trial]: # The following properties may be updated on restoration # Ex: moved local/cloud experiment directory - trial.local_dir = self._local_checkpoint_dir + # ATTN: Set `local_experiment_path` to update trial checkpoints! + trial.local_experiment_path = self._local_experiment_path + trial.remote_experiment_path = self._remote_experiment_path trial.sync_config = self._sync_config trial.experiment_dir_name = self._experiment_dir_name @@ -413,14 +438,14 @@ def restore_from_dir(self, experiment_dir: Optional[str] = None) -> List[Trial]: # since the dir might not be creatable locally. # TODO(ekl) this is kind of a hack. if not ray.util.client.ray.is_connected(): - trial.init_logdir() # Create logdir if it does not exist + trial.init_local_path() # Create logdir if it does not exist trials.append(trial) return trials def checkpoint(self, force: bool = False, wait: bool = False): - """Saves execution state to `self._local_checkpoint_dir`. + """Saves execution state to `self._local_experiment_path`. Overwrites the current session checkpoint, which starts when self is instantiated. Throttle depends on self._checkpoint_period. @@ -1206,7 +1231,8 @@ def __getstate__(self): "trial_executor", "_callbacks", "_checkpoint_manager", - "_local_checkpoint_dir", + "_local_experiment_path", + "_remote_experiment_path", "_sync_config", "_experiment_dir_name", "_insufficient_resources_manager", diff --git a/python/ray/tune/experiment/config_parser.py b/python/ray/tune/experiment/config_parser.py index c163a125131c..9dbd51427004 100644 --- a/python/ray/tune/experiment/config_parser.py +++ b/python/ray/tune/experiment/config_parser.py @@ -1,6 +1,5 @@ import argparse import json -import os # For compatibility under py2 to consider unicode as str from ray.air import CheckpointConfig @@ -199,7 +198,7 @@ def _create_trial_from_spec( if resources: trial_kwargs["placement_group_factory"] = resources - experiment_dir_name = spec.get("experiment_dir_name") + experiment_dir_name = spec.get("experiment_dir_name") or output_path sync_config = spec.get("sync_config", SyncConfig()) if ( @@ -223,9 +222,9 @@ def _create_trial_from_spec( trainable_name=spec["run"], # json.load leads to str -> unicode in py2.7 config=spec.get("config", {}), - local_dir=os.path.join(spec["local_dir"], output_path), # json.load leads to str -> unicode in py2.7 stopping_criterion=spec.get("stop", {}), + experiment_path=spec["experiment_path"], experiment_dir_name=experiment_dir_name, sync_config=sync_config, checkpoint_config=checkpoint_config, diff --git a/python/ray/tune/experiment/experiment.py b/python/ray/tune/experiment/experiment.py index 7b9c6fe028d6..35cb2d0330f2 100644 --- a/python/ray/tune/experiment/experiment.py +++ b/python/ray/tune/experiment/experiment.py @@ -1,5 +1,6 @@ import copy import datetime +import warnings from functools import partial import grpc import logging @@ -29,8 +30,9 @@ from ray.tune.stopper import CombinedStopper, FunctionStopper, Stopper, TimeoutStopper from ray.tune.syncer import SyncConfig from ray.tune.utils import date_str +from ray.util import log_once -from ray.util.annotations import DeveloperAPI +from ray.util.annotations import DeveloperAPI, Deprecated if TYPE_CHECKING: from ray.tune.experiment import Trial @@ -133,7 +135,7 @@ def __init__( None, Mapping[str, Union[float, int, Mapping]], "PlacementGroupFactory" ] = None, num_samples: int = 1, - local_dir: Optional[str] = None, + storage_path: Optional[str] = None, _experiment_checkpoint_dir: Optional[str] = None, sync_config: Optional[Union[SyncConfig, dict]] = None, checkpoint_config: Optional[Union[CheckpointConfig, dict]] = None, @@ -143,26 +145,57 @@ def __init__( export_formats: Optional[Sequence] = None, max_failures: int = 0, restore: Optional[str] = None, + # Deprecated + local_dir: Optional[str] = None, ): + if isinstance(sync_config, dict): + sync_config = SyncConfig(**sync_config) + else: + sync_config = sync_config or SyncConfig() + + self.sync_config = sync_config + + local_storage_path = storage_path + remote_storage_path = self.sync_config.upload_dir + + if local_dir: + if local_storage_path: + raise ValueError( + "Only one of `local_dir` and `storage_path` can be passed to " + "``Experiment().` Since `local_dir` is deprecated, pass " + "only `storage_path` instead." + ) + + if log_once("tune_experiment_local_dir"): + warnings.warn( + "The `local_dir` argument of `Experiment is deprecated. " + "Use `storage_path` or set the `TUNE_RESULT_DIR` " + "environment variable instead." + ) + + local_storage_path = local_dir + + full_local_storage_path = _get_local_dir_with_expand_user(local_storage_path) - local_dir = _get_local_dir_with_expand_user(local_dir) # `_experiment_checkpoint_dir` is for internal use only for better # support of Tuner API. # If set, it should be a subpath under `local_dir`. Also deduce `dir_name`. - self._experiment_checkpoint_dir = _experiment_checkpoint_dir if _experiment_checkpoint_dir: experiment_checkpoint_dir_path = Path(_experiment_checkpoint_dir) - local_dir_path = Path(local_dir) - assert local_dir_path in experiment_checkpoint_dir_path.parents + local_dir_path = Path(full_local_storage_path) + assert local_dir_path in experiment_checkpoint_dir_path.parents, ( + local_dir_path, + str(list(experiment_checkpoint_dir_path.parents)), + ) # `dir_name` is set by `_experiment_checkpoint_dir` indirectly. - self.dir_name = os.path.relpath(_experiment_checkpoint_dir, local_dir) + self.dir_name = os.path.relpath( + _experiment_checkpoint_dir, full_local_storage_path + ) - config = config or {} + self._local_storage_path = full_local_storage_path + self._remote_storage_path = remote_storage_path - if isinstance(sync_config, dict): - sync_config = SyncConfig(**sync_config) - else: - sync_config = sync_config or SyncConfig() + config = config or {} if isinstance(checkpoint_config, dict): checkpoint_config = CheckpointConfig(**checkpoint_config) @@ -202,8 +235,6 @@ def __init__( self.name = name or self._run_identifier - self.sync_config = sync_config - if not _experiment_checkpoint_dir: self.dir_name = _get_dir_name(run, name, self.name) @@ -259,7 +290,7 @@ def __init__( "config": config, "resources_per_trial": resources_per_trial, "num_samples": num_samples, - "local_dir": local_dir, + "experiment_path": self.local_path, "experiment_dir_name": self.dir_name, "sync_config": sync_config, "checkpoint_config": checkpoint_config, @@ -425,27 +456,43 @@ def stopper(self): return self._stopper @property + def local_path(self) -> Optional[str]: + if not self._local_storage_path: + return None + return str(Path(self._local_storage_path) / self.dir_name) + + @property + @Deprecated("Replaced by `local_path`") def local_dir(self): - return self.spec.get("local_dir") + # Deprecate: Raise in 2.5, Remove in 2.6 + return self.local_path + + @property + def remote_path(self) -> Optional[str]: + if not self._remote_storage_path: + return None + return str(URI(self._remote_storage_path) / self.dir_name) + + @property + def path(self) -> Optional[str]: + return self.remote_path or self.local_path @property def checkpoint_config(self): return self.spec.get("checkpoint_config") @property + @Deprecated("Replaced by `checkpoint_dir`") def checkpoint_dir(self): + # Deprecate: Raise in 2.5, Remove in 2.6 # Provided when initializing Experiment, if so, return directly. - if self._experiment_checkpoint_dir: - return self._experiment_checkpoint_dir - assert self.local_dir - return os.path.join(self.local_dir, self.dir_name) + return self.local_path @property + @Deprecated("Replaced by `remote_path`") def remote_checkpoint_dir(self) -> Optional[str]: - if not self.sync_config.upload_dir or not self.dir_name: - return None - uri = URI(self.sync_config.upload_dir) - return str(uri / self.dir_name) + # Deprecate: Raise in 2.5, Remove in 2.6 + return self.remote_path @property def run_identifier(self): diff --git a/python/ray/tune/experiment/trial.py b/python/ray/tune/experiment/trial.py index 62f5bdbdd4a2..6e88cc433d39 100644 --- a/python/ray/tune/experiment/trial.py +++ b/python/ray/tune/experiment/trial.py @@ -39,6 +39,7 @@ TRIAL_INFO, STDOUT_FILE, STDERR_FILE, + DEFAULT_EXPERIMENT_NAME, ) from ray.tune.syncer import SyncConfig from ray.tune.execution.placement_groups import ( @@ -48,7 +49,7 @@ from ray.tune.utils.serialization import TuneFunctionDecoder, TuneFunctionEncoder from ray.tune.trainable.util import TrainableUtil from ray.tune.utils import date_str, flatten_dict -from ray.util.annotations import DeveloperAPI +from ray.util.annotations import DeveloperAPI, Deprecated from ray.util.debug import log_once from ray._private.utils import binary_to_hex, hex_to_binary @@ -212,11 +213,11 @@ def _get_trainable_kwargs( additional_kwargs: Optional[Dict[str, Any]] = None, should_chdir: bool = False, ) -> Dict[str, Any]: - trial.init_logdir() + trial.init_local_path() logger_creator = partial( _noop_logger_creator, - logdir=trial.logdir, + logdir=trial.local_path, should_chdir=should_chdir, ) @@ -232,7 +233,9 @@ def _get_trainable_kwargs( } if trial.uses_cloud_checkpointing: - kwargs["remote_checkpoint_dir"] = trial.remote_checkpoint_dir + # We keep these kwargs separate for backwards compatibility + # with trainables that don't provide these keyword arguments + kwargs["remote_checkpoint_dir"] = trial.remote_path kwargs["sync_config"] = trial.sync_config if additional_kwargs: @@ -249,7 +252,7 @@ class Trial: the event loop for submitting trial runs to a Ray cluster. Trials start in the PENDING state, and transition to RUNNING once started. - On error it transitions to ERROR, otherwise TERMINATED on success. + On error, it transitions to ERROR, otherwise TERMINATED on success. There are resources allocated to each trial. These should be specified using ``PlacementGroupFactory``. @@ -258,12 +261,13 @@ class Trial: trainable_name: Name of the trainable object to be executed. config: Provided configuration dictionary with evaluated params. trial_id: Unique identifier for the trial. - local_dir: ``local_dir`` as passed to ``air.RunConfig()`` joined - with the name of the experiment. - logdir: Directory where the trial logs are saved. - relative_logdir: Same as ``logdir``, but relative to the parent of - the ``local_dir`` (equal to ``local_dir`` argument passed - to ``air.RunConfig()``). + path: Path where results for this trial are stored. Can be on + the local node or on cloud storage. + local_path: Path on the local disk where results are stored. + remote_path: Path on cloud storage where results are stored, + or None if not set. + relative_logdir: Directory of the trial relative to its + experiment directory. evaluated_params: Evaluated parameters by search algorithm, experiment_tag: Identifying trial name to show in the console status: One of PENDING, RUNNING, PAUSED, TERMINATED, ERROR/ @@ -293,12 +297,12 @@ def __init__( *, config: Optional[Dict] = None, trial_id: Optional[str] = None, - local_dir: Optional[str] = DEFAULT_RESULTS_DIR, + experiment_path: Optional[str] = None, + experiment_dir_name: Optional[str] = None, evaluated_params: Optional[Dict] = None, experiment_tag: str = "", placement_group_factory: Optional[PlacementGroupFactory] = None, stopping_criterion: Optional[Dict[str, float]] = None, - experiment_dir_name: Optional[str] = None, sync_config: Optional[SyncConfig] = None, checkpoint_config: Optional[CheckpointConfig] = None, export_formats: Optional[List[str]] = None, @@ -309,6 +313,8 @@ def __init__( max_failures: int = 0, stub: bool = False, _setup_default_resource: bool = True, + # Deprecated + local_dir: Optional[str] = None, ): """Initialize a new trial. @@ -331,7 +337,48 @@ def __init__( # Trial config self.trainable_name = trainable_name self.trial_id = Trial.generate_id() if trial_id is None else trial_id - self._local_dir = local_dir # This remains unexpanded for syncing. + + # Sync config + self.sync_config = sync_config or SyncConfig() + + # Set to pass through on `Trial.reset()` + self._orig_experiment_path = experiment_path + self._orig_experiment_dir_name = experiment_dir_name + + # Rename for better code readability + local_experiment_path = experiment_path + remote_experiment_path = None + + # Backwards compatibility for `local_dir` + if local_dir: + if local_experiment_path: + raise ValueError( + "Only one of `local_dir` or `experiment_path` " + "can be passed to `Trial()`." + ) + local_experiment_path = local_dir + + # Derive experiment dir name from local path + if not experiment_dir_name and local_experiment_path: + # Maybe derive experiment dir name from local storage dir + experiment_dir_name = Path(local_experiment_path).name + elif not experiment_dir_name: + experiment_dir_name = DEFAULT_EXPERIMENT_NAME + + # Set default experiment dir name + if not local_experiment_path: + local_experiment_path = str(Path(DEFAULT_RESULTS_DIR) / experiment_dir_name) + os.makedirs(local_experiment_path, exist_ok=True) + + # Set remote experiment path if upload_dir is set + if self.sync_config.upload_dir: + remote_experiment_path = str( + URI(self.sync_config.upload_dir) / experiment_dir_name + ) + + # Finally, set properties + self._local_experiment_path = local_experiment_path + self._remote_experiment_path = remote_experiment_path self.config = config or {} # Save a copy of the original unresolved config so that we can swap @@ -546,48 +593,69 @@ def get_runner_ip(self) -> Optional[str]: return self.location.hostname @property + @Deprecated("Replaced by `local_experiment_path`") def local_dir(self): - return self._local_dir + return self.local_experiment_path + + @property + def remote_experiment_path(self) -> str: + return str(self._remote_experiment_path) + + @remote_experiment_path.setter + def remote_experiment_path(self, remote_path: str): + self._remote_experiment_path = remote_path + + @property + def local_experiment_path(self) -> str: + return str(self._local_experiment_path) - @local_dir.setter - def local_dir(self, local_dir): + @local_experiment_path.setter + def local_experiment_path(self, local_path: str): relative_checkpoint_dirs = [] - if self.logdir: + if self.local_path: # Save the relative paths of persistent trial checkpoints, which are saved # relative to the old `local_dir`/`logdir` for checkpoint in self.get_trial_checkpoints(): checkpoint_dir = checkpoint.dir_or_data assert isinstance(checkpoint_dir, str) relative_checkpoint_dirs.append( - os.path.relpath(checkpoint_dir, self.logdir) + os.path.relpath(checkpoint_dir, self.local_path) ) - # Update the underlying `_local_dir`, which also updates the trial `logdir` - self._local_dir = local_dir + # Update the underlying `_local_experiment_path`, + # which also updates the trial `local_path` + self._local_experiment_path = local_path - if self.logdir: + if self.local_path: for checkpoint, relative_checkpoint_dir in zip( self.get_trial_checkpoints(), relative_checkpoint_dirs ): # Reconstruct the checkpoint dir using the (possibly updated) # trial logdir and the relative checkpoint directory. checkpoint.dir_or_data = os.path.join( - self.logdir, relative_checkpoint_dir + self.local_path, relative_checkpoint_dir ) @property - def logdir(self): - if not self.local_dir or not self.relative_logdir: + @Deprecated("Replaced by `local_path`") + def logdir(self) -> Optional[str]: + # Deprecate: Raise in 2.5, Remove in 2.6 + return self.local_path + + @property + def local_path(self) -> Optional[str]: + if not self.local_experiment_path or not self.relative_logdir: return None - return str(Path(self.local_dir).joinpath(self.relative_logdir)) + return str(Path(self.local_experiment_path).joinpath(self.relative_logdir)) - @logdir.setter - def logdir(self, logdir): - relative_logdir = Path(logdir).relative_to(self.local_dir) + @local_path.setter + def local_path(self, logdir): + relative_logdir = Path(logdir).relative_to(self.local_experiment_path) if ".." in str(relative_logdir): raise ValueError( - f"The `logdir` points to a directory outside the trial's `local_dir` " - f"({self.local_dir}), which is unsupported. Use a logdir within the " + f"The `local_path` points to a directory outside the trial's " + f"`local_experiment_path` ({self.local_experiment_path}), " + f"which is unsupported. Use a logdir within the " f"local directory instead. Got: {logdir}" ) if log_once("logdir_setter"): @@ -597,6 +665,24 @@ def logdir(self, logdir): ) self.relative_logdir = relative_logdir + @property + @Deprecated("Replaced by `remote_path`") + def remote_checkpoint_dir(self) -> Optional[str]: + # Deprecate: Raise in 2.5, Remove in 2.6 + return self.remote_path + + @property + def remote_path(self) -> Optional[str]: + assert self.local_path, "Trial {}: logdir not initialized.".format(self) + if not self._remote_experiment_path or not self.relative_logdir: + return None + uri = URI(self._remote_experiment_path) + return str(uri / self.relative_logdir) + + @property + def path(self) -> Optional[str]: + return self.remote_path or self.local_path + @property def has_reported_at_least_once(self) -> bool: return bool(self._last_result) @@ -639,21 +725,9 @@ def checkpoint(self): def generate_id(cls): return str(uuid.uuid4().hex)[:8] - @property - def remote_checkpoint_dir(self) -> str: - """This is the **per trial** remote checkpoint dir. - - This is different from **per experiment** remote checkpoint dir. - """ - assert self.logdir, "Trial {}: logdir not initialized.".format(self) - if not self.sync_config.upload_dir or not self.experiment_dir_name: - return None - uri = URI(self.sync_config.upload_dir) - return str(uri / self.experiment_dir_name / self.relative_logdir) - @property def uses_cloud_checkpointing(self): - return bool(self.remote_checkpoint_dir) + return bool(self.remote_path) def reset(self): # If there is `default_resource_request` associated with the trainable, @@ -673,7 +747,8 @@ def reset(self): self.trainable_name, config=self.config, trial_id=None, - local_dir=self.local_dir, + experiment_path=self._orig_experiment_path, + experiment_dir_name=self._orig_experiment_dir_name, evaluated_params=self.evaluated_params, experiment_tag=self.experiment_tag, placement_group_factory=placement_group_factory, @@ -688,14 +763,19 @@ def reset(self): max_failures=self.max_failures, ) + @Deprecated("Replaced by `init_local_path()`") def init_logdir(self): + # Deprecate: Raise in 2.5, Remove in 2.6 + self.init_local_path() + + def init_local_path(self): """Init logdir.""" if not self.relative_logdir: self.relative_logdir = _create_unique_logdir_name( - self.local_dir, self._generate_dirname() + str(self.local_experiment_path), self._generate_dirname() ) - assert self.logdir - logdir_path = Path(self.logdir) + assert self.local_path + logdir_path = Path(self.local_path) logdir_path.mkdir(parents=True, exist_ok=True) self.invalidate_json_state() @@ -757,15 +837,15 @@ def set_experiment_tag(self, experiment_tag): @property def error_file(self): - if not self.logdir or not self.error_filename: + if not self.local_path or not self.error_filename: return None - return os.path.join(self.logdir, self.error_filename) + return os.path.join(self.local_path, self.error_filename) @property def pickled_error_file(self): - if not self.logdir or not self.pickled_error_filename: + if not self.local_path or not self.pickled_error_filename: return None - return os.path.join(self.logdir, self.pickled_error_filename) + return os.path.join(self.local_path, self.pickled_error_filename) def handle_error(self, exc: Optional[Union[TuneError, RayTaskError]] = None): if isinstance(exc, _TuneRestoreError): @@ -781,7 +861,7 @@ def handle_error(self, exc: Optional[Union[TuneError, RayTaskError]] = None): else: self.num_failures += 1 - if self.logdir: + if self.local_path: self.error_filename = "error.txt" if isinstance(exc, RayTaskError): # Piping through the actual error to result grid. diff --git a/python/ray/tune/logger/aim.py b/python/ray/tune/logger/aim.py index 595e0041b631..d90aecafa655 100644 --- a/python/ray/tune/logger/aim.py +++ b/python/ray/tune/logger/aim.py @@ -90,7 +90,7 @@ def _create_run(self, trial: "Trial") -> Run: Returns: Run: The created aim run for a specific trial. """ - experiment_dir = trial.local_dir + experiment_dir = trial.local_experiment_path run = Run( repo=self._repo_path or experiment_dir, experiment=self._experiment_name or trial.experiment_dir_name, @@ -98,9 +98,9 @@ def _create_run(self, trial: "Trial") -> Run: ) # Attach a few useful trial properties run["trial_id"] = trial.trial_id - run["trial_log_dir"] = trial.logdir - if trial.remote_checkpoint_dir: - run["trial_remote_log_dir"] = trial.remote_checkpoint_dir + run["trial_log_dir"] = trial.local_path + if trial.remote_path: + run["trial_remote_log_dir"] = trial.remote_path trial_ip = trial.get_runner_ip() if trial_ip: run["trial_ip"] = trial_ip @@ -111,7 +111,7 @@ def log_trial_start(self, trial: "Trial"): # Cleanup an existing run if the trial has been restarted self._trial_to_run[trial].close() - trial.init_logdir() + trial.init_local_path() self._trial_to_run[trial] = self._create_run(trial) if trial.evaluated_params: diff --git a/python/ray/tune/logger/csv.py b/python/ray/tune/logger/csv.py index 15849760ed34..f8509990b3a7 100644 --- a/python/ray/tune/logger/csv.py +++ b/python/ray/tune/logger/csv.py @@ -88,8 +88,8 @@ def _setup_trial(self, trial: "Trial"): self._trial_files[trial].close() # Make sure logdir exists - trial.init_logdir() - local_file = os.path.join(trial.logdir, EXPR_PROGRESS_FILE) + trial.init_local_path() + local_file = os.path.join(trial.local_path, EXPR_PROGRESS_FILE) self._trial_continue[trial] = ( os.path.exists(local_file) and os.path.getsize(local_file) > 0 ) diff --git a/python/ray/tune/logger/json.py b/python/ray/tune/logger/json.py index 6172c0779bc2..ef59a455a3ba 100644 --- a/python/ray/tune/logger/json.py +++ b/python/ray/tune/logger/json.py @@ -87,8 +87,8 @@ def log_trial_start(self, trial: "Trial"): self.update_config(trial, trial.config) # Make sure logdir exists - trial.init_logdir() - local_file = os.path.join(trial.logdir, EXPR_RESULT_FILE) + trial.init_local_path() + local_file = os.path.join(trial.local_path, EXPR_RESULT_FILE) self._trial_files[trial] = open(local_file, "at") def log_trial_result(self, iteration: int, trial: "Trial", result: Dict): @@ -108,7 +108,7 @@ def log_trial_end(self, trial: "Trial", failed: bool = False): def update_config(self, trial: "Trial", config: Dict): self._trial_configs[trial] = config - config_out = os.path.join(trial.logdir, EXPR_PARAM_FILE) + config_out = os.path.join(trial.local_path, EXPR_PARAM_FILE) with open(config_out, "w") as f: json.dump( self._trial_configs[trial], @@ -118,6 +118,6 @@ def update_config(self, trial: "Trial", config: Dict): cls=SafeFallbackEncoder, ) - config_pkl = os.path.join(trial.logdir, EXPR_PARAM_PICKLE_FILE) + config_pkl = os.path.join(trial.local_path, EXPR_PARAM_PICKLE_FILE) with open(config_pkl, "wb") as f: cloudpickle.dump(self._trial_configs[trial], f) diff --git a/python/ray/tune/logger/logger.py b/python/ray/tune/logger/logger.py index 3998beb44a3d..bb09662d4c50 100644 --- a/python/ray/tune/logger/logger.py +++ b/python/ray/tune/logger/logger.py @@ -175,12 +175,12 @@ def __init__(self, logger_classes: Iterable[Type[Logger]]): self._class_trial_loggers: Dict[Type[Logger], Dict["Trial", Logger]] = {} def log_trial_start(self, trial: "Trial"): - trial.init_logdir() + trial.init_local_path() for logger_class in self.logger_classes: trial_loggers = self._class_trial_loggers.get(logger_class, {}) if trial not in trial_loggers: - logger = logger_class(trial.config, trial.logdir, trial) + logger = logger_class(trial.config, trial.local_path, trial) trial_loggers[trial] = logger self._class_trial_loggers[logger_class] = trial_loggers diff --git a/python/ray/tune/logger/tensorboardx.py b/python/ray/tune/logger/tensorboardx.py index 096ffc9e8612..9e083319c630 100644 --- a/python/ray/tune/logger/tensorboardx.py +++ b/python/ray/tune/logger/tensorboardx.py @@ -177,9 +177,9 @@ def __init__(self): def log_trial_start(self, trial: "Trial"): if trial in self._trial_writer: self._trial_writer[trial].close() - trial.init_logdir() + trial.init_local_path() self._trial_writer[trial] = self._summary_writer_cls( - trial.logdir, flush_secs=30 + trial.local_path, flush_secs=30 ) self._trial_result[trial] = {} diff --git a/python/ray/tune/progress_reporter.py b/python/ray/tune/progress_reporter.py index 8d63fc069c8e..74aaee9f321c 100644 --- a/python/ray/tune/progress_reporter.py +++ b/python/ray/tune/progress_reporter.py @@ -860,7 +860,7 @@ def _trial_progress_str( num_trials = len(trials) trials_by_state = _get_trials_by_state(trials) - for local_dir in sorted({t.local_dir for t in trials}): + for local_dir in sorted({t.local_experiment_path for t in trials}): messages.append("Result logdir: {}".format(local_dir)) num_trials_strs = [ @@ -1385,7 +1385,7 @@ def print_result(self, trial: Trial, result: Dict, error: bool, done: bool): elif has_verbosity(Verbosity.V2_TRIAL_NORM): metric_name = self._metric or "_metric" metric_value = result.get(metric_name, -99.0) - error_file = os.path.join(trial.logdir, "error.txt") + error_file = os.path.join(trial.local_path, "error.txt") info = "" if done: diff --git a/python/ray/tune/result.py b/python/ray/tune/result.py index a62729a122c1..b3e47bab0def 100644 --- a/python/ray/tune/result.py +++ b/python/ray/tune/result.py @@ -124,6 +124,8 @@ or os.path.expanduser("~/ray_results") ) +DEFAULT_EXPERIMENT_NAME = "default" + # Meta file about status under each experiment directory, can be # parsed by automlboard if exists. JOB_META_FILE = "job_status.json" diff --git a/python/ray/tune/result_grid.py b/python/ray/tune/result_grid.py index 61e5ec9550be..f8337c9056e0 100644 --- a/python/ray/tune/result_grid.py +++ b/python/ray/tune/result_grid.py @@ -232,9 +232,9 @@ def _trial_to_result(self, trial: Trial) -> Result: checkpoint=checkpoint, metrics=trial.last_result.copy(), error=self._populate_exception(trial), - log_dir=Path(trial.logdir) if trial.logdir else None, + log_dir=Path(trial.local_path) if trial.local_path else None, metrics_dataframe=self._experiment_analysis.trial_dataframes.get( - trial.logdir + trial.local_path ) if self._experiment_analysis.trial_dataframes else None, diff --git a/python/ray/tune/schedulers/pbt.py b/python/ray/tune/schedulers/pbt.py index ac150c83be41..11b5013ad089 100644 --- a/python/ray/tune/schedulers/pbt.py +++ b/python/ray/tune/schedulers/pbt.py @@ -666,7 +666,9 @@ def _log_config_on_step( trial_name, trial_to_clone_name = (trial_state.orig_tag, new_state.orig_tag) trial_id = trial.trial_id trial_to_clone_id = trial_to_clone.trial_id - trial_path = os.path.join(trial.local_dir, "pbt_policy_" + trial_id + ".txt") + trial_path = os.path.join( + trial.local_experiment_path, "pbt_policy_" + trial_id + ".txt" + ) trial_to_clone_path = os.path.join( trial_to_clone.local_dir, "pbt_policy_" + trial_to_clone_id + ".txt" ) @@ -679,7 +681,9 @@ def _log_config_on_step( new_config, ] # Log to global file. - with open(os.path.join(trial.local_dir, "pbt_global.txt"), "a+") as f: + with open( + os.path.join(trial.local_experiment_path, "pbt_global.txt"), "a+" + ) as f: print(json.dumps(policy, cls=SafeFallbackEncoder), file=f) # Overwrite state in target trial from trial_to_clone. if os.path.exists(trial_to_clone_path): diff --git a/python/ray/tune/syncer.py b/python/ray/tune/syncer.py index 5ebabde833cb..45e0b52f2976 100644 --- a/python/ray/tune/syncer.py +++ b/python/ray/tune/syncer.py @@ -726,10 +726,10 @@ def _mark_as_synced(self, trial: "Trial"): self._sync_times[trial.trial_id] = time.time() def _local_trial_logdir(self, trial: "Trial"): - return trial.logdir + return trial.local_path def _remote_trial_logdir(self, trial: "Trial"): - return trial.logdir + return trial.local_path def _sync_trial_dir( self, trial: "Trial", force: bool = False, wait: bool = True diff --git a/python/ray/tune/tests/test_actor_reuse.py b/python/ray/tune/tests/test_actor_reuse.py index a2281af8190f..a043e0d3e4f7 100644 --- a/python/ray/tune/tests/test_actor_reuse.py +++ b/python/ray/tune/tests/test_actor_reuse.py @@ -240,15 +240,15 @@ def test_trial_reuse_log_to_file(ray_start_1_cpu): # Check trial 1 assert trial1.last_result["num_resets"] == 2 - assert os.path.exists(os.path.join(trial1.logdir, "stdout")) - assert os.path.exists(os.path.join(trial1.logdir, "stderr")) + assert os.path.exists(os.path.join(trial1.local_path, "stdout")) + assert os.path.exists(os.path.join(trial1.local_path, "stderr")) # We expect that only "First" output is found in the first trial output - with open(os.path.join(trial1.logdir, "stdout"), "rt") as fp: + with open(os.path.join(trial1.local_path, "stdout"), "rt") as fp: content = fp.read() assert "PRINT_STDOUT: First" in content assert "PRINT_STDOUT: Second" not in content - with open(os.path.join(trial1.logdir, "stderr"), "rt") as fp: + with open(os.path.join(trial1.local_path, "stderr"), "rt") as fp: content = fp.read() assert "PRINT_STDERR: First" in content assert "LOG_STDERR: First" in content @@ -257,15 +257,15 @@ def test_trial_reuse_log_to_file(ray_start_1_cpu): # Check trial 2 assert trial2.last_result["num_resets"] == 3 - assert os.path.exists(os.path.join(trial2.logdir, "stdout")) - assert os.path.exists(os.path.join(trial2.logdir, "stderr")) + assert os.path.exists(os.path.join(trial2.local_path, "stdout")) + assert os.path.exists(os.path.join(trial2.local_path, "stderr")) # We expect that only "Second" output is found in the first trial output - with open(os.path.join(trial2.logdir, "stdout"), "rt") as fp: + with open(os.path.join(trial2.local_path, "stdout"), "rt") as fp: content = fp.read() assert "PRINT_STDOUT: Second" in content assert "PRINT_STDOUT: First" not in content - with open(os.path.join(trial2.logdir, "stderr"), "rt") as fp: + with open(os.path.join(trial2.local_path, "stderr"), "rt") as fp: content = fp.read() assert "PRINT_STDERR: Second" in content assert "LOG_STDERR: Second" in content @@ -434,16 +434,22 @@ def load_checkpoint(self, *args, **kwargs): # Make sure that `remote_checkpoint_dir` gets updated correctly trial_id = self.config.get("id") remote_trial_dir = get_remote_trial_dir(trial_id) + if self.remote_checkpoint_dir != "file://" + remote_trial_dir: # Delay raising the exception, since raising here would cause # an unhandled exception that doesn't fail the test. self._should_raise = True def step(self): + trial_id = self.config.get("id") + remote_trial_dir = get_remote_trial_dir(trial_id) + if self._should_raise: raise RuntimeError( - f"Failing! {self.remote_checkpoint_dir} not updated properly " - f"for trial {self.config.get('id')}" + f"Failing! Remote path not updated properly " + f"for trial {self.config.get('id')}. " + f"\nExpected: file://{remote_trial_dir}" + f"\nGot: {self.remote_checkpoint_dir}" ) return super().step() diff --git a/python/ray/tune/tests/test_api.py b/python/ray/tune/tests/test_api.py index cf5de211658b..3eb015d3549b 100644 --- a/python/ray/tune/tests/test_api.py +++ b/python/ray/tune/tests/test_api.py @@ -669,7 +669,7 @@ def custom_trial_dir(trial): trial_dirname_creator=custom_trial_dir, local_dir=self.tmpdir, ).trials - logdirs = {t.logdir for t in trials} + logdirs = {t.local_path for t in trials} assert len(logdirs) == 3 assert all(custom_name in dirpath for dirpath in logdirs) @@ -681,7 +681,7 @@ def train(config, reporter): trials = tune.run( train, config={"t1": tune.grid_search([1, 2, 3])}, local_dir=self.tmpdir ).trials - logdirs = {t.logdir for t in trials} + logdirs = {t.local_path for t in trials} for i in [1, 2, 3]: assert any(f"t1={i}" in dirpath for dirpath in logdirs) for t in trials: @@ -862,7 +862,7 @@ def cleanup(self): analysis = tune.run(TestTrainable, num_samples=10, stop={TRAINING_ITERATION: 1}) for trial in analysis.trials: - path = os.path.join(trial.logdir, "marker") + path = os.path.join(trial.local_path, "marker") assert os.path.exists(path) def testReportTimeStep(self): @@ -1173,27 +1173,27 @@ def train(config, reporter): # Do not log to file [trial] = tune.run("f1", log_to_file=False).trials - self.assertFalse(os.path.exists(os.path.join(trial.logdir, "stdout"))) - self.assertFalse(os.path.exists(os.path.join(trial.logdir, "stderr"))) + self.assertFalse(os.path.exists(os.path.join(trial.local_path, "stdout"))) + self.assertFalse(os.path.exists(os.path.join(trial.local_path, "stderr"))) # Log to default files [trial] = tune.run("f1", log_to_file=True).trials - self.assertTrue(os.path.exists(os.path.join(trial.logdir, "stdout"))) - self.assertTrue(os.path.exists(os.path.join(trial.logdir, "stderr"))) - with open(os.path.join(trial.logdir, "stdout"), "rt") as fp: + self.assertTrue(os.path.exists(os.path.join(trial.local_path, "stdout"))) + self.assertTrue(os.path.exists(os.path.join(trial.local_path, "stderr"))) + with open(os.path.join(trial.local_path, "stdout"), "rt") as fp: content = fp.read() self.assertIn("PRINT_STDOUT", content) - with open(os.path.join(trial.logdir, "stderr"), "rt") as fp: + with open(os.path.join(trial.local_path, "stderr"), "rt") as fp: content = fp.read() self.assertIn("PRINT_STDERR", content) self.assertIn("LOG_STDERR", content) # Log to one file [trial] = tune.run("f1", log_to_file="combined").trials - self.assertFalse(os.path.exists(os.path.join(trial.logdir, "stdout"))) - self.assertFalse(os.path.exists(os.path.join(trial.logdir, "stderr"))) - self.assertTrue(os.path.exists(os.path.join(trial.logdir, "combined"))) - with open(os.path.join(trial.logdir, "combined"), "rt") as fp: + self.assertFalse(os.path.exists(os.path.join(trial.local_path, "stdout"))) + self.assertFalse(os.path.exists(os.path.join(trial.local_path, "stderr"))) + self.assertTrue(os.path.exists(os.path.join(trial.local_path, "combined"))) + with open(os.path.join(trial.local_path, "combined"), "rt") as fp: content = fp.read() self.assertIn("PRINT_STDOUT", content) self.assertIn("PRINT_STDERR", content) @@ -1201,15 +1201,15 @@ def train(config, reporter): # Log to two files [trial] = tune.run("f1", log_to_file=("alt.stdout", "alt.stderr")).trials - self.assertFalse(os.path.exists(os.path.join(trial.logdir, "stdout"))) - self.assertFalse(os.path.exists(os.path.join(trial.logdir, "stderr"))) - self.assertTrue(os.path.exists(os.path.join(trial.logdir, "alt.stdout"))) - self.assertTrue(os.path.exists(os.path.join(trial.logdir, "alt.stderr"))) + self.assertFalse(os.path.exists(os.path.join(trial.local_path, "stdout"))) + self.assertFalse(os.path.exists(os.path.join(trial.local_path, "stderr"))) + self.assertTrue(os.path.exists(os.path.join(trial.local_path, "alt.stdout"))) + self.assertTrue(os.path.exists(os.path.join(trial.local_path, "alt.stderr"))) - with open(os.path.join(trial.logdir, "alt.stdout"), "rt") as fp: + with open(os.path.join(trial.local_path, "alt.stdout"), "rt") as fp: content = fp.read() self.assertIn("PRINT_STDOUT", content) - with open(os.path.join(trial.logdir, "alt.stderr"), "rt") as fp: + with open(os.path.join(trial.local_path, "alt.stderr"), "rt") as fp: content = fp.read() self.assertIn("PRINT_STDERR", content) self.assertIn("LOG_STDERR", content) diff --git a/python/ray/tune/tests/test_cluster.py b/python/ray/tune/tests/test_cluster.py index 041d07660751..d5eeefb8c658 100644 --- a/python/ray/tune/tests/test_cluster.py +++ b/python/ray/tune/tests/test_cluster.py @@ -372,7 +372,7 @@ def test_migration_checkpoint_removal( t1.checkpoint.dir_or_data = os.path.join( tmpdir, t1.relative_logdir, - os.path.relpath(t1.checkpoint.dir_or_data, t1.logdir), + os.path.relpath(t1.checkpoint.dir_or_data, t1.local_path), ) while not runner.is_finished(): diff --git a/python/ray/tune/tests/test_experiment_analysis.py b/python/ray/tune/tests/test_experiment_analysis.py index b99d3fbbce57..eafa500d926d 100644 --- a/python/ray/tune/tests/test_experiment_analysis.py +++ b/python/ray/tune/tests/test_experiment_analysis.py @@ -91,7 +91,7 @@ def testStats(self): def testTrialDataframe(self): checkpoints = self.ea._checkpoints_and_paths idx = random.randint(0, len(checkpoints) - 1) - logdir_from_trial = self.ea.trials[idx].logdir + logdir_from_trial = self.ea.trials[idx].local_path trial_df = self.ea.trial_dataframes[logdir_from_trial] self.assertTrue(isinstance(trial_df, pd.DataFrame)) @@ -308,7 +308,7 @@ def train(config): self.assertEqual(ea.best_trial, trials[2]) self.assertEqual(ea.best_config, trials[2].config) - self.assertEqual(ea.best_logdir, trials[2].logdir) + self.assertEqual(ea.best_logdir, trials[2].local_path) self.assertEqual( ea.best_checkpoint._local_path, trials[2].checkpoint.dir_or_data ) diff --git a/python/ray/tune/tests/test_integration_pytorch_lightning.py b/python/ray/tune/tests/test_integration_pytorch_lightning.py index a9da351d087f..b6dcf59e1151 100644 --- a/python/ray/tune/tests/test_integration_pytorch_lightning.py +++ b/python/ray/tune/tests/test_integration_pytorch_lightning.py @@ -123,7 +123,7 @@ def train(config): checkpoints = [ dir - for dir in os.listdir(analysis.trials[0].logdir) + for dir in os.listdir(analysis.trials[0].local_path) if dir.startswith("checkpoint") ] # 10 checkpoints after each batch, 1 checkpoint at end @@ -154,7 +154,7 @@ def train(config): checkpoints = [ dir - for dir in os.listdir(analysis.trials[0].logdir) + for dir in os.listdir(analysis.trials[0].local_path) if dir.startswith("checkpoint") ] # 1 checkpoint after the validation step diff --git a/python/ray/tune/tests/test_logger.py b/python/ray/tune/tests/test_logger.py index 0a35be6015ac..9384f2b73d7a 100644 --- a/python/ray/tune/tests/test_logger.py +++ b/python/ray/tune/tests/test_logger.py @@ -5,6 +5,7 @@ import os import unittest import tempfile +from pathlib import Path from typing import Optional import shutil import numpy as np @@ -34,7 +35,7 @@ class Trial: evaluated_params: dict trial_id: str logdir: str - local_dir: Optional[str] = None + experiment_path: Optional[str] = None experiment_dir_name: Optional[str] = None remote_checkpoint_dir: Optional[str] = None @@ -42,9 +43,25 @@ class Trial: def config(self): return self.evaluated_params - def init_logdir(self): + def init_local_path(self): return + @property + def local_path(self): + if self.logdir: + return self.logdir + if not self.experiment_dir_name: + return None + return str(Path(self.experiment_path) / self.experiment_dir_name) + + @property + def local_experiment_path(self): + return self.experiment_path + + @property + def remote_path(self): + return self.remote_checkpoint_dir + def __hash__(self): return hash(self.trial_id) @@ -314,7 +331,7 @@ class Dummy: Trial( evaluated_params=self.config, trial_id="aim_1", - local_dir=self.test_dir, + experiment_path=self.test_dir, logdir=trial_logdir, experiment_dir_name="aim_test", remote_checkpoint_dir="s3://bucket/aim_test/trial_0_logdir", @@ -322,7 +339,7 @@ class Dummy: Trial( evaluated_params=self.config, trial_id="aim_2", - local_dir=self.test_dir, + experiment_path=self.test_dir, logdir=trial_logdir, experiment_dir_name="aim_test", remote_checkpoint_dir="s3://bucket/aim_test/trial_1_logdir", diff --git a/python/ray/tune/tests/test_progress_reporter.py b/python/ray/tune/tests/test_progress_reporter.py index 33cce827a559..a43a7a9f770a 100644 --- a/python/ray/tune/tests/test_progress_reporter.py +++ b/python/ray/tune/tests/test_progress_reporter.py @@ -439,7 +439,7 @@ def testProgressStr(self): else: t.status = "RUNNING" t.trial_id = "%05d" % i - t.local_dir = "/foo" + t.local_experiment_path = "/foo" t.location = "here" t.config = {"a": i, "b": i * 2, "n": {"k": [i, 2 * i]}} t.evaluated_params = {"a": i, "b": i * 2, "n/k/0": i, "n/k/1": 2 * i} @@ -569,7 +569,7 @@ def testCurrentBestTrial(self): t = Mock() t.status = "RUNNING" t.trial_id = "%05d" % i - t.local_dir = "/foo" + t.local_experiment_path = "/foo" t.location = "here" t.config = {"a": i, "b": i * 2, "n": {"k": [i, 2 * i]}} t.evaluated_params = {"a": i} @@ -603,7 +603,7 @@ def testSortByMetric(self): else: t.status = "RUNNING" t.trial_id = "%05d" % i - t.local_dir = "/foo" + t.local_experiment_path = "/foo" t.location = "here" t.config = {"a": i} t.evaluated_params = {"a": i} @@ -795,7 +795,7 @@ def testMaxLen(self): t = Mock() t.status = "TERMINATED" t.trial_id = "%05d" % i - t.local_dir = "/foo" + t.local_experiment_path = "/foo" t.location = "here" t.config = {"verylong" * 20: i} t.evaluated_params = {"verylong" * 20: i} diff --git a/python/ray/tune/tests/test_result_grid.py b/python/ray/tune/tests/test_result_grid.py index 81086370fb50..075c56ceb4a5 100644 --- a/python/ray/tune/tests/test_result_grid.py +++ b/python/ray/tune/tests/test_result_grid.py @@ -342,7 +342,7 @@ def test_num_errors_terminated(tmpdir): trial.relative_logdir = trials[0].relative_logdir # Store a shared error file inside - error_path = Path(trials[0].logdir) / error_filename + error_path = Path(trials[0].local_path) / error_filename with open(error_path, "w") as fp: fp.write("Test error\n") diff --git a/python/ray/tune/tests/test_run_experiment.py b/python/ray/tune/tests/test_run_experiment.py index e55cb7329da8..22d2c0efe1bc 100644 --- a/python/ray/tune/tests/test_run_experiment.py +++ b/python/ray/tune/tests/test_run_experiment.py @@ -138,7 +138,7 @@ def _export_model(self, export_formats, export_dir): trials = run_experiments({"foo": {"run": train, "export_formats": ["format"]}}) for trial in trials: self.assertEqual(trial.status, Trial.TERMINATED) - self.assertTrue(os.path.exists(os.path.join(trial.logdir, "exported"))) + self.assertTrue(os.path.exists(os.path.join(trial.local_path, "exported"))) def testInvalidExportFormats(self): class train(Trainable): @@ -186,19 +186,19 @@ def on_result(self, result): {"foo": {"run": "__fake", "stop": {"training_iteration": 1}}}, callbacks=[LegacyLoggerCallback(logger_classes=[CustomLogger])], ) - self.assertTrue(os.path.exists(os.path.join(trial.logdir, "test.log"))) - self.assertFalse(os.path.exists(os.path.join(trial.logdir, "params.json"))) + self.assertTrue(os.path.exists(os.path.join(trial.local_path, "test.log"))) + self.assertFalse(os.path.exists(os.path.join(trial.local_path, "params.json"))) [trial] = run_experiments( {"foo": {"run": "__fake", "stop": {"training_iteration": 1}}} ) - self.assertFalse(os.path.exists(os.path.join(trial.logdir, "params.json"))) + self.assertFalse(os.path.exists(os.path.join(trial.local_path, "params.json"))) [trial] = run_experiments( {"foo": {"run": "__fake", "stop": {"training_iteration": 1}}}, callbacks=[LegacyLoggerCallback(logger_classes=[])], ) - self.assertFalse(os.path.exists(os.path.join(trial.logdir, "params.json"))) + self.assertFalse(os.path.exists(os.path.join(trial.local_path, "params.json"))) def testCustomLoggerWithAutoLogging(self): """Creates CSV/JSON logger callbacks automatically""" @@ -214,19 +214,19 @@ def on_result(self, result): {"foo": {"run": "__fake", "stop": {"training_iteration": 1}}}, callbacks=[LegacyLoggerCallback(logger_classes=[CustomLogger])], ) - self.assertTrue(os.path.exists(os.path.join(trial.logdir, "test.log"))) - self.assertTrue(os.path.exists(os.path.join(trial.logdir, "params.json"))) + self.assertTrue(os.path.exists(os.path.join(trial.local_path, "test.log"))) + self.assertTrue(os.path.exists(os.path.join(trial.local_path, "params.json"))) [trial] = run_experiments( {"foo": {"run": "__fake", "stop": {"training_iteration": 1}}} ) - self.assertTrue(os.path.exists(os.path.join(trial.logdir, "params.json"))) + self.assertTrue(os.path.exists(os.path.join(trial.local_path, "params.json"))) [trial] = run_experiments( {"foo": {"run": "__fake", "stop": {"training_iteration": 1}}}, callbacks=[LegacyLoggerCallback(logger_classes=[])], ) - self.assertTrue(os.path.exists(os.path.join(trial.logdir, "params.json"))) + self.assertTrue(os.path.exists(os.path.join(trial.local_path, "params.json"))) def testCustomTrialString(self): [trial] = run_experiments( diff --git a/python/ray/tune/tests/test_syncer_callback.py b/python/ray/tune/tests/test_syncer_callback.py index 4dcd28ed590c..bc59548fb181 100644 --- a/python/ray/tune/tests/test_syncer_callback.py +++ b/python/ray/tune/tests/test_syncer_callback.py @@ -77,6 +77,7 @@ def __init__(self, trial_id: str, logdir: str, on_dead_node: bool = False): self.sync_on_checkpoint = True self.logdir = logdir + self.local_path = logdir self._local_ip = ray.util.get_node_ip_address() self._on_dead_node = on_dead_node diff --git a/python/ray/tune/tests/test_trial_relative_logdir.py b/python/ray/tune/tests/test_trial_relative_logdir.py index 79863abf5d07..03392530db9f 100644 --- a/python/ray/tune/tests/test_trial_relative_logdir.py +++ b/python/ray/tune/tests/test_trial_relative_logdir.py @@ -49,9 +49,9 @@ def testDotsInLogdir(self): trial = Trial(trainable_name="rel_logdir", local_dir=local_dir) with self.assertRaises(ValueError): - trial.logdir = "/tmp/test_rel/../dots" + trial.local_path = "/tmp/test_rel/../dots" with self.assertRaises(ValueError): - trial.logdir = local_dir + "/../" + trial.local_path = local_dir + "/../" if shutil.rmtree.avoids_symlink_attacks: if local_dir_path.exists(): @@ -266,7 +266,7 @@ def test_load_trial_from_json_state(tmpdir): trial.init_logdir() trial.status = Trial.TERMINATED - checkpoint_logdir = os.path.join(trial.logdir, "checkpoint_00000") + checkpoint_logdir = os.path.join(trial.local_path, "checkpoint_00000") trial.checkpoint_manager.on_checkpoint( _TrackedCheckpoint( dir_or_data=checkpoint_logdir, @@ -288,7 +288,7 @@ def test_change_trial_local_dir(tmpdir): trial.init_logdir() trial.status = Trial.TERMINATED - checkpoint_logdir = os.path.join(trial.logdir, "checkpoint_00000") + checkpoint_logdir = os.path.join(trial.local_path, "checkpoint_00000") trial.checkpoint_manager.on_checkpoint( _TrackedCheckpoint( dir_or_data=checkpoint_logdir, @@ -297,14 +297,14 @@ def test_change_trial_local_dir(tmpdir): ) ) - assert trial.logdir.startswith(str(tmpdir)) + assert trial.local_path.startswith(str(tmpdir)) assert trial.get_trial_checkpoints()[0].dir_or_data.startswith(str(tmpdir)) # Specify a new local dir, and the logdir/checkpoint path should be updated with tempfile.TemporaryDirectory() as new_local_dir: - trial.local_dir = new_local_dir + trial.local_experiment_path = new_local_dir - assert trial.logdir.startswith(new_local_dir) + assert trial.local_path.startswith(new_local_dir) assert trial.get_trial_checkpoints()[0].dir_or_data.startswith(new_local_dir) diff --git a/python/ray/tune/tests/test_trial_runner.py b/python/ray/tune/tests/test_trial_runner.py index abd78023da4b..02ccad99f58e 100644 --- a/python/ray/tune/tests/test_trial_runner.py +++ b/python/ray/tune/tests/test_trial_runner.py @@ -54,7 +54,7 @@ def train(config, reporter): if not trial: break trial_executor.start_trial(trial) - self.assertLessEqual(len(os.path.basename(trial.logdir)), 200) + self.assertLessEqual(len(os.path.basename(trial.local_path)), 200) trial_executor.stop_trial(trial) def testExtraResources(self): diff --git a/python/ray/tune/tests/test_trial_runner_2.py b/python/ray/tune/tests/test_trial_runner_2.py index 484d27085d36..672895aa74f0 100644 --- a/python/ray/tune/tests/test_trial_runner_2.py +++ b/python/ray/tune/tests/test_trial_runner_2.py @@ -365,7 +365,7 @@ def testPauseResumeCheckpointCount(self): # Example: # local_checkpoint_dir/ # experiment_state.json - # trial.logdir/ + # trial.local_path/ # checkpoint_00000/ trial = Trial( "__fake", @@ -377,7 +377,7 @@ def testPauseResumeCheckpointCount(self): def write_checkpoint(trial: Trial, index: int): checkpoint_dir = TrainableUtil.make_checkpoint_dir( - trial.logdir, index=index + trial.local_path, index=index ) result = {"training_iteration": index} with open(os.path.join(checkpoint_dir, "cp.json"), "w") as f: @@ -393,7 +393,9 @@ def write_checkpoint(trial: Trial, index: int): return checkpoint_dir def get_checkpoint_dirs(trial: Trial): - return [d for d in os.listdir(trial.logdir) if d.startswith("checkpoint_")] + return [ + d for d in os.listdir(trial.local_path) if d.startswith("checkpoint_") + ] runner = TrialRunner( local_checkpoint_dir=tempdir, diff --git a/python/ray/tune/tests/test_trial_runner_3.py b/python/ray/tune/tests/test_trial_runner_3.py index 02f95591a45f..1ef23a938b3d 100644 --- a/python/ray/tune/tests/test_trial_runner_3.py +++ b/python/ray/tune/tests/test_trial_runner_3.py @@ -784,7 +784,7 @@ def testCheckpointFreqBuffered(self): def num_checkpoints(trial): return sum( - item.startswith("checkpoint_") for item in os.listdir(trial.logdir) + item.startswith("checkpoint_") for item in os.listdir(trial.local_path) ) ray.init(num_cpus=2) @@ -821,7 +821,7 @@ def testCheckpointAtEndNotBuffered(self): def num_checkpoints(trial): return sum( - item.startswith("checkpoint_") for item in os.listdir(trial.logdir) + item.startswith("checkpoint_") for item in os.listdir(trial.local_path) ) ray.init(num_cpus=2) @@ -918,7 +918,7 @@ def testUserCheckpointBuffered(self): def num_checkpoints(trial): return sum( - item.startswith("checkpoint_") for item in os.listdir(trial.logdir) + item.startswith("checkpoint_") for item in os.listdir(trial.local_path) ) ray.init(num_cpus=3) diff --git a/python/ray/tune/tests/test_trial_scheduler.py b/python/ray/tune/tests/test_trial_scheduler.py index 2be12ecbbdfc..20d7fc8d80e5 100644 --- a/python/ray/tune/tests/test_trial_scheduler.py +++ b/python/ray/tune/tests/test_trial_scheduler.py @@ -854,7 +854,7 @@ def __init__(self, i, config): self.placement_group_factory = PlacementGroupFactory([{"CPU": 1}]) self.custom_trial_name = None self.custom_dirname = None - self._local_dir = None + self._local_experiment_path = None self.relative_logdir = None self._default_result_or_future = None self.checkpoint_manager = _CheckpointManager( @@ -1483,7 +1483,7 @@ def check_policy(policy): trials = runner.get_trials() tmpdir = tempfile.mkdtemp() for i, trial in enumerate(trials): - trial.local_dir = tmpdir + trial.local_experiment_path = tmpdir trial.last_result = {TRAINING_ITERATION: i} self.on_trial_result(pbt, runner, trials[0], result(15, -100)) self.on_trial_result(pbt, runner, trials[0], result(20, -100)) @@ -1519,7 +1519,7 @@ def check_policy(policy): trials = runner.get_trials() tmpdir = tempfile.mkdtemp() for i, trial in enumerate(trials): - trial.local_dir = tmpdir + trial.local_experiment_path = tmpdir trial.last_result = {TRAINING_ITERATION: i} self.on_trial_result(pbt, runner, trials[i], result(10, i)) log_files = ["pbt_global.txt", "pbt_policy_0.txt", "pbt_policy_1.txt"] @@ -1568,7 +1568,7 @@ def forward(self, t): trial_state = [] for i, trial in enumerate(trials): - trial.local_dir = tmpdir + trial.local_experiment_path = tmpdir trial.last_result = {TRAINING_ITERATION: 0} trial_state.append(_TrialState(trial.config)) @@ -1725,7 +1725,7 @@ def forward(self, t): trial_state = [] for i, trial in enumerate(trials): - trial.local_dir = tmpdir + trial.local_experiment_path = tmpdir trial.last_result = {TRAINING_ITERATION: 0} trial_state.append(_TrialState(trial.config)) @@ -1881,7 +1881,7 @@ def testFastPerturb(self): tmpdir = tempfile.mkdtemp() for i, trial in enumerate(trials): - trial.local_dir = tmpdir + trial.local_experiment_path = tmpdir trial.last_result = {} self.on_trial_result( pbt, runner, trials[1], result(1, 10), TrialScheduler.CONTINUE @@ -1931,7 +1931,7 @@ def on_trial_result(self, trial_runner, trial, result): ever_active = set() active = set() for trial in out.trials: - with open(os.path.join(trial.logdir, "status.txt"), "rt") as fp: + with open(os.path.join(trial.local_path, "status.txt"), "rt") as fp: status = fp.read() print(f"Status for trial {trial}: {status}") if "Activate" in status: diff --git a/python/ray/tune/tests/test_tune_save_restore.py b/python/ray/tune/tests/test_tune_save_restore.py index 76f7444b8365..6a30fb8b057e 100644 --- a/python/ray/tune/tests/test_tune_save_restore.py +++ b/python/ray/tune/tests/test_tune_save_restore.py @@ -83,7 +83,7 @@ def _train(self, exp_name, local_dir, absolute_local_dir): self.assertIsNone(trial.error_file) self.assertEqual(trial.local_dir, exp_dir) - self.assertEqual(trial.logdir, abs_trial_dir) + self.assertEqual(trial.local_path, abs_trial_dir) self.assertTrue(os.path.isdir(absolute_local_dir), absolute_local_dir) self.assertTrue(os.path.isdir(exp_dir)) diff --git a/python/ray/tune/trainable/trainable.py b/python/ray/tune/trainable/trainable.py index 25885ab32217..f64c244b44c6 100644 --- a/python/ray/tune/trainable/trainable.py +++ b/python/ray/tune/trainable/trainable.py @@ -179,7 +179,7 @@ def __init__( self._monitor = UtilMonitor(start=log_sys_usage) self.remote_checkpoint_dir = remote_checkpoint_dir - # If no sync_config is provided, but we saving to a remote_checkpoint_dir, + # If no sync_config is provided, but we save to a remote_checkpoint_dir, # then provide a default syncer. `upload_dir` here is just a dummy directory # that tells the SyncConfig to create a default syncer. self.sync_config = sync_config or SyncConfig( @@ -213,7 +213,7 @@ def __init__( def uses_cloud_checkpointing(self): return bool(self.remote_checkpoint_dir) - def _storage_path(self, local_path): + def _remote_storage_path(self, local_path): """Converts a `local_path` to be based off of `self.remote_checkpoint_dir`.""" return TrainableUtil.get_remote_storage_path( @@ -653,7 +653,7 @@ def _maybe_save_to_cloud(self, local_dir: str, exclude: List[str] = None) -> boo syncer = self.sync_config.syncer assert syncer - checkpoint_uri = self._storage_path(local_dir) + checkpoint_uri = self._remote_storage_path(local_dir) syncer.sync_up(local_dir=local_dir, remote_dir=checkpoint_uri, exclude=exclude) try: @@ -719,7 +719,7 @@ def _maybe_load_artifacts_from_cloud(self) -> bool: if not self.sync_config.sync_artifacts: return False - remote_dir = self._storage_path(self.logdir) + remote_dir = self._remote_storage_path(self.logdir) with warn_if_slow( name="trial_artifact_cloud_download", message=( @@ -956,7 +956,7 @@ def delete_checkpoint(self, checkpoint_path: Union[str, Checkpoint]): syncer = self.sync_config.syncer assert syncer - checkpoint_uri = self._storage_path(checkpoint_dir) + checkpoint_uri = self._remote_storage_path(checkpoint_dir) syncer.delete(checkpoint_uri) try: syncer.wait_or_retry( diff --git a/python/ray/tune/trainable/util.py b/python/ray/tune/trainable/util.py index 44714da40376..6366338bd821 100644 --- a/python/ray/tune/trainable/util.py +++ b/python/ray/tune/trainable/util.py @@ -94,9 +94,10 @@ def find_rel_checkpoint_dir(logdir, checkpoint_path): `checkpoint_path`. For example, returns `checkpoint00000`. """ - assert checkpoint_path.startswith( - logdir - ), "expecting `logdir` to be a prefix of `checkpoint_path`" + assert checkpoint_path.startswith(logdir), ( + f"expecting `logdir` to be a prefix of `checkpoint_path`, got " + f"{checkpoint_path} (not in {logdir})" + ) rel_path = os.path.relpath(checkpoint_path, logdir) tokens = rel_path.split(os.sep) return os.path.join(tokens[0]) diff --git a/python/ray/tune/tune.py b/python/ray/tune/tune.py index 5684e1a8e70e..413faf50600d 100644 --- a/python/ray/tune/tune.py +++ b/python/ray/tune/tune.py @@ -745,7 +745,7 @@ class and registered trainables. search_alg=search_alg, placeholder_resolvers=placeholder_resolvers, scheduler=scheduler, - local_checkpoint_dir=experiments[0].checkpoint_dir, + experiment_path=experiments[0].local_path, experiment_dir_name=experiments[0].dir_name, sync_config=sync_config, stopper=experiments[0].stopper, @@ -797,7 +797,7 @@ class and registered trainables. _report_progress(runner, progress_reporter, done=True) all_trials = runner.get_trials() - experiment_checkpoint = runner.checkpoint_file + experiment_checkpoint = runner.experiment_state_path # Wait for syncing to finish for callback in callbacks: diff --git a/release/tune_tests/cloud_tests/workloads/run_cloud_test.py b/release/tune_tests/cloud_tests/workloads/run_cloud_test.py index b12cf3cd7add..682a940f10fb 100644 --- a/release/tune_tests/cloud_tests/workloads/run_cloud_test.py +++ b/release/tune_tests/cloud_tests/workloads/run_cloud_test.py @@ -78,7 +78,7 @@ def __init__( trial_id: str, status: str, config: Dict[str, Any], - _local_dir: str, + _local_experiment_path: str, experiment_tag: str, _last_result: Dict[str, Any], relative_logdir: str, @@ -89,7 +89,7 @@ def __init__( self.trial_id = trial_id self.status = status self.config = config - self.local_dir = _local_dir + self.local_experiment_path = _local_experiment_path self.experiment_tag = experiment_tag self.last_result = _last_result self.relative_logdir = relative_logdir @@ -471,10 +471,10 @@ def fetch_trial_node_dirs_to_tmp_dir(trials: List[TrialStub]) -> Dict[TrialStub, if trial.was_on_driver_node: # Trial was run on driver shutil.rmtree(tmpdir) - shutil.copytree(trial.local_dir, tmpdir) + shutil.copytree(trial.local_experiment_path, tmpdir) print( "Copied local node experiment dir", - trial.local_dir, + trial.local_experiment_path, "to", tmpdir, "for trial", @@ -484,7 +484,7 @@ def fetch_trial_node_dirs_to_tmp_dir(trials: List[TrialStub]) -> Dict[TrialStub, else: # Trial was run on remote node fetch_remote_directory_content( - trial.node_ip, remote_dir=trial.local_dir, local_dir=tmpdir + trial.node_ip, remote_dir=trial.local_experiment_path, local_dir=tmpdir ) dirmap[trial] = tmpdir