Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core][aDag] Support multi node multi reader #47480

Merged
merged 15 commits into from
Sep 10, 2024
57 changes: 31 additions & 26 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -3687,37 +3687,42 @@ cdef class CoreWorker:

def experimental_channel_register_writer(self,
                                         ObjectRef writer_ref,
                                         remote_reader_refs,
                                         remote_reader_nodes,
                                         remote_readers,
                                         remote_num_readers_per_node):
    """Register this worker as the writer of a mutable-object channel.

    Registers ``writer_ref`` as the writer end of an experimental
    (compiled-DAG) channel and wires up every remote reader so that
    writes are mirrored to each reader node.

    Args:
        writer_ref: ObjectRef of the local mutable object being written.
        remote_reader_refs: list[ObjectRef], one reader-side mutable
            object per remote node.
        remote_reader_nodes: list of hex node-ID strings, parallel to
            ``remote_reader_refs``.
        remote_readers: list[ActorID] of the reader actors, parallel to
            the other lists.
        remote_num_readers_per_node: list[int64], number of readers on
            each node; must be non-zero for every entry.

    Raises:
        RayError subclasses via ``check_status`` if core-worker
        registration fails.
    """
    cdef:
        CObjectID c_writer_ref = writer_ref.native()
        # Parallel C++ vectors built from the Python-level lists so the
        # registration calls can run without the GIL.
        c_vector[CObjectID] c_remote_reader_refs
        c_vector[CNodeID] c_remote_reader_nodes
        c_vector[CActorID] c_remote_readers
        c_vector[int64_t] c_remote_num_readers

    for ref, node, reader, num_readers in zip(
            remote_reader_refs,
            remote_reader_nodes,
            remote_readers,
            remote_num_readers_per_node):
        c_remote_reader_refs.push_back((<ObjectRef>ref).native())
        c_remote_reader_nodes.push_back(CNodeID.FromHex(node))
        c_remote_readers.push_back((<ActorID>reader).native())
        # A node with zero readers should never have been included;
        # registering it would be a caller bug, so fail loudly here.
        assert num_readers != 0
        c_remote_num_readers.push_back(num_readers)

    with nogil:
        # Register the writer side first, telling core worker which
        # remote nodes will mirror this object...
        check_status(CCoreWorkerProcess.GetCoreWorker()
                     .ExperimentalRegisterMutableObjectWriter(
                        c_writer_ref,
                        c_remote_reader_nodes,
                     ))
        # ...then register every remote reader (actor + per-node reader
        # count + reader-side object ref) against the same writer ref.
        check_status(
            CCoreWorkerProcess.GetCoreWorker()
            .ExperimentalRegisterMutableObjectReaderRemote(
                c_writer_ref,
                c_remote_readers,
                c_remote_num_readers,
                c_remote_reader_refs
            ))

def experimental_channel_register_reader(self, ObjectRef object_ref):
cdef:
Expand Down
1 change: 0 additions & 1 deletion python/ray/dag/compiled_dag_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -1818,7 +1818,6 @@ def teardown(self, wait: bool):
return

logger.info("Tearing down compiled DAG")

outer._dag_submitter.close()
outer._dag_output_fetcher.close()

Expand Down
54 changes: 0 additions & 54 deletions python/ray/dag/tests/experimental/test_accelerated_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import pytest

from ray.exceptions import RayChannelError, RayChannelTimeoutError
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy
import ray
import ray._private
import ray.cluster_utils
Expand Down Expand Up @@ -1769,59 +1768,6 @@ def test_driver_and_actor_as_readers(ray_start_cluster):
dag.experimental_compile()


def test_payload_large(ray_start_cluster):
    """Execute a compiled DAG across two nodes with a payload larger than
    the gRPC max message size, verifying multi-node channel transport.

    NOTE(review): this test was moved to the multi-node test suite in
    PR #47480; kept here in its cleaned-up form.
    """
    cluster = ray_start_cluster
    # This node is for the driver (including the CompiledDAG.DAGDriverProxyActor).
    first_node_handle = cluster.add_node(num_cpus=1)
    # This node is for the reader.
    second_node_handle = cluster.add_node(num_cpus=1)
    ray.init(address=cluster.address)
    cluster.wait_for_nodes()

    nodes = [first_node_handle.node_id, second_node_handle.node_id]
    # We want to check that there are two nodes. Thus, we convert `nodes` to a set and
    # then back to a list to remove duplicates. Then we check that the length of `nodes`
    # is 2.
    nodes = list(set(nodes))
    assert len(nodes) == 2

    def create_actor(node):
        # Pin the actor to a specific node so the reader is guaranteed to be
        # remote from the driver.
        return Actor.options(
            scheduling_strategy=NodeAffinitySchedulingStrategy(node, soft=False)
        ).remote(0)

    def get_node_id(self):
        return ray.get_runtime_context().get_node_id()

    driver_node = get_node_id(None)
    nodes.remove(driver_node)

    a = create_actor(nodes[0])
    a_node = ray.get(a.__ray_call__.remote(get_node_id))
    assert a_node == nodes[0]
    # Check that the driver and actor are on different nodes.
    assert driver_node != a_node

    with InputNode() as i:
        dag = a.echo.bind(i)

    compiled_dag = dag.experimental_compile()

    # Ray sets the gRPC payload max size to 512 MiB. We choose a size in this test that
    # is a bit larger.
    size = 1024 * 1024 * 600
    val = b"x" * size

    for i in range(3):
        ref = compiled_dag.execute(val)
        result = ray.get(ref)
        assert result == val

    # Note: must teardown before starting a new Ray session, otherwise you'll get
    # a segfault from the dangling monitor thread upon the new Ray init.
    compiled_dag.teardown()


def test_event_profiling(ray_start_regular, monkeypatch):
monkeypatch.setattr(ray.dag.constants, "RAY_ADAG_ENABLE_PROFILING", True)

Expand Down
Loading