[Doc] Revamp ray core design patterns doc [19/n]: async actor #33595

Merged
merged 7 commits into from
Mar 28, 2023
1 change: 0 additions & 1 deletion doc/source/ray-core/actors.rst
@@ -373,4 +373,3 @@ More about Ray Actors
actors/actor-utils.rst
actors/out-of-band-communication.rst
actors/task-orders.rst
actors/patterns/index.rst

This file was deleted.

16 changes: 0 additions & 16 deletions doc/source/ray-core/actors/patterns/index.rst

This file was deleted.

70 changes: 70 additions & 0 deletions doc/source/ray-core/doc_code/pattern_async_actor.py
@@ -0,0 +1,70 @@
# __sync_actor_start__
import ray


@ray.remote
class TaskStore:
    def get_next_task(self):
        return "task"


@ray.remote
class TaskExecutor:
    def __init__(self, task_store):
        self.task_store = task_store
        self.num_executed_tasks = 0

    def run(self):
        while True:
            # Use the handle stored on the actor, not the driver-side global.
            task = ray.get(self.task_store.get_next_task.remote())
            self._execute_task(task)

    def _execute_task(self, task):
        # Executing the task
        self.num_executed_tasks = self.num_executed_tasks + 1

    def get_num_executed_tasks(self):
        return self.num_executed_tasks


task_store = TaskStore.remote()
task_executor = TaskExecutor.remote(task_store)
task_executor.run.remote()
try:
    # This times out because task_executor.run occupies the entire actor thread
    # and get_num_executed_tasks cannot run.
    ray.get(task_executor.get_num_executed_tasks.remote(), timeout=5)
except ray.exceptions.GetTimeoutError:
    print("get_num_executed_tasks didn't finish in 5 seconds")
# __sync_actor_end__


# __async_actor_start__
@ray.remote
class AsyncTaskExecutor:
    def __init__(self, task_store):
        self.task_store = task_store
        self.num_executed_tasks = 0

    async def run(self):
        while True:
            # Here we use await instead of ray.get() to
            # wait for the next task, which yields
            # control while waiting.
            task = await self.task_store.get_next_task.remote()
            self._execute_task(task)

    def _execute_task(self, task):
        # Executing the task
        self.num_executed_tasks = self.num_executed_tasks + 1

    def get_num_executed_tasks(self):
        return self.num_executed_tasks


async_task_executor = AsyncTaskExecutor.remote(task_store)
async_task_executor.run.remote()
# We are able to run get_num_executed_tasks while the run method is running.
num_executed_tasks = ray.get(async_task_executor.get_num_executed_tasks.remote())
print(f"num of executed tasks so far: {num_executed_tasks}")
# __async_actor_end__
@@ -0,0 +1,34 @@
Pattern: Using asyncio to run actor methods concurrently
========================================================

By default, a Ray :ref:`actor <ray-remote-classes>` runs in a single thread and
executes actor method calls sequentially. This means that a long-running method call blocks all the calls queued behind it.
In this pattern, we use ``await`` to yield control from the long-running method call so that other method calls can run concurrently.
Normally, control is yielded when the method performs I/O operations, but you can also use ``await asyncio.sleep(0)`` to yield control explicitly.
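
As a rough illustration of the explicit yield, here is a minimal sketch in plain ``asyncio``, without Ray. The names ``busy_loop`` and ``query`` are hypothetical and only stand in for a long-running actor method and a short one; the sketch assumes nothing about how Ray schedules actor methods internally.

```python
import asyncio


async def busy_loop(log, iterations):
    # A loop with no I/O: without the sleep(0) below, it would run to
    # completion before any other coroutine got a turn.
    for i in range(iterations):
        log.append(f"work {i}")
        # Explicitly hand control back to the event loop.
        await asyncio.sleep(0)


async def query(log):
    # A short call that should not have to wait for busy_loop to finish.
    log.append("query")


async def main():
    log = []
    await asyncio.gather(busy_loop(log, 3), query(log))
    return log


log = asyncio.run(main())
print(log)
```

Because ``busy_loop`` yields on every iteration, ``query`` is recorded before the loop's last step rather than after the loop has finished.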

.. note::
    You can also use :ref:`threaded actors <threaded-actors>` to achieve concurrency.

Example use case
----------------

You have an actor with a long-polling method that continuously fetches tasks from a remote store and executes them.
You also want to query the number of tasks executed so far while the long-polling method is running.

With the default actor, the code will look like this:

.. literalinclude:: ../doc_code/pattern_async_actor.py
    :language: python
    :start-after: __sync_actor_start__
    :end-before: __sync_actor_end__

This is problematic because the ``TaskExecutor.run`` method runs forever and never yields control to let other methods run.
We can solve this problem by using :ref:`async actors <async-actors>` and ``await`` to yield control:

.. literalinclude:: ../doc_code/pattern_async_actor.py
    :language: python
    :start-after: __async_actor_start__
    :end-before: __async_actor_end__

Here, instead of using the blocking :func:`ray.get() <ray.get>` to get the value of an ObjectRef, we use ``await`` so that the method yields control while it waits for the object to be fetched.
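
The same idea can be sketched in plain ``asyncio``, without Ray, to show why awaiting lets a status query run alongside a polling loop that never returns. This is only an analogy under stated assumptions: ``Executor`` and ``fetch_next_task`` are hypothetical names, and the short sleep stands in for waiting on a remote call.

```python
import asyncio


class Executor:
    def __init__(self):
        self.num_executed_tasks = 0

    async def fetch_next_task(self):
        # Hypothetical stand-in for awaiting a remote call.
        await asyncio.sleep(0.01)
        return "task"

    async def run(self):
        while True:
            # Awaiting yields control to the event loop while waiting.
            _task = await self.fetch_next_task()
            self.num_executed_tasks += 1

    async def get_num_executed_tasks(self):
        return self.num_executed_tasks


async def main():
    executor = Executor()
    # Start the endless polling loop in the background.
    runner = asyncio.create_task(executor.run())
    await asyncio.sleep(0.1)
    # The query completes even though run() never returns.
    count = await asyncio.wait_for(executor.get_num_executed_tasks(), timeout=1)
    # Stop the polling loop so the program can exit.
    runner.cancel()
    try:
        await runner
    except asyncio.CancelledError:
        pass
    return count


count = asyncio.run(main())
print(f"num of executed tasks so far: {count}")
```

If ``run`` used a blocking wait instead of ``await``, the query would never get a turn on the single thread, which is the failure shown by the default actor earlier.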

1 change: 1 addition & 0 deletions doc/source/ray-core/patterns/index.rst
@@ -12,6 +12,7 @@ This section is a collection of common design patterns and anti-patterns for wri
generators
limit-pending-tasks
limit-running-tasks
concurrent-operations-async-actor
actor-sync
tree-of-actors
pipelining