diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 8efbd645616f..79e82b2ff149 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -568,7 +568,8 @@ cdef store_task_errors( ray.util.pdb.post_mortem() backtrace = ray._private.utils.format_error_message( - traceback.format_exc(), task_exception=task_exception) + "".join(traceback.format_exception(type(exc), exc, exc.__traceback__)), + task_exception=task_exception) # Generate the actor repr from the actor class. actor_repr = repr(actor) if actor else None @@ -674,7 +675,7 @@ cdef execute_dynamic_generator_and_store_task_outputs( "by the first execution.\n" "See https://github.com/ray-project/ray/issues/28688.") -cdef execute_task( +cdef void execute_task( const CAddress &caller_address, CTaskType task_type, const c_string name, @@ -692,25 +693,20 @@ cdef execute_task( # the concurrency groups of this actor. const c_vector[CConcurrencyGroup] &c_defined_concurrency_groups, const c_string c_name_of_concurrency_group_to_execute, - c_bool is_reattempt): - - is_application_error[0] = False - is_retryable_error[0] = False - + c_bool is_reattempt, + execution_info, + title, + task_name) except *: worker = ray._private.worker.global_worker manager = worker.function_actor_manager actor = None cdef: - dict execution_infos = manager.execution_infos CoreWorker core_worker = worker.core_worker JobID job_id = core_worker.get_current_job_id() TaskID task_id = core_worker.get_current_task_id() CFiberEvent task_done_event c_vector[shared_ptr[CRayObject]] dynamic_return_ptrs - # Automatically restrict the GPUs available to this task. - ray._private.utils.set_cuda_visible_devices(ray.get_gpu_ids()) - # Helper method used to exit current asyncio actor. # This is called when a KeyboardInterrupt is received by the main thread. # Upon receiving a KeyboardInterrupt signal, Ray will exit the current @@ -726,43 +722,12 @@ cdef execute_task( function_descriptor = CFunctionDescriptorToPython( ray_function.GetFunctionDescriptor()) - - if task_type == TASK_TYPE_ACTOR_CREATION_TASK: - actor_class = manager.load_actor_class(job_id, function_descriptor) - actor_id = core_worker.get_actor_id() - actor = actor_class.__new__(actor_class) - worker.actors[actor_id] = actor - # Record the actor class via :actor_name: magic token in the log. - # - # (Phase 1): this covers code run before __init__ finishes. - # We need to handle this separately because `__repr__` may not be - # runnable until after `__init__` (e.g., if it accesses fields - # defined in the constructor). - actor_magic_token = "{}{}\n".format( - ray_constants.LOG_PREFIX_ACTOR_NAME, actor_class.__name__) - # Flush to both .out and .err - print(actor_magic_token, end="") - print(actor_magic_token, file=sys.stderr, end="") - - # Initial eventloops for asyncio for this actor. 
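
# The store_task_errors hunk above switches from traceback.format_exc(),
# which formats whichever exception is *currently* being handled, to
# traceback.format_exception(...) on the specific exception object that was
# passed in. A minimal standalone sketch of that difference; `describe` is a
# hypothetical helper, not part of Ray.
import traceback


def describe(exc: BaseException) -> str:
    # Formats `exc` itself, even if we are no longer (or never were) inside
    # the `except` block that caught it.
    return "".join(
        traceback.format_exception(type(exc), exc, exc.__traceback__))


def run():
    try:
        raise ValueError("task failed")
    except ValueError as e:
        saved = e
    # Outside the except block, traceback.format_exc() would report
    # "NoneType: None" (no active exception), but the saved object still
    # carries its traceback and can be formatted explicitly.
    print(describe(saved))


if __name__ == "__main__":
    run()
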
- if core_worker.current_actor_is_asyncio(): - core_worker.initialize_eventloops_for_actor_concurrency_group( - c_defined_concurrency_groups) - - execution_info = execution_infos.get(function_descriptor) - if not execution_info: - execution_info = manager.get_execution_info( - job_id, function_descriptor) - execution_infos[function_descriptor] = execution_info - function_name = execution_info.function_name extra_data = (b'{"name": ' + function_name.encode("ascii") + b' "task_id": ' + task_id.hex().encode("ascii") + b'}') - task_name = name.decode("utf-8") name_of_concurrency_group_to_execute = \ c_name_of_concurrency_group_to_execute.decode("ascii") - title = f"ray::{task_name}()" if task_type == TASK_TYPE_NORMAL_TASK: next_title = "ray::IDLE" @@ -816,10 +781,9 @@ cdef execute_task( with core_worker.profile_event(b"task::" + name, extra_data=extra_data): try: - task_exception = False if (not (task_type == TASK_TYPE_ACTOR_TASK and function_name == "__ray_terminate__") and - ray._config.memory_monitor_interval_ms() == 0): + ray._config.memory_monitor_interval_ms() == 0): worker.memory_monitor.raise_if_low_memory() with core_worker.profile_event(b"task:deserialize_arguments"): @@ -842,8 +806,9 @@ cdef execute_task( deserialize_args, function_descriptor, name_of_concurrency_group_to_execute) else: - args = ray._private.worker.global_worker.deserialize_objects( - metadata_pairs, object_refs) + args = (ray._private.worker.global_worker + .deserialize_objects( + metadata_pairs, object_refs)) for arg in args: raise_if_dependency_failed(arg) @@ -882,13 +847,11 @@ cdef execute_task( "RAY_PDB_CONTINUE_{}".format(next_breakpoint), namespace=ray_constants.KV_NAMESPACE_PDB ) - ray._private.worker.global_worker.debugger_breakpoint = b"" + (ray._private.worker.global_worker + .debugger_breakpoint) = b"" task_exception = False except AsyncioActorExit as e: exit_current_actor_if_asyncio() - except KeyboardInterrupt as e: - raise TaskCancelledError( - core_worker.get_current_task_id()) except Exception as e: is_application_error[0] = True is_retryable_error[0] = determine_if_retryable( @@ -907,7 +870,7 @@ cdef execute_task( else: logger.debug("Task failed with unretryable exception:" " {}.".format( - core_worker.get_current_task_id()), + core_worker.get_current_task_id()), exc_info=True) raise e if returns[0].size() == 1 and not inspect.isgenerator(outputs): @@ -920,19 +883,15 @@ cdef execute_task( # (Phase 2): after `__init__` finishes, we override the # log prefix with the full repr of the actor. The log monitor # will pick up the updated token. + actor_class = manager.load_actor_class(job_id, function_descriptor) if (hasattr(actor_class, "__ray_actor_class__") and - actor_class.__ray_actor_class__.__repr__ != object.__repr__): + (actor_class.__ray_actor_class__.__repr__ + != object.__repr__)): actor_magic_token = "{}{}\n".format( ray_constants.LOG_PREFIX_ACTOR_NAME, repr(actor)) # Flush on both stdout and stderr. print(actor_magic_token, end="") print(actor_magic_token, file=sys.stderr, end="") - # Check for a cancellation that was called when the function - # was exiting and was raised after the except block. 
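
# The `except Exception` hunk above marks the failure as an application error
# and consults determine_if_retryable() against a user-provided allowlist of
# exception types. A simplified, hypothetical stand-in for that check; the
# helper name and the plain-list allowlist are assumptions, not Ray's actual
# implementation (Ray deserializes the allowlist from a serialized blob).
def is_retryable(exc: Exception, allowlist) -> bool:
    if allowlist is None:
        # No allowlist configured: treat every application error as retryable.
        return True
    return isinstance(exc, tuple(allowlist))


# Usage sketch: only ConnectionError (and subclasses) would be retried here.
assert is_retryable(ConnectionError("flaky"), [ConnectionError])
assert not is_retryable(ValueError("bad input"), [ConnectionError])
assert is_retryable(ValueError("bad input"), None)
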
- if not check_signals().ok(): - task_exception = True - raise TaskCancelledError( - core_worker.get_current_task_id()) if (returns[0].size() > 0 and not inspect.isgenerator(outputs) and @@ -947,8 +906,9 @@ cdef execute_task( if dynamic_returns != NULL: if not inspect.isgenerator(outputs): raise ValueError( - "Functions with @ray.remote(num_returns=\"dynamic\" " - "must return a generator") + "Functions with " + "@ray.remote(num_returns=\"dynamic\" must return a " + "generator") task_exception = True execute_dynamic_generator_and_store_task_outputs( @@ -981,9 +941,9 @@ cdef execute_task( core_worker.store_task_outputs( worker, outputs, returns) - except Exception as error: + except Exception as e: num_errors_stored = store_task_errors( - worker, error, task_exception, actor, function_name, + worker, e, task_exception, actor, function_name, task_type, title, returns) if returns[0].size() > 0 and num_errors_stored == 0: logger.exception( @@ -992,6 +952,133 @@ cdef execute_task( "This should only occur when using generator tasks.\n" "See https://github.com/ray-project/ray/issues/28689.") + +cdef execute_task_with_cancellation_handler( + const CAddress &caller_address, + CTaskType task_type, + const c_string name, + const CRayFunction &ray_function, + const unordered_map[c_string, double] &c_resources, + const c_vector[shared_ptr[CRayObject]] &c_args, + const c_vector[CObjectReference] &c_arg_refs, + const c_string debugger_breakpoint, + const c_string serialized_retry_exception_allowlist, + c_vector[c_pair[CObjectID, shared_ptr[CRayObject]]] *returns, + c_vector[c_pair[CObjectID, shared_ptr[CRayObject]]] *dynamic_returns, + c_bool *is_retryable_error, + c_bool *is_application_error, + # This parameter is only used for actor creation task to define + # the concurrency groups of this actor. + const c_vector[CConcurrencyGroup] &c_defined_concurrency_groups, + const c_string c_name_of_concurrency_group_to_execute, + c_bool is_reattempt): + + is_application_error[0] = False + is_retryable_error[0] = False + + worker = ray._private.worker.global_worker + manager = worker.function_actor_manager + cdef: + dict execution_infos = manager.execution_infos + CoreWorker core_worker = worker.core_worker + JobID job_id = core_worker.get_current_job_id() + TaskID task_id = core_worker.get_current_task_id() + CFiberEvent task_done_event + c_vector[shared_ptr[CRayObject]] dynamic_return_ptrs + + task_name = name.decode("utf-8") + title = f"ray::{task_name}()" + + # Automatically restrict the GPUs available to this task. + ray._private.utils.set_cuda_visible_devices(ray.get_gpu_ids()) + + # Initialize the actor if this is an actor creation task. We do this here + # before setting the current task ID so that we can get the execution info, + # in case executing the main task throws an exception. + function_descriptor = CFunctionDescriptorToPython( + ray_function.GetFunctionDescriptor()) + if task_type == TASK_TYPE_ACTOR_CREATION_TASK: + actor_class = manager.load_actor_class(job_id, function_descriptor) + actor_id = core_worker.get_actor_id() + actor = actor_class.__new__(actor_class) + worker.actors[actor_id] = actor + # Record the actor class via :actor_name: magic token in the log. + # + # (Phase 1): this covers code run before __init__ finishes. + # We need to handle this separately because `__repr__` may not be + # runnable until after `__init__` (e.g., if it accesses fields + # defined in the constructor). 
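
# The actor-creation block above allocates the actor with
# actor_class.__new__(actor_class), i.e. without running __init__, so the
# instance can be registered (and its class name logged) before the
# constructor body executes as the creation task. A minimal sketch of that
# allocate-now, initialize-later pattern; the Counter class is illustrative,
# not Ray code.
class Counter:
    def __init__(self, start: int):
        self.value = start

    def __repr__(self):
        # Not safe to call until __init__ has run.
        return f"Counter(value={self.value})"


def allocate_then_init():
    actor = Counter.__new__(Counter)        # phase 1: instance exists,
    print(type(actor).__name__)             # so log only the class name
    actor.__init__(start=0)                 # later: run the "creation task"
    print(repr(actor))                      # phase 2: full repr is now safe


if __name__ == "__main__":
    allocate_then_init()
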
+ actor_magic_token = "{}{}\n".format( + ray_constants.LOG_PREFIX_ACTOR_NAME, actor_class.__name__) + # Flush to both .out and .err + print(actor_magic_token, end="") + print(actor_magic_token, file=sys.stderr, end="") + + # Initial eventloops for asyncio for this actor. + if core_worker.current_actor_is_asyncio(): + core_worker.initialize_eventloops_for_actor_concurrency_group( + c_defined_concurrency_groups) + + execution_info = execution_infos.get(function_descriptor) + if not execution_info: + execution_info = manager.get_execution_info( + job_id, function_descriptor) + execution_infos[function_descriptor] = execution_info + + global current_task_id + + try: + task_id = (ray._private.worker. + global_worker.core_worker.get_current_task_id()) + # Set the current task ID, which is checked by a separate thread during + # task cancellation. We must do this inside the try block so that, if + # the task is interrupted because of cancellation, we will catch the + # interrupt error here. + with current_task_id_lock: + current_task_id = task_id + + execute_task(caller_address, + task_type, + name, + ray_function, + c_resources, + c_args, + c_arg_refs, + debugger_breakpoint, + serialized_retry_exception_allowlist, + returns, + dynamic_returns, + is_retryable_error, + is_application_error, + c_defined_concurrency_groups, + c_name_of_concurrency_group_to_execute, + is_reattempt, execution_info, title, task_name) + + # Check for cancellation. + PyErr_CheckSignals() + + except KeyboardInterrupt as e: + # Catch and handle task cancellation, which will result in an interrupt being + # raised. + e = TaskCancelledError( + core_worker.get_current_task_id()).with_traceback(e.__traceback__) + + actor = None + actor_id = core_worker.get_actor_id() + if not actor_id.is_nil(): + actor = core_worker.actors[actor_id] + + store_task_errors( + worker, e, + # Task cancellation can happen anytime so we don't really need + # to differentiate between mid-task or not. + False, # task_exception + actor, execution_info.function_name, + task_type, title, returns) + finally: + with current_task_id_lock: + current_task_id = None + if execution_info.max_calls != 0: # Reset the state of the worker for the next task to execute. # Increase the task execution counter. @@ -1030,29 +1117,26 @@ cdef CRayStatus task_execution_handler( const c_string name_of_concurrency_group_to_execute, c_bool is_reattempt) nogil: - global current_task_id with gil, disable_client_hook(): try: - task_id = (ray._private.worker. - global_worker.core_worker.get_current_task_id()) - with current_task_id_lock: - current_task_id = task_id - try: - # The call to execute_task should never raise an exception. If - # it does, that indicates that there was an internal error. - execute_task(caller_address, task_type, task_name, - ray_function, c_resources, - c_args, c_arg_refs, - debugger_breakpoint, - serialized_retry_exception_allowlist, - returns, - dynamic_returns, - is_retryable_error, - is_application_error, - defined_concurrency_groups, - name_of_concurrency_group_to_execute, - is_reattempt) + # Exceptions, including task cancellation, should be handled + # internal to this call. If it does raise an exception, that + # indicates that there was an internal error. 
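
# The new execute_task_with_cancellation_handler above (1) publishes the
# current task id under a lock so the cancellation path on another thread can
# target the right task, (2) runs the task body, and (3) converts the
# KeyboardInterrupt that a cancellation raises into a TaskCancelledError while
# keeping the original traceback via .with_traceback(). A pure-Python sketch
# of that wrapper shape; TaskCancelledError, current_task_id and
# run_with_cancellation_handler are illustrative names, not Ray's C-extension
# internals, and the sketch re-raises instead of storing the error.
import threading

current_task_id = None
current_task_id_lock = threading.Lock()


class TaskCancelledError(Exception):
    pass


def run_with_cancellation_handler(task_id, task_body):
    global current_task_id
    try:
        # Publish the id inside the try block: if cancellation interrupts us
        # right here, we still fall through to the handler below.
        with current_task_id_lock:
            current_task_id = task_id
        return task_body()
    except KeyboardInterrupt as e:
        # Cancellation is delivered as an interrupt; re-label it but keep the
        # point of interruption visible in the traceback.
        raise TaskCancelledError(task_id).with_traceback(e.__traceback__)
    finally:
        with current_task_id_lock:
            current_task_id = None
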
+ execute_task_with_cancellation_handler( + caller_address, + task_type, task_name, + ray_function, c_resources, + c_args, c_arg_refs, + debugger_breakpoint, + serialized_retry_exception_allowlist, + returns, + dynamic_returns, + is_retryable_error, + is_application_error, + defined_concurrency_groups, + name_of_concurrency_group_to_execute, + is_reattempt) except Exception as e: sys_exit = SystemExit() if isinstance(e, RayActorError) and \ @@ -1081,9 +1165,6 @@ cdef CRayStatus task_execution_handler( job_id=None) sys_exit.unexpected_error_traceback = traceback_str raise sys_exit - finally: - with current_task_id_lock: - current_task_id = None except SystemExit as e: # Tell the core worker to exit as soon as the result objects # are processed. diff --git a/python/ray/tests/test_cancel.py b/python/ray/tests/test_cancel.py index c384dc0da12e..76db6903c30a 100644 --- a/python/ray/tests/test_cancel.py +++ b/python/ray/tests/test_cancel.py @@ -63,6 +63,35 @@ def wait_for(t): ray.get(obj1) +@pytest.mark.parametrize("use_force", [True, False]) +def test_cancel_during_arg_deser(ray_start_regular, use_force): + time_to_sleep = 5 + + class SlowToDeserialize: + def __reduce__(self): + def reconstruct(): + import time + + time.sleep(time_to_sleep) + return SlowToDeserialize() + + return reconstruct, () + + @ray.remote + def dummy(a: SlowToDeserialize): + # Task should never execute. + assert False + + arg = SlowToDeserialize() + obj = dummy.remote(arg) + # Check that task isn't done. + assert len(ray.wait([obj], timeout=0.1)[0]) == 0 + # Cancel task. + ray.cancel(obj, force=use_force) + with pytest.raises(valid_exceptions(use_force)): + ray.get(obj) + + @pytest.mark.parametrize("use_force", [True, False]) def test_cancel_multiple_dependents(ray_start_regular, use_force): signaler = SignalActor.remote()
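
# The test added above makes a task argument slow to deserialize by overriding
# __reduce__ so the reconstruction callable sleeps, letting ray.cancel() land
# while the worker is still deserializing arguments. A standalone sketch of
# the same trick using plain pickle; unlike the test (which relies on Ray's
# cloudpickle), the reconstruct function here must live at module level so the
# stdlib pickler can reference it. Names are illustrative.
import pickle
import time


def _slow_reconstruct(delay: float) -> "SlowToDeserialize":
    time.sleep(delay)          # simulated cost paid at deserialization time
    return SlowToDeserialize(delay)


class SlowToDeserialize:
    def __init__(self, delay: float = 0.5):
        self.delay = delay

    def __reduce__(self):
        # Serialize as "call _slow_reconstruct(delay)" instead of the usual
        # attribute dict, so every unpickle pays the delay.
        return _slow_reconstruct, (self.delay,)


if __name__ == "__main__":
    blob = pickle.dumps(SlowToDeserialize(0.5))
    start = time.monotonic()
    pickle.loads(blob)         # the sleep happens here, not at dumps() time
    print(f"deserialization took {time.monotonic() - start:.2f}s")
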