Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Simplify options handling [Part 2] #23882

Merged
merged 2 commits into from
Apr 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 15 additions & 59 deletions python/ray/_private/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import tempfile
import threading
import time
from typing import Optional, Sequence, Tuple, Any, Union
from typing import Optional, Sequence, Tuple, Any, Union, Dict
import uuid
import grpc
import warnings
Expand Down Expand Up @@ -283,52 +283,16 @@ def set_cuda_visible_devices(gpu_ids):
last_set_gpu_ids = gpu_ids


def resources_from_resource_arguments(
default_num_cpus,
default_num_gpus,
default_memory,
default_object_store_memory,
default_resources,
default_accelerator_type,
runtime_num_cpus,
runtime_num_gpus,
runtime_memory,
runtime_object_store_memory,
runtime_resources,
runtime_accelerator_type,
):
def resources_from_ray_options(options_dict: Dict[str, Any]) -> Dict[str, Any]:
"""Determine a task's resource requirements.

Args:
default_num_cpus: The default number of CPUs required by this function
or actor method.
default_num_gpus: The default number of GPUs required by this function
or actor method.
default_memory: The default heap memory required by this function
or actor method.
default_object_store_memory: The default object store memory required
by this function or actor method.
default_resources: The default custom resources required by this
function or actor method.
runtime_num_cpus: The number of CPUs requested when the task was
invoked.
runtime_num_gpus: The number of GPUs requested when the task was
invoked.
runtime_memory: The heap memory requested when the task was invoked.
runtime_object_store_memory: The object store memory requested when
the task was invoked.
runtime_resources: The custom resources requested when the task was
invoked.
options_dict: The dictionary that contains resources requirements.

Returns:
A dictionary of the resource requirements for the task.
"""
if runtime_resources is not None:
resources = runtime_resources.copy()
elif default_resources is not None:
resources = default_resources.copy()
else:
resources = {}
resources = (options_dict.get("resources") or {}).copy()

if "CPU" in resources or "GPU" in resources:
raise ValueError(
Expand All @@ -340,33 +304,25 @@ def resources_from_resource_arguments(
"contain the key 'memory' or 'object_store_memory'"
)

assert default_num_cpus is not None
resources["CPU"] = (
default_num_cpus if runtime_num_cpus is None else runtime_num_cpus
)

if runtime_num_gpus is not None:
resources["GPU"] = runtime_num_gpus
elif default_num_gpus is not None:
resources["GPU"] = default_num_gpus
num_cpus = options_dict.get("num_cpus")
num_gpus = options_dict.get("num_gpus")
memory = options_dict.get("memory")
object_store_memory = options_dict.get("object_store_memory")
accelerator_type = options_dict.get("accelerator_type")

# Order of arguments matter for short circuiting.
memory = runtime_memory or default_memory
object_store_memory = runtime_object_store_memory or default_object_store_memory
if num_cpus is not None:
resources["CPU"] = num_cpus
if num_gpus is not None:
resources["GPU"] = num_gpus
if memory is not None:
resources["memory"] = ray_constants.to_memory_units(memory, round_up=True)
if object_store_memory is not None:
resources["object_store_memory"] = ray_constants.to_memory_units(
object_store_memory, round_up=True
)

if runtime_accelerator_type is not None:
resources[
f"{ray_constants.RESOURCE_CONSTRAINT_PREFIX}" f"{runtime_accelerator_type}"
] = 0.001
elif default_accelerator_type is not None:
if accelerator_type is not None:
resources[
f"{ray_constants.RESOURCE_CONSTRAINT_PREFIX}" f"{default_accelerator_type}"
f"{ray_constants.RESOURCE_CONSTRAINT_PREFIX}{accelerator_type}"
] = 0.001

return resources
Expand Down
69 changes: 22 additions & 47 deletions python/ray/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ def remote(self, *args, **kwargs):
"""
return self._remote(args=args, kwargs=kwargs, **self._default_options)

def options(self, args=None, kwargs=None, **actor_options):
def options(self, **actor_options):
"""Configures and overrides the actor instantiation parameters.

The arguments are the same as those that can be passed
Expand Down Expand Up @@ -704,12 +704,6 @@ def _remote(self, args=None, kwargs=None, **actor_options):
name = actor_options["name"]
namespace = actor_options["namespace"]
lifetime = actor_options["lifetime"]
num_cpus = actor_options["num_cpus"]
num_gpus = actor_options["num_gpus"]
accelerator_type = actor_options["accelerator_type"]
resources = actor_options["resources"]
memory = actor_options["memory"]
object_store_memory = actor_options["object_store_memory"]
runtime_env = actor_options["runtime_env"]
placement_group = actor_options["placement_group"]
placement_group_bundle_index = actor_options["placement_group_bundle_index"]
Expand Down Expand Up @@ -762,31 +756,6 @@ def _remote(self, args=None, kwargs=None, **actor_options):
"'non_detached' and 'None'."
)

# Set the actor's default resources if not already set. First three
# conditions are to check that no resources were specified in the
# decorator. Last three conditions are to check that no resources were
# specified when _remote() was called.
if (
num_cpus is None
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original implementation is somewhat buggy: it does not consider the case of "memory", which should also be treated as a resource when deciding the default CPU allocation.

and num_gpus is None
and resources is None
and accelerator_type is None
):
# In the default case, actors acquire no resources for
# their lifetime, and actor methods will require 1 CPU.
cpus_to_use = ray_constants.DEFAULT_ACTOR_CREATION_CPU_SIMPLE
actor_method_cpu = ray_constants.DEFAULT_ACTOR_METHOD_CPU_SIMPLE
else:
# If any resources are specified (here or in decorator), then
# all resources are acquired for the actor's lifetime and no
# resources are associated with methods.
cpus_to_use = (
ray_constants.DEFAULT_ACTOR_CREATION_CPU_SPECIFIED
if num_cpus is None
else num_cpus
)
actor_method_cpu = ray_constants.DEFAULT_ACTOR_METHOD_CPU_SPECIFIED

# LOCAL_MODE cannot handle cross_language
if worker.mode == ray.LOCAL_MODE:
assert (
Expand All @@ -811,21 +780,27 @@ def _remote(self, args=None, kwargs=None, **actor_options):
meta.method_meta.methods.keys(),
)

# TODO(suquark): cleanup "resources_from_resource_arguments" later.
resources = ray._private.utils.resources_from_resource_arguments(
cpus_to_use,
num_gpus,
memory,
object_store_memory,
resources,
accelerator_type,
num_cpus,
num_gpus,
memory,
object_store_memory,
resources,
accelerator_type,
)
resources = ray._private.utils.resources_from_ray_options(actor_options)
# Set the actor's default resources if not already set. First three
# conditions are to check that no resources were specified in the
# decorator. Last three conditions are to check that no resources were
# specified when _remote() was called.
# TODO(suquark): In the original code, memory is not considered as resources,
# when deciding the default CPUs. It is strange, but we keep the original
# semantics in case that it breaks user applications & tests.
if not set(resources.keys()).difference({"memory", "object_store_memory"}):
# In the default case, actors acquire no resources for
# their lifetime, and actor methods will require 1 CPU.
resources.setdefault("CPU", ray_constants.DEFAULT_ACTOR_CREATION_CPU_SIMPLE)
actor_method_cpu = ray_constants.DEFAULT_ACTOR_METHOD_CPU_SIMPLE
else:
# If any resources are specified (here or in decorator), then
# all resources are acquired for the actor's lifetime and no
# resources are associated with methods.
resources.setdefault(
"CPU", ray_constants.DEFAULT_ACTOR_CREATION_CPU_SPECIFIED
)
actor_method_cpu = ray_constants.DEFAULT_ACTOR_METHOD_CPU_SPECIFIED

# If the actor methods require CPU resources, then set the required
# placement resources. If actor_placement_resources is empty, then
Expand Down
29 changes: 2 additions & 27 deletions python/ray/remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,7 @@ def __call__(self, *args, **kwargs):
f"try '{self._function_name}.remote()'."
)

def options(
self,
args=None,
kwargs=None,
**task_options,
):
def options(self, **task_options):
"""Configures and overrides the task invocation parameters.

The arguments are the same as those that can be passed to :obj:`ray.remote`.
Expand Down Expand Up @@ -229,12 +224,6 @@ def _remote(self, args=None, kwargs=None, **task_options):

# TODO(suquark): cleanup these fields
name = task_options["name"]
num_cpus = task_options["num_cpus"]
num_gpus = task_options["num_gpus"]
accelerator_type = task_options["accelerator_type"]
resources = task_options["resources"]
memory = task_options["memory"]
object_store_memory = task_options["object_store_memory"]
runtime_env = parse_runtime_env(task_options["runtime_env"])
placement_group = task_options["placement_group"]
placement_group_bundle_index = task_options["placement_group_bundle_index"]
Expand All @@ -246,21 +235,7 @@ def _remote(self, args=None, kwargs=None, **task_options):
max_retries = task_options["max_retries"]
retry_exceptions = task_options["retry_exceptions"]

# TODO(suquark): cleanup "resources_from_resource_arguments" later.
resources = ray._private.utils.resources_from_resource_arguments(
num_cpus,
num_gpus,
memory,
object_store_memory,
resources,
accelerator_type,
num_cpus,
num_gpus,
memory,
object_store_memory,
resources,
accelerator_type,
)
resources = ray._private.utils.resources_from_ray_options(task_options)

if scheduling_strategy is None or isinstance(
scheduling_strategy, PlacementGroupSchedulingStrategy
Expand Down
1 change: 0 additions & 1 deletion python/ray/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2112,7 +2112,6 @@ def _mode(worker=global_worker):


def _make_remote(function_or_class, options):
# filter out placeholders in options
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed a stale comment that I forgot to remove in the previous PR.

if inspect.isfunction(function_or_class) or is_cython(function_or_class):
ray_option_utils.validate_task_options(options, in_options=False)
return ray.remote_function.RemoteFunction(
Expand Down