[Core][Enable gcs scheduler 5/n] Adapt gcs scheduler with external modules #28162
Conversation
Signed-off-by: Chong-Li <[email protected]>
Force-pushed from b0af7be to 707bd61
cc @wuisawesome for the autoscaler changes
dashboard/datacenter.py
Outdated
infeasible_tasks = sum(
    (
        list(node_stats.get("infeasibleTasks", []))
        for node_stats in DataSource.node_stats.values()
    ),
    [],
)
# Collect infeasible tasks in gcs.
This is only for actors right?
Yes. I'll emphasize that in the in-line comment.
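To make the dashboard change above concrete, here is a minimal, self-contained sketch of the same flattening idiom applied to mock data. The dictionaries and task names are illustrative stand-ins, not Ray's actual data structures; the point is just how `sum(..., [])` concatenates per-node lists, and how gcs-side infeasible actor tasks would be appended on top.

```python
# Hypothetical per-node stats, mimicking the shape of DataSource.node_stats
# (names are illustrative, not the real Ray dashboard structures).
node_stats = {
    "node-1": {"infeasibleTasks": [{"name": "A"}]},
    "node-2": {},  # this node reports no infeasible tasks
    "node-3": {"infeasibleTasks": [{"name": "B"}, {"name": "C"}]},
}

# Same flattening idiom as the diff: sum() with [] as the start value
# concatenates the per-node lists into a single flat list.
infeasible_tasks = sum(
    (list(stats.get("infeasibleTasks", [])) for stats in node_stats.values()),
    [],
)

# With gcs actor scheduling enabled, infeasible actor-creation tasks queued
# in the gcs itself would be appended as well (illustrative report shape).
gcs_stats = {"infeasibleTasks": [{"name": "D"}]}
infeasible_tasks.extend(gcs_stats.get("infeasibleTasks", []))

print([t["name"] for t in infeasible_tasks])  # → ['A', 'B', 'C', 'D']
```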
src/ray/protobuf/gcs_service.proto
Outdated
}

message GcsStats {
  repeated TaskSpec infeasible_tasks = 1;
Can you only report the necessary fields?
If you take a look at test_actor_groups in test_actor.py, these fields are necessary when gcs scheduling is enabled.
@@ -93,15 +93,16 @@ void GcsResourceManager::HandleGetAllAvailableResources(
   if (using_resource_reports) {
     auto resource_iter =
         node_resource_usages_[node_id].resources_available().find(resource_name);
-    if (resource_iter != node_resource_usages_[node_id].resources_available().end()) {
+    if (resource_iter != node_resource_usages_[node_id].resources_available().end() &&
+        resource_iter->second > 0) {
If I look at the else statement, it seems like we update resources when the value > 0. Is it expected?
Every resource in node_resources.available has a value greater than 0, because ResourceRequest automatically erases any resource with zero value. So while iterating over node_resources.available, if using_resource_reports == true, we have to make sure no resource with zero value is inserted. I believe there is a test requiring this behavior, but I can't remember which one right now.
If this behavior is important and we don't expect to report any 0 values, should we assert this instead?
This resource_iter->second > 0 check seems unnecessary; I'll just revert it. @wuisawesome
<< "[UpdateFromResourceReport]: received resource usage from unknown node id "
<< node_id;
// Only need to update worker nodes' resource usage.
if (node_id != local_node_id_) {
if (node_id == local_node_id_) {
return;
}
if (RayConfig::instance().gcs_actor_scheduling_enabled()) {
UpdateNodeNormalTaskResources(node_id, data);
} else {
if (!cluster_resource_manager_.UpdateNodeAvailableResourcesIfExist(
scheduling::NodeID(node_id.Binary()), data)) {
RAY_LOG(INFO)
<< "[UpdateFromResourceReport]: received resource usage from unknown node id "
<< node_id;
}
}
UpdateNodeResourceUsage(node_id, data);
// Only need to update worker nodes' resource usage.
if (node_id != local_node_id_) {
Hmm I don't understand this part. Why do we only need to update worker nodes' resource usage?
We don't run any normal tasks or actors on the gcs node (server). Even with the gcs scheduler enabled, the gcs server only queues and schedules tasks, without allocating any local resources to them. In terms of scheduling, the gcs node actually has zero total resources, so we don't need to update it here.
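The update rule being discussed can be summarized in a few lines: a resource report from the gcs (head) node itself is ignored, since that node exposes zero schedulable resources, while worker-node reports update the usage table. This is a minimal Python sketch with illustrative names, not the actual GcsResourceManager code.

```python
# Sketch of the guard discussed above (illustrative names, not Ray's API):
# reports from the gcs node itself are skipped; worker reports are recorded.
def update_from_resource_report(node_id, data, local_node_id, usages):
    if node_id == local_node_id:
        return usages  # gcs node: zero schedulable resources, nothing to update
    usages[node_id] = data
    return usages

usages = {}
update_from_resource_report("gcs", {"CPU": 0}, "gcs", usages)         # ignored
update_from_resource_report("worker-1", {"CPU": 8}, "gcs", usages)   # recorded
print(usages)  # → {'worker-1': {'CPU': 8}}
```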
Signed-off-by: Chong-Li <[email protected]>
Signed-off-by: Chong-Li <[email protected]>
@Chong-Li Thanks for pushing the last mile effort.
I'm not sure, but I think our high-level direction is to make sure schedulers are decoupled from Raylet and GCS, which means we really shouldn't have anything treated as special. There should be no "GCS scheduler", only a scheduler. I feel this is not going in that direction, and things are being treated differently per component. If that's hard to avoid, we should justify it, and maybe we can do the unification later.
@iycheng I think the primary goal is making the gcs scheduler pass all tests without breaking any existing user behavior. In terms of this PR, we're trying to make external modules and public APIs work the same way no matter which scheduler is enabled. After these functionality requirements are achieved, we could try to reduce the difference between the gcs and raylet schedulers (using the fewest gcs scheduling feature flags) in a more organized way (I'll definitely do that). Of course, I totally agree that we should not add too much scheduler divergence along the way. So about the implementation details of this PR, maybe we should not put many
Signed-off-by: Chong-Li <[email protected]>
Signed-off-by: Chong-Li <[email protected]>
@Chong-Li it makes sense. I'm OK with making it work first! It's a really useful feature that can make actor scheduling faster in some cases, and we should just enable it by default.
Yeah, let's make it work first and focus on unification! It'd be great if @wuisawesome can review this PR... let me ping him again.
@@ -253,6 +253,12 @@ def update_load_metrics(self):

         mirror_node_types = {}
+        cluster_full = False
+        if (
This should go away if you rebase right?
Because there might be pending actors in the gcs server (if the gcs actor scheduler is enabled), we need to not only check whether any worker node has detected cluster full (see line 276-281), but also check the gcs server's report (this part).
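The two-source check described here can be sketched as follows: the cluster counts as "full" if any worker node reports it, or if the gcs report carries the new flag for pending/infeasible actors. Field names mirror the proto field in this PR, but the report shapes and the helper itself are illustrative, not the actual autoscaler code.

```python
# Sketch of the autoscaler check discussed above (illustrative shapes):
# combine per-worker "cluster full" flags with the gcs-side actor flag.
def detect_cluster_full(worker_reports, gcs_report):
    cluster_full = any(r.get("cluster_full", False) for r in worker_reports)
    # With gcs actor scheduling enabled, pending/infeasible actors queued in
    # the gcs must also be able to trigger auto-scaling.
    if gcs_report.get("cluster_full_of_actors_detected_by_gcs", False):
        cluster_full = True
    return cluster_full

workers = [{"cluster_full": False}, {"cluster_full": False}]
gcs = {"cluster_full_of_actors_detected_by_gcs": True}
print(detect_cluster_full(workers, gcs))  # → True
```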
@@ -111,6 +112,12 @@ void GcsResourceManager::HandleGetAllAvailableResources(

 void GcsResourceManager::UpdateFromResourceReport(const rpc::ResourcesData &data) {
   NodeID node_id = NodeID::FromBinary(data.node_id());
+  // We only need to update worker nodes' resource usage. The gcs node itself does not
You should be able to assert this right? GCS shouldn't ever send out a resource report right?
This part is actually from another feature, which I'll do in the next split PR. So I'll just revert this here.
Signed-off-by: Chong-Li <[email protected]>
Signed-off-by: Chong-Li <[email protected]>
/// True if gcs finds infeasible or pending actor creation tasks
/// locally (when gcs actor scheduler is enabled). This field is
/// expected to help trigger auto-scaling.
bool cluster_full_of_actors_detected_by_gcs = 3;
Hmm, is there any reason we can't reuse ResourceUsageBatchData?
ResourceUsageBatchData is a gcs proto, which is also used for table storage and pubsub (although the pub-sub use might be removed later). So adding a cluster_full_of_actors_detected_by_gcs field to ResourceUsageBatchData might introduce unnecessary overhead in most cases.
Signed-off-by: Chong-Li <[email protected]>
Signed-off-by: Chong-Li <[email protected]>
Thanks for the contribution!
Why are these changes needed?
This is the second split PR of #25075, which tries to enable the gcs scheduler by default.
This split PR mainly includes:
- In GcsResourceManager::HandleGetAllResourceUsage(), we export gcs' pending task info without adding an extra entry to the batch list. So, as usual, the batch list still only contains the worker nodes (some tests depend on this).
- To pass tests like test_actor_groups, rpc GetAllNodeInfo has to additionally return gcs' pending task info (do we need a dedicated rpc for that?).
Related issue number
#25075, #27084
Checks
- I've signed off every commit (git commit -s) in this PR.
- I've run scripts/format.sh to lint the changes in this PR.