From b3136dd9820ff66fe4da67bb45f78eef6f03b400 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Fri, 21 Oct 2022 05:36:29 +0000 Subject: [PATCH 01/17] merge Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- BUILD.bazel | 15 + .../modules/healthz/tests/test_healthz.py | 19 +- python/ray/includes/ray_config.pxd | 10 + python/ray/includes/ray_config.pxi | 20 ++ python/ray/tests/test_failure.py | 14 +- python/ray/tests/test_metrics_agent.py | 4 +- python/ray/tests/test_multi_node_2.py | 18 +- src/ray/common/ray_config_def.h | 7 + .../test/gcs_client_reconnection_test.cc | 12 +- .../gcs_server/gcs_health_check_manager.cc | 121 ++++++++ .../gcs/gcs_server/gcs_health_check_manager.h | 151 ++++++++++ .../gcs/gcs_server/gcs_heartbeat_manager.h | 1 - src/ray/gcs/gcs_server/gcs_server.cc | 84 ++++-- src/ray/gcs/gcs_server/gcs_server.h | 3 + .../test/gcs_health_check_manager_test.cc | 274 ++++++++++++++++++ .../gcs_server/test/gcs_server_rpc_test.cc | 14 +- src/ray/raylet/node_manager.cc | 4 +- src/ray/rpc/grpc_server.h | 2 + 18 files changed, 729 insertions(+), 44 deletions(-) create mode 100644 src/ray/gcs/gcs_server/gcs_health_check_manager.cc create mode 100644 src/ray/gcs/gcs_server/gcs_health_check_manager.h create mode 100644 src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc diff --git a/BUILD.bazel b/BUILD.bazel index c80e3bd2cd1f..17cbf09a23fa 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -546,6 +546,7 @@ cc_library( ":scheduler", ":worker_rpc", "@boost//:bimap", + "@com_github_grpc_grpc//src/proto/grpc/health/v1:health_proto", "@com_google_absl//absl/container:btree", ], ) @@ -1810,6 +1811,20 @@ cc_library( ], ) +cc_test( + name = "gcs_health_check_manager_test", + size = "small", + srcs = [ + "src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc", + ], + copts = COPTS, + tags = ["team:core"], + deps = [ + ":gcs_server_lib", + "@com_google_googletest//:gtest_main", + ], +) + cc_test( name = "gcs_node_manager_test", size = "small", diff --git a/dashboard/modules/healthz/tests/test_healthz.py b/dashboard/modules/healthz/tests/test_healthz.py index 49574ad3f657..836a064c42d0 100644 --- a/dashboard/modules/healthz/tests/test_healthz.py +++ b/dashboard/modules/healthz/tests/test_healthz.py @@ -7,7 +7,9 @@ from ray._private.test_utils import find_free_port, wait_for_condition -def test_healthz_head(ray_start_cluster): +@pytest.mark.parametrize("pull_based", [True, False]) +def test_healthz_head(pull_based, monkeypatch, ray_start_cluster): + monkeypatch.setenv("RAY_pull_based_healthcheck", "true" if pull_based else "false") dashboard_port = find_free_port() h = ray_start_cluster.add_node(dashboard_port=dashboard_port) uri = f"http://localhost:{dashboard_port}/api/gcs_healthz" @@ -20,7 +22,9 @@ def test_healthz_head(ray_start_cluster): assert "Read timed out" in str(e) -def test_healthz_agent_1(ray_start_cluster): +@pytest.mark.parametrize("pull_based", [True, False]) +def test_healthz_agent_1(pull_based, monkeypatch, ray_start_cluster): + monkeypatch.setenv("RAY_pull_based_healthcheck", "true" if pull_based else "false") agent_port = find_free_port() h = ray_start_cluster.add_node(dashboard_agent_listen_port=agent_port) uri = f"http://localhost:{agent_port}/api/local_raylet_healthz" @@ -32,9 +36,16 @@ def test_healthz_agent_1(ray_start_cluster): assert requests.get(uri).status_code == 200 +@pytest.mark.parametrize("pull_based", [True, False]) @pytest.mark.skipif(sys.platform == "win32", reason="SIGSTOP only on posix") 
-def test_healthz_agent_2(monkeypatch, ray_start_cluster): - monkeypatch.setenv("RAY_num_heartbeats_timeout", "3") +def test_healthz_agent_2(pull_based, monkeypatch, ray_start_cluster): + monkeypatch.setenv("RAY_pull_based_healthcheck", "true" if pull_based else "false") + if pull_based: + monkeypatch.setenv("RAY_health_check_failure_threshold", "3") + monkeypatch.setenv("RAY_health_check_timeout_ms", "100") + monkeypatch.setenv("RAY_health_check_initial_delay_ms", "0") + else: + monkeypatch.setenv("RAY_num_heartbeats_timeout", "3") agent_port = find_free_port() h = ray_start_cluster.add_node(dashboard_agent_listen_port=agent_port) diff --git a/python/ray/includes/ray_config.pxd b/python/ray/includes/ray_config.pxd index 20b089a94a34..d68985f4068b 100644 --- a/python/ray/includes/ray_config.pxd +++ b/python/ray/includes/ray_config.pxd @@ -80,3 +80,13 @@ cdef extern from "ray/common/ray_config.h" nogil: c_string REDIS_CLIENT_KEY() const c_string REDIS_SERVER_NAME() const + + c_bool pull_based_healthcheck() const + + int64_t health_check_initial_delay_ms() const + + int64_t health_check_period_ms() const + + int64_t health_check_timeout_ms() const + + int64_t health_check_failure_threshold() const diff --git a/python/ray/includes/ray_config.pxi b/python/ray/includes/ray_config.pxi index aade455e2fc0..8f80c30cf621 100644 --- a/python/ray/includes/ray_config.pxi +++ b/python/ray/includes/ray_config.pxi @@ -136,3 +136,23 @@ cdef class Config: @staticmethod def REDIS_SERVER_NAME(): return RayConfig.instance().REDIS_SERVER_NAME() + + @staticmethod + def pull_based_healthcheck(): + return RayConfig.instance().pull_based_healthcheck() + + @staticmethod + def health_check_initial_delay_ms(): + return RayConfig.instance().health_check_initial_delay_ms() + + @staticmethod + def health_check_period_ms(): + return RayConfig.instance().health_check_period_ms() + + @staticmethod + def health_check_timeout_ms(): + return RayConfig.instance().health_check_timeout_ms() + + @staticmethod + def health_check_failure_threshold(): + return RayConfig.instance().health_check_failure_threshold() diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py index 9d385fde6e54..6a4d6c3016ae 100644 --- a/python/ray/tests/test_failure.py +++ b/python/ray/tests/test_failure.py @@ -562,12 +562,24 @@ async def f(self): { "num_cpus": 0, "_system_config": { + "pull_based_healthcheck": False, "raylet_death_check_interval_milliseconds": 10 * 1000, "num_heartbeats_timeout": 10, "raylet_heartbeat_period_milliseconds": 100, "timeout_ms_task_wait_for_death_info": 100, }, - } + }, + { + "num_cpus": 0, + "_system_config": { + "pull_based_healthcheck": True, + "raylet_death_check_interval_milliseconds": 10 * 1000, + "health_check_initial_delay_ms": 0, + "health_check_failure_threshold": 10, + "health_check_period_ms": 100, + "timeout_ms_task_wait_for_death_info": 100, + }, + }, ], indirect=True, ) diff --git a/python/ray/tests/test_metrics_agent.py b/python/ray/tests/test_metrics_agent.py index b5f0fe99f2b8..293f6c0d9f75 100644 --- a/python/ray/tests/test_metrics_agent.py +++ b/python/ray/tests/test_metrics_agent.py @@ -44,7 +44,6 @@ "ray_object_directory_lookups", "ray_object_directory_added_locations", "ray_object_directory_removed_locations", - "ray_heartbeat_report_ms_sum", "ray_process_startup_time_ms_sum", "ray_internal_num_processes_started", "ray_internal_num_spilled_tasks", @@ -78,6 +77,9 @@ if not ray._raylet.Config.use_ray_syncer(): _METRICS.append("ray_outbound_heartbeat_size_kb_sum") +if not 
ray._raylet.Config.pull_based_healthcheck(): + _METRICS.append("ray_heartbeat_report_ms_sum") + # This list of metrics should be kept in sync with # ray/python/ray/autoscaler/_private/prom_metrics.py _AUTOSCALER_METRICS = [ diff --git a/python/ray/tests/test_multi_node_2.py b/python/ray/tests/test_multi_node_2.py index 36c7391be8c2..328ed76d43bf 100644 --- a/python/ray/tests/test_multi_node_2.py +++ b/python/ray/tests/test_multi_node_2.py @@ -43,8 +43,16 @@ def test_shutdown(): "ray_start_cluster_head", [ generate_system_config_map( - num_heartbeats_timeout=3, object_timeout_milliseconds=12345 - ) + num_heartbeats_timeout=3, + object_timeout_milliseconds=12345, + pull_based_healthcheck=False, + ), + generate_system_config_map( + health_check_initial_delay_ms=0, + health_check_failure_threshold=3, + object_timeout_milliseconds=12345, + pull_based_healthcheck=True, + ), ], indirect=True, ) @@ -63,7 +71,11 @@ def test_system_config(ray_start_cluster_head): @ray.remote def f(): assert ray._config.object_timeout_milliseconds() == 12345 - assert ray._config.num_heartbeats_timeout() == 3 + if ray._config.pull_based_healthcheck(): + assert ray._config.health_check_initial_delay_ms() == 0 + assert ray._config.health_check_failure_threshold() == 3 + else: + assert ray._config.num_heartbeats_timeout() == 3 ray.get([f.remote() for _ in range(5)]) diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 7ff8b67ec59f..5afbc849217e 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -699,3 +699,10 @@ RAY_CONFIG(std::string, REDIS_SERVER_NAME, "") // The delay is a random number between the interval. If method equals '*', // it will apply to all methods. RAY_CONFIG(std::string, testing_asio_delay_us, "") + +/// A feature flag to enable pull based health check. 
+RAY_CONFIG(bool, pull_based_healthcheck, true) +RAY_CONFIG(int64_t, health_check_initial_delay_ms, 1000) +RAY_CONFIG(int64_t, health_check_period_ms, 1000) +RAY_CONFIG(int64_t, health_check_timeout_ms, 5000) +RAY_CONFIG(int64_t, health_check_failure_threshold, 5) diff --git a/src/ray/gcs/gcs_client/test/gcs_client_reconnection_test.cc b/src/ray/gcs/gcs_client/test/gcs_client_reconnection_test.cc index ec3f479eb192..5b70732392e2 100644 --- a/src/ray/gcs/gcs_client/test/gcs_client_reconnection_test.cc +++ b/src/ray/gcs/gcs_client/test/gcs_client_reconnection_test.cc @@ -68,14 +68,14 @@ class GcsClientReconnectionTest : public ::testing::Test { auto channel = grpc::CreateChannel(absl::StrCat("127.0.0.1:", config_.grpc_server_port), grpc::InsecureChannelCredentials()); - std::unique_ptr stub = - rpc::NodeInfoGcsService::NewStub(std::move(channel)); + auto stub = grpc::health::v1::Health::NewStub(channel); grpc::ClientContext context; context.set_deadline(std::chrono::system_clock::now() + 1s); - const rpc::CheckAliveRequest request; - rpc::CheckAliveReply reply; - auto status = stub->CheckAlive(&context, request, &reply); - if (!status.ok()) { + ::grpc::health::v1::HealthCheckRequest request; + ::grpc::health::v1::HealthCheckResponse reply; + auto status = stub->Check(&context, request, &reply); + if (!status.ok() || + reply.status() != ::grpc::health::v1::HealthCheckResponse::SERVING) { RAY_LOG(WARNING) << "Unable to reach GCS: " << status.error_code() << " " << status.error_message(); return false; diff --git a/src/ray/gcs/gcs_server/gcs_health_check_manager.cc b/src/ray/gcs/gcs_server/gcs_health_check_manager.cc new file mode 100644 index 000000000000..032b7ae28802 --- /dev/null +++ b/src/ray/gcs/gcs_server/gcs_health_check_manager.cc @@ -0,0 +1,121 @@ +// Copyright 2022 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include "ray/gcs/gcs_server/gcs_health_check_manager.h" + +namespace ray { +namespace gcs { + +GcsHealthCheckManager::GcsHealthCheckManager( + instrumented_io_context &io_service, + std::function on_node_death_callback, + int64_t initial_delay_ms, + int64_t timeout_ms, + int64_t period_ms, + int64_t failure_threshold) + : io_service_(io_service), + on_node_death_callback_(on_node_death_callback), + initial_delay_ms_(initial_delay_ms), + timeout_ms_(timeout_ms), + period_ms_(period_ms), + failure_threshold_(failure_threshold) {} + +GcsHealthCheckManager::~GcsHealthCheckManager() {} + +void GcsHealthCheckManager::RemoveNode(const NodeID &node_id) { + io_service_.dispatch( + [this, node_id]() { + auto iter = inflight_health_checks_.find(node_id); + if (iter == inflight_health_checks_.end()) { + return; + } + inflight_health_checks_.erase(iter); + }, + "GcsHealthCheckManager::RemoveNode"); +} + +void GcsHealthCheckManager::FailNode(const NodeID &node_id) { + RAY_LOG(WARNING) << "Node " << node_id << " is dead because the health check failed."; + on_node_death_callback_(node_id); + inflight_health_checks_.erase(node_id); +} + +std::vector GcsHealthCheckManager::GetAllNodes() const { + std::vector nodes; + for(const auto& [node_id, _] : inflight_health_checks_) { + nodes.empalce_back(node_id); + } + return nodes; +} + +void GcsHealthCheckManager::HealthCheckContext::StartHealthCheck() { + using ::grpc::health::v1::HealthCheckResponse; + + context = std::make_shared(); + + auto deadline = + std::chrono::system_clock::now() + std::chrono::milliseconds(manager->timeout_ms_); + context->set_deadline(deadline); + stub->async()->Check( + context.get(), + &request, + &response, + [this, context = this->context](::grpc::Status status) { + manager->io_service_.post( + [this, status]() { + if (status.error_code() == ::grpc::StatusCode::CANCELLED) { + return; + } + + RAY_LOG(DEBUG) << "Health check status: " << int(response.status()); + + if (status.ok() && response.status() == HealthCheckResponse::SERVING) { + // Health check passed + health_check_remaining = manager->failure_threshold_; + } else { + --health_check_remaining; + RAY_LOG(WARNING) << "Health check failed for node " << node_id + << ", remaining checks " << health_check_remaining; + } + + if (health_check_remaining == 0) { + manager->io_service_.post([this]() { manager->FailNode(node_id); }, ""); + } else { + // Do another health check. + timer.expires_from_now( + boost::posix_time::milliseconds(manager->period_ms_)); + timer.async_wait([this](auto ec) { + if (ec != boost::asio::error::operation_aborted) { + StartHealthCheck(); + } + }); + } + }, + "HealthCheck"); + }); +} + +void GcsHealthCheckManager::AddNode(const NodeID &node_id, + std::shared_ptr channel) { + io_service_.dispatch( + [this, channel, node_id]() { + RAY_CHECK(inflight_health_checks_.count(node_id) == 0); + auto context = std::make_unique(this, channel, node_id); + inflight_health_checks_.emplace(std::make_pair(node_id, std::move(context))); + }, + "GcsHealthCheckManager::AddNode"); +} + +} // namespace gcs +} // namespace ray diff --git a/src/ray/gcs/gcs_server/gcs_health_check_manager.h b/src/ray/gcs/gcs_server/gcs_health_check_manager.h new file mode 100644 index 000000000000..6e73e89c86b7 --- /dev/null +++ b/src/ray/gcs/gcs_server/gcs_health_check_manager.h @@ -0,0 +1,151 @@ +// Copyright 2022 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +#include "absl/container/flat_hash_map.h" +#include "ray/common/asio/instrumented_io_context.h" +#include "ray/common/id.h" +#include "ray/common/ray_config.h" +#include "src/proto/grpc/health/v1/health.grpc.pb.h" + +class GcsHealthCheckManagerTest; + +namespace ray { +namespace gcs { + +/// GcsHealthCheckManager is used to track the healthiness of the nodes in the ray +/// cluster. The health check is done in pull based way, which means this module will send +/// health check to the raylets to see whether the raylet is healthy or not. If the raylet +/// is not healthy for certain times, the module will think the raylet dead. +class GcsHealthCheckManager { + public: + /// Constructor of GcsHealthCheckManager. + /// + /// \param io_service The thread where all operations in this class should run. + /// \param on_node_death_callback The callback function when some node is marked as + /// failure. \param initial_delay_ms The delay for the first health check. \param + /// period_ms The interval between two health checks for the same node. \param + /// failure_threshold The threshold before a node will be marked as dead due to health + /// check failure. + GcsHealthCheckManager( + instrumented_io_context &io_service, + std::function on_node_death_callback, + int64_t initial_delay_ms = RayConfig::instance().health_check_initial_delay_ms(), + int64_t timeout_ms = RayConfig::instance().health_check_timeout_ms(), + int64_t period_ms = RayConfig::instance().health_check_period_ms(), + int64_t failure_threshold = RayConfig::instance().health_check_failure_threshold()); + + ~GcsHealthCheckManager(); + + /// Start to track the healthiness of a node. + /// + /// \param node_id The id of the node. + /// \param channel The gRPC channel to the node. + void AddNode(const NodeID &node_id, std::shared_ptr channel); + + /// Stop tracking the healthiness of a node. + /// + /// \param node_id The id of the node to stop tracking. + void RemoveNode(const NodeID &node_id); + + /// Return all the nodes monitored. + /// + /// \return A list of node id which are being monitored by this class. + std::vector GetAllNodes() const; + private: + + void FailNode(const NodeID &node_id); + + /// This is for testing. We'll use mock timer in gtest. +#if defined(_TESTING_RAY_TIMER) + using Timer = _TESTING_RAY_TIMER; +#else + using Timer = boost::asio::deadline_timer; +#endif + + /// The context for the health check. It's to support unary call. + /// It can be updated to support streaming call for efficiency. 
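Taken together, the public API declared above is meant to be driven from the GCS main event loop: construct the manager on the io_context it should run on, hand it a death callback, and register each raylet with its gRPC channel. A minimal usage sketch follows; the address 127.0.0.1:10001, the log message, and the main() wrapper are illustrative assumptions, not part of this change.

#include <grpcpp/grpcpp.h>

#include "ray/common/asio/instrumented_io_context.h"
#include "ray/gcs/gcs_server/gcs_health_check_manager.h"
#include "ray/util/logging.h"

int main() {
  instrumented_io_context io_service;
  ray::gcs::GcsHealthCheckManager manager(
      io_service, /*on_node_death_callback=*/[](const ray::NodeID &node_id) {
        RAY_LOG(WARNING) << "Raylet " << node_id << " failed its health checks.";
      });

  // Register a raylet by node id and gRPC channel; the manager then calls
  // grpc.health.v1.Health/Check on it every health_check_period_ms.
  auto channel = grpc::CreateChannel("127.0.0.1:10001",  // placeholder address
                                     grpc::InsecureChannelCredentials());
  manager.AddNode(ray::NodeID::FromRandom(), channel);

  // All checks, retries, and failure callbacks run on this io_context thread.
  io_service.run();
  return 0;
}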
+ class HealthCheckContext { + public: + HealthCheckContext(GcsHealthCheckManager *_manager, + std::shared_ptr channel, + NodeID node_id) + : manager(_manager), + node_id(node_id), + timer(manager->io_service_), + health_check_remaining(manager->failure_threshold_) { + stub = grpc::health::v1::Health::NewStub(channel); + timer.expires_from_now(boost::posix_time::milliseconds(manager->initial_delay_ms_)); + timer.async_wait([this](auto ec) { + if (ec != boost::asio::error::operation_aborted) { + StartHealthCheck(); + } + }); + } + + ~HealthCheckContext() { + timer.cancel(); + if (context != nullptr) { + context->TryCancel(); + } + } + + private: + void StartHealthCheck(); + + GcsHealthCheckManager *manager; + + NodeID node_id; + + /// gRPC related fields + std::unique_ptr<::grpc::health::v1::Health::Stub> stub; + + // The context is used in the gRPC callback which is in another + // thread, so we need it to be a shared_ptr. + std::shared_ptr context; + ::grpc::health::v1::HealthCheckRequest request; + ::grpc::health::v1::HealthCheckResponse response; + + /// The timer is used to do async wait before the next try. + Timer timer; + + /// The remaining check left. If it reaches 0, the node will be marked as dead. + int64_t health_check_remaining; + }; + + /// The main service. All method needs to run on this thread. + instrumented_io_context &io_service_; + + /// Callback when the node failed. + std::function on_node_death_callback_; + + /// The context of the health check for each nodes. + absl::flat_hash_map> inflight_health_checks_; + + /// The delay for the first health check request. + const int64_t initial_delay_ms_; + /// Timeout for each health check request. + const int64_t timeout_ms_; + /// Intervals between two health check. + const int64_t period_ms_; + /// The number of failures before the node is considered as dead. + const int64_t failure_threshold_; + +}; + +} // namespace gcs +} // namespace ray diff --git a/src/ray/gcs/gcs_server/gcs_heartbeat_manager.h b/src/ray/gcs/gcs_server/gcs_heartbeat_manager.h index c788e0de636f..2124ba669068 100644 --- a/src/ray/gcs/gcs_server/gcs_heartbeat_manager.h +++ b/src/ray/gcs/gcs_server/gcs_heartbeat_manager.h @@ -1,4 +1,3 @@ - // Copyright 2017 The Ray Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index 5ac3400553f2..fb202d6d28b7 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -178,7 +178,9 @@ void GcsServer::DoStart(const GcsInitData &gcs_init_data) { // be run. Otherwise the node failure detector will mistake // some living nodes as dead as the timer inside node failure // detector is already run. - gcs_heartbeat_manager_->Start(); + if (gcs_heartbeat_manager_) { + gcs_heartbeat_manager_->Start(); + } RecordMetrics(); @@ -206,7 +208,9 @@ void GcsServer::Stop() { // GcsHeartbeatManager is still checking nodes' heartbeat timeout. Since RPC Server // won't handle heartbeat calls anymore, some nodes will be marked as dead during this // time, causing many nodes die after GCS's failure. 
- gcs_heartbeat_manager_->Stop(); + if (gcs_heartbeat_manager_) { + gcs_heartbeat_manager_->Stop(); + } if (RayConfig::instance().use_ray_syncer()) { ray_syncer_io_context_.stop(); ray_syncer_thread_->join(); @@ -239,19 +243,35 @@ void GcsServer::InitGcsNodeManager(const GcsInitData &gcs_init_data) { void GcsServer::InitGcsHeartbeatManager(const GcsInitData &gcs_init_data) { RAY_CHECK(gcs_node_manager_); - gcs_heartbeat_manager_ = std::make_shared( - heartbeat_manager_io_service_, /*on_node_death_callback=*/ - [this](const NodeID &node_id) { - main_service_.post( - [this, node_id] { return gcs_node_manager_->OnNodeFailure(node_id); }, - "GcsServer.NodeDeathCallback"); - }); - // Initialize by gcs tables data. - gcs_heartbeat_manager_->Initialize(gcs_init_data); - // Register service. - heartbeat_info_service_.reset(new rpc::HeartbeatInfoGrpcService( - heartbeat_manager_io_service_, *gcs_heartbeat_manager_)); - rpc_server_.RegisterService(*heartbeat_info_service_); + auto node_death_callback = [this](const NodeID &node_id) { + main_service_.post( + [this, node_id] { return gcs_node_manager_->OnNodeFailure(node_id); }, + "GcsServer.NodeDeathCallback"); + }; + + if (RayConfig::instance().pull_based_healthcheck()) { + gcs_healthcheck_manager_ = + std::make_unique(main_service_, node_death_callback); + for (const auto &item : gcs_init_data.Nodes()) { + if (item.second.state() == rpc::GcsNodeInfo::ALIVE) { + rpc::Address remote_address; + remote_address.set_raylet_id(item.second.node_id()); + remote_address.set_ip_address(item.second.node_manager_address()); + remote_address.set_port(item.second.node_manager_port()); + auto raylet_client = raylet_client_pool_->GetOrConnectByAddress(remote_address); + gcs_healthcheck_manager_->AddNode(item.first, raylet_client->GetChannel()); + } + } + } else { + gcs_heartbeat_manager_ = std::make_shared( + heartbeat_manager_io_service_, /*on_node_death_callback=*/node_death_callback); + // Initialize by gcs tables data. + gcs_heartbeat_manager_->Initialize(gcs_init_data); + // Register service. 
+ heartbeat_info_service_.reset(new rpc::HeartbeatInfoGrpcService( + heartbeat_manager_io_service_, *gcs_heartbeat_manager_)); + rpc_server_.RegisterService(*heartbeat_info_service_); + } } void GcsServer::InitGcsResourceManager(const GcsInitData &gcs_init_data) { @@ -599,15 +619,26 @@ void GcsServer::InstallEventListeners() { gcs_resource_manager_->OnNodeAdd(*node); gcs_placement_group_manager_->OnNodeAdd(node_id); gcs_actor_manager_->SchedulePendingActors(); - gcs_heartbeat_manager_->AddNode(*node); + if (gcs_heartbeat_manager_) { + gcs_heartbeat_manager_->AddNode(*node); + } + + rpc::Address address; + address.set_raylet_id(node->node_id()); + address.set_ip_address(node->node_manager_address()); + address.set_port(node->node_manager_port()); + + auto raylet_client = raylet_client_pool_->GetOrConnectByAddress(address); + + if (gcs_healthcheck_manager_) { + RAY_CHECK(raylet_client != nullptr); + auto channel = raylet_client->GetChannel(); + RAY_CHECK(channel != nullptr); + gcs_healthcheck_manager_->AddNode(node_id, channel); + } cluster_task_manager_->ScheduleAndDispatchTasks(); - if (RayConfig::instance().use_ray_syncer()) { - rpc::Address address; - address.set_raylet_id(node->node_id()); - address.set_ip_address(node->node_manager_address()); - address.set_port(node->node_manager_port()); - auto raylet_client = raylet_client_pool_->GetOrConnectByAddress(address); + if (RayConfig::instance().use_ray_syncer()) { ray_syncer_->Connect(raylet_client->GetChannel()); } else { gcs_ray_syncer_->AddNode(*node); @@ -623,7 +654,14 @@ void GcsServer::InstallEventListeners() { gcs_placement_group_manager_->OnNodeDead(node_id); gcs_actor_manager_->OnNodeDead(node_id, node_ip_address); raylet_client_pool_->Disconnect(node_id); - gcs_heartbeat_manager_->RemoveNode(node_id); + if (gcs_heartbeat_manager_) { + gcs_heartbeat_manager_->RemoveNode(node_id); + } + + if (gcs_healthcheck_manager_) { + gcs_healthcheck_manager_->RemoveNode(node_id); + } + if (RayConfig::instance().use_ray_syncer()) { ray_syncer_->Disconnect(node_id.Binary()); } else { diff --git a/src/ray/gcs/gcs_server/gcs_server.h b/src/ray/gcs/gcs_server/gcs_server.h index 6ca89364fd04..5fbee9ef60d4 100644 --- a/src/ray/gcs/gcs_server/gcs_server.h +++ b/src/ray/gcs/gcs_server/gcs_server.h @@ -18,6 +18,7 @@ #include "ray/common/ray_syncer/ray_syncer.h" #include "ray/common/runtime_env_manager.h" #include "ray/gcs/gcs_server/gcs_function_manager.h" +#include "ray/gcs/gcs_server/gcs_health_check_manager.h" #include "ray/gcs/gcs_server/gcs_heartbeat_manager.h" #include "ray/gcs/gcs_server/gcs_init_data.h" #include "ray/gcs/gcs_server/gcs_kv_manager.h" @@ -198,6 +199,8 @@ class GcsServer { std::shared_ptr cluster_task_manager_; /// The gcs node manager. std::shared_ptr gcs_node_manager_; + /// The health check manager. + std::shared_ptr gcs_healthcheck_manager_; /// The heartbeat manager. std::shared_ptr gcs_heartbeat_manager_; /// The gcs redis failure detector. diff --git a/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc new file mode 100644 index 000000000000..53e601ba380d --- /dev/null +++ b/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc @@ -0,0 +1,274 @@ +// Copyright 2020-2021 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include +#include +#include +#include + +using namespace boost; + +// Copied from +// https://stackoverflow.com/questions/14191855/how-do-you-mock-the-time-for-boost-timers +class mock_time_traits { + typedef boost::asio::deadline_timer::traits_type source_traits; + + public: + typedef source_traits::time_type time_type; + typedef source_traits::duration_type duration_type; + + // Note this implemenation requires set_now(...) to be called before now() + static time_type now() { return *now_; } + + // After modifying the clock, we need to sleep the thread to give the io_service + // the opportunity to poll and notice the change in clock time + static void set_now(time_type t) { + now_ = t; + boost::this_thread::sleep_for(boost::chrono::milliseconds(2)); + } + + static time_type add(time_type t, duration_type d) { return source_traits::add(t, d); } + static duration_type subtract(time_type t1, time_type t2) { + return source_traits::subtract(t1, t2); + } + static bool less_than(time_type t1, time_type t2) { + return source_traits::less_than(t1, t2); + } + + // This function is called by asio to determine how often to check + // if the timer is ready to fire. By manipulating this function, we + // can make sure asio detects changes to now_ in a timely fashion. + static boost::posix_time::time_duration to_posix_duration(duration_type d) { + return d < boost::posix_time::milliseconds(1) ? 
d + : boost::posix_time::milliseconds(1); + } + + private: + static boost::optional now_; +}; + +boost::optional mock_time_traits::now_; + +using mock_deadline_timer = + boost::asio::basic_deadline_timer; + +#define _TESTING_RAY_TIMER mock_deadline_timer + +#include +#include + +#include + +#include "gtest/gtest.h" +#include "ray/gcs/gcs_server/gcs_health_check_manager.h" + +using namespace ray; +using namespace std::literals::chrono_literals; + +class GcsHealthCheckManagerTest : public ::testing::Test { + protected: + void SetUp() override { + grpc::EnableDefaultHealthCheckService(true); + mock_time_traits::set_now(boost::posix_time::time_from_string("2022-01-20 0:0:0.000")); + health_check = std::make_unique( + io_service, [this](const NodeID &id) { dead_nodes.insert(id); }); + port = 10000; + } + + void TearDown() override { + io_service.poll(); + io_service.stop(); + } + + NodeID AddServer() { + std::promise port_promise; + auto node_id = NodeID::FromRandom(); + + auto server = std::make_shared(node_id.Hex(), port, true); + + auto channel = grpc::CreateChannel("localhost:" + std::to_string(port), + grpc::InsecureChannelCredentials()); + server->Run(); + servers.emplace(node_id, server); + health_check->AddNode(node_id, channel); + ++port; + return node_id; + } + + void StopServing(const NodeID &id) { + auto iter = servers.find(id); + RAY_CHECK(iter != servers.end()); + iter->second->GetServer().GetHealthCheckService()->SetServingStatus(false); + } + + void StartServing(const NodeID &id) { + auto iter = servers.find(id); + RAY_CHECK(iter != servers.end()); + iter->second->GetServer().GetHealthCheckService()->SetServingStatus(true); + } + + void DeleteServer(const NodeID &id) { + auto iter = servers.find(id); + if (iter != servers.end()) { + iter->second->Shutdown(); + servers.erase(iter); + } + } + + + void AdvanceInitialDelay() { + auto delta = boost::posix_time::millisec(RayConfig::instance().health_check_initial_delay_ms() + 10); + mock_time_traits::set_now(mock_time_traits::now() + delta); + } + + void AdvanceNextDelay() { + auto delta = boost::posix_time::millisec(RayConfig::instance().health_check_period_ms() + 10); + mock_time_traits::set_now(mock_time_traits::now() + delta); + } + + void Run(size_t n = 1) { + while(n) { + auto i = io_service.run_one(); + n -= i; + io_service.restart(); + } + } + + int port; + std::unordered_map> servers; + std::unique_ptr health_check; + instrumented_io_context io_service; + + std::unordered_set dead_nodes; +}; + +TEST_F(GcsHealthCheckManagerTest, TestBasic) { + auto node_id = AddServer(); + Run(); + ASSERT_TRUE(dead_nodes.empty()); + + // Run the first health check + AdvanceInitialDelay(); + Run(); + ASSERT_TRUE(dead_nodes.empty()); + + AdvanceNextDelay(); + Run(2); // One for starting RPC and one for the RPC callback. + ASSERT_TRUE(dead_nodes.empty()); + StopServing(node_id); + + for(auto i = 0; i < RayConfig::instance().health_check_failure_threshold(); ++i) { + AdvanceNextDelay(); + Run(2); // One for starting RPC and one for the RPC callback. + } + + Run(); // For failure callback. + + ASSERT_EQ(1, dead_nodes.size()); + ASSERT_TRUE(dead_nodes.count(node_id)); +} + +TEST_F(GcsHealthCheckManagerTest, StoppedAndResume) { + auto node_id = AddServer(); + Run(); + ASSERT_TRUE(dead_nodes.empty()); + + // Run the first health check + AdvanceInitialDelay(); + Run(); + ASSERT_TRUE(dead_nodes.empty()); + + AdvanceNextDelay(); + Run(2); // One for starting RPC and one for the RPC callback. 
+ ASSERT_TRUE(dead_nodes.empty()); + StopServing(node_id); + + for(auto i = 0; i < RayConfig::instance().health_check_failure_threshold(); ++i) { + AdvanceNextDelay(); + Run(2); // One for starting RPC and one for the RPC callback. + if(i == (RayConfig::instance().health_check_failure_threshold()) / 2) { + StartServing(node_id); + } + } + + Run(); // For failure callback. + + ASSERT_EQ(0, dead_nodes.size()); +} + +TEST_F(GcsHealthCheckManagerTest, Crashed) { + auto node_id = AddServer(); + Run(); + ASSERT_TRUE(dead_nodes.empty()); + + // Run the first health check + AdvanceInitialDelay(); + Run(); + ASSERT_TRUE(dead_nodes.empty()); + + AdvanceNextDelay(); + Run(2); // One for starting RPC and one for the RPC callback. + ASSERT_TRUE(dead_nodes.empty()); + + // Check it again + AdvanceNextDelay(); + Run(2); // One for starting RPC and one for the RPC callback. + ASSERT_TRUE(dead_nodes.empty()); + + DeleteServer(node_id); + + for(auto i = 0; i < RayConfig::instance().health_check_failure_threshold(); ++i) { + AdvanceNextDelay(); + Run(2); // One for starting RPC and one for the RPC callback. + } + + Run(); // For failure callback. + + ASSERT_EQ(1, dead_nodes.size()); + ASSERT_TRUE(dead_nodes.count(node_id)); +} + +TEST_F(GcsHealthCheckManagerTest, NodeRemoved) { + auto node_id = AddServer(); + io_service.run(); + ASSERT_TRUE(dead_nodes.empty()); + + // Run the first health check + AdvanceInitialDelay(); + Run(); + ASSERT_TRUE(dead_nodes.empty()); + + AdvanceNextDelay(); + Run(2); // One for starting RPC and one for the RPC callback. + ASSERT_TRUE(dead_nodes.empty()); + health_check->RemoveNode(node_id); + + // Make sure it's not monitored any more + for(auto i = 0; i < RayConfig::instance().health_check_failure_threshold(); ++i) { + AdvanceNextDelay(); + io_service.poll(); + } + + ASSERT_EQ(0, dead_nodes.size()); + ASSERT_EQ(0, health_check->GetAllNodes().size()); +} + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc b/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc index 5ea69fa41ee3..61cb5c8a06e4 100644 --- a/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc @@ -14,6 +14,7 @@ #include "gtest/gtest.h" #include "ray/common/asio/instrumented_io_context.h" +#include "ray/common/ray_config.h" #include "ray/common/test_util.h" #include "ray/gcs/gcs_server/gcs_server.h" #include "ray/gcs/test/gcs_test_util.h" @@ -319,10 +320,12 @@ TEST_F(GcsServerTest, TestNodeInfo) { ASSERT_TRUE(node_info_list[0].state() == rpc::GcsNodeInfo_GcsNodeState::GcsNodeInfo_GcsNodeState_ALIVE); - // Report heartbeat - rpc::ReportHeartbeatRequest report_heartbeat_request; - report_heartbeat_request.mutable_heartbeat()->set_node_id(gcs_node_info->node_id()); - ASSERT_TRUE(ReportHeartbeat(report_heartbeat_request)); + if (!RayConfig::instance().pull_based_healthcheck()) { + // Report heartbeat + rpc::ReportHeartbeatRequest report_heartbeat_request; + report_heartbeat_request.mutable_heartbeat()->set_node_id(gcs_node_info->node_id()); + ASSERT_TRUE(ReportHeartbeat(report_heartbeat_request)); + } // Unregister node info rpc::DrainNodeRequest unregister_node_info_request; @@ -336,6 +339,9 @@ TEST_F(GcsServerTest, TestNodeInfo) { } TEST_F(GcsServerTest, TestHeartbeatWithNoRegistering) { + if (RayConfig::instance().pull_based_healthcheck()) { + GTEST_SKIP(); + } // Create gcs node info auto gcs_node_info = Mocker::GenNodeInfo(); diff --git 
a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 78d7586b491a..7a431e369363 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -462,7 +462,9 @@ NodeManager::NodeManager(instrumented_io_context &io_service, ray::Status NodeManager::RegisterGcs() { // Start sending heartbeat here to ensure it happening after raylet being registered. - heartbeat_sender_.reset(new HeartbeatSender(self_node_id_, gcs_client_)); + if (!RayConfig::instance().pull_based_healthcheck()) { + heartbeat_sender_.reset(new HeartbeatSender(self_node_id_, gcs_client_)); + } auto on_node_change = [this](const NodeID &node_id, const GcsNodeInfo &data) { if (data.state() == GcsNodeInfo::ALIVE) { NodeAdded(data); diff --git a/src/ray/rpc/grpc_server.h b/src/ray/rpc/grpc_server.h index 67eb82aa7eb7..256e9aabe62f 100644 --- a/src/ray/rpc/grpc_server.h +++ b/src/ray/rpc/grpc_server.h @@ -118,6 +118,8 @@ class GrpcServer { void RegisterService(GrpcService &service); void RegisterService(grpc::Service &service); + grpc::Server &GetServer() { return *server_; } + protected: /// This function runs in a background thread. It keeps polling events from the /// `ServerCompletionQueue`, and dispaches the event to the `ServiceHandler` instances From 8691d162fab59f2ebb4fe89c854828f894bfac78 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Fri, 21 Oct 2022 05:36:57 +0000 Subject: [PATCH 02/17] format Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- .../gcs_server/gcs_health_check_manager.cc | 2 +- .../gcs/gcs_server/gcs_health_check_manager.h | 6 +-- .../test/gcs_health_check_manager_test.cc | 52 ++++++++++--------- 3 files changed, 31 insertions(+), 29 deletions(-) diff --git a/src/ray/gcs/gcs_server/gcs_health_check_manager.cc b/src/ray/gcs/gcs_server/gcs_health_check_manager.cc index 032b7ae28802..92a4a3dfa884 100644 --- a/src/ray/gcs/gcs_server/gcs_health_check_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_health_check_manager.cc @@ -53,7 +53,7 @@ void GcsHealthCheckManager::FailNode(const NodeID &node_id) { std::vector GcsHealthCheckManager::GetAllNodes() const { std::vector nodes; - for(const auto& [node_id, _] : inflight_health_checks_) { + for (const auto &[node_id, _] : inflight_health_checks_) { nodes.empalce_back(node_id); } return nodes; diff --git a/src/ray/gcs/gcs_server/gcs_health_check_manager.h b/src/ray/gcs/gcs_server/gcs_health_check_manager.h index 6e73e89c86b7..34358eb2c449 100644 --- a/src/ray/gcs/gcs_server/gcs_health_check_manager.h +++ b/src/ray/gcs/gcs_server/gcs_health_check_manager.h @@ -66,8 +66,8 @@ class GcsHealthCheckManager { /// /// \return A list of node id which are being monitored by this class. std::vector GetAllNodes() const; - private: + private: void FailNode(const NodeID &node_id); /// This is for testing. We'll use mock timer in gtest. @@ -134,7 +134,8 @@ class GcsHealthCheckManager { std::function on_node_death_callback_; /// The context of the health check for each nodes. - absl::flat_hash_map> inflight_health_checks_; + absl::flat_hash_map> + inflight_health_checks_; /// The delay for the first health check request. const int64_t initial_delay_ms_; @@ -144,7 +145,6 @@ class GcsHealthCheckManager { const int64_t period_ms_; /// The number of failures before the node is considered as dead. 
const int64_t failure_threshold_; - }; } // namespace gcs diff --git a/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc index 53e601ba380d..53f71a3672ec 100644 --- a/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc @@ -69,11 +69,11 @@ using mock_deadline_timer = #define _TESTING_RAY_TIMER mock_deadline_timer -#include -#include - #include +#include +#include + #include "gtest/gtest.h" #include "ray/gcs/gcs_server/gcs_health_check_manager.h" @@ -84,7 +84,8 @@ class GcsHealthCheckManagerTest : public ::testing::Test { protected: void SetUp() override { grpc::EnableDefaultHealthCheckService(true); - mock_time_traits::set_now(boost::posix_time::time_from_string("2022-01-20 0:0:0.000")); + mock_time_traits::set_now( + boost::posix_time::time_from_string("2022-01-20 0:0:0.000")); health_check = std::make_unique( io_service, [this](const NodeID &id) { dead_nodes.insert(id); }); port = 10000; @@ -130,19 +131,20 @@ class GcsHealthCheckManagerTest : public ::testing::Test { } } - void AdvanceInitialDelay() { - auto delta = boost::posix_time::millisec(RayConfig::instance().health_check_initial_delay_ms() + 10); + auto delta = boost::posix_time::millisec( + RayConfig::instance().health_check_initial_delay_ms() + 10); mock_time_traits::set_now(mock_time_traits::now() + delta); } void AdvanceNextDelay() { - auto delta = boost::posix_time::millisec(RayConfig::instance().health_check_period_ms() + 10); + auto delta = + boost::posix_time::millisec(RayConfig::instance().health_check_period_ms() + 10); mock_time_traits::set_now(mock_time_traits::now() + delta); } void Run(size_t n = 1) { - while(n) { + while (n) { auto i = io_service.run_one(); n -= i; io_service.restart(); @@ -168,16 +170,16 @@ TEST_F(GcsHealthCheckManagerTest, TestBasic) { ASSERT_TRUE(dead_nodes.empty()); AdvanceNextDelay(); - Run(2); // One for starting RPC and one for the RPC callback. + Run(2); // One for starting RPC and one for the RPC callback. ASSERT_TRUE(dead_nodes.empty()); StopServing(node_id); - for(auto i = 0; i < RayConfig::instance().health_check_failure_threshold(); ++i) { + for (auto i = 0; i < RayConfig::instance().health_check_failure_threshold(); ++i) { AdvanceNextDelay(); - Run(2); // One for starting RPC and one for the RPC callback. + Run(2); // One for starting RPC and one for the RPC callback. } - Run(); // For failure callback. + Run(); // For failure callback. ASSERT_EQ(1, dead_nodes.size()); ASSERT_TRUE(dead_nodes.count(node_id)); @@ -194,19 +196,19 @@ TEST_F(GcsHealthCheckManagerTest, StoppedAndResume) { ASSERT_TRUE(dead_nodes.empty()); AdvanceNextDelay(); - Run(2); // One for starting RPC and one for the RPC callback. + Run(2); // One for starting RPC and one for the RPC callback. ASSERT_TRUE(dead_nodes.empty()); StopServing(node_id); - for(auto i = 0; i < RayConfig::instance().health_check_failure_threshold(); ++i) { + for (auto i = 0; i < RayConfig::instance().health_check_failure_threshold(); ++i) { AdvanceNextDelay(); - Run(2); // One for starting RPC and one for the RPC callback. - if(i == (RayConfig::instance().health_check_failure_threshold()) / 2) { + Run(2); // One for starting RPC and one for the RPC callback. + if (i == (RayConfig::instance().health_check_failure_threshold()) / 2) { StartServing(node_id); } } - Run(); // For failure callback. + Run(); // For failure callback. 
ASSERT_EQ(0, dead_nodes.size()); } @@ -222,22 +224,22 @@ TEST_F(GcsHealthCheckManagerTest, Crashed) { ASSERT_TRUE(dead_nodes.empty()); AdvanceNextDelay(); - Run(2); // One for starting RPC and one for the RPC callback. + Run(2); // One for starting RPC and one for the RPC callback. ASSERT_TRUE(dead_nodes.empty()); // Check it again AdvanceNextDelay(); - Run(2); // One for starting RPC and one for the RPC callback. + Run(2); // One for starting RPC and one for the RPC callback. ASSERT_TRUE(dead_nodes.empty()); DeleteServer(node_id); - for(auto i = 0; i < RayConfig::instance().health_check_failure_threshold(); ++i) { + for (auto i = 0; i < RayConfig::instance().health_check_failure_threshold(); ++i) { AdvanceNextDelay(); - Run(2); // One for starting RPC and one for the RPC callback. + Run(2); // One for starting RPC and one for the RPC callback. } - Run(); // For failure callback. + Run(); // For failure callback. ASSERT_EQ(1, dead_nodes.size()); ASSERT_TRUE(dead_nodes.count(node_id)); @@ -254,12 +256,12 @@ TEST_F(GcsHealthCheckManagerTest, NodeRemoved) { ASSERT_TRUE(dead_nodes.empty()); AdvanceNextDelay(); - Run(2); // One for starting RPC and one for the RPC callback. + Run(2); // One for starting RPC and one for the RPC callback. ASSERT_TRUE(dead_nodes.empty()); health_check->RemoveNode(node_id); // Make sure it's not monitored any more - for(auto i = 0; i < RayConfig::instance().health_check_failure_threshold(); ++i) { + for (auto i = 0; i < RayConfig::instance().health_check_failure_threshold(); ++i) { AdvanceNextDelay(); io_service.poll(); } @@ -270,5 +272,5 @@ TEST_F(GcsHealthCheckManagerTest, NodeRemoved) { int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); + return RUN_ALL_TESTS(); } From 9203adb4e22f1c80801fb00750116843ddfdf28b Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Fri, 21 Oct 2022 05:48:58 +0000 Subject: [PATCH 03/17] up Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- .../gcs/gcs_server/gcs_health_check_manager.cc | 2 +- .../test/gcs_health_check_manager_test.cc | 15 +++++++++++---- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/src/ray/gcs/gcs_server/gcs_health_check_manager.cc b/src/ray/gcs/gcs_server/gcs_health_check_manager.cc index 92a4a3dfa884..81cbfce1449a 100644 --- a/src/ray/gcs/gcs_server/gcs_health_check_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_health_check_manager.cc @@ -54,7 +54,7 @@ void GcsHealthCheckManager::FailNode(const NodeID &node_id) { std::vector GcsHealthCheckManager::GetAllNodes() const { std::vector nodes; for (const auto &[node_id, _] : inflight_health_checks_) { - nodes.empalce_back(node_id); + nodes.emplace_back(node_id); } return nodes; } diff --git a/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc index 53f71a3672ec..8c5b5c47e97a 100644 --- a/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc @@ -144,6 +144,13 @@ class GcsHealthCheckManagerTest : public ::testing::Test { } void Run(size_t n = 1) { + // If n == 0 it mean we just run it and return. 
+ if(n == 0) { + io_service.run(); + io_service.restart(); + return; + } + while (n) { auto i = io_service.run_one(); n -= i; @@ -161,7 +168,7 @@ class GcsHealthCheckManagerTest : public ::testing::Test { TEST_F(GcsHealthCheckManagerTest, TestBasic) { auto node_id = AddServer(); - Run(); + Run(0); // Initial run ASSERT_TRUE(dead_nodes.empty()); // Run the first health check @@ -187,7 +194,7 @@ TEST_F(GcsHealthCheckManagerTest, TestBasic) { TEST_F(GcsHealthCheckManagerTest, StoppedAndResume) { auto node_id = AddServer(); - Run(); + Run(0); // Initial run ASSERT_TRUE(dead_nodes.empty()); // Run the first health check @@ -215,7 +222,7 @@ TEST_F(GcsHealthCheckManagerTest, StoppedAndResume) { TEST_F(GcsHealthCheckManagerTest, Crashed) { auto node_id = AddServer(); - Run(); + Run(0); // Initial run ASSERT_TRUE(dead_nodes.empty()); // Run the first health check @@ -247,7 +254,7 @@ TEST_F(GcsHealthCheckManagerTest, Crashed) { TEST_F(GcsHealthCheckManagerTest, NodeRemoved) { auto node_id = AddServer(); - io_service.run(); + Run(0); // Initial run ASSERT_TRUE(dead_nodes.empty()); // Run the first health check From 5313f650bd2a0c4561099246504249feb8505aef Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Fri, 21 Oct 2022 05:49:11 +0000 Subject: [PATCH 04/17] format Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- .../gcs_server/test/gcs_health_check_manager_test.cc | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc index 8c5b5c47e97a..1bc2c6025caf 100644 --- a/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc @@ -145,7 +145,7 @@ class GcsHealthCheckManagerTest : public ::testing::Test { void Run(size_t n = 1) { // If n == 0 it mean we just run it and return. 
- if(n == 0) { + if (n == 0) { io_service.run(); io_service.restart(); return; @@ -168,7 +168,7 @@ class GcsHealthCheckManagerTest : public ::testing::Test { TEST_F(GcsHealthCheckManagerTest, TestBasic) { auto node_id = AddServer(); - Run(0); // Initial run + Run(0); // Initial run ASSERT_TRUE(dead_nodes.empty()); // Run the first health check @@ -194,7 +194,7 @@ TEST_F(GcsHealthCheckManagerTest, TestBasic) { TEST_F(GcsHealthCheckManagerTest, StoppedAndResume) { auto node_id = AddServer(); - Run(0); // Initial run + Run(0); // Initial run ASSERT_TRUE(dead_nodes.empty()); // Run the first health check @@ -222,7 +222,7 @@ TEST_F(GcsHealthCheckManagerTest, StoppedAndResume) { TEST_F(GcsHealthCheckManagerTest, Crashed) { auto node_id = AddServer(); - Run(0); // Initial run + Run(0); // Initial run ASSERT_TRUE(dead_nodes.empty()); // Run the first health check @@ -254,7 +254,7 @@ TEST_F(GcsHealthCheckManagerTest, Crashed) { TEST_F(GcsHealthCheckManagerTest, NodeRemoved) { auto node_id = AddServer(); - Run(0); // Initial run + Run(0); // Initial run ASSERT_TRUE(dead_nodes.empty()); // Run the first health check From 9ddd08b346802afa370035941f030625e6c6fa16 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Fri, 21 Oct 2022 05:49:56 +0000 Subject: [PATCH 05/17] turn it off Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- src/ray/common/ray_config_def.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 5afbc849217e..3d70ae08fd45 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -701,7 +701,8 @@ RAY_CONFIG(std::string, REDIS_SERVER_NAME, "") RAY_CONFIG(std::string, testing_asio_delay_us, "") /// A feature flag to enable pull based health check. 
-RAY_CONFIG(bool, pull_based_healthcheck, true) +/// TODO: Turn it on by default +RAY_CONFIG(bool, pull_based_healthcheck, false) RAY_CONFIG(int64_t, health_check_initial_delay_ms, 1000) RAY_CONFIG(int64_t, health_check_period_ms, 1000) RAY_CONFIG(int64_t, health_check_timeout_ms, 5000) From de6eea8bbc0b8c5df6abaed2619cc8fd5c8176ea Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Fri, 21 Oct 2022 23:12:36 +0000 Subject: [PATCH 06/17] fix Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- src/ray/gcs/gcs_server/gcs_health_check_manager.cc | 8 ++++---- .../gcs/gcs_server/test/gcs_health_check_manager_test.cc | 8 +++++--- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/src/ray/gcs/gcs_server/gcs_health_check_manager.cc b/src/ray/gcs/gcs_server/gcs_health_check_manager.cc index 81cbfce1449a..d914fdde4d3f 100644 --- a/src/ray/gcs/gcs_server/gcs_health_check_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_health_check_manager.cc @@ -72,12 +72,12 @@ void GcsHealthCheckManager::HealthCheckContext::StartHealthCheck() { &request, &response, [this, context = this->context](::grpc::Status status) { + if (status.error_code() == ::grpc::StatusCode::CANCELLED) { + return; + } + manager->io_service_.post( [this, status]() { - if (status.error_code() == ::grpc::StatusCode::CANCELLED) { - return; - } - RAY_LOG(DEBUG) << "Health check status: " << int(response.status()); if (status.ok() && response.status() == HealthCheckResponse::SERVING) { diff --git a/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc index 1bc2c6025caf..e430c54369a4 100644 --- a/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc @@ -94,6 +94,9 @@ class GcsHealthCheckManagerTest : public ::testing::Test { void TearDown() override { io_service.poll(); io_service.stop(); + for (auto [_, server] : servers) { + server->Shutdown(); + } } NodeID AddServer() { @@ -159,10 +162,9 @@ class GcsHealthCheckManagerTest : public ::testing::Test { } int port; - std::unordered_map> servers; - std::unique_ptr health_check; instrumented_io_context io_service; - + std::unique_ptr health_check; + std::unordered_map> servers; std::unordered_set dead_nodes; }; From f16d691db2cbcc4d5447b36b8ecf7ca6166ce2fe Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Fri, 21 Oct 2022 23:22:17 +0000 Subject: [PATCH 07/17] fix Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc index e430c54369a4..79f7937533d1 100644 --- a/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc @@ -94,6 +94,8 @@ class GcsHealthCheckManagerTest : public ::testing::Test { void TearDown() override { io_service.poll(); io_service.stop(); + + // Stop the servers. 
for (auto [_, server] : servers) { server->Shutdown(); } From 067c6af3123e8a93ea49295ba86d24bdd36941ab Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Mon, 24 Oct 2022 21:06:20 +0000 Subject: [PATCH 08/17] fix comments Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- .../gcs_server/gcs_health_check_manager.cc | 14 +++++++------- .../gcs/gcs_server/gcs_health_check_manager.h | 19 +++++++++++++------ 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/src/ray/gcs/gcs_server/gcs_health_check_manager.cc b/src/ray/gcs/gcs_server/gcs_health_check_manager.cc index d914fdde4d3f..02b357532f6a 100644 --- a/src/ray/gcs/gcs_server/gcs_health_check_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_health_check_manager.cc @@ -36,11 +36,11 @@ GcsHealthCheckManager::~GcsHealthCheckManager() {} void GcsHealthCheckManager::RemoveNode(const NodeID &node_id) { io_service_.dispatch( [this, node_id]() { - auto iter = inflight_health_checks_.find(node_id); - if (iter == inflight_health_checks_.end()) { + auto iter = health_check_contexts_.find(node_id); + if (iter == health_check_contexts_.end()) { return; } - inflight_health_checks_.erase(iter); + health_check_contexts_.erase(iter); }, "GcsHealthCheckManager::RemoveNode"); } @@ -48,12 +48,12 @@ void GcsHealthCheckManager::RemoveNode(const NodeID &node_id) { void GcsHealthCheckManager::FailNode(const NodeID &node_id) { RAY_LOG(WARNING) << "Node " << node_id << " is dead because the health check failed."; on_node_death_callback_(node_id); - inflight_health_checks_.erase(node_id); + health_check_contexts_.erase(node_id); } std::vector GcsHealthCheckManager::GetAllNodes() const { std::vector nodes; - for (const auto &[node_id, _] : inflight_health_checks_) { + for (const auto &[node_id, _] : health_check_contexts_) { nodes.emplace_back(node_id); } return nodes; @@ -110,9 +110,9 @@ void GcsHealthCheckManager::AddNode(const NodeID &node_id, std::shared_ptr channel) { io_service_.dispatch( [this, channel, node_id]() { - RAY_CHECK(inflight_health_checks_.count(node_id) == 0); + RAY_CHECK(health_check_contexts_.count(node_id) == 0); auto context = std::make_unique(this, channel, node_id); - inflight_health_checks_.emplace(std::make_pair(node_id, std::move(context))); + health_check_contexts_.emplace(std::make_pair(node_id, std::move(context))); }, "GcsHealthCheckManager::AddNode"); } diff --git a/src/ray/gcs/gcs_server/gcs_health_check_manager.h b/src/ray/gcs/gcs_server/gcs_health_check_manager.h index 34358eb2c449..073151f23dc8 100644 --- a/src/ray/gcs/gcs_server/gcs_health_check_manager.h +++ b/src/ray/gcs/gcs_server/gcs_health_check_manager.h @@ -31,16 +31,24 @@ namespace gcs { /// cluster. The health check is done in pull based way, which means this module will send /// health check to the raylets to see whether the raylet is healthy or not. If the raylet /// is not healthy for certain times, the module will think the raylet dead. +/// When the node is dead a callback passed in the constructor will be called and this +/// node will be removed from GcsHealthCheckManager. The node can be added into this class +/// later. Although the same node id is not supposed to be reused in ray cluster, this is +/// not enforced in this class. The implementation of this class try its bes not to couple +/// itself with GCS so we can move it to ray/common/health_check_manager.h in the future. +/// TODO (iycheng): Move the GcsHealthCheckManager to common and decouple it from GCS +/// completely. 
class GcsHealthCheckManager { public: /// Constructor of GcsHealthCheckManager. /// /// \param io_service The thread where all operations in this class should run. /// \param on_node_death_callback The callback function when some node is marked as - /// failure. \param initial_delay_ms The delay for the first health check. \param - /// period_ms The interval between two health checks for the same node. \param - /// failure_threshold The threshold before a node will be marked as dead due to health - /// check failure. + /// failure. + /// \param initial_delay_ms The delay for the first health check. + /// \param period_ms The interval between two health checks for the same node. + /// \param failure_threshold The threshold before a node will be marked as dead due to + /// health check failure. GcsHealthCheckManager( instrumented_io_context &io_service, std::function on_node_death_callback, @@ -134,8 +142,7 @@ class GcsHealthCheckManager { std::function on_node_death_callback_; /// The context of the health check for each nodes. - absl::flat_hash_map> - inflight_health_checks_; + absl::flat_hash_map> health_check_contexts_; /// The delay for the first health check request. const int64_t initial_delay_ms_; From 131f4713213063c06e8b38edfdb6c8b879cc7e8f Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Wed, 26 Oct 2022 02:13:41 +0000 Subject: [PATCH 09/17] fix mac test Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- .../gcs_server/gcs_health_check_manager.cc | 47 ++++++++++--------- .../gcs/gcs_server/gcs_health_check_manager.h | 44 +++++++++-------- .../test/gcs_health_check_manager_test.cc | 3 ++ 3 files changed, 54 insertions(+), 40 deletions(-) diff --git a/src/ray/gcs/gcs_server/gcs_health_check_manager.cc b/src/ray/gcs/gcs_server/gcs_health_check_manager.cc index 02b357532f6a..ed888509aad7 100644 --- a/src/ray/gcs/gcs_server/gcs_health_check_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_health_check_manager.cc @@ -62,40 +62,45 @@ std::vector GcsHealthCheckManager::GetAllNodes() const { void GcsHealthCheckManager::HealthCheckContext::StartHealthCheck() { using ::grpc::health::v1::HealthCheckResponse; - context = std::make_shared(); + context_ = std::make_shared(); auto deadline = - std::chrono::system_clock::now() + std::chrono::milliseconds(manager->timeout_ms_); - context->set_deadline(deadline); - stub->async()->Check( - context.get(), - &request, - &response, - [this, context = this->context](::grpc::Status status) { + std::chrono::system_clock::now() + std::chrono::milliseconds(manager_->timeout_ms_); + context_->set_deadline(deadline); + stub_->async()->Check( + context_.get(), + &request_, + &response_, + [this, stopped = this->stopped_, context = this->context_](::grpc::Status status) { if (status.error_code() == ::grpc::StatusCode::CANCELLED) { return; } - manager->io_service_.post( - [this, status]() { - RAY_LOG(DEBUG) << "Health check status: " << int(response.status()); + manager_->io_service_.post( + [this, stopped, status]() { + // Stopped has to be read in the same thread where it's updated. 
+ if (*stopped) { + return; + } + RAY_LOG(DEBUG) << "Health check status: " << int(response_.status()); - if (status.ok() && response.status() == HealthCheckResponse::SERVING) { + if (status.ok() && response_.status() == HealthCheckResponse::SERVING) { // Health check passed - health_check_remaining = manager->failure_threshold_; + health_check_remaining_ = manager_->failure_threshold_; } else { - --health_check_remaining; - RAY_LOG(WARNING) << "Health check failed for node " << node_id - << ", remaining checks " << health_check_remaining; + --health_check_remaining_; + RAY_LOG(WARNING) << "Health check failed for node " << node_id_ + << ", remaining checks " << health_check_remaining_; } - if (health_check_remaining == 0) { - manager->io_service_.post([this]() { manager->FailNode(node_id); }, ""); + if (health_check_remaining_ == 0) { + manager_->io_service_.post([this]() { manager_->FailNode(node_id_); }, + ""); } else { // Do another health check. - timer.expires_from_now( - boost::posix_time::milliseconds(manager->period_ms_)); - timer.async_wait([this](auto ec) { + timer_.expires_from_now( + boost::posix_time::milliseconds(manager_->period_ms_)); + timer_.async_wait([this](auto ec) { if (ec != boost::asio::error::operation_aborted) { StartHealthCheck(); } diff --git a/src/ray/gcs/gcs_server/gcs_health_check_manager.h b/src/ray/gcs/gcs_server/gcs_health_check_manager.h index 073151f23dc8..1807a78951aa 100644 --- a/src/ray/gcs/gcs_server/gcs_health_check_manager.h +++ b/src/ray/gcs/gcs_server/gcs_health_check_manager.h @@ -89,16 +89,18 @@ class GcsHealthCheckManager { /// It can be updated to support streaming call for efficiency. class HealthCheckContext { public: - HealthCheckContext(GcsHealthCheckManager *_manager, + HealthCheckContext(GcsHealthCheckManager *manager, std::shared_ptr channel, NodeID node_id) - : manager(_manager), - node_id(node_id), - timer(manager->io_service_), - health_check_remaining(manager->failure_threshold_) { - stub = grpc::health::v1::Health::NewStub(channel); - timer.expires_from_now(boost::posix_time::milliseconds(manager->initial_delay_ms_)); - timer.async_wait([this](auto ec) { + : manager_(manager), + node_id_(node_id), + stopped_(std::make_shared(false)), + timer_(manager->io_service_), + health_check_remaining_(manager->failure_threshold_) { + stub_ = grpc::health::v1::Health::NewStub(channel); + timer_.expires_from_now( + boost::posix_time::milliseconds(manager_->initial_delay_ms_)); + timer_.async_wait([this](auto ec) { if (ec != boost::asio::error::operation_aborted) { StartHealthCheck(); } @@ -106,33 +108,37 @@ class GcsHealthCheckManager { } ~HealthCheckContext() { - timer.cancel(); - if (context != nullptr) { - context->TryCancel(); + timer_.cancel(); + if (context_ != nullptr) { + context_->TryCancel(); } + *stopped_ = true; } private: void StartHealthCheck(); - GcsHealthCheckManager *manager; + GcsHealthCheckManager *manager_; - NodeID node_id; + NodeID node_id_; + + // Whether the health check has stopped. + std::shared_ptr stopped_; /// gRPC related fields - std::unique_ptr<::grpc::health::v1::Health::Stub> stub; + std::unique_ptr<::grpc::health::v1::Health::Stub> stub_; // The context is used in the gRPC callback which is in another // thread, so we need it to be a shared_ptr. 
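  // (Sharing ownership with the callback, as is also done for the stopped_ flag,
  // keeps these objects alive if the HealthCheckContext is destroyed while a
  // Check() RPC is still in flight; the destructor's TryCancel() only asks gRPC
  // to finish that RPC early and does not block waiting for it.)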
- std::shared_ptr context; - ::grpc::health::v1::HealthCheckRequest request; - ::grpc::health::v1::HealthCheckResponse response; + std::shared_ptr context_; + ::grpc::health::v1::HealthCheckRequest request_; + ::grpc::health::v1::HealthCheckResponse response_; /// The timer is used to do async wait before the next try. - Timer timer; + Timer timer_; /// The remaining check left. If it reaches 0, the node will be marked as dead. - int64_t health_check_remaining; + int64_t health_check_remaining_; }; /// The main service. All method needs to run on this thread. diff --git a/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc index 79f7937533d1..2566a045aadf 100644 --- a/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc @@ -99,6 +99,9 @@ class GcsHealthCheckManagerTest : public ::testing::Test { for (auto [_, server] : servers) { server->Shutdown(); } + + // Allow gRPC to cleanup. + boost::this_thread::sleep_for(boost::chrono::seconds(2)); } NodeID AddServer() { From 61002410d2db7bfed160a71e81cfc3cb4c478a78 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Mon, 31 Oct 2022 17:21:11 +0000 Subject: [PATCH 10/17] fix some comments Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- .../gcs/gcs_server/gcs_health_check_manager.cc | 16 ++++++++++++---- .../gcs/gcs_server/gcs_health_check_manager.h | 12 +++++++----- src/ray/gcs/gcs_server/gcs_server.cc | 3 +++ 3 files changed, 22 insertions(+), 9 deletions(-) diff --git a/src/ray/gcs/gcs_server/gcs_health_check_manager.cc b/src/ray/gcs/gcs_server/gcs_health_check_manager.cc index ed888509aad7..c7b92a191b1d 100644 --- a/src/ray/gcs/gcs_server/gcs_health_check_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_health_check_manager.cc @@ -29,7 +29,13 @@ GcsHealthCheckManager::GcsHealthCheckManager( initial_delay_ms_(initial_delay_ms), timeout_ms_(timeout_ms), period_ms_(period_ms), - failure_threshold_(failure_threshold) {} + failure_threshold_(failure_threshold) { + RAY_CHECK(on_node_death_callback != nullptr); + RAY_CHECK(initial_delay_ms >= 0); + RAY_CHECK(timeout_ms >= 0); + RAY_CHECK(period_ms >= 0); + RAY_CHECK(failure_threshold >= 0); +} GcsHealthCheckManager::~GcsHealthCheckManager() {} @@ -72,10 +78,10 @@ void GcsHealthCheckManager::HealthCheckContext::StartHealthCheck() { &request_, &response_, [this, stopped = this->stopped_, context = this->context_](::grpc::Status status) { + // This callback is done in gRPC's thread pool. if (status.error_code() == ::grpc::StatusCode::CANCELLED) { return; } - manager_->io_service_.post( [this, stopped, status]() { // Stopped has to be read in the same thread where it's updated. @@ -100,8 +106,10 @@ void GcsHealthCheckManager::HealthCheckContext::StartHealthCheck() { // Do another health check. timer_.expires_from_now( boost::posix_time::milliseconds(manager_->period_ms_)); - timer_.async_wait([this](auto ec) { - if (ec != boost::asio::error::operation_aborted) { + timer_.async_wait([this, stopped](auto ec) { + // We need to check stopped here as well since cancel + // won't impact the queued tasks. 
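              // (asio detail: timer_.cancel() only aborts waits that have not yet
              // completed; a handler already queued on the io_context still runs
              // with a success error code, so this flag is the only guard that
              // keeps a removed node from being probed again.)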
+ if (ec != boost::asio::error::operation_aborted && !*stopped) { StartHealthCheck(); } }); diff --git a/src/ray/gcs/gcs_server/gcs_health_check_manager.h b/src/ray/gcs/gcs_server/gcs_health_check_manager.h index 1807a78951aa..6cc416b54f82 100644 --- a/src/ray/gcs/gcs_server/gcs_health_check_manager.h +++ b/src/ray/gcs/gcs_server/gcs_health_check_manager.h @@ -30,14 +30,12 @@ namespace gcs { /// GcsHealthCheckManager is used to track the healthiness of the nodes in the ray /// cluster. The health check is done in pull based way, which means this module will send /// health check to the raylets to see whether the raylet is healthy or not. If the raylet -/// is not healthy for certain times, the module will think the raylet dead. +/// is not healthy for certain times, the module will think the raylet is dead. /// When the node is dead a callback passed in the constructor will be called and this /// node will be removed from GcsHealthCheckManager. The node can be added into this class /// later. Although the same node id is not supposed to be reused in ray cluster, this is -/// not enforced in this class. The implementation of this class try its bes not to couple -/// itself with GCS so we can move it to ray/common/health_check_manager.h in the future. -/// TODO (iycheng): Move the GcsHealthCheckManager to common and decouple it from GCS -/// completely. +/// not enforced in this class. +/// TODO (iycheng): Move the GcsHealthCheckManager to ray/common. class GcsHealthCheckManager { public: /// Constructor of GcsHealthCheckManager. @@ -76,6 +74,10 @@ class GcsHealthCheckManager { std::vector GetAllNodes() const; private: + /// Fail a node when health check failed. It'll stop the health checking and + /// call on_node_death_callback. + /// + /// \param node_id The id of the node. void FailNode(const NodeID &node_id); /// This is for testing. We'll use mock timer in gtest. 
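The RPC retried above is the standard gRPC health-checking protocol (grpc.health.v1.Health/Check). For clarity, here is a synchronous sketch of a single probe equivalent to one round of the loop; the manager itself uses the async callback API shown in the diffs, and the include path assumes gRPC's bundled health proto:

#include <chrono>
#include <memory>
#include <grpcpp/grpcpp.h>
#include "src/proto/grpc/health/v1/health.grpc.pb.h"

// Returns true if the node answered SERVING within timeout_ms.
bool ProbeOnce(const std::shared_ptr<grpc::Channel> &channel, int64_t timeout_ms) {
  auto stub = grpc::health::v1::Health::NewStub(channel);
  grpc::ClientContext context;
  context.set_deadline(std::chrono::system_clock::now() +
                       std::chrono::milliseconds(timeout_ms));
  grpc::health::v1::HealthCheckRequest request;  // empty service name = whole server
  grpc::health::v1::HealthCheckResponse response;
  grpc::Status status = stub->Check(&context, request, &response);
  return status.ok() &&
         response.status() == grpc::health::v1::HealthCheckResponse::SERVING;
}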
diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index af502d3706cf..d9e6c174c77a 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -181,6 +181,9 @@ void GcsServer::DoStart(const GcsInitData &gcs_init_data) { if (gcs_heartbeat_manager_) { gcs_heartbeat_manager_->Start(); } + RAY_CHECK(int(gcs_heartbeat_manager_ != nullptr) + + int(gcs_healthcheck_manager_ != nullptr) == + 1); RecordMetrics(); From 702ababbaa6a2b1c1a4fbad43c45c5eb3c72696e Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Mon, 31 Oct 2022 17:45:16 +0000 Subject: [PATCH 11/17] fix add metrics Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- python/ray/tests/test_metrics_agent.py | 5 ++++- src/ray/gcs/gcs_server/gcs_health_check_manager.cc | 12 +++++++++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/python/ray/tests/test_metrics_agent.py b/python/ray/tests/test_metrics_agent.py index 293f6c0d9f75..ffa4d77fb1cf 100644 --- a/python/ray/tests/test_metrics_agent.py +++ b/python/ray/tests/test_metrics_agent.py @@ -77,7 +77,10 @@ if not ray._raylet.Config.use_ray_syncer(): _METRICS.append("ray_outbound_heartbeat_size_kb_sum") -if not ray._raylet.Config.pull_based_healthcheck(): +if ray._raylet.Config.pull_based_healthcheck(): + print("???:") + _METRICS.append("ray_health_check_rpc_latency_ms") +else: _METRICS.append("ray_heartbeat_report_ms_sum") # This list of metrics should be kept in sync with diff --git a/src/ray/gcs/gcs_server/gcs_health_check_manager.cc b/src/ray/gcs/gcs_server/gcs_health_check_manager.cc index c7b92a191b1d..831f345ef81a 100644 --- a/src/ray/gcs/gcs_server/gcs_health_check_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_health_check_manager.cc @@ -14,6 +14,13 @@ #include "ray/gcs/gcs_server/gcs_health_check_manager.h" +#include "ray/stats/metric.h" +DEFINE_stats(health_check_rpc_latency_ms, + "Latency of rpc request for health check.", + (), + ({1, 10, 100, 1000, 10000}, ), + ray::stats::HISTOGRAM); + namespace ray { namespace gcs { @@ -77,8 +84,11 @@ void GcsHealthCheckManager::HealthCheckContext::StartHealthCheck() { context_.get(), &request_, &response_, - [this, stopped = this->stopped_, context = this->context_](::grpc::Status status) { + [this, stopped = this->stopped_, context = this->context_, now = absl::Now()]( + ::grpc::Status status) { // This callback is done in gRPC's thread pool. 
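          // (The duration recorded below runs from the `now` captured when Check()
          // was issued to the moment gRPC invokes this completion callback, so it
          // covers the network round trip and server handling but not the later
          // hop back onto the GCS io_service.)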
+ STATS_health_check_rpc_latency_ms.Record( + absl::ToInt64Milliseconds(absl::Now() - now)); if (status.error_code() == ::grpc::StatusCode::CANCELLED) { return; } From f779d4e6255b8b90d59e60730af0c49ca57a1f02 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Mon, 31 Oct 2022 18:15:32 +0000 Subject: [PATCH 12/17] fix metrics Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- python/ray/tests/test_metrics_agent.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/python/ray/tests/test_metrics_agent.py b/python/ray/tests/test_metrics_agent.py index ffa4d77fb1cf..62f7e7632558 100644 --- a/python/ray/tests/test_metrics_agent.py +++ b/python/ray/tests/test_metrics_agent.py @@ -77,12 +77,6 @@ if not ray._raylet.Config.use_ray_syncer(): _METRICS.append("ray_outbound_heartbeat_size_kb_sum") -if ray._raylet.Config.pull_based_healthcheck(): - print("???:") - _METRICS.append("ray_health_check_rpc_latency_ms") -else: - _METRICS.append("ray_heartbeat_report_ms_sum") - # This list of metrics should be kept in sync with # ray/python/ray/autoscaler/_private/prom_metrics.py _AUTOSCALER_METRICS = [ @@ -115,6 +109,10 @@ "dashboard_api_requests_count", ] +if ray._raylet.Config.pull_based_healthcheck(): + _METRICS.append("ray_health_check_rpc_latency_ms_sum") +else: + _METRICS.append("ray_heartbeat_report_ms_sum") @pytest.fixture def _setup_cluster_for_test(request, ray_start_cluster): From cc79f6ad29862cda8742a74691f91172b599f97d Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Mon, 31 Oct 2022 18:15:54 +0000 Subject: [PATCH 13/17] lint Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- python/ray/tests/test_failure.py | 8 ++++---- python/ray/tests/test_metrics_agent.py | 1 + 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py index a2acc327bcd9..6a4d6c3016ae 100644 --- a/python/ray/tests/test_failure.py +++ b/python/ray/tests/test_failure.py @@ -367,11 +367,11 @@ def foo(): @pytest.mark.skip("This test does not work yet.") -@pytest.mark.parametrize("ray_start_object_store_memory", [10**6], indirect=True) +@pytest.mark.parametrize("ray_start_object_store_memory", [10 ** 6], indirect=True) def test_put_error1(ray_start_object_store_memory, error_pubsub): p = error_pubsub num_objects = 3 - object_size = 4 * 10**5 + object_size = 4 * 10 ** 5 # Define a task with a single dependency, a numpy array, that returns # another array. @@ -412,11 +412,11 @@ def put_arg_task(): @pytest.mark.skip("This test does not work yet.") -@pytest.mark.parametrize("ray_start_object_store_memory", [10**6], indirect=True) +@pytest.mark.parametrize("ray_start_object_store_memory", [10 ** 6], indirect=True) def test_put_error2(ray_start_object_store_memory): # This is the same as the previous test, but it calls ray.put directly. num_objects = 3 - object_size = 4 * 10**5 + object_size = 4 * 10 ** 5 # Define a task with a single dependency, a numpy array, that returns # another array. 
diff --git a/python/ray/tests/test_metrics_agent.py b/python/ray/tests/test_metrics_agent.py index 0cbfd858aaef..9963e372dcfd 100644 --- a/python/ray/tests/test_metrics_agent.py +++ b/python/ray/tests/test_metrics_agent.py @@ -152,6 +152,7 @@ else: _METRICS.append("ray_heartbeat_report_ms_sum") + @pytest.fixture def _setup_cluster_for_test(request, ray_start_cluster): enable_metrics_collection = request.param From 386bac3396e561f81cf6b49d1b088a190365cfc1 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Tue, 1 Nov 2022 05:49:25 +0000 Subject: [PATCH 14/17] fix comments Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- src/ray/common/ray_config_def.h | 7 +- .../gcs/gcs_server/gcs_health_check_manager.h | 5 - .../test/gcs_health_check_manager_test.cc | 97 ++++--------------- 3 files changed, 21 insertions(+), 88 deletions(-) diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 6263fd7ad94f..dc4c7a338d9b 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -703,7 +703,8 @@ RAY_CONFIG(std::string, testing_asio_delay_us, "") /// A feature flag to enable pull based health check. /// TODO: Turn it on by default RAY_CONFIG(bool, pull_based_healthcheck, false) -RAY_CONFIG(int64_t, health_check_initial_delay_ms, 1000) -RAY_CONFIG(int64_t, health_check_period_ms, 1000) -RAY_CONFIG(int64_t, health_check_timeout_ms, 5000) +RAY_CONFIG(bool, pull_based_healthcheck, true) +RAY_CONFIG(int64_t, health_check_initial_delay_ms, 5000) +RAY_CONFIG(int64_t, health_check_period_ms, 3000) +RAY_CONFIG(int64_t, health_check_timeout_ms, 10000) RAY_CONFIG(int64_t, health_check_failure_threshold, 5) diff --git a/src/ray/gcs/gcs_server/gcs_health_check_manager.h b/src/ray/gcs/gcs_server/gcs_health_check_manager.h index 6cc416b54f82..b504ce079343 100644 --- a/src/ray/gcs/gcs_server/gcs_health_check_manager.h +++ b/src/ray/gcs/gcs_server/gcs_health_check_manager.h @@ -80,12 +80,7 @@ class GcsHealthCheckManager { /// \param node_id The id of the node. void FailNode(const NodeID &node_id); - /// This is for testing. We'll use mock timer in gtest. -#if defined(_TESTING_RAY_TIMER) - using Timer = _TESTING_RAY_TIMER; -#else using Timer = boost::asio::deadline_timer; -#endif /// The context for the health check. It's to support unary call. /// It can be updated to support streaming call for efficiency. diff --git a/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc index 2566a045aadf..d28e263008da 100644 --- a/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc @@ -22,53 +22,6 @@ #include using namespace boost; - -// Copied from -// https://stackoverflow.com/questions/14191855/how-do-you-mock-the-time-for-boost-timers -class mock_time_traits { - typedef boost::asio::deadline_timer::traits_type source_traits; - - public: - typedef source_traits::time_type time_type; - typedef source_traits::duration_type duration_type; - - // Note this implemenation requires set_now(...) 
to be called before now() - static time_type now() { return *now_; } - - // After modifying the clock, we need to sleep the thread to give the io_service - // the opportunity to poll and notice the change in clock time - static void set_now(time_type t) { - now_ = t; - boost::this_thread::sleep_for(boost::chrono::milliseconds(2)); - } - - static time_type add(time_type t, duration_type d) { return source_traits::add(t, d); } - static duration_type subtract(time_type t1, time_type t2) { - return source_traits::subtract(t1, t2); - } - static bool less_than(time_type t1, time_type t2) { - return source_traits::less_than(t1, t2); - } - - // This function is called by asio to determine how often to check - // if the timer is ready to fire. By manipulating this function, we - // can make sure asio detects changes to now_ in a timely fashion. - static boost::posix_time::time_duration to_posix_duration(duration_type d) { - return d < boost::posix_time::milliseconds(1) ? d - : boost::posix_time::milliseconds(1); - } - - private: - static boost::optional now_; -}; - -boost::optional mock_time_traits::now_; - -using mock_deadline_timer = - boost::asio::basic_deadline_timer; - -#define _TESTING_RAY_TIMER mock_deadline_timer - #include #include @@ -82,12 +35,17 @@ using namespace std::literals::chrono_literals; class GcsHealthCheckManagerTest : public ::testing::Test { protected: + GcsHealthCheckManagerTest() {} void SetUp() override { grpc::EnableDefaultHealthCheckService(true); - mock_time_traits::set_now( - boost::posix_time::time_from_string("2022-01-20 0:0:0.000")); + health_check = std::make_unique( - io_service, [this](const NodeID &id) { dead_nodes.insert(id); }); + io_service, + [this](const NodeID &id) { dead_nodes.insert(id); }, + initial_delay_ms, + timeout_ms, + period_ms, + failure_threshold); port = 10000; } @@ -139,18 +97,6 @@ class GcsHealthCheckManagerTest : public ::testing::Test { } } - void AdvanceInitialDelay() { - auto delta = boost::posix_time::millisec( - RayConfig::instance().health_check_initial_delay_ms() + 10); - mock_time_traits::set_now(mock_time_traits::now() + delta); - } - - void AdvanceNextDelay() { - auto delta = - boost::posix_time::millisec(RayConfig::instance().health_check_period_ms() + 10); - mock_time_traits::set_now(mock_time_traits::now() + delta); - } - void Run(size_t n = 1) { // If n == 0 it mean we just run it and return. if (n == 0) { @@ -171,6 +117,10 @@ class GcsHealthCheckManagerTest : public ::testing::Test { std::unique_ptr health_check; std::unordered_map> servers; std::unordered_set dead_nodes; + const int64_t initial_delay_ms = 1000; + const int64_t timeout_ms = 1000; + const int64_t period_ms = 1000; + const int64_t failure_threshold = 5; }; TEST_F(GcsHealthCheckManagerTest, TestBasic) { @@ -179,17 +129,14 @@ TEST_F(GcsHealthCheckManagerTest, TestBasic) { ASSERT_TRUE(dead_nodes.empty()); // Run the first health check - AdvanceInitialDelay(); Run(); ASSERT_TRUE(dead_nodes.empty()); - AdvanceNextDelay(); Run(2); // One for starting RPC and one for the RPC callback. ASSERT_TRUE(dead_nodes.empty()); StopServing(node_id); - for (auto i = 0; i < RayConfig::instance().health_check_failure_threshold(); ++i) { - AdvanceNextDelay(); + for (auto i = 0; i < failure_threshold; ++i) { Run(2); // One for starting RPC and one for the RPC callback. 
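    // (With the mock clock gone, each Run(2) waits on the real io_service for the
    // period timer to fire and for the RPC callback to come back, so this loop
    // takes roughly failure_threshold * period_ms of wall-clock time before the
    // node is declared dead.)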
} @@ -205,19 +152,16 @@ TEST_F(GcsHealthCheckManagerTest, StoppedAndResume) { ASSERT_TRUE(dead_nodes.empty()); // Run the first health check - AdvanceInitialDelay(); Run(); ASSERT_TRUE(dead_nodes.empty()); - AdvanceNextDelay(); Run(2); // One for starting RPC and one for the RPC callback. ASSERT_TRUE(dead_nodes.empty()); StopServing(node_id); - for (auto i = 0; i < RayConfig::instance().health_check_failure_threshold(); ++i) { - AdvanceNextDelay(); + for (auto i = 0; i < failure_threshold; ++i) { Run(2); // One for starting RPC and one for the RPC callback. - if (i == (RayConfig::instance().health_check_failure_threshold()) / 2) { + if (i == failure_threshold / 2) { StartServing(node_id); } } @@ -233,23 +177,19 @@ TEST_F(GcsHealthCheckManagerTest, Crashed) { ASSERT_TRUE(dead_nodes.empty()); // Run the first health check - AdvanceInitialDelay(); Run(); ASSERT_TRUE(dead_nodes.empty()); - AdvanceNextDelay(); Run(2); // One for starting RPC and one for the RPC callback. ASSERT_TRUE(dead_nodes.empty()); // Check it again - AdvanceNextDelay(); Run(2); // One for starting RPC and one for the RPC callback. ASSERT_TRUE(dead_nodes.empty()); DeleteServer(node_id); - for (auto i = 0; i < RayConfig::instance().health_check_failure_threshold(); ++i) { - AdvanceNextDelay(); + for (auto i = 0; i < failure_threshold; ++i) { Run(2); // One for starting RPC and one for the RPC callback. } @@ -265,18 +205,15 @@ TEST_F(GcsHealthCheckManagerTest, NodeRemoved) { ASSERT_TRUE(dead_nodes.empty()); // Run the first health check - AdvanceInitialDelay(); Run(); ASSERT_TRUE(dead_nodes.empty()); - AdvanceNextDelay(); Run(2); // One for starting RPC and one for the RPC callback. ASSERT_TRUE(dead_nodes.empty()); health_check->RemoveNode(node_id); // Make sure it's not monitored any more - for (auto i = 0; i < RayConfig::instance().health_check_failure_threshold(); ++i) { - AdvanceNextDelay(); + for (auto i = 0; i < failure_threshold; ++i) { io_service.poll(); } From f9508bc70e2d96aed6a0c0c7841ea70d683a9b51 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Tue, 1 Nov 2022 06:27:45 +0000 Subject: [PATCH 15/17] add event Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- src/ray/common/ray_config_def.h | 1 - src/ray/raylet/node_manager.cc | 9 +++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index dc4c7a338d9b..58e0e6b90329 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -703,7 +703,6 @@ RAY_CONFIG(std::string, testing_asio_delay_us, "") /// A feature flag to enable pull based health check. 
/// TODO: Turn it on by default RAY_CONFIG(bool, pull_based_healthcheck, false) -RAY_CONFIG(bool, pull_based_healthcheck, true) RAY_CONFIG(int64_t, health_check_initial_delay_ms, 5000) RAY_CONFIG(int64_t, health_check_period_ms, 3000) RAY_CONFIG(int64_t, health_check_timeout_ms, 10000) diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 14d49198d382..24ed5e3f2bf2 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -170,6 +170,7 @@ void HeartbeatSender::Heartbeat() { RAY_CHECK_OK( gcs_client_->Nodes().AsyncReportHeartbeat(heartbeat_data, [](Status status) { if (status.IsDisconnected()) { + RAY_EVENT(FATAL, "RAYLET_MARKED_DEAD") << "This node has beem marked as dead."; RAY_LOG(FATAL) << "This node has beem marked as dead."; } })); @@ -1028,6 +1029,14 @@ void NodeManager::NodeRemoved(const NodeID &node_id) { if (node_id == self_node_id_) { if (!is_node_drained_) { + // TODO(iycheng): Don't duplicate log here once we enable event by default. + RAY_EVENT(FATAL, "RAYLET_MARKED_DEAD") + << "[Timeout] Exiting because this node manager has mistakenly been marked as " + "dead by the " + << "GCS: GCS didn't receive heartbeats from this node for " + << RayConfig::instance().num_heartbeats_timeout() * + RayConfig::instance().raylet_heartbeat_period_milliseconds() + << " ms. This is likely because the machine or raylet has become overloaded."; RAY_LOG(FATAL) << "[Timeout] Exiting because this node manager has mistakenly been marked as " "dead by the " From e4882d2d2826ab0ac9f4739cb7bdc56fe9280667 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Tue, 1 Nov 2022 17:57:13 +0000 Subject: [PATCH 16/17] lint Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- python/ray/tests/test_failure.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py index 6a4d6c3016ae..a2acc327bcd9 100644 --- a/python/ray/tests/test_failure.py +++ b/python/ray/tests/test_failure.py @@ -367,11 +367,11 @@ def foo(): @pytest.mark.skip("This test does not work yet.") -@pytest.mark.parametrize("ray_start_object_store_memory", [10 ** 6], indirect=True) +@pytest.mark.parametrize("ray_start_object_store_memory", [10**6], indirect=True) def test_put_error1(ray_start_object_store_memory, error_pubsub): p = error_pubsub num_objects = 3 - object_size = 4 * 10 ** 5 + object_size = 4 * 10**5 # Define a task with a single dependency, a numpy array, that returns # another array. @@ -412,11 +412,11 @@ def put_arg_task(): @pytest.mark.skip("This test does not work yet.") -@pytest.mark.parametrize("ray_start_object_store_memory", [10 ** 6], indirect=True) +@pytest.mark.parametrize("ray_start_object_store_memory", [10**6], indirect=True) def test_put_error2(ray_start_object_store_memory): # This is the same as the previous test, but it calls ray.put directly. num_objects = 3 - object_size = 4 * 10 ** 5 + object_size = 4 * 10**5 # Define a task with a single dependency, a numpy array, that returns # another array. 
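For a sense of scale, a rough sketch of the worst-case detection windows implied by the two schemes, using config accessors that appear in this series. This is an approximation only: it ignores scheduling delays, assumes every probe in a row fails, and the RayConfig include path is assumed:

#include <cstdint>
#include "ray/common/ray_config.h"

// Push-based (heartbeats): GCS declares a raylet dead after it has missed
// num_heartbeats_timeout consecutive heartbeat periods.
int64_t HeartbeatDetectionWindowMs() {
  return RayConfig::instance().num_heartbeats_timeout() *
         RayConfig::instance().raylet_heartbeat_period_milliseconds();
}

// Pull-based (this series): each failed probe can take up to health_check_timeout_ms,
// the next probe starts health_check_period_ms later, and FailNode() fires after
// health_check_failure_threshold consecutive failures.
int64_t PullBasedDetectionWindowMs() {
  return RayConfig::instance().health_check_failure_threshold() *
         (RayConfig::instance().health_check_timeout_ms() +
          RayConfig::instance().health_check_period_ms());
}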
From 02b34696338958bb4343ea6c3258a04b56ed6334 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Tue, 1 Nov 2022 18:09:24 +0000 Subject: [PATCH 17/17] fix test Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- dashboard/modules/healthz/tests/test_healthz.py | 1 + python/ray/tests/test_multi_node_2.py | 2 ++ 2 files changed, 3 insertions(+) diff --git a/dashboard/modules/healthz/tests/test_healthz.py b/dashboard/modules/healthz/tests/test_healthz.py index 836a064c42d0..727e1ac8a7bd 100644 --- a/dashboard/modules/healthz/tests/test_healthz.py +++ b/dashboard/modules/healthz/tests/test_healthz.py @@ -43,6 +43,7 @@ def test_healthz_agent_2(pull_based, monkeypatch, ray_start_cluster): if pull_based: monkeypatch.setenv("RAY_health_check_failure_threshold", "3") monkeypatch.setenv("RAY_health_check_timeout_ms", "100") + monkeypatch.setenv("RAY_health_check_period_ms", "1000") monkeypatch.setenv("RAY_health_check_initial_delay_ms", "0") else: monkeypatch.setenv("RAY_num_heartbeats_timeout", "3") diff --git a/python/ray/tests/test_multi_node_2.py b/python/ray/tests/test_multi_node_2.py index 9d799152997a..d65b7937f68f 100644 --- a/python/ray/tests/test_multi_node_2.py +++ b/python/ray/tests/test_multi_node_2.py @@ -48,6 +48,7 @@ def test_shutdown(): ), generate_system_config_map( health_check_initial_delay_ms=0, + health_check_period_ms=1000, health_check_failure_threshold=3, object_timeout_milliseconds=12345, pull_based_healthcheck=True, @@ -73,6 +74,7 @@ def f(): if ray._config.pull_based_healthcheck(): assert ray._config.health_check_initial_delay_ms() == 0 assert ray._config.health_check_failure_threshold() == 3 + assert ray._config.health_check_period_ms() == 1000 else: assert ray._config.num_heartbeats_timeout() == 3