Skip to content

Commit

Permalink
[observability][export-api] Write actor events (ray-project#47529)
Browse files Browse the repository at this point in the history
- Add back code changes from [observability][export-api] Write actor events ray-project#47303
- Separate out actor manager export event test into a separate file so we can skip on windows. Update BUILD rule so all tests in src/ray/gcs/gcs_server/test/export_api are skipped on windows

Signed-off-by: Nikita Vemuri <[email protected]>
Co-authored-by: Nikita Vemuri <[email protected]>
Signed-off-by: ujjawal-khare <[email protected]>
  • Loading branch information
2 people authored and ujjawal-khare committed Oct 15, 2024
1 parent 455d7d3 commit 4bc29b8
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 14 deletions.
40 changes: 38 additions & 2 deletions src/ray/gcs/gcs_server/gcs_actor_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,36 @@ const rpc::ActorTableData &GcsActor::GetActorTableData() const {

rpc::ActorTableData *GcsActor::GetMutableActorTableData() { return &actor_table_data_; }

void GcsActor::WriteActorExportEvent() const {
/// Write actor_table_data_ as a export actor event if
/// enable_export_api_write() is enabled.
if (!RayConfig::instance().enable_export_api_write()) {
return;
}
std::shared_ptr<rpc::ExportActorData> export_actor_data_ptr =
std::make_shared<rpc::ExportActorData>();

export_actor_data_ptr->set_actor_id(actor_table_data_.actor_id());
export_actor_data_ptr->set_job_id(actor_table_data_.job_id());
export_actor_data_ptr->set_state(ConvertActorStateToExport(actor_table_data_.state()));
export_actor_data_ptr->set_is_detached(actor_table_data_.is_detached());
export_actor_data_ptr->set_name(actor_table_data_.name());
export_actor_data_ptr->set_pid(actor_table_data_.pid());
export_actor_data_ptr->set_ray_namespace(actor_table_data_.ray_namespace());
export_actor_data_ptr->set_serialized_runtime_env(
actor_table_data_.serialized_runtime_env());
export_actor_data_ptr->set_class_name(actor_table_data_.class_name());
export_actor_data_ptr->mutable_death_cause()->CopyFrom(actor_table_data_.death_cause());
export_actor_data_ptr->mutable_required_resources()->insert(
actor_table_data_.required_resources().begin(),
actor_table_data_.required_resources().end());
export_actor_data_ptr->set_node_id(actor_table_data_.node_id());
export_actor_data_ptr->set_placement_group_id(actor_table_data_.placement_group_id());
export_actor_data_ptr->set_repr_name(actor_table_data_.repr_name());

RayExportEvent(export_actor_data_ptr).SendEvent();
}

rpc::TaskSpec *GcsActor::GetMutableTaskSpec() { return task_spec_.get(); }

const ResourceRequest &GcsActor::GetAcquiredResources() const {
Expand Down Expand Up @@ -770,6 +800,7 @@ Status GcsActorManager::RegisterActor(const ray::rpc::RegisterActorRequest &requ
[this, actor](const Status &status) {
// The backend storage is supposed to be reliable, so the status must be ok.
RAY_CHECK_OK(status);
actor->WriteActorExportEvent();
// If a creator dies before this callback is called, the actor could have
// been already destroyed. It is okay not to invoke a callback because we
// don't need to reply to the creator as it is already dead.
Expand Down Expand Up @@ -866,6 +897,7 @@ Status GcsActorManager::CreateActor(const ray::rpc::CreateActorRequest &request,

// Pub this state for dashboard showing.
RAY_CHECK_OK(gcs_publisher_->PublishActor(actor_id, actor_table_data, nullptr));
actor->WriteActorExportEvent();
RemoveUnresolvedActor(actor);

// Update the registered actor as its creation task specification may have changed due
Expand Down Expand Up @@ -1075,10 +1107,11 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id,
RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put(
actor->GetActorID(),
*actor_table_data,
[this, actor_id, actor_table_data](Status status) {
[this, actor, actor_id, actor_table_data](Status status) {
RAY_CHECK_OK(gcs_publisher_->PublishActor(
actor_id, *GenActorDataOnlyWithStates(*actor_table_data), nullptr));
RAY_CHECK_OK(gcs_table_storage_->ActorTaskSpecTable().Delete(actor_id, nullptr));
actor->WriteActorExportEvent();
// Destroy placement group owned by this actor.
destroy_owned_placement_group_if_needed_(actor_id);
}));
Expand Down Expand Up @@ -1366,9 +1399,10 @@ void GcsActorManager::RestartActor(const ActorID &actor_id,
RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put(
actor_id,
*mutable_actor_table_data,
[this, actor_id, mutable_actor_table_data](Status status) {
[this, actor, actor_id, mutable_actor_table_data](Status status) {
RAY_CHECK_OK(gcs_publisher_->PublishActor(
actor_id, *GenActorDataOnlyWithStates(*mutable_actor_table_data), nullptr));
actor->WriteActorExportEvent();
}));
gcs_actor_scheduler_->Schedule(actor);
} else {
Expand Down Expand Up @@ -1398,6 +1432,7 @@ void GcsActorManager::RestartActor(const ActorID &actor_id,
actor_id, *GenActorDataOnlyWithStates(*mutable_actor_table_data), nullptr));
RAY_CHECK_OK(
gcs_table_storage_->ActorTaskSpecTable().Delete(actor_id, nullptr));
actor->WriteActorExportEvent();
}));
// The actor is dead, but we should not remove the entry from the
// registered actors yet. If the actor is owned, we will destroy the actor
Expand Down Expand Up @@ -1505,6 +1540,7 @@ void GcsActorManager::OnActorCreationSuccess(const std::shared_ptr<GcsActor> &ac
[this, actor_id, actor_table_data, actor, reply](Status status) {
RAY_CHECK_OK(gcs_publisher_->PublishActor(
actor_id, *GenActorDataOnlyWithStates(actor_table_data), nullptr));
actor->WriteActorExportEvent();
// Invoke all callbacks for all registration requests of this actor (duplicated
// requests are included) and remove all of them from
// actor_to_create_callbacks_.
Expand Down
25 changes: 25 additions & 0 deletions src/ray/gcs/gcs_server/gcs_actor_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "ray/rpc/gcs_server/gcs_rpc_server.h"
#include "ray/rpc/worker/core_worker_client.h"
#include "ray/util/counter_map.h"
#include "ray/util/event.h"
#include "src/ray/protobuf/gcs_service.pb.h"

namespace ray {
Expand Down Expand Up @@ -188,6 +189,9 @@ class GcsActor {
/// Get the mutable ActorTableData of this actor.
rpc::ActorTableData *GetMutableActorTableData();
rpc::TaskSpec *GetMutableTaskSpec();
/// Write an event containing this actor's ActorTableData
/// to file for the Export API.
void WriteActorExportEvent() const;

const ResourceRequest &GetAcquiredResources() const;
void SetAcquiredResources(ResourceRequest &&resource_request);
Expand All @@ -214,6 +218,27 @@ class GcsActor {
last_metric_state_ = cur_state;
}

rpc::ExportActorData::ActorState ConvertActorStateToExport(
rpc::ActorTableData::ActorState actor_state) const {
switch (actor_state) {
case rpc::ActorTableData::DEPENDENCIES_UNREADY:
return rpc::ExportActorData::DEPENDENCIES_UNREADY;
case rpc::ActorTableData::PENDING_CREATION:
return rpc::ExportActorData::PENDING_CREATION;
case rpc::ActorTableData::ALIVE:
return rpc::ExportActorData::ALIVE;
case rpc::ActorTableData::RESTARTING:
return rpc::ExportActorData::RESTARTING;
case rpc::ActorTableData::DEAD:
return rpc::ExportActorData::DEAD;
default:
// Unknown rpc::ActorTableData::ActorState value
RAY_LOG(FATAL) << "Invalid value for rpc::ActorTableData::ActorState"
<< rpc::ActorTableData::ActorState_Name(actor_state);
return rpc::ExportActorData::DEAD;
}
}

/// The actor meta data which contains the task specification as well as the state of
/// the gcs actor and so on (see gcs.proto).
rpc::ActorTableData actor_table_data_;
Expand Down
5 changes: 3 additions & 2 deletions src/ray/gcs/gcs_server/gcs_server_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,13 @@ int main(int argc, char *argv[]) {

// Initialize event framework.
if (RayConfig::instance().event_log_reporter_enabled() && !log_dir.empty()) {
// This GCS server process emits GCS standard events, and Node export events
// This GCS server process emits GCS standard events, and Node and Actor export events
// so the various source types are passed to RayEventInit. The type of an
// event is determined by the schema of its event data.
const std::vector<ray::SourceTypeVariant> source_types = {
ray::rpc::Event_SourceType::Event_SourceType_GCS,
ray::rpc::ExportEvent_SourceType::ExportEvent_SourceType_EXPORT_NODE};
ray::rpc::ExportEvent_SourceType::ExportEvent_SourceType_EXPORT_NODE,
ray::rpc::ExportEvent_SourceType_EXPORT_ACTOR};
ray::RayEventInit(source_types,
absl::flat_hash_map<std::string, std::string>(),
log_dir,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface {
public:
MockWorkerClient(instrumented_io_context &io_service) : io_service_(io_service) {}

void WaitForActorRefDeleted(
const rpc::WaitForActorRefDeletedRequest &request,
const rpc::ClientCallback<rpc::WaitForActorRefDeletedReply> &callback) override {
void WaitForActorOutOfScope(
const rpc::WaitForActorOutOfScopeRequest &request,
const rpc::ClientCallback<rpc::WaitForActorOutOfScopeReply> &callback) override {
callbacks_.push_back(callback);
}

Expand All @@ -93,12 +93,12 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface {

// The created_actors_ of gcs actor manager will be modified in io_service thread.
// In order to avoid multithreading reading and writing created_actors_, we also
// send the `WaitForActorRefDeleted` callback operation to io_service thread.
// send the `WaitForActorOutOfScope` callback operation to io_service thread.
std::promise<bool> promise;
io_service_.post(
[this, status, &promise]() {
auto callback = callbacks_.front();
auto reply = rpc::WaitForActorRefDeletedReply();
auto reply = rpc::WaitForActorOutOfScopeReply();
callback(status, std::move(reply));
promise.set_value(false);
},
Expand All @@ -109,7 +109,7 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface {
return true;
}

std::list<rpc::ClientCallback<rpc::WaitForActorRefDeletedReply>> callbacks_;
std::list<rpc::ClientCallback<rpc::WaitForActorOutOfScopeReply>> callbacks_;
std::vector<ActorID> killed_actors_;
instrumented_io_context &io_service_;
};
Expand Down Expand Up @@ -298,7 +298,7 @@ TEST_F(GcsActorManagerTest, TestBasic) {
"DEPENDENCIES_UNREADY", "PENDING_CREATION", "ALIVE", "DEAD"};
std::vector<std::string> vc;
for (int i = 0; i < num_retry; i++) {
Mocker::ReadContentFromFile(vc, log_dir_ + "/export_events/event_EXPORT_ACTOR.log");
Mocker::ReadContentFromFile(vc, log_dir_ + "/events/event_EXPORT_ACTOR.log");
if ((int)vc.size() == num_export_events) {
for (int event_idx = 0; event_idx < num_export_events; event_idx++) {
json export_event_as_json = json::parse(vc[event_idx]);
Expand All @@ -308,8 +308,7 @@ TEST_F(GcsActorManagerTest, TestBasic) {
// Verify death cause for last actor DEAD event
ASSERT_EQ(
event_data["death_cause"]["actor_died_error_context"]["error_message"],
"The actor is dead because all references to the actor were removed "
"including lineage ref count.");
"The actor is dead because all references to the actor were removed.");
}
}
return;
Expand All @@ -319,7 +318,7 @@ TEST_F(GcsActorManagerTest, TestBasic) {
vc.clear();
}
}
Mocker::ReadContentFromFile(vc, log_dir_ + "/export_events/event_EXPORT_ACTOR.log");
Mocker::ReadContentFromFile(vc, log_dir_ + "/events/event_EXPORT_ACTOR.log");
std::ostringstream lines;
for (auto line : vc) {
lines << line << "\n";
Expand Down
4 changes: 4 additions & 0 deletions src/ray/util/event.cc
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ std::string LogEventReporter::ExportEventToString(const rpc::ExportEvent &export
RAY_CHECK(google::protobuf::util::MessageToJsonString(
export_event.node_event_data(), &event_data_as_string, options)
.ok());
} else if (export_event.has_actor_event_data()) {
RAY_CHECK(google::protobuf::util::MessageToJsonString(
export_event.actor_event_data(), &event_data_as_string, options)
.ok());
} else {
RAY_LOG(FATAL)
<< "event_data missing from export event with id " << export_event.event_id()
Expand Down

0 comments on commit 4bc29b8

Please sign in to comment.