Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[tune] Better error message for Tune nested tasks / actors #25241

Merged
merged 3 commits into from
Jun 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions python/ray/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@

logger = logging.getLogger(__name__)

# Hook invoked on each local actor creation with
# (actor_creation_function_descriptor, resources, scheduling_strategy).
# Installed by Ray Tune during a session to detect nested actors that cannot
# fit into the trial's placement group; None means no hook is installed.
_actor_launch_hook = None


@PublicAPI
@client_mode_hook(auto_init=False)
Expand Down Expand Up @@ -909,6 +912,11 @@ def _remote(self, args=None, kwargs=None, **actor_options):
scheduling_strategy=scheduling_strategy,
)

if _actor_launch_hook:
_actor_launch_hook(
meta.actor_creation_function_descriptor, resources, scheduling_strategy
)

actor_handle = ActorHandle(
meta.language,
actor_id,
Expand Down
2 changes: 1 addition & 1 deletion python/ray/data/read_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ def read_datasource(
)

if len(read_tasks) < parallelism and (
len(read_tasks) < ray.available_resources().get("CPU", parallelism) // 2
len(read_tasks) < ray.available_resources().get("CPU", 1) // 2
Copy link
Contributor Author

@ericl ericl Jun 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If CPU is 0 or missing, fall back to 1 instead of `parallelism` (which would be too large).

):
logger.warning(
"The number of blocks in this dataset ({}) limits its parallelism to {} "
Expand Down
7 changes: 7 additions & 0 deletions python/ray/remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
logger = logging.getLogger(__name__)


# Hook invoked on each local task submission with
# (function_descriptor, resources, scheduling_strategy).
# Installed by Ray Tune during a session to detect nested tasks that cannot
# fit into the trial's placement group; None means no hook is installed.
_task_launch_hook = None


class RemoteFunction:
"""A remote function.

Expand Down Expand Up @@ -287,6 +291,9 @@ def _remote(self, args=None, kwargs=None, **task_options):
serialize=True,
)

if _task_launch_hook:
_task_launch_hook(self._function_descriptor, resources, scheduling_strategy)

def invocation(args, kwargs):
if self._is_cross_language:
list_args = cross_language.format_args(worker, args, kwargs)
Expand Down
8 changes: 8 additions & 0 deletions python/ray/tune/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,14 @@ py_test(
tags = ["team:ml", "exclusive", "tests_dir_R"],
)

# Tests for the Tune nested task/actor placement-group deadlock warnings.
py_test(
    name = "test_warnings",
    size = "medium",
    srcs = ["tests/test_warnings.py"],
    deps = [":tune_lib"],
    tags = ["team:ml", "exclusive", "tests_dir_W"],
)

py_test(
name = "test_sample",
size = "large",
Expand Down
83 changes: 83 additions & 0 deletions python/ray/tune/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,17 @@
import os
import logging
import traceback
from typing import Dict, Optional, Set

import ray
from ray.util.debug import log_once
from ray.util.annotations import PublicAPI, DeveloperAPI
from ray.util.placement_group import _valid_resource_shape
from ray.util.scheduling_strategies import (
SchedulingStrategyT,
PlacementGroupSchedulingStrategy,
)
from ray.tune.error import TuneError

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -67,9 +75,84 @@ def init(reporter, ignore_reinit_error=True):
"Most session commands will have no effect."
)

# Setup hooks for generating placement group resource deadlock warnings.
from ray import actor, remote_function

if "TUNE_DISABLE_RESOURCE_CHECKS" not in os.environ:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to add to doc/source/tune/api_docs/env.rst? Or we intend this to be more of an internal feature flag?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be great if you could add this to the docs.

Also

Suggested change
if "TUNE_DISABLE_RESOURCE_CHECKS" not in os.environ:
if os.environ.get("TUNE_DISABLE_RESOURCE_CHECKS", "0") != "1":

to follow convention with other tune env variables

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to keep this a hidden / internal flag only, that's not documented. It should only be used for internal debugging.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok with me

actor._actor_launch_hook = tune_task_and_actor_launch_hook
remote_function._task_launch_hook = tune_task_and_actor_launch_hook

_session = reporter


# Cache of non-zero resource request shapes (frozensets of (name, amount)
# pairs) already validated by the launch hook, so repeated submissions of the
# same shape skip the check for performance.
_checked_resources: Set[frozenset] = set()


def tune_task_and_actor_launch_hook(
    fn, resources: Dict[str, float], strategy: Optional[SchedulingStrategyT]
):
    """Warn when a nested task/actor cannot fit the trial's placement group.

    Invoked on every local task submission and actor creation while a Tune
    session is active. If the submission targets the trial's own placement
    group but no spare bundle can hold the requested resources, raise a
    TuneError explaining how to reserve additional slots.
    """

    # Deduplicate work: each distinct non-zero resource shape is checked once.
    request_key = frozenset(
        (res, amount) for res, amount in resources.items() if amount > 0
    )
    if not request_key or request_key in _checked_resources:
        return

    # Only submissions explicitly bound to a placement group can deadlock.
    if (
        not isinstance(strategy, PlacementGroupSchedulingStrategy)
        or strategy.placement_group is None
    ):
        return

    # Ignore requests that target a different placement group than the trial's.
    trial_pg = ray.util.get_current_placement_group()
    if not trial_pg or strategy.placement_group.id != trial_pg.id:
        return

    _checked_resources.add(request_key)

    # Bundles available for nested work: skip the head bundle unless the
    # trial declared it empty.
    pg_factory = get_trial_resources()
    if pg_factory.head_bundle_is_empty:
        spare_bundles = trial_pg.bundle_specs[0:]
    else:
        spare_bundles = trial_pg.bundle_specs[1:]

    # Nothing to warn about if some spare bundle can hold the request.
    if _valid_resource_shape(resources, spare_bundles):
        return

    # Describe what was submitted for the error message.
    if fn.class_name:
        submitted = "actor"
        name = f"{fn.module_name}.{fn.class_name}.{fn.function_name}"
    else:
        submitted = "task"
        name = f"{fn.module_name}.{fn.function_name}"

    # Normalize the request so it prints like a placement group bundle.
    main_resources = trial_pg.bundle_specs[0]
    resources = {res: float(amount) for res, amount in resources.items() if amount > 0}

    raise TuneError(
        f"No trial resources are available for launching the {submitted} `{name}`. "
        "To resolve this, specify the Tune option:\n\n"
        "> resources_per_trial=tune.PlacementGroupFactory(\n"
        f"> [{main_resources}] + [{resources}] * N\n"
        "> )\n\n"
        f"Where `N` is the number of slots to reserve for trial {submitted}s. "
        "If you are using a Ray training library, there might be a utility function "
        "to set this automatically for you. For more information, refer to "
        "https://docs.ray.io/en/latest/tune/tutorials/tune-resources.html"
    )


def shutdown():
"""Cleans up the trial and removes it from the global context."""

Expand Down
143 changes: 143 additions & 0 deletions python/ray/tune/tests/test_warnings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import pytest

import ray
from ray import tune
from ray.data.context import DatasetContext
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
from ray.tune.error import TuneError


def test_nowarn_zero_cpu():
    # Zero-CPU tasks and actors always fit in the trial's placement group,
    # so launching them inside a trial must not trigger a resource error.
    def trainable(*args):
        @ray.remote(num_cpus=0)
        def zero_cpu_task():
            pass

        @ray.remote(num_cpus=0)
        class ZeroCpuActor:
            def f(self):
                pass

        ray.get(zero_cpu_task.remote())
        actor = ZeroCpuActor.remote()
        ray.get(actor.f.remote())

    tune.run(trainable, verbose=0)


def test_warn_cpu():
    # A nested 1-CPU task submitted from a trial cannot fit in a trial
    # placement group that has no spare bundle, so Tune must raise.
    def task_trainable(*args):
        @ray.remote(num_cpus=1)
        def nested_task():
            pass

        ray.get(nested_task.remote())

    # Default trial resources: no spare slot for the nested task.
    with pytest.raises(TuneError):
        tune.run(task_trainable, verbose=0)

    # A single-bundle placement group is fully used by the trial itself.
    with pytest.raises(TuneError):
        tune.run(
            task_trainable,
            resources_per_trial=tune.PlacementGroupFactory([{"CPU": 1}]),
            verbose=0,
        )

    # Same checks apply to nested 1-CPU actors.
    def actor_trainable(*args):
        @ray.remote(num_cpus=1)
        class NestedActor:
            def f(self):
                pass

        handle = NestedActor.remote()
        ray.get(handle.f.remote())

    with pytest.raises(TuneError):
        tune.run(actor_trainable, verbose=0)

    with pytest.raises(TuneError):
        tune.run(
            actor_trainable,
            resources_per_trial=tune.PlacementGroupFactory([{"CPU": 1}]),
            verbose=0,
        )


def test_pg_slots_ok():
    # With two 1-CPU bundles reserved, the trial takes the head bundle and
    # the nested task/actor fit in the spare one, so no error is raised.
    def trainable(*args):
        @ray.remote(num_cpus=1)
        def nested_task():
            pass

        @ray.remote(num_cpus=1)
        class NestedActor:
            def f(self):
                pass

        ray.get(nested_task.remote())
        handle = NestedActor.remote()
        ray.get(handle.f.remote())

    two_slots = tune.PlacementGroupFactory([{"CPU": 1}] * 2)
    tune.run(trainable, resources_per_trial=two_slots, verbose=0)


def test_bad_pg_slots():
    # The nested task demands 2 CPUs but every bundle holds only 1, so the
    # request can never be placed inside the trial's placement group.
    def trainable(*args):
        @ray.remote(num_cpus=2)
        def oversized_task():
            pass

        ray.get(oversized_task.remote())

    with pytest.raises(TuneError):
        tune.run(
            trainable,
            resources_per_trial=tune.PlacementGroupFactory([{"CPU": 1}] * 2),
            verbose=0,
        )


def test_dataset_ok():
    # By default, Dataset tasks are not bound to the trial's placement
    # group, so running a dataset inside a trial is fine.
    def default_strategy_trainable(*args):
        ray.data.range(10).show()

    tune.run(default_strategy_trainable, verbose=0)

    # Explicitly binding dataset tasks to the trial's placement group
    # triggers the warning when there is no spare bundle...
    def pg_strategy_trainable(*args):
        ctx = DatasetContext.get_current()
        ctx.scheduling_strategy = PlacementGroupSchedulingStrategy(
            ray.util.get_current_placement_group()
        )
        ray.data.range(10).show()

    with pytest.raises(TuneError):
        tune.run(pg_strategy_trainable, verbose=0)

    # ...but succeeds once a spare bundle is reserved.
    tune.run(
        pg_strategy_trainable,
        resources_per_trial=tune.PlacementGroupFactory([{"CPU": 1}] * 2),
        verbose=0,
    )


def test_scheduling_strategy_override():
    # Tasks/actors that explicitly override the scheduling strategy (here
    # "SPREAD") are not captured by the trial's placement group, so the
    # resource-deadlock check must not fire for them.
    def trainable(*args):
        @ray.remote(num_cpus=1, scheduling_strategy="SPREAD")
        def spread_task():
            pass

        @ray.remote(num_cpus=1, scheduling_strategy="SPREAD")
        class SpreadActor:
            def f(self):
                pass

        # SPREAD tasks are not captured by placement groups, so don't warn.
        ray.get(spread_task.remote())

        # SPREAD actors are not captured by placement groups, so don't warn.
        handle = SpreadActor.remote()
        ray.get(handle.f.remote())

    tune.run(trainable, verbose=0)


if __name__ == "__main__":
    import sys

    # Allow running this test module directly; exit with pytest's status code.
    sys.exit(pytest.main(["-v", __file__]))
2 changes: 2 additions & 0 deletions python/ray/util/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
placement_group_table,
remove_placement_group,
get_placement_group,
get_current_placement_group,
)
from ray.util import rpdb as pdb
from ray.util.serialization import register_serializer, deregister_serializer
Expand Down Expand Up @@ -55,6 +56,7 @@ def list_named_actors(all_namespaces: bool = False) -> List[str]:
"placement_group",
"placement_group_table",
"get_placement_group",
"get_current_placement_group",
"get_node_ip_address",
"remove_placement_group",
"inspect_serializability",
Expand Down
44 changes: 23 additions & 21 deletions python/ray/util/placement_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,32 +303,34 @@ def check_placement_group_index(
)


def _valid_resource_shape(resources, bundle_specs):
"""
If the resource shape cannot fit into every
bundle spec, return False
"""
for bundle in bundle_specs:
fit_in_bundle = True
for resource, requested_val in resources.items():
# Skip "bundle" resource as it is automatically added
# to all nodes with bundles by the placement group.
if resource == BUNDLE_RESOURCE_LABEL:
continue
if bundle.get(resource, 0) < requested_val:
fit_in_bundle = False
break
if fit_in_bundle:
# If resource request fits in any bundle, it is valid.
return True
return False


def _validate_resource_shape(
placement_group, resources, placement_resources, task_or_actor_repr
):
def valid_resource_shape(resources, bundle_specs):
"""
If the resource shape cannot fit into every
bundle spec, return False
"""
for bundle in bundle_specs:
fit_in_bundle = True
for resource, requested_val in resources.items():
# Skip "bundle" resource as it is automatically added
# to all nodes with bundles by the placement group.
if resource == BUNDLE_RESOURCE_LABEL:
continue
if bundle.get(resource, 0) < requested_val:
fit_in_bundle = False
break
if fit_in_bundle:
# If resource request fits in any bundle, it is valid.
return True
return False

bundles = placement_group.bundle_specs
resources_valid = valid_resource_shape(resources, bundles)
placement_resources_valid = valid_resource_shape(placement_resources, bundles)
resources_valid = _valid_resource_shape(resources, bundles)
placement_resources_valid = _valid_resource_shape(placement_resources, bundles)

if not resources_valid:
raise ValueError(
Expand Down