Skip to content

Commit

Permalink
[Workflow] Rename the argument of "workflow.get_output" (ray-project#…
Browse files Browse the repository at this point in the history
…26876)

* rename get_output

Signed-off-by: Siyuan Zhuang <[email protected]>

* update doc

Signed-off-by: Siyuan Zhuang <[email protected]>
Signed-off-by: Rohan138 <[email protected]>
  • Loading branch information
suquark authored and Rohan138 committed Jul 28, 2022
1 parent 1739274 commit 7c05145
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 38 deletions.
19 changes: 10 additions & 9 deletions doc/source/workflows/basics.rst
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,13 @@ We can retrieve the results for individual workflow tasks too with *named tasks*
2) via decorator ``@workflow.options(name="task_name")``

If a task is not given a ``task_name``, the name of its function is used as the ``task_name``.
The ID of the task is the same as its name. If there are multiple tasks with the same name, a suffix with a counter ``_n`` will be added automatically.

Once a task is given a name, the result of the task will be retrievable via ``workflow.get_output(workflow_id, name="task_name")``. If the task with the given name hasn't been executed yet, an exception will be thrown. Here are some examples:
The counter ``n`` in the ``_n`` suffix is a sequential number (1, 2, 3, ...) of the tasks to be executed.
(Note that the first task does not have the suffix.)

Once a task is given a name, the result of the task will be retrievable via ``workflow.get_output(workflow_id, task_id="task_name")``.
If the task with the given ``task_id`` hasn't been executed before the workflow completes, an exception will be thrown. Here are some examples:

.. code-block:: python
Expand All @@ -153,17 +158,13 @@ Once a task is given a name, the result of the task will be retrievable via ``wo
outer_task = double.options(**workflow.options(name="outer")).bind(inner_task)
result_ref = workflow.run_async(outer_task, workflow_id="double")
inner = workflow.get_output_async(workflow_id, name="inner")
outer = workflow.get_output_async(workflow_id, name="outer")
inner = workflow.get_output_async(workflow_id, task_id="inner")
outer = workflow.get_output_async(workflow_id, task_id="outer")
assert ray.get(inner) == 2
assert ray.get(outer) == 4
assert ray.get(result_ref) == 4
If there are multiple tasks with the same name, a suffix with a counter ``_n`` will be added automatically.

The suffix with a counter ``_n`` is a sequential number (1,2,3,...) of the tasks to be executed.
(Note that the first task does not have the suffix.)
# TODO(suquark): make sure Ray DAG does not depend on Ray Serve and PyArrow.

Expand Down Expand Up @@ -192,9 +193,9 @@ For example,
x = simple.options(**workflow.options(name="step")).bind(x)
ret = workflow.run_async(x, workflow_id=workflow_id)
outputs = [workflow.get_output_async(workflow_id, name="step")]
outputs = [workflow.get_output_async(workflow_id, task_id="step")]
for i in range(1, n):
outputs.append(workflow.get_output_async(workflow_id, name=f"step_{i}"))
outputs.append(workflow.get_output_async(workflow_id, task_id=f"step_{i}"))
assert ray.get(ret) == n - 1
assert ray.get(outputs) == list(range(n))
Expand Down
16 changes: 9 additions & 7 deletions python/ray/workflow/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,17 +290,19 @@ def get_output(workflow_id: str, *, name: Optional[str] = None) -> Any:
Returns:
The output of the workflow task.
"""
return ray.get(get_output_async(workflow_id, name=name))
return ray.get(get_output_async(workflow_id, task_id=name))


@PublicAPI(stability="beta")
def get_output_async(workflow_id: str, *, name: Optional[str] = None) -> ray.ObjectRef:
def get_output_async(
workflow_id: str, *, task_id: Optional[str] = None
) -> ray.ObjectRef:
"""Get the output of a running workflow asynchronously.
Args:
workflow_id: The workflow to get the output of.
name: If set, fetch the specific step instead of the output of the
workflow.
task_id: If set, fetch the specific task output instead of the output
of the workflow.
Returns:
An object reference that can be used to retrieve the workflow task result.
Expand All @@ -318,15 +320,15 @@ def get_output_async(workflow_id: str, *, name: Optional[str] = None) -> ray.Obj
try:
# check storage first
wf_store = WorkflowStorage(workflow_id)
tid = wf_store.inspect_output(name)
tid = wf_store.inspect_output(task_id)
if tid is not None:
return workflow_access.load_step_output_from_storage.remote(
workflow_id, name
workflow_id, task_id
)
except ValueError:
pass

return workflow_manager.get_output.remote(workflow_id, name)
return workflow_manager.get_output.remote(workflow_id, task_id)


@PublicAPI(stability="beta")
Expand Down
24 changes: 13 additions & 11 deletions python/ray/workflow/tests/test_basic_workflows_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,9 @@ def recursive(n):
workflow_id=workflow_id,
)

outputs = [workflow.get_output_async(workflow_id, name=str(i)) for i in range(11)]
outputs = [
workflow.get_output_async(workflow_id, task_id=str(i)) for i in range(11)
]
outputs.append(obj)

import time
Expand Down Expand Up @@ -161,8 +163,8 @@ def double(v):
inner_task = double.options(**workflow.options(name="inner")).bind(1)
outer_task = double.options(**workflow.options(name="outer")).bind(inner_task)
result = workflow.run_async(outer_task, workflow_id="double")
inner = workflow.get_output_async("double", name="inner")
outer = workflow.get_output_async("double", name="outer")
inner = workflow.get_output_async("double", task_id="inner")
outer = workflow.get_output_async("double", task_id="outer")

assert ray.get(inner) == 2
assert ray.get(outer) == 4
Expand All @@ -178,8 +180,8 @@ def double_2(s):
workflow_id = "double_2"
result = workflow.run_async(outer_task, workflow_id=workflow_id)

inner = workflow.get_output_async(workflow_id, name="double")
outer = workflow.get_output_async(workflow_id, name="double_1")
inner = workflow.get_output_async(workflow_id, task_id="double")
outer = workflow.get_output_async(workflow_id, task_id="double_1")

assert ray.get(inner) == 2
assert ray.get(outer) == 4
Expand All @@ -199,8 +201,8 @@ def simple():
with FileLock(lock_path):
dag = simple.options(**workflow.options(name="simple")).bind()
ret = workflow.run_async(dag, workflow_id=workflow_id)
exist = workflow.get_output_async(workflow_id, name="simple")
non_exist = workflow.get_output_async(workflow_id, name="non_exist")
exist = workflow.get_output_async(workflow_id, task_id="simple")
non_exist = workflow.get_output_async(workflow_id, task_id="non_exist")

assert ray.get(ret) == "hello"
assert ray.get(exist) == "hello"
Expand Down Expand Up @@ -245,8 +247,8 @@ def double(v, lock=None):
workflow_id="double-2",
)

inner = workflow.get_output_async("double-2", name="inner")
outer = workflow.get_output_async("double-2", name="outer")
inner = workflow.get_output_async("double-2", task_id="inner")
outer = workflow.get_output_async("double-2", task_id="outer")

@ray.remote
def wait(obj_ref):
Expand All @@ -263,8 +265,8 @@ def wait(obj_ref):
lock.release()
assert [4, 2, 4] == ray.get([output, inner, outer])

inner = workflow.get_output_async("double-2", name="inner")
outer = workflow.get_output_async("double-2", name="outer")
inner = workflow.get_output_async("double-2", task_id="inner")
outer = workflow.get_output_async("double-2", task_id="outer")
assert [2, 4] == ray.get([inner, outer])


Expand Down
4 changes: 2 additions & 2 deletions python/ray/workflow/tests/test_basic_workflows_3.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ def simple(x):

workflow_id = "test_task_id_generation"
ret = workflow.run_async(x, workflow_id=workflow_id)
outputs = [workflow.get_output_async(workflow_id, name="simple")]
outputs = [workflow.get_output_async(workflow_id, task_id="simple")]
for i in range(1, n):
outputs.append(workflow.get_output_async(workflow_id, name=f"simple_{i}"))
outputs.append(workflow.get_output_async(workflow_id, task_id=f"simple_{i}"))
assert ray.get(ret) == n - 1
assert ray.get(outputs) == list(range(n))

Expand Down
18 changes: 9 additions & 9 deletions python/ray/workflow/workflow_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,27 +263,27 @@ def list_non_terminating_workflows(self) -> Dict[WorkflowStatus, List[str]]:
return result

async def get_output(
self, workflow_id: str, name: Optional[TaskID]
self, workflow_id: str, task_id: Optional[TaskID]
) -> ray.ObjectRef:
"""Get the output of a running workflow.
Args:
workflow_id: The ID of a workflow job.
task_id: If set, fetch the specific task output instead of the output
of the workflow.
Returns:
An object reference that can be used to retrieve the
workflow result.
An object reference that can be used to retrieve the workflow result.
"""
# TODO(suquark): Use 'task_id' instead of 'name' for the API.
ref = None
if self.is_workflow_non_terminating(workflow_id):
executor = self._workflow_executors[workflow_id]
if name is None:
name = executor.output_task_id
workflow_ref = await executor.get_task_output_async(name)
name, ref = workflow_ref.task_id, workflow_ref.ref
if task_id is None:
task_id = executor.output_task_id
workflow_ref = await executor.get_task_output_async(task_id)
task_id, ref = workflow_ref.task_id, workflow_ref.ref
if ref is None:
ref = load_step_output_from_storage.remote(workflow_id, name)
ref = load_step_output_from_storage.remote(workflow_id, task_id)
return SelfResolvingObject(ref)

def ready(self) -> None:
Expand Down

0 comments on commit 7c05145

Please sign in to comment.