Skip to content

Commit

Permalink
[tune] Support Custom Resources (ray-project#2979)
Browse files Browse the repository at this point in the history
Support arbitrary resource declarations in Tune.

Fixes ray-project#2875
  • Loading branch information
richardliaw authored Feb 7, 2019
1 parent a654152 commit 5db1afe
Show file tree
Hide file tree
Showing 4 changed files with 287 additions and 35 deletions.
86 changes: 67 additions & 19 deletions python/ray/tune/ray_trial_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ def __init__(self, queue_trials=False):
def _setup_runner(self, trial):
cls = ray.remote(
num_cpus=trial.resources.cpu,
num_gpus=trial.resources.gpu)(trial._get_trainable_cls())
num_gpus=trial.resources.gpu,
resources=trial.resources.custom_resources)(
trial._get_trainable_cls())

trial.init_logger()
# We checkpoint metadata here to try mitigating logdir duplication
Expand Down Expand Up @@ -229,16 +231,37 @@ def fetch_result(self, trial):
return result

def _commit_resources(self, resources):
    """Mark ``resources`` as committed (in use by a launched trial).

    Adds the trial's CPU/GPU totals and every custom resource total to
    this executor's committed-resource bookkeeping.

    Args:
        resources: Resource spec of the trial being started.
    """
    committed = self._committed_resources
    # Union of custom-resource names already committed and those being
    # requested, so neither side's keys are dropped from the new tally.
    all_keys = set(resources.custom_resources).union(
        set(committed.custom_resources))

    # NOTE(review): presumably committed.get(k) and
    # resources.get_res_total(k) return 0 for absent keys — confirm
    # against the Resources class.
    custom_resources = {
        k: committed.get(k) + resources.get_res_total(k)
        for k in all_keys
    }

    self._committed_resources = Resources(
        committed.cpu + resources.cpu_total(),
        committed.gpu + resources.gpu_total(),
        custom_resources=custom_resources)

def _return_resources(self, resources):
    """Release previously committed ``resources`` back to the pool.

    Mirrors ``_commit_resources`` with subtraction, then asserts that
    no committed quantity went negative (which would indicate a
    double-return or accounting bug).

    Args:
        resources: Resource spec of the trial being stopped.
    """
    committed = self._committed_resources

    # Union of keys so custom resources present on only one side are
    # still carried through the subtraction.
    all_keys = set(resources.custom_resources).union(
        set(committed.custom_resources))

    custom_resources = {
        k: committed.get(k) - resources.get_res_total(k)
        for k in all_keys
    }
    self._committed_resources = Resources(
        committed.cpu - resources.cpu_total(),
        committed.gpu - resources.gpu_total(),
        custom_resources=custom_resources)

    # is_nonnegative() covers CPUs, GPUs and all custom resources.
    assert self._committed_resources.is_nonnegative(), (
        "Resource invalid: {}".format(resources))

def _update_avail_resources(self, num_retries=5):
for i in range(num_retries):
Expand All @@ -247,28 +270,37 @@ def _update_avail_resources(self, num_retries=5):
logger.warning("Cluster resources not detected. Retrying...")
time.sleep(0.5)

num_cpus = resources["CPU"]
num_gpus = resources["GPU"]
resources = resources.copy()
num_cpus = resources.pop("CPU")
num_gpus = resources.pop("GPU")
custom_resources = resources

self._avail_resources = Resources(int(num_cpus), int(num_gpus))
self._avail_resources = Resources(
int(num_cpus), int(num_gpus), custom_resources=custom_resources)
self._resources_initialized = True

def has_resources(self, resources):
"""Returns whether this runner has at least the specified resources."""
self._update_avail_resources()
cpu_avail = self._avail_resources.cpu - self._committed_resources.cpu
gpu_avail = self._avail_resources.gpu - self._committed_resources.gpu
currently_available = Resources.subtract(self._avail_resources,
self._committed_resources)

have_space = (resources.cpu_total() <= cpu_avail
and resources.gpu_total() <= gpu_avail)
have_space = (
resources.cpu_total() <= currently_available.cpu
and resources.gpu_total() <= currently_available.gpu and all(
resources.get_res_total(res) <= currently_available.get(res)
for res in resources.custom_resources))

if have_space:
return True

can_overcommit = self._queue_trials

if (resources.cpu_total() > 0 and cpu_avail <= 0) or \
(resources.gpu_total() > 0 and gpu_avail <= 0):
if (resources.cpu_total() > 0 and currently_available.cpu <= 0) or \
(resources.gpu_total() > 0 and currently_available.gpu <= 0) or \
any((resources.get_res_total(res_name) > 0
and currently_available.get(res_name) <= 0)
for res_name in resources.custom_resources):
can_overcommit = False # requested resource is already saturated

if can_overcommit:
def debug_string(self):
    """Returns a human readable message for printing to the console."""

    if self._resources_initialized:
        status = "Resources requested: {}/{} CPUs, {}/{} GPUs".format(
            self._committed_resources.cpu, self._avail_resources.cpu,
            self._committed_resources.gpu, self._avail_resources.gpu)
        # Append "<committed>/<available> <name>" pairs for every custom
        # resource the cluster advertises.
        customs = ", ".join([
            "{}/{} {}".format(
                self._committed_resources.get_res_total(name),
                self._avail_resources.get_res_total(name), name)
            for name in self._avail_resources.custom_resources
        ])
        if customs:
            status += " ({})".format(customs)
        return status
    else:
        # Cluster resources have not been detected yet.
        return "Resources requested: ?"

def resource_string(self):
    """Returns a string describing the total resources available."""

    if self._resources_initialized:
        res_str = "{} CPUs, {} GPUs".format(self._avail_resources.cpu,
                                            self._avail_resources.gpu)
        # Append "(<total> <name>, ...)" for any custom resources.
        if self._avail_resources.custom_resources:
            custom = ", ".join(
                "{} {}".format(
                    self._avail_resources.get_res_total(name), name)
                for name in self._avail_resources.custom_resources)
            res_str += " ({})".format(custom)
        return res_str
    else:
        # Cluster resources have not been detected yet.
        return "? CPUs, ? GPUs"

Expand Down
138 changes: 137 additions & 1 deletion python/ray/tune/test/trial_runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
from ray.tune.logger import Logger
from ray.tune.util import pin_in_object_store, get_pinned_object
from ray.tune.experiment import Experiment
from ray.tune.trial import Trial, Resources, ExportFormat
from ray.tune.trial import (Trial, ExportFormat, Resources, resources_to_json,
json_to_resources)
from ray.tune.trial_runner import TrialRunner
from ray.tune.suggest import grid_search, BasicVariantGenerator
from ray.tune.suggest.suggestion import (_MockSuggestionAlgorithm,
Expand Down Expand Up @@ -736,6 +737,28 @@ def _train(self):
for trial in trials:
self.assertEqual(trial.status, Trial.TERMINATED)

def testCustomResources(self):
    """End-to-end: run_experiments schedules a trial that declares a
    custom resource ("hi": 2) on a cluster advertising "hi": 3."""
    # Restart Ray so the cluster is (re)created with the custom
    # resource available.
    ray.shutdown()
    ray.init(resources={"hi": 3})

    class train(Trainable):
        def _train(self):
            # Finish after a single iteration.
            return {"timesteps_this_iter": 1, "done": True}

    trials = run_experiments({
        "foo": {
            "run": train,
            "resources_per_trial": {
                "cpu": 1,
                "custom_resources": {
                    "hi": 2
                }
            }
        }
    })
    # The trial fits within the cluster's resources, so it must finish.
    for trial in trials:
        self.assertEqual(trial.status, Trial.TERMINATED)

def testCustomLogger(self):
class CustomLogger(Logger):
def on_result(self, result):
Expand Down Expand Up @@ -1083,6 +1106,62 @@ def testExtraResources(self):
self.assertEqual(trials[0].status, Trial.TERMINATED)
self.assertEqual(trials[1].status, Trial.PENDING)

def testCustomResources(self):
    """TrialRunner serializes trials that each demand all custom
    resource units: cluster has {"a": 2}, each trial asks for 2."""
    ray.init(num_cpus=4, num_gpus=2, resources={"a": 2})
    runner = TrialRunner(BasicVariantGenerator())
    kwargs = {
        "stopping_criterion": {
            "training_iteration": 1
        },
        "resources": Resources(cpu=1, gpu=0, custom_resources={"a": 2}),
    }
    trials = [Trial("__fake", **kwargs), Trial("__fake", **kwargs)]
    for t in trials:
        runner.add_trial(t)

    runner.step()
    # First trial consumes both units of "a"; the second must wait.
    self.assertEqual(trials[0].status, Trial.RUNNING)
    self.assertEqual(trials[1].status, Trial.PENDING)

    runner.step()
    # After the first trial terminates, the second is still pending
    # (it has not been stepped into RUNNING yet).
    self.assertEqual(trials[0].status, Trial.TERMINATED)
    self.assertEqual(trials[1].status, Trial.PENDING)

def testExtraCustomResources(self):
    """Like testCustomResources, but the custom resource is declared
    via extra_custom_resources; scheduling must still serialize."""
    ray.init(num_cpus=4, num_gpus=2, resources={"a": 2})
    runner = TrialRunner(BasicVariantGenerator())
    kwargs = {
        "stopping_criterion": {
            "training_iteration": 1
        },
        "resources": Resources(
            cpu=1, gpu=0, extra_custom_resources={"a": 2}),
    }
    trials = [Trial("__fake", **kwargs), Trial("__fake", **kwargs)]
    for t in trials:
        runner.add_trial(t)

    runner.step()
    self.assertEqual(trials[0].status, Trial.RUNNING)
    self.assertEqual(trials[1].status, Trial.PENDING)

    runner.step()
    # At most one trial may hold resource "a" at a time.
    self.assertTrue(sum(t.status == Trial.RUNNING for t in trials) < 2)
    self.assertEqual(trials[0].status, Trial.TERMINATED)
    self.assertEqual(trials[1].status, Trial.PENDING)

def testCustomResources2(self):
    """has_resources() accounts for both custom_resources and
    extra_custom_resources against cluster availability ({"a": 2})."""
    ray.init(num_cpus=4, num_gpus=2, resources={"a": 2})
    runner = TrialRunner(BasicVariantGenerator())
    # Requests within the 2 available units of "a" are satisfiable.
    resource1 = Resources(cpu=1, gpu=0, extra_custom_resources={"a": 2})
    self.assertTrue(runner.has_resources(resource1))
    resource2 = Resources(cpu=1, gpu=0, custom_resources={"a": 2})
    self.assertTrue(runner.has_resources(resource2))
    # Requests exceeding the cluster's supply must be rejected.
    resource3 = Resources(cpu=1, gpu=0, custom_resources={"a": 3})
    self.assertFalse(runner.has_resources(resource3))
    resource4 = Resources(cpu=1, gpu=0, extra_custom_resources={"a": 3})
    self.assertFalse(runner.has_resources(resource4))

def testFractionalGpus(self):
ray.init(num_cpus=4, num_gpus=1)
runner = TrialRunner(BasicVariantGenerator())
Expand Down Expand Up @@ -1292,6 +1371,7 @@ def testFailureRecoveryNodeRemoval(self):
resource_mock.return_value = {"CPU": 1, "GPU": 1}
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)

runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)

Expand Down Expand Up @@ -1878,5 +1958,61 @@ def _suggest(self, trial_id):
self.assertTrue("d=4" in trial.experiment_tag)


class ResourcesTest(unittest.TestCase):
    """Unit tests for the Resources helper: subtraction semantics and
    JSON round-tripping of custom resource dicts."""

    def testSubtraction(self):
        """Subtracting a spec from an identical one zeroes every field,
        including custom and extra custom resources."""
        resource_1 = Resources(
            1,
            0,
            0,
            1,
            custom_resources={
                "a": 1,
                "b": 2
            },
            extra_custom_resources={
                "a": 1,
                "b": 1
            })
        resource_2 = Resources(
            1,
            0,
            0,
            1,
            custom_resources={
                "a": 1,
                "b": 2
            },
            extra_custom_resources={
                "a": 1,
                "b": 1
            })
        new_res = Resources.subtract(resource_1, resource_2)
        self.assertTrue(new_res.cpu == 0)
        self.assertTrue(new_res.gpu == 0)
        self.assertTrue(new_res.extra_cpu == 0)
        self.assertTrue(new_res.extra_gpu == 0)
        self.assertTrue(all(k == 0 for k in new_res.custom_resources.values()))
        self.assertTrue(
            all(k == 0 for k in new_res.extra_custom_resources.values()))

    def testDifferentResources(self):
        """Subtraction keeps the union of custom-resource keys; keys
        present on only one side still appear in the result."""
        resource_1 = Resources(1, 0, 0, 1, custom_resources={"a": 1, "b": 2})
        resource_2 = Resources(1, 0, 0, 1, custom_resources={"a": 1, "c": 2})
        new_res = Resources.subtract(resource_1, resource_2)
        assert "c" in new_res.custom_resources
        assert "b" in new_res.custom_resources
        self.assertTrue(new_res.cpu == 0)
        self.assertTrue(new_res.gpu == 0)
        self.assertTrue(new_res.extra_cpu == 0)
        self.assertTrue(new_res.extra_gpu == 0)
        self.assertTrue(new_res.get("a") == 0)

    def testSerialization(self):
        """resources_to_json / json_to_resources round-trips equality."""
        original = Resources(1, 0, 0, 1, custom_resources={"a": 1, "b": 2})
        jsoned = resources_to_json(original)
        new_resource = json_to_resources(jsoned)
        # Fixed: assertEquals is a deprecated alias of assertEqual
        # (removed in Python 3.12).
        self.assertEqual(original, new_resource)


# Run the test suite when executed directly; verbosity=2 prints one
# line per test case.
if __name__ == "__main__":
    unittest.main(verbosity=2)
Loading

0 comments on commit 5db1afe

Please sign in to comment.