Skip to content

Commit

Permalink
[Core][Enable gcs scheduler 5/n] Adapt gcs scheduler with external mo…
Browse files Browse the repository at this point in the history
…dules (ray-project#28162)

This the second split PR of ray-project#25075, which tried to enable gcs scheduler by default.

This split PR mainly includes:

In GcsResourceManager::HandleGetAllResourceUsage(), we export gcs' pending task info without adding an extra entry to the batch list. So as usual, the batch list still only contains the worker nodes (some tests depend on this).

To pass tests like test_actor_groups, rpc GetAllNodeInfo has to return gcs' pending task info additionally (do we need a dedicated rpc for that?).

Signed-off-by: PaulFenton <[email protected]>
  • Loading branch information
Chong-Li authored and PaulFenton committed Sep 19, 2022
1 parent 1e2ccdd commit 8dea87e
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 28 deletions.
14 changes: 14 additions & 0 deletions dashboard/datacenter.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ class DataSource:
# {node ip (str): error entries by pid
# (dict from pid to list of latest err entries)}
ip_and_pid_to_errors = Dict()
# The current scheduling stats (e.g., pending actor creation tasks)
# of gcs.
# {task type(str): task list}
gcs_scheduling_stats = Dict()


class DataOrganizer:
Expand Down Expand Up @@ -275,27 +279,37 @@ async def _get_actor(actor):

@classmethod
async def get_actor_creation_tasks(cls):
# Collect infeasible tasks in worker nodes.
infeasible_tasks = sum(
(
list(node_stats.get("infeasibleTasks", []))
for node_stats in DataSource.node_stats.values()
),
[],
)
# Collect infeasible actor creation tasks in gcs.
infeasible_tasks.extend(
list(DataSource.gcs_scheduling_stats.get("infeasibleTasks", []))
)
new_infeasible_tasks = []
for task in infeasible_tasks:
task = dict(task)
task["actorClass"] = actor_classname_from_task_spec(task)
task["state"] = "INFEASIBLE"
new_infeasible_tasks.append(task)

# Collect pending tasks in worker nodes.
resource_pending_tasks = sum(
(
list(data.get("readyTasks", []))
for data in DataSource.node_stats.values()
),
[],
)
# Collect pending actor creation tasks in gcs.
resource_pending_tasks.extend(
list(DataSource.gcs_scheduling_stats.get("readyTasks", []))
)
new_resource_pending_tasks = []
for task in resource_pending_tasks:
task = dict(task)
Expand Down
31 changes: 31 additions & 0 deletions dashboard/modules/node/node_head.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,21 @@ def gcs_node_info_to_dict(message):
)


def gcs_stats_to_dict(message):
decode_keys = {
"actorId",
"jobId",
"taskId",
"parentTaskId",
"sourceActorId",
"callerId",
"rayletId",
"workerId",
"placementGroupId",
}
return dashboard_utils.message_to_dict(message, decode_keys)


def node_stats_to_dict(message):
decode_keys = {
"actorId",
Expand Down Expand Up @@ -71,6 +86,8 @@ def __init__(self, dashboard_head):
self._stubs = {}
# NodeInfoGcsService
self._gcs_node_info_stub = None
# NodeResourceInfoGcsService
self._gcs_node_resource_info_sub = None
self._collect_memory_info = False
DataSource.nodes.signal.append(self._update_stubs)
# Total number of node updates happened.
Expand Down Expand Up @@ -308,6 +325,17 @@ async def _update_node_stats(self):
except Exception:
logger.exception(f"Error updating node stats of {node_id}.")

# Update scheduling stats (e.g., pending actor creation tasks) of gcs.
try:
reply = await self._gcs_node_resource_info_stub.GetGcsSchedulingStats(
gcs_service_pb2.GetGcsSchedulingStatsRequest(),
timeout=2,
)
if reply.status.code == 0:
DataSource.gcs_scheduling_stats = gcs_stats_to_dict(reply)
except Exception:
logger.exception("Error updating gcs stats.")

async def _update_log_info(self):
if ray_constants.DISABLE_DASHBOARD_LOG_INFO:
return
Expand Down Expand Up @@ -379,6 +407,9 @@ async def run(self, server):
self._gcs_node_info_stub = gcs_service_pb2_grpc.NodeInfoGcsServiceStub(
gcs_channel
)
self._gcs_node_resource_info_stub = (
gcs_service_pb2_grpc.NodeResourceInfoGcsServiceStub(gcs_channel)
)

await asyncio.gather(
self._update_nodes(),
Expand Down
8 changes: 7 additions & 1 deletion python/ray/autoscaler/_private/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,12 @@ def update_load_metrics(self):

mirror_node_types = {}
cluster_full = False
if (
hasattr(response, "cluster_full_of_actors_detected_by_gcs")
and response.cluster_full_of_actors_detected_by_gcs
):
# GCS has detected the cluster full of actors.
cluster_full = True
for resource_message in resources_batch_data.batch:
node_id = resource_message.node_id
# Generate node type config based on GCS reported node list.
Expand All @@ -271,7 +277,7 @@ def update_load_metrics(self):
hasattr(resource_message, "cluster_full_of_actors_detected")
and resource_message.cluster_full_of_actors_detected
):
# Aggregate this flag across all batches.
# A worker node has detected the cluster full of actors.
cluster_full = True
resource_load = dict(resource_message.resource_load)
total_resources = dict(resource_message.resources_total)
Expand Down
84 changes: 57 additions & 27 deletions src/ray/gcs/gcs_server/gcs_resource_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,12 @@ void GcsResourceManager::HandleGetAllAvailableResources(
if (resource_iter != node_resource_usages_[node_id].resources_available().end()) {
resource.mutable_resources_available()->insert(
{resource_name, resource_iter->second});
continue;
}
} else {
const auto &resource_value = node_resources.available.Get(resource_id);
resource.mutable_resources_available()->insert(
{resource_name, resource_value.Double()});
}
const auto &resource_value = node_resources.available.Get(resource_id);
resource.mutable_resources_available()->insert(
{resource_name, resource_value.Double()});
}
reply->add_resources_list()->CopyFrom(resource);
}
Expand Down Expand Up @@ -148,59 +148,89 @@ void GcsResourceManager::HandleReportResourceUsage(
++counts_[CountType::REPORT_RESOURCE_USAGE_REQUEST];
}

void GcsResourceManager::FillAggregateLoad(
const rpc::ResourcesData &resources_data,
std::unordered_map<google::protobuf::Map<std::string, double>, rpc::ResourceDemand>
*aggregate_load) {
auto load = resources_data.resource_load_by_shape();
for (const auto &demand : load.resource_demands()) {
auto &aggregate_demand = (*aggregate_load)[demand.shape()];
aggregate_demand.set_num_ready_requests_queued(
aggregate_demand.num_ready_requests_queued() +
demand.num_ready_requests_queued());
aggregate_demand.set_num_infeasible_requests_queued(
aggregate_demand.num_infeasible_requests_queued() +
demand.num_infeasible_requests_queued());
aggregate_demand.set_backlog_size(aggregate_demand.backlog_size() +
demand.backlog_size());
}
}

void GcsResourceManager::HandleGetAllResourceUsage(
const rpc::GetAllResourceUsageRequest &request,
rpc::GetAllResourceUsageReply *reply,
rpc::SendReplyCallback send_reply_callback) {
if (cluster_task_manager_ && RayConfig::instance().gcs_actor_scheduling_enabled()) {
rpc::ResourcesData resources_data;
cluster_task_manager_->FillPendingActorInfo(resources_data);
node_resource_usages_[local_node_id_].CopyFrom(resources_data);
}
if (!node_resource_usages_.empty()) {
auto batch = std::make_shared<rpc::ResourceUsageBatchData>();
rpc::ResourceUsageBatchData batch;
std::unordered_map<google::protobuf::Map<std::string, double>, rpc::ResourceDemand>
aggregate_load;

for (const auto &usage : node_resource_usages_) {
// Aggregate the load reported by each raylet.
auto load = usage.second.resource_load_by_shape();
for (const auto &demand : load.resource_demands()) {
auto &aggregate_demand = aggregate_load[demand.shape()];
aggregate_demand.set_num_ready_requests_queued(
aggregate_demand.num_ready_requests_queued() +
demand.num_ready_requests_queued());
aggregate_demand.set_num_infeasible_requests_queued(
aggregate_demand.num_infeasible_requests_queued() +
demand.num_infeasible_requests_queued());
aggregate_demand.set_backlog_size(aggregate_demand.backlog_size() +
demand.backlog_size());
}
FillAggregateLoad(usage.second, &aggregate_load);
batch.add_batch()->CopyFrom(usage.second);
}

batch->add_batch()->CopyFrom(usage.second);
if (cluster_task_manager_) {
// Fill the gcs info when gcs actor scheduler is enabled.
rpc::ResourcesData gcs_resources_data;
cluster_task_manager_->FillPendingActorInfo(gcs_resources_data);
// Aggregate the load (pending actor info) of gcs.
FillAggregateLoad(gcs_resources_data, &aggregate_load);
// We only export gcs's pending info without adding the corresponding
// `ResourcesData` to the `batch` list. So if gcs has detected cluster full of
// actors, set the dedicated field in reply.
if (gcs_resources_data.cluster_full_of_actors_detected()) {
reply->set_cluster_full_of_actors_detected_by_gcs(true);
}
}

for (const auto &demand : aggregate_load) {
auto demand_proto = batch->mutable_resource_load_by_shape()->add_resource_demands();
auto demand_proto = batch.mutable_resource_load_by_shape()->add_resource_demands();
demand_proto->CopyFrom(demand.second);
for (const auto &resource_pair : demand.first) {
(*demand_proto->mutable_shape())[resource_pair.first] = resource_pair.second;
}
}

// Update placement group load to heartbeat batch.
// This is updated only one per second.
if (placement_group_load_.has_value()) {
auto placement_group_load = placement_group_load_.value();
auto placement_group_load_proto = batch->mutable_placement_group_load();
auto placement_group_load_proto = batch.mutable_placement_group_load();
placement_group_load_proto->CopyFrom(*placement_group_load.get());
}
reply->mutable_resource_usage_data()->CopyFrom(*batch);

reply->mutable_resource_usage_data()->CopyFrom(batch);
}

GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
++counts_[CountType::GET_ALL_RESOURCE_USAGE_REQUEST];
}

void GcsResourceManager::HandleGetGcsSchedulingStats(
const rpc::GetGcsSchedulingStatsRequest &request,
rpc::GetGcsSchedulingStatsReply *reply,
rpc::SendReplyCallback send_reply_callback) {
if (cluster_task_manager_) {
// Fill pending (actor creation) tasks of gcs when gcs actor scheduler is enabled.
rpc::GetNodeStatsReply gcs_stats;
cluster_task_manager_->FillPendingActorInfo(&gcs_stats);
reply->mutable_infeasible_tasks()->CopyFrom(gcs_stats.infeasible_tasks());
reply->mutable_ready_tasks()->CopyFrom(gcs_stats.ready_tasks());
}
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
}

void GcsResourceManager::UpdateNodeResourceUsage(const NodeID &node_id,
const rpc::ResourcesData &resources) {
auto iter = node_resource_usages_.find(node_id);
Expand Down
13 changes: 13 additions & 0 deletions src/ray/gcs/gcs_server/gcs_resource_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler,
rpc::GetAllResourceUsageReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

/// Handle get gcs scheduling stats rpc request.
void HandleGetGcsSchedulingStats(const rpc::GetGcsSchedulingStatsRequest &request,
rpc::GetGcsSchedulingStatsReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

/// Handle a node registration.
///
/// \param node The specified node to add.
Expand Down Expand Up @@ -134,6 +139,14 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler,
void UpdateResourceLoads(const rpc::ResourcesData &data);

private:
/// Aggregate nodes' pending task info.
///
/// \param resources_data A node's pending task info (by shape).
/// \param aggregate_load[out] The aggregate pending task info (across the cluster).
void FillAggregateLoad(const rpc::ResourcesData &resources_data,
std::unordered_map<google::protobuf::Map<std::string, double>,
rpc::ResourceDemand> *aggregate_load);

/// io context. This is to ensure thread safety. Ideally, all public
/// funciton needs to post job to this io_context.
instrumented_io_context &io_context_;
Expand Down
15 changes: 15 additions & 0 deletions src/ray/protobuf/gcs_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,14 @@ message ReportResourceUsageReply {
GcsStatus status = 1;
}

message GetGcsSchedulingStatsRequest {}

message GetGcsSchedulingStatsReply {
GcsStatus status = 1;
repeated TaskSpec infeasible_tasks = 2;
repeated TaskSpec ready_tasks = 3;
}

// Service for heartbeat info access.
service HeartbeatInfoGcsService {
// Report heartbeat of a node to GCS Service.
Expand Down Expand Up @@ -583,6 +591,10 @@ message GetAllResourceUsageRequest {}
message GetAllResourceUsageReply {
GcsStatus status = 1;
ResourceUsageBatchData resource_usage_data = 2;
/// True if gcs finds infeasible or pending actor creation tasks
/// locally (when gcs actor scheduler is enabled). This field is
/// expected to help triggering auto-scaling.
bool cluster_full_of_actors_detected_by_gcs = 3;
}

// Service for node resource info access.
Expand All @@ -596,6 +608,9 @@ service NodeResourceInfoGcsService {
rpc ReportResourceUsage(ReportResourceUsageRequest) returns (ReportResourceUsageReply);
// Get resource usage of all nodes from GCS Service.
rpc GetAllResourceUsage(GetAllResourceUsageRequest) returns (GetAllResourceUsageReply);
// Get current scheduling stats (e.g., pending tasks) of gcs server.
rpc GetGcsSchedulingStats(GetGcsSchedulingStatsRequest)
returns (GetGcsSchedulingStatsReply);
}

message GcsStatus {
Expand Down
5 changes: 5 additions & 0 deletions src/ray/rpc/gcs_server/gcs_rpc_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,10 @@ class NodeResourceInfoGcsServiceHandler {
virtual void HandleGetAllResourceUsage(const GetAllResourceUsageRequest &request,
GetAllResourceUsageReply *reply,
SendReplyCallback send_reply_callback) = 0;

virtual void HandleGetGcsSchedulingStats(const GetGcsSchedulingStatsRequest &request,
GetGcsSchedulingStatsReply *reply,
SendReplyCallback send_reply_callback) = 0;
};

/// The `GrpcService` for `NodeResourceInfoGcsService`.
Expand All @@ -302,6 +306,7 @@ class NodeResourceInfoGrpcService : public GrpcService {
NODE_RESOURCE_INFO_SERVICE_RPC_HANDLER(GetAllAvailableResources);
NODE_RESOURCE_INFO_SERVICE_RPC_HANDLER(ReportResourceUsage);
NODE_RESOURCE_INFO_SERVICE_RPC_HANDLER(GetAllResourceUsage);
NODE_RESOURCE_INFO_SERVICE_RPC_HANDLER(GetGcsSchedulingStats);
}

private:
Expand Down

0 comments on commit 8dea87e

Please sign in to comment.