Skip to content

Commit

Permalink
[observability][export-api] Write actor events (ray-project#47303)
Browse files Browse the repository at this point in the history
Write actor events to file as part of the export API. This logic is only run if RayConfig::instance().enable_export_api_write() is true. Default value is false.
Event write is called whenever a value in the actor event data schema is modified. Typically this occurs before writing ActorTableData to the GCS table or publishing the data for the dashboard

Signed-off-by: ujjawal-khare <[email protected]>
  • Loading branch information
nikitavemuri authored and ujjawal-khare committed Oct 15, 2024
1 parent aa3ae33 commit f624ba8
Showing 1 changed file with 42 additions and 1 deletion.
43 changes: 42 additions & 1 deletion src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <chrono>
#include <memory>
#include <thread>

// clang-format off
#include "gtest/gtest.h"
Expand All @@ -24,6 +26,7 @@
#include "mock/ray/gcs/gcs_server/gcs_kv_manager.h"
#include "mock/ray/gcs/gcs_server/gcs_node_manager.h"
#include "mock/ray/pubsub/publisher.h"
#include "ray/util/event.h"
// clang-format on

namespace ray {
Expand Down Expand Up @@ -118,7 +121,8 @@ class GcsActorManagerTest : public ::testing::Test {
RayConfig::instance().initialize(
R"(
{
"maximum_gcs_destroyed_actor_cached_count": 10
"maximum_gcs_destroyed_actor_cached_count": 10,
"enable_export_api_write": true
}
)");
std::promise<bool> promise;
Expand Down Expand Up @@ -160,11 +164,13 @@ class GcsActorManagerTest : public ::testing::Test {
auto job_id = JobID::FromInt(i);
job_namespace_table_[job_id] = "";
}
log_dir_ = "event_123";
}

virtual ~GcsActorManagerTest() {
io_service_.stop();
thread_io_service_->join();
std::filesystem::remove_all(log_dir_.c_str());
}

void WaitActorCreated(const ActorID &actor_id) {
Expand Down Expand Up @@ -286,9 +292,13 @@ class GcsActorManagerTest : public ::testing::Test {
std::unique_ptr<gcs::GcsFunctionManager> function_manager_;
std::unique_ptr<gcs::MockInternalKVInterface> kv_;
PeriodicalRunner periodical_runner_;
std::string log_dir_;
};

TEST_F(GcsActorManagerTest, TestBasic) {
std::vector<SourceTypeVariant> source_types = {
rpc::ExportEvent_SourceType::ExportEvent_SourceType_EXPORT_ACTOR};
RayEventInit(source_types, absl::flat_hash_map<std::string, std::string>(), log_dir_);
auto job_id = JobID::FromInt(1);
auto registered_actor = RegisterActor(job_id);
rpc::CreateActorRequest create_actor_request;
Expand Down Expand Up @@ -323,6 +333,37 @@ TEST_F(GcsActorManagerTest, TestBasic) {
ASSERT_EQ(actor->GetState(), rpc::ActorTableData::DEAD);
RAY_CHECK_EQ(gcs_actor_manager_->CountFor(rpc::ActorTableData::ALIVE, ""), 0);
RAY_CHECK_EQ(gcs_actor_manager_->CountFor(rpc::ActorTableData::DEAD, ""), 1);

// Check correct export events are written for each of the 4 state transitions
int num_retry = 5;
int num_export_events = 4;
std::vector<std::string> expected_states = {
"DEPENDENCIES_UNREADY", "PENDING_CREATION", "ALIVE", "DEAD"};
std::vector<std::string> vc;
for (int i = 0; i < num_retry; i++) {
Mocker::ReadContentFromFile(vc, log_dir_ + "/events/event_EXPORT_ACTOR.log");
if ((int)vc.size() == num_export_events) {
for (int event_idx = 0; event_idx < num_export_events; event_idx++) {
json export_event_as_json = json::parse(vc[event_idx]);
json event_data = export_event_as_json["event_data"].get<json>();
ASSERT_EQ(event_data["state"], expected_states[event_idx]);
if (event_idx == num_export_events - 1) {
// Verify death cause for last actor DEAD event
ASSERT_EQ(
event_data["death_cause"]["actor_died_error_context"]["error_message"],
"The actor is dead because all references to the actor were removed.");
}
}
return;
} else {
// Sleep and retry
std::this_thread::sleep_for(std::chrono::seconds(1));
vc.clear();
}
}
Mocker::ReadContentFromFile(vc, log_dir_ + "/events/event_EXPORT_ACTOR.log");
ASSERT_TRUE(false) << "Export API only wrote " << (int)vc.size()
<< " lines, but expecting 4.\n";
}

TEST_F(GcsActorManagerTest, TestDeadCount) {
Expand Down

0 comments on commit f624ba8

Please sign in to comment.