From 22aaf47fda0ccafded6fc146a5e1a8d7d4722ef0 Mon Sep 17 00:00:00 2001
From: Eric Liang
Date: Fri, 3 Jun 2022 14:53:40 -0700
Subject: [PATCH] [tune] Better error message for Tune nested tasks / actors
 (#25241)

This PR uses a task/actor launch hook to generate better error messages
for nested Tune tasks/actors when no extra resources are reserved for
them. The idea is that the Tune trial runner actor sets a hook prior to
executing the user code. If the user code launches a task and the
trial's placement group cannot possibly fit it, we raise a TuneError
immediately to warn the user.
---
 python/ray/actor.py                    |   8 ++
 python/ray/data/read_api.py            |   2 +-
 python/ray/remote_function.py          |   7 ++
 python/ray/tune/BUILD                  |   8 ++
 python/ray/tune/session.py             |  83 ++++++++++++++
 python/ray/tune/tests/test_warnings.py | 143 +++++++++++++++++++++++++
 python/ray/util/__init__.py            |   2 +
 python/ray/util/placement_group.py     |  44 ++++----
 8 files changed, 275 insertions(+), 22 deletions(-)
 create mode 100644 python/ray/tune/tests/test_warnings.py

diff --git a/python/ray/actor.py b/python/ray/actor.py
index a6f6c6da57ee..fd2bfacdaaa3 100644
--- a/python/ray/actor.py
+++ b/python/ray/actor.py
@@ -36,6 +36,9 @@
 
 logger = logging.getLogger(__name__)
 
+# Hook to call with (actor, resources, strategy) on each local actor creation.
+_actor_launch_hook = None
+
 
 @PublicAPI
 @client_mode_hook(auto_init=False)
@@ -909,6 +912,11 @@ def _remote(self, args=None, kwargs=None, **actor_options):
             scheduling_strategy=scheduling_strategy,
         )
 
+        if _actor_launch_hook:
+            _actor_launch_hook(
+                meta.actor_creation_function_descriptor, resources, scheduling_strategy
+            )
+
         actor_handle = ActorHandle(
             meta.language,
             actor_id,
diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py
index 15d9b08d0a36..3be2a3b15aeb 100644
--- a/python/ray/data/read_api.py
+++ b/python/ray/data/read_api.py
@@ -248,7 +248,7 @@ def read_datasource(
     )
 
     if len(read_tasks) < parallelism and (
-        len(read_tasks) < ray.available_resources().get("CPU", parallelism) // 2
+        len(read_tasks) < ray.available_resources().get("CPU", 1) // 2
     ):
         logger.warning(
             "The number of blocks in this dataset ({}) limits its parallelism to {} "
diff --git a/python/ray/remote_function.py b/python/ray/remote_function.py
index 2e4fc2a7d311..a3be68c55573 100644
--- a/python/ray/remote_function.py
+++ b/python/ray/remote_function.py
@@ -22,6 +22,10 @@
 
 
 logger = logging.getLogger(__name__)
 
+# Hook to call with (fn, resources, strategy) on each local task submission.
+_task_launch_hook = None
+
+
 class RemoteFunction:
     """A remote function.
@@ -287,6 +291,9 @@ def _remote(self, args=None, kwargs=None, **task_options):
             serialize=True,
         )
 
+        if _task_launch_hook:
+            _task_launch_hook(self._function_descriptor, resources, scheduling_strategy)
+
         def invocation(args, kwargs):
             if self._is_cross_language:
                 list_args = cross_language.format_args(worker, args, kwargs)
diff --git a/python/ray/tune/BUILD b/python/ray/tune/BUILD
index ba1a1e37300f..b7f006741584 100644
--- a/python/ray/tune/BUILD
+++ b/python/ray/tune/BUILD
@@ -240,6 +240,14 @@ py_test(
     tags = ["team:ml", "exclusive", "tests_dir_R"],
 )
 
+py_test(
+    name = "test_warnings",
+    size = "medium",
+    srcs = ["tests/test_warnings.py"],
+    deps = [":tune_lib"],
+    tags = ["team:ml", "exclusive", "tests_dir_W"],
+)
+
 py_test(
     name = "test_sample",
     size = "large",
diff --git a/python/ray/tune/session.py b/python/ray/tune/session.py
index 84faef2363f4..e2d774d6ae91 100644
--- a/python/ray/tune/session.py
+++ b/python/ray/tune/session.py
@@ -3,9 +3,17 @@
 import os
 import logging
 import traceback
+from typing import Dict, Optional, Set
 
+import ray
 from ray.util.debug import log_once
 from ray.util.annotations import PublicAPI, DeveloperAPI
+from ray.util.placement_group import _valid_resource_shape
+from ray.util.scheduling_strategies import (
+    SchedulingStrategyT,
+    PlacementGroupSchedulingStrategy,
+)
+from ray.tune.error import TuneError
 
 logger = logging.getLogger(__name__)
 
@@ -67,9 +75,84 @@ def init(reporter, ignore_reinit_error=True):
             "Most session commands will have no effect."
         )
 
+    # Set up hooks for generating placement group resource deadlock warnings.
+    from ray import actor, remote_function
+
+    if "TUNE_DISABLE_RESOURCE_CHECKS" not in os.environ:
+        actor._actor_launch_hook = tune_task_and_actor_launch_hook
+        remote_function._task_launch_hook = tune_task_and_actor_launch_hook
+
     _session = reporter
 
 
+# Cache of resource dicts that have been checked by the launch hook already.
+_checked_resources: Set[frozenset] = set()
+
+
+def tune_task_and_actor_launch_hook(
+    fn, resources: Dict[str, float], strategy: Optional[SchedulingStrategyT]
+):
+    """Launch hook to catch nested tasks that can't fit in the placement group.
+
+    This gives users a nice warning in case they launch a nested task in a Tune trial
+    without reserving resources in the trial placement group to fit it.
+    """
+
+    # Already checked, skip for performance reasons.
+    key = frozenset({(k, v) for k, v in resources.items() if v > 0})
+    if not key or key in _checked_resources:
+        return
+
+    # No need to check if placement group is None.
+    if (
+        not isinstance(strategy, PlacementGroupSchedulingStrategy)
+        or strategy.placement_group is None
+    ):
+        return
+
+    # Check if the resource request is targeting the current placement group.
+    cur_pg = ray.util.get_current_placement_group()
+    if not cur_pg or strategy.placement_group.id != cur_pg.id:
+        return
+
+    _checked_resources.add(key)
+
+    # Get the bundles that nested requests may use in the trial's placement group.
+    pgf = get_trial_resources()
+
+    if pgf.head_bundle_is_empty:
+        available_bundles = cur_pg.bundle_specs[0:]
+    else:
+        available_bundles = cur_pg.bundle_specs[1:]
+
+    # Check if the request can be fulfilled by the current placement group.
+    if _valid_resource_shape(resources, available_bundles):
+        return
+
+    if fn.class_name:
+        submitted = "actor"
+        name = fn.module_name + "." + fn.class_name + "." + fn.function_name
+    else:
+        submitted = "task"
+        name = fn.module_name + "." + fn.function_name
+
+    # Normalize the resource spec so it looks the same as the placement group bundle.
+    main_resources = cur_pg.bundle_specs[0]
+    resources = {k: float(v) for k, v in resources.items() if v > 0}
+
+    raise TuneError(
+        f"No trial resources are available for launching the {submitted} `{name}`. "
+        "To resolve this, specify the Tune option:\n\n"
+        "> resources_per_trial=tune.PlacementGroupFactory(\n"
+        f">   [{main_resources}] + [{resources}] * N\n"
+        "> )\n\n"
+        f"Where `N` is the number of slots to reserve for trial {submitted}s. "
+        "If you are using a Ray training library, there might be a utility function "
+        "to set this automatically for you. For more information, refer to "
+        "https://docs.ray.io/en/latest/tune/tutorials/tune-resources.html"
+    )
+
+
 def shutdown():
     """Cleans up the trial and removes it from the global context."""
diff --git a/python/ray/tune/tests/test_warnings.py b/python/ray/tune/tests/test_warnings.py
new file mode 100644
index 000000000000..a149c37aea7b
--- /dev/null
+++ b/python/ray/tune/tests/test_warnings.py
@@ -0,0 +1,143 @@
+import pytest
+
+import ray
+from ray import tune
+from ray.data.context import DatasetContext
+from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
+from ray.tune.error import TuneError
+
+
+def test_nowarn_zero_cpu():
+    def f(*a):
+        @ray.remote(num_cpus=0)
+        def f():
+            pass
+
+        @ray.remote(num_cpus=0)
+        class Actor:
+            def f(self):
+                pass
+
+        ray.get(f.remote())
+        a = Actor.remote()
+        ray.get(a.f.remote())
+
+    tune.run(f, verbose=0)
+
+
+def test_warn_cpu():
+    def f(*a):
+        @ray.remote(num_cpus=1)
+        def f():
+            pass
+
+        ray.get(f.remote())
+
+    with pytest.raises(TuneError):
+        tune.run(f, verbose=0)
+
+    with pytest.raises(TuneError):
+        tune.run(
+            f, resources_per_trial=tune.PlacementGroupFactory([{"CPU": 1}]), verbose=0
+        )
+
+    def g(*a):
+        @ray.remote(num_cpus=1)
+        class Actor:
+            def f(self):
+                pass
+
+        a = Actor.remote()
+        ray.get(a.f.remote())
+
+    with pytest.raises(TuneError):
+        tune.run(g, verbose=0)
+
+    with pytest.raises(TuneError):
+        tune.run(
+            g, resources_per_trial=tune.PlacementGroupFactory([{"CPU": 1}]), verbose=0
+        )
+
+
+def test_pg_slots_ok():
+    def f(*a):
+        @ray.remote(num_cpus=1)
+        def f():
+            pass
+
+        @ray.remote(num_cpus=1)
+        class Actor:
+            def f(self):
+                pass
+
+        ray.get(f.remote())
+        a = Actor.remote()
+        ray.get(a.f.remote())
+
+    tune.run(
+        f, resources_per_trial=tune.PlacementGroupFactory([{"CPU": 1}] * 2), verbose=0
+    )
+
+
+def test_bad_pg_slots():
+    def f(*a):
+        @ray.remote(num_cpus=2)
+        def f():
+            pass
+
+        ray.get(f.remote())
+
+    with pytest.raises(TuneError):
+        tune.run(
+            f,
+            resources_per_trial=tune.PlacementGroupFactory([{"CPU": 1}] * 2),
+            verbose=0,
+        )
+
+
+def test_dataset_ok():
+    def f(*a):
+        ray.data.range(10).show()
+
+    tune.run(f, verbose=0)
+
+    def g(*a):
+        ctx = DatasetContext.get_current()
+        ctx.scheduling_strategy = PlacementGroupSchedulingStrategy(
+            ray.util.get_current_placement_group()
+        )
+        ray.data.range(10).show()
+
+    with pytest.raises(TuneError):
+        tune.run(g, verbose=0)
+
+    tune.run(
+        g, resources_per_trial=tune.PlacementGroupFactory([{"CPU": 1}] * 2), verbose=0
+    )
+
+
+def test_scheduling_strategy_override():
+    def f(*a):
+        @ray.remote(num_cpus=1, scheduling_strategy="SPREAD")
+        def f():
+            pass
+
+        @ray.remote(num_cpus=1, scheduling_strategy="SPREAD")
+        class Actor:
+            def f(self):
+                pass
+
+        # SPREAD tasks are not captured by placement groups, so don't warn.
+        ray.get(f.remote())
+
+        # SPREAD actors are not captured by placement groups, so don't warn.
+        a = Actor.remote()
+        ray.get(a.f.remote())
+
+    tune.run(f, verbose=0)
+
+
+if __name__ == "__main__":
+    import sys
+
+    sys.exit(pytest.main(["-v", __file__]))
diff --git a/python/ray/util/__init__.py b/python/ray/util/__init__.py
index fde1b7e2dec2..2ccb9b4cf30d 100644
--- a/python/ray/util/__init__.py
+++ b/python/ray/util/__init__.py
@@ -12,6 +12,7 @@
     placement_group_table,
     remove_placement_group,
     get_placement_group,
+    get_current_placement_group,
 )
 from ray.util import rpdb as pdb
 from ray.util.serialization import register_serializer, deregister_serializer
@@ -55,6 +56,7 @@ def list_named_actors(all_namespaces: bool = False) -> List[str]:
     "placement_group",
     "placement_group_table",
     "get_placement_group",
+    "get_current_placement_group",
     "get_node_ip_address",
     "remove_placement_group",
     "inspect_serializability",
diff --git a/python/ray/util/placement_group.py b/python/ray/util/placement_group.py
index 5dc37d87adee..2dcb507c8253 100644
--- a/python/ray/util/placement_group.py
+++ b/python/ray/util/placement_group.py
@@ -303,32 +303,34 @@ check_placement_group_index(
         )
 
 
+def _valid_resource_shape(resources, bundle_specs):
+    """
+    Return True if the resource shape fits into at least
+    one of the given bundle specs, and False otherwise.
+    """
+    for bundle in bundle_specs:
+        fit_in_bundle = True
+        for resource, requested_val in resources.items():
+            # Skip "bundle" resource as it is automatically added
+            # to all nodes with bundles by the placement group.
+            if resource == BUNDLE_RESOURCE_LABEL:
+                continue
+            if bundle.get(resource, 0) < requested_val:
+                fit_in_bundle = False
+                break
+        if fit_in_bundle:
+            # If resource request fits in any bundle, it is valid.
+            return True
+    return False
+
+
 def _validate_resource_shape(
     placement_group, resources, placement_resources, task_or_actor_repr
 ):
-    def valid_resource_shape(resources, bundle_specs):
-        """
-        If the resource shape cannot fit into every
-        bundle spec, return False
-        """
-        for bundle in bundle_specs:
-            fit_in_bundle = True
-            for resource, requested_val in resources.items():
-                # Skip "bundle" resource as it is automatically added
-                # to all nodes with bundles by the placement group.
-                if resource == BUNDLE_RESOURCE_LABEL:
-                    continue
-                if bundle.get(resource, 0) < requested_val:
-                    fit_in_bundle = False
-                    break
-            if fit_in_bundle:
-                # If resource request fits in any bundle, it is valid.
-                return True
-        return False
 
     bundles = placement_group.bundle_specs
-    resources_valid = valid_resource_shape(resources, bundles)
-    placement_resources_valid = valid_resource_shape(placement_resources, bundles)
+    resources_valid = _valid_resource_shape(resources, bundles)
+    placement_resources_valid = _valid_resource_shape(placement_resources, bundles)
 
     if not resources_valid:
         raise ValueError(
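
Note (not part of the patch): for context on the user-facing behavior, a minimal sketch of the fix that the new TuneError message suggests. The trainable, metric name, and bundle sizes below are illustrative assumptions, mirroring test_pg_slots_ok above rather than any specific API guarantee.

    import ray
    from ray import tune


    def train_fn(config):
        @ray.remote(num_cpus=1)
        def score():
            return 1

        # Without the extra {"CPU": 1} bundle reserved below, this nested task
        # could never fit in the trial's placement group, and the launch hook
        # added in this PR raises TuneError right away instead of hanging.
        tune.report(score=ray.get(score.remote()))


    tune.run(
        train_fn,
        # One bundle for the trial itself plus N=1 slot for nested tasks/actors.
        resources_per_trial=tune.PlacementGroupFactory([{"CPU": 1}] + [{"CPU": 1}] * 1),
        verbose=0,
    )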
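
Note (not part of the patch): session.init() only installs the hooks when the TUNE_DISABLE_RESOURCE_CHECKS key is absent from os.environ, so the check can be bypassed by setting that variable in the environment of the Tune workers before trials start. A one-line sketch; the value "1" is arbitrary since only the key's presence is tested:

    import os

    # Any value works; init() checks only for the key's presence.
    os.environ["TUNE_DISABLE_RESOURCE_CHECKS"] = "1"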