From 707bd61e1928ab99cc456cf31b3f8069a0bb207a Mon Sep 17 00:00:00 2001 From: Chong-Li Date: Tue, 30 Aug 2022 16:38:58 +0800 Subject: [PATCH 1/9] Adapt gcs scheduler Signed-off-by: Chong-Li --- dashboard/datacenter.py | 9 ++ dashboard/modules/node/node_head.py | 19 ++++ python/ray/autoscaler/_private/monitor.py | 6 ++ src/ray/gcs/gcs_server/gcs_node_manager.cc | 14 ++- src/ray/gcs/gcs_server/gcs_node_manager.h | 12 ++- .../gcs/gcs_server/gcs_resource_manager.cc | 94 +++++++++++-------- src/ray/gcs/gcs_server/gcs_server.cc | 2 +- src/ray/protobuf/gcs_service.proto | 7 ++ 8 files changed, 119 insertions(+), 44 deletions(-) diff --git a/dashboard/datacenter.py b/dashboard/datacenter.py index 2ba3655e071b..7e3e09d72a04 100644 --- a/dashboard/datacenter.py +++ b/dashboard/datacenter.py @@ -54,6 +54,9 @@ class DataSource: # {node ip (str): error entries by pid # (dict from pid to list of latest err entries)} ip_and_pid_to_errors = Dict() + # The stats (infeasible and pending tasks) of gcs. + # {task type(str): task list} + gcs_stats = Dict() class DataOrganizer: @@ -275,6 +278,7 @@ async def _get_actor(actor): @classmethod async def get_actor_creation_tasks(cls): + # Collect infeasible tasks in worker nodes. infeasible_tasks = sum( ( list(node_stats.get("infeasibleTasks", [])) @@ -282,6 +286,8 @@ async def get_actor_creation_tasks(cls): ), [], ) + # Collect infeasible tasks in gcs. + infeasible_tasks.extend(list(DataSource.gcs_stats.get("infeasibleTasks", []))) new_infeasible_tasks = [] for task in infeasible_tasks: task = dict(task) @@ -289,6 +295,7 @@ async def get_actor_creation_tasks(cls): task["state"] = "INFEASIBLE" new_infeasible_tasks.append(task) + # Collect pending tasks in worker nodes. resource_pending_tasks = sum( ( list(data.get("readyTasks", [])) @@ -296,6 +303,8 @@ async def get_actor_creation_tasks(cls): ), [], ) + # Collect pending tasks in gcs. + resource_pending_tasks.extend(list(DataSource.gcs_stats.get("readyTasks", []))) new_resource_pending_tasks = [] for task in resource_pending_tasks: task = dict(task) diff --git a/dashboard/modules/node/node_head.py b/dashboard/modules/node/node_head.py index 6d1531611a6e..a410c5fc44b5 100644 --- a/dashboard/modules/node/node_head.py +++ b/dashboard/modules/node/node_head.py @@ -38,6 +38,21 @@ def gcs_node_info_to_dict(message): ) +def gcs_stats_to_dict(message): + decode_keys = { + "actorId", + "jobId", + "taskId", + "parentTaskId", + "sourceActorId", + "callerId", + "rayletId", + "workerId", + "placementGroupId", + } + return dashboard_utils.message_to_dict(message, decode_keys) + + def node_stats_to_dict(message): decode_keys = { "actorId", @@ -117,6 +132,10 @@ async def _get_nodes(self): request = gcs_service_pb2.GetAllNodeInfoRequest() reply = await self._gcs_node_info_stub.GetAllNodeInfo(request, timeout=2) if reply.status.code == 0: + # Update gcs stats (include ready and infeasible tasks). + DataSource.gcs_stats = gcs_stats_to_dict(reply.gcs_stats) + + # Prepare info of worker nodes. 
result = {} for node_info in reply.node_info_list: node_info_dict = gcs_node_info_to_dict(node_info) diff --git a/python/ray/autoscaler/_private/monitor.py b/python/ray/autoscaler/_private/monitor.py index d2905945488d..a9621a84025f 100644 --- a/python/ray/autoscaler/_private/monitor.py +++ b/python/ray/autoscaler/_private/monitor.py @@ -253,6 +253,12 @@ def update_load_metrics(self): mirror_node_types = {} cluster_full = False + if ( + hasattr(response, "cluster_full_of_actors_detected_by_gcs") + and response.cluster_full_of_actors_detected_by_gcs + ): + # GCS has detected the cluster full of actors. + cluster_full = True for resource_message in resources_batch_data.batch: node_id = resource_message.node_id # Generate node type config based on GCS reported node list. diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.cc b/src/ray/gcs/gcs_server/gcs_node_manager.cc index 546aa8f16eae..529e8bdce0d5 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_node_manager.cc @@ -30,10 +30,12 @@ namespace gcs { GcsNodeManager::GcsNodeManager( std::shared_ptr gcs_publisher, std::shared_ptr gcs_table_storage, - std::shared_ptr raylet_client_pool) + std::shared_ptr raylet_client_pool, + std::shared_ptr cluster_task_manager) : gcs_publisher_(std::move(gcs_publisher)), gcs_table_storage_(std::move(gcs_table_storage)), - raylet_client_pool_(std::move(raylet_client_pool)) {} + raylet_client_pool_(std::move(raylet_client_pool)), + cluster_task_manager_(std::move(cluster_task_manager)) {} void GcsNodeManager::HandleRegisterNode(const rpc::RegisterNodeRequest &request, rpc::RegisterNodeReply *reply, @@ -137,6 +139,14 @@ void GcsNodeManager::DrainNode(const NodeID &node_id) { void GcsNodeManager::HandleGetAllNodeInfo(const rpc::GetAllNodeInfoRequest &request, rpc::GetAllNodeInfoReply *reply, rpc::SendReplyCallback send_reply_callback) { + if (cluster_task_manager_) { + // Fill pending queue info of gcs. + rpc::GetNodeStatsReply node_stats; + cluster_task_manager_->FillPendingActorInfo(&node_stats); + reply->mutable_gcs_stats()->mutable_infeasible_tasks()->CopyFrom( + node_stats.infeasible_tasks()); + reply->mutable_gcs_stats()->mutable_ready_tasks()->CopyFrom(node_stats.ready_tasks()); + } // Here the unsafe allocate is safe here, because entry.second's life cycle is longer // then reply. // The request will be sent when call send_reply_callback and after that, reply will diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.h b/src/ray/gcs/gcs_server/gcs_node_manager.h index 0f6419173876..1126bb5364d2 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.h +++ b/src/ray/gcs/gcs_server/gcs_node_manager.h @@ -21,6 +21,7 @@ #include "ray/gcs/gcs_server/gcs_resource_manager.h" #include "ray/gcs/gcs_server/gcs_table_storage.h" #include "ray/gcs/pubsub/gcs_pub_sub.h" +#include "ray/raylet/scheduling/cluster_task_manager.h" #include "ray/rpc/client_call.h" #include "ray/rpc/gcs_server/gcs_rpc_server.h" #include "ray/rpc/node_manager/node_manager_client.h" @@ -28,6 +29,7 @@ #include "src/ray/protobuf/gcs.pb.h" namespace ray { +using raylet::ClusterTaskManager; namespace gcs { /// GcsNodeManager is responsible for managing and monitoring nodes as well as handing @@ -39,9 +41,11 @@ class GcsNodeManager : public rpc::NodeInfoHandler { /// /// \param gcs_publisher GCS message publisher. /// \param gcs_table_storage GCS table external storage accessor. 
- explicit GcsNodeManager(std::shared_ptr gcs_publisher, - std::shared_ptr gcs_table_storage, - std::shared_ptr raylet_client_pool); + explicit GcsNodeManager( + std::shared_ptr gcs_publisher, + std::shared_ptr gcs_table_storage, + std::shared_ptr raylet_client_pool, + std::shared_ptr cluster_task_manager = nullptr); /// Handle register rpc request come from raylet. void HandleRegisterNode(const rpc::RegisterNodeRequest &request, @@ -159,6 +163,8 @@ class GcsNodeManager : public rpc::NodeInfoHandler { CountType_MAX = 4, }; uint64_t counts_[CountType::CountType_MAX] = {0}; + + std::shared_ptr cluster_task_manager_; }; } // namespace gcs diff --git a/src/ray/gcs/gcs_server/gcs_resource_manager.cc b/src/ray/gcs/gcs_server/gcs_resource_manager.cc index ae92e31da87c..0762d4dd136f 100644 --- a/src/ray/gcs/gcs_server/gcs_resource_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_resource_manager.cc @@ -93,15 +93,16 @@ void GcsResourceManager::HandleGetAllAvailableResources( if (using_resource_reports) { auto resource_iter = node_resource_usages_[node_id].resources_available().find(resource_name); - if (resource_iter != node_resource_usages_[node_id].resources_available().end()) { + if (resource_iter != node_resource_usages_[node_id].resources_available().end() && + resource_iter->second > 0) { resource.mutable_resources_available()->insert( {resource_name, resource_iter->second}); - continue; } + } else { + const auto &resource_value = node_resources.available.Get(resource_id); + resource.mutable_resources_available()->insert( + {resource_name, resource_value.Double()}); } - const auto &resource_value = node_resources.available.Get(resource_id); - resource.mutable_resources_available()->insert( - {resource_name, resource_value.Double()}); } reply->add_resources_list()->CopyFrom(resource); } @@ -111,18 +112,21 @@ void GcsResourceManager::HandleGetAllAvailableResources( void GcsResourceManager::UpdateFromResourceReport(const rpc::ResourcesData &data) { NodeID node_id = NodeID::FromBinary(data.node_id()); - if (RayConfig::instance().gcs_actor_scheduling_enabled()) { - UpdateNodeNormalTaskResources(node_id, data); - } else { - if (!cluster_resource_manager_.UpdateNodeAvailableResourcesIfExist( - scheduling::NodeID(node_id.Binary()), data)) { - RAY_LOG(INFO) - << "[UpdateFromResourceReport]: received resource usage from unknown node id " - << node_id; + // Only need to update worker nodes' resource usage. 
+ if (node_id != local_node_id_) { + if (RayConfig::instance().gcs_actor_scheduling_enabled()) { + UpdateNodeNormalTaskResources(node_id, data); + } else { + if (!cluster_resource_manager_.UpdateNodeAvailableResourcesIfExist( + scheduling::NodeID(node_id.Binary()), data)) { + RAY_LOG(INFO) + << "[UpdateFromResourceReport]: received resource usage from unknown node id " + << node_id; + } } - } - UpdateNodeResourceUsage(node_id, data); + UpdateNodeResourceUsage(node_id, data); + } } void GcsResourceManager::UpdateResourceLoads(const rpc::ResourcesData &data) { @@ -148,53 +152,67 @@ void GcsResourceManager::HandleReportResourceUsage( ++counts_[CountType::REPORT_RESOURCE_USAGE_REQUEST]; } +void FillAggregateLoad(const rpc::ResourcesData &resources_data, + std::unordered_map, + rpc::ResourceDemand> *aggregate_load) { + auto load = resources_data.resource_load_by_shape(); + for (const auto &demand : load.resource_demands()) { + auto &aggregate_demand = (*aggregate_load)[demand.shape()]; + aggregate_demand.set_num_ready_requests_queued( + aggregate_demand.num_ready_requests_queued() + + demand.num_ready_requests_queued()); + aggregate_demand.set_num_infeasible_requests_queued( + aggregate_demand.num_infeasible_requests_queued() + + demand.num_infeasible_requests_queued()); + aggregate_demand.set_backlog_size(aggregate_demand.backlog_size() + + demand.backlog_size()); + } +} + void GcsResourceManager::HandleGetAllResourceUsage( const rpc::GetAllResourceUsageRequest &request, rpc::GetAllResourceUsageReply *reply, rpc::SendReplyCallback send_reply_callback) { - if (cluster_task_manager_ && RayConfig::instance().gcs_actor_scheduling_enabled()) { - rpc::ResourcesData resources_data; - cluster_task_manager_->FillPendingActorInfo(resources_data); - node_resource_usages_[local_node_id_].CopyFrom(resources_data); - } if (!node_resource_usages_.empty()) { - auto batch = std::make_shared(); + rpc::ResourceUsageBatchData batch; std::unordered_map, rpc::ResourceDemand> aggregate_load; + for (const auto &usage : node_resource_usages_) { // Aggregate the load reported by each raylet. - auto load = usage.second.resource_load_by_shape(); - for (const auto &demand : load.resource_demands()) { - auto &aggregate_demand = aggregate_load[demand.shape()]; - aggregate_demand.set_num_ready_requests_queued( - aggregate_demand.num_ready_requests_queued() + - demand.num_ready_requests_queued()); - aggregate_demand.set_num_infeasible_requests_queued( - aggregate_demand.num_infeasible_requests_queued() + - demand.num_infeasible_requests_queued()); - aggregate_demand.set_backlog_size(aggregate_demand.backlog_size() + - demand.backlog_size()); - } + FillAggregateLoad(usage.second, &aggregate_load); + batch.add_batch()->CopyFrom(usage.second); + } - batch->add_batch()->CopyFrom(usage.second); + if (cluster_task_manager_) { + rpc::ResourcesData gcs_resources_data; + cluster_task_manager_->FillPendingActorInfo(gcs_resources_data); + // Aggregate the load (pending info) of gcs. + FillAggregateLoad(gcs_resources_data, &aggregate_load); + // We only export gcs's pending info without adding the corresponding + // `ResourcesData` to the `batch` list. So if gcs has detected cluster full of + // actors, set the dedicated field in reply. 
+ if (gcs_resources_data.cluster_full_of_actors_detected()) { + reply->set_cluster_full_of_actors_detected_by_gcs(true); + } } for (const auto &demand : aggregate_load) { - auto demand_proto = batch->mutable_resource_load_by_shape()->add_resource_demands(); + auto demand_proto = batch.mutable_resource_load_by_shape()->add_resource_demands(); demand_proto->CopyFrom(demand.second); for (const auto &resource_pair : demand.first) { (*demand_proto->mutable_shape())[resource_pair.first] = resource_pair.second; } } - // Update placement group load to heartbeat batch. // This is updated only one per second. if (placement_group_load_.has_value()) { auto placement_group_load = placement_group_load_.value(); - auto placement_group_load_proto = batch->mutable_placement_group_load(); + auto placement_group_load_proto = batch.mutable_placement_group_load(); placement_group_load_proto->CopyFrom(*placement_group_load.get()); } - reply->mutable_resource_usage_data()->CopyFrom(*batch); + + reply->mutable_resource_usage_data()->CopyFrom(batch); } GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index 2c2e2bfdce73..96fe52da3842 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -227,7 +227,7 @@ void GcsServer::Stop() { void GcsServer::InitGcsNodeManager(const GcsInitData &gcs_init_data) { RAY_CHECK(gcs_table_storage_ && gcs_publisher_); gcs_node_manager_ = std::make_shared( - gcs_publisher_, gcs_table_storage_, raylet_client_pool_); + gcs_publisher_, gcs_table_storage_, raylet_client_pool_, cluster_task_manager_); // Initialize by gcs tables data. gcs_node_manager_->Initialize(gcs_init_data); // Register service. diff --git a/src/ray/protobuf/gcs_service.proto b/src/ray/protobuf/gcs_service.proto index ef3e91ebdfb9..42bc23a02ee6 100644 --- a/src/ray/protobuf/gcs_service.proto +++ b/src/ray/protobuf/gcs_service.proto @@ -175,6 +175,12 @@ message GetAllNodeInfoRequest {} message GetAllNodeInfoReply { GcsStatus status = 1; repeated GcsNodeInfo node_info_list = 2; + GcsStats gcs_stats = 3; +} + +message GcsStats { + repeated TaskSpec infeasible_tasks = 1; + repeated TaskSpec ready_tasks = 2; } message ReportHeartbeatRequest { @@ -583,6 +589,7 @@ message GetAllResourceUsageRequest {} message GetAllResourceUsageReply { GcsStatus status = 1; ResourceUsageBatchData resource_usage_data = 2; + bool cluster_full_of_actors_detected_by_gcs = 3; } // Service for node resource info access. From ba9f4520a2ee6fb7b35ce78ba6a078d41e24cd71 Mon Sep 17 00:00:00 2001 From: Chong-Li Date: Mon, 5 Sep 2022 18:10:36 +0800 Subject: [PATCH 2/9] Some fix Signed-off-by: Chong-Li --- dashboard/datacenter.py | 14 ++++--- dashboard/modules/node/node_head.py | 6 +-- src/ray/gcs/gcs_server/gcs_node_manager.cc | 6 +-- src/ray/gcs/gcs_server/gcs_node_manager.h | 3 ++ .../gcs/gcs_server/gcs_resource_manager.cc | 38 ++++++++++--------- src/ray/gcs/gcs_server/gcs_resource_manager.h | 8 ++++ src/ray/gcs/gcs_server/gcs_server.cc | 9 ++++- src/ray/protobuf/gcs_service.proto | 7 +++- 8 files changed, 58 insertions(+), 33 deletions(-) diff --git a/dashboard/datacenter.py b/dashboard/datacenter.py index 7e3e09d72a04..cebefd027254 100644 --- a/dashboard/datacenter.py +++ b/dashboard/datacenter.py @@ -54,9 +54,11 @@ class DataSource: # {node ip (str): error entries by pid # (dict from pid to list of latest err entries)} ip_and_pid_to_errors = Dict() - # The stats (infeasible and pending tasks) of gcs. 
+ # The (infeasible and pending actor creation tasks) info of gcs. # {task type(str): task list} - gcs_stats = Dict() + # Note, this field is only updated with gcs actor scheduler + # is enabled. + gcs_info = Dict() class DataOrganizer: @@ -286,8 +288,8 @@ async def get_actor_creation_tasks(cls): ), [], ) - # Collect infeasible tasks in gcs. - infeasible_tasks.extend(list(DataSource.gcs_stats.get("infeasibleTasks", []))) + # Collect infeasible actor creation tasks in gcs. + infeasible_tasks.extend(list(DataSource.gcs_info.get("infeasibleTasks", []))) new_infeasible_tasks = [] for task in infeasible_tasks: task = dict(task) @@ -303,8 +305,8 @@ async def get_actor_creation_tasks(cls): ), [], ) - # Collect pending tasks in gcs. - resource_pending_tasks.extend(list(DataSource.gcs_stats.get("readyTasks", []))) + # Collect pending actor creation tasks in gcs. + resource_pending_tasks.extend(list(DataSource.gcs_info.get("readyTasks", []))) new_resource_pending_tasks = [] for task in resource_pending_tasks: task = dict(task) diff --git a/dashboard/modules/node/node_head.py b/dashboard/modules/node/node_head.py index a410c5fc44b5..6baad36b17c7 100644 --- a/dashboard/modules/node/node_head.py +++ b/dashboard/modules/node/node_head.py @@ -38,7 +38,7 @@ def gcs_node_info_to_dict(message): ) -def gcs_stats_to_dict(message): +def gcs_info_to_dict(message): decode_keys = { "actorId", "jobId", @@ -132,8 +132,8 @@ async def _get_nodes(self): request = gcs_service_pb2.GetAllNodeInfoRequest() reply = await self._gcs_node_info_stub.GetAllNodeInfo(request, timeout=2) if reply.status.code == 0: - # Update gcs stats (include ready and infeasible tasks). - DataSource.gcs_stats = gcs_stats_to_dict(reply.gcs_stats) + # Update (infeasible and pending actor creation tasks) info of gcs. + DataSource.gcs_info = gcs_info_to_dict(reply.gcs_info) # Prepare info of worker nodes. result = {} diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.cc b/src/ray/gcs/gcs_server/gcs_node_manager.cc index 529e8bdce0d5..791d5c0274a6 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_node_manager.cc @@ -140,12 +140,12 @@ void GcsNodeManager::HandleGetAllNodeInfo(const rpc::GetAllNodeInfoRequest &requ rpc::GetAllNodeInfoReply *reply, rpc::SendReplyCallback send_reply_callback) { if (cluster_task_manager_) { - // Fill pending queue info of gcs. + // Fill pending queue info of gcs when gcs actor scheduler is enabled. rpc::GetNodeStatsReply node_stats; cluster_task_manager_->FillPendingActorInfo(&node_stats); - reply->mutable_gcs_stats()->mutable_infeasible_tasks()->CopyFrom( + reply->mutable_gcs_info()->mutable_infeasible_tasks()->CopyFrom( node_stats.infeasible_tasks()); - reply->mutable_gcs_stats()->mutable_ready_tasks()->CopyFrom(node_stats.ready_tasks()); + reply->mutable_gcs_info()->mutable_ready_tasks()->CopyFrom(node_stats.ready_tasks()); } // Here the unsafe allocate is safe here, because entry.second's life cycle is longer // then reply. diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.h b/src/ray/gcs/gcs_server/gcs_node_manager.h index 1126bb5364d2..ce1428400bfb 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.h +++ b/src/ray/gcs/gcs_server/gcs_node_manager.h @@ -41,6 +41,9 @@ class GcsNodeManager : public rpc::NodeInfoHandler { /// /// \param gcs_publisher GCS message publisher. /// \param gcs_table_storage GCS table external storage accessor. + /// \param raylet_client_pool The pool of raylet clients for RPC communication. 
+ /// \param cluster_task_manager The gcs server's `ClusterTaskManager`. Note, this + /// parameter is only configured when gcs actor scheduler is enabled. explicit GcsNodeManager( std::shared_ptr gcs_publisher, std::shared_ptr gcs_table_storage, diff --git a/src/ray/gcs/gcs_server/gcs_resource_manager.cc b/src/ray/gcs/gcs_server/gcs_resource_manager.cc index 0762d4dd136f..6a07e2d211ae 100644 --- a/src/ray/gcs/gcs_server/gcs_resource_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_resource_manager.cc @@ -112,21 +112,24 @@ void GcsResourceManager::HandleGetAllAvailableResources( void GcsResourceManager::UpdateFromResourceReport(const rpc::ResourcesData &data) { NodeID node_id = NodeID::FromBinary(data.node_id()); - // Only need to update worker nodes' resource usage. - if (node_id != local_node_id_) { - if (RayConfig::instance().gcs_actor_scheduling_enabled()) { - UpdateNodeNormalTaskResources(node_id, data); - } else { - if (!cluster_resource_manager_.UpdateNodeAvailableResourcesIfExist( - scheduling::NodeID(node_id.Binary()), data)) { - RAY_LOG(INFO) - << "[UpdateFromResourceReport]: received resource usage from unknown node id " - << node_id; - } - } + // We only need to update worker nodes' resource usage. Gcs node ifself does not + // execute any tasks so its report can be ignored. + if (node_id == local_node_id_) { + return; + } - UpdateNodeResourceUsage(node_id, data); + if (RayConfig::instance().gcs_actor_scheduling_enabled()) { + UpdateNodeNormalTaskResources(node_id, data); + } else { + if (!cluster_resource_manager_.UpdateNodeAvailableResourcesIfExist( + scheduling::NodeID(node_id.Binary()), data)) { + RAY_LOG(INFO) + << "[UpdateFromResourceReport]: received resource usage from unknown node id " + << node_id; + } } + + UpdateNodeResourceUsage(node_id, data); } void GcsResourceManager::UpdateResourceLoads(const rpc::ResourcesData &data) { @@ -152,9 +155,10 @@ void GcsResourceManager::HandleReportResourceUsage( ++counts_[CountType::REPORT_RESOURCE_USAGE_REQUEST]; } -void FillAggregateLoad(const rpc::ResourcesData &resources_data, - std::unordered_map, - rpc::ResourceDemand> *aggregate_load) { +void GcsResourceManager::FillAggregateLoad( + const rpc::ResourcesData &resources_data, + std::unordered_map, rpc::ResourceDemand> + *aggregate_load) { auto load = resources_data.resource_load_by_shape(); for (const auto &demand : load.resource_demands()) { auto &aggregate_demand = (*aggregate_load)[demand.shape()]; @@ -187,7 +191,7 @@ void GcsResourceManager::HandleGetAllResourceUsage( if (cluster_task_manager_) { rpc::ResourcesData gcs_resources_data; cluster_task_manager_->FillPendingActorInfo(gcs_resources_data); - // Aggregate the load (pending info) of gcs. + // Aggregate the load (pending actor info) of gcs. FillAggregateLoad(gcs_resources_data, &aggregate_load); // We only export gcs's pending info without adding the corresponding // `ResourcesData` to the `batch` list. So if gcs has detected cluster full of diff --git a/src/ray/gcs/gcs_server/gcs_resource_manager.h b/src/ray/gcs/gcs_server/gcs_resource_manager.h index ebbe55e211b9..312462d16fae 100644 --- a/src/ray/gcs/gcs_server/gcs_resource_manager.h +++ b/src/ray/gcs/gcs_server/gcs_resource_manager.h @@ -134,6 +134,14 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler, void UpdateResourceLoads(const rpc::ResourcesData &data); private: + /// Aggregate nodes' pending task info. + /// + /// \param resources_data A node's pending task info (by shape). 
+ /// \param aggregate_load[out] The aggregate pending task info (across the cluster). + void FillAggregateLoad(const rpc::ResourcesData &resources_data, + std::unordered_map, + rpc::ResourceDemand> *aggregate_load); + /// io context. This is to ensure thread safety. Ideally, all public /// funciton needs to post job to this io_context. instrumented_io_context &io_context_; diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index 96fe52da3842..4958401de946 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -226,8 +226,13 @@ void GcsServer::Stop() { void GcsServer::InitGcsNodeManager(const GcsInitData &gcs_init_data) { RAY_CHECK(gcs_table_storage_ && gcs_publisher_); - gcs_node_manager_ = std::make_shared( - gcs_publisher_, gcs_table_storage_, raylet_client_pool_, cluster_task_manager_); + if (RayConfig::instance().gcs_actor_scheduling_enabled()) { + gcs_node_manager_ = std::make_shared( + gcs_publisher_, gcs_table_storage_, raylet_client_pool_, cluster_task_manager_); + } else { + gcs_node_manager_ = std::make_shared( + gcs_publisher_, gcs_table_storage_, raylet_client_pool_); + } // Initialize by gcs tables data. gcs_node_manager_->Initialize(gcs_init_data); // Register service. diff --git a/src/ray/protobuf/gcs_service.proto b/src/ray/protobuf/gcs_service.proto index 42bc23a02ee6..eb701a9dd2fa 100644 --- a/src/ray/protobuf/gcs_service.proto +++ b/src/ray/protobuf/gcs_service.proto @@ -175,10 +175,10 @@ message GetAllNodeInfoRequest {} message GetAllNodeInfoReply { GcsStatus status = 1; repeated GcsNodeInfo node_info_list = 2; - GcsStats gcs_stats = 3; + GcsInfo gcs_info = 3; } -message GcsStats { +message GcsInfo { repeated TaskSpec infeasible_tasks = 1; repeated TaskSpec ready_tasks = 2; } @@ -589,6 +589,9 @@ message GetAllResourceUsageRequest {} message GetAllResourceUsageReply { GcsStatus status = 1; ResourceUsageBatchData resource_usage_data = 2; + /// True if gcs finds infeasible or pending actor creation tasks + /// locally (when gcs actor scheduler is enabled). This field is + /// expected to help triggering auto-scaling. bool cluster_full_of_actors_detected_by_gcs = 3; } From c55a7a19994b50f8121a69f090c892ddbe8a90dd Mon Sep 17 00:00:00 2001 From: Chong-Li Date: Mon, 5 Sep 2022 20:35:32 +0800 Subject: [PATCH 3/9] Add in-line comments Signed-off-by: Chong-Li --- src/ray/gcs/gcs_server/gcs_resource_manager.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/src/ray/gcs/gcs_server/gcs_resource_manager.cc b/src/ray/gcs/gcs_server/gcs_resource_manager.cc index 6a07e2d211ae..08db7d0a8f46 100644 --- a/src/ray/gcs/gcs_server/gcs_resource_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_resource_manager.cc @@ -189,6 +189,7 @@ void GcsResourceManager::HandleGetAllResourceUsage( } if (cluster_task_manager_) { + // Fill the gcs info when gcs actor scheduler is enabled. rpc::ResourcesData gcs_resources_data; cluster_task_manager_->FillPendingActorInfo(gcs_resources_data); // Aggregate the load (pending actor info) of gcs. 
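Patches 1-3 above make the GCS fold its own pending/infeasible actor-creation load into the per-shape aggregate that HandleGetAllResourceUsage reports, without appending a corresponding ResourcesData entry to the batch. The per-shape aggregation performed by FillAggregateLoad can be illustrated with the small Python sketch below; it only models the aggregation semantics, using plain dicts in place of the ResourcesData/ResourceDemand protobuf messages, so while the field names are taken from the proto, nothing here is the actual C++ implementation.

    from collections import defaultdict

    def fill_aggregate_load(resources_data, aggregate_load):
        # Mirrors GcsResourceManager::FillAggregateLoad: fold one report's
        # per-shape demands into the cluster-wide aggregate, keyed by shape.
        for demand in resources_data.get("resource_demands", []):
            # A shape such as {"CPU": 4, "GPU": 1} identifies the demand bucket.
            shape_key = tuple(sorted(demand["shape"].items()))
            agg = aggregate_load[shape_key]
            agg["num_ready_requests_queued"] += demand.get("num_ready_requests_queued", 0)
            agg["num_infeasible_requests_queued"] += demand.get(
                "num_infeasible_requests_queued", 0
            )
            agg["backlog_size"] += demand.get("backlog_size", 0)

    # Aggregate the load reported by each raylet, then fold in the GCS's own
    # pending-actor load, which is *not* appended to the per-node batch.
    aggregate_load = defaultdict(lambda: defaultdict(int))
    raylet_reports = [
        {"resource_demands": [{"shape": {"CPU": 1}, "num_ready_requests_queued": 2,
                               "num_infeasible_requests_queued": 0, "backlog_size": 5}]},
    ]
    gcs_report = {"resource_demands": [{"shape": {"CPU": 1},
                                        "num_infeasible_requests_queued": 3}]}
    for report in raylet_reports:
        fill_aggregate_load(report, aggregate_load)
    fill_aggregate_load(gcs_report, aggregate_load)

Because the GCS's own demand never appears in the batch itself, the reply carries the separate cluster_full_of_actors_detected_by_gcs flag so the autoscaler monitor can still treat GCS-side actor pressure as a signal to scale up.
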
From da88560fd5613fa7d55a2280555d4b69ab51f534 Mon Sep 17 00:00:00 2001 From: Chong-Li Date: Wed, 7 Sep 2022 15:32:48 +0800 Subject: [PATCH 4/9] Use a seperate RPC to get gcs stats Signed-off-by: Chong-Li --- dashboard/datacenter.py | 10 ++++----- dashboard/modules/node/node_head.py | 22 ++++++++++++++----- src/ray/gcs/gcs_server/gcs_node_manager.cc | 14 ++---------- src/ray/gcs/gcs_server/gcs_node_manager.h | 14 ++++-------- .../gcs/gcs_server/gcs_resource_manager.cc | 13 +++++++++++ src/ray/gcs/gcs_server/gcs_resource_manager.h | 5 +++++ src/ray/gcs/gcs_server/gcs_server.cc | 9 ++------ src/ray/protobuf/gcs_service.proto | 16 +++++++++----- src/ray/rpc/gcs_server/gcs_rpc_server.h | 5 +++++ 9 files changed, 62 insertions(+), 46 deletions(-) diff --git a/dashboard/datacenter.py b/dashboard/datacenter.py index cebefd027254..259ef8b68b21 100644 --- a/dashboard/datacenter.py +++ b/dashboard/datacenter.py @@ -54,11 +54,9 @@ class DataSource: # {node ip (str): error entries by pid # (dict from pid to list of latest err entries)} ip_and_pid_to_errors = Dict() - # The (infeasible and pending actor creation tasks) info of gcs. + # The current stats (e.g., pending actor creation tasks) of gcs. # {task type(str): task list} - # Note, this field is only updated with gcs actor scheduler - # is enabled. - gcs_info = Dict() + gcs_stats = Dict() class DataOrganizer: @@ -289,7 +287,7 @@ async def get_actor_creation_tasks(cls): [], ) # Collect infeasible actor creation tasks in gcs. - infeasible_tasks.extend(list(DataSource.gcs_info.get("infeasibleTasks", []))) + infeasible_tasks.extend(list(DataSource.gcs_stats.get("infeasibleTasks", []))) new_infeasible_tasks = [] for task in infeasible_tasks: task = dict(task) @@ -306,7 +304,7 @@ async def get_actor_creation_tasks(cls): [], ) # Collect pending actor creation tasks in gcs. - resource_pending_tasks.extend(list(DataSource.gcs_info.get("readyTasks", []))) + resource_pending_tasks.extend(list(DataSource.gcs_stats.get("readyTasks", []))) new_resource_pending_tasks = [] for task in resource_pending_tasks: task = dict(task) diff --git a/dashboard/modules/node/node_head.py b/dashboard/modules/node/node_head.py index 6baad36b17c7..b7c46e607c0b 100644 --- a/dashboard/modules/node/node_head.py +++ b/dashboard/modules/node/node_head.py @@ -38,7 +38,7 @@ def gcs_node_info_to_dict(message): ) -def gcs_info_to_dict(message): +def gcs_stats_to_dict(message): decode_keys = { "actorId", "jobId", @@ -86,6 +86,8 @@ def __init__(self, dashboard_head): self._stubs = {} # NodeInfoGcsService self._gcs_node_info_stub = None + # NodeResourceInfoGcsService + self._gcs_node_resource_info_sub = None self._collect_memory_info = False DataSource.nodes.signal.append(self._update_stubs) # Total number of node updates happened. @@ -132,10 +134,6 @@ async def _get_nodes(self): request = gcs_service_pb2.GetAllNodeInfoRequest() reply = await self._gcs_node_info_stub.GetAllNodeInfo(request, timeout=2) if reply.status.code == 0: - # Update (infeasible and pending actor creation tasks) info of gcs. - DataSource.gcs_info = gcs_info_to_dict(reply.gcs_info) - - # Prepare info of worker nodes. result = {} for node_info in reply.node_info_list: node_info_dict = gcs_node_info_to_dict(node_info) @@ -327,6 +325,17 @@ async def _update_node_stats(self): except Exception: logger.exception(f"Error updating node stats of {node_id}.") + # Update stats (e.g., pending actor creation tasks) of gcs. 
+ try: + reply = await self._gcs_node_resource_info_stub.GetGcsStats( + gcs_service_pb2.GetGcsStatsRequest(), + timeout=2, + ) + if reply.status.code == 0: + DataSource.gcs_stats = gcs_stats_to_dict(reply) + except Exception: + logger.exception("Error updating gcs stats.") + async def _update_log_info(self): if ray_constants.DISABLE_DASHBOARD_LOG_INFO: return @@ -398,6 +407,9 @@ async def run(self, server): self._gcs_node_info_stub = gcs_service_pb2_grpc.NodeInfoGcsServiceStub( gcs_channel ) + self._gcs_node_resource_info_stub = ( + gcs_service_pb2_grpc.NodeResourceInfoGcsServiceStub(gcs_channel) + ) await asyncio.gather( self._update_nodes(), diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.cc b/src/ray/gcs/gcs_server/gcs_node_manager.cc index 791d5c0274a6..546aa8f16eae 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_node_manager.cc @@ -30,12 +30,10 @@ namespace gcs { GcsNodeManager::GcsNodeManager( std::shared_ptr gcs_publisher, std::shared_ptr gcs_table_storage, - std::shared_ptr raylet_client_pool, - std::shared_ptr cluster_task_manager) + std::shared_ptr raylet_client_pool) : gcs_publisher_(std::move(gcs_publisher)), gcs_table_storage_(std::move(gcs_table_storage)), - raylet_client_pool_(std::move(raylet_client_pool)), - cluster_task_manager_(std::move(cluster_task_manager)) {} + raylet_client_pool_(std::move(raylet_client_pool)) {} void GcsNodeManager::HandleRegisterNode(const rpc::RegisterNodeRequest &request, rpc::RegisterNodeReply *reply, @@ -139,14 +137,6 @@ void GcsNodeManager::DrainNode(const NodeID &node_id) { void GcsNodeManager::HandleGetAllNodeInfo(const rpc::GetAllNodeInfoRequest &request, rpc::GetAllNodeInfoReply *reply, rpc::SendReplyCallback send_reply_callback) { - if (cluster_task_manager_) { - // Fill pending queue info of gcs when gcs actor scheduler is enabled. - rpc::GetNodeStatsReply node_stats; - cluster_task_manager_->FillPendingActorInfo(&node_stats); - reply->mutable_gcs_info()->mutable_infeasible_tasks()->CopyFrom( - node_stats.infeasible_tasks()); - reply->mutable_gcs_info()->mutable_ready_tasks()->CopyFrom(node_stats.ready_tasks()); - } // Here the unsafe allocate is safe here, because entry.second's life cycle is longer // then reply. // The request will be sent when call send_reply_callback and after that, reply will diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.h b/src/ray/gcs/gcs_server/gcs_node_manager.h index ce1428400bfb..9eacd46f4d02 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.h +++ b/src/ray/gcs/gcs_server/gcs_node_manager.h @@ -21,7 +21,6 @@ #include "ray/gcs/gcs_server/gcs_resource_manager.h" #include "ray/gcs/gcs_server/gcs_table_storage.h" #include "ray/gcs/pubsub/gcs_pub_sub.h" -#include "ray/raylet/scheduling/cluster_task_manager.h" #include "ray/rpc/client_call.h" #include "ray/rpc/gcs_server/gcs_rpc_server.h" #include "ray/rpc/node_manager/node_manager_client.h" @@ -29,7 +28,6 @@ #include "src/ray/protobuf/gcs.pb.h" namespace ray { -using raylet::ClusterTaskManager; namespace gcs { /// GcsNodeManager is responsible for managing and monitoring nodes as well as handing @@ -41,14 +39,10 @@ class GcsNodeManager : public rpc::NodeInfoHandler { /// /// \param gcs_publisher GCS message publisher. /// \param gcs_table_storage GCS table external storage accessor. - /// \param raylet_client_pool The pool of raylet clients for RPC communication. - /// \param cluster_task_manager The gcs server's `ClusterTaskManager`. 
Note, this - /// parameter is only configured when gcs actor scheduler is enabled. - explicit GcsNodeManager( - std::shared_ptr gcs_publisher, - std::shared_ptr gcs_table_storage, - std::shared_ptr raylet_client_pool, - std::shared_ptr cluster_task_manager = nullptr); + /// we get pending task info if needed. + explicit GcsNodeManager(std::shared_ptr gcs_publisher, + std::shared_ptr gcs_table_storage, + std::shared_ptr raylet_client_pool); /// Handle register rpc request come from raylet. void HandleRegisterNode(const rpc::RegisterNodeRequest &request, diff --git a/src/ray/gcs/gcs_server/gcs_resource_manager.cc b/src/ray/gcs/gcs_server/gcs_resource_manager.cc index 08db7d0a8f46..ff9ad2516c13 100644 --- a/src/ray/gcs/gcs_server/gcs_resource_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_resource_manager.cc @@ -224,6 +224,19 @@ void GcsResourceManager::HandleGetAllResourceUsage( ++counts_[CountType::GET_ALL_RESOURCE_USAGE_REQUEST]; } +void GcsResourceManager::HandleGetGcsStats(const rpc::GetGcsStatsRequest &request, + rpc::GetGcsStatsReply *reply, + rpc::SendReplyCallback send_reply_callback) { + if (cluster_task_manager_) { + // Fill pending (actor creation) tasks of gcs when gcs actor scheduler is enabled. + rpc::GetNodeStatsReply gcs_stats; + cluster_task_manager_->FillPendingActorInfo(&gcs_stats); + reply->mutable_infeasible_tasks()->CopyFrom(gcs_stats.infeasible_tasks()); + reply->mutable_ready_tasks()->CopyFrom(gcs_stats.ready_tasks()); + } + GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); +} + void GcsResourceManager::UpdateNodeResourceUsage(const NodeID &node_id, const rpc::ResourcesData &resources) { auto iter = node_resource_usages_.find(node_id); diff --git a/src/ray/gcs/gcs_server/gcs_resource_manager.h b/src/ray/gcs/gcs_server/gcs_resource_manager.h index 312462d16fae..f51dfe0ae494 100644 --- a/src/ray/gcs/gcs_server/gcs_resource_manager.h +++ b/src/ray/gcs/gcs_server/gcs_resource_manager.h @@ -83,6 +83,11 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler, rpc::GetAllResourceUsageReply *reply, rpc::SendReplyCallback send_reply_callback) override; + /// Handle get gcs stats rpc request. + void HandleGetGcsStats(const rpc::GetGcsStatsRequest &request, + rpc::GetGcsStatsReply *reply, + rpc::SendReplyCallback send_reply_callback) override; + /// Handle a node registration. /// /// \param node The specified node to add. diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index 4958401de946..2c2e2bfdce73 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -226,13 +226,8 @@ void GcsServer::Stop() { void GcsServer::InitGcsNodeManager(const GcsInitData &gcs_init_data) { RAY_CHECK(gcs_table_storage_ && gcs_publisher_); - if (RayConfig::instance().gcs_actor_scheduling_enabled()) { - gcs_node_manager_ = std::make_shared( - gcs_publisher_, gcs_table_storage_, raylet_client_pool_, cluster_task_manager_); - } else { - gcs_node_manager_ = std::make_shared( - gcs_publisher_, gcs_table_storage_, raylet_client_pool_); - } + gcs_node_manager_ = std::make_shared( + gcs_publisher_, gcs_table_storage_, raylet_client_pool_); // Initialize by gcs tables data. gcs_node_manager_->Initialize(gcs_init_data); // Register service. 
diff --git a/src/ray/protobuf/gcs_service.proto b/src/ray/protobuf/gcs_service.proto index eb701a9dd2fa..cab2c476ad53 100644 --- a/src/ray/protobuf/gcs_service.proto +++ b/src/ray/protobuf/gcs_service.proto @@ -175,12 +175,6 @@ message GetAllNodeInfoRequest {} message GetAllNodeInfoReply { GcsStatus status = 1; repeated GcsNodeInfo node_info_list = 2; - GcsInfo gcs_info = 3; -} - -message GcsInfo { - repeated TaskSpec infeasible_tasks = 1; - repeated TaskSpec ready_tasks = 2; } message ReportHeartbeatRequest { @@ -236,6 +230,14 @@ message ReportResourceUsageReply { GcsStatus status = 1; } +message GetGcsStatsRequest {} + +message GetGcsStatsReply { + GcsStatus status = 1; + repeated TaskSpec infeasible_tasks = 2; + repeated TaskSpec ready_tasks = 3; +} + // Service for heartbeat info access. service HeartbeatInfoGcsService { // Report heartbeat of a node to GCS Service. @@ -606,6 +608,8 @@ service NodeResourceInfoGcsService { rpc ReportResourceUsage(ReportResourceUsageRequest) returns (ReportResourceUsageReply); // Get resource usage of all nodes from GCS Service. rpc GetAllResourceUsage(GetAllResourceUsageRequest) returns (GetAllResourceUsageReply); + // Get current stats (e.g., pending tasks) of gcs server. + rpc GetGcsStats(GetGcsStatsRequest) returns (GetGcsStatsReply); } message GcsStatus { diff --git a/src/ray/rpc/gcs_server/gcs_rpc_server.h b/src/ray/rpc/gcs_server/gcs_rpc_server.h index 895b17a01633..5fb2840c946a 100644 --- a/src/ray/rpc/gcs_server/gcs_rpc_server.h +++ b/src/ray/rpc/gcs_server/gcs_rpc_server.h @@ -280,6 +280,10 @@ class NodeResourceInfoGcsServiceHandler { virtual void HandleGetAllResourceUsage(const GetAllResourceUsageRequest &request, GetAllResourceUsageReply *reply, SendReplyCallback send_reply_callback) = 0; + + virtual void HandleGetGcsStats(const GetGcsStatsRequest &request, + GetGcsStatsReply *reply, + SendReplyCallback send_reply_callback) = 0; }; /// The `GrpcService` for `NodeResourceInfoGcsService`. @@ -302,6 +306,7 @@ class NodeResourceInfoGrpcService : public GrpcService { NODE_RESOURCE_INFO_SERVICE_RPC_HANDLER(GetAllAvailableResources); NODE_RESOURCE_INFO_SERVICE_RPC_HANDLER(ReportResourceUsage); NODE_RESOURCE_INFO_SERVICE_RPC_HANDLER(GetAllResourceUsage); + NODE_RESOURCE_INFO_SERVICE_RPC_HANDLER(GetGcsStats); } private: From fca95d279931371b48039b7157a79c3ab4cea3bf Mon Sep 17 00:00:00 2001 From: Chong-Li Date: Wed, 7 Sep 2022 15:41:48 +0800 Subject: [PATCH 5/9] Remove unnecessarities. Signed-off-by: Chong-Li --- src/ray/gcs/gcs_server/gcs_node_manager.h | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.h b/src/ray/gcs/gcs_server/gcs_node_manager.h index 9eacd46f4d02..0f6419173876 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.h +++ b/src/ray/gcs/gcs_server/gcs_node_manager.h @@ -39,7 +39,6 @@ class GcsNodeManager : public rpc::NodeInfoHandler { /// /// \param gcs_publisher GCS message publisher. /// \param gcs_table_storage GCS table external storage accessor. - /// we get pending task info if needed. 
explicit GcsNodeManager(std::shared_ptr gcs_publisher, std::shared_ptr gcs_table_storage, std::shared_ptr raylet_client_pool); @@ -160,8 +159,6 @@ class GcsNodeManager : public rpc::NodeInfoHandler { CountType_MAX = 4, }; uint64_t counts_[CountType::CountType_MAX] = {0}; - - std::shared_ptr cluster_task_manager_; }; } // namespace gcs From db89ccbb530e49ce3bade7f4ee2bc02c197ec95d Mon Sep 17 00:00:00 2001 From: Chong-Li Date: Fri, 9 Sep 2022 12:49:35 +0800 Subject: [PATCH 6/9] Fix comment Signed-off-by: Chong-Li --- src/ray/gcs/gcs_server/gcs_resource_manager.cc | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/src/ray/gcs/gcs_server/gcs_resource_manager.cc b/src/ray/gcs/gcs_server/gcs_resource_manager.cc index ff9ad2516c13..a7c95b1d1521 100644 --- a/src/ray/gcs/gcs_server/gcs_resource_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_resource_manager.cc @@ -93,8 +93,7 @@ void GcsResourceManager::HandleGetAllAvailableResources( if (using_resource_reports) { auto resource_iter = node_resource_usages_[node_id].resources_available().find(resource_name); - if (resource_iter != node_resource_usages_[node_id].resources_available().end() && - resource_iter->second > 0) { + if (resource_iter != node_resource_usages_[node_id].resources_available().end()) { resource.mutable_resources_available()->insert( {resource_name, resource_iter->second}); } @@ -112,12 +111,6 @@ void GcsResourceManager::HandleGetAllAvailableResources( void GcsResourceManager::UpdateFromResourceReport(const rpc::ResourcesData &data) { NodeID node_id = NodeID::FromBinary(data.node_id()); - // We only need to update worker nodes' resource usage. Gcs node ifself does not - // execute any tasks so its report can be ignored. - if (node_id == local_node_id_) { - return; - } - if (RayConfig::instance().gcs_actor_scheduling_enabled()) { UpdateNodeNormalTaskResources(node_id, data); } else { From dbfe7e043dab49bd27dc7ab1a5e2a50b3b100e2c Mon Sep 17 00:00:00 2001 From: Chong-Li Date: Fri, 9 Sep 2022 13:00:19 +0800 Subject: [PATCH 7/9] Add comment Signed-off-by: Chong-Li --- python/ray/autoscaler/_private/monitor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/autoscaler/_private/monitor.py b/python/ray/autoscaler/_private/monitor.py index a9621a84025f..78a55c193e67 100644 --- a/python/ray/autoscaler/_private/monitor.py +++ b/python/ray/autoscaler/_private/monitor.py @@ -277,7 +277,7 @@ def update_load_metrics(self): hasattr(resource_message, "cluster_full_of_actors_detected") and resource_message.cluster_full_of_actors_detected ): - # Aggregate this flag across all batches. + # A worker node has detected the cluster full of actors. 
cluster_full = True resource_load = dict(resource_message.resource_load) total_resources = dict(resource_message.resources_total) From b73a86bb21d15f07ca76a62b96d21deedb1edba2 Mon Sep 17 00:00:00 2001 From: Chong-Li Date: Tue, 13 Sep 2022 15:00:55 +0800 Subject: [PATCH 8/9] Fix Signed-off-by: Chong-Li --- dashboard/datacenter.py | 13 +++++++++---- dashboard/modules/node/node_head.py | 8 ++++---- src/ray/gcs/gcs_server/gcs_resource_manager.cc | 7 ++++--- src/ray/gcs/gcs_server/gcs_resource_manager.h | 8 ++++---- src/ray/protobuf/gcs_service.proto | 8 ++++---- src/ray/rpc/gcs_server/gcs_rpc_server.h | 8 ++++---- 6 files changed, 29 insertions(+), 23 deletions(-) diff --git a/dashboard/datacenter.py b/dashboard/datacenter.py index 259ef8b68b21..08559ad14ee7 100644 --- a/dashboard/datacenter.py +++ b/dashboard/datacenter.py @@ -54,9 +54,10 @@ class DataSource: # {node ip (str): error entries by pid # (dict from pid to list of latest err entries)} ip_and_pid_to_errors = Dict() - # The current stats (e.g., pending actor creation tasks) of gcs. + # The current scheduling stats (e.g., pending actor creation tasks) + # of gcs. # {task type(str): task list} - gcs_stats = Dict() + gcs_scheduling_stats = Dict() class DataOrganizer: @@ -287,7 +288,9 @@ async def get_actor_creation_tasks(cls): [], ) # Collect infeasible actor creation tasks in gcs. - infeasible_tasks.extend(list(DataSource.gcs_stats.get("infeasibleTasks", []))) + infeasible_tasks.extend( + list(DataSource.gcs_scheduling_stats.get("infeasibleTasks", [])) + ) new_infeasible_tasks = [] for task in infeasible_tasks: task = dict(task) @@ -304,7 +307,9 @@ async def get_actor_creation_tasks(cls): [], ) # Collect pending actor creation tasks in gcs. - resource_pending_tasks.extend(list(DataSource.gcs_stats.get("readyTasks", []))) + resource_pending_tasks.extend( + list(DataSource.gcs_scheduling_stats.get("readyTasks", [])) + ) new_resource_pending_tasks = [] for task in resource_pending_tasks: task = dict(task) diff --git a/dashboard/modules/node/node_head.py b/dashboard/modules/node/node_head.py index b7c46e607c0b..fcd7c7c19862 100644 --- a/dashboard/modules/node/node_head.py +++ b/dashboard/modules/node/node_head.py @@ -325,14 +325,14 @@ async def _update_node_stats(self): except Exception: logger.exception(f"Error updating node stats of {node_id}.") - # Update stats (e.g., pending actor creation tasks) of gcs. + # Update scheduling stats (e.g., pending actor creation tasks) of gcs. 
try: - reply = await self._gcs_node_resource_info_stub.GetGcsStats( - gcs_service_pb2.GetGcsStatsRequest(), + reply = await self._gcs_node_resource_info_stub.GetGcsSchedulingStats( + gcs_service_pb2.GetGcsSchedulingStatsRequest(), timeout=2, ) if reply.status.code == 0: - DataSource.gcs_stats = gcs_stats_to_dict(reply) + DataSource.gcs_scheduling_stats = gcs_stats_to_dict(reply) except Exception: logger.exception("Error updating gcs stats.") diff --git a/src/ray/gcs/gcs_server/gcs_resource_manager.cc b/src/ray/gcs/gcs_server/gcs_resource_manager.cc index a7c95b1d1521..1acbd6a1b5fd 100644 --- a/src/ray/gcs/gcs_server/gcs_resource_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_resource_manager.cc @@ -217,9 +217,10 @@ void GcsResourceManager::HandleGetAllResourceUsage( ++counts_[CountType::GET_ALL_RESOURCE_USAGE_REQUEST]; } -void GcsResourceManager::HandleGetGcsStats(const rpc::GetGcsStatsRequest &request, - rpc::GetGcsStatsReply *reply, - rpc::SendReplyCallback send_reply_callback) { +void GcsResourceManager::HandleGetGcsSchedulingStats( + const rpc::GetGcsSchedulingStatsRequest &request, + rpc::GetGcsSchedulingStatsReply *reply, + rpc::SendReplyCallback send_reply_callback) { if (cluster_task_manager_) { // Fill pending (actor creation) tasks of gcs when gcs actor scheduler is enabled. rpc::GetNodeStatsReply gcs_stats; diff --git a/src/ray/gcs/gcs_server/gcs_resource_manager.h b/src/ray/gcs/gcs_server/gcs_resource_manager.h index f51dfe0ae494..458ce80e3031 100644 --- a/src/ray/gcs/gcs_server/gcs_resource_manager.h +++ b/src/ray/gcs/gcs_server/gcs_resource_manager.h @@ -83,10 +83,10 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler, rpc::GetAllResourceUsageReply *reply, rpc::SendReplyCallback send_reply_callback) override; - /// Handle get gcs stats rpc request. - void HandleGetGcsStats(const rpc::GetGcsStatsRequest &request, - rpc::GetGcsStatsReply *reply, - rpc::SendReplyCallback send_reply_callback) override; + /// Handle get gcs scheduling stats rpc request. + void HandleGetGcsSchedulingStats(const rpc::GetGcsSchedulingStatsRequest &request, + rpc::GetGcsSchedulingStatsReply *reply, + rpc::SendReplyCallback send_reply_callback) override; /// Handle a node registration. /// diff --git a/src/ray/protobuf/gcs_service.proto b/src/ray/protobuf/gcs_service.proto index cab2c476ad53..88cf23775577 100644 --- a/src/ray/protobuf/gcs_service.proto +++ b/src/ray/protobuf/gcs_service.proto @@ -230,9 +230,9 @@ message ReportResourceUsageReply { GcsStatus status = 1; } -message GetGcsStatsRequest {} +message GetGcsSchedulingStatsRequest {} -message GetGcsStatsReply { +message GetGcsSchedulingStatsReply { GcsStatus status = 1; repeated TaskSpec infeasible_tasks = 2; repeated TaskSpec ready_tasks = 3; @@ -608,8 +608,8 @@ service NodeResourceInfoGcsService { rpc ReportResourceUsage(ReportResourceUsageRequest) returns (ReportResourceUsageReply); // Get resource usage of all nodes from GCS Service. rpc GetAllResourceUsage(GetAllResourceUsageRequest) returns (GetAllResourceUsageReply); - // Get current stats (e.g., pending tasks) of gcs server. - rpc GetGcsStats(GetGcsStatsRequest) returns (GetGcsStatsReply); + // Get current scheduling stats (e.g., pending tasks) of gcs server. 
+ rpc GetGcsSchedulingStats(GetGcsSchedulingStatsRequest) returns (GetGcsSchedulingStatsReply); } message GcsStatus { diff --git a/src/ray/rpc/gcs_server/gcs_rpc_server.h b/src/ray/rpc/gcs_server/gcs_rpc_server.h index 5fb2840c946a..381eed9cefc1 100644 --- a/src/ray/rpc/gcs_server/gcs_rpc_server.h +++ b/src/ray/rpc/gcs_server/gcs_rpc_server.h @@ -281,9 +281,9 @@ class NodeResourceInfoGcsServiceHandler { GetAllResourceUsageReply *reply, SendReplyCallback send_reply_callback) = 0; - virtual void HandleGetGcsStats(const GetGcsStatsRequest &request, - GetGcsStatsReply *reply, - SendReplyCallback send_reply_callback) = 0; + virtual void HandleGetGcsSchedulingStats(const GetGcsSchedulingStatsRequest &request, + GetGcsSchedulingStatsReply *reply, + SendReplyCallback send_reply_callback) = 0; }; /// The `GrpcService` for `NodeResourceInfoGcsService`. @@ -306,7 +306,7 @@ class NodeResourceInfoGrpcService : public GrpcService { NODE_RESOURCE_INFO_SERVICE_RPC_HANDLER(GetAllAvailableResources); NODE_RESOURCE_INFO_SERVICE_RPC_HANDLER(ReportResourceUsage); NODE_RESOURCE_INFO_SERVICE_RPC_HANDLER(GetAllResourceUsage); - NODE_RESOURCE_INFO_SERVICE_RPC_HANDLER(GetGcsStats); + NODE_RESOURCE_INFO_SERVICE_RPC_HANDLER(GetGcsSchedulingStats); } private: From e62e88ce151e77bf5478df8ec76f11b5635bae4b Mon Sep 17 00:00:00 2001 From: Chong-Li Date: Wed, 14 Sep 2022 11:23:42 +0800 Subject: [PATCH 9/9] Format Signed-off-by: Chong-Li --- src/ray/protobuf/gcs_service.proto | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/ray/protobuf/gcs_service.proto b/src/ray/protobuf/gcs_service.proto index 88cf23775577..1a11a51930f0 100644 --- a/src/ray/protobuf/gcs_service.proto +++ b/src/ray/protobuf/gcs_service.proto @@ -609,7 +609,8 @@ service NodeResourceInfoGcsService { // Get resource usage of all nodes from GCS Service. rpc GetAllResourceUsage(GetAllResourceUsageRequest) returns (GetAllResourceUsageReply); // Get current scheduling stats (e.g., pending tasks) of gcs server. - rpc GetGcsSchedulingStats(GetGcsSchedulingStatsRequest) returns (GetGcsSchedulingStatsReply); + rpc GetGcsSchedulingStats(GetGcsSchedulingStatsRequest) + returns (GetGcsSchedulingStatsReply); } message GcsStatus {
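After the later patches in the series, the GCS scheduler's queues are exposed through the dedicated GetGcsSchedulingStats RPC on NodeResourceInfoGcsService rather than through GetAllNodeInfo, and the dashboard polls it alongside the per-node stats. The sketch below condenses that dashboard-side flow (the node_head.py polling plus the datacenter.py merge); it assumes the generated stubs produced by this series and an existing grpc.aio channel to the GCS, and it drops the dashboard's message_to_dict decoding and error handling, so it is an illustration rather than the exact dashboard code.

    from ray.core.generated import gcs_service_pb2, gcs_service_pb2_grpc

    async def collect_actor_creation_tasks(gcs_aio_channel, node_stats_by_id):
        # Fetch the GCS scheduler's own pending/infeasible actor-creation tasks.
        stub = gcs_service_pb2_grpc.NodeResourceInfoGcsServiceStub(gcs_aio_channel)
        reply = await stub.GetGcsSchedulingStats(
            gcs_service_pb2.GetGcsSchedulingStatsRequest(), timeout=2
        )
        gcs_scheduling_stats = {}
        if reply.status.code == 0:
            gcs_scheduling_stats = {
                "infeasibleTasks": list(reply.infeasible_tasks),
                "readyTasks": list(reply.ready_tasks),
            }

        # Merge worker-node queues with the GCS scheduler's queues, as
        # DataOrganizer.get_actor_creation_tasks does.
        infeasible_tasks = [
            task
            for stats in node_stats_by_id.values()
            for task in stats.get("infeasibleTasks", [])
        ]
        infeasible_tasks.extend(gcs_scheduling_stats.get("infeasibleTasks", []))
        resource_pending_tasks = [
            task
            for stats in node_stats_by_id.values()
            for task in stats.get("readyTasks", [])
        ]
        resource_pending_tasks.extend(gcs_scheduling_stats.get("readyTasks", []))
        return infeasible_tasks, resource_pending_tasks

Keeping the GCS queues out of GetAllNodeInfo means the dashboard only pays for this extra RPC when it actually refreshes scheduling stats, and the reply stays empty unless the GCS actor scheduler is enabled (cluster_task_manager_ is configured).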