Events drop count (ray-project#30953)
Signed-off-by: tmynn <[email protected]>
rickyyx authored and tamohannes committed Jan 25, 2023
1 parent 03a09df commit e00eb0d
Showing 4 changed files with 159 additions and 70 deletions.
36 changes: 29 additions & 7 deletions src/ray/core_worker/task_event_buffer.cc
@@ -102,9 +102,14 @@ void TaskEventBufferImpl::AddTaskEvent(rpc::TaskEvents task_events) {
auto limit = RayConfig::instance().task_events_max_num_task_events_in_buffer();
if (limit > 0 && buffer_.size() >= static_cast<size_t>(limit)) {
// Too many task events, start overriding older ones.
if (buffer_[next_idx_to_overwrite_].has_profile_events()) {
num_profile_task_events_dropped_++;
} else {
num_status_task_events_dropped_++;
}

buffer_[next_idx_to_overwrite_] = std::move(task_events);
next_idx_to_overwrite_ = (next_idx_to_overwrite_ + 1) % limit;
num_task_events_dropped_++;
return;
}
buffer_.push_back(std::move(task_events));
@@ -115,15 +120,17 @@ void TaskEventBufferImpl::FlushEvents(bool forced) {
return;
}
std::vector<rpc::TaskEvents> task_events;
size_t num_task_events_dropped = 0;
size_t num_status_task_events_dropped = 0;
size_t num_profile_task_events_dropped = 0;
{
absl::MutexLock lock(&mutex_);

RAY_LOG_EVERY_MS(INFO, 15000)
<< "Pushed task state events to GCS. [total_bytes="
<< (1.0 * total_events_bytes_) / 1024 / 1024
<< "MiB][total_count=" << total_num_events_
<< "][total_task_events_dropped=" << num_task_events_dropped_
<< "][total_status_task_events_dropped=" << num_status_task_events_dropped_
<< "][total_profile_task_events_dropped=" << num_profile_task_events_dropped_
<< "][cur_buffer_size=" << buffer_.size() << "].";

// Skip if GCS hasn't finished processing the previous message.
@@ -144,21 +151,35 @@ void TaskEventBufferImpl::FlushEvents(bool forced) {
RayConfig::instance().task_events_max_num_task_events_in_buffer());
buffer_.swap(task_events);
next_idx_to_overwrite_ = 0;
num_task_events_dropped = num_task_events_dropped_;
num_task_events_dropped_ = 0;

num_profile_task_events_dropped = num_profile_task_events_dropped_;
num_profile_task_events_dropped_ = 0;

num_status_task_events_dropped = num_status_task_events_dropped_;
num_status_task_events_dropped_ = 0;
}

// Merge multiple events from a single task attempt run into one task event.
absl::flat_hash_map<std::pair<std::string, int>, rpc::TaskEvents> task_events_map;

size_t num_profile_event_to_send = 0;
size_t num_status_event_to_send = 0;
for (auto event : task_events) {
if (event.has_profile_events()) {
num_profile_event_to_send++;
} else {
num_status_event_to_send++;
}
auto &task_events_itr =
task_events_map[std::make_pair(event.task_id(), event.attempt_number())];
task_events_itr.MergeFrom(event);
}

// Convert to rpc::TaskEventsData
auto data = std::make_unique<rpc::TaskEventData>();
data->set_num_task_events_dropped(num_task_events_dropped);
data->set_num_profile_task_events_dropped(num_profile_task_events_dropped);
data->set_num_status_task_events_dropped(num_status_task_events_dropped);

auto num_task_events = task_events_map.size();
for (auto itr : task_events_map) {
auto events_by_task = data->add_events_by_task();
@@ -198,7 +219,8 @@ void TaskEventBufferImpl::FlushEvents(bool forced) {
grpc_in_progress_ = false;

// Fail to send, currently dropping events.
num_task_events_dropped_ += num_task_events;
num_status_task_events_dropped_ += num_status_event_to_send;
num_profile_task_events_dropped_ += num_profile_event_to_send;
}
}
}
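For context, here is a minimal, self-contained sketch (not the Ray sources) of the overwrite path the first hunk changes: when the bounded buffer is full, the entry about to be overwritten is classified as either a profile event or a status event, and the matching drop counter is incremented. Event, kLimit, and the free function AddTaskEvent below are illustrative stand-ins for rpc::TaskEvents, the task_events_max_num_task_events_in_buffer config value, and the member function in the diff.

#include <cstddef>
#include <iostream>
#include <utility>
#include <vector>

// Illustrative stand-in for rpc::TaskEvents: an event carries either
// profile data or task-status data.
struct Event {
  bool has_profile_events = false;
};

constexpr size_t kLimit = 4;  // stand-in for the buffer-size config limit

std::vector<Event> buffer;
size_t next_idx_to_overwrite = 0;
size_t num_profile_task_events_dropped = 0;
size_t num_status_task_events_dropped = 0;

void AddTaskEvent(Event event) {
  if (kLimit > 0 && buffer.size() >= kLimit) {
    // Buffer is full: the entry about to be overwritten is lost, so charge
    // the drop counter matching its type.
    if (buffer[next_idx_to_overwrite].has_profile_events) {
      num_profile_task_events_dropped++;
    } else {
      num_status_task_events_dropped++;
    }
    buffer[next_idx_to_overwrite] = std::move(event);
    next_idx_to_overwrite = (next_idx_to_overwrite + 1) % kLimit;
    return;
  }
  buffer.push_back(std::move(event));
}

int main() {
  for (int i = 0; i < 10; i++) {
    AddTaskEvent(Event{/*has_profile_events=*/i % 2 == 0});
  }
  std::cout << "profile dropped: " << num_profile_task_events_dropped
            << ", status dropped: " << num_status_task_events_dropped << "\n";
}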
17 changes: 13 additions & 4 deletions src/ray/core_worker/task_event_buffer.h
@@ -126,9 +126,15 @@ class TaskEventBufferImpl : public TaskEventBuffer {
}

/// Test only functions.
size_t GetNumTaskEventsDropped() LOCKS_EXCLUDED(mutex_) {
size_t GetNumStatusTaskEventsDropped() LOCKS_EXCLUDED(mutex_) {
absl::MutexLock lock(&mutex_);
return num_task_events_dropped_;
return num_status_task_events_dropped_;
}

/// Test only functions.
size_t GetNumProfileTaskEventsDropped() LOCKS_EXCLUDED(mutex_) {
absl::MutexLock lock(&mutex_);
return num_profile_task_events_dropped_;
}

/// Test only functions.
@@ -164,8 +170,11 @@ class TaskEventBufferImpl : public TaskEventBuffer {
/// An iterator into buffer_ that determines which element is to be overwritten.
size_t next_idx_to_overwrite_ GUARDED_BY(mutex_) = 0;

/// Number of task events dropped since the last report flush.
size_t num_task_events_dropped_ GUARDED_BY(mutex_) = 0;
/// Number of profile task events dropped since the last report flush.
size_t num_profile_task_events_dropped_ GUARDED_BY(mutex_) = 0;

/// Number of status task events dropped since the last report flush.
size_t num_status_task_events_dropped_ GUARDED_BY(mutex_) = 0;

/// True if there's a pending gRPC call. It's a simple way to prevent overloading
/// GCS with too many calls. There is no point sending more events if GCS could not
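Below, a rough sketch of the flush-side accounting added in the .cc hunks above (again with invented stand-in names, not the actual Ray implementation): the per-type drop counters are snapshotted and reset under the lock, the outgoing events are tallied by type, and if the send fails those tallies are folded back into the drop counters so the next flush still reports them.

#include <cstddef>
#include <iostream>
#include <mutex>
#include <vector>

struct Event {
  bool has_profile_events = false;
};

std::mutex mu;
std::vector<Event> buffer;                   // guarded by mu
size_t num_profile_task_events_dropped = 0;  // guarded by mu
size_t num_status_task_events_dropped = 0;   // guarded by mu

// Stand-in for the asynchronous push to GCS; returns false here to simulate
// a failed send.
bool SendToSink(const std::vector<Event> &events) { return false; }

void FlushEvents() {
  std::vector<Event> to_send;
  size_t dropped_profile = 0;
  size_t dropped_status = 0;
  {
    std::lock_guard<std::mutex> lock(mu);
    buffer.swap(to_send);
    // Snapshot and reset so each flush reports only what was dropped since
    // the previous flush.
    dropped_profile = num_profile_task_events_dropped;
    num_profile_task_events_dropped = 0;
    dropped_status = num_status_task_events_dropped;
    num_status_task_events_dropped = 0;
  }

  // Tally what is about to be sent, by type; these totals play the role of
  // num_profile_event_to_send / num_status_event_to_send in the diff.
  size_t profile_to_send = 0;
  size_t status_to_send = 0;
  for (const auto &e : to_send) {
    if (e.has_profile_events) {
      profile_to_send++;
    } else {
      status_to_send++;
    }
  }

  // dropped_profile / dropped_status would be attached to the outgoing
  // message here, as set_num_profile_task_events_dropped() and
  // set_num_status_task_events_dropped() do in the diff.
  std::cout << "reporting dropped: profile=" << dropped_profile
            << " status=" << dropped_status << "\n";

  if (!SendToSink(to_send)) {
    // The send failed, so these events are effectively dropped; fold the
    // tallies back in for the next report.
    std::lock_guard<std::mutex> lock(mu);
    num_profile_task_events_dropped += profile_to_send;
    num_status_task_events_dropped += status_to_send;
  }
}

int main() {
  buffer = {Event{true}, Event{false}, Event{false}};
  FlushEvents();  // the simulated send fails, so all three count as dropped
  std::cout << "after failed flush: profile_dropped="
            << num_profile_task_events_dropped
            << " status_dropped=" << num_status_task_events_dropped << "\n";
}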