[core] Introduce pull based health check to GCS. #29442

Merged: 20 commits merged on Nov 2, 2022.
Changes from 8 commits
15 changes: 15 additions & 0 deletions BUILD.bazel
@@ -546,6 +546,7 @@ cc_library(
":scheduler",
":worker_rpc",
"@boost//:bimap",
"@com_github_grpc_grpc//src/proto/grpc/health/v1:health_proto",
"@com_google_absl//absl/container:btree",
],
)
@@ -1810,6 +1811,20 @@ cc_library(
],
)

cc_test(
name = "gcs_health_check_manager_test",
size = "small",
srcs = [
"src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc",
],
copts = COPTS,
tags = ["team:core"],
deps = [
":gcs_server_lib",
"@com_google_googletest//:gtest_main",
],
)

cc_test(
name = "gcs_node_manager_test",
size = "small",
19 changes: 15 additions & 4 deletions dashboard/modules/healthz/tests/test_healthz.py
@@ -7,7 +7,9 @@
from ray._private.test_utils import find_free_port, wait_for_condition


def test_healthz_head(ray_start_cluster):
@pytest.mark.parametrize("pull_based", [True, False])
def test_healthz_head(pull_based, monkeypatch, ray_start_cluster):
monkeypatch.setenv("RAY_pull_based_healthcheck", "true" if pull_based else "false")
dashboard_port = find_free_port()
h = ray_start_cluster.add_node(dashboard_port=dashboard_port)
uri = f"http://localhost:{dashboard_port}/api/gcs_healthz"
@@ -20,7 +22,9 @@ def test_healthz_head(ray_start_cluster):
assert "Read timed out" in str(e)


def test_healthz_agent_1(ray_start_cluster):
@pytest.mark.parametrize("pull_based", [True, False])
def test_healthz_agent_1(pull_based, monkeypatch, ray_start_cluster):
monkeypatch.setenv("RAY_pull_based_healthcheck", "true" if pull_based else "false")
agent_port = find_free_port()
h = ray_start_cluster.add_node(dashboard_agent_listen_port=agent_port)
uri = f"http://localhost:{agent_port}/api/local_raylet_healthz"
@@ -32,9 +36,16 @@ def test_healthz_agent_1(ray_start_cluster):
assert requests.get(uri).status_code == 200


@pytest.mark.parametrize("pull_based", [True, False])
@pytest.mark.skipif(sys.platform == "win32", reason="SIGSTOP only on posix")
def test_healthz_agent_2(monkeypatch, ray_start_cluster):
monkeypatch.setenv("RAY_num_heartbeats_timeout", "3")
def test_healthz_agent_2(pull_based, monkeypatch, ray_start_cluster):
monkeypatch.setenv("RAY_pull_based_healthcheck", "true" if pull_based else "false")
if pull_based:
monkeypatch.setenv("RAY_health_check_failure_threshold", "3")
monkeypatch.setenv("RAY_health_check_timeout_ms", "100")
monkeypatch.setenv("RAY_health_check_initial_delay_ms", "0")
else:
monkeypatch.setenv("RAY_num_heartbeats_timeout", "3")

agent_port = find_free_port()
h = ray_start_cluster.add_node(dashboard_agent_listen_port=agent_port)
10 changes: 10 additions & 0 deletions python/ray/includes/ray_config.pxd
@@ -80,3 +80,13 @@ cdef extern from "ray/common/ray_config.h" nogil:
c_string REDIS_CLIENT_KEY() const

c_string REDIS_SERVER_NAME() const

c_bool pull_based_healthcheck() const

int64_t health_check_initial_delay_ms() const

int64_t health_check_period_ms() const

int64_t health_check_timeout_ms() const

int64_t health_check_failure_threshold() const
20 changes: 20 additions & 0 deletions python/ray/includes/ray_config.pxi
@@ -136,3 +136,23 @@ cdef class Config:
@staticmethod
def REDIS_SERVER_NAME():
return RayConfig.instance().REDIS_SERVER_NAME()

@staticmethod
def pull_based_healthcheck():
return RayConfig.instance().pull_based_healthcheck()

@staticmethod
def health_check_initial_delay_ms():
return RayConfig.instance().health_check_initial_delay_ms()

@staticmethod
def health_check_period_ms():
return RayConfig.instance().health_check_period_ms()

@staticmethod
def health_check_timeout_ms():
return RayConfig.instance().health_check_timeout_ms()

@staticmethod
def health_check_failure_threshold():
return RayConfig.instance().health_check_failure_threshold()
14 changes: 13 additions & 1 deletion python/ray/tests/test_failure.py
@@ -562,12 +562,24 @@ async def f(self):
{
"num_cpus": 0,
"_system_config": {
"pull_based_healthcheck": False,
"raylet_death_check_interval_milliseconds": 10 * 1000,
"num_heartbeats_timeout": 10,
"raylet_heartbeat_period_milliseconds": 100,
"timeout_ms_task_wait_for_death_info": 100,
},
}
},
{
"num_cpus": 0,
"_system_config": {
"pull_based_healthcheck": True,
"raylet_death_check_interval_milliseconds": 10 * 1000,
"health_check_initial_delay_ms": 0,
"health_check_failure_threshold": 10,
"health_check_period_ms": 100,
"timeout_ms_task_wait_for_death_info": 100,
},
},
],
indirect=True,
)
4 changes: 3 additions & 1 deletion python/ray/tests/test_metrics_agent.py
@@ -44,7 +44,6 @@
"ray_object_directory_lookups",
"ray_object_directory_added_locations",
"ray_object_directory_removed_locations",
"ray_heartbeat_report_ms_sum",
"ray_process_startup_time_ms_sum",
"ray_internal_num_processes_started",
"ray_internal_num_spilled_tasks",
@@ -78,6 +77,9 @@
if not ray._raylet.Config.use_ray_syncer():
_METRICS.append("ray_outbound_heartbeat_size_kb_sum")

if not ray._raylet.Config.pull_based_healthcheck():
Contributor:
Consider adding metrics for pull-based heartbeat too?

Contributor:
Could be done in another PR.

Contributor (Author):
I could add it in this PR.

Contributor (Author), @fishbone, Oct 31, 2022:
@rkooo567 I think we should do a better job of monitoring RPC requests. The cpp -> python -> opencensus path is not good practice, I believe.
I can add gRPC metrics here, but in the long term we should go from cpp to the opencensus agent directly. We can use the tooling built into gRPC for this (https://github.com/grpc/grpc/blob/5f6c357e741207f4af7e3b642a486bdda12c93df/include/grpcpp/opencensus.h#L35); see the sketch after this diff.

_METRICS.append("ray_heartbeat_report_ms_sum")

# This list of metrics should be kept in sync with
# ray/python/ray/autoscaler/_private/prom_metrics.py
_AUTOSCALER_METRICS = [
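For reference, a minimal sketch of the "cpp to opencensus directly" idea from the thread above. It is not code from this PR, and it assumes the RegisterOpenCensusPlugin / RegisterOpenCensusViewsForExport entry points declared in the linked grpcpp/opencensus.h header.

// Hedged sketch: wire gRPC's built-in OpenCensus integration into a C++ server
// so per-RPC stats are recorded without going through Python.
#include <grpcpp/grpcpp.h>
#include <grpcpp/opencensus.h>

int main() {
  // Install the client/server interceptors that record per-RPC latency,
  // message counts, and bytes as OpenCensus stats. Must run before any
  // channel or server is created.
  grpc::RegisterOpenCensusPlugin();
  // Register the default gRPC views so an exporter (Prometheus, the
  // OpenCensus agent, ...) can pick the stats up.
  grpc::RegisterOpenCensusViewsForExport();

  grpc::ServerBuilder builder;
  builder.AddListeningPort("0.0.0.0:50051", grpc::InsecureServerCredentials());
  // ... builder.RegisterService(...) for the actual services ...
  auto server = builder.BuildAndStart();
  server->Wait();
  return 0;
}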
18 changes: 15 additions & 3 deletions python/ray/tests/test_multi_node_2.py
@@ -43,8 +43,16 @@ def test_shutdown():
"ray_start_cluster_head",
[
generate_system_config_map(
num_heartbeats_timeout=3, object_timeout_milliseconds=12345
)
num_heartbeats_timeout=3,
object_timeout_milliseconds=12345,
pull_based_healthcheck=False,
),
generate_system_config_map(
health_check_initial_delay_ms=0,
health_check_failure_threshold=3,
object_timeout_milliseconds=12345,
pull_based_healthcheck=True,
),
],
indirect=True,
)
@@ -63,7 +71,11 @@ def test_system_config(ray_start_cluster_head):
@ray.remote
def f():
assert ray._config.object_timeout_milliseconds() == 12345
assert ray._config.num_heartbeats_timeout() == 3
if ray._config.pull_based_healthcheck():
assert ray._config.health_check_initial_delay_ms() == 0
assert ray._config.health_check_failure_threshold() == 3
else:
assert ray._config.num_heartbeats_timeout() == 3

ray.get([f.remote() for _ in range(5)])

8 changes: 8 additions & 0 deletions src/ray/common/ray_config_def.h
@@ -699,3 +699,11 @@ RAY_CONFIG(std::string, REDIS_SERVER_NAME, "")
// The delay is a random number between the interval. If method equals '*',
// it will apply to all methods.
RAY_CONFIG(std::string, testing_asio_delay_us, "")

/// A feature flag to enable pull based health check.
/// TODO: Turn it on by default
RAY_CONFIG(bool, pull_based_healthcheck, false)
RAY_CONFIG(int64_t, health_check_initial_delay_ms, 1000)
Contributor:
Feels like this is unnecessary? It'd take 30 seconds until the node is marked as dead anyway?

Contributor (Author):
No, 30s is the old protocol. This is the new protocol, so here, if a node is dead, it only takes about 3s to mark it dead (see the timing sketch after this diff).

RAY_CONFIG(int64_t, health_check_period_ms, 1000)
RAY_CONFIG(int64_t, health_check_timeout_ms, 5000)
RAY_CONFIG(int64_t, health_check_failure_threshold, 5)
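The detection latency follows from the three knobs above: each failed round costs at most one RPC timeout plus the wait before the next probe, and the node is declared dead after health_check_failure_threshold consecutive failures. A rough back-of-the-envelope sketch, not part of the PR, using the defaults from this diff:

// Sketch: estimate how long the pull-based protocol takes to declare an
// unresponsive node dead.
#include <cstdint>
#include <iostream>

int main() {
  // Defaults from ray_config_def.h in this diff.
  const int64_t period_ms = 1000;       // health_check_period_ms
  const int64_t timeout_ms = 5000;      // health_check_timeout_ms
  const int64_t failure_threshold = 5;  // health_check_failure_threshold

  // Worst case: every probe runs into the RPC deadline before counting as a failure.
  const int64_t worst_case_ms = failure_threshold * (timeout_ms + period_ms);
  // Fast-fail case (e.g. connection refused): only the probe period matters.
  const int64_t fast_fail_ms = failure_threshold * period_ms;

  std::cout << "fast-fail: ~" << fast_fail_ms / 1000.0 << "s, worst case: ~"
            << worst_case_ms / 1000.0 << "s\n";
  // With the test settings (threshold=3, period=1000ms, timeout=100ms) this is
  // roughly 3s, the figure mentioned in the thread above.
  return 0;
}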
12 changes: 6 additions & 6 deletions src/ray/gcs/gcs_client/test/gcs_client_reconnection_test.cc
@@ -68,14 +68,14 @@ class GcsClientReconnectionTest : public ::testing::Test {
auto channel =
grpc::CreateChannel(absl::StrCat("127.0.0.1:", config_.grpc_server_port),
grpc::InsecureChannelCredentials());
std::unique_ptr<rpc::NodeInfoGcsService::Stub> stub =
rpc::NodeInfoGcsService::NewStub(std::move(channel));
auto stub = grpc::health::v1::Health::NewStub(channel);
grpc::ClientContext context;
context.set_deadline(std::chrono::system_clock::now() + 1s);
const rpc::CheckAliveRequest request;
rpc::CheckAliveReply reply;
auto status = stub->CheckAlive(&context, request, &reply);
if (!status.ok()) {
::grpc::health::v1::HealthCheckRequest request;
::grpc::health::v1::HealthCheckResponse reply;
auto status = stub->Check(&context, request, &reply);
if (!status.ok() ||
reply.status() != ::grpc::health::v1::HealthCheckResponse::SERVING) {
RAY_LOG(WARNING) << "Unable to reach GCS: " << status.error_code() << " "
<< status.error_message();
return false;
121 changes: 121 additions & 0 deletions src/ray/gcs/gcs_server/gcs_health_check_manager.cc
@@ -0,0 +1,121 @@
// Copyright 2022 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "ray/gcs/gcs_server/gcs_health_check_manager.h"

namespace ray {
namespace gcs {

GcsHealthCheckManager::GcsHealthCheckManager(
instrumented_io_context &io_service,
std::function<void(const NodeID &)> on_node_death_callback,
int64_t initial_delay_ms,
int64_t timeout_ms,
int64_t period_ms,
int64_t failure_threshold)
: io_service_(io_service),
on_node_death_callback_(on_node_death_callback),
initial_delay_ms_(initial_delay_ms),
timeout_ms_(timeout_ms),
period_ms_(period_ms),
failure_threshold_(failure_threshold) {}

GcsHealthCheckManager::~GcsHealthCheckManager() {}

void GcsHealthCheckManager::RemoveNode(const NodeID &node_id) {
io_service_.dispatch(
[this, node_id]() {
auto iter = inflight_health_checks_.find(node_id);
if (iter == inflight_health_checks_.end()) {
return;
}
inflight_health_checks_.erase(iter);
},
"GcsHealthCheckManager::RemoveNode");
}

void GcsHealthCheckManager::FailNode(const NodeID &node_id) {
RAY_LOG(WARNING) << "Node " << node_id << " is dead because the health check failed.";
on_node_death_callback_(node_id);
Contributor:
Is there any way to check if this is running inside io_service?

Contributor (Author):
I don't think that's easy to do, and I don't think we should. It's totally normal for the callback to run on another thread; making it thread safe is another story. One common practice is to always dispatch back onto the io_service in the callback, which is what this component does (check the public API, and the small sketch after this file).

inflight_health_checks_.erase(node_id);
}

std::vector<NodeID> GcsHealthCheckManager::GetAllNodes() const {
std::vector<NodeID> nodes;
for (const auto &[node_id, _] : inflight_health_checks_) {
nodes.emplace_back(node_id);
}
return nodes;
}

void GcsHealthCheckManager::HealthCheckContext::StartHealthCheck() {
using ::grpc::health::v1::HealthCheckResponse;

context = std::make_shared<grpc::ClientContext>();

auto deadline =
std::chrono::system_clock::now() + std::chrono::milliseconds(manager->timeout_ms_);
context->set_deadline(deadline);
stub->async()->Check(
context.get(),
&request,
&response,
[this, context = this->context](::grpc::Status status) {
if (status.error_code() == ::grpc::StatusCode::CANCELLED) {
return;
}

manager->io_service_.post(
[this, status]() {
RAY_LOG(DEBUG) << "Health check status: " << int(response.status());

if (status.ok() && response.status() == HealthCheckResponse::SERVING) {
// Health check passed
health_check_remaining = manager->failure_threshold_;
} else {
--health_check_remaining;
RAY_LOG(WARNING) << "Health check failed for node " << node_id
<< ", remaining checks " << health_check_remaining;
}

if (health_check_remaining == 0) {
manager->io_service_.post([this]() { manager->FailNode(node_id); }, "");
} else {
// Do another health check.
timer.expires_from_now(
boost::posix_time::milliseconds(manager->period_ms_));
timer.async_wait([this](auto ec) {
if (ec != boost::asio::error::operation_aborted) {
StartHealthCheck();
}
});
}
},
"HealthCheck");
});
}

void GcsHealthCheckManager::AddNode(const NodeID &node_id,
std::shared_ptr<grpc::Channel> channel) {
io_service_.dispatch(
[this, channel, node_id]() {
RAY_CHECK(inflight_health_checks_.count(node_id) == 0);
auto context = std::make_unique<HealthCheckContext>(this, channel, node_id);
inflight_health_checks_.emplace(std::make_pair(node_id, std::move(context)));
},
"GcsHealthCheckManager::AddNode");
}

} // namespace gcs
} // namespace ray
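For reference, a minimal boost::asio sketch of the dispatch pattern discussed in the thread above: all node-table state lives on one io_context thread, and callbacks that may arrive on other threads (such as gRPC completion callbacks) post back onto it before touching that state. Names like HealthTracker are illustrative only, not Ray classes.

// Sketch of the single-threaded-executor pattern used by the manager above.
#include <boost/asio.hpp>
#include <iostream>
#include <string>
#include <thread>
#include <unordered_set>

class HealthTracker {
 public:
  explicit HealthTracker(boost::asio::io_context &io) : io_(io) {}

  // Public API: safe to call from any thread; work is dispatched onto io_.
  void AddNode(const std::string &node_id) {
    boost::asio::dispatch(io_, [this, node_id] { nodes_.insert(node_id); });
  }
  void RemoveNode(const std::string &node_id) {
    boost::asio::dispatch(io_, [this, node_id] { nodes_.erase(node_id); });
  }

  // Called from an arbitrary RPC thread; re-post onto io_ before touching state.
  void OnCheckFailed(const std::string &node_id) {
    boost::asio::post(io_, [this, node_id] {
      if (nodes_.erase(node_id) > 0) {
        std::cout << node_id << " marked dead\n";
      }
    });
  }

 private:
  boost::asio::io_context &io_;
  std::unordered_set<std::string> nodes_;  // Only touched on the io_ thread.
};

int main() {
  boost::asio::io_context io;
  auto guard = boost::asio::make_work_guard(io);
  std::thread runner([&] { io.run(); });

  HealthTracker tracker(io);
  tracker.AddNode("node-1");
  tracker.OnCheckFailed("node-1");  // Simulates a gRPC callback thread.

  guard.reset();
  runner.join();
  return 0;
}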