[core] Fix potential deadlock in ray.cancel #29763

Merged: 1 commit, Oct 27, 2022
23 changes: 20 additions & 3 deletions python/ray/_raylet.pyx
@@ -159,6 +159,11 @@ OPTIMIZED = __OPTIMIZE__

 logger = logging.getLogger(__name__)

+# The currently executing task, if any. These are used to synchronize task
Collaborator

Also, any thoughts on how we can improve the threading model so that it's obvious the code has no deadlock?

Contributor Author
@stephanie-wang, Oct 27, 2022

Probably the best approach would be to find a way to enforce that Python callbacks never hold any C++ locks, but I'm not sure how to do that with thread annotations.

+# interruption for ray.cancel.
+current_task_id = None
+current_task_id_lock = threading.Lock()
+

 class ObjectRefGenerator:
     def __init__(self, refs):
@@ -1024,8 +1029,15 @@ cdef CRayStatus task_execution_handler(
         const c_vector[CConcurrencyGroup] &defined_concurrency_groups,
         const c_string name_of_concurrency_group_to_execute,
         c_bool is_reattempt) nogil:
+
+    global current_task_id
     with gil, disable_client_hook():
         try:
+            task_id = (ray._private.worker.
+                       global_worker.core_worker.get_current_task_id())
+            with current_task_id_lock:
+                current_task_id = task_id
+
             try:
                 # The call to execute_task should never raise an exception. If
                 # it does, that indicates that there was an internal error.
@@ -1069,6 +1081,9 @@ cdef CRayStatus task_execution_handler(
                         job_id=None)
                     sys_exit.unexpected_error_traceback = traceback_str
                 raise sys_exit
+            finally:
+                with current_task_id_lock:
+                    current_task_id = None
         except SystemExit as e:
             # Tell the core worker to exit as soon as the result objects
             # are processed.
@@ -1096,12 +1111,14 @@ cdef CRayStatus task_execution_handler(

     return CRayStatus.OK()

-cdef c_bool kill_main_task() nogil:
+cdef c_bool kill_main_task(const CTaskID &task_id) nogil:
     with gil:
-        if setproctitle.getproctitle() != "ray::IDLE":
+        task_id_to_kill = TaskID(task_id.Binary())
+        with current_task_id_lock:
+            if current_task_id != task_id_to_kill:
+                return False
             _thread.interrupt_main()
             return True
-        return False


 cdef CRayStatus check_signals() nogil:
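The `kill_main_task` change above re-checks the task ID under `current_task_id_lock` before interrupting the main thread. A minimal pure-Python sketch of that check-then-interrupt pattern might look like the following. `TaskTracker`, `run`, `try_cancel`, and the injected `interrupt` callable are hypothetical names used only for illustration, not Ray APIs; the real code calls `_thread.interrupt_main()` rather than an injected function.

```python
import threading
from typing import Callable, Optional


class TaskTracker:
    """Tracks the task running on the main thread (hypothetical, not a Ray API)."""

    def __init__(self, interrupt: Callable[[], None]) -> None:
        self._lock = threading.Lock()
        self._current_task_id: Optional[bytes] = None
        # Stand-in for _thread.interrupt_main(); injected so the sketch can be
        # exercised without actually interrupting the interpreter.
        self._interrupt = interrupt

    def run(self, task_id: bytes, fn: Callable[[], None]) -> None:
        """Publish task_id while fn runs, then always clear it again."""
        with self._lock:
            self._current_task_id = task_id
        try:
            fn()
        finally:
            with self._lock:
                self._current_task_id = None

    def try_cancel(self, task_id: bytes) -> bool:
        """Interrupt only if task_id is still the task that is running."""
        with self._lock:
            if self._current_task_id != task_id:
                # The task already finished or another one started: do nothing.
                return False
            self._interrupt()
            return True
```

Because the comparison and the interrupt happen under the same lock that guards the assignment, a cancel request for a task that has already finished cannot interrupt whichever task started next.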
2 changes: 1 addition & 1 deletion python/ray/includes/libcoreworker.pxd
@@ -321,7 +321,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
         (void(c_string *stack_out) nogil) get_lang_stack
         c_bool is_local_mode
         int num_workers
-        (c_bool() nogil) kill_main
+        (c_bool(const CTaskID &) nogil) kill_main
         CCoreWorkerOptions()
         (void() nogil) terminate_asyncio_thread
         c_string serialized_job_config
41 changes: 26 additions & 15 deletions src/ray/core_worker/core_worker.cc
@@ -3098,16 +3098,24 @@ void CoreWorker::HandleRemoteCancelTask(rpc::RemoteCancelTaskRequest request,
 void CoreWorker::HandleCancelTask(rpc::CancelTaskRequest request,
                                   rpc::CancelTaskReply *reply,
                                   rpc::SendReplyCallback send_reply_callback) {
-  absl::MutexLock lock(&mutex_);
   TaskID task_id = TaskID::FromBinary(request.intended_task_id());
-  bool requested_task_running = main_thread_task_id_ == task_id;
+  bool requested_task_running;
+  {
+    absl::MutexLock lock(&mutex_);
+    requested_task_running = main_thread_task_id_ == task_id;
+  }
   bool success = requested_task_running;

-  // Try non-force kill
+  // Try non-force kill.
+  // NOTE(swang): We do not hold the CoreWorker lock here because the kill
+  // callback requires the GIL, which can cause a deadlock with the main task
+  // thread. This means that the currently executing task can change by the time
+  // the kill callback runs; the kill callback is responsible for also making
+  // sure it cancels the right task.
+  // See https://github.com/ray-project/ray/issues/29739.
   if (requested_task_running && !request.force_kill()) {
-    RAY_LOG(INFO) << "Cancelling a running task " << main_thread_task_name_
-                  << " thread id: " << main_thread_task_id_;
-    success = options_.kill_main();
+    RAY_LOG(INFO) << "Cancelling a running task with id: " << task_id;
+    success = options_.kill_main(task_id);
   } else if (!requested_task_running) {
     RAY_LOG(INFO) << "Cancelling a task " << task_id
                   << " that's not running. Tasks will be removed from a queue.";
@@ -3123,19 +3131,22 @@ void CoreWorker::HandleCancelTask(rpc::CancelTaskRequest request,
     }
   }

-  // TODO: fix race condition to avoid using this hack
-  requested_task_running = main_thread_task_id_ == task_id;
-
   reply->set_attempt_succeeded(success);
   reply->set_requested_task_running(requested_task_running);
   send_reply_callback(Status::OK(), nullptr, nullptr);

-  // Do force kill after reply callback sent
-  if (requested_task_running && request.force_kill()) {
-    ForceExit(rpc::WorkerExitType::INTENDED_USER_EXIT,
-              absl::StrCat("The worker exits because the task ",
-                           main_thread_task_name_,
-                           " has received a force ray.cancel request."));
+  // Do force kill after reply callback sent.
+  if (request.force_kill()) {
+    // We grab the lock again to make sure that we are force-killing the correct
+    // task. This is guaranteed not to deadlock because ForceExit should not
+    // require any other locks.
+    absl::MutexLock lock(&mutex_);
+    if (main_thread_task_id_ == task_id) {
+      ForceExit(rpc::WorkerExitType::INTENDED_USER_EXIT,
+                absl::StrCat("The worker exits because the task ",
+                             main_thread_task_name_,
+                             " has received a force ray.cancel request."));
+    }
   }
 }

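The NOTE in `HandleCancelTask` describes reading `main_thread_task_id_` in a narrow locked scope, releasing the lock, and only then invoking the kill callback, which needs the GIL. A rough Python sketch of that lock-scoping shape is below. `Worker`, `set_running_task`, `mutex_is_held`, `handle_cancel`, and `kill_callback` are hypothetical stand-ins for illustration, not the CoreWorker interface, and an ordinary `threading.Lock` plays the role of `mutex_`.

```python
import threading
from typing import Callable, Optional


class Worker:
    """Hypothetical stand-in for CoreWorker; illustrates lock scoping only."""

    def __init__(self, kill_callback: Callable[[str], bool]) -> None:
        self._mutex = threading.Lock()
        self._main_thread_task_id: Optional[str] = None
        # The callback may need another lock (the GIL in the real code), so it
        # must never be invoked while self._mutex is held.
        self._kill_callback = kill_callback

    def set_running_task(self, task_id: Optional[str]) -> None:
        with self._mutex:
            self._main_thread_task_id = task_id

    def mutex_is_held(self) -> bool:
        """Debug helper: reports whether the mutex is currently locked."""
        return self._mutex.locked()

    def handle_cancel(self, task_id: str) -> bool:
        # Step 1: take a snapshot of the state inside a narrow locked scope.
        with self._mutex:
            running = self._main_thread_task_id == task_id
        if not running:
            return False
        # Step 2: call out with the lock released. The running task may have
        # changed by now, so the callback has to re-check the ID itself.
        return self._kill_callback(task_id)
```

The trade-off is the one the NOTE spells out: releasing the lock before the callback removes the lock-ordering hazard, but the snapshot can be stale, which is why the callback receives the task ID and validates it again.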
5 changes: 3 additions & 2 deletions src/ray/core_worker/core_worker_options.h
@@ -148,8 +148,9 @@ struct CoreWorkerOptions {
   std::function<void(const RayObject &error)> unhandled_exception_handler;
   /// Language worker callback to get the current call stack.
   std::function<void(std::string *)> get_lang_stack;
-  // Function that tries to interrupt the currently running Python thread.
-  std::function<bool()> kill_main;
+  // Function that tries to interrupt the currently running Python thread if its
+  // task ID matches the one given.
+  std::function<bool(const TaskID &task_id)> kill_main;
   /// Is local mode being used.
   bool is_local_mode;
   /// The function to destroy asyncio event and loops.