Skip to content

Commit

Permalink
[core][state] Task events backend - gcs read path [3/n] (#30934)
Browse files Browse the repository at this point in the history
  • Loading branch information
rickyyx authored and AmeerHajAli committed Jan 12, 2023
1 parent c948bcb commit 3c0619d
Show file tree
Hide file tree
Showing 9 changed files with 369 additions and 23 deletions.
7 changes: 7 additions & 0 deletions src/mock/ray/gcs/gcs_server/gcs_task_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ class MockGcsTaskManager : public GcsTaskManager {
rpc::AddTaskEventDataReply *reply,
rpc::SendReplyCallback send_reply_callback),
(override));

// gMock override of GcsTaskManager::HandleGetTaskEvents. The signature mirrors the
// handler declared on GcsTaskManager: the request by value, a pointer to the reply
// to fill in, and the callback used to send the reply.
MOCK_METHOD(void,
HandleGetTaskEvents,
(rpc::GetTaskEventsRequest request,
rpc::GetTaskEventsReply *reply,
rpc::SendReplyCallback send_reply_callback),
(override));
};

} // namespace gcs
Expand Down
2 changes: 2 additions & 0 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2276,6 +2276,8 @@ Status CoreWorker::ExecuteTask(
rpc::TaskEvents task_event;
task_event.set_task_id(task_spec.TaskId().Binary());
task_event.set_attempt_number(task_spec.AttemptNumber());
task_event.set_job_id(task_spec.JobId().Binary());

auto state_updates = task_event.mutable_state_updates();
state_updates->set_running_ts(absl::GetCurrentTimeNanos());
task_event_buffer_->AddTaskEvent(std::move(task_event));
Expand Down
1 change: 1 addition & 0 deletions src/ray/core_worker/task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -887,6 +887,7 @@ void TaskManager::RecordTaskStatusEvent(const TaskEntry &task_entry,
// Make task event
rpc::TaskEvents task_event;
task_event.set_task_id(task_entry.spec.TaskId().Binary());
task_event.set_job_id(task_entry.spec.JobId().Binary());
task_event.set_attempt_number(task_entry.spec.AttemptNumber());
auto state_updates = task_event.mutable_state_updates();
switch (status) {
Expand Down
100 changes: 99 additions & 1 deletion src/ray/gcs/gcs_server/gcs_task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,53 @@ void GcsTaskManager::Stop() {
}
}

std::vector<rpc::TaskEvents> GcsTaskManager::GcsTaskManagerStorage::GetTaskEvents()
const {
return task_events_;
}

// Returns copies of the task events recorded for `job_id`, resolved through the
// job -> task attempt secondary index. An empty vector is returned when the job has
// no entry in that index.
std::vector<rpc::TaskEvents> GcsTaskManager::GcsTaskManagerStorage::GetTaskEvents(
    JobID job_id) const {
  const auto job_entry = job_to_task_attempt_index_.find(job_id);
  const bool job_known = job_entry != job_to_task_attempt_index_.end();
  if (job_known) {
    // Delegate to the task-attempt overload to resolve attempts into stored events.
    return GetTaskEvents(job_entry->second);
  }
  // No task attempt has been indexed under this job.
  return {};
}

// Returns copies of the task events of all attempts belonging to any of `task_ids`.
// Task ids without an entry in the task -> task attempt index contribute nothing.
std::vector<rpc::TaskEvents> GcsTaskManager::GcsTaskManagerStorage::GetTaskEvents(
    const absl::flat_hash_set<TaskID> &task_ids) const {
  // Union of the attempts of every requested task that is present in the index.
  absl::flat_hash_set<TaskAttempt> matching_attempts;
  for (const auto &id : task_ids) {
    const auto entry = task_to_task_attempt_index_.find(id);
    if (entry == task_to_task_attempt_index_.end()) {
      continue;
    }
    const auto &attempts = entry->second;
    matching_attempts.insert(attempts.begin(), attempts.end());
  }

  // Resolve the collected attempts into stored events.
  return GetTaskEvents(matching_attempts);
}

// Returns copies of the stored task events for the given task attempts. Each attempt
// is mapped to its position in `task_events_` through `task_attempt_index_`; attempts
// missing from that index are skipped.
std::vector<rpc::TaskEvents> GcsTaskManager::GcsTaskManagerStorage::GetTaskEvents(
    const absl::flat_hash_set<TaskAttempt> &task_attempts) const {
  std::vector<rpc::TaskEvents> selected;
  for (const auto &attempt : task_attempts) {
    const auto slot = task_attempt_index_.find(attempt);
    if (slot == task_attempt_index_.end()) {
      continue;
    }
    // Bounds-checked access into the event buffer.
    selected.push_back(task_events_.at(slot->second));
  }

  return selected;
}

absl::optional<rpc::TaskEvents>
GcsTaskManager::GcsTaskManagerStorage::AddOrReplaceTaskEvent(
rpc::TaskEvents &&events_by_task) {
TaskID task_id = TaskID::FromBinary(events_by_task.task_id());
JobID job_id = JobID::FromBinary(events_by_task.job_id());
int32_t attempt_number = events_by_task.attempt_number();
TaskAttempt task_attempt = std::make_pair<>(task_id, attempt_number);

Expand All @@ -53,6 +96,8 @@ GcsTaskManager::GcsTaskManagerStorage::AddOrReplaceTaskEvent(
// A new task event, add to storage and index.

// If limit enforced, replace one.
// TODO(rickyx): Optimize this to per job limit with bounded FIFO map.
// https://github.com/ray-project/ray/issues/31071
if (max_num_task_events_ > 0 && task_events_.size() >= max_num_task_events_) {
RAY_LOG_EVERY_MS(WARNING, 10000)
<< "Max number of tasks event (" << max_num_task_events_
Expand All @@ -68,12 +113,30 @@ GcsTaskManager::GcsTaskManagerStorage::AddOrReplaceTaskEvent(
std::swap(to_replaced, events_by_task);
auto replaced = std::move(events_by_task);

// Update index.
// Update task_attempt -> buffer index mapping.
TaskAttempt replaced_attempt = std::make_pair<>(
TaskID::FromBinary(replaced.task_id()), replaced.attempt_number());

// Update task attempt -> idx mapping.
task_attempt_index_.erase(replaced_attempt);
task_attempt_index_[task_attempt] = next_idx_to_overwrite_;

// Update the job -> task attempt mapping.
// The evicted attempt was indexed under the job of the *replaced* event, which can
// differ from the job of the incoming event, so it must be erased from that job's
// set rather than from the incoming job's set.
auto replaced_job_id = JobID::FromBinary(replaced.job_id());
job_to_task_attempt_index_[replaced_job_id].erase(replaced_attempt);
if (job_to_task_attempt_index_[replaced_job_id].empty()) {
  // Drop the per-job set once it no longer tracks any attempt.
  job_to_task_attempt_index_.erase(replaced_job_id);
}
job_to_task_attempt_index_[job_id].insert(task_attempt);

// Update the task -> task attempt mapping.
auto replaced_task_id = TaskID::FromBinary(replaced.task_id());
task_to_task_attempt_index_[replaced_task_id].erase(replaced_attempt);
if (task_to_task_attempt_index_[replaced_task_id].empty()) {
task_to_task_attempt_index_.erase(replaced_task_id);
}
task_to_task_attempt_index_[task_id].insert(task_attempt);

// Update iter.
next_idx_to_overwrite_ = (next_idx_to_overwrite_ + 1) % max_num_task_events_;

Expand All @@ -82,11 +145,46 @@ GcsTaskManager::GcsTaskManagerStorage::AddOrReplaceTaskEvent(

// Add to index.
task_attempt_index_[task_attempt] = task_events_.size();
job_to_task_attempt_index_[job_id].insert(task_attempt);
task_to_task_attempt_index_[task_id].insert(task_attempt);
// Add a new task events.
task_events_.push_back(std::move(events_by_task));
return absl::nullopt;
}

// Serves a GetTaskEvents RPC: selects task events from storage while holding
// `mutex_`, moves them into `reply` together with the dropped-event counters, and
// sends the reply with an OK status.
void GcsTaskManager::HandleGetTaskEvents(rpc::GetTaskEventsRequest request,
                                         rpc::GetTaskEventsReply *reply,
                                         rpc::SendReplyCallback send_reply_callback) {
  RAY_LOG(DEBUG) << "Getting task status:" << request.ShortDebugString();
  absl::MutexLock lock(&mutex_);

  // Choose the lookup from the request: explicit task ids take precedence over a
  // job id; with neither set, everything in storage is returned.
  std::vector<rpc::TaskEvents> selected_events;
  if (!request.has_task_ids() && !request.has_job_id()) {
    selected_events = task_event_storage_->GetTaskEvents();
  } else if (!request.has_task_ids()) {
    const auto job_id = JobID::FromBinary(request.job_id());
    selected_events = task_event_storage_->GetTaskEvents(job_id);
  } else {
    absl::flat_hash_set<TaskID> requested_ids;
    for (const auto &binary_id : request.task_ids().vals()) {
      requested_ids.insert(TaskID::FromBinary(binary_id));
    }
    selected_events = task_event_storage_->GetTaskEvents(requested_ids);
  }

  // Hand each selected event over to the reply by swapping it into a fresh entry.
  for (auto &selected_event : selected_events) {
    reply->add_events_by_task()->Swap(&selected_event);
  }

  // Report how many events have been dropped so far.
  reply->set_num_profile_task_events_dropped(total_num_profile_task_events_dropped_);
  reply->set_num_status_task_events_dropped(total_num_status_task_events_dropped_);

  GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
}

void GcsTaskManager::HandleAddTaskEventData(rpc::AddTaskEventDataRequest request,
rpc::AddTaskEventDataReply *reply,
rpc::SendReplyCallback send_reply_callback) {
Expand Down
55 changes: 46 additions & 9 deletions src/ray/gcs/gcs_server/gcs_task_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,16 @@ class GcsTaskManager : public rpc::TaskInfoHandler {
rpc::SendReplyCallback send_reply_callback)
LOCKS_EXCLUDED(mutex_) override;

/// Handle GetTaskEvents request.
///
/// Task events are selected by the request's `task_ids` if set, otherwise by its
/// `job_id` if set, otherwise all stored task events are returned. The reply also
/// carries the numbers of dropped profile and status task events.
///
/// The handler acquires `mutex_` itself, so callers must not already hold it.
///
/// \param request gRPC Request.
/// \param reply gRPC Reply.
/// \param send_reply_callback Callback to invoke when sending reply.
void HandleGetTaskEvents(rpc::GetTaskEventsRequest request,
rpc::GetTaskEventsReply *reply,
rpc::SendReplyCallback send_reply_callback)
LOCKS_EXCLUDED(mutex_) override;

/// Stops the event loop and the thread of the task event handler.
///
/// After this is called, no more requests will be handled.
Expand Down Expand Up @@ -111,13 +121,30 @@ class GcsTaskManager : public rpc::TaskInfoHandler {
/// replaced task event.
absl::optional<rpc::TaskEvents> AddOrReplaceTaskEvent(rpc::TaskEvents &&task_event);

/// Get task events.
/// Get task events from job.
///
/// \param job_id Job ID to filter task events.
/// \return task events of `job_id`.
std::vector<rpc::TaskEvents> GetTaskEvents(JobID job_id) const;

/// Get all task events.
///
/// \return all task events stored.
std::vector<rpc::TaskEvents> GetTaskEvents() const;

/// Get task events from tasks corresponding to `task_ids`.
///
/// \param job_id Getting task events from this `job_id` only if not nullopt.
/// Otherwise all task events will be returned.
/// \return A vector of task events.
/// \param task_ids Task ids of the tasks.
/// \return task events from the `task_ids`.
std::vector<rpc::TaskEvents> GetTaskEvents(
absl::optional<JobID> job_id = absl::nullopt) = delete;
const absl::flat_hash_set<TaskID> &task_ids) const;

/// Get task events of task attempt.
///
/// \param task_attempts Task attempts (task ids + attempt number).
/// \return task events from the `task_attempts`.
std::vector<rpc::TaskEvents> GetTaskEvents(
const absl::flat_hash_set<TaskAttempt> &task_attempts) const;

/// Get the number of task events stored.
size_t GetTaskEventsCount() const { return task_events_.size(); }
Expand All @@ -128,15 +155,25 @@ class GcsTaskManager : public rpc::TaskInfoHandler {
/// Max number of task events allowed in the storage.
const size_t max_num_task_events_ = 0;

/// Current task events stored.
std::vector<rpc::TaskEvents> task_events_;

/// Index into task_events_ of the next element to be overwritten.
size_t next_idx_to_overwrite_ = 0;

/// Index from task attempt to the index of the corresponding task event.
/// TODO(rickyx): Refactor this into LRI(least recently inserted) buffer:
/// https://github.com/ray-project/ray/issues/31158
/// Current task events stored.
std::vector<rpc::TaskEvents> task_events_;

/// Index from task attempt to the corresponding task attempt in the buffer
/// `task_events_`.
absl::flat_hash_map<TaskAttempt, size_t> task_attempt_index_;

/// Secondary index from task id to task attempts.
absl::flat_hash_map<TaskID, absl::flat_hash_set<TaskAttempt>>
task_to_task_attempt_index_;
/// Secondary index from job id to task attempts of the job.
absl::flat_hash_map<JobID, absl::flat_hash_set<TaskAttempt>>
job_to_task_attempt_index_;

/// Counter for tracking the size of task event. This assumes tasks events are never
/// removed actively.
uint64_t num_bytes_task_events_ = 0;
Expand Down
Loading

0 comments on commit 3c0619d

Please sign in to comment.