[Doc] Revamp ray core design patterns doc [13/n]: nested tasks #29342

Merged · 11 commits · Oct 18, 2022
72 changes: 72 additions & 0 deletions doc/source/ray-core/doc_code/pattern_nested_tasks.py
@@ -0,0 +1,72 @@
# __pattern_start__
import ray
import time
from numpy import random


def partition(collection):
    # Use the last element as the pivot
    pivot = collection.pop()
    greater, lesser = [], []
    for element in collection:
        if element > pivot:
            greater.append(element)
        else:
            lesser.append(element)
    return lesser, pivot, greater


def quick_sort(collection):
    if len(collection) <= 200000:  # magic number
        return sorted(collection)
    else:
        lesser, pivot, greater = partition(collection)
        lesser = quick_sort(lesser)
        greater = quick_sort(greater)
        return lesser + [pivot] + greater


@ray.remote
def quick_sort_distributed(collection):
    # Tiny tasks are an antipattern.
    # Thus, in our example we have a "magic number" to
    # toggle when distributed recursion should be used vs
    # when the sorting should be done in place. The rule
    # of thumb is that the duration of an individual task
    # should be at least 1 second.
    if len(collection) <= 200000:  # magic number
        return sorted(collection)
    else:
        lesser, pivot, greater = partition(collection)
        lesser = quick_sort_distributed.remote(lesser)
        greater = quick_sort_distributed.remote(greater)
        return ray.get(lesser) + [pivot] + ray.get(greater)


for size in [200000, 4000000, 8000000]:
    print(f"Array size: {size}")
    unsorted = random.randint(1000000, size=(size)).tolist()
    s = time.time()
    quick_sort(unsorted)
    print(f"Sequential execution: {(time.time() - s):.3f}")
    s = time.time()
    ray.get(quick_sort_distributed.remote(unsorted))
    print(f"Distributed execution: {(time.time() - s):.3f}")
    print("--" * 10)

# Outputs:

# Array size: 200000
# Sequential execution: 0.040
# Distributed execution: 0.152
# --------------------
# Array size: 4000000
# Sequential execution: 6.161
# Distributed execution: 5.779
# --------------------
# Array size: 8000000
# Sequential execution: 15.459
# Distributed execution: 11.282
# --------------------

# __pattern_end__
8 changes: 4 additions & 4 deletions doc/source/ray-core/examples/batch_training.ipynb
@@ -292,7 +292,7 @@
"source": [
"The `task` Ray task contains all logic necessary to load a data batch, transform it and fit and evaluate models on it.\n",
"\n",
"You may notice that we have previously defined `fit_and_score_sklearn` as a Ray task as well and set it to be executed from inside `task`. This allows us to dynamically create a [tree of tasks](task-pattern-tree-of-tasks), ensuring that the cluster resources are fully utillized. Without this pattern, each `task` would need to be assigned several CPU cores for the model fitting, meaning that if certain models finish faster, then those CPU cores would stil stay occupied. Thankfully, Ray is able to deal with nested parallelism in tasks without the need for any extra logic, allowing us to simplify the code."
"You may notice that we have previously defined `fit_and_score_sklearn` as a Ray task as well and set it to be executed from inside `task`. This allows us to dynamically create a [tree of tasks](task-pattern-nested-tasks), ensuring that the cluster resources are fully utillized. Without this pattern, each `task` would need to be assigned several CPU cores for the model fitting, meaning that if certain models finish faster, then those CPU cores would stil stay occupied. Thankfully, Ray is able to deal with nested parallelism in tasks without the need for any extra logic, allowing us to simplify the code."
]
},
{
@@ -602,7 +602,7 @@
],
"metadata": {
"kernelspec": {
"display_name": "Python 3.8.10 ('venv': venv)",
"display_name": "Python 3.8.9 64-bit",
"language": "python",
"name": "python3"
},
@@ -616,12 +616,12 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.10"
"version": "3.8.9"
},
"orig_nbformat": 4,
"vscode": {
"interpreter": {
"hash": "3c0d54d489a08ae47a06eae2fd00ff032d6cddb527c382959b7b2575f6a8167f"
"hash": "31f2aee4e71d21fbe5cf8b01ff0e069b9275f58929596ceb00d14d90e3e16cd6"
}
}
},
1 change: 1 addition & 0 deletions doc/source/ray-core/patterns/index.rst
@@ -8,6 +8,7 @@ This section is a collection of common design patterns and anti-patterns for wri
.. toctree::
:maxdepth: 1

nested-tasks
generators
limit-pending-tasks
limit-running-tasks
36 changes: 36 additions & 0 deletions doc/source/ray-core/patterns/nested-tasks.rst
@@ -0,0 +1,36 @@
.. _task-pattern-nested-tasks:

Pattern: Using nested tasks to achieve nested parallelism
=========================================================

In this pattern, a remote task can dynamically call other remote tasks (including itself) for nested parallelism.
This is useful when sub-tasks can be parallelized and the speedup due to nested parallelism is more than the overhead.
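As a minimal sketch of the idea (the ``inner``/``outer`` names here are illustrative and not part of the example below), an outer task can dynamically launch inner tasks and aggregate their results:

.. code-block:: python

    import ray


    @ray.remote
    def inner(chunk):
        # Do a meaningful amount of work on one chunk.
        return sum(chunk)


    @ray.remote
    def outer(data, num_chunks=4):
        # The outer task dynamically launches inner tasks: nested parallelism.
        chunks = [data[i::num_chunks] for i in range(num_chunks)]
        refs = [inner.remote(chunk) for chunk in chunks]
        # Block only after all inner tasks have been submitted.
        return sum(ray.get(refs))


    result = ray.get(outer.remote(list(range(1_000_000))))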

Keep in mind that nested tasks come with their own costs: extra worker processes, scheduling overhead, bookkeeping overhead, etc.,
so make sure your nested tasks do significant work to hide that overhead. See :doc:`too-fine-grained-tasks` for more details.
As a tip, always benchmark the nested-task version of the code against the serial version to make sure you actually get a speedup rather than a slowdown.
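For instance, a quick sanity check (a sketch, with ``square_remote`` as a stand-in for your own function) is to time a batch of tiny tasks against plain local calls; with tasks this small, the distributed version will typically be slower because of scheduling and inter-process overhead:

.. code-block:: python

    import time

    import ray


    @ray.remote
    def square_remote(x):
        return x * x  # stand-in for real work


    def square_local(x):
        return x * x


    data = list(range(10_000))

    start = time.time()
    local_results = [square_local(x) for x in data]
    print(f"Local:       {time.time() - start:.3f}s")

    start = time.time()
    remote_results = ray.get([square_remote.remote(x) for x in data])
    print(f"Distributed: {time.time() - start:.3f}s")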

Example use case
----------------

You want to quick-sort a large list of numbers.
By making the recursive calls distributed, we can sort the list in a distributed and parallel fashion.

.. figure:: ../images/tree-of-tasks.svg

Tree of tasks


Code example
------------

.. literalinclude:: ../doc_code/pattern_nested_tasks.py
:language: python
:start-after: __pattern_start__
:end-before: __pattern_end__

We call :ref:`ray.get() <ray-get-ref>` only after both ``quick_sort_distributed`` invocations have been submitted.
This allows the two sub-tasks to run in parallel and maximizes parallelism in the workload. See :doc:`ray-get-loop` for more details.
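To see why the placement of ``ray.get()`` matters, here is a minimal, self-contained sketch (``slow_half`` is a stand-in and not part of the example above): blocking on the first sub-task before submitting the second serializes them, while submitting both first lets them run in parallel.

.. code-block:: python

    import time

    import ray


    @ray.remote
    def slow_half(x):
        time.sleep(1)  # stand-in for sorting one half
        return x


    # Anti-pattern: blocking on the first sub-task before submitting the
    # second serializes the two calls (~2 seconds total).
    a = ray.get(slow_half.remote(1))
    b = ray.get(slow_half.remote(2))

    # Pattern: submit both sub-tasks first, then block, so they run in
    # parallel (~1 second total, given two free CPUs).
    a_ref = slow_half.remote(1)
    b_ref = slow_half.remote(2)
    a, b = ray.get(a_ref), ray.get(b_ref)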

Notice in the execution times above that with smaller and finer tasks, the non-distributed version is faster; however, as the task execution
time increases, that is, as the larger lists take longer to sort, the distributed version pulls ahead.
1 change: 0 additions & 1 deletion doc/source/ray-core/tasks/patterns/index.rst
@@ -13,5 +13,4 @@ You may also be interested in visiting the design patterns section for :ref:`act
.. toctree::
:maxdepth: -1

tree-of-tasks
map-reduce
113 changes: 0 additions & 113 deletions doc/source/ray-core/tasks/patterns/tree-of-tasks.rst

This file was deleted.