Skip to content

Commit

Permalink
[core] Fix GCS FD usage increase regression. (ray-project#35624)
Browse files Browse the repository at this point in the history
## Why are these changes needed?

After the GCS client was moved to C++, the FD usage increased by one: previously it was 2, and after the move it is 3.

In the fix, we reuse the channel to make sure there are only 2 connections between GCS and CoreWorker. We still create 3 channels, but we create them with the same arguments and depend on gRPC to reuse the TCP connections it has already created.

The reason why it was previously 2 hasn't been figured out. Maybe gRPC does some hidden work that reuses the connection in some way.

## Related issue number
ray-project#34635
Signed-off-by: e428265 <[email protected]>
  • Loading branch information
fishbone authored and arvind-chandra committed Aug 31, 2023
1 parent 12b465c commit 24ab5ac
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 31 deletions.
45 changes: 41 additions & 4 deletions python/ray/tests/test_advanced_9.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from ray.experimental.internal_kv import _internal_kv_list
from ray.tests.conftest import call_ray_start
import subprocess
import psutil


@pytest.fixture
Expand Down Expand Up @@ -269,11 +270,9 @@ def test_gcs_connection_no_leak(ray_start_cluster):
ray.init(cluster.address)

def get_gcs_num_of_connections():
import psutil

p = psutil.Process(gcs_server_pid)
print(">>", p.num_fds())
return p.num_fds()
print(">>", len(p.connections()))
return len(p.connections())

# Wait for everything to be ready.
import time
Expand Down Expand Up @@ -438,6 +437,44 @@ def f():
assert ray.get(f.remote())


def test_gcs_fd_usage(shutdown_only):
    """Check that newly started workers add at most two GCS connections each.

    The GCS server's connection count is sampled once after a first batch of
    actors has run and again after a second batch; the growth between the two
    samples must not exceed two connections per actor in the second batch,
    plus one.
    """
    ray.init(
        _system_config={
            "prestart_worker_first_driver": False,
            "enable_worker_prestart": False,
        },
    )
    gcs_info = ray._private.worker._global_node.all_processes["gcs_server"][0]
    gcs_proc = psutil.Process(gcs_info.process.pid)

    def count_gcs_connections():
        # Number of connections currently reported for the GCS server process.
        return len(gcs_proc.connections())

    print("GCS connections", count_gcs_connections())

    @ray.remote(runtime_env={"env_vars": {"Hello": "World"}})
    class A:
        def f(self):
            import os

            return os.environ.get("Hello")

    # Prestarting is disabled above, but some pre-started workers may still
    # exist; this first batch uses them up before the baseline is taken.
    warmup_actors = [A.remote() for _ in range(32)]
    for actor in warmup_actors:
        assert ray.get(actor.f.remote()) == "World"
    baseline_connections = count_gcs_connections()
    print("GCS connections", baseline_connections)

    measured_actors = [A.remote() for _ in range(16)]
    for actor in measured_actors:
        assert ray.get(actor.f.remote()) == "World"
    final_connections = count_gcs_connections()
    print("GCS connections", final_connections)

    # Each worker is expected to hold two connections:
    #   GCS -> CoreWorker
    #   CoreWorker -> GCS
    # Occasionally one additional socket is open; the reason is still
    # unknown, hence the "+ 1" of slack.
    allowed_growth = len(measured_actors) * 2 + 1
    assert (final_connections - baseline_connections) <= allowed_growth


if __name__ == "__main__":
import pytest
import os
Expand Down
2 changes: 1 addition & 1 deletion src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ RAY_CONFIG(int64_t, max_direct_call_object_size, 100 * 1024)
// The max gRPC message size (the gRPC internal default is 4MB). We use a higher
// limit in Ray to avoid crashing with many small inlined task arguments.
// Keep in sync with GCS_STORAGE_MAX_SIZE in packaging.py.
RAY_CONFIG(int64_t, max_grpc_message_size, 500 * 1024 * 1024)
RAY_CONFIG(int64_t, max_grpc_message_size, 512 * 1024 * 1024)

// Retry timeout for trying to create a gRPC server. Only applies if the number
// of retries is non zero.
Expand Down
4 changes: 2 additions & 2 deletions src/ray/gcs/gcs_client/gcs_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ std::pair<std::string, int> GcsClient::GetGcsServerAddress() const {
PythonGcsClient::PythonGcsClient(const GcsClientOptions &options) : options_(options) {}

Status PythonGcsClient::Connect() {
auto arguments = PythonGrpcChannelArguments();
channel_ = rpc::BuildChannel(options_.gcs_address_, options_.gcs_port_, arguments);
channel_ =
rpc::GcsRpcClient::CreateGcsChannel(options_.gcs_address_, options_.gcs_port_);
kv_stub_ = rpc::InternalKVGcsService::NewStub(channel_);
runtime_env_stub_ = rpc::RuntimeEnvGcsService::NewStub(channel_);
node_info_stub_ = rpc::NodeInfoGcsService::NewStub(channel_);
Expand Down
12 changes: 2 additions & 10 deletions src/ray/gcs/pubsub/gcs_pub_sub.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "ray/gcs/pubsub/gcs_pub_sub.h"

#include "absl/strings/str_cat.h"
#include "ray/rpc/gcs_server/gcs_rpc_client.h"
#include "ray/rpc/grpc_client.h"

namespace ray {
Expand Down Expand Up @@ -213,14 +214,6 @@ Status GcsSubscriber::SubscribeAllWorkerFailures(
return Status::OK();
}

/// Construct the channel arguments for the synchronous gRPC clients
/// (the ones wrapped in Python): a 512 MiB maximum message length and
/// 60-second keepalive time and keepalive timeout.
grpc::ChannelArguments PythonGrpcChannelArguments() {
  constexpr int kMaxMessageLengthBytes = 512 * 1024 * 1024;
  constexpr int kKeepaliveTimeMs = 60 * 1000;
  constexpr int kKeepaliveTimeoutMs = 60 * 1000;

  grpc::ChannelArguments channel_args;
  channel_args.SetInt(GRPC_ARG_MAX_MESSAGE_LENGTH, kMaxMessageLengthBytes);
  channel_args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, kKeepaliveTimeMs);
  channel_args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, kKeepaliveTimeoutMs);
  return channel_args;
}

PythonGcsPublisher::PythonGcsPublisher(const std::string &gcs_address) {
std::vector<std::string> address = absl::StrSplit(gcs_address, ':');
RAY_LOG(DEBUG) << "Connect to gcs server via address: " << gcs_address;
Expand All @@ -230,8 +223,7 @@ PythonGcsPublisher::PythonGcsPublisher(const std::string &gcs_address) {
}

/// Create the gRPC channel to the GCS at gcs_address_:gcs_port_ and the
/// InternalPubSubGcsService stub that uses it.
///
/// \return Always Status::OK(); no check is made here that the channel is
/// actually connected.
Status PythonGcsPublisher::Connect() {
// NOTE(review): channel_ is assigned twice in a row below. The channel built
// from PythonGrpcChannelArguments() is immediately replaced by the one from
// GcsRpcClient::CreateGcsChannel(). This looks like the removed and added
// lines of a change shown together -- confirm that only the
// CreateGcsChannel() assignment is meant to remain.
auto arguments = PythonGrpcChannelArguments();
channel_ = rpc::BuildChannel(gcs_address_, gcs_port_, arguments);
channel_ = rpc::GcsRpcClient::CreateGcsChannel(gcs_address_, gcs_port_);
// The stub is created on whichever channel was assigned last.
pubsub_stub_ = rpc::InternalPubSubGcsService::NewStub(channel_);
return Status::OK();
}
Expand Down
4 changes: 0 additions & 4 deletions src/ray/gcs/pubsub/gcs_pub_sub.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,5 @@ class RAY_EXPORT PythonGcsPublisher {
int gcs_port_;
};

/// Construct the arguments for synchronous gRPC clients
/// (the ones wrapped in Python)
grpc::ChannelArguments PythonGrpcChannelArguments();

} // namespace gcs
} // namespace ray
24 changes: 14 additions & 10 deletions src/ray/rpc/gcs_server/gcs_rpc_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,19 @@ class Executor {

/// Client used for communicating with gcs server.
class GcsRpcClient {
public:
static std::shared_ptr<grpc::Channel> CreateGcsChannel(const std::string &address,
int port) {
grpc::ChannelArguments arguments = CreateDefaultChannelArguments();
arguments.SetInt(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS,
::RayConfig::instance().gcs_grpc_max_reconnect_backoff_ms());
arguments.SetInt(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS,
::RayConfig::instance().gcs_grpc_min_reconnect_backoff_ms());
arguments.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS,
::RayConfig::instance().gcs_grpc_initial_reconnect_backoff_ms());
return BuildChannel(address, port, arguments);
}

public:
/// Constructor. GcsRpcClient is not thread safe.
///
Expand All @@ -190,16 +203,7 @@ class GcsRpcClient {
gcs_port_(port),
io_context_(&client_call_manager.GetMainService()),
timer_(std::make_unique<boost::asio::deadline_timer>(*io_context_)) {
grpc::ChannelArguments arguments = CreateDefaultChannelArguments();
arguments.SetInt(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS,
::RayConfig::instance().gcs_grpc_max_reconnect_backoff_ms());
arguments.SetInt(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS,
::RayConfig::instance().gcs_grpc_min_reconnect_backoff_ms());
arguments.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS,
::RayConfig::instance().gcs_grpc_initial_reconnect_backoff_ms());

channel_ = BuildChannel(address, port, arguments);

channel_ = CreateGcsChannel(address, port);
// If not the reconnection will continue to work.
auto deadline =
std::chrono::system_clock::now() +
Expand Down

0 comments on commit 24ab5ac

Please sign in to comment.