[Doc] Revamp ray core design patterns doc [13/n]: nested tasks (ray-project#29342)

- Edit pass: make the pattern generic for nested tasks instead of just recursive tasks.
- Move the code to doc_code

Signed-off-by: Jiajun Yao <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
Co-authored-by: Stephanie Wang <[email protected]>
Signed-off-by: Weichen Xu <[email protected]>
2 people authored and WeichenXu123 committed Dec 19, 2022
1 parent 7783213 commit c310bb8
Showing 7 changed files with 109 additions and 115 deletions.
72 changes: 72 additions & 0 deletions doc/source/ray-core/doc_code/pattern_nested_tasks.py
@@ -0,0 +1,72 @@
# __pattern_start__
import ray
import time
from numpy import random


def partition(collection):
    # Use the last element as the pivot
    pivot = collection.pop()
    greater, lesser = [], []
    for element in collection:
        if element > pivot:
            greater.append(element)
        else:
            lesser.append(element)
    return lesser, pivot, greater


def quick_sort(collection):
    if len(collection) <= 200000:  # magic number
        return sorted(collection)
    else:
        lesser, pivot, greater = partition(collection)
        lesser = quick_sort(lesser)
        greater = quick_sort(greater)
        return lesser + [pivot] + greater


@ray.remote
def quick_sort_distributed(collection):
    # Tiny tasks are an antipattern.
    # Thus, in our example we have a "magic number" to
    # toggle when distributed recursion should be used vs
    # when the sorting should be done in place. The rule
    # of thumb is that the duration of an individual task
    # should be at least 1 second.
    if len(collection) <= 200000:  # magic number
        return sorted(collection)
    else:
        lesser, pivot, greater = partition(collection)
        lesser = quick_sort_distributed.remote(lesser)
        greater = quick_sort_distributed.remote(greater)
        return ray.get(lesser) + [pivot] + ray.get(greater)


for size in [200000, 4000000, 8000000]:
    print(f"Array size: {size}")
    unsorted = random.randint(1000000, size=size).tolist()
    s = time.time()
    # Sort a copy: partition() pops the pivot off the list it is given,
    # so both runs must see the same, unmutated input.
    quick_sort(list(unsorted))
    print(f"Sequential execution: {(time.time() - s):.3f}")
    s = time.time()
    ray.get(quick_sort_distributed.remote(unsorted))
    print(f"Distributed execution: {(time.time() - s):.3f}")
    print("--" * 10)

# Outputs:

# Array size: 200000
# Sequential execution: 0.040
# Distributed execution: 0.152
# --------------------
# Array size: 4000000
# Sequential execution: 6.161
# Distributed execution: 5.779
# --------------------
# Array size: 8000000
# Sequential execution: 15.459
# Distributed execution: 11.282
# --------------------

# __pattern_end__
2 changes: 1 addition & 1 deletion doc/source/ray-core/examples/batch_training.ipynb
@@ -292,7 +292,7 @@
"source": [
"The `task` Ray task contains all logic necessary to load a data batch, transform it and fit and evaluate models on it.\n",
"\n",
"You may notice that we have previously defined `fit_and_score_sklearn` as a Ray task as well and set it to be executed from inside `task`. This allows us to dynamically create a [tree of tasks](task-pattern-tree-of-tasks), ensuring that the cluster resources are fully utilized. Without this pattern, each `task` would need to be assigned several CPU cores for the model fitting, meaning that if certain models finish faster, then those CPU cores would still stay occupied. Thankfully, Ray is able to deal with nested parallelism in tasks without the need for any extra logic, allowing us to simplify the code."
"You may notice that we have previously defined `fit_and_score_sklearn` as a Ray task as well and set it to be executed from inside `task`. This allows us to dynamically create a [tree of tasks](task-pattern-nested-tasks), ensuring that the cluster resources are fully utilized. Without this pattern, each `task` would need to be assigned several CPU cores for the model fitting, meaning that if certain models finish faster, then those CPU cores would still stay occupied. Thankfully, Ray is able to deal with nested parallelism in tasks without the need for any extra logic, allowing us to simplify the code."
]
},
{
1 change: 1 addition & 0 deletions doc/source/ray-core/patterns/index.rst
@@ -8,6 +8,7 @@ This section is a collection of common design patterns and anti-patterns for wri
.. toctree::
:maxdepth: 1

nested-tasks
generators
limit-pending-tasks
limit-running-tasks
35 changes: 35 additions & 0 deletions doc/source/ray-core/patterns/nested-tasks.rst
@@ -0,0 +1,35 @@
.. _task-pattern-nested-tasks:

Pattern: Using nested tasks to achieve nested parallelism
=========================================================

In this pattern, a remote task can dynamically call other remote tasks (including itself) for nested parallelism.
This is useful when sub-tasks can be parallelized.

Keep in mind, though, that nested tasks come with their own cost: extra worker processes, scheduling overhead, bookkeeping overhead, etc.
To achieve speedup with nested parallelism, make sure each of your nested tasks does significant work. See :doc:`too-fine-grained-tasks` for more details.
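The threshold idea can be sketched without Ray at all. The following is a minimal, self-contained analogy using only the standard library (``ThreadPoolExecutor`` stands in for Ray workers; this is an illustration of the pattern, not Ray's API), where a hypothetical ``THRESHOLD`` constant plays the same role as the "magic number" in the quick-sort example below:

```python
# Minimal stand-in for the nested-task pattern using the standard library.
# ThreadPoolExecutor is an analogy for Ray workers, not Ray's API.
from concurrent.futures import ThreadPoolExecutor

THRESHOLD = 1_000  # below this size, stop spawning tasks and recurse inline


def nested_sum(nums, pool):
    if len(nums) <= THRESHOLD:
        return sum(nums)  # small input: do the work directly
    mid = len(nums) // 2
    # Each half becomes a nested "task" that may spawn further tasks.
    left = pool.submit(nested_sum, nums[:mid], pool)
    right = pool.submit(nested_sum, nums[mid:], pool)
    return left.result() + right.result()


# Unlike Ray, a fixed-size thread pool can deadlock if every worker blocks
# waiting on a nested task, so the pool is sized generously here.
with ThreadPoolExecutor(max_workers=32) as pool:
    total = nested_sum(list(range(10_000)), pool)

print(total)  # 49995000
```

Without the threshold, every two-element slice would become its own task, and the per-task overhead would dwarf the actual work.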

Example use case
----------------

Suppose you want to quick-sort a large list of numbers.
By using nested tasks, you can sort the list in a distributed and parallel fashion.

.. figure:: ../images/tree-of-tasks.svg

Tree of tasks


Code example
------------

.. literalinclude:: ../doc_code/pattern_nested_tasks.py
:language: python
:start-after: __pattern_start__
:end-before: __pattern_end__

We call :ref:`ray.get() <ray-get-ref>` only after both ``quick_sort_distributed`` invocations have been submitted.
This lets the two sub-tasks run in parallel and maximizes parallelism in the workload. See :doc:`ray-get-loop` for more details.
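The effect of gathering too early can be shown with a small stdlib stand-in (again an analogy for Ray tasks and ``ray.get()``, not Ray's API, with a hypothetical ``work`` function simulating an expensive sub-task):

```python
# Why submit both sub-tasks before gathering any result: blocking on the
# first result immediately serializes the calls; submitting both first
# lets them overlap. Stdlib futures stand in for Ray tasks here.
import time
from concurrent.futures import ThreadPoolExecutor


def work(x):
    time.sleep(0.2)  # pretend this is an expensive sub-task
    return x * x


with ThreadPoolExecutor(max_workers=2) as pool:
    # Anti-pattern: gather each result right after submitting (~0.4 s total).
    start = time.time()
    serial_total = pool.submit(work, 2).result() + pool.submit(work, 3).result()
    serial = time.time() - start

    # Pattern: submit both, then gather (~0.2 s total; the calls overlap).
    start = time.time()
    a, b = pool.submit(work, 2), pool.submit(work, 3)
    parallel_total = a.result() + b.result()
    parallel = time.time() - start

print(serial_total, parallel_total)  # 13 13
print(serial > parallel)  # True: the overlapped version finishes sooner
```

The same reasoning applies to ``ray.get(lesser) + [pivot] + ray.get(greater)`` in the example: both remote calls are issued before either result is awaited.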

Notice in the execution times above that the non-distributed version is faster for smaller inputs. However, as task execution
time grows (here, because the lists to sort are larger), the distributed version pulls ahead.
1 change: 0 additions & 1 deletion doc/source/ray-core/tasks/patterns/index.rst
@@ -13,5 +13,4 @@ You may also be interested in visiting the design patterns section for :ref:`act
.. toctree::
:maxdepth: -1

tree-of-tasks
map-reduce
113 changes: 0 additions & 113 deletions doc/source/ray-core/tasks/patterns/tree-of-tasks.rst

This file was deleted.
