Skip to content

Commit

Permalink
fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
fishbone committed May 9, 2022
1 parent 263fcec commit d18834a
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 36 deletions.
2 changes: 1 addition & 1 deletion src/ray/common/ray_syncer/ray_syncer.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ class RaySyncer {
/// it'll have a global view of the cluster.
///
///
/// \param message_type The component to sync.
/// \param message_type The message type of the component.
/// \param reporter The local component to be broadcasted.
/// \param receiver The consumer of the sync message sent by the other nodes in the
/// cluster.
Expand Down
45 changes: 22 additions & 23 deletions src/ray/common/test/ray_syncer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,30 +114,29 @@ class RaySyncerTest : public ::testing::Test {

TEST_F(RaySyncerTest, NodeStateCreateSyncMessage) {
auto node_status = std::make_unique<NodeState>();
node_status->SetComponent(MessageType::RESOURCE_MANAGER, nullptr, nullptr);
ASSERT_EQ(std::nullopt, node_status->CreateSyncMessage(MessageType::RESOURCE_MANAGER));
node_status->SetComponent(MessageType::RESOURCE_VIEW, nullptr, nullptr);
ASSERT_EQ(std::nullopt, node_status->CreateSyncMessage(MessageType::RESOURCE_VIEW));

auto reporter = std::make_unique<MockReporterInterface>();
ASSERT_TRUE(node_status->SetComponent(MessageType::RESOURCE_MANAGER,
GetReporter(MessageType::RESOURCE_MANAGER),
nullptr));
ASSERT_TRUE(node_status->SetComponent(
MessageType::RESOURCE_VIEW, GetReporter(MessageType::RESOURCE_VIEW), nullptr));

// Take a snapshot
auto msg = node_status->CreateSyncMessage(MessageType::RESOURCE_MANAGER);
ASSERT_EQ(LocalVersion(MessageType::RESOURCE_MANAGER), msg->version());
auto msg = node_status->CreateSyncMessage(MessageType::RESOURCE_VIEW);
ASSERT_EQ(LocalVersion(MessageType::RESOURCE_VIEW), msg->version());
// Revert one version back.
LocalVersion(MessageType::RESOURCE_MANAGER) -= 1;
msg = node_status->CreateSyncMessage(MessageType::RESOURCE_MANAGER);
LocalVersion(MessageType::RESOURCE_VIEW) -= 1;
msg = node_status->CreateSyncMessage(MessageType::RESOURCE_VIEW);
ASSERT_EQ(std::nullopt, msg);
}

TEST_F(RaySyncerTest, NodeStateConsume) {
auto node_status = std::make_unique<NodeState>();
node_status->SetComponent(
MessageType::RESOURCE_MANAGER, nullptr, GetReceiver(MessageType::RESOURCE_MANAGER));
MessageType::RESOURCE_VIEW, nullptr, GetReceiver(MessageType::RESOURCE_VIEW));
auto from_node_id = NodeID::FromRandom();
// The first time receiver the message
auto msg = MakeMessage(MessageType::RESOURCE_MANAGER, 0, from_node_id);
auto msg = MakeMessage(MessageType::RESOURCE_VIEW, 0, from_node_id);
ASSERT_TRUE(node_status->ConsumeSyncMessage(std::make_shared<RaySyncMessage>(msg)));
ASSERT_FALSE(node_status->ConsumeSyncMessage(std::make_shared<RaySyncMessage>(msg)));

Expand All @@ -154,17 +153,17 @@ TEST_F(RaySyncerTest, NodeSyncConnection) {
node_id.Binary(),
[](std::shared_ptr<ray::rpc::syncer::RaySyncMessage>) {});
auto from_node_id = NodeID::FromRandom();
auto msg = MakeMessage(MessageType::RESOURCE_MANAGER, 0, from_node_id);
auto msg = MakeMessage(MessageType::RESOURCE_VIEW, 0, from_node_id);

// First push will succeed and the second one will be deduplicated.
ASSERT_TRUE(sync_connection.PushToSendingQueue(std::make_shared<RaySyncMessage>(msg)));
ASSERT_FALSE(sync_connection.PushToSendingQueue(std::make_shared<RaySyncMessage>(msg)));
ASSERT_EQ(1, sync_connection.sending_buffer_.size());
ASSERT_EQ(0, sync_connection.sending_buffer_.begin()->second->version());
ASSERT_EQ(1, sync_connection.node_versions_.size());
ASSERT_EQ(0,
sync_connection
.node_versions_[from_node_id.Binary()][MessageType::RESOURCE_MANAGER]);
ASSERT_EQ(
0,
sync_connection.node_versions_[from_node_id.Binary()][MessageType::RESOURCE_VIEW]);

msg.set_version(2);
ASSERT_TRUE(sync_connection.PushToSendingQueue(std::make_shared<RaySyncMessage>(msg)));
Expand All @@ -173,9 +172,9 @@ TEST_F(RaySyncerTest, NodeSyncConnection) {
ASSERT_EQ(1, sync_connection.sending_buffer_.size());
ASSERT_EQ(1, sync_connection.node_versions_.size());
ASSERT_EQ(2, sync_connection.sending_buffer_.begin()->second->version());
ASSERT_EQ(2,
sync_connection
.node_versions_[from_node_id.Binary()][MessageType::RESOURCE_MANAGER]);
ASSERT_EQ(
2,
sync_connection.node_versions_[from_node_id.Binary()][MessageType::RESOURCE_VIEW]);
}

struct SyncerServerTest {
Expand Down Expand Up @@ -375,10 +374,10 @@ TEST(SyncerTest, Test1To1) {
auto s2 = SyncerServerTest("19991");

// Make sure the setup is correct
ASSERT_NE(nullptr, s1.receivers[MessageType::RESOURCE_MANAGER]);
ASSERT_NE(nullptr, s2.receivers[MessageType::RESOURCE_MANAGER]);
ASSERT_NE(nullptr, s1.reporters[MessageType::RESOURCE_MANAGER]);
ASSERT_NE(nullptr, s2.reporters[MessageType::RESOURCE_MANAGER]);
ASSERT_NE(nullptr, s1.receivers[MessageType::RESOURCE_VIEW]);
ASSERT_NE(nullptr, s2.receivers[MessageType::RESOURCE_VIEW]);
ASSERT_NE(nullptr, s1.reporters[MessageType::RESOURCE_VIEW]);
ASSERT_NE(nullptr, s2.reporters[MessageType::RESOURCE_VIEW]);

auto channel_to_s2 = MakeChannel("19991");

Expand Down Expand Up @@ -419,7 +418,7 @@ TEST(SyncerTest, Test1To1) {
// Make sure s2 send the new message to s1.
ASSERT_TRUE(s1.WaitUntil(
[&s1, node_id = s2.syncer->GetLocalNodeID()]() {
return s1.GetReceivedVersions(node_id)[MessageType::RESOURCE_MANAGER] == 1 &&
return s1.GetReceivedVersions(node_id)[MessageType::RESOURCE_VIEW] == 1 &&
s1.GetNumConsumedMessages(node_id) == 2;
},
5));
Expand Down
7 changes: 3 additions & 4 deletions src/ray/common/test/syncer_service_e2e_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class LocalNode : public ReporterInterface {
return std::nullopt;
}
ray::rpc::syncer::RaySyncMessage msg;
msg.set_message_type(ray::rpc::syncer::MessageType::RESOURCE_MANAGER);
msg.set_message_type(ray::rpc::syncer::MessageType::RESOURCE_VIEW);
msg.set_version(version_);
msg.set_sync_message(
std::string(reinterpret_cast<const char *>(&state_), sizeof(state_)));
Expand Down Expand Up @@ -103,9 +103,8 @@ int main(int argc, char *argv[]) {
std::unique_ptr<RaySyncerService> service;
std::unique_ptr<grpc::Server> server;
std::shared_ptr<grpc::Channel> channel;
syncer.Register(ray::rpc::syncer::MessageType::RESOURCE_MANAGER,
local_node.get(),
remote_node.get());
syncer.Register(
ray::rpc::syncer::MessageType::RESOURCE_VIEW, local_node.get(), remote_node.get());
if (server_port != ".") {
RAY_LOG(INFO) << "Start server on port " << server_port;
auto server_address = "0.0.0.0:" + server_port;
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_server/gcs_resource_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ void GcsResourceManager::ConsumeSyncMessage(
rpc::ResourcesData resources;
resources.ParseFromString(message->sync_message());
resources.set_node_id(message->node_id());
RAY_CHECK(message->message_type() == syncer::MessageType::RESOURCE_MANAGER);
RAY_CHECK(message->message_type() == syncer::MessageType::RESOURCE_VIEW);
UpdateFromResourceReport(resources);
},
"GcsResourceManager::Update");
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_server/gcs_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ void GcsServer::InitRaySyncer(const GcsInitData &gcs_init_data) {
ray_syncer_ = std::make_unique<syncer::RaySyncer>(ray_syncer_io_context_,
gcs_node_id_.Binary());
ray_syncer_->Register(
syncer::MessageType::RESOURCE_MANAGER, nullptr, gcs_resource_manager_.get());
syncer::MessageType::RESOURCE_VIEW, nullptr, gcs_resource_manager_.get());
ray_syncer_thread_ = std::make_unique<std::thread>([this]() {
boost::asio::io_service::work work(ray_syncer_io_context_);
ray_syncer_io_context_.run();
Expand Down
2 changes: 1 addition & 1 deletion src/ray/protobuf/ray_syncer.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ option cc_enable_arenas = true;
package ray.rpc.syncer;

enum MessageType {
RESOURCE_MANAGER = 0;
RESOURCE_VIEW = 0;
COMMANDS = 1;
}

Expand Down
8 changes: 4 additions & 4 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ ray::Status NodeManager::RegisterGcs() {
if (RayConfig::instance().use_ray_syncer()) {
// Register resource manager and scheduler
ray_syncer_.Register(
/* message_type */ syncer::MessageType::RESOURCE_MANAGER,
/* message_type */ syncer::MessageType::RESOURCE_VIEW,
/* reporter */ &cluster_resource_scheduler_->GetLocalResourceManager(),
/* receiver */ this,
/* pull_from_reporter_interval_ms */
Expand Down Expand Up @@ -1814,7 +1814,7 @@ void NodeManager::HandleCommitBundleResources(
placement_group_resource_manager_->CommitBundles(bundle_specs);
if (RayConfig::instance().use_ray_syncer()) {
// To reduce the lag, we trigger a broadcasting immediately.
RAY_CHECK(ray_syncer_.OnDemandBroadcasting(syncer::MessageType::RESOURCE_MANAGER));
RAY_CHECK(ray_syncer_.OnDemandBroadcasting(syncer::MessageType::RESOURCE_VIEW));
}
send_reply_callback(Status::OK(), nullptr, nullptr);

Expand Down Expand Up @@ -1855,7 +1855,7 @@ void NodeManager::HandleCancelResourceReserve(
placement_group_resource_manager_->ReturnBundle(bundle_spec);
if (RayConfig::instance().use_ray_syncer()) {
// To reduce the lag, we trigger a broadcasting immediately.
RAY_CHECK(ray_syncer_.OnDemandBroadcasting(syncer::MessageType::RESOURCE_MANAGER));
RAY_CHECK(ray_syncer_.OnDemandBroadcasting(syncer::MessageType::RESOURCE_VIEW));
}
cluster_task_manager_->ScheduleAndDispatchTasks();
send_reply_callback(Status::OK(), nullptr, nullptr);
Expand Down Expand Up @@ -2694,7 +2694,7 @@ void NodeManager::RecordMetrics() {

void NodeManager::ConsumeSyncMessage(
std::shared_ptr<const syncer::RaySyncMessage> message) {
if (message->message_type() == syncer::MessageType::RESOURCE_MANAGER) {
if (message->message_type() == syncer::MessageType::RESOURCE_VIEW) {
rpc::ResourcesData data;
data.ParseFromString(message->sync_message());
NodeID node_id = NodeID::FromBinary(data.node_id());
Expand Down
2 changes: 1 addition & 1 deletion src/ray/raylet/scheduling/local_resource_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ double LocalResourceManager::GetLocalAvailableCpus() const {

std::optional<syncer::RaySyncMessage> LocalResourceManager::CreateSyncMessage(
int64_t after_version, syncer::MessageType message_type) const {
RAY_CHECK(message_type == syncer::MessageType::RESOURCE_MANAGER);
RAY_CHECK(message_type == syncer::MessageType::RESOURCE_VIEW);
// We check the memory inside version, so version is not a const function.
// Ideally, we need to move the memory check somewhere else.
// TODO(iycheng): Make version as a const function.
Expand Down

0 comments on commit d18834a

Please sign in to comment.