[core][pg] Fix bug where pg bundles can't be rescheduled when two nodes are both dead (#24875)

When a bundle is rescheduled because its node died, and other bundles of the same placement group (PG) also trigger rescheduling due to node death, the PG can end up with bundles that are never scheduled.

Reason:
Step 1: Node A goes down, and bundle 1 of a PG deployed on that node enters GcsPlacementGroupManager::OnNodeDead. The PG state is set to RESCHEDULING and the PG is queued for scheduling.

Step 2: While this PG is being scheduled, another node B also goes down. Bundle 2 of the same PG enters GcsPlacementGroupManager::OnNodeDead as well.

Step 3: Because the PG state is already RESCHEDULING, the PG is not added to the pending queue again, so bundle 2 can never be rescheduled.
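The fix in this commit re-checks for unplaced bundles once a scheduling round succeeds and re-queues the placement group if any remain (see RescheduleIfStillHasUnplacedBundles in the diff below). For context, here is a minimal sketch of the pre-fix OnNodeDead flow that loses bundle 2's reschedule; PlacementGroupsOnNode and ClearBundleNodeIds are hypothetical helper names, not the actual Ray source:

// Simplified, hypothetical sketch of the pre-fix flow -- not the actual Ray
// source. When node A dies, the PG enters RESCHEDULING and is queued. If
// node B dies while that reschedule is still in flight, the state guard
// below skips the second enqueue.
void GcsPlacementGroupManager::OnNodeDead(const NodeID &node_id) {
  for (auto &pg : PlacementGroupsOnNode(node_id)) {  // hypothetical helper
    pg->ClearBundleNodeIds(node_id);                 // hypothetical helper
    if (pg->GetState() != rpc::PlacementGroupTableData::RESCHEDULING) {
      pg->UpdateState(rpc::PlacementGroupTableData::RESCHEDULING);
      AddToPendingQueue(pg, 0);
    }
    // else: the PG is already RESCHEDULING from node A's death, so the
    // reschedule triggered by node B is silently dropped -- the bug.
  }
  SchedulePendingPlacementGroups();
}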
larrylian authored Nov 21, 2022
1 parent 073e7bc commit ddfeae3
Showing 5 changed files with 123 additions and 0 deletions.
1 change: 1 addition & 0 deletions python/ray/tests/BUILD
@@ -121,6 +121,7 @@ py_test_module_list(
"test_placement_group.py",
"test_placement_group_2.py",
"test_placement_group_4.py",
"test_placement_group_failover.py",
"test_ray_init.py",
"test_ray_init_2.py",
"test_resource_demand_scheduler.py",
62 changes: 62 additions & 0 deletions python/ray/tests/test_placement_group_failover.py
@@ -0,0 +1,62 @@
import pytest
import sys
import ray
import ray.cluster_utils
from ray._private.test_utils import (
get_other_nodes,
)

MB = 1024 * 1024


@ray.remote(num_cpus=1)
class Actor(object):
def __init__(self):
self.n = 0

def value(self):
return self.n


# Test whether bundles spread across multiple nodes can be rescheduled
# successfully when those nodes die at the same time.
def test_placement_group_failover_when_two_nodes_die(monkeypatch, ray_start_cluster):
with monkeypatch.context() as m:
m.setenv(
"RAY_testing_asio_delay_us",
"NodeManagerService.grpc_client.PrepareBundleResources=2000000:2000000",
)
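        # The injected 2-second delay on PrepareBundleResources keeps the
        # first reschedule in flight long enough for the node removals below
        # to race with it, reproducing the bug described in the commit message.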
cluster = ray_start_cluster
num_nodes = 4
nodes = []
for _ in range(num_nodes):
nodes.append(cluster.add_node(num_cpus=1))
ray.init(address=cluster.address)

bundles = [{"CPU": 1, "memory": 100 * MB} for _ in range(num_nodes)]
placement_group = ray.util.placement_group(
name="name", strategy="STRICT_SPREAD", bundles=bundles
)
assert placement_group.wait(3000)

        # Add more nodes so the PG bundles can be rescheduled.
        other_nodes = get_other_nodes(cluster, exclude_head=True)
        other_nodes_num = len(other_nodes)
        for _ in range(other_nodes_num):
            cluster.add_node(num_cpus=1)
cluster.wait_for_nodes()

for node in other_nodes:
cluster.remove_node(node)

        # Create an actor on each bundle to make sure all bundles are ready.
for i in range(num_nodes):
actor = Actor.options(
placement_group=placement_group, placement_group_bundle_index=i
).remote()
object_ref = actor.value.remote()
ray.get(object_ref, timeout=5)


if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))
44 changes: 44 additions & 0 deletions src/ray/gcs/gcs_server/gcs_placement_group_manager.cc
@@ -71,6 +71,10 @@ GcsPlacementGroup::GetUnplacedBundles() const {
return unplaced_bundles;
}

bool GcsPlacementGroup::HasUnplacedBundles() const {
return !GetUnplacedBundles().empty();
}

rpc::PlacementStrategy GcsPlacementGroup::GetStrategy() const {
return placement_group_table_data_.strategy();
}
@@ -320,6 +324,11 @@ void GcsPlacementGroupManager::OnPlacementGroupCreationSuccess(
placement_group->GetPlacementGroupTableData(),
[this, placement_group_id](Status status) {
RAY_CHECK_OK(status);

        if (RescheduleIfStillHasUnplacedBundles(placement_group_id)) {
          // Don't invoke the creation callbacks yet: the placement group
          // still has unplaced bundles and has been re-queued for scheduling.
          return;
        }
// Invoke all callbacks for all `WaitPlacementGroupUntilReady` requests of this
// placement group and remove all of them from
// placement_group_to_create_callbacks_.
@@ -728,6 +737,9 @@ void GcsPlacementGroupManager::OnNodeDead(const NodeID &node_id) {
if (iter != registered_placement_groups_.end()) {
for (const auto &bundle_index : bundle.second) {
iter->second->GetMutableBundle(bundle_index)->clear_node_id();
RAY_LOG(INFO) << "Rescheduling a bundle when a node dies, placement group id:"
<< iter->second->GetPlacementGroupID()
<< " bundle index:" << bundle_index;
}
// TODO(ffbin): If we have a placement group bundle that requires a unique resource
// (for example gpu resource when there’s only one gpu node), this can postpone
@@ -919,5 +931,37 @@ void GcsPlacementGroupManager::RecordMetrics() const {
placement_group_state_counter_->FlushOnChangeCallbacks();
}

bool GcsPlacementGroupManager::IsInPendingQueue(
const PlacementGroupID &placement_group_id) const {
auto pending_it = std::find_if(pending_placement_groups_.begin(),
pending_placement_groups_.end(),
[&placement_group_id](const auto &val) {
return val.second.second->GetPlacementGroupID() ==
placement_group_id;
});
return pending_it != pending_placement_groups_.end();
}

bool GcsPlacementGroupManager::RescheduleIfStillHasUnplacedBundles(
const PlacementGroupID &placement_group_id) {
auto iter = registered_placement_groups_.find(placement_group_id);
if (iter != registered_placement_groups_.end()) {
auto &placement_group = iter->second;
if (placement_group->HasUnplacedBundles()) {
if ((!IsInPendingQueue(placement_group->GetPlacementGroupID())) &&
placement_group->GetState() != rpc::PlacementGroupTableData::REMOVED) {
RAY_LOG(INFO) << "The placement group still has unplaced bundles, so put "
"it to pending queue again, id:"
<< placement_group->GetPlacementGroupID();
placement_group->UpdateState(rpc::PlacementGroupTableData::RESCHEDULING);
AddToPendingQueue(placement_group, 0);
SchedulePendingPlacementGroups();
return true;
}
}
}
return false;
}

} // namespace gcs
} // namespace ray
9 changes: 9 additions & 0 deletions src/ray/gcs/gcs_server/gcs_placement_group_manager.h
@@ -123,6 +123,9 @@ class GcsPlacementGroup {
/// Get the unplaced bundles of this placement group.
std::vector<std::shared_ptr<const BundleSpecification>> GetUnplacedBundles() const;

/// Check if there are unplaced bundles.
bool HasUnplacedBundles() const;

/// Get the Strategy
rpc::PlacementStrategy GetStrategy() const;

@@ -404,6 +407,12 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler {
// Update placement group load information so that the autoscaler can use it.
void UpdatePlacementGroupLoad();

/// Check if this placement group is waiting for scheduling.
bool IsInPendingQueue(const PlacementGroupID &placement_group_id) const;

/// Reschedule this placement group if it still has unplaced bundles.
bool RescheduleIfStillHasUnplacedBundles(const PlacementGroupID &placement_group_id);

/// The io loop that is used to delay execution of tasks (e.g.,
/// execute_after).
instrumented_io_context &io_context_;
7 changes: 7 additions & 0 deletions src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc
@@ -140,6 +140,13 @@ class GcsPlacementGroupManagerTest : public ::testing::Test {
RAY_CHECK_OK(status);
promise.set_value();
});

    // Mock that all bundles of the placement group have prepared and
    // committed their resources.
int bundles_size = placement_group->GetPlacementGroupTableData().bundles_size();
for (int bundle_index = 0; bundle_index < bundles_size; bundle_index++) {
placement_group->GetMutableBundle(bundle_index)
->set_node_id(NodeID::FromRandom().Binary());
}
gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group);
promise.get_future().get();
}