[tune] Introduce ability to turn off default logging. #4104

Merged
merged 13 commits on Mar 1, 2019
5 changes: 4 additions & 1 deletion doc/source/tune-usage.rst
@@ -415,15 +415,18 @@ You can pass in your own logging mechanisms to output logs in custom formats
via the Experiment object as follows:

.. code-block:: python
from ray.tune import DEFAULT_LOGGERS

exp = Experiment(
name="experiment_name",
run=MyTrainableClass,
custom_loggers=[CustomLogger1, CustomLogger2]
loggers=DEFAULT_LOGGERS + (CustomLogger1, CustomLogger2)
)

These loggers will be called along with the default Tune loggers. All loggers must inherit from the `Logger interface <tune-package-ref.html#ray.tune.logger.Logger>`__.

Tune has default loggers for TensorBoard, CSV, and JSON formats.

You can also check out `logger.py <https://github.com/ray-project/ray/blob/master/python/ray/tune/logger.py>`__ for implementation details.

An example can be found in `logging_example.py <https://github.com/ray-project/ray/blob/master/python/ray/tune/examples/logging_example.py>`__.
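For illustration, a minimal custom logger using the new ``loggers`` argument might look like the following sketch. The ``CustomFileLogger`` name and its output file are hypothetical, and ``MyTrainableClass`` is assumed to be the trainable used in the example above.

.. code-block:: python

    import os

    from ray.tune import Experiment, DEFAULT_LOGGERS
    from ray.tune.logger import Logger


    class CustomFileLogger(Logger):
        """Hypothetical logger that writes each result as one line of text."""

        def _init(self):
            # self.logdir is set by the Logger base class before _init() runs.
            self._file = open(os.path.join(self.logdir, "custom.log"), "w")

        def on_result(self, result):
            self._file.write("{}\n".format(result))
            self._file.flush()

        def close(self):
            self._file.close()


    # Run the custom logger alongside the default JSON/CSV/TensorBoard loggers:
    exp = Experiment(
        name="experiment_name",
        run=MyTrainableClass,
        loggers=DEFAULT_LOGGERS + (CustomFileLogger,))

    # Or list only the custom logger to turn off the default logging entirely:
    exp_no_defaults = Experiment(
        name="experiment_name",
        run=MyTrainableClass,
        loggers=[CustomFileLogger])

Only the loggers you list are instantiated for each trial, which is what lets the second form switch off the default logging.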
2 changes: 2 additions & 0 deletions python/ray/tune/__init__.py
@@ -4,6 +4,7 @@

from ray.tune.error import TuneError
from ray.tune.tune import run_experiments
from ray.tune.logger import DEFAULT_LOGGERS
from ray.tune.experiment import Experiment
from ray.tune.registry import register_env, register_trainable
from ray.tune.trainable import Trainable
@@ -19,4 +20,5 @@
"Experiment",
"function",
"sample_from",
"DEFAULT_LOGGERS"
]
9 changes: 5 additions & 4 deletions python/ray/tune/config_parser.py
@@ -12,7 +12,7 @@
from ray.tune import TuneError
from ray.tune.result import DEFAULT_RESULTS_DIR
from ray.tune.trial import Trial, json_to_resources
from ray.tune.logger import _SafeFallbackEncoder
from ray.tune.logger import _SafeFallbackEncoder, DEFAULT_LOGGERS


def make_parser(parser_creator=None, **kwargs):
@@ -88,9 +88,10 @@ def make_parser(parser_creator=None, **kwargs):
"then it must be a string template for syncer to run and needs to "
"include replacement fields '{local_dir}' and '{remote_dir}'.")
parser.add_argument(
"--custom-loggers",
"--loggers",
default=None,
help="List of custom logger creators to be used with each Trial.")
help="List of logger creators to be used with each Trial. "
"Defaults to ray.tune.loggers.DEFAULT_LOGGERS.")
parser.add_argument(
"--checkpoint-freq",
default=0,
@@ -192,7 +193,7 @@ def create_trial_from_spec(spec, output_path, parser, **trial_kwargs):
restore_path=spec.get("restore"),
upload_dir=args.upload_dir,
trial_name_creator=spec.get("trial_name_creator"),
custom_loggers=spec.get("custom_loggers"),
loggers=spec.get("loggers", DEFAULT_LOGGERS),
# str(None) doesn't create None
sync_function=spec.get("sync_function"),
max_failures=args.max_failures,
2 changes: 1 addition & 1 deletion python/ray/tune/examples/logging_example.py
@@ -66,7 +66,7 @@ def _restore(self, checkpoint_path):
run=MyTrainableClass,
num_samples=1,
trial_name_creator=tune.function(trial_str_creator),
custom_loggers=[TestLogger],
loggers=[TestLogger],
stop={"training_iteration": 1 if args.smoke_test else 99999},
config={
"width": tune.sample_from(
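The dict-based run_experiments API accepts the same list under a "loggers" key. A minimal sketch, assuming TestLogger and MyTrainableClass from the example above are in scope and using a placeholder experiment name:

.. code-block:: python

    from ray.tune import run_experiments

    # Only TestLogger runs for this trial, so the default JSON/CSV/TensorBoard
    # outputs (e.g. params.json, progress.csv) are not written.
    run_experiments({
        "custom_logging_exp": {  # placeholder experiment name
            "run": MyTrainableClass,
            "stop": {"training_iteration": 1},
            "loggers": [TestLogger],
        }
    })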
10 changes: 6 additions & 4 deletions python/ray/tune/experiment.py
@@ -9,6 +9,7 @@
import types

from ray.tune.error import TuneError
from ray.tune.logger import DEFAULT_LOGGERS
from ray.tune.registry import register_trainable
from ray.tune.result import DEFAULT_RESULTS_DIR

@@ -62,8 +63,9 @@ class Experiment(object):
to (e.g. ``s3://bucket``).
trial_name_creator (func): Optional function for generating
the trial string representation.
custom_loggers (list): List of custom logger creators to be used with
each Trial. See `ray/tune/logger.py`.
loggers (list): List of logger creators to be used with
each Trial. Defaults to ray.tune.logger.DEFAULT_LOGGERS.
See `ray/tune/logger.py`.
sync_function (func|str): Function for syncing the local_dir to
upload_dir. If string, then it must be a string template for
syncer to run. If not provided, the sync command defaults
@@ -117,7 +119,7 @@ def __init__(self,
local_dir=None,
upload_dir=None,
trial_name_creator=None,
custom_loggers=None,
loggers=DEFAULT_LOGGERS,
sync_function=None,
checkpoint_freq=0,
checkpoint_at_end=False,
@@ -145,7 +147,7 @@
"local_dir": os.path.expanduser(local_dir or DEFAULT_RESULTS_DIR),
"upload_dir": upload_dir or "", # argparse converts None to "null"
"trial_name_creator": trial_name_creator,
"custom_loggers": custom_loggers,
"loggers": loggers,
"sync_function": sync_function,
"checkpoint_freq": checkpoint_freq,
"checkpoint_at_end": checkpoint_at_end,
145 changes: 73 additions & 72 deletions python/ray/tune/logger.py
@@ -66,81 +66,12 @@ def flush(self):
pass


class UnifiedLogger(Logger):
"""Unified result logger for TensorBoard, rllab/viskit, plain json.

This class also periodically syncs output to the given upload uri.

Arguments:
config: Configuration passed to all logger creators.
logdir: Directory for all logger creators to log to.
upload_uri (str): Optional URI where the logdir is sync'ed to.
custom_loggers (list): List of custom logger creators.
sync_function (func|str): Optional function for syncer to run.
See ray/python/ray/tune/log_sync.py
"""

def __init__(self,
config,
logdir,
upload_uri=None,
custom_loggers=None,
sync_function=None):
self._logger_list = [_JsonLogger, _TFLogger, _CSVLogger]
self._sync_function = sync_function
self._log_syncer = None
if custom_loggers:
assert isinstance(custom_loggers, list), "Improper custom loggers."
self._logger_list += custom_loggers

Logger.__init__(self, config, logdir, upload_uri)

def _init(self):
self._loggers = []
for cls in self._logger_list:
try:
self._loggers.append(cls(self.config, self.logdir, self.uri))
except Exception:
logger.warning("Could not instantiate {} - skipping.".format(
str(cls)))
self._log_syncer = get_syncer(
self.logdir, self.uri, sync_function=self._sync_function)

def on_result(self, result):
for _logger in self._loggers:
_logger.on_result(result)
self._log_syncer.set_worker_ip(result.get(NODE_IP))
self._log_syncer.sync_if_needed()

def close(self):
for _logger in self._loggers:
_logger.close()
self._log_syncer.sync_now(force=True)
self._log_syncer.close()

def flush(self):
for _logger in self._loggers:
_logger.flush()
self._log_syncer.sync_now(force=True)
self._log_syncer.wait()

def sync_results_to_new_location(self, worker_ip):
"""Sends the current log directory to the remote node.

Syncing will not occur if the cluster is not started
with the Ray autoscaler.
"""
if worker_ip != self._log_syncer.worker_ip:
self._log_syncer.set_worker_ip(worker_ip)
self._log_syncer.sync_to_worker_if_possible()


class NoopLogger(Logger):
def on_result(self, result):
pass


class _JsonLogger(Logger):
class JsonLogger(Logger):
def _init(self):
config_out = os.path.join(self.logdir, "params.json")
with open(config_out, "w") as f:
@@ -188,7 +119,7 @@ def to_tf_values(result, path):
return values


class _TFLogger(Logger):
class TFLogger(Logger):
def _init(self):
self._file_writer = tf.summary.FileWriter(self.logdir)

@@ -217,7 +148,7 @@ def close(self):
self._file_writer.close()


class _CSVLogger(Logger):
class CSVLogger(Logger):
def _init(self):
"""CSV outputted with Headers as first set of results."""
# Note that we assume params.json was already created by JsonLogger
@@ -242,6 +173,76 @@ def close(self):
self._file.close()


DEFAULT_LOGGERS = (JsonLogger, CSVLogger, TFLogger)


class UnifiedLogger(Logger):
"""Unified result logger for TensorBoard, rllab/viskit, plain json.

This class also periodically syncs output to the given upload uri.

Arguments:
config: Configuration passed to all logger creators.
logdir: Directory for all logger creators to log to.
upload_uri (str): Optional URI where the logdir is sync'ed to.
loggers (list): List of logger creators. Defaults to CSV, Tensorboard,
and JSON loggers.
sync_function (func|str): Optional function for syncer to run.
See ray/python/ray/tune/log_sync.py
"""

def __init__(self,
config,
logdir,
upload_uri=None,
loggers=DEFAULT_LOGGERS,
sync_function=None):
self._logger_cls_list = loggers
self._sync_function = sync_function
self._log_syncer = None

Logger.__init__(self, config, logdir, upload_uri)

def _init(self):
self._loggers = []
for cls in self._logger_cls_list:
try:
self._loggers.append(cls(self.config, self.logdir, self.uri))
except Exception:
logger.warning("Could not instantiate {} - skipping.".format(
str(cls)))
self._log_syncer = get_syncer(
self.logdir, self.uri, sync_function=self._sync_function)

def on_result(self, result):
for _logger in self._loggers:
_logger.on_result(result)
self._log_syncer.set_worker_ip(result.get(NODE_IP))
self._log_syncer.sync_if_needed()

def close(self):
for _logger in self._loggers:
_logger.close()
self._log_syncer.sync_now(force=True)
self._log_syncer.close()

def flush(self):
for _logger in self._loggers:
_logger.flush()
self._log_syncer.sync_now(force=True)
self._log_syncer.wait()

def sync_results_to_new_location(self, worker_ip):
"""Sends the current log directory to the remote node.

Syncing will not occur if the cluster is not started
with the Ray autoscaler.
"""
if worker_ip != self._log_syncer.worker_ip:
self._log_syncer.set_worker_ip(worker_ip)
self._log_syncer.sync_to_worker_if_possible()


class _SafeFallbackEncoder(json.JSONEncoder):
def __init__(self, nan_str="null", **kwargs):
super(_SafeFallbackEncoder, self).__init__(**kwargs)
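For reference, a sketch of driving the refactored UnifiedLogger directly with a reduced logger tuple; it is normally constructed for you by Trial.init_logger(), and the config values and log directory below are placeholders:

.. code-block:: python

    import os

    from ray.tune.logger import UnifiedLogger, JsonLogger, CSVLogger

    logdir = "/tmp/ray_results/my_trial"  # placeholder log directory
    os.makedirs(logdir, exist_ok=True)

    # TFLogger is omitted from the tuple, so no TensorBoard event files
    # are written for this trial.
    unified = UnifiedLogger(
        config={"lr": 0.01},
        logdir=logdir,
        loggers=(JsonLogger, CSVLogger))

    unified.on_result({"training_iteration": 1, "mean_accuracy": 0.5})
    unified.flush()
    unified.close()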
4 changes: 3 additions & 1 deletion python/ray/tune/test/trial_runner_test.py
@@ -793,10 +793,12 @@ def on_result(self, result):
"stop": {
"training_iteration": 1
},
"custom_loggers": [CustomLogger]
"loggers": [CustomLogger]
}
})
self.assertTrue(os.path.exists(os.path.join(trial.logdir, "test.log")))
self.assertFalse(
os.path.exists(os.path.join(trial.logdir, "params.json")))

def testCustomTrialString(self):
[trial] = run_experiments({
13 changes: 7 additions & 6 deletions python/ray/tune/trial.py
@@ -19,7 +19,7 @@
import ray
from ray.tune import TuneError
from ray.tune.log_sync import validate_sync_function
from ray.tune.logger import pretty_print, UnifiedLogger
from ray.tune.logger import pretty_print, UnifiedLogger, DEFAULT_LOGGERS
# NOTE(rkn): We import ray.tune.registry here instead of importing the names we
# need because there are cyclic imports that may cause specific names to not
# have been defined yet. See https://github.com/ray-project/ray/issues/1716.
@@ -256,7 +256,7 @@ def __init__(self,
restore_path=None,
upload_dir=None,
trial_name_creator=None,
custom_loggers=None,
loggers=DEFAULT_LOGGERS,
sync_function=None,
max_failures=0):
"""Initialize a new trial.
Expand All @@ -276,7 +276,7 @@ def __init__(self,
or self._get_trainable_cls().default_resource_request(self.config))
self.stopping_criterion = stopping_criterion or {}
self.upload_dir = upload_dir
self.custom_loggers = custom_loggers
self.loggers = loggers
self.sync_function = sync_function
validate_sync_function(sync_function)
self.verbose = True
@@ -333,7 +333,7 @@ def init_logger(self):
self.config,
self.logdir,
upload_uri=self.upload_dir,
custom_loggers=self.custom_loggers,
loggers=self.loggers,
sync_function=self.sync_function)

def sync_logger_to_new_location(self, worker_ip):
@@ -509,10 +509,11 @@ def __getstate__(self):
state = self.__dict__.copy()
state["resources"] = resources_to_json(self.resources)

# These are non-pickleable entries.
pickle_data = {
"_checkpoint": self._checkpoint,
"config": self.config,
"custom_loggers": self.custom_loggers,
"loggers": self.loggers,
"sync_function": self.sync_function,
"last_result": self.last_result
}
@@ -535,7 +536,7 @@ def __setstate__(self, state):
logger_started = state.pop("__logger_started__")
state["resources"] = json_to_resources(state["resources"])
for key in [
"_checkpoint", "config", "custom_loggers", "sync_function",
"_checkpoint", "config", "loggers", "sync_function",
"last_result"
]:
state[key] = cloudpickle.loads(hex_to_binary(state[key]))