Skip to content

Commit

Permalink
[core][experimental] AccDag: Support more than one tasks per actor (#…
Browse files Browse the repository at this point in the history
…44259)

Accelerated Dag currently does not support multiple methods within the same actor. This PR adds the support.

In this PR, the methods from the same actor are first grouped together, and then executed in the same execution loop based on their submission (binding) order.
Related issue number

Closes #44194

---------

Signed-off-by: Rui Qiao <[email protected]>
  • Loading branch information
ruisearch42 authored May 8, 2024
1 parent f1d8862 commit f2b0d91
Show file tree
Hide file tree
Showing 6 changed files with 345 additions and 140 deletions.
8 changes: 8 additions & 0 deletions python/ray/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ def _bind(
_generator_backpressure_num_objects=None,
):
from ray.dag.class_node import (
BIND_INDEX_KEY,
PARENT_CLASS_NODE_KEY,
PREV_CLASS_METHOD_CALL_KEY,
ClassMethodNode,
Expand All @@ -259,7 +260,9 @@ def _bind(
other_args_to_resolve = {
PARENT_CLASS_NODE_KEY: actor,
PREV_CLASS_METHOD_CALL_KEY: None,
BIND_INDEX_KEY: actor._ray_dag_bind_index,
}
actor._ray_dag_bind_index += 1

node = ClassMethodNode(
self._method_name,
Expand Down Expand Up @@ -1306,6 +1309,11 @@ def __init__(
actor_creation_function_descriptor
)
self._ray_function_descriptor = {}
# This is incremented each time `bind()` is called on an actor handle
# (in Ray DAGs), therefore capturing the bind order of the actor methods.
# TODO: this does not work properly if the caller has two copies of the
# same actor handle, and needs to be fixed.
self._ray_dag_bind_index = 0

if not self._ray_is_cross_language:
assert isinstance(
Expand Down
13 changes: 12 additions & 1 deletion python/ray/dag/class_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
from ray.dag.dag_node import DAGNode
from ray.dag.input_node import InputNode
from ray.dag.format_utils import get_dag_node_str
from ray.dag.constants import PARENT_CLASS_NODE_KEY, PREV_CLASS_METHOD_CALL_KEY
from ray.dag.constants import (
PARENT_CLASS_NODE_KEY,
PREV_CLASS_METHOD_CALL_KEY,
BIND_INDEX_KEY,
)
from ray.util.annotations import DeveloperAPI

from typing import Any, Dict, List, Union, Tuple, Optional
Expand Down Expand Up @@ -156,6 +160,10 @@ def __init__(
self._prev_class_method_call: Optional[
ClassMethodNode
] = other_args_to_resolve.get(PREV_CLASS_METHOD_CALL_KEY, None)
# The index/order when bind() is called on this class method
self._bind_index: Optional[int] = other_args_to_resolve.get(
BIND_INDEX_KEY, None
)

# The actor creation task dependency is encoded as the first argument,
# and the ordering dependency as the second, which ensures they are
Expand Down Expand Up @@ -203,6 +211,9 @@ def __str__(self) -> str:
def get_method_name(self) -> str:
return self._method_name

def _get_bind_index(self) -> int:
return self._bind_index

def _get_remote_method(self, method_name):
method_body = getattr(self._parent_class_node, method_name)
return method_body
Expand Down
Loading

0 comments on commit f2b0d91

Please sign in to comment.