From 346d297e95ef5606a799d8e54612bfcc90624194 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Fri, 22 Jul 2022 15:09:33 -0700 Subject: [PATCH 1/6] assigned-resources Signed-off-by: Richard Liaw --- python/ray/runtime_context.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/python/ray/runtime_context.py b/python/ray/runtime_context.py index 9dcddc6b3052..e7dc609bcd5e 100644 --- a/python/ray/runtime_context.py +++ b/python/ray/runtime_context.py @@ -1,3 +1,4 @@ +from typing import Dict import logging import ray._private.worker @@ -16,7 +17,7 @@ def __init__(self, worker): assert worker is not None self.worker = worker - def get(self): + def get(self) -> Dict[str, Any]: """Get a dictionary of the current context. Returns: @@ -160,6 +161,18 @@ def should_capture_child_tasks_in_placement_group(self): """ return self.worker.should_capture_child_tasks_in_placement_group + @property + def assigned_resources(self): + """Get the assigned resources to this worker. + + Returns: + A dictionary mapping the name of a resource to a list of pairs, where + each pair consists of the ID of a resource and the fraction of that + resource reserved for this worker. + """ + self.worker.check_connected() + return self.worker.core_worker.resource_ids() + def get_runtime_env_string(self): """Get the runtime env string used for the current driver or worker. From 5602abe1f6a2eda1fbd958aeaf31abc4bc1b21f3 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Fri, 22 Jul 2022 15:13:34 -0700 Subject: [PATCH 2/6] update Signed-off-by: Richard Liaw --- python/ray/_private/worker.py | 2 +- python/ray/runtime_context.py | 2 +- python/ray/tests/test_basic_3.py | 2 +- python/ray/tests/test_placement_group.py | 6 +++--- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index e3cb444459d9..56ed9d61d384 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -860,7 +860,7 @@ def get_gpu_ids(): return assigned_ids -@Deprecated +@Deprecated(message="Use ray.get_runtime_context().assigned_resources instead.") def get_resource_ids(): """Get the IDs of the resources that are available to the worker. diff --git a/python/ray/runtime_context.py b/python/ray/runtime_context.py index e7dc609bcd5e..29adc779e385 100644 --- a/python/ray/runtime_context.py +++ b/python/ray/runtime_context.py @@ -1,5 +1,5 @@ -from typing import Dict import logging +from typing import Dict, Any import ray._private.worker from ray._private.client_mode_hook import client_mode_hook diff --git a/python/ray/tests/test_basic_3.py b/python/ray/tests/test_basic_3.py index d41e2a444781..a9b56b2b3f0a 100644 --- a/python/ray/tests/test_basic_3.py +++ b/python/ray/tests/test_basic_3.py @@ -71,7 +71,7 @@ def g(): def f(block, accepted_resources): true_resources = { resource: value[0][1] - for resource, value in ray._private.worker.get_resource_ids().items() + for resource, value in ray.get_runtime_context().assigned_resources.items() } if block: ray.get(g.remote()) diff --git a/python/ray/tests/test_placement_group.py b/python/ray/tests/test_placement_group.py index 547c7de6927d..e9d4e1568c15 100644 --- a/python/ray/tests/test_placement_group.py +++ b/python/ray/tests/test_placement_group.py @@ -370,7 +370,7 @@ def test_placement_group_actor_resource_ids(ray_start_cluster, connect_to_client @ray.remote(num_cpus=1) class F: def f(self): - return ray._private.worker.get_resource_ids() + return ray.get_runtime_context().assigned_resources cluster = ray_start_cluster num_nodes = 1 @@ -391,7 +391,7 @@ def f(self): def test_placement_group_task_resource_ids(ray_start_cluster, connect_to_client): @ray.remote(num_cpus=1) def f(): - return ray._private.worker.get_resource_ids() + return ray.get_runtime_context().assigned_resources cluster = ray_start_cluster num_nodes = 1 @@ -423,7 +423,7 @@ def f(): def test_placement_group_hang(ray_start_cluster, connect_to_client): @ray.remote(num_cpus=1) def f(): - return ray._private.worker.get_resource_ids() + return ray.get_runtime_context().assigned_resources cluster = ray_start_cluster num_nodes = 1 From c5e464b2cd1d0a8b94ae7df305b2cf8c28572f00 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Sat, 23 Jul 2022 02:05:45 -0700 Subject: [PATCH 3/6] update-with-tests Signed-off-by: Richard Liaw --- python/ray/runtime_context.py | 21 ++++++++++++------ python/ray/tests/test_basic_3.py | 2 +- python/ray/tests/test_runtime_context.py | 28 ++++++++++++++++++++++++ 3 files changed, 43 insertions(+), 8 deletions(-) diff --git a/python/ray/runtime_context.py b/python/ray/runtime_context.py index 29adc779e385..0722230df649 100644 --- a/python/ray/runtime_context.py +++ b/python/ray/runtime_context.py @@ -1,5 +1,5 @@ import logging -from typing import Dict, Any +from typing import Any, Dict import ray._private.worker from ray._private.client_mode_hook import client_mode_hook @@ -161,17 +161,24 @@ def should_capture_child_tasks_in_placement_group(self): """ return self.worker.should_capture_child_tasks_in_placement_group - @property - def assigned_resources(self): + def get_assigned_resources(self): """Get the assigned resources to this worker. + By default for tasks, this will return {"CPU": 1}. + By default for actors, this will return {}. + Returns: - A dictionary mapping the name of a resource to a list of pairs, where - each pair consists of the ID of a resource and the fraction of that - resource reserved for this worker. + A dictionary mapping the name of a resource to a float, where + the float represents the amount of that resource reserved + for this worker. """ self.worker.check_connected() - return self.worker.core_worker.resource_ids() + resource_id_map = self.worker.core_worker.resource_ids() + resource_map = { + res: sum(amt for _, amt in mapping) + for res, mapping in resource_id_map.items() + } + return resource_map def get_runtime_env_string(self): """Get the runtime env string used for the current driver or worker. diff --git a/python/ray/tests/test_basic_3.py b/python/ray/tests/test_basic_3.py index a9b56b2b3f0a..d41e2a444781 100644 --- a/python/ray/tests/test_basic_3.py +++ b/python/ray/tests/test_basic_3.py @@ -71,7 +71,7 @@ def g(): def f(block, accepted_resources): true_resources = { resource: value[0][1] - for resource, value in ray.get_runtime_context().assigned_resources.items() + for resource, value in ray._private.worker.get_resource_ids().items() } if block: ray.get(g.remote()) diff --git a/python/ray/tests/test_runtime_context.py b/python/ray/tests/test_runtime_context.py index 74bc3f07a2ac..13bd7565287e 100644 --- a/python/ray/tests/test_runtime_context.py +++ b/python/ray/tests/test_runtime_context.py @@ -118,6 +118,34 @@ def echo2(self, s): assert ray.get(ray.get(obj)) == "hello" +def test_get_assigned_resources(ray_start_10_cpus): + @ray.remote + class Echo: + def check(self): + return ray.get_runtime_context().get_assigned_resources() + + e = Echo.remote() + result = e.check.remote() + print(ray.get(result)) + assert ray.get(result).get("CPU") is None + ray.kill(e) + + e = Echo.options(num_cpus=4).remote() + result = e.check.remote() + assert ray.get(result)["CPU"] == 4.0 + ray.kill(e) + + @ray.remote + def check(): + return ray.get_runtime_context().get_assigned_resources() + + result = check.remote() + assert ray.get(result)["CPU"] == 1.0 + + result = check.options(num_cpus=2).remote() + assert ray.get(result)["CPU"] == 2.0 + + def test_actor_stats_normal_task(ray_start_regular): # Because it works at the core worker level, this API works for tasks. @ray.remote From 305181f55dfcdcf85b4e1b718ee1eeb776f2b128 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Sun, 24 Jul 2022 00:58:08 -0700 Subject: [PATCH 4/6] update-tests Signed-off-by: Richard Liaw --- python/ray/_private/worker.py | 2 +- python/ray/tests/test_placement_group.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index 56ed9d61d384..fcbbc503a24b 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -860,7 +860,7 @@ def get_gpu_ids(): return assigned_ids -@Deprecated(message="Use ray.get_runtime_context().assigned_resources instead.") +@Deprecated(message="Use ray.get_runtime_context().get_assigned_resources() instead.") def get_resource_ids(): """Get the IDs of the resources that are available to the worker. diff --git a/python/ray/tests/test_placement_group.py b/python/ray/tests/test_placement_group.py index e9d4e1568c15..2b2c070e241a 100644 --- a/python/ray/tests/test_placement_group.py +++ b/python/ray/tests/test_placement_group.py @@ -370,7 +370,7 @@ def test_placement_group_actor_resource_ids(ray_start_cluster, connect_to_client @ray.remote(num_cpus=1) class F: def f(self): - return ray.get_runtime_context().assigned_resources + return ray.get_runtime_context().get_assigned_resources() cluster = ray_start_cluster num_nodes = 1 @@ -391,7 +391,7 @@ def f(self): def test_placement_group_task_resource_ids(ray_start_cluster, connect_to_client): @ray.remote(num_cpus=1) def f(): - return ray.get_runtime_context().assigned_resources + return ray.get_runtime_context().get_assigned_resources() cluster = ray_start_cluster num_nodes = 1 @@ -423,7 +423,7 @@ def f(): def test_placement_group_hang(ray_start_cluster, connect_to_client): @ray.remote(num_cpus=1) def f(): - return ray.get_runtime_context().assigned_resources + return ray.get_runtime_context().get_assigned_resources() cluster = ray_start_cluster num_nodes = 1 From ba9feb65d5523a63093ae82caa37ec48b2db1c57 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Sun, 24 Jul 2022 01:01:03 -0700 Subject: [PATCH 5/6] fix Signed-off-by: Richard Liaw --- python/ray/runtime_context.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/ray/runtime_context.py b/python/ray/runtime_context.py index 0722230df649..10438402a142 100644 --- a/python/ray/runtime_context.py +++ b/python/ray/runtime_context.py @@ -165,7 +165,8 @@ def get_assigned_resources(self): """Get the assigned resources to this worker. By default for tasks, this will return {"CPU": 1}. - By default for actors, this will return {}. + By default for actors, this will return {}. This is because + actors do not have CPUs assigned to them by default. Returns: A dictionary mapping the name of a resource to a float, where From cf58ba3c523ad1c715610d1b6b28727714b28932 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Sun, 24 Jul 2022 01:16:59 -0700 Subject: [PATCH 6/6] assertion Signed-off-by: Richard Liaw --- python/ray/runtime_context.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/python/ray/runtime_context.py b/python/ray/runtime_context.py index aa332915c72a..4155b0827ff7 100644 --- a/python/ray/runtime_context.py +++ b/python/ray/runtime_context.py @@ -273,6 +273,10 @@ def get_assigned_resources(self): the float represents the amount of that resource reserved for this worker. """ + assert ( + self.worker.mode == ray._private.worker.WORKER_MODE + ), f"This method is only available when the process is a\ + worker. Current mode: {self.worker.mode}" self.worker.check_connected() resource_id_map = self.worker.core_worker.resource_ids() resource_map = {