From 08cf90e2e949d1170ae140733f68ba26b5eedd48 Mon Sep 17 00:00:00 2001 From: "senlin.zsl" Date: Wed, 10 Mar 2021 15:33:49 +0800 Subject: [PATCH] destruct reply as early as possible --- src/ray/rpc/client_call.h | 32 +++++++++++++++++--------------- src/ray/rpc/server_call.h | 18 ++++++++---------- 2 files changed, 25 insertions(+), 25 deletions(-) diff --git a/src/ray/rpc/client_call.h b/src/ray/rpc/client_call.h index aea10b348ddd..31be32a5d6af 100644 --- a/src/ray/rpc/client_call.h +++ b/src/ray/rpc/client_call.h @@ -62,7 +62,9 @@ class ClientCallImpl : public ClientCall { /// Constructor. /// /// \param[in] callback The callback function to handle the reply. - explicit ClientCallImpl(const ClientCallback &callback) : callback_(callback) {} + explicit ClientCallImpl( + const std::function)> &callback) + : reply_(new Reply), callback_(callback) {} Status GetStatus() override { absl::MutexLock lock(&mutex_); @@ -81,16 +83,16 @@ class ClientCallImpl : public ClientCall { status = return_status_; } if (callback_ != nullptr) { - callback_(status, reply_); + callback_(status, std::move(reply_)); } } private: /// The reply message. - Reply reply_; + std::shared_ptr reply_; /// The callback function to handle the reply. - ClientCallback callback_; + std::function)> callback_; /// The response reader. std::unique_ptr> response_reader_; @@ -204,7 +206,13 @@ class ClientCallManager { typename GrpcService::Stub &stub, const PrepareAsyncFunction prepare_async_function, const Request &request, const ClientCallback &callback) { - auto call = std::make_shared>(callback); + auto call = std::make_shared>( + [this, callback](const Status &status, std::shared_ptr reply) { + if (callback && !main_service_.stopped() && !shutdown_) { + main_service_.post([status, reply, callback] { callback(status, *reply); }); + } + }); + // Send request. // Find the next completion queue to wait for response. call->response_reader_ = (stub.*prepare_async_function)( @@ -218,7 +226,7 @@ class ClientCallManager { // `ClientCall` is safe to use. But `response_reader_->Finish` only accepts a raw // pointer. auto tag = new ClientCallTag(call); - call->response_reader_->Finish(&call->reply_, &call->status_, (void *)tag); + call->response_reader_->Finish(call->reply_.get(), &call->status_, (void *)tag); return call; } @@ -248,16 +256,10 @@ class ClientCallManager { } else if (status != grpc::CompletionQueue::TIMEOUT) { auto tag = reinterpret_cast(got_tag); tag->GetCall()->SetReturnStatus(); - if (ok && !main_service_.stopped() && !shutdown_) { - // Post the callback to the main event loop. - main_service_.post([tag]() { - tag->GetCall()->OnReplyReceived(); - // The call is finished, and we can delete this tag now. - delete tag; - }); - } else { - delete tag; + if (ok) { + tag->GetCall()->OnReplyReceived(); } + delete tag; } } } diff --git a/src/ray/rpc/server_call.h b/src/ray/rpc/server_call.h index 7f84f601adcd..e65e6e7988b7 100644 --- a/src/ray/rpc/server_call.h +++ b/src/ray/rpc/server_call.h @@ -144,7 +144,7 @@ class ServerCallImpl : public ServerCall { // Handle service for rpc call has stopped, we must handle the call here // to send reply and remove it from cq RAY_LOG(DEBUG) << "Handle service has been closed."; - SendReply(Status::Invalid("HandleServiceClosed")); + SendReply(Reply(), Status::Invalid("HandleServiceClosed")); } } @@ -157,10 +157,11 @@ class ServerCallImpl : public ServerCall { // We create this before handling the request so that the it can be populated by // the completion queue in the background if a new request comes in. factory.CreateCall(); + auto reply = std::make_shared(); (service_handler_.*handle_request_function_)( - request_, &reply_, - [this](Status status, std::function success, - std::function failure) { + request_, reply.get(), + [this, reply](Status status, std::function success, + std::function failure) { // These two callbacks must be set before `SendReply`, because `SendReply` // is async and this `ServerCall` might be deleted right after `SendReply`. send_reply_success_callback_ = std::move(success); @@ -169,7 +170,7 @@ class ServerCallImpl : public ServerCall { // When the handler is done with the request, tell gRPC to finish this request. // Must send reply at the bottom of this callback, once we invoke this funciton, // this server call might be deleted - SendReply(status); + SendReply(*reply, status); }); } @@ -189,9 +190,9 @@ class ServerCallImpl : public ServerCall { private: /// Tell gRPC to finish this request and send reply asynchronously. - void SendReply(const Status &status) { + void SendReply(const Reply &reply, const Status &status) { state_ = ServerCallState::SENDING_REPLY; - response_writer_.Finish(reply_, RayStatusToGrpcStatus(status), this); + response_writer_.Finish(reply, RayStatusToGrpcStatus(status), this); } /// State of this call. @@ -219,9 +220,6 @@ class ServerCallImpl : public ServerCall { /// The request message. Request request_; - /// The reply message. - Reply reply_; - /// The callback when sending reply successes. std::function send_reply_success_callback_ = nullptr;