Skip to content

Commit

Permalink
[Core][aDag] Support multi node multi reader (ray-project#47480)
Browse files Browse the repository at this point in the history
This PR supports multiple readers across multiple nodes. It also adds tests verifying that the feature works with large gRPC payloads and buffer resizing.

Multiple readers across multiple nodes previously did not work because the code allowed registering only one remote reader reference on one specific node. This fixes the issue by allowing remote reader references to be registered on multiple nodes.
  • Loading branch information
rkooo567 authored Sep 10, 2024
1 parent aa7179a commit 57136b5
Show file tree
Hide file tree
Showing 19 changed files with 491 additions and 325 deletions.
53 changes: 27 additions & 26 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ from ray.includes.libcoreworker cimport (
CFiberEvent,
CActorHandle,
CGeneratorBackpressureWaiter,
CReaderRefInfo,
)

from ray.includes.ray_config cimport RayConfig
Expand Down Expand Up @@ -3645,37 +3646,37 @@ cdef class CoreWorker:

def experimental_channel_register_writer(self,
ObjectRef writer_ref,
ObjectRef reader_ref,
writer_node,
reader_node,
ActorID reader,
int64_t num_readers):
remote_reader_ref_info):
cdef:
CObjectID c_writer_ref = writer_ref.native()
CObjectID c_reader_ref = reader_ref.native()
CNodeID c_reader_node = CNodeID.FromHex(reader_node)
CNodeID *c_reader_node_id = NULL
CActorID c_reader_actor = reader.native()

if num_readers == 0:
return
if writer_node != reader_node:
c_reader_node_id = &c_reader_node
c_vector[CNodeID] c_remote_reader_nodes
c_vector[CReaderRefInfo] c_remote_reader_ref_info
CReaderRefInfo c_reader_ref_info

for node_id, reader_ref_info in remote_reader_ref_info.items():
c_reader_ref_info = CReaderRefInfo()
c_reader_ref_info.reader_ref_id = (
<ObjectRef>reader_ref_info.reader_ref).native()
c_reader_ref_info.owner_reader_actor_id = (
<ActorID>reader_ref_info.ref_owner_actor_id).native()
num_reader_actors = reader_ref_info.num_reader_actors
assert num_reader_actors != 0
c_reader_ref_info.num_reader_actors = num_reader_actors
c_remote_reader_ref_info.push_back(c_reader_ref_info)
c_remote_reader_nodes.push_back(CNodeID.FromHex(node_id))

with nogil:
check_status(CCoreWorkerProcess.GetCoreWorker()
.ExperimentalRegisterMutableObjectWriter(c_writer_ref,
c_reader_node_id,
))
if writer_node != reader_node:
with nogil:
check_status(
CCoreWorkerProcess.GetCoreWorker()
.ExperimentalRegisterMutableObjectReaderRemote(c_writer_ref,
c_reader_actor,
num_readers,
c_reader_ref
))
.ExperimentalRegisterMutableObjectWriter(
c_writer_ref,
c_remote_reader_nodes,
))
check_status(
CCoreWorkerProcess.GetCoreWorker()
.ExperimentalRegisterMutableObjectReaderRemote(
c_writer_ref,
c_remote_reader_ref_info,
))

def experimental_channel_register_reader(self, ObjectRef object_ref):
cdef:
Expand Down
2 changes: 0 additions & 2 deletions python/ray/dag/compiled_dag_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ def do_exec_tasks(
if done:
break
for operation in schedule:
print("SANG-TODO operation: ", operation)
done = tasks[operation.exec_task_idx].exec_operation(
self, operation.type
)
Expand Down Expand Up @@ -1835,7 +1834,6 @@ def teardown(self, wait: bool):
return

logger.info("Tearing down compiled DAG")

outer._dag_submitter.close()
outer._dag_output_fetcher.close()

Expand Down
170 changes: 100 additions & 70 deletions python/ray/dag/tests/experimental/test_multi_node_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
import time
import pytest
from ray.dag import InputNode, MultiOutputNode
import ray.remote_function
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy
from ray.tests.conftest import * # noqa
from ray.tests.conftest import wait_for_condition

if sys.platform != "linux" and sys.platform != "darwin":
pytest.skip("Skipping, requires Linux or Mac.", allow_module_level=True)
Expand Down Expand Up @@ -69,100 +71,79 @@ def test_readers_on_different_nodes(ray_start_cluster):
cluster = ray_start_cluster
# This node is for the driver (including the CompiledDAG.DAGDriverProxyActor) and
# one of the readers.
first_node_handle = cluster.add_node(num_cpus=2)
# This node is for the other reader.
second_node_handle = cluster.add_node(num_cpus=1)
cluster.add_node(num_cpus=1)
ray.init(address=cluster.address)
# 2 more nodes for other readers.
cluster.add_node(num_cpus=1)
cluster.add_node(num_cpus=1)
cluster.wait_for_nodes()
# Wait until nodes actually start, otherwise the code below will fail.
wait_for_condition(lambda: len(ray.nodes()) == 3)

nodes = [first_node_handle.node_id, second_node_handle.node_id]
# We want to check that the readers are on different nodes. Thus, we convert `nodes`
# to a set and then back to a list to remove duplicates. Then we check that the
# length of `nodes` is 2.
nodes = list(set(nodes))
assert len(nodes) == 2

def create_actor(node):
return Actor.options(
scheduling_strategy=NodeAffinitySchedulingStrategy(node, soft=False)
).remote(0)

a = create_actor(nodes[0])
b = create_actor(nodes[1])
actors = [a, b]
a = Actor.options(num_cpus=1).remote(0)
b = Actor.options(num_cpus=1).remote(0)
c = Actor.options(num_cpus=1).remote(0)
actors = [a, b, c]

def _get_node_id(self) -> "ray.NodeID":
return ray.get_runtime_context().get_node_id()

nodes_check = ray.get([act.__ray_call__.remote(_get_node_id) for act in actors])
a_node = nodes_check[0]
b_node = nodes_check[1]
assert a_node != b_node
node_ids = ray.get([act.__ray_call__.remote(_get_node_id) for act in actors])
assert len(set(node_ids)) == 3

with InputNode() as inp:
x = a.inc.bind(inp)
y = b.inc.bind(inp)
dag = MultiOutputNode([x, y])
z = c.inc.bind(inp)
dag = MultiOutputNode([x, y, z])

with pytest.raises(
ValueError,
match="All reader actors must be on the same node.*",
):
dag.experimental_compile()
adag = dag.experimental_compile()

for i in range(1, 10):
assert ray.get(adag.execute(1)) == [i, i, i]

adag.teardown()


def test_bunch_readers_on_different_nodes(ray_start_cluster):
    """Multiple readers per node, spread across multiple remote nodes."""
    cluster = ray_start_cluster
    ACTORS_PER_NODE = 2
    NUM_REMOTE_NODES = 2
    # Driver node.
    cluster.add_node(num_cpus=ACTORS_PER_NODE)
    ray.init(address=cluster.address)
    # Additional nodes for multi readers in multi nodes.
    for _ in range(NUM_REMOTE_NODES):
        cluster.add_node(num_cpus=ACTORS_PER_NODE)
    cluster.wait_for_nodes()

    # Wait until nodes actually start, otherwise the code below will fail.
    wait_for_condition(lambda: len(ray.nodes()) == NUM_REMOTE_NODES + 1)

    actors = [
        Actor.options(num_cpus=1).remote(0)
        for _ in range(ACTORS_PER_NODE * (NUM_REMOTE_NODES + 1))
    ]

    def _get_node_id(self) -> "ray.NodeID":
        return ray.get_runtime_context().get_node_id()

    # Sanity check: the actors cover every node in the cluster.
    node_ids = ray.get([act.__ray_call__.remote(_get_node_id) for act in actors])
    assert len(set(node_ids)) == NUM_REMOTE_NODES + 1

    with InputNode() as inp:
        outputs = []
        for actor in actors:
            outputs.append(actor.inc.bind(inp))
        dag = MultiOutputNode(outputs)

    adag = dag.experimental_compile()

    for i in range(1, 10):
        assert ray.get(adag.execute(1)) == [
            i for _ in range(ACTORS_PER_NODE * (NUM_REMOTE_NODES + 1))
        ]

    adag.teardown()


def test_pp(ray_start_cluster):
Expand Down Expand Up @@ -247,13 +228,10 @@ def get_node_id(self):

compiled_dag = dag.experimental_compile()

# Ray sets the gRPC payload max size to 512 MiB. We choose a size in this test that
# is a bit larger.
size = GRPC_MAX_SIZE + (1024 * 1024 * 2)
val = b"x" * size

for i in range(3):
print(f"{i} iteration")
ref = compiled_dag.execute(val)
result = ray.get(ref)
assert result == val
Expand All @@ -263,6 +241,58 @@ def get_node_id(self):
compiled_dag.teardown()


@pytest.mark.parametrize("num_actors", [1, 4])
@pytest.mark.parametrize("num_nodes", [1, 4])
def test_multi_node_multi_reader_large_payload(
    ray_start_cluster, num_actors, num_nodes, monkeypatch
):
    """Multi-node multi-reader DAG with payloads larger than the gRPC max
    message size, to exercise chunking and buffer resizing."""
    # Set max grpc size to 5mb.
    GRPC_MAX_SIZE = 1024 * 1024 * 5
    monkeypatch.setenv("RAY_max_grpc_message_size", str(GRPC_MAX_SIZE))
    cluster = ray_start_cluster
    ACTORS_PER_NODE = num_actors
    NUM_REMOTE_NODES = num_nodes
    # Driver node.
    cluster.add_node(num_cpus=ACTORS_PER_NODE)
    ray.init(address=cluster.address)
    # Additional nodes that host the remote readers.
    for _ in range(NUM_REMOTE_NODES):
        cluster.add_node(num_cpus=ACTORS_PER_NODE)
    cluster.wait_for_nodes()

    # Wait until nodes actually start, otherwise actor placement below may fail.
    wait_for_condition(lambda: len(ray.nodes()) == NUM_REMOTE_NODES + 1)

    actors = [
        Actor.options(num_cpus=1).remote(0)
        for _ in range(ACTORS_PER_NODE * (NUM_REMOTE_NODES + 1))
    ]

    def _get_node_id(self) -> "ray.NodeID":
        return ray.get_runtime_context().get_node_id()

    # Sanity check: the actors cover every node in the cluster.
    node_ids = ray.get([act.__ray_call__.remote(_get_node_id) for act in actors])
    assert len(set(node_ids)) == NUM_REMOTE_NODES + 1

    with InputNode() as inp:
        outputs = []
        for actor in actors:
            outputs.append(actor.echo.bind(inp))
        dag = MultiOutputNode(outputs)

    compiled_dag = dag.experimental_compile()

    # Set the object size a little bigger than the gRPC size so that
    # it triggers chunking and resizing.
    size = GRPC_MAX_SIZE + (1024 * 1024 * 2)
    val = b"x" * size

    for _ in range(3):
        ref = compiled_dag.execute(val)
        result = ray.get(ref)
        assert result == [val for _ in range(ACTORS_PER_NODE * (NUM_REMOTE_NODES + 1))]

    compiled_dag.teardown()


if __name__ == "__main__":
if os.environ.get("PARALLEL_CI"):
sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__]))
Expand Down
Loading

0 comments on commit 57136b5

Please sign in to comment.