Skip to content

Commit

Permalink
[core][scalability] Change ray syncer from unary call to streaming ca…
Browse files Browse the repository at this point in the history
…ll (#30460)

It's hard to make resource broadcasting fault tolerant: handling a broadcast failure requires the status to be maintained.

This PR updated the communication protocol to streaming.

Several things changed in the protocol:

Once a message is received, it is pushed immediately. It is still buffered (512 KB), so the cost is small.
If there are no more messages, or the buffer size is exceeded, the buffer is flushed.
The PR has been tested with 2k nodes (2 cpus per node) and 14k actors.
  • Loading branch information
fishbone authored Jan 26, 2023
1 parent 25e1ffa commit 1f3226e
Show file tree
Hide file tree
Showing 18 changed files with 811 additions and 588 deletions.
1 change: 1 addition & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2631,6 +2631,7 @@ cc_test(
copts = COPTS,
tags = ["team:core"],
deps = [
":grpc_common_lib",
":ray_common",
":ray_mock",
"@com_google_googletest//:gtest",
Expand Down
15 changes: 9 additions & 6 deletions release/nightly_tests/many_nodes_tests/actor_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,6 @@ def foo(self):
return actors


def test_actor_ready(actors):
remaining = [actor.foo.remote() for actor in actors]
ray.get(remaining)


def parse_script_args():
parser = argparse.ArgumentParser()
parser.add_argument("--cpus-per-actor", type=float, default=0.2)
Expand All @@ -43,7 +38,15 @@ def main():
sleep(10)
return
actor_ready_start = perf_counter()
test_actor_ready(actors)
total_actors = len(actors)
objs = [actor.foo.remote() for actor in actors]

while len(objs) != 0:
objs_ready, objs = ray.wait(objs, timeout=10)
print(
f"Status: {total_actors - len(objs)}/{total_actors}, "
f"{perf_counter() - actor_ready_start}"
)
actor_ready_end = perf_counter()
actor_ready_time = actor_ready_end - actor_ready_start

Expand Down
20 changes: 17 additions & 3 deletions src/mock/ray/common/ray_syncer/ray_syncer.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,24 @@ class MockReceiverInterface : public ReceiverInterface {
namespace ray {
namespace syncer {

class MockNodeSyncConnection : public NodeSyncConnection {
class MockRaySyncerBidiReactor : public RaySyncerBidiReactor {
public:
using NodeSyncConnection::NodeSyncConnection;
MOCK_METHOD(void, DoSend, (), (override));
using RaySyncerBidiReactor::RaySyncerBidiReactor;

MOCK_METHOD(void, Disconnect, (), (override));

MOCK_METHOD(bool,
PushToSendingQueue,
(std::shared_ptr<const RaySyncMessage>),
(override));
};

/// gMock test double for RaySyncerBidiReactorBase<T>.
///
/// Inherits the constructors of the templated base class and replaces
/// Disconnect() with a mock method so tests can set expectations on it.
template <typename T>
class MockRaySyncerBidiReactorBase : public RaySyncerBidiReactorBase<T> {
public:
// Reuse the base class constructors instead of redeclaring them.
using RaySyncerBidiReactorBase<T>::RaySyncerBidiReactorBase;

// Mocked override of Disconnect(): takes no arguments and returns void.
MOCK_METHOD(void, Disconnect, (), (override));
};

} // namespace syncer
Expand Down
3 changes: 3 additions & 0 deletions src/ray/common/id.cc
Original file line number Diff line number Diff line change
Expand Up @@ -337,4 +337,7 @@ ID_OSTREAM_OPERATOR(ActorID);
ID_OSTREAM_OPERATOR(TaskID);
ID_OSTREAM_OPERATOR(ObjectID);
ID_OSTREAM_OPERATOR(PlacementGroupID);

/// NodeID built from a binary string of kUniqueIDSize zero bytes.
/// NOTE(review): presumably the reserved ID used to represent the GCS — confirm.
const NodeID kGCSNodeID = NodeID::FromBinary(std::string(kUniqueIDSize, 0));

} // namespace ray
4 changes: 4 additions & 0 deletions src/ray/common/id.h
Original file line number Diff line number Diff line change
Expand Up @@ -561,3 +561,7 @@ DEFINE_UNIQUE_ID(PlacementGroupID);

#undef DEFINE_UNIQUE_ID
} // namespace std

namespace ray {
/// Declaration of the constant NodeID `kGCSNodeID`; `extern` means the
/// definition is provided by a source file rather than by this header.
extern const NodeID kGCSNodeID;
}  // namespace ray
4 changes: 4 additions & 0 deletions src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,10 @@ RAY_CONFIG(int64_t, grpc_client_keepalive_time_ms, 300000)
/// grpc keepalive timeout for client.
RAY_CONFIG(int64_t, grpc_client_keepalive_timeout_ms, 120000)

/// Buffer size used for grpc streaming.
/// Defaults to 512 KiB (512 * 1024; presumably counted in bytes).
RAY_CONFIG(int64_t, grpc_stream_buffer_size, 512 * 1024)

/// Whether to use log reporter in event framework
RAY_CONFIG(bool, event_log_reporter_enabled, false)

Expand Down
Loading

0 comments on commit 1f3226e

Please sign in to comment.