
[air/output] Improve leaked mentions of Tune concepts #35003

Merged
12 commits merged on May 8, 2023
5 changes: 4 additions & 1 deletion python/ray/train/base_trainer.py
@@ -578,7 +578,10 @@ def fit(self) -> Result:
)
else:
tuner = Tuner(
trainable=trainable, param_space=param_space, run_config=self.run_config
trainable=trainable,
param_space=param_space,
run_config=self.run_config,
_trainer_api=True,
Member:

@justinvyu can we have a chat about adding parameters like `_trainer_api` in `Tuner`? :)

Contributor Author:

I think we generally need a good idea of how to pass this information.

To me it feels like there should be some type of context. We have different requirements for different ML jobs. Even rllib vs. Train have different requirements (e.g. default metrics to show), and maybe even rllib's individual algorithms do.

We don't have that story yet, so in order to unblock this work, I think we can go ahead with the private flags. But yes, we should resolve this (also for telemetry).
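For illustration, a minimal sketch of what such a shared runtime context could look like. Everything below is hypothetical (`AIREntrypointContext`, `entrypoint_context`, and `_is_train_run` do not exist in Ray); the PR instead threads the private `_trainer_api` flag through the components.

```python
# Hypothetical sketch only: none of these names exist in Ray. The PR threads a
# private `_trainer_api` flag instead of using a context object like this.
import contextlib
import contextvars
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class AIREntrypointContext:
    """Describes which user-facing API started the run (Trainer, Tuner, RLlib, ...)."""

    entrypoint: str = "tuner"              # e.g. "trainer", "tuner", "rllib"
    default_metrics: Tuple[str, ...] = ()  # per-entrypoint output defaults


_entrypoint_ctx: contextvars.ContextVar = contextvars.ContextVar(
    "air_entrypoint_context", default=AIREntrypointContext()
)


@contextlib.contextmanager
def entrypoint_context(ctx: AIREntrypointContext):
    """Set the entrypoint context for the duration of a fit()/tune() call."""
    token = _entrypoint_ctx.set(ctx)
    try:
        yield ctx
    finally:
        _entrypoint_ctx.reset(token)


def _is_train_run() -> bool:
    """Output code could ask the context instead of receiving a constructor flag."""
    return _entrypoint_ctx.get().entrypoint == "trainer"
```

Hypothetically, `BaseTrainer.fit()` would wrap its internal `Tuner` call in `entrypoint_context(AIREntrypointContext(entrypoint="trainer"))`, and output code anywhere below could query the context instead of receiving a new constructor parameter.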

)

experiment_path = Path(
130 changes: 80 additions & 50 deletions python/ray/tune/execution/insufficient_resources_manager.py
@@ -3,7 +3,7 @@
import os
import ray
import time
from typing import Dict
from typing import Dict, Optional, Tuple

from ray.tune.execution.cluster_info import _is_ray_cluster
from ray.tune.experiment import Trial
@@ -18,10 +18,10 @@ def _get_cluster_resources_no_autoscaler() -> Dict:
return ray.cluster_resources()


def _get_trial_cpu_and_gpu(trial: Trial) -> Dict:
def _get_trial_cpu_and_gpu(trial: Trial) -> Tuple[int, int]:
cpu = trial.placement_group_factory.required_resources.get("CPU", 0)
gpu = trial.placement_group_factory.required_resources.get("GPU", 0)
return {"CPU": cpu, "GPU": gpu}
return cpu, gpu


def _can_fulfill_no_autoscaler(trial: Trial) -> bool:
@@ -30,11 +30,11 @@ def _can_fulfill_no_autoscaler(trial: Trial) -> bool:
For no autoscaler case.
"""
assert trial.status == Trial.PENDING
trial_cpu_gpu = _get_trial_cpu_and_gpu(trial)
asked_cpus, asked_gpus = _get_trial_cpu_and_gpu(trial)

return trial_cpu_gpu["CPU"] <= _get_cluster_resources_no_autoscaler().get(
return asked_cpus <= _get_cluster_resources_no_autoscaler().get(
"CPU", 0
) and trial_cpu_gpu["GPU"] <= _get_cluster_resources_no_autoscaler().get("GPU", 0)
) and asked_gpus <= _get_cluster_resources_no_autoscaler().get("GPU", 0)


@lru_cache()
@@ -52,38 +52,68 @@ def _get_insufficient_resources_warning_threshold() -> float:
return float(os.environ.get("TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S", "60"))


MSG_TRAIN_START = (
"Training has not started in the last {wait_time:.0f} seconds. "
"This could be due to the cluster not having enough resources available. "
)
MSG_TRAIN_INSUFFICIENT = (
"You asked for {asked_cpus} CPUs and {asked_gpus} GPUs, but the cluster only "
"has {cluster_cpus} CPUs and {cluster_gpus} GPUs available. "
)
MSG_TRAIN_END = (
"Stop the training and adjust the required resources (e.g. via the "
"`ScalingConfig` or `resources_per_trial`, or `num_workers` for rllib), "
"or add more resources to your cluster."
)

MSG_TUNE_START = (
"No trial is running and no new trial has been started within "
"the last {wait_time:.0f} seconds. "
"This could be due to the cluster not having enough resources available. "
)
MSG_TUNE_INSUFFICIENT = (
"You asked for {asked_cpus} CPUs and {asked_gpus} GPUs per trial, "
"but the cluster only has {cluster_cpus} CPUs and {cluster_gpus} GPUs available. "
)
MSG_TUNE_END = (
"Stop the tuning and adjust the required resources (e.g. via the "
"`ScalingConfig` or `resources_per_trial`, or `num_workers` for rllib), "
"or add more resources to your cluster."
)


# TODO(xwjiang): Consider having a help page with more detailed instructions.
@lru_cache()
def _get_insufficient_resources_warning_msg() -> str:
msg = (
f"No trial is running and no new trial has been started within"
f" at least the last "
f"{_get_insufficient_resources_warning_threshold()} seconds. "
f"This could be due to the cluster not having enough "
f"resources available to start the next trial. "
f"Stop the tuning job and adjust the resources requested per trial "
f"(possibly via `resources_per_trial` or via `num_workers` for rllib) "
f"and/or add more resources to your Ray runtime."
)
if _is_ray_cluster():
return "Ignore this message if the cluster is autoscaling. " + msg
def _get_insufficient_resources_warning_msg(
for_train: bool = False, trial: Optional[Trial] = None
) -> str:
msg = "Ignore this message if the cluster is autoscaling. "

if for_train:
start = MSG_TRAIN_START
insufficient = MSG_TRAIN_INSUFFICIENT
end = MSG_TRAIN_END
else:
return msg
start = MSG_TUNE_START
insufficient = MSG_TUNE_INSUFFICIENT
end = MSG_TUNE_END

msg += start.format(wait_time=_get_insufficient_resources_warning_threshold())

if trial:
asked_cpus, asked_gpus = _get_trial_cpu_and_gpu(trial)
cluster_resources = _get_cluster_resources_no_autoscaler()

msg += insufficient.format(
asked_cpus=asked_cpus,
asked_gpus=asked_gpus,
cluster_cpus=cluster_resources.get("CPU", 0),
cluster_gpus=cluster_resources.get("GPU", 0),
)

msg += end

# A beefed up version when Tune Error is raised.
def _get_insufficient_resources_error_msg(trial: Trial) -> str:
trial_cpu_gpu = _get_trial_cpu_and_gpu(trial)
return (
f"Ignore this message if the cluster is autoscaling. "
f"You asked for {trial_cpu_gpu['CPU']} cpu and "
f"{trial_cpu_gpu['GPU']} gpu per trial, but the cluster only has "
f"{_get_cluster_resources_no_autoscaler().get('CPU', 0)} cpu and "
f"{_get_cluster_resources_no_autoscaler().get('GPU', 0)} gpu. "
f"Stop the tuning job and adjust the resources requested per trial "
f"(possibly via `resources_per_trial` or via `num_workers` for rllib) "
f"and/or add more resources to your Ray runtime."
)
return msg


class _InsufficientResourcesManager:
@@ -94,10 +94,11 @@ class _InsufficientResourcesManager:
act upon.
"""

def __init__(self):
def __init__(self, for_train: bool = False):
# The information tracked across the life time of Tune loop.
self._no_running_trials_since = -1
self._last_trial_num = -1
self._for_train = for_train

def on_no_available_trials(self, all_trials):
"""Tracks information across the life of Tune loop and makes guesses
@@ -115,22 +115,21 @@ def on_no_available_trials(self, all_trials):
time.monotonic() - self._no_running_trials_since
> _get_insufficient_resources_warning_threshold()
):
if not _is_ray_cluster(): # autoscaler not enabled
# If any of the pending trial cannot be fulfilled,
# that's a good enough hint of trial resources not enough.
for trial in all_trials:
if (
trial.status is Trial.PENDING
and not _can_fulfill_no_autoscaler(trial)
):
# TODO(xwjiang):
# Raise an Error once #18608 is resolved.
logger.warning(_get_insufficient_resources_error_msg(trial))
break
else:
# TODO(xwjiang): #17799.
# Output a more helpful msg for autoscaler.
logger.warning(_get_insufficient_resources_warning_msg())
can_fulfill_any = any(
trial.status == Trial.PENDING and _can_fulfill_no_autoscaler(trial)
for trial in all_trials
)

if can_fulfill_any:
# If one trial can be fulfilled, it will be fulfilled eventually
self._no_running_trials_since = -1
return

# Otherwise, can fulfill none
msg = _get_insufficient_resources_warning_msg(
for_train=self._for_train, trial=all_trials[0]
)
logger.warning(msg)
self._no_running_trials_since = time.monotonic()
else:
self._no_running_trials_since = -1
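As a quick sanity check of how the new templates compose, a short snippet (assumes a Ray installation that includes this change; the resource numbers are made up):

```python
# Compose the Tune-flavored warning the way the updated
# _get_insufficient_resources_warning_msg() does, with made-up numbers.
from ray.tune.execution.insufficient_resources_manager import (
    MSG_TUNE_END,
    MSG_TUNE_INSUFFICIENT,
    MSG_TUNE_START,
)

msg = "Ignore this message if the cluster is autoscaling. "
msg += MSG_TUNE_START.format(wait_time=60)
msg += MSG_TUNE_INSUFFICIENT.format(
    asked_cpus=5.0, asked_gpus=3.0, cluster_cpus=4.0, cluster_gpus=2.0
)
msg += MSG_TUNE_END
print(msg)
```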
20 changes: 18 additions & 2 deletions python/ray/tune/execution/trial_runner.py
@@ -134,12 +134,15 @@ def __init__(
callbacks: Optional[List[Callback]] = None,
metric: Optional[str] = None,
trial_checkpoint_config: Optional[CheckpointConfig] = None,
_trainer_api: bool = False,
Member:

Could you add an explanation of what this is?
Can we compute whether there is only a single Trial ourselves?
I feel like it would be nice to be able to avoid such a trainer parameter on `Tuner` init.

Contributor Author:

I've tried this before, and long story short, it's not very straightforward: we need some of the information pretty early, but the number of trials is only calculated later. It can also lead to confusing situations - e.g. it's totally valid to use `Tuner(trainable, tune_config=TuneConfig(num_samples=1))` for iteration and still expect to see a `Tuner.restore()` hint at the end. Tracking which object actually was the entrypoint saves us from those problems.
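For reference, the single-sample case mentioned above looks roughly like this (sketch; `my_trainable` is a placeholder user-defined function):

```python
from ray.tune import TuneConfig, Tuner


def my_trainable(config):
    # Placeholder user trainable; returning a dict reports it as the final result.
    return {"loss": config["x"] ** 2}


# A perfectly valid single-trial Tune run. Counting trials would make it look like
# a Trainer run, yet the user still expects Tuner-style output and a
# Tuner.restore() hint on failure -- hence tracking the entrypoint instead.
tuner = Tuner(
    my_trainable,
    param_space={"x": 1},
    tune_config=TuneConfig(num_samples=1),
)
results = tuner.fit()
```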

Member:

OK, I just feel like passing such a generic-sounding parameter all the way through so many components like `Tuner`, `BackendExecutor`, etc. just to be able to show the right output message is kind of too heavy.
We should probably have a way out of this if we want to live with it for now. @justinvyu
Another idea is maybe passing this bit through a `LoggingConfig` or something? We probably need such a config class anyway.

Contributor Author (@krfricke, May 5, 2023):

We're doing the same at the moment with `_tuner_api`, so in that sense it's consistent :-D

This is not a logging configuration in my opinion. Users should not "configure" which output/error messages they want to see. It's more of a runtime context.

Ray Core has a runtime context object; I think we just need something similar for Ray AIR.
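For reference, the Ray Core object mentioned here is `ray.get_runtime_context()`; the idea would presumably be an AIR-level analogue. The snippet below only shows the existing Core API:

```python
import ray

ray.init()

# Ray Core's runtime context: per-worker/job information that any code can query
# without the caller threading flags through constructors.
ctx = ray.get_runtime_context()
print(ctx.get_job_id())
print(ctx.get_node_id())
```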

):
self._search_alg = search_alg or BasicVariantGenerator()
self._placeholder_resolvers = placeholder_resolvers
self._scheduler_alg = scheduler or FIFOScheduler()
self._callbacks = CallbackList(callbacks or [])
self._insufficient_resources_manager = _InsufficientResourcesManager()
self._insufficient_resources_manager = _InsufficientResourcesManager(
for_train=_trainer_api
)
self._pending_trial_queue_times = {}

self._max_pending_trials = _get_max_pending_trials(self._search_alg)
@@ -519,6 +522,11 @@ def resume(
trial_to_add.status = Trial.TERMINATED
self.add_trial(trial_to_add)

def update_max_pending_trials(self, max_pending_trials: Optional[int] = None):
self._max_pending_trials = max_pending_trials or _get_max_pending_trials(
self._search_alg
)

def update_pending_trial_resources(
self, resources: Union[dict, PlacementGroupFactory]
):
@@ -1252,6 +1260,7 @@ def __init__(
callbacks: Optional[List[Callback]] = None,
metric: Optional[str] = None,
trial_checkpoint_config: Optional[CheckpointConfig] = None,
_trainer_api: bool = False,
# Deprecated
local_checkpoint_dir: Optional[str] = None,
):
@@ -1287,6 +1296,7 @@ def __init__(
callbacks=callbacks,
metric=metric,
trial_checkpoint_config=trial_checkpoint_config,
_trainer_api=_trainer_api,
)

self.trial_executor.setup(
@@ -1308,6 +1318,10 @@ def _wrapped(self):
executor_whitelist_attr={"has_resources_for_trial", "pause_trial", "save"},
)

def update_max_pending_trials(self, max_pending_trials: Optional[int] = None):
super().update_max_pending_trials(max_pending_trials=max_pending_trials)
self.trial_executor._max_staged_actors = self._max_pending_trials

def _used_resources_string(self) -> str:
return self.trial_executor.debug_string()

@@ -1604,7 +1618,9 @@ def _get_max_pending_trials(search_alg: SearchAlgorithm) -> int:
# Use a minimum of 16 to trigger fast autoscaling
# Scale up to at most the number of available cluster CPUs
cluster_cpus = ray.cluster_resources().get("CPU", 1.0)
max_pending_trials = max(16, int(cluster_cpus * 1.1))
max_pending_trials = min(
max(search_alg.total_samples, 16), max(16, int(cluster_cpus * 1.1))
)

if max_pending_trials > 128:
logger.warning(
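To make the new clamping of `max_pending_trials` above concrete, a standalone recalculation of the formula with made-up inputs:

```python
# Mirrors the updated _get_max_pending_trials() expression, for illustration only.
def max_pending_trials(total_samples: int, cluster_cpus: float) -> int:
    # Keep a floor of 16 (for fast autoscaling), cap at ~1.1x the cluster CPUs, and
    # never exceed the number of samples the search algorithm will ever produce
    # (unless that number is below the 16-trial floor).
    return min(max(total_samples, 16), max(16, int(cluster_cpus * 1.1)))


print(max_pending_trials(total_samples=4, cluster_cpus=64.0))     # 16
print(max_pending_trials(total_samples=1000, cluster_cpus=64.0))  # 70
print(max_pending_trials(total_samples=1000, cluster_cpus=8.0))   # 16
```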
2 changes: 2 additions & 0 deletions python/ray/tune/execution/tune_controller.py
@@ -66,6 +66,7 @@ def __init__(
chdir_to_trial_dir: bool = False,
reuse_actors: bool = False,
resource_manager_factory: Optional[Callable[[], ResourceManager]] = None,
_trainer_api: bool = False,
):
if resource_manager_factory:
self._resource_manager = resource_manager_factory()
@@ -144,6 +145,7 @@ def __init__(
callbacks=callbacks,
metric=metric,
trial_checkpoint_config=trial_checkpoint_config,
_trainer_api=_trainer_api,
)

def _wrapped(self):
3 changes: 3 additions & 0 deletions python/ray/tune/impl/tuner_internal.py
@@ -90,6 +90,7 @@ def __init__(
tune_config: Optional[TuneConfig] = None,
run_config: Optional[RunConfig] = None,
_tuner_kwargs: Optional[Dict] = None,
_trainer_api: bool = False,
):
from ray.train.trainer import BaseTrainer

@@ -102,6 +103,7 @@

self._tune_config = tune_config or TuneConfig()
self._run_config = run_config or RunConfig()
self._trainer_api = _trainer_api

# Restore from Tuner checkpoint.
if restore_path:
@@ -681,6 +683,7 @@ def _get_tune_run_arguments(self, trainable: TrainableType) -> Dict[str, Any]:
trial_dirname_creator=self._tune_config.trial_dirname_creator,
chdir_to_trial_dir=self._tune_config.chdir_to_trial_dir,
_tuner_api=True,
_trainer_api=self._trainer_api,
)

def _fit_internal(
15 changes: 8 additions & 7 deletions python/ray/tune/tests/test_ray_trial_executor.py
@@ -99,13 +99,14 @@ def train(config):
)
msg = (
"Ignore this message if the cluster is autoscaling. "
"You asked for 5.0 cpu and 3.0 gpu per trial, "
"but the cluster only has 4.0 cpu and 2.0 gpu. "
"Stop the tuning job and "
"adjust the resources requested per trial "
"(possibly via `resources_per_trial` "
"or via `num_workers` for rllib) "
"and/or add more resources to your Ray runtime."
"No trial is running and no new trial has been started "
"within the last 0 seconds. This could be due to the cluster not having "
"enough resources available. You asked for 5.0 CPUs and 3.0 GPUs per "
"trial, but the cluster only has 4.0 CPUs and 2.0 GPUs available. "
"Stop the tuning and adjust the required resources "
"(e.g. via the `ScalingConfig` or `resources_per_trial`, "
"or `num_workers` for rllib), "
"or add more resources to your cluster."
)
mocked_warn.assert_called_once_with(msg)
