From c137b9fe9b2718d76955582cc74c7d09f18b062f Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Thu, 27 Oct 2022 10:09:26 -0700 Subject: [PATCH] Fix deadlock in cancel --- python/ray/_raylet.pyx | 23 +++++++++++-- python/ray/includes/libcoreworker.pxd | 2 +- src/ray/core_worker/core_worker.cc | 41 ++++++++++++++--------- src/ray/core_worker/core_worker_options.h | 5 +-- 4 files changed, 50 insertions(+), 21 deletions(-) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 163aa3f634a5..4e581b9df617 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -159,6 +159,11 @@ OPTIMIZED = __OPTIMIZE__ logger = logging.getLogger(__name__) +# The currently executing task, if any. These are used to synchronize task +# interruption for ray.cancel. +current_task_id = None +current_task_id_lock = threading.Lock() + class ObjectRefGenerator: def __init__(self, refs): @@ -1024,8 +1029,15 @@ cdef CRayStatus task_execution_handler( const c_vector[CConcurrencyGroup] &defined_concurrency_groups, const c_string name_of_concurrency_group_to_execute, c_bool is_reattempt) nogil: + + global current_task_id with gil, disable_client_hook(): try: + task_id = (ray._private.worker. + global_worker.core_worker.get_current_task_id()) + with current_task_id_lock: + current_task_id = task_id + try: # The call to execute_task should never raise an exception. If # it does, that indicates that there was an internal error. @@ -1069,6 +1081,9 @@ cdef CRayStatus task_execution_handler( job_id=None) sys_exit.unexpected_error_traceback = traceback_str raise sys_exit + finally: + with current_task_id_lock: + current_task_id = None except SystemExit as e: # Tell the core worker to exit as soon as the result objects # are processed. @@ -1096,12 +1111,14 @@ cdef CRayStatus task_execution_handler( return CRayStatus.OK() -cdef c_bool kill_main_task() nogil: +cdef c_bool kill_main_task(const CTaskID &task_id) nogil: with gil: - if setproctitle.getproctitle() != "ray::IDLE": + task_id_to_kill = TaskID(task_id.Binary()) + with current_task_id_lock: + if current_task_id != task_id_to_kill: + return False _thread.interrupt_main() return True - return False cdef CRayStatus check_signals() nogil: diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index f53464d5f98e..5c5ebbe08a17 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -321,7 +321,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: (void(c_string *stack_out) nogil) get_lang_stack c_bool is_local_mode int num_workers - (c_bool() nogil) kill_main + (c_bool(const CTaskID &) nogil) kill_main CCoreWorkerOptions() (void() nogil) terminate_asyncio_thread c_string serialized_job_config diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 0e2949026f46..20282b65958f 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -3098,16 +3098,24 @@ void CoreWorker::HandleRemoteCancelTask(rpc::RemoteCancelTaskRequest request, void CoreWorker::HandleCancelTask(rpc::CancelTaskRequest request, rpc::CancelTaskReply *reply, rpc::SendReplyCallback send_reply_callback) { - absl::MutexLock lock(&mutex_); TaskID task_id = TaskID::FromBinary(request.intended_task_id()); - bool requested_task_running = main_thread_task_id_ == task_id; + bool requested_task_running; + { + absl::MutexLock lock(&mutex_); + requested_task_running = main_thread_task_id_ == task_id; + } bool success = requested_task_running; - // Try non-force kill + // Try non-force kill. + // NOTE(swang): We do not hold the CoreWorker lock here because the kill + // callback requires the GIL, which can cause a deadlock with the main task + // thread. This means that the currently executing task can change by the time + // the kill callback runs; the kill callback is responsible for also making + // sure it cancels the right task. + // See https://github.com/ray-project/ray/issues/29739. if (requested_task_running && !request.force_kill()) { - RAY_LOG(INFO) << "Cancelling a running task " << main_thread_task_name_ - << " thread id: " << main_thread_task_id_; - success = options_.kill_main(); + RAY_LOG(INFO) << "Cancelling a running task with id: " << task_id; + success = options_.kill_main(task_id); } else if (!requested_task_running) { RAY_LOG(INFO) << "Cancelling a task " << task_id << " that's not running. Tasks will be removed from a queue."; @@ -3123,19 +3131,22 @@ void CoreWorker::HandleCancelTask(rpc::CancelTaskRequest request, } } - // TODO: fix race condition to avoid using this hack - requested_task_running = main_thread_task_id_ == task_id; - reply->set_attempt_succeeded(success); reply->set_requested_task_running(requested_task_running); send_reply_callback(Status::OK(), nullptr, nullptr); - // Do force kill after reply callback sent - if (requested_task_running && request.force_kill()) { - ForceExit(rpc::WorkerExitType::INTENDED_USER_EXIT, - absl::StrCat("The worker exits because the task ", - main_thread_task_name_, - " has received a force ray.cancel request.")); + // Do force kill after reply callback sent. + if (request.force_kill()) { + // We grab the lock again to make sure that we are force-killing the correct + // task. This is guaranteed not to deadlock because ForceExit should not + // require any other locks. + absl::MutexLock lock(&mutex_); + if (main_thread_task_id_ == task_id) { + ForceExit(rpc::WorkerExitType::INTENDED_USER_EXIT, + absl::StrCat("The worker exits because the task ", + main_thread_task_name_, + " has received a force ray.cancel request.")); + } } } diff --git a/src/ray/core_worker/core_worker_options.h b/src/ray/core_worker/core_worker_options.h index 3231b2aef86f..d073cda6b064 100644 --- a/src/ray/core_worker/core_worker_options.h +++ b/src/ray/core_worker/core_worker_options.h @@ -148,8 +148,9 @@ struct CoreWorkerOptions { std::function unhandled_exception_handler; /// Language worker callback to get the current call stack. std::function get_lang_stack; - // Function that tries to interrupt the currently running Python thread. - std::function kill_main; + // Function that tries to interrupt the currently running Python thread if its + // task ID matches the one given. + std::function kill_main; /// Is local mode being used. bool is_local_mode; /// The function to destroy asyncio event and loops.