From 797285d657ad2fb1c01c01a94aa99e755eefd2bc Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Tue, 29 Nov 2022 16:32:23 -0800 Subject: [PATCH] Revert "[core][scalability] Offload RPC Finish to a thread pool. (#30131)" (#30737) This reverts commit b79e5b0ca21f912f6f55b2636059f2d1ef75676b. Signed-off-by: Weichen Xu --- python/ray/_private/node.py | 2 +- release/ray_release/config.py | 4 +- src/ray/common/ray_config.h | 1 - src/ray/common/ray_config_def.h | 37 +++++++---------- src/ray/core_worker/core_worker.cc | 1 - .../gcs/gcs_client/test/gcs_client_test.cc | 1 - src/ray/gcs/gcs_server/gcs_actor_manager.cc | 36 ++++++++--------- src/ray/gcs/gcs_server/gcs_node_manager.cc | 4 +- src/ray/gcs/gcs_server/gcs_server_main.cc | 3 +- .../gcs_server/test/gcs_server_rpc_test.cc | 3 +- src/ray/raylet/node_manager.cc | 1 - src/ray/rpc/grpc_server.cc | 19 +-------- src/ray/rpc/grpc_server.h | 17 +++++++- src/ray/rpc/server_call.cc | 40 ------------------- src/ray/rpc/server_call.h | 19 +++------ 15 files changed, 59 insertions(+), 129 deletions(-) delete mode 100644 src/ray/rpc/server_call.cc diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index 44fc2a7580a8..209942adc3e0 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -1299,7 +1299,7 @@ def kill_gcs_server(self, check_alive: bool = True): dead. """ self._kill_process_type( - ray_constants.PROCESS_TYPE_GCS_SERVER, check_alive=check_alive, wait=True + ray_constants.PROCESS_TYPE_GCS_SERVER, check_alive=check_alive ) # Clear GCS client and address to indicate no GCS server is running. self._gcs_address = None diff --git a/release/ray_release/config.py b/release/ray_release/config.py index d74d6dd1712d..79dd4baa0c17 100644 --- a/release/ray_release/config.py +++ b/release/ray_release/config.py @@ -31,9 +31,7 @@ class Test(dict): "RELEASE_DEFAULT_PROJECT", "prj_FKRmeV5pA6X72aVscFALNC32", ) -DEFAULT_PYTHON_VERSION = tuple( - int(v) for v in os.environ.get("RELEASE_PY", "3.7").split(".") -) +DEFAULT_PYTHON_VERSION = (3, 7) RELEASE_PACKAGE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) diff --git a/src/ray/common/ray_config.h b/src/ray/common/ray_config.h index c1c468904649..f24ca6b5d730 100644 --- a/src/ray/common/ray_config.h +++ b/src/ray/common/ray_config.h @@ -17,7 +17,6 @@ #include #include #include -#include #include "absl/strings/escaping.h" #include "ray/util/logging.h" diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index c32e733c39e4..d23ef840bad0 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -198,7 +198,7 @@ RAY_CONFIG(int64_t, worker_cap_max_backoff_delay_ms, 1000 * 10) /// The fraction of resource utilization on a node after which the scheduler starts /// to prefer spreading tasks to other nodes. This balances between locality and /// even balancing of load. Low values (min 0.0) encourage more load spreading. -RAY_CONFIG(float, scheduler_spread_threshold, 0.5) +RAY_CONFIG(float, scheduler_spread_threshold, 0.5); /// Whether to only report the usage of pinned copies of objects in the /// object_store_memory resource. This means nodes holding secondary copies only @@ -289,7 +289,7 @@ RAY_CONFIG(int64_t, kill_worker_timeout_milliseconds, 100) RAY_CONFIG(int64_t, worker_register_timeout_seconds, 60) /// The maximum number of workers to iterate whenever we analyze the resources usage. 
-RAY_CONFIG(uint32_t, worker_max_resource_analysis_iteration, 128) +RAY_CONFIG(uint32_t, worker_max_resource_analysis_iteration, 128); /// A value to add to workers' OOM score adjustment, so that the OS prioritizes /// killing these over the raylet. 0 or positive values only (negative values @@ -347,13 +347,9 @@ RAY_CONFIG(int32_t, maximum_profile_table_rows_count, 10 * 1000) /// message. RAY_CONFIG(uint32_t, object_store_get_max_ids_to_print_in_warning, 20) /// Number of threads used by rpc server in gcs server. -RAY_CONFIG(uint32_t, - gcs_server_rpc_server_thread_num, - std::max(1U, std::thread::hardware_concurrency() / 4U)) +RAY_CONFIG(uint32_t, gcs_server_rpc_server_thread_num, 1) /// Number of threads used by rpc server in gcs server. -RAY_CONFIG(uint32_t, - gcs_server_rpc_client_thread_num, - std::max(1U, std::thread::hardware_concurrency() / 4U)) +RAY_CONFIG(uint32_t, gcs_server_rpc_client_thread_num, 1) /// Allow up to 5 seconds for connecting to gcs service. /// Note: this only takes effect when gcs service is enabled. RAY_CONFIG(int64_t, gcs_service_connect_retries, 50) @@ -369,7 +365,7 @@ RAY_CONFIG(uint32_t, gcs_create_actor_retry_interval_ms, 200) /// Exponential backoff params for gcs to retry creating a placement group RAY_CONFIG(uint64_t, gcs_create_placement_group_retry_min_interval_ms, 100) RAY_CONFIG(uint64_t, gcs_create_placement_group_retry_max_interval_ms, 1000) -RAY_CONFIG(double, gcs_create_placement_group_retry_multiplier, 1.5) +RAY_CONFIG(double, gcs_create_placement_group_retry_multiplier, 1.5); /// Maximum number of destroyed actors in GCS server memory cache. RAY_CONFIG(uint32_t, maximum_gcs_destroyed_actor_cached_count, 100000) /// Maximum number of dead nodes in GCS server memory cache. @@ -477,7 +473,7 @@ RAY_CONFIG(uint32_t, agent_register_timeout_ms, 30 * 1000) /// If the agent manager fails to communicate with the dashboard agent, we will retry /// after this interval. -RAY_CONFIG(uint32_t, agent_manager_retry_interval_ms, 1000) +RAY_CONFIG(uint32_t, agent_manager_retry_interval_ms, 1000); /// The maximum number of resource shapes included in the resource /// load reported by each raylet. @@ -553,10 +549,10 @@ RAY_CONFIG(bool, is_external_storage_type_fs, true) /// Control the capacity threshold for ray local file system (for object store). /// Once we are over the capacity, all subsequent object creation will fail. -RAY_CONFIG(float, local_fs_capacity_threshold, 0.95) +RAY_CONFIG(float, local_fs_capacity_threshold, 0.95); /// Control the frequency of checking the disk usage. -RAY_CONFIG(uint64_t, local_fs_monitor_interval_ms, 100) +RAY_CONFIG(uint64_t, local_fs_monitor_interval_ms, 100); /* Configuration parameters for locality-aware scheduling. */ /// Whether to enable locality-aware leasing. If enabled, then Ray will consider task @@ -579,7 +575,7 @@ RAY_CONFIG(int64_t, timeout_ms_task_wait_for_death_info, 1000) /// The core worker heartbeat interval. During heartbeat, it'll /// report the loads to raylet. -RAY_CONFIG(int64_t, core_worker_internal_heartbeat_ms, 1000) +RAY_CONFIG(int64_t, core_worker_internal_heartbeat_ms, 1000); /// Maximum amount of memory that will be used by running tasks' args. RAY_CONFIG(float, max_task_args_memory_fraction, 0.7) @@ -606,7 +602,7 @@ RAY_CONFIG(uint64_t, subscriber_timeout_ms, 300 * 1000) RAY_CONFIG(uint64_t, gcs_actor_table_min_duration_ms, /* 5 min */ 60 * 1000 * 5) /// Whether to enable GCS-based actor scheduling. 
-RAY_CONFIG(bool, gcs_actor_scheduling_enabled, false) +RAY_CONFIG(bool, gcs_actor_scheduling_enabled, false); RAY_CONFIG(uint32_t, max_error_msg_size_bytes, 512 * 1024) @@ -634,10 +630,10 @@ RAY_CONFIG(std::string, predefined_unit_instance_resources, "GPU") RAY_CONFIG(std::string, custom_unit_instance_resources, "") // Maximum size of the batches when broadcasting resources to raylet. -RAY_CONFIG(uint64_t, resource_broadcast_batch_size, 512) +RAY_CONFIG(uint64_t, resource_broadcast_batch_size, 512); // Maximum ray sync message batch size in bytes (1MB by default) between nodes. -RAY_CONFIG(uint64_t, max_sync_message_batch_bytes, 1 * 1024 * 1024) +RAY_CONFIG(uint64_t, max_sync_message_batch_bytes, 1 * 1024 * 1024); // If enabled and worker stated in container, the container will add // resource limit. @@ -660,10 +656,10 @@ RAY_CONFIG(int64_t, gcs_max_active_rpcs_per_handler, 100) /// they have a failure model that considers network failures as component failures /// and this configuration break that assumption. We should apply to every other component /// after we change this failure assumption from code. -RAY_CONFIG(int64_t, grpc_keepalive_time_ms, 10000) +RAY_CONFIG(int64_t, grpc_keepalive_time_ms, 10000); /// grpc keepalive timeout -RAY_CONFIG(int64_t, grpc_keepalive_timeout_ms, 20000) +RAY_CONFIG(int64_t, grpc_keepalive_timeout_ms, 20000); /// Whether to use log reporter in event framework RAY_CONFIG(bool, event_log_reporter_enabled, false) @@ -726,11 +722,6 @@ RAY_CONFIG(int64_t, health_check_timeout_ms, 10000) /// The threshold to consider a node dead. RAY_CONFIG(int64_t, health_check_failure_threshold, 5) -/// The pool size for grpc server call. -RAY_CONFIG(int64_t, - num_server_call_thread, - std::max((int64_t)1, (int64_t)(std::thread::hardware_concurrency() / 4U))) - /// Use madvise to prevent worker/raylet coredumps from including /// the mapped plasma pages. 
RAY_CONFIG(bool, worker_core_dump_exclude_plasma_store, true) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 926a090eb22b..7e691982f6c6 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -669,7 +669,6 @@ void CoreWorker::Exit( exit_type, detail = std::move(detail), creation_task_exception_pb_bytes]() { - rpc::DrainAndResetServerCallExecutor(); Disconnect(exit_type, detail, creation_task_exception_pb_bytes); Shutdown(); }, diff --git a/src/ray/gcs/gcs_client/test/gcs_client_test.cc b/src/ray/gcs/gcs_client/test/gcs_client_test.cc index 9cef7fe50f21..645df351091d 100644 --- a/src/ray/gcs/gcs_client/test/gcs_client_test.cc +++ b/src/ray/gcs/gcs_client/test/gcs_client_test.cc @@ -105,7 +105,6 @@ class GcsClientTest : public ::testing::TestWithParam { gcs_client_.reset(); server_io_service_->stop(); - rpc::DrainAndResetServerCallExecutor(); server_io_service_thread_->join(); gcs_server_->Stop(); gcs_server_.reset(); diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.cc b/src/ray/gcs/gcs_server/gcs_actor_manager.cc index ae828c1da477..9ab8c601c246 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.cc @@ -309,20 +309,17 @@ void GcsActorManager::HandleGetActorInfo(rpc::GetActorInfoRequest request, << ", job id = " << actor_id.JobId() << ", actor id = " << actor_id; const auto ®istered_actor_iter = registered_actors_.find(actor_id); - GcsActor *ptr = nullptr; if (registered_actor_iter != registered_actors_.end()) { - ptr = registered_actor_iter->second.get(); + reply->unsafe_arena_set_allocated_actor_table_data( + registered_actor_iter->second->GetMutableActorTableData()); } else { const auto &destroyed_actor_iter = destroyed_actors_.find(actor_id); if (destroyed_actor_iter != destroyed_actors_.end()) { - ptr = destroyed_actor_iter->second.get(); + reply->unsafe_arena_set_allocated_actor_table_data( + destroyed_actor_iter->second->GetMutableActorTableData()); } } - if (ptr != nullptr) { - *reply->mutable_actor_table_data() = ptr->GetActorTableData(); - } - RAY_LOG(DEBUG) << "Finished getting actor info, job id = " << actor_id.JobId() << ", actor id = " << actor_id; GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); @@ -335,6 +332,7 @@ void GcsActorManager::HandleGetAllActorInfo(rpc::GetAllActorInfoRequest request, auto limit = request.has_limit() ? 
request.limit() : -1; RAY_LOG(DEBUG) << "Getting all actor info."; ++counts_[CountType::GET_ALL_ACTOR_INFO_REQUEST]; + if (request.show_dead_jobs() == false) { auto total_actors = registered_actors_.size() + destroyed_actors_.size(); reply->set_total(total_actors); @@ -345,15 +343,18 @@ void GcsActorManager::HandleGetAllActorInfo(rpc::GetAllActorInfoRequest request, break; } count += 1; - *reply->add_actor_table_data() = iter.second->GetActorTableData(); - } + reply->mutable_actor_table_data()->UnsafeArenaAddAllocated( + const_cast(iter.second->GetMutableActorTableData())); + } for (const auto &iter : destroyed_actors_) { if (limit != -1 && count >= limit) { break; } count += 1; - *reply->add_actor_table_data() = iter.second->GetActorTableData(); + + reply->mutable_actor_table_data()->UnsafeArenaAddAllocated( + const_cast(iter.second->GetMutableActorTableData())); } RAY_LOG(DEBUG) << "Finished getting all actor info."; GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); @@ -365,16 +366,12 @@ void GcsActorManager::HandleGetAllActorInfo(rpc::GetAllActorInfoRequest request, // jobs, so fetch it from redis. Status status = gcs_table_storage_->ActorTable().GetAll( [reply, send_reply_callback, limit]( - absl::flat_hash_map &&result) { + const absl::flat_hash_map &result) { auto total_actors = result.size(); - reply->set_total(total_actors); - auto arena = reply->GetArena(); - RAY_CHECK(arena != nullptr); - auto ptr = google::protobuf::Arena::Create< - absl::flat_hash_map>(arena, std::move(result)); + auto count = 0; - for (const auto &pair : *ptr) { + for (const auto &pair : result) { if (limit != -1 && count >= limit) { break; } @@ -414,8 +411,9 @@ void GcsActorManager::HandleGetNamedActorInfo( RAY_LOG(WARNING) << stream.str(); status = Status::NotFound(stream.str()); } else { - *reply->mutable_actor_table_data() = iter->second->GetActorTableData(); - *reply->mutable_task_spec() = *iter->second->GetMutableTaskSpec(); + reply->unsafe_arena_set_allocated_actor_table_data( + iter->second->GetMutableActorTableData()); + reply->unsafe_arena_set_allocated_task_spec(iter->second->GetMutableTaskSpec()); RAY_LOG(DEBUG) << "Finished getting actor info, job id = " << actor_id.JobId() << ", actor id = " << actor_id; } diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.cc b/src/ray/gcs/gcs_server/gcs_node_manager.cc index 13fb0459ef5c..784dbcf387a2 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_node_manager.cc @@ -153,10 +153,10 @@ void GcsNodeManager::HandleGetAllNodeInfo(rpc::GetAllNodeInfoRequest request, // The request will be sent when call send_reply_callback and after that, reply will // not be used any more. But entry is still valid. 
for (const auto &entry : alive_nodes_) { - *reply->add_node_info_list() = *entry.second; + reply->mutable_node_info_list()->UnsafeArenaAddAllocated(entry.second.get()); } for (const auto &entry : dead_nodes_) { - *reply->add_node_info_list() = *entry.second; + reply->mutable_node_info_list()->UnsafeArenaAddAllocated(entry.second.get()); } GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); ++counts_[CountType::GET_ALL_NODE_INFO_REQUEST]; diff --git a/src/ray/gcs/gcs_server/gcs_server_main.cc b/src/ray/gcs/gcs_server/gcs_server_main.cc index 5c48c618d9b4..183a8b886fdd 100644 --- a/src/ray/gcs/gcs_server/gcs_server_main.cc +++ b/src/ray/gcs/gcs_server/gcs_server_main.cc @@ -106,10 +106,9 @@ int main(int argc, char *argv[]) { auto handler = [&main_service, &gcs_server](const boost::system::error_code &error, int signal_number) { RAY_LOG(INFO) << "GCS server received SIGTERM, shutting down..."; - main_service.stop(); - ray::rpc::DrainAndResetServerCallExecutor(); gcs_server.Stop(); ray::stats::Shutdown(); + main_service.stop(); }; boost::asio::signal_set signals(main_service); #ifdef _WIN32 diff --git a/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc b/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc index f968f7f40ea0..61cb5c8a06e4 100644 --- a/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc @@ -58,9 +58,8 @@ class GcsServerTest : public ::testing::Test { } void TearDown() override { - io_service_.stop(); - rpc::DrainAndResetServerCallExecutor(); gcs_server_->Stop(); + io_service_.stop(); thread_io_service_->join(); gcs_server_.reset(); } diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 80fdea5c7b8e..021b595017f6 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -2071,7 +2071,6 @@ void NodeManager::HandleShutdownRaylet(rpc::ShutdownRayletRequest request, return; } auto shutdown_after_reply = []() { - rpc::DrainAndResetServerCallExecutor(); // Note that the callback is posted to the io service after the shutdown GRPC request // is replied. Otherwise, the RPC might not be replied to GCS before it shutsdown // itself. Implementation note: When raylet is shutdown by ray stop, the CLI sends a diff --git a/src/ray/rpc/grpc_server.cc b/src/ray/rpc/grpc_server.cc index 962017aca9f7..35b1f0983dd5 100644 --- a/src/ray/rpc/grpc_server.cc +++ b/src/ray/rpc/grpc_server.cc @@ -48,24 +48,6 @@ GrpcServer::GrpcServer(std::string name, grpc::channelz::experimental::InitChannelzService(); } -void GrpcServer::Shutdown() { - if (!is_closed_) { - // Drain the executor threads. - // Shutdown the server with an immediate deadline. - // TODO(edoakes): do we want to do this in all cases? - server_->Shutdown(gpr_now(GPR_CLOCK_REALTIME)); - for (const auto &cq : cqs_) { - cq->Shutdown(); - } - for (auto &polling_thread : polling_threads_) { - polling_thread.join(); - } - is_closed_ = true; - RAY_LOG(DEBUG) << "gRPC server of " << name_ << " shutdown."; - server_.reset(); - } -} - void GrpcServer::Run() { uint32_t specified_port = port_; std::string server_address((listen_to_localhost_only_ ? "127.0.0.1:" : "0.0.0.0:") + @@ -189,6 +171,7 @@ void GrpcServer::PollEventsFromCompletionQueue(int index) { case ServerCallState::PENDING: // We've received a new incoming request. Now this call object is used to // track this request. 
+ server_call->SetState(ServerCallState::PROCESSING); server_call->HandleRequest(); break; case ServerCallState::SENDING_REPLY: diff --git a/src/ray/rpc/grpc_server.h b/src/ray/rpc/grpc_server.h index 9053fffa06bf..256e9aabe62f 100644 --- a/src/ray/rpc/grpc_server.h +++ b/src/ray/rpc/grpc_server.h @@ -90,7 +90,22 @@ class GrpcServer { void Run(); // Shutdown this server - void Shutdown(); + void Shutdown() { + if (!is_closed_) { + // Shutdown the server with an immediate deadline. + // TODO(edoakes): do we want to do this in all cases? + server_->Shutdown(gpr_now(GPR_CLOCK_REALTIME)); + for (const auto &cq : cqs_) { + cq->Shutdown(); + } + for (auto &polling_thread : polling_threads_) { + polling_thread.join(); + } + is_closed_ = true; + RAY_LOG(DEBUG) << "gRPC server of " << name_ << " shutdown."; + server_.reset(); + } + } /// Get the port of this gRPC server. int GetPort() const { return port_; } diff --git a/src/ray/rpc/server_call.cc b/src/ray/rpc/server_call.cc deleted file mode 100644 index b28317598e05..000000000000 --- a/src/ray/rpc/server_call.cc +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright 2022 The Ray Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "ray/rpc/server_call.h" - -#include "ray/common/ray_config.h" - -namespace ray { -namespace rpc { -namespace { - -std::unique_ptr &_GetServerCallExecutor() { - static auto thread_pool = std::make_unique( - ::RayConfig::instance().num_server_call_thread()); - return thread_pool; -} - -} // namespace - -boost::asio::thread_pool &GetServerCallExecutor() { return *_GetServerCallExecutor(); } - -void DrainAndResetServerCallExecutor() { - GetServerCallExecutor().join(); - _GetServerCallExecutor() = std::make_unique( - ::RayConfig::instance().num_server_call_thread()); -} - -} // namespace rpc -} // namespace ray diff --git a/src/ray/rpc/server_call.h b/src/ray/rpc/server_call.h index 31d078ff78f0..1049548aabb7 100644 --- a/src/ray/rpc/server_call.h +++ b/src/ray/rpc/server_call.h @@ -28,14 +28,6 @@ namespace ray { namespace rpc { -/// Get the thread pool for the gRPC server. -/// This pool is shared across gRPC servers. -boost::asio::thread_pool &GetServerCallExecutor(); - -/// For testing -/// Drain the executor and reset it. -void DrainAndResetServerCallExecutor(); - /// Represents the callback function to be called when a `ServiceHandler` finishes /// handling a request. /// \param status The status would be returned to client. @@ -208,8 +200,11 @@ class ServerCallImpl : public ServerCall { // is async and this `ServerCall` might be deleted right after `SendReply`. send_reply_success_callback_ = std::move(success); send_reply_failure_callback_ = std::move(failure); - boost::asio::post(GetServerCallExecutor(), - [this, status]() { SendReply(status); }); + + // When the handler is done with the request, tell gRPC to finish this request. 
+      // Must send reply at the bottom of this callback; once we invoke this function,
+      // this server call might be deleted.
+      SendReply(status);
     });
   }
 
@@ -246,12 +241,8 @@ class ServerCallImpl : public ServerCall {
               (end_time - start_time_) / 1000000.0,
               call_name_);
     }
   }
-
   /// Tell gRPC to finish this request and send reply asynchronously.
   void SendReply(const Status &status) {
-    if (io_service_.stopped()) {
-      return;
-    }
     state_ = ServerCallState::SENDING_REPLY;
     response_writer_.Finish(*reply_, RayStatusToGrpcStatus(status), this);
   }
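
Note for reviewers (illustration only, not part of the patch): the deleted src/ray/rpc/server_call.cc above held the shared reply executor that this revert removes, and the server_call.h hunk restores the inline SendReply path. The sketch below shows the offloading pattern being reverted under stated assumptions: the pool size of 4 is a stand-in for RayConfig::num_server_call_thread(), and the sketch:: namespace and main() are illustrative, not Ray APIs.

// sketch.cc - minimal model of the pre-revert "offload RPC Finish to a thread pool" pattern.
#include <boost/asio/post.hpp>
#include <boost/asio/thread_pool.hpp>
#include <iostream>
#include <memory>

namespace sketch {

// Process-wide executor, mirroring the deleted _GetServerCallExecutor().
// The real code sized this with RayConfig::num_server_call_thread(); 4 is a stand-in.
std::unique_ptr<boost::asio::thread_pool> &_GetServerCallExecutor() {
  static auto pool = std::make_unique<boost::asio::thread_pool>(4);
  return pool;
}

boost::asio::thread_pool &GetServerCallExecutor() { return *_GetServerCallExecutor(); }

// Drain queued reply work and replace the pool, as the deleted
// DrainAndResetServerCallExecutor() did for tests and shutdown paths.
void DrainAndResetServerCallExecutor() {
  GetServerCallExecutor().join();
  _GetServerCallExecutor() = std::make_unique<boost::asio::thread_pool>(4);
}

}  // namespace sketch

int main() {
  // Pre-revert behavior: the reply-sending step is posted onto the shared pool
  // instead of running on the gRPC completion-queue polling thread.
  boost::asio::post(sketch::GetServerCallExecutor(),
                    [] { std::cout << "reply sent from pool thread\n"; });
  // Post-revert behavior is simply invoking the reply-sending step inline,
  // as the restored SendReply(status) call in server_call.h does.
  sketch::DrainAndResetServerCallExecutor();
  return 0;
}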