[Tune] Tuner.restore from a local directory that has moved #29920

Merged: 34 commits, Nov 18, 2022
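For context, a minimal usage sketch of the workflow this PR enables. This is a hedged illustration, not part of the PR itself: the paths and experiment name are made up, and the `Tuner.restore(..., resume_errored=True)` and `get_results()` calls mirror the new tests further down.

import shutil
from ray import tune

# An experiment previously written to /tmp/old_ray_results/my_exp is moved ...
shutil.move("/tmp/old_ray_results", "/tmp/new_ray_results")

# ... and can now be restored from its new location: checkpoint and error-file
# paths recorded in the experiment state are re-resolved against the new
# directory instead of the stale absolute paths saved before the move.
tuner = tune.Tuner.restore("/tmp/new_ray_results/my_exp", resume_errored=True)
results = tuner.get_results()  # or tuner.fit() to resume training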
Commits (34 total; the diff below shows changes from 7 commits)
0cd9b6e
Update run config local dir and exp name from Tuner restore path
justinvyu Nov 1, 2022
b67a6bb
Load trials from experiment checkpoint with respect to TrialRunner's …
justinvyu Nov 1, 2022
27e9bb3
Update Trial's CheckpointManager paths according to the paths found i…
justinvyu Nov 1, 2022
a296d6e
Update (pickled) error file to only save relative filename
justinvyu Nov 1, 2022
e70e512
Only update trial checkpoint state instead of passing extra kwargs (g…
justinvyu Nov 1, 2022
92e0ec0
Add restoration test from moved directory
justinvyu Nov 1, 2022
7bfa83e
Test that Tuner.get_results restores checkpoints properly
justinvyu Nov 1, 2022
c015ef6
Lint
justinvyu Nov 2, 2022
dfaa914
Simplify checkpoint path update logic
justinvyu Nov 3, 2022
dc21be3
Fix local dir of trial in test (test implicitly changed directories b…
justinvyu Nov 3, 2022
40b2f45
Fix num errors terminated test
justinvyu Nov 3, 2022
f903f0d
Move path updating logic to trial getstate/setstate
justinvyu Nov 4, 2022
df8cd8b
Add comments
justinvyu Nov 4, 2022
ac76a2d
Fix test to create exp checkpoint with TrialRunner, remove legacy test
justinvyu Nov 4, 2022
8890ac8
Init trials with the correct local dir
justinvyu Nov 4, 2022
477b8f3
Fix test_restore_retry env var spilling over to next test
justinvyu Nov 5, 2022
8d1ac56
Limit usage of overwrite_checkpoint_kwargs to just local_dir
justinvyu Nov 5, 2022
0ead365
Backwards compatibility for loading trial state (add back legacy test)
justinvyu Nov 5, 2022
e0a4166
Merge branch 'master' of https://github.com/ray-project/ray into tune…
justinvyu Nov 5, 2022
e913e4c
Remove commented checkpoint loading code
justinvyu Nov 7, 2022
fd890bc
Use new_local_dir param rather than generic kwargs
justinvyu Nov 8, 2022
1b9bf93
Small nit using trial_runner_state
justinvyu Nov 8, 2022
7b2e707
Add assertion that dir_or_data ref is resolved
justinvyu Nov 8, 2022
54df6d0
Fix ordering of params in dummy tracked checkpoint
justinvyu Nov 8, 2022
1cd11ed
Only save the checkpoint relative dirs
justinvyu Nov 8, 2022
bfb1be5
Test that loaded checkpoint manager deletes checkpoints properly
justinvyu Nov 8, 2022
f497cdc
Address comments
justinvyu Nov 10, 2022
cb03226
Limit env var to context within test
justinvyu Nov 10, 2022
8c35c46
Add unit-test for trial __getstate__ and __setstate__ changes
justinvyu Nov 11, 2022
061646a
Merge branch 'master' of https://github.com/ray-project/ray into tune…
justinvyu Nov 15, 2022
219a285
Add comments to tests where local_dir had to be matched with local_ch…
justinvyu Nov 16, 2022
fa74fa4
Merge branch 'master' of https://github.com/ray-project/ray into tune…
justinvyu Nov 16, 2022
7dbdcf0
Merge branch 'master' of https://github.com/ray-project/ray into tune…
justinvyu Nov 17, 2022
d18efc6
Merge branch 'master' of https://github.com/ray-project/ray into tune…
justinvyu Nov 18, 2022
35 changes: 28 additions & 7 deletions python/ray/tune/execution/trial_runner.py
@@ -65,16 +65,24 @@ def _find_newest_experiment_checkpoint(ckpt_dir) -> Optional[str]:
return max(full_paths)


def _load_trial_from_checkpoint(trial_cp: dict, stub: bool = False, **kwargs):
def _load_trial_from_checkpoint(
trial_cp: dict, stub: bool = False, **overwrite_checkpoint_kwargs
):
# Update trial checkpoint with kwargs passed in
new_trial = Trial(
trial_cp["trainable_name"], stub=stub, _setup_default_resource=False, **kwargs
trial_cp["trainable_name"],
stub=stub,
_setup_default_resource=False,
)
trial_cp.update(overwrite_checkpoint_kwargs)
new_trial.__setstate__(trial_cp)
return new_trial


def _load_trials_from_experiment_checkpoint(
experiment_checkpoint: Mapping[str, Any], stub: bool = False
experiment_checkpoint: Mapping[str, Any],
stub: bool = False,
**overwrite_checkpoint_kwargs,
) -> List[Trial]:
"""Create trial objects from experiment checkpoint.

@@ -87,7 +95,11 @@ def _load_trials_from_experiment_checkpoint(

trials = []
for trial_cp in checkpoints:
trials.append(_load_trial_from_checkpoint(trial_cp, stub=stub))
trials.append(
_load_trial_from_checkpoint(
trial_cp, stub=stub, **overwrite_checkpoint_kwargs
)
)

return trials
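
As a standalone illustration of the overwrite pattern in `_load_trial_from_checkpoint` above, here is a minimal sketch using hypothetical names rather than the actual Tune classes: selected keys of the pickled trial state are replaced before `__setstate__` runs, so stale values such as an old `local_dir` do not survive restoration.

class _StatefulThing:
    def __setstate__(self, state):
        self.__dict__.update(state)


def _load_from_checkpoint(state: dict, **overrides) -> _StatefulThing:
    state = dict(state)      # copy so the caller's dict is not mutated
    state.update(overrides)  # e.g. local_dir="/new/location/ray_results"
    obj = _StatefulThing()
    obj.__setstate__(state)
    return obj


thing = _load_from_checkpoint(
    {"local_dir": "/old/path/ray_results", "experiment_tag": "0_lr=0.1"},
    local_dir="/new/path/ray_results",
)
assert thing.local_dir == "/new/path/ray_results"
assert thing.experiment_tag == "0_lr=0.1"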

@@ -751,18 +763,27 @@ def resume(
)
)

trial_runner_data = runner_state["runner_data"]
# Don't overwrite the current `_local_checkpoint_dir`
# The current directory could be different from the checkpointed
# directory, if the experiment directory has changed.
trial_runner_data.pop("_local_checkpoint_dir", None)

self.__setstate__(runner_state["runner_data"])
if self._search_alg.has_checkpoint(self._local_checkpoint_dir):
self._search_alg.restore_from_dir(self._local_checkpoint_dir)

trials = _load_trials_from_experiment_checkpoint(runner_state)
# Load trials with respect to the `_local_checkpoint_dir`
trials = _load_trials_from_experiment_checkpoint(
runner_state, local_dir=self._local_checkpoint_dir
)
for trial in sorted(trials, key=lambda t: t.last_update_time, reverse=True):
trial_to_add = trial
if trial.status == Trial.ERROR:
if resume_errored:
# Keep trial ID on resume
trial_to_add.error_file = None
trial_to_add.pickled_error_file = None
trial_to_add.error_filename = None
trial_to_add.pickled_error_filename = None
trial_to_add.set_status(Trial.PENDING)
trial_to_add.restore_path = trial.checkpoint.dir_or_data
elif restart_errored:
35 changes: 31 additions & 4 deletions python/ray/tune/experiment/trial.py
@@ -353,8 +353,9 @@ def __init__(
self.relative_logdir = None
self.runner = None
self.last_debug = 0
self.error_file = None
self.pickled_error_file = None
self.error_filename = None
self.pickled_error_filename = None

self.trial_name_creator = trial_name_creator
self.trial_dirname_creator = trial_dirname_creator
self.custom_trial_name = None
@@ -652,6 +653,18 @@ def set_experiment_tag(self, experiment_tag):
self.experiment_tag = experiment_tag
self.invalidate_json_state()

@property
def error_file(self):
if not self.logdir or not self.error_filename:
return None
return os.path.join(self.logdir, self.error_filename)

@property
def pickled_error_file(self):
if not self.logdir or not self.pickled_error_filename:
return None
return os.path.join(self.logdir, self.pickled_error_filename)

def handle_error(self, exc: Optional[Union[TuneError, RayTaskError]] = None):
if isinstance(exc, _TuneRestoreError):
exc = exc.exc
@@ -667,10 +680,10 @@ def handle_error(self, exc: Optional[Union[TuneError, RayTaskError]] = None):
self.num_failures += 1

if self.logdir:
self.error_file = os.path.join(self.logdir, "error.txt")
self.error_filename = "error.txt"
if isinstance(exc, RayTaskError):
# Piping through the actual error to result grid.
self.pickled_error_file = os.path.join(self.logdir, "error.pkl")
self.pickled_error_filename = "error.pkl"
with open(self.pickled_error_file, "wb") as f:
cloudpickle.dump(exc, f)
with open(self.error_file, "a+") as f:
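
The hunks above switch the trial from persisting absolute error paths to persisting only filenames (`error_filename`, `pickled_error_filename`) and deriving the absolute path from the current `logdir` via a property. A minimal sketch of that pattern, with a hypothetical class standing in for `Trial`:

import os
from typing import Optional

class _TrialLike:
    """Hypothetical stand-in for Trial; only the filename is persisted."""

    def __init__(self, logdir: Optional[str] = None):
        self.logdir = logdir
        self.error_filename: Optional[str] = None

    @property
    def error_file(self) -> Optional[str]:
        # The absolute path is always derived from the *current* logdir,
        # so it stays valid if the experiment directory is moved.
        if not self.logdir or not self.error_filename:
            return None
        return os.path.join(self.logdir, self.error_filename)

t = _TrialLike(logdir="/old/ray_results/exp/trial_0")
t.error_filename = "error.txt"
t.logdir = "/new/ray_results/exp/trial_0"  # simulate a moved experiment
assert t.error_file == "/new/ray_results/exp/trial_0/error.txt"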
@@ -908,6 +921,20 @@ def __setstate__(self, state):
self.__dict__.update(state)
self.stub = stub or getattr(self, "stub", False)

# Update CheckpointManager checkpoint paths with those found in the trial logdir
checkpoint_paths_df = TrainableUtil.get_checkpoints_paths(self.logdir)
checkpoint_paths_per_iter = {
row["training_iteration"]: row["chkpt_path"]
for _, row in checkpoint_paths_df.iterrows()
}
for tracked_checkpoint in self.get_trial_checkpoints():
if tracked_checkpoint.storage_mode == CheckpointStorage.PERSISTENT:
training_iteration = tracked_checkpoint.metrics[TRAINING_ITERATION]
if training_iteration in checkpoint_paths_per_iter:
tracked_checkpoint.dir_or_data = checkpoint_paths_per_iter[
training_iteration
]

if not self.stub:
validate_trainable(self.trainable_name)

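The `__setstate__` hunk above rewrites each persistent tracked checkpoint's path to the one actually found under the trial's current `logdir`, keyed by training iteration. A simplified sketch of that remapping with plain dictionaries (hypothetical helper; the real code goes through `TrainableUtil.get_checkpoints_paths` and `_TrackedCheckpoint` objects):

def _remap_checkpoint_paths(stale_paths_by_iter, found_paths_by_iter):
    """Replace stale checkpoint paths with those discovered on disk.

    Both arguments map training_iteration -> checkpoint directory.
    Iterations with no on-disk match keep their original path.
    """
    return {
        it: found_paths_by_iter.get(it, path)
        for it, path in stale_paths_by_iter.items()
    }

stale = {5: "/old/exp/trial_0/checkpoint_000005",
         6: "/old/exp/trial_0/checkpoint_000006"}
found = {5: "/new/exp/trial_0/checkpoint_000005",
         6: "/new/exp/trial_0/checkpoint_000006"}
remapped = _remap_checkpoint_paths(stale, found)
assert all(path.startswith("/new") for path in remapped.values())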
6 changes: 6 additions & 0 deletions python/ray/tune/impl/tuner_internal.py
@@ -211,6 +211,12 @@ def _restore_from_path_or_uri(
if not synced:
# If we didn't sync, use the restore_path local dir
self._experiment_checkpoint_dir = os.path.expanduser(path_or_uri)

# Update local_dir to use the parent of the experiment path
# provided to `Tuner.restore`
experiment_path = Path(self._experiment_checkpoint_dir)
self._run_config.local_dir = str(experiment_path.parent)
self._run_config.name = experiment_path.name
else:
# If we synced, `experiment_checkpoint_dir` will contain a temporary
# directory. Create an experiment checkpoint dir instead and move
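In the non-synced branch above, the restore path itself determines the run configuration: its parent directory becomes `local_dir` and its basename becomes the experiment `name`. A small sketch of that split (the path is illustrative):

from pathlib import Path

experiment_path = Path("/home/user/new_ray_results/new_exp_dir")
local_dir = str(experiment_path.parent)  # "/home/user/new_ray_results"
name = experiment_path.name              # "new_exp_dir"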
64 changes: 60 additions & 4 deletions python/ray/tune/tests/test_result_grid.py
@@ -8,8 +8,8 @@

import ray
from ray.air._internal.checkpoint_manager import CheckpointStorage, _TrackedCheckpoint
from ray import tune
from ray.air.checkpoint import Checkpoint
from ray import air, tune
from ray.air import Checkpoint, session
from ray.tune.registry import get_trainable_cls
from ray.tune.result_grid import ResultGrid
from ray.tune.experiment import Trial
@@ -122,8 +122,8 @@ def test_result_grid_future_checkpoint(ray_start_2_cpus, to_object):
trial.on_checkpoint(
_TrackedCheckpoint(checkpoint_data, storage_mode=CheckpointStorage.MEMORY)
)
trial.pickled_error_file = None
trial.error_file = None
trial.pickled_error_filename = None
trial.error_filename = None
result_grid = ResultGrid(None)

# Internal result grid conversion
@@ -283,6 +283,62 @@ def test_num_errors_terminated(tmpdir):
shutil.rmtree(experiment_dir)


def test_result_grid_moved_experiment_path(ray_start_2_cpus, tmpdir):
def train_func(config):
data = {"it": 0}
if session.get_checkpoint():
data = session.get_checkpoint().to_dict()

while True:
data["it"] += 1
checkpoint = Checkpoint.from_dict(data)
session.report(data, checkpoint=checkpoint)

num_to_keep = 2
total_iters = 6
tuner = tune.Tuner(
train_func,
tune_config=tune.TuneConfig(
num_samples=1,
),
run_config=air.RunConfig(
name="exp_dir",
local_dir=str(tmpdir / "ray_results"),
stop={"it": total_iters},
checkpoint_config=air.CheckpointConfig(
# Keep the latest checkpoints
checkpoint_score_attribute="it",
num_to_keep=num_to_keep,
),
),
)
result_grid = tuner.fit()

assert result_grid[0].checkpoint
for (checkpoint, metric) in result_grid[0].best_checkpoints:
assert checkpoint
assert len(result_grid[0].best_checkpoints) == num_to_keep

# Move the experiment directory
shutil.move(tmpdir / "ray_results", tmpdir / "moved_ray_results")
os.rename(
tmpdir / "moved_ray_results" / "exp_dir",
tmpdir / "moved_ray_results" / "new_exp_dir",
)

result_grid = tune.Tuner.restore(
str(tmpdir / "moved_ray_results" / "new_exp_dir")
).get_results()
checkpoint_data = []

assert len(result_grid[0].best_checkpoints) == num_to_keep
for (checkpoint, _) in result_grid[0].best_checkpoints:
assert checkpoint
assert "moved_ray_results" in checkpoint._local_path
checkpoint_data.append(checkpoint.to_dict()["it"])
assert set(checkpoint_data) == set([5, 6])


if __name__ == "__main__":
import sys

75 changes: 75 additions & 0 deletions python/ray/tune/tests/test_tuner_restore.py
@@ -13,6 +13,7 @@
from ray.tune import Trainable, Callback
from ray.tune.execution.trial_runner import _find_newest_experiment_checkpoint
from ray.tune.experiment import Trial
from ray.tune.result_grid import ResultGrid
from ray.tune.tune_config import TuneConfig
from ray.tune.tuner import Tuner

@@ -497,6 +498,80 @@ def load_checkpoint(self, checkpoint_path):
assert result.metrics["score"] == 2


@pytest.mark.parametrize("use_tune_run", [True, False])
def test_tuner_restore_from_moved_experiment_path(
ray_start_2_cpus, tmp_path, use_tune_run
):
"""Check that restoring a Tuner from a moved experiment directory works."""
# Create a fail_marker dummy file that causes the first Tune run to fail and
# the second run to succeed
fail_marker = tmp_path / "fail_marker"
fail_marker.write_text("", encoding="utf-8")

old_local_dir = tmp_path / "ray_results"
old_exp_name = "exp_dir"

new_local_dir = tmp_path / "new_ray_results"
new_exp_name = "new_exp_dir"

# Initial training run (that errors out in the middle)
tuner = Tuner(
_train_fn_sometimes_failing,
tune_config=TuneConfig(
num_samples=1,
),
run_config=RunConfig(
name=old_exp_name,
local_dir=str(old_local_dir),
),
param_space={
"failing_hanging": (fail_marker, None),
},
)

results = tuner.fit()
assert len(results.errors) == 1
training_iteration = results[0].metrics["training_iteration"]
assert (
training_iteration == 1
), f"Should only have 1 session.report before erroring, got {training_iteration}"

# Move experiment from `tmp_path/ray_results/exp_dir`
# to `tmp_path/new_ray_results/new_exp_dir`, changing both `local_dir` and
# the experiment `name`
shutil.move(str(old_local_dir), str(new_local_dir))
os.rename(str(new_local_dir / old_exp_name), str(new_local_dir / new_exp_name))

del tuner
# Remove fail_marker so that the restored Tuner doesn't error again
fail_marker.unlink()

# Restore from moved experiment directory location, and launch resumed training
if use_tune_run:
analysis = tune.run(
_train_fn_sometimes_failing,
name=new_exp_name,
local_dir=str(new_local_dir),
resume="AUTO+ERRORED",
)
results = ResultGrid(analysis)
else:
restore_path = str(new_local_dir / new_exp_name)
tuner = Tuner.restore(restore_path, resume_errored=True)
results = tuner.fit()

assert len(results.errors) == 0
# Check that we restored iter=1, then made 2 calls to session.report -> iter=3
training_iteration = results[0].metrics["training_iteration"]
assert training_iteration == 3, training_iteration

# Make sure that checkpoints are loaded properly
assert results[0].checkpoint and isinstance(results[0].checkpoint, Checkpoint)

# Make sure that we did not create a logdir in the old location
assert not old_local_dir.exists()


if __name__ == "__main__":
import sys
