[Core] Fix worker process leaks after job finishes #44214
Conversation
// If the worker is idle, we exit.
if (will_exit) {
  if (force_exit) {
Bug: when force_exit is set, we still go through the graceful exit code path, which means that if there are pending tasks or owned objects, the core worker will drain before exiting. That is not what we want when the job has finished.
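A minimal sketch of the intended control flow, reusing the ForceExit call quoted later in this thread; the Exit branch and its message are assumptions, not the verbatim change:

// Sketch only: a force exit must bypass the graceful path, which waits for
// pending tasks and owned objects to drain before exiting.
if (will_exit) {
  if (force_exit) {
    ForceExit(rpc::WorkerExitType::INTENDED_SYSTEM_EXIT,
              "Worker force exits because its job has finished");
  } else {
    Exit(rpc::WorkerExitType::INTENDED_SYSTEM_EXIT,
         "Worker exits because it is idle.");
  }
}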
bool own_objects = reference_counter_->OwnObjects();
int64_t pins_in_flight = local_raylet_client_->GetPinsInFlight();
const bool own_objects = reference_counter_->OwnObjects();
const size_t num_pending_tasks = task_manager_->NumPendingTasks();
Bug: we didn't consider pending tasks when deciding whether the core worker is idle, even though we do consider them during drain and exit.
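A rough sketch of the corrected idle check; the is_idle composition at the end is an assumption about how these values are combined, not the exact code in the PR:

// Sketch: a worker only counts as idle when it owns no objects, has no
// object pins in flight, and (the fix) has no tasks still pending.
const bool own_objects = reference_counter_->OwnObjects();
const int64_t pins_in_flight = local_raylet_client_->GetPinsInFlight();
const size_t num_pending_tasks = task_manager_->NumPendingTasks();
const bool is_idle = !own_objects && pins_in_flight == 0 && num_pending_tasks == 0;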
Can we write a python test for this?
if (worker && finished_jobs_.contains(task_spec.JobId()) &&
    task_spec.AncestorDetachedActorId().IsNil()) {
  RAY_CHECK(status == PopWorkerStatus::OK);
  callback(nullptr, PopWorkerStatus::JobFinished, "");
Bug: when a job finishes, the raylet kills leased workers (once) and idle workers (periodically). However, some workers belong to neither state: workers inside PopWorkerCallbackInternal that are sitting in the event loop, waiting to be added to the leased workers later. This fix makes sure such a worker goes back to idle and is then killed by the periodic idle termination.
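An annotated restatement of the diff quoted above (a sketch, not the verbatim change), showing where the early-out sits in PopWorkerCallbackInternal:

// Sketch: if the job already finished (and the task isn't rooted in a
// detached actor), don't lease the popped worker to the task. Reporting
// JobFinished lets the worker return to the idle pool, where the periodic
// idle-worker termination will eventually kill it.
if (worker && finished_jobs_.contains(task_spec.JobId()) &&
    task_spec.AncestorDetachedActorId().IsNil()) {
  RAY_CHECK(status == PopWorkerStatus::OK);
  callback(nullptr, PopWorkerStatus::JobFinished, "");
}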
@@ -597,6 +597,25 @@ void NodeManager::HandleJobStarted(const JobID &job_id, const JobTableData &job_
void NodeManager::HandleJobFinished(const JobID &job_id, const JobTableData &job_data) {
  RAY_LOG(DEBUG) << "HandleJobFinished " << job_id;
  RAY_CHECK(job_data.is_dead());
  for (const auto &pair : leased_workers_) {
Bug: we only tried to kill worker processes when their parent dies. In theory this works if you treat the task graph as a tree rooted at the driver, so when the root dies the entire task graph eventually dies; in practice, worker reuse introduces cycles, so the driver's death doesn't transitively kill all workers. This PR fixes that by also killing workers when the job finishes.
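A hedged sketch of the job-finished handling described above; GetAssignedJobId and GetRootDetachedActorId come from fragments elsewhere in this review, while DestroyWorker and the collect-then-kill structure are assumptions about how the workers are actually killed:

// Sketch: when a job dies, force-exit every worker currently leased to it,
// unless the worker is rooted in a detached actor (those outlive the job).
void NodeManager::HandleJobFinished(const JobID &job_id, const JobTableData &job_data) {
  RAY_LOG(DEBUG) << "HandleJobFinished " << job_id;
  RAY_CHECK(job_data.is_dead());
  // Collect first so that killing workers can't invalidate the iterator.
  std::vector<std::shared_ptr<WorkerInterface>> workers_to_kill;
  for (const auto &pair : leased_workers_) {
    const auto &worker = pair.second;
    if (worker->GetAssignedJobId() == job_id &&
        worker->GetRootDetachedActorId().IsNil()) {
      workers_to_kill.push_back(worker);
    }
  }
  for (const auto &worker : workers_to_kill) {
    DestroyWorker(worker,
                  rpc::WorkerExitType::INTENDED_SYSTEM_EXIT,
                  "Worker force exits because its job has finished");
  }
}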
Add test?
@@ -338,20 +347,17 @@ WorkerPool::BuildProcessCommandArgs(const Language &language,
  worker_command_args.push_back("--worker-launch-time-ms=" +
                                std::to_string(current_sys_time_ms()));
  worker_command_args.push_back("--node-id=" + node_id_.Hex());
  worker_command_args.push_back("--runtime-env-hash=" +
Bug: runtime-env-hash is actually the worker cache key and includes more than just the runtime env, so we should always set it when starting a worker process, not only when the runtime env is non-empty.
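For context, a short sketch of what always setting it looks like in BuildProcessCommandArgs; the runtime_env_hash variable name is an assumption about the surrounding code:

// Sketch: pass the worker cache key unconditionally, even when the runtime
// env is empty, so a started worker can always be matched back to a lease.
worker_command_args.push_back("--runtime-env-hash=" +
                              std::to_string(runtime_env_hash));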
Hmm the number of bugs here is a bit scary :)
Can we add unit tests for these?
I think the fix makes sense. We need tests and comments to explain some of the code, as we discussed offline.
@@ -205,6 +208,8 @@ class Worker : public WorkerInterface {
    lifetime_allocated_instances_ = allocated_instances;
  };

  const ActorID &GetRootDetachedActorId() const { return root_detached_actor_id_; }
what does root detached actor mean?
If a task or actor is created by a detached actor (directly or transitively), then its root is that detached actor; otherwise its root is the driver. For example, if detached actor A spawns task t, then t's root detached actor is A, whereas a task submitted by the driver has a nil root detached actor id.
// The task job finished.
// Just remove the task from dispatch queue.
RAY_LOG(DEBUG) << "Call back to a job finished task, task id = " << task_id;
erase_from_dispatch_queue_fn(work, scheduling_class);
Is this available in the worker death reason, btw?
We have:
if (force_exit) {
  ForceExit(rpc::WorkerExitType::INTENDED_SYSTEM_EXIT,
            "Worker force exits because its job has finished");
// If this task originated from a detached actor,
// this field contains the detached actor id.
// Otherwise it's empty and the task originated from a driver.
bytes root_detached_actor_id = 40;
Why don't we just call it from_detached_actor: bool?
This way we can differentiate worker processes belonging to different detached actors.
Why are these changes needed?
This PR makes sure that when a job finishes, all worker processes belonging to it (excluding those started by detached actors) forcibly exit. It fixes this by:
This PR doesn't fix the worker leak when detached actors finish. Ideally, if we treated detached actors as separate jobs, the same code would just work, but that change has a bigger impact and higher risk (e.g. would we overload the job table, since each Serve replica is a detached actor; would we run out of job ids, which are 4 bytes), so I didn't include it in this PR.
Related issue number
Closes #44897, #44931