From 5db1afef0710e489c72d2cd7bab616a9ad352a8c Mon Sep 17 00:00:00 2001
From: Richard Liaw
Date: Thu, 7 Feb 2019 00:29:19 -0800
Subject: [PATCH] [tune] Support Custom Resources (#2979)

Support arbitrary resource declarations in Tune.

Fixes https://github.com/ray-project/ray/issues/2875
---
 python/ray/tune/ray_trial_executor.py     |  86 +++++++++++---
 python/ray/tune/test/trial_runner_test.py | 138 +++++++++++++++++++++-
 python/ray/tune/trial.py                  |  96 ++++++++++++---
 python/ray/tune/tune.py                   |   2 +-
 4 files changed, 287 insertions(+), 35 deletions(-)
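For context on the user-facing API this adds: a trial can now request arbitrary cluster resources through `resources_per_trial`. A minimal sketch, mirroring the new `testCustomResources` test below (the `"hi"` resource name and the trivial trainable are illustrative only):

```python
import ray
from ray.tune import Trainable, run_experiments

# Start Ray with 3 units of a user-defined "hi" resource.
ray.init(resources={"hi": 3})

class MyTrainable(Trainable):
    def _train(self):
        # Finish immediately; we only care about scheduling here.
        return {"timesteps_this_iter": 1, "done": True}

run_experiments({
    "foo": {
        "run": MyTrainable,
        # Each trial holds 1 CPU plus 2 units of "hi" while running.
        "resources_per_trial": {
            "cpu": 1,
            "custom_resources": {"hi": 2}
        }
    }
})
```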
Retrying...") time.sleep(0.5) - num_cpus = resources["CPU"] - num_gpus = resources["GPU"] + resources = resources.copy() + num_cpus = resources.pop("CPU") + num_gpus = resources.pop("GPU") + custom_resources = resources - self._avail_resources = Resources(int(num_cpus), int(num_gpus)) + self._avail_resources = Resources( + int(num_cpus), int(num_gpus), custom_resources=custom_resources) self._resources_initialized = True def has_resources(self, resources): """Returns whether this runner has at least the specified resources.""" self._update_avail_resources() - cpu_avail = self._avail_resources.cpu - self._committed_resources.cpu - gpu_avail = self._avail_resources.gpu - self._committed_resources.gpu + currently_available = Resources.subtract(self._avail_resources, + self._committed_resources) - have_space = (resources.cpu_total() <= cpu_avail - and resources.gpu_total() <= gpu_avail) + have_space = ( + resources.cpu_total() <= currently_available.cpu + and resources.gpu_total() <= currently_available.gpu and all( + resources.get_res_total(res) <= currently_available.get(res) + for res in resources.custom_resources)) if have_space: return True can_overcommit = self._queue_trials - if (resources.cpu_total() > 0 and cpu_avail <= 0) or \ - (resources.gpu_total() > 0 and gpu_avail <= 0): + if (resources.cpu_total() > 0 and currently_available.cpu <= 0) or \ + (resources.gpu_total() > 0 and currently_available.gpu <= 0) or \ + any((resources.get_res_total(res_name) > 0 + and currently_available.get(res_name) <= 0) + for res_name in resources.custom_resources): can_overcommit = False # requested resource is already saturated if can_overcommit: @@ -287,9 +319,18 @@ def debug_string(self): """Returns a human readable message for printing to the console.""" if self._resources_initialized: - return "Resources requested: {}/{} CPUs, {}/{} GPUs".format( + status = "Resources requested: {}/{} CPUs, {}/{} GPUs".format( self._committed_resources.cpu, self._avail_resources.cpu, self._committed_resources.gpu, self._avail_resources.gpu) + customs = ", ".join([ + "{}/{} {}".format( + self._committed_resources.get_res_total(name), + self._avail_resources.get_res_total(name), name) + for name in self._avail_resources.custom_resources + ]) + if customs: + status += " ({})".format(customs) + return status else: return "Resources requested: ?" @@ -297,8 +338,15 @@ def resource_string(self): """Returns a string describing the total resources available.""" if self._resources_initialized: - return "{} CPUs, {} GPUs".format(self._avail_resources.cpu, - self._avail_resources.gpu) + res_str = "{} CPUs, {} GPUs".format(self._avail_resources.cpu, + self._avail_resources.gpu) + if self._avail_resources.custom_resources: + custom = ", ".join( + "{} {}".format( + self._avail_resources.get_res_total(name), name) + for name in self._avail_resources.custom_resources) + res_str += " ({})".format(custom) + return res_str else: return "? CPUs, ? 
GPUs" diff --git a/python/ray/tune/test/trial_runner_test.py b/python/ray/tune/test/trial_runner_test.py index 735b0659431f..f80479b7cf30 100644 --- a/python/ray/tune/test/trial_runner_test.py +++ b/python/ray/tune/test/trial_runner_test.py @@ -23,7 +23,8 @@ from ray.tune.logger import Logger from ray.tune.util import pin_in_object_store, get_pinned_object from ray.tune.experiment import Experiment -from ray.tune.trial import Trial, Resources, ExportFormat +from ray.tune.trial import (Trial, ExportFormat, Resources, resources_to_json, + json_to_resources) from ray.tune.trial_runner import TrialRunner from ray.tune.suggest import grid_search, BasicVariantGenerator from ray.tune.suggest.suggestion import (_MockSuggestionAlgorithm, @@ -736,6 +737,28 @@ def _train(self): for trial in trials: self.assertEqual(trial.status, Trial.TERMINATED) + def testCustomResources(self): + ray.shutdown() + ray.init(resources={"hi": 3}) + + class train(Trainable): + def _train(self): + return {"timesteps_this_iter": 1, "done": True} + + trials = run_experiments({ + "foo": { + "run": train, + "resources_per_trial": { + "cpu": 1, + "custom_resources": { + "hi": 2 + } + } + } + }) + for trial in trials: + self.assertEqual(trial.status, Trial.TERMINATED) + def testCustomLogger(self): class CustomLogger(Logger): def on_result(self, result): @@ -1083,6 +1106,62 @@ def testExtraResources(self): self.assertEqual(trials[0].status, Trial.TERMINATED) self.assertEqual(trials[1].status, Trial.PENDING) + def testCustomResources(self): + ray.init(num_cpus=4, num_gpus=2, resources={"a": 2}) + runner = TrialRunner(BasicVariantGenerator()) + kwargs = { + "stopping_criterion": { + "training_iteration": 1 + }, + "resources": Resources(cpu=1, gpu=0, custom_resources={"a": 2}), + } + trials = [Trial("__fake", **kwargs), Trial("__fake", **kwargs)] + for t in trials: + runner.add_trial(t) + + runner.step() + self.assertEqual(trials[0].status, Trial.RUNNING) + self.assertEqual(trials[1].status, Trial.PENDING) + + runner.step() + self.assertEqual(trials[0].status, Trial.TERMINATED) + self.assertEqual(trials[1].status, Trial.PENDING) + + def testExtraCustomResources(self): + ray.init(num_cpus=4, num_gpus=2, resources={"a": 2}) + runner = TrialRunner(BasicVariantGenerator()) + kwargs = { + "stopping_criterion": { + "training_iteration": 1 + }, + "resources": Resources( + cpu=1, gpu=0, extra_custom_resources={"a": 2}), + } + trials = [Trial("__fake", **kwargs), Trial("__fake", **kwargs)] + for t in trials: + runner.add_trial(t) + + runner.step() + self.assertEqual(trials[0].status, Trial.RUNNING) + self.assertEqual(trials[1].status, Trial.PENDING) + + runner.step() + self.assertTrue(sum(t.status == Trial.RUNNING for t in trials) < 2) + self.assertEqual(trials[0].status, Trial.TERMINATED) + self.assertEqual(trials[1].status, Trial.PENDING) + + def testCustomResources2(self): + ray.init(num_cpus=4, num_gpus=2, resources={"a": 2}) + runner = TrialRunner(BasicVariantGenerator()) + resource1 = Resources(cpu=1, gpu=0, extra_custom_resources={"a": 2}) + self.assertTrue(runner.has_resources(resource1)) + resource2 = Resources(cpu=1, gpu=0, custom_resources={"a": 2}) + self.assertTrue(runner.has_resources(resource2)) + resource3 = Resources(cpu=1, gpu=0, custom_resources={"a": 3}) + self.assertFalse(runner.has_resources(resource3)) + resource4 = Resources(cpu=1, gpu=0, extra_custom_resources={"a": 3}) + self.assertFalse(runner.has_resources(resource4)) + def testFractionalGpus(self): ray.init(num_cpus=4, num_gpus=1) runner = 
diff --git a/python/ray/tune/trial.py b/python/ray/tune/trial.py
index 36057ac2a4ed..3ef0b5027a46 100644
--- a/python/ray/tune/trial.py
+++ b/python/ray/tune/trial.py
@@ -11,10 +11,10 @@
 import time
 import tempfile
 import os
+from numbers import Number
 
 # For compatibility under py2 to consider unicode as str
 from six import string_types
-from numbers import Number
 
 import ray
 from ray.tune import TuneError
@@ -38,11 +38,12 @@ def date_str():
 
 
 class Resources(
-        namedtuple("Resources", ["cpu", "gpu", "extra_cpu", "extra_gpu"])):
+        namedtuple("Resources", [
+            "cpu", "gpu", "extra_cpu", "extra_gpu", "custom_resources",
+            "extra_custom_resources"
+        ])):
     """Ray resources required to schedule a trial.
 
-    TODO: Custom resources.
-
     Attributes:
         cpu (float): Number of CPUs to allocate to the trial.
         gpu (float): Number of GPUs to allocate to the trial.
@@ -50,21 +51,51 @@ class Resources(
            launch additional Ray actors that use CPUs.
         extra_gpu (float): Extra GPUs to reserve in case the trial needs to
            launch additional Ray actors that use GPUs.
+        custom_resources (dict): Mapping of resource to quantity to allocate
+            to the trial.
+        extra_custom_resources (dict): Extra custom resources to reserve in
+            case the trial needs to launch additional Ray actors that use
+            any of these custom resources.
 
     """
 
     __slots__ = ()
 
-    def __new__(cls, cpu, gpu, extra_cpu=0, extra_gpu=0):
-        for entry in [cpu, gpu, extra_cpu, extra_gpu]:
+    def __new__(cls,
+                cpu,
+                gpu,
+                extra_cpu=0,
+                extra_gpu=0,
+                custom_resources=None,
+                extra_custom_resources=None):
+        custom_resources = custom_resources or {}
+        extra_custom_resources = extra_custom_resources or {}
+        leftovers = set(custom_resources) ^ set(extra_custom_resources)
+
+        for value in leftovers:
+            custom_resources.setdefault(value, 0)
+            extra_custom_resources.setdefault(value, 0)
+
+        all_values = [cpu, gpu, extra_cpu, extra_gpu]
+        all_values += list(custom_resources.values())
+        all_values += list(extra_custom_resources.values())
+        assert len(custom_resources) == len(extra_custom_resources)
+        for entry in all_values:
             assert isinstance(entry, Number), "Improper resource value."
-            assert entry >= 0, "Resource cannot be negative."
-        return super(Resources, cls).__new__(cls, cpu, gpu, extra_cpu,
-                                             extra_gpu)
+        return super(Resources,
+                     cls).__new__(cls, cpu, gpu, extra_cpu, extra_gpu,
+                                  custom_resources, extra_custom_resources)
 
     def summary_string(self):
-        return "{} CPUs, {} GPUs".format(self.cpu + self.extra_cpu,
-                                         self.gpu + self.extra_gpu)
+        summary = "{} CPUs, {} GPUs".format(self.cpu + self.extra_cpu,
+                                            self.gpu + self.extra_gpu)
+        custom_summary = ", ".join([
+            "{} {}".format(self.get_res_total(res), res)
+            for res in self.custom_resources
+        ])
+        if custom_summary:
+            summary += " ({})".format(custom_summary)
+        return summary
 
     def cpu_total(self):
         return self.cpu + self.extra_cpu
@@ -72,6 +103,40 @@ def cpu_total(self):
     def gpu_total(self):
         return self.gpu + self.extra_gpu
 
+    def get_res_total(self, key):
+        return self.custom_resources.get(
+            key, 0) + self.extra_custom_resources.get(key, 0)
+
+    def get(self, key):
+        return self.custom_resources.get(key, 0)
+
+    def is_nonnegative(self):
+        all_values = [self.cpu, self.gpu, self.extra_cpu, self.extra_gpu]
+        all_values += list(self.custom_resources.values())
+        all_values += list(self.extra_custom_resources.values())
+        return all(v >= 0 for v in all_values)
+
+    @classmethod
+    def subtract(cls, original, to_remove):
+        cpu = original.cpu - to_remove.cpu
+        gpu = original.gpu - to_remove.gpu
+        extra_cpu = original.extra_cpu - to_remove.extra_cpu
+        extra_gpu = original.extra_gpu - to_remove.extra_gpu
+        all_resources = set(original.custom_resources).union(
+            set(to_remove.custom_resources))
+        new_custom_res = {
+            k: original.custom_resources.get(k, 0) -
+            to_remove.custom_resources.get(k, 0)
+            for k in all_resources
+        }
+        extra_custom_res = {
+            k: original.extra_custom_resources.get(k, 0) -
+            to_remove.extra_custom_resources.get(k, 0)
+            for k in all_resources
+        }
+        return Resources(cpu, gpu, extra_cpu, extra_gpu, new_custom_res,
+                         extra_custom_res)
+
 
 def json_to_resources(data):
     if data is None or data == "null":
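Note that `subtract` works over the union of keys (missing keys default to 0) and may legitimately go negative, which is why `__new__` no longer asserts non-negativity and `_return_resources` checks `is_nonnegative()` instead. For example (key names illustrative):

```python
from ray.tune.trial import Resources

a = Resources(1, 0, custom_resources={"a": 1, "b": 2})
b = Resources(1, 0, custom_resources={"a": 1, "c": 2})

diff = Resources.subtract(a, b)
print(sorted(diff.custom_resources))  # ['a', 'b', 'c'] -- union of keys
print(diff.custom_resources["c"])     # -2, since "c" defaults to 0 in `a`
print(diff.is_nonnegative())          # False
```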
""" __slots__ = () - def __new__(cls, cpu, gpu, extra_cpu=0, extra_gpu=0): - for entry in [cpu, gpu, extra_cpu, extra_gpu]: + def __new__(cls, + cpu, + gpu, + extra_cpu=0, + extra_gpu=0, + custom_resources=None, + extra_custom_resources=None): + custom_resources = custom_resources or {} + extra_custom_resources = extra_custom_resources or {} + leftovers = set(custom_resources) ^ set(extra_custom_resources) + + for value in leftovers: + custom_resources.setdefault(value, 0) + extra_custom_resources.setdefault(value, 0) + + all_values = [cpu, gpu, extra_cpu, extra_gpu] + all_values += list(custom_resources.values()) + all_values += list(extra_custom_resources.values()) + assert len(custom_resources) == len(extra_custom_resources) + for entry in all_values: assert isinstance(entry, Number), "Improper resource value." - assert entry >= 0, "Resource cannot be negative." - return super(Resources, cls).__new__(cls, cpu, gpu, extra_cpu, - extra_gpu) + return super(Resources, + cls).__new__(cls, cpu, gpu, extra_cpu, extra_gpu, + custom_resources, extra_custom_resources) def summary_string(self): - return "{} CPUs, {} GPUs".format(self.cpu + self.extra_cpu, - self.gpu + self.extra_gpu) + summary = "{} CPUs, {} GPUs".format(self.cpu + self.extra_cpu, + self.gpu + self.extra_gpu) + custom_summary = ", ".join([ + "{} {}".format(self.get_res_total(res), res) + for res in self.custom_resources + ]) + if custom_summary: + summary += " ({})".format(custom_summary) + return summary def cpu_total(self): return self.cpu + self.extra_cpu @@ -72,6 +103,40 @@ def cpu_total(self): def gpu_total(self): return self.gpu + self.extra_gpu + def get_res_total(self, key): + return self.custom_resources.get( + key, 0) + self.extra_custom_resources.get(key, 0) + + def get(self, key): + return self.custom_resources.get(key, 0) + + def is_nonnegative(self): + all_values = [self.cpu, self.gpu, self.extra_cpu, self.extra_gpu] + all_values += list(self.custom_resources.values()) + all_values += list(self.extra_custom_resources.values()) + return all(v >= 0 for v in all_values) + + @classmethod + def subtract(cls, original, to_remove): + cpu = original.cpu - to_remove.cpu + gpu = original.gpu - to_remove.gpu + extra_cpu = original.extra_cpu - to_remove.extra_cpu + extra_gpu = original.extra_gpu - to_remove.extra_gpu + all_resources = set(original.custom_resources).union( + set(to_remove.custom_resources)) + new_custom_res = { + k: original.custom_resources.get(k, 0) - + to_remove.custom_resources.get(k, 0) + for k in all_resources + } + extra_custom_res = { + k: original.extra_custom_resources.get(k, 0) - + to_remove.extra_custom_resources.get(k, 0) + for k in all_resources + } + return Resources(cpu, gpu, extra_cpu, extra_gpu, new_custom_res, + extra_custom_res) + def json_to_resources(data): if data is None or data == "null": @@ -84,12 +149,13 @@ def json_to_resources(data): "The field `{}` is no longer supported. 
diff --git a/python/ray/tune/tune.py b/python/ray/tune/tune.py
index 912c4a3130d1..0294497bf1f3 100644
--- a/python/ray/tune/tune.py
+++ b/python/ray/tune/tune.py
@@ -159,7 +159,7 @@ def run_experiments(experiments,
         metadata_checkpoint_dir=checkpoint_dir,
         launch_web_server=with_server,
         server_port=server_port,
-        verbose=int(verbose > 1),
+        verbose=bool(verbose > 1),
         queue_trials=queue_trials,
         trial_executor=trial_executor)