[Core] [Arrow 7+ Support] [1/N] Handle task cancellation during argument deserialization and task error storage. (#29984)

Task execution currently doesn't handle KeyboardInterrupt (which is raised via the SIGINT signal upon task cancellation) during argument deserialization or task error storage, since we only catch KeyboardInterrupt during the actual execution of the task function. This can cause task cancellation to crash the underlying worker: the task cancellation error may never be stored, causing a RAY_CHECK in the task transport to fail. Arguments that take a particularly long time to deserialize can hit this pretty consistently.
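
As a rough illustration of the gap (a simplified sketch, not the actual `_raylet.pyx` code; `slow_deserialize`, `execute_task_before`, and the stored-error string are made-up stand-ins), the pre-PR structure only guards the user-function call, so a cancellation SIGINT that lands while arguments are still being deserialized escapes the handler and no cancellation error is stored:

```python
import time


def slow_deserialize(serialized_args):
    # Stand-in for deserializing large task arguments; a cancellation
    # SIGINT (KeyboardInterrupt) delivered during this call is NOT
    # covered by the try/except in execute_task_before below.
    time.sleep(5)
    return list(serialized_args)


def execute_task_before(serialized_args, user_function):
    # Pre-PR shape (sketch): only the user function is guarded, so an
    # interrupt raised inside slow_deserialize() propagates out of the
    # task loop and the worker sees an unhandled KeyboardInterrupt.
    args = slow_deserialize(serialized_args)
    try:
        return user_function(*args)
    except KeyboardInterrupt:
        # Cancellation is only recorded if the interrupt happens to land
        # inside this narrow window.
        return "stored TaskCancelledError"
```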

This PR adds a top-level try-except on KeyboardInterrupt that covers (nearly) the entire execute_task function body, ensuring that SIGINT is properly handled.
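
A minimal sketch of the new shape (again with made-up helper names, not the real signatures; in the diff below this corresponds to the new `execute_task_with_cancellation_handler` wrapper around `execute_task`):

```python
def execute_task_after(serialized_args, user_function,
                       deserialize=lambda s: list(s)):
    # Post-PR shape (sketch): one top-level handler covers argument
    # deserialization, user-function execution, and result handling, so
    # a cancellation SIGINT at any of these points is converted into a
    # stored TaskCancelledError instead of escaping and crashing the
    # worker.
    try:
        args = deserialize(serialized_args)   # now inside the handler
        return user_function(*args)
    except KeyboardInterrupt as exc:
        # Convert the interrupt into a cancellation error and store it so
        # the task transport still receives a result for this task.
        return ("stored TaskCancelledError", exc.__traceback__)
```

The real wrapper additionally sets and clears `current_task_id`, loads the actor for error reporting, and calls `PyErr_CheckSignals()` after `execute_task` returns to catch a cancellation raised while the function was exiting.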

This is the first PR in a set of stacked PRs making up this mono-PR for adding Arrow 7+ support in Ray: #29161
clarkzinzow authored Nov 4, 2022
1 parent e7d9c24 commit ccf6640
Showing 2 changed files with 195 additions and 85 deletions.
251 changes: 166 additions & 85 deletions python/ray/_raylet.pyx
@@ -568,7 +568,8 @@ cdef store_task_errors(
ray.util.pdb.post_mortem()

backtrace = ray._private.utils.format_error_message(
traceback.format_exc(), task_exception=task_exception)
"".join(traceback.format_exception(type(exc), exc, exc.__traceback__)),
task_exception=task_exception)

# Generate the actor repr from the actor class.
actor_repr = repr(actor) if actor else None
@@ -674,7 +675,7 @@ cdef execute_dynamic_generator_and_store_task_outputs(
"by the first execution.\n"
"See https://github.com/ray-project/ray/issues/28688.")

cdef execute_task(
cdef void execute_task(
const CAddress &caller_address,
CTaskType task_type,
const c_string name,
@@ -692,25 +693,20 @@ cdef execute_task(
# the concurrency groups of this actor.
const c_vector[CConcurrencyGroup] &c_defined_concurrency_groups,
const c_string c_name_of_concurrency_group_to_execute,
c_bool is_reattempt):

is_application_error[0] = False
is_retryable_error[0] = False

c_bool is_reattempt,
execution_info,
title,
task_name) except *:
worker = ray._private.worker.global_worker
manager = worker.function_actor_manager
actor = None
cdef:
dict execution_infos = manager.execution_infos
CoreWorker core_worker = worker.core_worker
JobID job_id = core_worker.get_current_job_id()
TaskID task_id = core_worker.get_current_task_id()
CFiberEvent task_done_event
c_vector[shared_ptr[CRayObject]] dynamic_return_ptrs

# Automatically restrict the GPUs available to this task.
ray._private.utils.set_cuda_visible_devices(ray.get_gpu_ids())

# Helper method used to exit current asyncio actor.
# This is called when a KeyboardInterrupt is received by the main thread.
# Upon receiving a KeyboardInterrupt signal, Ray will exit the current
@@ -726,43 +722,12 @@

function_descriptor = CFunctionDescriptorToPython(
ray_function.GetFunctionDescriptor())

if <int>task_type == <int>TASK_TYPE_ACTOR_CREATION_TASK:
actor_class = manager.load_actor_class(job_id, function_descriptor)
actor_id = core_worker.get_actor_id()
actor = actor_class.__new__(actor_class)
worker.actors[actor_id] = actor
# Record the actor class via :actor_name: magic token in the log.
#
# (Phase 1): this covers code run before __init__ finishes.
# We need to handle this separately because `__repr__` may not be
# runnable until after `__init__` (e.g., if it accesses fields
# defined in the constructor).
actor_magic_token = "{}{}\n".format(
ray_constants.LOG_PREFIX_ACTOR_NAME, actor_class.__name__)
# Flush to both .out and .err
print(actor_magic_token, end="")
print(actor_magic_token, file=sys.stderr, end="")

# Initial eventloops for asyncio for this actor.
if core_worker.current_actor_is_asyncio():
core_worker.initialize_eventloops_for_actor_concurrency_group(
c_defined_concurrency_groups)

execution_info = execution_infos.get(function_descriptor)
if not execution_info:
execution_info = manager.get_execution_info(
job_id, function_descriptor)
execution_infos[function_descriptor] = execution_info

function_name = execution_info.function_name
extra_data = (b'{"name": ' + function_name.encode("ascii") +
b' "task_id": ' + task_id.hex().encode("ascii") + b'}')

task_name = name.decode("utf-8")
name_of_concurrency_group_to_execute = \
c_name_of_concurrency_group_to_execute.decode("ascii")
title = f"ray::{task_name}()"

if <int>task_type == <int>TASK_TYPE_NORMAL_TASK:
next_title = "ray::IDLE"
@@ -816,10 +781,9 @@

with core_worker.profile_event(b"task::" + name, extra_data=extra_data):
try:
task_exception = False
if (not (<int>task_type == <int>TASK_TYPE_ACTOR_TASK
and function_name == "__ray_terminate__") and
ray._config.memory_monitor_interval_ms() == 0):
ray._config.memory_monitor_interval_ms() == 0):
worker.memory_monitor.raise_if_low_memory()

with core_worker.profile_event(b"task:deserialize_arguments"):
@@ -842,8 +806,9 @@
deserialize_args, function_descriptor,
name_of_concurrency_group_to_execute)
else:
args = ray._private.worker.global_worker.deserialize_objects(
metadata_pairs, object_refs)
args = (ray._private.worker.global_worker
.deserialize_objects(
metadata_pairs, object_refs))

for arg in args:
raise_if_dependency_failed(arg)
@@ -882,13 +847,11 @@
"RAY_PDB_CONTINUE_{}".format(next_breakpoint),
namespace=ray_constants.KV_NAMESPACE_PDB
)
ray._private.worker.global_worker.debugger_breakpoint = b""
(ray._private.worker.global_worker
.debugger_breakpoint) = b""
task_exception = False
except AsyncioActorExit as e:
exit_current_actor_if_asyncio()
except KeyboardInterrupt as e:
raise TaskCancelledError(
core_worker.get_current_task_id())
except Exception as e:
is_application_error[0] = True
is_retryable_error[0] = determine_if_retryable(
@@ -907,7 +870,7 @@
else:
logger.debug("Task failed with unretryable exception:"
" {}.".format(
core_worker.get_current_task_id()),
core_worker.get_current_task_id()),
exc_info=True)
raise e
if returns[0].size() == 1 and not inspect.isgenerator(outputs):
@@ -920,19 +883,15 @@
# (Phase 2): after `__init__` finishes, we override the
# log prefix with the full repr of the actor. The log monitor
# will pick up the updated token.
actor_class = manager.load_actor_class(job_id, function_descriptor)
if (hasattr(actor_class, "__ray_actor_class__") and
actor_class.__ray_actor_class__.__repr__ != object.__repr__):
(actor_class.__ray_actor_class__.__repr__
!= object.__repr__)):
actor_magic_token = "{}{}\n".format(
ray_constants.LOG_PREFIX_ACTOR_NAME, repr(actor))
# Flush on both stdout and stderr.
print(actor_magic_token, end="")
print(actor_magic_token, file=sys.stderr, end="")
# Check for a cancellation that was called when the function
# was exiting and was raised after the except block.
if not check_signals().ok():
task_exception = True
raise TaskCancelledError(
core_worker.get_current_task_id())

if (returns[0].size() > 0 and
not inspect.isgenerator(outputs) and
@@ -947,8 +906,9 @@
if dynamic_returns != NULL:
if not inspect.isgenerator(outputs):
raise ValueError(
"Functions with @ray.remote(num_returns=\"dynamic\" "
"must return a generator")
"Functions with "
"@ray.remote(num_returns=\"dynamic\" must return a "
"generator")
task_exception = True

execute_dynamic_generator_and_store_task_outputs(
@@ -981,9 +941,9 @@
core_worker.store_task_outputs(
worker, outputs,
returns)
except Exception as error:
except Exception as e:
num_errors_stored = store_task_errors(
worker, error, task_exception, actor, function_name,
worker, e, task_exception, actor, function_name,
task_type, title, returns)
if returns[0].size() > 0 and num_errors_stored == 0:
logger.exception(
@@ -992,6 +952,133 @@
"This should only occur when using generator tasks.\n"
"See https://github.com/ray-project/ray/issues/28689.")


cdef execute_task_with_cancellation_handler(
const CAddress &caller_address,
CTaskType task_type,
const c_string name,
const CRayFunction &ray_function,
const unordered_map[c_string, double] &c_resources,
const c_vector[shared_ptr[CRayObject]] &c_args,
const c_vector[CObjectReference] &c_arg_refs,
const c_string debugger_breakpoint,
const c_string serialized_retry_exception_allowlist,
c_vector[c_pair[CObjectID, shared_ptr[CRayObject]]] *returns,
c_vector[c_pair[CObjectID, shared_ptr[CRayObject]]] *dynamic_returns,
c_bool *is_retryable_error,
c_bool *is_application_error,
# This parameter is only used for actor creation task to define
# the concurrency groups of this actor.
const c_vector[CConcurrencyGroup] &c_defined_concurrency_groups,
const c_string c_name_of_concurrency_group_to_execute,
c_bool is_reattempt):

is_application_error[0] = False
is_retryable_error[0] = False

worker = ray._private.worker.global_worker
manager = worker.function_actor_manager
cdef:
dict execution_infos = manager.execution_infos
CoreWorker core_worker = worker.core_worker
JobID job_id = core_worker.get_current_job_id()
TaskID task_id = core_worker.get_current_task_id()
CFiberEvent task_done_event
c_vector[shared_ptr[CRayObject]] dynamic_return_ptrs

task_name = name.decode("utf-8")
title = f"ray::{task_name}()"

# Automatically restrict the GPUs available to this task.
ray._private.utils.set_cuda_visible_devices(ray.get_gpu_ids())

# Initialize the actor if this is an actor creation task. We do this here
# before setting the current task ID so that we can get the execution info,
# in case executing the main task throws an exception.
function_descriptor = CFunctionDescriptorToPython(
ray_function.GetFunctionDescriptor())
if <int>task_type == <int>TASK_TYPE_ACTOR_CREATION_TASK:
actor_class = manager.load_actor_class(job_id, function_descriptor)
actor_id = core_worker.get_actor_id()
actor = actor_class.__new__(actor_class)
worker.actors[actor_id] = actor
# Record the actor class via :actor_name: magic token in the log.
#
# (Phase 1): this covers code run before __init__ finishes.
# We need to handle this separately because `__repr__` may not be
# runnable until after `__init__` (e.g., if it accesses fields
# defined in the constructor).
actor_magic_token = "{}{}\n".format(
ray_constants.LOG_PREFIX_ACTOR_NAME, actor_class.__name__)
# Flush to both .out and .err
print(actor_magic_token, end="")
print(actor_magic_token, file=sys.stderr, end="")

# Initial eventloops for asyncio for this actor.
if core_worker.current_actor_is_asyncio():
core_worker.initialize_eventloops_for_actor_concurrency_group(
c_defined_concurrency_groups)

execution_info = execution_infos.get(function_descriptor)
if not execution_info:
execution_info = manager.get_execution_info(
job_id, function_descriptor)
execution_infos[function_descriptor] = execution_info

global current_task_id

try:
task_id = (ray._private.worker.
global_worker.core_worker.get_current_task_id())
# Set the current task ID, which is checked by a separate thread during
# task cancellation. We must do this inside the try block so that, if
# the task is interrupted because of cancellation, we will catch the
# interrupt error here.
with current_task_id_lock:
current_task_id = task_id

execute_task(caller_address,
task_type,
name,
ray_function,
c_resources,
c_args,
c_arg_refs,
debugger_breakpoint,
serialized_retry_exception_allowlist,
returns,
dynamic_returns,
is_retryable_error,
is_application_error,
c_defined_concurrency_groups,
c_name_of_concurrency_group_to_execute,
is_reattempt, execution_info, title, task_name)

# Check for cancellation.
PyErr_CheckSignals()

except KeyboardInterrupt as e:
# Catch and handle task cancellation, which will result in an interrupt being
# raised.
e = TaskCancelledError(
core_worker.get_current_task_id()).with_traceback(e.__traceback__)

actor = None
actor_id = core_worker.get_actor_id()
if not actor_id.is_nil():
actor = core_worker.actors[actor_id]

store_task_errors(
worker, e,
# Task cancellation can happen anytime so we don't really need
# to differentiate between mid-task or not.
False, # task_exception
actor, execution_info.function_name,
task_type, title, returns)
finally:
with current_task_id_lock:
current_task_id = None

if execution_info.max_calls != 0:
# Reset the state of the worker for the next task to execute.
# Increase the task execution counter.
@@ -1030,29 +1117,26 @@ cdef CRayStatus task_execution_handler(
const c_string name_of_concurrency_group_to_execute,
c_bool is_reattempt) nogil:

global current_task_id
with gil, disable_client_hook():
try:
task_id = (ray._private.worker.
global_worker.core_worker.get_current_task_id())
with current_task_id_lock:
current_task_id = task_id

try:
# The call to execute_task should never raise an exception. If
# it does, that indicates that there was an internal error.
execute_task(caller_address, task_type, task_name,
ray_function, c_resources,
c_args, c_arg_refs,
debugger_breakpoint,
serialized_retry_exception_allowlist,
returns,
dynamic_returns,
is_retryable_error,
is_application_error,
defined_concurrency_groups,
name_of_concurrency_group_to_execute,
is_reattempt)
# Exceptions, including task cancellation, should be handled
# internal to this call. If it does raise an exception, that
# indicates that there was an internal error.
execute_task_with_cancellation_handler(
caller_address,
task_type, task_name,
ray_function, c_resources,
c_args, c_arg_refs,
debugger_breakpoint,
serialized_retry_exception_allowlist,
returns,
dynamic_returns,
is_retryable_error,
is_application_error,
defined_concurrency_groups,
name_of_concurrency_group_to_execute,
is_reattempt)
except Exception as e:
sys_exit = SystemExit()
if isinstance(e, RayActorError) and \
@@ -1081,9 +1165,6 @@
job_id=None)
sys_exit.unexpected_error_traceback = traceback_str
raise sys_exit
finally:
with current_task_id_lock:
current_task_id = None
except SystemExit as e:
# Tell the core worker to exit as soon as the result objects
# are processed.