[core][scalability] Offload RPC Finish to a thread pool. (ray-project#30131)

Right now, the GCS/Raylet runs only a single thread, and that thread is likely to become a bottleneck as load increases.

For requests like KV operations, the work itself is cheap, but the RPC overhead is large compared with those cheap operations. This can cause a lot of issues, and having only one thread in the GCS/Raylet makes things worse.

Before moving to a multi-threaded GCS/Raylet, one thing we can do is execute Finish in a dedicated thread pool.

Finish does a lot of work, such as serializing the reply message, which can be expensive. It is also easy to offload from the main thread, so we stand to gain a lot.
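In outline, the change works like this (a minimal standalone sketch with hypothetical names, not the Ray code itself; the real wiring is in src/ray/rpc/server_call.cc below): the handler does its cheap state update on the main event loop, then posts the expensive serialize-and-send step onto a shared boost::asio::thread_pool.

#include <boost/asio/post.hpp>
#include <boost/asio/thread_pool.hpp>

#include <functional>
#include <iostream>
#include <string>

// Shared pool, analogous to the GetServerCallExecutor() added by this PR.
boost::asio::thread_pool &ReplyExecutor() {
  static boost::asio::thread_pool pool(4);
  return pool;
}

// Runs on the single main event-loop thread: do the cheap work inline,
// then offload the expensive reply serialization/send ("Finish").
void HandleRequest(std::function<std::string()> serialize_reply,
                   std::function<void(const std::string &)> send) {
  boost::asio::post(ReplyExecutor(),
                    [serialize_reply = std::move(serialize_reply),
                     send = std::move(send)] { send(serialize_reply()); });
}

int main() {
  HandleRequest([] { return std::string("serialized reply"); },
                [](const std::string &bytes) { std::cout << bytes << "\n"; });
  ReplyExecutor().join();  // drain, as DrainAndResetServerCallExecutor() does
}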
fishbone authored Nov 24, 2022
1 parent afd0dcc commit b79e5b0
Showing 15 changed files with 129 additions and 59 deletions.
2 changes: 1 addition & 1 deletion python/ray/_private/node.py
@@ -1299,7 +1299,7 @@ def kill_gcs_server(self, check_alive: bool = True):
         dead.
         """
         self._kill_process_type(
-            ray_constants.PROCESS_TYPE_GCS_SERVER, check_alive=check_alive
+            ray_constants.PROCESS_TYPE_GCS_SERVER, check_alive=check_alive, wait=True
         )
         # Clear GCS client and address to indicate no GCS server is running.
         self._gcs_address = None
4 changes: 3 additions & 1 deletion release/ray_release/config.py
@@ -31,7 +31,9 @@ class Test(dict):
     "RELEASE_DEFAULT_PROJECT",
     "prj_FKRmeV5pA6X72aVscFALNC32",
 )
-DEFAULT_PYTHON_VERSION = (3, 7)
+DEFAULT_PYTHON_VERSION = tuple(
+    int(v) for v in os.environ.get("RELEASE_PY", "3.7").split(".")
+)

 RELEASE_PACKAGE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
1 change: 1 addition & 0 deletions src/ray/common/ray_config.h
@@ -17,6 +17,7 @@
 #include <cstdint>
 #include <sstream>
 #include <string>
+#include <thread>

 #include "absl/strings/escaping.h"
 #include "ray/util/logging.h"
37 changes: 23 additions & 14 deletions src/ray/common/ray_config_def.h
@@ -198,7 +198,7 @@ RAY_CONFIG(int64_t, worker_cap_max_backoff_delay_ms, 1000 * 10)
 /// The fraction of resource utilization on a node after which the scheduler starts
 /// to prefer spreading tasks to other nodes. This balances between locality and
 /// even balancing of load. Low values (min 0.0) encourage more load spreading.
-RAY_CONFIG(float, scheduler_spread_threshold, 0.5);
+RAY_CONFIG(float, scheduler_spread_threshold, 0.5)

 /// Whether to only report the usage of pinned copies of objects in the
 /// object_store_memory resource. This means nodes holding secondary copies only
@@ -289,7 +289,7 @@ RAY_CONFIG(int64_t, kill_worker_timeout_milliseconds, 100)
 RAY_CONFIG(int64_t, worker_register_timeout_seconds, 60)

 /// The maximum number of workers to iterate whenever we analyze the resources usage.
-RAY_CONFIG(uint32_t, worker_max_resource_analysis_iteration, 128);
+RAY_CONFIG(uint32_t, worker_max_resource_analysis_iteration, 128)

 /// A value to add to workers' OOM score adjustment, so that the OS prioritizes
 /// killing these over the raylet. 0 or positive values only (negative values
@@ -347,9 +347,13 @@ RAY_CONFIG(int32_t, maximum_profile_table_rows_count, 10 * 1000)
 /// message.
 RAY_CONFIG(uint32_t, object_store_get_max_ids_to_print_in_warning, 20)
 /// Number of threads used by rpc server in gcs server.
-RAY_CONFIG(uint32_t, gcs_server_rpc_server_thread_num, 1)
+RAY_CONFIG(uint32_t,
+           gcs_server_rpc_server_thread_num,
+           std::max(1U, std::thread::hardware_concurrency() / 4U))
 /// Number of threads used by rpc client in gcs server.
-RAY_CONFIG(uint32_t, gcs_server_rpc_client_thread_num, 1)
+RAY_CONFIG(uint32_t,
+           gcs_server_rpc_client_thread_num,
+           std::max(1U, std::thread::hardware_concurrency() / 4U))
 /// Allow up to 5 seconds for connecting to gcs service.
 /// Note: this only takes effect when gcs service is enabled.
 RAY_CONFIG(int64_t, gcs_service_connect_retries, 50)
@@ -365,7 +369,7 @@ RAY_CONFIG(uint32_t, gcs_create_actor_retry_interval_ms, 200)
 /// Exponential backoff params for gcs to retry creating a placement group
 RAY_CONFIG(uint64_t, gcs_create_placement_group_retry_min_interval_ms, 100)
 RAY_CONFIG(uint64_t, gcs_create_placement_group_retry_max_interval_ms, 1000)
-RAY_CONFIG(double, gcs_create_placement_group_retry_multiplier, 1.5);
+RAY_CONFIG(double, gcs_create_placement_group_retry_multiplier, 1.5)
 /// Maximum number of destroyed actors in GCS server memory cache.
 RAY_CONFIG(uint32_t, maximum_gcs_destroyed_actor_cached_count, 100000)
 /// Maximum number of dead nodes in GCS server memory cache.
@@ -473,7 +477,7 @@ RAY_CONFIG(uint32_t, agent_register_timeout_ms, 30 * 1000)

 /// If the agent manager fails to communicate with the dashboard agent, we will retry
 /// after this interval.
-RAY_CONFIG(uint32_t, agent_manager_retry_interval_ms, 1000);
+RAY_CONFIG(uint32_t, agent_manager_retry_interval_ms, 1000)

 /// The maximum number of resource shapes included in the resource
 /// load reported by each raylet.
@@ -549,10 +553,10 @@ RAY_CONFIG(bool, is_external_storage_type_fs, true)

 /// Control the capacity threshold for ray local file system (for object store).
 /// Once we are over the capacity, all subsequent object creation will fail.
-RAY_CONFIG(float, local_fs_capacity_threshold, 0.95);
+RAY_CONFIG(float, local_fs_capacity_threshold, 0.95)

 /// Control the frequency of checking the disk usage.
-RAY_CONFIG(uint64_t, local_fs_monitor_interval_ms, 100);
+RAY_CONFIG(uint64_t, local_fs_monitor_interval_ms, 100)

 /* Configuration parameters for locality-aware scheduling. */
 /// Whether to enable locality-aware leasing. If enabled, then Ray will consider task
@@ -575,7 +579,7 @@ RAY_CONFIG(int64_t, timeout_ms_task_wait_for_death_info, 1000)

 /// The core worker heartbeat interval. During heartbeat, it'll
 /// report the loads to raylet.
-RAY_CONFIG(int64_t, core_worker_internal_heartbeat_ms, 1000);
+RAY_CONFIG(int64_t, core_worker_internal_heartbeat_ms, 1000)

 /// Maximum amount of memory that will be used by running tasks' args.
 RAY_CONFIG(float, max_task_args_memory_fraction, 0.7)
@@ -602,7 +606,7 @@ RAY_CONFIG(uint64_t, subscriber_timeout_ms, 300 * 1000)
 RAY_CONFIG(uint64_t, gcs_actor_table_min_duration_ms, /* 5 min */ 60 * 1000 * 5)

 /// Whether to enable GCS-based actor scheduling.
-RAY_CONFIG(bool, gcs_actor_scheduling_enabled, false);
+RAY_CONFIG(bool, gcs_actor_scheduling_enabled, false)

 RAY_CONFIG(uint32_t, max_error_msg_size_bytes, 512 * 1024)

@@ -630,10 +634,10 @@ RAY_CONFIG(std::string, predefined_unit_instance_resources, "GPU")
 RAY_CONFIG(std::string, custom_unit_instance_resources, "")

 // Maximum size of the batches when broadcasting resources to raylet.
-RAY_CONFIG(uint64_t, resource_broadcast_batch_size, 512);
+RAY_CONFIG(uint64_t, resource_broadcast_batch_size, 512)

 // Maximum ray sync message batch size in bytes (1MB by default) between nodes.
-RAY_CONFIG(uint64_t, max_sync_message_batch_bytes, 1 * 1024 * 1024);
+RAY_CONFIG(uint64_t, max_sync_message_batch_bytes, 1 * 1024 * 1024)

 // If enabled and the worker started in a container, the container will add
 // resource limit.
@@ -656,10 +660,10 @@ RAY_CONFIG(int64_t, gcs_max_active_rpcs_per_handler, 100)
 /// they have a failure model that considers network failures as component failures
 /// and this configuration breaks that assumption. We should apply it to every other
 /// component after we change this failure assumption in the code.
-RAY_CONFIG(int64_t, grpc_keepalive_time_ms, 10000);
+RAY_CONFIG(int64_t, grpc_keepalive_time_ms, 10000)

 /// grpc keepalive timeout
-RAY_CONFIG(int64_t, grpc_keepalive_timeout_ms, 20000);
+RAY_CONFIG(int64_t, grpc_keepalive_timeout_ms, 20000)

 /// Whether to use log reporter in event framework
 RAY_CONFIG(bool, event_log_reporter_enabled, false)
@@ -715,6 +719,11 @@ RAY_CONFIG(int64_t, health_check_period_ms, 3000)
 RAY_CONFIG(int64_t, health_check_timeout_ms, 10000)
 RAY_CONFIG(int64_t, health_check_failure_threshold, 5)

+/// The pool size for grpc server calls.
+RAY_CONFIG(int64_t,
+           num_server_call_thread,
+           std::max((int64_t)1, (int64_t)(std::thread::hardware_concurrency() / 4U)))
+
 /// Use madvise to prevent worker/raylet coredumps from including
 /// the mapped plasma pages.
 RAY_CONFIG(bool, worker_core_dump_exclude_plasma_store, true)
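A side note on the new defaults above: std::thread::hardware_concurrency() is permitted to return 0 when the value cannot be determined, so the std::max(..., 1) guard keeps each pool at least one thread wide. A standalone snippet to sanity-check the sizing rule:

#include <algorithm>
#include <iostream>
#include <thread>

int main() {
  // hardware_concurrency() may legally return 0; the max() keeps >= 1 thread.
  unsigned pool_size = std::max(1U, std::thread::hardware_concurrency() / 4U);
  std::cout << "default server-call pool size: " << pool_size << "\n";
}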
1 change: 1 addition & 0 deletions src/ray/core_worker/core_worker.cc
@@ -669,6 +669,7 @@ void CoreWorker::Exit(
           exit_type,
           detail = std::move(detail),
           creation_task_exception_pb_bytes]() {
+        rpc::DrainAndResetServerCallExecutor();
         Disconnect(exit_type, detail, creation_task_exception_pb_bytes);
         Shutdown();
       },
1 change: 1 addition & 0 deletions src/ray/gcs/gcs_client/test/gcs_client_test.cc
@@ -105,6 +105,7 @@ class GcsClientTest : public ::testing::TestWithParam<bool> {
     gcs_client_.reset();

     server_io_service_->stop();
+    rpc::DrainAndResetServerCallExecutor();
     server_io_service_thread_->join();
     gcs_server_->Stop();
     gcs_server_.reset();
36 changes: 19 additions & 17 deletions src/ray/gcs/gcs_server/gcs_actor_manager.cc
@@ -309,17 +309,20 @@ void GcsActorManager::HandleGetActorInfo(rpc::GetActorInfoRequest request,
                  << ", job id = " << actor_id.JobId() << ", actor id = " << actor_id;

   const auto &registered_actor_iter = registered_actors_.find(actor_id);
+  GcsActor *ptr = nullptr;
   if (registered_actor_iter != registered_actors_.end()) {
-    reply->unsafe_arena_set_allocated_actor_table_data(
-        registered_actor_iter->second->GetMutableActorTableData());
+    ptr = registered_actor_iter->second.get();
   } else {
     const auto &destroyed_actor_iter = destroyed_actors_.find(actor_id);
     if (destroyed_actor_iter != destroyed_actors_.end()) {
-      reply->unsafe_arena_set_allocated_actor_table_data(
-          destroyed_actor_iter->second->GetMutableActorTableData());
+      ptr = destroyed_actor_iter->second.get();
     }
   }

+  if (ptr != nullptr) {
+    *reply->mutable_actor_table_data() = ptr->GetActorTableData();
+  }
+
   RAY_LOG(DEBUG) << "Finished getting actor info, job id = " << actor_id.JobId()
                  << ", actor id = " << actor_id;
   GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
@@ -332,7 +335,6 @@ void GcsActorManager::HandleGetAllActorInfo(rpc::GetAllActorInfoRequest request,
   auto limit = request.has_limit() ? request.limit() : -1;
   RAY_LOG(DEBUG) << "Getting all actor info.";
   ++counts_[CountType::GET_ALL_ACTOR_INFO_REQUEST];
-
   if (request.show_dead_jobs() == false) {
     auto total_actors = registered_actors_.size() + destroyed_actors_.size();
     reply->set_total(total_actors);
@@ -343,18 +345,15 @@ void GcsActorManager::HandleGetAllActorInfo(rpc::GetAllActorInfoRequest request,
         break;
       }
       count += 1;
-
-      reply->mutable_actor_table_data()->UnsafeArenaAddAllocated(
-          const_cast<rpc::ActorTableData *>(iter.second->GetMutableActorTableData()));
+      *reply->add_actor_table_data() = iter.second->GetActorTableData();
     }

     for (const auto &iter : destroyed_actors_) {
       if (limit != -1 && count >= limit) {
         break;
       }
       count += 1;
-
-      reply->mutable_actor_table_data()->UnsafeArenaAddAllocated(
-          const_cast<rpc::ActorTableData *>(iter.second->GetMutableActorTableData()));
+      *reply->add_actor_table_data() = iter.second->GetActorTableData();
     }
     RAY_LOG(DEBUG) << "Finished getting all actor info.";
     GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
@@ -366,12 +365,16 @@ void GcsActorManager::HandleGetAllActorInfo(rpc::GetAllActorInfoRequest request,
       // jobs, so fetch it from redis.
       Status status = gcs_table_storage_->ActorTable().GetAll(
           [reply, send_reply_callback, limit](
-              const absl::flat_hash_map<ActorID, rpc::ActorTableData> &result) {
+              absl::flat_hash_map<ActorID, rpc::ActorTableData> &&result) {
             auto total_actors = result.size();
-            reply->set_total(total_actors);

+            reply->set_total(total_actors);
+            auto arena = reply->GetArena();
+            RAY_CHECK(arena != nullptr);
+            auto ptr = google::protobuf::Arena::Create<
+                absl::flat_hash_map<ActorID, rpc::ActorTableData>>(arena, std::move(result));
             auto count = 0;
-            for (const auto &pair : result) {
+            for (const auto &pair : *ptr) {
               if (limit != -1 && count >= limit) {
                 break;
               }
@@ -411,9 +414,8 @@ void GcsActorManager::HandleGetNamedActorInfo(
     RAY_LOG(WARNING) << stream.str();
     status = Status::NotFound(stream.str());
   } else {
-    reply->unsafe_arena_set_allocated_actor_table_data(
-        iter->second->GetMutableActorTableData());
-    reply->unsafe_arena_set_allocated_task_spec(iter->second->GetMutableTaskSpec());
+    *reply->mutable_actor_table_data() = iter->second->GetActorTableData();
+    *reply->mutable_task_spec() = *iter->second->GetMutableTaskSpec();
     RAY_LOG(DEBUG) << "Finished getting actor info, job id = " << actor_id.JobId()
                    << ", actor id = " << actor_id;
   }
4 changes: 2 additions & 2 deletions src/ray/gcs/gcs_server/gcs_node_manager.cc
@@ -153,10 +153,10 @@ void GcsNodeManager::HandleGetAllNodeInfo(rpc::GetAllNodeInfoRequest request,
   // The request will be sent when send_reply_callback is called, and after that the
   // reply will not be used any more. But entry is still valid.
   for (const auto &entry : alive_nodes_) {
-    reply->mutable_node_info_list()->UnsafeArenaAddAllocated(entry.second.get());
+    *reply->add_node_info_list() = *entry.second;
   }
   for (const auto &entry : dead_nodes_) {
-    reply->mutable_node_info_list()->UnsafeArenaAddAllocated(entry.second.get());
+    *reply->add_node_info_list() = *entry.second;
   }
   GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
   ++counts_[CountType::GET_ALL_NODE_INFO_REQUEST];
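This switch from UnsafeArenaAddAllocated to a deep copy is the safety half of the change: once Finish can run on a pool thread after the handler returns, the reply can no longer borrow memory owned by GCS state. A small sketch of the hazard, with simplified stand-in types rather than the real protos:

#include <future>
#include <iostream>
#include <string>

struct Reply {
  std::string payload;  // owned by the reply itself
};

int main() {
  Reply reply;
  {
    std::string gcs_state = "node-info";  // stand-in for GCS-owned data
    reply.payload = gcs_state;            // deep copy, like add_node_info_list()
    // Borrowing a pointer into gcs_state here would dangle below.
  }  // gcs_state destroyed; think "the handler has returned"
  auto sent = std::async(std::launch::async, [r = std::move(reply)] {
    std::cout << "serialized later: " << r.payload << "\n";  // still valid
  });
  sent.wait();
}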
3 changes: 2 additions & 1 deletion src/ray/gcs/gcs_server/gcs_server_main.cc
@@ -106,9 +106,10 @@ int main(int argc, char *argv[]) {
   auto handler = [&main_service, &gcs_server](const boost::system::error_code &error,
                                               int signal_number) {
     RAY_LOG(INFO) << "GCS server received SIGTERM, shutting down...";
-    main_service.stop();
+    ray::rpc::DrainAndResetServerCallExecutor();
     gcs_server.Stop();
     ray::stats::Shutdown();
+    main_service.stop();
   };
   boost::asio::signal_set signals(main_service);
 #ifdef _WIN32
3 changes: 2 additions & 1 deletion src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc
@@ -58,8 +58,9 @@ class GcsServerTest : public ::testing::Test {
   }

   void TearDown() override {
-    gcs_server_->Stop();
     io_service_.stop();
+    rpc::DrainAndResetServerCallExecutor();
+    gcs_server_->Stop();
     thread_io_service_->join();
     gcs_server_.reset();
   }
1 change: 1 addition & 0 deletions src/ray/raylet/node_manager.cc
@@ -2071,6 +2071,7 @@ void NodeManager::HandleShutdownRaylet(rpc::ShutdownRayletRequest request,
     return;
   }
   auto shutdown_after_reply = []() {
+    rpc::DrainAndResetServerCallExecutor();
     // Note that the callback is posted to the io service after the shutdown GRPC request
     // is replied. Otherwise, the RPC might not be replied to GCS before it shuts down
     // itself. Implementation note: When raylet is shutdown by ray stop, the CLI sends a
19 changes: 18 additions & 1 deletion src/ray/rpc/grpc_server.cc
@@ -48,6 +48,24 @@ GrpcServer::GrpcServer(std::string name,
   grpc::channelz::experimental::InitChannelzService();
 }

+void GrpcServer::Shutdown() {
+  if (!is_closed_) {
+    // Drain the executor threads.
+    // Shutdown the server with an immediate deadline.
+    // TODO(edoakes): do we want to do this in all cases?
+    server_->Shutdown(gpr_now(GPR_CLOCK_REALTIME));
+    for (const auto &cq : cqs_) {
+      cq->Shutdown();
+    }
+    for (auto &polling_thread : polling_threads_) {
+      polling_thread.join();
+    }
+    is_closed_ = true;
+    RAY_LOG(DEBUG) << "gRPC server of " << name_ << " shutdown.";
+    server_.reset();
+  }
+}
+
 void GrpcServer::Run() {
   uint32_t specified_port = port_;
   std::string server_address((listen_to_localhost_only_ ? "127.0.0.1:" : "0.0.0.0:") +
@@ -171,7 +189,6 @@ void GrpcServer::PollEventsFromCompletionQueue(int index) {
       case ServerCallState::PENDING:
         // We've received a new incoming request. Now this call object is used to
         // track this request.
-        server_call->SetState(ServerCallState::PROCESSING);
         server_call->HandleRequest();
         break;
       case ServerCallState::SENDING_REPLY:
17 changes: 1 addition & 16 deletions src/ray/rpc/grpc_server.h
@@ -90,22 +90,7 @@ class GrpcServer {
   void Run();

   // Shutdown this server
-  void Shutdown() {
-    if (!is_closed_) {
-      // Shutdown the server with an immediate deadline.
-      // TODO(edoakes): do we want to do this in all cases?
-      server_->Shutdown(gpr_now(GPR_CLOCK_REALTIME));
-      for (const auto &cq : cqs_) {
-        cq->Shutdown();
-      }
-      for (auto &polling_thread : polling_threads_) {
-        polling_thread.join();
-      }
-      is_closed_ = true;
-      RAY_LOG(DEBUG) << "gRPC server of " << name_ << " shutdown.";
-      server_.reset();
-    }
-  }
+  void Shutdown();

   /// Get the port of this gRPC server.
   int GetPort() const { return port_; }
40 changes: 40 additions & 0 deletions src/ray/rpc/server_call.cc
@@ -0,0 +1,40 @@
+// Copyright 2022 The Ray Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "ray/rpc/server_call.h"
+
+#include "ray/common/ray_config.h"
+
+namespace ray {
+namespace rpc {
+namespace {
+
+std::unique_ptr<boost::asio::thread_pool> &_GetServerCallExecutor() {
+  static auto thread_pool = std::make_unique<boost::asio::thread_pool>(
+      ::RayConfig::instance().num_server_call_thread());
+  return thread_pool;
+}
+
+}  // namespace
+
+boost::asio::thread_pool &GetServerCallExecutor() { return *_GetServerCallExecutor(); }
+
+void DrainAndResetServerCallExecutor() {
+  GetServerCallExecutor().join();
+  _GetServerCallExecutor() = std::make_unique<boost::asio::thread_pool>(
+      ::RayConfig::instance().num_server_call_thread());
+}
+
+}  // namespace rpc
+}  // namespace ray
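For context, here is how this executor is meant to be used, as a sketch under assumed wiring (the ServerCall plumbing that actually posts Finish is not shown on this page): reply work is posted to GetServerCallExecutor(), and every shutdown path drains the pool before stopping the server so that queued replies are flushed first.

#include <functional>

#include <boost/asio/post.hpp>

#include "ray/rpc/server_call.h"

// Offload Finish (reply serialization + send) to the shared pool.
void ReplyAsync(std::function<void()> finish) {
  boost::asio::post(ray::rpc::GetServerCallExecutor(), std::move(finish));
}

// Mirror the ordering used above in gcs_server_main.cc and node_manager.cc:
// flush queued replies, then tear the server down.
void ShutdownServer() {
  ray::rpc::DrainAndResetServerCallExecutor();
  // ... then GrpcServer::Shutdown(), io service stop, etc.
}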