[tune] Better error message for Tune nested tasks / actors (#25241)
This PR uses a task/actor launch hook to generate better error messages for nested Tune tasks/actors when no extra resources are reserved for them. The idea is that the Tune trial runner actor sets a hook before executing the user code. If the user code launches a task and the trial's placement group cannot possibly fit it, we raise a TuneError immediately to warn the user.
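For illustration, a minimal sketch of the failure mode this change catches, mirroring the new tests added below (the trainable name is hypothetical; assumes the default single-CPU trial placement group):

import ray
from ray import tune
from ray.tune.error import TuneError

def trainable(config):
    # The trial placement group only reserves resources for the trainable
    # itself, so this nested 1-CPU task can never be scheduled inside it.
    @ray.remote(num_cpus=1)
    def nested_step():
        return 1

    ray.get(nested_step.remote())

# With this change, Tune fails fast with a TuneError explaining how to
# reserve extra placement group slots, rather than hanging.
try:
    tune.run(trainable, verbose=0)
except TuneError as e:
    print(e)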
ericl authored Jun 3, 2022
1 parent 03ed27b commit 22aaf47
Showing 8 changed files with 275 additions and 22 deletions.
8 changes: 8 additions & 0 deletions python/ray/actor.py
@@ -36,6 +36,9 @@

logger = logging.getLogger(__name__)

# Hook to call with (actor, resources, strategy) on each local actor creation.
_actor_launch_hook = None


@PublicAPI
@client_mode_hook(auto_init=False)
@@ -909,6 +912,11 @@ def _remote(self, args=None, kwargs=None, **actor_options):
scheduling_strategy=scheduling_strategy,
)

if _actor_launch_hook:
_actor_launch_hook(
meta.actor_creation_function_descriptor, resources, scheduling_strategy
)

actor_handle = ActorHandle(
meta.language,
actor_id,
2 changes: 1 addition & 1 deletion python/ray/data/read_api.py
@@ -248,7 +248,7 @@ def read_datasource(
)

if len(read_tasks) < parallelism and (
len(read_tasks) < ray.available_resources().get("CPU", parallelism) // 2
len(read_tasks) < ray.available_resources().get("CPU", 1) // 2
):
logger.warning(
"The number of blocks in this dataset ({}) limits its parallelism to {} "
7 changes: 7 additions & 0 deletions python/ray/remote_function.py
@@ -22,6 +22,10 @@
logger = logging.getLogger(__name__)


# Hook to call with (fn, resources, strategy) on each local task submission.
_task_launch_hook = None


class RemoteFunction:
"""A remote function.
@@ -287,6 +291,9 @@ def _remote(self, args=None, kwargs=None, **task_options):
serialize=True,
)

if _task_launch_hook:
_task_launch_hook(self._function_descriptor, resources, scheduling_strategy)

def invocation(args, kwargs):
if self._is_cross_language:
list_args = cross_language.format_args(worker, args, kwargs)
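For context, a rough sketch of how these private hooks are consumed; Tune's session.init (in the next file) assigns its own hook in exactly this way. The hook attributes are internal, unstable APIs, and the logging hook below is purely hypothetical:

import logging

from ray import actor, remote_function

logger = logging.getLogger(__name__)

def log_launch_hook(fn, resources, strategy):
    # Called locally on each task submission / actor creation with the
    # function descriptor, the resolved resource dict, and the scheduling
    # strategy.
    logger.info("Launching %s with resources %s (strategy: %s)", fn, resources, strategy)

remote_function._task_launch_hook = log_launch_hook
actor._actor_launch_hook = log_launch_hook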
8 changes: 8 additions & 0 deletions python/ray/tune/BUILD
@@ -240,6 +240,14 @@ py_test(
tags = ["team:ml", "exclusive", "tests_dir_R"],
)

py_test(
name = "test_warnings",
size = "medium",
srcs = ["tests/test_warnings.py"],
deps = [":tune_lib"],
tags = ["team:ml", "exclusive", "tests_dir_W"],
)

py_test(
name = "test_sample",
size = "large",
83 changes: 83 additions & 0 deletions python/ray/tune/session.py
@@ -3,9 +3,17 @@
import os
import logging
import traceback
from typing import Dict, Optional, Set

import ray
from ray.util.debug import log_once
from ray.util.annotations import PublicAPI, DeveloperAPI
from ray.util.placement_group import _valid_resource_shape
from ray.util.scheduling_strategies import (
SchedulingStrategyT,
PlacementGroupSchedulingStrategy,
)
from ray.tune.error import TuneError

logger = logging.getLogger(__name__)

@@ -67,9 +75,84 @@ def init(reporter, ignore_reinit_error=True):
"Most session commands will have no effect."
)

# Setup hooks for generating placement group resource deadlock warnings.
from ray import actor, remote_function

if "TUNE_DISABLE_RESOURCE_CHECKS" not in os.environ:
actor._actor_launch_hook = tune_task_and_actor_launch_hook
remote_function._task_launch_hook = tune_task_and_actor_launch_hook

_session = reporter


# Cache of resource dicts that have been checked by the launch hook already.
_checked_resources: Set[frozenset] = set()


def tune_task_and_actor_launch_hook(
fn, resources: Dict[str, float], strategy: Optional[SchedulingStrategyT]
):
"""Launch hook to catch nested tasks that can't fit in the placement group.
This gives users a nice warning in case they launch a nested task in a Tune trial
without reserving resources in the trial placement group to fit it.
"""

# Already checked, skip for performance reasons.
key = frozenset({(k, v) for k, v in resources.items() if v > 0})
if not key or key in _checked_resources:
return

# No need to check if placement group is None.
if (
not isinstance(strategy, PlacementGroupSchedulingStrategy)
or strategy.placement_group is None
):
return

# Check if the resource request is targeting the current placement group.
cur_pg = ray.util.get_current_placement_group()
if not cur_pg or strategy.placement_group.id != cur_pg.id:
return

_checked_resources.add(key)

# Determine which bundles of the trial placement group are available to nested requests.
pgf = get_trial_resources()

if pgf.head_bundle_is_empty:
available_bundles = cur_pg.bundle_specs[0:]
else:
available_bundles = cur_pg.bundle_specs[1:]

# Check if the request can be fulfilled by the current placement group.
if _valid_resource_shape(resources, available_bundles):
return

if fn.class_name:
submitted = "actor"
name = fn.module_name + "." + fn.class_name + "." + fn.function_name
else:
submitted = "task"
name = fn.module_name + "." + fn.function_name

# Normalize the resource spec so it looks the same as the placement group bundle.
main_resources = cur_pg.bundle_specs[0]
resources = {k: float(v) for k, v in resources.items() if v > 0}

raise TuneError(
f"No trial resources are available for launching the {submitted} `{name}`. "
"To resolve this, specify the Tune option:\n\n"
"> resources_per_trial=tune.PlacementGroupFactory(\n"
f"> [{main_resources}] + [{resources}] * N\n"
"> )\n\n"
f"Where `N` is the number of slots to reserve for trial {submitted}s. "
"If you are using a Ray training library, there might be a utility function "
"to set this automatically for you. For more information, refer to "
"https://docs.ray.io/en/latest/tune/tutorials/tune-resources.html"
)


def shutdown():
"""Cleans up the trial and removes it from the global context."""

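The TuneError raised above also points at the fix. As a usage sketch grounded in the new tests (trainable name hypothetical; one extra bundle suffices here because the trial runs a single nested 1-CPU task at a time):

import ray
from ray import tune

def trainable(config):
    @ray.remote(num_cpus=1)
    def nested_step():
        return 1

    ray.get(nested_step.remote())

# One head bundle for the trainable itself plus N = 1 extra slot for the
# nested task, matching the PlacementGroupFactory pattern suggested in the
# error message.
tune.run(
    trainable,
    resources_per_trial=tune.PlacementGroupFactory([{"CPU": 1.0}] + [{"CPU": 1.0}] * 1),
    verbose=0,
)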
143 changes: 143 additions & 0 deletions python/ray/tune/tests/test_warnings.py
@@ -0,0 +1,143 @@
import pytest

import ray
from ray import tune
from ray.data.context import DatasetContext
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
from ray.tune.error import TuneError


def test_nowarn_zero_cpu():
def f(*a):
@ray.remote(num_cpus=0)
def f():
pass

@ray.remote(num_cpus=0)
class Actor:
def f(self):
pass

ray.get(f.remote())
a = Actor.remote()
ray.get(a.f.remote())

tune.run(f, verbose=0)


def test_warn_cpu():
def f(*a):
@ray.remote(num_cpus=1)
def f():
pass

ray.get(f.remote())

with pytest.raises(TuneError):
tune.run(f, verbose=0)

with pytest.raises(TuneError):
tune.run(
f, resources_per_trial=tune.PlacementGroupFactory([{"CPU": 1}]), verbose=0
)

def g(*a):
@ray.remote(num_cpus=1)
class Actor:
def f(self):
pass

a = Actor.remote()
ray.get(a.f.remote())

with pytest.raises(TuneError):
tune.run(g, verbose=0)

with pytest.raises(TuneError):
tune.run(
g, resources_per_trial=tune.PlacementGroupFactory([{"CPU": 1}]), verbose=0
)


def test_pg_slots_ok():
def f(*a):
@ray.remote(num_cpus=1)
def f():
pass

@ray.remote(num_cpus=1)
class Actor:
def f(self):
pass

ray.get(f.remote())
a = Actor.remote()
ray.get(a.f.remote())

tune.run(
f, resources_per_trial=tune.PlacementGroupFactory([{"CPU": 1}] * 2), verbose=0
)


def test_bad_pg_slots():
def f(*a):
@ray.remote(num_cpus=2)
def f():
pass

ray.get(f.remote())

with pytest.raises(TuneError):
tune.run(
f,
resources_per_trial=tune.PlacementGroupFactory([{"CPU": 1}] * 2),
verbose=0,
)


def test_dataset_ok():
def f(*a):
ray.data.range(10).show()

tune.run(f, verbose=0)

def g(*a):
ctx = DatasetContext.get_current()
ctx.scheduling_strategy = PlacementGroupSchedulingStrategy(
ray.util.get_current_placement_group()
)
ray.data.range(10).show()

with pytest.raises(TuneError):
tune.run(g, verbose=0)

tune.run(
g, resources_per_trial=tune.PlacementGroupFactory([{"CPU": 1}] * 2), verbose=0
)


def test_scheduling_strategy_override():
def f(*a):
@ray.remote(num_cpus=1, scheduling_strategy="SPREAD")
def f():
pass

@ray.remote(num_cpus=1, scheduling_strategy="SPREAD")
class Actor:
def f(self):
pass

# SPREAD tasks are not captured by placement groups, so don't warn.
ray.get(f.remote())

# SPREAD actors are not captured by placement groups, so don't warn.
a = Actor.remote()
ray.get(a.f.remote())

tune.run(f, verbose=0)


if __name__ == "__main__":
import sys

sys.exit(pytest.main(["-v", __file__]))
2 changes: 2 additions & 0 deletions python/ray/util/__init__.py
@@ -12,6 +12,7 @@
placement_group_table,
remove_placement_group,
get_placement_group,
get_current_placement_group,
)
from ray.util import rpdb as pdb
from ray.util.serialization import register_serializer, deregister_serializer
@@ -55,6 +56,7 @@ def list_named_actors(all_namespaces: bool = False) -> List[str]:
"placement_group",
"placement_group_table",
"get_placement_group",
"get_current_placement_group",
"get_node_ip_address",
"remove_placement_group",
"inspect_serializability",
44 changes: 23 additions & 21 deletions python/ray/util/placement_group.py
@@ -303,32 +303,34 @@ def check_placement_group_index(
)


def _valid_resource_shape(resources, bundle_specs):
"""
Return True if the resource shape fits into at least one bundle spec, else False.
"""
for bundle in bundle_specs:
fit_in_bundle = True
for resource, requested_val in resources.items():
# Skip "bundle" resource as it is automatically added
# to all nodes with bundles by the placement group.
if resource == BUNDLE_RESOURCE_LABEL:
continue
if bundle.get(resource, 0) < requested_val:
fit_in_bundle = False
break
if fit_in_bundle:
# If resource request fits in any bundle, it is valid.
return True
return False


def _validate_resource_shape(
placement_group, resources, placement_resources, task_or_actor_repr
):
def valid_resource_shape(resources, bundle_specs):
"""
If the resource shape cannot fit into every
bundle spec, return False
"""
for bundle in bundle_specs:
fit_in_bundle = True
for resource, requested_val in resources.items():
# Skip "bundle" resource as it is automatically added
# to all nodes with bundles by the placement group.
if resource == BUNDLE_RESOURCE_LABEL:
continue
if bundle.get(resource, 0) < requested_val:
fit_in_bundle = False
break
if fit_in_bundle:
# If resource request fits in any bundle, it is valid.
return True
return False

bundles = placement_group.bundle_specs
- resources_valid = valid_resource_shape(resources, bundles)
- placement_resources_valid = valid_resource_shape(placement_resources, bundles)
+ resources_valid = _valid_resource_shape(resources, bundles)
+ placement_resources_valid = _valid_resource_shape(placement_resources, bundles)

if not resources_valid:
raise ValueError(
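To make the fit check concrete, here is a small standalone sketch equivalent to `_valid_resource_shape` above (not the Ray function itself; it assumes BUNDLE_RESOURCE_LABEL is the literal string "bundle"):

def fits_some_bundle(resources, bundle_specs):
    # True if the request fits entirely inside at least one bundle.
    return any(
        all(
            bundle.get(resource, 0) >= requested
            for resource, requested in resources.items()
            if resource != "bundle"  # placement-group-added resource, skipped
        )
        for bundle in bundle_specs
    )

# A 1-CPU request fits one of the 1-CPU bundles; a 2-CPU request fits none.
assert fits_some_bundle({"CPU": 1}, [{"CPU": 1}, {"CPU": 1}])
assert not fits_some_bundle({"CPU": 2}, [{"CPU": 1}, {"CPU": 1}])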
