Skip to content

Commit

Permalink
Fix windows://:task_event_buffer_test (ray-project#47577)
Browse files Browse the repository at this point in the history
Move TestWriteTaskExportEvents to a separate file and skip it on Windows. This is acceptable for the export API feature because we currently don't support it on Windows (tests for other resource events written from the GCS are also skipped on Windows).
This test is failing in postmerge on Windows (CI test windows://:task_event_buffer_test is consistently failing, ray-project#47523). The tear-down step raises: unknown file: error: C++ exception with description "remove_all: The process cannot access the file because it is being used by another process.: "event_123"" thrown in TearDown().
This is the same error raised by other tests that clean up created directories with remove_all() on Windows (e.g. //src/ray/util/tests:event_test). Those tests are also skipped on Windows.

Signed-off-by: Nikita Vemuri <[email protected]>
Co-authored-by: Nikita Vemuri <[email protected]>
Signed-off-by: ujjawal-khare <[email protected]>
  • Loading branch information
2 people authored and ujjawal-khare committed Oct 15, 2024
1 parent 197e784 commit 6d9bef9
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,8 @@ TEST_F(TaskEventTestWriteExport, TestWriteTaskExportEvents) {
std::vector<std::string> vc;
for (int i = 0; i * batch_size < max_export_events_on_buffer; i++) {
task_event_buffer_->FlushEvents(true);
ReadContentFromFile(vc,
log_dir_ + "/export_events/event_EXPORT_TASK_" +
std::to_string(getpid()) + ".log");
ReadContentFromFile(
vc, log_dir_ + "/events/event_EXPORT_TASK_" + std::to_string(getpid()) + ".log");
EXPECT_EQ((int)vc.size(), (i + 1) * batch_size);
vc.clear();
}
Expand All @@ -179,8 +178,7 @@ TEST_F(TaskEventTestWriteExport, TestWriteTaskExportEvents) {
// max_export_events_on_buffer > task_events_max_num_status_events_buffer_on_worker
vc.clear();
ReadContentFromFile(
vc,
log_dir_ + "/export_events/event_EXPORT_TASK_" + std::to_string(getpid()) + ".log");
vc, log_dir_ + "/events/event_EXPORT_TASK_" + std::to_string(getpid()) + ".log");
EXPECT_EQ((int)vc.size(), max_export_events_on_buffer);
for (size_t i = 0; i < max_export_events_on_buffer; i++) {
json export_event_as_json = json::parse(vc[i]);
Expand Down
121 changes: 0 additions & 121 deletions src/ray/core_worker/test/task_event_buffer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ class TaskEventBufferTest : public ::testing::Test {

// Per-test teardown: stop the buffer's flush thread, then delete the on-disk
// event log directory created during the test.
virtual void TearDown() {
if (task_event_buffer_) task_event_buffer_->Stop();
// NOTE(review): on Windows this remove_all() throws "The process cannot
// access the file because it is being used by another process" when a log
// file handle is still open (see commit message); tests relying on it are
// skipped on Windows rather than fixed here.
std::filesystem::remove_all(log_dir_.c_str());
};

std::vector<TaskID> GenTaskIDs(size_t num_tasks) {
Expand Down Expand Up @@ -129,7 +128,6 @@ class TaskEventBufferTest : public ::testing::Test {
}

std::unique_ptr<TaskEventBufferImpl> task_event_buffer_ = nullptr;
std::string log_dir_ = "event_123";
};

class TaskEventBufferTestManualStart : public TaskEventBufferTest {
Expand Down Expand Up @@ -180,24 +178,6 @@ class TaskEventBufferTestLimitProfileEvents : public TaskEventBufferTest {
}
};

// Fixture for export-API write tests. Re-initializes RayConfig so that:
//   - the GCS-bound status event buffer holds at most 10 events,
//   - the export event buffer holds at most 15 events,
//   - each flush writes at most 1 export event (write batch size 1),
//   - export API writing is enabled.
// The constants 10 / 15 / 1 below are relied on verbatim by
// TestWriteTaskExportEvents; keep them in sync.
class TaskEventTestWriteExport : public TaskEventBufferTest {
public:
TaskEventTestWriteExport() : TaskEventBufferTest() {
// Raw JSON string parsed by RayConfig — its contents are runtime data.
RayConfig::instance().initialize(
R"(
{
"task_events_report_interval_ms": 1000,
"task_events_max_num_status_events_buffer_on_worker": 10,
"task_events_max_num_profile_events_buffer_on_worker": 5,
"task_events_send_batch_size": 100,
"export_task_events_write_batch_size": 1,
"task_events_max_num_export_status_events_buffer_on_worker": 15,
"enable_export_api_write": true
}
)");
}
};

void ReadContentFromFile(std::vector<std::string> &vc,
std::string log_file,
std::string filter = "") {
Expand Down Expand Up @@ -287,107 +267,6 @@ TEST_F(TaskEventBufferTest, TestFlushEvents) {
ASSERT_EQ(task_event_buffer_->GetNumTaskEventsStored(), 0);
}

TEST_F(TaskEventTestWriteExport, TestWriteTaskExportEvents) {
/*
Test writing task events to event_EXPORT_TASK_<pid>.log as part of the
export API (the file name embeds the current process id via getpid(); see
the ReadContentFromFile calls below).
This test verifies the following cases:
1. Task events that are dropped from being sent to GCS (because more events
added than task_events_max_num_status_events_buffer_on_worker) will still
be written for the export API, if the number of events is less than
task_events_max_num_export_status_events_buffer_on_worker.
2. Task events over task_events_max_num_status_events_buffer_on_worker
will be dropped and not written for the export API.
3. Each Flush() call only writes a max of export_task_events_write_batch_size
export events.
In this test, 20 events are added which is greater than the max of 10 status events
that can be stored in buffer. The max export status events in buffer is 15,
so 15 events will be written to the export log. Each Flush()
call will write 1 new event because the batch size is 1.
*/

// {"task_events_max_num_status_events_buffer_on_worker": 10} and
// {"task_events_max_num_export_status_events_buffer_on_worker": 15}
// in TaskEventTestWriteExport so set num_events > 15.
size_t num_events = 20;
// Value of export_task_events_write_batch_size
size_t batch_size = 1;
// Value of task_events_max_num_export_status_events_buffer_on_worker
size_t max_export_events_on_buffer = 15;
auto task_ids = GenTaskIDs(num_events);
// JSON printing options used when building the expected per-event payloads
// below; must match how the export writer serializes protos, so the
// string comparison at the end is exact.
google::protobuf::util::JsonPrintOptions options;
options.preserve_proto_field_names = true;
options.always_print_primitive_fields = true;

// Initialize the event framework so EXPORT_TASK events are routed to a log
// file under log_dir_.
std::vector<SourceTypeVariant> source_types = {
rpc::ExportEvent_SourceType::ExportEvent_SourceType_EXPORT_TASK};
RayEventInit_(source_types,
absl::flat_hash_map<std::string, std::string>(),
log_dir_,
"warning",
false);

std::vector<std::unique_ptr<TaskEvent>> task_events;
for (const auto &task_id : task_ids) {
task_events.push_back(GenStatusTaskEvent(task_id, 0));
}

// Convert all task_events that were added with AddTaskEvent
// to the ExportTaskEventData proto message
// (done BEFORE the events are moved into the buffer below).
std::vector<std::shared_ptr<rpc::ExportTaskEventData>> task_event_data_protos;
for (const auto &task_event : task_events) {
std::shared_ptr<rpc::ExportTaskEventData> event =
std::make_shared<rpc::ExportTaskEventData>();
task_event->ToRpcTaskExportEvents(event);
task_event_data_protos.push_back(event);
}

// Add all num_events tasks
for (auto &task_event : task_events) {
task_event_buffer_->AddTaskEvent(std::move(task_event));
}

// Verify that batch_size events are being written for each flush
// (the file is cumulative, hence (i + 1) * batch_size lines after flush i).
std::vector<std::string> vc;
for (int i = 0; i * batch_size < max_export_events_on_buffer; i++) {
task_event_buffer_->FlushEvents(true);
ReadContentFromFile(
vc, log_dir_ + "/events/event_EXPORT_TASK_" + std::to_string(getpid()) + ".log");
EXPECT_EQ((int)vc.size(), (i + 1) * batch_size);
vc.clear();
}

// Verify that all max_export_events_on_buffer events are written to file even though
// max_export_events_on_buffer > task_events_max_num_status_events_buffer_on_worker
vc.clear();
ReadContentFromFile(
vc, log_dir_ + "/events/event_EXPORT_TASK_" + std::to_string(getpid()) + ".log");
EXPECT_EQ((int)vc.size(), max_export_events_on_buffer);
for (size_t i = 0; i < max_export_events_on_buffer; i++) {
// Each log line is one export event envelope: source_type, event_id,
// timestamp, and the task payload under "event_data".
json export_event_as_json = json::parse(vc[i]);
EXPECT_EQ(export_event_as_json["source_type"].get<std::string>(), "EXPORT_TASK");
EXPECT_EQ(export_event_as_json.contains("event_id"), true);
EXPECT_EQ(export_event_as_json.contains("timestamp"), true);
EXPECT_EQ(export_event_as_json.contains("event_data"), true);

json event_data = export_event_as_json["event_data"].get<json>();

// The events written are the last max_export_events_on_buffer added
// in AddTaskEvent because (num_events-max_export_events_on_buffer)
// were dropped.
std::string expected_event_data_str;
RAY_CHECK(google::protobuf::util::MessageToJsonString(
*task_event_data_protos[i + (num_events - max_export_events_on_buffer)],
&expected_event_data_str,
options)
.ok());
json expected_event_data = json::parse(expected_event_data_str);
EXPECT_EQ(event_data, expected_event_data);
}

// Expect no more events.
ASSERT_EQ(task_event_buffer_->GetNumTaskEventsStored(), 0);
}

TEST_F(TaskEventBufferTest, TestFailedFlush) {
size_t num_status_events = 20;
size_t num_profile_events = 20;
Expand Down

0 comments on commit 6d9bef9

Please sign in to comment.