diff --git a/src/mock/ray/gcs/gcs_server/gcs_task_manager.h b/src/mock/ray/gcs/gcs_server/gcs_task_manager.h index 58df0d350160..67601dfd56a7 100644 --- a/src/mock/ray/gcs/gcs_server/gcs_task_manager.h +++ b/src/mock/ray/gcs/gcs_server/gcs_task_manager.h @@ -25,6 +25,13 @@ class MockGcsTaskManager : public GcsTaskManager { rpc::AddTaskEventDataReply *reply, rpc::SendReplyCallback send_reply_callback), (override)); + + MOCK_METHOD(void, + HandleGetTaskEvents, + (rpc::GetTaskEventsRequest request, + rpc::GetTaskEventsReply *reply, + rpc::SendReplyCallback send_reply_callback), + (override)); }; } // namespace gcs diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 503a9d6736b3..7e6ddbac14d4 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -2276,6 +2276,8 @@ Status CoreWorker::ExecuteTask( rpc::TaskEvents task_event; task_event.set_task_id(task_spec.TaskId().Binary()); task_event.set_attempt_number(task_spec.AttemptNumber()); + task_event.set_job_id(task_spec.JobId().Binary()); // Attach the binary job id so the event can be grouped by job. + auto state_updates = task_event.mutable_state_updates(); state_updates->set_running_ts(absl::GetCurrentTimeNanos()); task_event_buffer_->AddTaskEvent(std::move(task_event)); diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index 057f6d39e9cc..42328cc1b74f 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -887,6 +887,7 @@ void TaskManager::RecordTaskStatusEvent(const TaskEntry &task_entry, // Make task event rpc::TaskEvents task_event; task_event.set_task_id(task_entry.spec.TaskId().Binary()); + task_event.set_job_id(task_entry.spec.JobId().Binary()); // Attach the binary job id so the event can be grouped by job. task_event.set_attempt_number(task_entry.spec.AttemptNumber()); auto state_updates = task_event.mutable_state_updates(); switch (status) { diff --git a/src/ray/gcs/gcs_server/gcs_task_manager.cc b/src/ray/gcs/gcs_server/gcs_task_manager.cc index 7ffdf9353e0c..ef6250553401 100644 --- 
a/src/ray/gcs/gcs_server/gcs_task_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_task_manager.cc @@ -27,10 +27,53 @@ void GcsTaskManager::Stop() { } } +std::vector GcsTaskManager::GcsTaskManagerStorage::GetTaskEvents() + const { + return task_events_; +} + +std::vector GcsTaskManager::GcsTaskManagerStorage::GetTaskEvents( + JobID job_id) const { + auto task_attempts_itr = job_to_task_attempt_index_.find(job_id); + if (task_attempts_itr == job_to_task_attempt_index_.end()) { + // No task attempts are indexed for this job, so there is nothing to return. + return {}; + } + return GetTaskEvents(task_attempts_itr->second); // Resolve the job's task attempts to their stored events. +} + +std::vector GcsTaskManager::GcsTaskManagerStorage::GetTaskEvents( + const absl::flat_hash_set &task_ids) const { + absl::flat_hash_set select_task_attempts; + for (const auto &task_id : task_ids) { // Union the attempts of every requested task; task ids with no index entry are skipped. + auto task_attempts_itr = task_to_task_attempt_index_.find(task_id); + if (task_attempts_itr != task_to_task_attempt_index_.end()) { + select_task_attempts.insert(task_attempts_itr->second.begin(), + task_attempts_itr->second.end()); + } + } + + return GetTaskEvents(select_task_attempts); +} + +std::vector GcsTaskManager::GcsTaskManagerStorage::GetTaskEvents( + const absl::flat_hash_set &task_attempts) const { + std::vector result; + for (const auto &task_attempt : task_attempts) { + auto idx_itr = task_attempt_index_.find(task_attempt); + if (idx_itr != task_attempt_index_.end()) { // Attempts without an index entry are omitted from the result. + result.push_back(task_events_.at(idx_itr->second)); // Copies the stored event into the result. + } + } + + return result; +} + absl::optional GcsTaskManager::GcsTaskManagerStorage::AddOrReplaceTaskEvent( rpc::TaskEvents &&events_by_task) { TaskID task_id = TaskID::FromBinary(events_by_task.task_id()); + JobID job_id = JobID::FromBinary(events_by_task.job_id()); int32_t attempt_number = events_by_task.attempt_number(); TaskAttempt task_attempt = std::make_pair<>(task_id, attempt_number); @@ -53,6 +96,8 @@ GcsTaskManager::GcsTaskManagerStorage::AddOrReplaceTaskEvent( // A new task event, add to storage and index. // If limit enforced, replace one. 
+ // TODO(rickyx): Optimize this to per job limit with bounded FIFO map. + // https://github.com/ray-project/ray/issues/31071 if (max_num_task_events_ > 0 && task_events_.size() >= max_num_task_events_) { RAY_LOG_EVERY_MS(WARNING, 10000) << "Max number of tasks event (" << max_num_task_events_ @@ -68,12 +113,30 @@ GcsTaskManager::GcsTaskManagerStorage::AddOrReplaceTaskEvent( std::swap(to_replaced, events_by_task); auto replaced = std::move(events_by_task); - // Update index. + // Update task_attempt -> buffer index mapping. TaskAttempt replaced_attempt = std::make_pair<>( TaskID::FromBinary(replaced.task_id()), replaced.attempt_number()); + + // Update task attempt -> idx mapping. task_attempt_index_.erase(replaced_attempt); task_attempt_index_[task_attempt] = next_idx_to_overwrite_; + // Update the job -> task attempt mapping: drop the evicted attempt from the job it belonged to (which may differ from the incoming event's job), prune that job's entry if it became empty, then index the new attempt under its own job. + auto replaced_job_id = JobID::FromBinary(replaced.job_id()); + job_to_task_attempt_index_[replaced_job_id].erase(replaced_attempt); + if (job_to_task_attempt_index_[replaced_job_id].empty()) { + job_to_task_attempt_index_.erase(replaced_job_id); + } + job_to_task_attempt_index_[job_id].insert(task_attempt); + + // Update the task -> task attempt mapping, mirroring the job index update above within this block. + auto replaced_task_id = TaskID::FromBinary(replaced.task_id()); + task_to_task_attempt_index_[replaced_task_id].erase(replaced_attempt); + if (task_to_task_attempt_index_[replaced_task_id].empty()) { + task_to_task_attempt_index_.erase(replaced_task_id); + } + task_to_task_attempt_index_[task_id].insert(task_attempt); + // Update iter. next_idx_to_overwrite_ = (next_idx_to_overwrite_ + 1) % max_num_task_events_; @@ -82,11 +145,46 @@ GcsTaskManager::GcsTaskManagerStorage::AddOrReplaceTaskEvent( // Add to index. task_attempt_index_[task_attempt] = task_events_.size(); + job_to_task_attempt_index_[job_id].insert(task_attempt); + task_to_task_attempt_index_[task_id].insert(task_attempt); // Add a new task events. 
+ task_events_.push_back(std::move(events_by_task)); return absl::nullopt; } +void GcsTaskManager::HandleGetTaskEvents(rpc::GetTaskEventsRequest request, + rpc::GetTaskEventsReply *reply, + rpc::SendReplyCallback send_reply_callback) { + RAY_LOG(DEBUG) << "Getting task status:" << request.ShortDebugString(); + absl::MutexLock lock(&mutex_); + + // Select candidate events via the storage indexes: by task ids, by job id, or all events when no selector is set. + std::vector task_events; + if (request.has_task_ids()) { + absl::flat_hash_set task_ids; + for (const auto &task_id_str : request.task_ids().vals()) { + task_ids.insert(TaskID::FromBinary(task_id_str)); + } + task_events = task_event_storage_->GetTaskEvents(task_ids); + } else if (request.has_job_id()) { + task_events = task_event_storage_->GetTaskEvents(JobID::FromBinary(request.job_id())); + } else { + task_events = task_event_storage_->GetTaskEvents(); + } + + // Populate reply: swap each selected event (already a local copy) into a new reply entry. + for (auto &task_event : task_events) { + auto events = reply->add_events_by_task(); + events->Swap(&task_event); + } + + reply->set_num_profile_task_events_dropped(total_num_profile_task_events_dropped_); + reply->set_num_status_task_events_dropped(total_num_status_task_events_dropped_); // Both counters come from the manager's total_* members, not from the selected events. + + GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); + return; +} + void GcsTaskManager::HandleAddTaskEventData(rpc::AddTaskEventDataRequest request, rpc::AddTaskEventDataReply *reply, rpc::SendReplyCallback send_reply_callback) { diff --git a/src/ray/gcs/gcs_server/gcs_task_manager.h b/src/ray/gcs/gcs_server/gcs_task_manager.h index bc21f857839e..2e3fa8738742 100644 --- a/src/ray/gcs/gcs_server/gcs_task_manager.h +++ b/src/ray/gcs/gcs_server/gcs_task_manager.h @@ -60,6 +60,16 @@ class GcsTaskManager : public rpc::TaskInfoHandler { rpc::SendReplyCallback send_reply_callback) LOCKS_EXCLUDED(mutex_) override; + /// Handle a GetTaskEvents request: return events selected by task ids, by job id, or all stored events. + /// + /// \param request gRPC request; may carry task ids or a job id as the selector. + /// \param reply gRPC reply, filled with the selected events and the dropped-event counters. + /// \param send_reply_callback Callback to invoke when sending reply. 
+ void HandleGetTaskEvents(rpc::GetTaskEventsRequest request, + rpc::GetTaskEventsReply *reply, + rpc::SendReplyCallback send_reply_callback) + LOCKS_EXCLUDED(mutex_) override; + /// Stops the event loop and the thread of the task event handler. /// /// After this is called, no more requests will be handled. @@ -111,13 +121,30 @@ class GcsTaskManager : public rpc::TaskInfoHandler { /// replaced task event. absl::optional AddOrReplaceTaskEvent(rpc::TaskEvents &&task_event); - /// Get task events. + /// Get the task events of a single job. + /// + /// \param job_id Job ID to filter task events. + /// \return Copies of the task events of `job_id`; empty if no attempt is indexed for the job. + std::vector GetTaskEvents(JobID job_id) const; + + /// Get all task events. + /// + /// \return A copy of all task events stored. + std::vector GetTaskEvents() const; + + /// Get task events of all attempts of the tasks in `task_ids`. /// - /// \param job_id Getting task events from this `job_id` only if not nullopt. - /// Otherwise all task events will be returned. - /// \return A vector of task events. + /// \param task_ids Task ids of the tasks. + /// \return Copies of the task events from the `task_ids`; unknown task ids are skipped. std::vector GetTaskEvents( - absl::optional job_id = absl::nullopt) = delete; + const absl::flat_hash_set &task_ids) const; + + /// Get task events of the given task attempts. + /// + /// \param task_attempts Task attempts (task ids + attempt number). + /// \return Copies of the task events from the `task_attempts`; attempts not stored are skipped. + std::vector GetTaskEvents( + const absl::flat_hash_set &task_attempts) const; /// Get the number of task events stored. size_t GetTaskEventsCount() const { return task_events_.size(); } @@ -128,15 +155,25 @@ class GcsTaskManager : public rpc::TaskInfoHandler { /// Max number of task events allowed in the storage. const size_t max_num_task_events_ = 0; - /// Current task events stored. - std::vector task_events_; - /// A iterator into task_events_ that determines which element to be overwritten. 
size_t next_idx_to_overwrite_ = 0; - /// Index from task attempt to the index of the corresponding task event. + /// TODO(rickyx): Refactor this into LRI(least recently inserted) buffer: + /// https://github.com/ray-project/ray/issues/31158 + /// Current task events stored. + std::vector task_events_; + + /// Index from task attempt to the position of its task event in the buffer + /// `task_events_`. absl::flat_hash_map task_attempt_index_; + /// Secondary index from task id to the task attempts of that task. + absl::flat_hash_map> + task_to_task_attempt_index_; + /// Secondary index from job id to task attempts of the job. + absl::flat_hash_map> + job_to_task_attempt_index_; + /// Counter for tracking the size of task event. This assumes tasks events are never /// removed actively. uint64_t num_bytes_task_events_ = 0; diff --git a/src/ray/gcs/gcs_server/test/gcs_task_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_task_manager_test.cc index dafbcd6b2554..3c9a6be983cf 100644 --- a/src/ray/gcs/gcs_server/test/gcs_task_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_task_manager_test.cc @@ -80,6 +80,36 @@ class GcsTaskManagerTest : public ::testing::Test { return reply; } + rpc::GetTaskEventsReply SyncGetTaskEvents( + absl::flat_hash_set task_ids, + absl::optional job_id = absl::nullopt) { + rpc::GetTaskEventsRequest request; + rpc::GetTaskEventsReply reply; + std::promise promise; + + if (!task_ids.empty()) { + for (const auto &task_id : task_ids) { + request.mutable_task_ids()->add_vals(task_id.Binary()); + } + } + + if (job_id) { // `select_by` is a oneof in the request: setting job_id replaces any task ids set above. + request.set_job_id(job_id->Binary()); + } + + task_manager->HandleGetTaskEvents( + request, + &reply, + [&promise](Status, std::function, std::function) { + promise.set_value(true); + }); + + promise.get_future().get(); // Block until the handler has invoked the reply callback. + + EXPECT_EQ(StatusCode(reply.status().code()), StatusCode::OK); + return reply; + } + static rpc::TaskInfoEntry GenTaskInfo(JobID job_id) { rpc::TaskInfoEntry task_info; task_info.set_job_id(job_id.Binary()); @@ -105,7 +135,8 
@@ class GcsTaskManagerTest : public ::testing::Test { static std::vector GenTaskEvents( const std::vector &task_ids, - int32_t attempt_number, + int32_t attempt_number = 0, + int32_t job_id = 0, absl::optional profile_events = absl::nullopt, absl::optional state_update = absl::nullopt, absl::optional task_info = absl::nullopt) { @@ -113,6 +144,7 @@ class GcsTaskManagerTest : public ::testing::Test { for (auto const &task_id : task_ids) { rpc::TaskEvents events; events.set_task_id(task_id.Binary()); + events.set_job_id(JobID::FromInt(job_id).Binary()); events.set_attempt_number(attempt_number); if (state_update.has_value()) { @@ -195,7 +227,7 @@ TEST_F(GcsTaskManagerTest, TestMergeTaskEventsSameTaskAttempt) { int32_t attempt_number = 0; for (size_t i = 0; i < num_task_events; ++i) { auto profile_events = GenProfileEvents("event", i, i); - auto events = GenTaskEvents(task_ids, attempt_number, profile_events); + auto events = GenTaskEvents(task_ids, attempt_number, 0, profile_events); auto events_data = Mocker::GenTaskEventsData(events); auto reply = SyncAddTaskEventData(events_data); @@ -223,6 +255,144 @@ TEST_F(GcsTaskManagerTest, TestMergeTaskEventsSameTaskAttempt) { } } +TEST_F(GcsTaskManagerTest, TestGetTaskEvents) { + // Add three groups of events (profile-only, status-only, both), all under job 0 and attempt 0. + size_t num_profile_events = 10; + size_t num_status_events = 20; + size_t num_both_events = 30; + size_t num_profile_task_events_dropped = 10; + size_t num_status_task_events_dropped = 20; + + std::vector events_with_profile; + std::vector events_with_status; + std::vector events_with_both; + + { + auto task_ids1 = GenTaskIDs(num_profile_events); + auto task_ids2 = GenTaskIDs(num_status_events); + auto task_ids3 = GenTaskIDs(num_both_events); + + auto profile_events = GenProfileEvents("event", /*start*/ 1, /*end*/ 1); + auto status_update = GenStateUpdate(); + + events_with_profile = + GenTaskEvents(task_ids1, /*attempt_number*/ 0, /* job_id */ 0, profile_events); + events_with_status = + GenTaskEvents(task_ids2, 0, 0, 
/*profile_events*/ absl::nullopt, status_update); + events_with_both = GenTaskEvents(task_ids3, 0, 0, profile_events, status_update); + + auto all_events = {events_with_profile, events_with_status, events_with_both}; + for (auto &events : all_events) { + auto data = Mocker::GenTaskEventsData(events); + SyncAddTaskEventData(data); + } + } + + { + // Report the dropped-event counts via a batch that carries no events. + auto data = Mocker::GenTaskEventsData( + {}, num_profile_task_events_dropped, num_status_task_events_dropped); + SyncAddTaskEventData(data); + } + + // Query with no selector (no task ids, no job id) to get all events. + { + auto reply = SyncGetTaskEvents(/* task_ids */ {}); + // Expected: the concatenation of all three groups added above. + std::vector expected_events = + ConcatTaskEvents({events_with_status, events_with_profile, events_with_both}); + + auto expected_data = Mocker::GenTaskEventsData(expected_events); + // The returned events must match the expected ones. + ExpectTaskEventsEq(expected_data.mutable_events_by_task(), + reply.mutable_events_by_task()); + + EXPECT_EQ(reply.num_profile_task_events_dropped(), num_profile_task_events_dropped); + EXPECT_EQ(reply.num_status_task_events_dropped(), num_status_task_events_dropped); + } +} + +TEST_F(GcsTaskManagerTest, TestGetTaskEventsByTaskIDs) { + int32_t num_events_task_1 = 10; + int32_t num_events_task_2 = 20; + + rpc::TaskEventData events_data_task1; + auto task_id1 = RandomTaskId(); + { + std::vector> all_events; + for (int32_t attempt_num = 0; attempt_num < num_events_task_1; ++attempt_num) { // One event per attempt of the same task. + all_events.push_back(GenTaskEvents({task_id1}, attempt_num)); + } + auto events_task1 = ConcatTaskEvents(all_events); + events_data_task1 = Mocker::GenTaskEventsData(events_task1); + SyncAddTaskEventData(events_data_task1); + } + + rpc::TaskEventData events_data_task2; + auto task_id2 = RandomTaskId(); + { + std::vector> all_events; + for (int32_t attempt_num = 0; attempt_num < num_events_task_2; ++attempt_num) { + all_events.push_back(GenTaskEvents({task_id2}, attempt_num)); + } + auto events_task2 = ConcatTaskEvents(all_events); + events_data_task2 = 
Mocker::GenTaskEventsData(events_task2); + SyncAddTaskEventData(events_data_task2); + } + + auto reply_task1 = SyncGetTaskEvents({task_id1}); + auto reply_task2 = SyncGetTaskEvents({task_id2}); + + // Each reply must match the events added for the queried task. + ExpectTaskEventsEq(events_data_task1.mutable_events_by_task(), + reply_task1.mutable_events_by_task()); + ExpectTaskEventsEq(events_data_task2.mutable_events_by_task(), + reply_task2.mutable_events_by_task()); +} + +TEST_F(GcsTaskManagerTest, TestGetTaskEventsByJob) { + size_t num_task_job1 = 10; + size_t num_task_job2 = 20; + + rpc::TaskEventData events_data_job1; + { + auto task_ids = GenTaskIDs(num_task_job1); + auto task_info = GenTaskInfo(JobID::FromInt(1)); + auto events = GenTaskEvents(task_ids, + /* attempt_number */ 0, + /* job_id */ 1, + absl::nullopt, + absl::nullopt, + task_info); + events_data_job1 = Mocker::GenTaskEventsData(events); + SyncAddTaskEventData(events_data_job1); + } + + rpc::TaskEventData events_data_job2; + { + auto task_ids = GenTaskIDs(num_task_job2); + auto task_info = GenTaskInfo(JobID::FromInt(2)); + auto events = GenTaskEvents(task_ids, + /* attempt_number */ + 0, + /* job_id */ 2, + absl::nullopt, + absl::nullopt, + task_info); + events_data_job2 = Mocker::GenTaskEventsData(events); + SyncAddTaskEventData(events_data_job2); + } + + auto reply_job1 = SyncGetTaskEvents(/* task_ids */ {}, JobID::FromInt(1)); + auto reply_job2 = SyncGetTaskEvents({}, JobID::FromInt(2)); + + // Each reply must match the events added for the queried job. + ExpectTaskEventsEq(events_data_job1.mutable_events_by_task(), + reply_job1.mutable_events_by_task()); + ExpectTaskEventsEq(events_data_job2.mutable_events_by_task(), + reply_job2.mutable_events_by_task()); +} + TEST_F(GcsTaskManagerMemoryLimitedTest, TestLimitTaskEvents) { size_t num_limit = 100; // synced with test config @@ -235,16 +405,21 @@ TEST_F(GcsTaskManagerMemoryLimitedTest, TestLimitTaskEvents) { size_t num_status_events_dropped_on_worker = 22; { // Add profile event. 
- auto events = GenTaskEvents( - GenTaskIDs(num_profile_events_to_drop), 0, GenProfileEvents("event", 1, 1)); + auto events = GenTaskEvents(GenTaskIDs(num_profile_events_to_drop), + /* attempt_number */ 0, + /* job_id */ 0, + GenProfileEvents("event", 1, 1)); auto events_data = Mocker::GenTaskEventsData(events, num_profile_events_dropped_on_worker); SyncAddTaskEventData(events_data); } { // Add status update events. - auto events = GenTaskEvents( - GenTaskIDs(num_status_events_to_drop), 0, absl::nullopt, GenStateUpdate()); + auto events = GenTaskEvents(GenTaskIDs(num_status_events_to_drop), + /* attempt_number*/ 0, + /* job_id */ 0, + /* profile_events */ absl::nullopt, + GenStateUpdate()); auto events_data = Mocker::GenTaskEventsData(events, /*num_profile_task_events_dropped*/ 0, num_status_events_dropped_on_worker); diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index fa7752e6db5e..e575d6f3162c 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -244,13 +244,6 @@ message TaskStateUpdate { optional int64 failed_ts = 7; } -enum TaskEventType { - // Profile events. - PROFILE_EVENT = 0; - // Status update events. - STATUS_EVENT = 1; -} - // Represents events and state changes from a single task run. message TaskEvents { // Metadata shared by all event types. @@ -263,6 +256,8 @@ message TaskEvents { optional TaskStateUpdate state_updates = 4; // Task profiling events. optional ProfileEvents profile_events = 5; + // Binary id of the job the task belongs to. + bytes job_id = 6; } // Represents a compact list of task state events by different tasks, diff --git a/src/ray/protobuf/gcs_service.proto b/src/ray/protobuf/gcs_service.proto index 3b0d4ddc52d9..c1fd403e8999 100644 --- a/src/ray/protobuf/gcs_service.proto +++ b/src/ray/protobuf/gcs_service.proto @@ -624,6 +624,7 @@ service NodeInfoGcsService { } message AddTaskEventDataRequest { + // Task event data from core worker. 
TaskEventData data = 1; } @@ -631,10 +632,35 @@ message AddTaskEventDataReply { GcsStatus status = 1; } +message GetTaskEventsRequest { + message TaskIDs { + repeated bytes vals = 1; + } + oneof select_by { + // Get task events from a job. Binary JobID: `bytes`, not `string`, because ids are raw bytes rather than UTF-8 text. + bytes job_id = 1; + // Get task events from a set of tasks (binary TaskIDs). + TaskIDs task_ids = 2; + } +} + +message GetTaskEventsReply { + GcsStatus status = 1; + // Task events returned. + repeated TaskEvents events_by_task = 2; + // Total number of profile events dropped at GCS and worker (overall total, not restricted to the queried events). + int32 num_profile_task_events_dropped = 3; + // Total number of status events dropped at GCS and worker (overall total, not restricted to the queried events). + int32 num_status_task_events_dropped = 4; +} + // Service for task info access. service TaskInfoGcsService { // Add task event data to GCS. rpc AddTaskEventData(AddTaskEventDataRequest) returns (AddTaskEventDataReply); + + // Get task events. + rpc GetTaskEvents(GetTaskEventsRequest) returns (GetTaskEventsReply); } /////////////////////////////////////////////////////////////////////////////// diff --git a/src/ray/rpc/gcs_server/gcs_rpc_server.h b/src/ray/rpc/gcs_server/gcs_rpc_server.h index 280b3f79e349..f09105d56f46 100644 --- a/src/ray/rpc/gcs_server/gcs_rpc_server.h +++ b/src/ray/rpc/gcs_server/gcs_rpc_server.h @@ -585,6 +585,10 @@ class TaskInfoGcsServiceHandler { virtual void HandleAddTaskEventData(AddTaskEventDataRequest request, AddTaskEventDataReply *reply, SendReplyCallback send_reply_callback) = 0; + + virtual void HandleGetTaskEvents(rpc::GetTaskEventsRequest request, + rpc::GetTaskEventsReply *reply, + rpc::SendReplyCallback send_reply_callback) = 0; }; /// The `GrpcService` for `TaskInfoGcsService`. @@ -605,6 +609,7 @@ class TaskInfoGrpcService : public GrpcService { const std::unique_ptr &cq, std::vector> *server_call_factories) override { TASK_INFO_SERVICE_RPC_HANDLER(AddTaskEventData); + TASK_INFO_SERVICE_RPC_HANDLER(GetTaskEvents); } private: