From 4bc29b897b10f0dffae58620124d1a8be5e92159 Mon Sep 17 00:00:00 2001 From: Nikita Vemuri Date: Mon, 9 Sep 2024 10:13:01 -0700 Subject: [PATCH] [observability][export-api] Write actor events (#47529) - Add back code changes from [observability][export-api] Write actor events #47303 - Separate out actor manager export event test into a separate file so we can skip on windows. Update BUILD rule so all tests in src/ray/gcs/gcs_server/test/export_api are skipped on windows Signed-off-by: Nikita Vemuri Co-authored-by: Nikita Vemuri Signed-off-by: ujjawal-khare --- src/ray/gcs/gcs_server/gcs_actor_manager.cc | 40 ++++++++++++++++++- src/ray/gcs/gcs_server/gcs_actor_manager.h | 25 ++++++++++++ src/ray/gcs/gcs_server/gcs_server_main.cc | 5 ++- .../gcs_actor_manager_export_event_test.cc | 19 +++++---- src/ray/util/event.cc | 4 ++ 5 files changed, 79 insertions(+), 14 deletions(-) diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.cc b/src/ray/gcs/gcs_server/gcs_actor_manager.cc index 2e76b1effd63..90c9c090cba9 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.cc @@ -241,6 +241,36 @@ const rpc::ActorTableData &GcsActor::GetActorTableData() const { rpc::ActorTableData *GcsActor::GetMutableActorTableData() { return &actor_table_data_; } +void GcsActor::WriteActorExportEvent() const { + /// Write actor_table_data_ as a export actor event if + /// enable_export_api_write() is enabled. + if (!RayConfig::instance().enable_export_api_write()) { + return; + } + std::shared_ptr export_actor_data_ptr = + std::make_shared(); + + export_actor_data_ptr->set_actor_id(actor_table_data_.actor_id()); + export_actor_data_ptr->set_job_id(actor_table_data_.job_id()); + export_actor_data_ptr->set_state(ConvertActorStateToExport(actor_table_data_.state())); + export_actor_data_ptr->set_is_detached(actor_table_data_.is_detached()); + export_actor_data_ptr->set_name(actor_table_data_.name()); + export_actor_data_ptr->set_pid(actor_table_data_.pid()); + export_actor_data_ptr->set_ray_namespace(actor_table_data_.ray_namespace()); + export_actor_data_ptr->set_serialized_runtime_env( + actor_table_data_.serialized_runtime_env()); + export_actor_data_ptr->set_class_name(actor_table_data_.class_name()); + export_actor_data_ptr->mutable_death_cause()->CopyFrom(actor_table_data_.death_cause()); + export_actor_data_ptr->mutable_required_resources()->insert( + actor_table_data_.required_resources().begin(), + actor_table_data_.required_resources().end()); + export_actor_data_ptr->set_node_id(actor_table_data_.node_id()); + export_actor_data_ptr->set_placement_group_id(actor_table_data_.placement_group_id()); + export_actor_data_ptr->set_repr_name(actor_table_data_.repr_name()); + + RayExportEvent(export_actor_data_ptr).SendEvent(); +} + rpc::TaskSpec *GcsActor::GetMutableTaskSpec() { return task_spec_.get(); } const ResourceRequest &GcsActor::GetAcquiredResources() const { @@ -770,6 +800,7 @@ Status GcsActorManager::RegisterActor(const ray::rpc::RegisterActorRequest &requ [this, actor](const Status &status) { // The backend storage is supposed to be reliable, so the status must be ok. RAY_CHECK_OK(status); + actor->WriteActorExportEvent(); // If a creator dies before this callback is called, the actor could have // been already destroyed. It is okay not to invoke a callback because we // don't need to reply to the creator as it is already dead. @@ -866,6 +897,7 @@ Status GcsActorManager::CreateActor(const ray::rpc::CreateActorRequest &request, // Pub this state for dashboard showing. RAY_CHECK_OK(gcs_publisher_->PublishActor(actor_id, actor_table_data, nullptr)); + actor->WriteActorExportEvent(); RemoveUnresolvedActor(actor); // Update the registered actor as its creation task specification may have changed due @@ -1075,10 +1107,11 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id, RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put( actor->GetActorID(), *actor_table_data, - [this, actor_id, actor_table_data](Status status) { + [this, actor, actor_id, actor_table_data](Status status) { RAY_CHECK_OK(gcs_publisher_->PublishActor( actor_id, *GenActorDataOnlyWithStates(*actor_table_data), nullptr)); RAY_CHECK_OK(gcs_table_storage_->ActorTaskSpecTable().Delete(actor_id, nullptr)); + actor->WriteActorExportEvent(); // Destroy placement group owned by this actor. destroy_owned_placement_group_if_needed_(actor_id); })); @@ -1366,9 +1399,10 @@ void GcsActorManager::RestartActor(const ActorID &actor_id, RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put( actor_id, *mutable_actor_table_data, - [this, actor_id, mutable_actor_table_data](Status status) { + [this, actor, actor_id, mutable_actor_table_data](Status status) { RAY_CHECK_OK(gcs_publisher_->PublishActor( actor_id, *GenActorDataOnlyWithStates(*mutable_actor_table_data), nullptr)); + actor->WriteActorExportEvent(); })); gcs_actor_scheduler_->Schedule(actor); } else { @@ -1398,6 +1432,7 @@ void GcsActorManager::RestartActor(const ActorID &actor_id, actor_id, *GenActorDataOnlyWithStates(*mutable_actor_table_data), nullptr)); RAY_CHECK_OK( gcs_table_storage_->ActorTaskSpecTable().Delete(actor_id, nullptr)); + actor->WriteActorExportEvent(); })); // The actor is dead, but we should not remove the entry from the // registered actors yet. If the actor is owned, we will destroy the actor @@ -1505,6 +1540,7 @@ void GcsActorManager::OnActorCreationSuccess(const std::shared_ptr &ac [this, actor_id, actor_table_data, actor, reply](Status status) { RAY_CHECK_OK(gcs_publisher_->PublishActor( actor_id, *GenActorDataOnlyWithStates(actor_table_data), nullptr)); + actor->WriteActorExportEvent(); // Invoke all callbacks for all registration requests of this actor (duplicated // requests are included) and remove all of them from // actor_to_create_callbacks_. diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.h b/src/ray/gcs/gcs_server/gcs_actor_manager.h index 2849b64e3578..c05ba9ebd0dc 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.h +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.h @@ -30,6 +30,7 @@ #include "ray/rpc/gcs_server/gcs_rpc_server.h" #include "ray/rpc/worker/core_worker_client.h" #include "ray/util/counter_map.h" +#include "ray/util/event.h" #include "src/ray/protobuf/gcs_service.pb.h" namespace ray { @@ -188,6 +189,9 @@ class GcsActor { /// Get the mutable ActorTableData of this actor. rpc::ActorTableData *GetMutableActorTableData(); rpc::TaskSpec *GetMutableTaskSpec(); + /// Write an event containing this actor's ActorTableData + /// to file for the Export API. + void WriteActorExportEvent() const; const ResourceRequest &GetAcquiredResources() const; void SetAcquiredResources(ResourceRequest &&resource_request); @@ -214,6 +218,27 @@ class GcsActor { last_metric_state_ = cur_state; } + rpc::ExportActorData::ActorState ConvertActorStateToExport( + rpc::ActorTableData::ActorState actor_state) const { + switch (actor_state) { + case rpc::ActorTableData::DEPENDENCIES_UNREADY: + return rpc::ExportActorData::DEPENDENCIES_UNREADY; + case rpc::ActorTableData::PENDING_CREATION: + return rpc::ExportActorData::PENDING_CREATION; + case rpc::ActorTableData::ALIVE: + return rpc::ExportActorData::ALIVE; + case rpc::ActorTableData::RESTARTING: + return rpc::ExportActorData::RESTARTING; + case rpc::ActorTableData::DEAD: + return rpc::ExportActorData::DEAD; + default: + // Unknown rpc::ActorTableData::ActorState value + RAY_LOG(FATAL) << "Invalid value for rpc::ActorTableData::ActorState" + << rpc::ActorTableData::ActorState_Name(actor_state); + return rpc::ExportActorData::DEAD; + } + } + /// The actor meta data which contains the task specification as well as the state of /// the gcs actor and so on (see gcs.proto). rpc::ActorTableData actor_table_data_; diff --git a/src/ray/gcs/gcs_server/gcs_server_main.cc b/src/ray/gcs/gcs_server/gcs_server_main.cc index e2e3da236c8e..947f1c640ac7 100644 --- a/src/ray/gcs/gcs_server/gcs_server_main.cc +++ b/src/ray/gcs/gcs_server/gcs_server_main.cc @@ -88,12 +88,13 @@ int main(int argc, char *argv[]) { // Initialize event framework. if (RayConfig::instance().event_log_reporter_enabled() && !log_dir.empty()) { - // This GCS server process emits GCS standard events, and Node export events + // This GCS server process emits GCS standard events, and Node and Actor export events // so the various source types are passed to RayEventInit. The type of an // event is determined by the schema of its event data. const std::vector source_types = { ray::rpc::Event_SourceType::Event_SourceType_GCS, - ray::rpc::ExportEvent_SourceType::ExportEvent_SourceType_EXPORT_NODE}; + ray::rpc::ExportEvent_SourceType::ExportEvent_SourceType_EXPORT_NODE, + ray::rpc::ExportEvent_SourceType_EXPORT_ACTOR}; ray::RayEventInit(source_types, absl::flat_hash_map(), log_dir, diff --git a/src/ray/gcs/gcs_server/test/export_api/gcs_actor_manager_export_event_test.cc b/src/ray/gcs/gcs_server/test/export_api/gcs_actor_manager_export_event_test.cc index 3201a4f7b346..0d2dc48cc763 100644 --- a/src/ray/gcs/gcs_server/test/export_api/gcs_actor_manager_export_event_test.cc +++ b/src/ray/gcs/gcs_server/test/export_api/gcs_actor_manager_export_event_test.cc @@ -75,9 +75,9 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface { public: MockWorkerClient(instrumented_io_context &io_service) : io_service_(io_service) {} - void WaitForActorRefDeleted( - const rpc::WaitForActorRefDeletedRequest &request, - const rpc::ClientCallback &callback) override { + void WaitForActorOutOfScope( + const rpc::WaitForActorOutOfScopeRequest &request, + const rpc::ClientCallback &callback) override { callbacks_.push_back(callback); } @@ -93,12 +93,12 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface { // The created_actors_ of gcs actor manager will be modified in io_service thread. // In order to avoid multithreading reading and writing created_actors_, we also - // send the `WaitForActorRefDeleted` callback operation to io_service thread. + // send the `WaitForActorOutOfScope` callback operation to io_service thread. std::promise promise; io_service_.post( [this, status, &promise]() { auto callback = callbacks_.front(); - auto reply = rpc::WaitForActorRefDeletedReply(); + auto reply = rpc::WaitForActorOutOfScopeReply(); callback(status, std::move(reply)); promise.set_value(false); }, @@ -109,7 +109,7 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface { return true; } - std::list> callbacks_; + std::list> callbacks_; std::vector killed_actors_; instrumented_io_context &io_service_; }; @@ -298,7 +298,7 @@ TEST_F(GcsActorManagerTest, TestBasic) { "DEPENDENCIES_UNREADY", "PENDING_CREATION", "ALIVE", "DEAD"}; std::vector vc; for (int i = 0; i < num_retry; i++) { - Mocker::ReadContentFromFile(vc, log_dir_ + "/export_events/event_EXPORT_ACTOR.log"); + Mocker::ReadContentFromFile(vc, log_dir_ + "/events/event_EXPORT_ACTOR.log"); if ((int)vc.size() == num_export_events) { for (int event_idx = 0; event_idx < num_export_events; event_idx++) { json export_event_as_json = json::parse(vc[event_idx]); @@ -308,8 +308,7 @@ TEST_F(GcsActorManagerTest, TestBasic) { // Verify death cause for last actor DEAD event ASSERT_EQ( event_data["death_cause"]["actor_died_error_context"]["error_message"], - "The actor is dead because all references to the actor were removed " - "including lineage ref count."); + "The actor is dead because all references to the actor were removed."); } } return; @@ -319,7 +318,7 @@ TEST_F(GcsActorManagerTest, TestBasic) { vc.clear(); } } - Mocker::ReadContentFromFile(vc, log_dir_ + "/export_events/event_EXPORT_ACTOR.log"); + Mocker::ReadContentFromFile(vc, log_dir_ + "/events/event_EXPORT_ACTOR.log"); std::ostringstream lines; for (auto line : vc) { lines << line << "\n"; diff --git a/src/ray/util/event.cc b/src/ray/util/event.cc index 90bdf335eb70..57ac75d4a0dd 100644 --- a/src/ray/util/event.cc +++ b/src/ray/util/event.cc @@ -140,6 +140,10 @@ std::string LogEventReporter::ExportEventToString(const rpc::ExportEvent &export RAY_CHECK(google::protobuf::util::MessageToJsonString( export_event.node_event_data(), &event_data_as_string, options) .ok()); + } else if (export_event.has_actor_event_data()) { + RAY_CHECK(google::protobuf::util::MessageToJsonString( + export_event.actor_event_data(), &event_data_as_string, options) + .ok()); } else { RAY_LOG(FATAL) << "event_data missing from export event with id " << export_event.event_id()