diff --git a/doc/source/ray-core/tasks.rst b/doc/source/ray-core/tasks.rst index 6d6651d79901..4f4bb1b8d66f 100644 --- a/doc/source/ray-core/tasks.rst +++ b/doc/source/ray-core/tasks.rst @@ -177,10 +177,24 @@ Multiple returns @ray.remote(num_returns=3) def return_multiple(): - return 1, 2, 3 + return 0, 1, 2 a, b, c = return_multiple.remote() + For tasks that return multiple objects, Ray also supports remote generators that allow a task to return one object at a time to reduce memory usage at the worker. See the :ref:`user guide <generators>` for more details on use cases. + + .. code-block:: python + + @ray.remote(num_returns=3) + def return_multiple_as_generator(): + for i in range(3): + yield i + + # NOTE: Similar to normal functions, these objects will not be available + # until the full task is complete and all returns have been generated. + a, b, c = return_multiple_as_generator.remote() + + .. tabbed:: Java Java remote functions doesn't support returning multiple objects. @@ -189,6 +203,7 @@ Multiple returns C++ remote functions doesn't support returning multiple objects. + Cancelling tasks ---------------- diff --git a/doc/source/ray-core/tasks/patterns/generators.rst b/doc/source/ray-core/tasks/patterns/generators.rst new file mode 100644 index 000000000000..1c6bfb5a8f5f --- /dev/null +++ b/doc/source/ray-core/tasks/patterns/generators.rst @@ -0,0 +1,96 @@ +.. _generators: + +Pattern: Using generators to reduce heap memory usage +===================================================== + +In this pattern, we use **generators** in Python to reduce the total heap memory usage during a task. The key idea is that for tasks that return multiple objects, we can return them one at a time instead of all at once. This allows a worker to free the heap memory used by a previous return value before returning the next one. + +Example use case +---------------- + +You have a task that returns multiple large values. 
Another possibility is a task that returns a single large value, but you want to stream this value through Ray's object store by breaking it up into smaller chunks. + +Using normal Python functions, we can write such a task like this. Here's an example that returns numpy arrays of size 100MB each: + +.. code-block:: python + + import numpy as np + + @ray.remote + def large_values(num_returns): + return [ + np.random.randint( + np.iinfo(np.int8).max, size=(100_000_000, 1), dtype=np.int8 + ) + for _ in range(num_returns) + ] + +However, this will require the task to hold all ``num_returns`` arrays in heap memory at the same time at the end of the task. If there are many return values, this can lead to high heap memory usage and potentially an out-of-memory error. + +We can fix the above example by rewriting ``large_values`` as a **generator**. Instead of returning all values at once as a tuple or list, we can ``yield`` one value at a time. + +.. code-block:: python + + @ray.remote + def large_values_generator(num_returns): + for _ in range(num_returns): + yield np.random.randint( + np.iinfo(np.int8).max, size=(100_000_000, 1), dtype=np.int8 + ) + +Code example +------------ + +.. code-block:: python + + import numpy as np + import ray + + @ray.remote(max_retries=0) + def large_values(num_returns): + return [ + np.random.randint( + np.iinfo(np.int8).max, size=(100_000_000, 1), dtype=np.int8 + ) + for _ in range(num_returns) + ] + + @ray.remote(max_retries=0) + def large_values_generator(num_returns): + for i in range(num_returns): + yield np.random.randint( + np.iinfo(np.int8).max, size=(100_000_000, 1), dtype=np.int8 + ) + print(f"yielded return value {i}") + + num_returns = 100 + # Worker will likely OOM using normal returns. 
+ print("Using normal functions...") + try: + ray.get(large_values.options(num_returns=num_returns).remote(num_returns)[0]) + except ray.exceptions.WorkerCrashedError: + print("Worker failed with normal function") + + # Using a generator will allow the worker to finish. + # Note that this will block until the full task is complete, i.e. the + # last yield finishes. + print("Using generators...") + ray.get( + large_values_generator.options(num_returns=num_returns).remote(num_returns)[0] + ) + print("Success!") + +.. code-block:: text + + $ RAY_IGNORE_UNHANDLED_ERRORS=1 python test.py + + Using normal functions... + ... -- A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker... + Worker failed with normal function + Using generators... + (large_values_generator pid=373609) yielded return value 0 + (large_values_generator pid=373609) yielded return value 1 + (large_values_generator pid=373609) yielded return value 2 + ... + Success! + diff --git a/doc/source/ray-core/tasks/patterns/index.rst b/doc/source/ray-core/tasks/patterns/index.rst index 97fdefae2c1c..6abe8320930c 100644 --- a/doc/source/ray-core/tasks/patterns/index.rst +++ b/doc/source/ray-core/tasks/patterns/index.rst @@ -16,6 +16,7 @@ You may also be interested in visiting the design patterns section for :ref:`act tree-of-tasks map-reduce limit-tasks + generators closure-capture fine-grained-tasks global-variables diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 1e2ad85fa433..4b505467c489 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -19,6 +19,7 @@ import threading import time import traceback import _thread +import typing from libc.stdint cimport ( int32_t, @@ -699,6 +700,8 @@ cdef execute_task( exc_info=True) raise e if c_return_ids.size() == 1: + # If there is only one return specified, we should return + # all return values as a single object. 
outputs = (outputs,) if (task_type == TASK_TYPE_ACTOR_CREATION_TASK): # Record actor repr via :actor_name: magic token in the log. @@ -720,11 +723,14 @@ cdef execute_task( task_exception = True raise TaskCancelledError( core_worker.get_current_task_id()) + if (c_return_ids.size() > 0 and + not inspect.isgenerator(outputs) and len(outputs) != int(c_return_ids.size())): raise ValueError( "Task returned {} objects, but num_returns={}.".format( len(outputs), c_return_ids.size())) + # Store the outputs in the object store. with core_worker.profile_event(b"task:store_outputs"): core_worker.store_task_outputs( @@ -2029,11 +2035,16 @@ cdef class CoreWorker: if return_ids.size() == 0: return - n_returns = len(outputs) + n_returns = return_ids.size() returns.resize(n_returns) task_output_inlined_bytes = 0 - for i in range(n_returns): - return_id, output = return_ids[i], outputs[i] + for i, output in enumerate(outputs): + if i >= n_returns: + raise ValueError( + "Task returned more than num_returns={} objects.".format( + n_returns)) + + return_id = return_ids[i] context = worker.get_serialization_context() serialized_object = context.serialize(output) data_size = serialized_object.total_bytes @@ -2062,6 +2073,12 @@ cdef class CoreWorker: contained_id, &task_output_inlined_bytes, &returns[0][i]) + i += 1 + if i < n_returns: + raise ValueError( + "Task returned {} objects, but num_returns={}.".format( + i, n_returns)) + cdef c_function_descriptors_to_python( self, const c_vector[CFunctionDescriptor] &c_function_descriptors): diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index c126672a447c..d310a0125054 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -44,6 +44,7 @@ py_test_module_list( "test_basic_5.py", "test_cancel.py", "test_gcs_fault_tolerance.py", + "test_generators.py", "test_metrics_agent.py", "test_component_failures_2.py", "test_component_failures_3.py", diff --git a/python/ray/tests/test_generators.py 
b/python/ray/tests/test_generators.py new file mode 100644 index 000000000000..211b894fe2f3 --- /dev/null +++ b/python/ray/tests/test_generators.py @@ -0,0 +1,98 @@ +import pytest +import numpy as np +import sys + +import ray + + +def test_generator_oom(ray_start_regular): + @ray.remote(max_retries=0) + def large_values(num_returns): + return [ + np.random.randint( + np.iinfo(np.int8).max, size=(100_000_000, 1), dtype=np.int8 + ) + for _ in range(num_returns) + ] + + @ray.remote(max_retries=0) + def large_values_generator(num_returns): + for _ in range(num_returns): + yield np.random.randint( + np.iinfo(np.int8).max, size=(100_000_000, 1), dtype=np.int8 + ) + + num_returns = 100 + try: + # Worker may OOM using normal returns. + ray.get(large_values.options(num_returns=num_returns).remote(num_returns)[0]) + except ray.exceptions.WorkerCrashedError: + pass + + # Using a generator will allow the worker to finish. + ray.get( + large_values_generator.options(num_returns=num_returns).remote(num_returns)[0] + ) + + +@pytest.mark.parametrize("use_actors", [False, True]) +def test_generator_returns(ray_start_regular, use_actors): + remote_generator_fn = None + if use_actors: + + @ray.remote + class Generator: + def __init__(self): + pass + + def generator(self, num_returns): + for i in range(num_returns): + yield i + + g = Generator.remote() + remote_generator_fn = g.generator + else: + + @ray.remote(max_retries=0) + def generator(num_returns): + for i in range(num_returns): + yield i + + remote_generator_fn = generator + + # Check cases when num_returns does not match the number of values returned + # by the generator. 
+ num_returns = 3 + + try: + ray.get( + remote_generator_fn.options(num_returns=num_returns).remote(num_returns - 1) + ) + assert False + except ray.exceptions.RayTaskError as e: + assert isinstance(e.as_instanceof_cause(), ValueError) + + try: + ray.get( + remote_generator_fn.options(num_returns=num_returns).remote(num_returns + 1) + ) + assert False + except ray.exceptions.RayTaskError as e: + assert isinstance(e.as_instanceof_cause(), ValueError) + + # Check num_returns=1 case, should receive TypeError because generator + # cannot be pickled. + try: + ray.get(remote_generator_fn.remote(num_returns)) + assert False + except ray.exceptions.RayTaskError as e: + assert isinstance(e.as_instanceof_cause(), TypeError) + + # Check return values. + ray.get( + remote_generator_fn.options(num_returns=num_returns).remote(num_returns) + ) == list(range(num_returns)) + + +if __name__ == "__main__": + sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/util/dask/scheduler.py b/python/ray/util/dask/scheduler.py index e0f3a0e98fb9..f3dbca063b5a 100644 --- a/python/ray/util/dask/scheduler.py +++ b/python/ray/util/dask/scheduler.py @@ -1,6 +1,7 @@ import atexit import threading from collections import defaultdict +from collections import OrderedDict from dataclasses import dataclass from multiprocessing.pool import ThreadPool from typing import Optional @@ -590,7 +591,10 @@ class MultipleReturnFunc: num_returns: int def __call__(self, *args, **kwargs): - return self.func(*args, **kwargs) + returns = self.func(*args, **kwargs) + if isinstance(returns, dict) or isinstance(returns, OrderedDict): + returns = [returns[k] for k in range(len(returns))] + return returns def multiple_return_get(multiple_returns, idx):