[observability][export-api] Write task events (ray-project#47538)
- Re-add the code changes from [observability][export-api] Write task events ray-project#47193, which were previously reverted due to CI test linux://:task_event_buffer_test is consistently_failing ray-project#47519, CI test windows://:task_event_buffer_test is consistently_failing ray-project#47523, and CI test darwin://:task_event_buffer_test is consistently_failing ray-project#47525
- Was able to reproduce the failures locally and fixed the test in 07efa6f. The failure was due to a logical merge conflict (the previous PR wasn't rebased onto the latest master after other event PRs were merged).

Signed-off-by: Nikita Vemuri <[email protected]>
Co-authored-by: Nikita Vemuri <[email protected]>
Signed-off-by: ujjawal-khare <[email protected]>
2 people authored and ujjawal-khare committed Oct 15, 2024
1 parent 6b6853b commit 593decd
Showing 4 changed files with 265 additions and 17 deletions.
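
At a high level, this change adds a second, export-oriented sink for worker task events alongside the existing GCS reporting path: status and profile events are additionally buffered on the worker and periodically written through the export API. As a minimal, illustrative sketch of the gate, the snippet below reads the flag from an environment variable named RAY_enable_export_api_write; the diff only shows the RayConfig getter enable_export_api_write(), so the exact variable name is an assumption based on Ray's RAY_<config_name> convention.

#include <cstdlib>
#include <iostream>
#include <string>

// Illustrative only: mirrors the export_event_write_enabled_ gate set in
// TaskEventBufferImpl::Start() below, but reads the (assumed) environment
// variable directly instead of going through RayConfig.
int main() {
  const char *flag = std::getenv("RAY_enable_export_api_write");
  const bool export_event_write_enabled =
      flag != nullptr && std::string(flag) == "1";
  std::cout << "Export event writes are "
            << (export_event_write_enabled ? "enabled" : "disabled") << std::endl;
  return 0;
}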
10 changes: 10 additions & 0 deletions src/ray/common/ray_config_def.h
@@ -498,10 +498,20 @@ RAY_CONFIG(int64_t, task_events_dropped_task_attempts_gc_threshold_s, 15 * 60)
/// workers. Events will be evicted based on a FIFO order.
RAY_CONFIG(uint64_t, task_events_max_num_status_events_buffer_on_worker, 100 * 1000)

/// Max number of task status events that will be buffered on the worker for
/// writing via the export API. Events will be evicted in FIFO order.
RAY_CONFIG(uint64_t,
task_events_max_num_export_status_events_buffer_on_worker,
1000 * 1000)

/// Max number of task events to be sent in a single message to GCS. This caps both
/// the message size and the processing work on GCS.
RAY_CONFIG(uint64_t, task_events_send_batch_size, 10 * 1000)

/// Max number of task events to be written in a single flush iteration. This
/// caps the number of file writes per iteration.
RAY_CONFIG(uint64_t, export_task_events_write_batch_size, 10 * 1000)

/// Max number of profile events allowed to be tracked for a single task.
/// Setting the value to -1 allows unlimited profile events to be tracked.
RAY_CONFIG(int64_t, task_events_max_num_profile_events_per_task, 1000)
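
The buffer capacities above bound worker-side memory; once a buffer is full, the oldest events are evicted first. Below is a self-contained sketch of that FIFO eviction behavior using boost::circular_buffer, which is consistent with the set_capacity()/full()/push_back() calls on the event buffers in task_event_buffer.cc; treat the container choice as an assumption, since the buffer members are declared in the header, which is not part of this diff.

#include <boost/circular_buffer.hpp>
#include <iostream>

int main() {
  // Capacity plays the role of
  // task_events_max_num_export_status_events_buffer_on_worker.
  boost::circular_buffer<int> events(3);
  for (int i = 0; i < 5; ++i) {
    // Once the buffer is full, push_back overwrites the oldest element,
    // i.e. FIFO eviction.
    events.push_back(i);
  }
  for (int e : events) {
    std::cout << e << " ";  // Prints: 2 3 4
  }
  std::cout << std::endl;
  return 0;
}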
3 changes: 2 additions & 1 deletion src/ray/core_worker/core_worker_process.cc
@@ -137,7 +137,8 @@ CoreWorkerProcessImpl::CoreWorkerProcessImpl(const CoreWorkerOptions &options)
// Initialize event framework.
if (RayConfig::instance().event_log_reporter_enabled() && !options_.log_dir.empty()) {
const std::vector<SourceTypeVariant> source_types = {
ray::rpc::Event_SourceType::Event_SourceType_CORE_WORKER};
ray::rpc::Event_SourceType::Event_SourceType_CORE_WORKER,
ray::rpc::ExportEvent_SourceType::ExportEvent_SourceType_EXPORT_TASK};
RayEventInit(source_types,
absl::flat_hash_map<std::string, std::string>(),
options_.log_dir,
187 changes: 171 additions & 16 deletions src/ray/core_worker/task_event_buffer.cc
@@ -15,6 +15,7 @@
#include "ray/core_worker/task_event_buffer.h"

#include "ray/gcs/pb_util.h"
#include "ray/util/event.h"

namespace ray {
namespace core {
@@ -108,6 +109,62 @@ void TaskStatusEvent::ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) {
}
}

void TaskStatusEvent::ToRpcTaskExportEvents(
std::shared_ptr<rpc::ExportTaskEventData> rpc_task_export_event_data) {
// Base fields
rpc_task_export_event_data->set_task_id(task_id_.Binary());
rpc_task_export_event_data->set_job_id(job_id_.Binary());
rpc_task_export_event_data->set_attempt_number(attempt_number_);

// Task info.
if (task_spec_) {
gcs::FillExportTaskInfo(rpc_task_export_event_data->mutable_task_info(), *task_spec_);
}

// Task status update.
auto dst_state_update = rpc_task_export_event_data->mutable_state_updates();
gcs::FillExportTaskStatusUpdateTime(task_status_, timestamp_, dst_state_update);

if (!state_update_.has_value()) {
return;
}

if (state_update_->node_id_.has_value()) {
RAY_CHECK(task_status_ == rpc::TaskStatus::SUBMITTED_TO_WORKER)
<< "Node ID should be included when task status changes to "
"SUBMITTED_TO_WORKER.";
dst_state_update->set_node_id(state_update_->node_id_->Binary());
}

if (state_update_->worker_id_.has_value()) {
RAY_CHECK(task_status_ == rpc::TaskStatus::SUBMITTED_TO_WORKER)
<< "Worker ID should be included when task status changes to "
"SUBMITTED_TO_WORKER.";
dst_state_update->set_worker_id(state_update_->worker_id_->Binary());
}

if (state_update_->error_info_.has_value()) {
auto error_info = dst_state_update->mutable_error_info();
error_info->set_error_message((*state_update_->error_info_).error_message());
error_info->set_error_type((*state_update_->error_info_).error_type());
}

if (state_update_->task_log_info_.has_value()) {
rpc::ExportTaskEventData::TaskLogInfo export_task_log_info;
gcs::TaskLogInfoToExport(state_update_->task_log_info_.value(),
&export_task_log_info);
dst_state_update->mutable_task_log_info()->MergeFrom(export_task_log_info);
}

if (state_update_->pid_.has_value()) {
dst_state_update->set_worker_pid(state_update_->pid_.value());
}

if (state_update_->is_debugger_paused_.has_value()) {
dst_state_update->set_is_debugger_paused(state_update_->is_debugger_paused_.value());
}
}

void TaskProfileEvent::ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) {
// Rate limit on the number of profiling events from the task. This is especially the
// case if a driver has many profiling events when submitting tasks
@@ -127,6 +184,24 @@ void TaskProfileEvent::ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) {
event_entry->set_extra_data(std::move(extra_data_));
}

void TaskProfileEvent::ToRpcTaskExportEvents(
std::shared_ptr<rpc::ExportTaskEventData> rpc_task_export_event_data) {
auto profile_events = rpc_task_export_event_data->mutable_profile_events();

// Base fields
rpc_task_export_event_data->set_task_id(task_id_.Binary());
rpc_task_export_event_data->set_job_id(job_id_.Binary());
rpc_task_export_event_data->set_attempt_number(attempt_number_);
profile_events->set_component_type(std::move(component_type_));
profile_events->set_component_id(std::move(component_id_));
profile_events->set_node_ip_address(std::move(node_ip_address_));
auto event_entry = profile_events->add_events();
event_entry->set_event_name(std::move(event_name_));
event_entry->set_start_time(start_time_);
event_entry->set_end_time(end_time_);
event_entry->set_extra_data(std::move(extra_data_));
}

TaskEventBufferImpl::TaskEventBufferImpl(std::shared_ptr<gcs::GcsClient> gcs_client)
: work_guard_(boost::asio::make_work_guard(io_service_)),
periodical_runner_(io_service_),
Expand All @@ -137,12 +212,15 @@ TaskEventBufferImpl::~TaskEventBufferImpl() { Stop(); }

Status TaskEventBufferImpl::Start(bool auto_flush) {
absl::MutexLock lock(&mutex_);
export_event_write_enabled_ = RayConfig::instance().enable_export_api_write();
auto report_interval_ms = RayConfig::instance().task_events_report_interval_ms();
RAY_CHECK(report_interval_ms > 0)
<< "RAY_task_events_report_interval_ms should be > 0 to use TaskEventBuffer.";

status_events_.set_capacity(
RayConfig::instance().task_events_max_num_status_events_buffer_on_worker());
status_events_for_export_.set_capacity(
RayConfig::instance().task_events_max_num_export_status_events_buffer_on_worker());

io_thread_ = std::thread([this]() {
#ifndef _WIN32
@@ -210,10 +288,27 @@ void TaskEventBufferImpl::Stop() {
bool TaskEventBufferImpl::Enabled() const { return enabled_; }

void TaskEventBufferImpl::GetTaskStatusEventsToSend(
std::vector<std::unique_ptr<TaskEvent>> *status_events_to_send,
std::vector<std::shared_ptr<TaskEvent>> *status_events_to_send,
std::vector<std::shared_ptr<TaskEvent>> *status_events_to_write_for_export,
absl::flat_hash_set<TaskAttempt> *dropped_task_attempts_to_send) {
absl::MutexLock lock(&mutex_);

// Get the export events data to write.
if (export_event_write_enabled_) {
size_t num_to_write = std::min(
static_cast<size_t>(RayConfig::instance().export_task_events_write_batch_size()),
static_cast<size_t>(status_events_for_export_.size()));
status_events_to_write_for_export->insert(
status_events_to_write_for_export->end(),
std::make_move_iterator(status_events_for_export_.begin()),
std::make_move_iterator(status_events_for_export_.begin() + num_to_write));
status_events_for_export_.erase(status_events_for_export_.begin(),
status_events_for_export_.begin() + num_to_write);
stats_counter_.Decrement(
TaskEventBufferCounter::kNumTaskStatusEventsForExportAPIStored,
status_events_to_write_for_export->size());
}

// No data to send.
if (status_events_.empty() && dropped_task_attempts_unreported_.empty()) {
return;
@@ -252,7 +347,7 @@ void TaskEventBufferImpl::GetTaskStatusEventsToSend(
}

void TaskEventBufferImpl::GetTaskProfileEventsToSend(
std::vector<std::unique_ptr<TaskEvent>> *profile_events_to_send) {
std::vector<std::shared_ptr<TaskEvent>> *profile_events_to_send) {
absl::MutexLock lock(&profile_mutex_);

size_t batch_size =
@@ -278,13 +373,13 @@ void TaskEventBufferImpl::GetTaskProfileEventsToSend(
}

std::unique_ptr<rpc::TaskEventData> TaskEventBufferImpl::CreateDataToSend(
std::vector<std::unique_ptr<TaskEvent>> &&status_events_to_send,
std::vector<std::unique_ptr<TaskEvent>> &&profile_events_to_send,
std::vector<std::shared_ptr<TaskEvent>> &&status_events_to_send,
std::vector<std::shared_ptr<TaskEvent>> &&profile_events_to_send,
absl::flat_hash_set<TaskAttempt> &&dropped_task_attempts_to_send) {
// Aggregate the task events by TaskAttempt.
absl::flat_hash_map<TaskAttempt, rpc::TaskEvents> agg_task_events;
auto to_rpc_event_fn = [this, &agg_task_events, &dropped_task_attempts_to_send](
std::unique_ptr<TaskEvent> &event) {
std::shared_ptr<TaskEvent> &event) {
if (dropped_task_attempts_to_send.count(event->GetTaskAttempt())) {
// We are marking this as data loss due to some missing task status updates.
// We will not send this event to GCS.
@@ -331,6 +426,45 @@ std::unique_ptr<rpc::TaskEventData> TaskEventBufferImpl::CreateDataToSend(
return data;
}

void TaskEventBufferImpl::WriteExportData(
std::vector<std::shared_ptr<TaskEvent>> &&status_events_to_write_for_export,
std::vector<std::shared_ptr<TaskEvent>> &&profile_events_to_send) {
absl::flat_hash_map<TaskAttempt, std::shared_ptr<rpc::ExportTaskEventData>>
agg_task_events;
// Maintain the insertion order of agg_task_events so events are written
// in the same order as they appear in the buffer.
std::vector<TaskAttempt> agg_task_event_insertion_order;
auto to_rpc_event_fn = [&agg_task_events, &agg_task_event_insertion_order](
std::shared_ptr<TaskEvent> &event) {
// Aggregate events by task attempt before converting to proto
auto itr = agg_task_events.find(event->GetTaskAttempt());
if (itr == agg_task_events.end()) {
// Insert event into agg_task_events if the task attempt of that
// event wasn't already added.
auto event_for_attempt = std::make_shared<rpc::ExportTaskEventData>();
auto inserted =
agg_task_events.insert({event->GetTaskAttempt(), event_for_attempt});
RAY_CHECK(inserted.second);
agg_task_event_insertion_order.push_back(event->GetTaskAttempt());
event->ToRpcTaskExportEvents(event_for_attempt);
} else {
event->ToRpcTaskExportEvents(itr->second);
}
};

std::for_each(status_events_to_write_for_export.begin(),
status_events_to_write_for_export.end(),
to_rpc_event_fn);
std::for_each(
profile_events_to_send.begin(), profile_events_to_send.end(), to_rpc_event_fn);

for (const auto &task_attempt : agg_task_event_insertion_order) {
auto it = agg_task_events.find(task_attempt);
RAY_CHECK(it != agg_task_events.end());
RayExportEvent(it->second).SendEvent();
}
}
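
WriteExportData() aggregates events by task attempt but still emits one record per attempt in the order the attempts were first seen, using a hash map for lookup plus a vector of keys for ordering. Below is a distilled, standalone illustration of that pattern; std::unordered_map stands in for absl::flat_hash_map, and the string/int types are hypothetical stand-ins for TaskAttempt and the event payload.

#include <iostream>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

int main() {
  // Incoming events: (task attempt key, payload), in buffer order.
  const std::vector<std::pair<std::string, int>> events = {
      {"task_a:0", 1}, {"task_b:0", 2}, {"task_a:0", 3}};

  std::unordered_map<std::string, std::vector<int>> agg;
  std::vector<std::string> insertion_order;  // First-seen order of keys.

  for (const auto &[key, payload] : events) {
    auto it = agg.find(key);
    if (it == agg.end()) {
      agg.emplace(key, std::vector<int>{payload});
      insertion_order.push_back(key);
    } else {
      it->second.push_back(payload);
    }
  }

  // Emit one aggregated record per task attempt, in first-seen order.
  for (const auto &key : insertion_order) {
    std::cout << key << " -> " << agg[key].size() << " event(s)\n";
  }
  return 0;
}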

void TaskEventBufferImpl::FlushEvents(bool forced) {
if (!enabled_) {
return;
Expand All @@ -350,13 +484,16 @@ void TaskEventBufferImpl::FlushEvents(bool forced) {
}

// Take out status events from the buffer.
std::vector<std::unique_ptr<TaskEvent>> status_events_to_send;
std::vector<std::shared_ptr<TaskEvent>> status_events_to_send;
std::vector<std::shared_ptr<TaskEvent>> status_events_to_write_for_export;
absl::flat_hash_set<TaskAttempt> dropped_task_attempts_to_send;
status_events_to_send.reserve(RayConfig::instance().task_events_send_batch_size());
GetTaskStatusEventsToSend(&status_events_to_send, &dropped_task_attempts_to_send);
GetTaskStatusEventsToSend(&status_events_to_send,
&status_events_to_write_for_export,
&dropped_task_attempts_to_send);

// Take profile events from the status events.
std::vector<std::unique_ptr<TaskEvent>> profile_events_to_send;
std::vector<std::shared_ptr<TaskEvent>> profile_events_to_send;
profile_events_to_send.reserve(RayConfig::instance().task_events_send_batch_size());
GetTaskProfileEventsToSend(&profile_events_to_send);

Expand All @@ -365,6 +502,10 @@ void TaskEventBufferImpl::FlushEvents(bool forced) {
CreateDataToSend(std::move(status_events_to_send),
std::move(profile_events_to_send),
std::move(dropped_task_attempts_to_send));
if (export_event_write_enabled_) {
WriteExportData(std::move(status_events_to_write_for_export),
std::move(profile_events_to_send));
}

gcs::TaskInfoAccessor *task_accessor;
{
@@ -438,8 +579,19 @@ void TaskEventBufferImpl::AddTaskStatusEvent(std::unique_ptr<TaskEvent> status_e
if (!enabled_) {
return;
}
std::shared_ptr<TaskEvent> status_event_shared_ptr = std::move(status_event);

if (dropped_task_attempts_unreported_.count(status_event->GetTaskAttempt())) {
if (export_event_write_enabled_) {
// If status_events_for_export_ is full, the circular buffer drops its oldest
// event to make room for the current one.
if (!status_events_for_export_.full()) {
stats_counter_.Increment(
TaskEventBufferCounter::kNumTaskStatusEventsForExportAPIStored);
}
status_events_for_export_.push_back(status_event_shared_ptr);
}
if (dropped_task_attempts_unreported_.count(
status_event_shared_ptr->GetTaskAttempt())) {
// This task attempt has been dropped before, so we drop this event.
stats_counter_.Increment(
TaskEventBufferCounter::kNumTaskStatusEventDroppedSinceLastFlush);
Expand All @@ -454,7 +606,7 @@ void TaskEventBufferImpl::AddTaskStatusEvent(std::unique_ptr<TaskEvent> status_e

RAY_LOG_EVERY_N(WARNING, 100000)
<< "Dropping task status events for task: "
<< status_event->GetTaskAttempt().first
<< status_event_shared_ptr->GetTaskAttempt().first
<< ", set a higher value for "
"RAY_task_events_max_num_status_events_buffer_on_worker("
<< RayConfig::instance().task_events_max_num_status_events_buffer_on_worker()
@@ -466,18 +618,20 @@ void TaskEventBufferImpl::AddTaskStatusEvent(std::unique_ptr<TaskEvent> status_e
} else {
stats_counter_.Increment(TaskEventBufferCounter::kNumTaskStatusEventsStored);
}
status_events_.push_back(std::move(status_event));
status_events_.push_back(status_event_shared_ptr);
}
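
The switch from std::unique_ptr to std::shared_ptr in this function (and in the flush path above) reflects that a single status event can now be referenced by two buffers: the GCS-bound status_events_ and the export-bound status_events_for_export_. A minimal sketch of that shared ownership, with a hypothetical Event type standing in for TaskEvent, is below.

#include <iostream>
#include <memory>
#include <vector>

struct Event {  // Hypothetical stand-in for TaskEvent.
  int id;
};

int main() {
  std::vector<std::shared_ptr<Event>> status_events;             // GCS-bound buffer.
  std::vector<std::shared_ptr<Event>> status_events_for_export;  // Export-bound buffer.

  auto event = std::make_shared<Event>(Event{42});
  // The same event object is held by both buffers and is destroyed only
  // after both have released their references.
  status_events_for_export.push_back(event);
  status_events.push_back(std::move(event));

  std::cout << "use_count = " << status_events.back().use_count() << std::endl;  // Prints 2.
  return 0;
}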

void TaskEventBufferImpl::AddTaskProfileEvent(std::unique_ptr<TaskEvent> profile_event) {
absl::MutexLock lock(&profile_mutex_);
if (!enabled_) {
return;
}
auto profile_events_itr = profile_events_.find(profile_event->GetTaskAttempt());
std::shared_ptr<TaskEvent> profile_event_shared_ptr = std::move(profile_event);
auto profile_events_itr =
profile_events_.find(profile_event_shared_ptr->GetTaskAttempt());
if (profile_events_itr == profile_events_.end()) {
auto inserted = profile_events_.insert(
{profile_event->GetTaskAttempt(), std::vector<std::unique_ptr<TaskEvent>>()});
auto inserted = profile_events_.insert({profile_event_shared_ptr->GetTaskAttempt(),
std::vector<std::shared_ptr<TaskEvent>>()});
RAY_CHECK(inserted.second);
profile_events_itr = inserted.first;
}
@@ -501,7 +655,8 @@ void TaskEventBufferImpl::AddTaskProfileEvent(std::unique_ptr<TaskEvent> profile
// driver task id and it could generate large number of profile events when submitting
// many tasks.
RAY_LOG_EVERY_N(WARNING, 100000)
<< "Dropping profiling events for task: " << profile_event->GetTaskAttempt().first
<< "Dropping profiling events for task: "
<< profile_event_shared_ptr->GetTaskAttempt().first
<< ", set a higher value for RAY_task_events_max_num_profile_events_per_task("
<< max_num_profile_event_per_task
<< "), or RAY_task_events_max_num_profile_events_buffer_on_worker ("
@@ -510,7 +665,7 @@ void TaskEventBufferImpl::AddTaskProfileEvent(std::unique_ptr<TaskEvent> profile
}

stats_counter_.Increment(TaskEventBufferCounter::kNumTaskProfileEventsStored);
profile_events_itr->second.push_back(std::move(profile_event));
profile_events_itr->second.push_back(profile_event_shared_ptr);
}

const std::string TaskEventBufferImpl::DebugString() {