Skip to content

Commit

Permalink
Revert "Revert "[core] Support generators for tasks with multiple ret…
Browse files Browse the repository at this point in the history
…urn values (ray-project#25247)" (ray-project#25380)"

This reverts commit 80168a0.
  • Loading branch information
stephanie-wang committed Jun 1, 2022
1 parent cb151d5 commit 74408f1
Show file tree
Hide file tree
Showing 6 changed files with 232 additions and 4 deletions.
17 changes: 16 additions & 1 deletion doc/source/ray-core/tasks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,24 @@ Multiple returns
@ray.remote(num_returns=3)
def return_multiple():
return 1, 2, 3
return 0, 1, 2
a, b, c = return_multiple.remote()
For tasks that return multiple objects, Ray also supports remote generators that allow a task to return one object at a time to reduce memory usage at the worker. See the :ref:`user guide <generators>` for more details on use cases.

.. code-block:: python
@ray.remote(num_returns=3)
def return_multiple_as_generator():
for i in range(3):
yield i
# NOTE: Similar to normal functions, these objects will not be available
# until the full task is complete and all returns have been generated.
a, b, c = return_multiple_as_generator.remote()
.. tabbed:: Java

Java remote functions doesn't support returning multiple objects.
Expand All @@ -189,6 +203,7 @@ Multiple returns

C++ remote functions doesn't support returning multiple objects.


Cancelling tasks
----------------

Expand Down
96 changes: 96 additions & 0 deletions doc/source/ray-core/tasks/patterns/generators.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
.. _generators:

Pattern: Using generators to reduce heap memory usage
=====================================================

In this pattern, we use **generators** in Python to reduce the total heap memory usage during a task. The key idea is that for tasks that return multiple objects, we can return them one at a time instead of all at once. This allows a worker to free the heap memory used by a previous return value before returning the next one.

Example use case
----------------

You have a task that returns multiple large values. Another possibility is a task that returns a single large value, but you want to stream this value through Ray's object store by breaking it up into smaller chunks.

Using normal Python functions, we can write such a task like this. Here's an example that returns numpy arrays of size 100MB each:

.. code-block:: python
import numpy as np
@ray.remote
def large_values(num_returns):
return [
np.random.randint(
np.iinfo(np.int8).max, size=(100_000_000, 1), dtype=np.int8
)
for _ in range(num_returns)
]
However, this will require the task to hold all ``num_returns`` arrays in heap memory at the same time at the end of the task. If there are many return values, this can lead to high heap memory usage and potentially an out-of-memory error.

We can fix the above example by rewriting ``large_values`` as a **generator**. Instead of returning all values at once as a tuple or list, we can ``yield`` one value at a time.

.. code-block:: python
@ray.remote
def large_values_generator(num_returns):
for _ in range(num_returns):
yield np.random.randint(
np.iinfo(np.int8).max, size=(100_000_000, 1), dtype=np.int8
)
Code example
------------

.. code-block:: python
import numpy as np
import ray
@ray.remote(max_retries=0)
def large_values(num_returns):
return [
np.random.randint(
np.iinfo(np.int8).max, size=(100_000_000, 1), dtype=np.int8
)
for _ in range(num_returns)
]
@ray.remote(max_retries=0)
def large_values_generator(num_returns):
for i in range(num_returns):
yield np.random.randint(
np.iinfo(np.int8).max, size=(100_000_000, 1), dtype=np.int8
)
print(f"yielded return value {i}")
num_returns = 100
# Worker will likely OOM using normal returns.
print("Using normal functions...")
try:
ray.get(large_values.options(num_returns=num_returns).remote(num_returns)[0])
except ray.exceptions.WorkerCrashedError:
print("Worker failed with normal function")
# Using a generator will allow the worker to finish.
# Note that this will block until the full task is complete, i.e. the
# last yield finishes.
print("Using generators...")
ray.get(
large_values_generator.options(num_returns=num_returns).remote(num_returns)[0]
)
print("Success!")
.. code-block:: text
$ RAY_IGNORE_UNHANDLED_ERRORS=1 python test.py
Using normal functions...
... -- A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker...
Worker failed
Using generators...
(large_values_generator pid=373609) yielded return value 0
(large_values_generator pid=373609) yielded return value 1
(large_values_generator pid=373609) yielded return value 2
...
Success!
1 change: 1 addition & 0 deletions doc/source/ray-core/tasks/patterns/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ You may also be interested in visiting the design patterns section for :ref:`act
tree-of-tasks
map-reduce
limit-tasks
generators
closure-capture
fine-grained-tasks
global-variables
Expand Down
23 changes: 20 additions & 3 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import threading
import time
import traceback
import _thread
import typing

from libc.stdint cimport (
int32_t,
Expand Down Expand Up @@ -699,6 +700,8 @@ cdef execute_task(
exc_info=True)
raise e
if c_return_ids.size() == 1:
# If there is only one return specified, we should return
# all return values as a single object.
outputs = (outputs,)
if (<int>task_type == <int>TASK_TYPE_ACTOR_CREATION_TASK):
# Record actor repr via :actor_name: magic token in the log.
Expand All @@ -720,11 +723,14 @@ cdef execute_task(
task_exception = True
raise TaskCancelledError(
core_worker.get_current_task_id())

if (c_return_ids.size() > 0 and
not inspect.isgenerator(outputs) and
len(outputs) != int(c_return_ids.size())):
raise ValueError(
"Task returned {} objects, but num_returns={}.".format(
len(outputs), c_return_ids.size()))

# Store the outputs in the object store.
with core_worker.profile_event(b"task:store_outputs"):
core_worker.store_task_outputs(
Expand Down Expand Up @@ -2029,11 +2035,16 @@ cdef class CoreWorker:
if return_ids.size() == 0:
return

n_returns = len(outputs)
n_returns = return_ids.size()
returns.resize(n_returns)
task_output_inlined_bytes = 0
for i in range(n_returns):
return_id, output = return_ids[i], outputs[i]
for i, output in enumerate(outputs):
if i >= n_returns:
raise ValueError(
"Task returned more than num_returns={} objects.".format(
n_returns))

return_id = return_ids[i]
context = worker.get_serialization_context()
serialized_object = context.serialize(output)
data_size = serialized_object.total_bytes
Expand Down Expand Up @@ -2062,6 +2073,12 @@ cdef class CoreWorker:
contained_id, &task_output_inlined_bytes,
&returns[0][i])

i += 1
if i < n_returns:
raise ValueError(
"Task returned {} objects, but num_returns={}.".format(
i, n_returns))

cdef c_function_descriptors_to_python(
self,
const c_vector[CFunctionDescriptor] &c_function_descriptors):
Expand Down
1 change: 1 addition & 0 deletions python/ray/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ py_test_module_list(
"test_basic_5.py",
"test_cancel.py",
"test_gcs_fault_tolerance.py",
"test_generators.py",
"test_metrics_agent.py",
"test_component_failures_2.py",
"test_component_failures_3.py",
Expand Down
98 changes: 98 additions & 0 deletions python/ray/tests/test_generators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import pytest
import numpy as np
import sys

import ray


def test_generator_oom(ray_start_regular):
@ray.remote(max_retries=0)
def large_values(num_returns):
return [
np.random.randint(
np.iinfo(np.int8).max, size=(100_000_000, 1), dtype=np.int8
)
for _ in range(num_returns)
]

@ray.remote(max_retries=0)
def large_values_generator(num_returns):
for _ in range(num_returns):
yield np.random.randint(
np.iinfo(np.int8).max, size=(100_000_000, 1), dtype=np.int8
)

num_returns = 100
try:
# Worker may OOM using normal returns.
ray.get(large_values.options(num_returns=num_returns).remote(num_returns)[0])
except ray.exceptions.WorkerCrashedError:
pass

# Using a generator will allow the worker to finish.
ray.get(
large_values_generator.options(num_returns=num_returns).remote(num_returns)[0]
)


@pytest.mark.parametrize("use_actors", [False, True])
def test_generator_returns(ray_start_regular, use_actors):
remote_generator_fn = None
if use_actors:

@ray.remote
class Generator:
def __init__(self):
pass

def generator(self, num_returns):
for i in range(num_returns):
yield i

g = Generator.remote()
remote_generator_fn = g.generator
else:

@ray.remote(max_retries=0)
def generator(num_returns):
for i in range(num_returns):
yield i

remote_generator_fn = generator

# Check cases when num_returns does not match the number of values returned
# by the generator.
num_returns = 3

try:
ray.get(
remote_generator_fn.options(num_returns=num_returns).remote(num_returns - 1)
)
assert False
except ray.exceptions.RayTaskError as e:
assert isinstance(e.as_instanceof_cause(), ValueError)

try:
ray.get(
remote_generator_fn.options(num_returns=num_returns).remote(num_returns + 1)
)
assert False
except ray.exceptions.RayTaskError as e:
assert isinstance(e.as_instanceof_cause(), ValueError)

# Check num_returns=1 case, should receive TypeError because generator
# cannot be pickled.
try:
ray.get(remote_generator_fn.remote(num_returns))
assert False
except ray.exceptions.RayTaskError as e:
assert isinstance(e.as_instanceof_cause(), TypeError)

# Check return values.
ray.get(
remote_generator_fn.options(num_returns=num_returns).remote(num_returns)
) == list(range(num_returns))


if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))

0 comments on commit 74408f1

Please sign in to comment.