From e770bdf134599e8c23aad40828b910035aab3a5a Mon Sep 17 00:00:00 2001
From: Jiajun Yao
Date: Tue, 18 Oct 2022 12:44:30 -0700
Subject: [PATCH 1/2] [Doc] Revamp ray core design patterns doc [13/n]: nested
 tasks (#29342)

- Edit pass: make the pattern generic for nested tasks instead of just recursive tasks.
- Move the code to doc_code

Signed-off-by: Jiajun Yao
Signed-off-by: Stephanie Wang
Co-authored-by: Stephanie Wang
---
 .../ray-core/doc_code/pattern_nested_tasks.py |  72 +++++++++++
 .../ray-core/examples/batch_training.ipynb    |   2 +-
 .../patterns => images}/tree-of-tasks.svg     |   0
 doc/source/ray-core/patterns/index.rst        |   1 +
 doc/source/ray-core/patterns/nested-tasks.rst |  35 ++++++
 doc/source/ray-core/tasks/patterns/index.rst  |   1 -
 .../ray-core/tasks/patterns/tree-of-tasks.rst | 113 ------------------
 7 files changed, 109 insertions(+), 115 deletions(-)
 create mode 100644 doc/source/ray-core/doc_code/pattern_nested_tasks.py
 rename doc/source/ray-core/{tasks/patterns => images}/tree-of-tasks.svg (100%)
 create mode 100644 doc/source/ray-core/patterns/nested-tasks.rst
 delete mode 100644 doc/source/ray-core/tasks/patterns/tree-of-tasks.rst
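Before the diffs, a minimal sketch of the nested-task pattern this commit documents: a remote task that submits further remote tasks and gathers their results. The ``child`` and ``parent`` names are illustrative only, not part of the patch.

.. code-block:: python

    import ray

    ray.init()

    @ray.remote
    def child(x):
        return x * 2

    @ray.remote
    def parent(values):
        # A task can submit more tasks; gather the nested results here.
        return ray.get([child.remote(v) for v in values])

    assert ray.get(parent.remote([1, 2, 3])) == [2, 4, 6]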
diff --git a/doc/source/ray-core/doc_code/pattern_nested_tasks.py b/doc/source/ray-core/doc_code/pattern_nested_tasks.py
new file mode 100644
index 000000000000..e37005e218e4
--- /dev/null
+++ b/doc/source/ray-core/doc_code/pattern_nested_tasks.py
@@ -0,0 +1,72 @@
+# __pattern_start__
+import ray
+import time
+from numpy import random
+
+
+def partition(collection):
+    # Use the last element as the pivot
+    pivot = collection.pop()
+    greater, lesser = [], []
+    for element in collection:
+        if element > pivot:
+            greater.append(element)
+        else:
+            lesser.append(element)
+    return lesser, pivot, greater
+
+
+def quick_sort(collection):
+    if len(collection) <= 200000:  # magic number
+        return sorted(collection)
+    else:
+        lesser, pivot, greater = partition(collection)
+        lesser = quick_sort(lesser)
+        greater = quick_sort(greater)
+        return lesser + [pivot] + greater
+
+
+@ray.remote
+def quick_sort_distributed(collection):
+    # Tiny tasks are an antipattern.
+    # Thus, in our example we have a "magic number" to
+    # toggle when distributed recursion should be used vs
+    # when the sorting should be done in place. The rule
+    # of thumb is that the duration of an individual task
+    # should be at least 1 second.
+    if len(collection) <= 200000:  # magic number
+        return sorted(collection)
+    else:
+        lesser, pivot, greater = partition(collection)
+        lesser = quick_sort_distributed.remote(lesser)
+        greater = quick_sort_distributed.remote(greater)
+        return ray.get(lesser) + [pivot] + ray.get(greater)
+
+
+for size in [200000, 4000000, 8000000]:
+    print(f"Array size: {size}")
+    unsorted = random.randint(1000000, size=(size)).tolist()
+    s = time.time()
+    quick_sort(unsorted)
+    print(f"Sequential execution: {(time.time() - s):.3f}")
+    s = time.time()
+    ray.get(quick_sort_distributed.remote(unsorted))
+    print(f"Distributed execution: {(time.time() - s):.3f}")
+    print("--" * 10)
+
+# Outputs:
+
+# Array size: 200000
+# Sequential execution: 0.040
+# Distributed execution: 0.152
+# --------------------
+# Array size: 4000000
+# Sequential execution: 6.161
+# Distributed execution: 5.779
+# --------------------
+# Array size: 8000000
+# Sequential execution: 15.459
+# Distributed execution: 11.282
+# --------------------
+
+# __pattern_end__
diff --git a/doc/source/ray-core/examples/batch_training.ipynb b/doc/source/ray-core/examples/batch_training.ipynb
index adad814e77b7..25080e59b208 100644
--- a/doc/source/ray-core/examples/batch_training.ipynb
+++ b/doc/source/ray-core/examples/batch_training.ipynb
@@ -292,7 +292,7 @@
    "source": [
     "The `task` Ray task contains all logic necessary to load a data batch, transform it and fit and evaluate models on it.\n",
     "\n",
-    "You may notice that we have previously defined `fit_and_score_sklearn` as a Ray task as well and set it to be executed from inside `task`. This allows us to dynamically create a [tree of tasks](task-pattern-tree-of-tasks), ensuring that the cluster resources are fully utillized. Without this pattern, each `task` would need to be assigned several CPU cores for the model fitting, meaning that if certain models finish faster, then those CPU cores would stil stay occupied. Thankfully, Ray is able to deal with nested parallelism in tasks without the need for any extra logic, allowing us to simplify the code."
+    "You may notice that we have previously defined `fit_and_score_sklearn` as a Ray task as well and set it to be executed from inside `task`. This allows us to dynamically create a [tree of tasks](task-pattern-nested-tasks), ensuring that the cluster resources are fully utilized. Without this pattern, each `task` would need to be assigned several CPU cores for the model fitting, meaning that if certain models finish faster, then those CPU cores would still stay occupied. Thankfully, Ray is able to deal with nested parallelism in tasks without the need for any extra logic, allowing us to simplify the code."
    ]
   },
   {
diff --git a/doc/source/ray-core/tasks/patterns/tree-of-tasks.svg b/doc/source/ray-core/images/tree-of-tasks.svg
similarity index 100%
rename from doc/source/ray-core/tasks/patterns/tree-of-tasks.svg
rename to doc/source/ray-core/images/tree-of-tasks.svg
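One caveat with the new doc_code snippet: the driver passes the full Python list to ``quick_sort_distributed.remote``, which serializes it on every call. The earlier version of this example (deleted later in this patch) instead placed the list in the object store once. A minimal sketch of that variant, assuming the ``quick_sort_distributed`` task defined in the snippet above:

.. code-block:: python

    import ray
    from numpy import random

    unsorted = random.randint(1000000, size=4000000).tolist()
    # Put the large list in the object store once and pass only the
    # reference; the task receives the resolved value as its argument.
    unsorted_ref = ray.put(unsorted)
    result = ray.get(quick_sort_distributed.remote(unsorted_ref))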
diff --git a/doc/source/ray-core/patterns/index.rst b/doc/source/ray-core/patterns/index.rst
index 54a6541d5bbb..dd96e43a2381 100644
--- a/doc/source/ray-core/patterns/index.rst
+++ b/doc/source/ray-core/patterns/index.rst
@@ -8,6 +8,7 @@ This section is a collection of common design patterns and anti-patterns for wri
 .. toctree::
    :maxdepth: 1
 
+   nested-tasks
    generators
    limit-pending-tasks
    limit-running-tasks
diff --git a/doc/source/ray-core/patterns/nested-tasks.rst b/doc/source/ray-core/patterns/nested-tasks.rst
new file mode 100644
index 000000000000..170b8f63525d
--- /dev/null
+++ b/doc/source/ray-core/patterns/nested-tasks.rst
@@ -0,0 +1,35 @@
+.. _task-pattern-nested-tasks:
+
+Pattern: Using nested tasks to achieve nested parallelism
+=========================================================
+
+In this pattern, a remote task can dynamically call other remote tasks (including itself) for nested parallelism.
+This is useful when sub-tasks can be parallelized.
+
+Keep in mind, though, that nested tasks come with their own cost: extra worker processes, scheduling overhead, bookkeeping overhead, etc.
+To achieve speedup with nested parallelism, make sure each of your nested tasks does significant work. See :doc:`too-fine-grained-tasks` for more details.
+
+Example use case
+----------------
+
+You want to quick-sort a large list of numbers.
+By using nested tasks, you can sort the list in a distributed and parallel fashion.
+
+.. figure:: ../images/tree-of-tasks.svg
+
+    Tree of tasks
+
+
+Code example
+------------
+
+.. literalinclude:: ../doc_code/pattern_nested_tasks.py
+    :language: python
+    :start-after: __pattern_start__
+    :end-before: __pattern_end__
+
+We call :ref:`ray.get() ` only after both ``quick_sort_distributed`` invocations have been launched.
+This allows you to maximize parallelism in the workload. See :doc:`ray-get-loop` for more details.
+
+Notice in the execution times above that with smaller tasks, the non-distributed version is faster. However, as the task execution
+time increases, i.e. as the lists to sort grow larger, the distributed version is faster.
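To make the :doc:`ray-get-loop` point concrete, here is a sketch of the inner recursion written both ways, reusing ``quick_sort_distributed`` and the ``lesser``/``pivot``/``greater`` names from the code example above:

.. code-block:: python

    # Anti-pattern: calling ray.get() on each sub-task as it is submitted
    # serializes the recursion; the second half waits for the first.
    lesser = ray.get(quick_sort_distributed.remote(lesser))
    greater = ray.get(quick_sort_distributed.remote(greater))

    # Pattern: submit both sub-tasks first so they run in parallel,
    # then fetch the results.
    lesser_ref = quick_sort_distributed.remote(lesser)
    greater_ref = quick_sort_distributed.remote(greater)
    result = ray.get(lesser_ref) + [pivot] + ray.get(greater_ref)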
diff --git a/doc/source/ray-core/tasks/patterns/index.rst b/doc/source/ray-core/tasks/patterns/index.rst
index d97f4e7f2b40..e2f1a131e207 100644
--- a/doc/source/ray-core/tasks/patterns/index.rst
+++ b/doc/source/ray-core/tasks/patterns/index.rst
@@ -13,5 +13,4 @@ You may also be interested in visiting the design patterns section for :ref:`act
 .. toctree::
    :maxdepth: -1
 
-   tree-of-tasks
    map-reduce
diff --git a/doc/source/ray-core/tasks/patterns/tree-of-tasks.rst b/doc/source/ray-core/tasks/patterns/tree-of-tasks.rst
deleted file mode 100644
index 0cb6176c593b..000000000000
--- a/doc/source/ray-core/tasks/patterns/tree-of-tasks.rst
+++ /dev/null
@@ -1,113 +0,0 @@
-.. _task-pattern-tree-of-tasks:
-
-Pattern: Tree of tasks
-======================
-
-In this pattern, remote tasks are spawned in a recursive fashion to sort a list. Within the definition of a remote function, it is possible to invoke itself (quick_sort_distributed.remote). A single call to the task triggers the dispatch of multiple tasks.
-
-Example use case
-----------------
-
-You have a large list of items that you need to process recursively (i.e., sorting).
-
-We call ``ray.get`` after both ray function invocations take place. This allows you to maximize parallelism in the workload.
-Notice in the execution times below that with smaller and finer tasks, the non-distributed version is faster; however, as the task execution
-time increases, that is the task with larger list takes longer, the distributed version is faster. Takeaway here is that fine grained tasks are an
-anti Ray pattern.
-
-.. code-block:: python
-
-    lesser = quick_sort_distributed.remote(lesser)
-    greater = quick_sort_distributed.remote(greater)
-    ray.get(lesser) + [pivot] + ray.get(greater)
-
-
-.. figure:: tree-of-tasks.svg
-
-    Tree of tasks
-
-Code example
-------------
-
-.. code-block:: python
-
-    import ray
-
-    def partition(collection):
-        # Use the last element as the first pivot
-        pivot = collection.pop()
-        greater, lesser = [], []
-        for element in collection:
-            if element > pivot:
-                greater.append(element)
-            else:
-                lesser.append(element)
-        return lesser, pivot, greater
-
-    def quick_sort(collection):
-
-        if len(collection) <= 200000:  # magic number
-            return sorted(collection)
-        else:
-            lesser, pivot, greater = partition(collection)
-            lesser = quick_sort(lesser)
-            greater = quick_sort(greater)
-            return lesser + [pivot] + greater
-
-    @ray.remote
-    def quick_sort_distributed(collection):
-        # Tiny tasks are an antipattern.
-        # Thus, in our example we have a "magic number" to
-        # toggle when distributed recursion should be used vs
-        # when the sorting should be done in place. The rule
-        # of thumb is that the duration of an individual task
-        # should be at least 1 second.
-        if len(collection) <= 200000:  # magic number
-            return sorted(collection)
-        else:
-            lesser, pivot, greater = partition(collection)
-            lesser = quick_sort_distributed.remote(lesser)
-            greater = quick_sort_distributed.remote(greater)
-            return ray.get(lesser) + [pivot] + ray.get(greater)
-
-    if __name__ == "__main__":
-        from numpy import random
-        import time
-
-        ray.init()
-        for size in [200000, 4000000, 8000000, 10000000, 20000000]:
-            print(f'Array size: {size}')
-            unsorted = random.randint(1000000, size=(size)).tolist()
-            s = time.time()
-            quick_sort(unsorted)
-            print(f"Sequential execution: {(time.time() - s):.3f}")
-            s = time.time()
-            # put the large object in the global store and pass only the reference
-            unsorted_obj = ray.put(unsorted)
-            ray.get(quick_sort_distributed.remote(unsorted_obj))
-            print(f"Distributed execution: {(time.time() - s):.3f}")
-            print("--" * 10)
-
-.. code-block:: text
-
-    Array size: 200000
-    Sequential execution: 0.040
-    Distributed execution: 0.152
-    --------------------
-    Array size: 4000000
-    Sequential execution: 6.161
-    Distributed execution: 5.779
-    --------------------
-    Array size: 8000000
-    Sequential execution: 15.459
-    Distributed execution: 11.282
-    --------------------
-    Array size: 10000000
-    Sequential execution: 20.671
-    Distributed execution: 13.132
-    --------------------
-    Array size: 20000000
-    Sequential execution: 47.352
-    Distributed execution: 36.213
-    --------------------
-

From 232e2a9bc6fe5bf3222e28ddde95a464e3a59276 Mon Sep 17 00:00:00 2001
From: Jiajun Yao
Date: Mon, 24 Oct 2022 16:39:27 -0700
Subject: [PATCH 2/2] [Doc] Revamp ray core design patterns doc [14/n]: map
 reduce (#29469)

Remove the map-reduce pattern since we already have datasets for doing real
map-reduce.
Signed-off-by: Jiajun Yao Signed-off-by: Stephanie Wang Co-authored-by: Stephanie Wang --- doc/source/ray-core/actors/patterns/index.rst | 2 +- doc/source/ray-core/tasks.rst | 3 +- doc/source/ray-core/tasks/patterns/index.rst | 16 - .../ray-core/tasks/patterns/map-reduce.rst | 71 --- .../ray-core/tasks/patterns/map-reduce.svg | 466 ------------------ 5 files changed, 2 insertions(+), 556 deletions(-) delete mode 100644 doc/source/ray-core/tasks/patterns/index.rst delete mode 100644 doc/source/ray-core/tasks/patterns/map-reduce.rst delete mode 100644 doc/source/ray-core/tasks/patterns/map-reduce.svg diff --git a/doc/source/ray-core/actors/patterns/index.rst b/doc/source/ray-core/actors/patterns/index.rst index 2d602a8928cd..92e70e413ce9 100644 --- a/doc/source/ray-core/actors/patterns/index.rst +++ b/doc/source/ray-core/actors/patterns/index.rst @@ -8,7 +8,7 @@ This section is a collection of common design patterns (and anti-patterns) for R - New users trying to understand how to get started with Ray, and - Advanced users trying to optimize their use of Ray actors -You may also be interested in visiting the design patterns section for :ref:`tasks `. +You may also be interested in visiting the design patterns section for :ref:`tasks `. .. toctree:: :maxdepth: -1 diff --git a/doc/source/ray-core/tasks.rst b/doc/source/ray-core/tasks.rst index aa1f87863eb2..015362e808da 100644 --- a/doc/source/ray-core/tasks.rst +++ b/doc/source/ray-core/tasks.rst @@ -78,7 +78,7 @@ Ray enables arbitrary functions to be executed asynchronously on separate Python .. _ray-object-refs: -Passing object refs to Ray tasks +Passing object refs to Ray tasks --------------------------------------- In addition to values, `Object refs `__ can also be passed into remote functions. When the task gets executed, inside the function body **the argument will be the underlying value**. For example, take this function: @@ -217,4 +217,3 @@ More about Ray Tasks tasks/generators.rst tasks/fault-tolerance.rst tasks/scheduling.rst - tasks/patterns/index.rst diff --git a/doc/source/ray-core/tasks/patterns/index.rst b/doc/source/ray-core/tasks/patterns/index.rst deleted file mode 100644 index e2f1a131e207..000000000000 --- a/doc/source/ray-core/tasks/patterns/index.rst +++ /dev/null @@ -1,16 +0,0 @@ -.. _task-patterns: - -Task Design Patterns -==================== - -This section is a collection of common design patterns (and anti-patterns) for Ray tasks. It is meant as a handbook for both: - -- New users trying to understand how to get started with Ray, and -- Advanced users trying to optimize their use of Ray tasks - -You may also be interested in visiting the design patterns section for :ref:`actors `. - -.. toctree:: - :maxdepth: -1 - - map-reduce diff --git a/doc/source/ray-core/tasks/patterns/map-reduce.rst b/doc/source/ray-core/tasks/patterns/map-reduce.rst deleted file mode 100644 index b5512d6f4d8c..000000000000 --- a/doc/source/ray-core/tasks/patterns/map-reduce.rst +++ /dev/null @@ -1,71 +0,0 @@ -Pattern: Map and reduce -======================= - -For ``map``, this example uses Ray tasks to execute a given function multiple times in parallel (on a separate process). We then use ray.get to fetch the results of each of these functions. - -You can have many ``map`` stages and many ``reduce`` stages. - -Example use case ----------------- - -Implement generic map and reduce functionality with Ray tasks. “map” applies a function to a list of elements. - -.. 
figure:: map-reduce.svg
-
-    Map and reduce
-
-Code examples
--------------
-
-**Single-threaded map:**
-
-.. code-block:: python
-
-    items = list(range(100))
-    map_func = lambda i : i*2
-    output = [map_func(i) for i in items]
-
-**Ray parallel map:**
-
-.. code-block:: python
-
-    @ray.remote
-    def map(obj, f):
-        return f(obj)
-
-    items = list(range(100))
-    map_func = lambda i : i*2
-    output = ray.get([map.remote(i, map_func) for i in items])
-
-**Single-threaded reduce:**
-
-.. code-block:: python
-
-    items = list(range(100))
-    map_func = lambda i : i*2
-    output = sum([map_func(i) for i in items])
-
-**Ray parallel reduce:**
-
-.. code-block:: python
-
-    @ray.remote
-    def map(obj, f):
-        return f(obj)
-    @ray.remote
-    def sum_results(*elements):
-        return np.sum(elements)
-
-    items = list(range(100))
-    map_func = lambda i : i*2
-    remote_elements = [map.remote(i, map_func) for i in items]
-
-    # simple reduce
-    remote_final_sum = sum_results.remote(*remote_elements)
-    result = ray.get(remote_final_sum)
-
-    # tree reduce
-    intermediate_results = [sum_results.remote(
-        *remote_elements[i * 20: (i + 1) * 20]) for i in range(5)]
-    remote_final_sum = sum_results.remote(*intermediate_results)
-    result = ray.get(remote_final_sum)
diff --git a/doc/source/ray-core/tasks/patterns/map-reduce.svg b/doc/source/ray-core/tasks/patterns/map-reduce.svg
deleted file mode 100644
index 1895c2d45cf3..000000000000
--- a/doc/source/ray-core/tasks/patterns/map-reduce.svg
+++ /dev/null
@@ -1,466 +0,0 @@
[466 lines of deleted SVG markup elided: the "Map and reduce" figure, MIME type image/svg+xml]
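The deleted map-reduce example hard-codes a two-level tree reduce (five groups of 20). For reference, a minimal sketch of the same idea as a generic helper; ``tree_reduce`` and ``fan_in`` are illustrative names, not Ray APIs:

.. code-block:: python

    import ray

    @ray.remote
    def sum_results(*elements):
        # Sum a handful of already-computed partial results.
        return sum(elements)

    def tree_reduce(refs, fan_in=20):
        # Reduce groups of `fan_in` object refs level by level
        # until a single ref remains.
        while len(refs) > 1:
            refs = [
                sum_results.remote(*refs[i : i + fan_in])
                for i in range(0, len(refs), fan_in)
            ]
        return refs[0]

    items = list(range(100))
    mapped = [ray.put(i * 2) for i in items]
    assert ray.get(tree_reduce(mapped)) == 9900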