Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[tune] Allow more versatile experiment analysis loading #20181

Merged
merged 3 commits into from
Nov 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 42 additions & 6 deletions python/ray/tune/analysis/experiment_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
from ray.tune.result import DEFAULT_METRIC, EXPR_PROGRESS_FILE, \
EXPR_RESULT_FILE, EXPR_PARAM_FILE, CONFIG_PREFIX, TRAINING_ITERATION
from ray.tune.trial import Trial
from ray.tune.trial_runner import (find_newest_experiment_checkpoint,
load_trials_from_experiment_checkpoint)
from ray.tune.utils.trainable import TrainableUtil
from ray.tune.utils.util import unflattened_lookup

Expand Down Expand Up @@ -428,10 +430,25 @@ def __init__(self,
default_mode: Optional[str] = None):
experiment_checkpoint_path = os.path.expanduser(
experiment_checkpoint_path)
if not os.path.isfile(experiment_checkpoint_path):

if os.path.isdir(experiment_checkpoint_path):
# Case 1: Dir specified, find latest checkpoint.
latest_checkpoint = find_newest_experiment_checkpoint(
experiment_checkpoint_path)
if not latest_checkpoint:
raise ValueError(
f"The directory `{experiment_checkpoint_path}` does not "
f"contain a Ray Tune experiment checkpoint.")
elif not os.path.isfile(experiment_checkpoint_path):
# Case 2: File specified, but does not exist.
raise ValueError(
"{} is not a valid file.".format(experiment_checkpoint_path))
with open(experiment_checkpoint_path) as f:
f"The file `{experiment_checkpoint_path}` does not "
f"exist and cannot be loaded for experiment analysis.")
else:
# Case 3: File specified, use as latest checkpoint.
latest_checkpoint = experiment_checkpoint_path

with open(latest_checkpoint) as f:
_experiment_state = json.load(f, cls=TuneFunctionDecoder)
self._experiment_state = _experiment_state

Expand All @@ -444,9 +461,12 @@ def __init__(self,
]
self.trials = trials

super(ExperimentAnalysis, self).__init__(
os.path.dirname(experiment_checkpoint_path), default_metric,
default_mode)
# Use current dir per default
experiment_dir = os.path.dirname(
os.path.abspath(latest_checkpoint)) or os.getcwd()

super(ExperimentAnalysis, self).__init__(experiment_dir,
default_metric, default_mode)

@property
def best_trial(self) -> Trial:
Expand Down Expand Up @@ -501,6 +521,11 @@ def best_checkpoint(self) -> str:
"`get_best_checkpoint(trial, metric, mode)` method to set the "
"metric and mode explicitly.")
best_trial = self.best_trial
if not best_trial:
raise ValueError(
f"No best trial found. Please check if you specified the "
f"correct default metric ({self.default_metric}) and mode "
f"({self.default_mode}).")
return self.get_best_checkpoint(best_trial, self.default_metric,
self.default_mode)

Expand Down Expand Up @@ -758,6 +783,17 @@ def _get_trial_paths(self) -> List[str]:
_trial_paths = [
checkpoint["logdir"] for checkpoint in self._checkpoints
]
try:
self.trials = load_trials_from_experiment_checkpoint(
self._experiment_state, stub=True)
except Exception as e:
logger.warning(
f"Could not load trials from experiment checkpoint. "
f"This means your experiment checkpoint is likely faulty "
f"or incomplete, and you won't have access to all "
f"analysis methods. "
f"Observed error: {e}")

if not _trial_paths:
raise TuneError("No trials found.")
return _trial_paths
16 changes: 13 additions & 3 deletions python/ray/tune/trial.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,13 +228,20 @@ def __init__(self,
trial_name_creator=None,
trial_dirname_creator=None,
log_to_file=None,
max_failures=0):
max_failures=0,
stub=False):
"""Initialize a new trial.

The args here take the same meaning as the command line flags defined
in ray.tune.config_parser.
"""
validate_trainable(trainable_name)
# If this is set, trainables are not validated or looked up.
# This can be used e.g. to initialize Trial objects from checkpoints
# without loading the trainable first.
self.stub = stub
krfricke marked this conversation as resolved.
Show resolved Hide resolved

if not self.stub:
validate_trainable(trainable_name)
# Trial config
self.trainable_name = trainable_name
self.trial_id = Trial.generate_id() if trial_id is None else trial_id
Expand Down Expand Up @@ -687,6 +694,8 @@ def update_last_result(self, result, terminate=False):
self.invalidate_json_state()

def get_trainable_cls(self):
    """Return the registered trainable class for this trial.

    Stub trials are restored from a checkpoint without validating or
    registering their trainable, so there is nothing to look up for
    them and ``None`` is returned instead.
    """
    return None if self.stub else get_trainable_cls(self.trainable_name)

def is_finished(self):
Expand Down Expand Up @@ -786,7 +795,8 @@ def __setstate__(self, state):
state[key] = cloudpickle.loads(hex_to_binary(state[key]))

self.__dict__.update(state)
validate_trainable(self.trainable_name)
if not self.stub:
validate_trainable(self.trainable_name)

# Avoid creating logdir in client mode for returned trial results,
# since the dir might not be creatable locally. TODO(ekl) thsi is kind
Expand Down
40 changes: 26 additions & 14 deletions python/ray/tune/trial_runner.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List, Optional, Union
from typing import Any, List, Mapping, Optional, Union

import click
from datetime import datetime
Expand Down Expand Up @@ -36,7 +36,7 @@
logger = logging.getLogger(__name__)


def _find_newest_ckpt(ckpt_dir) -> Optional[str]:
def find_newest_experiment_checkpoint(ckpt_dir) -> Optional[str]:
"""Returns path to most recently modified checkpoint."""
full_paths = [
os.path.join(ckpt_dir, fname) for fname in os.listdir(ckpt_dir)
Expand All @@ -47,6 +47,27 @@ def _find_newest_ckpt(ckpt_dir) -> Optional[str]:
return max(full_paths)


def load_trials_from_experiment_checkpoint(
        experiment_checkpoint: Mapping[str, Any],
        stub: bool = False) -> List[Trial]:
    """Create trial objects from experiment checkpoint.

    Given an experiment checkpoint (TrialRunner state dict), return
    list of trials."""
    restored_trials = []
    for raw_cp in experiment_checkpoint["checkpoints"]:
        # Trial checkpoints may be stored either as already-decoded dicts
        # or as JSON strings; decode the latter first.
        if isinstance(raw_cp, str):
            trial_state = json.loads(raw_cp, cls=TuneFunctionDecoder)
        else:
            trial_state = raw_cp
        trial = Trial(trial_state["trainable_name"], stub=stub)
        trial.__setstate__(trial_state)
        restored_trials.append(trial)

    return restored_trials


class _ExperimentCheckpointManager:
"""Helper class for managing experiment-level checkpoints.

Expand Down Expand Up @@ -592,7 +613,8 @@ def resume(self, run_errored_only=False):
Requires user to manually re-register their objects. Also stops
all ongoing trials.
"""
newest_ckpt_path = _find_newest_ckpt(self._local_checkpoint_dir)
newest_ckpt_path = find_newest_experiment_checkpoint(
self._local_checkpoint_dir)

if not newest_ckpt_path:
raise ValueError(f"Tried to resume from checkpoint dir "
Expand All @@ -613,17 +635,7 @@ def resume(self, run_errored_only=False):
if self._search_alg.has_checkpoint(self._local_checkpoint_dir):
self._search_alg.restore_from_dir(self._local_checkpoint_dir)

checkpoints = [
json.loads(cp, cls=TuneFunctionDecoder)
if isinstance(cp, str) else cp
for cp in runner_state["checkpoints"]
]

trials = []
for trial_cp in checkpoints:
new_trial = Trial(trial_cp["trainable_name"])
new_trial.__setstate__(trial_cp)
trials += [new_trial]
trials = load_trials_from_experiment_checkpoint(runner_state)
for trial in sorted(
trials, key=lambda t: t.last_update_time, reverse=True):
if run_errored_only and trial.status == Trial.ERROR:
Expand Down
4 changes: 2 additions & 2 deletions release/tune_tests/cloud_tests/workloads/run_cloud_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@

import ray
from ray.tune.syncer import NodeSyncer, detect_sync_to_driver, get_node_syncer
from ray.tune.trial_runner import _find_newest_ckpt
from ray.tune.trial_runner import find_newest_experiment_checkpoint
from ray.tune.utils.serialization import TuneFunctionDecoder

TUNE_SCRIPT = os.path.join(os.path.dirname(__file__), "_tune_script.py")
Expand Down Expand Up @@ -560,7 +560,7 @@ def fetch_bucket_contents_to_tmp_dir(bucket: str) -> str:

def load_experiment_checkpoint_from_state_file(
experiment_dir: str) -> ExperimentStateCheckpoint:
newest_ckpt_path = _find_newest_ckpt(experiment_dir)
newest_ckpt_path = find_newest_experiment_checkpoint(experiment_dir)
with open(newest_ckpt_path, "r") as f:
runner_state = json.load(f, cls=TuneFunctionDecoder)

Expand Down