[tune] Fix trial cleanup after x seconds, set default to 600 #28449

Merged · 8 commits · Sep 14, 2022

Changes from 2 commits
2 changes: 1 addition & 1 deletion doc/source/tune/api_docs/env.rst
@@ -32,7 +32,7 @@ These are the environment variables Ray Tune currently considers:
 * **TUNE_FORCE_TRIAL_CLEANUP_S**: By default, Ray Tune will gracefully terminate trials,
   letting them finish the current training step and any user-defined cleanup.
   Setting this variable to a non-zero, positive integer will cause trials to be forcefully
-  terminated after a grace period of that many seconds. Defaults to ``0``.
+  terminated after a grace period of that many seconds. Defaults to ``600`` (seconds).
 * **TUNE_GET_EXECUTOR_EVENT_WAIT_S**: The time that TrialRunner waits for the
   next ExecutorEvent in a blocking fashion. Defaults to ``5``.
 * **TUNE_FUNCTION_THREAD_TIMEOUT_S**: Time in seconds the function API waits
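For context, a minimal usage sketch (not part of this diff; the 60-second value and the trivial trainable are illustrative). The variable is read once when the trial executor is constructed, so it must be set before Tune starts:

```python
import os

# Force-kill trials whose graceful shutdown takes longer than 60 seconds,
# overriding the new default of 600. Must be set before Tune starts.
os.environ["TUNE_FORCE_TRIAL_CLEANUP_S"] = "60"

from ray import tune

def trainable(config):
    return {"score": 1}

tune.run(trainable)
```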
29 changes: 18 additions & 11 deletions python/ray/tune/execution/ray_trial_executor.py
@@ -85,12 +85,14 @@ def unwrap(self):
         return self._result


-def _post_stop_cleanup(future, pg):
+def _post_stop_cleanup(future, actor, pg):
     """Things to be done after a trial is stopped."""
+    assert isinstance(actor, ray.actor.ActorHandle)
     assert isinstance(pg, PlacementGroup)
     try:
         # This should not be blocking as
         # we are only here when triggered.
+        ray.kill(actor)
         ray.get(future, timeout=0)
     except GetTimeoutError:
         if log_once("tune_trial_cleanup_timeout"):
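The change above makes forced cleanup actually terminate the trial's actor: previously only the stop future was reaped, so a trainable hanging in `stop()` kept its actor alive. In isolation, the pattern looks roughly like this (a hedged sketch; `force_cleanup` is an assumed name, not the executor's API):

```python
import ray
from ray.exceptions import GetTimeoutError, RayActorError

def force_cleanup(stop_future, actor):
    # The grace period has expired: hard-terminate the trial's actor
    # instead of waiting any longer for stop() to return.
    ray.kill(actor)
    try:
        # timeout=0 turns ray.get into a non-blocking reap of the future.
        ray.get(stop_future, timeout=0)
    except GetTimeoutError:
        pass  # stop() never finished; ray.kill above is the fallback
    except RayActorError:
        pass  # the actor died mid-stop, which is expected after ray.kill
```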
@@ -129,7 +131,8 @@ def get_next(self):
             len(self._future_to_insert_time) > 0
             and self._future_to_insert_time[0][1] + self._force_cleanup < time.time()
         ):
-            return self._future_to_insert_time.popleft()
+            future, _time = self._future_to_insert_time.popleft()
+            return future
         else:
             return None

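`_TrialCleanup` itself is a deadline queue: stop futures are enqueued with their insert time, and `get_next()` releases one only after it has waited longer than the grace period. The change above merely unpacks the `(future, insert_time)` tuple so callers get the bare future. A self-contained sketch of the idea (assumed class name, not the Tune implementation):

```python
import time
from collections import deque

class DeadlineQueue:
    """FIFO of futures that become 'overdue' after a grace period."""

    def __init__(self, grace_period_s: float):
        self._grace = grace_period_s
        self._queue = deque()  # (future, insert_time), oldest first

    def add(self, future) -> None:
        self._queue.append((future, time.time()))

    def get_next(self):
        # Release the oldest future only once its grace period expired;
        # otherwise return None and let the caller try again later.
        if self._queue and self._queue[0][1] + self._grace < time.time():
            future, _insert_time = self._queue.popleft()
            return future
        return None
```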
@@ -204,7 +207,7 @@ def __init__(
         # future --> (type, trial/pg)
         self._futures = {}

-        force_trial_cleanup = int(os.environ.get("TUNE_FORCE_TRIAL_CLEANUP_S", "0"))
+        force_trial_cleanup = int(os.environ.get("TUNE_FORCE_TRIAL_CLEANUP_S", "600"))
         self._get_next_event_wait = int(
             os.environ.get("TUNE_GET_EXECUTOR_EVENT_WAIT_S", "5")
         )
@@ -534,7 +537,10 @@ def _stop_trial(
                         future = trial.runner.stop.remote()

                     pg = self._pg_manager.remove_from_in_use(trial)
-                    self._futures[future] = (_ExecutorEventType.STOP_RESULT, pg)
+                    self._futures[future] = (
+                        _ExecutorEventType.STOP_RESULT,
+                        (trial.runner, pg),
+                    )
                     if self._trial_cleanup:  # force trial cleanup within a deadline
                         self._trial_cleanup.add(future)

@@ -592,12 +598,12 @@ def stop_trial(
         exc: Optional[Union[TuneError, RayTaskError]] = None,
     ) -> None:
         prior_status = trial.status
-        self._stop_trial(trial, error=error or exc, exc=exc)
         if prior_status == Trial.RUNNING:
             logger.debug("Trial %s: Returning resources.", trial)
             out = self._find_future(trial)
             for result_id in out:
                 self._futures.pop(result_id)
+        self._stop_trial(trial, error=error or exc, exc=exc)

     def continue_training(self, trial: Trial) -> None:
         """Continues the training of this trial."""
@@ -713,9 +719,9 @@ def _do_force_trial_cleanup(self) -> None:
             next_future_to_clean = self._trial_cleanup.get_next()
             if not next_future_to_clean:
                 break
-            if next_future_to_clean in self._futures.keys():
-                _, pg = self._futures.pop(next_future_to_clean)
-                _post_stop_cleanup(next_future_to_clean, pg)
+            if next_future_to_clean in self._futures:
+                _, (actor, pg) = self._futures.pop(next_future_to_clean)
+                _post_stop_cleanup(next_future_to_clean, actor, pg)
             else:
                 # This just means that before the deadline reaches,
                 # the future is already cleaned up.
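Draining that queue ties the pieces together: each overdue future still tracked by the executor is popped along with its `(actor, placement group)` pair and force-cleaned. Roughly (reusing the assumed names from the sketches above):

```python
def drain_overdue(cleanup_queue, futures):
    # `futures` maps stop futures to (event_type, (actor, pg)), as in
    # the executor's self._futures after this PR.
    while True:
        future = cleanup_queue.get_next()
        if future is None:
            break  # nothing overdue yet
        if future in futures:
            _event_type, (actor, pg) = futures.pop(future)
            force_cleanup(future, actor)
            # The executor would also return the placement group `pg` here.
        # A future missing from `futures` already resolved before its
        # deadline, so there is nothing left to clean up.
```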
@@ -845,7 +851,8 @@ def cleanup(self, trials: List[Trial]) -> None:
                 continue
             event_type, trial_or_pg = self._futures.pop(ready[0])
             if event_type == _ExecutorEventType.STOP_RESULT:
-                _post_stop_cleanup(ready[0], trial_or_pg)
+                actor, pg = trial_or_pg
+                _post_stop_cleanup(ready[0], actor, pg)

         self._pg_manager.reconcile_placement_groups(trials)
         self._pg_manager.cleanup(force=True)
@@ -987,8 +994,8 @@ def get_next_executor_event(
             ###################################################################
             result_type, trial_or_pg = self._futures.pop(ready_future)
             if result_type == _ExecutorEventType.STOP_RESULT:
-                pg = trial_or_pg
-                _post_stop_cleanup(ready_future, pg)
+                actor, pg = trial_or_pg
+                _post_stop_cleanup(ready_future, actor, pg)
             else:
                 trial = trial_or_pg
                 assert isinstance(trial, Trial)
61 changes: 61 additions & 0 deletions python/ray/tune/tests/test_ray_trial_executor.py
@@ -15,6 +15,7 @@
     _ExecutorEvent,
     _ExecutorEventType,
     RayTrialExecutor,
+    _TrialCleanup,
 )
 from ray.tune.registry import _global_registry, TRAINABLE_CLASS, register_trainable
 from ray.tune.result import PID, TRAINING_ITERATION, TRIAL_ID
@@ -29,6 +30,23 @@
 from unittest.mock import patch


+class _HangingTrainable(tune.Trainable):
+    def setup(self, config):
+        pass
+
+    def save_checkpoint(self, checkpoint_dir: str):
+        return None
+
+    def load_checkpoint(self, checkpoint):
+        pass
+
+    def step(self):
+        return {"metric": 4}
+
+    def stop(self):
+        time.sleep(20)
+
+
 class TrialExecutorInsufficientResourcesTest(unittest.TestCase):
     def setUp(self):
         os.environ["TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S"] = "0"
@@ -211,6 +229,49 @@ def testStartFailure(self):
         self.trial_executor.start_trial(trial)
         self.assertEqual(Trial.ERROR, trial.status)

+    def testTrialHangingCleanup(self):
+        register_trainable("hanging", _HangingTrainable)
+        trial = Trial("hanging")
+
+        self.trial_executor._trial_cleanup = _TrialCleanup(1)
+        self.trial_executor._get_next_event_wait = 30
+
+        # Schedule trial PG
+        ev = self.trial_executor.get_next_executor_event(
+            [trial], next_trial_exists=True
+        )
+        assert ev.type == _ExecutorEventType.PG_READY
+
+        # Start trial
+        self.trial_executor.start_trial(trial)
+
+        # Kick off future
+        self.trial_executor.continue_training(trial)
+
+        # Wait for result
+        ev = self.trial_executor.get_next_executor_event(
+            [trial], next_trial_exists=True
+        )
+        assert ev.type == _ExecutorEventType.TRAINING_RESULT
+
+        # Kick off a new training future to have something in-flight
+        self.trial_executor.get_next_executor_event([trial], next_trial_exists=True)
+
+        # Stop trial (time it)
+        start_time = time.time()
+        self.trial_executor.stop_trial(trial)
+        self.trial_executor.on_step_end([trial])
+        time.sleep(2)
+        self.trial_executor.on_step_end([trial])
+        ev = self.trial_executor.get_next_executor_event(
+            [trial], next_trial_exists=True
+        )
+        assert ev.type == _ExecutorEventType.NO_RUNNING_TRIAL_TIMEOUT
+        end_time = time.time() - start_time
+
+        # Should not wait until end of hang
+        assert 0 < end_time < 10
+
     def testPauseResume2(self):
         """Tests that pausing works for trials being processed."""
         trial = Trial("__fake")
Expand Down
2 changes: 1 addition & 1 deletion python/ray/tune/trainable/trainable.py
@@ -1131,7 +1131,7 @@ def cleanup(self):
         If any Ray actors are launched in the Trainable (i.e., with a RLlib
         trainer), be sure to kill the Ray actor process here.

-        You can kill a Ray actor by calling `actor.__ray_terminate__.remote()`
+        You can kill a Ray actor by calling `ray.kill(actor)`
         on the actor.

         .. versionadded:: 0.8.7
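To illustrate the updated docstring, a hedged example of the pattern it recommends: a Trainable that launches its own helper actor and kills it in `cleanup()`. All names here are illustrative:

```python
import ray
from ray import tune

@ray.remote
class Helper:
    def ping(self):
        return "pong"

class ActorTrainable(tune.Trainable):
    def setup(self, config):
        # Launch a helper actor owned by this trial.
        self.helper = Helper.remote()

    def step(self):
        return {"pong": ray.get(self.helper.ping.remote())}

    def cleanup(self):
        # Kill the helper actor so it does not outlive the trial.
        ray.kill(self.helper)
```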