Skip to content

Commit

Permalink
fix unlimited
Browse files Browse the repository at this point in the history
  • Loading branch information
mwtian committed Jun 9, 2022
1 parent 18e164b commit 3251f2d
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 14 deletions.
7 changes: 3 additions & 4 deletions src/ray/rpc/grpc_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,9 @@ void GrpcServer::Run() {
// Create calls for all the server call factories.
for (auto &entry : server_call_factories_) {
for (int i = 0; i < num_threads_; i++) {
// Create a buffer of 100 calls for each RPC handler.
// TODO(edoakes): a small buffer should be fine and seems to have better
// performance, but we don't currently handle backpressure on the client.
int buffer_size = 100;
// When there is no max active RPC limit, a call will be added to the completetion
// queue before processing starts, so adding only 1 call is enough.
int buffer_size = 1;
if (entry->GetMaxActiveRPCs() != -1) {
buffer_size = entry->GetMaxActiveRPCs();
}
Expand Down
16 changes: 6 additions & 10 deletions src/ray/rpc/server_call.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,12 @@ class ServerCallImpl : public ServerCall {
start_time_ = absl::GetCurrentTimeNanos();
ray::stats::STATS_grpc_server_req_handling.Record(1.0, call_name_);
if (!io_service_.stopped()) {
if (factory_.GetMaxActiveRPCs() == -1) {
// Create a new `ServerCall` as completion queue tag before handling the request
// when no back pressure limit is set, so that new requests can continue to be
// pulled from the completion queue before this request is done.
factory_.CreateCall();
}
io_service_.post([this] { HandleRequestImpl(); }, call_name_);
} else {
// Handle service for rpc call has stopped, we must handle the call here
Expand All @@ -173,16 +179,6 @@ class ServerCallImpl : public ServerCall {

void HandleRequestImpl() {
state_ = ServerCallState::PROCESSING;
// NOTE(hchen): This `factory` local variable is needed. Because `SendReply` runs in
// a different thread, and will cause `this` to be deleted.
const auto &factory = factory_;
if (factory.GetMaxActiveRPCs() == -1) {
// Create a new `ServerCall` to accept the next incoming request.
// We create this before handling the request only when no back pressure limit is
// set. So that the it can be populated by the completion queue in the background if
// a new request comes in.
factory.CreateCall();
}
(service_handler_.*handle_request_function_)(
request_,
reply_,
Expand Down

0 comments on commit 3251f2d

Please sign in to comment.