Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[observability][export-api] Write task events #47193

Merged
merged 19 commits into from
Sep 5, 2024
10 changes: 10 additions & 0 deletions src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -502,10 +502,20 @@ RAY_CONFIG(int64_t, task_events_dropped_task_attempts_gc_threshold_s, 15 * 60)
/// workers. Events will be evicted based on a FIFO order.
RAY_CONFIG(uint64_t, task_events_max_num_status_events_buffer_on_worker, 100 * 1000)

/// Max number of task status events that will be stored to export
/// for the export API. Events will be evicted based on a FIFO order.
RAY_CONFIG(uint64_t,
task_events_max_num_export_status_events_buffer_on_worker,
1000 * 1000)

/// Max number of task events to be send in a single message to GCS. This caps both
/// the message size, and also the processing work on GCS.
RAY_CONFIG(uint64_t, task_events_send_batch_size, 10 * 1000)

/// Max number of task events to be written in a single flush iteration. This
/// caps the number of file writes per iteration.
RAY_CONFIG(uint64_t, export_task_events_write_batch_size, 10 * 1000)

/// Max number of profile events allowed to be tracked for a single task.
/// Setting the value to -1 allows unlimited profile events to be tracked.
RAY_CONFIG(int64_t, task_events_max_num_profile_events_per_task, 1000)
Expand Down
3 changes: 2 additions & 1 deletion src/ray/core_worker/core_worker_process.cc
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ CoreWorkerProcessImpl::CoreWorkerProcessImpl(const CoreWorkerOptions &options)
// Initialize event framework.
if (RayConfig::instance().event_log_reporter_enabled() && !options_.log_dir.empty()) {
const std::vector<SourceTypeVariant> source_types = {
ray::rpc::Event_SourceType::Event_SourceType_CORE_WORKER};
ray::rpc::Event_SourceType::Event_SourceType_CORE_WORKER,
ray::rpc::ExportEvent_SourceType::ExportEvent_SourceType_EXPORT_TASK};
RayEventInit(source_types,
absl::flat_hash_map<std::string, std::string>(),
options_.log_dir,
Expand Down
187 changes: 171 additions & 16 deletions src/ray/core_worker/task_event_buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "ray/core_worker/task_event_buffer.h"

#include "ray/gcs/pb_util.h"
#include "ray/util/event.h"

namespace ray {
namespace core {
Expand Down Expand Up @@ -108,6 +109,62 @@ void TaskStatusEvent::ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) {
}
}

void TaskStatusEvent::ToRpcTaskExportEvents(
std::shared_ptr<rpc::ExportTaskEventData> rpc_task_export_event_data) {
// Base fields
rpc_task_export_event_data->set_task_id(task_id_.Binary());
rpc_task_export_event_data->set_job_id(job_id_.Binary());
rpc_task_export_event_data->set_attempt_number(attempt_number_);

// Task info.
if (task_spec_) {
gcs::FillExportTaskInfo(rpc_task_export_event_data->mutable_task_info(), *task_spec_);
}

// Task status update.
auto dst_state_update = rpc_task_export_event_data->mutable_state_updates();
gcs::FillExportTaskStatusUpdateTime(task_status_, timestamp_, dst_state_update);

if (!state_update_.has_value()) {
return;
}

if (state_update_->node_id_.has_value()) {
RAY_CHECK(task_status_ == rpc::TaskStatus::SUBMITTED_TO_WORKER)
<< "Node ID should be included when task status changes to "
"SUBMITTED_TO_WORKER.";
dst_state_update->set_node_id(state_update_->node_id_->Binary());
}

if (state_update_->worker_id_.has_value()) {
RAY_CHECK(task_status_ == rpc::TaskStatus::SUBMITTED_TO_WORKER)
<< "Worker ID should be included when task status changes to "
"SUBMITTED_TO_WORKER.";
dst_state_update->set_worker_id(state_update_->worker_id_->Binary());
}

if (state_update_->error_info_.has_value()) {
auto error_info = dst_state_update->mutable_error_info();
error_info->set_error_message((*state_update_->error_info_).error_message());
error_info->set_error_type((*state_update_->error_info_).error_type());
}

if (state_update_->task_log_info_.has_value()) {
rpc::ExportTaskEventData::TaskLogInfo export_task_log_info;
gcs::TaskLogInfoToExport(state_update_->task_log_info_.value(),
&export_task_log_info);
dst_state_update->mutable_task_log_info()->MergeFrom(export_task_log_info);
}

if (state_update_->pid_.has_value()) {
dst_state_update->set_worker_pid(state_update_->pid_.value());
}

if (state_update_->is_debugger_paused_.has_value()) {
dst_state_update->set_is_debugger_paused(state_update_->is_debugger_paused_.value());
}
}

void TaskProfileEvent::ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) {
// Rate limit on the number of profiling events from the task. This is especially the
// case if a driver has many profiling events when submitting tasks
Expand All @@ -127,6 +184,24 @@ void TaskProfileEvent::ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) {
event_entry->set_extra_data(std::move(extra_data_));
}

void TaskProfileEvent::ToRpcTaskExportEvents(
std::shared_ptr<rpc::ExportTaskEventData> rpc_task_export_event_data) {
auto profile_events = rpc_task_export_event_data->mutable_profile_events();

// Base fields
rpc_task_export_event_data->set_task_id(task_id_.Binary());
rpc_task_export_event_data->set_job_id(job_id_.Binary());
rpc_task_export_event_data->set_attempt_number(attempt_number_);
profile_events->set_component_type(std::move(component_type_));
profile_events->set_component_id(std::move(component_id_));
profile_events->set_node_ip_address(std::move(node_ip_address_));
auto event_entry = profile_events->add_events();
event_entry->set_event_name(std::move(event_name_));
event_entry->set_start_time(start_time_);
event_entry->set_end_time(end_time_);
event_entry->set_extra_data(std::move(extra_data_));
}

TaskEventBufferImpl::TaskEventBufferImpl(std::shared_ptr<gcs::GcsClient> gcs_client)
: work_guard_(boost::asio::make_work_guard(io_service_)),
periodical_runner_(io_service_),
Expand All @@ -137,12 +212,15 @@ TaskEventBufferImpl::~TaskEventBufferImpl() { Stop(); }

Status TaskEventBufferImpl::Start(bool auto_flush) {
absl::MutexLock lock(&mutex_);
export_event_write_enabled_ = RayConfig::instance().enable_export_api_write();
auto report_interval_ms = RayConfig::instance().task_events_report_interval_ms();
RAY_CHECK(report_interval_ms > 0)
<< "RAY_task_events_report_interval_ms should be > 0 to use TaskEventBuffer.";

status_events_.set_capacity(
RayConfig::instance().task_events_max_num_status_events_buffer_on_worker());
status_events_for_export_.set_capacity(
RayConfig::instance().task_events_max_num_export_status_events_buffer_on_worker());

io_thread_ = std::thread([this]() {
#ifndef _WIN32
Expand Down Expand Up @@ -210,10 +288,27 @@ void TaskEventBufferImpl::Stop() {
bool TaskEventBufferImpl::Enabled() const { return enabled_; }

void TaskEventBufferImpl::GetTaskStatusEventsToSend(
std::vector<std::unique_ptr<TaskEvent>> *status_events_to_send,
std::vector<std::shared_ptr<TaskEvent>> *status_events_to_send,
std::vector<std::shared_ptr<TaskEvent>> *status_events_to_write_for_export,
absl::flat_hash_set<TaskAttempt> *dropped_task_attempts_to_send) {
absl::MutexLock lock(&mutex_);

// Get the export events data to write.
if (export_event_write_enabled_) {
size_t num_to_write = std::min(
static_cast<size_t>(RayConfig::instance().export_task_events_write_batch_size()),
static_cast<size_t>(status_events_for_export_.size()));
status_events_to_write_for_export->insert(
status_events_to_write_for_export->end(),
std::make_move_iterator(status_events_for_export_.begin()),
std::make_move_iterator(status_events_for_export_.begin() + num_to_write));
status_events_for_export_.erase(status_events_for_export_.begin(),
status_events_for_export_.begin() + num_to_write);
stats_counter_.Decrement(
TaskEventBufferCounter::kNumTaskStatusEventsForExportAPIStored,
status_events_to_write_for_export->size());
}

// No data to send.
if (status_events_.empty() && dropped_task_attempts_unreported_.empty()) {
return;
Expand Down Expand Up @@ -252,7 +347,7 @@ void TaskEventBufferImpl::GetTaskStatusEventsToSend(
}

void TaskEventBufferImpl::GetTaskProfileEventsToSend(
std::vector<std::unique_ptr<TaskEvent>> *profile_events_to_send) {
std::vector<std::shared_ptr<TaskEvent>> *profile_events_to_send) {
absl::MutexLock lock(&profile_mutex_);

size_t batch_size =
Expand All @@ -278,13 +373,13 @@ void TaskEventBufferImpl::GetTaskProfileEventsToSend(
}

std::unique_ptr<rpc::TaskEventData> TaskEventBufferImpl::CreateDataToSend(
std::vector<std::unique_ptr<TaskEvent>> &&status_events_to_send,
std::vector<std::unique_ptr<TaskEvent>> &&profile_events_to_send,
std::vector<std::shared_ptr<TaskEvent>> &&status_events_to_send,
std::vector<std::shared_ptr<TaskEvent>> &&profile_events_to_send,
absl::flat_hash_set<TaskAttempt> &&dropped_task_attempts_to_send) {
// Aggregate the task events by TaskAttempt.
absl::flat_hash_map<TaskAttempt, rpc::TaskEvents> agg_task_events;
auto to_rpc_event_fn = [this, &agg_task_events, &dropped_task_attempts_to_send](
std::unique_ptr<TaskEvent> &event) {
std::shared_ptr<TaskEvent> &event) {
if (dropped_task_attempts_to_send.count(event->GetTaskAttempt())) {
// We are marking this as data loss due to some missing task status updates.
// We will not send this event to GCS.
Expand Down Expand Up @@ -331,6 +426,45 @@ std::unique_ptr<rpc::TaskEventData> TaskEventBufferImpl::CreateDataToSend(
return data;
}

void TaskEventBufferImpl::WriteExportData(
std::vector<std::shared_ptr<TaskEvent>> &&status_events_to_write_for_export,
std::vector<std::shared_ptr<TaskEvent>> &&profile_events_to_send) {
absl::flat_hash_map<TaskAttempt, std::shared_ptr<rpc::ExportTaskEventData>>
agg_task_events;
// Maintain insertion order to agg_task_events so events are written
// in the same order as the buffer.
std::vector<TaskAttempt> agg_task_event_insertion_order;
auto to_rpc_event_fn = [&agg_task_events, &agg_task_event_insertion_order](
std::shared_ptr<TaskEvent> &event) {
// Aggregate events by task attempt before converting to proto
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

auto itr = agg_task_events.find(event->GetTaskAttempt());
if (itr == agg_task_events.end()) {
          auto inserted = agg_task_events.insert(
          {event->GetTaskAttempt(), std::make_shared<rpc::ExportTaskEventData>()});
      RAY_CHECK(inserted.second);
      agg_task_event_insertion_order.push_back(event->GetTaskAttempt());
}
...

auto itr = agg_task_events.find(event->GetTaskAttempt());
if (itr == agg_task_events.end()) {
// Insert event into agg_task_events if the task attempt of that
// event wasn't already added.
auto event_for_attempt = std::make_shared<rpc::ExportTaskEventData>();
auto inserted =
agg_task_events.insert({event->GetTaskAttempt(), event_for_attempt});
RAY_CHECK(inserted.second);
agg_task_event_insertion_order.push_back(event->GetTaskAttempt());
event->ToRpcTaskExportEvents(event_for_attempt);
} else {
event->ToRpcTaskExportEvents(itr->second);
}
};

std::for_each(status_events_to_write_for_export.begin(),
status_events_to_write_for_export.end(),
to_rpc_event_fn);
std::for_each(
profile_events_to_send.begin(), profile_events_to_send.end(), to_rpc_event_fn);

for (const auto &task_attempt : agg_task_event_insertion_order) {
auto it = agg_task_events.find(task_attempt);
RAY_CHECK(it != agg_task_events.end());
RayExportEvent(it->second).SendEvent();
}
}

void TaskEventBufferImpl::FlushEvents(bool forced) {
if (!enabled_) {
return;
Expand All @@ -350,13 +484,16 @@ void TaskEventBufferImpl::FlushEvents(bool forced) {
}

// Take out status events from the buffer.
std::vector<std::unique_ptr<TaskEvent>> status_events_to_send;
std::vector<std::shared_ptr<TaskEvent>> status_events_to_send;
std::vector<std::shared_ptr<TaskEvent>> status_events_to_write_for_export;
absl::flat_hash_set<TaskAttempt> dropped_task_attempts_to_send;
status_events_to_send.reserve(RayConfig::instance().task_events_send_batch_size());
GetTaskStatusEventsToSend(&status_events_to_send, &dropped_task_attempts_to_send);
GetTaskStatusEventsToSend(&status_events_to_send,
&status_events_to_write_for_export,
&dropped_task_attempts_to_send);

// Take profile events from the status events.
std::vector<std::unique_ptr<TaskEvent>> profile_events_to_send;
std::vector<std::shared_ptr<TaskEvent>> profile_events_to_send;
profile_events_to_send.reserve(RayConfig::instance().task_events_send_batch_size());
GetTaskProfileEventsToSend(&profile_events_to_send);

Expand All @@ -365,6 +502,10 @@ void TaskEventBufferImpl::FlushEvents(bool forced) {
CreateDataToSend(std::move(status_events_to_send),
std::move(profile_events_to_send),
std::move(dropped_task_attempts_to_send));
if (export_event_write_enabled_) {
WriteExportData(std::move(status_events_to_write_for_export),
std::move(profile_events_to_send));
}

gcs::TaskInfoAccessor *task_accessor;
{
Expand Down Expand Up @@ -438,8 +579,19 @@ void TaskEventBufferImpl::AddTaskStatusEvent(std::unique_ptr<TaskEvent> status_e
if (!enabled_) {
return;
}
std::shared_ptr<TaskEvent> status_event_shared_ptr = std::move(status_event);

if (dropped_task_attempts_unreported_.count(status_event->GetTaskAttempt())) {
if (export_event_write_enabled_) {
// If status_events_for_export_ is full, the oldest event will be
// dropped in the circular buffer and replaced with the current event.
if (!status_events_for_export_.full()) {
stats_counter_.Increment(
TaskEventBufferCounter::kNumTaskStatusEventsForExportAPIStored);
}
status_events_for_export_.push_back(status_event_shared_ptr);
}
if (dropped_task_attempts_unreported_.count(
status_event_shared_ptr->GetTaskAttempt())) {
// This task attempt has been dropped before, so we drop this event.
stats_counter_.Increment(
TaskEventBufferCounter::kNumTaskStatusEventDroppedSinceLastFlush);
Expand All @@ -454,7 +606,7 @@ void TaskEventBufferImpl::AddTaskStatusEvent(std::unique_ptr<TaskEvent> status_e

RAY_LOG_EVERY_N(WARNING, 100000)
<< "Dropping task status events for task: "
<< status_event->GetTaskAttempt().first
<< status_event_shared_ptr->GetTaskAttempt().first
<< ", set a higher value for "
"RAY_task_events_max_num_status_events_buffer_on_worker("
<< RayConfig::instance().task_events_max_num_status_events_buffer_on_worker()
Expand All @@ -466,18 +618,20 @@ void TaskEventBufferImpl::AddTaskStatusEvent(std::unique_ptr<TaskEvent> status_e
} else {
stats_counter_.Increment(TaskEventBufferCounter::kNumTaskStatusEventsStored);
}
status_events_.push_back(std::move(status_event));
status_events_.push_back(status_event_shared_ptr);
}

void TaskEventBufferImpl::AddTaskProfileEvent(std::unique_ptr<TaskEvent> profile_event) {
absl::MutexLock lock(&profile_mutex_);
if (!enabled_) {
return;
}
auto profile_events_itr = profile_events_.find(profile_event->GetTaskAttempt());
std::shared_ptr<TaskEvent> profile_event_shared_ptr = std::move(profile_event);
auto profile_events_itr =
profile_events_.find(profile_event_shared_ptr->GetTaskAttempt());
if (profile_events_itr == profile_events_.end()) {
auto inserted = profile_events_.insert(
{profile_event->GetTaskAttempt(), std::vector<std::unique_ptr<TaskEvent>>()});
auto inserted = profile_events_.insert({profile_event_shared_ptr->GetTaskAttempt(),
std::vector<std::shared_ptr<TaskEvent>>()});
RAY_CHECK(inserted.second);
profile_events_itr = inserted.first;
}
Expand All @@ -501,7 +655,8 @@ void TaskEventBufferImpl::AddTaskProfileEvent(std::unique_ptr<TaskEvent> profile
// driver task id and it could generate large number of profile events when submitting
// many tasks.
RAY_LOG_EVERY_N(WARNING, 100000)
<< "Dropping profiling events for task: " << profile_event->GetTaskAttempt().first
<< "Dropping profiling events for task: "
<< profile_event_shared_ptr->GetTaskAttempt().first
<< ", set a higher value for RAY_task_events_max_num_profile_events_per_task("
<< max_num_profile_event_per_task
<< "), or RAY_task_events_max_num_profile_events_buffer_on_worker ("
Expand All @@ -510,7 +665,7 @@ void TaskEventBufferImpl::AddTaskProfileEvent(std::unique_ptr<TaskEvent> profile
}

stats_counter_.Increment(TaskEventBufferCounter::kNumTaskProfileEventsStored);
profile_events_itr->second.push_back(std::move(profile_event));
profile_events_itr->second.push_back(profile_event_shared_ptr);
}

const std::string TaskEventBufferImpl::DebugString() {
Expand Down
Loading