[Doc] Make doc code snippet testable [6/n] (#36942)
Change code snippet from ..code-block:: to ..testcode::

Signed-off-by: Jiajun Yao <[email protected]>
jjyao authored Jul 7, 2023
1 parent 957f9b7 commit b56aab4
Showing 9 changed files with 223 additions and 88 deletions.
14 changes: 13 additions & 1 deletion doc/BUILD
@@ -313,14 +313,26 @@ doctest(
"source/serve/production-guide/fault-tolerance.md",
"source/data/batch_inference.rst",
"source/data/transforming-data.rst",
"source/train/faq.rst"
"source/train/faq.rst",
"source/workflows/**/*.rst",
"source/workflows/**/*.md"
]
),
size = "large",
tags = ["team:none"]
)


doctest(
name="doctest[workflow]",
files = glob(
include=[
"source/workflows/**/*.rst",
"source/workflows/**/*.md"
]
),
tags = ["team:core"]
)

doctest(
files = [
9 changes: 8 additions & 1 deletion doc/source/workflows/advanced.rst
@@ -8,7 +8,14 @@ Ray Workflows provides strong fault tolerance and exactly-once execution semanti

Checkpoints can be skipped by specifying ``checkpoint=False``:

.. code-block:: python
.. testcode::

import ray
from ray import workflow

@ray.remote
def read_data(num: int):
return [i for i in range(num)]

data = read_data.options(**workflow.options(checkpoint=False)).bind(10)
96 changes: 67 additions & 29 deletions doc/source/workflows/basics.rst
@@ -13,7 +13,17 @@ Here is a single three-node DAG (note the use of ``.bind(...)`` instead of
``.remote(...)``). The DAG will not be executed until further actions are
taken on it:

.. code-block:: python
.. testcode::
:hide:

import tempfile
import ray

temp_dir = tempfile.TemporaryDirectory()

ray.init(num_gpus=1, storage=f"file://{temp_dir.name}")

.. testcode::

from typing import List
import ray
@@ -46,7 +56,7 @@ We can plot this DAG by using ``ray.dag.vis_utils.plot(output, "output.jpg")``:

Next, let's execute the DAG we defined and inspect the result:

.. code-block:: python
.. testcode::

# <follow the previous code>
from ray import workflow
@@ -59,6 +69,11 @@ Next, let's execute the DAG we defined and inspect the result:
output_ref = workflow.run_async(output)
print(ray.get(output_ref))

.. testoutput::

285
285
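
A ``testoutput`` block like the one above is checked against whatever the preceding ``testcode`` block prints. As a rough, stdlib-only sketch of that idea (not Sphinx's actual implementation; ``run_and_compare`` and the snippet are hypothetical, and the expression is just an arbitrary one that happens to evaluate to 285):

```python
import contextlib
import io


def run_and_compare(source: str, expected: str) -> bool:
    """Run `source`, capture its stdout, and compare it with `expected`."""
    buffer = io.StringIO()
    with contextlib.redirect_stdout(buffer):
        exec(source, {})  # fresh namespace, similar in spirit to an isolated test
    # Compare after trimming surrounding whitespace.
    return buffer.getvalue().strip() == expected.strip()


# Hypothetical snippet: prints the same value twice, like the example above.
snippet = "total = sum(i * i for i in range(10))\nprint(total)\nprint(total)"
ok = run_and_compare(snippet, "285\n285")
mismatch = run_and_compare(snippet, "30")
print(ok, mismatch)
```

This is why several ``assert`` statements in this change become ``print`` calls: the printed value is what gets compared.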


Each node in the original DAG becomes a workflow task. You can think of workflow
tasks as wrappers around Ray tasks that insert *checkpointing logic* to
@@ -72,7 +87,7 @@ You can directly set Ray options to a workflow task just like a normal
Ray remote function. To set workflow-specific options, use ``workflow.options``
either as a decorator or as kwargs to ``<task>.options``:

.. code-block:: python
.. testcode::

import ray
from ray import workflow
@@ -91,7 +106,7 @@ Retrieving Workflow Results

To retrieve a workflow result, assign ``workflow_id`` when running a workflow:

.. code-block:: python
.. testcode::

import ray
from ray import workflow
@@ -100,7 +115,7 @@ To retrieve a workflow result, assign ``workflow_id`` when running a workflow:
# Cleanup previous workflows
# An exception will be raised if it doesn't exist.
workflow.delete("add_example")
except workflow.WorkflowNotFoundError:
except workflow.exceptions.WorkflowNotFoundError:
pass

@ray.remote
@@ -113,18 +128,26 @@ To retrieve a workflow result, assign ``workflow_id`` when running a workflow:

ret = add.bind(get_val.bind(), 20)

assert workflow.run(ret, workflow_id="add_example") == 30
print(workflow.run(ret, workflow_id="add_example"))

.. testoutput::

30

The workflow results can be retrieved with
``workflow.get_output(workflow_id)``. If a workflow is not given a
``workflow_id``, a random string is set as the ``workflow_id``. To list all
workflow ids, call ``ray.workflow.list_all()``.

.. code-block:: python
.. testcode::

assert workflow.get_output("add_example") == 30
print(workflow.get_output("add_example"))
# "workflow.get_output_async" is an asynchronous version

.. testoutput::

30

Sub-Task Results
~~~~~~~~~~~~~~~~

@@ -139,7 +162,7 @@ If there are multiple tasks with the same id, a suffix with a counter ``_n`` wil
Once a task id is given, the result of the task will be retrievable via ``workflow.get_output(workflow_id, task_id="task_id")``.
If the task with the given ``task_id`` hasn't been executed before the workflow completes, an exception will be thrown. Here are some examples:

.. code-block:: python
.. testcode::

import ray
from ray import workflow
@@ -148,7 +171,7 @@ If the task with the given ``task_id`` hasn't been executed before the workflow
try:
# cleanup previous workflows
workflow.delete(workflow_id)
except workflow.WorkflowNotFoundError:
except workflow.exceptions.WorkflowNotFoundError:
pass

@ray.remote
@@ -180,7 +203,7 @@ Workflow provides two ways to handle application-level exceptions: (1) automatic
``max_retries`` and ``retry_exceptions`` are also Ray task options,
so they should be used inside the Ray remote decorator. Here is how you could use them:

.. code-block:: python
.. testcode::

# specify in decorator
@workflow.options(catch_exceptions=True)
@@ -196,7 +219,7 @@ so they should be used inside the Ray remote decorator. Here is how you could us

Here is one example:

.. code-block:: python
.. testcode::

from typing import Tuple
import random
@@ -212,7 +235,10 @@ Here is one example:

# Tries up to five times before giving up.
r1 = faulty_function.options(max_retries=5).bind()
workflow.run(r1)
try:
workflow.run(r1)
except ray.exceptions.RayTaskError:
pass

@ray.remote
def handle_errors(result: Tuple[str, Exception]):
@@ -244,8 +270,10 @@ Failure model

Note that tasks that have side effects still need to be idempotent. This is because the task could always fail before its result is logged.
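
To make the idempotency requirement concrete, here is a minimal sketch that does not use Ray at all. ``BookingService`` is a hypothetical stand-in for an external booking API; the point is only that reusing a request id generated once makes a retried call harmless:

```python
import uuid


class BookingService:
    """Hypothetical external service; not part of Ray or of this change."""

    def __init__(self):
        self.tickets = {}  # request_id -> ticket
        self.calls = 0

    def book(self, request_id: str) -> str:
        self.calls += 1
        # Idempotent: a repeated request_id returns the existing ticket
        # instead of booking a second one.
        if request_id not in self.tickets:
            self.tickets[request_id] = f"ticket-{len(self.tickets) + 1}"
        return self.tickets[request_id]


service = BookingService()
request_id = str(uuid.uuid4())  # generated once, then reused by every retry

first = service.book(request_id)
# Simulate a retry after a failure that happened before the result was logged.
second = service.book(request_id)

print(first == second, len(service.tickets))
```

The service was called twice but only one ticket exists, which is the property a retried workflow task needs.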

.. code-block:: python
:caption: Non-idempotent workflow:
Non-idempotent workflow:

.. testcode::
:skipif: True

@ray.remote
def book_flight_unsafe() -> FlightTicket:
@@ -256,8 +284,10 @@ Note that tasks that have side effects still need to be idempotent. This is beca
# UNSAFE: we could book multiple flight tickets
workflow.run(book_flight_unsafe.bind())

.. code-block:: python
:caption: Idempotent workflow:
Idempotent workflow:

.. testcode::
:skipif: True

@ray.remote
def generate_id() -> str:
@@ -282,7 +312,7 @@ Workflow tasks can be dynamically created in the runtime. In theory, Ray DAG is
static which means a DAG node can't be returned in a DAG node. For example, the
following code is invalid:

.. code-block:: python
.. testcode::

@ray.remote
def bar(): ...
@@ -291,12 +321,19 @@ following code is invalid:
def foo():
return bar.bind() # This is invalid since Ray DAG is static

ray.get(foo.bind().execute()) # This will error
try:
ray.get(foo.bind().execute()) # This will error
except ray.exceptions.RayTaskError:
print("Ray DAG is static")

.. testoutput::

Ray DAG is static

Workflow introduces a utility function called ``workflow.continuation``, which
lets a Ray DAG node return a DAG at runtime:

.. code-block:: python
.. testcode::

@ray.remote
def bar():
@@ -317,7 +354,7 @@ The dynamic workflow enables nesting, looping, and recursion within workflows.
The following example shows how to implement the recursive ``factorial`` program
using dynamic workflows:

.. code-block:: python
.. testcode::

@ray.remote
def factorial(n: int) -> int:
@@ -343,7 +380,8 @@ substituted for the task's return.

To better understand dynamic workflows, let's look at a more realistic example of booking a trip:

.. code-block:: python
.. testcode::
:skipif: True

@ray.remote
def book_flight(...) -> Flight: ...
@@ -401,10 +439,10 @@ not resolved. But we ensure that all ancestors of a task are fully executed
before the task starts. This differs from passing them into a Ray remote
function, where whether they have been executed is not defined.

.. code-block:: python
.. testcode::

@ray.remote
def add(values: List[ray.ObjectRef[int]]) -> int:
def add(values: List[ray.ObjectRef]) -> int:
# although those values are not resolved, they have been
# *fully executed and checkpointed*. This guarantees exactly-once
# execution semantics.
@@ -426,7 +464,7 @@ recoverability, their contents will be logged to durable storage before
executing. However, an object will not be checkpointed more than once, even if
it is passed to many different tasks.

.. code-block:: python
.. testcode::

@ray.remote
def do_add(a, b):
@@ -446,10 +484,10 @@ Setting custom resources for tasks

You can assign resources (e.g., CPUs, GPUs) to tasks via the same ``num_cpus``, ``num_gpus``, and ``resources`` arguments that Ray tasks take:

.. code-block:: python
.. testcode::

@ray.remote(num_gpus=1)
def train_model() -> Model:
@ray.remote
def train_model():
pass # This task is assigned to a GPU by Ray.

workflow.run(train_model.bind())
workflow.run(train_model.options(num_gpus=1).bind())
34 changes: 22 additions & 12 deletions doc/source/workflows/events.rst
@@ -14,24 +14,34 @@ Using events

Workflow events are a special type of workflow task. They "finish" when the event occurs. Use ``workflow.wait_for_event(EventListenerType)`` to create an event task.

.. testcode::
:hide:

import tempfile
import ray

temp_dir = tempfile.TemporaryDirectory()

ray.init(storage=f"file://{temp_dir.name}")

.. code-block:: python

.. testcode::

import time
import ray
from ray import workflow

# Create an event which finishes after 60 seconds.
event1_task = workflow.wait_for_event(workflow.event_listener.TimerListener, time.time() + 60)
# Create an event which finishes after 2 seconds.
event1_task = workflow.wait_for_event(workflow.event_listener.TimerListener, time.time() + 2)

# Create another event which finishes after 30 seconds.
event2_task = workflow.wait_for_event(workflow.event_listener.TimerListener, time.time() + 30)
# Create another event which finishes after 1 second.
event2_task = workflow.wait_for_event(workflow.event_listener.TimerListener, time.time() + 1)

@ray.remote
def gather(*args):
return args
# Gather will run after 60 seconds when both event1 and event2 are done.
# Gather will run after 2 seconds when both event1 and event2 are done.
workflow.run(gather.bind(event1_task, event2_task))
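
The "gather runs once both timers are done" behavior can be sketched without Ray using plain ``asyncio``. The ``TimerListener`` below is a hypothetical stand-in that only mirrors the ``poll_for_event`` shape shown in this file; it is not Ray's class, and the short delays are chosen just to keep the example fast:

```python
import asyncio
import time


class TimerListener:
    """Hypothetical timer listener; illustrates the interface only."""

    async def poll_for_event(self, timestamp: float) -> float:
        # Sleep until the timestamp, then return it as the event payload.
        await asyncio.sleep(max(0.0, timestamp - time.time()))
        return timestamp


async def gather_events(*timestamps: float) -> tuple:
    listener = TimerListener()
    # Wait for all events concurrently; this completes when the slowest finishes.
    results = await asyncio.gather(
        *(listener.poll_for_event(t) for t in timestamps)
    )
    return tuple(results)


start = time.time()
events = asyncio.run(gather_events(start + 0.2, start + 0.1))
elapsed = time.time() - start
print(len(events), elapsed >= 0.1)
```

The total wait is governed by the later timestamp, not the sum of both, which matches the comment on ``gather`` above.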


@@ -81,7 +91,9 @@ Custom event listeners

Custom event listeners can be written by subclassing the EventListener interface.

.. code-block:: python
.. testcode::

from ray.workflow.common import Event

class EventListener:
def __init__(self):
@@ -100,7 +112,7 @@ Custom event listeners can be written by subclassing the EventListener interface

The ``listener.poll_for_event()`` coroutine should finish when the event is done. Arguments to ``workflow.wait_for_event`` are passed to ``poll_for_event()``. For example, an event listener which sleeps until a timestamp can be written as:

.. code-block:: python
.. testcode::

class TimerListener(EventListener):
async def poll_for_event(self, timestamp):
@@ -116,25 +128,23 @@ The `event_checkpointed` routine can be overridden to support systems with exact
After the workflow finishes checkpointing the event, the event listener will be invoked and can free the event. For example, to guarantee that events are consumed from a `kafkaesque <https://docs.confluent.io/clients-confluent-kafka-python/current/overview.html#synchronous-commits>`_ queue:


.. code-block:: python
.. testcode::

KafkaEventType = ...

class QueueEventListener:
def __init__(self):
# Initialize the poll consumer.
self.consumer = Consumer({'enable.auto.commit': False})

async def poll_for_event(self, topic) -> KafkaEventType:
self.consumer.subscribe(topic)

message = await self.consumer.poll()
return message

async def event_checkpointed(self, event: KafkaEventType) -> None:
self.consuemr.commit(event, asynchronous=False)
self.consumer.commit(event, asynchronous=False)


(Advanced) Event listener semantics