[tune] Tutorial UX Changes #4990

Merged · 6 commits · Jun 21, 2019
94 changes: 71 additions & 23 deletions python/ray/tune/analysis/experiment_analysis.py
@@ -47,15 +47,23 @@ class ExperimentAnalysis(object):
>>> experiment_path="~/tune_results/my_exp")
"""

-    def __init__(self, experiment_path):
+    def __init__(self, experiment_path, trials=None):
"""Initializer.

Args:
experiment_path (str): Path to where experiment is located.
+            trials (list|None): List of trials that can be accessed via
+                `analysis.trials`.
"""
+        experiment_path = os.path.expanduser(experiment_path)
+        if not os.path.isdir(experiment_path):
+            raise TuneError(
+                "{} is not a valid directory.".format(experiment_path))
experiment_state_paths = glob.glob(
os.path.join(experiment_path, "experiment_state*.json"))
if not experiment_state_paths:
raise TuneError("No experiment state found!")
raise TuneError(
"No experiment state found in {}!".format(experiment_path))
experiment_filename = max(
list(experiment_state_paths)) # if more than one, pick latest
with open(os.path.join(experiment_path, experiment_filename)) as f:
@@ -65,10 +73,27 @@ def __init__(self, experiment_path):
raise TuneError("Experiment state invalid; no checkpoints found.")
self._checkpoints = self._experiment_state["checkpoints"]
self._scrubbed_checkpoints = unnest_checkpoints(self._checkpoints)
+        self.trials = trials
+        self._dataframe = None

+    def get_all_trial_dataframes(self):
+        trial_dfs = {}
+        for checkpoint in self._checkpoints:
+            logdir = checkpoint["logdir"]
+            progress = max(glob.glob(os.path.join(logdir, "progress.csv")))
+            trial_dfs[checkpoint["trial_id"]] = pd.read_csv(progress)
+        return trial_dfs

-    def dataframe(self):
-        """Returns a pandas.DataFrame object constructed from the trials."""
-        return pd.DataFrame(self._scrubbed_checkpoints)
+    def dataframe(self, refresh=False):
+        """Returns a pandas.DataFrame object constructed from the trials.
+
+        Args:
+            refresh (bool): If True, rebuild the DataFrame instead of
+                returning the cached copy.
+        """
+        if self._dataframe is None or refresh:
+            self._dataframe = pd.DataFrame(self._scrubbed_checkpoints)
+        return self._dataframe

def stats(self):
"""Returns a dictionary of the statistics of the experiment."""
@@ -87,22 +112,45 @@ def trial_dataframe(self, trial_id):
return pd.read_csv(progress)
raise ValueError("Trial id {} not found".format(trial_id))

-    def get_best_trainable(self, metric, trainable_cls):
-        """Returns the best Trainable based on the experiment metric."""
-        return trainable_cls(config=self.get_best_config(metric))
-
-    def get_best_config(self, metric):
-        """Retrieve the best config from the best trial."""
-        return self._get_best_trial(metric)["config"]
-
-    def _get_best_trial(self, metric):
-        """Retrieve the best trial based on the experiment metric."""
-        return max(
-            self._checkpoints, key=lambda d: d["last_result"].get(metric, 0))
+    def get_best_trainable(self, metric, trainable_cls, mode="max"):
+        """Returns the best Trainable based on the experiment metric.
+
+        Args:
+            metric (str): Key for trial info to order on.
+            mode (str): One of [min, max].
+        """
+        return trainable_cls(config=self.get_best_config(metric, mode=mode))
+
+    def get_best_config(self, metric, mode="max"):
+        """Retrieve the best config from the best trial.
+
+        Args:
+            metric (str): Key for trial info to order on.
+            mode (str): One of [min, max].
+        """
+        return self.get_best_info(metric, flatten=False, mode=mode)["config"]
+
+    def get_best_logdir(self, metric, mode="max"):
+        """Retrieve the logdir of the best trial for the given metric."""
+        df = self.dataframe()
+        if mode == "max":
+            return df.iloc[df[metric].idxmax()].logdir
+        elif mode == "min":
+            return df.iloc[df[metric].idxmin()].logdir
+
+    def get_best_info(self, metric, mode="max", flatten=True):
+        """Retrieve the best trial's info based on the experiment metric.
+
+        Args:
+            metric (str): Key for trial info to order on.
+            mode (str): One of [min, max].
+            flatten (bool): If True, select over the flattened trial
+                info, where nested entries are concatenated like
+                `info:metric`.
+        """
+        optimize_op = max if mode == "max" else min
+        if flatten:
+            return optimize_op(
+                self._scrubbed_checkpoints, key=lambda d: d.get(metric, 0))
+        return optimize_op(
+            self._checkpoints, key=lambda d: d["last_result"].get(metric, 0))

def _get_sorted_trials(self, metric):
"""Retrive trials in sorted order based on the experiment metric."""
return sorted(
self._checkpoints,
key=lambda d: d["last_result"].get(metric, 0),
reverse=True)
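Taken together, the ExperimentAnalysis changes add a cached `dataframe()`, per-trial progress DataFrames, and `mode`-aware best-trial lookups. A minimal usage sketch follows; the experiment path and the `mean_accuracy` metric are assumptions for illustration, not values fixed by this diff:

from ray.tune.analysis.experiment_analysis import ExperimentAnalysis

analysis = ExperimentAnalysis("~/ray_results/my_experiment")

# Built once from the scrubbed checkpoints, then cached;
# pass refresh=True to rebuild it.
df = analysis.dataframe()

# One progress.csv DataFrame per trial, keyed by trial id.
trial_dfs = analysis.get_all_trial_dataframes()

# Mode-aware lookups replace the old max-only behavior.
best_config = analysis.get_best_config("mean_accuracy", mode="max")
best_logdir = analysis.get_best_logdir("mean_accuracy", mode="max")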
4 changes: 2 additions & 2 deletions python/ray/tune/examples/track_example.py
@@ -9,7 +9,7 @@
from keras.layers import (Dense, Dropout, Flatten, Conv2D, MaxPooling2D)

from ray.tune import track
-from ray.tune.examples.utils import TuneKerasCallback, get_mnist_data
+from ray.tune.examples.utils import TuneReporterCallback, get_mnist_data

parser = argparse.ArgumentParser()
parser.add_argument(
@@ -63,7 +63,7 @@ def train_mnist(args):
batch_size=batch_size,
epochs=epochs,
validation_data=(x_test, y_test),
-        callbacks=[TuneKerasCallback(track.metric)])
+        callbacks=[TuneReporterCallback(track.metric)])
track.shutdown()


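Behavior is unchanged by the rename; only the callback's home moved. A self-contained sketch of the same pattern, assuming the `track.init()`/`track.shutdown()` pairing this example uses and Keras 2.x metric naming ("acc"); the tiny random dataset is purely illustrative:

import numpy as np
from keras.models import Sequential
from keras.layers import Dense
from ray.tune import track
from ray.tune.integration.keras import TuneReporterCallback

track.init()  # standalone tracking, outside of tune.run
model = Sequential([Dense(3, activation="softmax", input_shape=(4,))])
model.compile(optimizer="sgd", loss="categorical_crossentropy",
              metrics=["accuracy"])
x = np.random.rand(32, 4).astype("float32")
y = np.eye(3)[np.random.randint(0, 3, 32)]
# With no reporter argument, the callback falls back to track.log.
model.fit(x, y, epochs=2, verbose=0,
          callbacks=[TuneReporterCallback(freq="epoch")])
track.shutdown()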
8 changes: 4 additions & 4 deletions python/ray/tune/examples/tune_mnist_keras.py
@@ -9,8 +9,8 @@
from keras.models import Sequential
from keras.layers import (Dense, Dropout, Flatten, Conv2D, MaxPooling2D)

-from ray.tune.examples.utils import (TuneKerasCallback, get_mnist_data,
-                                     set_keras_threads)
+from ray.tune.integration.keras import TuneReporterCallback
+from ray.tune.examples.utils import get_mnist_data, set_keras_threads

parser = argparse.ArgumentParser()
parser.add_argument(
@@ -52,7 +52,7 @@ def train_mnist(config, reporter):
epochs=epochs,
verbose=0,
validation_data=(x_test, y_test),
-        callbacks=[TuneKerasCallback(reporter)])
+        callbacks=[TuneReporterCallback(reporter)])


if __name__ == "__main__":
@@ -63,7 +63,7 @@ def train_mnist(config, reporter):

ray.init()
sched = AsyncHyperBandScheduler(
time_attr="timesteps_total",
time_attr="training_iteration",
metric="mean_accuracy",
mode="max",
max_t=400,
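The `time_attr` switch is needed because the old TuneKerasCallback reported `timesteps_total` itself (see utils.py below), while the new TuneReporterCallback does not; `training_iteration`, which Tune increments on every report, is the always-available budget. A sketch of the resulting scheduler, with arguments mirroring this example:

from ray.tune.schedulers import AsyncHyperBandScheduler

# training_iteration advances once per reporter call (per batch here,
# since TuneReporterCallback defaults to freq="batch"), so max_t=400
# caps each trial at 400 reports.
sched = AsyncHyperBandScheduler(
    time_attr="training_iteration",
    metric="mean_accuracy",
    mode="max",
    max_t=400)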
36 changes: 18 additions & 18 deletions python/ray/tune/examples/utils.py
@@ -5,24 +5,9 @@
import keras
from keras.datasets import mnist
from keras import backend as K


-class TuneKerasCallback(keras.callbacks.Callback):
-    def __init__(self, reporter, logs={}):
-        self.reporter = reporter
-        self.iteration = 0
-        super(TuneKerasCallback, self).__init__()
-
-    def on_train_end(self, epoch, logs={}):
-        self.reporter(
-            timesteps_total=self.iteration,
-            done=1,
-            mean_accuracy=logs.get("acc"))
-
-    def on_batch_end(self, batch, logs={}):
-        self.iteration += 1
-        self.reporter(
-            timesteps_total=self.iteration, mean_accuracy=logs["acc"])
+from sklearn.datasets import load_iris
+from sklearn.model_selection import train_test_split
+from sklearn.preprocessing import OneHotEncoder


def get_mnist_data():
@@ -53,6 +38,16 @@ def get_mnist_data():
return x_train, y_train, x_test, y_test, input_shape


+def get_iris_data(test_size=0.2):
+    iris_data = load_iris()
+    x = iris_data.data
+    y = iris_data.target.reshape(-1, 1)
+    encoder = OneHotEncoder(sparse=False)
+    y = encoder.fit_transform(y)
+    # Honor the test_size argument when splitting.
+    train_x, test_x, train_y, test_y = train_test_split(
+        x, y, test_size=test_size)
+    return train_x, train_y, test_x, test_y


def set_keras_threads(threads):
# We set threads here to avoid contention, as Keras
# is heavily parallelized across multiple cores.
@@ -61,3 +56,8 @@ def set_keras_threads(threads):
config=K.tf.ConfigProto(
intra_op_parallelism_threads=threads,
inter_op_parallelism_threads=threads)))


+def TuneKerasCallback(*args, **kwargs):
+    raise DeprecationWarning("TuneKerasCallback is now "
+                             "tune.integration.keras.TuneReporterCallback.")
8 changes: 8 additions & 0 deletions python/ray/tune/experiment.py
@@ -176,6 +176,14 @@ def _register_if_needed(cls, run_object):
else:
raise TuneError("Improper 'run' - not string nor trainable.")

+    @property
+    def local_dir(self):
+        return self.spec.get("local_dir")
+
+    @property
+    def checkpoint_dir(self):
+        return os.path.join(self.spec["local_dir"], self.name)


def convert_to_experiment_list(experiments):
"""Produces a list of Experiment objects.
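A hedged sketch of the two new properties; the trainable name and results directory are placeholders, and Experiment's other constructor arguments are elided:

from ray.tune.experiment import Experiment

exp = Experiment(
    name="my_exp",
    run="PPO",  # placeholder: any registered trainable name
    local_dir="~/ray_results")
print(exp.local_dir)       # the configured results root
print(exp.checkpoint_dir)  # local_dir joined with the experiment name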
Empty file.
34 changes: 34 additions & 0 deletions python/ray/tune/integration/keras.py
@@ -0,0 +1,34 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import keras
from ray.tune import track


class TuneReporterCallback(keras.callbacks.Callback):
def __init__(self, reporter=None, freq="batch", logs={}):
self.reporter = reporter or track.log
self.iteration = 0
if freq not in ["batch", "epoch"]:
raise ValueError("{} not supported as a frequency.".format(freq))
self.freq = freq
super(TuneReporterCallback, self).__init__()

def on_batch_end(self, batch, logs={}):
if not self.freq == "batch":
return
self.iteration += 1
for metric in list(logs):
if "loss" in metric and "neg_" not in metric:
logs["neg_" + metric] = -logs[metric]
self.reporter(keras_info=logs, mean_accuracy=logs["acc"])

def on_epoch_end(self, batch, logs={}):
if not self.freq == "epoch":
return
self.iteration += 1
for metric in list(logs):
if "loss" in metric and "neg_" not in metric:
logs["neg_" + metric] = -logs[metric]
self.reporter(keras_info=logs, mean_accuracy=logs["acc"])
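Note the loss mirroring in both hooks: every metric containing "loss" gets a negated `neg_*` twin, so schedulers that maximize a metric can consume losses unchanged. A sketch that drives the hook by hand with a stand-in reporter (Keras itself calls these hooks during fit()):

from ray.tune.integration.keras import TuneReporterCallback

# Stand-in reporter that just prints what would be reported to Tune.
cb = TuneReporterCallback(reporter=lambda **kw: print(kw), freq="epoch")
cb.on_epoch_end(0, logs={"acc": 0.91, "loss": 0.30})
# -> {'keras_info': {'acc': 0.91, 'loss': 0.3, 'neg_loss': -0.3},
#     'mean_accuracy': 0.91}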
6 changes: 4 additions & 2 deletions python/ray/tune/schedulers/__init__.py
@@ -4,11 +4,13 @@

from ray.tune.schedulers.trial_scheduler import TrialScheduler, FIFOScheduler
from ray.tune.schedulers.hyperband import HyperBandScheduler
-from ray.tune.schedulers.async_hyperband import AsyncHyperBandScheduler
+from ray.tune.schedulers.async_hyperband import (AsyncHyperBandScheduler,
+                                                 ASHAScheduler)
from ray.tune.schedulers.median_stopping_rule import MedianStoppingRule
from ray.tune.schedulers.pbt import PopulationBasedTraining

__all__ = [
"TrialScheduler", "HyperBandScheduler", "AsyncHyperBandScheduler",
"MedianStoppingRule", "FIFOScheduler", "PopulationBasedTraining"
"ASHAScheduler", "MedianStoppingRule", "FIFOScheduler",
"PopulationBasedTraining"
]
2 changes: 2 additions & 0 deletions python/ray/tune/schedulers/async_hyperband.py
@@ -168,6 +168,8 @@ def debug_str(self):
return "Bracket: " + iters


+ASHAScheduler = AsyncHyperBandScheduler

if __name__ == "__main__":
sched = AsyncHyperBandScheduler(
grace_period=1, max_t=10, reduction_factor=2)
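Since ASHAScheduler is a plain alias, both names construct the same scheduler; the new name matches the one used in the ASHA paper. A sketch using the same arguments as the `__main__` block above:

from ray.tune.schedulers import ASHAScheduler, AsyncHyperBandScheduler

sched = ASHAScheduler(grace_period=1, max_t=10, reduction_factor=2)
assert isinstance(sched, AsyncHyperBandScheduler)  # identical class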