[Core][Enable gcs scheduler 5/n] Adapt gcs scheduler with external modules #28162

Merged · 13 commits · Sep 16, 2022
11 changes: 11 additions & 0 deletions dashboard/datacenter.py
@@ -54,6 +54,11 @@ class DataSource:
# {node ip (str): error entries by pid
# (dict from pid to list of latest err entries)}
ip_and_pid_to_errors = Dict()
# The info of gcs's infeasible and pending actor creation tasks.
# {task type (str): task list}
# Note: this field is only updated when the gcs actor scheduler
# is enabled.
gcs_info = Dict()


class DataOrganizer:
@@ -275,27 +280,33 @@ async def _get_actor(actor):

@classmethod
async def get_actor_creation_tasks(cls):
# Collect infeasible tasks in worker nodes.
infeasible_tasks = sum(
(
list(node_stats.get("infeasibleTasks", []))
for node_stats in DataSource.node_stats.values()
),
[],
)
# Collect infeasible actor creation tasks in gcs.
infeasible_tasks.extend(list(DataSource.gcs_info.get("infeasibleTasks", [])))
new_infeasible_tasks = []
for task in infeasible_tasks:
task = dict(task)
task["actorClass"] = actor_classname_from_task_spec(task)
task["state"] = "INFEASIBLE"
new_infeasible_tasks.append(task)

# Collect pending tasks in worker nodes.
resource_pending_tasks = sum(
(
list(data.get("readyTasks", []))
for data in DataSource.node_stats.values()
),
[],
)
# Collect pending actor creation tasks in gcs.
resource_pending_tasks.extend(list(DataSource.gcs_info.get("readyTasks", [])))
new_resource_pending_tasks = []
for task in resource_pending_tasks:
task = dict(task)
19 changes: 19 additions & 0 deletions dashboard/modules/node/node_head.py
@@ -38,6 +38,21 @@ def gcs_node_info_to_dict(message):
)


def gcs_info_to_dict(message):
decode_keys = {
"actorId",
"jobId",
"taskId",
"parentTaskId",
"sourceActorId",
"callerId",
"rayletId",
"workerId",
"placementGroupId",
}
return dashboard_utils.message_to_dict(message, decode_keys)


def node_stats_to_dict(message):
decode_keys = {
"actorId",
@@ -117,6 +132,10 @@ async def _get_nodes(self):
request = gcs_service_pb2.GetAllNodeInfoRequest()
reply = await self._gcs_node_info_stub.GetAllNodeInfo(request, timeout=2)
if reply.status.code == 0:
# Update the info of gcs's infeasible and pending actor creation tasks.
DataSource.gcs_info = gcs_info_to_dict(reply.gcs_info)

# Prepare info of worker nodes.
result = {}
for node_info in reply.node_info_list:
node_info_dict = gcs_node_info_to_dict(node_info)
6 changes: 6 additions & 0 deletions python/ray/autoscaler/_private/monitor.py
@@ -253,6 +253,12 @@ def update_load_metrics(self):

mirror_node_types = {}
cluster_full = False
if (
hasattr(response, "cluster_full_of_actors_detected_by_gcs")
and response.cluster_full_of_actors_detected_by_gcs
):
# GCS has detected that the cluster is full of actors.
cluster_full = True

Reviewer (Contributor): This should go away if you rebase, right?

Chong-Li (Author, Sep 9, 2022): Because there might be pending actors in the gcs server (if the gcs actor scheduler is enabled), we need to check not only whether any worker node has detected the cluster being full (see lines 276-281), but also the gcs server's own report (this part).
for resource_message in resources_batch_data.batch:
node_id = resource_message.node_id
# Generate node type config based on GCS reported node list.
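To make the reply above concrete, here is a minimal sketch of how the two cluster-full signals can combine in update_load_metrics. It is an illustration only: the helper name detect_cluster_full is hypothetical, and the per-node field cluster_full_of_actors_detected is assumed from the surrounding monitor code rather than taken from this diff.

```python
def detect_cluster_full(response, resource_messages):
    """Hypothetical condensation of the two cluster-full signals above."""
    # Signal 1 (this diff): the gcs server itself holds pending or infeasible
    # actor creation tasks, reported via the new dedicated reply field.
    if getattr(response, "cluster_full_of_actors_detected_by_gcs", False):
        return True
    # Signal 2 (pre-existing check around lines 276-281 of monitor.py): some
    # worker node reported that it detected the cluster full of actors.
    for resource_message in resource_messages:
        if getattr(resource_message, "cluster_full_of_actors_detected", False):
            return True
    return False
```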
14 changes: 12 additions & 2 deletions src/ray/gcs/gcs_server/gcs_node_manager.cc
@@ -30,10 +30,12 @@ namespace gcs {
GcsNodeManager::GcsNodeManager(
std::shared_ptr<GcsPublisher> gcs_publisher,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool)
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool,
std::shared_ptr<ClusterTaskManager> cluster_task_manager)
: gcs_publisher_(std::move(gcs_publisher)),
gcs_table_storage_(std::move(gcs_table_storage)),
raylet_client_pool_(std::move(raylet_client_pool)) {}
raylet_client_pool_(std::move(raylet_client_pool)),
cluster_task_manager_(std::move(cluster_task_manager)) {}

void GcsNodeManager::HandleRegisterNode(const rpc::RegisterNodeRequest &request,
rpc::RegisterNodeReply *reply,
@@ -137,6 +139,14 @@ void GcsNodeManager::DrainNode(const NodeID &node_id) {
void GcsNodeManager::HandleGetAllNodeInfo(const rpc::GetAllNodeInfoRequest &request,
rpc::GetAllNodeInfoReply *reply,
rpc::SendReplyCallback send_reply_callback) {
if (cluster_task_manager_) {
// Fill pending queue info of gcs when gcs actor scheduler is enabled.
rpc::GetNodeStatsReply node_stats;
cluster_task_manager_->FillPendingActorInfo(&node_stats);
reply->mutable_gcs_info()->mutable_infeasible_tasks()->CopyFrom(
node_stats.infeasible_tasks());
reply->mutable_gcs_info()->mutable_ready_tasks()->CopyFrom(node_stats.ready_tasks());
}
// The unsafe allocate is safe here, because entry.second's life cycle is
// longer than the reply's.
// The request will be sent when call send_reply_callback and after that, reply will
15 changes: 12 additions & 3 deletions src/ray/gcs/gcs_server/gcs_node_manager.h
@@ -21,13 +21,15 @@
#include "ray/gcs/gcs_server/gcs_resource_manager.h"
#include "ray/gcs/gcs_server/gcs_table_storage.h"
#include "ray/gcs/pubsub/gcs_pub_sub.h"
#include "ray/raylet/scheduling/cluster_task_manager.h"
#include "ray/rpc/client_call.h"
#include "ray/rpc/gcs_server/gcs_rpc_server.h"
#include "ray/rpc/node_manager/node_manager_client.h"
#include "ray/rpc/node_manager/node_manager_client_pool.h"
#include "src/ray/protobuf/gcs.pb.h"

namespace ray {
using raylet::ClusterTaskManager;
namespace gcs {

/// GcsNodeManager is responsible for managing and monitoring nodes as well as handling
@@ -39,9 +41,14 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
///
/// \param gcs_publisher GCS message publisher.
/// \param gcs_table_storage GCS table external storage accessor.
explicit GcsNodeManager(std::shared_ptr<GcsPublisher> gcs_publisher,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool);
/// \param raylet_client_pool The pool of raylet clients for RPC communication.
/// \param cluster_task_manager The gcs server's `ClusterTaskManager`. Note:
/// this parameter is only set when the gcs actor scheduler is enabled.
explicit GcsNodeManager(
std::shared_ptr<GcsPublisher> gcs_publisher,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool,
std::shared_ptr<ClusterTaskManager> cluster_task_manager = nullptr);

/// Handle the register rpc request coming from a raylet.
void HandleRegisterNode(const rpc::RegisterNodeRequest &request,
@@ -159,6 +166,8 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
CountType_MAX = 4,
};
uint64_t counts_[CountType::CountType_MAX] = {0};

std::shared_ptr<ClusterTaskManager> cluster_task_manager_;
};

} // namespace gcs
79 changes: 51 additions & 28 deletions src/ray/gcs/gcs_server/gcs_resource_manager.cc
@@ -93,15 +93,16 @@ void GcsResourceManager::HandleGetAllAvailableResources(
if (using_resource_reports) {
auto resource_iter =
node_resource_usages_[node_id].resources_available().find(resource_name);
if (resource_iter != node_resource_usages_[node_id].resources_available().end()) {
if (resource_iter != node_resource_usages_[node_id].resources_available().end() &&
resource_iter->second > 0) {
Reviewer (Contributor): If I look at the else statement, it seems like we update resources only when the value > 0. Is that expected?

Chong-Li (Author): Every resource in node_resources.available has a value greater than 0, because ResourceRequest automatically erases any resource with a zero value. So while iterating over node_resources.available, if using_resource_reports == true, we have to make sure no zero-valued resource is inserted. I believe there is a test requiring this behavior, but I can't remember which one right now.

Reviewer (wuisawesome, Sep 8, 2022): If this behavior is important and we don't expect to report any 0 values, should we assert this instead?

Chong-Li (Author): The resource_iter->second > 0 check seems not needed. I'll just revert it. @wuisawesome

resource.mutable_resources_available()->insert(
{resource_name, resource_iter->second});
continue;
}
} else {
const auto &resource_value = node_resources.available.Get(resource_id);
resource.mutable_resources_available()->insert(
{resource_name, resource_value.Double()});
}
const auto &resource_value = node_resources.available.Get(resource_id);
resource.mutable_resources_available()->insert(
{resource_name, resource_value.Double()});
}
reply->add_resources_list()->CopyFrom(resource);
}
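As a side note on the thread above, the invariant Chong-Li describes can be shown with a small self-contained sketch; ResourceRequestSketch below is a hypothetical stand-in for Ray's ResourceRequest, not its actual implementation.

```python
class ResourceRequestSketch:
    """Toy model of the described behavior: zero-valued resources are erased."""

    def __init__(self):
        self._available = {}

    def set(self, name, value):
        if value == 0:
            # Like Ray's ResourceRequest, a resource whose value reaches zero
            # is removed from the map rather than stored with value 0.
            self._available.pop(name, None)
        else:
            self._available[name] = value

    def available(self):
        return self._available


req = ResourceRequestSketch()
req.set("CPU", 4.0)
req.set("GPU", 0.0)  # never inserted
req.set("CPU", 0.0)  # erased once it reaches zero
# The assertion wuisawesome suggests: iteration never observes a zero value.
assert all(v > 0 for v in req.available().values())
```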
@@ -111,6 +112,12 @@

void GcsResourceManager::UpdateFromResourceReport(const rpc::ResourcesData &data) {
NodeID node_id = NodeID::FromBinary(data.node_id());
// We only need to update worker nodes' resource usage. The gcs node itself
// does not execute any tasks, so its report can be ignored.
if (node_id == local_node_id_) {
return;
}

Reviewer (Contributor): You should be able to assert this, right? GCS shouldn't ever send out a resource report.

Chong-Li (Author, Sep 9, 2022): This part is actually from another feature, which I'll do in the next split PR. So I'll just revert it here.

if (RayConfig::instance().gcs_actor_scheduling_enabled()) {
UpdateNodeNormalTaskResources(node_id, data);
} else {
@@ -148,53 +155,69 @@ void GcsResourceManager::HandleReportResourceUsage(
++counts_[CountType::REPORT_RESOURCE_USAGE_REQUEST];
}

void GcsResourceManager::FillAggregateLoad(
const rpc::ResourcesData &resources_data,
std::unordered_map<google::protobuf::Map<std::string, double>, rpc::ResourceDemand>
*aggregate_load) {
auto load = resources_data.resource_load_by_shape();
for (const auto &demand : load.resource_demands()) {
auto &aggregate_demand = (*aggregate_load)[demand.shape()];
aggregate_demand.set_num_ready_requests_queued(
aggregate_demand.num_ready_requests_queued() +
demand.num_ready_requests_queued());
aggregate_demand.set_num_infeasible_requests_queued(
aggregate_demand.num_infeasible_requests_queued() +
demand.num_infeasible_requests_queued());
aggregate_demand.set_backlog_size(aggregate_demand.backlog_size() +
demand.backlog_size());
}
}
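For readers unfamiliar with the aggregate_load structure, it is keyed by resource shape (for example {"CPU": 1, "GPU": 1}) and sums per-shape demand counts across reporters. Below is a hedged Python sketch of the same aggregation, using a frozenset of shape items as the dictionary key; the function name and dict-based inputs are illustrative assumptions, not part of this diff.

```python
from collections import defaultdict


def aggregate_load_by_shape(all_resource_demands):
    """Sum ready/infeasible/backlog counts across reporters, per shape.

    A sketch of FillAggregateLoad's role; field names follow the proto.
    Each demand is assumed to be a dict, e.g.:
    {"shape": {"CPU": 1.0}, "num_ready_requests_queued": 2,
     "num_infeasible_requests_queued": 0, "backlog_size": 5}
    """
    aggregate = defaultdict(lambda: {
        "num_ready_requests_queued": 0,
        "num_infeasible_requests_queued": 0,
        "backlog_size": 0,
    })
    for demand in all_resource_demands:
        # A mutable map cannot be a dict key directly, so hash the shape by
        # its items; the C++ code similarly keys an unordered_map by the
        # shape map.
        shape_key = frozenset(demand["shape"].items())
        agg = aggregate[shape_key]
        agg["num_ready_requests_queued"] += demand["num_ready_requests_queued"]
        agg["num_infeasible_requests_queued"] += demand[
            "num_infeasible_requests_queued"]
        agg["backlog_size"] += demand["backlog_size"]
    return aggregate
```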

void GcsResourceManager::HandleGetAllResourceUsage(
const rpc::GetAllResourceUsageRequest &request,
rpc::GetAllResourceUsageReply *reply,
rpc::SendReplyCallback send_reply_callback) {
if (cluster_task_manager_ && RayConfig::instance().gcs_actor_scheduling_enabled()) {
rpc::ResourcesData resources_data;
cluster_task_manager_->FillPendingActorInfo(resources_data);
node_resource_usages_[local_node_id_].CopyFrom(resources_data);
}
if (!node_resource_usages_.empty()) {
auto batch = std::make_shared<rpc::ResourceUsageBatchData>();
rpc::ResourceUsageBatchData batch;
std::unordered_map<google::protobuf::Map<std::string, double>, rpc::ResourceDemand>
aggregate_load;

for (const auto &usage : node_resource_usages_) {
// Aggregate the load reported by each raylet.
auto load = usage.second.resource_load_by_shape();
for (const auto &demand : load.resource_demands()) {
auto &aggregate_demand = aggregate_load[demand.shape()];
aggregate_demand.set_num_ready_requests_queued(
aggregate_demand.num_ready_requests_queued() +
demand.num_ready_requests_queued());
aggregate_demand.set_num_infeasible_requests_queued(
aggregate_demand.num_infeasible_requests_queued() +
demand.num_infeasible_requests_queued());
aggregate_demand.set_backlog_size(aggregate_demand.backlog_size() +
demand.backlog_size());
}
FillAggregateLoad(usage.second, &aggregate_load);
batch.add_batch()->CopyFrom(usage.second);
}

batch->add_batch()->CopyFrom(usage.second);
if (cluster_task_manager_) {
// Fill in the gcs info when the gcs actor scheduler is enabled.
rpc::ResourcesData gcs_resources_data;
cluster_task_manager_->FillPendingActorInfo(gcs_resources_data);
// Aggregate the load (pending actor info) of gcs.
FillAggregateLoad(gcs_resources_data, &aggregate_load);
// We only export gcs's pending info, without adding the corresponding
// `ResourcesData` to the `batch` list. So if gcs has detected that the
// cluster is full of actors, set the dedicated field in the reply.
if (gcs_resources_data.cluster_full_of_actors_detected()) {
reply->set_cluster_full_of_actors_detected_by_gcs(true);
}
}

for (const auto &demand : aggregate_load) {
auto demand_proto = batch->mutable_resource_load_by_shape()->add_resource_demands();
auto demand_proto = batch.mutable_resource_load_by_shape()->add_resource_demands();
demand_proto->CopyFrom(demand.second);
for (const auto &resource_pair : demand.first) {
(*demand_proto->mutable_shape())[resource_pair.first] = resource_pair.second;
}
}

// Update placement group load in the heartbeat batch.
// This is updated only once per second.
if (placement_group_load_.has_value()) {
auto placement_group_load = placement_group_load_.value();
auto placement_group_load_proto = batch->mutable_placement_group_load();
auto placement_group_load_proto = batch.mutable_placement_group_load();
placement_group_load_proto->CopyFrom(*placement_group_load.get());
}
reply->mutable_resource_usage_data()->CopyFrom(*batch);

reply->mutable_resource_usage_data()->CopyFrom(batch);
}

GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
8 changes: 8 additions & 0 deletions src/ray/gcs/gcs_server/gcs_resource_manager.h
@@ -134,6 +134,14 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler,
void UpdateResourceLoads(const rpc::ResourcesData &data);

private:
/// Aggregate nodes' pending task info.
///
/// \param resources_data A node's pending task info (by shape).
/// \param aggregate_load[out] The aggregate pending task info (across the cluster).
void FillAggregateLoad(const rpc::ResourcesData &resources_data,
std::unordered_map<google::protobuf::Map<std::string, double>,
rpc::ResourceDemand> *aggregate_load);

/// io context. This is to ensure thread safety. Ideally, all public
/// functions need to post jobs to this io_context.
instrumented_io_context &io_context_;
9 changes: 7 additions & 2 deletions src/ray/gcs/gcs_server/gcs_server.cc
@@ -226,8 +226,13 @@ void GcsServer::Stop() {

void GcsServer::InitGcsNodeManager(const GcsInitData &gcs_init_data) {
RAY_CHECK(gcs_table_storage_ && gcs_publisher_);
gcs_node_manager_ = std::make_shared<GcsNodeManager>(
gcs_publisher_, gcs_table_storage_, raylet_client_pool_);
if (RayConfig::instance().gcs_actor_scheduling_enabled()) {
gcs_node_manager_ = std::make_shared<GcsNodeManager>(
gcs_publisher_, gcs_table_storage_, raylet_client_pool_, cluster_task_manager_);
} else {
gcs_node_manager_ = std::make_shared<GcsNodeManager>(
gcs_publisher_, gcs_table_storage_, raylet_client_pool_);
}
// Initialize with gcs table data.
gcs_node_manager_->Initialize(gcs_init_data);
// Register service.
10 changes: 10 additions & 0 deletions src/ray/protobuf/gcs_service.proto
@@ -175,6 +175,12 @@ message GetAllNodeInfoRequest {}
message GetAllNodeInfoReply {
GcsStatus status = 1;
repeated GcsNodeInfo node_info_list = 2;
GcsInfo gcs_info = 3;
}

message GcsInfo {
repeated TaskSpec infeasible_tasks = 1;
repeated TaskSpec ready_tasks = 2;
}

Reviewer (Contributor): Can you only report the necessary fields?

Chong-Li (Author): If you take a look at test_actor_groups in test_actor.py, these fields are necessary when gcs scheduling is enabled.

message ReportHeartbeatRequest {
@@ -583,6 +589,10 @@ message GetAllResourceUsageRequest {}
message GetAllResourceUsageReply {
GcsStatus status = 1;
ResourceUsageBatchData resource_usage_data = 2;
/// True if gcs finds infeasible or pending actor creation tasks
/// locally (when the gcs actor scheduler is enabled). This field is
/// expected to help trigger auto-scaling.
bool cluster_full_of_actors_detected_by_gcs = 3;
}

Reviewer (Contributor): Hmm, is there any reason we can't reuse ResourceUsageBatchData?

Chong-Li (Author): ResourceUsageBatchData is a gcs proto, which is also used for table storage and pubsub (although the pub-sub one might be removed later). So adding a cluster_full_of_actors_detected_by_gcs field to ResourceUsageBatchData might add unnecessary overhead in most cases.

// Service for node resource info access.