Skip to content

Commit

Permalink
[Doc] Revamp ray core design patterns doc [19/n]: async actor (#33595)
Browse files Browse the repository at this point in the history
Signed-off-by: Jiajun Yao <[email protected]>
  • Loading branch information
jjyao authored Mar 28, 2023
1 parent 38e1afc commit 3ad40a4
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 96 deletions.
1 change: 0 additions & 1 deletion doc/source/ray-core/actors.rst
Original file line number Diff line number Diff line change
Expand Up @@ -373,4 +373,3 @@ More about Ray Actors
actors/actor-utils.rst
actors/out-of-band-communication.rst
actors/task-orders.rst
actors/patterns/index.rst

This file was deleted.

16 changes: 0 additions & 16 deletions doc/source/ray-core/actors/patterns/index.rst

This file was deleted.

70 changes: 70 additions & 0 deletions doc/source/ray-core/doc_code/pattern_async_actor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# __sync_actor_start__
import ray


@ray.remote
class TaskStore:
def get_next_task(self):
return "task"


@ray.remote
class TaskExecutor:
def __init__(self, task_store):
self.task_store = task_store
self.num_executed_tasks = 0

def run(self):
while True:
task = ray.get(task_store.get_next_task.remote())
self._execute_task(task)

def _execute_task(self, task):
# Executing the task
self.num_executed_tasks = self.num_executed_tasks + 1

def get_num_executed_tasks(self):
return self.num_executed_tasks


task_store = TaskStore.remote()
task_executor = TaskExecutor.remote(task_store)
task_executor.run.remote()
try:
# This will timeout since task_executor.run occupies the entire actor thread
# and get_num_executed_tasks cannot run.
ray.get(task_executor.get_num_executed_tasks.remote(), timeout=5)
except ray.exceptions.GetTimeoutError:
print("get_num_executed_tasks didn't finish in 5 seconds")
# __sync_actor_end__


# __async_actor_start__
@ray.remote
class AsyncTaskExecutor:
def __init__(self, task_store):
self.task_store = task_store
self.num_executed_tasks = 0

async def run(self):
while True:
# Here we use await instead of ray.get() to
# wait for the next task and it will yield
# the control while waiting.
task = await task_store.get_next_task.remote()
self._execute_task(task)

def _execute_task(self, task):
# Executing the task
self.num_executed_tasks = self.num_executed_tasks + 1

def get_num_executed_tasks(self):
return self.num_executed_tasks


async_task_executor = AsyncTaskExecutor.remote(task_store)
async_task_executor.run.remote()
# We are able to run get_num_executed_tasks while run method is running.
num_executed_tasks = ray.get(async_task_executor.get_num_executed_tasks.remote())
print(f"num of executed tasks so far: {num_executed_tasks}")
# __async_actor_end__
34 changes: 34 additions & 0 deletions doc/source/ray-core/patterns/concurrent-operations-async-actor.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
Pattern: Using asyncio to run actor methods concurrently
========================================================

By default, a Ray :ref:`actor <ray-remote-classes>` runs in a single thread and
actor method calls are executed sequentially. This means that a long running method call blocks all the following ones.
In this pattern, we use ``await`` to yield control from the long running method call so other method calls can run concurrently.
Normally the control is yielded when the method is doing IO operations but you can also use ``await asyncio.sleep(0)`` to yield control explicitly.

.. note::
You can also use :ref:`threaded actors <threaded-actors>` to achieve concurrency.

Example use case
----------------

You have an actor with a long polling method that continuously fetches tasks from the remote store and executes them.
You also want to query the number of tasks executed while the long polling method is running.

With the default actor, the code will look like this:

.. literalinclude:: ../doc_code/pattern_async_actor.py
:language: python
:start-after: __sync_actor_start__
:end-before: __sync_actor_end__

This is problematic because ``TaskExecutor.run`` method runs forever and never yield the control to run other methods.
We can solve this problem by using :ref:`async actors <async-actors>` and use ``await`` to yield control:

.. literalinclude:: ../doc_code/pattern_async_actor.py
:language: python
:start-after: __async_actor_start__
:end-before: __async_actor_end__

Here, instead of using the blocking :func:`ray.get() <ray.get>` to get the value of an ObjectRef, we use ``await`` so it can yield the control while we are waiting for the object to be fetched.

1 change: 1 addition & 0 deletions doc/source/ray-core/patterns/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ This section is a collection of common design patterns and anti-patterns for wri
generators
limit-pending-tasks
limit-running-tasks
concurrent-operations-async-actor
actor-sync
tree-of-actors
pipelining
Expand Down

0 comments on commit 3ad40a4

Please sign in to comment.