Skip to content

Commit

Permalink
[air/output] Improve leaked mentions of Tune concepts (ray-project#35003
Browse files Browse the repository at this point in the history
)

Ray Tune is the execution backend for Ray Train. This means that sometimes error/warning messages use Tune concepts, that don't make sense in a single-trial run, such as with Ray Train trainers.

This PR improves three such occurrences:

1. The insufficient resources warnings message has been adjusted in the case where only one trial is run
2. Calculation of `max_pending_trials` now uses `search_alg.total_samples` as the minimum, which was an oversight before.
3. On interrupt of a training run, a `Tuner.restore()` message was suggested, but it should be `Trainer.restore()`

Signed-off-by: Kai Fricke <[email protected]>
  • Loading branch information
krfricke authored and architkulkarni committed May 16, 2023
1 parent 4ce6be8 commit 9edf062
Show file tree
Hide file tree
Showing 8 changed files with 143 additions and 69 deletions.
5 changes: 4 additions & 1 deletion python/ray/train/base_trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,10 @@ def fit(self) -> Result:
)
else:
tuner = Tuner(
trainable=trainable, param_space=param_space, run_config=self.run_config
trainable=trainable,
param_space=param_space,
run_config=self.run_config,
_trainer_api=True,
)

experiment_path = Path(
Expand Down
130 changes: 80 additions & 50 deletions python/ray/tune/execution/insufficient_resources_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import os
import ray
import time
from typing import Dict
from typing import Dict, Optional, Tuple

from ray.tune.execution.cluster_info import _is_ray_cluster
from ray.tune.experiment import Trial
Expand All @@ -18,10 +18,10 @@ def _get_cluster_resources_no_autoscaler() -> Dict:
return ray.cluster_resources()


def _get_trial_cpu_and_gpu(trial: Trial) -> Dict:
def _get_trial_cpu_and_gpu(trial: Trial) -> Tuple[int, int]:
cpu = trial.placement_group_factory.required_resources.get("CPU", 0)
gpu = trial.placement_group_factory.required_resources.get("GPU", 0)
return {"CPU": cpu, "GPU": gpu}
return cpu, gpu


def _can_fulfill_no_autoscaler(trial: Trial) -> bool:
Expand All @@ -30,11 +30,11 @@ def _can_fulfill_no_autoscaler(trial: Trial) -> bool:
For no autoscaler case.
"""
assert trial.status == Trial.PENDING
trial_cpu_gpu = _get_trial_cpu_and_gpu(trial)
asked_cpus, asked_gpus = _get_trial_cpu_and_gpu(trial)

return trial_cpu_gpu["CPU"] <= _get_cluster_resources_no_autoscaler().get(
return asked_cpus <= _get_cluster_resources_no_autoscaler().get(
"CPU", 0
) and trial_cpu_gpu["GPU"] <= _get_cluster_resources_no_autoscaler().get("GPU", 0)
) and asked_gpus <= _get_cluster_resources_no_autoscaler().get("GPU", 0)


@lru_cache()
Expand All @@ -52,38 +52,68 @@ def _get_insufficient_resources_warning_threshold() -> float:
return float(os.environ.get("TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S", "60"))


MSG_TRAIN_START = (
"Training has not started in the last {wait_time:.0f} seconds. "
"This could be due to the cluster not having enough resources available. "
)
MSG_TRAIN_INSUFFICIENT = (
"You asked for {asked_cpus} CPUs and {asked_gpus} GPUs, but the cluster only "
"has {cluster_cpus} CPUs and {cluster_gpus} GPUs available. "
)
MSG_TRAIN_END = (
"Stop the training and adjust the required resources (e.g. via the "
"`ScalingConfig` or `resources_per_trial`, or `num_workers` for rllib), "
"or add more resources to your cluster."
)

MSG_TUNE_START = (
"No trial is running and no new trial has been started within "
"the last {wait_time:.0f} seconds. "
"This could be due to the cluster not having enough resources available. "
)
MSG_TUNE_INSUFFICIENT = (
"You asked for {asked_cpus} CPUs and {asked_gpus} GPUs per trial, "
"but the cluster only has {cluster_cpus} CPUs and {cluster_gpus} GPUs available. "
)
MSG_TUNE_END = (
"Stop the tuning and adjust the required resources (e.g. via the "
"`ScalingConfig` or `resources_per_trial`, or `num_workers` for rllib), "
"or add more resources to your cluster."
)


# TODO(xwjiang): Consider having a help page with more detailed instructions.
@lru_cache()
def _get_insufficient_resources_warning_msg() -> str:
msg = (
f"No trial is running and no new trial has been started within"
f" at least the last "
f"{_get_insufficient_resources_warning_threshold()} seconds. "
f"This could be due to the cluster not having enough "
f"resources available to start the next trial. "
f"Stop the tuning job and adjust the resources requested per trial "
f"(possibly via `resources_per_trial` or via `num_workers` for rllib) "
f"and/or add more resources to your Ray runtime."
)
if _is_ray_cluster():
return "Ignore this message if the cluster is autoscaling. " + msg
def _get_insufficient_resources_warning_msg(
for_train: bool = False, trial: Optional[Trial] = None
) -> str:
msg = "Ignore this message if the cluster is autoscaling. "

if for_train:
start = MSG_TRAIN_START
insufficient = MSG_TRAIN_INSUFFICIENT
end = MSG_TRAIN_END
else:
return msg
start = MSG_TUNE_START
insufficient = MSG_TUNE_INSUFFICIENT
end = MSG_TUNE_END

msg += start.format(wait_time=_get_insufficient_resources_warning_threshold())

if trial:
asked_cpus, asked_gpus = _get_trial_cpu_and_gpu(trial)
cluster_resources = _get_cluster_resources_no_autoscaler()

msg += insufficient.format(
asked_cpus=asked_cpus,
asked_gpus=asked_gpus,
cluster_cpus=cluster_resources.get("CPU", 0),
cluster_gpus=cluster_resources.get("GPU", 0),
)

msg += end

# A beefed up version when Tune Error is raised.
def _get_insufficient_resources_error_msg(trial: Trial) -> str:
trial_cpu_gpu = _get_trial_cpu_and_gpu(trial)
return (
f"Ignore this message if the cluster is autoscaling. "
f"You asked for {trial_cpu_gpu['CPU']} cpu and "
f"{trial_cpu_gpu['GPU']} gpu per trial, but the cluster only has "
f"{_get_cluster_resources_no_autoscaler().get('CPU', 0)} cpu and "
f"{_get_cluster_resources_no_autoscaler().get('GPU', 0)} gpu. "
f"Stop the tuning job and adjust the resources requested per trial "
f"(possibly via `resources_per_trial` or via `num_workers` for rllib) "
f"and/or add more resources to your Ray runtime."
)
return msg


class _InsufficientResourcesManager:
Expand All @@ -94,10 +124,11 @@ class _InsufficientResourcesManager:
act upon.
"""

def __init__(self):
def __init__(self, for_train: bool = False):
# The information tracked across the life time of Tune loop.
self._no_running_trials_since = -1
self._last_trial_num = -1
self._for_train = for_train

def on_no_available_trials(self, all_trials):
"""Tracks information across the life of Tune loop and makes guesses
Expand All @@ -115,22 +146,21 @@ def on_no_available_trials(self, all_trials):
time.monotonic() - self._no_running_trials_since
> _get_insufficient_resources_warning_threshold()
):
if not _is_ray_cluster(): # autoscaler not enabled
# If any of the pending trial cannot be fulfilled,
# that's a good enough hint of trial resources not enough.
for trial in all_trials:
if (
trial.status is Trial.PENDING
and not _can_fulfill_no_autoscaler(trial)
):
# TODO(xwjiang):
# Raise an Error once #18608 is resolved.
logger.warning(_get_insufficient_resources_error_msg(trial))
break
else:
# TODO(xwjiang): #17799.
# Output a more helpful msg for autoscaler.
logger.warning(_get_insufficient_resources_warning_msg())
can_fulfill_any = any(
trial.status == Trial.PENDING and _can_fulfill_no_autoscaler(trial)
for trial in all_trials
)

if can_fulfill_any:
# If one trial can be fulfilled, it will be fulfilled eventually
self._no_running_trials_since = -1
return

# Otherwise, can fulfill none
msg = _get_insufficient_resources_warning_msg(
for_train=self._for_train, trial=all_trials[0]
)
logger.warning(msg)
self._no_running_trials_since = time.monotonic()
else:
self._no_running_trials_since = -1
Expand Down
20 changes: 18 additions & 2 deletions python/ray/tune/execution/trial_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,15 @@ def __init__(
callbacks: Optional[List[Callback]] = None,
metric: Optional[str] = None,
trial_checkpoint_config: Optional[CheckpointConfig] = None,
_trainer_api: bool = False,
):
self._search_alg = search_alg or BasicVariantGenerator()
self._placeholder_resolvers = placeholder_resolvers
self._scheduler_alg = scheduler or FIFOScheduler()
self._callbacks = CallbackList(callbacks or [])
self._insufficient_resources_manager = _InsufficientResourcesManager()
self._insufficient_resources_manager = _InsufficientResourcesManager(
for_train=_trainer_api
)
self._pending_trial_queue_times = {}

self._max_pending_trials = _get_max_pending_trials(self._search_alg)
Expand Down Expand Up @@ -519,6 +522,11 @@ def resume(
trial_to_add.status = Trial.TERMINATED
self.add_trial(trial_to_add)

def update_max_pending_trials(self, max_pending_trials: Optional[int] = None):
self._max_pending_trials = max_pending_trials or _get_max_pending_trials(
self._search_alg
)

def update_pending_trial_resources(
self, resources: Union[dict, PlacementGroupFactory]
):
Expand Down Expand Up @@ -1252,6 +1260,7 @@ def __init__(
callbacks: Optional[List[Callback]] = None,
metric: Optional[str] = None,
trial_checkpoint_config: Optional[CheckpointConfig] = None,
_trainer_api: bool = False,
# Deprecated
local_checkpoint_dir: Optional[str] = None,
):
Expand Down Expand Up @@ -1287,6 +1296,7 @@ def __init__(
callbacks=callbacks,
metric=metric,
trial_checkpoint_config=trial_checkpoint_config,
_trainer_api=_trainer_api,
)

self.trial_executor.setup(
Expand All @@ -1308,6 +1318,10 @@ def _wrapped(self):
executor_whitelist_attr={"has_resources_for_trial", "pause_trial", "save"},
)

def update_max_pending_trials(self, max_pending_trials: Optional[int] = None):
super().update_max_pending_trials(max_pending_trials=max_pending_trials)
self.trial_executor._max_staged_actors = self._max_pending_trials

def _used_resources_string(self) -> str:
return self.trial_executor.debug_string()

Expand Down Expand Up @@ -1604,7 +1618,9 @@ def _get_max_pending_trials(search_alg: SearchAlgorithm) -> int:
# Use a minimum of 16 to trigger fast autoscaling
# Scale up to at most the number of available cluster CPUs
cluster_cpus = ray.cluster_resources().get("CPU", 1.0)
max_pending_trials = max(16, int(cluster_cpus * 1.1))
max_pending_trials = min(
max(search_alg.total_samples, 16), max(16, int(cluster_cpus * 1.1))
)

if max_pending_trials > 128:
logger.warning(
Expand Down
2 changes: 2 additions & 0 deletions python/ray/tune/execution/tune_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ def __init__(
chdir_to_trial_dir: bool = False,
reuse_actors: bool = False,
resource_manager_factory: Optional[Callable[[], ResourceManager]] = None,
_trainer_api: bool = False,
):
if resource_manager_factory:
self._resource_manager = resource_manager_factory()
Expand Down Expand Up @@ -144,6 +145,7 @@ def __init__(
callbacks=callbacks,
metric=metric,
trial_checkpoint_config=trial_checkpoint_config,
_trainer_api=_trainer_api,
)

def _wrapped(self):
Expand Down
3 changes: 3 additions & 0 deletions python/ray/tune/impl/tuner_internal.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ def __init__(
tune_config: Optional[TuneConfig] = None,
run_config: Optional[RunConfig] = None,
_tuner_kwargs: Optional[Dict] = None,
_trainer_api: bool = False,
):
from ray.train.trainer import BaseTrainer

Expand All @@ -102,6 +103,7 @@ def __init__(

self._tune_config = tune_config or TuneConfig()
self._run_config = run_config or RunConfig()
self._trainer_api = _trainer_api

# Restore from Tuner checkpoint.
if restore_path:
Expand Down Expand Up @@ -681,6 +683,7 @@ def _get_tune_run_arguments(self, trainable: TrainableType) -> Dict[str, Any]:
trial_dirname_creator=self._tune_config.trial_dirname_creator,
chdir_to_trial_dir=self._tune_config.chdir_to_trial_dir,
_tuner_api=True,
_trainer_api=self._trainer_api,
)

def _fit_internal(
Expand Down
15 changes: 8 additions & 7 deletions python/ray/tune/tests/test_ray_trial_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,14 @@ def train(config):
)
msg = (
"Ignore this message if the cluster is autoscaling. "
"You asked for 5.0 cpu and 3.0 gpu per trial, "
"but the cluster only has 4.0 cpu and 2.0 gpu. "
"Stop the tuning job and "
"adjust the resources requested per trial "
"(possibly via `resources_per_trial` "
"or via `num_workers` for rllib) "
"and/or add more resources to your Ray runtime."
"No trial is running and no new trial has been started "
"within the last 0 seconds. This could be due to the cluster not having "
"enough resources available. You asked for 5.0 CPUs and 3.0 GPUs per "
"trial, but the cluster only has 4.0 CPUs and 2.0 GPUs available. "
"Stop the tuning and adjust the required resources "
"(e.g. via the `ScalingConfig` or `resources_per_trial`, "
"or `num_workers` for rllib), "
"or add more resources to your cluster."
)
mocked_warn.assert_called_once_with(msg)

Expand Down
Loading

0 comments on commit 9edf062

Please sign in to comment.