diff --git a/python/ray/tune/__init__.py b/python/ray/tune/__init__.py
index 810256e07138..560a67e6b35b 100644
--- a/python/ray/tune/__init__.py
+++ b/python/ray/tune/__init__.py
@@ -14,5 +14,5 @@
 __all__ = [
     "Trainable", "TuneError", "grid_search", "register_env",
     "register_trainable", "run", "run_experiments", "Experiment", "function",
-    "sample_from", "uniform", "choice", "randint", "randn"
+    "sample_from", "track", "uniform", "choice", "randint", "randn"
 ]
diff --git a/python/ray/tune/automlboard/backend/collector.py b/python/ray/tune/automlboard/backend/collector.py
index 5566f479960f..dd87df1a450d 100644
--- a/python/ray/tune/automlboard/backend/collector.py
+++ b/python/ray/tune/automlboard/backend/collector.py
@@ -14,7 +14,7 @@
 from ray.tune.automlboard.models.models import JobRecord, \
     TrialRecord, ResultRecord
 from ray.tune.result import DEFAULT_RESULTS_DIR, JOB_META_FILE, \
-    EXPR_PARARM_FILE, EXPR_RESULT_FILE, EXPR_META_FILE
+    EXPR_PARAM_FILE, EXPR_RESULT_FILE, EXPR_META_FILE
 
 
 class CollectorService(object):
@@ -327,7 +327,7 @@ def _build_trial_meta(cls, expr_dir):
         if not meta:
             job_id = expr_dir.split("/")[-2]
             trial_id = expr_dir[-8:]
-            params = parse_json(os.path.join(expr_dir, EXPR_PARARM_FILE))
+            params = parse_json(os.path.join(expr_dir, EXPR_PARAM_FILE))
             meta = {
                 "trial_id": trial_id,
                 "job_id": job_id,
@@ -349,7 +349,7 @@ def _build_trial_meta(cls, expr_dir):
         if meta.get("end_time", None):
             meta["end_time"] = timestamp2date(meta["end_time"])
 
-        meta["params"] = parse_json(os.path.join(expr_dir, EXPR_PARARM_FILE))
+        meta["params"] = parse_json(os.path.join(expr_dir, EXPR_PARAM_FILE))
 
         return meta
 
diff --git a/python/ray/tune/examples/track_example.py b/python/ray/tune/examples/track_example.py
new file mode 100644
index 000000000000..1ccec39462d0
--- /dev/null
+++ b/python/ray/tune/examples/track_example.py
@@ -0,0 +1,71 @@
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import argparse
+import keras
+from keras.datasets import mnist
+from keras.models import Sequential
+from keras.layers import (Dense, Dropout, Flatten, Conv2D, MaxPooling2D)
+
+from ray.tune import track
+from ray.tune.examples.utils import TuneKerasCallback, get_mnist_data
+
+parser = argparse.ArgumentParser()
+parser.add_argument(
+    "--smoke-test", action="store_true", help="Finish quickly for testing")
+parser.add_argument(
+    "--lr",
+    type=float,
+    default=0.01,
+    metavar="LR",
+    help="learning rate (default: 0.01)")
+parser.add_argument(
+    "--momentum",
+    type=float,
+    default=0.5,
+    metavar="M",
+    help="SGD momentum (default: 0.5)")
+parser.add_argument(
+    "--hidden", type=int, default=64, help="Size of hidden layer.")
+args, _ = parser.parse_known_args()
+
+
+def train_mnist(args):
+    track.init(trial_name="track-example", trial_config=vars(args))
+    batch_size = 128
+    num_classes = 10
+    epochs = 1 if args.smoke_test else 12
+    mnist.load_data()
+    x_train, y_train, x_test, y_test, input_shape = get_mnist_data()
+
+    model = Sequential()
+    model.add(
+        Conv2D(
+            32, kernel_size=(3, 3), activation="relu",
+            input_shape=input_shape))
+    model.add(Conv2D(64, (3, 3), activation="relu"))
+    model.add(MaxPooling2D(pool_size=(2, 2)))
+    model.add(Dropout(0.5))
+    model.add(Flatten())
+    model.add(Dense(args.hidden, activation="relu"))
+    model.add(Dropout(0.5))
+    model.add(Dense(num_classes, activation="softmax"))
+
+    model.compile(
+        loss="categorical_crossentropy",
+        optimizer=keras.optimizers.SGD(lr=args.lr, momentum=args.momentum),
+        metrics=["accuracy"])
+
+    model.fit(
+        x_train,
+        y_train,
+        batch_size=batch_size,
+        epochs=epochs,
+        validation_data=(x_test, y_test),
+        callbacks=[TuneKerasCallback(track.log)])
+    track.shutdown()
+
+
+if __name__ == "__main__":
+    train_mnist(args)
diff --git a/python/ray/tune/examples/utils.py b/python/ray/tune/examples/utils.py
index 3c73bce2bae7..a5ab1dbdb6a1 100644
--- a/python/ray/tune/examples/utils.py
+++ b/python/ray/tune/examples/utils.py
@@ -15,7 +15,9 @@ def __init__(self, reporter, logs={}):
 
     def on_train_end(self, epoch, logs={}):
         self.reporter(
-            timesteps_total=self.iteration, done=1, mean_accuracy=logs["acc"])
+            timesteps_total=self.iteration,
+            done=1,
+            mean_accuracy=logs.get("acc"))
 
     def on_batch_end(self, batch, logs={}):
         self.iteration += 1
diff --git a/python/ray/tune/function_runner.py b/python/ray/tune/function_runner.py
index 551f1702759f..e30e2bdf5cf0 100644
--- a/python/ray/tune/function_runner.py
+++ b/python/ray/tune/function_runner.py
@@ -5,9 +5,11 @@
 import logging
 import sys
 import time
+import inspect
 import threading
 from six.moves import queue
 
+from ray.tune import track
 from ray.tune import TuneError
 from ray.tune.trainable import Trainable
 from ray.tune.result import TIME_THIS_ITER_S, RESULT_DUPLICATE
@@ -244,6 +246,17 @@ def _report_thread_runner_error(self, block=False):
 
 
 def wrap_function(train_func):
+
+    use_track = False
+    try:
+        func_args = inspect.getargspec(train_func).args
+        use_track = ("reporter" not in func_args and len(func_args) == 1)
+        if use_track:
+            logger.info("tune.track signature detected.")
+    except Exception:
+        logger.info(
+            "Function inspection failed - assuming reporter signature.")
+
     class WrappedFunc(FunctionRunner):
         def _trainable_func(self, config, reporter):
             output = train_func(config, reporter)
@@ -253,4 +266,12 @@ def _trainable_func(self, config, reporter):
             reporter(**{RESULT_DUPLICATE: True})
             return output
 
-    return WrappedFunc
+    class WrappedTrackFunc(FunctionRunner):
+        def _trainable_func(self, config, reporter):
+            track.init(_tune_reporter=reporter)
+            output = train_func(config)
+            reporter(**{RESULT_DUPLICATE: True})
+            track.shutdown()
+            return output
+
+    return WrappedTrackFunc if use_track else WrappedFunc
diff --git a/python/ray/tune/logger.py b/python/ray/tune/logger.py
index 9d472cac36fe..4b9d5a914aa1 100644
--- a/python/ray/tune/logger.py
+++ b/python/ray/tune/logger.py
@@ -50,6 +50,11 @@ def on_result(self, result):
 
         raise NotImplementedError
 
+    def update_config(self, config):
+        """Updates the config for all loggers."""
+
+        pass
+
     def close(self):
         """Releases all resources used by this logger."""
 
@@ -68,17 +73,7 @@ def on_result(self, result):
 
 class JsonLogger(Logger):
     def _init(self):
-        config_out = os.path.join(self.logdir, "params.json")
-        with open(config_out, "w") as f:
-            json.dump(
-                self.config,
-                f,
-                indent=2,
-                sort_keys=True,
-                cls=_SafeFallbackEncoder)
-        config_pkl = os.path.join(self.logdir, "params.pkl")
-        with open(config_pkl, "wb") as f:
-            cloudpickle.dump(self.config, f)
+        self.update_config(self.config)
         local_file = os.path.join(self.logdir, "result.json")
         self.local_out = open(local_file, "a")
 
@@ -96,6 +91,15 @@ def flush(self):
     def close(self):
         self.local_out.close()
 
+    def update_config(self, config):
+        self.config = config
+        config_out = os.path.join(self.logdir, "params.json")
+        with open(config_out, "w") as f:
+            json.dump(self.config, f, cls=_SafeFallbackEncoder)
+        config_pkl = os.path.join(self.logdir, "params.pkl")
+        with open(config_pkl, "wb") as f:
+            cloudpickle.dump(self.config, f)
+
 
 def to_tf_values(result, path):
     values = []
@@ -231,6 +235,10 @@ def on_result(self, result):
         self._log_syncer.set_worker_ip(result.get(NODE_IP))
         self._log_syncer.sync_if_needed()
 
+    def update_config(self, config):
+        for _logger in self._loggers:
+            _logger.update_config(config)
+
     def close(self):
         for _logger in self._loggers:
             _logger.close()
diff --git a/python/ray/tune/result.py b/python/ray/tune/result.py
index 2978fe540d18..51a67d5931a7 100644
--- a/python/ray/tune/result.py
+++ b/python/ray/tune/result.py
@@ -68,7 +68,7 @@
 EXPR_META_FILE = "trial_status.json"
 
 # File that stores parameters of the trial.
-EXPR_PARARM_FILE = "params.json"
+EXPR_PARAM_FILE = "params.json"
 
 # File that stores the progress of the trial.
 EXPR_PROGRESS_FILE = "progress.csv"
diff --git a/python/ray/tune/tests/test_track.py b/python/ray/tune/tests/test_track.py
new file mode 100644
index 000000000000..d3b6c38d745a
--- /dev/null
+++ b/python/ray/tune/tests/test_track.py
@@ -0,0 +1,84 @@
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import os
+import pandas as pd
+import unittest
+
+import ray
+from ray import tune
+from ray.tune import track
+from ray.tune.result import EXPR_PARAM_FILE, EXPR_RESULT_FILE
+
+
+def _check_json_val(fname, key, val):
+    with open(fname, "r") as f:
+        df = pd.read_json(f, typ="frame", lines=True)
+        return key in df.columns and (df[key].tail(n=1) == val).all()
+
+
+class TrackApiTest(unittest.TestCase):
+    def tearDown(self):
+        track.shutdown()
+        ray.shutdown()
+
+    def testSessionInitShutdown(self):
+        self.assertTrue(track._session is None)
+
+        # Checks that the singleton _session is created/destroyed
+        # by track.init() and track.shutdown()
+        for _ in range(2):
+            # do it twice to see that we can reopen the session
+            track.init(trial_name="test_init")
+            self.assertTrue(track._session is not None)
+            track.shutdown()
+            self.assertTrue(track._session is None)
+
+    def testLogCreation(self):
+        """Checks that track.init() starts logger and creates log files."""
+        track.init(trial_name="test_init")
+        session = track.get_session()
+        self.assertTrue(session is not None)
+
+        self.assertTrue(os.path.isdir(session.logdir))
+
+        params_path = os.path.join(session.logdir, EXPR_PARAM_FILE)
+        result_path = os.path.join(session.logdir, EXPR_RESULT_FILE)
+
+        self.assertTrue(os.path.exists(params_path))
+        self.assertTrue(os.path.exists(result_path))
+        self.assertTrue(session.logdir == track.trial_dir())
+
+    def testMetric(self):
+        track.init(trial_name="test_log")
+        session = track.get_session()
+        for i in range(5):
+            track.log(test=i)
+        result_path = os.path.join(session.logdir, EXPR_RESULT_FILE)
+        self.assertTrue(_check_json_val(result_path, "test", i))
+
+    def testRayOutput(self):
+        """Checks that local and remote log format are the same."""
+        ray.init()
+
+        def testme(config):
+            for i in range(config["iters"]):
+                track.log(iteration=i, hi="test")
+
+        trials = tune.run(testme, config={"iters": 5})
+        trial_res = trials[0].last_result
+        self.assertTrue(trial_res["hi"], "test")
+        self.assertTrue(trial_res["training_iteration"], 5)
+
+    def testLocalMetrics(self):
+        """Checks that metric state is updated correctly."""
+        track.init(trial_name="test_logs")
+        session = track.get_session()
+        self.assertEqual(set(session.trial_config.keys()), {"trial_id"})
+
+        result_path = os.path.join(session.logdir, EXPR_RESULT_FILE)
+        track.log(test=1)
+        self.assertTrue(_check_json_val(result_path, "test", 1))
+        track.log(iteration=1, test=2)
+        self.assertTrue(_check_json_val(result_path, "test", 2))
diff --git a/python/ray/tune/track/__init__.py b/python/ray/tune/track/__init__.py
new file mode 100644
index 000000000000..a35511e89350
--- /dev/null
+++ b/python/ray/tune/track/__init__.py
@@ -0,0 +1,71 @@
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import logging
+
+from ray.tune.track.session import TrackSession
+
+logger = logging.getLogger(__name__)
+
+_session = None
+
+
+def get_session():
+    global _session
+    if not _session:
+        raise ValueError("Session not detected. Try `track.init()`?")
+    return _session
+
+
+def init(ignore_reinit_error=True, **session_kwargs):
+    """Initializes the global trial context for this process.
+
+    This creates a TrackSession object and the corresponding hooks for logging.
+
+    Examples:
+        >>> from ray.tune import track
+        >>> track.init()
+    """
+    global _session
+
+    if _session:
+        # TODO(ng): would be nice to stack crawl at creation time to report
+        # where that initial trial was created, and that creation line
+        # info is helpful to keep around anyway.
+        reinit_msg = "A session already exists in the current context."
+        if ignore_reinit_error:
+            if not _session.is_tune_session:
+                logger.warning(reinit_msg)
+            return
+        else:
+            raise ValueError(reinit_msg)
+
+    _session = TrackSession(**session_kwargs)
+
+
+def shutdown():
+    """Cleans up the trial and removes it from the global context."""
+
+    global _session
+    if _session:
+        _session.close()
+        _session = None
+
+
+def log(**kwargs):
+    """Applies TrackSession.log to the trial in the current context."""
+    _session = get_session()
+    return _session.log(**kwargs)
+
+
+def trial_dir():
+    """Returns the directory where trial results are saved.
+
+    This includes json data containing the session's parameters and metrics.
+    """
+    _session = get_session()
+    return _session.logdir
+
+
+__all__ = ["TrackSession", "session", "log", "trial_dir", "init", "shutdown"]
diff --git a/python/ray/tune/track/session.py b/python/ray/tune/track/session.py
new file mode 100644
index 000000000000..faf850e5fea2
--- /dev/null
+++ b/python/ray/tune/track/session.py
@@ -0,0 +1,110 @@
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import os
+from datetime import datetime
+
+from ray.tune.trial import Trial
+from ray.tune.result import DEFAULT_RESULTS_DIR, TRAINING_ITERATION
+from ray.tune.logger import UnifiedLogger, Logger
+
+
+class _ReporterHook(Logger):
+    def __init__(self, tune_reporter):
+        self.tune_reporter = tune_reporter
+
+    def on_result(self, metrics):
+        return self.tune_reporter(**metrics)
+
+
+class TrackSession(object):
+    """Manages results for a single session.
+
+    Represents a single Trial in an experiment.
+
+    Attributes:
+        trial_name (str): Custom trial name.
+        experiment_dir (str): Directory where results for all trials
+            are stored. Each session is stored into a unique directory
+            inside experiment_dir.
+        upload_dir (str): Directory to sync results to.
+        trial_config (dict): Parameters that will be logged to disk.
+        _tune_reporter (StatusReporter): For rerouting when using Tune.
+            Will not instantiate logging if not None.
+    """
+
+    def __init__(self,
+                 trial_name="",
+                 experiment_dir=None,
+                 upload_dir=None,
+                 trial_config=None,
+                 _tune_reporter=None):
+        self._experiment_dir = None
+        self._logdir = None
+        self._upload_dir = None
+        self.trial_config = None
+        self._iteration = -1
+        self.is_tune_session = bool(_tune_reporter)
+        self.trial_id = Trial.generate_id()
+        if trial_name:
+            self.trial_id = trial_name + "_" + self.trial_id
+        if self.is_tune_session:
+            self._logger = _ReporterHook(_tune_reporter)
+        else:
+            self._initialize_logging(trial_name, experiment_dir, upload_dir,
+                                     trial_config)
+
+    def _initialize_logging(self,
+                            trial_name="",
+                            experiment_dir=None,
+                            upload_dir=None,
+                            trial_config=None):
+
+        # TODO(rliaw): In other parts of the code, this is `local_dir`.
+        if experiment_dir is None:
+            experiment_dir = os.path.join(DEFAULT_RESULTS_DIR, "default")
+
+        self._experiment_dir = os.path.expanduser(experiment_dir)
+
+        # TODO(rliaw): Refactor `logdir` to `trial_dir`.
+        self._logdir = Trial.create_logdir(trial_name, self._experiment_dir)
+        self._upload_dir = upload_dir
+        self.trial_config = trial_config or {}
+
+        # misc metadata to save as well
+        self.trial_config["trial_id"] = self.trial_id
+        self._logger = UnifiedLogger(self.trial_config, self._logdir,
+                                     self._upload_dir)
+
+    def log(self, **metrics):
+        """Logs all named arguments specified in **metrics.
+
+        This will log trial metrics locally, and they will be synchronized
+        with the driver periodically through ray.
+
+        Arguments:
+            metrics: named arguments with corresponding values to log.
+        """
+
+        # TODO: Implement a batching mechanism for multiple calls to `log`
+        #     within the same iteration.
+        self._iteration += 1
+        metrics_dict = metrics.copy()
+        metrics_dict.update({"trial_id": self.trial_id})
+
+        # TODO: Move Trainable autopopulation to a util function
+        metrics_dict.setdefault(TRAINING_ITERATION, self._iteration)
+        self._logger.on_result(metrics_dict)
+
+    def close(self):
+        self.trial_config["trial_completed"] = True
+        self.trial_config["end_time"] = datetime.now().isoformat()
+        # TODO(rliaw): Have Tune support updated configs
+        self._logger.update_config(self.trial_config)
+        self._logger.close()
+
+    @property
+    def logdir(self):
+        """Trial logdir (subdir of given experiment directory)"""
+        return self._logdir
diff --git a/python/ray/tune/trial.py b/python/ray/tune/trial.py
index ad61e8d4b393..272945ba1cf4 100644
--- a/python/ray/tune/trial.py
+++ b/python/ray/tune/trial.py
@@ -8,6 +8,7 @@
 from datetime import datetime
 import logging
 import json
+import uuid
 import time
 import tempfile
 import os
@@ -27,7 +28,7 @@
 from ray.tune.result import (DEFAULT_RESULTS_DIR, DONE, HOSTNAME, PID,
                              TIME_TOTAL_S, TRAINING_ITERATION, TIMESTEPS_TOTAL,
                              EPISODE_REWARD_MEAN, MEAN_LOSS, MEAN_ACCURACY)
-from ray.utils import _random_string, binary_to_hex, hex_to_binary
+from ray.utils import binary_to_hex, hex_to_binary
 
 DEBUG_PRINT_INTERVAL = 5
 MAX_LEN_IDENTIFIER = 130
@@ -341,19 +342,22 @@ def _registration_check(cls, trainable_name):
 
     @classmethod
     def generate_id(cls):
-        return binary_to_hex(_random_string())[:8]
+        return str(uuid.uuid1().hex)[:8]
+
+    @classmethod
+    def create_logdir(cls, identifier, local_dir):
+        if not os.path.exists(local_dir):
+            os.makedirs(local_dir)
+        return tempfile.mkdtemp(
+            prefix="{}_{}".format(identifier[:MAX_LEN_IDENTIFIER], date_str()),
+            dir=local_dir)
 
     def init_logger(self):
        """Init logger."""
 
        if not self.result_logger:
-            if not os.path.exists(self.local_dir):
-                os.makedirs(self.local_dir)
            if not self.logdir:
-                self.logdir = tempfile.mkdtemp(
-                    prefix="{}_{}".format(
-                        str(self)[:MAX_LEN_IDENTIFIER], date_str()),
-                    dir=self.local_dir)
+                self.logdir = Trial.create_logdir(str(self), self.local_dir)
             elif not os.path.exists(self.logdir):
                 os.makedirs(self.logdir)
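
Usage sketch: a minimal illustration of the `track` API this patch adds, based on `test_track.py` and `track_example.py` above. The trial name, config values, and metric names below are placeholders, and the snippet assumes Ray and Tune are installed.

    from ray import tune
    from ray.tune import track

    # Standalone: track.init() builds a TrackSession backed by a UnifiedLogger
    # that writes params.json / result.json under the trial directory.
    track.init(trial_name="track-sketch", trial_config={"lr": 0.01})
    for i in range(5):
        track.log(mean_loss=1.0 / (i + 1))
    print(track.trial_dir())
    track.shutdown()


    # Under Tune: wrap_function() detects a single-argument training function
    # and reroutes track.log() through the trial's reporter (_ReporterHook).
    def train(config):
        for i in range(config["iters"]):
            track.log(iteration=i, mean_loss=1.0 / (i + 1))


    tune.run(train, config={"iters": 5})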