From ed1238c2136cc61fa087c5079a5dc020fcfa508c Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Tue, 13 Sep 2022 08:06:30 -0700 Subject: [PATCH 1/9] Revamp ray core design patterns doc [6/n]: ray wait limits in-flight tasks Signed-off-by: Jiajun Yao --- doc/source/ray-core/doc_code/limit_tasks.py | 46 +- doc/source/ray-core/images/limit-tasks.svg | 1 + doc/source/ray-core/patterns/index.rst | 1 + doc/source/ray-core/patterns/limit-tasks.rst | 37 ++ doc/source/ray-core/tasks/patterns/index.rst | 1 - .../ray-core/tasks/patterns/limit-tasks.rst | 42 -- .../ray-core/tasks/patterns/limit-tasks.svg | 597 ------------------ 7 files changed, 60 insertions(+), 665 deletions(-) create mode 100644 doc/source/ray-core/images/limit-tasks.svg create mode 100644 doc/source/ray-core/patterns/limit-tasks.rst delete mode 100644 doc/source/ray-core/tasks/patterns/limit-tasks.rst delete mode 100644 doc/source/ray-core/tasks/patterns/limit-tasks.svg diff --git a/doc/source/ray-core/doc_code/limit_tasks.py b/doc/source/ray-core/doc_code/limit_tasks.py index 4f6e66b5cdd1..6b5b4c50770e 100644 --- a/doc/source/ray-core/doc_code/limit_tasks.py +++ b/doc/source/ray-core/doc_code/limit_tasks.py @@ -1,47 +1,43 @@ -import numpy as np - +# __without_backpressure_start__ import ray ray.init() -# __defining_actor_start__ @ray.remote class Actor: - def heavy_compute(self, large_array): + async def heavy_compute(self): # taking a long time... + # await asyncio.sleep(5) return -# __defining_actor_end__ - - -# __creating_actor_start__ actor = Actor.remote() -# __creating_actor_end__ -# __executing_task_start__ +NUM_TASKS = 1000 result_refs = [] -results = [] -max_in_flight_tasks = 100 -for i in range(1000): - large_array = np.zeros(1_000_000) - - # Allow 100 in flight calls +# When NUM_TASKS is large enough, this will eventually OOM. +for _ in range(NUM_TASKS): + result_refs.append(actor.heavy_compute.remote()) +ray.get(result_refs) +# __without_backpressure_end__ + +# __with_backpressure_start__ +MAX_NUM_IN_FLIGHT_TASKS = 100 +result_refs = [] +for _ in range(NUM_TASKS): + # Allow 100 in flight tasks. # For example, if i = 100, ray.wait blocks until # 1 of the object_refs in result_refs is ready # and available before we submit another. - if len(result_refs) > max_in_flight_tasks: + if len(result_refs) > MAX_NUM_IN_FLIGHT_TASKS: + num_ready = len(result_refs) - MAX_NUM_IN_FLIGHT_TASKS # update result_refs to only # track the remaining tasks. - num_ready = len(result_refs) - max_in_flight_tasks newly_completed, result_refs = ray.wait(result_refs, num_returns=num_ready) - for completed_ref in newly_completed: - results.append(ray.get(completed_ref)) + ray.get(newly_completed) - result_refs.append(actor.heavy_compute.remote(large_array)) + result_refs.append(actor.heavy_compute.remote()) -newly_completed, result_refs = ray.wait(result_refs, num_returns=len(result_refs)) -for completed_ref in newly_completed: - results.append(ray.get(completed_ref)) -# __executing_task_end__ +ray.get(result_refs) +# __with_backpressure_end__ diff --git a/doc/source/ray-core/images/limit-tasks.svg b/doc/source/ray-core/images/limit-tasks.svg new file mode 100644 index 000000000000..e814e346d851 --- /dev/null +++ b/doc/source/ray-core/images/limit-tasks.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/doc/source/ray-core/patterns/index.rst b/doc/source/ray-core/patterns/index.rst index 162fbfb1bd84..0143b2e1d0a1 100644 --- a/doc/source/ray-core/patterns/index.rst +++ b/doc/source/ray-core/patterns/index.rst @@ -9,6 +9,7 @@ This section is a collection of common design patterns and anti-patterns for wri :maxdepth: 1 generators + limit-tasks ray-get-loop unnecessary-ray-get ray-get-submission-order diff --git a/doc/source/ray-core/patterns/limit-tasks.rst b/doc/source/ray-core/patterns/limit-tasks.rst new file mode 100644 index 000000000000..6d497731c735 --- /dev/null +++ b/doc/source/ray-core/patterns/limit-tasks.rst @@ -0,0 +1,37 @@ +Pattern: Using ray.wait to limit the number of in-flight tasks +============================================================== + +In this pattern, we use :ref:`ray.wait() ` to limit the number of in-flight tasks. + +If we submit tasks faster than their process time, we will have tasks accumulated in the pending task queue, which will eventually cause OOM. +With ``ray.wait()``, we can apply backpressure and limit the number of in-flight tasks so that the pending task queue won't grow indefinitely and cause OOM. + + +Example use case +---------------- + +You have a worker actor that process tasks at rate of X tasks per second and you want to submit tasks to it at a rate lower than X to avoid OOM. + +Ray Serve uses this pattern to limit the number of in-flight queries for each worker. + +.. figure:: limit-tasks.svg + + Limit number of in-flight tasks + + +Code example +------------ + +**Without backpressure:** + +.. literalinclude:: ../../doc_code/limit_tasks.py + :language: python + :start-after: __without_backpressure_start__ + :end-before: __without_backpressure_end__ + +**With backpressure:** + +.. literalinclude:: ../../doc_code/limit_tasks.py + :language: python + :start-after: __with_backpressure_start__ + :end-before: __with_backpressure_end__ diff --git a/doc/source/ray-core/tasks/patterns/index.rst b/doc/source/ray-core/tasks/patterns/index.rst index 7b6e8dead24d..ba0b1bb0d385 100644 --- a/doc/source/ray-core/tasks/patterns/index.rst +++ b/doc/source/ray-core/tasks/patterns/index.rst @@ -15,7 +15,6 @@ You may also be interested in visiting the design patterns section for :ref:`act tree-of-tasks map-reduce - limit-tasks closure-capture global-variables too-many-results diff --git a/doc/source/ray-core/tasks/patterns/limit-tasks.rst b/doc/source/ray-core/tasks/patterns/limit-tasks.rst deleted file mode 100644 index 4f3b226efc43..000000000000 --- a/doc/source/ray-core/tasks/patterns/limit-tasks.rst +++ /dev/null @@ -1,42 +0,0 @@ -Pattern: Using ray.wait to limit the number of in-flight tasks -============================================================== - - -Example use case ----------------- - -When you submit a ray task or actor call, Ray will make sure the data is available to the worker. However, if you submit too many tasks rapidly, the worker might be overloaded and run out of memory. You should use ray.wait to block until a certain number of tasks are ready. - -Ray Serve uses this pattern to limit the number of in flight queries for each worker. - -.. figure:: limit-tasks.svg - - Limit number of parallel tasks - - -Code example ------------- - -**Without backpressure:** - -.. code-block:: python - - @ray.remote - class Actor: - def heavy_compute(self, large_array): - # taking a long time... - - actor = Actor.remote() - result_refs = [] - for i in range(1000): - large_array = np.zeros(1_000_000) - result_refs.append(actor.heavy_compute.remote(large_array)) - results = ray.get(result_refs) - -**With backpressure:** - -.. literalinclude:: ../../doc_code/limit_tasks.py - :language: python - :start-after: __executing_task_start__ - :end-before: __executing_task_end__ - diff --git a/doc/source/ray-core/tasks/patterns/limit-tasks.svg b/doc/source/ray-core/tasks/patterns/limit-tasks.svg deleted file mode 100644 index 23509876c289..000000000000 --- a/doc/source/ray-core/tasks/patterns/limit-tasks.svg +++ /dev/null @@ -1,597 +0,0 @@ - - - - - - image/svg+xml - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - From 6bdbdcc1111fc080908a55e975451e7ee8d0f7c2 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Tue, 13 Sep 2022 10:51:52 -0700 Subject: [PATCH 2/9] up Signed-off-by: Jiajun Yao --- doc/source/ray-core/patterns/limit-tasks.rst | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/doc/source/ray-core/patterns/limit-tasks.rst b/doc/source/ray-core/patterns/limit-tasks.rst index 6d497731c735..f20ab65ab39a 100644 --- a/doc/source/ray-core/patterns/limit-tasks.rst +++ b/doc/source/ray-core/patterns/limit-tasks.rst @@ -10,11 +10,11 @@ With ``ray.wait()``, we can apply backpressure and limit the number of in-flight Example use case ---------------- -You have a worker actor that process tasks at rate of X tasks per second and you want to submit tasks to it at a rate lower than X to avoid OOM. +You have a worker actor that process tasks at a rate of X tasks per second and you want to submit tasks to it at a rate lower than X to avoid OOM. Ray Serve uses this pattern to limit the number of in-flight queries for each worker. -.. figure:: limit-tasks.svg +.. figure:: ../images/limit-tasks.svg Limit number of in-flight tasks @@ -24,14 +24,14 @@ Code example **Without backpressure:** -.. literalinclude:: ../../doc_code/limit_tasks.py +.. literalinclude:: ../doc_code/limit_tasks.py :language: python :start-after: __without_backpressure_start__ :end-before: __without_backpressure_end__ **With backpressure:** -.. literalinclude:: ../../doc_code/limit_tasks.py +.. literalinclude:: ../doc_code/limit_tasks.py :language: python :start-after: __with_backpressure_start__ :end-before: __with_backpressure_end__ From 5f69fb5219f4cbb971e16501f44547c1c8e3c6aa Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Tue, 13 Sep 2022 13:27:45 -0700 Subject: [PATCH 3/9] up Signed-off-by: Jiajun Yao --- doc/source/ray-core/images/limit-tasks.svg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/source/ray-core/images/limit-tasks.svg b/doc/source/ray-core/images/limit-tasks.svg index e814e346d851..fae6f2e3f030 100644 --- a/doc/source/ray-core/images/limit-tasks.svg +++ b/doc/source/ray-core/images/limit-tasks.svg @@ -1 +1 @@ - \ No newline at end of file + \ No newline at end of file From 8ee98f1f347c4c32b2755be2abc7c35eb5b69453 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Tue, 13 Sep 2022 14:04:40 -0700 Subject: [PATCH 4/9] up Signed-off-by: Jiajun Yao --- doc/source/ray-core/images/limit-tasks.svg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/source/ray-core/images/limit-tasks.svg b/doc/source/ray-core/images/limit-tasks.svg index fae6f2e3f030..748e807226dd 100644 --- a/doc/source/ray-core/images/limit-tasks.svg +++ b/doc/source/ray-core/images/limit-tasks.svg @@ -1 +1 @@ - \ No newline at end of file + From ec621e23929a0cef9db81497ac929cf28da60274 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Tue, 13 Sep 2022 15:53:56 -0700 Subject: [PATCH 5/9] up Signed-off-by: Jiajun Yao --- doc/source/ray-core/patterns/limit-tasks.rst | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/doc/source/ray-core/patterns/limit-tasks.rst b/doc/source/ray-core/patterns/limit-tasks.rst index f20ab65ab39a..e350c3c10e12 100644 --- a/doc/source/ray-core/patterns/limit-tasks.rst +++ b/doc/source/ray-core/patterns/limit-tasks.rst @@ -28,6 +28,8 @@ Code example :language: python :start-after: __without_backpressure_start__ :end-before: __without_backpressure_end__ + :start-after: __with_backpressure_start__ + :end-before: __with_backpressure_end__ **With backpressure:** @@ -35,3 +37,7 @@ Code example :language: python :start-after: __with_backpressure_start__ :end-before: __with_backpressure_end__ +.. literalinclude:: ../doc_code/limit_tasks.py + :language: python + :start-after: __without_backpressure_start__ + :end-before: __without_backpressure_end__ From 03386ad72ef6646a38bce9b6ee7cd9a881d8330b Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Tue, 13 Sep 2022 16:27:35 -0700 Subject: [PATCH 6/9] up Signed-off-by: Jiajun Yao --- doc/source/ray-core/patterns/limit-tasks.rst | 6 ------ 1 file changed, 6 deletions(-) diff --git a/doc/source/ray-core/patterns/limit-tasks.rst b/doc/source/ray-core/patterns/limit-tasks.rst index e350c3c10e12..f20ab65ab39a 100644 --- a/doc/source/ray-core/patterns/limit-tasks.rst +++ b/doc/source/ray-core/patterns/limit-tasks.rst @@ -28,8 +28,6 @@ Code example :language: python :start-after: __without_backpressure_start__ :end-before: __without_backpressure_end__ - :start-after: __with_backpressure_start__ - :end-before: __with_backpressure_end__ **With backpressure:** @@ -37,7 +35,3 @@ Code example :language: python :start-after: __with_backpressure_start__ :end-before: __with_backpressure_end__ -.. literalinclude:: ../doc_code/limit_tasks.py - :language: python - :start-after: __without_backpressure_start__ - :end-before: __without_backpressure_end__ From 7614014daf6f72ede36d6aba2c92232a7d2e0974 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Thu, 15 Sep 2022 23:05:31 -0700 Subject: [PATCH 7/9] up Signed-off-by: Jiajun Yao --- doc/source/ray-core/doc_code/limit_tasks.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/doc/source/ray-core/doc_code/limit_tasks.py b/doc/source/ray-core/doc_code/limit_tasks.py index 6b5b4c50770e..d761b866a36f 100644 --- a/doc/source/ray-core/doc_code/limit_tasks.py +++ b/doc/source/ray-core/doc_code/limit_tasks.py @@ -26,16 +26,11 @@ async def heavy_compute(self): MAX_NUM_IN_FLIGHT_TASKS = 100 result_refs = [] for _ in range(NUM_TASKS): - # Allow 100 in flight tasks. - # For example, if i = 100, ray.wait blocks until - # 1 of the object_refs in result_refs is ready - # and available before we submit another. if len(result_refs) > MAX_NUM_IN_FLIGHT_TASKS: - num_ready = len(result_refs) - MAX_NUM_IN_FLIGHT_TASKS # update result_refs to only # track the remaining tasks. - newly_completed, result_refs = ray.wait(result_refs, num_returns=num_ready) - ray.get(newly_completed) + ready_refs, result_refs = ray.wait(result_refs, num_returns=1) + ray.get(ready_refs) result_refs.append(actor.heavy_compute.remote()) From 151807a44f2c8bb16c0b46ec99e5567d6026e5e1 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Mon, 19 Sep 2022 14:01:10 -0700 Subject: [PATCH 8/9] up Signed-off-by: Jiajun Yao --- doc/source/ray-core/images/limit-tasks.svg | 2 +- doc/source/ray-core/patterns/limit-tasks.rst | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/doc/source/ray-core/images/limit-tasks.svg b/doc/source/ray-core/images/limit-tasks.svg index 748e807226dd..a47b76ba9410 100644 --- a/doc/source/ray-core/images/limit-tasks.svg +++ b/doc/source/ray-core/images/limit-tasks.svg @@ -1 +1 @@ - + diff --git a/doc/source/ray-core/patterns/limit-tasks.rst b/doc/source/ray-core/patterns/limit-tasks.rst index f20ab65ab39a..6e51d46dc836 100644 --- a/doc/source/ray-core/patterns/limit-tasks.rst +++ b/doc/source/ray-core/patterns/limit-tasks.rst @@ -12,7 +12,7 @@ Example use case You have a worker actor that process tasks at a rate of X tasks per second and you want to submit tasks to it at a rate lower than X to avoid OOM. -Ray Serve uses this pattern to limit the number of in-flight queries for each worker. +For example, Ray Serve uses this pattern to limit the number of in-flight queries for each worker. .. figure:: ../images/limit-tasks.svg From 2f11a44c7f824ce28b1cc326158e3441bc4ce54f Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Mon, 19 Sep 2022 14:46:08 -0700 Subject: [PATCH 9/9] up Signed-off-by: Jiajun Yao --- doc/source/ray-core/images/limit-tasks.svg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/source/ray-core/images/limit-tasks.svg b/doc/source/ray-core/images/limit-tasks.svg index a47b76ba9410..4b1b07973e55 100644 --- a/doc/source/ray-core/images/limit-tasks.svg +++ b/doc/source/ray-core/images/limit-tasks.svg @@ -1 +1 @@ - +