[core] Introduce pull based health check to GCS. #29442

Merged Nov 2, 2022 · 20 commits
Changes from 15 commits
15 changes: 15 additions & 0 deletions BUILD.bazel
@@ -546,6 +546,7 @@ cc_library(
":scheduler",
":worker_rpc",
"@boost//:bimap",
"@com_github_grpc_grpc//src/proto/grpc/health/v1:health_proto",
"@com_google_absl//absl/container:btree",
],
)
@@ -1810,6 +1811,20 @@ cc_library(
],
)

cc_test(
name = "gcs_health_check_manager_test",
size = "small",
srcs = [
"src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc",
],
copts = COPTS,
tags = ["team:core"],
deps = [
":gcs_server_lib",
"@com_google_googletest//:gtest_main",
],
)

cc_test(
name = "gcs_node_manager_test",
size = "small",
19 changes: 15 additions & 4 deletions dashboard/modules/healthz/tests/test_healthz.py
@@ -7,7 +7,9 @@
from ray._private.test_utils import find_free_port, wait_for_condition


def test_healthz_head(ray_start_cluster):
@pytest.mark.parametrize("pull_based", [True, False])
def test_healthz_head(pull_based, monkeypatch, ray_start_cluster):
monkeypatch.setenv("RAY_pull_based_healthcheck", "true" if pull_based else "false")
dashboard_port = find_free_port()
h = ray_start_cluster.add_node(dashboard_port=dashboard_port)
uri = f"http://localhost:{dashboard_port}/api/gcs_healthz"
@@ -20,7 +22,9 @@ def test_healthz_head(ray_start_cluster):
assert "Read timed out" in str(e)


def test_healthz_agent_1(ray_start_cluster):
@pytest.mark.parametrize("pull_based", [True, False])
def test_healthz_agent_1(pull_based, monkeypatch, ray_start_cluster):
monkeypatch.setenv("RAY_pull_based_healthcheck", "true" if pull_based else "false")
agent_port = find_free_port()
h = ray_start_cluster.add_node(dashboard_agent_listen_port=agent_port)
uri = f"http://localhost:{agent_port}/api/local_raylet_healthz"
@@ -32,9 +36,16 @@ def test_healthz_agent_1(ray_start_cluster):
assert requests.get(uri).status_code == 200


@pytest.mark.parametrize("pull_based", [True, False])
@pytest.mark.skipif(sys.platform == "win32", reason="SIGSTOP only on posix")
def test_healthz_agent_2(monkeypatch, ray_start_cluster):
monkeypatch.setenv("RAY_num_heartbeats_timeout", "3")
def test_healthz_agent_2(pull_based, monkeypatch, ray_start_cluster):
monkeypatch.setenv("RAY_pull_based_healthcheck", "true" if pull_based else "false")
if pull_based:
monkeypatch.setenv("RAY_health_check_failure_threshold", "3")
monkeypatch.setenv("RAY_health_check_timeout_ms", "100")
monkeypatch.setenv("RAY_health_check_initial_delay_ms", "0")
else:
monkeypatch.setenv("RAY_num_heartbeats_timeout", "3")

agent_port = find_free_port()
h = ray_start_cluster.add_node(dashboard_agent_listen_port=agent_port)
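As context for the monkeypatch.setenv calls above, a minimal sketch of the same toggle outside pytest; the specific values are illustrative, not recommendations, and the variables have to be set before any Ray process starts:

```python
import os

# Each RAY_<name> variable overrides the matching RAY_CONFIG(name, ...) entry
# declared in src/ray/common/ray_config_def.h, just as the test does via
# monkeypatch.setenv.
os.environ["RAY_pull_based_healthcheck"] = "true"
os.environ["RAY_health_check_failure_threshold"] = "3"
os.environ["RAY_health_check_timeout_ms"] = "100"
os.environ["RAY_health_check_initial_delay_ms"] = "0"

# Processes started after this point (ray.init(), `ray start`, the dashboard
# agent) pick the overrides up when they initialize RayConfig.
```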
10 changes: 10 additions & 0 deletions python/ray/includes/ray_config.pxd
@@ -80,3 +80,13 @@ cdef extern from "ray/common/ray_config.h" nogil:
c_string REDIS_CLIENT_KEY() const

c_string REDIS_SERVER_NAME() const

c_bool pull_based_healthcheck() const

int64_t health_check_initial_delay_ms() const

int64_t health_check_period_ms() const

int64_t health_check_timeout_ms() const

int64_t health_check_failure_threshold() const
20 changes: 20 additions & 0 deletions python/ray/includes/ray_config.pxi
@@ -136,3 +136,23 @@ cdef class Config:
@staticmethod
def REDIS_SERVER_NAME():
return RayConfig.instance().REDIS_SERVER_NAME()

@staticmethod
def pull_based_healthcheck():
return RayConfig.instance().pull_based_healthcheck()

@staticmethod
def health_check_initial_delay_ms():
return RayConfig.instance().health_check_initial_delay_ms()

@staticmethod
def health_check_period_ms():
return RayConfig.instance().health_check_period_ms()

@staticmethod
def health_check_timeout_ms():
return RayConfig.instance().health_check_timeout_ms()

@staticmethod
def health_check_failure_threshold():
return RayConfig.instance().health_check_failure_threshold()
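The new static methods are reachable from Python through ray._raylet.Config, the same way test_metrics_agent.py uses them below; a small usage sketch, where the printed values depend on any RAY_* overrides in the environment:

```python
import ray

# Snapshot of the health-check knobs as seen by this Python process; values
# are the compiled defaults unless overridden via RAY_<name> env vars or
# _system_config.
cfg = ray._raylet.Config
print(cfg.pull_based_healthcheck())
print(cfg.health_check_period_ms())
print(cfg.health_check_timeout_ms())
print(cfg.health_check_failure_threshold())
```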
22 changes: 17 additions & 5 deletions python/ray/tests/test_failure.py
@@ -367,11 +367,11 @@ def foo():


@pytest.mark.skip("This test does not work yet.")
@pytest.mark.parametrize("ray_start_object_store_memory", [10**6], indirect=True)
@pytest.mark.parametrize("ray_start_object_store_memory", [10 ** 6], indirect=True)
def test_put_error1(ray_start_object_store_memory, error_pubsub):
p = error_pubsub
num_objects = 3
object_size = 4 * 10**5
object_size = 4 * 10 ** 5

# Define a task with a single dependency, a numpy array, that returns
# another array.
@@ -412,11 +412,11 @@ def put_arg_task():


@pytest.mark.skip("This test does not work yet.")
@pytest.mark.parametrize("ray_start_object_store_memory", [10**6], indirect=True)
@pytest.mark.parametrize("ray_start_object_store_memory", [10 ** 6], indirect=True)
def test_put_error2(ray_start_object_store_memory):
# This is the same as the previous test, but it calls ray.put directly.
num_objects = 3
object_size = 4 * 10**5
object_size = 4 * 10 ** 5

# Define a task with a single dependency, a numpy array, that returns
# another array.
@@ -562,12 +562,24 @@ async def f(self):
{
"num_cpus": 0,
"_system_config": {
"pull_based_healthcheck": False,
"raylet_death_check_interval_milliseconds": 10 * 1000,
"num_heartbeats_timeout": 10,
"raylet_heartbeat_period_milliseconds": 100,
"timeout_ms_task_wait_for_death_info": 100,
},
}
},
{
"num_cpus": 0,
"_system_config": {
"pull_based_healthcheck": True,
"raylet_death_check_interval_milliseconds": 10 * 1000,
"health_check_initial_delay_ms": 0,
"health_check_failure_threshold": 10,
"health_check_period_ms": 100,
"timeout_ms_task_wait_for_death_info": 100,
},
},
],
indirect=True,
)
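For reference, a standalone sketch equivalent to the second parametrization above: start a local cluster with the pull-based path enabled via _system_config. The health-check values mirror the test fixture and are not production recommendations:

```python
import ray

ray.init(
    _system_config={
        "pull_based_healthcheck": True,
        "health_check_initial_delay_ms": 0,
        "health_check_failure_threshold": 10,
        "health_check_period_ms": 100,
    }
)
# ... run workloads; a raylet that stops answering health checks is marked
# dead after roughly failure_threshold * period_ms.
ray.shutdown()
```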
6 changes: 5 additions & 1 deletion python/ray/tests/test_metrics_agent.py
@@ -51,7 +51,6 @@
"ray_object_directory_lookups",
"ray_object_directory_added_locations",
"ray_object_directory_removed_locations",
"ray_heartbeat_report_ms_sum",
"ray_process_startup_time_ms_sum",
"ray_internal_num_processes_started",
"ray_internal_num_spilled_tasks",
@@ -148,6 +147,11 @@
"ray_component_uss_mb",
]

if ray._raylet.Config.pull_based_healthcheck():
_METRICS.append("ray_health_check_rpc_latency_ms_sum")
else:
_METRICS.append("ray_heartbeat_report_ms_sum")


@pytest.fixture
def _setup_cluster_for_test(request, ray_start_cluster):
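A rough way to confirm which metric family a running cluster exports, assuming a Prometheus scrape endpoint at a hypothetical address (the real port comes from the node's metrics_export_port):

```python
import requests

# Assumed endpoint for illustration only; adjust host and port for your setup.
text = requests.get("http://127.0.0.1:8080/metrics").text
print("pull-based:", "ray_health_check_rpc_latency_ms_sum" in text)
print("push-based:", "ray_heartbeat_report_ms_sum" in text)
```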
18 changes: 15 additions & 3 deletions python/ray/tests/test_multi_node_2.py
@@ -42,8 +42,16 @@ def test_shutdown():
"ray_start_cluster_head",
[
generate_system_config_map(
num_heartbeats_timeout=3, object_timeout_milliseconds=12345
)
num_heartbeats_timeout=3,
object_timeout_milliseconds=12345,
pull_based_healthcheck=False,
),
generate_system_config_map(
health_check_initial_delay_ms=0,
health_check_failure_threshold=3,
object_timeout_milliseconds=12345,
pull_based_healthcheck=True,
),
],
indirect=True,
)
@@ -62,7 +70,11 @@ def test_system_config(ray_start_cluster_head):
@ray.remote
def f():
assert ray._config.object_timeout_milliseconds() == 12345
assert ray._config.num_heartbeats_timeout() == 3
if ray._config.pull_based_healthcheck():
assert ray._config.health_check_initial_delay_ms() == 0
assert ray._config.health_check_failure_threshold() == 3
else:
assert ray._config.num_heartbeats_timeout() == 3

ray.get([f.remote() for _ in range(5)])

8 changes: 8 additions & 0 deletions src/ray/common/ray_config_def.h
@@ -699,3 +699,11 @@ RAY_CONFIG(std::string, REDIS_SERVER_NAME, "")
// The delay is a random number between the interval. If method equals '*',
// it will apply to all methods.
RAY_CONFIG(std::string, testing_asio_delay_us, "")

/// A feature flag to enable pull based health check.
/// TODO: Turn it on by default
RAY_CONFIG(bool, pull_based_healthcheck, false)
RAY_CONFIG(int64_t, health_check_initial_delay_ms, 1000)
Contributor: Feels like this is unnecessary? It would take 30 seconds until the node is marked as dead anyway.

Contributor (author): No, 30 s is the old protocol. This is the new protocol, so if a node is dead it only takes about 3 s to mark it dead.

RAY_CONFIG(int64_t, health_check_period_ms, 1000)
RAY_CONFIG(int64_t, health_check_timeout_ms, 5000)
RAY_CONFIG(int64_t, health_check_failure_threshold, 5)
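A back-of-the-envelope sketch of how these knobs translate into detection latency, based on the check loop in gcs_health_check_manager.cc below (a rough bound, not an exact model):

```python
def detection_latency_ms(period_ms, timeout_ms, failure_threshold):
    # A node is declared dead after failure_threshold consecutive failed
    # probes; each probe takes at most timeout_ms, and consecutive probes are
    # spaced period_ms apart.
    return failure_threshold * (period_ms + timeout_ms)

print(detection_latency_ms(1000, 5000, 5))  # defaults, every probe times out: ~30 s
print(detection_latency_ms(1000, 0, 3))     # fast failures, threshold 3: the ~3 s cited above
```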
12 changes: 6 additions & 6 deletions src/ray/gcs/gcs_client/test/gcs_client_reconnection_test.cc
@@ -68,14 +68,14 @@ class GcsClientReconnectionTest : public ::testing::Test {
auto channel =
grpc::CreateChannel(absl::StrCat("127.0.0.1:", config_.grpc_server_port),
grpc::InsecureChannelCredentials());
std::unique_ptr<rpc::NodeInfoGcsService::Stub> stub =
rpc::NodeInfoGcsService::NewStub(std::move(channel));
auto stub = grpc::health::v1::Health::NewStub(channel);
grpc::ClientContext context;
context.set_deadline(std::chrono::system_clock::now() + 1s);
const rpc::CheckAliveRequest request;
rpc::CheckAliveReply reply;
auto status = stub->CheckAlive(&context, request, &reply);
if (!status.ok()) {
::grpc::health::v1::HealthCheckRequest request;
::grpc::health::v1::HealthCheckResponse reply;
auto status = stub->Check(&context, request, &reply);
if (!status.ok() ||
reply.status() != ::grpc::health::v1::HealthCheckResponse::SERVING) {
RAY_LOG(WARNING) << "Unable to reach GCS: " << status.error_code() << " "
<< status.error_message();
return false;
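Since the GCS now serves the standard grpc.health.v1 protocol, the same probe can be issued from any gRPC client. A Python sketch, where the GCS address 127.0.0.1:6379 is an assumption (substitute your cluster's gcs_address):

```python
import grpc
from grpc_health.v1 import health_pb2, health_pb2_grpc

channel = grpc.insecure_channel("127.0.0.1:6379")  # assumed GCS address
stub = health_pb2_grpc.HealthStub(channel)
try:
    resp = stub.Check(health_pb2.HealthCheckRequest(service=""), timeout=1.0)
    healthy = resp.status == health_pb2.HealthCheckResponse.SERVING
except grpc.RpcError:
    healthy = False
print("GCS healthy:", healthy)
```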
144 changes: 144 additions & 0 deletions src/ray/gcs/gcs_server/gcs_health_check_manager.cc
@@ -0,0 +1,144 @@
// Copyright 2022 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "ray/gcs/gcs_server/gcs_health_check_manager.h"

#include "ray/stats/metric.h"
DEFINE_stats(health_check_rpc_latency_ms,
"Latency of rpc request for health check.",
(),
({1, 10, 100, 1000, 10000}, ),
ray::stats::HISTOGRAM);

namespace ray {
namespace gcs {

GcsHealthCheckManager::GcsHealthCheckManager(
instrumented_io_context &io_service,
std::function<void(const NodeID &)> on_node_death_callback,
int64_t initial_delay_ms,
int64_t timeout_ms,
int64_t period_ms,
int64_t failure_threshold)
: io_service_(io_service),
on_node_death_callback_(on_node_death_callback),
initial_delay_ms_(initial_delay_ms),
timeout_ms_(timeout_ms),
period_ms_(period_ms),
failure_threshold_(failure_threshold) {
RAY_CHECK(on_node_death_callback != nullptr);
RAY_CHECK(initial_delay_ms >= 0);
RAY_CHECK(timeout_ms >= 0);
RAY_CHECK(period_ms >= 0);
RAY_CHECK(failure_threshold >= 0);
}

GcsHealthCheckManager::~GcsHealthCheckManager() {}

void GcsHealthCheckManager::RemoveNode(const NodeID &node_id) {
io_service_.dispatch(
[this, node_id]() {
auto iter = health_check_contexts_.find(node_id);
if (iter == health_check_contexts_.end()) {
return;
}
health_check_contexts_.erase(iter);
},
"GcsHealthCheckManager::RemoveNode");
}

void GcsHealthCheckManager::FailNode(const NodeID &node_id) {
RAY_LOG(WARNING) << "Node " << node_id << " is dead because the health check failed.";
on_node_death_callback_(node_id);
Contributor: Is there any way to check if this is running inside io_service?

Contributor (author): I don't think it's easy to do, and I don't think we should. It's normal for the callback to run on another thread; making it thread safe is a separate question. One practice is to always dispatch in the callback, which this component does (see the public API).

health_check_contexts_.erase(node_id);
}

std::vector<NodeID> GcsHealthCheckManager::GetAllNodes() const {
std::vector<NodeID> nodes;
for (const auto &[node_id, _] : health_check_contexts_) {
nodes.emplace_back(node_id);
}
return nodes;
}

void GcsHealthCheckManager::HealthCheckContext::StartHealthCheck() {
using ::grpc::health::v1::HealthCheckResponse;

context_ = std::make_shared<grpc::ClientContext>();

auto deadline =
std::chrono::system_clock::now() + std::chrono::milliseconds(manager_->timeout_ms_);
context_->set_deadline(deadline);
stub_->async()->Check(
Contributor: Why do we not include this in the existing RPC paths, like raylet_client.cc? It seems more complicated to have a separate code path like this.

Contributor (author): I think the existing path is far more complicated than this one, and we shouldn't reuse it just because that's what we already do. Consider how you would write a unit test against that kind of deep coupling.

context_.get(),
&request_,
&response_,
[this, stopped = this->stopped_, context = this->context_, now = absl::Now()](
::grpc::Status status) {
// This callback is done in gRPC's thread pool.
STATS_health_check_rpc_latency_ms.Record(
absl::ToInt64Milliseconds(absl::Now() - now));
if (status.error_code() == ::grpc::StatusCode::CANCELLED) {
return;
}
manager_->io_service_.post(
[this, stopped, status]() {
// Stopped has to be read in the same thread where it's updated.
if (*stopped) {
return;
}
RAY_LOG(DEBUG) << "Health check status: " << int(response_.status());

if (status.ok() && response_.status() == HealthCheckResponse::SERVING) {
// Health check passed
health_check_remaining_ = manager_->failure_threshold_;
} else {
--health_check_remaining_;
RAY_LOG(WARNING) << "Health check failed for node " << node_id_
Contributor: Maybe DEBUG? This feels like it may be too spammy.

Contributor (author): I'm not sure. This should only happen when an error occurs (scale down won't trigger it). If it gets printed a lot, maybe that means we should increase the interval.

<< ", remaining checks " << health_check_remaining_;
}

if (health_check_remaining_ == 0) {
manager_->io_service_.post([this]() { manager_->FailNode(node_id_); },
Contributor: Maybe we don't need to post again, since we are already in the same io service?

Contributor (author): Removing the node invalidates `this`, so FailNode has to run in a separate posted task after this one. I can add a comment there.

"");
} else {
// Do another health check.
timer_.expires_from_now(
boost::posix_time::milliseconds(manager_->period_ms_));
timer_.async_wait([this, stopped](auto ec) {
// We need to check stopped here as well since cancel
// won't impact the queued tasks.
if (ec != boost::asio::error::operation_aborted && !*stopped) {
StartHealthCheck();
}
});
}
},
"HealthCheck");
});
}

void GcsHealthCheckManager::AddNode(const NodeID &node_id,
std::shared_ptr<grpc::Channel> channel) {
io_service_.dispatch(
[this, channel, node_id]() {
RAY_CHECK(health_check_contexts_.count(node_id) == 0);
auto context = std::make_unique<HealthCheckContext>(this, channel, node_id);
health_check_contexts_.emplace(std::make_pair(node_id, std::move(context)));
},
"GcsHealthCheckManager::AddNode");
}

} // namespace gcs
} // namespace ray
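On the testability point raised above: because the manager only needs a grpc::Channel to a standard health service, a test can stand in for a raylet with a stock health servicer and flip its status. A Python sketch of that idea (the actual unit test, gcs_health_check_manager_test.cc, is C++ and not shown here):

```python
from concurrent import futures

import grpc
from grpc_health.v1 import health, health_pb2, health_pb2_grpc

# Stand-in for a raylet: a gRPC server exposing the standard health service.
server = grpc.server(futures.ThreadPoolExecutor(max_workers=2))
servicer = health.HealthServicer()
health_pb2_grpc.add_HealthServicer_to_server(servicer, server)
port = server.add_insecure_port("127.0.0.1:0")
server.start()

servicer.set("", health_pb2.HealthCheckResponse.SERVING)      # probes pass
servicer.set("", health_pb2.HealthCheckResponse.NOT_SERVING)  # probes start failing

server.stop(grace=None)
```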