diff --git a/python/ray/train/_internal/storage.py b/python/ray/train/_internal/storage.py index 7e92ecd01af0..c5ce59158b6c 100644 --- a/python/ray/train/_internal/storage.py +++ b/python/ray/train/_internal/storage.py @@ -272,6 +272,44 @@ def _create_directory(fs: pyarrow.fs.FileSystem, fs_path: str) -> None: ) +def get_fs_and_path( + storage_path: Union[str, os.PathLike], + storage_filesystem: Optional[pyarrow.fs.FileSystem] = None, +) -> Tuple[pyarrow.fs.FileSystem, str]: + """Returns the fs and path from a storage path and an optional custom fs. + + Args: + storage_path: A storage path or URI. (ex: s3://bucket/path or /tmp/ray_results) + storage_filesystem: A custom filesystem to use. If not provided, + this will be auto-resolved by pyarrow. If provided, the storage_path + is assumed to be prefix-stripped already, and must be a valid path + on the filesystem. + + Raises: + ValueError: if the storage path is a URI and a custom filesystem is given. + """ + storage_path = str(storage_path) + + if storage_filesystem: + if is_uri(storage_path): + raise ValueError( + "If you specify a custom `storage_filesystem`, the corresponding " + "`storage_path` must be a *path* on that filesystem, not a URI.\n" + "For example: " + "(storage_filesystem=CustomS3FileSystem(), " + "storage_path='s3://bucket/path') should be changed to " + "(storage_filesystem=CustomS3FileSystem(), " + "storage_path='bucket/path')\n" + "This is what you provided: " + f"(storage_filesystem={storage_filesystem}, " + f"storage_path={storage_path})\n" + "Note that this may depend on the custom filesystem you use." + ) + return storage_filesystem, storage_path + + return pyarrow.fs.FileSystem.from_uri(storage_path) + + class _FilesystemSyncer(_BackgroundSyncer): """Syncer between local filesystem and a `storage_filesystem`.""" @@ -402,7 +440,7 @@ def __init__( trial_dir_name: Optional[str] = None, current_checkpoint_index: int = 0, ): - storage_path_provided = storage_path is not None + custom_fs_provided = storage_filesystem is not None self.storage_local_path = _get_defaults_results_dir() # If `storage_path=None`, then set it to the local path. @@ -414,32 +452,12 @@ def __init__( self.current_checkpoint_index = current_checkpoint_index self.sync_config = dataclasses.replace(sync_config) - if storage_filesystem: - # Custom pyarrow filesystem - self.storage_filesystem = storage_filesystem - if is_uri(self.storage_path): - raise ValueError( - "If you specify a custom `storage_filesystem`, the corresponding " - "`storage_path` must be a *path* on that filesystem, not a URI.\n" - "For example: " - "(storage_filesystem=CustomS3FileSystem(), " - "storage_path='s3://bucket/path') should be changed to " - "(storage_filesystem=CustomS3FileSystem(), " - "storage_path='bucket/path')\n" - "This is what you provided: " - f"(storage_filesystem={storage_filesystem}, " - f"storage_path={storage_path})\n" - "Note that this may depend on the custom filesystem you use." - ) - self.storage_fs_path = self.storage_path - else: - ( - self.storage_filesystem, - self.storage_fs_path, - ) = pyarrow.fs.FileSystem.from_uri(self.storage_path) + self.storage_filesystem, self.storage_fs_path = get_fs_and_path( + self.storage_path, storage_filesystem + ) - # The storage prefix is the URI that remains after stripping the - # URI prefix away from the user-provided `storage_path` (using `from_uri`). + # The storage prefix is part of the URI that is stripped away + # from the user-provided `storage_path` by pyarrow's `from_uri`. 
# Ex: `storage_path="s3://bucket/path?param=1` # -> `storage_prefix=URI` # See the doctests for more examples. @@ -450,14 +468,19 @@ def __init__( Path(self.storage_fs_path) ) - # Only initialize a syncer if a `storage_path` was provided. + # Syncing is always needed if a custom `storage_filesystem` is provided. + # Otherwise, syncing is only needed if storage_local_path + # and storage_fs_path point to different locations. + syncing_needed = ( + custom_fs_provided or self.storage_fs_path != self.storage_local_path + ) self.syncer: Optional[Syncer] = ( _FilesystemSyncer( storage_filesystem=self.storage_filesystem, sync_period=self.sync_config.sync_period, sync_timeout=self.sync_config.sync_timeout, ) - if storage_path_provided + if syncing_needed else None ) diff --git a/python/ray/train/base_trainer.py b/python/ray/train/base_trainer.py index a3f7860675c6..883709167e57 100644 --- a/python/ray/train/base_trainer.py +++ b/python/ray/train/base_trainer.py @@ -8,6 +8,8 @@ from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Type, Union import warnings +import pyarrow.fs + import ray import ray.cloudpickle as pickle from ray.air._internal.config import ensure_only_allowed_dataclass_keys_updated @@ -23,7 +25,11 @@ from ray.air.result import Result from ray.train._checkpoint import Checkpoint as NewCheckpoint from ray.train._internal import session -from ray.train._internal.storage import _use_storage_context +from ray.train._internal.storage import ( + _exists_at_fs_path, + _use_storage_context, + get_fs_and_path, +) from ray.train.constants import TRAIN_DATASET_KEY from ray.util import PublicAPI from ray.util.annotations import DeveloperAPI @@ -195,8 +201,9 @@ def __init__( self.preprocessor = preprocessor self.starting_checkpoint = resume_from_checkpoint - # This path should only be set through restore + # These attributes should only be set through `BaseTrainer.restore` self._restore_path = None + self._restore_storage_filesystem = None self._validate_attributes() @@ -214,7 +221,8 @@ def __init__( @classmethod def restore( cls: Type["BaseTrainer"], - path: str, + path: Union[str, os.PathLike], + storage_filesystem: Optional[pyarrow.fs.FileSystem] = None, datasets: Optional[Dict[str, GenDataset]] = None, preprocessor: Optional["Preprocessor"] = None, scaling_config: Optional[ScalingConfig] = None, @@ -298,17 +306,23 @@ def training_loop(self): Returns: BaseTrainer: A restored instance of the class that is calling this method. """ - if not cls.can_restore(path): + if not cls.can_restore(path, storage_filesystem): raise ValueError( f"Invalid restore path: {path}. Make sure that this path exists and " "is the experiment directory that results from a call to " "`trainer.fit()`." ) - trainer_state_path = cls._maybe_sync_down_trainer_state(path) - assert trainer_state_path.exists() + if _use_storage_context(): + fs, fs_path = get_fs_and_path(path, storage_filesystem) + with fs.open_input_file(os.path.join(fs_path, _TRAINER_PKL)) as f: + trainer_cls, param_dict = pickle.loads(f.readall()) + else: + trainer_state_path = cls._maybe_sync_down_trainer_state(path) + assert trainer_state_path.exists() + + with open(trainer_state_path, "rb") as fp: + trainer_cls, param_dict = pickle.load(fp) - with open(trainer_state_path, "rb") as fp: - trainer_cls, param_dict = pickle.load(fp) if trainer_cls is not cls: warnings.warn( f"Invalid trainer type. 
You are attempting to restore a trainer of type" @@ -355,11 +369,16 @@ def training_loop(self): f"`{cls.__name__}.restore`\n" ) from e trainer._restore_path = path + trainer._restore_storage_filesystem = storage_filesystem return trainer @PublicAPI(stability="alpha") @classmethod - def can_restore(cls: Type["BaseTrainer"], path: Union[str, Path]) -> bool: + def can_restore( + cls: Type["BaseTrainer"], + path: Union[str, os.PathLike], + storage_filesystem: Optional[pyarrow.fs.FileSystem] = None, + ) -> bool: """Checks whether a given directory contains a restorable Train experiment. Args: @@ -370,6 +389,10 @@ def can_restore(cls: Type["BaseTrainer"], path: Union[str, Path]) -> bool: Returns: bool: Whether this path exists and contains the trainer state to resume from """ + if _use_storage_context(): + fs, fs_path = get_fs_and_path(path, storage_filesystem) + return _exists_at_fs_path(fs, os.path.join(fs_path, _TRAINER_PKL)) + return _TRAINER_PKL in list_at_uri(str(path)) def __repr__(self): @@ -589,11 +612,12 @@ def fit(self) -> Result: if self._restore_path: tuner = Tuner.restore( - self._restore_path, + path=self._restore_path, trainable=trainable, param_space=param_space, resume_unfinished=True, resume_errored=True, + storage_filesystem=self._restore_storage_filesystem, ) else: tuner = Tuner( diff --git a/python/ray/train/data_parallel_trainer.py b/python/ray/train/data_parallel_trainer.py index b17b02c5054e..e1a413ef5bf4 100644 --- a/python/ray/train/data_parallel_trainer.py +++ b/python/ray/train/data_parallel_trainer.py @@ -338,9 +338,7 @@ def restore( Union[Callable[[], None], Callable[[Dict], None]] ] = None, train_loop_config: Optional[Dict] = None, - datasets: Optional[Dict[str, GenDataset]] = None, - preprocessor: Optional["Preprocessor"] = None, - scaling_config: Optional[ScalingConfig] = None, + **kwargs, ) -> "DataParallelTrainer": """Restores a DataParallelTrainer from a previously interrupted/failed run. @@ -366,9 +364,7 @@ def restore( path=path, train_loop_per_worker=train_loop_per_worker, train_loop_config=train_loop_config, - datasets=datasets, - preprocessor=preprocessor, - scaling_config=scaling_config, + **kwargs, ) def _validate_attributes(self): diff --git a/python/ray/train/lightning/lightning_trainer.py b/python/ray/train/lightning/lightning_trainer.py index 9ace32cff1a6..7bd0fc1fec0d 100644 --- a/python/ray/train/lightning/lightning_trainer.py +++ b/python/ray/train/lightning/lightning_trainer.py @@ -488,32 +488,6 @@ def _unify_checkpoint_configs( else: return air_ckpt_config - @PublicAPI(stability="alpha") - @classmethod - def restore( - cls: Type["LightningTrainer"], - path: str, - datasets: Optional[Dict[str, GenDataset]] = None, - preprocessor: Optional["Preprocessor"] = None, - scaling_config: Optional[ScalingConfig] = None, - **kwargs, - ) -> "LightningTrainer": - """Restores a LightningTrainer from a previously interrupted/failed run. - - See :meth:`BaseTrainer.restore() ` - for descriptions of the arguments. 
- - Returns: - LightningTrainer: A restored instance of `LightningTrainer` - """ - return super(LightningTrainer, cls).restore( - path=path, - datasets=datasets, - preprocessor=preprocessor, - scaling_config=scaling_config, - **kwargs, - ) - def _lightning_train_loop_per_worker(config): """Per-worker training loop for a Lightning Trainer.""" diff --git a/python/ray/train/tests/test_new_persistence.py b/python/ray/train/tests/test_new_persistence.py index e7e810717146..849d9cb0feae 100644 --- a/python/ray/train/tests/test_new_persistence.py +++ b/python/ray/train/tests/test_new_persistence.py @@ -9,10 +9,13 @@ import pyarrow.fs +import ray from ray import train, tune +from ray.air._internal.uri_utils import URI from ray.air.constants import EXPR_RESULT_FILE from ray.train._internal.storage import _download_from_fs_path, StorageContext from ray.train._checkpoint import Checkpoint as NewCheckpoint +from ray.train.base_trainer import TrainingFailedError from ray.train.data_parallel_trainer import DataParallelTrainer from ray.air.tests.test_checkpoints import mock_s3_bucket_uri @@ -26,11 +29,20 @@ def dummy_context_manager(): yield "dummy value" -@pytest.fixture(autouse=True) -def enable_new_persistence_mode(monkeypatch): - monkeypatch.setenv("RAY_AIR_NEW_PERSISTENCE_MODE", "1") +@pytest.fixture(scope="module") +def enable_new_persistence_mode(): + with pytest.MonkeyPatch.context() as mp: + mp.setenv("RAY_AIR_NEW_PERSISTENCE_MODE", "1") + yield + mp.setenv("RAY_AIR_NEW_PERSISTENCE_MODE", "0") + + +@pytest.fixture(autouse=True, scope="module") +def ray_start_4_cpus(enable_new_persistence_mode): + # Make sure to set the env var before calling ray.init() + ray.init(num_cpus=4) yield - monkeypatch.setenv("RAY_AIR_NEW_PERSISTENCE_MODE", "0") + ray.shutdown() def _create_mock_custom_fs(custom_fs_root_dir: Path) -> pyarrow.fs.FileSystem: @@ -309,11 +321,12 @@ def test_trainer( """ TODO(justinvyu): Test for these once implemented: - artifacts - - restoration, train.get_checkpoint {storage_path}/{exp_name} - ├── experiment_state-2023-07-28_10-00-38.json + ├── experiment_state-2023-07-28_10-00-38.json <- Initial exp state ├── basic-variant-state-2023-07-28_10-00-38.json + ├── experiment_state-2023-07-28_10-01-38.json <- Restored exp state + ├── basic-variant-state-2023-07-28_10-01-38.json ├── trainer.pkl ├── tuner.pkl └── DataParallelTrainer_46367_00000_0_... @@ -358,11 +371,19 @@ def test_trainer( name=exp_name, verbose=0, checkpoint_config=checkpoint_config, - failure_config=train.FailureConfig(max_failures=2), + failure_config=train.FailureConfig(max_failures=1), ), ) print("\nStarting initial run.\n") - result = trainer.fit() + with pytest.raises(TrainingFailedError): + result = trainer.fit() + + print("\nStarting manually restored run.\n") + restored_trainer = DataParallelTrainer.restore( + path=str(URI(storage_path or str(LOCAL_CACHE_DIR)) / exp_name), + storage_filesystem=storage_filesystem, + ) + result = restored_trainer.fit() with monkeypatch.context() as m: # This is so that the `resume_from_checkpoint` run doesn't mess up the @@ -390,10 +411,12 @@ def test_trainer( exp_dir = local_inspect_dir / exp_name # Files synced by the driver - assert len(list(exp_dir.glob("basic-variant-state-*"))) == 1 - assert len(list(exp_dir.glob("experiment_state-*"))) == 1 assert len(list(exp_dir.glob("tuner.pkl"))) == 1 assert len(list(exp_dir.glob("trainer.pkl"))) == 1 + # 2 copies of these files: + # 1 for the initial run, and 1 for the manually restored run. 
+ assert len(list(exp_dir.glob("basic-variant-state-*"))) == 2 + assert len(list(exp_dir.glob("experiment_state-*"))) == 2 # Files synced by the worker assert len(list(exp_dir.glob("DataParallelTrainer_*"))) == 1 diff --git a/python/ray/tune/analysis/experiment_analysis.py b/python/ray/tune/analysis/experiment_analysis.py index 9d4530aa8908..867b0f5baf8f 100644 --- a/python/ray/tune/analysis/experiment_analysis.py +++ b/python/ray/tune/analysis/experiment_analysis.py @@ -21,6 +21,7 @@ EXPR_PARAM_FILE, TRAINING_ITERATION, ) +from ray.train._internal.storage import _use_storage_context from ray.tune.syncer import SyncConfig from ray.tune.utils import flatten_dict from ray.tune.utils.serialization import TuneFunctionDecoder @@ -980,7 +981,9 @@ def _get_trial_paths(self) -> List[str]: for trial_json_state, path in self._checkpoints_and_paths: try: trial = Trial.from_json_state(trial_json_state, stub=True) - trial.local_experiment_path = str(path) + # TODO(justinvyu): [handle_moved_storage_path] + if not _use_storage_context(): + trial.local_experiment_path = str(path) except Exception: logger.warning( f"Could not load trials from experiment checkpoint. " diff --git a/python/ray/tune/execution/tune_controller.py b/python/ray/tune/execution/tune_controller.py index 61e675baef7b..7aad4373a975 100644 --- a/python/ray/tune/execution/tune_controller.py +++ b/python/ray/tune/execution/tune_controller.py @@ -482,9 +482,6 @@ def restore_from_dir(self, experiment_dir: Optional[str] = None) -> List[Trial]: and return them as a list of Trial objects. """ if _use_storage_context(): - raise NotImplementedError( - "Restoration is not fully working. TODO for a follow-up PR." - ) assert not experiment_dir, "Remove the `experiment_dir` argument." experiment_dir = self._storage.experiment_local_path else: @@ -534,11 +531,17 @@ def restore_from_dir(self, experiment_dir: Optional[str] = None) -> List[Trial]: # The following properties may be updated on restoration # Ex: moved local/cloud experiment directory - # ATTN: Set `local_experiment_path` to update trial checkpoints! - trial.local_experiment_path = self._legacy_local_experiment_path - trial.remote_experiment_path = self._legacy_remote_experiment_path - trial.sync_config = self._legacy_sync_config - trial.experiment_dir_name = self._legacy_experiment_dir_name + if _use_storage_context(): + # Propagate updated storage ctx properties to the trial's restored copy. + # TODO(justinvyu): [handle_moved_storage_path] + trial.storage.storage_path = self._storage.storage_path + trial.storage.experiment_dir_name = self._storage.experiment_dir_name + else: + # ATTN: Set `local_experiment_path` to update trial checkpoints! + trial.local_experiment_path = self._legacy_local_experiment_path + trial.remote_experiment_path = self._legacy_remote_experiment_path + trial.sync_config = self._legacy_sync_config + trial.experiment_dir_name = self._legacy_experiment_dir_name # Avoid creating logdir in client mode for returned trial results, # since the dir might not be creatable locally. @@ -603,7 +606,10 @@ def resume( trial_to_add.error_filename = None trial_to_add.pickled_error_filename = None trial_to_add.set_status(Trial.PENDING) - trial_to_add.restore_path = trial.checkpoint.dir_or_data + if not _use_storage_context(): + # TODO(justinvyu): Remove this. + # Not needed since trial.checkpoint will be used anyways. 
+ trial_to_add.restore_path = trial.checkpoint.dir_or_data elif restart_errored: trial_to_add = trial.reset() trial_to_add.restore_path = None diff --git a/python/ray/tune/impl/tuner_internal.py b/python/ray/tune/impl/tuner_internal.py index c9740b4c4ae4..7f95c0541745 100644 --- a/python/ray/tune/impl/tuner_internal.py +++ b/python/ray/tune/impl/tuner_internal.py @@ -19,6 +19,8 @@ Tuple, ) +import pyarrow.fs + import ray import ray.cloudpickle as pickle from ray.util import inspect_serializability @@ -26,7 +28,11 @@ from ray.air._internal.uri_utils import URI from ray.air._internal.usage import AirEntrypoint from ray.air.config import RunConfig, ScalingConfig -from ray.train._internal.storage import _use_storage_context, StorageContext +from ray.train._internal.storage import ( + _use_storage_context, + StorageContext, + get_fs_and_path, +) from ray.tune import Experiment, TuneError, ExperimentAnalysis from ray.tune.execution.experiment_state import _ResumeConfig from ray.tune.tune import _Config @@ -87,6 +93,7 @@ class TunerInternal: def __init__( self, restore_path: str = None, + storage_filesystem: Optional[pyarrow.fs.FileSystem] = None, resume_config: Optional[_ResumeConfig] = None, trainable: Optional[TrainableTypeOrTrainer] = None, param_space: Optional[Dict[str, Any]] = None, @@ -115,6 +122,7 @@ def __init__( trainable=trainable, overwrite_param_space=param_space, resume_config=resume_config, + storage_filesystem=storage_filesystem, ) return @@ -132,7 +140,7 @@ def __init__( self._is_restored = False self._tuner_kwargs = copy.deepcopy(_tuner_kwargs) or {} ( - self._experiment_checkpoint_dir, + self._legacy_experiment_checkpoint_dir, self._experiment_dir_name, ) = self.setup_create_experiment_checkpoint_dir( self.converted_trainable, self._run_config @@ -145,7 +153,7 @@ def __init__( # without allowing for checkpointing tuner and trainable. # Thus this has to happen before tune.run() so that we can have something # to restore from. - experiment_checkpoint_path = Path(self._experiment_checkpoint_dir) + experiment_checkpoint_path = Path(self._legacy_experiment_checkpoint_dir) with open(experiment_checkpoint_path / _TUNER_PKL, "wb") as fp: pickle.dump(self.__getstate__(), fp) @@ -335,7 +343,7 @@ def _set_param_space_on_restore( ) def _load_tuner_state( - self, tuner_pkl_path: Path + self, tuner_state: Dict[str, Any] ) -> Tuple[Optional[str], Optional[List[str]]]: """Loads Tuner state from the previously saved `tuner.pkl`. @@ -347,38 +355,13 @@ def _load_tuner_state( tuple: of `(old_trainable_name, flattened_param_space_keys)` used for validating the re-specified `trainable` and `param_space`. """ - if not tuner_pkl_path.exists(): - raise RuntimeError( - f"Could not find Tuner state in restore directory. Did you pass" - f"the correct path (the top-level experiment directory?) Got: " - f"{tuner_pkl_path.parent}" - ) - - with open(tuner_pkl_path, "rb") as fp: - tuner_state = pickle.load(fp) - - if isinstance(tuner_state, TunerInternal): - # TODO(ml-team): Remove in 2.7. - # Backwards compatibility: ray<=2.4 pickles the full Tuner object - # within `tuner.pkl`. ray>=2.5 pickles the object state as a dict. - tuner: TunerInternal = tuner_state - self.__setstate__(tuner.__getstate__()) - - logger.warning( - "You are restoring a Tune experiment that was run with an older " - "version of Ray. Note that backwards compatibility of restoring " - "this experiment will only be guaranteed until Ray 2.7." 
- ) - - old_trainable_name, flattened_param_space_keys = None, None - else: - # NOTE: These are magic keys used for validating restore args. - old_trainable_name = tuner_state.pop("__trainable_name", None) - flattened_param_space_keys = tuner_state.pop( - "__flattened_param_space_keys", None - ) + # NOTE: These are magic keys used for validating restore args. + old_trainable_name = tuner_state.pop("__trainable_name", None) + flattened_param_space_keys = tuner_state.pop( + "__flattened_param_space_keys", None + ) - self.__setstate__(tuner_state) + self.__setstate__(tuner_state) return old_trainable_name, flattened_param_space_keys @@ -388,16 +371,29 @@ def _restore_from_path_or_uri( trainable: TrainableTypeOrTrainer, overwrite_param_space: Optional[Dict[str, Any]], resume_config: _ResumeConfig, + storage_filesystem: Optional[pyarrow.fs.FileSystem], ): - # Sync down from cloud storage if needed - ( - restoring_from_cloud, - local_experiment_checkpoint_dir, - ) = self._maybe_sync_down_tuner_state(path_or_uri) - experiment_checkpoint_path = Path(local_experiment_checkpoint_dir) + if _use_storage_context(): + fs, fs_path = get_fs_and_path(path_or_uri, storage_filesystem) + with fs.open_input_file(os.path.join(fs_path, _TUNER_PKL)) as f: + tuner_state = pickle.loads(f.readall()) + + restoring_from_cloud = None + local_experiment_checkpoint_dir = None + experiment_checkpoint_path = None + else: + # Sync down from cloud storage if needed + ( + restoring_from_cloud, + local_experiment_checkpoint_dir, + ) = self._maybe_sync_down_tuner_state(path_or_uri) + experiment_checkpoint_path = Path(local_experiment_checkpoint_dir) + + with open(experiment_checkpoint_path / _TUNER_PKL, "rb") as fp: + tuner_state = pickle.load(fp) old_trainable_name, flattened_param_space_keys = self._load_tuner_state( - experiment_checkpoint_path / _TUNER_PKL + tuner_state ) # Perform validation and set the re-specified `trainable` and `param_space` @@ -410,9 +406,8 @@ def _restore_from_path_or_uri( ) # Update RunConfig to reflect changes in the experiment directory - path_or_uri_obj: Union[Path, URI] = ( - URI(path_or_uri) if restoring_from_cloud else experiment_checkpoint_path - ) + path_or_uri_obj = URI(path_or_uri) + # Infer the `storage_path` and run `name` of the restored run using the # experiment directory. # Ex: ~/ray_results/exp_name -> ~/ray_results, exp_name @@ -420,22 +415,32 @@ def _restore_from_path_or_uri( self._run_config.name = path_or_uri_obj.name self._run_config.storage_path = str(path_or_uri_obj.parent) - # Set the experiment directory - if not restoring_from_cloud: - self._experiment_checkpoint_dir = local_experiment_checkpoint_dir - else: - # If we synced, `experiment_checkpoint_dir` will contain a temporary - # directory. Create an experiment checkpoint dir instead and move - # our data there. 
- new_exp_path, new_exp_name = self.setup_create_experiment_checkpoint_dir( + if _use_storage_context(): + _, self._experiment_dir_name = self.setup_create_experiment_checkpoint_dir( self.converted_trainable, self._run_config ) - new_exp_path = Path(new_exp_path) - for file_dir in experiment_checkpoint_path.glob("*"): - file_dir.replace(new_exp_path / file_dir.name) - shutil.rmtree(experiment_checkpoint_path) - self._experiment_checkpoint_dir = str(new_exp_path) - self._experiment_dir_name = str(new_exp_name) + + self._legacy_experiment_checkpoint_dir = None + else: + # Set the experiment directory + if not restoring_from_cloud: + self._legacy_experiment_checkpoint_dir = local_experiment_checkpoint_dir + else: + # If we synced, `experiment_checkpoint_dir` will contain a temporary + # directory. Create an experiment checkpoint dir instead and move + # our data there. + ( + new_exp_path, + new_exp_name, + ) = self.setup_create_experiment_checkpoint_dir( + self.converted_trainable, self._run_config + ) + new_exp_path = Path(new_exp_path) + for file_dir in experiment_checkpoint_path.glob("*"): + file_dir.replace(new_exp_path / file_dir.name) + shutil.rmtree(experiment_checkpoint_path) + self._legacy_experiment_checkpoint_dir = str(new_exp_path) + self._experiment_dir_name = str(new_exp_name) # Load the experiment results at the point where it left off. try: @@ -551,7 +556,7 @@ def setup_create_experiment_checkpoint_dir( # This has to be done through a function signature (@property won't do). def get_experiment_checkpoint_dir(self) -> str: - return self._experiment_checkpoint_dir + return self._legacy_experiment_checkpoint_dir @property def trainable(self) -> TrainableTypeOrTrainer: @@ -601,7 +606,6 @@ def _convert_trainable(self, trainable: TrainableTypeOrTrainer) -> TrainableType def fit(self) -> ResultGrid: trainable = self.converted_trainable - assert self._experiment_checkpoint_dir param_space = copy.deepcopy(self.param_space) if not self._is_restored: analysis = self._fit_internal(trainable, param_space) @@ -690,7 +694,7 @@ def _get_tune_run_arguments(self, trainable: TrainableType) -> Dict[str, Any]: stop=self._run_config.stop, max_failures=self._run_config.failure_config.max_failures, checkpoint_config=checkpoint_config, - _experiment_checkpoint_dir=self._experiment_checkpoint_dir, + _experiment_checkpoint_dir=self._legacy_experiment_checkpoint_dir, raise_on_failed_trial=False, fail_fast=(self._run_config.failure_config.fail_fast), progress_reporter=self._run_config.progress_reporter, diff --git a/python/ray/tune/tests/test_tuner_restore.py b/python/ray/tune/tests/test_tuner_restore.py index f0ce90a9c36b..ea846359412b 100644 --- a/python/ray/tune/tests/test_tuner_restore.py +++ b/python/ray/tune/tests/test_tuner_restore.py @@ -9,7 +9,6 @@ import pytest import ray -import ray.cloudpickle as ray_pickle from ray import train, tune from ray.train import ( Checkpoint, @@ -1176,18 +1175,6 @@ def train_fn(config): assert r.config["test2"].name in ["11", "12", "13", "14"] -def test_tuner_pkl_backwards_compatibility(tmp_path, propagate_logs, caplog): - tuner_internal = Tuner( - _train_fn_sometimes_failing, param_space={"a": 1} - )._local_tuner - with open(tmp_path / "tuner.pkl", "wb") as f: - ray_pickle.dump(tuner_internal, f) - - with caplog.at_level(logging.WARNING, "ray.tune.impl.tuner_internal"): - tuner_internal._load_tuner_state(tmp_path / "tuner.pkl") - assert "run with an older version of Ray" in caplog.text - - if __name__ == "__main__": import sys diff --git a/python/ray/tune/tuner.py 
b/python/ray/tune/tuner.py index 792fb2af4933..88ff6c2416df 100644 --- a/python/ray/tune/tuner.py +++ b/python/ray/tune/tuner.py @@ -1,13 +1,19 @@ import logging -from pathlib import Path +import os from typing import Any, Callable, Dict, Optional, Type, Union, TYPE_CHECKING -import ray +import pyarrow.fs +import ray from ray.air.config import RunConfig from ray.air._internal.remote_storage import list_at_uri from ray.air._internal.usage import AirEntrypoint from ray.air.util.node import _force_on_current_node +from ray.train._internal.storage import ( + _exists_at_fs_path, + _use_storage_context, + get_fs_and_path, +) from ray.tune import TuneError from ray.tune.execution.experiment_state import _ResumeConfig from ray.tune.experimental.output import ( @@ -185,6 +191,7 @@ def restore( resume_errored: bool = False, restart_errored: bool = False, param_space: Optional[Dict[str, Any]] = None, + storage_filesystem: Optional[pyarrow.fs.FileSystem] = None, ) -> "Tuner": """Restores Tuner after a previously failed run. @@ -246,6 +253,7 @@ def restore( resume_config=resume_config, trainable=trainable, param_space=param_space, + storage_filesystem=storage_filesystem, ) return Tuner(_tuner_internal=tuner_internal) else: @@ -256,11 +264,16 @@ def restore( resume_config=resume_config, trainable=trainable, param_space=param_space, + storage_filesystem=storage_filesystem, ) return Tuner(_tuner_internal=tuner_internal) @classmethod - def can_restore(cls, path: Union[str, Path]) -> bool: + def can_restore( + cls, + path: Union[str, os.PathLike], + storage_filesystem: Optional[pyarrow.fs.FileSystem] = None, + ) -> bool: """Checks whether a given directory contains a restorable Tune experiment. Usage Pattern: @@ -301,6 +314,10 @@ def train_fn(config): Returns: bool: True if this path exists and contains the Tuner state to resume from """ + if _use_storage_context(): + fs, fs_path = get_fs_and_path(path, storage_filesystem) + return _exists_at_fs_path(fs, os.path.join(fs_path, _TUNER_PKL)) + return _TUNER_PKL in list_at_uri(str(path)) def _prepare_remote_tuner_for_jupyter_progress_reporting(self):
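A minimal usage sketch of the new `storage_filesystem` argument threaded through `BaseTrainer.restore`/`can_restore` and `Tuner.restore`/`can_restore` above, assuming the new persistence mode is enabled via `RAY_AIR_NEW_PERSISTENCE_MODE=1` (as in the module-scoped test fixture). The endpoint, bucket, experiment path, and `train_fn` below are illustrative placeholders, not values from this patch.

import os

import pyarrow.fs

import ray
from ray.train.data_parallel_trainer import DataParallelTrainer
from ray.tune import Tuner

# The code paths added in this patch are gated behind the new persistence
# mode; set the env var before ray.init() (see the test fixture above).
os.environ["RAY_AIR_NEW_PERSISTENCE_MODE"] = "1"
ray.init(num_cpus=4)

# A custom pyarrow filesystem, e.g. S3 pointed at a non-default endpoint.
custom_fs = pyarrow.fs.S3FileSystem(endpoint_override="http://localhost:9000")

# With a custom `storage_filesystem`, the experiment location must be a *path*
# on that filesystem ("bucket/path"), not a URI ("s3://bucket/path");
# otherwise `get_fs_and_path` raises the ValueError shown above.
experiment_fs_path = "my-bucket/ray-results/my_experiment"  # placeholder

# Restore a previously interrupted trainer run through the custom filesystem.
if DataParallelTrainer.can_restore(experiment_fs_path, custom_fs):
    restored_trainer = DataParallelTrainer.restore(
        path=experiment_fs_path,
        storage_filesystem=custom_fs,
    )
    result = restored_trainer.fit()

# The same argument is forwarded by Tuner.can_restore / Tuner.restore.
def train_fn(config):
    # Placeholder; the trainable must match the one used in the original run.
    ...

if Tuner.can_restore(experiment_fs_path, custom_fs):
    tuner = Tuner.restore(
        experiment_fs_path,
        trainable=train_fn,
        resume_errored=True,
        storage_filesystem=custom_fs,
    )
    results = tuner.fit()

The sketch mirrors the manual-restoration flow exercised in test_new_persistence.py: the trainer is restored by experiment path rather than from a synced-down local copy, and the filesystem object is resolved once by `get_fs_and_path` instead of being special-cased in `StorageContext.__init__`.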