From ddfeae3c86b997e7a0bf1391f37d2831a2da3542 Mon Sep 17 00:00:00 2001
From: Larry <554538252@qq.com>
Date: Mon, 21 Nov 2022 20:14:13 +0800
Subject: [PATCH] [core][pg] Fix bug where pg bundles can't be rescheduled when
 two nodes both die (#24875)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

When a bundle is being rescheduled because its node died, and another
bundle of the same pg also triggers rescheduling because of a second node
death, the second bundle can end up never being scheduled.

Reason:

Step 1: Node A dies, and bundle 1 of a pg deployed on that node enters
GcsPlacementGroupManager::OnNodeDead. The pg's state becomes RESCHEDULING
and scheduling begins.

Step 2: While this pg is being scheduled, another node B also dies, and
bundle 2 of the same pg enters GcsPlacementGroupManager::OnNodeDead as
well.

Step 3: Because the pg's state is already RESCHEDULING, bundle 2 cannot be
added to the pending queue. As a result, bundle 2 is never rescheduled.
---
 python/ray/tests/BUILD                          |  1 +
 .../tests/test_placement_group_failover.py      | 62 +++++++++++++++++++
 .../gcs_server/gcs_placement_group_manager.cc   | 44 +++++++++++++
 .../gcs_server/gcs_placement_group_manager.h    |  9 +++
 .../test/gcs_placement_group_manager_test.cc    |  7 +++
 5 files changed, 123 insertions(+)
 create mode 100755 python/ray/tests/test_placement_group_failover.py

diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD
index 33288dc87c65..615d2b8e89a4 100644
--- a/python/ray/tests/BUILD
+++ b/python/ray/tests/BUILD
@@ -121,6 +121,7 @@ py_test_module_list(
     "test_placement_group.py",
     "test_placement_group_2.py",
     "test_placement_group_4.py",
+    "test_placement_group_failover.py",
     "test_ray_init.py",
     "test_ray_init_2.py",
     "test_resource_demand_scheduler.py",
diff --git a/python/ray/tests/test_placement_group_failover.py b/python/ray/tests/test_placement_group_failover.py
new file mode 100755
index 000000000000..3bbe88536443
--- /dev/null
+++ b/python/ray/tests/test_placement_group_failover.py
@@ -0,0 +1,62 @@
+import pytest
+import sys
+import ray
+import ray.cluster_utils
+from ray._private.test_utils import (
+    get_other_nodes,
+)
+
+MB = 1024 * 1024
+
+
+@ray.remote(num_cpus=1)
+class Actor(object):
+    def __init__(self):
+        self.n = 0
+
+    def value(self):
+        return self.n
+
+
+# Test whether bundles spread across two nodes can be rescheduled successfully
+# when both nodes die at the same time.
+def test_placement_group_failover_when_two_nodes_die(monkeypatch, ray_start_cluster):
+    with monkeypatch.context() as m:
+        m.setenv(
+            "RAY_testing_asio_delay_us",
+            "NodeManagerService.grpc_client.PrepareBundleResources=2000000:2000000",
+        )
+        cluster = ray_start_cluster
+        num_nodes = 4
+        nodes = []
+        for _ in range(num_nodes):
+            nodes.append(cluster.add_node(num_cpus=1))
+        ray.init(address=cluster.address)
+
+        bundles = [{"CPU": 1, "memory": 100 * MB} for _ in range(num_nodes)]
+        placement_group = ray.util.placement_group(
+            name="name", strategy="STRICT_SPREAD", bundles=bundles
+        )
+        assert placement_group.wait(3000)
+
+        # Add more nodes so the pg bundles can be rescheduled.
+        other_nodes = get_other_nodes(cluster, exclude_head=True)
+        other_nodes_num = len(other_nodes)
+        for i in range(other_nodes_num):
+            cluster.add_node(num_cpus=1)
+        cluster.wait_for_nodes()
+
+        for node in other_nodes:
+            cluster.remove_node(node)
+
+        # Create an actor on each bundle to make sure all bundles are ready.
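+        # (The PrepareBundleResources delay injected via RAY_testing_asio_delay_us
+        # above keeps the first reschedule in flight while the remaining nodes are
+        # removed, reproducing the two-node-death race described in the commit;
+        # ray.get on each pinned actor below succeeds only once its bundle has
+        # been recommitted on a live node.)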
+        for i in range(num_nodes):
+            actor = Actor.options(
+                placement_group=placement_group, placement_group_bundle_index=i
+            ).remote()
+            object_ref = actor.value.remote()
+            ray.get(object_ref, timeout=5)
+
+
+if __name__ == "__main__":
+    sys.exit(pytest.main(["-v", __file__]))
diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc
index 203a1136a8d2..285785c26883 100644
--- a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc
+++ b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc
@@ -71,6 +71,10 @@ GcsPlacementGroup::GetUnplacedBundles() const {
   return unplaced_bundles;
 }
 
+bool GcsPlacementGroup::HasUnplacedBundles() const {
+  return !GetUnplacedBundles().empty();
+}
+
 rpc::PlacementStrategy GcsPlacementGroup::GetStrategy() const {
   return placement_group_table_data_.strategy();
 }
@@ -320,6 +324,11 @@ void GcsPlacementGroupManager::OnPlacementGroupCreationSuccess(
       placement_group->GetPlacementGroupTableData(),
       [this, placement_group_id](Status status) {
         RAY_CHECK_OK(status);
+
+        if (RescheduleIfStillHasUnplacedBundles(placement_group_id)) {
+          // Don't invoke the creation callbacks while unplaced bundles remain.
+          return;
+        }
         // Invoke all callbacks for all `WaitPlacementGroupUntilReady` requests of this
         // placement group and remove all of them from
         // placement_group_to_create_callbacks_.
@@ -728,6 +737,9 @@ void GcsPlacementGroupManager::OnNodeDead(const NodeID &node_id) {
     if (iter != registered_placement_groups_.end()) {
       for (const auto &bundle_index : bundle.second) {
         iter->second->GetMutableBundle(bundle_index)->clear_node_id();
+        RAY_LOG(INFO) << "Rescheduling a bundle when a node dies, placement group id: "
+                      << iter->second->GetPlacementGroupID()
+                      << ", bundle index: " << bundle_index;
       }
       // TODO(ffbin): If we have a placement group bundle that requires a unique resource
       // (for example gpu resource when there’s only one gpu node), this can postpone
@@ -919,5 +931,37 @@ void GcsPlacementGroupManager::RecordMetrics() const {
   placement_group_state_counter_->FlushOnChangeCallbacks();
 }
 
+bool GcsPlacementGroupManager::IsInPendingQueue(
+    const PlacementGroupID &placement_group_id) const {
+  auto pending_it = std::find_if(pending_placement_groups_.begin(),
+                                 pending_placement_groups_.end(),
+                                 [&placement_group_id](const auto &val) {
+                                   return val.second.second->GetPlacementGroupID() ==
+                                          placement_group_id;
+                                 });
+  return pending_it != pending_placement_groups_.end();
+}
+
+bool GcsPlacementGroupManager::RescheduleIfStillHasUnplacedBundles(
+    const PlacementGroupID &placement_group_id) {
+  auto iter = registered_placement_groups_.find(placement_group_id);
+  if (iter != registered_placement_groups_.end()) {
+    auto &placement_group = iter->second;
+    if (placement_group->HasUnplacedBundles()) {
+      if ((!IsInPendingQueue(placement_group->GetPlacementGroupID())) &&
+          placement_group->GetState() != rpc::PlacementGroupTableData::REMOVED) {
+        RAY_LOG(INFO) << "The placement group still has unplaced bundles, so put "
+                         "it back in the pending queue, id: "
+                      << placement_group->GetPlacementGroupID();
+        placement_group->UpdateState(rpc::PlacementGroupTableData::RESCHEDULING);
+        AddToPendingQueue(placement_group, 0);
+        SchedulePendingPlacementGroups();
+        return true;
+      }
+    }
+  }
+  return false;
+}
+
 }  // namespace gcs
 }  // namespace ray
diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_manager.h b/src/ray/gcs/gcs_server/gcs_placement_group_manager.h
index 27237147ad67..ca998bff6f3b 100644
--- a/src/ray/gcs/gcs_server/gcs_placement_group_manager.h
+++ b/src/ray/gcs/gcs_server/gcs_placement_group_manager.h
@@ -123,6 +123,9 @@ class GcsPlacementGroup {
   /// Get the unplaced bundles of this placement group.
   std::vector<std::shared_ptr<const BundleSpecification>> GetUnplacedBundles() const;
 
+  /// Check if there are unplaced bundles.
+  bool HasUnplacedBundles() const;
+
   /// Get the Strategy
   rpc::PlacementStrategy GetStrategy() const;
 
@@ -404,6 +407,12 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler {
   // Update placement group load information so that the autoscaler can use it.
   void UpdatePlacementGroupLoad();
 
+  /// Check if this placement group is waiting to be scheduled.
+  bool IsInPendingQueue(const PlacementGroupID &placement_group_id) const;
+
+  /// Reschedule this placement group if it still has unplaced bundles.
+  bool RescheduleIfStillHasUnplacedBundles(const PlacementGroupID &placement_group_id);
+
   /// The io loop that is used to delay execution of tasks (e.g.,
   /// execute_after).
   instrumented_io_context &io_context_;
diff --git a/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc
index db89b0e29eb9..b997df7cdf0b 100644
--- a/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc
+++ b/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc
@@ -140,6 +140,13 @@ class GcsPlacementGroupManagerTest : public ::testing::Test {
       RAY_CHECK_OK(status);
       promise.set_value();
     });
+
+    // Mock that all bundles of the pg have prepared and committed resources.
+    int bundles_size = placement_group->GetPlacementGroupTableData().bundles_size();
+    for (int bundle_index = 0; bundle_index < bundles_size; bundle_index++) {
+      placement_group->GetMutableBundle(bundle_index)
+          ->set_node_id(NodeID::FromRandom().Binary());
+    }
     gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group);
     promise.get_future().get();
   }
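
Note for reviewers: the user-visible contract this patch restores can be
distilled into the minimal Python sketch below. It is illustrative only and
not part of the patch; it assumes an already-running multi-node cluster, and
NUM_BUNDLES and the 1-CPU bundle shape are made-up values.

    import ray
    from ray.util.placement_group import placement_group

    NUM_BUNDLES = 4  # Illustrative value, not taken from the patch.

    ray.init(address="auto")

    # STRICT_SPREAD places each bundle on a distinct node.
    pg = placement_group(
        [{"CPU": 1} for _ in range(NUM_BUNDLES)], strategy="STRICT_SPREAD"
    )
    ray.get(pg.ready())  # Resolves once every bundle is placed.

    # Suppose two (or more) of those nodes now die. Each affected bundle goes
    # through GcsPlacementGroupManager::OnNodeDead. Before this patch, a
    # bundle whose node died while the group was already RESCHEDULING was
    # never re-enqueued; with the patch, OnPlacementGroupCreationSuccess
    # re-checks HasUnplacedBundles() and puts the group back in the pending
    # queue, so once replacement nodes join the cluster the call below
    # resolves again.
    ray.get(pg.ready())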