From 19e21b0cc79eba708bc0b8098403fcaca30c6c45 Mon Sep 17 00:00:00 2001 From: Nikita Vemuri Date: Mon, 9 Sep 2024 10:13:01 -0700 Subject: [PATCH] [observability][export-api] Write actor events (#47529) - Add back code changes from [observability][export-api] Write actor events #47303 - Separate out actor manager export event test into a separate file so we can skip on windows. Update BUILD rule so all tests in src/ray/gcs/gcs_server/test/export_api are skipped on windows Signed-off-by: Nikita Vemuri Co-authored-by: Nikita Vemuri Signed-off-by: ujjawal-khare --- BUILD.bazel | 19 --- src/ray/gcs/gcs_server/gcs_actor_manager.h | 25 ++++ .../gcs_node_manager_export_event_test.cc | 137 ------------------ 3 files changed, 25 insertions(+), 156 deletions(-) delete mode 100644 src/ray/gcs/gcs_server/test/gcs_node_manager_export_event_test.cc diff --git a/BUILD.bazel b/BUILD.bazel index edbede081333..83660e6aa8ca 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -1654,25 +1654,6 @@ ray_cc_test( ], ) -ray_cc_test( - name = "gcs_node_manager_export_event_test", - size = "small", - srcs = [ - "src/ray/gcs/gcs_server/test/gcs_node_manager_export_event_test.cc", - ], - tags = [ - "no_windows", - "team:core" - ], - deps = [ - ":gcs_server_lib", - ":gcs_server_test_util", - ":gcs_test_util_lib", - ":ray_mock", - "@com_google_googletest//:gtest_main", - ], -) - ray_cc_test( name = "gcs_job_manager_test", size = "small", diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.h b/src/ray/gcs/gcs_server/gcs_actor_manager.h index 2849b64e3578..c05ba9ebd0dc 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.h +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.h @@ -30,6 +30,7 @@ #include "ray/rpc/gcs_server/gcs_rpc_server.h" #include "ray/rpc/worker/core_worker_client.h" #include "ray/util/counter_map.h" +#include "ray/util/event.h" #include "src/ray/protobuf/gcs_service.pb.h" namespace ray { @@ -188,6 +189,9 @@ class GcsActor { /// Get the mutable ActorTableData of this actor. rpc::ActorTableData *GetMutableActorTableData(); rpc::TaskSpec *GetMutableTaskSpec(); + /// Write an event containing this actor's ActorTableData + /// to file for the Export API. + void WriteActorExportEvent() const; const ResourceRequest &GetAcquiredResources() const; void SetAcquiredResources(ResourceRequest &&resource_request); @@ -214,6 +218,27 @@ class GcsActor { last_metric_state_ = cur_state; } + rpc::ExportActorData::ActorState ConvertActorStateToExport( + rpc::ActorTableData::ActorState actor_state) const { + switch (actor_state) { + case rpc::ActorTableData::DEPENDENCIES_UNREADY: + return rpc::ExportActorData::DEPENDENCIES_UNREADY; + case rpc::ActorTableData::PENDING_CREATION: + return rpc::ExportActorData::PENDING_CREATION; + case rpc::ActorTableData::ALIVE: + return rpc::ExportActorData::ALIVE; + case rpc::ActorTableData::RESTARTING: + return rpc::ExportActorData::RESTARTING; + case rpc::ActorTableData::DEAD: + return rpc::ExportActorData::DEAD; + default: + // Unknown rpc::ActorTableData::ActorState value + RAY_LOG(FATAL) << "Invalid value for rpc::ActorTableData::ActorState" + << rpc::ActorTableData::ActorState_Name(actor_state); + return rpc::ExportActorData::DEAD; + } + } + /// The actor meta data which contains the task specification as well as the state of /// the gcs actor and so on (see gcs.proto). rpc::ActorTableData actor_table_data_; diff --git a/src/ray/gcs/gcs_server/test/gcs_node_manager_export_event_test.cc b/src/ray/gcs/gcs_server/test/gcs_node_manager_export_event_test.cc deleted file mode 100644 index d43d6825321d..000000000000 --- a/src/ray/gcs/gcs_server/test/gcs_node_manager_export_event_test.cc +++ /dev/null @@ -1,137 +0,0 @@ -// Copyright 2017 The Ray Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include -#include - -// clang-format off -#include "gtest/gtest.h" -#include "ray/gcs/gcs_server/test/gcs_server_test_util.h" -#include "ray/gcs/test/gcs_test_util.h" -#include "ray/rpc/node_manager/node_manager_client.h" -#include "ray/rpc/node_manager/node_manager_client_pool.h" -#include "mock/ray/pubsub/publisher.h" -#include "ray/util/event.h" -// clang-format on - -namespace ray { - -std::string GenerateLogDir() { - std::string log_dir_generate = std::string(5, ' '); - FillRandom(&log_dir_generate); - std::string log_dir = "event" + StringToHex(log_dir_generate); - return log_dir; -} - -class GcsNodeManagerExportAPITest : public ::testing::Test { - public: - GcsNodeManagerExportAPITest() { - raylet_client_ = std::make_shared(); - client_pool_ = std::make_shared( - [this](const rpc::Address &) { return raylet_client_; }); - gcs_publisher_ = std::make_shared( - std::make_unique()); - gcs_table_storage_ = std::make_shared(io_service_); - - RayConfig::instance().initialize( - R"( -{ - "enable_export_api_write": true -} - )"); - log_dir_ = GenerateLogDir(); - const std::vector source_types = { - rpc::ExportEvent_SourceType::ExportEvent_SourceType_EXPORT_NODE}; - RayEventInit_(source_types, - absl::flat_hash_map(), - log_dir_, - "warning", - false); - } - - virtual ~GcsNodeManagerExportAPITest() { - io_service_.stop(); - EventManager::Instance().ClearReporters(); - std::filesystem::remove_all(log_dir_.c_str()); - } - - protected: - std::shared_ptr gcs_table_storage_; - std::shared_ptr raylet_client_; - std::shared_ptr client_pool_; - std::shared_ptr gcs_publisher_; - instrumented_io_context io_service_; - std::string log_dir_; -}; - -TEST_F(GcsNodeManagerExportAPITest, TestExportEventRegisterNode) { - // Test export event is written when a node is added with HandleRegisterNode - gcs::GcsNodeManager node_manager( - gcs_publisher_, gcs_table_storage_, client_pool_, ClusterID::Nil()); - auto node = Mocker::GenNodeInfo(); - - rpc::RegisterNodeRequest register_request; - register_request.mutable_node_info()->CopyFrom(*node); - rpc::RegisterNodeReply register_reply; - auto send_reply_callback = - [](ray::Status status, std::function f1, std::function f2) {}; - - node_manager.HandleRegisterNode(register_request, ®ister_reply, send_reply_callback); - io_service_.poll(); - - std::vector vc; - Mocker::ReadContentFromFile(vc, log_dir_ + "/events/event_EXPORT_NODE.log"); - ASSERT_EQ((int)vc.size(), 1); - json event_data = json::parse(vc[0])["event_data"].get(); - ASSERT_EQ(event_data["state"], "ALIVE"); -} - -TEST_F(GcsNodeManagerExportAPITest, TestExportEventUnregisterNode) { - // Test export event is written when a node is removed with HandleUnregisterNode - gcs::GcsNodeManager node_manager( - gcs_publisher_, gcs_table_storage_, client_pool_, ClusterID::Nil()); - auto node = Mocker::GenNodeInfo(); - auto node_id = NodeID::FromBinary(node->node_id()); - node_manager.AddNode(node); - - rpc::UnregisterNodeRequest unregister_request; - unregister_request.set_node_id(node_id.Binary()); - unregister_request.mutable_node_death_info()->set_reason( - rpc::NodeDeathInfo::UNEXPECTED_TERMINATION); - unregister_request.mutable_node_death_info()->set_reason_message("mock reason message"); - rpc::UnregisterNodeReply unregister_reply; - auto send_reply_callback = - [](ray::Status status, std::function f1, std::function f2) {}; - - node_manager.HandleUnregisterNode( - unregister_request, &unregister_reply, send_reply_callback); - io_service_.poll(); - - std::vector vc; - Mocker::ReadContentFromFile(vc, log_dir_ + "/events/event_EXPORT_NODE.log"); - ASSERT_EQ((int)vc.size(), 1); - json event_data = json::parse(vc[0])["event_data"].get(); - ASSERT_EQ(event_data["state"], "DEAD"); - // Verify death cause for last node DEAD event - ASSERT_EQ(event_data["death_info"]["reason"], "UNEXPECTED_TERMINATION"); - ASSERT_EQ(event_data["death_info"]["reason_message"], "mock reason message"); -} - -} // namespace ray - -int main(int argc, char **argv) { - ::testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); -} \ No newline at end of file