Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] add client side health-check to detect network failures. #31640

Merged
merged 10 commits into from
Jan 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions python/ray/_private/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1352,6 +1352,17 @@ def init_grpc_channel(
asynchronous: bool = False,
):
grpc_module = aiogrpc if asynchronous else grpc

options = options or []
options_dict = dict(options)
options_dict["grpc.keepalive_time_ms"] = options_dict.get(
"grpc.keepalive_time_ms", ray._config.grpc_client_keepalive_time_ms()
)
options_dict["grpc.keepalive_timeout_ms"] = options_dict.get(
"grpc.keepalive_timeout_ms", ray._config.grpc_client_keepalive_timeout_ms()
)
options = options_dict.items()

if os.environ.get("RAY_USE_TLS", "0").lower() in ("1", "true"):
server_cert_chain, private_key, ca_cert = load_certs_from_env()
credentials = grpc.ssl_channel_credentials(
Expand Down
8 changes: 8 additions & 0 deletions python/ray/includes/ray_config.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,11 @@ cdef extern from "ray/common/ray_config.h" nogil:
int64_t health_check_failure_threshold() const

uint64_t memory_monitor_refresh_ms() const

int64_t grpc_keepalive_time_ms() const

int64_t grpc_keepalive_timeout_ms() const

int64_t grpc_client_keepalive_time_ms() const

int64_t grpc_client_keepalive_timeout_ms() const
16 changes: 16 additions & 0 deletions python/ray/includes/ray_config.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,19 @@ cdef class Config:
@staticmethod
def memory_monitor_refresh_ms():
    """Memory monitor refresh interval in ms, read from the C++ RayConfig."""
    return RayConfig.instance().memory_monitor_refresh_ms()

@staticmethod
def grpc_keepalive_time_ms():
    """Server-side gRPC keepalive ping interval in ms, from the C++ RayConfig."""
    cfg = RayConfig.instance()
    return cfg.grpc_keepalive_time_ms()

@staticmethod
def grpc_keepalive_timeout_ms():
    """Server-side gRPC keepalive timeout in ms, from the C++ RayConfig."""
    cfg = RayConfig.instance()
    return cfg.grpc_keepalive_timeout_ms()

@staticmethod
def grpc_client_keepalive_time_ms():
    """Client-side gRPC keepalive ping interval in ms, from the C++ RayConfig."""
    cfg = RayConfig.instance()
    return cfg.grpc_client_keepalive_time_ms()

@staticmethod
def grpc_client_keepalive_timeout_ms():
    """Client-side gRPC keepalive timeout in ms, from the C++ RayConfig."""
    cfg = RayConfig.instance()
    return cfg.grpc_client_keepalive_timeout_ms()
12 changes: 12 additions & 0 deletions src/ray/common/grpc_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <sstream>

#include "absl/container/flat_hash_map.h"
#include "ray/common/ray_config.h"
#include "ray/common/status.h"

namespace ray {
Expand Down Expand Up @@ -149,4 +150,15 @@ inline absl::flat_hash_map<K, V> MapFromProtobuf(
return absl::flat_hash_map<K, V>(pb_map.begin(), pb_map.end());
}

/// Build the default channel arguments shared by Ray gRPC clients.
/// Client-side keepalive pings are enabled only when a positive
/// `grpc_client_keepalive_time_ms` is configured; in that case the matching
/// keepalive timeout from the config is applied as well.
inline grpc::ChannelArguments CreateDefaultChannelArguments() {
  grpc::ChannelArguments args;
  const auto &config = ::RayConfig::instance();
  const int64_t keepalive_time_ms = config.grpc_client_keepalive_time_ms();
  if (keepalive_time_ms > 0) {
    args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, keepalive_time_ms);
    args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS,
                config.grpc_client_keepalive_timeout_ms());
  }
  return args;
}

} // namespace ray
16 changes: 11 additions & 5 deletions src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -661,16 +661,22 @@ RAY_CONFIG(bool, isolate_workers_across_task_types, true)
/// ServerCall instance number of each RPC service handler
RAY_CONFIG(int64_t, gcs_max_active_rpcs_per_handler, 100)

/// grpc keepalive sent interval
/// grpc keepalive sent interval for the server.
/// This is only configured in the GCS server for now.
/// NOTE: It is not ideal for other components because their failure model
/// treats network failures as component failures.
RAY_CONFIG(int64_t, grpc_keepalive_time_ms, 10000)

/// grpc keepalive timeout.
RAY_CONFIG(int64_t, grpc_keepalive_timeout_ms, 20000)
scv119 marked this conversation as resolved.
Show resolved Hide resolved

/// NOTE: we set a loose client keepalive because some components have a
/// failure model that considers network failures as component failures,
/// and a tight keepalive configuration breaks that assumption. We should
/// apply this to every other component after we change this failure
/// assumption in the code.
RAY_CONFIG(int64_t, grpc_keepalive_time_ms, 10000)
/// grpc keepalive sent interval for the client.
RAY_CONFIG(int64_t, grpc_client_keepalive_time_ms, 300000)

/// grpc keepalive timeout
RAY_CONFIG(int64_t, grpc_keepalive_timeout_ms, 20000)
/// grpc keepalive timeout for client.
RAY_CONFIG(int64_t, grpc_client_keepalive_timeout_ms, 120000)
scv119 marked this conversation as resolved.
Show resolved Hide resolved

/// Whether to use log reporter in event framework
RAY_CONFIG(bool, event_log_reporter_enabled, false)
Expand Down
3 changes: 2 additions & 1 deletion src/ray/rpc/gcs_server/gcs_rpc_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <thread>

#include "absl/container/btree_map.h"
#include "ray/common/grpc_util.h"
#include "ray/common/network_util.h"
#include "ray/rpc/grpc_client.h"
#include "src/ray/protobuf/gcs_service.grpc.pb.h"
Expand Down Expand Up @@ -184,7 +185,7 @@ class GcsRpcClient {
gcs_port_(port),
io_context_(&client_call_manager.GetMainService()),
timer_(std::make_unique<boost::asio::deadline_timer>(*io_context_)) {
grpc::ChannelArguments arguments;
grpc::ChannelArguments arguments = CreateDefaultChannelArguments();
arguments.SetInt(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS,
::RayConfig::instance().gcs_grpc_max_reconnect_backoff_ms());
arguments.SetInt(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS,
Expand Down
5 changes: 3 additions & 2 deletions src/ray/rpc/grpc_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ class GrpcClient {
ClientCallManager &call_manager,
bool use_tls = false)
: client_call_manager_(call_manager), use_tls_(use_tls) {
std::shared_ptr<grpc::Channel> channel = BuildChannel(address, port);
std::shared_ptr<grpc::Channel> channel =
BuildChannel(address, port, CreateDefaultChannelArguments());
channel_ = BuildChannel(address, port);
stub_ = GrpcService::NewStub(channel_);
}
Expand All @@ -109,9 +110,9 @@ class GrpcClient {
int num_threads,
bool use_tls = false)
: client_call_manager_(call_manager), use_tls_(use_tls) {
grpc::ChannelArguments argument = CreateDefaultChannelArguments();
grpc::ResourceQuota quota;
quota.SetMaxThreads(num_threads);
grpc::ChannelArguments argument;
argument.SetResourceQuota(quota);
argument.SetInt(GRPC_ARG_ENABLE_HTTP_PROXY,
::RayConfig::instance().grpc_enable_http_proxy() ? 1 : 0);
Expand Down