From 4467ce0427d01dbb45708f5e5d9863eeb6f8ac34 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Fri, 7 Jul 2023 09:34:49 -0700 Subject: [PATCH] [Doc] Make doc code snippet testable [6/n] (#36942) Change code snippet from ..code-block:: to ..testcode:: Signed-off-by: Jiajun Yao Signed-off-by: e428265 --- doc/BUILD | 14 +++- doc/source/workflows/advanced.rst | 9 +- doc/source/workflows/basics.rst | 96 +++++++++++++++------- doc/source/workflows/events.rst | 34 +++++--- doc/source/workflows/key-concepts.rst | 51 ++++++++---- doc/source/workflows/management.rst | 41 +++++++-- doc/source/workflows/metadata.rst | 47 +++++++---- python/ray/workflow/api.py | 2 +- python/ray/workflow/http_event_provider.py | 17 ++-- 9 files changed, 223 insertions(+), 88 deletions(-) diff --git a/doc/BUILD b/doc/BUILD index 1e4871ceef95..20998c3a831a 100644 --- a/doc/BUILD +++ b/doc/BUILD @@ -313,7 +313,9 @@ doctest( "source/serve/production-guide/fault-tolerance.md", "source/data/batch_inference.rst", "source/data/transforming-data.rst", - "source/train/faq.rst" + "source/train/faq.rst", + "source/workflows/**/*.rst", + "source/workflows/**/*.md" ] ), size = "large", @@ -321,6 +323,16 @@ doctest( ) +doctest( + name="doctest[workflow]", + files = glob( + include=[ + "source/workflows/**/*.rst", + "source/workflows/**/*.md" + ] + ), + tags = ["team:core"] +) doctest( files = [ diff --git a/doc/source/workflows/advanced.rst b/doc/source/workflows/advanced.rst index d5624a54177b..b5615268c819 100644 --- a/doc/source/workflows/advanced.rst +++ b/doc/source/workflows/advanced.rst @@ -8,7 +8,14 @@ Ray Workflows provides strong fault tolerance and exactly-once execution semanti Checkpoints can be skipped by specifying ``checkpoint=False``: -.. code-block:: python +.. testcode:: + + import ray + from ray import workflow + + @ray.remote + def read_data(num: int): + return [i for i in range(num)] data = read_data.options(**workflow.options(checkpoint=False)).bind(10) diff --git a/doc/source/workflows/basics.rst b/doc/source/workflows/basics.rst index d875f8d4d12d..f551ac392981 100644 --- a/doc/source/workflows/basics.rst +++ b/doc/source/workflows/basics.rst @@ -13,7 +13,17 @@ Here is a single three-node DAG (note the use of ``.bind(...)`` instead of ``.remote(...)``). The DAG will not be executed until further actions are taken on it: -.. code-block:: python +.. testcode:: + :hide: + + import tempfile + import ray + + temp_dir = tempfile.TemporaryDirectory() + + ray.init(num_gpus=1, storage=f"file://{temp_dir.name}") + +.. testcode:: from typing import List import ray @@ -46,7 +56,7 @@ We can plot this DAG by using ``ray.dag.vis_utils.plot(output, "output.jpg")``: Next, let's execute the DAG we defined and inspect the result: -.. code-block:: python +.. testcode:: # from ray import workflow @@ -59,6 +69,11 @@ Next, let's execute the DAG we defined and inspect the result: output_ref = workflow.run_async(output) print(ray.get(output_ref)) +.. testoutput:: + + 285 + 285 + Each node in the original DAG becomes a workflow task. You can think of workflow tasks as wrappers around Ray tasks that insert *checkpointing logic* to @@ -72,7 +87,7 @@ You can directly set Ray options to a workflow task just like a normal Ray remote function. To set workflow-specific options, use ``workflow.options`` either as a decorator or as kwargs to ``.options``: -.. code-block:: python +.. 
testcode:: import ray from ray import workflow @@ -91,7 +106,7 @@ Retrieving Workflow Results To retrieve a workflow result, assign ``workflow_id`` when running a workflow: -.. code-block:: python +.. testcode:: import ray from ray import workflow @@ -100,7 +115,7 @@ To retrieve a workflow result, assign ``workflow_id`` when running a workflow: # Cleanup previous workflows # An exception will be raised if it doesn't exist. workflow.delete("add_example") - except workflow.WorkflowNotFoundError: + except workflow.exceptions.WorkflowNotFoundError: pass @ray.remote @@ -113,18 +128,26 @@ To retrieve a workflow result, assign ``workflow_id`` when running a workflow: ret = add.bind(get_val.bind(), 20) - assert workflow.run(ret, workflow_id="add_example") == 30 + print(workflow.run(ret, workflow_id="add_example")) + +.. testoutput:: + + 30 The workflow results can be retrieved with ``workflow.get_output(workflow_id)``. If a workflow is not given a ``workflow_id``, a random string is set as the ``workflow_id``. To list all workflow ids, call ``ray.workflow.list_all()``. -.. code-block:: python +.. testcode:: - assert workflow.get_output("add_example") == 30 + print(workflow.get_output("add_example")) # "workflow.get_output_async" is an asynchronous version +.. testoutput:: + + 30 + Sub-Task Results ~~~~~~~~~~~~~~~~ @@ -139,7 +162,7 @@ If there are multiple tasks with the same id, a suffix with a counter ``_n`` wil Once a task id is given, the result of the task will be retrievable via ``workflow.get_output(workflow_id, task_id="task_id")``. If the task with the given ``task_id`` hasn't been executed before the workflow completes, an exception will be thrown. Here are some examples: -.. code-block:: python +.. testcode:: import ray from ray import workflow @@ -148,7 +171,7 @@ If the task with the given ``task_id`` hasn't been executed before the workflow try: # cleanup previous workflows workflow.delete(workflow_id) - except workflow.WorkflowNotFoundError: + except workflow.exceptions.WorkflowNotFoundError: pass @ray.remote @@ -180,7 +203,7 @@ Workflow provides two ways to handle application-level exceptions: (1) automatic ``max_retries`` and ``retry_exceptions`` are also Ray task options, so they should be used inside the Ray remote decorator. Here is how you could use them: -.. code-block:: python +.. testcode:: # specify in decorator @workflow.options(catch_exceptions=True) @@ -196,7 +219,7 @@ so they should be used inside the Ray remote decorator. Here is how you could us Here is one example: -.. code-block:: python +.. testcode:: from typing import Tuple import random @@ -212,7 +235,10 @@ Here is one example: # Tries up to five times before giving up. r1 = faulty_function.options(max_retries=5).bind() - workflow.run(r1) + try: + workflow.run(r1) + except ray.exceptions.RayTaskError: + pass @ray.remote def handle_errors(result: Tuple[str, Exception]): @@ -244,8 +270,10 @@ Failure model Note that tasks that have side effects still need to be idempotent. This is because the task could always fail before its result is logged. -.. code-block:: python - :caption: Non-idempotent workflow: +Non-idempotent workflow: + +.. testcode:: + :skipif: True @ray.remote def book_flight_unsafe() -> FlightTicket: @@ -256,8 +284,10 @@ Note that tasks that have side effects still need to be idempotent. This is beca # UNSAFE: we could book multiple flight tickets workflow.run(book_flight_unsafe.bind()) -.. code-block:: python - :caption: Idempotent workflow: +Idempotent workflow: + +.. 
testcode::
+    :skipif: True
 
     @ray.remote
     def generate_id() -> str:
        # Generate a unique idempotency token.
        return uuid.uuid4().hex
 
     @ray.remote
     def book_flight_idempotent(request_id: str) -> FlightTicket:
        if service.has_ticket(request_id):
            # Retrieve the previously created ticket.
            return service.get_ticket(request_id)
        return service.book_flight(request_id)
 
     # SAFE: book_flight is written to be idempotent
     request_id = generate_id.bind()
     workflow.run(book_flight_idempotent.bind(request_id))
 
@@ -282,7 +312,7 @@ Workflow tasks can be dynamically created in the runtime. In theory, Ray DAG is
 static, which means a DAG node can't be returned from another DAG node. For
 example, the following code is invalid:
 
-.. code-block:: python
+.. testcode::
 
     @ray.remote
     def bar(): ...
 
@@ -291,12 +321,19 @@ following code is invalid:
     def foo():
         return bar.bind()  # This is invalid since Ray DAG is static
 
-    ray.get(foo.bind().execute())  # This will error
+    try:
+        ray.get(foo.bind().execute())  # This will error
+    except ray.exceptions.RayTaskError:
+        print("Ray DAG is static")
+
+.. testoutput::
+
+    Ray DAG is static
 
 Workflow introduces a utility function called ``workflow.continuation``, which
 lets a Ray DAG node return a DAG at runtime:
 
-.. code-block:: python
+.. testcode::
 
     @ray.remote
     def bar():
 
@@ -317,7 +354,7 @@ The dynamic workflow enables nesting, looping, and recursion within workflows.
 The following example shows how to implement the recursive ``factorial`` program
 using dynamic workflows:
 
-.. code-block:: python
+.. testcode::
 
     @ray.remote
     def factorial(n: int) -> int:
 
@@ -343,7 +380,8 @@ substituted for the task's return.
 
 To better understand dynamic workflows, let's look at a more realistic example of
 booking a trip:
 
-.. code-block:: python
+.. testcode::
+    :skipif: True
 
     @ray.remote
     def book_flight(...) -> Flight: ...
 
@@ -401,10 +439,10 @@ not resolved. But we ensure that all ancestors of a task are fully executed
 before the task starts, which differs from passing them into a plain Ray remote
 function, where whether they have been executed or not is undefined.
 
-.. code-block:: python
+.. testcode::
 
     @ray.remote
-    def add(values: List[ray.ObjectRef[int]]) -> int:
+    def add(values: List[ray.ObjectRef]) -> int:
         # although those values are not resolved, they have been
         # *fully executed and checkpointed*. This guarantees exactly-once
         # execution semantics.
 
@@ -426,7 +464,7 @@ recoverability, their contents will be logged to durable storage before
 executing. However, an object will not be checkpointed more than once, even if
 it is passed to many different tasks.
 
-.. code-block:: python
+.. testcode::
 
     @ray.remote
     def do_add(a, b):
 
@@ -446,10 +484,10 @@ Setting custom resources for tasks
 You can assign resources (e.g., CPUs, GPUs) to tasks via the same ``num_cpus``,
 ``num_gpus``, and ``resources`` arguments that Ray tasks take:
 
-.. code-block:: python
+.. testcode::
 
-    @ray.remote(num_gpus=1)
-    def train_model() -> Model:
+    @ray.remote
+    def train_model():
         pass  # This task is assigned to a GPU by Ray.
 
-    workflow.run(train_model.bind())
+    workflow.run(train_model.options(num_gpus=1).bind())
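+
+Ray options and workflow-specific options can also be combined on a single
+task. A minimal sketch (the ``preprocess`` task and its return value are
+illustrative, not part of the API):
+
+.. testcode::
+
+    @ray.remote
+    def preprocess() -> int:
+        # Stand-in workload for the sketch.
+        return 42
+
+    # `num_cpus` is a plain Ray option; `task_id` and `checkpoint` are
+    # workflow-specific options and go through `workflow.options`.
+    node = preprocess.options(
+        num_cpus=1, **workflow.options(task_id="preprocess", checkpoint=True)
+    ).bind()
+
+    assert workflow.run(node) == 42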
diff --git a/doc/source/workflows/events.rst b/doc/source/workflows/events.rst
index b4ff53243989..1e091b64e358 100644
--- a/doc/source/workflows/events.rst
+++ b/doc/source/workflows/events.rst
@@ -14,24 +14,34 @@ Using events
 Workflow events are a special type of workflow task. They "finish" when the event occurs.
 `workflow.wait_for_event(EventListenerType)` can be used to create an event task.
 
+.. testcode::
+    :hide:
+
+    import tempfile
+    import ray
+
+    temp_dir = tempfile.TemporaryDirectory()
+
+    ray.init(storage=f"file://{temp_dir.name}")
 
-.. code-block:: python
+
+.. testcode::
 
     import time
     import ray
     from ray import workflow
 
-    # Create an event which finishes after 60 seconds.
-    event1_task = workflow.wait_for_event(workflow.event_listener.TimerListener, time.time() + 60)
+    # Create an event which finishes after 2 seconds.
+    event1_task = workflow.wait_for_event(workflow.event_listener.TimerListener, time.time() + 2)
 
-    # Create another event which finishes after 30 seconds.
-    event2_task = workflow.wait_for_event(workflow.event_listener.TimerListener, time.time() + 30)
+    # Create another event which finishes after 1 second.
+    event2_task = workflow.wait_for_event(workflow.event_listener.TimerListener, time.time() + 1)
 
     @ray.remote
     def gather(*args):
         return args
 
-    # Gather will run after 60 seconds when both event1 and event2 are done.
+    # Gather will run after 2 seconds when both event1 and event2 are done.
     workflow.run(gather.bind(event1_task, event2_task))
 
 
@@ -81,7 +91,9 @@ Custom event listeners
 Custom event listeners can be written by subclassing the EventListener interface.
 
-.. code-block:: python
+.. testcode::
+
+    from ray.workflow.common import Event
 
     class EventListener:
         def __init__(self):
 
@@ -100,7 +112,7 @@ Custom event listeners can be written by subclassing the EventListener interface
 The `listener.poll_for_events()` coroutine should finish when the event is done. Arguments to `workflow.wait_for_event` are passed to `poll_for_events()`. For example, an event listener which sleeps until a timestamp
 can be written as:
 
-.. code-block:: python
+.. testcode::
 
     class TimerListener(EventListener):
         async def poll_for_event(self, timestamp):
 
@@ -116,17 +128,15 @@ The `event_checkpointed` routine can be overridden to support systems with exact
 After the workflow finishes checkpointing the event, the event listener will be invoked and can free the event. For example, to guarantee that events are consumed from a `kafkaesque` queue:
 
-.. code-block:: python
+.. testcode::
 
     KafkaEventType = ...
 
     class QueueEventListener:
-
         def __init__(self):
             # Initialize the poll consumer.
             self.consumer = Consumer({'enable.auto.commit': False})
-
         async def poll_for_event(self, topic) -> KafkaEventType:
             self.consumer.subscribe(topic)
             message = await self.consumer.poll()
             return message
 
         async def event_checkpointed(self, event: KafkaEventType) -> None:
-            self.consuemr.commit(event, asynchronous=False)
+            self.consumer.commit(event, asynchronous=False)
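+
+A custom listener plugs into a workflow the same way as the built-in ones:
+the extra arguments passed to ``workflow.wait_for_event`` are forwarded to
+``poll_for_event``. A minimal sketch reusing the ``TimerListener`` defined
+earlier (its ``asyncio``/``time`` imports are assumed to be available):
+
+.. testcode::
+
+    import asyncio
+    import time
+
+    # Fires roughly one second from now.
+    event_task = workflow.wait_for_event(TimerListener, time.time() + 1)
+
+    @ray.remote
+    def report(event):
+        return "timer fired"
+
+    assert workflow.run(report.bind(event_task)) == "timer fired"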
 
 
 (Advanced) Event listener semantics
 -----------------------------------
 
diff --git a/doc/source/workflows/key-concepts.rst b/doc/source/workflows/key-concepts.rst
index de1892a5b849..233bd467f03c 100644
--- a/doc/source/workflows/key-concepts.rst
+++ b/doc/source/workflows/key-concepts.rst
@@ -19,8 +19,19 @@ Ray DAG nodes can otherwise be composed like normal Ray tasks.
 
 However, unlike Ray tasks, you are not allowed to call ``ray.get()`` or ``ray.wait()`` on
 DAG nodes. Instead, the DAG needs to be *executed* in order to compute a result.
 
-.. code-block:: python
-    :caption: Composing functions together into a DAG:
+Composing functions together into a DAG:
+
+.. testcode::
+    :hide:
+
+    import tempfile
+    import ray
+
+    temp_dir = tempfile.TemporaryDirectory()
+
+    ray.init(storage=f"file://{temp_dir.name}")
+
+.. testcode::
 
     import ray
 
@@ -40,8 +51,7 @@ Workflow Execution
 
 To execute a DAG with workflows, use `workflow.run`:
 
-.. code-block:: python
-    :caption: Executing a DAG with Ray Workflows.
+.. testcode::
 
     from ray import workflow
 
@@ -61,14 +71,20 @@ When executing the workflow DAG, workflow tasks are retried on failure, but once
 they finish successfully and the results are persisted by the workflow engine,
 they will never be run again.
 
-.. code-block:: python
-    :caption: Getting the result of a workflow.
+Getting the result of a workflow:
+
+.. testcode::
+    :hide:
+
+    ray.shutdown()
+
+.. testcode::
 
     # configure the storage with "ray.init" or "ray start --head --storage=<storage_uri>"
     # A default temporary storage is used by the workflow if starting without
     # Ray init.
     ray.init(storage="/tmp/data")
-    assert output.run(workflow_id="run_1") == 101
+    assert workflow.run(dag, workflow_id="run_1") == 101
     assert workflow.get_status("run_1") == workflow.WorkflowStatus.SUCCESSFUL
     assert workflow.get_output("run_1") == 101
     # workflow.get_output_async returns an ObjectRef.
 
@@ -82,8 +98,9 @@ when initially returned from a task. After checkpointing, the object can be
 shared among any number of workflow tasks at memory-speed via the Ray object
 store.
 
-.. code-block:: python
-    :caption: Using Ray objects in a workflow:
+Using Ray objects in a workflow:
+
+.. testcode::
 
     import ray
     from typing import List
 
@@ -109,10 +126,11 @@ Dynamic Workflows
 Workflows can generate new tasks at runtime. This is achieved by returning a
 continuation of a DAG. A continuation is something returned by a function and
 executed after it returns. The continuation feature enables nesting, looping,
-and recursion within workflows:
+and recursion within workflows.
+
+The Fibonacci recursive workflow:
 
-.. code-block:: python
-    :caption: The Fibonacci recursive workflow:
+.. testcode::
 
     @ray.remote
     def add(a: int, b: int) -> int:
 
@@ -133,14 +151,15 @@ Events
 Events are external signals sent to the workflow. Workflows can be efficiently
 triggered by timers or external events using the event system.
 
-.. code-block:: python
-    :caption: Using events.
+.. testcode::
+
+    import time
 
     # Sleep is a special type of event.
-    sleep_task = workflow.sleep(100)
+    sleep_task = workflow.sleep(1)
 
     # `wait_for_events` allows for pluggable event listeners.
-    event_task = workflow.wait_for_event(MyEventListener)
+    event_task = workflow.wait_for_event(workflow.event_listener.TimerListener, time.time() + 2)
 
     @ray.remote
     def gather(*args):
 
diff --git a/doc/source/workflows/management.rst b/doc/source/workflows/management.rst
index 750f12930166..bd93468a9cfc 100644
--- a/doc/source/workflows/management.rst
+++ b/doc/source/workflows/management.rst
@@ -27,17 +27,34 @@ SUCCESSFUL The workflow has been executed successfully.
 Single workflow management APIs
 -------------------------------
 
-.. code-block:: python
+.. testcode::
+    :hide:
 
+    import tempfile
+    import ray
+
+    temp_dir = tempfile.TemporaryDirectory()
+
+    ray.init(storage=f"file://{temp_dir.name}")
+
+.. testcode::
+
+    import ray
     from ray import workflow
 
+    @ray.remote
+    def task():
+        return 3
+
+    workflow.run(task.bind(), workflow_id="workflow_id")
+
     # Get the status of a workflow.
     try:
         status = workflow.get_status(workflow_id="workflow_id")
         assert status in {
             "RUNNING", "RESUMABLE", "FAILED",
             "CANCELED", "SUCCESSFUL"}
-    except workflow.WorkflowNotFoundError:
+    except workflow.exceptions.WorkflowNotFoundError:
         print("Workflow doesn't exist.")
 
     # Resume a workflow.
 
@@ -50,30 +67,38 @@ Single workflow management APIs
     # Delete the workflow.
     workflow.delete(workflow_id="workflow_id")
 
+.. testoutput::
+
+    3
+
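+These APIs compose with asynchronous execution as well. A short sketch (the
+``double`` task and the ``"double_example"`` id are illustrative):
+
+.. testcode::
+
+    @ray.remote
+    def double(x: int) -> int:
+        return 2 * x
+
+    # Start the workflow without blocking, then fetch the result by id.
+    ref = workflow.run_async(double.bind(21), workflow_id="double_example")
+    assert ray.get(ref) == 42
+    assert workflow.get_status("double_example") == "SUCCESSFUL"
+    assert workflow.get_output("double_example") == 42
+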
 Bulk workflow management APIs
 -----------------------------
 
-.. code-block:: python
+.. testcode::
 
     # List all running workflows.
     print(workflow.list_all("RUNNING"))
-    # [("workflow_id_1", "RUNNING"), ("workflow_id_2", "RUNNING")]
 
     # List RUNNING and CANCELED workflows.
     print(workflow.list_all({"RUNNING", "CANCELED"}))
-    # [("workflow_id_1", "RUNNING"), ("workflow_id_2", "CANCELED")]
 
     # List all workflows.
     print(workflow.list_all())
-    # [("workflow_id_1", "RUNNING"), ("workflow_id_2", "CANCELED")]
 
     # Resume all resumable workflows. This won't include failed workflows.
     print(workflow.resume_all())
-    # [("workflow_id_1", ObjectRef), ("workflow_id_2", ObjectRef)]
 
     # To resume workflows including failed ones, use `include_failed=True`
     print(workflow.resume_all(include_failed=True))
-    # [("workflow_id_1", ObjectRef), ("workflow_id_3", ObjectRef)]
+
+.. testoutput::
+    :options: +MOCK
+
+    [("workflow_id_1", "RUNNING"), ("workflow_id_2", "RUNNING")]
+    [("workflow_id_1", "RUNNING"), ("workflow_id_2", "CANCELED")]
+    [("workflow_id_1", "RUNNING"), ("workflow_id_2", "CANCELED")]
+    [("workflow_id_1", ObjectRef), ("workflow_id_2", ObjectRef)]
+    [("workflow_id_1", ObjectRef), ("workflow_id_3", ObjectRef)]
 
 Recurring workflows
 -------------------
 
diff --git a/doc/source/workflows/metadata.rst b/doc/source/workflows/metadata.rst
index e7dfee9aca2f..7540957da382 100644
--- a/doc/source/workflows/metadata.rst
+++ b/doc/source/workflows/metadata.rst
@@ -13,7 +13,20 @@ Retrieving metadata
 Workflow metadata can be retrieved with ``workflow.get_metadata(workflow_id)``.
 For example:
 
-.. code-block:: python
+.. testcode::
+    :hide:
+
+    import tempfile
+    import ray
+
+    temp_dir = tempfile.TemporaryDirectory()
+
+    ray.init(storage=f"file://{temp_dir.name}")
+
+.. testcode::
+
+    import ray
+    from ray import workflow
 
     @ray.remote
     def add(left: int, right: int) -> int:
 
@@ -30,7 +43,7 @@ For example:
 You can also retrieve metadata for individual workflow tasks by providing the
 task name:
 
-.. code-block:: python
+.. testcode::
 
     workflow.run(
         add.options(
 
@@ -51,7 +64,7 @@ workflow or workflow task.
 - workflow-level metadata can be added via ``.run(metadata=metadata)``
 - task-level metadata can be added via ``.options(**workflow.options(metadata=metadata))`` or in the decorator ``@workflow.options(metadata=metadata)``
 
-.. code-block:: python
+.. testcode::
 
     workflow.run(add.options(**workflow.options(task_id="add_task", metadata={"task_k": "task_v"})).bind(10, 20),
                  workflow_id="add_example_3", metadata={"workflow_k": "workflow_v"})
 
@@ -86,40 +99,42 @@ be available in the result if corresponding metadata is not available
 (e.g., ``metadata["stats"]["end_time"]`` won't be available until the workflow
 is completed).
 
-.. code-block:: python
+.. testcode::
+
+    import time
 
     @ray.remote
     def simple():
-        flag.touch() # touch a file here
         time.sleep(1000)
         return 0
 
-    workflow.run_async(simple.bind(), workflow_id=workflow_id)
+    workflow.run_async(simple.bind(), workflow_id="workflow_id")
 
     # make sure workflow task starts running
-    while not flag.exists():
-        time.sleep(1)
+    time.sleep(2)
 
-    workflow_metadata = workflow.get_metadata(workflow_id)
+    workflow_metadata = workflow.get_metadata("workflow_id")
     assert workflow_metadata["status"] == "RUNNING"
     assert "start_time" in workflow_metadata["stats"]
     assert "end_time" not in workflow_metadata["stats"]
 
-    workflow.cancel(workflow_id)
+    workflow.cancel("workflow_id")
 
-    workflow_metadata = workflow.get_metadata(workflow_id)
+    workflow_metadata = workflow.get_metadata("workflow_id")
     assert workflow_metadata["status"] == "CANCELED"
-    assert "task_options" in workflow_metadata
     assert "start_time" in workflow_metadata["stats"]
     assert "end_time" not in workflow_metadata["stats"]
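+
+For comparison, once a workflow finishes successfully, ``end_time`` becomes
+available in its stats. A small sketch (the ``quick`` task and its id are
+illustrative):
+
+.. testcode::
+
+    @ray.remote
+    def quick():
+        return 0
+
+    workflow.run(quick.bind(), workflow_id="quick_workflow")
+
+    workflow_metadata = workflow.get_metadata("quick_workflow")
+    assert workflow_metadata["status"] == "SUCCESSFUL"
+    assert "start_time" in workflow_metadata["stats"]
+    assert "end_time" in workflow_metadata["stats"]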
 
 2. For resumed workflows, the current behavior is that "stats" will
 be updated whenever a workflow is resumed.
 
-.. code-block:: python
+.. testcode::
+
+    from pathlib import Path
 
     workflow_id = "simple"
-    error_flag = tmp_path / "error"
+
+    error_flag = Path("error")
     error_flag.touch()
 
     @ray.remote
     def simple():
         if error_flag.exists():
             raise ValueError()
         return 0
 
-    with pytest.raises(ray.exceptions.RaySystemError):
+    try:
         workflow.run(simple.bind(), workflow_id=workflow_id)
+    except ray.exceptions.RayTaskError:
+        pass
 
     workflow_metadata_failed = workflow.get_metadata(workflow_id)
     assert workflow_metadata_failed["status"] == "FAILED"
 
diff --git a/python/ray/workflow/api.py b/python/ray/workflow/api.py
index eae4af1801a6..f6bda3d26ce2 100644
--- a/python/ray/workflow/api.py
+++ b/python/ray/workflow/api.py
@@ -735,7 +735,7 @@ class options:
 
     Examples:
 
-        .. code-block:: python
+        .. testcode::
 
             import ray
             from ray import workflow
 
diff --git a/python/ray/workflow/http_event_provider.py b/python/ray/workflow/http_event_provider.py
index f54d2e2221a4..3f958a774b2b 100644
--- a/python/ray/workflow/http_event_provider.py
+++ b/python/ray/workflow/http_event_provider.py
@@ -213,16 +213,23 @@ async def event_checkpointed(self, event) -> None:
     Example Usage
     =============
 
-    .. code-block:: python
+    .. testcode::
 
-        from ray.workflow.http_event_provider import HTTPEventProvider, HTTPListener
+        import tempfile
+        import ray
+        from ray import serve
+        from ray import workflow
+        from ray.workflow.http_event_provider import HTTPListener
 
-        ray.init(address='auto', namespace='serve')
+        temp_dir = tempfile.TemporaryDirectory()
+        ray.init(storage=f"file://{temp_dir.name}")
         serve.start(detached=True)
 
         event_node = workflow.wait_for_event(HTTPListener, event_key='')
-        handle_event = ...
-        workflow.run(handle_event.bind(event_node))
+
+        @ray.remote
+        def handle_event(arg):
+            return arg
+
+        workflow.run_async(handle_event.bind(event_node), workflow_id="http_listener")
     """
 
     def __init__(self):