diff --git a/python/ray/tests/test_advanced_9.py b/python/ray/tests/test_advanced_9.py index b61e5aac9216..a4ba35d1756d 100644 --- a/python/ray/tests/test_advanced_9.py +++ b/python/ray/tests/test_advanced_9.py @@ -16,6 +16,7 @@ from ray.experimental.internal_kv import _internal_kv_list from ray.tests.conftest import call_ray_start import subprocess +import psutil @pytest.fixture @@ -269,11 +270,9 @@ def test_gcs_connection_no_leak(ray_start_cluster): ray.init(cluster.address) def get_gcs_num_of_connections(): - import psutil - p = psutil.Process(gcs_server_pid) - print(">>", p.num_fds()) - return p.num_fds() + print(">>", len(p.connections())) + return len(p.connections()) # Wait for everything to be ready. import time @@ -438,6 +437,44 @@ def f(): assert ray.get(f.remote()) +def test_gcs_fd_usage(shutdown_only): + ray.init( + _system_config={ + "prestart_worker_first_driver": False, + "enable_worker_prestart": False, + }, + ) + gcs_process = ray._private.worker._global_node.all_processes["gcs_server"][0] + gcs_process = psutil.Process(gcs_process.process.pid) + print("GCS connections", len(gcs_process.connections())) + + @ray.remote(runtime_env={"env_vars": {"Hello": "World"}}) + class A: + def f(self): + import os + + return os.environ.get("Hello") + + # In case there are still some pre-start workers, consume all of them + aa = [A.remote() for _ in range(32)] + for a in aa: + assert ray.get(a.f.remote()) == "World" + base_fd_num = len(gcs_process.connections()) + print("GCS connections", base_fd_num) + + bb = [A.remote() for _ in range(16)] + for b in bb: + assert ray.get(b.f.remote()) == "World" + new_fd_num = len(gcs_process.connections()) + print("GCS connections", new_fd_num) + # each worker has two connections: + # GCS -> CoreWorker + # CoreWorker -> GCS + # Sometimes, there is one more sockets opened. The reason + # is still unknown. + assert (new_fd_num - base_fd_num) <= len(bb) * 2 + 1 + + if __name__ == "__main__": import pytest import os diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 0d4806ec45fe..c1ce38c94ab0 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -215,7 +215,7 @@ RAY_CONFIG(int64_t, max_direct_call_object_size, 100 * 1024) // The max gRPC message size (the gRPC internal default is 4MB). We use a higher // limit in Ray to avoid crashing with many small inlined task arguments. // Keep in sync with GCS_STORAGE_MAX_SIZE in packaging.py. -RAY_CONFIG(int64_t, max_grpc_message_size, 500 * 1024 * 1024) +RAY_CONFIG(int64_t, max_grpc_message_size, 512 * 1024 * 1024) // Retry timeout for trying to create a gRPC server. Only applies if the number // of retries is non zero. diff --git a/src/ray/gcs/gcs_client/gcs_client.cc b/src/ray/gcs/gcs_client/gcs_client.cc index ae342b05eec0..689a9d499bab 100644 --- a/src/ray/gcs/gcs_client/gcs_client.cc +++ b/src/ray/gcs/gcs_client/gcs_client.cc @@ -146,8 +146,8 @@ std::pair GcsClient::GetGcsServerAddress() const { PythonGcsClient::PythonGcsClient(const GcsClientOptions &options) : options_(options) {} Status PythonGcsClient::Connect() { - auto arguments = PythonGrpcChannelArguments(); - channel_ = rpc::BuildChannel(options_.gcs_address_, options_.gcs_port_, arguments); + channel_ = + rpc::GcsRpcClient::CreateGcsChannel(options_.gcs_address_, options_.gcs_port_); kv_stub_ = rpc::InternalKVGcsService::NewStub(channel_); runtime_env_stub_ = rpc::RuntimeEnvGcsService::NewStub(channel_); node_info_stub_ = rpc::NodeInfoGcsService::NewStub(channel_); diff --git a/src/ray/gcs/pubsub/gcs_pub_sub.cc b/src/ray/gcs/pubsub/gcs_pub_sub.cc index b03a9157da46..4e765424a7e2 100644 --- a/src/ray/gcs/pubsub/gcs_pub_sub.cc +++ b/src/ray/gcs/pubsub/gcs_pub_sub.cc @@ -15,6 +15,7 @@ #include "ray/gcs/pubsub/gcs_pub_sub.h" #include "absl/strings/str_cat.h" +#include "ray/rpc/gcs_server/gcs_rpc_client.h" #include "ray/rpc/grpc_client.h" namespace ray { @@ -213,14 +214,6 @@ Status GcsSubscriber::SubscribeAllWorkerFailures( return Status::OK(); } -grpc::ChannelArguments PythonGrpcChannelArguments() { - grpc::ChannelArguments arguments; - arguments.SetInt(GRPC_ARG_MAX_MESSAGE_LENGTH, 512 * 1024 * 1024); - arguments.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, 60 * 1000); - arguments.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 60 * 1000); - return arguments; -} - PythonGcsPublisher::PythonGcsPublisher(const std::string &gcs_address) { std::vector address = absl::StrSplit(gcs_address, ':'); RAY_LOG(DEBUG) << "Connect to gcs server via address: " << gcs_address; @@ -230,8 +223,7 @@ PythonGcsPublisher::PythonGcsPublisher(const std::string &gcs_address) { } Status PythonGcsPublisher::Connect() { - auto arguments = PythonGrpcChannelArguments(); - channel_ = rpc::BuildChannel(gcs_address_, gcs_port_, arguments); + channel_ = rpc::GcsRpcClient::CreateGcsChannel(gcs_address_, gcs_port_); pubsub_stub_ = rpc::InternalPubSubGcsService::NewStub(channel_); return Status::OK(); } diff --git a/src/ray/gcs/pubsub/gcs_pub_sub.h b/src/ray/gcs/pubsub/gcs_pub_sub.h index db621938dc98..6497599658ca 100644 --- a/src/ray/gcs/pubsub/gcs_pub_sub.h +++ b/src/ray/gcs/pubsub/gcs_pub_sub.h @@ -165,9 +165,5 @@ class RAY_EXPORT PythonGcsPublisher { int gcs_port_; }; -/// Construct the arguments for synchronous gRPC clients -/// (the ones wrapped in Python) -grpc::ChannelArguments PythonGrpcChannelArguments(); - } // namespace gcs } // namespace ray diff --git a/src/ray/rpc/gcs_server/gcs_rpc_client.h b/src/ray/rpc/gcs_server/gcs_rpc_client.h index b24ee25a8b8a..9d2061177815 100644 --- a/src/ray/rpc/gcs_server/gcs_rpc_client.h +++ b/src/ray/rpc/gcs_server/gcs_rpc_client.h @@ -173,6 +173,19 @@ class Executor { /// Client used for communicating with gcs server. class GcsRpcClient { + public: + static std::shared_ptr CreateGcsChannel(const std::string &address, + int port) { + grpc::ChannelArguments arguments = CreateDefaultChannelArguments(); + arguments.SetInt(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS, + ::RayConfig::instance().gcs_grpc_max_reconnect_backoff_ms()); + arguments.SetInt(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, + ::RayConfig::instance().gcs_grpc_min_reconnect_backoff_ms()); + arguments.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, + ::RayConfig::instance().gcs_grpc_initial_reconnect_backoff_ms()); + return BuildChannel(address, port, arguments); + } + public: /// Constructor. GcsRpcClient is not thread safe. /// @@ -190,16 +203,7 @@ class GcsRpcClient { gcs_port_(port), io_context_(&client_call_manager.GetMainService()), timer_(std::make_unique(*io_context_)) { - grpc::ChannelArguments arguments = CreateDefaultChannelArguments(); - arguments.SetInt(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS, - ::RayConfig::instance().gcs_grpc_max_reconnect_backoff_ms()); - arguments.SetInt(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, - ::RayConfig::instance().gcs_grpc_min_reconnect_backoff_ms()); - arguments.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, - ::RayConfig::instance().gcs_grpc_initial_reconnect_backoff_ms()); - - channel_ = BuildChannel(address, port, arguments); - + channel_ = CreateGcsChannel(address, port); // If not the reconnection will continue to work. auto deadline = std::chrono::system_clock::now() +