From 5d936d4a9ae298c0bb3c2adcaf3009415611b93a Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Wed, 13 Mar 2019 18:20:25 -0700 Subject: [PATCH 01/28] initial track integration --- python/ray/tune/trainable.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/python/ray/tune/trainable.py b/python/ray/tune/trainable.py index 8b640cbb45fc..e0ac09758510 100644 --- a/python/ray/tune/trainable.py +++ b/python/ray/tune/trainable.py @@ -6,6 +6,7 @@ import copy import io +import inspect import logging import os import pickle @@ -450,8 +451,21 @@ def _export_model(self, export_formats, export_dir): def wrap_function(train_func): from ray.tune.function_runner import FunctionRunner + + function_args = inspect.getargspec(train_func).args + use_track = ("reporter" not in function_args and len(function_args) == 1) + class WrappedFunc(FunctionRunner): def _trainable_func(self): return train_func - return WrappedFunc + class WrappedTrackFunc(FunctionRunner): + def _trainable_func(self, config, reporter): + import track + track.init(tune_reporter=reporter, log_dir=os.getcwd()) + output = train_func(config) + reporter(done=True) + track.shutdown() + return output + + return WrappedTrackFunc if use_track else WrappedFunc From f991f21605618f10003cca2852d9894079b41289 Mon Sep 17 00:00:00 2001 From: Noah Golmant Date: Wed, 13 Mar 2019 18:48:09 -0700 Subject: [PATCH 02/28] initial pull from track repo --- python/ray/tune/track/__init__.py | 74 ++++++++++ python/ray/tune/track/autodetect.py | 51 +++++++ python/ray/tune/track/constants.py | 23 +++ python/ray/tune/track/convenience.py | 31 ++++ python/ray/tune/track/error.py | 8 ++ python/ray/tune/track/log.py | 107 ++++++++++++++ python/ray/tune/track/project.py | 116 +++++++++++++++ python/ray/tune/track/sync.py | 99 +++++++++++++ python/ray/tune/track/trial.py | 204 +++++++++++++++++++++++++++ python/ray/tune/track/trials.py | 97 +++++++++++++ 10 files changed, 810 insertions(+) create mode 100644 python/ray/tune/track/__init__.py create mode 100644 python/ray/tune/track/autodetect.py create mode 100644 python/ray/tune/track/constants.py create mode 100644 python/ray/tune/track/convenience.py create mode 100644 python/ray/tune/track/error.py create mode 100644 python/ray/tune/track/log.py create mode 100644 python/ray/tune/track/project.py create mode 100644 python/ray/tune/track/sync.py create mode 100644 python/ray/tune/track/trial.py create mode 100644 python/ray/tune/track/trials.py diff --git a/python/ray/tune/track/__init__.py b/python/ray/tune/track/__init__.py new file mode 100644 index 000000000000..6baed46a641d --- /dev/null +++ b/python/ray/tune/track/__init__.py @@ -0,0 +1,74 @@ +import pickle + +from .trial import Trial +from .project import Project +from .log import debug +from .convenience import absl_flags + + +_trial = None + + +def init(log_dir=None, + upload_dir=None, + sync_period=None, + trial_prefix="", + param_map=None, + init_logging=True): + """ + Initializes the global trial context for this process. + This creates a Trial object and the corresponding hooks for logging. + """ + global _trial # pylint: disable=global-statement + if _trial: + # TODO: would be nice to stack crawl at creation time to report + # where that initial trial was created, and that creation line + # info is helpful to keep around anyway. + raise ValueError("A trial already exists in the current context") + local_trial = Trial( + log_dir=log_dir, + upload_dir=upload_dir, + sync_period=sync_period, + trial_prefix=trial_prefix, + param_map=param_map, + init_logging=True) + # try: + _trial = local_trial + _trial.start() + + +def shutdown(): + """ + Cleans up the trial and removes it from the global context. + """ + global _trial # pylint: disable=global-statement + if not _trial: + raise ValueError("Tried to stop trial, but no trial exists") + _trial.close() + _trial = None + + +def save(obj, obj_name, iteration=None, save_fn=pickle.dump, **kwargs): + """ Applies Trial.save to the trial in the current context """ + return _trial.save(obj=obj, obj_name=obj_name, iteration=iteration, + save_fn=save_fn, **kwargs) + + +def metric(*, iteration=None, **kwargs): + """Applies Trial.metric to the trial in the current context.""" + return _trial.metric(iteration=iteration, **kwargs) + + +def load(obj_name, iteration=None, load_fn=pickle.load, **kwargs): + """Applies Trial.load to the trial in the current context.""" + return _trial.load(obj_name=obj_name, iteration=iteration, + load_fn=load_fn, **kwargs) + + +def trial_dir(): + """Retrieves the trial directory for the trial in the current context.""" + return _trial.trial_dir() + + +__all__ = ["Trial", "Project", "trial", "absl_flags", "debug", "metric", + "save", "load", "trial_dir"] diff --git a/python/ray/tune/track/autodetect.py b/python/ray/tune/track/autodetect.py new file mode 100644 index 000000000000..0416971a020d --- /dev/null +++ b/python/ray/tune/track/autodetect.py @@ -0,0 +1,51 @@ +""" +Hacky, library and usage specific tricks to infer decent defaults. +""" +import os +import subprocess +import sys +import shlex + + +def git_repo(): + """ + Returns the git repository root if the cwd is in a repo, else None + """ + try: + with open(os.devnull, 'wb') as quiet: + reldir = subprocess.check_output( + ["git", "rev-parse", "--git-dir"], + stdout=quiet) + reldir = reldir.decode("utf-8") + return os.path.basename(os.path.dirname(os.path.abspath(reldir))) + except subprocess.CalledProcessError: + return None + + +def git_hash(): + """returns the current git hash or unknown if not in git repo""" + if git_repo() is None: + return "unknown" + git_hash = subprocess.check_output( + ["git", "rev-parse", "HEAD"]) + # git_hash is a byte string; we want a string. + git_hash = git_hash.decode("utf-8") + # git_hash also comes with an extra \n at the end, which we remove. + git_hash = git_hash.strip() + return git_hash + +def git_pretty(): + """returns a pretty summary of the commit or unkown if not in git repo""" + if git_repo() is None: + return "unknown" + pretty = subprocess.check_output( + ["git", "log", "--pretty=format:%h %s", "-n", "1"]) + pretty = pretty.decode("utf-8") + pretty = pretty.strip() + return pretty + +def invocation(): + """reconstructs the invocation for this python program""" + cmdargs = [sys.executable] + sys.argv[:] + invocation = " ".join(shlex.quote(s) for s in cmdargs) + return invocation diff --git a/python/ray/tune/track/constants.py b/python/ray/tune/track/constants.py new file mode 100644 index 000000000000..81a9733c607d --- /dev/null +++ b/python/ray/tune/track/constants.py @@ -0,0 +1,23 @@ +""" +Both locally and remotely the directory structure is as follows: + +project-directory/ + METADATA_FOLDER/ + trialprefix_uuid_param_map.json + trialprefix_uuid_result.json + ... other trials + trailprefix_uuid/ + ... trial artifacts + ... other trial artifact folders + +Where the param map is a single json containing the trial uuid +and configuration parameters and, the result.json is a json +list file (i.e., not valid json, but valid json on each line), +and the artifacts folder contains the artifacts as supplied by +the user. +""" +import os + +METADATA_FOLDER = "trials" +CONFIG_SUFFIX = "param_map.json" +RESULT_SUFFIX = "result.json" diff --git a/python/ray/tune/track/convenience.py b/python/ray/tune/track/convenience.py new file mode 100644 index 000000000000..cd66f909990d --- /dev/null +++ b/python/ray/tune/track/convenience.py @@ -0,0 +1,31 @@ +""" +Miscellaneous helpers for getting some of the arguments to tracking-related +functions automatically, usually involving parameter extraction in a +sensible default way from commonly used libraries. +""" + +import sys +from absl import flags + +def absl_flags(): + """ + Extracts absl-py flags that the user has specified and outputs their + key-value mapping. + + By default, extracts only those flags in the current __package__ + and mainfile. + + Useful to put into a trial's param_map. + """ + # TODO: need same thing for argparse + flags_dict = flags.FLAGS.flags_by_module_dict() + # only include parameters from modules the user probably cares about + def _relevant_module(module_name): + if __package__ and __package__ in module_name: + return True + if module_name == sys.argv[0]: + return True + return False + return { + flag.name: flag.value for module, flags in flags_dict.items() + for flag in flags if _relevant_module(module)} diff --git a/python/ray/tune/track/error.py b/python/ray/tune/track/error.py new file mode 100644 index 000000000000..75759c49a925 --- /dev/null +++ b/python/ray/tune/track/error.py @@ -0,0 +1,8 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + + +class TrackError(Exception): + """General error class raised by Track""" + pass diff --git a/python/ray/tune/track/log.py b/python/ray/tune/track/log.py new file mode 100644 index 000000000000..0372d184fce6 --- /dev/null +++ b/python/ray/tune/track/log.py @@ -0,0 +1,107 @@ +""" +File adopted from Michael Whittaker. + +This module firstly offers a convenience function called "debug" which +alleviates a couple of inconveniences in python logging: + +* No need to find a logger before logging (uses the one from this package) +* Slightly friendly string interpolation interface. +""" + +from datetime import datetime +import inspect +import hashlib +import subprocess +import os +import shlex +import json +import sys +import logging + +class TrackLogHandler(logging.FileHandler): + """File-based logging handler for the track package""" + pass + +class StdoutHandler(logging.StreamHandler): + """As described by the name""" + def __init__(self): + super().__init__(sys.stdout) + +def init(track_log_handler): + """ + (Re)initialize track's file handler for track package logger. + + Adds a stdout-printing handler automatically. + """ + + logger = logging.getLogger(__package__) + + # TODO (just document prominently) + # assume only one trial can run at once right now + # multi-concurrent-trial support will require complex filter logic + # based on the currently-running trial (maybe we shouldn't allow multiple + # trials on different python threads, that's dumb) + to_rm = [h for h in logger.handlers if isinstance(h, TrackLogHandler)] + for h in to_rm: + logger.removeHandler(h) + + if not any(isinstance(h, StdoutHandler) for h in logger.handlers): + handler = StdoutHandler() + handler.setFormatter(_FORMATTER) + logger.addHandler(handler) + + track_log_handler.setFormatter(_FORMATTER) + logger.addHandler(track_log_handler) + + logger.propagate = False + logger.setLevel(logging.DEBUG) + +def debug(s, *args): + """debug(s, x1, ..., xn) logs s.format(x1, ..., xn).""" + # Get the path name and line number of the function which called us. + previous_frame = inspect.currentframe().f_back + try: + pathname, lineno, _, _, _ = inspect.getframeinfo(previous_frame) + # if path is in cwd, simplify it + cwd = os.path.abspath(os.getcwd()) + pathname = os.path.abspath(pathname) + if os.path.commonprefix([cwd, pathname]) == cwd: + pathname = os.path.relpath(pathname, cwd) + except Exception: # pylint: disable=broad-except + pathname = '.py' + lineno = 0 + if _FORMATTER: # log could have not been initialized. + _FORMATTER.pathname = pathname + _FORMATTER.lineno = lineno + logger = logging.getLogger(__package__) + logger.debug(s.format(*args)) + +class _StackCrawlingFormatter(logging.Formatter): + """ + If we configure a python logger with the format string + "%(pathname):%(lineno): %(message)", messages logged via `log.debug` will + be prefixed with the path name and line number of the code that called + `log.debug`. Unfortunately, when a `log.debug` call is wrapped in a helper + function (e.g. debug below), the path name and line number is always that + of the helper function, not the function which called the helper function. + + A _StackCrawlingFormatter is a hack to log a different pathname and line + number. Simply set the `pathname` and `lineno` attributes of the formatter + before you call `log.debug`. See `debug` below for an example. + """ + + def __init__(self, format_str): + super().__init__(format_str) + self.pathname = None + self.lineno = None + + def format(self, record): + s = super().format(record) + if self.pathname is not None: + s = s.replace('{pathname}', self.pathname) + if self.lineno is not None: + s = s.replace('{lineno}', str(self.lineno)) + return s + +_FORMAT_STRING = "[%(asctime)-15s {pathname}:{lineno}] %(message)s" +_FORMATTER = _StackCrawlingFormatter(_FORMAT_STRING) diff --git a/python/ray/tune/track/project.py b/python/ray/tune/track/project.py new file mode 100644 index 000000000000..956257f3579f --- /dev/null +++ b/python/ray/tune/track/project.py @@ -0,0 +1,116 @@ +try: # py3 + from shlex import quote +except ImportError: # py2 + from pipes import quote +import os +import subprocess +import json + +import pandas as pd + +from . import constants +from .autodetect import dfl_local_dir +from .sync import S3_PREFIX, GCS_PREFIX, check_remote_util + +class Project(object): + """ + The project class manages all trials that have been run with the given + log_dir and upload_dir. It gives pandas-dataframe access to trial metadata, + metrics, and then path-based access to stored user artifacts for each trial. + + log_dir is created with the same defaults as in track.Trial + """ + + def __init__(self, log_dir=None, upload_dir=None): + if log_dir is None: + log_dir = dfl_local_dir() + self.log_dir = log_dir + if upload_dir: + check_remote_util(upload_dir) + self.upload_dir = upload_dir + self._sync_metadata() + self._ids = self._load_metadata() + + @property + def ids(self): + return self._ids + + def results(self, trial_ids): + """ + Accepts a sequence of trial ids and returns a pandas dataframe + with the schema + + trial_id, iteration?, *metric_schema_union + + where iteration is an optional column that specifies the iteration + when a user logged a metric, if the user supplied one. The iteration + column is added if any metric was logged with an iteration. + Then, every metric name that was ever logged is a column in the + metric_schema_union. + """ + metadata_folder = os.path.join(self.log_dir, constants.METADATA_FOLDER) + dfs = [] + # TODO: various file-creation corner cases like the result file not + # always existing if stuff is not logged and etc should be ironed out + # (would probably be easier if we had a centralized Sync class which + # relied on some formal remote store semantics). + for trial_id in trial_ids: + # TODO constants should just contain the recipes for filename + # construction instead of this multi-file implicit constraint + result_file = os.path.join( + metadata_folder, trial_id + "_" + constants.RESULT_SUFFIX) + assert os.path.isfile(result_file), result_file + dfs.append(pd.read_json(result_file, typ='frame', lines=True)) + df = pd.concat(dfs, axis=0, ignore_index=True, sort=False) + return df + + + def fetch_artifact(self, trial_id, prefix): + """ + Verifies that all children of the artifact prefix path are + available locally. Fetches them if not. + + Returns the local path to the given trial's artifacts at the + specified prefix, which is always just + + {log_dir}/{trial_id}/{prefix} + """ + # TODO: general windows concern: local prefix will be in + # backslashes but remote dirs will be expecting / + # TODO: having s3 logic split between project and sync.py + # worries me + local = os.path.join(self.log_dir, trial_id, prefix) + if self.upload_dir: + remote = '/'.join([self.upload_dir, trial_id, prefix]) + _remote_to_local_sync(remote, local) + return local + + def _sync_metadata(self): + local = os.path.join(self.log_dir, constants.METADATA_FOLDER) + if self.upload_dir: + remote = '/'.join([self.upload_dir, constants.METADATA_FOLDER]) + _remote_to_local_sync(remote, local) + + def _load_metadata(self): + metadata_folder = os.path.join(self.log_dir, constants.METADATA_FOLDER) + rows = [] + for trial_file in os.listdir(metadata_folder): + if not trial_file.endswith(constants.CONFIG_SUFFIX): + continue + trial_file = os.path.join(metadata_folder, trial_file) + rows.append(pd.read_json(trial_file, typ='frame', lines=True)) + return pd.concat(rows, axis=0, ignore_index=True, sort=False) + +def _remote_to_local_sync(remote, local): + # TODO: at some point look up whether sync will clobber newer + # local files and do this more delicately + if remote.startswith(S3_PREFIX): + remote_to_local_sync_cmd = ("aws s3 sync {} {}".format( + quote(remote), quote(local))) + elif remote.startswith(GCS_PREFIX): + remote_to_local_sync_cmd = ("gsutil rsync -r {} {}".format( + quote(remote), quote(local))) + else: + raise ValueError('unhandled remote uri {}'.format(remote)) + print("Running log sync: {}".format(remote_to_local_sync_cmd)) + subprocess.check_call(remote_to_local_sync_cmd, shell=True) diff --git a/python/ray/tune/track/sync.py b/python/ray/tune/track/sync.py new file mode 100644 index 000000000000..b9d3ebc08f28 --- /dev/null +++ b/python/ray/tune/track/sync.py @@ -0,0 +1,99 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import distutils.spawn +import subprocess +import time + +try: # py3 + from shlex import quote +except ImportError: # py2 + from pipes import quote + +from track.error import TrackError + +S3_PREFIX = "s3://" +GCS_PREFIX = "gs://" +ALLOWED_REMOTE_PREFIXES = (S3_PREFIX, GCS_PREFIX) + + +def check_remote_util(remote_dir): + if not any( + remote_dir.startswith(prefix) + for prefix in ALLOWED_REMOTE_PREFIXES): + raise TrackError("Upload uri must start with one of: {}" + "".format(ALLOWED_REMOTE_PREFIXES)) + + if (remote_dir.startswith(S3_PREFIX) + and not distutils.spawn.find_executable("aws")): + raise TrackError( + "Upload uri starting with '{}' requires awscli tool" + " to be installed".format(S3_PREFIX)) + elif (remote_dir.startswith(GCS_PREFIX) + and not distutils.spawn.find_executable("gsutil")): + raise TrackError( + "Upload uri starting with '{}' requires gsutil tool" + " to be installed".format(GCS_PREFIX)) + + +class _LogSyncer(object): + """Log syncer. + + This syncs files from the local node to a remote directory (e.g. S3).""" + + def __init__(self, local_dir, remote_dir=None, sync_period=None): + self.local_dir = local_dir + if remote_dir: + check_remote_util(remote_dir) + self.remote_dir = remote_dir + self.last_sync_time = 0 + self.sync_period = sync_period or 300 + self.sync_process = None + print("Created LogSyncer for {} -> {}".format(local_dir, remote_dir)) + + def sync_if_needed(self): + if time.time() - self.last_sync_time > self.sync_period: + self.sync_now() + + def sync_now(self, force=False): + self.last_sync_time = time.time() + local_to_remote_sync_cmd = None + if self.remote_dir: + if self.remote_dir.startswith(S3_PREFIX): + local_to_remote_sync_cmd = ("aws s3 sync {} {}".format( + quote(self.local_dir), quote(self.remote_dir))) + elif self.remote_dir.startswith(GCS_PREFIX): + local_to_remote_sync_cmd = ("gsutil rsync -r {} {}".format( + quote(self.local_dir), quote(self.remote_dir))) + + if self.sync_process: + self.sync_process.poll() + if self.sync_process.returncode is None: + if force: + self.sync_process.kill() + else: + print("Warning: last sync is still in progress, skipping") + return + + if local_to_remote_sync_cmd: + final_cmd = local_to_remote_sync_cmd + print("Running log sync: {}".format(final_cmd)) + self.sync_process = subprocess.Popen(final_cmd, shell=True) + + def wait(self): + if self.sync_process: + self.sync_process.wait() + + +class SyncHook(object): + def __init__(self, local_dir, remote_dir=None, sync_period=None): + self._logsync = _LogSyncer(local_dir, remote_dir, sync_period) + + def on_result(self, *args, **kwargs): + self._logsync.sync_if_needed() + + def close(self): + self._logsync.sync_now(force=True) + self._logsync.wait() + print("Syncer closed.") diff --git a/python/ray/tune/track/trial.py b/python/ray/tune/track/trial.py new file mode 100644 index 000000000000..574768171124 --- /dev/null +++ b/python/ray/tune/track/trial.py @@ -0,0 +1,204 @@ +import os +import pickle +from ray.tune.logger import UnifiedLogger, Logger +# from track.sync import SyncHook +import uuid +from datetime import datetime +from .autodetect import ( + git_repo, dfl_local_dir, git_hash, invocation, git_pretty) +from .constants import METADATA_FOLDER, RESULT_SUFFIX +from . import log + + +def time_str(): + return datetime.now().strftime("%Y%m%d%H%M%S") + + +def flatten_dict(dt): + dt = dt.copy() + while any(type(v) is dict for v in dt.values()): + remove = [] + add = {} + for key, value in dt.items(): + if type(value) is dict: + for subkey, v in value.items(): + add[":".join([key, subkey])] = v + remove.append(key) + dt.update(add) + for k in remove: + del dt[k] + return dt + + +class _ReporterHook(Logger): + def __init__(self, reporter): + self.reporter = reporter + + def on_result(self, metrics): + return self.reporter(**metrics) + + +class Trial(object): + """ + Trial attempts to infer the local log_dir and remote upload_dir + automatically. + + In order of precedence, log_dir is determined by: + (1) the path passed into the argument of the Trial constructor + (2) autodetect.dfl_local_dir() + + The upload directory may be None (in which case no upload is performed), + or an S3 directory or a GCS directory. + + init_logging will automatically set up a logger at the debug level, + along with handlers to print logs to stdout and to a persistent store. + + Arguments: + log_dir (str): base log directory in which the results for all trials + are stored. if not specified, uses autodetect.dfl_local_dir() + upload_dir (str): + """ + def __init__(self, + log_dir=None, + upload_dir=None, + sync_period=None, + trial_prefix="", + param_map=None, + init_logging=True): + if log_dir is None: + log_dir = dfl_local_dir() + # TODO should probably check if this exists and whether + # we'll be clobbering anything in either the artifact dir + # or the metadata dir, idk what the probability is that a + # uuid truncation will get duplicated. Then also maybe + # the same thing for the remote dir. + + base_dir = os.path.expanduser(log_dir) + self.base_dir = base_dir + self.data_dir = os.path.join(base_dir, METADATA_FOLDER) + self.trial_id = str(uuid.uuid1().hex[:10]) + if trial_prefix: + self.trial_id = "_".join([trial_prefix, self.trial_id]) + + self._sync_period = sync_period + self.artifact_dir = os.path.join(base_dir, self.trial_id) + os.makedirs(self.artifact_dir, exist_ok=True) + self.upload_dir = upload_dir + self.param_map = param_map or {} + + # misc metadata to save as well + self.param_map["trial_id"] = self.trial_id + git_repo_or_none = git_repo() + self.param_map["git_repo"] = git_repo_or_none or "unknown" + self.param_map["git_hash"] = git_hash() + self.param_map["git_pretty"] = git_pretty() + self.param_map["start_time"] = datetime.now().isoformat() + self.param_map["invocation"] = invocation() + self.param_map["max_iteration"] = -1 + self.param_map["trial_completed"] = False + + if init_logging: + log.init(self.logging_handler()) + log.debug("(re)initilized logging") + + def logging_handler(self): + """ + For advanced logging setups, returns a file-based log handler + pointing to a log.txt artifact. + + If you use init_logging = True there is no need to call this + method. + """ + return log.TrackLogHandler( + os.path.join(self.artifact_dir, 'log.txt')) + + def start(self, reporter=None): + for path in [self.base_dir, self.data_dir, self.artifact_dir]: + if not os.path.exists(path): + os.makedirs(path) + + self._hooks = [] + if not reporter: + self._hooks += [UnifiedLogger( + self.param_map, + self.data_dir, + self.upload_dir, + filename_prefix=self.trial_id + "_")] + else: + self._hooks += [_ReporterHook(reporter)] + + def metric(self, *, iteration=None, **kwargs): + """ + Logs all named arguments specified in **kwargs. + This will log trial metrics locally, and they will be synchronized + with the driver periodically through ray. + + Arguments: + iteration (int): current iteration of the trial. + **kwargs: named arguments with corresponding values to log. + """ + new_args = flatten_dict(kwargs) + new_args.update({"iteration": iteration}) + new_args.update({"trial_id": self.trial_id}) + if iteration is not None: + self.param_map["max_iteration"] = max( + self.param_map["max_iteration"], iteration) + for hook in self._hooks: + hook.on_result(new_args) + + def _get_fname(self, result_name, iteration=None): + fname = os.path.join(self.artifact_dir, result_name) + if iteration is None: + iteration = self.param_map["max_iteration"] + base, file_extension = os.path.splittext(fname) + result = base + "_" + str(iteration) + file_extension + return result + + def save(self, result, result_name, save_fn, iteration=None, **kwargs): + """ + Persists a result to disk as an artifact. These results will be + synchronized with the driver periodically through ray. + + Arguments: + result (object): the python object to persist to disk. + result_fname (str): base filename for the object, e.g. "model.ckpt" + save_fn (function): function to save out the object. called as + "save_fn(obj, fname, **kwargs)" + iteration (int): the current iteration of the trial. If not + specified, overrides the previously saved file. + otherwise, creates a new object for each iter. + """ + fname = self._get_fname(result_name, iteration=iteration) + return save_fn(result, fname, **kwargs) + + def load(self, result_name, load_fn, iteration=None, **kwargs): + """ + Loads the persisted object of the given type for the corresponding + iteration. + + Arguments: + result_name (str): base filename for the object as supplied to + Trial.save + load_fn (function): function to load the object from disk. called + as "load_fn(fname, **kwargs)" + iteration (int): iteration of trial to load from. If not specified, + track will load the most recent file. + + """ + fname = self._get_fname(result_name, iteration=iteration) + return load_fn(fname, **kwargs) + + def trial_dir(self): + """returns the local file path to the trial's artifact directory""" + return self.artifact_dir + + def close(self): + self.param_map["trial_completed"] = True + self.param_map["end_time"] = datetime.now().isoformat() + self._logger.update_config(self.param_map) + + for hook in self._hooks: + hook.close() + + def get_result_filename(self): + return os.path.join(self.data_dir, self.trial_id + "_" + RESULT_SUFFIX) diff --git a/python/ray/tune/track/trials.py b/python/ray/tune/track/trials.py new file mode 100644 index 000000000000..8f61c87074a0 --- /dev/null +++ b/python/ray/tune/track/trials.py @@ -0,0 +1,97 @@ +""" +This module can be run as a python script when track is installed like so: + +python -m trials --local_dir ~/track/myproject + +The module prints the trial UUIDs that are available in the specified local +or remote directories. + +By default, this prints all available trials sorted by start time, but just +the trial_ids. We allow rudimentary filtering. For example, suppose all +trials in the default local directory (see track.autodetect.dfl_local_dir) are +summarized by + +$ python -m track.trials +trial_id start_time git_pretty +8424fb387a 2018-06-28 11:17:28.752259 c568050 Switch to track logging (#13) +6187e7bc7a 2018-06-27 18:27:11.635714 c568050 Switch to track logging (#13) +14b0ed447a 2018-06-27 18:25:02.718893 c568050 Switch to track logging (#13) + +The default prints the above columns sorted by start time. We can ask for +parameters on the command line. + +$ python -m track.trials learning_rate +learning_rate +0.1 +0.1 +0.1 + +$ python -m track.trials learning_rate "start_time>2018-06-28" +learning_rate start_time +0.1 2018-06-28 11:17:28.752259 + +In other words, only included columns are printed. +""" + +from absl import flags +from absl import app +import pandas as pd + +from . import Project + +flags.DEFINE_string("local_dir", None, + "the local directory where trials are stored " + "for default behavior see " + "track.autodetect.dfl_local_dir") +flags.DEFINE_string("remote_dir", None, + "the remote directory where trials are stored") + +def compare(c, l, r): + if c == '=': + return l == r + elif c == '<': + return l < r + elif c == '>': + return l > r + else: + raise ValueError('unknown operator ' + c) + +def _drop_first_two_words(sentence): + remain = sentence.partition(' ')[2] + return remain.partition(' ')[2] + +def _parse_pandas(lit): + df = pd.read_json('{{"0": "{}"}}'.format(lit), typ="series") + return df[0] + +def _main(argv): + proj = Project(flags.FLAGS.local_dir, flags.FLAGS.remote_dir) + dt_cols = ["start_time", "end_time"] + formatter = lambda x: x.strftime("%Y-%m-%d %H:%M.%S") + argv = argv[1:] + cols = [] + df = proj.ids + conds = '<>=' + for arg in argv: + if not any(c in arg for c in conds): + cols.append(arg) + continue + for c in conds: + if c not in arg: + continue + col, lit = arg.split(c) + cols.append(col) + lit = _parse_pandas(lit) + df = df[compare(c, df[col], lit)] + df = df.sort_values("start_time", ascending=False) + # just the flags + df["invocation"] = df["invocation"].map(_drop_first_two_words) + if not cols: + cols = ["trial_id", "start_time", "git_pretty"] + print(df[cols].to_string( + index=False, justify='left', + formatters={x: formatter for x in dt_cols})) + + +if __name__ == '__main__': + app.run(_main) From 7431e4bd1e6927bd11a241f778c057abe2377f1f Mon Sep 17 00:00:00 2001 From: Noah Golmant Date: Wed, 13 Mar 2019 21:19:38 -0700 Subject: [PATCH 03/28] cut extraneous sync/log/project code --- python/ray/tune/track/__init__.py | 4 +- python/ray/tune/track/convenience.py | 31 ------- python/ray/tune/track/error.py | 8 -- python/ray/tune/track/project.py | 116 --------------------------- python/ray/tune/track/trial.py | 2 - python/ray/tune/track/trials.py | 97 ---------------------- 6 files changed, 1 insertion(+), 257 deletions(-) delete mode 100644 python/ray/tune/track/convenience.py delete mode 100644 python/ray/tune/track/error.py delete mode 100644 python/ray/tune/track/project.py delete mode 100644 python/ray/tune/track/trials.py diff --git a/python/ray/tune/track/__init__.py b/python/ray/tune/track/__init__.py index 6baed46a641d..b734992c4a30 100644 --- a/python/ray/tune/track/__init__.py +++ b/python/ray/tune/track/__init__.py @@ -1,9 +1,7 @@ import pickle from .trial import Trial -from .project import Project from .log import debug -from .convenience import absl_flags _trial = None @@ -70,5 +68,5 @@ def trial_dir(): return _trial.trial_dir() -__all__ = ["Trial", "Project", "trial", "absl_flags", "debug", "metric", +__all__ = ["Trial", "trial", "debug", "metric", "save", "load", "trial_dir"] diff --git a/python/ray/tune/track/convenience.py b/python/ray/tune/track/convenience.py deleted file mode 100644 index cd66f909990d..000000000000 --- a/python/ray/tune/track/convenience.py +++ /dev/null @@ -1,31 +0,0 @@ -""" -Miscellaneous helpers for getting some of the arguments to tracking-related -functions automatically, usually involving parameter extraction in a -sensible default way from commonly used libraries. -""" - -import sys -from absl import flags - -def absl_flags(): - """ - Extracts absl-py flags that the user has specified and outputs their - key-value mapping. - - By default, extracts only those flags in the current __package__ - and mainfile. - - Useful to put into a trial's param_map. - """ - # TODO: need same thing for argparse - flags_dict = flags.FLAGS.flags_by_module_dict() - # only include parameters from modules the user probably cares about - def _relevant_module(module_name): - if __package__ and __package__ in module_name: - return True - if module_name == sys.argv[0]: - return True - return False - return { - flag.name: flag.value for module, flags in flags_dict.items() - for flag in flags if _relevant_module(module)} diff --git a/python/ray/tune/track/error.py b/python/ray/tune/track/error.py deleted file mode 100644 index 75759c49a925..000000000000 --- a/python/ray/tune/track/error.py +++ /dev/null @@ -1,8 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - - -class TrackError(Exception): - """General error class raised by Track""" - pass diff --git a/python/ray/tune/track/project.py b/python/ray/tune/track/project.py deleted file mode 100644 index 956257f3579f..000000000000 --- a/python/ray/tune/track/project.py +++ /dev/null @@ -1,116 +0,0 @@ -try: # py3 - from shlex import quote -except ImportError: # py2 - from pipes import quote -import os -import subprocess -import json - -import pandas as pd - -from . import constants -from .autodetect import dfl_local_dir -from .sync import S3_PREFIX, GCS_PREFIX, check_remote_util - -class Project(object): - """ - The project class manages all trials that have been run with the given - log_dir and upload_dir. It gives pandas-dataframe access to trial metadata, - metrics, and then path-based access to stored user artifacts for each trial. - - log_dir is created with the same defaults as in track.Trial - """ - - def __init__(self, log_dir=None, upload_dir=None): - if log_dir is None: - log_dir = dfl_local_dir() - self.log_dir = log_dir - if upload_dir: - check_remote_util(upload_dir) - self.upload_dir = upload_dir - self._sync_metadata() - self._ids = self._load_metadata() - - @property - def ids(self): - return self._ids - - def results(self, trial_ids): - """ - Accepts a sequence of trial ids and returns a pandas dataframe - with the schema - - trial_id, iteration?, *metric_schema_union - - where iteration is an optional column that specifies the iteration - when a user logged a metric, if the user supplied one. The iteration - column is added if any metric was logged with an iteration. - Then, every metric name that was ever logged is a column in the - metric_schema_union. - """ - metadata_folder = os.path.join(self.log_dir, constants.METADATA_FOLDER) - dfs = [] - # TODO: various file-creation corner cases like the result file not - # always existing if stuff is not logged and etc should be ironed out - # (would probably be easier if we had a centralized Sync class which - # relied on some formal remote store semantics). - for trial_id in trial_ids: - # TODO constants should just contain the recipes for filename - # construction instead of this multi-file implicit constraint - result_file = os.path.join( - metadata_folder, trial_id + "_" + constants.RESULT_SUFFIX) - assert os.path.isfile(result_file), result_file - dfs.append(pd.read_json(result_file, typ='frame', lines=True)) - df = pd.concat(dfs, axis=0, ignore_index=True, sort=False) - return df - - - def fetch_artifact(self, trial_id, prefix): - """ - Verifies that all children of the artifact prefix path are - available locally. Fetches them if not. - - Returns the local path to the given trial's artifacts at the - specified prefix, which is always just - - {log_dir}/{trial_id}/{prefix} - """ - # TODO: general windows concern: local prefix will be in - # backslashes but remote dirs will be expecting / - # TODO: having s3 logic split between project and sync.py - # worries me - local = os.path.join(self.log_dir, trial_id, prefix) - if self.upload_dir: - remote = '/'.join([self.upload_dir, trial_id, prefix]) - _remote_to_local_sync(remote, local) - return local - - def _sync_metadata(self): - local = os.path.join(self.log_dir, constants.METADATA_FOLDER) - if self.upload_dir: - remote = '/'.join([self.upload_dir, constants.METADATA_FOLDER]) - _remote_to_local_sync(remote, local) - - def _load_metadata(self): - metadata_folder = os.path.join(self.log_dir, constants.METADATA_FOLDER) - rows = [] - for trial_file in os.listdir(metadata_folder): - if not trial_file.endswith(constants.CONFIG_SUFFIX): - continue - trial_file = os.path.join(metadata_folder, trial_file) - rows.append(pd.read_json(trial_file, typ='frame', lines=True)) - return pd.concat(rows, axis=0, ignore_index=True, sort=False) - -def _remote_to_local_sync(remote, local): - # TODO: at some point look up whether sync will clobber newer - # local files and do this more delicately - if remote.startswith(S3_PREFIX): - remote_to_local_sync_cmd = ("aws s3 sync {} {}".format( - quote(remote), quote(local))) - elif remote.startswith(GCS_PREFIX): - remote_to_local_sync_cmd = ("gsutil rsync -r {} {}".format( - quote(remote), quote(local))) - else: - raise ValueError('unhandled remote uri {}'.format(remote)) - print("Running log sync: {}".format(remote_to_local_sync_cmd)) - subprocess.check_call(remote_to_local_sync_cmd, shell=True) diff --git a/python/ray/tune/track/trial.py b/python/ray/tune/track/trial.py index 574768171124..08766eeb513a 100644 --- a/python/ray/tune/track/trial.py +++ b/python/ray/tune/track/trial.py @@ -1,7 +1,5 @@ import os -import pickle from ray.tune.logger import UnifiedLogger, Logger -# from track.sync import SyncHook import uuid from datetime import datetime from .autodetect import ( diff --git a/python/ray/tune/track/trials.py b/python/ray/tune/track/trials.py deleted file mode 100644 index 8f61c87074a0..000000000000 --- a/python/ray/tune/track/trials.py +++ /dev/null @@ -1,97 +0,0 @@ -""" -This module can be run as a python script when track is installed like so: - -python -m trials --local_dir ~/track/myproject - -The module prints the trial UUIDs that are available in the specified local -or remote directories. - -By default, this prints all available trials sorted by start time, but just -the trial_ids. We allow rudimentary filtering. For example, suppose all -trials in the default local directory (see track.autodetect.dfl_local_dir) are -summarized by - -$ python -m track.trials -trial_id start_time git_pretty -8424fb387a 2018-06-28 11:17:28.752259 c568050 Switch to track logging (#13) -6187e7bc7a 2018-06-27 18:27:11.635714 c568050 Switch to track logging (#13) -14b0ed447a 2018-06-27 18:25:02.718893 c568050 Switch to track logging (#13) - -The default prints the above columns sorted by start time. We can ask for -parameters on the command line. - -$ python -m track.trials learning_rate -learning_rate -0.1 -0.1 -0.1 - -$ python -m track.trials learning_rate "start_time>2018-06-28" -learning_rate start_time -0.1 2018-06-28 11:17:28.752259 - -In other words, only included columns are printed. -""" - -from absl import flags -from absl import app -import pandas as pd - -from . import Project - -flags.DEFINE_string("local_dir", None, - "the local directory where trials are stored " - "for default behavior see " - "track.autodetect.dfl_local_dir") -flags.DEFINE_string("remote_dir", None, - "the remote directory where trials are stored") - -def compare(c, l, r): - if c == '=': - return l == r - elif c == '<': - return l < r - elif c == '>': - return l > r - else: - raise ValueError('unknown operator ' + c) - -def _drop_first_two_words(sentence): - remain = sentence.partition(' ')[2] - return remain.partition(' ')[2] - -def _parse_pandas(lit): - df = pd.read_json('{{"0": "{}"}}'.format(lit), typ="series") - return df[0] - -def _main(argv): - proj = Project(flags.FLAGS.local_dir, flags.FLAGS.remote_dir) - dt_cols = ["start_time", "end_time"] - formatter = lambda x: x.strftime("%Y-%m-%d %H:%M.%S") - argv = argv[1:] - cols = [] - df = proj.ids - conds = '<>=' - for arg in argv: - if not any(c in arg for c in conds): - cols.append(arg) - continue - for c in conds: - if c not in arg: - continue - col, lit = arg.split(c) - cols.append(col) - lit = _parse_pandas(lit) - df = df[compare(c, df[col], lit)] - df = df.sort_values("start_time", ascending=False) - # just the flags - df["invocation"] = df["invocation"].map(_drop_first_two_words) - if not cols: - cols = ["trial_id", "start_time", "git_pretty"] - print(df[cols].to_string( - index=False, justify='left', - formatters={x: formatter for x in dt_cols})) - - -if __name__ == '__main__': - app.run(_main) From 982600bb04a210e60d77d1292e352b18b756f223 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Fri, 22 Mar 2019 01:35:37 -0700 Subject: [PATCH 04/28] small_cleanup --- python/ray/tune/track/__init__.py | 3 +- python/ray/tune/track/log.py | 107 ------------------------------ python/ray/tune/track/sync.py | 99 --------------------------- 3 files changed, 1 insertion(+), 208 deletions(-) delete mode 100644 python/ray/tune/track/log.py delete mode 100644 python/ray/tune/track/sync.py diff --git a/python/ray/tune/track/__init__.py b/python/ray/tune/track/__init__.py index b734992c4a30..05b6ca9c0496 100644 --- a/python/ray/tune/track/__init__.py +++ b/python/ray/tune/track/__init__.py @@ -1,7 +1,6 @@ import pickle from .trial import Trial -from .log import debug _trial = None @@ -68,5 +67,5 @@ def trial_dir(): return _trial.trial_dir() -__all__ = ["Trial", "trial", "debug", "metric", +__all__ = ["Trial", "trial", "metric", "save", "load", "trial_dir"] diff --git a/python/ray/tune/track/log.py b/python/ray/tune/track/log.py deleted file mode 100644 index 0372d184fce6..000000000000 --- a/python/ray/tune/track/log.py +++ /dev/null @@ -1,107 +0,0 @@ -""" -File adopted from Michael Whittaker. - -This module firstly offers a convenience function called "debug" which -alleviates a couple of inconveniences in python logging: - -* No need to find a logger before logging (uses the one from this package) -* Slightly friendly string interpolation interface. -""" - -from datetime import datetime -import inspect -import hashlib -import subprocess -import os -import shlex -import json -import sys -import logging - -class TrackLogHandler(logging.FileHandler): - """File-based logging handler for the track package""" - pass - -class StdoutHandler(logging.StreamHandler): - """As described by the name""" - def __init__(self): - super().__init__(sys.stdout) - -def init(track_log_handler): - """ - (Re)initialize track's file handler for track package logger. - - Adds a stdout-printing handler automatically. - """ - - logger = logging.getLogger(__package__) - - # TODO (just document prominently) - # assume only one trial can run at once right now - # multi-concurrent-trial support will require complex filter logic - # based on the currently-running trial (maybe we shouldn't allow multiple - # trials on different python threads, that's dumb) - to_rm = [h for h in logger.handlers if isinstance(h, TrackLogHandler)] - for h in to_rm: - logger.removeHandler(h) - - if not any(isinstance(h, StdoutHandler) for h in logger.handlers): - handler = StdoutHandler() - handler.setFormatter(_FORMATTER) - logger.addHandler(handler) - - track_log_handler.setFormatter(_FORMATTER) - logger.addHandler(track_log_handler) - - logger.propagate = False - logger.setLevel(logging.DEBUG) - -def debug(s, *args): - """debug(s, x1, ..., xn) logs s.format(x1, ..., xn).""" - # Get the path name and line number of the function which called us. - previous_frame = inspect.currentframe().f_back - try: - pathname, lineno, _, _, _ = inspect.getframeinfo(previous_frame) - # if path is in cwd, simplify it - cwd = os.path.abspath(os.getcwd()) - pathname = os.path.abspath(pathname) - if os.path.commonprefix([cwd, pathname]) == cwd: - pathname = os.path.relpath(pathname, cwd) - except Exception: # pylint: disable=broad-except - pathname = '.py' - lineno = 0 - if _FORMATTER: # log could have not been initialized. - _FORMATTER.pathname = pathname - _FORMATTER.lineno = lineno - logger = logging.getLogger(__package__) - logger.debug(s.format(*args)) - -class _StackCrawlingFormatter(logging.Formatter): - """ - If we configure a python logger with the format string - "%(pathname):%(lineno): %(message)", messages logged via `log.debug` will - be prefixed with the path name and line number of the code that called - `log.debug`. Unfortunately, when a `log.debug` call is wrapped in a helper - function (e.g. debug below), the path name and line number is always that - of the helper function, not the function which called the helper function. - - A _StackCrawlingFormatter is a hack to log a different pathname and line - number. Simply set the `pathname` and `lineno` attributes of the formatter - before you call `log.debug`. See `debug` below for an example. - """ - - def __init__(self, format_str): - super().__init__(format_str) - self.pathname = None - self.lineno = None - - def format(self, record): - s = super().format(record) - if self.pathname is not None: - s = s.replace('{pathname}', self.pathname) - if self.lineno is not None: - s = s.replace('{lineno}', str(self.lineno)) - return s - -_FORMAT_STRING = "[%(asctime)-15s {pathname}:{lineno}] %(message)s" -_FORMATTER = _StackCrawlingFormatter(_FORMAT_STRING) diff --git a/python/ray/tune/track/sync.py b/python/ray/tune/track/sync.py deleted file mode 100644 index b9d3ebc08f28..000000000000 --- a/python/ray/tune/track/sync.py +++ /dev/null @@ -1,99 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import distutils.spawn -import subprocess -import time - -try: # py3 - from shlex import quote -except ImportError: # py2 - from pipes import quote - -from track.error import TrackError - -S3_PREFIX = "s3://" -GCS_PREFIX = "gs://" -ALLOWED_REMOTE_PREFIXES = (S3_PREFIX, GCS_PREFIX) - - -def check_remote_util(remote_dir): - if not any( - remote_dir.startswith(prefix) - for prefix in ALLOWED_REMOTE_PREFIXES): - raise TrackError("Upload uri must start with one of: {}" - "".format(ALLOWED_REMOTE_PREFIXES)) - - if (remote_dir.startswith(S3_PREFIX) - and not distutils.spawn.find_executable("aws")): - raise TrackError( - "Upload uri starting with '{}' requires awscli tool" - " to be installed".format(S3_PREFIX)) - elif (remote_dir.startswith(GCS_PREFIX) - and not distutils.spawn.find_executable("gsutil")): - raise TrackError( - "Upload uri starting with '{}' requires gsutil tool" - " to be installed".format(GCS_PREFIX)) - - -class _LogSyncer(object): - """Log syncer. - - This syncs files from the local node to a remote directory (e.g. S3).""" - - def __init__(self, local_dir, remote_dir=None, sync_period=None): - self.local_dir = local_dir - if remote_dir: - check_remote_util(remote_dir) - self.remote_dir = remote_dir - self.last_sync_time = 0 - self.sync_period = sync_period or 300 - self.sync_process = None - print("Created LogSyncer for {} -> {}".format(local_dir, remote_dir)) - - def sync_if_needed(self): - if time.time() - self.last_sync_time > self.sync_period: - self.sync_now() - - def sync_now(self, force=False): - self.last_sync_time = time.time() - local_to_remote_sync_cmd = None - if self.remote_dir: - if self.remote_dir.startswith(S3_PREFIX): - local_to_remote_sync_cmd = ("aws s3 sync {} {}".format( - quote(self.local_dir), quote(self.remote_dir))) - elif self.remote_dir.startswith(GCS_PREFIX): - local_to_remote_sync_cmd = ("gsutil rsync -r {} {}".format( - quote(self.local_dir), quote(self.remote_dir))) - - if self.sync_process: - self.sync_process.poll() - if self.sync_process.returncode is None: - if force: - self.sync_process.kill() - else: - print("Warning: last sync is still in progress, skipping") - return - - if local_to_remote_sync_cmd: - final_cmd = local_to_remote_sync_cmd - print("Running log sync: {}".format(final_cmd)) - self.sync_process = subprocess.Popen(final_cmd, shell=True) - - def wait(self): - if self.sync_process: - self.sync_process.wait() - - -class SyncHook(object): - def __init__(self, local_dir, remote_dir=None, sync_period=None): - self._logsync = _LogSyncer(local_dir, remote_dir, sync_period) - - def on_result(self, *args, **kwargs): - self._logsync.sync_if_needed() - - def close(self): - self._logsync.sync_now(force=True) - self._logsync.wait() - print("Syncer closed.") From ee3439870e6b291eda6cef706ad50097452730d6 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Sun, 31 Mar 2019 01:22:18 -0700 Subject: [PATCH 05/28] Session --- python/ray/tune/track/__init__.py | 14 ++--- python/ray/tune/track/autodetect.py | 2 + .../ray/tune/track/{trial.py => session.py} | 60 ++++--------------- 3 files changed, 22 insertions(+), 54 deletions(-) rename python/ray/tune/track/{trial.py => session.py} (78%) diff --git a/python/ray/tune/track/__init__.py b/python/ray/tune/track/__init__.py index 05b6ca9c0496..22e538e55bbe 100644 --- a/python/ray/tune/track/__init__.py +++ b/python/ray/tune/track/__init__.py @@ -1,6 +1,6 @@ import pickle -from .trial import Trial +from ray.tune.track.session import TrackSession _trial = None @@ -14,7 +14,7 @@ def init(log_dir=None, init_logging=True): """ Initializes the global trial context for this process. - This creates a Trial object and the corresponding hooks for logging. + This creates a TrackSession object and the corresponding hooks for logging. """ global _trial # pylint: disable=global-statement if _trial: @@ -22,7 +22,7 @@ def init(log_dir=None, # where that initial trial was created, and that creation line # info is helpful to keep around anyway. raise ValueError("A trial already exists in the current context") - local_trial = Trial( + local_trial = TrackSession( log_dir=log_dir, upload_dir=upload_dir, sync_period=sync_period, @@ -46,18 +46,18 @@ def shutdown(): def save(obj, obj_name, iteration=None, save_fn=pickle.dump, **kwargs): - """ Applies Trial.save to the trial in the current context """ + """ Applies TrackSession.save to the trial in the current context """ return _trial.save(obj=obj, obj_name=obj_name, iteration=iteration, save_fn=save_fn, **kwargs) def metric(*, iteration=None, **kwargs): - """Applies Trial.metric to the trial in the current context.""" + """Applies TrackSession.metric to the trial in the current context.""" return _trial.metric(iteration=iteration, **kwargs) def load(obj_name, iteration=None, load_fn=pickle.load, **kwargs): - """Applies Trial.load to the trial in the current context.""" + """Applies TrackSession.load to the trial in the current context.""" return _trial.load(obj_name=obj_name, iteration=iteration, load_fn=load_fn, **kwargs) @@ -67,5 +67,5 @@ def trial_dir(): return _trial.trial_dir() -__all__ = ["Trial", "trial", "metric", +__all__ = ["TrackSession", "trial", "metric", "save", "load", "trial_dir"] diff --git a/python/ray/tune/track/autodetect.py b/python/ray/tune/track/autodetect.py index 0416971a020d..d4c9f65dd476 100644 --- a/python/ray/tune/track/autodetect.py +++ b/python/ray/tune/track/autodetect.py @@ -34,6 +34,7 @@ def git_hash(): git_hash = git_hash.strip() return git_hash + def git_pretty(): """returns a pretty summary of the commit or unkown if not in git repo""" if git_repo() is None: @@ -44,6 +45,7 @@ def git_pretty(): pretty = pretty.strip() return pretty + def invocation(): """reconstructs the invocation for this python program""" cmdargs = [sys.executable] + sys.argv[:] diff --git a/python/ray/tune/track/trial.py b/python/ray/tune/track/session.py similarity index 78% rename from python/ray/tune/track/trial.py rename to python/ray/tune/track/session.py index 08766eeb513a..168d438ac71e 100644 --- a/python/ray/tune/track/trial.py +++ b/python/ray/tune/track/session.py @@ -1,31 +1,12 @@ import os -from ray.tune.logger import UnifiedLogger, Logger import uuid from datetime import datetime from .autodetect import ( git_repo, dfl_local_dir, git_hash, invocation, git_pretty) from .constants import METADATA_FOLDER, RESULT_SUFFIX -from . import log - - -def time_str(): - return datetime.now().strftime("%Y%m%d%H%M%S") -def flatten_dict(dt): - dt = dt.copy() - while any(type(v) is dict for v in dt.values()): - remove = [] - add = {} - for key, value in dt.items(): - if type(value) is dict: - for subkey, v in value.items(): - add[":".join([key, subkey])] = v - remove.append(key) - dt.update(add) - for k in remove: - del dt[k] - return dt +from ray.tune.logger import UnifiedLogger, Logger class _ReporterHook(Logger): @@ -36,13 +17,13 @@ def on_result(self, metrics): return self.reporter(**metrics) -class Trial(object): +class TrackSession(object): """ - Trial attempts to infer the local log_dir and remote upload_dir + TrackSession attempts to infer the local log_dir and remote upload_dir automatically. In order of precedence, log_dir is determined by: - (1) the path passed into the argument of the Trial constructor + (1) the path passed into the argument of the TrackSession constructor (2) autodetect.dfl_local_dir() The upload directory may be None (in which case no upload is performed), @@ -65,11 +46,11 @@ def __init__(self, init_logging=True): if log_dir is None: log_dir = dfl_local_dir() - # TODO should probably check if this exists and whether - # we'll be clobbering anything in either the artifact dir - # or the metadata dir, idk what the probability is that a - # uuid truncation will get duplicated. Then also maybe - # the same thing for the remote dir. + # TODO should probably check if this exists and whether + # we'll be clobbering anything in either the artifact dir + # or the metadata dir, idk what the probability is that a + # uuid truncation will get duplicated. Then also maybe + # the same thing for the remote dir. base_dir = os.path.expanduser(log_dir) self.base_dir = base_dir @@ -90,26 +71,11 @@ def __init__(self, self.param_map["git_repo"] = git_repo_or_none or "unknown" self.param_map["git_hash"] = git_hash() self.param_map["git_pretty"] = git_pretty() - self.param_map["start_time"] = datetime.now().isoformat() self.param_map["invocation"] = invocation() + self.param_map["start_time"] = datetime.now().isoformat() self.param_map["max_iteration"] = -1 self.param_map["trial_completed"] = False - if init_logging: - log.init(self.logging_handler()) - log.debug("(re)initilized logging") - - def logging_handler(self): - """ - For advanced logging setups, returns a file-based log handler - pointing to a log.txt artifact. - - If you use init_logging = True there is no need to call this - method. - """ - return log.TrackLogHandler( - os.path.join(self.artifact_dir, 'log.txt')) - def start(self, reporter=None): for path in [self.base_dir, self.data_dir, self.artifact_dir]: if not os.path.exists(path): @@ -125,7 +91,7 @@ def start(self, reporter=None): else: self._hooks += [_ReporterHook(reporter)] - def metric(self, *, iteration=None, **kwargs): + def metric(self, iteration=None, **kwargs): """ Logs all named arguments specified in **kwargs. This will log trial metrics locally, and they will be synchronized @@ -135,7 +101,7 @@ def metric(self, *, iteration=None, **kwargs): iteration (int): current iteration of the trial. **kwargs: named arguments with corresponding values to log. """ - new_args = flatten_dict(kwargs) + new_args = kwargs.copy() new_args.update({"iteration": iteration}) new_args.update({"trial_id": self.trial_id}) if iteration is not None: @@ -176,7 +142,7 @@ def load(self, result_name, load_fn, iteration=None, **kwargs): Arguments: result_name (str): base filename for the object as supplied to - Trial.save + TrackSession.save load_fn (function): function to load the object from disk. called as "load_fn(fname, **kwargs)" iteration (int): iteration of trial to load from. If not specified, From eb6794354a3c75f468bf391e0a537d53cb2f22c6 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Sun, 31 Mar 2019 15:19:18 -0700 Subject: [PATCH 06/28] nit --- python/ray/tune/track/autodetect.py | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/python/ray/tune/track/autodetect.py b/python/ray/tune/track/autodetect.py index d4c9f65dd476..188576b1fc34 100644 --- a/python/ray/tune/track/autodetect.py +++ b/python/ray/tune/track/autodetect.py @@ -1,6 +1,3 @@ -""" -Hacky, library and usage specific tricks to infer decent defaults. -""" import os import subprocess import sys @@ -8,9 +5,7 @@ def git_repo(): - """ - Returns the git repository root if the cwd is in a repo, else None - """ + """Returns Git repository root if the cwd is in a repo else None.""" try: with open(os.devnull, 'wb') as quiet: reldir = subprocess.check_output( @@ -23,9 +18,9 @@ def git_repo(): def git_hash(): - """returns the current git hash or unknown if not in git repo""" + """Returns the current git hash or None if not in git repo""" if git_repo() is None: - return "unknown" + return None git_hash = subprocess.check_output( ["git", "rev-parse", "HEAD"]) # git_hash is a byte string; we want a string. @@ -36,9 +31,9 @@ def git_hash(): def git_pretty(): - """returns a pretty summary of the commit or unkown if not in git repo""" + """Returns a pretty summary of the commit or None if not in git repo""" if git_repo() is None: - return "unknown" + return None pretty = subprocess.check_output( ["git", "log", "--pretty=format:%h %s", "-n", "1"]) pretty = pretty.decode("utf-8") @@ -47,7 +42,7 @@ def git_pretty(): def invocation(): - """reconstructs the invocation for this python program""" + """Returns the invocation for this python program.""" cmdargs = [sys.executable] + sys.argv[:] invocation = " ".join(shlex.quote(s) for s in cmdargs) return invocation From c56f7e9d09fcfb9b33788dcc7b0f28dda8c1b17d Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Mon, 1 Apr 2019 19:27:30 -0700 Subject: [PATCH 07/28] nit --- python/ray/tune/track/__init__.py | 16 ++++---- python/ray/tune/track/autodetect.py | 6 +-- python/ray/tune/track/session.py | 58 +++++++++++++++-------------- 3 files changed, 42 insertions(+), 38 deletions(-) diff --git a/python/ray/tune/track/__init__.py b/python/ray/tune/track/__init__.py index 22e538e55bbe..574c5b817bef 100644 --- a/python/ray/tune/track/__init__.py +++ b/python/ray/tune/track/__init__.py @@ -2,7 +2,6 @@ from ray.tune.track.session import TrackSession - _trial = None @@ -47,8 +46,12 @@ def shutdown(): def save(obj, obj_name, iteration=None, save_fn=pickle.dump, **kwargs): """ Applies TrackSession.save to the trial in the current context """ - return _trial.save(obj=obj, obj_name=obj_name, iteration=iteration, - save_fn=save_fn, **kwargs) + return _trial.save( + obj=obj, + obj_name=obj_name, + iteration=iteration, + save_fn=save_fn, + **kwargs) def metric(*, iteration=None, **kwargs): @@ -58,8 +61,8 @@ def metric(*, iteration=None, **kwargs): def load(obj_name, iteration=None, load_fn=pickle.load, **kwargs): """Applies TrackSession.load to the trial in the current context.""" - return _trial.load(obj_name=obj_name, iteration=iteration, - load_fn=load_fn, **kwargs) + return _trial.load( + obj_name=obj_name, iteration=iteration, load_fn=load_fn, **kwargs) def trial_dir(): @@ -67,5 +70,4 @@ def trial_dir(): return _trial.trial_dir() -__all__ = ["TrackSession", "trial", "metric", - "save", "load", "trial_dir"] +__all__ = ["TrackSession", "trial", "metric", "save", "load", "trial_dir"] diff --git a/python/ray/tune/track/autodetect.py b/python/ray/tune/track/autodetect.py index 188576b1fc34..5a41f29a8bfe 100644 --- a/python/ray/tune/track/autodetect.py +++ b/python/ray/tune/track/autodetect.py @@ -9,8 +9,7 @@ def git_repo(): try: with open(os.devnull, 'wb') as quiet: reldir = subprocess.check_output( - ["git", "rev-parse", "--git-dir"], - stdout=quiet) + ["git", "rev-parse", "--git-dir"], stdout=quiet) reldir = reldir.decode("utf-8") return os.path.basename(os.path.dirname(os.path.abspath(reldir))) except subprocess.CalledProcessError: @@ -21,8 +20,7 @@ def git_hash(): """Returns the current git hash or None if not in git repo""" if git_repo() is None: return None - git_hash = subprocess.check_output( - ["git", "rev-parse", "HEAD"]) + git_hash = subprocess.check_output(["git", "rev-parse", "HEAD"]) # git_hash is a byte string; we want a string. git_hash = git_hash.decode("utf-8") # git_hash also comes with an extra \n at the end, which we remove. diff --git a/python/ray/tune/track/session.py b/python/ray/tune/track/session.py index 168d438ac71e..96ba28374a93 100644 --- a/python/ray/tune/track/session.py +++ b/python/ray/tune/track/session.py @@ -1,11 +1,10 @@ import os import uuid from datetime import datetime -from .autodetect import ( - git_repo, dfl_local_dir, git_hash, invocation, git_pretty) +from .autodetect import (git_repo, dfl_local_dir, git_hash, invocation, + git_pretty) from .constants import METADATA_FOLDER, RESULT_SUFFIX - from ray.tune.logger import UnifiedLogger, Logger @@ -17,6 +16,16 @@ def on_result(self, metrics): return self.reporter(**metrics) +class _TrackedState(): + def __init__(self): + git_repo_or_none = git_repo() + self.git_repo = git_repo_or_none or "unknown" + self.git_hash = git_hash() + self.git_pretty = git_pretty() + self.invocation = invocation() + self.start_time = datetime.now().isoformat() + + class TrackSession(object): """ TrackSession attempts to infer the local log_dir and remote upload_dir @@ -37,15 +46,15 @@ class TrackSession(object): are stored. if not specified, uses autodetect.dfl_local_dir() upload_dir (str): """ + def __init__(self, log_dir=None, upload_dir=None, sync_period=None, trial_prefix="", - param_map=None, - init_logging=True): + param_map=None): if log_dir is None: - log_dir = dfl_local_dir() + log_dir = DEFAULT_RESULTS_DIR # TODO should probably check if this exists and whether # we'll be clobbering anything in either the artifact dir # or the metadata dir, idk what the probability is that a @@ -67,12 +76,6 @@ def __init__(self, # misc metadata to save as well self.param_map["trial_id"] = self.trial_id - git_repo_or_none = git_repo() - self.param_map["git_repo"] = git_repo_or_none or "unknown" - self.param_map["git_hash"] = git_hash() - self.param_map["git_pretty"] = git_pretty() - self.param_map["invocation"] = invocation() - self.param_map["start_time"] = datetime.now().isoformat() self.param_map["max_iteration"] = -1 self.param_map["trial_completed"] = False @@ -83,32 +86,33 @@ def start(self, reporter=None): self._hooks = [] if not reporter: - self._hooks += [UnifiedLogger( - self.param_map, - self.data_dir, - self.upload_dir, - filename_prefix=self.trial_id + "_")] + self._hooks += [ + UnifiedLogger( + self.param_map, + self.data_dir, + self.upload_dir, + filename_prefix=self.trial_id + "_") + ] else: self._hooks += [_ReporterHook(reporter)] - def metric(self, iteration=None, **kwargs): + def metric(self, iteration=None, **metrics): """ - Logs all named arguments specified in **kwargs. + Logs all named arguments specified in **metrics. This will log trial metrics locally, and they will be synchronized with the driver periodically through ray. Arguments: iteration (int): current iteration of the trial. - **kwargs: named arguments with corresponding values to log. + **metrics: named arguments with corresponding values to log. """ - new_args = kwargs.copy() - new_args.update({"iteration": iteration}) - new_args.update({"trial_id": self.trial_id}) - if iteration is not None: - self.param_map["max_iteration"] = max( - self.param_map["max_iteration"], iteration) + metrics_dict = metrics.copy() + metrics_dict.update({"trial_id": self.trial_id}) + # if iteration is not None: + # self.param_map["max_iteration"] = max( + # self.param_map["max_iteration"], iteration) for hook in self._hooks: - hook.on_result(new_args) + hook.on_result(metrics_dict) def _get_fname(self, result_name, iteration=None): fname = os.path.join(self.artifact_dir, result_name) From 792f1ded094867d6dec5bcaf120e38fe705cf238 Mon Sep 17 00:00:00 2001 From: Noah Golmant Date: Sat, 4 May 2019 11:06:59 -0700 Subject: [PATCH 08/28] remove git --- python/ray/tune/track/autodetect.py | 46 ----------------------------- python/ray/tune/track/session.py | 7 ----- 2 files changed, 53 deletions(-) delete mode 100644 python/ray/tune/track/autodetect.py diff --git a/python/ray/tune/track/autodetect.py b/python/ray/tune/track/autodetect.py deleted file mode 100644 index 5a41f29a8bfe..000000000000 --- a/python/ray/tune/track/autodetect.py +++ /dev/null @@ -1,46 +0,0 @@ -import os -import subprocess -import sys -import shlex - - -def git_repo(): - """Returns Git repository root if the cwd is in a repo else None.""" - try: - with open(os.devnull, 'wb') as quiet: - reldir = subprocess.check_output( - ["git", "rev-parse", "--git-dir"], stdout=quiet) - reldir = reldir.decode("utf-8") - return os.path.basename(os.path.dirname(os.path.abspath(reldir))) - except subprocess.CalledProcessError: - return None - - -def git_hash(): - """Returns the current git hash or None if not in git repo""" - if git_repo() is None: - return None - git_hash = subprocess.check_output(["git", "rev-parse", "HEAD"]) - # git_hash is a byte string; we want a string. - git_hash = git_hash.decode("utf-8") - # git_hash also comes with an extra \n at the end, which we remove. - git_hash = git_hash.strip() - return git_hash - - -def git_pretty(): - """Returns a pretty summary of the commit or None if not in git repo""" - if git_repo() is None: - return None - pretty = subprocess.check_output( - ["git", "log", "--pretty=format:%h %s", "-n", "1"]) - pretty = pretty.decode("utf-8") - pretty = pretty.strip() - return pretty - - -def invocation(): - """Returns the invocation for this python program.""" - cmdargs = [sys.executable] + sys.argv[:] - invocation = " ".join(shlex.quote(s) for s in cmdargs) - return invocation diff --git a/python/ray/tune/track/session.py b/python/ray/tune/track/session.py index 96ba28374a93..58e0cb7c3591 100644 --- a/python/ray/tune/track/session.py +++ b/python/ray/tune/track/session.py @@ -1,8 +1,6 @@ import os import uuid from datetime import datetime -from .autodetect import (git_repo, dfl_local_dir, git_hash, invocation, - git_pretty) from .constants import METADATA_FOLDER, RESULT_SUFFIX from ray.tune.logger import UnifiedLogger, Logger @@ -18,11 +16,6 @@ def on_result(self, metrics): class _TrackedState(): def __init__(self): - git_repo_or_none = git_repo() - self.git_repo = git_repo_or_none or "unknown" - self.git_hash = git_hash() - self.git_pretty = git_pretty() - self.invocation = invocation() self.start_time = datetime.now().isoformat() From 581a57d64410e8d9d185371d2f0b37acf634c3ce Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Sat, 4 May 2019 11:38:00 -0700 Subject: [PATCH 09/28] Integration for functionrunner --- python/ray/tune/function_runner.py | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/python/ray/tune/function_runner.py b/python/ray/tune/function_runner.py index 551f1702759f..cac9d5618fd4 100644 --- a/python/ray/tune/function_runner.py +++ b/python/ray/tune/function_runner.py @@ -5,9 +5,12 @@ import logging import sys import time +import inspect import threading from six.moves import queue + +from ray.tune import track from ray.tune import TuneError from ray.tune.trainable import Trainable from ray.tune.result import TIME_THIS_ITER_S, RESULT_DUPLICATE @@ -244,6 +247,10 @@ def _report_thread_runner_error(self, block=False): def wrap_function(train_func): + + function_args = inspect.getargspec(train_func).args + use_track = ("reporter" not in function_args and len(function_args) == 1) + class WrappedFunc(FunctionRunner): def _trainable_func(self, config, reporter): output = train_func(config, reporter) @@ -253,4 +260,13 @@ def _trainable_func(self, config, reporter): reporter(**{RESULT_DUPLICATE: True}) return output - return WrappedFunc + class WrappedTrackFunc(FunctionRunner): + def _trainable_func(self, config, reporter): + # TODO: logdir will need different handling in local_mode + track.init(tune_reporter=reporter, log_dir=os.getcwd()) + output = train_func(config) + reporter(**{RESULT_DUPLICATE: True}) + track.shutdown() + return output + + return WrappedTrackFunc if use_track else WrappedFunc From c7e1579e16a47611f82ff26f741e7e05c6bcdd76 Mon Sep 17 00:00:00 2001 From: Noah Golmant Date: Sat, 4 May 2019 11:41:11 -0700 Subject: [PATCH 10/28] use unifiedlogger for json data; save/load gone --- python/ray/tune/track/__init__.py | 49 ++++++++---------------------- python/ray/tune/track/constants.py | 23 -------------- python/ray/tune/track/session.py | 49 +++--------------------------- 3 files changed, 17 insertions(+), 104 deletions(-) delete mode 100644 python/ray/tune/track/constants.py diff --git a/python/ray/tune/track/__init__.py b/python/ray/tune/track/__init__.py index 574c5b817bef..4474a8f06078 100644 --- a/python/ray/tune/track/__init__.py +++ b/python/ray/tune/track/__init__.py @@ -1,8 +1,6 @@ -import pickle - from ray.tune.track.session import TrackSession -_trial = None +_session = None def init(log_dir=None, @@ -15,13 +13,13 @@ def init(log_dir=None, Initializes the global trial context for this process. This creates a TrackSession object and the corresponding hooks for logging. """ - global _trial # pylint: disable=global-statement - if _trial: + global _session # pylint: disable=global-statement + if _session: # TODO: would be nice to stack crawl at creation time to report # where that initial trial was created, and that creation line # info is helpful to keep around anyway. raise ValueError("A trial already exists in the current context") - local_trial = TrackSession( + local_session = TrackSession( log_dir=log_dir, upload_dir=upload_dir, sync_period=sync_period, @@ -29,45 +27,24 @@ def init(log_dir=None, param_map=param_map, init_logging=True) # try: - _trial = local_trial - _trial.start() + _session = local_session + _session.start() def shutdown(): """ Cleans up the trial and removes it from the global context. """ - global _trial # pylint: disable=global-statement - if not _trial: + global _session # pylint: disable=global-statement + if not _session: raise ValueError("Tried to stop trial, but no trial exists") - _trial.close() - _trial = None - + _session.close() + _session = None -def save(obj, obj_name, iteration=None, save_fn=pickle.dump, **kwargs): - """ Applies TrackSession.save to the trial in the current context """ - return _trial.save( - obj=obj, - obj_name=obj_name, - iteration=iteration, - save_fn=save_fn, - **kwargs) - -def metric(*, iteration=None, **kwargs): +def metric(iteration=None, **kwargs): """Applies TrackSession.metric to the trial in the current context.""" - return _trial.metric(iteration=iteration, **kwargs) - - -def load(obj_name, iteration=None, load_fn=pickle.load, **kwargs): - """Applies TrackSession.load to the trial in the current context.""" - return _trial.load( - obj_name=obj_name, iteration=iteration, load_fn=load_fn, **kwargs) - - -def trial_dir(): - """Retrieves the trial directory for the trial in the current context.""" - return _trial.trial_dir() + return _session.metric(iteration=iteration, **kwargs) -__all__ = ["TrackSession", "trial", "metric", "save", "load", "trial_dir"] +__all__ = ["TrackSession", "trial", "metric", "trial_dir"] diff --git a/python/ray/tune/track/constants.py b/python/ray/tune/track/constants.py deleted file mode 100644 index 81a9733c607d..000000000000 --- a/python/ray/tune/track/constants.py +++ /dev/null @@ -1,23 +0,0 @@ -""" -Both locally and remotely the directory structure is as follows: - -project-directory/ - METADATA_FOLDER/ - trialprefix_uuid_param_map.json - trialprefix_uuid_result.json - ... other trials - trailprefix_uuid/ - ... trial artifacts - ... other trial artifact folders - -Where the param map is a single json containing the trial uuid -and configuration parameters and, the result.json is a json -list file (i.e., not valid json, but valid json on each line), -and the artifacts folder contains the artifacts as supplied by -the user. -""" -import os - -METADATA_FOLDER = "trials" -CONFIG_SUFFIX = "param_map.json" -RESULT_SUFFIX = "result.json" diff --git a/python/ray/tune/track/session.py b/python/ray/tune/track/session.py index 58e0cb7c3591..c350155bf1f9 100644 --- a/python/ray/tune/track/session.py +++ b/python/ray/tune/track/session.py @@ -1,8 +1,8 @@ import os import uuid from datetime import datetime -from .constants import METADATA_FOLDER, RESULT_SUFFIX +from ray.tune.result import DEFAULT_RESULTS_DIR from ray.tune.logger import UnifiedLogger, Logger @@ -56,14 +56,13 @@ def __init__(self, base_dir = os.path.expanduser(log_dir) self.base_dir = base_dir - self.data_dir = os.path.join(base_dir, METADATA_FOLDER) + self.artifact_dir = os.path.join(base_dir, self.trial_id) self.trial_id = str(uuid.uuid1().hex[:10]) if trial_prefix: self.trial_id = "_".join([trial_prefix, self.trial_id]) self._sync_period = sync_period - self.artifact_dir = os.path.join(base_dir, self.trial_id) - os.makedirs(self.artifact_dir, exist_ok=True) + self.upload_dir = upload_dir self.param_map = param_map or {} @@ -82,7 +81,7 @@ def start(self, reporter=None): self._hooks += [ UnifiedLogger( self.param_map, - self.data_dir, + self.base_dir, self.upload_dir, filename_prefix=self.trial_id + "_") ] @@ -101,9 +100,6 @@ def metric(self, iteration=None, **metrics): """ metrics_dict = metrics.copy() metrics_dict.update({"trial_id": self.trial_id}) - # if iteration is not None: - # self.param_map["max_iteration"] = max( - # self.param_map["max_iteration"], iteration) for hook in self._hooks: hook.on_result(metrics_dict) @@ -115,40 +111,6 @@ def _get_fname(self, result_name, iteration=None): result = base + "_" + str(iteration) + file_extension return result - def save(self, result, result_name, save_fn, iteration=None, **kwargs): - """ - Persists a result to disk as an artifact. These results will be - synchronized with the driver periodically through ray. - - Arguments: - result (object): the python object to persist to disk. - result_fname (str): base filename for the object, e.g. "model.ckpt" - save_fn (function): function to save out the object. called as - "save_fn(obj, fname, **kwargs)" - iteration (int): the current iteration of the trial. If not - specified, overrides the previously saved file. - otherwise, creates a new object for each iter. - """ - fname = self._get_fname(result_name, iteration=iteration) - return save_fn(result, fname, **kwargs) - - def load(self, result_name, load_fn, iteration=None, **kwargs): - """ - Loads the persisted object of the given type for the corresponding - iteration. - - Arguments: - result_name (str): base filename for the object as supplied to - TrackSession.save - load_fn (function): function to load the object from disk. called - as "load_fn(fname, **kwargs)" - iteration (int): iteration of trial to load from. If not specified, - track will load the most recent file. - - """ - fname = self._get_fname(result_name, iteration=iteration) - return load_fn(fname, **kwargs) - def trial_dir(self): """returns the local file path to the trial's artifact directory""" return self.artifact_dir @@ -160,6 +122,3 @@ def close(self): for hook in self._hooks: hook.close() - - def get_result_filename(self): - return os.path.join(self.data_dir, self.trial_id + "_" + RESULT_SUFFIX) From 6f7ba56a0658a818b0d997d06d2e705ae8f3d450 Mon Sep 17 00:00:00 2001 From: Noah Golmant Date: Sat, 4 May 2019 16:20:13 -0700 Subject: [PATCH 11/28] fix to use tune unified logger; add initial test cases --- python/ray/tune/__init__.py | 2 +- python/ray/tune/logger.py | 33 ++++++++---- python/ray/tune/tests/test_track.py | 83 +++++++++++++++++++++++++++++ python/ray/tune/track/__init__.py | 26 +++++---- python/ray/tune/track/session.py | 36 +++++++------ 5 files changed, 144 insertions(+), 36 deletions(-) create mode 100644 python/ray/tune/tests/test_track.py diff --git a/python/ray/tune/__init__.py b/python/ray/tune/__init__.py index b1d865338eff..f368daeb0aaa 100644 --- a/python/ray/tune/__init__.py +++ b/python/ray/tune/__init__.py @@ -12,5 +12,5 @@ __all__ = [ "Trainable", "TuneError", "grid_search", "register_env", "register_trainable", "run", "run_experiments", "Experiment", "function", - "sample_from" + "sample_from", "track" ] diff --git a/python/ray/tune/logger.py b/python/ray/tune/logger.py index 6095cfb4dcbd..2bd2300ffcd8 100644 --- a/python/ray/tune/logger.py +++ b/python/ray/tune/logger.py @@ -50,6 +50,12 @@ def on_result(self, result): raise NotImplementedError + + def update_config(self, config): + """Updates the config for all loggers.""" + + pass + def close(self): """Releases all resources used by this logger.""" @@ -68,17 +74,7 @@ def on_result(self, result): class JsonLogger(Logger): def _init(self): - config_out = os.path.join(self.logdir, "params.json") - with open(config_out, "w") as f: - json.dump( - self.config, - f, - indent=2, - sort_keys=True, - cls=_SafeFallbackEncoder) - config_pkl = os.path.join(self.logdir, "params.pkl") - with open(config_pkl, "wb") as f: - cloudpickle.dump(self.config, f) + self.update_config(self.config) local_file = os.path.join(self.logdir, "result.json") self.local_out = open(local_file, "a") @@ -96,6 +92,15 @@ def flush(self): def close(self): self.local_out.close() + def update_config(self, config): + self.config = config + config_out = os.path.join(self.logdir, "params.json") + with open(config_out, "w") as f: + json.dump(self.config, f, cls=_SafeFallbackEncoder) + config_pkl = os.path.join(self.logdir, "params.pkl") + with open(config_pkl, "wb") as f: + cloudpickle.dump(self.config, f) + def to_tf_values(result, path): values = [] @@ -227,6 +232,12 @@ def on_result(self, result): self._log_syncer.set_worker_ip(result.get(NODE_IP)) self._log_syncer.sync_if_needed() + def update_config(self, config): + for _logger in self._loggers: + _logger.update_config(config) + self._log_syncer.set_worker_ip(config.get(NODE_IP)) + self._log_syncer.sync_if_needed() + def close(self): for _logger in self._loggers: _logger.close() diff --git a/python/ray/tune/tests/test_track.py b/python/ray/tune/tests/test_track.py new file mode 100644 index 000000000000..d8a4d5167414 --- /dev/null +++ b/python/ray/tune/tests/test_track.py @@ -0,0 +1,83 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import os +import pandas as pd +import unittest + +from ray.tune import track +from ray.tune.result import TRAINING_ITERATION + + +class TrackApiTest(unittest.TestCase): + + def testSessionInitShutdown(self): + self.assertTrue(track._session is None) + """Checks that the singleton _session is created/destroyed + + by track.init() and track.shutdown() + """ + for _ in range(2): + # do it twice to see that we can reopen the session + track.init(trial_prefix="test_init") + self.assertTrue(track._session is not None) + track.shutdown() + self.assertTrue(track._session is None) + + def testLogCreation(self): + """Checks that track.init() starts logger and creates log files. + """ + track.init(trial_prefix="test_init") + session = track._session + self.assertTrue(session is not None) + + self.assertTrue(os.path.isdir(session.base_dir)) + self.assertTrue(os.path.isdir(session.artifact_dir)) + + params_fname = os.path.join(session.artifact_dir, "params.json") + result_fname = os.path.join(session.artifact_dir, "result.json") + + self.assertTrue(os.path.exists(params_fname)) + self.assertTrue(os.path.exists(result_fname)) + track.shutdown() + + def testRayOutput(self): + """Checks that local and remote log format are the same. + """ + pass + + def testLocalMetrics(self): + """Checks that metric state is updated correctly. + """ + track.init(trial_prefix="test_metrics") + session = track._session + self.assertEqual(set(session.param_map.keys()), set( + ["trial_id", TRAINING_ITERATION, "trial_completed"])) + + # iteration=None defaults to max_iteration + track.metric(test=1) + self.assertEqual(session.param_map[TRAINING_ITERATION], -1) + + params_fname = os.path.join(session.artifact_dir, "params.json") + result_fname = os.path.join(session.artifact_dir, "result.json") + + # check that dict was correctly dumped to json + def _assert_json_val(fname, key, val): + with open(fname, "r") as f: + df = pd.read_json(f, typ='frame', lines=True) + self.assertTrue(key in df.columns) + self.assertTrue((df[key].tail(n=1) == val).all()) + + # check that params and results are dumped + _assert_json_val(params_fname, TRAINING_ITERATION, -1) + _assert_json_val(result_fname, "test", 1) + + # check that they are updated! + track.metric(iteration=1, test=2) + _assert_json_val(result_fname, "test", 2) + self.assertEqual(session.param_map[TRAINING_ITERATION], 1) + + # params are updated at the end + track.shutdown() + _assert_json_val(params_fname, TRAINING_ITERATION, 1) diff --git a/python/ray/tune/track/__init__.py b/python/ray/tune/track/__init__.py index 4474a8f06078..15f8424a0458 100644 --- a/python/ray/tune/track/__init__.py +++ b/python/ray/tune/track/__init__.py @@ -1,5 +1,7 @@ from ray.tune.track.session import TrackSession +__name__ = 'track' + _session = None @@ -7,8 +9,7 @@ def init(log_dir=None, upload_dir=None, sync_period=None, trial_prefix="", - param_map=None, - init_logging=True): + param_map=None): """ Initializes the global trial context for this process. This creates a TrackSession object and the corresponding hooks for logging. @@ -18,26 +19,24 @@ def init(log_dir=None, # TODO: would be nice to stack crawl at creation time to report # where that initial trial was created, and that creation line # info is helpful to keep around anyway. - raise ValueError("A trial already exists in the current context") + raise ValueError("A session already exists in the current context") local_session = TrackSession( log_dir=log_dir, upload_dir=upload_dir, sync_period=sync_period, trial_prefix=trial_prefix, - param_map=param_map, - init_logging=True) + param_map=param_map) # try: _session = local_session _session.start() def shutdown(): - """ - Cleans up the trial and removes it from the global context. + """Cleans up the trial and removes it from the global context. """ global _session # pylint: disable=global-statement if not _session: - raise ValueError("Tried to stop trial, but no trial exists") + raise ValueError("Tried to stop session, but no session exists") _session.close() _session = None @@ -47,4 +46,13 @@ def metric(iteration=None, **kwargs): return _session.metric(iteration=iteration, **kwargs) -__all__ = ["TrackSession", "trial", "metric", "trial_dir"] +def trial_dir(): + """Returns the directory where trial results are saved, including + + json data containing the session's parameters an stored metrics. + """ + return _session.trial_dir() + + +__all__ = ["TrackSession", "session", "metric", "trial_dir", + "init", "shutdown"] diff --git a/python/ray/tune/track/session.py b/python/ray/tune/track/session.py index c350155bf1f9..4d7d48ba8baa 100644 --- a/python/ray/tune/track/session.py +++ b/python/ray/tune/track/session.py @@ -2,7 +2,7 @@ import uuid from datetime import datetime -from ray.tune.result import DEFAULT_RESULTS_DIR +from ray.tune.result import DEFAULT_RESULTS_DIR, TRAINING_ITERATION from ray.tune.logger import UnifiedLogger, Logger @@ -31,9 +31,6 @@ class TrackSession(object): The upload directory may be None (in which case no upload is performed), or an S3 directory or a GCS directory. - init_logging will automatically set up a logger at the debug level, - along with handlers to print logs to stdout and to a persistent store. - Arguments: log_dir (str): base log directory in which the results for all trials are stored. if not specified, uses autodetect.dfl_local_dir() @@ -56,11 +53,13 @@ def __init__(self, base_dir = os.path.expanduser(log_dir) self.base_dir = base_dir - self.artifact_dir = os.path.join(base_dir, self.trial_id) self.trial_id = str(uuid.uuid1().hex[:10]) if trial_prefix: self.trial_id = "_".join([trial_prefix, self.trial_id]) + self.artifact_dir = os.path.join(base_dir, self.trial_id) + os.makedirs(self.artifact_dir, exist_ok=True) + self._sync_period = sync_period self.upload_dir = upload_dir @@ -68,23 +67,21 @@ def __init__(self, # misc metadata to save as well self.param_map["trial_id"] = self.trial_id - self.param_map["max_iteration"] = -1 + self.param_map[TRAINING_ITERATION] = -1 self.param_map["trial_completed"] = False def start(self, reporter=None): - for path in [self.base_dir, self.data_dir, self.artifact_dir]: + for path in [self.base_dir, self.artifact_dir]: if not os.path.exists(path): os.makedirs(path) self._hooks = [] if not reporter: - self._hooks += [ - UnifiedLogger( - self.param_map, - self.base_dir, - self.upload_dir, - filename_prefix=self.trial_id + "_") - ] + self._logger = UnifiedLogger( + self.param_map, + self.artifact_dir, + self.upload_dir) + self._hooks += [self._logger] else: self._hooks += [_ReporterHook(reporter)] @@ -100,13 +97,22 @@ def metric(self, iteration=None, **metrics): """ metrics_dict = metrics.copy() metrics_dict.update({"trial_id": self.trial_id}) + + if iteration is not None: + max_iter = max(iteration, self.param_map[TRAINING_ITERATION]) + else: + max_iter = self.param_map[TRAINING_ITERATION] + + self.param_map[TRAINING_ITERATION] = max_iter + metrics_dict[TRAINING_ITERATION] = max_iter + for hook in self._hooks: hook.on_result(metrics_dict) def _get_fname(self, result_name, iteration=None): fname = os.path.join(self.artifact_dir, result_name) if iteration is None: - iteration = self.param_map["max_iteration"] + iteration = self.param_map[TRAINING_ITERATION] base, file_extension = os.path.splittext(fname) result = base + "_" + str(iteration) + file_extension return result From 6cec7cc981fba087f7c0a4d126220480a2934b8d Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Thu, 9 May 2019 17:40:49 -0700 Subject: [PATCH 12/28] formatting --- python/ray/tune/function_runner.py | 3 +-- python/ray/tune/logger.py | 1 - python/ray/tune/tests/test_track.py | 6 +++--- python/ray/tune/track/__init__.py | 22 +++++++++++++--------- python/ray/tune/track/session.py | 10 ++++++---- 5 files changed, 23 insertions(+), 19 deletions(-) diff --git a/python/ray/tune/function_runner.py b/python/ray/tune/function_runner.py index cac9d5618fd4..60c6b33b1d59 100644 --- a/python/ray/tune/function_runner.py +++ b/python/ray/tune/function_runner.py @@ -9,7 +9,6 @@ import threading from six.moves import queue - from ray.tune import track from ray.tune import TuneError from ray.tune.trainable import Trainable @@ -249,7 +248,7 @@ def _report_thread_runner_error(self, block=False): def wrap_function(train_func): function_args = inspect.getargspec(train_func).args - use_track = ("reporter" not in function_args and len(function_args) == 1) + use_track = ("reporter" not in function_args and len(function_args) == 1) class WrappedFunc(FunctionRunner): def _trainable_func(self, config, reporter): diff --git a/python/ray/tune/logger.py b/python/ray/tune/logger.py index 2bd2300ffcd8..0e93f789a569 100644 --- a/python/ray/tune/logger.py +++ b/python/ray/tune/logger.py @@ -50,7 +50,6 @@ def on_result(self, result): raise NotImplementedError - def update_config(self, config): """Updates the config for all loggers.""" diff --git a/python/ray/tune/tests/test_track.py b/python/ray/tune/tests/test_track.py index d8a4d5167414..2481855601c3 100644 --- a/python/ray/tune/tests/test_track.py +++ b/python/ray/tune/tests/test_track.py @@ -11,7 +11,6 @@ class TrackApiTest(unittest.TestCase): - def testSessionInitShutdown(self): self.assertTrue(track._session is None) """Checks that the singleton _session is created/destroyed @@ -52,8 +51,9 @@ def testLocalMetrics(self): """ track.init(trial_prefix="test_metrics") session = track._session - self.assertEqual(set(session.param_map.keys()), set( - ["trial_id", TRAINING_ITERATION, "trial_completed"])) + self.assertEqual( + set(session.param_map.keys()), + set(["trial_id", TRAINING_ITERATION, "trial_completed"])) # iteration=None defaults to max_iteration track.metric(test=1) diff --git a/python/ray/tune/track/__init__.py b/python/ray/tune/track/__init__.py index 15f8424a0458..9ca699abb4f0 100644 --- a/python/ray/tune/track/__init__.py +++ b/python/ray/tune/track/__init__.py @@ -1,3 +1,7 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + from ray.tune.track.session import TrackSession __name__ = 'track' @@ -10,13 +14,13 @@ def init(log_dir=None, sync_period=None, trial_prefix="", param_map=None): - """ - Initializes the global trial context for this process. + """Initializes the global trial context for this process. + This creates a TrackSession object and the corresponding hooks for logging. """ - global _session # pylint: disable=global-statement + global _session if _session: - # TODO: would be nice to stack crawl at creation time to report + # TODO(ng): would be nice to stack crawl at creation time to report # where that initial trial was created, and that creation line # info is helpful to keep around anyway. raise ValueError("A session already exists in the current context") @@ -32,9 +36,8 @@ def init(log_dir=None, def shutdown(): - """Cleans up the trial and removes it from the global context. - """ - global _session # pylint: disable=global-statement + """Cleans up the trial and removes it from the global context.""" + global _session if not _session: raise ValueError("Tried to stop session, but no session exists") _session.close() @@ -54,5 +57,6 @@ def trial_dir(): return _session.trial_dir() -__all__ = ["TrackSession", "session", "metric", "trial_dir", - "init", "shutdown"] +__all__ = [ + "TrackSession", "session", "metric", "trial_dir", "init", "shutdown" +] diff --git a/python/ray/tune/track/session.py b/python/ray/tune/track/session.py index 4d7d48ba8baa..aafdd052d734 100644 --- a/python/ray/tune/track/session.py +++ b/python/ray/tune/track/session.py @@ -1,3 +1,7 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + import os import uuid from datetime import datetime @@ -77,10 +81,8 @@ def start(self, reporter=None): self._hooks = [] if not reporter: - self._logger = UnifiedLogger( - self.param_map, - self.artifact_dir, - self.upload_dir) + self._logger = UnifiedLogger(self.param_map, self.artifact_dir, + self.upload_dir) self._hooks += [self._logger] else: self._hooks += [_ReporterHook(reporter)] From 2cade6dc49ee62d32d1aac936bd0cf9dfdce1a39 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Thu, 9 May 2019 17:46:54 -0700 Subject: [PATCH 13/28] Enums --- python/ray/tune/tests/test_track.py | 17 +++++++++-------- python/ray/tune/track/__init__.py | 8 ++++---- python/ray/tune/track/session.py | 26 +++++++++++++------------- 3 files changed, 26 insertions(+), 25 deletions(-) diff --git a/python/ray/tune/tests/test_track.py b/python/ray/tune/tests/test_track.py index 2481855601c3..09843f7fc00f 100644 --- a/python/ray/tune/tests/test_track.py +++ b/python/ray/tune/tests/test_track.py @@ -7,7 +7,8 @@ import unittest from ray.tune import track -from ray.tune.result import TRAINING_ITERATION +from ray.tune.result import ( + TRAINING_ITERATION, EXPR_PARARM_FILE, EXPR_RESULT_FILE) class TrackApiTest(unittest.TestCase): @@ -34,8 +35,8 @@ def testLogCreation(self): self.assertTrue(os.path.isdir(session.base_dir)) self.assertTrue(os.path.isdir(session.artifact_dir)) - params_fname = os.path.join(session.artifact_dir, "params.json") - result_fname = os.path.join(session.artifact_dir, "result.json") + params_fname = os.path.join(session.artifact_dir, EXPR_PARARM_FILE) + result_fname = os.path.join(session.artifact_dir, EXPR_RESULT_FILE) self.assertTrue(os.path.exists(params_fname)) self.assertTrue(os.path.exists(result_fname)) @@ -52,15 +53,15 @@ def testLocalMetrics(self): track.init(trial_prefix="test_metrics") session = track._session self.assertEqual( - set(session.param_map.keys()), + set(session.trial_config.keys()), set(["trial_id", TRAINING_ITERATION, "trial_completed"])) # iteration=None defaults to max_iteration track.metric(test=1) - self.assertEqual(session.param_map[TRAINING_ITERATION], -1) + self.assertEqual(session.trial_config[TRAINING_ITERATION], -1) - params_fname = os.path.join(session.artifact_dir, "params.json") - result_fname = os.path.join(session.artifact_dir, "result.json") + params_fname = os.path.join(session.artifact_dir, EXPR_PARARM_FILE) + result_fname = os.path.join(session.artifact_dir, EXPR_RESULT_FILE) # check that dict was correctly dumped to json def _assert_json_val(fname, key, val): @@ -76,7 +77,7 @@ def _assert_json_val(fname, key, val): # check that they are updated! track.metric(iteration=1, test=2) _assert_json_val(result_fname, "test", 2) - self.assertEqual(session.param_map[TRAINING_ITERATION], 1) + self.assertEqual(session.trial_config[TRAINING_ITERATION], 1) # params are updated at the end track.shutdown() diff --git a/python/ray/tune/track/__init__.py b/python/ray/tune/track/__init__.py index 9ca699abb4f0..7d4a600c7797 100644 --- a/python/ray/tune/track/__init__.py +++ b/python/ray/tune/track/__init__.py @@ -13,7 +13,7 @@ def init(log_dir=None, upload_dir=None, sync_period=None, trial_prefix="", - param_map=None): + trial_config=None): """Initializes the global trial context for this process. This creates a TrackSession object and the corresponding hooks for logging. @@ -29,7 +29,7 @@ def init(log_dir=None, upload_dir=upload_dir, sync_period=sync_period, trial_prefix=trial_prefix, - param_map=param_map) + trial_config=trial_config) # try: _session = local_session _session.start() @@ -50,9 +50,9 @@ def metric(iteration=None, **kwargs): def trial_dir(): - """Returns the directory where trial results are saved, including + """Returns the directory where trial results are saved. - json data containing the session's parameters an stored metrics. + This includes json data containing the session's parameters and metrics. """ return _session.trial_dir() diff --git a/python/ray/tune/track/session.py b/python/ray/tune/track/session.py index aafdd052d734..c7c9e44bea98 100644 --- a/python/ray/tune/track/session.py +++ b/python/ray/tune/track/session.py @@ -46,7 +46,7 @@ def __init__(self, upload_dir=None, sync_period=None, trial_prefix="", - param_map=None): + trial_config=None): if log_dir is None: log_dir = DEFAULT_RESULTS_DIR # TODO should probably check if this exists and whether @@ -67,12 +67,12 @@ def __init__(self, self._sync_period = sync_period self.upload_dir = upload_dir - self.param_map = param_map or {} + self.trial_config = trial_config or {} # misc metadata to save as well - self.param_map["trial_id"] = self.trial_id - self.param_map[TRAINING_ITERATION] = -1 - self.param_map["trial_completed"] = False + self.trial_config["trial_id"] = self.trial_id + self.trial_config[TRAINING_ITERATION] = -1 + self.trial_config["trial_completed"] = False def start(self, reporter=None): for path in [self.base_dir, self.artifact_dir]: @@ -81,7 +81,7 @@ def start(self, reporter=None): self._hooks = [] if not reporter: - self._logger = UnifiedLogger(self.param_map, self.artifact_dir, + self._logger = UnifiedLogger(self.trial_config, self.artifact_dir, self.upload_dir) self._hooks += [self._logger] else: @@ -101,11 +101,11 @@ def metric(self, iteration=None, **metrics): metrics_dict.update({"trial_id": self.trial_id}) if iteration is not None: - max_iter = max(iteration, self.param_map[TRAINING_ITERATION]) + max_iter = max(iteration, self.trial_config[TRAINING_ITERATION]) else: - max_iter = self.param_map[TRAINING_ITERATION] + max_iter = self.trial_config[TRAINING_ITERATION] - self.param_map[TRAINING_ITERATION] = max_iter + self.trial_config[TRAINING_ITERATION] = max_iter metrics_dict[TRAINING_ITERATION] = max_iter for hook in self._hooks: @@ -114,7 +114,7 @@ def metric(self, iteration=None, **metrics): def _get_fname(self, result_name, iteration=None): fname = os.path.join(self.artifact_dir, result_name) if iteration is None: - iteration = self.param_map[TRAINING_ITERATION] + iteration = self.trial_config[TRAINING_ITERATION] base, file_extension = os.path.splittext(fname) result = base + "_" + str(iteration) + file_extension return result @@ -124,9 +124,9 @@ def trial_dir(self): return self.artifact_dir def close(self): - self.param_map["trial_completed"] = True - self.param_map["end_time"] = datetime.now().isoformat() - self._logger.update_config(self.param_map) + self.trial_config["trial_completed"] = True + self.trial_config["end_time"] = datetime.now().isoformat() + self._logger.update_config(self.trial_config) for hook in self._hooks: hook.close() From 6d897b8a0cdffedb1fd738b48dec71d099ee16ab Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Thu, 9 May 2019 19:03:20 -0700 Subject: [PATCH 14/28] Reformat tracking --- python/ray/tune/function_runner.py | 1 + python/ray/tune/tests/test_track.py | 33 ++++---- python/ray/tune/track/__init__.py | 46 ++++++----- python/ray/tune/track/session.py | 119 ++++++++++++---------------- python/ray/tune/trial.py | 16 ++-- 5 files changed, 103 insertions(+), 112 deletions(-) diff --git a/python/ray/tune/function_runner.py b/python/ray/tune/function_runner.py index 60c6b33b1d59..507b6f31770e 100644 --- a/python/ray/tune/function_runner.py +++ b/python/ray/tune/function_runner.py @@ -4,6 +4,7 @@ import logging import sys +import os import time import inspect import threading diff --git a/python/ray/tune/tests/test_track.py b/python/ray/tune/tests/test_track.py index 09843f7fc00f..e867b71262fb 100644 --- a/python/ray/tune/tests/test_track.py +++ b/python/ray/tune/tests/test_track.py @@ -7,8 +7,8 @@ import unittest from ray.tune import track -from ray.tune.result import ( - TRAINING_ITERATION, EXPR_PARARM_FILE, EXPR_RESULT_FILE) +from ray.tune.result import (TRAINING_ITERATION, EXPR_PARARM_FILE, + EXPR_RESULT_FILE) class TrackApiTest(unittest.TestCase): @@ -26,8 +26,7 @@ def testSessionInitShutdown(self): self.assertTrue(track._session is None) def testLogCreation(self): - """Checks that track.init() starts logger and creates log files. - """ + """Checks that track.init() starts logger and creates log files.""" track.init(trial_prefix="test_init") session = track._session self.assertTrue(session is not None) @@ -35,21 +34,19 @@ def testLogCreation(self): self.assertTrue(os.path.isdir(session.base_dir)) self.assertTrue(os.path.isdir(session.artifact_dir)) - params_fname = os.path.join(session.artifact_dir, EXPR_PARARM_FILE) - result_fname = os.path.join(session.artifact_dir, EXPR_RESULT_FILE) + params_path = os.path.join(session.artifact_dir, EXPR_PARARM_FILE) + result_path = os.path.join(session.artifact_dir, EXPR_RESULT_FILE) - self.assertTrue(os.path.exists(params_fname)) - self.assertTrue(os.path.exists(result_fname)) + self.assertTrue(os.path.exists(params_path)) + self.assertTrue(os.path.exists(result_path)) track.shutdown() def testRayOutput(self): - """Checks that local and remote log format are the same. - """ + """Checks that local and remote log format are the same.""" pass def testLocalMetrics(self): - """Checks that metric state is updated correctly. - """ + """Checks that metric state is updated correctly.""" track.init(trial_prefix="test_metrics") session = track._session self.assertEqual( @@ -60,8 +57,8 @@ def testLocalMetrics(self): track.metric(test=1) self.assertEqual(session.trial_config[TRAINING_ITERATION], -1) - params_fname = os.path.join(session.artifact_dir, EXPR_PARARM_FILE) - result_fname = os.path.join(session.artifact_dir, EXPR_RESULT_FILE) + params_path = os.path.join(session.artifact_dir, EXPR_PARARM_FILE) + result_path = os.path.join(session.artifact_dir, EXPR_RESULT_FILE) # check that dict was correctly dumped to json def _assert_json_val(fname, key, val): @@ -71,14 +68,14 @@ def _assert_json_val(fname, key, val): self.assertTrue((df[key].tail(n=1) == val).all()) # check that params and results are dumped - _assert_json_val(params_fname, TRAINING_ITERATION, -1) - _assert_json_val(result_fname, "test", 1) + _assert_json_val(params_path, TRAINING_ITERATION, -1) + _assert_json_val(result_path, "test", 1) # check that they are updated! track.metric(iteration=1, test=2) - _assert_json_val(result_fname, "test", 2) + _assert_json_val(result_path, "test", 2) self.assertEqual(session.trial_config[TRAINING_ITERATION], 1) # params are updated at the end track.shutdown() - _assert_json_val(params_fname, TRAINING_ITERATION, 1) + _assert_json_val(params_path, TRAINING_ITERATION, 1) diff --git a/python/ray/tune/track/__init__.py b/python/ray/tune/track/__init__.py index 7d4a600c7797..275997801ec2 100644 --- a/python/ray/tune/track/__init__.py +++ b/python/ray/tune/track/__init__.py @@ -2,50 +2,57 @@ from __future__ import division from __future__ import print_function +import logging + from ray.tune.track.session import TrackSession __name__ = 'track' - +logger = logging.getLogger(__name__) _session = None -def init(log_dir=None, - upload_dir=None, - sync_period=None, - trial_prefix="", - trial_config=None): +def _get_session(): + global _session + if not _session: + raise ValueError("Session not detected. Try `track.init()`?") + return _session + + +def init(ignore_reinit_error=False, **session_kwargs): """Initializes the global trial context for this process. This creates a TrackSession object and the corresponding hooks for logging. + + Examples: + >>> from ray.tune import track + >>> track.init() """ global _session + if _session: # TODO(ng): would be nice to stack crawl at creation time to report # where that initial trial was created, and that creation line # info is helpful to keep around anyway. - raise ValueError("A session already exists in the current context") - local_session = TrackSession( - log_dir=log_dir, - upload_dir=upload_dir, - sync_period=sync_period, - trial_prefix=trial_prefix, - trial_config=trial_config) - # try: - _session = local_session - _session.start() + reinit_msg = "A session already exists in the current context." + if ignore_reinit_error: + logger.warning(reinit_msg) + return + else: + raise ValueError(reinit_msg) + + _session = TrackSession(**session_kwargs) def shutdown(): """Cleans up the trial and removes it from the global context.""" - global _session - if not _session: - raise ValueError("Tried to stop session, but no session exists") + _session = _get_session() _session.close() _session = None def metric(iteration=None, **kwargs): """Applies TrackSession.metric to the trial in the current context.""" + _session = _get_session() return _session.metric(iteration=iteration, **kwargs) @@ -54,6 +61,7 @@ def trial_dir(): This includes json data containing the session's parameters and metrics. """ + _session = _get_session() return _session.trial_dir() diff --git a/python/ray/tune/track/session.py b/python/ray/tune/track/session.py index c7c9e44bea98..2fd6bf709e9b 100644 --- a/python/ray/tune/track/session.py +++ b/python/ray/tune/track/session.py @@ -6,7 +6,8 @@ import uuid from datetime import datetime -from ray.tune.result import DEFAULT_RESULTS_DIR, TRAINING_ITERATION +from ray.tune.trial import Trial +from ray.tune.result import DEFAULT_RESULTS_DIR, TRAINING_ITERATION, DONE from ray.tune.logger import UnifiedLogger, Logger @@ -18,78 +19,65 @@ def on_result(self, metrics): return self.reporter(**metrics) -class _TrackedState(): - def __init__(self): - self.start_time = datetime.now().isoformat() - - class TrackSession(object): - """ - TrackSession attempts to infer the local log_dir and remote upload_dir - automatically. - - In order of precedence, log_dir is determined by: - (1) the path passed into the argument of the TrackSession constructor - (2) autodetect.dfl_local_dir() + """Manages results for a single session. - The upload directory may be None (in which case no upload is performed), - or an S3 directory or a GCS directory. - - Arguments: - log_dir (str): base log directory in which the results for all trials - are stored. if not specified, uses autodetect.dfl_local_dir() - upload_dir (str): + Represents a single Trial in an experiment. """ def __init__(self, - log_dir=None, + trial_name="", + reporter=None, + experiment_dir=None, upload_dir=None, - sync_period=None, - trial_prefix="", trial_config=None): - if log_dir is None: - log_dir = DEFAULT_RESULTS_DIR - # TODO should probably check if this exists and whether - # we'll be clobbering anything in either the artifact dir - # or the metadata dir, idk what the probability is that a - # uuid truncation will get duplicated. Then also maybe - # the same thing for the remote dir. - - base_dir = os.path.expanduser(log_dir) - self.base_dir = base_dir - self.trial_id = str(uuid.uuid1().hex[:10]) - if trial_prefix: - self.trial_id = "_".join([trial_prefix, self.trial_id]) + self.experiment_dir = None + self.logdir = None + self.upload_dir = None + self.trial_config = None + self.trial_id = Trial.generate_id() + if trial_name: + self.trial_id = trial_name + "_" + self.trial_id + self.initialize(reporter, experiment_dir, upload_dir, trial_config) + + def initialize(self, + reporter=None, + experiment_dir=None, + upload_dir=None, + trial_config=None): + """ + Arguments: + experiment_dir (str): Directory where results for all trials + are stored. Each session is stored into a unique directory + inside experiment_dir. + upload_dir (str): S3 directory or a GCS directory (or None) + """ + if reporter: + self._logger = _ReporterHook(reporter) + return - self.artifact_dir = os.path.join(base_dir, self.trial_id) - os.makedirs(self.artifact_dir, exist_ok=True) + # TODO(rliaw): In other parts of the code, this is `local_dir`. + if experiment_dir is None: + experiment_dir = os.path.join(DEFAULT_RESULTS_DIR, "default") - self._sync_period = sync_period + self.experiment_dir = os.path.expanduser(experiment_dir) + # TODO(rliaw): Refactor `logdir` to `trial_dir`. + self.logdir = Trial.generate_logdir(trial_name, self.experiment_dir) self.upload_dir = upload_dir self.trial_config = trial_config or {} # misc metadata to save as well self.trial_config["trial_id"] = self.trial_id self.trial_config[TRAINING_ITERATION] = -1 - self.trial_config["trial_completed"] = False - - def start(self, reporter=None): - for path in [self.base_dir, self.artifact_dir]: - if not os.path.exists(path): - os.makedirs(path) - - self._hooks = [] - if not reporter: - self._logger = UnifiedLogger(self.trial_config, self.artifact_dir, - self.upload_dir) - self._hooks += [self._logger] - else: - self._hooks += [_ReporterHook(reporter)] + self.trial_config[DONE] = False + + self._logger = UnifiedLogger(self.trial_config, self.logdir, + self.upload_dir) def metric(self, iteration=None, **metrics): - """ - Logs all named arguments specified in **metrics. + """Logs all named arguments specified in **metrics. + This will log trial metrics locally, and they will be synchronized with the driver periodically through ray. @@ -97,6 +85,9 @@ def metric(self, iteration=None, **metrics): iteration (int): current iteration of the trial. **metrics: named arguments with corresponding values to log. """ + + # TODO: Implement a batching mechanism for multiple calls to `metric` + # within the same iteration. metrics_dict = metrics.copy() metrics_dict.update({"trial_id": self.trial_id}) @@ -108,25 +99,15 @@ def metric(self, iteration=None, **metrics): self.trial_config[TRAINING_ITERATION] = max_iter metrics_dict[TRAINING_ITERATION] = max_iter - for hook in self._hooks: - hook.on_result(metrics_dict) - - def _get_fname(self, result_name, iteration=None): - fname = os.path.join(self.artifact_dir, result_name) - if iteration is None: - iteration = self.trial_config[TRAINING_ITERATION] - base, file_extension = os.path.splittext(fname) - result = base + "_" + str(iteration) + file_extension - return result + self._logger.on_result(metrics_dict) def trial_dir(self): - """returns the local file path to the trial's artifact directory""" - return self.artifact_dir + """Returns the local file path to the trial directory.""" + return self.logdir def close(self): self.trial_config["trial_completed"] = True self.trial_config["end_time"] = datetime.now().isoformat() self._logger.update_config(self.trial_config) - for hook in self._hooks: - hook.close() + self._logger.close() diff --git a/python/ray/tune/trial.py b/python/ray/tune/trial.py index ad61e8d4b393..e8957d875d88 100644 --- a/python/ray/tune/trial.py +++ b/python/ray/tune/trial.py @@ -8,6 +8,7 @@ from datetime import datetime import logging import json +import uuid import time import tempfile import os @@ -27,7 +28,7 @@ from ray.tune.result import (DEFAULT_RESULTS_DIR, DONE, HOSTNAME, PID, TIME_TOTAL_S, TRAINING_ITERATION, TIMESTEPS_TOTAL, EPISODE_REWARD_MEAN, MEAN_LOSS, MEAN_ACCURACY) -from ray.utils import _random_string, binary_to_hex, hex_to_binary +from ray.utils import binary_to_hex, hex_to_binary DEBUG_PRINT_INTERVAL = 5 MAX_LEN_IDENTIFIER = 130 @@ -341,7 +342,13 @@ def _registration_check(cls, trainable_name): @classmethod def generate_id(cls): - return binary_to_hex(_random_string())[:8] + return str(uuid.uuid1().hex)[:8] + + @classmethod + def create_logdir(cls, identifier, local_dir): + return tempfile.mkdtemp( + prefix="{}_{}".format(identifier[:MAX_LEN_IDENTIFIER], date_str()), + dir=local_dir) def init_logger(self): """Init logger.""" @@ -350,10 +357,7 @@ def init_logger(self): if not os.path.exists(self.local_dir): os.makedirs(self.local_dir) if not self.logdir: - self.logdir = tempfile.mkdtemp( - prefix="{}_{}".format( - str(self)[:MAX_LEN_IDENTIFIER], date_str()), - dir=self.local_dir) + self.logdir = Trial.create_logdir(str(self), self.local_dir) elif not os.path.exists(self.logdir): os.makedirs(self.logdir) From bdd01ffdaad04ce593c1affd6489b97fc46ea1e9 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Thu, 9 May 2019 23:33:03 -0700 Subject: [PATCH 15/28] full cleanup --- python/ray/tune/function_runner.py | 1 - python/ray/tune/track/__init__.py | 2 +- python/ray/tune/track/session.py | 58 ++++++++++++++---------------- python/ray/tune/trainable.py | 1 - 4 files changed, 27 insertions(+), 35 deletions(-) diff --git a/python/ray/tune/function_runner.py b/python/ray/tune/function_runner.py index 507b6f31770e..fd0a297ea86e 100644 --- a/python/ray/tune/function_runner.py +++ b/python/ray/tune/function_runner.py @@ -262,7 +262,6 @@ def _trainable_func(self, config, reporter): class WrappedTrackFunc(FunctionRunner): def _trainable_func(self, config, reporter): - # TODO: logdir will need different handling in local_mode track.init(tune_reporter=reporter, log_dir=os.getcwd()) output = train_func(config) reporter(**{RESULT_DUPLICATE: True}) diff --git a/python/ray/tune/track/__init__.py b/python/ray/tune/track/__init__.py index 275997801ec2..a77eee8ecead 100644 --- a/python/ray/tune/track/__init__.py +++ b/python/ray/tune/track/__init__.py @@ -6,8 +6,8 @@ from ray.tune.track.session import TrackSession -__name__ = 'track' logger = logging.getLogger(__name__) + _session = None diff --git a/python/ray/tune/track/session.py b/python/ray/tune/track/session.py index 2fd6bf709e9b..1cba811c8d2c 100644 --- a/python/ray/tune/track/session.py +++ b/python/ray/tune/track/session.py @@ -12,25 +12,35 @@ class _ReporterHook(Logger): - def __init__(self, reporter): - self.reporter = reporter + def __init__(self, tune_reporter): + self.tune_reporter = tune_reporter def on_result(self, metrics): - return self.reporter(**metrics) + return self.tune_reporter(**metrics) class TrackSession(object): """Manages results for a single session. Represents a single Trial in an experiment. + + Attributes: + trial_name (str): Custom trial name. + experiment_dir (str): Directory where results for all trials + are stored. Each session is stored into a unique directory + inside experiment_dir. + upload_dir (str): Directory to sync results to. + trial_config (dict): Parameters that will be logged to disk. + _tune_reporter (StatusReporter): For rerouting when using Tune. + Will not instantiate logging if not None. """ def __init__(self, trial_name="", - reporter=None, experiment_dir=None, upload_dir=None, - trial_config=None): + trial_config=None, + _tune_reporter=None): self.experiment_dir = None self.logdir = None self.upload_dir = None @@ -38,23 +48,16 @@ def __init__(self, self.trial_id = Trial.generate_id() if trial_name: self.trial_id = trial_name + "_" + self.trial_id - self.initialize(reporter, experiment_dir, upload_dir, trial_config) + if tune_reporter: + self._logger = _ReporterHook(tune_reporter) + else: + self._initialize_logging( + experiment_dir, upload_dir, trial_config) - def initialize(self, - reporter=None, + def _initialize_logging(self, experiment_dir=None, upload_dir=None, trial_config=None): - """ - Arguments: - experiment_dir (str): Directory where results for all trials - are stored. Each session is stored into a unique directory - inside experiment_dir. - upload_dir (str): S3 directory or a GCS directory (or None) - """ - if reporter: - self._logger = _ReporterHook(reporter) - return # TODO(rliaw): In other parts of the code, this is `local_dir`. if experiment_dir is None: @@ -65,13 +68,11 @@ def initialize(self, # TODO(rliaw): Refactor `logdir` to `trial_dir`. self.logdir = Trial.generate_logdir(trial_name, self.experiment_dir) self.upload_dir = upload_dir + self.iteration = -1 self.trial_config = trial_config or {} # misc metadata to save as well self.trial_config["trial_id"] = self.trial_id - self.trial_config[TRAINING_ITERATION] = -1 - self.trial_config[DONE] = False - self._logger = UnifiedLogger(self.trial_config, self.logdir, self.upload_dir) @@ -92,22 +93,15 @@ def metric(self, iteration=None, **metrics): metrics_dict.update({"trial_id": self.trial_id}) if iteration is not None: - max_iter = max(iteration, self.trial_config[TRAINING_ITERATION]) - else: - max_iter = self.trial_config[TRAINING_ITERATION] - - self.trial_config[TRAINING_ITERATION] = max_iter - metrics_dict[TRAINING_ITERATION] = max_iter + self.iteration = max(iteration, self.iteration) + # TODO: Move Trainable autopopulation to a util function + metrics_dict[TRAINING_ITERATION] = self.iteration self._logger.on_result(metrics_dict) - def trial_dir(self): - """Returns the local file path to the trial directory.""" - return self.logdir - def close(self): self.trial_config["trial_completed"] = True self.trial_config["end_time"] = datetime.now().isoformat() + # TODO(rliaw): Have Tune support updated configs self._logger.update_config(self.trial_config) - self._logger.close() diff --git a/python/ray/tune/trainable.py b/python/ray/tune/trainable.py index ae58d30e2662..c10934896bcc 100644 --- a/python/ray/tune/trainable.py +++ b/python/ray/tune/trainable.py @@ -6,7 +6,6 @@ import copy import io -import inspect import logging import os import pickle From 8b678e5c40d2997a20cd9553a87719ee2a4f3aad Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Fri, 10 May 2019 00:14:29 -0700 Subject: [PATCH 16/28] lint --- python/ray/tune/tests/test_track.py | 2 +- python/ray/tune/track/session.py | 18 +++++++++--------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/python/ray/tune/tests/test_track.py b/python/ray/tune/tests/test_track.py index e867b71262fb..d053354efa93 100644 --- a/python/ray/tune/tests/test_track.py +++ b/python/ray/tune/tests/test_track.py @@ -63,7 +63,7 @@ def testLocalMetrics(self): # check that dict was correctly dumped to json def _assert_json_val(fname, key, val): with open(fname, "r") as f: - df = pd.read_json(f, typ='frame', lines=True) + df = pd.read_json(f, typ="frame", lines=True) self.assertTrue(key in df.columns) self.assertTrue((df[key].tail(n=1) == val).all()) diff --git a/python/ray/tune/track/session.py b/python/ray/tune/track/session.py index 1cba811c8d2c..35acd0ea8621 100644 --- a/python/ray/tune/track/session.py +++ b/python/ray/tune/track/session.py @@ -3,11 +3,10 @@ from __future__ import print_function import os -import uuid from datetime import datetime from ray.tune.trial import Trial -from ray.tune.result import DEFAULT_RESULTS_DIR, TRAINING_ITERATION, DONE +from ray.tune.result import DEFAULT_RESULTS_DIR, TRAINING_ITERATION from ray.tune.logger import UnifiedLogger, Logger @@ -48,16 +47,17 @@ def __init__(self, self.trial_id = Trial.generate_id() if trial_name: self.trial_id = trial_name + "_" + self.trial_id - if tune_reporter: - self._logger = _ReporterHook(tune_reporter) + if _tune_reporter: + self._logger = _ReporterHook(_tune_reporter) else: - self._initialize_logging( - experiment_dir, upload_dir, trial_config) + self._initialize_logging(trial_name, experiment_dir, upload_dir, + trial_config) def _initialize_logging(self, - experiment_dir=None, - upload_dir=None, - trial_config=None): + trial_name="", + experiment_dir=None, + upload_dir=None, + trial_config=None): # TODO(rliaw): In other parts of the code, this is `local_dir`. if experiment_dir is None: From 1c1ee5f4fea0ad9311e29b4978944b8340bc915d Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Fri, 10 May 2019 00:54:25 -0700 Subject: [PATCH 17/28] Fix up tests --- python/ray/tune/function_runner.py | 2 +- python/ray/tune/tests/test_track.py | 117 +++++++++++++++++----------- python/ray/tune/track/__init__.py | 12 +-- python/ray/tune/track/session.py | 4 +- 4 files changed, 81 insertions(+), 54 deletions(-) diff --git a/python/ray/tune/function_runner.py b/python/ray/tune/function_runner.py index fd0a297ea86e..52cdb3fde3f7 100644 --- a/python/ray/tune/function_runner.py +++ b/python/ray/tune/function_runner.py @@ -262,7 +262,7 @@ def _trainable_func(self, config, reporter): class WrappedTrackFunc(FunctionRunner): def _trainable_func(self, config, reporter): - track.init(tune_reporter=reporter, log_dir=os.getcwd()) + track.init(_tune_reporter=reporter) output = train_func(config) reporter(**{RESULT_DUPLICATE: True}) track.shutdown() diff --git a/python/ray/tune/tests/test_track.py b/python/ray/tune/tests/test_track.py index d053354efa93..20fe4bb7a77b 100644 --- a/python/ray/tune/tests/test_track.py +++ b/python/ray/tune/tests/test_track.py @@ -6,76 +6,101 @@ import pandas as pd import unittest +import ray +from ray import tune from ray.tune import track from ray.tune.result import (TRAINING_ITERATION, EXPR_PARARM_FILE, EXPR_RESULT_FILE) +def _check_json_val(fname, key, val): + with open(fname, "r") as f: + df = pd.read_json(f, typ="frame", lines=True) + return key in df.columns and (df[key].tail(n=1) == val).all() + + class TrackApiTest(unittest.TestCase): + def tearDown(self): + track.shutdown() + ray.shutdown() + + def testSessionInitShutdown(self): self.assertTrue(track._session is None) - """Checks that the singleton _session is created/destroyed - by track.init() and track.shutdown() - """ + # Checks that the singleton _session is created/destroyed + # by track.init() and track.shutdown() for _ in range(2): # do it twice to see that we can reopen the session - track.init(trial_prefix="test_init") + track.init(trial_name="test_init") self.assertTrue(track._session is not None) track.shutdown() self.assertTrue(track._session is None) def testLogCreation(self): """Checks that track.init() starts logger and creates log files.""" - track.init(trial_prefix="test_init") - session = track._session + track.init(trial_name="test_init") + session = track.get_session() self.assertTrue(session is not None) - self.assertTrue(os.path.isdir(session.base_dir)) - self.assertTrue(os.path.isdir(session.artifact_dir)) + self.assertTrue(os.path.isdir(session.experiment_dir)) + self.assertTrue(os.path.isdir(session.logdir)) - params_path = os.path.join(session.artifact_dir, EXPR_PARARM_FILE) - result_path = os.path.join(session.artifact_dir, EXPR_RESULT_FILE) + params_path = os.path.join(session.logdir, EXPR_PARARM_FILE) + result_path = os.path.join(session.logdir, EXPR_RESULT_FILE) self.assertTrue(os.path.exists(params_path)) self.assertTrue(os.path.exists(result_path)) track.shutdown() + def testMetric(self): + track.init(trial_name="test_metric") + session = track.get_session() + for i in range(5): + track.metric(test=i) + result_path = os.path.join(session.logdir, EXPR_RESULT_FILE) + self.assertTrue(_check_json_val(result_path, "test", i)) + + def testRayOutput(self): """Checks that local and remote log format are the same.""" - pass - - def testLocalMetrics(self): - """Checks that metric state is updated correctly.""" - track.init(trial_prefix="test_metrics") - session = track._session - self.assertEqual( - set(session.trial_config.keys()), - set(["trial_id", TRAINING_ITERATION, "trial_completed"])) - - # iteration=None defaults to max_iteration - track.metric(test=1) - self.assertEqual(session.trial_config[TRAINING_ITERATION], -1) - - params_path = os.path.join(session.artifact_dir, EXPR_PARARM_FILE) - result_path = os.path.join(session.artifact_dir, EXPR_RESULT_FILE) - - # check that dict was correctly dumped to json - def _assert_json_val(fname, key, val): - with open(fname, "r") as f: - df = pd.read_json(f, typ="frame", lines=True) - self.assertTrue(key in df.columns) - self.assertTrue((df[key].tail(n=1) == val).all()) - - # check that params and results are dumped - _assert_json_val(params_path, TRAINING_ITERATION, -1) - _assert_json_val(result_path, "test", 1) - - # check that they are updated! - track.metric(iteration=1, test=2) - _assert_json_val(result_path, "test", 2) - self.assertEqual(session.trial_config[TRAINING_ITERATION], 1) - - # params are updated at the end - track.shutdown() - _assert_json_val(params_path, TRAINING_ITERATION, 1) + ray.init() + def testme(config): + for i in range(config["iters"]): + track.metric(iteration=i, **{"hi": "test"}) + + trials = tune.run(testme, config={"iters": 5}) + trial_res = trials[0].last_result + self.assertTrue(trial_res["hi"], "test") + self.assertTrue(trial_res["training_iteration"], 5) + + + # def testLocalMetrics(self): + # """Checks that metric state is updated correctly.""" + # track.init(trial_name="test_metrics") + # session = track.get_session() + # self.assertEqual( + # set(session.trial_config.keys()), + # set(["trial_id"])) + + # # iteration=None defaults to max_iteration + # track.metric(test=1) + # self.assertEqual(session.trial_config[TRAINING_ITERATION], -1) + + # params_path = os.path.join(session.artifact_dir, EXPR_PARARM_FILE) + # result_path = os.path.join(session.artifact_dir, EXPR_RESULT_FILE) + + # # check that dict was correctly dumped to json + + # # check that params and results are dumped + # self.assertTrue(_check_json_val(params_path, TRAINING_ITERATION, -1)) + # self.assertTrue(_check_json_val(result_path, "test", 1)) + + # # check that they are updated! + # track.metric(iteration=1, test=2) + # self.assertTrue(_check_json_val(result_path, "test", 2)) + # self.assertEqual(session.trial_config[TRAINING_ITERATION], 1) + + # # params are updated at the end + # track.shutdown() + # self.assertTrue(_check_json_val(params_path, TRAINING_ITERATION, 1)) diff --git a/python/ray/tune/track/__init__.py b/python/ray/tune/track/__init__.py index a77eee8ecead..abf334f6fa65 100644 --- a/python/ray/tune/track/__init__.py +++ b/python/ray/tune/track/__init__.py @@ -11,7 +11,7 @@ _session = None -def _get_session(): +def get_session(): global _session if not _session: raise ValueError("Session not detected. Try `track.init()`?") @@ -45,14 +45,16 @@ def init(ignore_reinit_error=False, **session_kwargs): def shutdown(): """Cleans up the trial and removes it from the global context.""" - _session = _get_session() - _session.close() + + global _session + if _session: + _session.close() _session = None def metric(iteration=None, **kwargs): """Applies TrackSession.metric to the trial in the current context.""" - _session = _get_session() + _session = get_session() return _session.metric(iteration=iteration, **kwargs) @@ -61,7 +63,7 @@ def trial_dir(): This includes json data containing the session's parameters and metrics. """ - _session = _get_session() + _session = get_session() return _session.trial_dir() diff --git a/python/ray/tune/track/session.py b/python/ray/tune/track/session.py index 35acd0ea8621..a64451ef5321 100644 --- a/python/ray/tune/track/session.py +++ b/python/ray/tune/track/session.py @@ -44,6 +44,7 @@ def __init__(self, self.logdir = None self.upload_dir = None self.trial_config = None + self.iteration = 0 self.trial_id = Trial.generate_id() if trial_name: self.trial_id = trial_name + "_" + self.trial_id @@ -66,9 +67,8 @@ def _initialize_logging(self, self.experiment_dir = os.path.expanduser(experiment_dir) # TODO(rliaw): Refactor `logdir` to `trial_dir`. - self.logdir = Trial.generate_logdir(trial_name, self.experiment_dir) + self.logdir = Trial.create_logdir(trial_name, self.experiment_dir) self.upload_dir = upload_dir - self.iteration = -1 self.trial_config = trial_config or {} # misc metadata to save as well From e737a33a55be9497de4b2a7d598f97015f076de8 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Fri, 10 May 2019 00:55:51 -0700 Subject: [PATCH 18/28] some formatting --- python/ray/tune/function_runner.py | 1 - python/ray/tune/tests/test_track.py | 4 +--- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/python/ray/tune/function_runner.py b/python/ray/tune/function_runner.py index 52cdb3fde3f7..46a45daaec3d 100644 --- a/python/ray/tune/function_runner.py +++ b/python/ray/tune/function_runner.py @@ -4,7 +4,6 @@ import logging import sys -import os import time import inspect import threading diff --git a/python/ray/tune/tests/test_track.py b/python/ray/tune/tests/test_track.py index 20fe4bb7a77b..a7f21e5062d6 100644 --- a/python/ray/tune/tests/test_track.py +++ b/python/ray/tune/tests/test_track.py @@ -24,7 +24,6 @@ def tearDown(self): track.shutdown() ray.shutdown() - def testSessionInitShutdown(self): self.assertTrue(track._session is None) @@ -61,10 +60,10 @@ def testMetric(self): result_path = os.path.join(session.logdir, EXPR_RESULT_FILE) self.assertTrue(_check_json_val(result_path, "test", i)) - def testRayOutput(self): """Checks that local and remote log format are the same.""" ray.init() + def testme(config): for i in range(config["iters"]): track.metric(iteration=i, **{"hi": "test"}) @@ -74,7 +73,6 @@ def testme(config): self.assertTrue(trial_res["hi"], "test") self.assertTrue(trial_res["training_iteration"], 5) - # def testLocalMetrics(self): # """Checks that metric state is updated correctly.""" # track.init(trial_name="test_metrics") From 28d0283f15884154c55fbc88b75a1ff4c962de9f Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Sat, 11 May 2019 01:24:57 -0700 Subject: [PATCH 19/28] Param, fix up metric test --- .../ray/tune/automlboard/backend/collector.py | 6 +-- python/ray/tune/result.py | 2 +- python/ray/tune/tests/test_track.py | 43 ++++++------------- 3 files changed, 16 insertions(+), 35 deletions(-) diff --git a/python/ray/tune/automlboard/backend/collector.py b/python/ray/tune/automlboard/backend/collector.py index 5566f479960f..dd87df1a450d 100644 --- a/python/ray/tune/automlboard/backend/collector.py +++ b/python/ray/tune/automlboard/backend/collector.py @@ -14,7 +14,7 @@ from ray.tune.automlboard.models.models import JobRecord, \ TrialRecord, ResultRecord from ray.tune.result import DEFAULT_RESULTS_DIR, JOB_META_FILE, \ - EXPR_PARARM_FILE, EXPR_RESULT_FILE, EXPR_META_FILE + EXPR_PARAM_FILE, EXPR_RESULT_FILE, EXPR_META_FILE class CollectorService(object): @@ -327,7 +327,7 @@ def _build_trial_meta(cls, expr_dir): if not meta: job_id = expr_dir.split("/")[-2] trial_id = expr_dir[-8:] - params = parse_json(os.path.join(expr_dir, EXPR_PARARM_FILE)) + params = parse_json(os.path.join(expr_dir, EXPR_PARAM_FILE)) meta = { "trial_id": trial_id, "job_id": job_id, @@ -349,7 +349,7 @@ def _build_trial_meta(cls, expr_dir): if meta.get("end_time", None): meta["end_time"] = timestamp2date(meta["end_time"]) - meta["params"] = parse_json(os.path.join(expr_dir, EXPR_PARARM_FILE)) + meta["params"] = parse_json(os.path.join(expr_dir, EXPR_PARAM_FILE)) return meta diff --git a/python/ray/tune/result.py b/python/ray/tune/result.py index 2978fe540d18..51a67d5931a7 100644 --- a/python/ray/tune/result.py +++ b/python/ray/tune/result.py @@ -68,7 +68,7 @@ EXPR_META_FILE = "trial_status.json" # File that stores parameters of the trial. -EXPR_PARARM_FILE = "params.json" +EXPR_PARAM_FILE = "params.json" # File that stores the progress of the trial. EXPR_PROGRESS_FILE = "progress.csv" diff --git a/python/ray/tune/tests/test_track.py b/python/ray/tune/tests/test_track.py index a7f21e5062d6..176c44e3b69a 100644 --- a/python/ray/tune/tests/test_track.py +++ b/python/ray/tune/tests/test_track.py @@ -9,8 +9,7 @@ import ray from ray import tune from ray.tune import track -from ray.tune.result import (TRAINING_ITERATION, EXPR_PARARM_FILE, - EXPR_RESULT_FILE) +from ray.tune.result import EXPR_PARAM_FILE, EXPR_RESULT_FILE def _check_json_val(fname, key, val): @@ -45,7 +44,7 @@ def testLogCreation(self): self.assertTrue(os.path.isdir(session.experiment_dir)) self.assertTrue(os.path.isdir(session.logdir)) - params_path = os.path.join(session.logdir, EXPR_PARARM_FILE) + params_path = os.path.join(session.logdir, EXPR_PARAM_FILE) result_path = os.path.join(session.logdir, EXPR_RESULT_FILE) self.assertTrue(os.path.exists(params_path)) @@ -73,32 +72,14 @@ def testme(config): self.assertTrue(trial_res["hi"], "test") self.assertTrue(trial_res["training_iteration"], 5) - # def testLocalMetrics(self): - # """Checks that metric state is updated correctly.""" - # track.init(trial_name="test_metrics") - # session = track.get_session() - # self.assertEqual( - # set(session.trial_config.keys()), - # set(["trial_id"])) - - # # iteration=None defaults to max_iteration - # track.metric(test=1) - # self.assertEqual(session.trial_config[TRAINING_ITERATION], -1) - - # params_path = os.path.join(session.artifact_dir, EXPR_PARARM_FILE) - # result_path = os.path.join(session.artifact_dir, EXPR_RESULT_FILE) - - # # check that dict was correctly dumped to json - - # # check that params and results are dumped - # self.assertTrue(_check_json_val(params_path, TRAINING_ITERATION, -1)) - # self.assertTrue(_check_json_val(result_path, "test", 1)) - - # # check that they are updated! - # track.metric(iteration=1, test=2) - # self.assertTrue(_check_json_val(result_path, "test", 2)) - # self.assertEqual(session.trial_config[TRAINING_ITERATION], 1) + def testLocalMetrics(self): + """Checks that metric state is updated correctly.""" + track.init(trial_name="test_metrics") + session = track.get_session() + self.assertEqual(set(session.trial_config.keys()), set(["trial_id"])) - # # params are updated at the end - # track.shutdown() - # self.assertTrue(_check_json_val(params_path, TRAINING_ITERATION, 1)) + result_path = os.path.join(session.logdir, EXPR_RESULT_FILE) + track.metric(test=1) + self.assertTrue(_check_json_val(result_path, "test", 1)) + track.metric(iteration=1, test=2) + self.assertTrue(_check_json_val(result_path, "test", 2)) From 8037f58327c9d333a9bf37fd448761479716efc3 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Sat, 11 May 2019 12:09:06 -0700 Subject: [PATCH 20/28] fix up for example --- python/ray/tune/examples/track_example.py | 79 +++++++++++++++++++++++ python/ray/tune/examples/utils.py | 4 +- python/ray/tune/track/__init__.py | 5 +- python/ray/tune/track/session.py | 3 +- 4 files changed, 87 insertions(+), 4 deletions(-) create mode 100644 python/ray/tune/examples/track_example.py diff --git a/python/ray/tune/examples/track_example.py b/python/ray/tune/examples/track_example.py new file mode 100644 index 000000000000..281517622273 --- /dev/null +++ b/python/ray/tune/examples/track_example.py @@ -0,0 +1,79 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import argparse +import numpy as np +import keras +from keras.datasets import mnist +from keras.models import Sequential +from keras.layers import (Dense, Dropout, Flatten, Conv2D, MaxPooling2D) + +import ray +from ray.tune import track +from ray.tune.examples.utils import TuneKerasCallback, get_mnist_data + + +parser = argparse.ArgumentParser() +parser.add_argument( + "--smoke-test", action="store_true", help="Finish quickly for testing") +parser.add_argument( + "--lr", + type=float, + default=0.01, + metavar="LR", + help="learning rate (default: 0.01)") +parser.add_argument( + "--momentum", + type=float, + default=0.5, + metavar="M", + help="SGD momentum (default: 0.5)") +parser.add_argument( + "--hidden", + type=int, + default=64, + help="Size of hidden layer.") +args, _ = parser.parse_known_args() + + +def train_mnist(args): + track.init(trial_name="track-example", trial_config=vars(args)) + batch_size = 128 + num_classes = 10 + epochs = 1 if args.smoke_test else 12 + + x_train, y_train, x_test, y_test, input_shape = get_mnist_data() + + model = Sequential() + model.add( + Conv2D( + 32, kernel_size=(3, 3), activation="relu", + input_shape=input_shape)) + model.add(Conv2D(64, (3, 3), activation="relu")) + model.add(MaxPooling2D(pool_size=(2, 2))) + model.add(Dropout(0.5)) + model.add(Flatten()) + model.add(Dense(args.hidden, activation="relu")) + model.add(Dropout(0.5)) + model.add(Dense(num_classes, activation="softmax")) + + model.compile( + loss="categorical_crossentropy", + optimizer=keras.optimizers.SGD( + lr=args.lr, momentum=args.momentum), + metrics=["accuracy"]) + + model.fit( + x_test, + y_test, + batch_size=batch_size, + epochs=epochs, + # verbose=0, + validation_data=(x_test, y_test), + callbacks=[TuneKerasCallback(track.metric)]) + track.shutdown() + + +if __name__ == '__main__': + train_mnist(args) diff --git a/python/ray/tune/examples/utils.py b/python/ray/tune/examples/utils.py index 3c73bce2bae7..a5ab1dbdb6a1 100644 --- a/python/ray/tune/examples/utils.py +++ b/python/ray/tune/examples/utils.py @@ -15,7 +15,9 @@ def __init__(self, reporter, logs={}): def on_train_end(self, epoch, logs={}): self.reporter( - timesteps_total=self.iteration, done=1, mean_accuracy=logs["acc"]) + timesteps_total=self.iteration, + done=1, + mean_accuracy=logs.get("acc")) def on_batch_end(self, batch, logs={}): self.iteration += 1 diff --git a/python/ray/tune/track/__init__.py b/python/ray/tune/track/__init__.py index abf334f6fa65..4685314c5763 100644 --- a/python/ray/tune/track/__init__.py +++ b/python/ray/tune/track/__init__.py @@ -18,7 +18,7 @@ def get_session(): return _session -def init(ignore_reinit_error=False, **session_kwargs): +def init(ignore_reinit_error=True, **session_kwargs): """Initializes the global trial context for this process. This creates a TrackSession object and the corresponding hooks for logging. @@ -35,7 +35,8 @@ def init(ignore_reinit_error=False, **session_kwargs): # info is helpful to keep around anyway. reinit_msg = "A session already exists in the current context." if ignore_reinit_error: - logger.warning(reinit_msg) + if not _session.is_tune_session: + logger.warning(reinit_msg) return else: raise ValueError(reinit_msg) diff --git a/python/ray/tune/track/session.py b/python/ray/tune/track/session.py index a64451ef5321..42eeaf5a40e3 100644 --- a/python/ray/tune/track/session.py +++ b/python/ray/tune/track/session.py @@ -45,10 +45,11 @@ def __init__(self, self.upload_dir = None self.trial_config = None self.iteration = 0 + self.is_tune_session = bool(_tune_reporter) self.trial_id = Trial.generate_id() if trial_name: self.trial_id = trial_name + "_" + self.trial_id - if _tune_reporter: + if self.is_tune_session: self._logger = _ReporterHook(_tune_reporter) else: self._initialize_logging(trial_name, experiment_dir, upload_dir, From f52c3f3a3dfeb18be383f92ec5995f79d5d3cad6 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Sat, 11 May 2019 12:15:14 -0700 Subject: [PATCH 21/28] Fix up example and test --- python/ray/tune/examples/track_example.py | 20 ++++++-------------- python/ray/tune/logger.py | 2 -- python/ray/tune/tests/test_track.py | 2 +- python/ray/tune/track/__init__.py | 2 +- python/ray/tune/track/session.py | 4 ++++ 5 files changed, 12 insertions(+), 18 deletions(-) diff --git a/python/ray/tune/examples/track_example.py b/python/ray/tune/examples/track_example.py index 281517622273..1ccec39462d0 100644 --- a/python/ray/tune/examples/track_example.py +++ b/python/ray/tune/examples/track_example.py @@ -3,17 +3,14 @@ from __future__ import print_function import argparse -import numpy as np import keras from keras.datasets import mnist from keras.models import Sequential from keras.layers import (Dense, Dropout, Flatten, Conv2D, MaxPooling2D) -import ray from ray.tune import track from ray.tune.examples.utils import TuneKerasCallback, get_mnist_data - parser = argparse.ArgumentParser() parser.add_argument( "--smoke-test", action="store_true", help="Finish quickly for testing") @@ -30,10 +27,7 @@ metavar="M", help="SGD momentum (default: 0.5)") parser.add_argument( - "--hidden", - type=int, - default=64, - help="Size of hidden layer.") + "--hidden", type=int, default=64, help="Size of hidden layer.") args, _ = parser.parse_known_args() @@ -42,7 +36,7 @@ def train_mnist(args): batch_size = 128 num_classes = 10 epochs = 1 if args.smoke_test else 12 - + mnist.load() x_train, y_train, x_test, y_test, input_shape = get_mnist_data() model = Sequential() @@ -60,20 +54,18 @@ def train_mnist(args): model.compile( loss="categorical_crossentropy", - optimizer=keras.optimizers.SGD( - lr=args.lr, momentum=args.momentum), + optimizer=keras.optimizers.SGD(lr=args.lr, momentum=args.momentum), metrics=["accuracy"]) model.fit( - x_test, - y_test, + x_train, + y_train, batch_size=batch_size, epochs=epochs, - # verbose=0, validation_data=(x_test, y_test), callbacks=[TuneKerasCallback(track.metric)]) track.shutdown() -if __name__ == '__main__': +if __name__ == "__main__": train_mnist(args) diff --git a/python/ray/tune/logger.py b/python/ray/tune/logger.py index a117b453b89d..4b9d5a914aa1 100644 --- a/python/ray/tune/logger.py +++ b/python/ray/tune/logger.py @@ -238,8 +238,6 @@ def on_result(self, result): def update_config(self, config): for _logger in self._loggers: _logger.update_config(config) - self._log_syncer.set_worker_ip(config.get(NODE_IP)) - self._log_syncer.sync_if_needed() def close(self): for _logger in self._loggers: diff --git a/python/ray/tune/tests/test_track.py b/python/ray/tune/tests/test_track.py index 176c44e3b69a..e2e2c1519b94 100644 --- a/python/ray/tune/tests/test_track.py +++ b/python/ray/tune/tests/test_track.py @@ -49,7 +49,7 @@ def testLogCreation(self): self.assertTrue(os.path.exists(params_path)) self.assertTrue(os.path.exists(result_path)) - track.shutdown() + self.assertTrue(session.logdir == track.trial_dir()) def testMetric(self): track.init(trial_name="test_metric") diff --git a/python/ray/tune/track/__init__.py b/python/ray/tune/track/__init__.py index 4685314c5763..e2b18c115ee4 100644 --- a/python/ray/tune/track/__init__.py +++ b/python/ray/tune/track/__init__.py @@ -65,7 +65,7 @@ def trial_dir(): This includes json data containing the session's parameters and metrics. """ _session = get_session() - return _session.trial_dir() + return _session.logdir __all__ = [ diff --git a/python/ray/tune/track/session.py b/python/ray/tune/track/session.py index 42eeaf5a40e3..52535ac1c6e7 100644 --- a/python/ray/tune/track/session.py +++ b/python/ray/tune/track/session.py @@ -106,3 +106,7 @@ def close(self): # TODO(rliaw): Have Tune support updated configs self._logger.update_config(self.trial_config) self._logger.close() + + @property + def logdir(self): + return self.experiment_dir From b81f6f84d9e8e7945c5f0befb84b5bfcf4ef7485 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Sun, 12 May 2019 22:32:20 -0700 Subject: [PATCH 22/28] Cleanup --- python/ray/tune/tests/test_track.py | 13 +++++------ python/ray/tune/track/__init__.py | 10 ++++----- python/ray/tune/track/session.py | 34 ++++++++++++++--------------- 3 files changed, 26 insertions(+), 31 deletions(-) diff --git a/python/ray/tune/tests/test_track.py b/python/ray/tune/tests/test_track.py index e2e2c1519b94..8afc9a7471da 100644 --- a/python/ray/tune/tests/test_track.py +++ b/python/ray/tune/tests/test_track.py @@ -41,7 +41,6 @@ def testLogCreation(self): session = track.get_session() self.assertTrue(session is not None) - self.assertTrue(os.path.isdir(session.experiment_dir)) self.assertTrue(os.path.isdir(session.logdir)) params_path = os.path.join(session.logdir, EXPR_PARAM_FILE) @@ -52,10 +51,10 @@ def testLogCreation(self): self.assertTrue(session.logdir == track.trial_dir()) def testMetric(self): - track.init(trial_name="test_metric") + track.init(trial_name="test_log") session = track.get_session() for i in range(5): - track.metric(test=i) + track.log(test=i) result_path = os.path.join(session.logdir, EXPR_RESULT_FILE) self.assertTrue(_check_json_val(result_path, "test", i)) @@ -65,7 +64,7 @@ def testRayOutput(self): def testme(config): for i in range(config["iters"]): - track.metric(iteration=i, **{"hi": "test"}) + track.log(iteration=i, hi="test") trials = tune.run(testme, config={"iters": 5}) trial_res = trials[0].last_result @@ -74,12 +73,12 @@ def testme(config): def testLocalMetrics(self): """Checks that metric state is updated correctly.""" - track.init(trial_name="test_metrics") + track.init(trial_name="test_logs") session = track.get_session() self.assertEqual(set(session.trial_config.keys()), set(["trial_id"])) result_path = os.path.join(session.logdir, EXPR_RESULT_FILE) - track.metric(test=1) + track.log(test=1) self.assertTrue(_check_json_val(result_path, "test", 1)) - track.metric(iteration=1, test=2) + track.log(iteration=1, test=2) self.assertTrue(_check_json_val(result_path, "test", 2)) diff --git a/python/ray/tune/track/__init__.py b/python/ray/tune/track/__init__.py index e2b18c115ee4..eba5128e29af 100644 --- a/python/ray/tune/track/__init__.py +++ b/python/ray/tune/track/__init__.py @@ -53,10 +53,10 @@ def shutdown(): _session = None -def metric(iteration=None, **kwargs): - """Applies TrackSession.metric to the trial in the current context.""" +def log(iteration=None, **kwargs): + """Applies TrackSession.log to the trial in the current context.""" _session = get_session() - return _session.metric(iteration=iteration, **kwargs) + return _session.log(iteration=iteration, **kwargs) def trial_dir(): @@ -68,6 +68,4 @@ def trial_dir(): return _session.logdir -__all__ = [ - "TrackSession", "session", "metric", "trial_dir", "init", "shutdown" -] +__all__ = ["TrackSession", "session", "log", "trial_dir", "init", "shutdown"] diff --git a/python/ray/tune/track/session.py b/python/ray/tune/track/session.py index 52535ac1c6e7..faf850e5fea2 100644 --- a/python/ray/tune/track/session.py +++ b/python/ray/tune/track/session.py @@ -40,11 +40,11 @@ def __init__(self, upload_dir=None, trial_config=None, _tune_reporter=None): - self.experiment_dir = None - self.logdir = None - self.upload_dir = None + self._experiment_dir = None + self._logdir = None + self._upload_dir = None self.trial_config = None - self.iteration = 0 + self._iteration = -1 self.is_tune_session = bool(_tune_reporter) self.trial_id = Trial.generate_id() if trial_name: @@ -65,39 +65,36 @@ def _initialize_logging(self, if experiment_dir is None: experiment_dir = os.path.join(DEFAULT_RESULTS_DIR, "default") - self.experiment_dir = os.path.expanduser(experiment_dir) + self._experiment_dir = os.path.expanduser(experiment_dir) # TODO(rliaw): Refactor `logdir` to `trial_dir`. - self.logdir = Trial.create_logdir(trial_name, self.experiment_dir) - self.upload_dir = upload_dir + self._logdir = Trial.create_logdir(trial_name, self._experiment_dir) + self._upload_dir = upload_dir self.trial_config = trial_config or {} # misc metadata to save as well self.trial_config["trial_id"] = self.trial_id - self._logger = UnifiedLogger(self.trial_config, self.logdir, - self.upload_dir) + self._logger = UnifiedLogger(self.trial_config, self._logdir, + self._upload_dir) - def metric(self, iteration=None, **metrics): + def log(self, **metrics): """Logs all named arguments specified in **metrics. This will log trial metrics locally, and they will be synchronized with the driver periodically through ray. Arguments: - iteration (int): current iteration of the trial. - **metrics: named arguments with corresponding values to log. + metrics: named arguments with corresponding values to log. """ - # TODO: Implement a batching mechanism for multiple calls to `metric` + # TODO: Implement a batching mechanism for multiple calls to `log` # within the same iteration. + self._iteration += 1 metrics_dict = metrics.copy() metrics_dict.update({"trial_id": self.trial_id}) - if iteration is not None: - self.iteration = max(iteration, self.iteration) - # TODO: Move Trainable autopopulation to a util function - metrics_dict[TRAINING_ITERATION] = self.iteration + metrics_dict.setdefault(TRAINING_ITERATION, self._iteration) self._logger.on_result(metrics_dict) def close(self): @@ -109,4 +106,5 @@ def close(self): @property def logdir(self): - return self.experiment_dir + """Trial logdir (subdir of given experiment directory)""" + return self._logdir From 350188e63f91425d57c8a964a7c95923c14a4f57 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Thu, 16 May 2019 11:20:21 -0700 Subject: [PATCH 23/28] lint --- python/ray/tune/tests/test_track.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/tune/tests/test_track.py b/python/ray/tune/tests/test_track.py index 8afc9a7471da..d3b6c38d745a 100644 --- a/python/ray/tune/tests/test_track.py +++ b/python/ray/tune/tests/test_track.py @@ -75,7 +75,7 @@ def testLocalMetrics(self): """Checks that metric state is updated correctly.""" track.init(trial_name="test_logs") session = track.get_session() - self.assertEqual(set(session.trial_config.keys()), set(["trial_id"])) + self.assertEqual(set(session.trial_config.keys()), {"trial_id"}) result_path = os.path.join(session.logdir, EXPR_RESULT_FILE) track.log(test=1) From 41075fdc6720e7e10ae3afb4f929bbc8412eb1f6 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Thu, 16 May 2019 11:27:04 -0700 Subject: [PATCH 24/28] localdir --- python/ray/tune/trial.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/tune/trial.py b/python/ray/tune/trial.py index e8957d875d88..272945ba1cf4 100644 --- a/python/ray/tune/trial.py +++ b/python/ray/tune/trial.py @@ -346,6 +346,8 @@ def generate_id(cls): @classmethod def create_logdir(cls, identifier, local_dir): + if not os.path.exists(local_dir): + os.makedirs(local_dir) return tempfile.mkdtemp( prefix="{}_{}".format(identifier[:MAX_LEN_IDENTIFIER], date_str()), dir=local_dir) @@ -354,8 +356,6 @@ def init_logger(self): """Init logger.""" if not self.result_logger: - if not os.path.exists(self.local_dir): - os.makedirs(self.local_dir) if not self.logdir: self.logdir = Trial.create_logdir(str(self), self.local_dir) elif not os.path.exists(self.logdir): From 9ce0403bab43db89369e411fe33cb8126ff9cb63 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Thu, 16 May 2019 11:32:56 -0700 Subject: [PATCH 25/28] fix --- python/ray/tune/function_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/tune/function_runner.py b/python/ray/tune/function_runner.py index 46a45daaec3d..efc44a809bfe 100644 --- a/python/ray/tune/function_runner.py +++ b/python/ray/tune/function_runner.py @@ -247,7 +247,7 @@ def _report_thread_runner_error(self, block=False): def wrap_function(train_func): - function_args = inspect.getargspec(train_func).args + function_args = inspect.signature(train_func).parameters use_track = ("reporter" not in function_args and len(function_args) == 1) class WrappedFunc(FunctionRunner): From 732fd126cfdb7d45a99baab85e2fc5109d3037cd Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Thu, 16 May 2019 15:55:39 -0700 Subject: [PATCH 26/28] comments --- python/ray/tune/track/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/tune/track/__init__.py b/python/ray/tune/track/__init__.py index eba5128e29af..a35511e89350 100644 --- a/python/ray/tune/track/__init__.py +++ b/python/ray/tune/track/__init__.py @@ -53,10 +53,10 @@ def shutdown(): _session = None -def log(iteration=None, **kwargs): +def log(**kwargs): """Applies TrackSession.log to the trial in the current context.""" _session = get_session() - return _session.log(iteration=iteration, **kwargs) + return _session.log(**kwargs) def trial_dir(): From 38444ae1e11ca966cdd2f1b8f4bb2c7a0319be21 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Fri, 17 May 2019 02:23:17 -0700 Subject: [PATCH 27/28] safer track inspection --- python/ray/tune/function_runner.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/python/ray/tune/function_runner.py b/python/ray/tune/function_runner.py index efc44a809bfe..f6e6eb2601d9 100644 --- a/python/ray/tune/function_runner.py +++ b/python/ray/tune/function_runner.py @@ -247,8 +247,14 @@ def _report_thread_runner_error(self, block=False): def wrap_function(train_func): - function_args = inspect.signature(train_func).parameters - use_track = ("reporter" not in function_args and len(function_args) == 1) + use_track = False + try: + func_args = inspect.getargspec(train_func).args + use_track = ("reporter" not in func_args and len(func_args) == 1) + if use_track: + logger.info("tune.track signature detected.") + except Exception: + logger.info("Function inspection failed - assuming reporter signature.") class WrappedFunc(FunctionRunner): def _trainable_func(self, config, reporter): From 9e13309ea89fd1a1e585a8e0ed63a7d5ca895948 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Fri, 17 May 2019 11:31:01 -0700 Subject: [PATCH 28/28] lint --- python/ray/tune/function_runner.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/ray/tune/function_runner.py b/python/ray/tune/function_runner.py index f6e6eb2601d9..e30e2bdf5cf0 100644 --- a/python/ray/tune/function_runner.py +++ b/python/ray/tune/function_runner.py @@ -254,7 +254,8 @@ def wrap_function(train_func): if use_track: logger.info("tune.track signature detected.") except Exception: - logger.info("Function inspection failed - assuming reporter signature.") + logger.info( + "Function inspection failed - assuming reporter signature.") class WrappedFunc(FunctionRunner): def _trainable_func(self, config, reporter):