Handle starting worker throttling inside worker pool #28551
Conversation
Signed-off-by: Jiajun Yao <[email protected]>
LGTM!
I'm curious to see if scheduling throughput improves on test_many_tasks with this change.
It may be too difficult to write one that isn't flaky, but you could also consider adding a Python test that checks the resource availability accounting is correct while workers are starting.
@@ -968,6 +970,10 @@ void WorkerPool::PushWorker(const std::shared_ptr<WorkerInterface> &worker) {
// TODO(SongGuyang): This worker will not be used forever. We should kill it.
state.idle_dedicated_workers[task_id] = worker;
}
// We either have an idle worker or a slot to start a new worker.
if (worker->GetWorkerType() == rpc::WorkerType::WORKER) {
TryPendingPopWorkerRequests(worker->GetLanguage());
Would it be better to invoke this function in OnWorkerStarted? I suggest this because the flag is_pending_registration is set to false in that function, and it is related to the concurrent starting worker process count.
We don't only retry pending pop worker requests when is_pending_registration is set to false, but also when an idle worker is returned; PushWorker() handles both cases.
If we did it in OnWorkerStarted(), we might start more workers than necessary. Imagine a pre-started worker registering: OnWorkerStarted() is called first, so if we retried in that function we would start a new worker (since the newly registered worker has not been pushed yet), whereas if we retry in PushWorker() we can simply reuse the pre-started worker without starting a new one.
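For illustration, here is a minimal self-contained sketch of that ordering argument. The types and names (ToyWorkerPool, QueuePopRequest, string-based workers) are toys invented for this example rather than Ray's actual classes; only the idea mirrors the change, namely that the retry lives in PushWorker() because that is where the worker actually becomes usable.

```cpp
#include <deque>
#include <iostream>
#include <string>

// Toy model (not Ray's classes) of why the retry lives in PushWorker() rather
// than OnWorkerStarted(): at OnWorkerStarted() time the registering worker is
// not yet in the idle pool, so a retry there would start another worker.
class ToyWorkerPool {
 public:
  void QueuePopRequest(const std::string &task_id) {
    pending_pop_worker_requests_.push_back(task_id);
  }

  // Registration callback: the worker exists but has not been pushed yet.
  void OnWorkerStarted(const std::string &worker_id) {
    std::cout << worker_id << " registered\n";
    // Retrying here would see an empty idle pool and start a new worker.
  }

  // The worker becomes usable here, so this is where queued requests retry.
  void PushWorker(const std::string &worker_id) {
    idle_workers_.push_back(worker_id);
    TryPendingPopWorkerRequests();
  }

 private:
  void TryPendingPopWorkerRequests() {
    while (!pending_pop_worker_requests_.empty() && !idle_workers_.empty()) {
      const std::string task_id = pending_pop_worker_requests_.front();
      pending_pop_worker_requests_.pop_front();
      const std::string worker_id = idle_workers_.front();
      idle_workers_.pop_front();
      std::cout << "assigned " << worker_id << " to " << task_id << "\n";
    }
  }

  std::deque<std::string> idle_workers_;
  std::deque<std::string> pending_pop_worker_requests_;
};

int main() {
  ToyWorkerPool pool;
  pool.QueuePopRequest("task-1");             // throttled earlier, waiting
  pool.OnWorkerStarted("prestarted-worker");  // registered, not yet idle
  pool.PushWorker("prestarted-worker");       // queued request is served here
  return 0;
}
```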
} else if (status == PopWorkerStatus::TooManyStartingWorkerProcesses) {
DeleteRuntimeEnvIfPossible(task_spec.SerializedRuntimeEnv());
state.pending_pop_worker_requests.emplace_back(
PopWorkerRequest{task_spec, callback, allocated_instances_serialized_json});
Do we need to avoid the memory copy of task_spec? Maybe we can optimize it in the future and add a TODO here.
I think we need to make a copy here. The caller passes this in as a reference; if we don't make a copy, the caller may destruct the object later on.
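A small self-contained illustration of that lifetime point (toy types with hypothetical names, not Ray's TaskSpecification): the queued request can outlive the caller's reference, so it has to own its own copy.

```cpp
// Toy illustration of why the queued request owns a copy of the task spec:
// the caller passes a reference whose target may be destroyed before the
// queued request is retried.
#include <deque>
#include <iostream>
#include <string>

struct ToyTaskSpec {
  std::string serialized_runtime_env;
};

struct ToyPopWorkerRequest {
  ToyTaskSpec task_spec;  // stored by value, not by reference or pointer
};

std::deque<ToyPopWorkerRequest> pending_pop_worker_requests;

void QueueRequest(const ToyTaskSpec &task_spec) {
  // Copies the spec into the queue entry; a stored reference would dangle
  // once the caller's object goes out of scope.
  pending_pop_worker_requests.push_back(ToyPopWorkerRequest{task_spec});
}

int main() {
  {
    ToyTaskSpec spec{"runtime_env_json"};
    QueueRequest(spec);
  }  // spec is destroyed here, but the queued copy remains valid.
  std::cout << pending_pop_worker_requests.front().task_spec.serialized_runtime_env
            << "\n";
  return 0;
}
```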
@@ -1177,13 +1186,14 @@ void WorkerPool::PopWorker(const TaskSpecification &task_spec,
} else {
state.starting_workers_to_tasks[startup_token] = std::move(task_info);
}
} else if (status == PopWorkerStatus::TooManyStartingWorkerProcesses) {
DeleteRuntimeEnvIfPossible(task_spec.SerializedRuntimeEnv());
Actually, the ideal way is that we don't call DeleteRuntimeEnvIfPossible here and instead reuse the runtime env the next time we pop a worker, right?
Yea, I agree. I'll add a TODO here and address it in the follow-up PR since it's not a regression. Does that sound good to you?
Good for me.
Overall looks good. We can merge it first once you add the TODO comments.
Signed-off-by: Jiajun Yao <[email protected]>
Release tests look good: https://buildkite.com/ray-project/release-tests-pr/builds/16008#_. Didn't see improvement or regression.
Signed-off-by: Jiajun Yao [email protected]
Why are these changes needed?
Currently, the worker pool throttles how many workers can be started simultaneously (i.e. maximum_startup_concurrency_). Right now, if a PopWorker call cannot be fulfilled due to throttling, it fails and the caller (i.e. the local task manager) handles the retry. The issue is that when PopWorker fails, the local task manager releases the resources claimed by the task. As a result, even though the node already has enough tasks to use up all its resources, it will still report available resources and attract more tasks than it can handle. Instead of letting the local task manager handle the throttling, it should be handled internally in the worker pool, since throttling is transient and not a real error; it's effectively the same as a longer worker startup time.
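As a rough sketch of the behavior change described above (toy names, not Ray's actual API): hitting maximum_startup_concurrency_ now queues the request inside the pool instead of failing it back to the local task manager, so the task's resources stay claimed while the worker starts.

```cpp
// Toy sketch of throttling handled inside the worker pool. All names here
// (ToyWorkerPool, string task ids) are illustrative, not Ray's API.
#include <deque>
#include <iostream>
#include <string>

class ToyWorkerPool {
 public:
  explicit ToyWorkerPool(int maximum_startup_concurrency)
      : maximum_startup_concurrency_(maximum_startup_concurrency) {}

  void PopWorker(const std::string &task_id) {
    if (num_starting_worker_processes_ >= maximum_startup_concurrency_) {
      // Old behavior: return TooManyStartingWorkerProcesses and let the
      // caller (local task manager) release resources and retry.
      // New behavior: keep the request; it is retried from PushWorker()
      // once a worker registers or an idle worker is returned.
      pending_pop_worker_requests_.push_back(task_id);
      std::cout << task_id << " queued (startup throttled)\n";
      return;
    }
    ++num_starting_worker_processes_;
    std::cout << "starting a worker process for " << task_id << "\n";
  }

 private:
  const int maximum_startup_concurrency_;
  int num_starting_worker_processes_ = 0;
  std::deque<std::string> pending_pop_worker_requests_;
};

int main() {
  ToyWorkerPool pool(/*maximum_startup_concurrency=*/2);
  pool.PopWorker("task-1");
  pool.PopWorker("task-2");
  pool.PopWorker("task-3");  // exceeds the limit: queued, not failed
  return 0;
}
```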
Related issue number

Checks
- I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
- I've run scripts/format.sh to lint the changes in this PR.