From e00eb0d140f82a8acf0d37f200f8c0256d5b977e Mon Sep 17 00:00:00 2001 From: Ricky Xu Date: Thu, 8 Dec 2022 10:13:55 -0800 Subject: [PATCH] Events drop count (#30953) Signed-off-by: tmynn --- src/ray/core_worker/task_event_buffer.cc | 36 +++- src/ray/core_worker/task_event_buffer.h | 17 +- .../test/task_event_buffer_test.cc | 170 ++++++++++++------ src/ray/protobuf/gcs.proto | 6 +- 4 files changed, 159 insertions(+), 70 deletions(-) diff --git a/src/ray/core_worker/task_event_buffer.cc b/src/ray/core_worker/task_event_buffer.cc index 32f603a5179d..10bc43028b94 100644 --- a/src/ray/core_worker/task_event_buffer.cc +++ b/src/ray/core_worker/task_event_buffer.cc @@ -102,9 +102,14 @@ void TaskEventBufferImpl::AddTaskEvent(rpc::TaskEvents task_events) { auto limit = RayConfig::instance().task_events_max_num_task_events_in_buffer(); if (limit > 0 && buffer_.size() >= static_cast(limit)) { // Too many task events, start overriding older ones. + if (buffer_[next_idx_to_overwrite_].has_profile_events()) { + num_profile_task_events_dropped_++; + } else { + num_status_task_events_dropped_++; + } + buffer_[next_idx_to_overwrite_] = std::move(task_events); next_idx_to_overwrite_ = (next_idx_to_overwrite_ + 1) % limit; - num_task_events_dropped_++; return; } buffer_.push_back(std::move(task_events)); @@ -115,7 +120,8 @@ void TaskEventBufferImpl::FlushEvents(bool forced) { return; } std::vector task_events; - size_t num_task_events_dropped = 0; + size_t num_status_task_events_dropped = 0; + size_t num_profile_task_events_dropped = 0; { absl::MutexLock lock(&mutex_); @@ -123,7 +129,8 @@ void TaskEventBufferImpl::FlushEvents(bool forced) { << "Pushed task state events to GCS. [total_bytes=" << (1.0 * total_events_bytes_) / 1024 / 1024 << "MiB][total_count=" << total_num_events_ - << "][total_task_events_dropped=" << num_task_events_dropped_ + << "][total_status_task_events_dropped=" << num_status_task_events_dropped_ + << "][total_profile_task_events_dropped=" << num_profile_task_events_dropped_ << "][cur_buffer_size=" << buffer_.size() << "]."; // Skip if GCS hasn't finished processing the previous message. @@ -144,13 +151,25 @@ void TaskEventBufferImpl::FlushEvents(bool forced) { RayConfig::instance().task_events_max_num_task_events_in_buffer()); buffer_.swap(task_events); next_idx_to_overwrite_ = 0; - num_task_events_dropped = num_task_events_dropped_; - num_task_events_dropped_ = 0; + + num_profile_task_events_dropped = num_profile_task_events_dropped_; + num_profile_task_events_dropped_ = 0; + + num_status_task_events_dropped = num_status_task_events_dropped_; + num_status_task_events_dropped_ = 0; } // Merge multiple events from a single task attempt run into one task event. absl::flat_hash_map, rpc::TaskEvents> task_events_map; + + size_t num_profile_event_to_send = 0; + size_t num_status_event_to_send = 0; for (auto event : task_events) { + if (event.has_profile_events()) { + num_profile_event_to_send++; + } else { + num_status_event_to_send++; + } auto &task_events_itr = task_events_map[std::make_pair(event.task_id(), event.attempt_number())]; task_events_itr.MergeFrom(event); @@ -158,7 +177,9 @@ void TaskEventBufferImpl::FlushEvents(bool forced) { // Convert to rpc::TaskEventsData auto data = std::make_unique(); - data->set_num_task_events_dropped(num_task_events_dropped); + data->set_num_profile_task_events_dropped(num_profile_task_events_dropped); + data->set_num_status_task_events_dropped(num_status_task_events_dropped); + auto num_task_events = task_events_map.size(); for (auto itr : task_events_map) { auto events_by_task = data->add_events_by_task(); @@ -198,7 +219,8 @@ void TaskEventBufferImpl::FlushEvents(bool forced) { grpc_in_progress_ = false; // Fail to send, currently dropping events. - num_task_events_dropped_ += num_task_events; + num_status_task_events_dropped_ += num_status_event_to_send; + num_profile_task_events_dropped_ += num_profile_event_to_send; } } } diff --git a/src/ray/core_worker/task_event_buffer.h b/src/ray/core_worker/task_event_buffer.h index d45815db50ff..145b8908e225 100644 --- a/src/ray/core_worker/task_event_buffer.h +++ b/src/ray/core_worker/task_event_buffer.h @@ -126,9 +126,15 @@ class TaskEventBufferImpl : public TaskEventBuffer { } /// Test only functions. - size_t GetNumTaskEventsDropped() LOCKS_EXCLUDED(mutex_) { + size_t GetNumStatusTaskEventsDropped() LOCKS_EXCLUDED(mutex_) { absl::MutexLock lock(&mutex_); - return num_task_events_dropped_; + return num_status_task_events_dropped_; + } + + /// Test only functions. + size_t GetNumProfileTaskEventsDropped() LOCKS_EXCLUDED(mutex_) { + absl::MutexLock lock(&mutex_); + return num_profile_task_events_dropped_; } /// Test only functions. @@ -164,8 +170,11 @@ class TaskEventBufferImpl : public TaskEventBuffer { /// A iterator into buffer_ that determines which element to be overwritten. size_t next_idx_to_overwrite_ GUARDED_BY(mutex_) = 0; - /// Number of task events dropped since the last report flush. - size_t num_task_events_dropped_ GUARDED_BY(mutex_) = 0; + /// Number of profile task events dropped since the last report flush. + size_t num_profile_task_events_dropped_ GUARDED_BY(mutex_) = 0; + + /// Number of status task events dropped since the last report flush. + size_t num_status_task_events_dropped_ GUARDED_BY(mutex_) = 0; /// True if there's a pending gRPC call. It's a simple way to prevent overloading /// GCS with too many calls. There is no point sending more events if GCS could not diff --git a/src/ray/core_worker/test/task_event_buffer_test.cc b/src/ray/core_worker/test/task_event_buffer_test.cc index 41aaeddae0ab..3a4d116daaec 100644 --- a/src/ray/core_worker/test/task_event_buffer_test.cc +++ b/src/ray/core_worker/test/task_event_buffer_test.cc @@ -58,13 +58,32 @@ class TaskEventBufferTest : public ::testing::Test { return task_ids; } - rpc::TaskEvents GenTaskEvents(TaskID task_id, uint64_t attempt_num) { + rpc::TaskEvents GenStatusTaskEvents(TaskID task_id, + uint64_t attempt_num, + int64_t running_ts = 1) { rpc::TaskEvents task_events; task_events.set_task_id(task_id.Binary()); task_events.set_attempt_number(attempt_num); + auto status_update = task_events.mutable_state_updates(); + status_update->set_running_ts(running_ts); return task_events; } + rpc::TaskEvents GenProfileTaskEvents(TaskID task_id, uint64_t attempt_num) { + rpc::TaskEvents task_events; + task_events.set_task_id(task_id.Binary()); + task_events.set_attempt_number(attempt_num); + auto profile_events = task_events.mutable_profile_events(); + auto event = profile_events->add_events(); + event->set_event_name("test_event"); + + return task_events; + } + + static bool SortTaskEvents(const rpc::TaskEvents &a, const rpc::TaskEvents &b) { + return a.task_id() < b.task_id() || a.attempt_number() < b.attempt_number(); + } + std::unique_ptr task_event_buffer_ = nullptr; }; @@ -97,11 +116,12 @@ TEST_F(TaskEventBufferTest, TestAddEvent) { // Test add status event auto task_id_1 = RandomTaskId(); - task_event_buffer_->AddTaskEvent(GenTaskEvents(task_id_1, 0)); + task_event_buffer_->AddTaskEvent(GenStatusTaskEvents(task_id_1, 0)); ASSERT_EQ(task_event_buffer_->GetAllTaskEvents().size(), 1); - task_event_buffer_->AddTaskEvent(GenTaskEvents(task_id_1, 1)); + // Test add profile events + task_event_buffer_->AddTaskEvent(GenProfileTaskEvents(task_id_1, 1)); ASSERT_EQ(task_event_buffer_->GetAllTaskEvents().size(), 2); } @@ -109,8 +129,13 @@ TEST_F(TaskEventBufferTest, TestFlushEvents) { size_t num_events = 20; auto task_ids = GenTaskIDs(num_events); + std::vector task_events; for (const auto &task_id : task_ids) { - task_event_buffer_->AddTaskEvent(GenTaskEvents(task_id, 0)); + task_events.push_back(GenStatusTaskEvents(task_id, 0)); + } + + for (const auto &task_event : task_events) { + task_event_buffer_->AddTaskEvent(task_event); } ASSERT_EQ(task_event_buffer_->GetAllTaskEvents().size(), num_events); @@ -122,21 +147,26 @@ TEST_F(TaskEventBufferTest, TestFlushEvents) { // Expect data flushed match rpc::TaskEventData expected_data; - expected_data.set_num_task_events_dropped(0); - for (const auto &task_id : task_ids) { + expected_data.set_num_profile_task_events_dropped(0); + expected_data.set_num_status_task_events_dropped(0); + for (const auto &task_event : task_events) { auto event = expected_data.add_events_by_task(); - event->set_task_id(task_id.Binary()); - event->set_attempt_number(0); + event->CopyFrom(task_event); } EXPECT_CALL(*task_gcs_accessor, AsyncAddTaskEventData(_, _)) .WillOnce([&](std::unique_ptr actual_data, ray::gcs::StatusCallback callback) { - if (google::protobuf::util::MessageDifferencer::Equals(*actual_data, - expected_data)) { - return Status::OK(); - } - return Status::UnknownError(""); + // Sort and compare + std::sort(actual_data->mutable_events_by_task()->begin(), + actual_data->mutable_events_by_task()->end(), + SortTaskEvents); + std::sort(expected_data.mutable_events_by_task()->begin(), + expected_data.mutable_events_by_task()->end(), + SortTaskEvents); + EXPECT_TRUE(google::protobuf::util::MessageDifferencer::Equals(*actual_data, + expected_data)); + return Status::OK(); }); task_event_buffer_->FlushEvents(false); @@ -146,11 +176,16 @@ TEST_F(TaskEventBufferTest, TestFlushEvents) { } TEST_F(TaskEventBufferTest, TestFailedFlush) { - size_t num_events = 20; + size_t num_status_events = 20; + size_t num_profile_events = 20; // Adding some events - for (size_t i = 0; i < num_events; ++i) { + for (size_t i = 0; i < num_status_events + num_profile_events; ++i) { auto task_id = RandomTaskId(); - task_event_buffer_->AddTaskEvent(GenTaskEvents(task_id, 0)); + if (i % 2 == 0) { + task_event_buffer_->AddTaskEvent(GenStatusTaskEvents(task_id, 0)); + } else { + task_event_buffer_->AddTaskEvent(GenProfileTaskEvents(task_id, 0)); + } } auto task_gcs_accessor = @@ -167,17 +202,23 @@ TEST_F(TaskEventBufferTest, TestFailedFlush) { task_event_buffer_->FlushEvents(false); // Expect the number of dropped events incremented. - ASSERT_EQ(task_event_buffer_->GetNumTaskEventsDropped(), num_events); + ASSERT_EQ(task_event_buffer_->GetNumStatusTaskEventsDropped(), num_status_events); + ASSERT_EQ(task_event_buffer_->GetNumProfileTaskEventsDropped(), num_profile_events); // Adding some more events - for (size_t i = 0; i < num_events; ++i) { + for (size_t i = 0; i < num_status_events + num_profile_events; ++i) { auto task_id = RandomTaskId(); - task_event_buffer_->AddTaskEvent(GenTaskEvents(task_id, 0)); + if (i % 2 == 0) { + task_event_buffer_->AddTaskEvent(GenStatusTaskEvents(task_id, 1)); + } else { + task_event_buffer_->AddTaskEvent(GenProfileTaskEvents(task_id, 1)); + } } // Flush successfully will reset the num events dropped. task_event_buffer_->FlushEvents(false); - ASSERT_EQ(task_event_buffer_->GetNumTaskEventsDropped(), 0); + ASSERT_EQ(task_event_buffer_->GetNumStatusTaskEventsDropped(), 0); + ASSERT_EQ(task_event_buffer_->GetNumProfileTaskEventsDropped(), 0); } TEST_F(TaskEventBufferTest, TestBackPressure) { @@ -185,7 +226,7 @@ TEST_F(TaskEventBufferTest, TestBackPressure) { // Adding some events for (size_t i = 0; i < num_events; ++i) { auto task_id = RandomTaskId(); - task_event_buffer_->AddTaskEvent(GenTaskEvents(task_id, 0)); + task_event_buffer_->AddTaskEvent(GenStatusTaskEvents(task_id, 0)); } auto task_gcs_accessor = @@ -197,11 +238,11 @@ TEST_F(TaskEventBufferTest, TestBackPressure) { task_event_buffer_->FlushEvents(false); auto task_id_1 = RandomTaskId(); - task_event_buffer_->AddTaskEvent(GenTaskEvents(task_id_1, 0)); + task_event_buffer_->AddTaskEvent(GenStatusTaskEvents(task_id_1, 0)); task_event_buffer_->FlushEvents(false); auto task_id_2 = RandomTaskId(); - task_event_buffer_->AddTaskEvent(GenTaskEvents(task_id_2, 0)); + task_event_buffer_->AddTaskEvent(GenStatusTaskEvents(task_id_2, 0)); task_event_buffer_->FlushEvents(false); } @@ -210,7 +251,7 @@ TEST_F(TaskEventBufferTest, TestForcedFlush) { // Adding some events for (size_t i = 0; i < num_events; ++i) { auto task_id = RandomTaskId(); - task_event_buffer_->AddTaskEvent(GenTaskEvents(task_id, 0)); + task_event_buffer_->AddTaskEvent(GenStatusTaskEvents(task_id, 0)); } auto task_gcs_accessor = @@ -221,69 +262,84 @@ TEST_F(TaskEventBufferTest, TestForcedFlush) { EXPECT_CALL(*task_gcs_accessor, AsyncAddTaskEventData).Times(2); auto task_id_1 = RandomTaskId(); - task_event_buffer_->AddTaskEvent(GenTaskEvents(task_id_1, 0)); + task_event_buffer_->AddTaskEvent(GenStatusTaskEvents(task_id_1, 0)); task_event_buffer_->FlushEvents(false); auto task_id_2 = RandomTaskId(); - task_event_buffer_->AddTaskEvent(GenTaskEvents(task_id_2, 0)); + task_event_buffer_->AddTaskEvent(GenStatusTaskEvents(task_id_2, 0)); task_event_buffer_->FlushEvents(true); } TEST_F(TaskEventBufferTest, TestBufferSizeLimit) { size_t num_limit = 100; // Synced with test setup - size_t num_batch1 = 100; - size_t num_batch2 = 100; - - auto task_ids1 = GenTaskIDs(num_batch1); - auto task_ids2 = GenTaskIDs(num_batch2); - - // Adding them - for (auto &task_id : task_ids1) { - task_event_buffer_->AddTaskEvent(GenTaskEvents(task_id, 0)); + size_t num_profile = 50; + size_t num_status = 50; + + // Generate 2 batches of events each, where batch 1 will be evicted by batch 2. + std::vector profile_events_1; + std::vector status_events_1; + std::vector profile_events_2; + std::vector status_events_2; + + // Generate data + for (size_t i = 0; i < 50; ++i) { + status_events_1.push_back(GenStatusTaskEvents(RandomTaskId(), 0)); + status_events_2.push_back(GenStatusTaskEvents(RandomTaskId(), 0)); + profile_events_1.push_back(GenProfileTaskEvents(RandomTaskId(), 0)); + profile_events_2.push_back(GenProfileTaskEvents(RandomTaskId(), 0)); } - for (auto &task_id : task_ids2) { - task_event_buffer_->AddTaskEvent(GenTaskEvents(task_id, 0)); + auto data = {profile_events_1, status_events_1, profile_events_2, status_events_2}; + for (auto &events : data) { + for (auto &event : events) { + task_event_buffer_->AddTaskEvent(event); + } } - // Expect only limit. + // Expect only limit in buffer. ASSERT_EQ(task_event_buffer_->GetAllTaskEvents().size(), num_limit); - // Expect task events match. - std::unordered_set task_ids2_set(task_ids2.begin(), task_ids2.end()); - for (auto &task_event : task_event_buffer_->GetAllTaskEvents()) { - auto task_id = TaskID::FromBinary(task_event.task_id()); - EXPECT_EQ(task_ids2_set.count(task_id), 1); - } - ASSERT_EQ(task_event_buffer_->GetNumTaskEventsDropped(), num_batch1); - - // Expect the reported data to contain number of dropped events. + // Expect the reported data to match. auto task_gcs_accessor = static_cast(task_event_buffer_->GetGcsClient()) ->mock_task_accessor; rpc::TaskEventData expected_data; - expected_data.set_num_task_events_dropped(num_batch1); - for (const auto &task_id : task_ids2) { - auto event = expected_data.add_events_by_task(); - event->set_task_id(task_id.Binary()); - event->set_attempt_number(0); + expected_data.set_num_profile_task_events_dropped(num_profile); + expected_data.set_num_status_task_events_dropped(num_status); + for (const auto &event : profile_events_2) { + auto expect_event = expected_data.add_events_by_task(); + expect_event->CopyFrom(event); + } + for (const auto &event : status_events_2) { + auto expect_event = expected_data.add_events_by_task(); + expect_event->CopyFrom(event); } EXPECT_CALL(*task_gcs_accessor, AsyncAddTaskEventData(_, _)) .WillOnce([&](std::unique_ptr actual_data, ray::gcs::StatusCallback callback) { - if (google::protobuf::util::MessageDifferencer::Equals(*actual_data, - expected_data)) { - return Status::OK(); - } - return Status::UnknownError(""); + // Sort and compare + std::sort(actual_data->mutable_events_by_task()->begin(), + actual_data->mutable_events_by_task()->end(), + SortTaskEvents); + std::sort(expected_data.mutable_events_by_task()->begin(), + expected_data.mutable_events_by_task()->end(), + SortTaskEvents); + + EXPECT_TRUE(google::protobuf::util::MessageDifferencer::Equals(*actual_data, + expected_data)); + return Status::OK(); }); + ASSERT_EQ(task_event_buffer_->GetNumProfileTaskEventsDropped(), num_profile); + ASSERT_EQ(task_event_buffer_->GetNumStatusTaskEventsDropped(), num_status); task_event_buffer_->FlushEvents(false); // Expect data flushed. ASSERT_EQ(task_event_buffer_->GetAllTaskEvents().size(), 0); + ASSERT_EQ(task_event_buffer_->GetNumProfileTaskEventsDropped(), 0); + ASSERT_EQ(task_event_buffer_->GetNumStatusTaskEventsDropped(), 0); } } // namespace worker diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index d3cd002c43d8..b967485d1d66 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -263,8 +263,10 @@ message TaskEvents { message TaskEventData { // A batch of task state change events. repeated TaskEvents events_by_task = 1; - // Number of dropped task events due to buffer size limit on workers. - int32 num_task_events_dropped = 2; + // Number of dropped profile task events due to buffer size limit on workers. + int32 num_profile_task_events_dropped = 3; + // Number of dropped status task events due to buffer size limit on workers. + int32 num_status_task_events_dropped = 4; } message ResourceTableData {