From 18e164bad344d916cad33686659292eef7387d53 Mon Sep 17 00:00:00 2001 From: Mingwei Tian Date: Wed, 8 Jun 2022 21:04:49 -0700 Subject: [PATCH 1/3] fix --- src/ray/rpc/server_call.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/rpc/server_call.h b/src/ray/rpc/server_call.h index 7b42c5ac81df..ea1a5eb332cb 100644 --- a/src/ray/rpc/server_call.h +++ b/src/ray/rpc/server_call.h @@ -377,7 +377,7 @@ class ServerCallFactoryImpl : public ServerCallFactory { /// Maximum request number to handle at the same time. /// -1 means no limit. - uint64_t max_active_rpcs_; + int64_t max_active_rpcs_; }; } // namespace rpc From 3251f2d86be4d0878f051c27af8178f5987113a5 Mon Sep 17 00:00:00 2001 From: Mingwei Tian Date: Thu, 9 Jun 2022 09:38:06 -0700 Subject: [PATCH 2/3] fix unlimited --- src/ray/rpc/grpc_server.cc | 7 +++---- src/ray/rpc/server_call.h | 16 ++++++---------- 2 files changed, 9 insertions(+), 14 deletions(-) diff --git a/src/ray/rpc/grpc_server.cc b/src/ray/rpc/grpc_server.cc index c55422569de4..30dc8eaa784f 100644 --- a/src/ray/rpc/grpc_server.cc +++ b/src/ray/rpc/grpc_server.cc @@ -107,10 +107,9 @@ void GrpcServer::Run() { // Create calls for all the server call factories. for (auto &entry : server_call_factories_) { for (int i = 0; i < num_threads_; i++) { - // Create a buffer of 100 calls for each RPC handler. - // TODO(edoakes): a small buffer should be fine and seems to have better - // performance, but we don't currently handle backpressure on the client. - int buffer_size = 100; + // When there is no max active RPC limit, a call will be added to the completetion + // queue before processing starts, so adding only 1 call is enough. + int buffer_size = 1; if (entry->GetMaxActiveRPCs() != -1) { buffer_size = entry->GetMaxActiveRPCs(); } diff --git a/src/ray/rpc/server_call.h b/src/ray/rpc/server_call.h index ea1a5eb332cb..5c7e6e576d91 100644 --- a/src/ray/rpc/server_call.h +++ b/src/ray/rpc/server_call.h @@ -162,6 +162,12 @@ class ServerCallImpl : public ServerCall { start_time_ = absl::GetCurrentTimeNanos(); ray::stats::STATS_grpc_server_req_handling.Record(1.0, call_name_); if (!io_service_.stopped()) { + if (factory_.GetMaxActiveRPCs() == -1) { + // Create a new `ServerCall` as completion queue tag before handling the request + // when no back pressure limit is set, so that new requests can continue to be + // pulled from the completion queue before this request is done. + factory_.CreateCall(); + } io_service_.post([this] { HandleRequestImpl(); }, call_name_); } else { // Handle service for rpc call has stopped, we must handle the call here @@ -173,16 +179,6 @@ class ServerCallImpl : public ServerCall { void HandleRequestImpl() { state_ = ServerCallState::PROCESSING; - // NOTE(hchen): This `factory` local variable is needed. Because `SendReply` runs in - // a different thread, and will cause `this` to be deleted. - const auto &factory = factory_; - if (factory.GetMaxActiveRPCs() == -1) { - // Create a new `ServerCall` to accept the next incoming request. - // We create this before handling the request only when no back pressure limit is - // set. So that the it can be populated by the completion queue in the background if - // a new request comes in. - factory.CreateCall(); - } (service_handler_.*handle_request_function_)( request_, reply_, From 764f3a92ca552867624e19471ea65fe8b062371b Mon Sep 17 00:00:00 2001 From: Mingwei Tian Date: Thu, 9 Jun 2022 21:11:47 -0700 Subject: [PATCH 3/3] increase buffer size --- src/ray/rpc/grpc_server.cc | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/ray/rpc/grpc_server.cc b/src/ray/rpc/grpc_server.cc index 30dc8eaa784f..522ee317c293 100644 --- a/src/ray/rpc/grpc_server.cc +++ b/src/ray/rpc/grpc_server.cc @@ -108,8 +108,10 @@ void GrpcServer::Run() { for (auto &entry : server_call_factories_) { for (int i = 0; i < num_threads_; i++) { // When there is no max active RPC limit, a call will be added to the completetion - // queue before processing starts, so adding only 1 call is enough. - int buffer_size = 1; + // queue before RPC processing starts. In this case, the buffer size only + // determines the number of tags in the completion queue, instead of the number of + // inflight RPCs being processed. + int buffer_size = 128; if (entry->GetMaxActiveRPCs() != -1) { buffer_size = entry->GetMaxActiveRPCs(); }