Skip to content

Commit

Permalink
[core][state] Task backend - Profile events capping (ray-project#33321)
Browse files Browse the repository at this point in the history
This PR restricts the number of profile events to be sent and aggregates task events from the same task attempt on the worker side to reduce the data sent to GCS.
This PR also refactors the metrics tracking to reduce lock contention on the core worker.

Signed-off-by: Jack He <[email protected]>
  • Loading branch information
rickyyx authored and ProjectsByJackHe committed May 4, 2023
1 parent 27daa12 commit d0aa1af
Show file tree
Hide file tree
Showing 4 changed files with 256 additions and 148 deletions.
2 changes: 1 addition & 1 deletion src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ RAY_CONFIG(uint64_t, task_events_send_batch_size, 10 * 1000)
/// report gRPC call. A task could have more profile events in GCS from multiple
/// report gRPC calls.
/// Setting the value to -1 allows unlimited profile events to be sent.
RAY_CONFIG(int64_t, task_events_max_num_profile_events_for_task, 100)
RAY_CONFIG(int64_t, task_events_max_num_profile_events_for_task, 1000)

/// The delay in ms after which GCS should mark any running tasks from a job as failed.
/// Setting this value too small might result in some finished tasks marked as failed by
Expand Down
170 changes: 110 additions & 60 deletions src/ray/core_worker/task_event_buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ TaskProfileEvent::TaskProfileEvent(TaskID task_id,
event_name_(event_name),
start_time_(start_time) {}

void TaskStatusEvent::ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) {
bool TaskStatusEvent::ToRpcTaskEventsOrDrop(rpc::TaskEvents *rpc_task_events) {
// Base fields
rpc_task_events->set_task_id(task_id_.Binary());
rpc_task_events->set_job_id(job_id_.Binary());
Expand All @@ -69,7 +69,7 @@ void TaskStatusEvent::ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) {
gcs::FillTaskStatusUpdateTime(task_status_, timestamp_, dst_state_update);

if (!state_update_.has_value()) {
return;
return false;
}

if (state_update_->node_id_.has_value()) {
Expand All @@ -89,16 +89,34 @@ void TaskStatusEvent::ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) {
if (state_update_->error_info_.has_value()) {
dst_state_update->set_error_type(state_update_->error_info_->error_type());
}
return false;
}

void TaskProfileEvent::ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) {
bool TaskProfileEvent::ToRpcTaskEventsOrDrop(rpc::TaskEvents *rpc_task_events) {
// Rate limit on the number of profiling events from the task. This is especially the
// case if a driver has many profiling events when submitting tasks
auto profile_events = rpc_task_events->mutable_profile_events();
auto profile_event_max_num =
RayConfig::instance().task_events_max_num_profile_events_for_task();
if (profile_event_max_num >= 0 &&
profile_events->events_size() >= profile_event_max_num) {
// Data loss. We are dropping the newly reported profile event.
// This will likely happen on a driver task since the driver has a fixed placeholder
// driver task id and it could generate a large number of profile events when submitting
// many tasks.
// We are only dropping this TaskEvent (which corresponds to a single profile event),
// rather than others. We will likely change the GC logic in the future as well.
RAY_LOG_EVERY_N(WARNING, 100000)
<< "Dropping profiling events for task: " << task_id_
<< ", set a higher value for RAY_task_events_max_num_profile_events_for_task("
<< profile_event_max_num << ").";
return true;
}

// Base fields
rpc_task_events->set_task_id(task_id_.Binary());
rpc_task_events->set_job_id(job_id_.Binary());
rpc_task_events->set_attempt_number(attempt_number_);

// Profile data
auto profile_events = rpc_task_events->mutable_profile_events();
profile_events->set_component_type(std::move(component_type_));
profile_events->set_component_id(std::move(component_id_));
profile_events->set_node_ip_address(std::move(node_ip_address_));
Expand All @@ -107,6 +125,7 @@ void TaskProfileEvent::ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) {
event_entry->set_start_time(start_time_);
event_entry->set_end_time(end_time_);
event_entry->set_extra_data(std::move(extra_data_));
return false;
}

TaskEventBufferImpl::TaskEventBufferImpl(std::unique_ptr<gcs::GcsClient> gcs_client)
Expand Down Expand Up @@ -187,25 +206,38 @@ void TaskEventBufferImpl::AddTaskEvent(std::unique_ptr<TaskEvent> task_event) {
if (!enabled_) {
return;
}
absl::MutexLock lock(&mutex_);
size_t num_profile_events_dropped = 0;
size_t num_status_events_dropped = 0;
size_t num_add = 0;

if (buffer_.full()) {
const auto &to_evict = buffer_.front();
if (to_evict->IsProfileEvent()) {
num_profile_task_events_dropped_++;
} else {
num_status_task_events_dropped_++;
absl::MutexLock lock(&mutex_);
size_t prev_size = buffer_.size();
{
if (buffer_.full()) {
const auto &to_evict = buffer_.front();
if (to_evict->IsProfileEvent()) {
num_profile_events_dropped++;
} else {
num_status_events_dropped++;
}
}
buffer_.push_back(std::move(task_event));
num_add = buffer_.size() - prev_size;
}
buffer_.push_back(std::move(task_event));

stats_counter_.Increment(
TaskEventBufferCounter::kNumTaskProfileEventDroppedSinceLastFlush,
num_profile_events_dropped);
stats_counter_.Increment(
TaskEventBufferCounter::kNumTaskStatusEventDroppedSinceLastFlush,
num_status_events_dropped);
stats_counter_.Increment(TaskEventBufferCounter::kNumTaskEventsStored, num_add);
}

void TaskEventBufferImpl::FlushEvents(bool forced) {
if (!enabled_) {
return;
}
size_t num_status_task_events_dropped = 0;
size_t num_profile_task_events_dropped = 0;
std::vector<std::unique_ptr<TaskEvent>> to_send;
to_send.reserve(RayConfig::instance().task_events_send_batch_size());

Expand Down Expand Up @@ -234,48 +266,74 @@ void TaskEventBufferImpl::FlushEvents(bool forced) {
std::make_move_iterator(buffer_.begin()),
std::make_move_iterator(buffer_.begin() + num_to_send));
buffer_.erase(buffer_.begin(), buffer_.begin() + num_to_send);
}

// Aggregate
absl::flat_hash_map<TaskAttempt, rpc::TaskEvents> agg_task_events;
auto to_rpc_event_fn = [this, &agg_task_events](std::unique_ptr<TaskEvent> &event) {
if (!agg_task_events.count(event->GetTaskAttempt())) {
auto inserted =
agg_task_events.insert({event->GetTaskAttempt(), rpc::TaskEvents()});
RAY_CHECK(inserted.second);
}

// Send and reset the counters
num_profile_task_events_dropped = num_profile_task_events_dropped_;
num_profile_task_events_dropped_ = 0;
auto itr = agg_task_events.find(event->GetTaskAttempt());

num_status_task_events_dropped = num_status_task_events_dropped_;
num_status_task_events_dropped_ = 0;
}
if (event->ToRpcTaskEventsOrDrop(&(itr->second))) {
RAY_CHECK(event->IsProfileEvent());
stats_counter_.Increment(
TaskEventBufferCounter::kNumTaskProfileEventDroppedSinceLastFlush);
}
};
std::for_each(to_send.begin(), to_send.end(), to_rpc_event_fn);

// Convert to rpc::TaskEventsData
auto data = std::make_unique<rpc::TaskEventData>();
data->set_num_profile_task_events_dropped(num_profile_task_events_dropped);
data->set_num_status_task_events_dropped(num_status_task_events_dropped);

size_t num_task_events = to_send.size();
size_t num_profile_event_to_send = 0;
size_t num_status_event_to_send = 0;
for (auto &task_event : to_send) {
for (auto &[_task_attempt, task_event] : agg_task_events) {
auto events_by_task = data->add_events_by_task();
if (task_event->IsProfileEvent()) {
if (task_event.has_profile_events()) {
num_profile_event_to_send++;
} else {
}
if (task_event.has_state_updates()) {
num_status_event_to_send++;
}
task_event->ToRpcTaskEvents(events_by_task);
*events_by_task = std::move(task_event);
}
size_t data_size = data->ByteSizeLong();

// Send and reset the counters
stats_counter_.Decrement(TaskEventBufferCounter::kNumTaskEventsStored, to_send.size());
size_t num_profile_task_events_dropped = stats_counter_.Get(
TaskEventBufferCounter::kNumTaskProfileEventDroppedSinceLastFlush);
stats_counter_.Decrement(
TaskEventBufferCounter::kNumTaskProfileEventDroppedSinceLastFlush,
num_profile_task_events_dropped);
stats_counter_.Increment(TaskEventBufferCounter::kTotalNumTaskProfileEventDropped,
num_profile_task_events_dropped);

size_t num_status_task_events_dropped = stats_counter_.Get(
TaskEventBufferCounter::kNumTaskStatusEventDroppedSinceLastFlush);
stats_counter_.Decrement(
TaskEventBufferCounter::kNumTaskStatusEventDroppedSinceLastFlush,
num_status_task_events_dropped);
stats_counter_.Increment(TaskEventBufferCounter::kTotalNumTaskStatusEventDropped,
num_status_task_events_dropped);

data->set_num_profile_task_events_dropped(num_profile_task_events_dropped);
data->set_num_status_task_events_dropped(num_status_task_events_dropped);

gcs::TaskInfoAccessor *task_accessor;
{
// Sending the protobuf to GCS.
absl::MutexLock lock(&mutex_);
// Some debug tracking.
total_num_events_ += num_task_events;
total_events_bytes_ += data_size;
// The flag should be unset when on_complete is invoked.
grpc_in_progress_ = true;
task_accessor = &gcs_client_->Tasks();
}

grpc_in_progress_ = true;
auto on_complete = [this, num_task_events](const Status &status) {
absl::MutexLock lock(&mutex_);
if (!status.ok()) {
RAY_LOG(WARNING) << "Failed to push " << num_task_events
<< " task state events to GCS. Data will be lost. [status="
Expand All @@ -286,7 +344,6 @@ void TaskEventBufferImpl::FlushEvents(bool forced) {

auto status = task_accessor->AsyncAddTaskEventData(std::move(data), on_complete);
{
absl::MutexLock lock(&mutex_);
if (!status.ok()) {
// If we couldn't even send the data by invoking client side callbacks, there's
// something seriously wrong, and losing data in this case should not be too
Expand All @@ -297,8 +354,10 @@ void TaskEventBufferImpl::FlushEvents(bool forced) {
grpc_in_progress_ = false;

// Fail to send, currently dropping events.
num_status_task_events_dropped_ += num_status_event_to_send;
num_profile_task_events_dropped_ += num_profile_event_to_send;
stats_counter_.Increment(TaskEventBufferCounter::kTotalNumTaskProfileEventDropped,
num_profile_event_to_send);
stats_counter_.Increment(TaskEventBufferCounter::kTotalNumTaskStatusEventDropped,
num_status_event_to_send);
}
}
}
Expand All @@ -311,31 +370,22 @@ const std::string TaskEventBufferImpl::DebugString() {
return ss.str();
}

bool grpc_in_progress;
size_t num_status_task_events_dropped, num_profile_task_events_dropped,
data_buffer_size;
uint64_t total_events_bytes, total_num_events;

{
absl::MutexLock lock(&mutex_);
grpc_in_progress = grpc_in_progress_;
num_status_task_events_dropped = num_status_task_events_dropped_;
num_profile_task_events_dropped = num_profile_task_events_dropped_;
total_events_bytes = total_events_bytes_;
total_num_events = total_num_events_;
data_buffer_size = buffer_.size();
}

auto stats = stats_counter_.GetAll();
ss << "\nIO Service Stats:\n";
ss << io_service_.stats().StatsString();
ss << "\nOther Stats:"
<< "\n\tgrpc_in_progress:" << grpc_in_progress
<< "\n\tcurrent number of task events in buffer: " << data_buffer_size
<< "\n\ttotal task events sent: " << 1.0 * total_events_bytes / 1024 / 1024 << " MiB"
<< "\n\ttotal number of task events sent: " << total_num_events
<< "\n\tnum status task events dropped: " << num_status_task_events_dropped
<< "\n\tnum profile task events dropped: " << num_profile_task_events_dropped
<< "\n";
<< "\n\tgrpc_in_progress:" << grpc_in_progress_
<< "\n\tcurrent number of task events in buffer: "
<< stats[TaskEventBufferCounter::kNumTaskEventsStored]
<< "\n\ttotal task events sent: "
<< 1.0 * stats[TaskEventBufferCounter::kTotalTaskEventsBytesReported] / 1024 / 1024
<< " MiB"
<< "\n\ttotal number of task events sent: "
<< stats[TaskEventBufferCounter::kTotalTaskEventsReported]
<< "\n\tnum status task events dropped: "
<< stats[TaskEventBufferCounter::kTotalNumTaskProfileEventDropped]
<< "\n\tnum profile task events dropped: "
<< stats[TaskEventBufferCounter::kTotalNumTaskStatusEventDropped] << "\n";

return ss.str();
}
Expand Down
Loading

0 comments on commit d0aa1af

Please sign in to comment.