diff --git a/src/ray/core_worker/test/task_event_buffer_export_event_test.cc b/src/ray/core_worker/test/task_event_buffer_export_event_test.cc
index f9e241d3eef6..76e6c5f0360b 100644
--- a/src/ray/core_worker/test/task_event_buffer_export_event_test.cc
+++ b/src/ray/core_worker/test/task_event_buffer_export_event_test.cc
@@ -168,9 +168,8 @@ TEST_F(TaskEventTestWriteExport, TestWriteTaskExportEvents) {
   std::vector<std::string> vc;
   for (int i = 0; i * batch_size < max_export_events_on_buffer; i++) {
     task_event_buffer_->FlushEvents(true);
-    ReadContentFromFile(vc,
-                        log_dir_ + "/export_events/event_EXPORT_TASK_" +
-                            std::to_string(getpid()) + ".log");
+    ReadContentFromFile(
+        vc, log_dir_ + "/events/event_EXPORT_TASK_" + std::to_string(getpid()) + ".log");
     EXPECT_EQ((int)vc.size(), (i + 1) * batch_size);
     vc.clear();
   }
@@ -179,8 +178,7 @@ TEST_F(TaskEventTestWriteExport, TestWriteTaskExportEvents) {
   // max_export_events_on_buffer > task_events_max_num_status_events_buffer_on_worker
   vc.clear();
   ReadContentFromFile(
-      vc,
-      log_dir_ + "/export_events/event_EXPORT_TASK_" + std::to_string(getpid()) + ".log");
+      vc, log_dir_ + "/events/event_EXPORT_TASK_" + std::to_string(getpid()) + ".log");
   EXPECT_EQ((int)vc.size(), max_export_events_on_buffer);
   for (size_t i = 0; i < max_export_events_on_buffer; i++) {
     json export_event_as_json = json::parse(vc[i]);
diff --git a/src/ray/core_worker/test/task_event_buffer_test.cc b/src/ray/core_worker/test/task_event_buffer_test.cc
index ea78bf5b4ba3..7e6684333685 100644
--- a/src/ray/core_worker/test/task_event_buffer_test.cc
+++ b/src/ray/core_worker/test/task_event_buffer_test.cc
@@ -58,7 +58,6 @@ class TaskEventBufferTest : public ::testing::Test {
 
   virtual void TearDown() {
     if (task_event_buffer_) task_event_buffer_->Stop();
-    std::filesystem::remove_all(log_dir_.c_str());
   };
 
   std::vector<TaskID> GenTaskIDs(size_t num_tasks) {
@@ -129,7 +128,6 @@ class TaskEventBufferTest : public ::testing::Test {
   }
 
   std::unique_ptr<TaskEventBufferImpl> task_event_buffer_ = nullptr;
-  std::string log_dir_ = "event_123";
 };
 
 class TaskEventBufferTestManualStart : public TaskEventBufferTest {
@@ -180,24 +178,6 @@ class TaskEventBufferTestLimitProfileEvents : public TaskEventBufferTest {
   }
 };
 
-class TaskEventTestWriteExport : public TaskEventBufferTest {
- public:
-  TaskEventTestWriteExport() : TaskEventBufferTest() {
-    RayConfig::instance().initialize(
-        R"(
-{
-  "task_events_report_interval_ms": 1000,
-  "task_events_max_num_status_events_buffer_on_worker": 10,
-  "task_events_max_num_profile_events_buffer_on_worker": 5,
-  "task_events_send_batch_size": 100,
-  "export_task_events_write_batch_size": 1,
-  "task_events_max_num_export_status_events_buffer_on_worker": 15,
-  "enable_export_api_write": true
-}
-  )");
-  }
-};
-
 void ReadContentFromFile(std::vector<std::string> &vc,
                          std::string log_file,
                          std::string filter = "") {
@@ -287,107 +267,6 @@ TEST_F(TaskEventBufferTest, TestFlushEvents) {
   ASSERT_EQ(task_event_buffer_->GetNumTaskEventsStored(), 0);
 }
 
-TEST_F(TaskEventTestWriteExport, TestWriteTaskExportEvents) {
-  /*
-  Test writing task events to event_EXPORT_TASK_123.log as part of the export API.
-  This test verifies the following cases:
-  1. Task events that are dropped from being sent to GCS (because more events
-    added than task_events_max_num_status_events_buffer_on_worker) will still
-    be written for the export API, if the number of events is less than
-    task_events_max_num_export_status_events_buffer_on_worker.
-  2. Task events over task_events_max_num_status_events_buffer_on_worker
-    will be dropped and not written for the export API.
-  3. Each Flush() call only writes a max of export_task_events_write_batch_size
-    export events.
-  In this test, 20 events are added which is greater than the max of 10 status events
-  that can be stored in buffer. The max export status events in buffer is 15,
-  so 15 events will be written to event_EXPORT_TASK_123.log. Each Flush()
-  call will write 1 new event because the batch size is 1.
-  */
-
-  // {"task_events_max_num_status_events_buffer_on_worker": 10} and
-  // {"task_events_max_num_export_status_events_buffer_on_worker": 15}
-  // in TaskEventTestWriteExport so set num_events > 15.
-  size_t num_events = 20;
-  // Value of export_task_events_write_batch_size
-  size_t batch_size = 1;
-  // Value of task_events_max_num_export_status_events_buffer_on_worker
-  size_t max_export_events_on_buffer = 15;
-  auto task_ids = GenTaskIDs(num_events);
-  google::protobuf::util::JsonPrintOptions options;
-  options.preserve_proto_field_names = true;
-  options.always_print_primitive_fields = true;
-
-  std::vector<SourceTypeVariant> source_types = {
-      rpc::ExportEvent_SourceType::ExportEvent_SourceType_EXPORT_TASK};
-  RayEventInit_(source_types,
-                absl::flat_hash_map<std::string, std::string>(),
-                log_dir_,
-                "warning",
-                false);
-
-  std::vector<std::unique_ptr<TaskEvent>> task_events;
-  for (const auto &task_id : task_ids) {
-    task_events.push_back(GenStatusTaskEvent(task_id, 0));
-  }
-
-  // Convert all task_events that were added with AddTaskEvent
-  // to the ExportTaskEventData proto message
-  std::vector<std::shared_ptr<rpc::ExportTaskEventData>> task_event_data_protos;
-  for (const auto &task_event : task_events) {
-    std::shared_ptr<rpc::ExportTaskEventData> event =
-        std::make_shared<rpc::ExportTaskEventData>();
-    task_event->ToRpcTaskExportEvents(event);
-    task_event_data_protos.push_back(event);
-  }
-
-  // Add all num_events tasks
-  for (auto &task_event : task_events) {
-    task_event_buffer_->AddTaskEvent(std::move(task_event));
-  }
-
-  // Verify that batch_size events are being written for each flush
-  std::vector<std::string> vc;
-  for (int i = 0; i * batch_size < max_export_events_on_buffer; i++) {
-    task_event_buffer_->FlushEvents(true);
-    ReadContentFromFile(
-        vc, log_dir_ + "/events/event_EXPORT_TASK_" + std::to_string(getpid()) + ".log");
-    EXPECT_EQ((int)vc.size(), (i + 1) * batch_size);
-    vc.clear();
-  }
-
-  // Verify that all max_export_events_on_buffer events are written to file even though
-  // max_export_events_on_buffer > task_events_max_num_status_events_buffer_on_worker
-  vc.clear();
-  ReadContentFromFile(
-      vc, log_dir_ + "/events/event_EXPORT_TASK_" + std::to_string(getpid()) + ".log");
-  EXPECT_EQ((int)vc.size(), max_export_events_on_buffer);
-  for (size_t i = 0; i < max_export_events_on_buffer; i++) {
-    json export_event_as_json = json::parse(vc[i]);
-    EXPECT_EQ(export_event_as_json["source_type"].get<std::string>(), "EXPORT_TASK");
-    EXPECT_EQ(export_event_as_json.contains("event_id"), true);
-    EXPECT_EQ(export_event_as_json.contains("timestamp"), true);
-    EXPECT_EQ(export_event_as_json.contains("event_data"), true);
-
-    json event_data = export_event_as_json["event_data"].get<json>();
-
-    // The events written are the last max_export_events_on_buffer added
-    // in AddTaskEvent because (num_events-max_export_events_on_buffer)
-    // were dropped.
-    std::string expected_event_data_str;
-    RAY_CHECK(google::protobuf::util::MessageToJsonString(
-                  *task_event_data_protos[i + (num_events - max_export_events_on_buffer)],
-                  &expected_event_data_str,
-                  options)
-                  .ok());
-    json expected_event_data = json::parse(expected_event_data_str);
-    EXPECT_EQ(event_data, expected_event_data);
-  }
-
-  // Expect no more events.
-  ASSERT_EQ(task_event_buffer_->GetNumTaskEventsStored(), 0);
-}
-
 TEST_F(TaskEventBufferTest, TestFailedFlush) {
   size_t num_status_events = 20;
   size_t num_profile_events = 20;