diff --git a/doc/source/ray-core/actors.rst b/doc/source/ray-core/actors.rst index 08509634958e..6f34e8b42954 100644 --- a/doc/source/ray-core/actors.rst +++ b/doc/source/ray-core/actors.rst @@ -373,4 +373,3 @@ More about Ray Actors actors/actor-utils.rst actors/out-of-band-communication.rst actors/task-orders.rst - actors/patterns/index.rst diff --git a/doc/source/ray-core/actors/patterns/concurrent-operations-async-actor.rst b/doc/source/ray-core/actors/patterns/concurrent-operations-async-actor.rst deleted file mode 100644 index 62fbc2b51ac6..000000000000 --- a/doc/source/ray-core/actors/patterns/concurrent-operations-async-actor.rst +++ /dev/null @@ -1,79 +0,0 @@ -Pattern: Concurrent operations with async actor -=============================================== - -Sometimes, we'd like to have IO operations to other actors/tasks/components (e.g., DB) periodically within an actor (long polling). Imagine a process queue actor that needs to fetch data from other actors or DBs. - -This is problematic because actors are running within a single thread. One of the solutions is to use a background thread within an actor, but you can also achieve this by using Ray's async actors APIs. - -Let's see why it is difficult by looking at an example. - -Code example ------------- - -.. code-block:: python - - @ray.remote - class LongPollingActor: - def __init__(self, data_store_actor): - self.data_store_actor = data_store_actor - - def run(self): - while True: - data = ray.get(self.data_store_actor.fetch.remote()) - self._process(data) - - def other_task(self): - return True - - def _process(self, data): - # Do process here... - pass - -There are 2 issues here. - -1) Since a long polling actor has a run method that runs forever with while True, it cannot run any other actor task (because the thread is occupied by the while loop). That says - -.. code-block:: python - - l = LongPollingActor.remote(data_store_actor) - # Actor runs a while loop - l.run.remote() - # This won't be processed forever because the actor thread is occupied by the run method. - ray.get(l.other_task.remote()) - -2) Since we need to call :ref:`ray.get within a loop `, the loop is blocked until ray.get returns (it is because ``ray.get`` is a blocking API). - -We can make this better if we use Ray's async APIs. Here is a documentation about ray's async APIs and async actors. - -First, let's create an async actor. - -.. code-block:: python - - @ray.remote - class LongPollingActorAsync: - def __init__(self, data_store_actor): - self.data_store_actor = data_store_actor - - async def run(self): - while True: - # Coroutine will switch context when "await" is called. - data = await self.data_store_actor.fetch.remote() - self._process(data) - - def _process(self, data): - pass - - async def other_task(self): - return True - -Now, it will work if you run the same code we used before. - -.. code-block:: python - - l = LongPollingActorAsync.remote(data_store_actor) - l.run.remote() - ray.get(l.other_task.remote()) - -Now, let's learn why this works. When an actor contains async methods, the actor will be converted to async actors. This means all the ray's tasks will run as a coroutine. That says, when it meets the ``await`` keyword, the actor will switch to a different coroutine, which is a coroutine that runs ``other_task`` method. - -You can implement interesting actors using this pattern. Note that it is also possible to switch context easily if you use await ``asyncio.sleep(0)`` without any delay. diff --git a/doc/source/ray-core/actors/patterns/index.rst b/doc/source/ray-core/actors/patterns/index.rst deleted file mode 100644 index 86be341848e3..000000000000 --- a/doc/source/ray-core/actors/patterns/index.rst +++ /dev/null @@ -1,16 +0,0 @@ -.. _actor-patterns: - -Actor Design Patterns -===================== - -This section is a collection of common design patterns (and anti-patterns) for Ray actors. It is meant as a handbook for both: - -- New users trying to understand how to get started with Ray, and -- Advanced users trying to optimize their use of Ray actors - -You may also be interested in visiting the design patterns section for :ref:`tasks `. - -.. toctree:: - :maxdepth: -1 - - concurrent-operations-async-actor diff --git a/doc/source/ray-core/doc_code/pattern_async_actor.py b/doc/source/ray-core/doc_code/pattern_async_actor.py new file mode 100644 index 000000000000..b45198c6049c --- /dev/null +++ b/doc/source/ray-core/doc_code/pattern_async_actor.py @@ -0,0 +1,70 @@ +# __sync_actor_start__ +import ray + + +@ray.remote +class TaskStore: + def get_next_task(self): + return "task" + + +@ray.remote +class TaskExecutor: + def __init__(self, task_store): + self.task_store = task_store + self.num_executed_tasks = 0 + + def run(self): + while True: + task = ray.get(task_store.get_next_task.remote()) + self._execute_task(task) + + def _execute_task(self, task): + # Executing the task + self.num_executed_tasks = self.num_executed_tasks + 1 + + def get_num_executed_tasks(self): + return self.num_executed_tasks + + +task_store = TaskStore.remote() +task_executor = TaskExecutor.remote(task_store) +task_executor.run.remote() +try: + # This will timeout since task_executor.run occupies the entire actor thread + # and get_num_executed_tasks cannot run. + ray.get(task_executor.get_num_executed_tasks.remote(), timeout=5) +except ray.exceptions.GetTimeoutError: + print("get_num_executed_tasks didn't finish in 5 seconds") +# __sync_actor_end__ + + +# __async_actor_start__ +@ray.remote +class AsyncTaskExecutor: + def __init__(self, task_store): + self.task_store = task_store + self.num_executed_tasks = 0 + + async def run(self): + while True: + # Here we use await instead of ray.get() to + # wait for the next task and it will yield + # the control while waiting. + task = await task_store.get_next_task.remote() + self._execute_task(task) + + def _execute_task(self, task): + # Executing the task + self.num_executed_tasks = self.num_executed_tasks + 1 + + def get_num_executed_tasks(self): + return self.num_executed_tasks + + +async_task_executor = AsyncTaskExecutor.remote(task_store) +async_task_executor.run.remote() +# We are able to run get_num_executed_tasks while run method is running. +num_executed_tasks = ray.get(async_task_executor.get_num_executed_tasks.remote()) +print(f"num of executed tasks so far: {num_executed_tasks}") +# __async_actor_end__ diff --git a/doc/source/ray-core/patterns/concurrent-operations-async-actor.rst b/doc/source/ray-core/patterns/concurrent-operations-async-actor.rst new file mode 100644 index 000000000000..0c1162f07301 --- /dev/null +++ b/doc/source/ray-core/patterns/concurrent-operations-async-actor.rst @@ -0,0 +1,34 @@ +Pattern: Using asyncio to run actor methods concurrently +======================================================== + +By default, a Ray :ref:`actor ` runs in a single thread and +actor method calls are executed sequentially. This means that a long running method call blocks all the following ones. +In this pattern, we use ``await`` to yield control from the long running method call so other method calls can run concurrently. +Normally the control is yielded when the method is doing IO operations but you can also use ``await asyncio.sleep(0)`` to yield control explicitly. + +.. note:: + You can also use :ref:`threaded actors ` to achieve concurrency. + +Example use case +---------------- + +You have an actor with a long polling method that continuously fetches tasks from the remote store and executes them. +You also want to query the number of tasks executed while the long polling method is running. + +With the default actor, the code will look like this: + +.. literalinclude:: ../doc_code/pattern_async_actor.py + :language: python + :start-after: __sync_actor_start__ + :end-before: __sync_actor_end__ + +This is problematic because ``TaskExecutor.run`` method runs forever and never yield the control to run other methods. +We can solve this problem by using :ref:`async actors ` and use ``await`` to yield control: + +.. literalinclude:: ../doc_code/pattern_async_actor.py + :language: python + :start-after: __async_actor_start__ + :end-before: __async_actor_end__ + +Here, instead of using the blocking :func:`ray.get() ` to get the value of an ObjectRef, we use ``await`` so it can yield the control while we are waiting for the object to be fetched. + diff --git a/doc/source/ray-core/patterns/index.rst b/doc/source/ray-core/patterns/index.rst index c1b8aeb78d07..c94eb0aa44b9 100644 --- a/doc/source/ray-core/patterns/index.rst +++ b/doc/source/ray-core/patterns/index.rst @@ -12,6 +12,7 @@ This section is a collection of common design patterns and anti-patterns for wri generators limit-pending-tasks limit-running-tasks + concurrent-operations-async-actor actor-sync tree-of-actors pipelining