Skip to content

Commit

Permalink
[Test] Try fixing a flaky gcs heartbeat manager test. (#27096)
Browse files Browse the repository at this point in the history
Heartbeat manager starts its own thread to run its background task and that shares the same data structured used within HandleReportHeartbeat (heartbeats_). That said, both methods should run in the same thread. This achieves it by running HandleReportHeartbeat within the io_service thread
  • Loading branch information
rkooo567 authored Jul 29, 2022
1 parent 749d313 commit c1ac2bb
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 18 deletions.
2 changes: 0 additions & 2 deletions dashboard/modules/node/node_head.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,8 @@ async def _update_nodes(self):
and self._node_update_cnt * FREQUENTY_UPDATE_NODES_INTERVAL_SECONDS
< FREQUENT_UPDATE_TIMEOUT_SECONDS
):
logger.info("SANG-TODO a")
await asyncio.sleep(FREQUENTY_UPDATE_NODES_INTERVAL_SECONDS)
else:
logger.info("SANG-TODO b")
if head_node_not_registered:
logger.warning(
"Head node is not registered even after "
Expand Down
64 changes: 48 additions & 16 deletions src/ray/gcs/gcs_server/test/gcs_heartbeat_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <chrono>

#include "absl/synchronization/mutex.h"
#include "gtest/gtest.h"

using namespace ray;
Expand All @@ -35,8 +36,11 @@ class GcsHeartbeatManagerTest : public ::testing::Test {
}

void SetUp() override {
heartbeat_manager = std::make_unique<GcsHeartbeatManager>(
io_service, [this](const NodeID &node_id) { dead_nodes.push_back(node_id); });
heartbeat_manager =
std::make_unique<GcsHeartbeatManager>(io_service, [this](const NodeID &node_id) {
absl::MutexLock lock(&mutex_);
dead_nodes.push_back(node_id);
});
heartbeat_manager->Start();
}

Expand All @@ -50,7 +54,11 @@ class GcsHeartbeatManagerTest : public ::testing::Test {

instrumented_io_context io_service;
std::unique_ptr<GcsHeartbeatManager> heartbeat_manager;
std::vector<NodeID> dead_nodes;
mutable absl::Mutex mutex_;
// This field needs to be protected because it is accessed
// by a different thread created by `heartbeat_manager`.
std::vector<NodeID> dead_nodes GUARDED_BY(mutex_);
;
};

TEST_F(GcsHeartbeatManagerTest, TestBasicTimeout) {
Expand All @@ -59,26 +67,36 @@ TEST_F(GcsHeartbeatManagerTest, TestBasicTimeout) {
AddNode(node_1);

while (absl::Now() - start < absl::Seconds(1)) {
absl::MutexLock lock(&mutex_);
ASSERT_TRUE(dead_nodes.empty());
}

std::this_thread::sleep_for(2s);

ASSERT_EQ(std::vector<NodeID>{node_1}, dead_nodes);
{
absl::MutexLock lock(&mutex_);
ASSERT_EQ(std::vector<NodeID>{node_1}, dead_nodes);
}
}

TEST_F(GcsHeartbeatManagerTest, TestBasicReport) {
auto node_1 = NodeID::FromRandom();
auto start = absl::Now();
AddNode(node_1);

rpc::ReportHeartbeatReply reply;
rpc::ReportHeartbeatRequest request;
request.mutable_heartbeat()->set_node_id(node_1.Binary());
while (absl::Now() - start < absl::Seconds(3)) {
absl::MutexLock lock(&mutex_);
ASSERT_TRUE(dead_nodes.empty());
// std::function<void(ray::Status, std::function<void()>, std::function<void()>)>'
heartbeat_manager->HandleReportHeartbeat(request, &reply, [](auto, auto, auto) {});
io_service.post(
[&]() {
rpc::ReportHeartbeatReply reply;
rpc::ReportHeartbeatRequest request;
request.mutable_heartbeat()->set_node_id(node_1.Binary());
heartbeat_manager->HandleReportHeartbeat(
request, &reply, [](auto, auto, auto) {});
},
"HandleReportHeartbeat");
std::this_thread::sleep_for(0.1s);
}
}

Expand All @@ -99,11 +117,15 @@ TEST_F(GcsHeartbeatManagerTest, TestBasicRestart) {
heartbeat_manager->Initialize(init_data);

while (absl::Now() - start < absl::Seconds(3)) {
absl::MutexLock lock(&mutex_);
ASSERT_TRUE(dead_nodes.empty());
}

std::this_thread::sleep_for(2s);
ASSERT_EQ(std::vector<NodeID>{node_1}, dead_nodes);
{
absl::MutexLock lock(&mutex_);
ASSERT_EQ(std::vector<NodeID>{node_1}, dead_nodes);
}
}

TEST_F(GcsHeartbeatManagerTest, TestBasicRestart2) {
Expand All @@ -122,18 +144,28 @@ TEST_F(GcsHeartbeatManagerTest, TestBasicRestart2) {

heartbeat_manager->Initialize(init_data);

rpc::ReportHeartbeatReply reply;
rpc::ReportHeartbeatRequest request;

while (absl::Now() - start < absl::Seconds(1)) {
request.mutable_heartbeat()->set_node_id(node_1.Binary());
heartbeat_manager->HandleReportHeartbeat(request, &reply, [](auto, auto, auto) {});
io_service.post(
[&]() {
rpc::ReportHeartbeatReply reply;
rpc::ReportHeartbeatRequest request;
request.mutable_heartbeat()->set_node_id(node_1.Binary());
heartbeat_manager->HandleReportHeartbeat(
request, &reply, [](auto, auto, auto) {});
},
"HandleReportHeartbeat");
// Added a sleep to avoid io service overloaded.
std::this_thread::sleep_for(0.1s);
}

while (absl::Now() - start < absl::Seconds(1)) {
absl::MutexLock lock(&mutex_);
ASSERT_TRUE(dead_nodes.empty());
}

std::this_thread::sleep_for(2s);
ASSERT_EQ(std::vector<NodeID>{node_1}, dead_nodes);
{
absl::MutexLock lock(&mutex_);
ASSERT_EQ(std::vector<NodeID>{node_1}, dead_nodes);
}
}

0 comments on commit c1ac2bb

Please sign in to comment.