diff --git a/doc/source/ray-core/doc_code/pattern_nested_tasks.py b/doc/source/ray-core/doc_code/pattern_nested_tasks.py new file mode 100644 index 000000000000..e37005e218e4 --- /dev/null +++ b/doc/source/ray-core/doc_code/pattern_nested_tasks.py @@ -0,0 +1,72 @@ +# __pattern_start__ +import ray +import time +from numpy import random + + +def partition(collection): + # Use the last element as the pivot + pivot = collection.pop() + greater, lesser = [], [] + for element in collection: + if element > pivot: + greater.append(element) + else: + lesser.append(element) + return lesser, pivot, greater + + +def quick_sort(collection): + if len(collection) <= 200000: # magic number + return sorted(collection) + else: + lesser, pivot, greater = partition(collection) + lesser = quick_sort(lesser) + greater = quick_sort(greater) + return lesser + [pivot] + greater + + +@ray.remote +def quick_sort_distributed(collection): + # Tiny tasks are an antipattern. + # Thus, in our example we have a "magic number" to + # toggle when distributed recursion should be used vs + # when the sorting should be done in place. The rule + # of thumb is that the duration of an individual task + # should be at least 1 second. + if len(collection) <= 200000: # magic number + return sorted(collection) + else: + lesser, pivot, greater = partition(collection) + lesser = quick_sort_distributed.remote(lesser) + greater = quick_sort_distributed.remote(greater) + return ray.get(lesser) + [pivot] + ray.get(greater) + + +for size in [200000, 4000000, 8000000]: + print(f"Array size: {size}") + unsorted = random.randint(1000000, size=(size)).tolist() + s = time.time() + quick_sort(unsorted) + print(f"Sequential execution: {(time.time() - s):.3f}") + s = time.time() + ray.get(quick_sort_distributed.remote(unsorted)) + print(f"Distributed execution: {(time.time() - s):.3f}") + print("--" * 10) + +# Outputs: + +# Array size: 200000 +# Sequential execution: 0.040 +# Distributed execution: 0.152 +# -------------------- +# Array size: 4000000 +# Sequential execution: 6.161 +# Distributed execution: 5.779 +# -------------------- +# Array size: 8000000 +# Sequential execution: 15.459 +# Distributed execution: 11.282 +# -------------------- + +# __pattern_end__ diff --git a/doc/source/ray-core/examples/batch_training.ipynb b/doc/source/ray-core/examples/batch_training.ipynb index adad814e77b7..25080e59b208 100644 --- a/doc/source/ray-core/examples/batch_training.ipynb +++ b/doc/source/ray-core/examples/batch_training.ipynb @@ -292,7 +292,7 @@ "source": [ "The `task` Ray task contains all logic necessary to load a data batch, transform it and fit and evaluate models on it.\n", "\n", - "You may notice that we have previously defined `fit_and_score_sklearn` as a Ray task as well and set it to be executed from inside `task`. This allows us to dynamically create a [tree of tasks](task-pattern-tree-of-tasks), ensuring that the cluster resources are fully utillized. Without this pattern, each `task` would need to be assigned several CPU cores for the model fitting, meaning that if certain models finish faster, then those CPU cores would stil stay occupied. Thankfully, Ray is able to deal with nested parallelism in tasks without the need for any extra logic, allowing us to simplify the code." + "You may notice that we have previously defined `fit_and_score_sklearn` as a Ray task as well and set it to be executed from inside `task`. This allows us to dynamically create a [tree of tasks](task-pattern-nested-tasks), ensuring that the cluster resources are fully utillized. Without this pattern, each `task` would need to be assigned several CPU cores for the model fitting, meaning that if certain models finish faster, then those CPU cores would stil stay occupied. Thankfully, Ray is able to deal with nested parallelism in tasks without the need for any extra logic, allowing us to simplify the code." ] }, { diff --git a/doc/source/ray-core/tasks/patterns/tree-of-tasks.svg b/doc/source/ray-core/images/tree-of-tasks.svg similarity index 100% rename from doc/source/ray-core/tasks/patterns/tree-of-tasks.svg rename to doc/source/ray-core/images/tree-of-tasks.svg diff --git a/doc/source/ray-core/patterns/index.rst b/doc/source/ray-core/patterns/index.rst index 54a6541d5bbb..dd96e43a2381 100644 --- a/doc/source/ray-core/patterns/index.rst +++ b/doc/source/ray-core/patterns/index.rst @@ -8,6 +8,7 @@ This section is a collection of common design patterns and anti-patterns for wri .. toctree:: :maxdepth: 1 + nested-tasks generators limit-pending-tasks limit-running-tasks diff --git a/doc/source/ray-core/patterns/nested-tasks.rst b/doc/source/ray-core/patterns/nested-tasks.rst new file mode 100644 index 000000000000..170b8f63525d --- /dev/null +++ b/doc/source/ray-core/patterns/nested-tasks.rst @@ -0,0 +1,35 @@ +.. _task-pattern-nested-tasks: + +Pattern: Using nested tasks to achieve nested parallelism +========================================================= + +In this pattern, a remote task can dynamically call other remote tasks (including itself) for nested parallelism. +This is useful when sub-tasks can be parallelized. + +Keep in mind, though, that nested tasks come with their own cost: extra worker processes, scheduling overhead, bookkeeping overhead, etc. +To achieve speedup with nested parallelism, make sure each of your nested tasks does significant work. See :doc:`too-fine-grained-tasks` for more details. + +Example use case +---------------- + +You want to quick-sort a large list of numbers. +By using nested tasks, we can sort the list in a distributed and parallel fashion. + +.. figure:: ../images/tree-of-tasks.svg + + Tree of tasks + + +Code example +------------ + +.. literalinclude:: ../doc_code/pattern_nested_tasks.py + :language: python + :start-after: __pattern_start__ + :end-before: __pattern_end__ + +We call :ref:`ray.get() ` after both ``quick_sort_distributed`` function invocations take place. +This allows you to maximize parallelism in the workload. See :doc:`ray-get-loop` for more details. + +Notice in the execution times above that with smaller tasks, the non-distributed version is faster. However, as the task execution +time increases, i.e. because the lists to sort are larger, the distributed version is faster. diff --git a/doc/source/ray-core/tasks/patterns/index.rst b/doc/source/ray-core/tasks/patterns/index.rst index d97f4e7f2b40..e2f1a131e207 100644 --- a/doc/source/ray-core/tasks/patterns/index.rst +++ b/doc/source/ray-core/tasks/patterns/index.rst @@ -13,5 +13,4 @@ You may also be interested in visiting the design patterns section for :ref:`act .. toctree:: :maxdepth: -1 - tree-of-tasks map-reduce diff --git a/doc/source/ray-core/tasks/patterns/tree-of-tasks.rst b/doc/source/ray-core/tasks/patterns/tree-of-tasks.rst deleted file mode 100644 index 0cb6176c593b..000000000000 --- a/doc/source/ray-core/tasks/patterns/tree-of-tasks.rst +++ /dev/null @@ -1,113 +0,0 @@ -.. _task-pattern-tree-of-tasks: - -Pattern: Tree of tasks -====================== - -In this pattern, remote tasks are spawned in a recursive fashion to sort a list. Within the definition of a remote function, it is possible to invoke itself (quick_sort_distributed.remote). A single call to the task triggers the dispatch of multiple tasks. - -Example use case ----------------- - -You have a large list of items that you need to process recursively (i.e., sorting). - -We call ``ray.get`` after both ray function invocations take place. This allows you to maximize parallelism in the workload. -Notice in the execution times below that with smaller and finer tasks, the non-distributed version is faster; however, as the task execution -time increases, that is the task with larger list takes longer, the distributed version is faster. Takeaway here is that fine grained tasks are an -anti Ray pattern. - -.. code-block:: python - - lesser = quick_sort_distributed.remote(lesser) - greater = quick_sort_distributed.remote(greater) - ray.get(lesser) + [pivot] + ray.get(greater) - - -.. figure:: tree-of-tasks.svg - - Tree of tasks - -Code example ------------- - -.. code-block:: python - - import ray - - def partition(collection): - # Use the last element as the first pivot - pivot = collection.pop() - greater, lesser = [], [] - for element in collection: - if element > pivot: - greater.append(element) - else: - lesser.append(element) - return lesser, pivot, greater - - def quick_sort(collection): - - if len(collection) <= 200000: # magic number - return sorted(collection) - else: - lesser, pivot, greater = partition(collection) - lesser = quick_sort(lesser) - greater = quick_sort(greater) - return lesser + [pivot] + greater - - @ray.remote - def quick_sort_distributed(collection): - # Tiny tasks are an antipattern. - # Thus, in our example we have a "magic number" to - # toggle when distributed recursion should be used vs - # when the sorting should be done in place. The rule - # of thumb is that the duration of an individual task - # should be at least 1 second. - if len(collection) <= 200000: # magic number - return sorted(collection) - else: - lesser, pivot, greater = partition(collection) - lesser = quick_sort_distributed.remote(lesser) - greater = quick_sort_distributed.remote(greater) - return ray.get(lesser) + [pivot] + ray.get(greater) - - if __name__ == "__main__": - from numpy import random - import time - - ray.init() - for size in [200000, 4000000, 8000000, 10000000, 20000000]: - print(f'Array size: {size}') - unsorted = random.randint(1000000, size=(size)).tolist() - s = time.time() - quick_sort(unsorted) - print(f"Sequential execution: {(time.time() - s):.3f}") - s = time.time() - # put the large object in the global store and pass only the reference - unsorted_obj = ray.put(unsorted) - ray.get(quick_sort_distributed.remote(unsorted_obj)) - print(f"Distributed execution: {(time.time() - s):.3f}") - print("--" * 10) - -.. code-block:: text - - Array size: 200000 - Sequential execution: 0.040 - Distributed execution: 0.152 - -------------------- - Array size: 4000000 - Sequential execution: 6.161 - Distributed execution: 5.779 - -------------------- - Array size: 8000000 - Sequential execution: 15.459 - Distributed execution: 11.282 - -------------------- - Array size: 10000000 - Sequential execution: 20.671 - Distributed execution: 13.132 - -------------------- - Array size: 20000000 - Sequential execution: 47.352 - Distributed execution: 36.213 - -------------------- -