[Doc] Revamp ray core design patterns doc [6/n]: ray wait limits in-flight tasks #28469
Changed file ``doc_code/limit_tasks.py`` (reconstructed after the change):

```python
# __without_backpressure_start__
import ray

ray.init()


# __defining_actor_start__
@ray.remote
class Actor:
    async def heavy_compute(self):
        # taking a long time...
        # await asyncio.sleep(5)
        return


# __defining_actor_end__


# __creating_actor_start__
actor = Actor.remote()
# __creating_actor_end__

NUM_TASKS = 1000

result_refs = []
# When NUM_TASKS is large enough, this will eventually OOM.
for _ in range(NUM_TASKS):
    result_refs.append(actor.heavy_compute.remote())
ray.get(result_refs)
# __without_backpressure_end__

# __with_backpressure_start__
MAX_NUM_IN_FLIGHT_TASKS = 100

result_refs = []
for _ in range(NUM_TASKS):
    if len(result_refs) > MAX_NUM_IN_FLIGHT_TASKS:
        # Update result_refs to only track the remaining tasks.
        ready_refs, result_refs = ray.wait(result_refs, num_returns=1)
        ray.get(ready_refs)

    result_refs.append(actor.heavy_compute.remote())

ray.get(result_refs)
# __with_backpressure_end__
```

Review comment (on `NUM_TASKS = 1000`): Would it be better to make the number of tasks infinite so that it better matches the issue description?

Author reply: Since the test code will be run by our CI, I couldn't figure out a good way to show an infinite number of tasks in the example while keeping the number small for CI.
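The backpressure loop in this file can be sketched without a Ray cluster, using `concurrent.futures` as a stand-in for remote tasks. This is a hedged illustration of the same technique, not the PR's code: the thread pool, sleep duration, and task counts below are made-up assumptions, and `concurrent.futures.wait(..., return_when=FIRST_COMPLETED)` plays the role of `ray.wait(..., num_returns=1)`.

```python
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

NUM_TASKS = 50  # small, illustrative numbers (assumption)
MAX_NUM_IN_FLIGHT_TASKS = 8


def heavy_compute(i):
    time.sleep(0.01)  # simulate work
    return i


results = []
in_flight = set()
with ThreadPoolExecutor(max_workers=4) as pool:
    for i in range(NUM_TASKS):
        if len(in_flight) >= MAX_NUM_IN_FLIGHT_TASKS:
            # Analogous to ray.wait(result_refs, num_returns=1):
            # block until at least one in-flight task finishes
            # before submitting another.
            done, in_flight = wait(in_flight, return_when=FIRST_COMPLETED)
            results.extend(f.result() for f in done)
        in_flight.add(pool.submit(heavy_compute, i))
    # Drain the remaining tasks, like the final ray.get(result_refs).
    done, _ = wait(in_flight)
    results.extend(f.result() for f in done)

print(len(results))  # → 50
```

The key property is the same as in the Ray example: the number of submitted-but-unfinished tasks never grows past the in-flight limit, so the pending queue stays bounded.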
New file added by this change (reStructuredText, reconstructed):

```rst
Pattern: Using ray.wait to limit the number of in-flight tasks
==============================================================

In this pattern, we use :ref:`ray.wait() <ray-wait-ref>` to limit the number of in-flight tasks.

If we submit tasks faster than they can be processed, tasks accumulate in the pending task queue, which will eventually cause OOM.
With ``ray.wait()``, we can apply backpressure and limit the number of in-flight tasks so that the pending task queue won't grow indefinitely and cause OOM.


Example use case
----------------

You have a worker actor that processes tasks at a rate of X tasks per second, and you want to submit tasks to it at a rate lower than X to avoid OOM.

Ray Serve uses this pattern to limit the number of in-flight queries for each worker.

.. figure:: ../images/limit-tasks.svg

    Limit number of in-flight tasks


Code example
------------

**Without backpressure:**

.. literalinclude:: ../doc_code/limit_tasks.py
    :language: python
    :start-after: __without_backpressure_start__
    :end-before: __without_backpressure_end__

**With backpressure:**

.. literalinclude:: ../doc_code/limit_tasks.py
    :language: python
    :start-after: __with_backpressure_start__
    :end-before: __with_backpressure_end__
```
This file was deleted.
Review comment: Why async? Did the example not work previously?

Author reply: Yeah, I chose to use async so tasks finish in a different order than the submission order, which makes using `ray.wait()` more meaningful; otherwise we could just `ray.get()` them one by one.
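The reviewer's point — that out-of-order completion is what makes `ray.wait()` worthwhile — can be illustrated with a small asyncio sketch. This is an assumption-laden stand-in: `asyncio.wait` with `FIRST_COMPLETED` plays the role of `ray.wait(..., num_returns=1)`, and the task durations are made up.

```python
import asyncio


async def task(i, duration):
    await asyncio.sleep(duration)
    return i


async def main():
    # Submitted in order 0, 1, 2, but task 0 is the slowest (assumed durations).
    durations = [0.3, 0.1, 0.2]
    pending = {asyncio.create_task(task(i, d)) for i, d in enumerate(durations)}
    completion_order = []
    while pending:
        # Like ray.wait(..., num_returns=1): take whichever finishes first.
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        completion_order.extend(t.result() for t in done)
    return completion_order


print(asyncio.run(main()))  # → [1, 2, 0]
```

Because results arrive in completion order `[1, 2, 0]` rather than submission order `[0, 1, 2]`, fetching them one by one in submission order would block on the slow task 0 while tasks 1 and 2 sit finished; waiting on whichever finishes first retires work as soon as it is done.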