diff --git a/doc/source/ray-core/actors/scheduling.rst b/doc/source/ray-core/actors/scheduling.rst
index 4bef2f537534..d719d8dd86e7 100644
--- a/doc/source/ray-core/actors/scheduling.rst
+++ b/doc/source/ray-core/actors/scheduling.rst
@@ -26,11 +26,14 @@ See :ref:`Placement Group ` for more details.
 Scheduling Strategy
 -------------------
 Actors support a ``scheduling_strategy`` option to specify the strategy used to decide the best node among available nodes.
-Currently the supported strategies for actors are ``"DEFAULT"`` and ``ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(node_id, soft: bool)``.
+Currently the supported strategies for actors are ``"DEFAULT"``, ``"SPREAD"`` and
+``ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(node_id, soft: bool)``.
 
 ``"DEFAULT"`` is the default strategy used by Ray.
 With the current implementation, Ray will try to pack actors on nodes until the resource utilization is beyond a certain threshold and spread actors afterwards.
 
+The ``"SPREAD"`` strategy will try to spread the actors among available nodes.
+
 NodeAffinitySchedulingStrategy is a low-level strategy that allows an actor to be scheduled onto a particular node specified by its node id.
 The ``soft`` flag specifies whether the actor is allowed to run somewhere else if the specified node doesn't exist (e.g. if the node dies) or is infeasible because it does not have the resources required to run the actor.
 In these cases, if ``soft`` is True, the actor will be scheduled onto a different feasible node.
@@ -48,7 +51,7 @@ Since nodes are randomly chosen, actors that don't require any resources are eff
 
 .. code-block:: python
 
-    @ray.remote
+    @ray.remote(num_cpus=1)
     class Actor:
         pass
 
@@ -65,3 +68,6 @@ Since nodes are randomly chosen, actors that don't require any resources are eff
             soft = False,
         )
     ).remote()
+
+    # Spread actors across the cluster.
+    actors = [Actor.options(scheduling_strategy="SPREAD").remote() for i in range(100)]
diff --git a/python/ray/tests/test_scheduling_2.py b/python/ray/tests/test_scheduling_2.py
index cb10b9a151b2..2f414f87c414 100644
--- a/python/ray/tests/test_scheduling_2.py
+++ b/python/ray/tests/test_scheduling_2.py
@@ -443,7 +443,7 @@ def test_spread_scheduling_strategy(ray_start_cluster, connect_to_client):
 
     @ray.remote
     def get_node_id():
-        return ray.worker.global_worker.current_node_id
+        return ray.get_runtime_context().node_id.hex()
 
     worker_node_ids = {
         ray.get(get_node_id.options(resources={f"foo:{i}": 1}).remote())
@@ -457,12 +457,12 @@ def task1():
         internal_kv._internal_kv_put("test_task1", "task1")
         while internal_kv._internal_kv_exists("test_task1"):
             time.sleep(0.1)
-        return ray.worker.global_worker.current_node_id
+        return ray.get_runtime_context().node_id.hex()
 
     @ray.remote
     def task2():
         internal_kv._internal_kv_put("test_task2", "task2")
-        return ray.worker.global_worker.current_node_id
+        return ray.get_runtime_context().node_id.hex()
 
     locations = []
     locations.append(task1.remote())
@@ -477,6 +477,25 @@ def task2():
     internal_kv._internal_kv_del("test_task2")
     assert set(ray.get(locations)) == worker_node_ids
 
+    # Wait for the driver raylet's resource view to be updated.
+    time.sleep(5)
+
+    # Make sure actors can be spread as well.
+    @ray.remote(num_cpus=1)
+    class Actor:
+        def ping(self):
+            return ray.get_runtime_context().node_id.hex()
+
+    actors = []
+    locations = []
+    for i in range(8):
+        actors.append(Actor.options(scheduling_strategy="SPREAD").remote())
+        locations.append(ray.get(actors[-1].ping.remote()))
+    locations.sort()
+    expected_locations = list(worker_node_ids) * 4
+    expected_locations.sort()
+    assert locations == expected_locations
+
 
 @pytest.mark.skipif(
     platform.system() == "Windows", reason="FakeAutoscaler doesn't work on Windows"
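
For reference, a minimal driver-side sketch of how the ``"SPREAD"`` option documented above is used; it is not part of the patch, and the actor name, actor count, and the cluster assumed by ``ray.init()`` are illustrative assumptions:

    import ray

    ray.init()

    # Each actor requests 1 CPU so the scheduler has a real resource to balance.
    @ray.remote(num_cpus=1)
    class Worker:
        def node(self):
            # Report which node this actor landed on.
            return ray.get_runtime_context().node_id.hex()

    # "SPREAD" tries to place the actors evenly across the available nodes,
    # instead of packing them onto one node first as "DEFAULT" does.
    workers = [Worker.options(scheduling_strategy="SPREAD").remote() for _ in range(4)]
    print(ray.get([w.node.remote() for w in workers]))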