[Tune] Remove queue_trials. (#19472)
xwjiang2010 authored Oct 22, 2021
1 parent 580b58a commit a632cb4
Showing 9 changed files with 50 additions and 213 deletions.
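
Taken together, the diffs below boil down to a single API change: the `queue_trials` flag is gone from the trial executor (and, per the updated tests, from the `run_experiments` call). A hedged before/after sketch of the constructor call, using only names that appear in this diff:

    # Before this commit: the executor could oversubscribe ("queue") a trial
    # when the cluster lacked resources, to kick off autoscaling.
    executor = RayTrialExecutor(queue_trials=True, reuse_actors=False)

    # After this commit: the flag is removed and trials are no longer queued
    # by the executor; only the remaining keyword arguments are accepted.
    executor = RayTrialExecutor(reuse_actors=False)
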
25 changes: 4 additions & 21 deletions python/ray/tune/ray_trial_executor.py
@@ -169,15 +169,11 @@ class RayTrialExecutor(TrialExecutor):
"""An implementation of TrialExecutor based on Ray."""

def __init__(self,
queue_trials: bool = False,
reuse_actors: bool = False,
result_buffer_length: Optional[int] = None,
refresh_period: Optional[float] = None,
wait_for_placement_group: Optional[float] = None):
super(RayTrialExecutor, self).__init__(queue_trials)
# Check for if we are launching a trial without resources in kick off
# autoscaler.
self._trial_queued = False
super(RayTrialExecutor, self).__init__()
self._running = {}
# Since trial resume after paused should not run
# trial.train.remote(), thus no more new remote object ref generated.
@@ -881,9 +877,9 @@ def _update_avail_resources(self, num_retries=5):
def has_resources_for_trial(self, trial: Trial) -> bool:
"""Returns whether this runner has resources available for this trial.
If using placement groups, this will return True as long as we
didn't reach the maximum number of pending trials. It will also return
True if the trial placement group is already staged.
This will return True as long as we didn't reach the maximum number
of pending trials. It will also return True if the trial placement
group is already staged.
Args:
trial: Trial object which should be scheduled.
@@ -924,19 +920,6 @@ def has_resources(self, resources: Resources) -> bool:
if have_space:
# The assumption right now is that we block all trials if one
# trial is queued.
self._trial_queued = False
return True

can_overcommit = self._queue_trials and not self._trial_queued
if can_overcommit:
self._trial_queued = True
logger.warning(
"Allowing trial to start even though the "
"cluster does not have enough free resources. Trial actors "
"may appear to hang until enough resources are added to the "
"cluster (e.g., via autoscaling). You can disable this "
"behavior by specifying `queue_trials=False` in "
"ray.tune.run().")
return True

return False
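
As a quick orientation before the test diffs, here is a hedged usage sketch of the surviving resource check, modeled on the patterns in the tests further down. It assumes Ray is initialized and that Tune's test-only "__fake" trainable is registered (the test suite does this via _register_all()):

    from ray.tune.ray_trial_executor import RayTrialExecutor
    from ray.tune.trial import Trial

    executor = RayTrialExecutor()                # queue_trials is no longer accepted
    trial = Trial("__fake")                      # assumes "__fake" is registered
    if executor.has_resources_for_trial(trial):  # True until the pending-trial limit is reached
        executor.start_trial(trial)
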
35 changes: 14 additions & 21 deletions python/ray/tune/tests/test_api.py
@@ -265,31 +265,24 @@ def step(self):

register_trainable("B", B)

def f(cpus, gpus, queue_trials):
return run_experiments(
{
"foo": {
"run": "B",
"config": {
"cpu": cpus,
"gpu": gpus,
},
}
},
queue_trials=queue_trials)[0]
def f(cpus, gpus):
return run_experiments({
"foo": {
"run": "B",
"config": {
"cpu": cpus,
"gpu": gpus,
},
}
})[0]

# Should all succeed
self.assertEqual(f(0, 0, False).status, Trial.TERMINATED)
self.assertEqual(f(1, 0, True).status, Trial.TERMINATED)
self.assertEqual(f(1, 0, True).status, Trial.TERMINATED)
self.assertEqual(f(0, 0).status, Trial.TERMINATED)

# Too large resource request
self.assertRaises(TuneError, lambda: f(100, 100, False))
self.assertRaises(TuneError, lambda: f(0, 100, False))
self.assertRaises(TuneError, lambda: f(100, 0, False))

# TODO(ekl) how can we test this is queued (hangs)?
# f(100, 0, True)
self.assertRaises(TuneError, lambda: f(100, 100))
self.assertRaises(TuneError, lambda: f(0, 100))
self.assertRaises(TuneError, lambda: f(100, 0))

def testRewriteEnv(self):
def train(config, reporter):
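
The resource test updated above pins down the new failure mode: with queueing gone, a resource request the cluster cannot satisfy raises TuneError up front rather than waiting for autoscaling. A hedged sketch of that behavior, reusing the test's config-driven "B" trainable and experiment spec:

    from ray.tune import run_experiments
    from ray.tune.error import TuneError

    # Assumes register_trainable("B", B) has run, as in the test above.
    try:
        run_experiments({
            "foo": {
                "run": "B",
                "config": {"cpu": 100, "gpu": 100},
            }
        })
    except TuneError:
        print("request exceeds cluster capacity; the trial is not queued")
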
56 changes: 0 additions & 56 deletions python/ray/tune/tests/test_cluster.py
@@ -17,8 +17,6 @@
from ray.tune import register_trainable
from ray.tune.experiment import Experiment
from ray.tune.error import TuneError
from ray.tune.ray_trial_executor import RayTrialExecutor
from ray.tune.resources import Resources
from ray.tune.suggest import BasicVariantGenerator
from ray.tune.syncer import CloudSyncer, SyncerCallback, get_node_syncer
from ray.tune.utils.trainable import TrainableUtil
@@ -218,60 +216,6 @@ def test_remove_node_before_result(start_connected_emptyhead_cluster):
runner.step()


def test_queue_trials(start_connected_emptyhead_cluster):
"""Tests explicit oversubscription for autoscaling.
Tune oversubscribes a trial when `queue_trials=True`, but
does not block other trials from running.
"""
os.environ["TUNE_PLACEMENT_GROUP_AUTO_DISABLED"] = "1"

cluster = start_connected_emptyhead_cluster
runner = TrialRunner()

def create_trial(cpu, gpu=0):
kwargs = {
"resources": Resources(cpu=cpu, gpu=gpu),
"stopping_criterion": {
"training_iteration": 3
}
}
return Trial("__fake", **kwargs)

runner.add_trial(create_trial(cpu=1))
with pytest.raises(TuneError):
runner.step() # run 1

del runner

executor = RayTrialExecutor(queue_trials=True)
runner = TrialRunner(trial_executor=executor)
cluster.add_node(num_cpus=2)
cluster.wait_for_nodes()

cpu_only = create_trial(cpu=1)
runner.add_trial(cpu_only)
runner.step() # add cpu_only trial

gpu_trial = create_trial(cpu=1, gpu=1)
runner.add_trial(gpu_trial)
runner.step() # queue gpu_trial

# This tests that the cpu_only trial should bypass the queued trial.
for i in range(3):
runner.step()
assert cpu_only.status == Trial.TERMINATED
assert gpu_trial.status == Trial.RUNNING

# Scale up
cluster.add_node(num_cpus=1, num_gpus=1)
cluster.wait_for_nodes()

for i in range(3):
runner.step()
assert gpu_trial.status == Trial.TERMINATED


@pytest.mark.parametrize("trainable_id", ["__fake", "__fake_durable"])
def test_trial_migration(start_connected_emptyhead_cluster, trainable_id):
"""Removing a node while cluster has space should migrate trial.
76 changes: 4 additions & 72 deletions python/ray/tune/tests/test_ray_trial_executor.py
@@ -85,7 +85,7 @@ def setUp(self):
os.environ["TUNE_TRIAL_STARTUP_GRACE_PERIOD"] = "0"
os.environ["TUNE_TRIAL_RESULT_WAIT_TIME_S"] = "99999"

self.trial_executor = RayTrialExecutor(queue_trials=False)
self.trial_executor = RayTrialExecutor()
ray.init(num_cpus=2, ignore_reinit_error=True)
_register_all() # Needed for flaky tests

@@ -190,7 +190,7 @@ def _testPauseUnpause(self, result_buffer_length):
os.environ["TUNE_RESULT_BUFFER_MIN_TIME_S"] = "1"

# Need a new trial executor so the ENV vars are parsed again
self.trial_executor = RayTrialExecutor(queue_trials=False)
self.trial_executor = RayTrialExecutor()

base = max(result_buffer_length, 1)

@@ -298,7 +298,7 @@ def cleanup(self):
}, "grid_search")
trial = trials[0]
os.environ["TUNE_FORCE_TRIAL_CLEANUP_S"] = "1"
self.trial_executor = RayTrialExecutor(queue_trials=False)
self.trial_executor = RayTrialExecutor()
os.environ["TUNE_FORCE_TRIAL_CLEANUP_S"] = "0"
self.trial_executor.start_trial(trial)
self.assertEqual(Trial.RUNNING, trial.status)
@@ -336,74 +336,6 @@ def process_trial_save(self, trial):
trial.on_checkpoint(checkpoint)


class RayExecutorQueueTest(unittest.TestCase):
def setUp(self):
self.cluster = Cluster(
initialize_head=True,
connect=True,
head_node_args={
"num_cpus": 1,
"_system_config": {
"num_heartbeats_timeout": 10
}
})
self.trial_executor = RayTrialExecutor(
queue_trials=True, refresh_period=0)
# Pytest doesn't play nicely with imports
_register_all()

def tearDown(self):
ray.shutdown()
self.cluster.shutdown()
_register_all() # re-register the evicted objects

def testQueueTrial(self):
"""Tests that reset handles NotImplemented properly."""

def create_trial(cpu, gpu=0):
return Trial("__fake", resources=Resources(cpu=cpu, gpu=gpu))

cpu_only = create_trial(1, 0)
self.assertTrue(self.trial_executor.has_resources_for_trial(cpu_only))
self.trial_executor.start_trial(cpu_only)

gpu_only = create_trial(0, 1)
self.assertTrue(self.trial_executor.has_resources_for_trial(gpu_only))

def testHeadBlocking(self):
# Once resource requests are deprecated, remove this test
os.environ["TUNE_PLACEMENT_GROUP_AUTO_DISABLED"] = "1"

def create_trial(cpu, gpu=0):
return Trial("__fake", resources=Resources(cpu=cpu, gpu=gpu))

gpu_trial = create_trial(1, 1)
self.assertTrue(self.trial_executor.has_resources_for_trial(gpu_trial))
self.trial_executor.start_trial(gpu_trial)

# TODO(rliaw): This behavior is probably undesirable, but right now
# trials with different resource requirements is not often used.
cpu_only_trial = create_trial(1, 0)
self.assertFalse(
self.trial_executor.has_resources_for_trial(cpu_only_trial))

self.cluster.add_node(num_cpus=1, num_gpus=1)
self.cluster.wait_for_nodes()

self.assertTrue(
self.trial_executor.has_resources_for_trial(cpu_only_trial))
self.trial_executor.start_trial(cpu_only_trial)

cpu_only_trial2 = create_trial(1, 0)
self.assertTrue(
self.trial_executor.has_resources_for_trial(cpu_only_trial2))
self.trial_executor.start_trial(cpu_only_trial2)

cpu_only_trial3 = create_trial(1, 0)
self.assertFalse(
self.trial_executor.has_resources_for_trial(cpu_only_trial3))


class RayExecutorPlacementGroupTest(unittest.TestCase):
def setUp(self):
self.head_cpus = 8
@@ -537,7 +469,7 @@ def testPlacementGroupFactoryEquality(self):
class LocalModeExecutorTest(RayTrialExecutorTest):
def setUp(self):
ray.init(local_mode=True)
self.trial_executor = RayTrialExecutor(queue_trials=False)
self.trial_executor = RayTrialExecutor()

def tearDown(self):
ray.shutdown()
3 changes: 1 addition & 2 deletions python/ray/tune/tests/test_trial_scheduler_pbt.py
@@ -89,8 +89,7 @@ def save(self, *args, **kwargs):
checkpoint_freq=1,
fail_fast=True,
config={"a": tune.sample_from(lambda _: param_a())},
trial_executor=CustomExecutor(
queue_trials=False, reuse_actors=False),
trial_executor=CustomExecutor(reuse_actors=False),
)


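
For custom executors the change is mechanical: only the surviving keyword arguments are forwarded. A minimal sketch, assuming a subclass shaped like the CustomExecutor used in the PBT test above (its actual definition lives elsewhere in that test file):

    from ray.tune.ray_trial_executor import RayTrialExecutor

    class CustomExecutor(RayTrialExecutor):       # assumption: mirrors the test's subclass
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)     # no queue_trials to pass through

    executor = CustomExecutor(reuse_actors=False)
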
21 changes: 3 additions & 18 deletions python/ray/tune/trial_executor.py
@@ -118,16 +118,9 @@ class TrialExecutor(metaclass=_WarnOnDirectInheritanceMeta):
and starting/stopping trials.
"""

def __init__(self, queue_trials: bool = False):
def __init__(self):
"""Initializes a new TrialExecutor.
Args:
queue_trials (bool): Whether to queue trials when the cluster does
not currently have enough resources to launch one. This should
be set to True when running on an autoscaling cluster to enable
automatic scale-up.
"""
self._queue_trials = queue_trials
self._cached_trial_state = {}
self._trials_to_cache = set()
# The next two variables are used to keep track of if there is any
@@ -338,29 +331,21 @@ def on_no_available_trials(self, trials: List[Trial]) -> None:
trials (List[Trial]): The list of trials. Note, refrain from
providing TrialRunner directly here.
"""
if self._queue_trials:
return
self._may_warn_insufficient_resources(trials)
for trial in trials:
if trial.uses_placement_groups:
return
# TODO(xwjiang): The rest should be gone in a follow up PR
# to remove non-pg case.
if trial.status == Trial.PENDING:
if not self.has_resources_for_trial(trial):
resource_string = trial.resources.summary_string()
trial_resource_help_msg = trial.get_trainable_cls(
).resource_help(trial.config)
autoscaling_msg = ""
if is_ray_cluster():
autoscaling_msg = (
"Pass `queue_trials=True` in ray.tune.run() or "
"on the command line to queue trials until the "
"cluster scales up or resources become available. "
)
raise TuneError(
"Insufficient cluster resources to launch trial: "
f"trial requested {resource_string}, but the cluster "
f"has only {self.resource_string()}. "
f"{autoscaling_msg}"
f"{trial_resource_help_msg} ")
elif trial.status == Trial.PAUSED:
raise TuneError("There are paused trials, but no more pending "
(Diffs for the remaining 3 of the 9 changed files did not load on this page.)
