From 7046ef80ba3d0cbbf550d7c0bcf0e6b70b21c546 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Wed, 28 Jun 2023 22:33:56 -0700 Subject: [PATCH 01/22] [Doc] Make doc code snippet testable [6/n] Signed-off-by: Jiajun Yao --- doc/source/workflows/advanced.rst | 9 ++- doc/source/workflows/basics.rst | 81 +++++++++++++++------- doc/source/workflows/events.rst | 21 +++--- doc/source/workflows/key-concepts.rst | 41 ++++++----- doc/source/workflows/management.rst | 31 ++++++--- doc/source/workflows/metadata.rst | 37 ++++++---- python/ray/workflow/api.py | 2 +- python/ray/workflow/http_event_provider.py | 3 +- 8 files changed, 148 insertions(+), 77 deletions(-) diff --git a/doc/source/workflows/advanced.rst b/doc/source/workflows/advanced.rst index d5624a54177b..b5615268c819 100644 --- a/doc/source/workflows/advanced.rst +++ b/doc/source/workflows/advanced.rst @@ -8,7 +8,14 @@ Ray Workflows provides strong fault tolerance and exactly-once execution semanti Checkpoints can be skipped by specifying ``checkpoint=False``: -.. code-block:: python +.. testcode:: + + import ray + from ray import workflow + + @ray.remote + def read_data(num: int): + return [i for i in range(num)] data = read_data.options(**workflow.options(checkpoint=False)).bind(10) diff --git a/doc/source/workflows/basics.rst b/doc/source/workflows/basics.rst index d875f8d4d12d..eb951c2aab77 100644 --- a/doc/source/workflows/basics.rst +++ b/doc/source/workflows/basics.rst @@ -13,7 +13,7 @@ Here is a single three-node DAG (note the use of ``.bind(...)`` instead of ``.remote(...)``). The DAG will not be executed until further actions are taken on it: -.. code-block:: python +.. testcode:: from typing import List import ray @@ -46,7 +46,7 @@ We can plot this DAG by using ``ray.dag.vis_utils.plot(output, "output.jpg")``: Next, let's execute the DAG we defined and inspect the result: -.. code-block:: python +.. testcode:: # from ray import workflow @@ -59,6 +59,11 @@ Next, let's execute the DAG we defined and inspect the result: output_ref = workflow.run_async(output) print(ray.get(output_ref)) +.. testoutput:: + + 285 + 285 + Each node in the original DAG becomes a workflow task. You can think of workflow tasks as wrappers around Ray tasks that insert *checkpointing logic* to @@ -72,7 +77,7 @@ You can directly set Ray options to a workflow task just like a normal Ray remote function. To set workflow-specific options, use ``workflow.options`` either as a decorator or as kwargs to ``.options``: -.. code-block:: python +.. testcode:: import ray from ray import workflow @@ -91,7 +96,7 @@ Retrieving Workflow Results To retrieve a workflow result, assign ``workflow_id`` when running a workflow: -.. code-block:: python +.. testcode:: import ray from ray import workflow @@ -100,7 +105,7 @@ To retrieve a workflow result, assign ``workflow_id`` when running a workflow: # Cleanup previous workflows # An exception will be raised if it doesn't exist. workflow.delete("add_example") - except workflow.WorkflowNotFoundError: + except workflow.exceptions.WorkflowNotFoundError: pass @ray.remote @@ -113,18 +118,26 @@ To retrieve a workflow result, assign ``workflow_id`` when running a workflow: ret = add.bind(get_val.bind(), 20) - assert workflow.run(ret, workflow_id="add_example") == 30 + print(workflow.run(ret, workflow_id="add_example")) + +.. testoutput:: + + 30 The workflow results can be retrieved with ``workflow.get_output(workflow_id)``. If a workflow is not given a ``workflow_id``, a random string is set as the ``workflow_id``. To list all workflow ids, call ``ray.workflow.list_all()``. -.. code-block:: python +.. testcode:: - assert workflow.get_output("add_example") == 30 + print(workflow.get_output("add_example")) # "workflow.get_output_async" is an asynchronous version +.. testoutput:: + + 30 + Sub-Task Results ~~~~~~~~~~~~~~~~ @@ -139,7 +152,7 @@ If there are multiple tasks with the same id, a suffix with a counter ``_n`` wil Once a task id is given, the result of the task will be retrievable via ``workflow.get_output(workflow_id, task_id="task_id")``. If the task with the given ``task_id`` hasn't been executed before the workflow completes, an exception will be thrown. Here are some examples: -.. code-block:: python +.. testcode:: import ray from ray import workflow @@ -148,7 +161,7 @@ If the task with the given ``task_id`` hasn't been executed before the workflow try: # cleanup previous workflows workflow.delete(workflow_id) - except workflow.WorkflowNotFoundError: + except workflow.exceptions.WorkflowNotFoundError: pass @ray.remote @@ -180,7 +193,7 @@ Workflow provides two ways to handle application-level exceptions: (1) automatic ``max_retries`` and ``retry_exceptions`` are also Ray task options, so they should be used inside the Ray remote decorator. Here is how you could use them: -.. code-block:: python +.. testcode:: # specify in decorator @workflow.options(catch_exceptions=True) @@ -196,7 +209,7 @@ so they should be used inside the Ray remote decorator. Here is how you could us Here is one example: -.. code-block:: python +.. testcode:: from typing import Tuple import random @@ -212,7 +225,10 @@ Here is one example: # Tries up to five times before giving up. r1 = faulty_function.options(max_retries=5).bind() - workflow.run(r1) + try: + workflow.run(r1) + except ray.exceptions.RayTaskError: + pass @ray.remote def handle_errors(result: Tuple[str, Exception]): @@ -244,8 +260,10 @@ Failure model Note that tasks that have side effects still need to be idempotent. This is because the task could always fail before its result is logged. -.. code-block:: python - :caption: Non-idempotent workflow: +Non-idempotent workflow: + +.. testcode:: + :skipif: True @ray.remote def book_flight_unsafe() -> FlightTicket: @@ -256,8 +274,10 @@ Note that tasks that have side effects still need to be idempotent. This is beca # UNSAFE: we could book multiple flight tickets workflow.run(book_flight_unsafe.bind()) -.. code-block:: python - :caption: Idempotent workflow: +Idempotent workflow: + +.. testcode:: + :skipif: True @ray.remote def generate_id() -> str: @@ -282,7 +302,7 @@ Workflow tasks can be dynamically created in the runtime. In theory, Ray DAG is static which means a DAG node can't be returned in a DAG node. For example, the following code is invalid: -.. code-block:: python +.. testcode:: @ray.remote def bar(): ... @@ -291,12 +311,19 @@ following code is invalid: def foo(): return bar.bind() # This is invalid since Ray DAG is static - ray.get(foo.bind().execute()) # This will error + try: + ray.get(foo.bind().execute()) # This will error + except ray.exceptions.RayTaskError: + print("Ray DAG is static") + +.. testoutput:: + + Ray DAG is static Workflow introduces a utility function called ``workflow.continuation`` which makes Ray DAG node can return a DAG in the runtime: -.. code-block:: python +.. testcode:: @ray.remote def bar(): @@ -317,7 +344,7 @@ The dynamic workflow enables nesting, looping, and recursion within workflows. The following example shows how to implement the recursive ``factorial`` program using dynamically workflow: -.. code-block:: python +.. testcode:: @ray.remote def factorial(n: int) -> int: @@ -343,7 +370,8 @@ substituted for the task's return. To better understand dynamic workflows, let's look at a more realistic example of booking a trip: -.. code-block:: python +.. testcode:: + :skipif: True @ray.remote def book_flight(...) -> Flight: ... @@ -401,10 +429,10 @@ not resolved. But we ensure that all ancestors of a task are fully executed before the task starts which is different from passing them into a Ray remote function whether they have been executed or not is not defined. -.. code-block:: python +.. testcode:: @ray.remote - def add(values: List[ray.ObjectRef[int]]) -> int: + def add(values: List[ray.ObjectRef]) -> int: # although those values are not resolved, they have been # *fully executed and checkpointed*. This guarantees exactly-once # execution semantics. @@ -426,7 +454,7 @@ recoverability, their contents will be logged to durable storage before executing. However, an object will not be checkpointed more than once, even if it is passed to many different tasks. -.. code-block:: python +.. testcode:: @ray.remote def do_add(a, b): @@ -446,7 +474,8 @@ Setting custom resources for tasks You can assign resources (e.g., CPUs, GPUs to tasks via the same ``num_cpus``, ``num_gpus``, and ``resources`` arguments that Ray tasks take): -.. code-block:: python +.. testcode:: + :skipif: True @ray.remote(num_gpus=1) def train_model() -> Model: diff --git a/doc/source/workflows/events.rst b/doc/source/workflows/events.rst index b4ff53243989..f9838ace5546 100644 --- a/doc/source/workflows/events.rst +++ b/doc/source/workflows/events.rst @@ -15,23 +15,23 @@ Using events Workflow events are a special type of workflow task. They "finish" when the event occurs. `workflow.wait_for_event(EventListenerType)` can be used to create an event task. -.. code-block:: python +.. testcode:: import time import ray from ray import workflow - # Create an event which finishes after 60 seconds. - event1_task = workflow.wait_for_event(workflow.event_listener.TimerListener, time.time() + 60) + # Create an event which finishes after 2 seconds. + event1_task = workflow.wait_for_event(workflow.event_listener.TimerListener, time.time() + 2) - # Create another event which finishes after 30 seconds. - event2_task = workflow.wait_for_event(workflow.event_listener.TimerListener, time.time() + 30) + # Create another event which finishes after 1 seconds. + event2_task = workflow.wait_for_event(workflow.event_listener.TimerListener, time.time() + 1) @ray.remote def gather(*args): return args - # Gather will run after 60 seconds when both event1 and event2 are done. + # Gather will run after 2 seconds when both event1 and event2 are done. workflow.run(gather.bind(event1_task, event2_task)) @@ -81,7 +81,9 @@ Custom event listeners Custom event listeners can be written by subclassing the EventListener interface. -.. code-block:: python +.. testcode:: + + from ray.workflow.common import Event class EventListener: def __init__(self): @@ -100,7 +102,7 @@ Custom event listeners can be written by subclassing the EventListener interface The `listener.poll_for_events()` coroutine should finish when the event is done. Arguments to `workflow.wait_for_event` are passed to `poll_for_events()`. For example, an event listener which sleeps until a timestamp can be written as: -.. code-block:: python +.. testcode:: class TimerListener(EventListener): async def poll_for_event(self, timestamp): @@ -116,7 +118,8 @@ The `event_checkpointed` routine can be overridden to support systems with exact After the workflow finishes checkpointing the event, the event listener will be invoked and can free the event. For example, to guarantee that events are consumed from a `kafkaesque` queue: -.. code-block:: python +.. testcode:: + :skipif: True KafkaEventType = ... diff --git a/doc/source/workflows/key-concepts.rst b/doc/source/workflows/key-concepts.rst index de1892a5b849..d4af5f128162 100644 --- a/doc/source/workflows/key-concepts.rst +++ b/doc/source/workflows/key-concepts.rst @@ -19,8 +19,9 @@ Ray DAG nodes can otherwise be composed like normal Ray tasks. However, unlike Ray tasks, you are not allowed to call ``ray.get()`` or ``ray.wait()`` on DAG nodes. Instead, the DAG needs to be *executed* in order to compute a result. -.. code-block:: python - :caption: Composing functions together into a DAG: +Composing functions together into a DAG: + +.. testcode:: import ray @@ -40,8 +41,7 @@ Workflow Execution To execute a DAG with workflows, use `workflow.run`: -.. code-block:: python - :caption: Executing a DAG with Ray Workflows. +.. testcode:: from ray import workflow @@ -61,14 +61,20 @@ When executing the workflow DAG, workflow tasks are retried on failure, but once they finish successfully and the results are persisted by the workflow engine, they will never be run again. -.. code-block:: python - :caption: Getting the result of a workflow. +Getting the result of a workflow: + +.. testcode:: + :hide: + + ray.shutdown() + +.. testcode:: # configure the storage with "ray.init" or "ray start --head --storage=" # A default temporary storage is used by by the workflow if starting without # Ray init. ray.init(storage="/tmp/data") - assert output.run(workflow_id="run_1") == 101 + assert workflow.run(dag, workflow_id="run_1") == 101 assert workflow.get_status("run_1") == workflow.WorkflowStatus.SUCCESSFUL assert workflow.get_output("run_1") == 101 # workflow.get_output_async returns an ObjectRef. @@ -82,8 +88,9 @@ when initially returned from a task. After checkpointing, the object can be shared among any number of workflow tasks at memory-speed via the Ray object store. -.. code-block:: python - :caption: Using Ray objects in a workflow: +Using Ray objects in a workflow: + +.. testcode:: import ray from typing import List @@ -109,10 +116,11 @@ Dynamic Workflows Workflows can generate new tasks at runtime. This is achieved by returning a continuation of a DAG. A continuation is something returned by a function and executed after it returns. The continuation feature enables nesting, looping, -and recursion within workflows: +and recursion within workflows. + +The Fibonacci recursive workflow: -.. code-block:: python - :caption: The Fibonacci recursive workflow: +.. testcode:: @ray.remote def add(a: int, b: int) -> int: @@ -133,14 +141,15 @@ Events Events are external signals sent to the workflow. Workflows can be efficiently triggered by timers or external events using the event system. -.. code-block:: python - :caption: Using events. +.. testcode:: + + import time # Sleep is a special type of event. - sleep_task = workflow.sleep(100) + sleep_task = workflow.sleep(1) # `wait_for_events` allows for pluggable event listeners. - event_task = workflow.wait_for_event(MyEventListener) + event_task = workflow.wait_for_event(workflow.event_listener.TimerListener, time.time() + 2) @ray.remote def gather(*args): diff --git a/doc/source/workflows/management.rst b/doc/source/workflows/management.rst index 750f12930166..6abb65464553 100644 --- a/doc/source/workflows/management.rst +++ b/doc/source/workflows/management.rst @@ -27,17 +27,24 @@ SUCCESSFUL The workflow has been executed successfully. Single workflow management APIs ------------------------------- -.. code-block:: python +.. testcode:: + import ray from ray import workflow + @ray.remote + def task(): + return 3 + + workflow.run(task.bind(), workflow_id="workflow_id") + # Get the status of a workflow. try: status = workflow.get_status(workflow_id="workflow_id") assert status in { "RUNNING", "RESUMABLE", "FAILED", "CANCELED", "SUCCESSFUL"} - except workflow.WorkflowNotFoundError: + except workflow.exceptions.WorkflowNotFoundError: print("Workflow doesn't exist.") # Resume a workflow. @@ -50,30 +57,38 @@ Single workflow management APIs # Delete the workflow. workflow.delete(workflow_id="workflow_id") +.. testoutput:: + + 3 + Bulk workflow management APIs ----------------------------- -.. code-block:: python +.. testcode:: # List all running workflows. print(workflow.list_all("RUNNING")) - # [("workflow_id_1", "RUNNING"), ("workflow_id_2", "RUNNING")] # List RUNNING and CANCELED workflows. print(workflow.list_all({"RUNNING", "CANCELED"})) - # [("workflow_id_1", "RUNNING"), ("workflow_id_2", "CANCELED")] # List all workflows. print(workflow.list_all()) - # [("workflow_id_1", "RUNNING"), ("workflow_id_2", "CANCELED")] # Resume all resumable workflows. This won't include failed workflow print(workflow.resume_all()) - # [("workflow_id_1", ObjectRef), ("workflow_id_2", ObjectRef)] # To resume workflows including failed ones, use `include_failed=True` print(workflow.resume_all(include_failed=True)) - # [("workflow_id_1", ObjectRef), ("workflow_id_3", ObjectRef)] + +.. testoutput:: + :options: +MOCK + + [("workflow_id_1", "RUNNING"), ("workflow_id_2", "RUNNING")] + [("workflow_id_1", "RUNNING"), ("workflow_id_2", "CANCELED")] + [("workflow_id_1", "RUNNING"), ("workflow_id_2", "CANCELED")] + [("workflow_id_1", ObjectRef), ("workflow_id_2", ObjectRef)] + [("workflow_id_1", ObjectRef), ("workflow_id_3", ObjectRef)] Recurring workflows ------------------- diff --git a/doc/source/workflows/metadata.rst b/doc/source/workflows/metadata.rst index e7dfee9aca2f..62c1732c935e 100644 --- a/doc/source/workflows/metadata.rst +++ b/doc/source/workflows/metadata.rst @@ -13,7 +13,10 @@ Retrieving metadata Workflow metadata can be retrieved with ``workflow.get_metadata(workflow_id)``. For example: -.. code-block:: python +.. testcode:: + + import ray + from ray import workflow @ray.remote def add(left: int, right: int) -> int: @@ -30,7 +33,7 @@ For example: You can also retrieve metadata for individual workflow tasks by providing the task name: -.. code-block:: python +.. testcode:: workflow.run( add.options( @@ -51,7 +54,7 @@ workflow or workflow task. - workflow-level metadata can be added via ``.run(metadata=metadata)`` - task-level metadata can be added via ``.options(**workflow.options(metadata=metadata))`` or in the decorator ``@workflow.options(metadata=metadata)`` -.. code-block:: python +.. testcode:: workflow.run(add.options(**workflow.options(task_id="add_task", metadata={"task_k": "task_v"})).bind(10, 20), workflow_id="add_example_3", metadata={"workflow_k": "workflow_v"}) @@ -86,40 +89,42 @@ be available in the result if corresponding metadata is not available (e.g., ``metadata["stats"]["end_time"]`` won't be available until the workflow is completed). -.. code-block:: python +.. testcode:: + + import time @ray.remote def simple(): - flag.touch() # touch a file here time.sleep(1000) return 0 - workflow.run_async(simple.bind(), workflow_id=workflow_id) + workflow.run_async(simple.bind(), workflow_id="workflow_id") # make sure workflow task starts running - while not flag.exists(): - time.sleep(1) + time.sleep(2) - workflow_metadata = workflow.get_metadata(workflow_id) + workflow_metadata = workflow.get_metadata("workflow_id") assert workflow_metadata["status"] == "RUNNING" assert "start_time" in workflow_metadata["stats"] assert "end_time" not in workflow_metadata["stats"] - workflow.cancel(workflow_id) + workflow.cancel("workflow_id") - workflow_metadata = workflow.get_metadata(workflow_id) + workflow_metadata = workflow.get_metadata("workflow_id") assert workflow_metadata["status"] == "CANCELED" - assert "task_options" in workflow_metadata assert "start_time" in workflow_metadata["stats"] assert "end_time" not in workflow_metadata["stats"] 2. For resumed workflows, the current behavior is that "stats" will be updated whenever a workflow is resumed. -.. code-block:: python +.. testcode:: + + from pathlib import Path workflow_id = "simple" - error_flag = tmp_path / "error" + + error_flag = Path("error") error_flag.touch() @ray.remote @@ -128,8 +133,10 @@ be updated whenever a workflow is resumed. raise ValueError() return 0 - with pytest.raises(ray.exceptions.RaySystemError): + try: workflow.run(simple.bind(), workflow_id=workflow_id) + except ray.exceptions.RayTaskError: + pass workflow_metadata_failed = workflow.get_metadata(workflow_id) assert workflow_metadata_failed["status"] == "FAILED" diff --git a/python/ray/workflow/api.py b/python/ray/workflow/api.py index eae4af1801a6..f6bda3d26ce2 100644 --- a/python/ray/workflow/api.py +++ b/python/ray/workflow/api.py @@ -735,7 +735,7 @@ class options: Examples: - .. code-block:: python + .. testcode:: import ray from ray import workflow diff --git a/python/ray/workflow/http_event_provider.py b/python/ray/workflow/http_event_provider.py index f54d2e2221a4..a5757c5589cb 100644 --- a/python/ray/workflow/http_event_provider.py +++ b/python/ray/workflow/http_event_provider.py @@ -213,7 +213,8 @@ async def event_checkpointed(self, event) -> None: Example Usage ============= - .. code-block:: python + .. testcode:: + :skipif: True from ray.workflow.http_event_provider import HTTPEventProvider, HTTPListener From 43785bc3eae85f4e8af90949a5ccb46f1b5b0a2a Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Thu, 29 Jun 2023 11:39:36 -0700 Subject: [PATCH 02/22] up Signed-off-by: Jiajun Yao --- doc/source/workflows/basics.rst | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/doc/source/workflows/basics.rst b/doc/source/workflows/basics.rst index eb951c2aab77..29970c609722 100644 --- a/doc/source/workflows/basics.rst +++ b/doc/source/workflows/basics.rst @@ -46,6 +46,11 @@ We can plot this DAG by using ``ray.dag.vis_utils.plot(output, "output.jpg")``: Next, let's execute the DAG we defined and inspect the result: +.. testcode:: + :hide: + + ray.shutdown() + .. testcode:: # From 0c33973e58db18d80660367a21b5ee14bd877599 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Thu, 29 Jun 2023 20:51:06 -0700 Subject: [PATCH 03/22] up Signed-off-by: Jiajun Yao --- doc/source/workflows/management.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/doc/source/workflows/management.rst b/doc/source/workflows/management.rst index 6abb65464553..61597c0bbc70 100644 --- a/doc/source/workflows/management.rst +++ b/doc/source/workflows/management.rst @@ -65,6 +65,7 @@ Bulk workflow management APIs ----------------------------- .. testcode:: + :skipif: True # List all running workflows. print(workflow.list_all("RUNNING")) From f6841a422f9e84ef1119ff764a568ffeb5a1ac8c Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Thu, 29 Jun 2023 22:05:59 -0700 Subject: [PATCH 04/22] try Signed-off-by: Jiajun Yao --- doc/source/workflows/management.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/source/workflows/management.rst b/doc/source/workflows/management.rst index 61597c0bbc70..a4ae78c30aae 100644 --- a/doc/source/workflows/management.rst +++ b/doc/source/workflows/management.rst @@ -28,6 +28,7 @@ Single workflow management APIs ------------------------------- .. testcode:: + :skipif: True import ray from ray import workflow @@ -83,7 +84,6 @@ Bulk workflow management APIs print(workflow.resume_all(include_failed=True)) .. testoutput:: - :options: +MOCK [("workflow_id_1", "RUNNING"), ("workflow_id_2", "RUNNING")] [("workflow_id_1", "RUNNING"), ("workflow_id_2", "CANCELED")] From f75c1e87d968c1e6b25455f3a2235c9f17bea22c Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Fri, 30 Jun 2023 06:41:15 -0700 Subject: [PATCH 05/22] up Signed-off-by: Jiajun Yao --- doc/source/workflows/basics.rst | 15 ++++++++++----- doc/source/workflows/events.rst | 10 ++++++++++ doc/source/workflows/key-concepts.rst | 10 ++++++++++ doc/source/workflows/management.rst | 13 +++++++++++-- doc/source/workflows/metadata.rst | 10 ++++++++++ 5 files changed, 51 insertions(+), 7 deletions(-) diff --git a/doc/source/workflows/basics.rst b/doc/source/workflows/basics.rst index 29970c609722..2d5d5985afd7 100644 --- a/doc/source/workflows/basics.rst +++ b/doc/source/workflows/basics.rst @@ -13,6 +13,16 @@ Here is a single three-node DAG (note the use of ``.bind(...)`` instead of ``.remote(...)``). The DAG will not be executed until further actions are taken on it: +.. testcode:: + :hide: + + import tempfile + import ray + + temp_dir = tempfile.TemporaryDirectory() + + ray.init(storage=f"file://{temp_dir.name}") + .. testcode:: from typing import List @@ -46,11 +56,6 @@ We can plot this DAG by using ``ray.dag.vis_utils.plot(output, "output.jpg")``: Next, let's execute the DAG we defined and inspect the result: -.. testcode:: - :hide: - - ray.shutdown() - .. testcode:: # diff --git a/doc/source/workflows/events.rst b/doc/source/workflows/events.rst index f9838ace5546..ff4be78dfd61 100644 --- a/doc/source/workflows/events.rst +++ b/doc/source/workflows/events.rst @@ -14,6 +14,16 @@ Using events Workflow events are a special type of workflow task. They "finish" when the event occurs. `workflow.wait_for_event(EventListenerType)` can be used to create an event task. +.. testcode:: + :hide: + + import tempfile + import ray + + temp_dir = tempfile.TemporaryDirectory() + + ray.init(storage=f"file://{temp_dir.name}") + .. testcode:: diff --git a/doc/source/workflows/key-concepts.rst b/doc/source/workflows/key-concepts.rst index d4af5f128162..233bd467f03c 100644 --- a/doc/source/workflows/key-concepts.rst +++ b/doc/source/workflows/key-concepts.rst @@ -21,6 +21,16 @@ DAG nodes. Instead, the DAG needs to be *executed* in order to compute a result. Composing functions together into a DAG: +.. testcode:: + :hide: + + import tempfile + import ray + + temp_dir = tempfile.TemporaryDirectory() + + ray.init(storage=f"file://{temp_dir.name}") + .. testcode:: import ray diff --git a/doc/source/workflows/management.rst b/doc/source/workflows/management.rst index a4ae78c30aae..bd93468a9cfc 100644 --- a/doc/source/workflows/management.rst +++ b/doc/source/workflows/management.rst @@ -28,7 +28,16 @@ Single workflow management APIs ------------------------------- .. testcode:: - :skipif: True + :hide: + + import tempfile + import ray + + temp_dir = tempfile.TemporaryDirectory() + + ray.init(storage=f"file://{temp_dir.name}") + +.. testcode:: import ray from ray import workflow @@ -66,7 +75,6 @@ Bulk workflow management APIs ----------------------------- .. testcode:: - :skipif: True # List all running workflows. print(workflow.list_all("RUNNING")) @@ -84,6 +92,7 @@ Bulk workflow management APIs print(workflow.resume_all(include_failed=True)) .. testoutput:: + :options: +MOCK [("workflow_id_1", "RUNNING"), ("workflow_id_2", "RUNNING")] [("workflow_id_1", "RUNNING"), ("workflow_id_2", "CANCELED")] diff --git a/doc/source/workflows/metadata.rst b/doc/source/workflows/metadata.rst index 62c1732c935e..7540957da382 100644 --- a/doc/source/workflows/metadata.rst +++ b/doc/source/workflows/metadata.rst @@ -13,6 +13,16 @@ Retrieving metadata Workflow metadata can be retrieved with ``workflow.get_metadata(workflow_id)``. For example: +.. testcode:: + :hide: + + import tempfile + import ray + + temp_dir = tempfile.TemporaryDirectory() + + ray.init(storage=f"file://{temp_dir.name}") + .. testcode:: import ray From 72c4d046aea759df3527bf5318e94a2db268b52e Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Fri, 30 Jun 2023 21:24:20 -0700 Subject: [PATCH 06/22] try Signed-off-by: Jiajun Yao --- doc/source/workflows/basics.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/doc/source/workflows/basics.rst b/doc/source/workflows/basics.rst index 2d5d5985afd7..e5b93ff38c6a 100644 --- a/doc/source/workflows/basics.rst +++ b/doc/source/workflows/basics.rst @@ -19,6 +19,8 @@ taken on it: import tempfile import ray + ray.shutdown() + temp_dir = tempfile.TemporaryDirectory() ray.init(storage=f"file://{temp_dir.name}") From 6729cf300b309df9efa0cd3d0f66a750c3ec355b Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Sat, 1 Jul 2023 21:37:42 -0700 Subject: [PATCH 07/22] try Signed-off-by: Jiajun Yao --- doc/source/workflows/basics.rst | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/doc/source/workflows/basics.rst b/doc/source/workflows/basics.rst index e5b93ff38c6a..09ce792f9766 100644 --- a/doc/source/workflows/basics.rst +++ b/doc/source/workflows/basics.rst @@ -26,6 +26,7 @@ taken on it: ray.init(storage=f"file://{temp_dir.name}") .. testcode:: + :skipif: True from typing import List import ray @@ -59,6 +60,7 @@ We can plot this DAG by using ``ray.dag.vis_utils.plot(output, "output.jpg")``: Next, let's execute the DAG we defined and inspect the result: .. testcode:: + :skipif: True # from ray import workflow @@ -90,6 +92,7 @@ Ray remote function. To set workflow-specific options, use ``workflow.options`` either as a decorator or as kwargs to ``.options``: .. testcode:: + :skipif: True import ray from ray import workflow @@ -109,6 +112,7 @@ Retrieving Workflow Results To retrieve a workflow result, assign ``workflow_id`` when running a workflow: .. testcode:: + :skipif: True import ray from ray import workflow @@ -142,6 +146,7 @@ The workflow results can be retrieved with workflow ids, call ``ray.workflow.list_all()``. .. testcode:: + :skipif: True print(workflow.get_output("add_example")) # "workflow.get_output_async" is an asynchronous version @@ -165,6 +170,7 @@ Once a task id is given, the result of the task will be retrievable via ``workfl If the task with the given ``task_id`` hasn't been executed before the workflow completes, an exception will be thrown. Here are some examples: .. testcode:: + :skipif: True import ray from ray import workflow @@ -206,6 +212,7 @@ Workflow provides two ways to handle application-level exceptions: (1) automatic so they should be used inside the Ray remote decorator. Here is how you could use them: .. testcode:: + :skipif: True # specify in decorator @workflow.options(catch_exceptions=True) @@ -222,6 +229,7 @@ so they should be used inside the Ray remote decorator. Here is how you could us Here is one example: .. testcode:: + :skipif: True from typing import Tuple import random @@ -315,6 +323,7 @@ static which means a DAG node can't be returned in a DAG node. For example, the following code is invalid: .. testcode:: + :skipif: True @ray.remote def bar(): ... @@ -336,6 +345,7 @@ Workflow introduces a utility function called ``workflow.continuation`` which makes Ray DAG node can return a DAG in the runtime: .. testcode:: + :skipif: True @ray.remote def bar(): @@ -357,6 +367,7 @@ The following example shows how to implement the recursive ``factorial`` program using dynamically workflow: .. testcode:: + :skipif: True @ray.remote def factorial(n: int) -> int: @@ -442,6 +453,7 @@ before the task starts which is different from passing them into a Ray remote function whether they have been executed or not is not defined. .. testcode:: + :skipif: True @ray.remote def add(values: List[ray.ObjectRef]) -> int: @@ -467,6 +479,7 @@ executing. However, an object will not be checkpointed more than once, even if it is passed to many different tasks. .. testcode:: + :skipif: True @ray.remote def do_add(a, b): From 649acf299615ac95aa3d090b9f374998b4fb9392 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Sat, 1 Jul 2023 22:37:31 -0700 Subject: [PATCH 08/22] try Signed-off-by: Jiajun Yao --- doc/source/workflows/basics.rst | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/doc/source/workflows/basics.rst b/doc/source/workflows/basics.rst index 09ce792f9766..c57fb2631180 100644 --- a/doc/source/workflows/basics.rst +++ b/doc/source/workflows/basics.rst @@ -21,9 +21,11 @@ taken on it: ray.shutdown() - temp_dir = tempfile.TemporaryDirectory() + #ray.init() - ray.init(storage=f"file://{temp_dir.name}") + #temp_dir = tempfile.TemporaryDirectory() + + #ray.init(storage=f"file://{temp_dir.name}") .. testcode:: :skipif: True From 1f224d6d416acd4c4712fc0dbd3de9eb3ba1a0b6 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Wed, 5 Jul 2023 07:35:43 -0700 Subject: [PATCH 09/22] try Signed-off-by: Jiajun Yao --- doc/source/workflows/basics.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/source/workflows/basics.rst b/doc/source/workflows/basics.rst index c57fb2631180..ce24b0b3fd56 100644 --- a/doc/source/workflows/basics.rst +++ b/doc/source/workflows/basics.rst @@ -21,7 +21,7 @@ taken on it: ray.shutdown() - #ray.init() + ray.init() #temp_dir = tempfile.TemporaryDirectory() From ec0803504725624506952594d2d4e0dfeae257c6 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Wed, 5 Jul 2023 10:38:19 -0700 Subject: [PATCH 10/22] try Signed-off-by: Jiajun Yao --- doc/source/workflows/basics.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/doc/source/workflows/basics.rst b/doc/source/workflows/basics.rst index ce24b0b3fd56..07167e11de12 100644 --- a/doc/source/workflows/basics.rst +++ b/doc/source/workflows/basics.rst @@ -21,6 +21,8 @@ taken on it: ray.shutdown() + assert not ray.is_initialized() + ray.init() #temp_dir = tempfile.TemporaryDirectory() From b840d70096d29d65d72fe7a819c5e6f886391e37 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Wed, 5 Jul 2023 11:52:41 -0700 Subject: [PATCH 11/22] try Signed-off-by: Jiajun Yao --- .buildkite/pipeline.build.yml | 2 +- doc/BUILD | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.buildkite/pipeline.build.yml b/.buildkite/pipeline.build.yml index 33ddb6100dce..94d7898bc9ad 100644 --- a/.buildkite/pipeline.build.yml +++ b/.buildkite/pipeline.build.yml @@ -272,7 +272,7 @@ - ./ci/env/env_info.sh - bazel test --config=ci $(./scripts/bazel_export_options) --test_tag_filters=doctest,-gpu - python/ray/... doc/... + doc/... - label: ":python: Ray on Spark Test" conditions: ["RAY_CI_PYTHON_AFFECTED"] diff --git a/doc/BUILD b/doc/BUILD index 1e4871ceef95..d29649ec3b3e 100644 --- a/doc/BUILD +++ b/doc/BUILD @@ -298,7 +298,7 @@ py_test_run_all_subdirectory( doctest( files = glob( - include=["source/**/*.rst", "source/**/*.md"], + include=["source/workflows/basics.rst"], exclude=[ "source/ray-contribute/getting-involved.rst", "source/ray-core/handling-dependencies.rst", From 71d8cb22b508c8a1deb8beeca2b0cd9e6a5d35d4 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Wed, 5 Jul 2023 12:36:21 -0700 Subject: [PATCH 12/22] try Signed-off-by: Jiajun Yao --- doc/source/workflows/basics.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/source/workflows/basics.rst b/doc/source/workflows/basics.rst index 07167e11de12..1b16cb4d4bc3 100644 --- a/doc/source/workflows/basics.rst +++ b/doc/source/workflows/basics.rst @@ -21,7 +21,7 @@ taken on it: ray.shutdown() - assert not ray.is_initialized() + assert ray.is_initialized() ray.init() From da00699bdd15222d3cec2bee486b6cdab0a4d179 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Wed, 5 Jul 2023 13:35:33 -0700 Subject: [PATCH 13/22] try Signed-off-by: Jiajun Yao --- doc/BUILD | 2 +- doc/source/workflows/basics.rst | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/doc/BUILD b/doc/BUILD index d29649ec3b3e..1e4871ceef95 100644 --- a/doc/BUILD +++ b/doc/BUILD @@ -298,7 +298,7 @@ py_test_run_all_subdirectory( doctest( files = glob( - include=["source/workflows/basics.rst"], + include=["source/**/*.rst", "source/**/*.md"], exclude=[ "source/ray-contribute/getting-involved.rst", "source/ray-core/handling-dependencies.rst", diff --git a/doc/source/workflows/basics.rst b/doc/source/workflows/basics.rst index 1b16cb4d4bc3..07167e11de12 100644 --- a/doc/source/workflows/basics.rst +++ b/doc/source/workflows/basics.rst @@ -21,7 +21,7 @@ taken on it: ray.shutdown() - assert ray.is_initialized() + assert not ray.is_initialized() ray.init() From a3c3ec8b1e74c909bffde106b4c4ab351d55e011 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Wed, 5 Jul 2023 13:39:06 -0700 Subject: [PATCH 14/22] try Signed-off-by: Jiajun Yao --- doc/source/workflows/basics.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/source/workflows/basics.rst b/doc/source/workflows/basics.rst index 07167e11de12..ab4e034e0d2b 100644 --- a/doc/source/workflows/basics.rst +++ b/doc/source/workflows/basics.rst @@ -23,7 +23,7 @@ taken on it: assert not ray.is_initialized() - ray.init() + ray.init(address="local") #temp_dir = tempfile.TemporaryDirectory() From 21c8766c8190a5d731a02a7c8b9701d19f32a8cd Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Wed, 5 Jul 2023 16:49:14 -0700 Subject: [PATCH 15/22] try Signed-off-by: Jiajun Yao --- doc/BUILD | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/doc/BUILD b/doc/BUILD index 1e4871ceef95..79629c74251c 100644 --- a/doc/BUILD +++ b/doc/BUILD @@ -313,7 +313,8 @@ doctest( "source/serve/production-guide/fault-tolerance.md", "source/data/batch_inference.rst", "source/data/transforming-data.rst", - "source/train/faq.rst" + "source/train/faq.rst", + "source/workflows/basics.rst" ] ), size = "large", @@ -321,6 +322,12 @@ doctest( ) +doctest( + files = [ + "source/workflows/basics.rst", + ], + tags = ["team:core"] +) doctest( files = [ From 22cf7b1afc99f0c2ffe50df256e9eac6e25a4f8a Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Wed, 5 Jul 2023 21:26:26 -0700 Subject: [PATCH 16/22] try Signed-off-by: Jiajun Yao --- doc/BUILD | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/doc/BUILD b/doc/BUILD index 79629c74251c..4458d6e83341 100644 --- a/doc/BUILD +++ b/doc/BUILD @@ -324,9 +324,10 @@ doctest( doctest( files = [ - "source/workflows/basics.rst", + "source/workflows/basics.rst" ], - tags = ["team:core"] + size = "large", + tags = ["team:none"] ) doctest( From 90a001cc7cd1201937e7dfde10b919f9caa54fdd Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Wed, 5 Jul 2023 22:20:39 -0700 Subject: [PATCH 17/22] try Signed-off-by: Jiajun Yao --- doc/BUILD | 1 + 1 file changed, 1 insertion(+) diff --git a/doc/BUILD b/doc/BUILD index 4458d6e83341..16a779eff073 100644 --- a/doc/BUILD +++ b/doc/BUILD @@ -323,6 +323,7 @@ doctest( doctest( + name="doctest [jjyao]", files = [ "source/workflows/basics.rst" ], From 8391b38377aa301f9146ed16086704e297618092 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Thu, 6 Jul 2023 08:56:43 -0700 Subject: [PATCH 18/22] try Signed-off-by: Jiajun Yao --- doc/BUILD | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/BUILD b/doc/BUILD index 16a779eff073..854c052838a9 100644 --- a/doc/BUILD +++ b/doc/BUILD @@ -323,7 +323,7 @@ doctest( doctest( - name="doctest [jjyao]", + name="doctest_jjyao", files = [ "source/workflows/basics.rst" ], From 7fcda33d688a3801d067cdd9afc3e93e5d7a7713 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Thu, 6 Jul 2023 11:05:26 -0700 Subject: [PATCH 19/22] try Signed-off-by: Jiajun Yao --- doc/BUILD | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/doc/BUILD b/doc/BUILD index 854c052838a9..ae595e5d460e 100644 --- a/doc/BUILD +++ b/doc/BUILD @@ -314,7 +314,8 @@ doctest( "source/data/batch_inference.rst", "source/data/transforming-data.rst", "source/train/faq.rst", - "source/workflows/basics.rst" + "source/workflows/basics.rst", + "source/workflows/events.rst" ] ), size = "large", @@ -325,7 +326,8 @@ doctest( doctest( name="doctest_jjyao", files = [ - "source/workflows/basics.rst" + "source/workflows/basics.rst", + "source/workflows/events.rst" ], size = "large", tags = ["team:none"] From 66be467db5d435d6943b6c7791a73eabb69c01b3 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Thu, 6 Jul 2023 13:20:41 -0700 Subject: [PATCH 20/22] fix Signed-off-by: Jiajun Yao --- doc/BUILD | 15 ++++++--------- doc/source/workflows/basics.rst | 23 ++--------------------- 2 files changed, 8 insertions(+), 30 deletions(-) diff --git a/doc/BUILD b/doc/BUILD index ae595e5d460e..c9ba115b1648 100644 --- a/doc/BUILD +++ b/doc/BUILD @@ -314,8 +314,7 @@ doctest( "source/data/batch_inference.rst", "source/data/transforming-data.rst", "source/train/faq.rst", - "source/workflows/basics.rst", - "source/workflows/events.rst" + "source/workflows/*.rst" ] ), size = "large", @@ -324,13 +323,11 @@ doctest( doctest( - name="doctest_jjyao", - files = [ - "source/workflows/basics.rst", - "source/workflows/events.rst" - ], - size = "large", - tags = ["team:none"] + name="doctest[workflow]", + files = glob( + include=["source/workflows/*.rst"] + ), + tags = ["team:core"] ) doctest( diff --git a/doc/source/workflows/basics.rst b/doc/source/workflows/basics.rst index ab4e034e0d2b..2d5d5985afd7 100644 --- a/doc/source/workflows/basics.rst +++ b/doc/source/workflows/basics.rst @@ -19,18 +19,11 @@ taken on it: import tempfile import ray - ray.shutdown() + temp_dir = tempfile.TemporaryDirectory() - assert not ray.is_initialized() - - ray.init(address="local") - - #temp_dir = tempfile.TemporaryDirectory() - - #ray.init(storage=f"file://{temp_dir.name}") + ray.init(storage=f"file://{temp_dir.name}") .. testcode:: - :skipif: True from typing import List import ray @@ -64,7 +57,6 @@ We can plot this DAG by using ``ray.dag.vis_utils.plot(output, "output.jpg")``: Next, let's execute the DAG we defined and inspect the result: .. testcode:: - :skipif: True # from ray import workflow @@ -96,7 +88,6 @@ Ray remote function. To set workflow-specific options, use ``workflow.options`` either as a decorator or as kwargs to ``.options``: .. testcode:: - :skipif: True import ray from ray import workflow @@ -116,7 +107,6 @@ Retrieving Workflow Results To retrieve a workflow result, assign ``workflow_id`` when running a workflow: .. testcode:: - :skipif: True import ray from ray import workflow @@ -150,7 +140,6 @@ The workflow results can be retrieved with workflow ids, call ``ray.workflow.list_all()``. .. testcode:: - :skipif: True print(workflow.get_output("add_example")) # "workflow.get_output_async" is an asynchronous version @@ -174,7 +163,6 @@ Once a task id is given, the result of the task will be retrievable via ``workfl If the task with the given ``task_id`` hasn't been executed before the workflow completes, an exception will be thrown. Here are some examples: .. testcode:: - :skipif: True import ray from ray import workflow @@ -216,7 +204,6 @@ Workflow provides two ways to handle application-level exceptions: (1) automatic so they should be used inside the Ray remote decorator. Here is how you could use them: .. testcode:: - :skipif: True # specify in decorator @workflow.options(catch_exceptions=True) @@ -233,7 +220,6 @@ so they should be used inside the Ray remote decorator. Here is how you could us Here is one example: .. testcode:: - :skipif: True from typing import Tuple import random @@ -327,7 +313,6 @@ static which means a DAG node can't be returned in a DAG node. For example, the following code is invalid: .. testcode:: - :skipif: True @ray.remote def bar(): ... @@ -349,7 +334,6 @@ Workflow introduces a utility function called ``workflow.continuation`` which makes Ray DAG node can return a DAG in the runtime: .. testcode:: - :skipif: True @ray.remote def bar(): @@ -371,7 +355,6 @@ The following example shows how to implement the recursive ``factorial`` program using dynamically workflow: .. testcode:: - :skipif: True @ray.remote def factorial(n: int) -> int: @@ -457,7 +440,6 @@ before the task starts which is different from passing them into a Ray remote function whether they have been executed or not is not defined. .. testcode:: - :skipif: True @ray.remote def add(values: List[ray.ObjectRef]) -> int: @@ -483,7 +465,6 @@ executing. However, an object will not be checkpointed more than once, even if it is passed to many different tasks. .. testcode:: - :skipif: True @ray.remote def do_add(a, b): From adb1c75a4b28f0e5f689a904eada7d97220166b2 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Thu, 6 Jul 2023 15:50:58 -0700 Subject: [PATCH 21/22] up Signed-off-by: Jiajun Yao --- .buildkite/pipeline.build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.buildkite/pipeline.build.yml b/.buildkite/pipeline.build.yml index 94d7898bc9ad..33ddb6100dce 100644 --- a/.buildkite/pipeline.build.yml +++ b/.buildkite/pipeline.build.yml @@ -272,7 +272,7 @@ - ./ci/env/env_info.sh - bazel test --config=ci $(./scripts/bazel_export_options) --test_tag_filters=doctest,-gpu - doc/... + python/ray/... doc/... - label: ":python: Ray on Spark Test" conditions: ["RAY_CI_PYTHON_AFFECTED"] From 867c75ef2d887a34b9ca4ba9c220f5133f8d3535 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Thu, 6 Jul 2023 22:16:14 -0700 Subject: [PATCH 22/22] comments Signed-off-by: Jiajun Yao --- doc/BUILD | 8 ++++++-- doc/source/workflows/basics.rst | 9 ++++----- doc/source/workflows/events.rst | 5 +---- python/ray/workflow/http_event_provider.py | 16 +++++++++++----- 4 files changed, 22 insertions(+), 16 deletions(-) diff --git a/doc/BUILD b/doc/BUILD index c9ba115b1648..20998c3a831a 100644 --- a/doc/BUILD +++ b/doc/BUILD @@ -314,7 +314,8 @@ doctest( "source/data/batch_inference.rst", "source/data/transforming-data.rst", "source/train/faq.rst", - "source/workflows/*.rst" + "source/workflows/**/*.rst", + "source/workflows/**/*.md" ] ), size = "large", @@ -325,7 +326,10 @@ doctest( doctest( name="doctest[workflow]", files = glob( - include=["source/workflows/*.rst"] + include=[ + "source/workflows/**/*.rst", + "source/workflows/**/*.md" + ] ), tags = ["team:core"] ) diff --git a/doc/source/workflows/basics.rst b/doc/source/workflows/basics.rst index 2d5d5985afd7..f551ac392981 100644 --- a/doc/source/workflows/basics.rst +++ b/doc/source/workflows/basics.rst @@ -21,7 +21,7 @@ taken on it: temp_dir = tempfile.TemporaryDirectory() - ray.init(storage=f"file://{temp_dir.name}") + ray.init(num_gpus=1, storage=f"file://{temp_dir.name}") .. testcode:: @@ -485,10 +485,9 @@ Setting custom resources for tasks You can assign resources (e.g., CPUs, GPUs to tasks via the same ``num_cpus``, ``num_gpus``, and ``resources`` arguments that Ray tasks take): .. testcode:: - :skipif: True - @ray.remote(num_gpus=1) - def train_model() -> Model: + @ray.remote + def train_model(): pass # This task is assigned to a GPU by Ray. - workflow.run(train_model.bind()) + workflow.run(train_model.options(num_gpus=1).bind()) diff --git a/doc/source/workflows/events.rst b/doc/source/workflows/events.rst index ff4be78dfd61..1e091b64e358 100644 --- a/doc/source/workflows/events.rst +++ b/doc/source/workflows/events.rst @@ -129,17 +129,14 @@ After the workflow finishes checkpointing the event, the event listener will be .. testcode:: - :skipif: True KafkaEventType = ... class QueueEventListener: - def __init__(self): # Initialize the poll consumer. self.consumer = Consumer({'enable.auto.commit': False}) - async def poll_for_event(self, topic) -> KafkaEventType: self.consumer.subscribe(topic) @@ -147,7 +144,7 @@ After the workflow finishes checkpointing the event, the event listener will be return message async def event_checkpointed(self, event: KafkaEventType) -> None: - self.consuemr.commit(event, asynchronous=False) + self.consumer.commit(event, asynchronous=False) (Advanced) Event listener semantics diff --git a/python/ray/workflow/http_event_provider.py b/python/ray/workflow/http_event_provider.py index a5757c5589cb..3f958a774b2b 100644 --- a/python/ray/workflow/http_event_provider.py +++ b/python/ray/workflow/http_event_provider.py @@ -214,16 +214,22 @@ async def event_checkpointed(self, event) -> None: ============= .. testcode:: - :skipif: True - from ray.workflow.http_event_provider import HTTPEventProvider, HTTPListener + import tempfile + from ray import workflow + from ray.workflow.http_event_provider import HTTPListener - ray.init(address='auto', namespace='serve') + temp_dir = tempfile.TemporaryDirectory() + ray.init(storage=f"file://{temp_dir.name}") serve.start(detached=True) event_node = workflow.wait_for_event(HTTPListener, event_key='') - handle_event = ... - workflow.run(handle_event.bind(event_node)) + + @ray.remote + def handle_event(arg): + return arg + + workflow.run_async(handle_event.bind(event_node), workflow_id="http_listener") """ def __init__(self):