diff --git a/cpp/src/ray/runtime/task/local_mode_task_submitter.cc b/cpp/src/ray/runtime/task/local_mode_task_submitter.cc index c62765d53b10..7b0c55d401b6 100644 --- a/cpp/src/ray/runtime/task/local_mode_task_submitter.cc +++ b/cpp/src/ray/runtime/task/local_mode_task_submitter.cc @@ -59,6 +59,7 @@ ObjectID LocalModeTaskSubmitter::Submit(InvocationSpec &invocation, local_mode_ray_tuntime_.GetCurrentTaskId(), address, 1, + /*returns_dynamic=*/false, required_resources, required_placement_resources, "", diff --git a/cpp/src/ray/runtime/task/task_executor.cc b/cpp/src/ray/runtime/task/task_executor.cc index 426920b0cdbe..747cc83165e5 100644 --- a/cpp/src/ray/runtime/task/task_executor.cc +++ b/cpp/src/ray/runtime/task/task_executor.cc @@ -119,16 +119,17 @@ std::pair> GetExecuteResult( } Status TaskExecutor::ExecuteTask( + const rpc::Address &caller_address, ray::TaskType task_type, const std::string task_name, const RayFunction &ray_function, const std::unordered_map &required_resources, const std::vector> &args_buffer, const std::vector &arg_refs, - const std::vector &return_ids, const std::string &debugger_breakpoint, const std::string &serialized_retry_exception_allowlist, - std::vector> *results, + std::vector>> *returns, + std::vector>> *dynamic_returns, std::shared_ptr &creation_task_exception_pb_bytes, bool *is_retryable_error, const std::vector &defined_concurrency_groups, @@ -209,11 +210,10 @@ Status TaskExecutor::ExecuteTask( data = std::make_shared(std::move(buf)); } - results->resize(return_ids.size(), nullptr); if (task_type != ray::TaskType::ACTOR_CREATION_TASK) { size_t data_size = data->size(); - auto &result_id = return_ids[0]; - auto result_ptr = &(*results)[0]; + auto &result_id = (*returns)[0].first; + auto result_ptr = &(*returns)[0].second; int64_t task_output_inlined_bytes = 0; if (cross_lang && meta_buffer == nullptr) { @@ -250,7 +250,10 @@ Status TaskExecutor::ExecuteTask( } } - RAY_CHECK_OK(CoreWorkerProcess::GetCoreWorker().SealReturnObject(result_id, result)); + RAY_CHECK_OK(CoreWorkerProcess::GetCoreWorker().SealReturnObject( + result_id, + result, + /*generator_id=*/ObjectID::Nil())); } else { if (!status.ok()) { return ray::Status::CreationTaskError(""); diff --git a/cpp/src/ray/runtime/task/task_executor.h b/cpp/src/ray/runtime/task/task_executor.h index 2e48351e86dc..12169026e21d 100644 --- a/cpp/src/ray/runtime/task/task_executor.h +++ b/cpp/src/ray/runtime/task/task_executor.h @@ -75,16 +75,17 @@ class TaskExecutor { absl::Mutex &actor_contexts_mutex); static Status ExecuteTask( + const rpc::Address &caller_address, ray::TaskType task_type, const std::string task_name, const RayFunction &ray_function, const std::unordered_map &required_resources, const std::vector> &args, const std::vector &arg_refs, - const std::vector &return_ids, const std::string &debugger_breakpoint, const std::string &serialized_retry_exception_allowlist, - std::vector> *results, + std::vector>> *returns, + std::vector>> *dynamic_returns, std::shared_ptr &creation_task_exception_pb_bytes, bool *is_retryable_error, const std::vector &defined_concurrency_groups, diff --git a/doc/source/ray-core/doc_code/generator.py b/doc/source/ray-core/doc_code/generator.py new file mode 100644 index 000000000000..1daf9598494a --- /dev/null +++ b/doc/source/ray-core/doc_code/generator.py @@ -0,0 +1,71 @@ +# __program_start__ +import ray +from ray import ObjectRefGenerator + +# fmt: off +# __dynamic_generator_start__ +import numpy as np + + +@ray.remote(num_returns="dynamic") +def split(array, 
chunk_size): + while len(array) > 0: + yield array[:chunk_size] + array = array[chunk_size:] + + +array_ref = ray.put(np.zeros(np.random.randint(1000_000))) +block_size = 1000 + +# Returns an ObjectRef[ObjectRefGenerator]. +dynamic_ref = split.remote(array_ref, block_size) +print(dynamic_ref) +# ObjectRef(c8ef45ccd0112571ffffffffffffffffffffffff0100000001000000) + +i = -1 +ref_generator = ray.get(dynamic_ref) +print(ref_generator) +# +for i, ref in enumerate(ref_generator): + # Each ObjectRefGenerator iteration returns an ObjectRef. + assert len(ray.get(ref)) <= block_size +num_blocks_generated = i + 1 +array_size = len(ray.get(array_ref)) +assert array_size <= num_blocks_generated * block_size +print(f"Split array of size {array_size} into {num_blocks_generated} blocks of " + f"size {block_size} each.") +# Split array of size 63153 into 64 blocks of size 1000 each. + +# NOTE: The dynamic_ref points to the generated ObjectRefs. Make sure that this +# ObjectRef goes out of scope so that Ray can garbage-collect the internal +# ObjectRefs. +del dynamic_ref +# __dynamic_generator_end__ +# fmt: on + + +# fmt: off +# __dynamic_generator_pass_start__ +@ray.remote +def get_size(ref_generator : ObjectRefGenerator): + print(ref_generator) + num_elements = 0 + for ref in ref_generator: + array = ray.get(ref) + assert len(array) <= block_size + num_elements += len(array) + return num_elements + + +# Returns an ObjectRef[ObjectRefGenerator]. +dynamic_ref = split.remote(array_ref, block_size) +assert array_size == ray.get(get_size.remote(dynamic_ref)) +# (get_size pid=1504184) + +# This also works, but should be avoided because you have to call an additional +# `ray.get`, which blocks the driver. +ref_generator = ray.get(dynamic_ref) +assert array_size == ray.get(get_size.remote(ref_generator)) +# (get_size pid=1504184) +# __dynamic_generator_pass_end__ +# fmt: on diff --git a/doc/source/ray-core/patterns/generators.rst b/doc/source/ray-core/patterns/generators.rst index c19c21d49448..6e6f80d1d766 100644 --- a/doc/source/ray-core/patterns/generators.rst +++ b/doc/source/ray-core/patterns/generators.rst @@ -1,4 +1,4 @@ -.. _generators: +.. _generator-pattern: Pattern: Using generators to reduce heap memory usage ===================================================== diff --git a/doc/source/ray-core/tasks.rst b/doc/source/ray-core/tasks.rst index 3861f28ced73..aa1f87863eb2 100644 --- a/doc/source/ray-core/tasks.rst +++ b/doc/source/ray-core/tasks.rst @@ -164,7 +164,8 @@ By default, a Ray task only returns a single Object Ref. However, you can config .. literalinclude:: doc_code/tasks_multiple_returns.py -For tasks that return multiple objects, Ray also supports remote generators that allow a task to return one object at a time to reduce memory usage at the worker. See the :ref:`user guide ` for more details on use cases. + +For tasks that return multiple objects, Ray also supports remote generators that allow a task to return one object at a time to reduce memory usage at the worker. Ray also supports an option to set the number of return values dynamically, which can be useful when the task caller does not know how many return values to expect. See the :ref:`user guide ` for more details on use cases. .. 
tabbed:: Python @@ -213,6 +214,7 @@ More about Ray Tasks tasks/resources.rst tasks/using-ray-with-gpus.rst tasks/nested-tasks.rst + tasks/generators.rst tasks/fault-tolerance.rst tasks/scheduling.rst tasks/patterns/index.rst diff --git a/doc/source/ray-core/tasks/generators.rst b/doc/source/ray-core/tasks/generators.rst new file mode 100644 index 000000000000..a8d34fc9ba50 --- /dev/null +++ b/doc/source/ray-core/tasks/generators.rst @@ -0,0 +1,86 @@ +.. _generators: + +Generators +========== + +Python generators are functions that behave like an iterator, yielding one +value per iteration. Ray supports remote generators for two use cases: + +1. To reduce max heap memory usage when returning multiple values from a remote + function. See the :ref:`design pattern guide ` for an + example. +2. When the number of return values is set dynamically by the remote function + instead of by the caller. + +Remote generators can be used in both actor and non-actor tasks. + +`num_returns` set by the task caller +------------------------------------ + +Where possible, the caller should set the remote function's number of return values using ``@ray.remote(num_returns=x)`` or ``foo.options(num_returns=x).remote()``. +Ray will return this many ``ObjectRefs`` to the caller. +The remote task should then return the same number of values, usually as a tuple or list. +Compared to setting the number of return values dynamically, this adds less complexity to user code and less performance overhead, as Ray will know exactly how many ``ObjectRefs`` to return to the caller ahead of time. + +Without changing the caller's syntax, we can also use a remote generator function to return the values iteratively. +The generator should return the same number of return values specified by the caller, and these will be stored one at a time in Ray's object store. +An error will be returned for generators that return a different number of values from the one specified by the caller. + +For example, we can swap the following code that returns a list of return values: + +.. literalinclude:: ../doc_code/pattern_generators.py + :language: python + :start-after: __large_values_start__ + :end-before: __large_values_end__ + +for this code, which uses a generator function: + +.. literalinclude:: ../doc_code/pattern_generators.py + :language: python + :start-after: __large_values_generator_start__ + :end-before: __large_values_generator_end__ + +The advantage of doing so is that the generator function does not need to hold all of its return values in memory at once. +It can return the arrays one at a time to reduce memory pressure. + +`num_returns` set by the task executor +-------------------------------------- + +In some cases, the caller may not know the number of return values to expect from a remote function. +For example, suppose we want to write a task that breaks up its argument into equal-size chunks and returns these. +We may not know the size of the argument until we execute the task, so we don't know the number of return values to expect. + +In these cases, we can use a remote generator function that returns a *dynamic* number of values. +To use this feature, set ``num_returns="dynamic"`` in the ``@ray.remote`` decorator or the remote function's ``.options()``. +Then, when invoking the remote function, Ray will return a *single* ``ObjectRef`` that will get populated with an ``ObjectRefGenerator`` when the task completes. 
+The ``ObjectRefGenerator`` can be used to iterate over a list of ``ObjectRefs`` containing the actual values returned by the task. + +.. note:: ``num_returns="dynamic"`` is currently an experimental API in v2.1+. + +.. literalinclude:: ../doc_code/generator.py + :language: python + :start-after: __dynamic_generator_start__ + :end-before: __dynamic_generator_end__ + +We can also pass the ``ObjectRef`` returned by a task with ``num_returns="dynamic"`` to another task. The task will receive the ``ObjectRefGenerator``, which it can use to iterate over the task's return values. Similarly, you can also pass an ``ObjectRefGenerator`` as a task argument. + +.. literalinclude:: ../doc_code/generator.py + :language: python + :start-after: __dynamic_generator_pass_start__ + :end-before: __dynamic_generator_pass_end__ + +Limitations +----------- + +Although a generator function creates ``ObjectRefs`` one at a time, currently Ray will not schedule dependent tasks until the entire task is complete and all values have been created. This is similar to the semantics used by tasks that return multiple values as a list. + +``num_returns="dynamic"`` is not yet supported for actor tasks. + +If a generator function raises an exception before yielding all its values, all values returned by the generator will be replaced by the exception traceback, including values that were already successfully yielded. +If the task was called with ``num_returns="dynamic"``, the exception will be stored in the ``ObjectRef`` returned by the task instead of the usual ``ObjectRefGenerator``. + +Note that there is currently a known bug where exceptions will not be propagated for generators that yield objects in Ray's shared-memory object store before erroring. In this case, these objects will still be accessible through the returned ``ObjectRefs`` and you may see an error like the following: + +.. code-block:: text + + $ ERROR worker.py:754 -- Generator threw exception after returning partial values in the object store, error may be unhandled. 
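
The two ``pattern_generators.py`` snippets referenced in the "``num_returns`` set by the task caller" section above are pulled in via ``literalinclude`` and are not reproduced in this diff. As a rough, self-contained sketch of the same idea (the function names below are illustrative and are not the ones used in ``pattern_generators.py``):

.. code-block:: python

    import numpy as np
    import ray

    # Returning all values at once keeps every array alive on the worker heap
    # until the task finishes.
    @ray.remote(num_returns=3)
    def return_as_list():
        return [np.zeros(1_000_000, dtype=np.uint8) for _ in range(3)]

    # A generator with the same caller-declared num_returns yields one value
    # at a time, so the worker can release each array after it is stored.
    @ray.remote(num_returns=3)
    def return_as_generator():
        for _ in range(3):
            yield np.zeros(1_000_000, dtype=np.uint8)

    refs = return_as_generator.remote()  # exactly three ObjectRefs, as declared
    assert len(ray.get(refs[0])) == 1_000_000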
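
The Limitations section above notes that when a generator raises before yielding all of its values, the error replaces the return values, and with ``num_returns="dynamic"`` the error is stored in the single returned ``ObjectRef``. A minimal sketch of what the caller sees (``failing_generator`` is a hypothetical task used only for illustration; it yields small in-memory values, so it does not hit the known plasma-related bug described above):

.. code-block:: python

    import ray

    @ray.remote(num_returns="dynamic")
    def failing_generator(n):
        for i in range(n):
            if i == n - 1:
                # Fail before yielding the last value.
                raise RuntimeError("failed partway through")
            yield i

    outer_ref = failing_generator.remote(3)
    try:
        # The exception is stored in the returned ObjectRef itself, so ray.get
        # raises instead of producing an ObjectRefGenerator.
        ray.get(outer_ref)
    except ray.exceptions.RayTaskError as e:
        print("generator task failed:", e)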
diff --git a/python/ray/__init__.py b/python/ray/__init__.py index 78d8867309f5..387d7b9db1d0 100644 --- a/python/ray/__init__.py +++ b/python/ray/__init__.py @@ -125,6 +125,7 @@ def _configure_system(): FunctionID, ObjectID, ObjectRef, + ObjectRefGenerator, TaskID, UniqueID, Language, @@ -248,6 +249,7 @@ def __getattr__(self, attr): "FunctionID", "ObjectID", "ObjectRef", + "ObjectRefGenerator", "TaskID", "UniqueID", "PlacementGroupID", diff --git a/python/ray/_private/ray_option_utils.py b/python/ray/_private/ray_option_utils.py index b24e35a0d9e3..e51301cf2fd6 100644 --- a/python/ray/_private/ray_option_utils.py +++ b/python/ray/_private/ray_option_utils.py @@ -120,7 +120,13 @@ def issubclass_safe(obj: Any, cls_: type) -> bool: ), # override "_common_options" "num_cpus": _resource_option("num_cpus", default_value=1), - "num_returns": _counting_option("num_returns", False, default_value=1), + "num_returns": Option( + (int, str, type(None)), + lambda x: x is None or x == "dynamic" or x >= 0, + "The keyword 'num_returns' only accepts None, a non-negative integer, or " + '"dynamic" (for generators)', + default_value=1, + ), "object_store_memory": Option( # override "_common_options" (int, type(None)), lambda x: x is None, diff --git a/python/ray/_private/serialization.py b/python/ray/_private/serialization.py index a1f1a83d18a1..995b56ab181b 100644 --- a/python/ray/_private/serialization.py +++ b/python/ray/_private/serialization.py @@ -9,6 +9,7 @@ from ray._raylet import ( MessagePackSerializedObject, MessagePackSerializer, + ObjectRefGenerator, Pickle5SerializedObject, Pickle5Writer, RawSerializedObject, @@ -121,6 +122,14 @@ def object_ref_reducer(obj): ) self._register_cloudpickle_reducer(ray.ObjectRef, object_ref_reducer) + + def object_ref_generator_reducer(obj): + return ObjectRefGenerator, (obj._refs,) + + self._register_cloudpickle_reducer( + ObjectRefGenerator, object_ref_generator_reducer + ) + serialization_addons.apply(self) def _register_cloudpickle_reducer(self, cls, reducer): diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index ff3bf9836399..bda68fae4f75 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -2266,8 +2266,7 @@ def get( if not isinstance(object_refs, list): raise ValueError( - "'object_refs' must either be an object ref " - "or a list of object refs." + "'object_refs' must either be an ObjectRef or a list of ObjectRefs." ) # TODO(ujvl): Consider how to allow user to retrieve the ready objects. @@ -2888,8 +2887,11 @@ def remote( Args: num_returns: This is only for *remote functions*. It specifies - the number of object refs returned by - the remote function invocation. + the number of object refs returned by the remote function + invocation. Pass "dynamic" to allow the task to decide how many + return values to return during execution, and the caller will + receive an ObjectRef[ObjectRefGenerator] (note, this setting is + experimental). num_cpus: The quantity of CPU cores to reserve for this task or for the lifetime of the actor. 
num_gpus: The quantity of GPUs to reserve diff --git a/python/ray/_raylet.pxd b/python/ray/_raylet.pxd index ae7ee588d19e..f6f34a24b55f 100644 --- a/python/ray/_raylet.pxd +++ b/python/ray/_raylet.pxd @@ -11,10 +11,19 @@ from libc.stdint cimport ( from libcpp cimport bool as c_bool from libcpp.string cimport string as c_string from libcpp.vector cimport vector as c_vector +from libcpp.unordered_map cimport unordered_map from libcpp.memory cimport ( shared_ptr, unique_ptr ) +from libcpp.pair cimport pair as c_pair +from libcpp.utility cimport pair +from ray.includes.optional cimport ( + optional, + nullopt, + make_optional, +) + from ray.includes.common cimport ( CBuffer, CRayObject, @@ -123,13 +132,17 @@ cdef class CoreWorker: c_bool inline_small_object=*) cdef unique_ptr[CAddress] _convert_python_address(self, address=*) cdef store_task_output( - self, serialized_object, const CObjectID &return_id, size_t - data_size, shared_ptr[CBuffer] &metadata, const c_vector[CObjectID] + self, serialized_object, + const CObjectID &return_id, + const CObjectID &generator_id, + size_t data_size, shared_ptr[CBuffer] &metadata, const c_vector[CObjectID] &contained_id, int64_t *task_output_inlined_bytes, shared_ptr[CRayObject] *return_ptr) cdef store_task_outputs( - self, worker, outputs, const c_vector[CObjectID] return_ids, - c_vector[shared_ptr[CRayObject]] *returns) + self, + worker, outputs, + c_vector[c_pair[CObjectID, shared_ptr[CRayObject]]] *returns, + CObjectID ref_generator_id=*) cdef yield_current_fiber(self, CFiberEvent &fiber_event) cdef make_actor_handle(self, ActorHandleSharedPtr c_actor_handle) cdef c_function_descriptors_to_python( diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 6e5f2adf5f6e..0f1360e1066e 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -159,6 +159,22 @@ OPTIMIZED = __OPTIMIZE__ logger = logging.getLogger(__name__) + +class ObjectRefGenerator: + def __init__(self, refs): + # TODO(swang): As an optimization, can also store the generator + # ObjectID so that we don't need to keep individual ref counts for the + # inner ObjectRefs. + self._refs = refs + + def __iter__(self): + while self._refs: + yield self._refs.pop(0) + + def __len__(self): + return len(self._refs) + + cdef int check_status(const CRayStatus& status) nogil except -1: if status.ok(): return 0 @@ -531,16 +547,17 @@ cdef c_bool determine_if_retryable( cdef execute_task( + const CAddress &caller_address, CTaskType task_type, const c_string name, const CRayFunction &ray_function, const unordered_map[c_string, double] &c_resources, const c_vector[shared_ptr[CRayObject]] &c_args, const c_vector[CObjectReference] &c_arg_refs, - const c_vector[CObjectID] &c_return_ids, const c_string debugger_breakpoint, const c_string serialized_retry_exception_allowlist, - c_vector[shared_ptr[CRayObject]] *returns, + c_vector[c_pair[CObjectID, shared_ptr[CRayObject]]] *returns, + c_vector[c_pair[CObjectID, shared_ptr[CRayObject]]] *dynamic_returns, c_bool *is_retryable_error, # This parameter is only used for actor creation task to define # the concurrency groups of this actor. @@ -558,6 +575,7 @@ cdef execute_task( JobID job_id = core_worker.get_current_job_id() TaskID task_id = core_worker.get_current_task_id() CFiberEvent task_done_event + c_vector[shared_ptr[CRayObject]] dynamic_return_ptrs # Automatically restrict the GPUs available to this task. 
ray._private.utils.set_cuda_visible_devices(ray.get_gpu_ids()) @@ -759,7 +777,7 @@ cdef execute_task( core_worker.get_current_task_id()), exc_info=True) raise e - if c_return_ids.size() == 1: + if returns[0].size() == 1 and not inspect.isgenerator(outputs): # If there is only one return specified, we should return # all return values as a single object. outputs = (outputs,) @@ -784,17 +802,37 @@ cdef execute_task( raise TaskCancelledError( core_worker.get_current_task_id()) - if (c_return_ids.size() > 0 and + if (returns[0].size() > 0 and not inspect.isgenerator(outputs) and - len(outputs) != int(c_return_ids.size())): + len(outputs) != int(returns[0].size())): raise ValueError( "Task returned {} objects, but num_returns={}.".format( - len(outputs), c_return_ids.size())) + len(outputs), returns[0].size())) # Store the outputs in the object store. with core_worker.profile_event(b"task:store_outputs"): + num_returns = returns[0].size() + if dynamic_returns != NULL: + if not inspect.isgenerator(outputs): + raise ValueError( + "Functions with @ray.remote(num_returns=\"dynamic\" " + "must return a generator") + core_worker.store_task_outputs( + worker, outputs, + dynamic_returns, + returns[0][0].first) + dynamic_refs = [] + for idx in range(dynamic_returns.size()): + dynamic_refs.append(ObjectRef( + dynamic_returns[0][idx].first.Binary(), + caller_address.SerializeAsString(), + )) + # Swap out the generator for an ObjectRef generator. + outputs = (ObjectRefGenerator(dynamic_refs), ) + core_worker.store_task_outputs( - worker, outputs, c_return_ids, returns) + worker, outputs, + returns) except Exception as error: # If the debugger is enabled, drop into the remote pdb here. if "RAY_PDB" in os.environ: @@ -816,10 +854,20 @@ cdef execute_task( error, proctitle=title, actor_repr=actor_repr) errors = [] - for _ in range(c_return_ids.size()): + for _ in range(returns[0].size()): errors.append(failure_object) core_worker.store_task_outputs( - worker, errors, c_return_ids, returns) + worker, errors, + returns) + if dynamic_returns != NULL: + # Store errors for any dynamically generated objects too. + dynamic_errors = [] + for _ in range(dynamic_returns[0].size()): + dynamic_errors.append(failure_object) + core_worker.store_task_outputs( + worker, dynamic_errors, + dynamic_returns) + ray._private.utils.push_error_to_driver( worker, ray_constants.TASK_PUSH_ERROR, @@ -848,16 +896,17 @@ cdef shared_ptr[LocalMemoryBuffer] ray_error_to_memory_buf(ray_error): py_bytes, len(py_bytes), True) cdef CRayStatus task_execution_handler( + const CAddress &caller_address, CTaskType task_type, const c_string task_name, const CRayFunction &ray_function, const unordered_map[c_string, double] &c_resources, const c_vector[shared_ptr[CRayObject]] &c_args, const c_vector[CObjectReference] &c_arg_refs, - const c_vector[CObjectID] &c_return_ids, const c_string debugger_breakpoint, const c_string serialized_retry_exception_allowlist, - c_vector[shared_ptr[CRayObject]] *returns, + c_vector[c_pair[CObjectID, shared_ptr[CRayObject]]] *returns, + c_vector[c_pair[CObjectID, shared_ptr[CRayObject]]] *dynamic_returns, shared_ptr[LocalMemoryBuffer] &creation_task_exception_pb_bytes, c_bool *is_retryable_error, const c_vector[CConcurrencyGroup] &defined_concurrency_groups, @@ -867,11 +916,13 @@ cdef CRayStatus task_execution_handler( try: # The call to execute_task should never raise an exception. If # it does, that indicates that there was an internal error. 
- execute_task(task_type, task_name, ray_function, c_resources, - c_args, c_arg_refs, c_return_ids, + execute_task(caller_address, task_type, task_name, + ray_function, c_resources, + c_args, c_arg_refs, debugger_breakpoint, serialized_retry_exception_allowlist, returns, + dynamic_returns, is_retryable_error, defined_concurrency_groups, name_of_concurrency_group_to_execute) @@ -1408,6 +1459,7 @@ cdef class CoreWorker: check_status( CCoreWorkerProcess.GetCoreWorker().SealExisting( c_object_id, pin_object=False, + generator_id=CObjectID.Nil(), owner_address=c_owner_address)) def put_serialized_object_and_increment_local_ref(self, serialized_object, @@ -1466,6 +1518,7 @@ cdef class CoreWorker: check_status( CCoreWorkerProcess.GetCoreWorker().SealExisting( c_object_id, pin_object=False, + generator_id=CObjectID.Nil(), owner_address=move(c_owner_address))) return c_object_id.Binary() @@ -2067,6 +2120,7 @@ cdef class CoreWorker: serialized_object_status)) cdef store_task_output(self, serialized_object, const CObjectID &return_id, + const CObjectID &generator_id, size_t data_size, shared_ptr[CBuffer] &metadata, const c_vector[CObjectID] &contained_id, int64_t *task_output_inlined_bytes, @@ -2093,38 +2147,85 @@ cdef class CoreWorker: with nogil: check_status( CCoreWorkerProcess.GetCoreWorker().SealReturnObject( - return_id, return_ptr[0])) + return_id, return_ptr[0], generator_id)) return True else: with nogil: success = (CCoreWorkerProcess.GetCoreWorker() - .PinExistingReturnObject(return_id, return_ptr)) + .PinExistingReturnObject( + return_id, return_ptr, generator_id)) return success - cdef store_task_outputs( - self, worker, outputs, const c_vector[CObjectID] return_ids, - c_vector[shared_ptr[CRayObject]] *returns): + cdef store_task_outputs(self, + worker, outputs, + c_vector[c_pair[CObjectID, shared_ptr[CRayObject]]] + *returns, + CObjectID ref_generator_id=CObjectID.Nil()): cdef: CObjectID return_id size_t data_size shared_ptr[CBuffer] metadata c_vector[CObjectID] contained_id int64_t task_output_inlined_bytes + int64_t num_returns = -1 + shared_ptr[CRayObject] *return_ptr + if not ref_generator_id.IsNil(): + # The task specified a dynamic number of return values. Determine + # the expected number of return values. + if returns[0].size() > 0: + # We are re-executing the task. We should return the same + # number of objects as before. + num_returns = returns[0].size() + else: + # This is the first execution of the task, so we don't know how + # many return objects it should have yet. + # NOTE(swang): returns could also be empty if the task returned + # an empty generator and was re-executed. However, this should + # not happen because we never reconstruct empty + # ObjectRefGenerators (since these are not stored in plasma). + num_returns = -1 + else: + # The task specified how many return values it should have. 
+ num_returns = returns[0].size() - if return_ids.size() == 0: + if num_returns == 0: return - n_returns = return_ids.size() - returns.resize(n_returns) task_output_inlined_bytes = 0 + i = -1 for i, output in enumerate(outputs): - if i >= n_returns: + if num_returns >= 0 and i >= num_returns: raise ValueError( "Task returned more than num_returns={} objects.".format( - n_returns)) + num_returns)) + while i >= returns[0].size(): + return_id = (CCoreWorkerProcess.GetCoreWorker() + .AllocateDynamicReturnId()) + returns[0].push_back( + c_pair[CObjectID, shared_ptr[CRayObject]]( + return_id, shared_ptr[CRayObject]())) + assert i < returns[0].size() + return_id = returns[0][i].first + if returns[0][i].second == nullptr: + returns[0][i].second = shared_ptr[CRayObject]() + return_ptr = &returns[0][i].second + + # Skip return values that we already created and that were stored + # in plasma. This can occur if there were multiple return values, + # and we errored while trying to create one of them. + if (return_ptr.get() != NULL and return_ptr.get().GetData().get() + != NULL and + return_ptr.get().GetData().get().IsPlasmaBuffer()): + # TODO(swang): This return object already has a value stored in Plasma + # because we created it before the error triggered. We should + # try to delete the current value and store the same error + # instead here. + logger.error("Generator threw exception after returning partial " + "values in the object store, error may be unhandled.") + continue - return_id = return_ids[i] context = worker.get_serialization_context() + serialized_object = context.serialize(output) data_size = serialized_object.total_bytes metadata_str = serialized_object.metadata @@ -2142,21 +2243,24 @@ cdef class CoreWorker: if not self.store_task_output( serialized_object, return_id, + ref_generator_id, data_size, metadata, contained_id, - &task_output_inlined_bytes, &returns[0][i]): + &task_output_inlined_bytes, return_ptr): # If the object already exists, but we fail to pin the copy, it # means the existing copy might've gotten evicted. Try to # create another copy. self.store_task_output( - serialized_object, return_id, data_size, metadata, + serialized_object, return_id, + ref_generator_id, + data_size, metadata, contained_id, &task_output_inlined_bytes, - &returns[0][i]) + return_ptr) i += 1 - if i < n_returns: + if i < num_returns: raise ValueError( "Task returned {} objects, but num_returns={}.".format( - i, n_returns)) + i, num_returns)) cdef c_function_descriptors_to_python( self, diff --git a/python/ray/actor.py b/python/ray/actor.py index 3afd7ba3a5ae..32fb7ed6ffb9 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -1161,6 +1161,12 @@ def _actor_method_call( not self._ray_is_cross_language ), "Cross language remote actor method cannot be executed locally." + if num_returns == "dynamic": + # TODO(swang): Support dynamic generators for actors. + raise NotImplementedError( + 'num_returns="dynamic" not yet supported for actor tasks.' 
+ ) + object_refs = worker.core_worker.submit_actor_task( self._ray_actor_language, self._ray_actor_id, diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index b153e7c4b814..3f16da71ebce 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -218,6 +218,7 @@ cdef extern from "ray/common/buffer.h" namespace "ray" nogil: cdef cppclass CBuffer "ray::Buffer": uint8_t *Data() const size_t Size() const + c_bool IsPlasmaBuffer() const cdef cppclass LocalMemoryBuffer(CBuffer): LocalMemoryBuffer(uint8_t *data, size_t size, c_bool copy_data) diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 0a32daf6bc6a..2f5553689e85 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -146,12 +146,15 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: shared_ptr[CRayObject] *return_object) CRayStatus SealReturnObject( const CObjectID& return_id, - shared_ptr[CRayObject] return_object + shared_ptr[CRayObject] return_object, + const CObjectID& generator_id ) c_bool PinExistingReturnObject( const CObjectID& return_id, - shared_ptr[CRayObject] *return_object + shared_ptr[CRayObject] *return_object, + const CObjectID& generator_id ) + CObjectID AllocateDynamicReturnId() CJobID GetCurrentJobId() CTaskID GetCurrentTaskId() @@ -217,6 +220,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: CRayStatus SealOwned(const CObjectID &object_id, c_bool pin_object, const unique_ptr[CAddress] &owner_address) CRayStatus SealExisting(const CObjectID &object_id, c_bool pin_object, + const CObjectID &generator_id, const unique_ptr[CAddress] &owner_address) CRayStatus Get(const c_vector[CObjectID] &ids, int64_t timeout_ms, c_vector[shared_ptr[CRayObject]] *results) @@ -280,16 +284,17 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: c_string stdout_file c_string stderr_file (CRayStatus( + const CAddress &caller_address, CTaskType task_type, const c_string name, const CRayFunction &ray_function, const unordered_map[c_string, double] &resources, const c_vector[shared_ptr[CRayObject]] &args, const c_vector[CObjectReference] &arg_refs, - const c_vector[CObjectID] &return_ids, const c_string debugger_breakpoint, const c_string serialized_retry_exception_allowlist, - c_vector[shared_ptr[CRayObject]] *returns, + c_vector[c_pair[CObjectID, shared_ptr[CRayObject]]] *returns, + c_vector[c_pair[CObjectID, shared_ptr[CRayObject]]] *dynamic_returns, shared_ptr[LocalMemoryBuffer] &creation_task_exception_pb_bytes, c_bool *is_retryable_error, diff --git a/python/ray/remote_function.py b/python/ray/remote_function.py index bc9558c731ad..1a39ba911876 100644 --- a/python/ray/remote_function.py +++ b/python/ray/remote_function.py @@ -299,6 +299,8 @@ def _remote(self, args=None, kwargs=None, **task_options): ] scheduling_strategy = task_options["scheduling_strategy"] num_returns = task_options["num_returns"] + if num_returns == "dynamic": + num_returns = -1 max_retries = task_options["max_retries"] retry_exceptions = task_options["retry_exceptions"] if isinstance(retry_exceptions, (list, tuple)): diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index e26e2556635c..c5741ab6e653 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -203,23 +203,37 @@ class A: ) # Type check - for keyword in ("num_returns", "max_retries", "max_calls"): + for keyword in ("max_retries", "max_calls"): with pytest.raises(TypeError, 
match=re.escape(template1.format(keyword))): ray.remote(**{keyword: np.random.uniform(0, 1)})(f) + num_returns_template = ( + "The type of keyword 'num_returns' " + + f"must be {(int, str, type(None))}, but received type {float}" + ) + with pytest.raises(TypeError, match=re.escape(num_returns_template)): + ray.remote(**{"num_returns": np.random.uniform(0, 1)})(f) for keyword in ("max_restarts", "max_task_retries"): with pytest.raises(TypeError, match=re.escape(template1.format(keyword))): ray.remote(**{keyword: np.random.uniform(0, 1)})(A) # Value check for non-negative finite values - for keyword in ("num_returns", "max_calls"): - for v in (np.random.randint(-100, -2), -1): - with pytest.raises( - ValueError, - match=f"The keyword '{keyword}' only accepts None, " - f"0 or a positive integer", - ): - ray.remote(**{keyword: v})(f) + for v in (np.random.randint(-100, -2), -1): + keyword = "max_calls" + with pytest.raises( + ValueError, + match=f"The keyword '{keyword}' only accepts None, " + f"0 or a positive integer", + ): + ray.remote(**{keyword: v})(f) + + keyword = "num_returns" + with pytest.raises( + ValueError, + match=f"The keyword '{keyword}' only accepts None, " + 'a non-negative integer, or "dynamic"', + ): + ray.remote(**{keyword: v})(f) # Value check for non-negative and infinite values template2 = ( diff --git a/python/ray/tests/test_generators.py b/python/ray/tests/test_generators.py index 5923537e10a0..3e04b30d0002 100644 --- a/python/ray/tests/test_generators.py +++ b/python/ray/tests/test_generators.py @@ -36,7 +36,8 @@ def large_values_generator(num_returns): @pytest.mark.parametrize("use_actors", [False, True]) -def test_generator_returns(ray_start_regular, use_actors): +@pytest.mark.parametrize("store_in_plasma", [False, True]) +def test_generator_returns(ray_start_regular, use_actors, store_in_plasma): remote_generator_fn = None if use_actors: @@ -45,18 +46,24 @@ class Generator: def __init__(self): pass - def generator(self, num_returns): + def generator(self, num_returns, store_in_plasma): for i in range(num_returns): - yield i + if store_in_plasma: + yield np.ones(1_000_000, dtype=np.int8) * i + else: + yield [i] g = Generator.remote() remote_generator_fn = g.generator else: @ray.remote(max_retries=0) - def generator(num_returns): + def generator(num_returns, store_in_plasma): for i in range(num_returns): - yield i + if store_in_plasma: + yield np.ones(1_000_000, dtype=np.int8) * i + else: + yield [i] remote_generator_fn = generator @@ -66,32 +73,302 @@ def generator(num_returns): try: ray.get( - remote_generator_fn.options(num_returns=num_returns).remote(num_returns - 1) + remote_generator_fn.options(num_returns=num_returns).remote( + num_returns - 1, store_in_plasma + ) ) assert False except ray.exceptions.RayTaskError as e: assert isinstance(e.as_instanceof_cause(), ValueError) - try: + if store_in_plasma: + # TODO(swang): Currently there is a bug when a generator errors after + # already storing some values in plasma. The already stored values can + # be accessed and the error is lost. Propagate the error correctly by + # replacing all successfully stored return values with the same error. 
ray.get( - remote_generator_fn.options(num_returns=num_returns).remote(num_returns + 1) + remote_generator_fn.options(num_returns=num_returns).remote( + num_returns + 1, store_in_plasma + ) ) - assert False - except ray.exceptions.RayTaskError as e: - assert isinstance(e.as_instanceof_cause(), ValueError) - - # Check num_returns=1 case, should receive TypeError because generator - # cannot be pickled. - try: - ray.get(remote_generator_fn.remote(num_returns)) - assert False - except ray.exceptions.RayTaskError as e: - assert isinstance(e.as_instanceof_cause(), TypeError) + else: + try: + ray.get( + remote_generator_fn.options(num_returns=num_returns).remote( + num_returns + 1, store_in_plasma + ) + ) + assert False + except ray.exceptions.RayTaskError as e: + assert isinstance(e.as_instanceof_cause(), ValueError) # Check return values. - ray.get( - remote_generator_fn.options(num_returns=num_returns).remote(num_returns) - ) == list(range(num_returns)) + [ + x[0] + for x in ray.get( + remote_generator_fn.options(num_returns=num_returns).remote( + num_returns, store_in_plasma + ) + ) + ] == list(range(num_returns)) + # Works for num_returns=1 if generator returns a single value. + assert ( + ray.get(remote_generator_fn.options(num_returns=1).remote(1, store_in_plasma))[ + 0 + ] + == 0 + ) + + +@pytest.mark.parametrize("store_in_plasma", [False, True]) +def test_generator_errors(ray_start_regular, store_in_plasma): + @ray.remote(max_retries=0) + def generator(num_returns, store_in_plasma): + for i in range(num_returns - 2): + if store_in_plasma: + yield np.ones(1_000_000, dtype=np.int8) * i + else: + yield [i] + raise Exception("error") + + ref1, ref2, ref3 = generator.options(num_returns=3).remote(3, store_in_plasma) + # TODO(swang): Currently there is a bug when a generator errors after + # already storing some values in plasma. The already stored values can + # be accessed and the error is lost. Propagate the error correctly by + # replacing all successfully stored return values with the same error. + if not store_in_plasma: + with pytest.raises(ray.exceptions.RayTaskError): + ray.get(ref1) + with pytest.raises(ray.exceptions.RayTaskError): + ray.get(ref2) + with pytest.raises(ray.exceptions.RayTaskError): + ray.get(ref3) + + dynamic_ref = generator.options(num_returns="dynamic").remote(3, store_in_plasma) + with pytest.raises(ray.exceptions.RayTaskError): + ray.get(dynamic_ref) + + +@pytest.mark.parametrize("store_in_plasma", [False, True]) +def test_dynamic_generator(ray_start_regular, store_in_plasma): + @ray.remote(num_returns="dynamic") + def dynamic_generator(num_returns, store_in_plasma): + for i in range(num_returns): + if store_in_plasma: + yield np.ones(1_000_000, dtype=np.int8) * i + else: + yield [i] + + @ray.remote + def read(gen): + for i, ref in enumerate(gen): + if ray.get(ref)[0] != i: + return False + return True + + gen = ray.get(dynamic_generator.remote(10, store_in_plasma)) + for i, ref in enumerate(gen): + assert ray.get(ref)[0] == i + + # Test empty generator. + gen = ray.get(dynamic_generator.remote(0, store_in_plasma)) + assert len(gen) == 0 + + # Check that passing as task arg. + gen = dynamic_generator.remote(10, store_in_plasma) + assert ray.get(read.remote(gen)) + assert ray.get(read.remote(ray.get(gen))) + + # Generator remote functions with num_returns=1 error if they return more + # than 1 value. + # TODO(swang): Currently there is a bug when a generator errors after + # already storing some values in plasma. 
The already stored values can + # be accessed and the error is lost. Propagate the error correctly by + # replacing all successfully stored return values with the same error. + if not store_in_plasma: + with pytest.raises(ray.exceptions.RayTaskError): + ray.get(dynamic_generator.options(num_returns=1).remote(3, store_in_plasma)) + + # Normal remote functions don't work with num_returns="dynamic". + @ray.remote(num_returns="dynamic") + def static(num_returns): + return list(range(num_returns)) + + with pytest.raises(ray.exceptions.RayTaskError): + ray.get(static.remote(3)) + + +def test_dynamic_generator_reconstruction(ray_start_cluster): + config = { + "num_heartbeats_timeout": 10, + "raylet_heartbeat_period_milliseconds": 100, + "max_direct_call_object_size": 100, + "task_retry_delay_ms": 100, + "object_timeout_milliseconds": 200, + "fetch_warn_timeout_milliseconds": 1000, + } + cluster = ray_start_cluster + # Head node with no resources. + cluster.add_node( + num_cpus=0, _system_config=config, enable_object_reconstruction=True + ) + ray.init(address=cluster.address) + # Node to place the initial object. + node_to_kill = cluster.add_node(num_cpus=1, object_store_memory=10 ** 8) + cluster.wait_for_nodes() + + @ray.remote(num_returns="dynamic") + def dynamic_generator(num_returns): + for i in range(num_returns): + # Random ray.put to make sure it's okay to interleave these with + # the dynamic returns. + if np.random.randint(2) == 1: + ray.put(np.ones(1_000_000, dtype=np.int8) * np.random.randint(100)) + yield np.ones(1_000_000, dtype=np.int8) * i + + @ray.remote + def fetch(x): + return x[0] + + # Test recovery of all dynamic objects through re-execution. + gen = ray.get(dynamic_generator.remote(10)) + cluster.remove_node(node_to_kill, allow_graceful=False) + node_to_kill = cluster.add_node(num_cpus=1, object_store_memory=10 ** 8) + refs = list(gen) + for i, ref in enumerate(refs): + assert ray.get(fetch.remote(ref)) == i + + cluster.add_node(num_cpus=1, resources={"node2": 1}, object_store_memory=10 ** 8) + + # Fetch one of the ObjectRefs to another node. We should try to reuse this + # copy during recovery. + ray.get(fetch.options(resources={"node2": 1}).remote(refs[-1])) + cluster.remove_node(node_to_kill, allow_graceful=False) + for i, ref in enumerate(refs): + assert ray.get(fetch.remote(ref)) == i + + +@pytest.mark.parametrize("too_many_returns", [False, True]) +def test_dynamic_generator_reconstruction_nondeterministic( + ray_start_cluster, too_many_returns +): + config = { + "num_heartbeats_timeout": 10, + "raylet_heartbeat_period_milliseconds": 100, + "max_direct_call_object_size": 100, + "task_retry_delay_ms": 100, + "object_timeout_milliseconds": 200, + "fetch_warn_timeout_milliseconds": 1000, + } + cluster = ray_start_cluster + # Head node with no resources. + cluster.add_node( + num_cpus=0, + _system_config=config, + enable_object_reconstruction=True, + resources={"head": 1}, + ) + ray.init(address=cluster.address) + # Node to place the initial object. 
+ node_to_kill = cluster.add_node(num_cpus=1, object_store_memory=10 ** 8) + cluster.wait_for_nodes() + + @ray.remote(num_cpus=0, resources={"head": 1}) + class ExecutionCounter: + def __init__(self): + self.count = 0 + + def inc(self): + self.count += 1 + return self.count + + @ray.remote(num_returns="dynamic") + def dynamic_generator(exec_counter): + num_returns = 10 + if ray.get(exec_counter.inc.remote()) > 1: + if too_many_returns: + num_returns += 1 + else: + num_returns -= 1 + for i in range(num_returns): + yield np.ones(1_000_000, dtype=np.int8) * i + + @ray.remote + def fetch(x): + return + + exec_counter = ExecutionCounter.remote() + gen = ray.get(dynamic_generator.remote(exec_counter)) + cluster.remove_node(node_to_kill, allow_graceful=False) + node_to_kill = cluster.add_node(num_cpus=1, object_store_memory=10 ** 8) + refs = list(gen) + if too_many_returns: + for ref in refs: + ray.get(ref) + else: + with pytest.raises(ray.exceptions.RayTaskError): + for ref in refs: + ray.get(ref) + # TODO(swang): If the re-executed task returns a different number of + # objects, we should throw an error for every return value. + # for ref in refs: + # with pytest.raises(ray.exceptions.RayTaskError): + # ray.get(ref) + + +def test_dynamic_empty_generator_reconstruction_nondeterministic(ray_start_cluster): + config = { + "num_heartbeats_timeout": 10, + "raylet_heartbeat_period_milliseconds": 100, + "max_direct_call_object_size": 100, + "task_retry_delay_ms": 100, + "object_timeout_milliseconds": 200, + "fetch_warn_timeout_milliseconds": 1000, + } + cluster = ray_start_cluster + # Head node with no resources. + cluster.add_node( + num_cpus=0, + _system_config=config, + enable_object_reconstruction=True, + resources={"head": 1}, + ) + ray.init(address=cluster.address) + # Node to place the initial object. + node_to_kill = cluster.add_node(num_cpus=1, object_store_memory=10 ** 8) + cluster.wait_for_nodes() + + @ray.remote(num_cpus=0, resources={"head": 1}) + class ExecutionCounter: + def __init__(self): + self.count = 0 + + def inc(self): + self.count += 1 + return self.count + + def get_count(self): + return self.count + + @ray.remote(num_returns="dynamic") + def maybe_empty_generator(exec_counter): + if ray.get(exec_counter.inc.remote()) > 1: + for i in range(3): + yield np.ones(1_000_000, dtype=np.int8) * i + + @ray.remote + def check(empty_generator): + return len(empty_generator) == 0 + + exec_counter = ExecutionCounter.remote() + gen = maybe_empty_generator.remote(exec_counter) + assert ray.get(check.remote(gen)) + cluster.remove_node(node_to_kill, allow_graceful=False) + node_to_kill = cluster.add_node(num_cpus=1, object_store_memory=10 ** 8) + assert ray.get(check.remote(gen)) + + # We should never reconstruct an empty generator. 
+ assert ray.get(exec_counter.get_count.remote()) == 1 if __name__ == "__main__": diff --git a/python/ray/tests/test_reference_counting_2.py b/python/ray/tests/test_reference_counting_2.py index 81720055b549..578bb8d902f3 100644 --- a/python/ray/tests/test_reference_counting_2.py +++ b/python/ray/tests/test_reference_counting_2.py @@ -746,6 +746,32 @@ def ping(self): ray.get(config["actor"].ping.remote()) +def test_generators(one_worker_100MiB): + @ray.remote(num_returns="dynamic") + def remote_generator(): + for _ in range(3): + yield np.zeros(10 * 1024 * 1024, dtype=np.uint8) + + gen = ray.get(remote_generator.remote()) + refs = list(gen) + for r in refs: + _fill_object_store_and_get(r) + + # Outer ID out of scope, we should still be able to get the dynamic + # objects. + del gen + for r in refs: + _fill_object_store_and_get(r) + + # Inner IDs out of scope. + refs_oids = [r.binary() for r in refs] + del r + del refs + + for r_oid in refs_oids: + _fill_object_store_and_get(r_oid, succeed=False) + + if __name__ == "__main__": import sys diff --git a/src/mock/ray/raylet_client/raylet_client.h b/src/mock/ray/raylet_client/raylet_client.h index dee2808e0fea..c7a5fc9474a7 100644 --- a/src/mock/ray/raylet_client/raylet_client.h +++ b/src/mock/ray/raylet_client/raylet_client.h @@ -20,6 +20,7 @@ class MockPinObjectsInterface : public PinObjectsInterface { PinObjectIDs, (const rpc::Address &caller_address, const std::vector &object_ids, + const ObjectID &generator_id, const ray::rpc::ClientCallback &callback), (override)); }; @@ -192,6 +193,7 @@ class MockRayletClientInterface : public RayletClientInterface { PinObjectIDs, (const rpc::Address &caller_address, const std::vector &object_ids, + const ObjectID &generator_id, const ray::rpc::ClientCallback &callback), (override)); MOCK_METHOD(void, diff --git a/src/ray/common/task/task_spec.cc b/src/ray/common/task/task_spec.cc index 0893f5385948..43191d6ce4b4 100644 --- a/src/ray/common/task/task_spec.cc +++ b/src/ray/common/task/task_spec.cc @@ -207,6 +207,21 @@ ObjectID TaskSpecification::ReturnId(size_t return_index) const { return ObjectID::FromIndex(TaskId(), return_index + 1); } +bool TaskSpecification::ReturnsDynamic() const { return message_->returns_dynamic(); } + +std::vector TaskSpecification::DynamicReturnIds() const { + RAY_CHECK(message_->returns_dynamic()); + std::vector dynamic_return_ids; + for (const auto &dynamic_return_id : message_->dynamic_return_ids()) { + dynamic_return_ids.push_back(ObjectID::FromBinary(dynamic_return_id)); + } + return dynamic_return_ids; +} + +void TaskSpecification::AddDynamicReturnId(const ObjectID &dynamic_return_id) { + message_->add_dynamic_return_ids(dynamic_return_id.Binary()); +} + bool TaskSpecification::ArgByRef(size_t arg_index) const { return message_->args(arg_index).has_object_ref(); } diff --git a/src/ray/common/task/task_spec.h b/src/ray/common/task/task_spec.h index 2f4ee06affb6..eda8319a8657 100644 --- a/src/ray/common/task/task_spec.h +++ b/src/ray/common/task/task_spec.h @@ -225,6 +225,12 @@ class TaskSpecification : public MessageWrapper { ObjectID ReturnId(size_t return_index) const; + bool ReturnsDynamic() const; + + std::vector DynamicReturnIds() const; + + void AddDynamicReturnId(const ObjectID &dynamic_return_id); + const uint8_t *ArgData(size_t arg_index) const; size_t ArgDataSize(size_t arg_index) const; diff --git a/src/ray/common/task/task_util.h b/src/ray/common/task/task_util.h index a5e4713023de..13dfa97e6787 100644 --- a/src/ray/common/task/task_util.h +++ 
b/src/ray/common/task/task_util.h @@ -108,6 +108,7 @@ class TaskSpecBuilder { const TaskID &caller_id, const rpc::Address &caller_address, uint64_t num_returns, + bool returns_dynamic, const std::unordered_map &required_resources, const std::unordered_map &required_placement_resources, const std::string &debugger_breakpoint, @@ -125,6 +126,7 @@ class TaskSpecBuilder { message_->set_caller_id(caller_id.Binary()); message_->mutable_caller_address()->CopyFrom(caller_address); message_->set_num_returns(num_returns); + message_->set_returns_dynamic(returns_dynamic); message_->mutable_required_resources()->insert(required_resources.begin(), required_resources.end()); message_->mutable_required_placement_resources()->insert( diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 389221700666..5dc332995d9b 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -100,7 +100,8 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, - std::placeholders::_5); + std::placeholders::_5, + std::placeholders::_6); direct_task_receiver_ = std::make_unique( worker_context_, task_execution_service_, execute_task, [this] { return local_raylet_client_->TaskDone(); @@ -926,6 +927,7 @@ Status CoreWorker::PutInLocalPlasmaStore(const RayObject &object, local_raylet_client_->PinObjectIDs( rpc_address_, {object_id}, + /*generator_id=*/ObjectID::Nil(), [this, object_id](const Status &status, const rpc::PinObjectIDsReply &reply) { // Only release the object once the raylet has responded to avoid the race // condition that the object could be evicted before the raylet pins it. @@ -1059,7 +1061,8 @@ Status CoreWorker::CreateExisting(const std::shared_ptr &metadata, Status CoreWorker::SealOwned(const ObjectID &object_id, bool pin_object, const std::unique_ptr &owner_address) { - auto status = SealExisting(object_id, pin_object, std::move(owner_address)); + auto status = + SealExisting(object_id, pin_object, ObjectID::Nil(), std::move(owner_address)); if (status.ok()) return status; RemoveLocalReference(object_id); if (reference_counter_->HasReference(object_id)) { @@ -1072,6 +1075,7 @@ Status CoreWorker::SealOwned(const ObjectID &object_id, Status CoreWorker::SealExisting(const ObjectID &object_id, bool pin_object, + const ObjectID &generator_id, const std::unique_ptr &owner_address) { RAY_RETURN_NOT_OK(plasma_store_provider_->Seal(object_id)); if (pin_object) { @@ -1080,6 +1084,7 @@ Status CoreWorker::SealExisting(const ObjectID &object_id, local_raylet_client_->PinObjectIDs( owner_address != nullptr ? *owner_address : rpc_address_, {object_id}, + generator_id, [this, object_id](const Status &status, const rpc::PinObjectIDsReply &reply) { // Only release the object once the raylet has responded to avoid the race // condition that the object could be evicted before the raylet pins it. @@ -1536,7 +1541,7 @@ void CoreWorker::BuildCommonTaskSpec( const rpc::Address &address, const RayFunction &function, const std::vector> &args, - uint64_t num_returns, + int64_t num_returns, const std::unordered_map &required_resources, const std::unordered_map &required_placement_resources, const std::string &debugger_breakpoint, @@ -1546,6 +1551,14 @@ void CoreWorker::BuildCommonTaskSpec( // Build common task spec. 
auto override_runtime_env_info = OverrideTaskOrActorRuntimeEnvInfo(serialized_runtime_env_info); + + bool returns_dynamic = num_returns == -1; + if (returns_dynamic) { + // This remote function returns 1 ObjectRef, whose value + // is a generator of ObjectRefs. + num_returns = 1; + } + RAY_CHECK(num_returns >= 0); builder.SetCommonTaskSpec(task_id, name, function.GetLanguage(), @@ -1556,6 +1569,7 @@ void CoreWorker::BuildCommonTaskSpec( caller_id, address, num_returns, + returns_dynamic, required_resources, required_placement_resources, debugger_breakpoint, @@ -2182,11 +2196,13 @@ Status CoreWorker::AllocateReturnObject(const ObjectID &object_id, return Status::OK(); } -Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, - const std::shared_ptr &resource_ids, - std::vector> *return_objects, - ReferenceCounter::ReferenceTableProto *borrowed_refs, - bool *is_retryable_error) { +Status CoreWorker::ExecuteTask( + const TaskSpecification &task_spec, + const std::shared_ptr &resource_ids, + std::vector>> *return_objects, + std::vector>> *dynamic_return_objects, + ReferenceCounter::ReferenceTableProto *borrowed_refs, + bool *is_retryable_error) { RAY_LOG(DEBUG) << "Executing task, task info = " << task_spec.DebugString(); task_queue_length_ -= 1; num_executed_tasks_ += 1; @@ -2218,16 +2234,31 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, std::vector borrowed_ids; RAY_CHECK_OK(GetAndPinArgsForExecutor(task_spec, &args, &arg_refs, &borrowed_ids)); - std::vector return_ids; for (size_t i = 0; i < task_spec.NumReturns(); i++) { - return_ids.push_back(task_spec.ReturnId(i)); + return_objects->push_back(std::make_pair<>(task_spec.ReturnId(i), nullptr)); + } + // For dynamic tasks, pass the return IDs that were dynamically generated on + // the first execution. 
+ if (!task_spec.ReturnsDynamic()) { + dynamic_return_objects = NULL; + } else if (task_spec.AttemptNumber() > 0) { + for (const auto &dynamic_return_id : task_spec.DynamicReturnIds()) { + dynamic_return_objects->push_back( + std::make_pair<>(dynamic_return_id, std::shared_ptr())); + RAY_LOG(DEBUG) << "Re-executed task " << task_spec.TaskId() + << " should return dynamic object " << dynamic_return_id; + + AddLocalReference(dynamic_return_id, ""); + reference_counter_->AddBorrowedObject( + dynamic_return_id, ObjectID::Nil(), task_spec.CallerAddress()); + } } Status status; TaskType task_type = TaskType::NORMAL_TASK; if (task_spec.IsActorCreationTask()) { - RAY_CHECK(return_ids.size() > 0); - return_ids.pop_back(); + RAY_CHECK(return_objects->size() > 0); + return_objects->pop_back(); task_type = TaskType::ACTOR_CREATION_TASK; SetActorId(task_spec.ActorCreationId()); { @@ -2242,8 +2273,8 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, } RAY_LOG(INFO) << "Creating actor: " << task_spec.ActorCreationId(); } else if (task_spec.IsActorTask()) { - RAY_CHECK(return_ids.size() > 0); - return_ids.pop_back(); + RAY_CHECK(return_objects->size() > 0); + return_objects->pop_back(); task_type = TaskType::ACTOR_TASK; } @@ -2258,16 +2289,17 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, } status = options_.task_execution_callback( + task_spec.CallerAddress(), task_type, task_spec.GetName(), func, task_spec.GetRequiredResources().GetResourceUnorderedMap(), args, arg_refs, - return_ids, task_spec.GetDebuggerBreakpoint(), task_spec.GetSerializedRetryExceptionAllowlist(), return_objects, + dynamic_return_objects, creation_task_exception_pb_bytes, is_retryable_error, defined_concurrency_groups, @@ -2283,6 +2315,12 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, if (!borrowed_ids.empty()) { reference_counter_->PopAndClearLocalBorrowers(borrowed_ids, borrowed_refs, &deleted); } + if (dynamic_return_objects != NULL) { + for (const auto &dynamic_return : *dynamic_return_objects) { + reference_counter_->PopAndClearLocalBorrowers( + {dynamic_return.first}, borrowed_refs, &deleted); + } + } memory_store_->Delete(deleted); if (task_spec.IsNormalTask() && reference_counter_->NumObjectIDsInScope() != 0) { @@ -2337,7 +2375,8 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, } Status CoreWorker::SealReturnObject(const ObjectID &return_id, - std::shared_ptr return_object) { + std::shared_ptr return_object, + const ObjectID &generator_id) { RAY_LOG(DEBUG) << "Sealing return object " << return_id; Status status = Status::OK(); RAY_CHECK(return_object); @@ -2345,7 +2384,8 @@ Status CoreWorker::SealReturnObject(const ObjectID &return_id, std::unique_ptr caller_address = std::make_unique(worker_context_.GetCurrentTask()->CallerAddress()); if (return_object->GetData() != nullptr && return_object->GetData()->IsPlasmaBuffer()) { - status = SealExisting(return_id, /*pin_object=*/true, std::move(caller_address)); + status = SealExisting( + return_id, /*pin_object=*/true, generator_id, std::move(caller_address)); if (!status.ok()) { RAY_LOG(FATAL) << "Failed to seal object " << return_id << " in store: " << status.message(); @@ -2355,7 +2395,8 @@ Status CoreWorker::SealReturnObject(const ObjectID &return_id, } bool CoreWorker::PinExistingReturnObject(const ObjectID &return_id, - std::shared_ptr *return_object) { + std::shared_ptr *return_object, + const ObjectID &generator_id) { // TODO(swang): If there is already an existing copy of this object, 
then it // might not have the same value as the new copy. It would be better to evict // the existing copy here. @@ -2386,6 +2427,7 @@ bool CoreWorker::PinExistingReturnObject(const ObjectID &return_id, local_raylet_client_->PinObjectIDs( owner_address, {return_id}, + generator_id, [return_id, pinned_return_object](const Status &status, const rpc::PinObjectIDsReply &reply) { if (!status.ok() || !reply.successes(0)) { @@ -2405,10 +2447,20 @@ bool CoreWorker::PinExistingReturnObject(const ObjectID &return_id, } } +ObjectID CoreWorker::AllocateDynamicReturnId() { + const auto &task_spec = worker_context_.GetCurrentTask(); + const auto return_id = + ObjectID::FromIndex(task_spec->TaskId(), worker_context_.GetNextPutIndex()); + AddLocalReference(return_id, ""); + reference_counter_->AddBorrowedObject( + return_id, ObjectID::Nil(), worker_context_.GetCurrentTask()->CallerAddress()); + return return_id; +} + std::vector CoreWorker::ExecuteTaskLocalMode( const TaskSpecification &task_spec, const ActorID &actor_id) { auto resource_ids = std::make_shared(); - auto return_objects = std::vector>(); + auto return_objects = std::vector>>(); auto borrowed_refs = ReferenceCounter::ReferenceTableProto(); std::vector returned_refs; @@ -2434,8 +2486,14 @@ std::vector CoreWorker::ExecuteTaskLocalMode( auto old_id = GetActorId(); SetActorId(actor_id); bool is_retryable_error; - RAY_UNUSED(ExecuteTask( - task_spec, resource_ids, &return_objects, &borrowed_refs, &is_retryable_error)); + // TODO(swang): Support ObjectRefGenerators in local mode? + std::vector>> dynamic_return_objects; + RAY_UNUSED(ExecuteTask(task_spec, + resource_ids, + &return_objects, + &dynamic_return_objects, + &borrowed_refs, + &is_retryable_error)); SetActorId(old_id); return returned_refs; } @@ -2740,6 +2798,17 @@ void CoreWorker::ProcessSubscribeForObjectEviction( return; } + if (message.has_generator_id()) { + // For dynamically generated return values, the raylet may subscribe to + // eviction events before we know about the object. This can happen when we + // receive the subscription request before the reply from the task that + // created the object. Add the dynamically created object to our ref + // counter so that we know that it exists. + const auto generator_id = ObjectID::FromBinary(message.generator_id()); + RAY_CHECK(!generator_id.IsNil()); + reference_counter_->AddDynamicReturn(object_id, generator_id); + } + // Returns true if the object was present and the callback was added. It might have // already been evicted by the time we get this request, in which case we should // respond immediately so the raylet unpins the object. @@ -2829,7 +2898,11 @@ void CoreWorker::HandleUpdateObjectLocationBatch( object_location_update.spilled_location_update().spilled_url(), object_location_update.spilled_location_update().spilled_to_local_storage() ? node_id - : NodeID::Nil()); + : NodeID::Nil(), + object_location_update.has_generator_id() + ? 
std::optional( + ObjectID::FromBinary(object_location_update.generator_id())) + : std::nullopt); } if (object_location_update.has_plasma_location_update()) { @@ -2852,12 +2925,30 @@ void CoreWorker::HandleUpdateObjectLocationBatch( /*failure_callback_on_reply*/ nullptr); } -void CoreWorker::AddSpilledObjectLocationOwner(const ObjectID &object_id, - const std::string &spilled_url, - const NodeID &spilled_node_id) { +void CoreWorker::AddSpilledObjectLocationOwner( + const ObjectID &object_id, + const std::string &spilled_url, + const NodeID &spilled_node_id, + const std::optional &generator_id) { RAY_LOG(DEBUG) << "Received object spilled location update for object " << object_id << ", which has been spilled to " << spilled_url << " on node " << spilled_node_id; + if (generator_id.has_value()) { + // For dynamically generated return values, the raylet may spill the + // primary copy before we know about the object. This can happen when the + // object is spilled before the reply from the task that created the + // object. Add the dynamically created object to our ref counter so that we + // know that it exists. + // NOTE(swang): We don't need to do this for in-plasma object locations because: + // 1) We will add the primary copy as a location when processing the task + // reply. + // 2) It is not possible to copy the object to a second location until + // after the owner has added the object to the ref count table (since no + // raylet can get the current location of the object until this happens). + RAY_CHECK(!generator_id->IsNil()); + reference_counter_->AddDynamicReturn(object_id, *generator_id); + } + auto reference_exists = reference_counter_->HandleObjectSpilled(object_id, spilled_url, spilled_node_id); if (!reference_exists) { diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 105f033c6cf3..fe02f2e4361f 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -340,11 +340,16 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// /// \param[in] object_id Object ID corresponding to the object. /// \param[in] pin_object Whether or not to pin the object at the local raylet. + /// \param[in] generator_id For dynamically created objects, this is the ID + /// of the object that wraps the dynamically created ObjectRefs in a + /// generator. We use this to notify the owner of the dynamically created + /// objects. /// \param[in] owner_address Address of the owner of the object who will be contacted by /// the raylet if the object is pinned. If not provided, defaults to this worker. /// \return Status. Status SealExisting(const ObjectID &object_id, bool pin_object, + const ObjectID &generator_id = ObjectID::Nil(), const std::unique_ptr &owner_address = nullptr); /// Get a list of objects from the object store. Objects that failed to be retrieved @@ -634,8 +639,13 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// \param[in] return_id Object ID of the return value. /// \param[in] return_object RayObject containing the buffer written info. /// \return Status. + /// \param[in] generator_id For dynamically created objects, this is the ID + /// of the object that wraps the dynamically created ObjectRefs in a + /// generator. We use this to notify the owner of the dynamically created + /// objects. Status SealReturnObject(const ObjectID &return_id, - std::shared_ptr return_object); + std::shared_ptr return_object, + const ObjectID &generator_id); /// Pin the local copy of the return object, if one exists. 
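// Illustrative sketch (not part of this patch): how a language frontend that
// executes a generator task could mint and seal one dynamically created
// return value. It relies only on the AllocateDynamicReturnId() and
// SealReturnObject() signatures introduced above; the helper name, the way
// the yielded value is produced, and the assumption that the ray::core names
// are in scope are all illustrative.
namespace {
void StoreOneGeneratorYield(
    const ObjectID &generator_id,
    const std::shared_ptr<RayObject> &yielded_value,
    std::vector<std::pair<ObjectID, std::shared_ptr<RayObject>>> *dynamic_returns) {
  auto &worker = CoreWorkerProcess::GetCoreWorker();
  // The executing worker allocates the ObjectID at run time; the task's
  // caller still owns the resulting ObjectRef.
  const ObjectID return_id = worker.AllocateDynamicReturnId();
  dynamic_returns->emplace_back(return_id, yielded_value);
  // Sealing with the generator's ID lets the raylet tell the owner which
  // outer ObjectRef the dynamically created object belongs to.
  RAY_CHECK_OK(worker.SealReturnObject(return_id, yielded_value, generator_id));
}
}  // namespace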
/// @@ -643,8 +653,23 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// \param[out] return_object The object that was pinned. /// \return success if the object still existed and was pinned. Note that /// pinning is done asynchronously. + /// \param[in] generator_id For dynamically created objects, this is the ID + /// of the object that wraps the dynamically created ObjectRefs in a + /// generator. We use this to notify the owner of the dynamically created + /// objects. bool PinExistingReturnObject(const ObjectID &return_id, - std::shared_ptr *return_object); + std::shared_ptr *return_object, + const ObjectID &generator_id); + + /// Dynamically allocate an object. + /// + /// This should be used during task execution, if the task wants to return an + /// object to the task caller and have the resulting ObjectRef be owned by + /// the caller. This is in contrast to static allocation, where the caller + /// decides at task invocation time how many returns the task should have. + /// + /// \param[out] The ObjectID that the caller should use to store the object. + ObjectID AllocateDynamicReturnId(); /// Get a handle to an actor. /// @@ -851,7 +876,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { const rpc::Address &address, const RayFunction &function, const std::vector> &args, - uint64_t num_returns, + int64_t num_returns, const std::unordered_map &required_resources, const std::unordered_map &required_placement_resources, const std::string &debugger_breakpoint, @@ -929,18 +954,25 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// worker. If nullptr, reuse the previously assigned /// resources. /// \param results[out] return_objects Result objects that should be returned - /// by value (not via plasma). + /// to the caller. + /// \param results[out] dynamic_return_objects Result objects whose + /// ObjectRefs were dynamically allocated during task execution by using a + /// generator. The language-level ObjectRefs should be returned inside the + /// statically allocated return_objects. /// \param results[out] borrowed_refs Refs that this task (or a nested task) /// was or is still borrowing. This includes all /// objects whose IDs we passed to the task in its /// arguments and recursively, any object IDs that were /// contained in those objects. /// \return Status. - Status ExecuteTask(const TaskSpecification &task_spec, - const std::shared_ptr &resource_ids, - std::vector> *return_objects, - ReferenceCounter::ReferenceTableProto *borrowed_refs, - bool *is_retryable_error); + Status ExecuteTask( + const TaskSpecification &task_spec, + const std::shared_ptr &resource_ids, + std::vector>> *return_objects, + std::vector>> + *dynamic_return_objects, + ReferenceCounter::ReferenceTableProto *borrowed_refs, + bool *is_retryable_error); /// Put an object in the local plasma store. 
Status PutInLocalPlasmaStore(const RayObject &object, @@ -1028,7 +1060,8 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { void AddSpilledObjectLocationOwner(const ObjectID &object_id, const std::string &spilled_url, - const NodeID &spilled_node_id); + const NodeID &spilled_node_id, + const std::optional &generator_id); void AddObjectLocationOwner(const ObjectID &object_id, const NodeID &node_id); diff --git a/src/ray/core_worker/core_worker_options.h b/src/ray/core_worker/core_worker_options.h index efb50202f20d..ed8699ec3953 100644 --- a/src/ray/core_worker/core_worker_options.h +++ b/src/ray/core_worker/core_worker_options.h @@ -33,16 +33,17 @@ struct CoreWorkerOptions { // Callback that must be implemented and provided by the language-specific worker // frontend to execute tasks and return their results. using TaskExecutionCallback = std::function &required_resources, const std::vector> &args, const std::vector &arg_refs, - const std::vector &return_ids, const std::string &debugger_breakpoint, const std::string &serialized_retry_exception_allowlist, - std::vector> *results, + std::vector>> *returns, + std::vector>> *dynamic_returns, std::shared_ptr &creation_task_exception_pb_bytes, bool *is_retryable_error, // The following 2 parameters `defined_concurrency_groups` and diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc b/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc index b496842f9285..4a5a2d5199c6 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc @@ -108,16 +108,17 @@ Java_io_ray_runtime_RayNativeRuntime_nativeInitialize(JNIEnv *env, jint startupToken, jint runtimeEnvHash) { auto task_execution_callback = - [](TaskType task_type, + [](const rpc::Address &caller_address, + TaskType task_type, const std::string task_name, const RayFunction &ray_function, const std::unordered_map &required_resources, const std::vector> &args, const std::vector &arg_refs, - const std::vector &return_ids, const std::string &debugger_breakpoint, const std::string &serialized_retry_exception_allowlist, - std::vector> *results, + std::vector>> *returns, + std::vector>> *dynamic_returns, std::shared_ptr &creation_task_exception_pb, bool *is_retryable_error, const std::vector &defined_concurrency_groups, @@ -185,7 +186,7 @@ Java_io_ray_runtime_RayNativeRuntime_nativeInitialize(JNIEnv *env, int64_t task_output_inlined_bytes = 0; // Process return objects. - if (!return_ids.empty()) { + if (!returns->empty()) { std::vector> return_objects; JavaListToNativeVector>( env, @@ -194,9 +195,8 @@ Java_io_ray_runtime_RayNativeRuntime_nativeInitialize(JNIEnv *env, [](JNIEnv *env, jobject java_native_ray_object) { return JavaNativeRayObjectToNativeRayObject(env, java_native_ray_object); }); - results->resize(return_ids.size(), nullptr); for (size_t i = 0; i < return_objects.size(); i++) { - auto &result_id = return_ids[i]; + auto &result_id = (*returns)[i].first; size_t data_size = return_objects[i]->HasData() ? 
return_objects[i]->GetData()->Size() : 0; auto &metadata = return_objects[i]->GetMetadata(); @@ -204,7 +204,7 @@ Java_io_ray_runtime_RayNativeRuntime_nativeInitialize(JNIEnv *env, for (const auto &ref : return_objects[i]->GetNestedRefs()) { contained_object_ids.push_back(ObjectID::FromBinary(ref.object_id())); } - auto result_ptr = &(*results)[0]; + auto result_ptr = &(*returns)[i].second; RAY_CHECK_OK(CoreWorkerProcess::GetCoreWorker().AllocateReturnObject( result_id, @@ -224,8 +224,8 @@ Java_io_ray_runtime_RayNativeRuntime_nativeInitialize(JNIEnv *env, } } - RAY_CHECK_OK( - CoreWorkerProcess::GetCoreWorker().SealReturnObject(result_id, result)); + RAY_CHECK_OK(CoreWorkerProcess::GetCoreWorker().SealReturnObject( + result_id, result, ObjectID::Nil())); } } diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.cc b/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.cc index c14cfe186a8b..c46b2d1b251e 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.cc @@ -74,7 +74,10 @@ Status PutSerializedObject(JNIEnv *env, *out_object_id, pin_object, owner_address)); } else { RAY_CHECK_OK(CoreWorkerProcess::GetCoreWorker().SealExisting( - *out_object_id, /* pin_object = */ false, owner_address)); + *out_object_id, + /* pin_object = */ false, + /* generator_id = */ ObjectID::Nil(), + owner_address)); } } return Status::OK(); diff --git a/src/ray/core_worker/object_recovery_manager.cc b/src/ray/core_worker/object_recovery_manager.cc index 448a3bdce4f2..8ba0b2128181 100644 --- a/src/ray/core_worker/object_recovery_manager.cc +++ b/src/ray/core_worker/object_recovery_manager.cc @@ -120,6 +120,7 @@ void ObjectRecoveryManager::PinExistingObjectCopy( client->PinObjectIDs(rpc_address_, {object_id}, + /*generator_id=*/ObjectID::Nil(), [this, object_id, other_locations, node_id]( const Status &status, const rpc::PinObjectIDsReply &reply) { if (status.ok() && reply.successes(0)) { diff --git a/src/ray/core_worker/reference_count.cc b/src/ray/core_worker/reference_count.cc index 5b5ece7f30a5..de0adb756219 100644 --- a/src/ray/core_worker/reference_count.cc +++ b/src/ray/core_worker/reference_count.cc @@ -198,10 +198,60 @@ void ReferenceCounter::AddOwnedObject(const ObjectID &object_id, bool is_reconstructable, bool add_local_ref, const absl::optional &pinned_at_raylet_id) { - RAY_LOG(DEBUG) << "Adding owned object " << object_id; absl::MutexLock lock(&mutex_); - RAY_CHECK(object_id_refs_.count(object_id) == 0) + RAY_CHECK(AddOwnedObjectInternal(object_id, + inner_ids, + owner_address, + call_site, + object_size, + is_reconstructable, + add_local_ref, + pinned_at_raylet_id)) << "Tried to create an owned object that already exists: " << object_id; +} + +void ReferenceCounter::AddDynamicReturn(const ObjectID &object_id, + const ObjectID &generator_id) { + absl::MutexLock lock(&mutex_); + auto outer_it = object_id_refs_.find(generator_id); + if (outer_it == object_id_refs_.end()) { + // Outer object already went out of scope. Either: + // 1. The inner object was never deserialized and has already gone out of + // scope. + // 2. The inner object was deserialized and we already added it as a + // dynamic return. + // Either way, we shouldn't add the inner object to the ref count. 
+ return; + } + RAY_LOG(DEBUG) << "Adding dynamic return " << object_id + << " contained in generator object " << generator_id; + RAY_CHECK(outer_it->second.owned_by_us); + RAY_CHECK(outer_it->second.owner_address.has_value()); + rpc::Address owner_address(outer_it->second.owner_address.value()); + RAY_UNUSED(AddOwnedObjectInternal(object_id, + {}, + owner_address, + outer_it->second.call_site, + /*object_size=*/-1, + outer_it->second.is_reconstructable, + /*add_local_ref=*/false, + absl::optional())); + AddNestedObjectIdsInternal(generator_id, {object_id}, owner_address); +} + +bool ReferenceCounter::AddOwnedObjectInternal( + const ObjectID &object_id, + const std::vector &inner_ids, + const rpc::Address &owner_address, + const std::string &call_site, + const int64_t object_size, + bool is_reconstructable, + bool add_local_ref, + const absl::optional &pinned_at_raylet_id) { + if (object_id_refs_.count(object_id) != 0) { + return false; + } + RAY_LOG(DEBUG) << "Adding owned object " << object_id; // If the entry doesn't exist, we initialize the direct reference count to zero // because this corresponds to a submitted task whose return ObjectID will be created // in the frontend language, incrementing the reference count. @@ -233,6 +283,7 @@ void ReferenceCounter::AddOwnedObject(const ObjectID &object_id, if (add_local_ref) { it->second.local_ref_count++; } + return true; } void ReferenceCounter::UpdateObjectSize(const ObjectID &object_id, int64_t object_size) { @@ -809,10 +860,9 @@ void ReferenceCounter::PopAndClearLocalBorrowers( RAY_LOG(WARNING) << "Tried to decrease ref count for object ID that has count 0 " << borrowed_id << ". This should only happen if ray.internal.free was called earlier."; - continue; + } else { + it->second.local_ref_count--; } - - it->second.local_ref_count--; PRINT_REF_COUNT(it); if (it->second.RefCount() == 0) { DeleteReferenceInternal(it, deleted); diff --git a/src/ray/core_worker/reference_count.h b/src/ray/core_worker/reference_count.h index 4aaefb592bd0..b9aee68e831b 100644 --- a/src/ray/core_worker/reference_count.h +++ b/src/ray/core_worker/reference_count.h @@ -190,6 +190,17 @@ class ReferenceCounter : public ReferenceCounterInterface, const absl::optional &pinned_at_raylet_id = absl::optional()) LOCKS_EXCLUDED(mutex_); + /// Add an owned object that was dynamically created. These are objects that + /// were created by a task that we called, but that we own. + /// + /// \param[in] object_id The ID of the object that we now own. + /// \param[in] generator_id The ID of the object that wraps the dynamically + /// created object ref. This should be an object that we own, and we will + /// update its ref count info to show that it contains the dynamically + /// created ObjectID. + void AddDynamicReturn(const ObjectID &object_id, const ObjectID &generator_id) + LOCKS_EXCLUDED(mutex_); + /// Update the size of the object. /// /// \param[in] object_id The ID of the object. 
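// Illustrative sketch (not from the patch) of the owner-side call pattern for
// the AddDynamicReturn() API declared above; the wrapper function is
// hypothetical. The owner invokes this when it first learns about a
// dynamically created return, whether from the task reply, an eviction
// subscription, or a spill report; if the generator ref has already gone out
// of scope, AddDynamicReturn() is deliberately a no-op.
void RecordDynamicReturn(ReferenceCounter &reference_counter,
                         const ObjectID &dynamic_return_id,
                         const ObjectID &generator_id) {
  RAY_CHECK(!generator_id.IsNil());
  // Adds the object as owned by this worker and nests it under the generator
  // ObjectRef for reference counting and reconstruction purposes.
  reference_counter.AddDynamicReturn(dynamic_return_id, generator_id);
}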
@@ -735,6 +746,16 @@ class ReferenceCounter : public ReferenceCounterInterface, using ReferenceTable = absl::flat_hash_map; using ReferenceProtoTable = absl::flat_hash_map; + bool AddOwnedObjectInternal(const ObjectID &object_id, + const std::vector &contained_ids, + const rpc::Address &owner_address, + const std::string &call_site, + const int64_t object_size, + bool is_reconstructable, + bool add_local_ref, + const absl::optional &pinned_at_raylet_id) + EXCLUSIVE_LOCKS_REQUIRED(mutex_); + void SetNestedRefInUseRecursive(ReferenceTable::iterator inner_ref_it) EXCLUSIVE_LOCKS_REQUIRED(mutex_); diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index 78d9fd29d4a6..cb8f31b1c9c1 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -256,6 +256,65 @@ size_t TaskManager::NumPendingTasks() const { return num_pending_tasks_; } +bool TaskManager::HandleTaskReturn(const ObjectID &object_id, + const rpc::ReturnObject &return_object, + const NodeID &worker_raylet_id, + bool store_in_plasma) { + bool direct_return = false; + reference_counter_->UpdateObjectSize(object_id, return_object.size()); + RAY_LOG(DEBUG) << "Task return object " << object_id << " has size " + << return_object.size(); + + const auto nested_refs = + VectorFromProtobuf(return_object.nested_inlined_refs()); + if (return_object.in_plasma()) { + // NOTE(swang): We need to add the location of the object before marking + // it as local in the in-memory store so that the data locality policy + // will choose the right raylet for any queued dependent tasks. + reference_counter_->UpdateObjectPinnedAtRaylet(object_id, worker_raylet_id); + // Mark it as in plasma with a dummy object. + RAY_CHECK( + in_memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id)); + } else { + // NOTE(swang): If a direct object was promoted to plasma, then we do not + // record the node ID that it was pinned at, which means that we will not + // be able to reconstruct it if the plasma object copy is lost. However, + // this is okay because the pinned copy is on the local node, so we will + // fate-share with the object if the local node fails. + std::shared_ptr data_buffer; + if (return_object.data().size() > 0) { + data_buffer = std::make_shared( + const_cast( + reinterpret_cast(return_object.data().data())), + return_object.data().size()); + } + std::shared_ptr metadata_buffer; + if (return_object.metadata().size() > 0) { + metadata_buffer = std::make_shared( + const_cast( + reinterpret_cast(return_object.metadata().data())), + return_object.metadata().size()); + } + + RayObject object(data_buffer, metadata_buffer, nested_refs); + if (store_in_plasma) { + put_in_local_plasma_callback_(object, object_id); + } else { + direct_return = in_memory_store_->Put(object, object_id); + } + } + + rpc::Address owner_address; + if (reference_counter_->GetOwner(object_id, &owner_address) && !nested_refs.empty()) { + std::vector nested_ids; + for (const auto &nested_ref : nested_refs) { + nested_ids.emplace_back(ObjectRefToId(nested_ref)); + } + reference_counter_->AddNestedObjectIds(object_id, nested_ids, owner_address); + } + return direct_return; +} + void TaskManager::CompletePendingTask(const TaskID &task_id, const rpc::PushTaskReply &reply, const rpc::Address &worker_addr) { @@ -267,74 +326,49 @@ void TaskManager::CompletePendingTask(const TaskID &task_id, // reference holders that are already scheduled at the raylet can retrieve // these objects through plasma. 
absl::flat_hash_set store_in_plasma_ids = {}; + bool first_execution = false; { absl::MutexLock lock(&mu_); auto it = submissible_tasks_.find(task_id); RAY_CHECK(it != submissible_tasks_.end()) << "Tried to complete task that was not pending " << task_id; - if (it->second.num_successful_executions > 0) { + first_execution = it->second.num_successful_executions == 0; + if (!first_execution) { store_in_plasma_ids = it->second.reconstructable_return_ids; } } + std::vector dynamic_return_ids; + std::vector dynamic_returns_in_plasma; std::vector direct_return_ids; - for (int i = 0; i < reply.return_objects_size(); i++) { - const auto &return_object = reply.return_objects(i); - ObjectID object_id = ObjectID::FromBinary(return_object.object_id()); - reference_counter_->UpdateObjectSize(object_id, return_object.size()); - RAY_LOG(DEBUG) << "Task return object " << object_id << " has size " - << return_object.size(); - - const auto nested_refs = - VectorFromProtobuf(return_object.nested_inlined_refs()); - if (return_object.in_plasma()) { - // NOTE(swang): We need to add the location of the object before marking - // it as local in the in-memory store so that the data locality policy - // will choose the right raylet for any queued dependent tasks. - const auto pinned_at_raylet_id = NodeID::FromBinary(worker_addr.raylet_id()); - reference_counter_->UpdateObjectPinnedAtRaylet(object_id, pinned_at_raylet_id); - // Mark it as in plasma with a dummy object. - RAY_CHECK( - in_memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id)); - } else { - // NOTE(swang): If a direct object was promoted to plasma, then we do not - // record the node ID that it was pinned at, which means that we will not - // be able to reconstruct it if the plasma object copy is lost. However, - // this is okay because the pinned copy is on the local node, so we will - // fate-share with the object if the local node fails. 
- std::shared_ptr data_buffer; - if (return_object.data().size() > 0) { - data_buffer = std::make_shared( - const_cast( - reinterpret_cast(return_object.data().data())), - return_object.data().size()); - } - std::shared_ptr metadata_buffer; - if (return_object.metadata().size() > 0) { - metadata_buffer = std::make_shared( - const_cast( - reinterpret_cast(return_object.metadata().data())), - return_object.metadata().size()); + if (reply.dynamic_return_objects_size() > 0) { + RAY_CHECK(reply.return_objects_size() == 1) + << "Dynamic generators only supported for num_returns=1"; + const auto generator_id = ObjectID::FromBinary(reply.return_objects(0).object_id()); + for (const auto &return_object : reply.dynamic_return_objects()) { + const auto object_id = ObjectID::FromBinary(return_object.object_id()); + if (first_execution) { + reference_counter_->AddDynamicReturn(object_id, generator_id); + dynamic_return_ids.push_back(object_id); } - - RayObject object(data_buffer, metadata_buffer, nested_refs); - if (store_in_plasma_ids.count(object_id)) { - put_in_local_plasma_callback_(object, object_id); - } else { - bool stored_in_direct_memory = in_memory_store_->Put(object, object_id); - if (stored_in_direct_memory) { - direct_return_ids.push_back(object_id); + if (!HandleTaskReturn(object_id, + return_object, + NodeID::FromBinary(worker_addr.raylet_id()), + store_in_plasma_ids.count(object_id))) { + if (first_execution) { + dynamic_returns_in_plasma.push_back(object_id); } } } + } - rpc::Address owner_address; - if (reference_counter_->GetOwner(object_id, &owner_address) && !nested_refs.empty()) { - std::vector nested_ids; - for (const auto &nested_ref : nested_refs) { - nested_ids.emplace_back(ObjectRefToId(nested_ref)); - } - reference_counter_->AddNestedObjectIds(object_id, nested_ids, owner_address); + for (const auto &return_object : reply.return_objects()) { + const auto object_id = ObjectID::FromBinary(return_object.object_id()); + if (HandleTaskReturn(object_id, + return_object, + NodeID::FromBinary(worker_addr.raylet_id()), + store_in_plasma_ids.count(object_id))) { + direct_return_ids.push_back(object_id); } } @@ -348,6 +382,20 @@ void TaskManager::CompletePendingTask(const TaskID &task_id, << "Tried to complete task that was not pending " << task_id; spec = it->second.spec; + // Record any dynamically returned objects. We need to store these with the + // task spec so that the worker will recreate them if the task gets + // re-executed. + if (first_execution) { + for (const auto &dynamic_return_id : dynamic_return_ids) { + RAY_LOG(DEBUG) << "Task " << task_id << " produced dynamic return object " + << dynamic_return_id; + spec.AddDynamicReturnId(dynamic_return_id); + } + for (const auto &dynamic_return_id : dynamic_returns_in_plasma) { + it->second.reconstructable_return_ids.insert(dynamic_return_id); + } + } + // Release the lineage for any non-plasma return objects. 
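// Editorial cross-reference (not part of the patch): the dynamic_return_ids
// recorded above via spec.AddDynamicReturnId() are what the re-execution path
// at the top of CoreWorker::ExecuteTask reads back through
// task_spec.DynamicReturnIds() when task_spec.AttemptNumber() > 0, so a
// retried generator task reuses the ObjectIDs minted on the first successful
// attempt instead of allocating new ones.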
for (const auto &direct_return_id : direct_return_ids) { RAY_LOG(DEBUG) << "Task " << it->first << " returned direct object " @@ -562,6 +610,11 @@ void TaskManager::RemoveFinishedTaskReferences( for (size_t i = 0; i < num_returns; i++) { return_ids.push_back(spec.ReturnId(i)); } + if (spec.ReturnsDynamic()) { + for (const auto &dynamic_return_id : spec.DynamicReturnIds()) { + return_ids.push_back(dynamic_return_id); + } + } std::vector deleted; reference_counter_->UpdateFinishedTaskReferences(return_ids, diff --git a/src/ray/core_worker/task_manager.h b/src/ray/core_worker/task_manager.h index 5963beb790ef..83cd2906f853 100644 --- a/src/ray/core_worker/task_manager.h +++ b/src/ray/core_worker/task_manager.h @@ -290,6 +290,14 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa /// Fill every task information of the current worker to GetCoreWorkerStatsReply. void FillTaskInfo(rpc::GetCoreWorkerStatsReply *reply, const int64_t limit) const; + /// Update nested ref count info and store the in-memory value for a task's + /// return object. Returns true if the task's return object was returned + /// directly by value. + bool HandleTaskReturn(const ObjectID &object_id, + const rpc::ReturnObject &return_object, + const NodeID &worker_raylet_id, + bool store_in_plasma); + private: struct TaskEntry { TaskEntry(const TaskSpecification &spec_arg, diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index fb48f1ceeb1d..da2b7791d704 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -560,6 +560,7 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) { RandomTaskId(), address, num_returns, + false, resources, resources, "", diff --git a/src/ray/core_worker/test/dependency_resolver_test.cc b/src/ray/core_worker/test/dependency_resolver_test.cc index 4968dcc73ace..ea58a8e94ba4 100644 --- a/src/ray/core_worker/test/dependency_resolver_test.cc +++ b/src/ray/core_worker/test/dependency_resolver_test.cc @@ -41,6 +41,7 @@ TaskSpecification BuildTaskSpec(const std::unordered_map &r TaskID::Nil(), empty_address, 1, + false, resources, resources, serialized_runtime_env, diff --git a/src/ray/core_worker/test/direct_actor_transport_test.cc b/src/ray/core_worker/test/direct_actor_transport_test.cc index 9e5c1411b4f3..e56a8b323b60 100644 --- a/src/ray/core_worker/test/direct_actor_transport_test.cc +++ b/src/ray/core_worker/test/direct_actor_transport_test.cc @@ -744,7 +744,8 @@ class DirectActorReceiverTest : public ::testing::Test { std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, - std::placeholders::_4); + std::placeholders::_4, + std::placeholders::_5); receiver_ = std::make_unique( worker_context_, main_io_service_, execute_task, [] { return Status::OK(); }); receiver_->Init(std::make_shared( @@ -753,10 +754,13 @@ class DirectActorReceiverTest : public ::testing::Test { dependency_waiter_); } - Status MockExecuteTask(const TaskSpecification &task_spec, - const std::shared_ptr &resource_ids, - std::vector> *return_objects, - ReferenceCounter::ReferenceTableProto *borrowed_refs) { + Status MockExecuteTask( + const TaskSpecification &task_spec, + const std::shared_ptr &resource_ids, + std::vector>> *return_objects, + std::vector>> + *dynamic_return_objects, + ReferenceCounter::ReferenceTableProto *borrowed_refs) { return Status::OK(); } diff --git a/src/ray/core_worker/test/direct_task_transport_test.cc b/src/ray/core_worker/test/direct_task_transport_test.cc index 
753192da9c78..c375922df926 100644 --- a/src/ray/core_worker/test/direct_task_transport_test.cc +++ b/src/ray/core_worker/test/direct_task_transport_test.cc @@ -46,6 +46,7 @@ TaskSpecification BuildTaskSpec(const std::unordered_map &r TaskID::Nil(), empty_address, 1, + false, resources, resources, serialized_runtime_env, diff --git a/src/ray/core_worker/test/mock_worker.cc b/src/ray/core_worker/test/mock_worker.cc index ab3350400951..a8e51072615a 100644 --- a/src/ray/core_worker/test/mock_worker.cc +++ b/src/ray/core_worker/test/mock_worker.cc @@ -50,7 +50,7 @@ class MockWorker { options.node_manager_port = node_manager_port; options.raylet_ip_address = "127.0.0.1"; options.task_execution_callback = std::bind( - &MockWorker::ExecuteTask, this, _1, _2, _3, _4, _5, _6, _7, _8, _9, _10); + &MockWorker::ExecuteTask, this, _1, _2, _3, _4, _5, _6, _7, _8, _9, _10, _11); options.metrics_agent_port = -1; options.startup_token = startup_token; CoreWorkerProcess::Initialize(options); @@ -59,16 +59,18 @@ class MockWorker { void RunTaskExecutionLoop() { CoreWorkerProcess::RunTaskExecutionLoop(); } private: - Status ExecuteTask(TaskType task_type, - const std::string task_name, - const RayFunction &ray_function, - const std::unordered_map &required_resources, - const std::vector> &args, - const std::vector &arg_refs, - const std::vector &return_ids, - const std::string &debugger_breakpoint, - const std::string &serialized_retry_exception_allowlist, - std::vector> *results) { + Status ExecuteTask( + const rpc::Address &caller_address, + TaskType task_type, + const std::string task_name, + const RayFunction &ray_function, + const std::unordered_map &required_resources, + const std::vector> &args, + const std::vector &arg_refs, + const std::string &debugger_breakpoint, + const std::string &serialized_retry_exception_allowlist, + std::vector>> *returns, + std::vector>> *dynamic_returns) { // Note that this doesn't include dummy object id. const FunctionDescriptor function_descriptor = ray_function.GetFunctionDescriptor(); RAY_CHECK(function_descriptor->Type() == @@ -78,34 +80,30 @@ class MockWorker { if ("actor creation task" == typed_descriptor->ModuleName()) { return Status::OK(); } else if ("GetWorkerPid" == typed_descriptor->ModuleName()) { - // Get mock worker pid - return GetWorkerPid(results); + // Save the pid of current process to the return object. + std::string pid_string = std::to_string(static_cast(getpid())); + auto data = + const_cast(reinterpret_cast(pid_string.data())); + auto memory_buffer = + std::make_shared(data, pid_string.size(), true); + RAY_CHECK(returns->size() == 1); + (*returns)[0].second = std::make_shared( + memory_buffer, nullptr, std::vector()); + return Status::OK(); } else if ("MergeInputArgsAsOutput" == typed_descriptor->ModuleName()) { // Merge input args and write the merged content to each of return ids - return MergeInputArgsAsOutput(args, return_ids, results); + return MergeInputArgsAsOutput(args, returns); } else if ("WhileTrueLoop" == typed_descriptor->ModuleName()) { - return WhileTrueLoop(args, return_ids, results); + return WhileTrueLoop(); } else { return Status::TypeError("Unknown function descriptor: " + typed_descriptor->ModuleName()); } } - Status GetWorkerPid(std::vector> *results) { - // Save the pid of current process to the return object. 
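// Minimal sketch (assumptions: called from a frontend TaskExecutionCallback;
// `data_buffer` already holds the serialized result). Under the new callback
// signature the core worker pre-populates `returns` with one
// (ObjectID, nullptr) pair per statically declared return, and the frontend
// only fills in the value side, as the inlined GetWorkerPid path above does.
Status FillStaticReturns(
    const std::shared_ptr<Buffer> &data_buffer,
    std::vector<std::pair<ObjectID, std::shared_ptr<RayObject>>> *returns) {
  for (auto &return_pair : *returns) {
    return_pair.second = std::make_shared<RayObject>(
        data_buffer, /*metadata=*/nullptr, std::vector<rpc::ObjectReference>());
  }
  return Status::OK();
}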
- std::string pid_string = std::to_string(static_cast(getpid())); - auto data = - const_cast(reinterpret_cast(pid_string.data())); - auto memory_buffer = - std::make_shared(data, pid_string.size(), true); - results->push_back(std::make_shared( - memory_buffer, nullptr, std::vector())); - return Status::OK(); - } - - Status MergeInputArgsAsOutput(const std::vector> &args, - const std::vector &return_ids, - std::vector> *results) { + Status MergeInputArgsAsOutput( + const std::vector> &args, + std::vector>> *returns) { // Merge all the content from input args. std::vector buffer; for (const auto &arg : args) { @@ -126,17 +124,15 @@ class MockWorker { std::make_shared(buffer.data(), buffer.size(), true); // Write the merged content to each of return ids. - for (size_t i = 0; i < return_ids.size(); i++) { - results->push_back(std::make_shared( - memory_buffer, nullptr, std::vector())); + for (size_t i = 0; i < returns->size(); i++) { + (*returns)[i].second = std::make_shared( + memory_buffer, nullptr, std::vector()); } return Status::OK(); } - Status WhileTrueLoop(const std::vector> &args, - const std::vector &return_ids, - std::vector> *results) { + Status WhileTrueLoop() { while (1) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); } diff --git a/src/ray/core_worker/test/object_recovery_manager_test.cc b/src/ray/core_worker/test/object_recovery_manager_test.cc index a9884a547940..a3276a950084 100644 --- a/src/ray/core_worker/test/object_recovery_manager_test.cc +++ b/src/ray/core_worker/test/object_recovery_manager_test.cc @@ -61,6 +61,7 @@ class MockRayletClient : public PinObjectsInterface { void PinObjectIDs( const rpc::Address &caller_address, const std::vector &object_ids, + const ObjectID &generator_id, const rpc::ClientCallback &callback) override { RAY_LOG(INFO) << "PinObjectIDs " << object_ids.size(); callbacks.push_back(callback); diff --git a/src/ray/core_worker/transport/direct_actor_transport.cc b/src/ray/core_worker/transport/direct_actor_transport.cc index 89fdd94df12f..27ed85ca849c 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.cc +++ b/src/ray/core_worker/transport/direct_actor_transport.cc @@ -25,6 +25,36 @@ using namespace ray::gcs; namespace ray { namespace core { +void SerializeReturnObject(const ObjectID &object_id, + const std::shared_ptr &return_object, + rpc::ReturnObject *return_object_proto) { + return_object_proto->set_object_id(object_id.Binary()); + + if (!return_object) { + // This should only happen if the local raylet died. Caller should + // retry the task. 
+ RAY_LOG(WARNING) << "Failed to create task return object " << object_id + << " in the object store, exiting."; + QuickExit(); + } + return_object_proto->set_size(return_object->GetSize()); + if (return_object->GetData() != nullptr && return_object->GetData()->IsPlasmaBuffer()) { + return_object_proto->set_in_plasma(true); + } else { + if (return_object->GetData() != nullptr) { + return_object_proto->set_data(return_object->GetData()->Data(), + return_object->GetData()->Size()); + } + if (return_object->GetMetadata() != nullptr) { + return_object_proto->set_metadata(return_object->GetMetadata()->Data(), + return_object->GetMetadata()->Size()); + } + } + for (const auto &nested_ref : return_object->GetNestedRefs()) { + return_object_proto->add_nested_inlined_refs()->CopyFrom(nested_ref); + } +} + void CoreWorkerDirectTaskReceiver::Init( std::shared_ptr client_pool, rpc::Address rpc_address, @@ -91,45 +121,47 @@ void CoreWorkerDirectTaskReceiver::HandleTask( } RAY_CHECK(num_returns >= 0); - std::vector> return_objects; + std::vector>> return_objects; + std::vector>> dynamic_return_objects; bool is_retryable_error = false; auto status = task_handler_(task_spec, resource_ids, &return_objects, + &dynamic_return_objects, reply->mutable_borrowed_refs(), &is_retryable_error); reply->set_is_retryable_error(is_retryable_error); bool objects_valid = return_objects.size() == num_returns; - if (objects_valid) { - for (size_t i = 0; i < return_objects.size(); i++) { - auto return_object = reply->add_return_objects(); - ObjectID id = ObjectID::FromIndex(task_spec.TaskId(), /*index=*/i + 1); - return_object->set_object_id(id.Binary()); + for (const auto &return_object : return_objects) { + if (return_object.second == NULL) { + objects_valid = false; + } + } - if (!return_objects[i]) { - // This should only happen if the local raylet died. Caller should - // retry the task. 
- RAY_LOG(WARNING) << "Failed to create task return object " << id - << " in the object store, exiting."; - QuickExit(); - } - const auto &result = return_objects[i]; - return_object->set_size(result->GetSize()); - if (result->GetData() != nullptr && result->GetData()->IsPlasmaBuffer()) { - return_object->set_in_plasma(true); - } else { - if (result->GetData() != nullptr) { - return_object->set_data(result->GetData()->Data(), result->GetData()->Size()); - } - if (result->GetMetadata() != nullptr) { - return_object->set_metadata(result->GetMetadata()->Data(), - result->GetMetadata()->Size()); - } - } - for (const auto &nested_ref : result->GetNestedRefs()) { - return_object->add_nested_inlined_refs()->CopyFrom(nested_ref); + if (objects_valid) { + if (task_spec.ReturnsDynamic()) { + size_t num_dynamic_returns_expected = task_spec.DynamicReturnIds().size(); + if (num_dynamic_returns_expected > 0) { + RAY_CHECK(dynamic_return_objects.size() == num_dynamic_returns_expected) + << "Expected " << num_dynamic_returns_expected + << " dynamic returns, but task generated " << dynamic_return_objects.size(); } + } else { + RAY_CHECK(dynamic_return_objects.size() == 0) + << "Task with static num_returns returned " << dynamic_return_objects.size() + << " objects dynamically"; + } + for (const auto &dynamic_return : dynamic_return_objects) { + auto return_object_proto = reply->add_dynamic_return_objects(); + SerializeReturnObject( + dynamic_return.first, dynamic_return.second, return_object_proto); + } + for (size_t i = 0; i < return_objects.size(); i++) { + const auto &return_object = return_objects[i]; + auto return_object_proto = reply->add_return_objects(); + SerializeReturnObject( + return_object.first, return_object.second, return_object_proto); } if (task_spec.IsActorCreationTask()) { @@ -160,7 +192,7 @@ void CoreWorkerDirectTaskReceiver::HandleTask( send_reply_callback(status, nullptr, nullptr); } } else { - RAY_CHECK(objects_valid) << return_objects.size() << " " << num_returns; + RAY_CHECK(objects_valid); send_reply_callback(status, nullptr, nullptr); } }; diff --git a/src/ray/core_worker/transport/direct_actor_transport.h b/src/ray/core_worker/transport/direct_actor_transport.h index 1f8760f97849..a43f3fb0cbc8 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.h +++ b/src/ray/core_worker/transport/direct_actor_transport.h @@ -49,12 +49,14 @@ namespace core { class CoreWorkerDirectTaskReceiver { public: - using TaskHandler = - std::function resource_ids, - std::vector> *return_objects, - ReferenceCounter::ReferenceTableProto *borrower_refs, - bool *is_retryable_error)>; + using TaskHandler = std::function resource_ids, + std::vector>> *return_objects, + std::vector>> + *dynamic_return_objects, + ReferenceCounter::ReferenceTableProto *borrower_refs, + bool *is_retryable_error)>; using OnTaskDone = std::function; diff --git a/src/ray/gcs/gcs_server/test/gcs_server_test_util.h b/src/ray/gcs/gcs_server/test/gcs_server_test_util.h index a6b82d56cdd9..92f2ed7fdbb4 100644 --- a/src/ray/gcs/gcs_server/test/gcs_server_test_util.h +++ b/src/ray/gcs/gcs_server/test/gcs_server_test_util.h @@ -257,6 +257,7 @@ struct GcsServerMocker { void PinObjectIDs( const rpc::Address &caller_address, const std::vector &object_ids, + const ObjectID &generator_id, const ray::rpc::ClientCallback &callback) override {} /// DependencyWaiterInterface diff --git a/src/ray/gcs/test/gcs_test_util.h b/src/ray/gcs/test/gcs_test_util.h index 1041929c6cf6..6984a1930608 100644 --- a/src/ray/gcs/test/gcs_test_util.h +++ 
b/src/ray/gcs/test/gcs_test_util.h @@ -55,6 +55,7 @@ struct Mocker { TaskID::Nil(), owner_address, 1, + false, required_resources, required_placement_resources, "", diff --git a/src/ray/object_manager/object_directory.h b/src/ray/object_manager/object_directory.h index 25d03980a5fb..4f35abb342e3 100644 --- a/src/ray/object_manager/object_directory.h +++ b/src/ray/object_manager/object_directory.h @@ -132,6 +132,7 @@ class IObjectDirectory { const NodeID &node_id, const rpc::Address &owner_address, const std::string &spilled_url, + const ObjectID &generator_id, const bool spilled_to_local_storage) = 0; /// Record metrics. diff --git a/src/ray/object_manager/ownership_based_object_directory.cc b/src/ray/object_manager/ownership_based_object_directory.cc index 7bf539a24a64..847e48f006e7 100644 --- a/src/ray/object_manager/ownership_based_object_directory.cc +++ b/src/ray/object_manager/ownership_based_object_directory.cc @@ -167,6 +167,7 @@ void OwnershipBasedObjectDirectory::ReportObjectSpilled( const NodeID &node_id, const rpc::Address &owner_address, const std::string &spilled_url, + const ObjectID &generator_id, const bool spilled_to_local_storage) { RAY_LOG(DEBUG) << "Sending spilled URL " << spilled_url << " for object " << object_id << " to owner " << WorkerID::FromBinary(owner_address.worker_id()); @@ -186,6 +187,9 @@ void OwnershipBasedObjectDirectory::ReportObjectSpilled( update.mutable_spilled_location_update()->set_spilled_url(spilled_url); update.mutable_spilled_location_update()->set_spilled_to_local_storage( spilled_to_local_storage); + if (!generator_id.IsNil()) { + update.set_generator_id(generator_id.Binary()); + } if (!existing_object) { location_buffers_[worker_id].first.emplace_back(object_id); } diff --git a/src/ray/object_manager/ownership_based_object_directory.h b/src/ray/object_manager/ownership_based_object_directory.h index 5ca0bd0f10d3..31ad127e1043 100644 --- a/src/ray/object_manager/ownership_based_object_directory.h +++ b/src/ray/object_manager/ownership_based_object_directory.h @@ -81,6 +81,7 @@ class OwnershipBasedObjectDirectory : public IObjectDirectory { const NodeID &node_id, const rpc::Address &owner_address, const std::string &spilled_url, + const ObjectID &generator_id, const bool spilled_to_local_storage) override; void RecordMetrics(uint64_t duration_ms) override; diff --git a/src/ray/object_manager/test/ownership_based_object_directory_test.cc b/src/ray/object_manager/test/ownership_based_object_directory_test.cc index 0b952380a2b4..e15199f0a224 100644 --- a/src/ray/object_manager/test/ownership_based_object_directory_test.cc +++ b/src/ray/object_manager/test/ownership_based_object_directory_test.cc @@ -228,8 +228,12 @@ TEST_F(OwnershipBasedObjectDirectoryTest, TestLocationUpdateBatchBasic) { auto object_info_spilled = CreateNewObjectInfo(owner_id); rpc::Address owner_address; owner_address.set_worker_id(object_info_spilled.owner_worker_id.Binary()); - obod_.ReportObjectSpilled( - object_info_spilled.object_id, current_node_id, owner_address, "url1", true); + obod_.ReportObjectSpilled(object_info_spilled.object_id, + current_node_id, + owner_address, + "url1", + ObjectID::Nil(), + true); rpc::ObjectLocationUpdate update = owner_client->buffered_object_locations_.at(object_info_spilled.owner_worker_id) .at(object_info_spilled.object_id); @@ -279,8 +283,12 @@ TEST_F(OwnershipBasedObjectDirectoryTest, TestLocationUpdateBufferedUpdate) { obod_.ReportObjectRemoved(object_info.object_id, current_node_id, object_info); rpc::Address owner_address; 
owner_address.set_worker_id(object_info.owner_worker_id.Binary()); - obod_.ReportObjectSpilled( - object_info.object_id, current_node_id, owner_address, "url1", true); + obod_.ReportObjectSpilled(object_info.object_id, + current_node_id, + owner_address, + "url1", + ObjectID::Nil(), + true); ASSERT_TRUE(owner_client->ReplyUpdateObjectLocationBatch()); ASSERT_EQ(NumBatchReplied(), 1); diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index 186aeffaac2e..c14a294994ca 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -330,6 +330,15 @@ message TaskSpec { // A count of the number of times this task has been attempted so far. 0 // means this is the first execution. uint64 attempt_number = 29; + // This task returns a dynamic number of objects. + bool returns_dynamic = 30; + // A list of ObjectIDs that were created by this task but that should be + // owned by the task's caller. The task should return the corresponding + // ObjectRefs in its actual return value. + // NOTE(swang): This should only be set when the attempt number > 0. On the + // first execution, we do not yet know whether the task has dynamic return + // objects. + repeated bytes dynamic_return_ids = 31; } message TaskInfoEntry { diff --git a/src/ray/protobuf/core_worker.proto b/src/ray/protobuf/core_worker.proto index 46d3acc6b5b9..fadf36c71876 100644 --- a/src/ray/protobuf/core_worker.proto +++ b/src/ray/protobuf/core_worker.proto @@ -108,6 +108,12 @@ message PushTaskRequest { message PushTaskReply { // The returned objects. repeated ReturnObject return_objects = 1; + // Dynamically created objects. These are objects whose refs were allocated + // by the task at run time instead of by the task caller at f.remote() time. + // We need to notify the task caller that they own these objects. The + // language-level ObjectRefs should be returned inside one of the statically + // allocated return objects. + repeated ReturnObject dynamic_return_objects = 2; // Set to true if the worker will be exiting. bool worker_exiting = 3; // The references that the worker borrowed during the task execution. A @@ -210,6 +216,11 @@ message ObjectLocationUpdate { optional ObjectPlasmaLocationUpdate plasma_location_update = 2; // When it's set, it contains where the object is spilled to. optional ObjectSpilledLocationUpdate spilled_location_update = 3; + // When it's set, this means that it was a dynamically created ObjectID, so + // we need to notify the owner of the outer ObjectID, which should be owned + // by the same worker. If the outer ObjectID is still in scope, then the + // owner can add the dynamically created ObjectID to its ref count. + optional bytes generator_id = 4; } message GetObjectLocationsOwnerRequest { diff --git a/src/ray/protobuf/node_manager.proto b/src/ray/protobuf/node_manager.proto index 13d10f1eda7b..68184dde7a8f 100644 --- a/src/ray/protobuf/node_manager.proto +++ b/src/ray/protobuf/node_manager.proto @@ -167,6 +167,13 @@ message PinObjectIDsRequest { Address owner_address = 1; // ObjectIDs to pin. repeated bytes object_ids = 2; + // For object IDs that were generated dynamically during task execution. The + // owner learns of these ObjectRefs at task execution time instead of at + // ray.put() or f.remote() time. This is the outer ObjectID that the + // dynamically generated ObjectRefs are stored inside. We need this information + // so that we can notify the owner about the nested ObjectRefs, in case the + // owner does not know about them yet. 
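// Illustrative sketch (not from the patch) of how a sender populates this
// request; the raylet client change below does the equivalent when it pins
// dynamically created returns. `owner_address`, `dynamic_return_id`, and
// `generator_id` are assumed to be in scope, with `generator_id` non-nil.
rpc::PinObjectIDsRequest request;
request.mutable_owner_address()->CopyFrom(owner_address);
request.add_object_ids(dynamic_return_id.Binary());
request.set_generator_id(generator_id.Binary());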
+ optional bytes generator_id = 3; } message PinObjectIDsReply { diff --git a/src/ray/protobuf/pubsub.proto b/src/ray/protobuf/pubsub.proto index 3479d5407137..a1fc55bdacad 100644 --- a/src/ray/protobuf/pubsub.proto +++ b/src/ray/protobuf/pubsub.proto @@ -169,6 +169,13 @@ message WorkerObjectEvictionSubMessage { bytes object_id = 2; // Address of the subscriber. Address subscriber_address = 3; + // For object IDs that were generated dynamically during task execution. The + // owner learns of these ObjectRefs at task execution time instead of at + // ray.put() or f.remote() time. This is the outer ObjectID that the + // dynamically generated ObjectRefs are stored inside. We need this information + // so that we can notify the owner about the nested ObjectRefs, in case the + // owner does not know about them yet. + optional bytes generator_id = 4; } message WorkerRefRemovedSubMessage { diff --git a/src/ray/raylet/local_object_manager.cc b/src/ray/raylet/local_object_manager.cc index 946ed7a970ba..3bb8c39d5cb8 100644 --- a/src/ray/raylet/local_object_manager.cc +++ b/src/ray/raylet/local_object_manager.cc @@ -25,7 +25,8 @@ namespace raylet { void LocalObjectManager::PinObjectsAndWaitForFree( const std::vector &object_ids, std::vector> &&objects, - const rpc::Address &owner_address) { + const rpc::Address &owner_address, + const ObjectID &generator_id) { for (size_t i = 0; i < object_ids.size(); i++) { const auto &object_id = object_ids[i]; auto &object = objects[i]; @@ -36,7 +37,7 @@ void LocalObjectManager::PinObjectsAndWaitForFree( } const auto inserted = - local_objects_.emplace(object_id, std::make_pair<>(owner_address, false)); + local_objects_.emplace(object_id, LocalObjectInfo(owner_address, generator_id)); if (inserted.second) { // This is the first time we're pinning this object. RAY_LOG(DEBUG) << "Pinning object " << object_id; @@ -44,7 +45,7 @@ void LocalObjectManager::PinObjectsAndWaitForFree( pinned_objects_.emplace(object_id, std::move(object)); } else { auto original_worker_id = - WorkerID::FromBinary(inserted.first->second.first.worker_id()); + WorkerID::FromBinary(inserted.first->second.owner_address.worker_id()); auto new_worker_id = WorkerID::FromBinary(owner_address.worker_id()); if (original_worker_id != new_worker_id) { // TODO(swang): Handle this case. We should use the new owner address @@ -61,6 +62,9 @@ void LocalObjectManager::PinObjectsAndWaitForFree( auto wait_request = std::make_unique(); wait_request->set_object_id(object_id.Binary()); wait_request->set_intended_worker_id(owner_address.worker_id()); + if (!generator_id.IsNil()) { + wait_request->set_generator_id(generator_id.Binary()); + } rpc::Address subscriber_address; subscriber_address.set_raylet_id(self_node_id_.Binary()); subscriber_address.set_ip_address(self_node_address_); @@ -101,14 +105,14 @@ void LocalObjectManager::PinObjectsAndWaitForFree( void LocalObjectManager::ReleaseFreedObject(const ObjectID &object_id) { // Only free the object if it is not already freed. auto it = local_objects_.find(object_id); - if (it == local_objects_.end() || it->second.second) { + if (it == local_objects_.end() || it->second.is_freed) { return; } // Mark the object as freed. NOTE(swang): We have to mark this instead of // deleting the entry immediately in case the object is currently being // spilled. In that case, we should process the free event once the object // spill is complete. 
- it->second.second = true; + it->second.is_freed = true; RAY_LOG(DEBUG) << "Unpinning object " << object_id; // The object should be in one of these states: pinned, spilling, or spilled. @@ -307,13 +311,13 @@ void LocalObjectManager::SpillObjectsInternal( RAY_CHECK(it != objects_pending_spill_.end()); auto freed_it = local_objects_.find(object_id); // If the object hasn't already been freed, spill it. - if (freed_it == local_objects_.end() || freed_it->second.second) { + if (freed_it == local_objects_.end() || freed_it->second.is_freed) { num_bytes_pending_spill_ -= it->second->GetSize(); objects_pending_spill_.erase(it); } else { auto ref = request.add_object_refs_to_spill(); ref->set_object_id(object_id.Binary()); - ref->mutable_owner_address()->CopyFrom(freed_it->second.first); + ref->mutable_owner_address()->CopyFrom(freed_it->second.owner_address); RAY_LOG(DEBUG) << "Sending spill request for object " << object_id; requested_objects_to_spill.push_back(object_id); } @@ -407,15 +411,20 @@ void LocalObjectManager::OnObjectSpilled(const std::vector &object_ids // Asynchronously Update the spilled URL. auto freed_it = local_objects_.find(object_id); - if (freed_it == local_objects_.end() || freed_it->second.second) { + if (freed_it == local_objects_.end() || freed_it->second.is_freed) { RAY_LOG(DEBUG) << "Spilled object already freed, skipping send of spilled URL to " "object directory for object " << object_id; continue; } - const auto &worker_addr = freed_it->second.first; + const auto &worker_addr = freed_it->second.owner_address; object_directory_->ReportObjectSpilled( - object_id, self_node_id_, worker_addr, object_url, is_external_storage_type_fs_); + object_id, + self_node_id_, + worker_addr, + object_url, + freed_it->second.generator_id.value_or(ObjectID::Nil()), + is_external_storage_type_fs_); } } diff --git a/src/ray/raylet/local_object_manager.h b/src/ray/raylet/local_object_manager.h index 8e71c7af751a..c1e2c4dfcd3d 100644 --- a/src/ray/raylet/local_object_manager.h +++ b/src/ray/raylet/local_object_manager.h @@ -82,9 +82,15 @@ class LocalObjectManager { /// \param objects Pointers to the objects to be pinned. The pointer should /// be kept in scope until the object can be released. /// \param owner_address The owner of the objects to be pinned. + /// \param generator_id When it's set, this means that it was a dynamically + /// created ObjectID, so we need to notify the owner of the outer ObjectID + /// that should already be owned by the same worker. If the outer ObjectID is + /// still in scope, then the owner can add the dynamically created ObjectID + /// to its ref count. Set to nil for statically allocated ObjectIDs. void PinObjectsAndWaitForFree(const std::vector &object_ids, std::vector> &&objects, - const rpc::Address &owner_address); + const rpc::Address &owner_address, + const ObjectID &generator_id = ObjectID::Nil()); /// Spill objects as much as possible as fast as possible up to the max throughput. /// @@ -161,6 +167,16 @@ class LocalObjectManager { std::string DebugString() const; private: + struct LocalObjectInfo { + LocalObjectInfo(const rpc::Address &owner_address, const ObjectID &generator_id) + : owner_address(owner_address), + generator_id(generator_id.IsNil() ? 
std::nullopt + : std::optional(generator_id)) {} + rpc::Address owner_address; + bool is_freed = false; + const std::optional generator_id; + }; + FRIEND_TEST(LocalObjectManagerTest, TestSpillObjectsOfSizeZero); FRIEND_TEST(LocalObjectManagerTest, TestSpillUptoMaxFuseCount); FRIEND_TEST(LocalObjectManagerTest, @@ -219,14 +235,14 @@ class LocalObjectManager { /// A callback to call when an object has been freed. std::function &)> on_objects_freed_; - /// Hashmap from local objects that we are waiting to free to a tuple of - /// (their owner address, whether the object has been freed). + /// Hashmap from local objects that we are waiting to free to metadata about + /// the object including their owner address. /// All objects in this hashmap should also be in exactly one of the /// following maps: /// - pinned_objects_: objects pinned in shared memory /// - objects_pending_spill_: objects pinned and waiting for spill to complete /// - spilled_objects_url_: objects already spilled - absl::flat_hash_map> local_objects_; + absl::flat_hash_map local_objects_; // Objects that are pinned on this node. absl::flat_hash_map> pinned_objects_; diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 016d98272f43..c76f2a239818 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -2468,7 +2468,6 @@ void NodeManager::HandlePinObjectIDs(const rpc::PinObjectIDsRequest &request, rpc::SendReplyCallback send_reply_callback) { std::vector object_ids; object_ids.reserve(request.object_ids_size()); - const auto &owner_address = request.owner_address(); for (const auto &object_id_binary : request.object_ids()) { object_ids.push_back(ObjectID::FromBinary(object_id_binary)); } @@ -2496,8 +2495,11 @@ void NodeManager::HandlePinObjectIDs(const rpc::PinObjectIDsRequest &request, } } // Wait for the object to be freed by the owner, which keeps the ref count. + ObjectID generator_id = request.has_generator_id() + ? 
ObjectID::FromBinary(request.generator_id()) + : ObjectID::Nil(); local_object_manager_.PinObjectsAndWaitForFree( - object_ids, std::move(results), owner_address); + object_ids, std::move(results), request.owner_address(), generator_id); } RAY_CHECK_EQ(request.object_ids_size(), reply->successes_size()); send_reply_callback(Status::OK(), nullptr, nullptr); diff --git a/src/ray/raylet/scheduling/cluster_task_manager_test.cc b/src/ray/raylet/scheduling/cluster_task_manager_test.cc index 71ae5a6e0bf1..189e42d853f4 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager_test.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager_test.cc @@ -162,6 +162,7 @@ RayTask CreateTask( TaskID::Nil(), address, 0, + /*returns_dynamic=*/false, required_resources, {}, "", diff --git a/src/ray/raylet_client/raylet_client.cc b/src/ray/raylet_client/raylet_client.cc index 536cedc32571..bc4ae255d74d 100644 --- a/src/ray/raylet_client/raylet_client.cc +++ b/src/ray/raylet_client/raylet_client.cc @@ -469,12 +469,16 @@ void raylet::RayletClient::ReleaseUnusedBundles( void raylet::RayletClient::PinObjectIDs( const rpc::Address &caller_address, const std::vector &object_ids, + const ObjectID &generator_id, const rpc::ClientCallback &callback) { rpc::PinObjectIDsRequest request; request.mutable_owner_address()->CopyFrom(caller_address); for (const ObjectID &object_id : object_ids) { request.add_object_ids(object_id.Binary()); } + if (!generator_id.IsNil()) { + request.set_generator_id(generator_id.Binary()); + } pins_in_flight_++; auto rpc_callback = [this, callback = std::move(callback)]( Status status, const rpc::PinObjectIDsReply &reply) { diff --git a/src/ray/raylet_client/raylet_client.h b/src/ray/raylet_client/raylet_client.h index 0d8eea2c5591..d879771fba3d 100644 --- a/src/ray/raylet_client/raylet_client.h +++ b/src/ray/raylet_client/raylet_client.h @@ -53,6 +53,7 @@ class PinObjectsInterface { virtual void PinObjectIDs( const rpc::Address &caller_address, const std::vector &object_ids, + const ObjectID &generator_id, const ray::rpc::ClientCallback &callback) = 0; virtual ~PinObjectsInterface(){}; @@ -448,6 +449,7 @@ class RayletClient : public RayletClientInterface { void PinObjectIDs( const rpc::Address &caller_address, const std::vector &object_ids, + const ObjectID &generator_id, const ray::rpc::ClientCallback &callback) override; void ShutdownRaylet(