From 6d9bef9a74b81ec5f3233a913b57855b6dc44024 Mon Sep 17 00:00:00 2001 From: Nikita Vemuri Date: Wed, 11 Sep 2024 10:12:57 -0700 Subject: [PATCH] Fix windows://:task_event_buffer_test (#47577) Move TestWriteTaskExportEvents to a separate file and skip it on Windows. This is ok for the export API feature because the export API is not currently supported on Windows (tests for other resource events written from GCS are also skipped on Windows). This test is failing in postmerge (CI test windows://:task_event_buffer_test is consistently_failing #47523) on Windows; the following error is raised in the tear-down step: unknown file: error: C++ exception with description "remove_all: The process cannot access the file because it is being used by another process.: "event_123"" thrown in TearDown(). This is the same error raised for other tests that clean up created directories with remove_all() on Windows (e.g. //src/ray/util/tests:event_test). These tests are also skipped on Windows. Signed-off-by: Nikita Vemuri Co-authored-by: Nikita Vemuri Signed-off-by: ujjawal-khare --- .../task_event_buffer_export_event_test.cc | 8 +- .../test/task_event_buffer_test.cc | 121 ------------------ 2 files changed, 3 insertions(+), 126 deletions(-) diff --git a/src/ray/core_worker/test/task_event_buffer_export_event_test.cc b/src/ray/core_worker/test/task_event_buffer_export_event_test.cc index f9e241d3eef6..76e6c5f0360b 100644 --- a/src/ray/core_worker/test/task_event_buffer_export_event_test.cc +++ b/src/ray/core_worker/test/task_event_buffer_export_event_test.cc @@ -168,9 +168,8 @@ TEST_F(TaskEventTestWriteExport, TestWriteTaskExportEvents) { std::vector vc; for (int i = 0; i * batch_size < max_export_events_on_buffer; i++) { task_event_buffer_->FlushEvents(true); - ReadContentFromFile(vc, - log_dir_ + "/export_events/event_EXPORT_TASK_" + - std::to_string(getpid()) + ".log"); + ReadContentFromFile( + vc, log_dir_ + "/events/event_EXPORT_TASK_" + std::to_string(getpid()) + ".log"); EXPECT_EQ((int)vc.size(), (i + 1) * 
batch_size); vc.clear(); } @@ -179,8 +178,7 @@ TEST_F(TaskEventTestWriteExport, TestWriteTaskExportEvents) { // max_export_events_on_buffer > task_events_max_num_status_events_buffer_on_worker vc.clear(); ReadContentFromFile( - vc, - log_dir_ + "/export_events/event_EXPORT_TASK_" + std::to_string(getpid()) + ".log"); + vc, log_dir_ + "/events/event_EXPORT_TASK_" + std::to_string(getpid()) + ".log"); EXPECT_EQ((int)vc.size(), max_export_events_on_buffer); for (size_t i = 0; i < max_export_events_on_buffer; i++) { json export_event_as_json = json::parse(vc[i]); diff --git a/src/ray/core_worker/test/task_event_buffer_test.cc b/src/ray/core_worker/test/task_event_buffer_test.cc index ea78bf5b4ba3..7e6684333685 100644 --- a/src/ray/core_worker/test/task_event_buffer_test.cc +++ b/src/ray/core_worker/test/task_event_buffer_test.cc @@ -58,7 +58,6 @@ class TaskEventBufferTest : public ::testing::Test { virtual void TearDown() { if (task_event_buffer_) task_event_buffer_->Stop(); - std::filesystem::remove_all(log_dir_.c_str()); }; std::vector GenTaskIDs(size_t num_tasks) { @@ -129,7 +128,6 @@ class TaskEventBufferTest : public ::testing::Test { } std::unique_ptr task_event_buffer_ = nullptr; - std::string log_dir_ = "event_123"; }; class TaskEventBufferTestManualStart : public TaskEventBufferTest { @@ -180,24 +178,6 @@ class TaskEventBufferTestLimitProfileEvents : public TaskEventBufferTest { } }; -class TaskEventTestWriteExport : public TaskEventBufferTest { - public: - TaskEventTestWriteExport() : TaskEventBufferTest() { - RayConfig::instance().initialize( - R"( -{ - "task_events_report_interval_ms": 1000, - "task_events_max_num_status_events_buffer_on_worker": 10, - "task_events_max_num_profile_events_buffer_on_worker": 5, - "task_events_send_batch_size": 100, - "export_task_events_write_batch_size": 1, - "task_events_max_num_export_status_events_buffer_on_worker": 15, - "enable_export_api_write": true -} - )"); - } -}; - void ReadContentFromFile(std::vector &vc, 
std::string log_file, std::string filter = "") { @@ -287,107 +267,6 @@ TEST_F(TaskEventBufferTest, TestFlushEvents) { ASSERT_EQ(task_event_buffer_->GetNumTaskEventsStored(), 0); } -TEST_F(TaskEventTestWriteExport, TestWriteTaskExportEvents) { - /* - Test writing task events to event_EXPORT_TASK_123.log as part of the export API. - This test verifies the following cases: - 1. Task events that are dropped from being sent to GCS (because more events - added than task_events_max_num_status_events_buffer_on_worker) will still - be written for the export API, if the number of events is less than - task_events_max_num_export_status_events_buffer_on_worker. - 2. Task events over task_events_max_num_status_events_buffer_on_worker - will be dropped and not written for the export API. - 3. Each Flush() call only writes a max of export_task_events_write_batch_size - export events. - In this test, 20 events are added which is greater than the max of 10 status events - that can be stored in buffer. The max export status events in buffer is 15, - so 15 events will be written to event_EXPORT_TASK_123.log. Each Flush() - call will write 1 new event because the batch size is 1. - */ - - // {"task_events_max_num_status_events_buffer_on_worker": 10} and - // {"task_events_max_num_export_status_events_buffer_on_worker": 15} - // in TaskEventTestWriteExport so set num_events > 15. 
- size_t num_events = 20; - // Value of export_task_events_write_batch_size - size_t batch_size = 1; - // Value of task_events_max_num_export_status_events_buffer_on_worker - size_t max_export_events_on_buffer = 15; - auto task_ids = GenTaskIDs(num_events); - google::protobuf::util::JsonPrintOptions options; - options.preserve_proto_field_names = true; - options.always_print_primitive_fields = true; - - std::vector source_types = { - rpc::ExportEvent_SourceType::ExportEvent_SourceType_EXPORT_TASK}; - RayEventInit_(source_types, - absl::flat_hash_map(), - log_dir_, - "warning", - false); - - std::vector> task_events; - for (const auto &task_id : task_ids) { - task_events.push_back(GenStatusTaskEvent(task_id, 0)); - } - - // Convert all task_events that were added with AddTaskEvent - // to the ExportTaskEventData proto message - std::vector> task_event_data_protos; - for (const auto &task_event : task_events) { - std::shared_ptr event = - std::make_shared(); - task_event->ToRpcTaskExportEvents(event); - task_event_data_protos.push_back(event); - } - - // Add all num_events tasks - for (auto &task_event : task_events) { - task_event_buffer_->AddTaskEvent(std::move(task_event)); - } - - // Verify that batch_size events are being written for each flush - std::vector vc; - for (int i = 0; i * batch_size < max_export_events_on_buffer; i++) { - task_event_buffer_->FlushEvents(true); - ReadContentFromFile( - vc, log_dir_ + "/events/event_EXPORT_TASK_" + std::to_string(getpid()) + ".log"); - EXPECT_EQ((int)vc.size(), (i + 1) * batch_size); - vc.clear(); - } - - // Verify that all max_export_events_on_buffer events are written to file even though - // max_export_events_on_buffer > task_events_max_num_status_events_buffer_on_worker - vc.clear(); - ReadContentFromFile( - vc, log_dir_ + "/events/event_EXPORT_TASK_" + std::to_string(getpid()) + ".log"); - EXPECT_EQ((int)vc.size(), max_export_events_on_buffer); - for (size_t i = 0; i < max_export_events_on_buffer; i++) { - json 
export_event_as_json = json::parse(vc[i]); - EXPECT_EQ(export_event_as_json["source_type"].get(), "EXPORT_TASK"); - EXPECT_EQ(export_event_as_json.contains("event_id"), true); - EXPECT_EQ(export_event_as_json.contains("timestamp"), true); - EXPECT_EQ(export_event_as_json.contains("event_data"), true); - - json event_data = export_event_as_json["event_data"].get(); - - // The events written are the last max_export_events_on_buffer added - // in AddTaskEvent because (num_events-max_export_events_on_buffer) - // were dropped. - std::string expected_event_data_str; - RAY_CHECK(google::protobuf::util::MessageToJsonString( - *task_event_data_protos[i + (num_events - max_export_events_on_buffer)], - &expected_event_data_str, - options) - .ok()); - json expected_event_data = json::parse(expected_event_data_str); - EXPECT_EQ(event_data, expected_event_data); - } - - // Expect no more events. - ASSERT_EQ(task_event_buffer_->GetNumTaskEventsStored(), 0); -} - TEST_F(TaskEventBufferTest, TestFailedFlush) { size_t num_status_events = 20; size_t num_profile_events = 20;