[Doc] Revamp ray core design patterns doc [11/n]: limit running tasks #29046

Merged · 12 commits · Oct 6, 2022
@@ -23,10 +23,10 @@ async def heavy_compute(self):
 # __without_backpressure_end__

 # __with_backpressure_start__
-MAX_NUM_IN_FLIGHT_TASKS = 100
+MAX_NUM_PENDING_TASKS = 100
 result_refs = []
 for _ in range(NUM_TASKS):
-    if len(result_refs) > MAX_NUM_IN_FLIGHT_TASKS:
+    if len(result_refs) > MAX_NUM_PENDING_TASKS:
         # update result_refs to only
         # track the remaining tasks.
         ready_refs, result_refs = ray.wait(result_refs, num_returns=1)
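The hunk above shows only the changed lines. As a self-contained sketch of the same bounded-pending-tasks idea that runs without a Ray cluster, here is a standard-library analogue using `concurrent.futures` (the worker function, pool size, and task counts are illustrative assumptions, not Ray API):

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

MAX_NUM_PENDING_TASKS = 100  # illustrative cap, mirroring the doc snippet
NUM_TASKS = 1000             # illustrative workload size


def heavy_compute(i):
    # Stand-in for real work.
    return i * i


results = []
with ThreadPoolExecutor(max_workers=8) as pool:
    pending = set()
    for i in range(NUM_TASKS):
        if len(pending) > MAX_NUM_PENDING_TASKS:
            # Backpressure: block until at least one task finishes,
            # so the pending set never grows without bound.
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            results.extend(f.result() for f in done)
        pending.add(pool.submit(heavy_compute, i))
    # Drain the remaining tasks.
    done, _ = wait(pending)
    results.extend(f.result() for f in done)
```

The structure matches the Ray snippet one-for-one: the `wait` call plays the role of `ray.wait()`, and the pending set plays the role of `result_refs`.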
35 changes: 35 additions & 0 deletions doc/source/ray-core/doc_code/limit_running_tasks.py
@@ -0,0 +1,35 @@
# __without_limit_start__
import ray

# Assume this Ray node has 16 CPUs and 16G memory.
ray.init()


@ray.remote
def process(file):
    # The actual work is reading the file and processing the data.
    # Assume it needs to use 2G of memory.
    pass


NUM_FILES = 1000
result_refs = []
for i in range(NUM_FILES):
    # By default, each process task uses 1 CPU resource and no other resources.
    # This means 16 tasks can run concurrently,
    # which will OOM since 32G of memory is needed while the node only has 16G.
    result_refs.append(process.remote(f"{i}.csv"))
ray.get(result_refs)
# __without_limit_end__

# __with_limit_start__
result_refs = []
for i in range(NUM_FILES):
    # Now each task requests 2G of memory,
    # so the number of concurrently running tasks is limited to 8.
    # In this case, setting num_cpus to 2 has the same effect.
    result_refs.append(
        process.options(memory=2 * 1024 * 1024 * 1024).remote(f"{i}.csv")
    )
ray.get(result_refs)
# __with_limit_end__
25 changes: 4 additions & 21 deletions doc/source/ray-core/examples/batch_prediction.ipynb
@@ -87,7 +87,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"The driver launches all tasks for the entire input dataset. In order to not overload cluster and cause OOM, we use ray.wait() to limit the number of in-flight tasks, see details about this design pattern in {doc}`/ray-core/patterns/limit-tasks`."
"The driver launches all tasks for the entire input dataset. "
]
},
{
@@ -108,36 +108,18 @@
"model = load_model()\n",
"model_ref = ray.put(model)\n",
"\n",
"# NOTE: This should be set to a number that's large enough (e.g. at least\n",
"# the number of CPUs in the cluster, usually can be even larger) to enable good\n",
"# parallelization. In practice you can start with sys.maxsize (i.e. no limit),\n",
"# and scale down if you have massive number of tasks causing overload/OOM the node.\n",
"import sys\n",
"max_in_flight_tasks = sys.maxsize\n",
"result_refs = []\n",
"results = []\n",
"\n",
"# Launch all prediction tasks.\n",
"for file in input_files:\n",
" # Apply backpressure: when there are more than max_in_flight_tasks tasks pending,\n",
" # we wait with ray.wait() untill one of the object ref is ready. \n",
" if len(result_refs) > max_in_flight_tasks:\n",
" num_ready = len(result_refs) - max_in_flight_tasks\n",
" newly_completed, result_refs = ray.wait(result_refs, num_returns=num_ready)\n",
" for completed_ref in newly_completed:\n",
" results.append(ray.get(completed_ref))\n",
"\n",
" # Launch a prediction task by passing model reference and shard file to it.\n",
" # NOTE: it would be highly inefficient if you are passing the model itself\n",
" # like make_prediction.remote(model, file), which in order to pass the model\n",
" # to remote node will ray.put(model) for each task, potentially overwhelming\n",
" # the local object store and causing out-of-disk error.\n",
" result_refs.append(make_prediction.remote(model_ref, file))\n",
"\n",
"# Wait the remaining pending tasks to complete.\n",
"newly_completed, result_refs = ray.wait(result_refs, num_returns=len(result_refs))\n",
"for completed_ref in newly_completed:\n",
" results.append(ray.get(completed_ref))\n",
"results = ray.get(result_refs)\n",
"\n",
"# Let's check prediction output size.\n",
"for r in results:\n",
@@ -148,7 +130,8 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"If it's easy for your to get a good estimate of the in-memory size for data loaded from external storage, you can use a more intuitive way to control the parallelism by specifying the amount of memory needed for each task, e.g. launching tasks with ``make_prediction.options(memory=100*1023*1025).remote(model_ref, file)``. Ray will then do the right thing and make sure tasks scheduled to a node will not exceed its total memory."
"To avoid overloading the cluster and causing OOM, we can control the parallelism by setting proper resource requirements for tasks; see details about this design pattern in {doc}`/ray-core/patterns/limit-running-tasks`.\n",
"For example, if it's easy for you to get a good estimate of the in-memory size of the data loaded from external storage, you can control the parallelism by specifying the amount of memory needed for each task, e.g. launching tasks with ``make_prediction.options(memory=100*1024*1024).remote(model_ref, file)``. Ray will then make sure that the tasks scheduled to a node do not exceed its total memory."
]
},
{
3 changes: 2 additions & 1 deletion doc/source/ray-core/patterns/index.rst
@@ -9,7 +9,8 @@ This section is a collection of common design patterns and anti-patterns for wri
:maxdepth: 1

generators
limit-tasks
limit-pending-tasks
limit-running-tasks
ray-get-loop
unnecessary-ray-get
ray-get-submission-order
49 changes: 49 additions & 0 deletions doc/source/ray-core/patterns/limit-pending-tasks.rst
@@ -0,0 +1,49 @@
.. _core-patterns-limit-pending-tasks:

Pattern: Using ray.wait to limit the number of pending tasks
============================================================

In this pattern, we use :ref:`ray.wait() <ray-wait-ref>` to limit the number of pending tasks.

If we submit tasks faster than they can be processed, tasks accumulate in the pending task queue, which will eventually cause OOM.
With ``ray.wait()``, we can apply backpressure and limit the number of pending tasks so that the pending task queue won't grow indefinitely and cause OOM.

.. note::

    If we submit a finite number of tasks, it's unlikely that we will hit the issue mentioned above since each task only uses a small amount of memory in the queue.
    It's more likely to happen when we have an infinite stream of tasks to run.

.. note::
    While this pattern can also limit the number of concurrently running tasks, it is not recommended for that purpose.
    Use :ref:`resources <core-patterns-limit-running-tasks>` instead,
    so that more tasks can be submitted than run concurrently, overlapping task execution with task scheduling.
    Resource-based limiting also determines the degree of parallelism automatically from the node's resources.

Example use case
----------------

You have a worker actor that processes tasks at a rate of X tasks per second and you want to submit tasks to it at a rate lower than X to avoid OOM.

For example, Ray Serve uses this pattern to limit the number of pending queries for each worker.

.. figure:: ../images/limit-pending-tasks.svg

    Limit number of pending tasks
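
The Ray Serve example above is the same idea as a bounded queue, where the producer blocks once the queue is full. A pure standard-library sketch (the queue size and item count are arbitrary illustration values, not Ray Serve internals):

```python
import queue
import threading

MAX_PENDING = 2  # analogous to a per-worker pending-query cap

q = queue.Queue(maxsize=MAX_PENDING)
processed = []


def worker():
    while True:
        item = q.get()
        if item is None:
            break
        processed.append(item)


t = threading.Thread(target=worker)
t.start()
for i in range(10):
    # Blocks (backpressure) whenever MAX_PENDING items are already queued.
    q.put(i)
q.put(None)  # sentinel: tell the worker to stop
t.join()
```

Because the single consumer drains the queue in FIFO order, all items arrive exactly once and in order, while the producer is never more than ``MAX_PENDING`` items ahead.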


Code example
------------

**Without backpressure:**

.. literalinclude:: ../doc_code/limit_pending_tasks.py
    :language: python
    :start-after: __without_backpressure_start__
    :end-before: __without_backpressure_end__

**With backpressure:**

.. literalinclude:: ../doc_code/limit_pending_tasks.py
    :language: python
    :start-after: __with_backpressure_start__
    :end-before: __with_backpressure_end__
39 changes: 39 additions & 0 deletions doc/source/ray-core/patterns/limit-running-tasks.rst
@@ -0,0 +1,39 @@
.. _core-patterns-limit-running-tasks:

Pattern: Using resources to limit the number of concurrently running tasks
==========================================================================

In this pattern, we use :ref:`resources <resource-requirements>` to limit the number of concurrently running tasks.

Running too many tasks at the same time might overload the cluster and cause issues like OOM.
If that is the case, we can reduce the number of concurrently running tasks by increasing the amount of resources requested by those tasks.
This works because Ray makes sure that the sum of the resource requirements of all of the concurrently running tasks on a given node cannot exceed the node's total resources.
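
A quick sketch of the arithmetic behind this rule (the node size and per-task request are assumed example numbers):

```python
# Assumed example: a node advertising 16G of logical memory resource,
# with each task requesting 2G via .options(memory=...).
NODE_MEMORY_GB = 16
TASK_MEMORY_GB = 2

# Ray admits a task only while the sum of outstanding requests fits
# on the node, so at most this many tasks run concurrently per node:
max_concurrent = NODE_MEMORY_GB // TASK_MEMORY_GB
print(max_concurrent)  # 8
```

Increasing the per-task request therefore directly shrinks the concurrency cap, which is the mechanism this pattern relies on.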

.. note::

    For actor tasks, the number of actors we create controls the number of concurrently running actor tasks we can have.
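
As a standard-library analogy for that note (not Ray API; the worker count and sleep duration are arbitrary), a fixed pool of N workers can never run more than N tasks at once, just as N actors bound the number of concurrently running actor tasks:

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

NUM_WORKERS = 4  # analogous to creating 4 actors

current = 0
peak = 0
lock = threading.Lock()


def task(_):
    global current, peak
    with lock:
        current += 1
        peak = max(peak, current)
    time.sleep(0.01)  # stand-in for real work
    with lock:
        current -= 1


with ThreadPoolExecutor(max_workers=NUM_WORKERS) as pool:
    list(pool.map(task, range(40)))

# The observed concurrency never exceeds the pool size.
assert peak <= NUM_WORKERS
```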

Example use case
----------------

You have a data processing workload that processes each input file independently using Ray :ref:`remote functions <ray-remote-functions>`.
Since each task needs to load the input data into heap memory and do the processing, running too many of them can cause OOM.
In this case, you can use the ``memory`` resource to limit the number of concurrently running tasks (other resources like ``num_cpus`` can achieve the same goal as well). Note that ``memory`` is a logical resource used for scheduling, not a physically enforced limit.
Contributor review comment:
Should probably add a note to clarify that memory is a logical resource.

Also, it would be nice to link or mention @clarng's ongoing memory monitor work.



Code example
------------

**Without limit:**

.. literalinclude:: ../doc_code/limit_running_tasks.py
    :language: python
    :start-after: __without_limit_start__
    :end-before: __without_limit_end__

**With limit:**

.. literalinclude:: ../doc_code/limit_running_tasks.py
    :language: python
    :start-after: __with_limit_start__
    :end-before: __with_limit_end__
37 changes: 0 additions & 37 deletions doc/source/ray-core/patterns/limit-tasks.rst

This file was deleted.