[core] Support generators to allow tasks to return a dynamic number of objects (#28291)

This adds support for tasks that need to return a dynamic number of objects. When a remote generator function is invoked and num_returns for the task is 1, the worker will dynamically allocate ray.put IDs for these objects and store an ObjectRefGenerator as its return value. This allows the worker to choose how many objects to return and to keep heap memory low, since it does not need to keep all objects in memory simultaneously.
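
A minimal caller-side sketch of the user-facing API this enables (the function name `gen` is illustrative; the full documented example is added in doc/source/ray-core/doc_code/generator.py below):

    import ray

    @ray.remote(num_returns="dynamic")
    def gen(n):
        for i in range(n):
            yield i

    # A single ObjectRef that resolves to an ObjectRefGenerator once the task
    # finishes; iterating the generator yields one ObjectRef per returned value.
    ref = gen.remote(5)
    for item_ref in ray.get(ref):
        print(ray.get(item_ref))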

Unlike normal ray.put(), we assign the task caller as the owner of the object. This is to improve fault tolerance, as the owner can recover dynamically generated objects through the normal lineage reconstruction codepath.

The main complication has to do with notifying the task caller that it owns these objects. We do this in two places, which is necessary because the protocols are asynchronous, so either message can arrive first:

1. When the task reply is received.
2. When the primary raylet subscribes to the eviction notice from the owner.

To register the dynamic return, the owner adds the ObjectRef to the ref counter and marks that it is contained in the generator object.

Signed-off-by: Stephanie Wang <[email protected]>
Co-authored-by: Clark Zinzow <[email protected]>
Co-authored-by: Eric Liang <[email protected]>
3 people authored Sep 21, 2022
1 parent 014279b commit 45d7cd2
Showing 58 changed files with 1,323 additions and 284 deletions.
1 change: 1 addition & 0 deletions cpp/src/ray/runtime/task/local_mode_task_submitter.cc
@@ -59,6 +59,7 @@ ObjectID LocalModeTaskSubmitter::Submit(InvocationSpec &invocation,
local_mode_ray_tuntime_.GetCurrentTaskId(),
address,
1,
/*returns_dynamic=*/false,
required_resources,
required_placement_resources,
"",
15 changes: 9 additions & 6 deletions cpp/src/ray/runtime/task/task_executor.cc
@@ -119,16 +119,17 @@ std::pair<Status, std::shared_ptr<msgpack::sbuffer>> GetExecuteResult(
}

Status TaskExecutor::ExecuteTask(
const rpc::Address &caller_address,
ray::TaskType task_type,
const std::string task_name,
const RayFunction &ray_function,
const std::unordered_map<std::string, double> &required_resources,
const std::vector<std::shared_ptr<ray::RayObject>> &args_buffer,
const std::vector<rpc::ObjectReference> &arg_refs,
const std::vector<ObjectID> &return_ids,
const std::string &debugger_breakpoint,
const std::string &serialized_retry_exception_allowlist,
std::vector<std::shared_ptr<ray::RayObject>> *results,
std::vector<std::pair<ObjectID, std::shared_ptr<RayObject>>> *returns,
std::vector<std::pair<ObjectID, std::shared_ptr<RayObject>>> *dynamic_returns,
std::shared_ptr<ray::LocalMemoryBuffer> &creation_task_exception_pb_bytes,
bool *is_retryable_error,
const std::vector<ConcurrencyGroup> &defined_concurrency_groups,
@@ -209,11 +210,10 @@ Status TaskExecutor::ExecuteTask(
data = std::make_shared<msgpack::sbuffer>(std::move(buf));
}

results->resize(return_ids.size(), nullptr);
if (task_type != ray::TaskType::ACTOR_CREATION_TASK) {
size_t data_size = data->size();
auto &result_id = return_ids[0];
auto result_ptr = &(*results)[0];
auto &result_id = (*returns)[0].first;
auto result_ptr = &(*returns)[0].second;
int64_t task_output_inlined_bytes = 0;

if (cross_lang && meta_buffer == nullptr) {
@@ -250,7 +250,10 @@
}
}

RAY_CHECK_OK(CoreWorkerProcess::GetCoreWorker().SealReturnObject(result_id, result));
RAY_CHECK_OK(CoreWorkerProcess::GetCoreWorker().SealReturnObject(
result_id,
result,
/*generator_id=*/ObjectID::Nil()));
} else {
if (!status.ok()) {
return ray::Status::CreationTaskError("");
5 changes: 3 additions & 2 deletions cpp/src/ray/runtime/task/task_executor.h
@@ -75,16 +75,17 @@ class TaskExecutor {
absl::Mutex &actor_contexts_mutex);

static Status ExecuteTask(
const rpc::Address &caller_address,
ray::TaskType task_type,
const std::string task_name,
const RayFunction &ray_function,
const std::unordered_map<std::string, double> &required_resources,
const std::vector<std::shared_ptr<ray::RayObject>> &args,
const std::vector<rpc::ObjectReference> &arg_refs,
const std::vector<ObjectID> &return_ids,
const std::string &debugger_breakpoint,
const std::string &serialized_retry_exception_allowlist,
std::vector<std::shared_ptr<ray::RayObject>> *results,
std::vector<std::pair<ObjectID, std::shared_ptr<RayObject>>> *returns,
std::vector<std::pair<ObjectID, std::shared_ptr<RayObject>>> *dynamic_returns,
std::shared_ptr<ray::LocalMemoryBuffer> &creation_task_exception_pb_bytes,
bool *is_retryable_error,
const std::vector<ConcurrencyGroup> &defined_concurrency_groups,
71 changes: 71 additions & 0 deletions doc/source/ray-core/doc_code/generator.py
@@ -0,0 +1,71 @@
# __program_start__
import ray
from ray import ObjectRefGenerator

# fmt: off
# __dynamic_generator_start__
import numpy as np


@ray.remote(num_returns="dynamic")
def split(array, chunk_size):
    while len(array) > 0:
        yield array[:chunk_size]
        array = array[chunk_size:]


array_ref = ray.put(np.zeros(np.random.randint(1000_000)))
block_size = 1000

# Returns an ObjectRef[ObjectRefGenerator].
dynamic_ref = split.remote(array_ref, block_size)
print(dynamic_ref)
# ObjectRef(c8ef45ccd0112571ffffffffffffffffffffffff0100000001000000)

i = -1
ref_generator = ray.get(dynamic_ref)
print(ref_generator)
# <ray._raylet.ObjectRefGenerator object at 0x7f7e2116b290>
for i, ref in enumerate(ref_generator):
    # Each ObjectRefGenerator iteration returns an ObjectRef.
    assert len(ray.get(ref)) <= block_size
num_blocks_generated = i + 1
array_size = len(ray.get(array_ref))
assert array_size <= num_blocks_generated * block_size
print(f"Split array of size {array_size} into {num_blocks_generated} blocks of "
f"size {block_size} each.")
# Split array of size 63153 into 64 blocks of size 1000 each.

# NOTE: The dynamic_ref points to the generated ObjectRefs. Make sure that this
# ObjectRef goes out of scope so that Ray can garbage-collect the internal
# ObjectRefs.
del dynamic_ref
# __dynamic_generator_end__
# fmt: on


# fmt: off
# __dynamic_generator_pass_start__
@ray.remote
def get_size(ref_generator : ObjectRefGenerator):
    print(ref_generator)
    num_elements = 0
    for ref in ref_generator:
        array = ray.get(ref)
        assert len(array) <= block_size
        num_elements += len(array)
    return num_elements


# Returns an ObjectRef[ObjectRefGenerator].
dynamic_ref = split.remote(array_ref, block_size)
assert array_size == ray.get(get_size.remote(dynamic_ref))
# (get_size pid=1504184) <ray._raylet.ObjectRefGenerator object at 0x7f81c4250ad0>

# This also works, but should be avoided because you have to call an additional
# `ray.get`, which blocks the driver.
ref_generator = ray.get(dynamic_ref)
assert array_size == ray.get(get_size.remote(ref_generator))
# (get_size pid=1504184) <ray._raylet.ObjectRefGenerator object at 0x7f81c4251b50>
# __dynamic_generator_pass_end__
# fmt: on
2 changes: 1 addition & 1 deletion doc/source/ray-core/patterns/generators.rst
@@ -1,4 +1,4 @@
.. _generators:
.. _generator-pattern:

Pattern: Using generators to reduce heap memory usage
=====================================================
4 changes: 3 additions & 1 deletion doc/source/ray-core/tasks.rst
@@ -164,7 +164,8 @@ By default, a Ray task only returns a single Object Ref. However, you can config

.. literalinclude:: doc_code/tasks_multiple_returns.py

For tasks that return multiple objects, Ray also supports remote generators that allow a task to return one object at a time to reduce memory usage at the worker. See the :ref:`user guide <generators>` for more details on use cases.

For tasks that return multiple objects, Ray also supports remote generators that allow a task to return one object at a time to reduce memory usage at the worker. Ray also supports an option to set the number of return values dynamically, which can be useful when the task caller does not know how many return values to expect. See the :ref:`user guide <generators>` for more details on use cases.

.. tabbed:: Python

@@ -213,6 +214,7 @@ More about Ray Tasks
tasks/resources.rst
tasks/using-ray-with-gpus.rst
tasks/nested-tasks.rst
tasks/generators.rst
tasks/fault-tolerance.rst
tasks/scheduling.rst
tasks/patterns/index.rst
86 changes: 86 additions & 0 deletions doc/source/ray-core/tasks/generators.rst
@@ -0,0 +1,86 @@
.. _generators:

Generators
==========

Python generators are functions that behave like an iterator, yielding one
value per iteration. Ray supports remote generators for two use cases:

1. To reduce max heap memory usage when returning multiple values from a remote
function. See the :ref:`design pattern guide <generator-pattern>` for an
example.
2. When the number of return values is set dynamically by the remote function
instead of by the caller.

Remote generators can be used in both actor and non-actor tasks.
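
For reference, an ordinary (non-Ray) Python generator looks like the following minimal, illustrative sketch; the sections below show how the same pattern applies to remote functions:

.. code-block:: python

    def local_chunks(values, chunk_size):
        # Yields one chunk per iteration instead of materializing
        # the full list of chunks in memory.
        for i in range(0, len(values), chunk_size):
            yield values[i:i + chunk_size]

    assert list(local_chunks(list(range(10)), 4)) == [
        [0, 1, 2, 3],
        [4, 5, 6, 7],
        [8, 9],
    ]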

`num_returns` set by the task caller
------------------------------------

Where possible, the caller should set the remote function's number of return values using ``@ray.remote(num_returns=x)`` or ``foo.options(num_returns=x).remote()``.
Ray will return this many ``ObjectRefs`` to the caller.
The remote task should then return the same number of values, usually as a tuple or list.
Compared to setting the number of return values dynamically, this adds less complexity to user code and less performance overhead, as Ray will know exactly how many ``ObjectRefs`` to return to the caller ahead of time.

Without changing the caller's syntax, we can also use a remote generator function to return the values iteratively.
The generator should return the same number of return values specified by the caller, and these will be stored one at a time in Ray's object store.
An error is raised if the generator returns a different number of values than the number specified by the caller.
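
As a minimal, illustrative sketch (assuming ``ray.init()`` has already been called), both of the following tasks return two ``ObjectRefs`` to the caller:

.. code-block:: python

    @ray.remote(num_returns=2)
    def return_two_as_tuple():
        return 1, 2

    @ray.remote(num_returns=2)
    def return_two_as_generator():
        yield 1
        yield 2

    # Each invocation returns exactly two ObjectRefs to the caller.
    a, b = return_two_as_tuple.remote()
    c, d = return_two_as_generator.remote()
    assert ray.get([a, b]) == ray.get([c, d]) == [1, 2]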

For example, we can swap the following code that returns a list of return values:

.. literalinclude:: ../doc_code/pattern_generators.py
:language: python
:start-after: __large_values_start__
:end-before: __large_values_end__

for this code, which uses a generator function:

.. literalinclude:: ../doc_code/pattern_generators.py
:language: python
:start-after: __large_values_generator_start__
:end-before: __large_values_generator_end__

The advantage of doing so is that the generator function does not need to hold all of its return values in memory at once.
It can return the arrays one at a time to reduce memory pressure.

`num_returns` set by the task executor
--------------------------------------

In some cases, the caller may not know the number of return values to expect from a remote function.
For example, suppose we want to write a task that breaks up its argument into equal-size chunks and returns these.
We may not know the size of the argument until we execute the task, so we don't know the number of return values to expect.

In these cases, we can use a remote generator function that returns a *dynamic* number of values.
To use this feature, set ``num_returns="dynamic"`` in the ``@ray.remote`` decorator or the remote function's ``.options()``.
Then, when invoking the remote function, Ray will return a *single* ``ObjectRef`` that will get populated with an ``ObjectRefGenerator`` when the task completes.
The ``ObjectRefGenerator`` can be used to iterate over a list of ``ObjectRefs`` containing the actual values returned by the task.

.. note:: ``num_returns="dynamic"`` is currently an experimental API in v2.1+.

.. literalinclude:: ../doc_code/generator.py
:language: python
:start-after: __dynamic_generator_start__
:end-before: __dynamic_generator_end__

We can also pass the ``ObjectRef`` returned by a task with ``num_returns="dynamic"`` to another task. The task will receive the ``ObjectRefGenerator``, which it can use to iterate over the task's return values. Similarly, you can also pass an ``ObjectRefGenerator`` as a task argument.

.. literalinclude:: ../doc_code/generator.py
:language: python
:start-after: __dynamic_generator_pass_start__
:end-before: __dynamic_generator_pass_end__

Limitations
-----------

Although a generator function creates ``ObjectRefs`` one at a time, currently Ray will not schedule dependent tasks until the entire task is complete and all values have been created. This is similar to the semantics used by tasks that return multiple values as a list.
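
As an illustrative sketch of this limitation, reusing ``split``, ``array_ref``, and ``block_size`` from the example above (the ``consume`` task is hypothetical):

.. code-block:: python

    @ray.remote
    def consume(ref_generator):
        return sum(len(ray.get(block_ref)) for block_ref in ref_generator)

    # consume is not scheduled until split has finished executing and all of
    # its yielded blocks have been stored, even though each block's ObjectRef
    # is created as soon as the block is yielded.
    total = ray.get(consume.remote(split.remote(array_ref, block_size)))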

``num_returns="dynamic"`` is not yet supported for actor tasks.

If a generator function raises an exception before yielding all its values, all values returned by the generator will be replaced by the exception traceback, including values that were already successfully yielded.
If the task was called with ``num_returns="dynamic"``, the exception will be stored in the ``ObjectRef`` returned by the task instead of the usual ``ObjectRefGenerator``.
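
For instance, a minimal, illustrative sketch of what the caller might see in the ``num_returns="dynamic"`` case (the exception surfaced by ``ray.get`` is typically a ``RayTaskError`` wrapping the original error):

.. code-block:: python

    @ray.remote(num_returns="dynamic")
    def failing_generator():
        raise ValueError("failed before yielding any values")
        yield 1  # Never reached; it only makes this function a generator.

    dynamic_ref = failing_generator.remote()
    try:
        # Because the task failed, the exception is stored in dynamic_ref
        # instead of the usual ObjectRefGenerator.
        ray.get(dynamic_ref)
    except ray.exceptions.RayTaskError as e:
        print("Generator task failed:", e)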

Note that there is currently a known bug where exceptions will not be propagated for generators that yield objects in Ray's shared-memory object store before erroring. In this case, these objects will still be accessible through the returned ``ObjectRefs`` and you may see an error like the following:

.. code-block:: text

    ERROR worker.py:754 -- Generator threw exception after returning partial values in the object store, error may be unhandled.
2 changes: 2 additions & 0 deletions python/ray/__init__.py
@@ -125,6 +125,7 @@ def _configure_system():
FunctionID,
ObjectID,
ObjectRef,
ObjectRefGenerator,
TaskID,
UniqueID,
Language,
@@ -248,6 +249,7 @@ def __getattr__(self, attr):
"FunctionID",
"ObjectID",
"ObjectRef",
"ObjectRefGenerator",
"TaskID",
"UniqueID",
"PlacementGroupID",
8 changes: 7 additions & 1 deletion python/ray/_private/ray_option_utils.py
@@ -120,7 +120,13 @@ def issubclass_safe(obj: Any, cls_: type) -> bool:
),
# override "_common_options"
"num_cpus": _resource_option("num_cpus", default_value=1),
"num_returns": _counting_option("num_returns", False, default_value=1),
"num_returns": Option(
(int, str, type(None)),
lambda x: x is None or x == "dynamic" or x >= 0,
"The keyword 'num_returns' only accepts None, a non-negative integer, or "
'"dynamic" (for generators)',
default_value=1,
),
"object_store_memory": Option( # override "_common_options"
(int, type(None)),
lambda x: x is None,
9 changes: 9 additions & 0 deletions python/ray/_private/serialization.py
@@ -9,6 +9,7 @@
from ray._raylet import (
MessagePackSerializedObject,
MessagePackSerializer,
ObjectRefGenerator,
Pickle5SerializedObject,
Pickle5Writer,
RawSerializedObject,
@@ -121,6 +122,14 @@ def object_ref_reducer(obj):
)

self._register_cloudpickle_reducer(ray.ObjectRef, object_ref_reducer)

def object_ref_generator_reducer(obj):
return ObjectRefGenerator, (obj._refs,)

self._register_cloudpickle_reducer(
ObjectRefGenerator, object_ref_generator_reducer
)

serialization_addons.apply(self)

def _register_cloudpickle_reducer(self, cls, reducer):
10 changes: 6 additions & 4 deletions python/ray/_private/worker.py
@@ -2266,8 +2266,7 @@ def get(

if not isinstance(object_refs, list):
raise ValueError(
"'object_refs' must either be an object ref "
"or a list of object refs."
"'object_refs' must either be an ObjectRef or a list of ObjectRefs."
)

# TODO(ujvl): Consider how to allow user to retrieve the ready objects.
@@ -2888,8 +2887,11 @@ def remote(
Args:
num_returns: This is only for *remote functions*. It specifies
the number of object refs returned by
the remote function invocation.
the number of object refs returned by the remote function
invocation. Pass "dynamic" to allow the task to decide how many
return values to return during execution, and the caller will
receive an ObjectRef[ObjectRefGenerator] (note, this setting is
experimental).
num_cpus: The quantity of CPU cores to reserve
for this task or for the lifetime of the actor.
num_gpus: The quantity of GPUs to reserve
21 changes: 17 additions & 4 deletions python/ray/_raylet.pxd
@@ -11,10 +11,19 @@ from libc.stdint cimport (
from libcpp cimport bool as c_bool
from libcpp.string cimport string as c_string
from libcpp.vector cimport vector as c_vector
from libcpp.unordered_map cimport unordered_map
from libcpp.memory cimport (
shared_ptr,
unique_ptr
)
from libcpp.pair cimport pair as c_pair
from libcpp.utility cimport pair
from ray.includes.optional cimport (
optional,
nullopt,
make_optional,
)

from ray.includes.common cimport (
CBuffer,
CRayObject,
@@ -123,13 +132,17 @@ cdef class CoreWorker:
c_bool inline_small_object=*)
cdef unique_ptr[CAddress] _convert_python_address(self, address=*)
cdef store_task_output(
self, serialized_object, const CObjectID &return_id, size_t
data_size, shared_ptr[CBuffer] &metadata, const c_vector[CObjectID]
self, serialized_object,
const CObjectID &return_id,
const CObjectID &generator_id,
size_t data_size, shared_ptr[CBuffer] &metadata, const c_vector[CObjectID]
&contained_id, int64_t *task_output_inlined_bytes,
shared_ptr[CRayObject] *return_ptr)
cdef store_task_outputs(
self, worker, outputs, const c_vector[CObjectID] return_ids,
c_vector[shared_ptr[CRayObject]] *returns)
self,
worker, outputs,
c_vector[c_pair[CObjectID, shared_ptr[CRayObject]]] *returns,
CObjectID ref_generator_id=*)
cdef yield_current_fiber(self, CFiberEvent &fiber_event)
cdef make_actor_handle(self, ActorHandleSharedPtr c_actor_handle)
cdef c_function_descriptors_to_python(