Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[observability][export-api] Write task events #47193

Merged
merged 19 commits into from
Sep 5, 2024
7 changes: 7 additions & 0 deletions src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,13 @@ RAY_CONFIG(int64_t, task_events_dropped_task_attempts_gc_threshold_s, 15 * 60)
/// workers. Events will be evicted based on a FIFO order.
RAY_CONFIG(uint64_t, task_events_max_num_status_events_buffer_on_worker, 100 * 1000)

/// Max number of full task status events, dropped from being sent to GCS,
/// that are stored on workers. These events will be written to file as part
/// of the export API. When the buffer is full, events are evicted in FIFO order.
RAY_CONFIG(uint64_t,
task_events_max_num_dropped_status_events_buffer_on_worker,
1000 * 1000)

/// Max number of task events to be sent in a single message to GCS. This caps both
/// the message size and the processing work on GCS.
RAY_CONFIG(uint64_t, task_events_send_batch_size, 10 * 1000)
Expand Down
3 changes: 2 additions & 1 deletion src/ray/core_worker/core_worker_process.cc
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ CoreWorkerProcessImpl::CoreWorkerProcessImpl(const CoreWorkerOptions &options)
// Initialize event framework.
if (RayConfig::instance().event_log_reporter_enabled() && !options_.log_dir.empty()) {
const std::vector<SourceTypeVariant> source_types = {
ray::rpc::Event_SourceType::Event_SourceType_CORE_WORKER};
ray::rpc::Event_SourceType::Event_SourceType_CORE_WORKER,
ray::rpc::ExportEvent_SourceType::ExportEvent_SourceType_EXPORT_TASK};
RayEventInit(source_types,
absl::flat_hash_map<std::string, std::string>(),
options_.log_dir,
Expand Down
169 changes: 167 additions & 2 deletions src/ray/core_worker/task_event_buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "ray/core_worker/task_event_buffer.h"

#include "ray/gcs/pb_util.h"
#include "ray/util/event.h"

namespace ray {
namespace core {
Expand Down Expand Up @@ -108,6 +109,62 @@ void TaskStatusEvent::ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) {
}
}

/// Fill an rpc::ExportTaskEventData with the contents of this status event.
///
/// The task id, job id, attempt number and the timestamp for the current task
/// status are always written. Task info is written only when a task spec is
/// attached, and each optional state-update field is written only when present.
///
/// \param[out] rpc_task_export_event_data The export task event data to be filled.
void TaskStatusEvent::ToRpcTaskExportEvents(
    std::shared_ptr<rpc::ExportTaskEventData> rpc_task_export_event_data) {
  auto &export_data = *rpc_task_export_event_data;

  // Identity of the task attempt this event belongs to.
  export_data.set_task_id(task_id_.Binary());
  export_data.set_job_id(job_id_.Binary());
  export_data.set_attempt_number(attempt_number_);

  // Static task information, available only when a task spec was provided.
  if (task_spec_) {
    gcs::FillExportTaskInfo(export_data.mutable_task_info(), *task_spec_);
  }

  // Record the time at which the task entered its current status.
  auto *state_updates = export_data.mutable_state_updates();
  gcs::FillExportTaskStatusUpdateTime(task_status_, timestamp_, state_updates);

  // Nothing more to export when the event carries no additional state update.
  if (!state_update_.has_value()) {
    return;
  }
  auto &update = *state_update_;

  if (update.node_id_.has_value()) {
    RAY_CHECK(task_status_ == rpc::TaskStatus::SUBMITTED_TO_WORKER)
        << "Node ID should be included when task status changes to "
           "SUBMITTED_TO_WORKER.";
    state_updates->set_node_id(update.node_id_->Binary());
  }

  if (update.worker_id_.has_value()) {
    RAY_CHECK(task_status_ == rpc::TaskStatus::SUBMITTED_TO_WORKER)
        << "Worker ID should be included when task status changes to "
           "SUBMITTED_TO_WORKER.";
    state_updates->set_worker_id(update.worker_id_->Binary());
  }

  if (update.error_info_.has_value()) {
    auto &source_error = *update.error_info_;
    auto *export_error = state_updates->mutable_error_info();
    export_error->set_error_message(source_error.error_message());
    export_error->set_error_type(source_error.error_type());
  }

  if (update.task_log_info_.has_value()) {
    // Convert into a temporary first and merge it in, so only the fields
    // produced by the conversion are applied to the destination message.
    rpc::ExportTaskEventData::TaskLogInfo converted_log_info;
    gcs::TaskLogInfoToExport(update.task_log_info_.value(), &converted_log_info);
    state_updates->mutable_task_log_info()->MergeFrom(converted_log_info);
  }

  if (update.pid_.has_value()) {
    state_updates->set_worker_pid(update.pid_.value());
  }

  if (update.is_debugger_paused_.has_value()) {
    state_updates->set_is_debugger_paused(update.is_debugger_paused_.value());
  }
}

void TaskProfileEvent::ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) {
// Rate limit on the number of profiling events from the task. This is especially the
// case if a driver has many profiling events when submitting tasks
Expand All @@ -127,6 +184,24 @@ void TaskProfileEvent::ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) {
event_entry->set_extra_data(std::move(extra_data_));
}

/// Fill an rpc::ExportTaskEventData with the contents of this profile event.
///
/// Writes the task id, job id and attempt number, sets the component type,
/// component id and node IP address on the profile events message, and appends
/// one profile event entry (name, start/end time, extra data).
///
/// The string fields are copied rather than moved, so this event is left
/// unchanged and the conversion does not consume its state.
///
/// \param[out] rpc_task_export_event_data The export task event data to be filled.
void TaskProfileEvent::ToRpcTaskExportEvents(
    std::shared_ptr<rpc::ExportTaskEventData> rpc_task_export_event_data) {
  auto *profile_events = rpc_task_export_event_data->mutable_profile_events();

  // Base fields
  rpc_task_export_event_data->set_task_id(task_id_.Binary());
  rpc_task_export_event_data->set_job_id(job_id_.Binary());
  rpc_task_export_event_data->set_attempt_number(attempt_number_);

  // Component-level fields shared by all entries of this profile message.
  profile_events->set_component_type(component_type_);
  profile_events->set_component_id(component_id_);
  profile_events->set_node_ip_address(node_ip_address_);

  // The profiled span itself.
  auto *event_entry = profile_events->add_events();
  event_entry->set_event_name(event_name_);
  event_entry->set_start_time(start_time_);
  event_entry->set_end_time(end_time_);
  event_entry->set_extra_data(extra_data_);
}

TaskEventBufferImpl::TaskEventBufferImpl(std::shared_ptr<gcs::GcsClient> gcs_client)
: work_guard_(boost::asio::make_work_guard(io_service_)),
periodical_runner_(io_service_),
Expand All @@ -137,12 +212,15 @@ TaskEventBufferImpl::~TaskEventBufferImpl() { Stop(); }

Status TaskEventBufferImpl::Start(bool auto_flush) {
absl::MutexLock lock(&mutex_);
export_event_write_enabled_ = RayConfig::instance().enable_export_api_write();
auto report_interval_ms = RayConfig::instance().task_events_report_interval_ms();
RAY_CHECK(report_interval_ms > 0)
<< "RAY_task_events_report_interval_ms should be > 0 to use TaskEventBuffer.";

status_events_.set_capacity(
RayConfig::instance().task_events_max_num_status_events_buffer_on_worker());
dropped_status_events_for_export_.set_capacity(
RayConfig::instance().task_events_max_num_dropped_status_events_buffer_on_worker());

io_thread_ = std::thread([this]() {
#ifndef _WIN32
Expand Down Expand Up @@ -211,6 +289,7 @@ bool TaskEventBufferImpl::Enabled() const { return enabled_; }

void TaskEventBufferImpl::GetTaskStatusEventsToSend(
std::vector<std::unique_ptr<TaskEvent>> *status_events_to_send,
std::vector<std::unique_ptr<TaskEvent>> *dropped_status_events_to_write,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of maintaining dropped tasks, can we maintain a new vector just for export events? We can make it 10x the size of the original vector. I think maintaining another buffer of "dropped" tasks that are not actually dropped is not intuitive, and it can easily break ordering depending on the task drop policy.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. I needed to make the buffers for these events contain a shared_ptr rather than unique_ptr though so the same object can be moved to multiple buffers.

Copy link
Contributor

@rkooo567 rkooo567 Aug 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also can you just run microbenchmark to see if there's any noticeable regression (I think it shouldn't affect but just in case)? lmk if you don't know how to run them!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

absl::flat_hash_set<TaskAttempt> *dropped_task_attempts_to_send) {
absl::MutexLock lock(&mutex_);

Expand Down Expand Up @@ -245,6 +324,24 @@ void TaskEventBufferImpl::GetTaskStatusEventsToSend(
std::make_move_iterator(status_events_.begin() + num_to_send));
status_events_.erase(status_events_.begin(), status_events_.begin() + num_to_send);

// Get the dropped events data to write.
if (export_event_write_enabled_) {
size_t num_to_write =
std::min(static_cast<size_t>(RayConfig::instance().task_events_send_batch_size()),
static_cast<size_t>(dropped_status_events_for_export_.size()));
dropped_status_events_to_write->insert(
dropped_status_events_to_write->end(),
std::make_move_iterator(dropped_status_events_for_export_.begin()),
std::make_move_iterator(dropped_status_events_for_export_.begin() +
num_to_write));
dropped_status_events_for_export_.erase(
dropped_status_events_for_export_.begin(),
dropped_status_events_for_export_.begin() + num_to_write);
stats_counter_.Decrement(
TaskEventBufferCounter::kNumDroppedTaskStatusEventsForExportAPIStored,
dropped_status_events_to_write->size());
}

stats_counter_.Decrement(TaskEventBufferCounter::kNumTaskStatusEventsStored,
status_events_to_send->size());
stats_counter_.Decrement(TaskEventBufferCounter::kNumDroppedTaskAttemptsStored,
Expand Down Expand Up @@ -331,6 +428,48 @@ std::unique_ptr<rpc::TaskEventData> TaskEventBufferImpl::CreateDataToSend(
return data;
}

void TaskEventBufferImpl::WriteExportData(
std::vector<std::unique_ptr<TaskEvent>> &&status_events_to_send,
std::vector<std::unique_ptr<TaskEvent>> &&dropped_status_events_to_write,
std::vector<std::unique_ptr<TaskEvent>> &&profile_events_to_send) {
absl::flat_hash_map<TaskAttempt, std::shared_ptr<rpc::ExportTaskEventData>>
agg_task_events;
auto to_rpc_event_fn = [&agg_task_events](std::unique_ptr<TaskEvent> &event) {
// Aggregate events by task attempt before converting to proto
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

auto itr = agg_task_events.find(event->GetTaskAttempt());
if (itr == agg_task_events.end()) {
          auto inserted = agg_task_events.insert(
          {event->GetTaskAttempt(), std::make_shared<rpc::ExportTaskEventData>()});
      RAY_CHECK(inserted.second);
      agg_task_event_insertion_order.push_back(event->GetTaskAttempt());
}
...

if (!agg_task_events.count(event->GetTaskAttempt())) {
auto inserted = agg_task_events.insert(
{event->GetTaskAttempt(), std::make_shared<rpc::ExportTaskEventData>()});
RAY_CHECK(inserted.second);
}
auto itr = agg_task_events.find(event->GetTaskAttempt());

event->ToRpcTaskExportEvents(itr->second);
};

// Combine status_events_to_send and dropped_status_events_to_write so
// the aggregation logic in to_rpc_event_fn is combined across both.
std::vector<std::unique_ptr<TaskEvent>> all_status_events_to_send;
all_status_events_to_send.reserve(status_events_to_send.size() +
dropped_status_events_to_write.size());
all_status_events_to_send.insert(all_status_events_to_send.end(),
std::make_move_iterator(status_events_to_send.begin()),
std::make_move_iterator(status_events_to_send.end()));
all_status_events_to_send.insert(
all_status_events_to_send.end(),
std::make_move_iterator(dropped_status_events_to_write.begin()),
std::make_move_iterator(dropped_status_events_to_write.end()));

std::for_each(all_status_events_to_send.begin(),
all_status_events_to_send.end(),
to_rpc_event_fn);
std::for_each(
profile_events_to_send.begin(), profile_events_to_send.end(), to_rpc_event_fn);

for (auto &[_task_attempt, task_event_ptr] : agg_task_events) {
RayExportEvent(task_event_ptr).SendEvent();
};
}

void TaskEventBufferImpl::FlushEvents(bool forced) {
if (!enabled_) {
return;
Expand All @@ -351,9 +490,12 @@ void TaskEventBufferImpl::FlushEvents(bool forced) {

// Take out status events from the buffer.
std::vector<std::unique_ptr<TaskEvent>> status_events_to_send;
std::vector<std::unique_ptr<TaskEvent>> dropped_status_events_to_write;
absl::flat_hash_set<TaskAttempt> dropped_task_attempts_to_send;
status_events_to_send.reserve(RayConfig::instance().task_events_send_batch_size());
GetTaskStatusEventsToSend(&status_events_to_send, &dropped_task_attempts_to_send);
GetTaskStatusEventsToSend(&status_events_to_send,
&dropped_status_events_to_write,
&dropped_task_attempts_to_send);

// Take profile events from the status events.
std::vector<std::unique_ptr<TaskEvent>> profile_events_to_send;
Expand All @@ -365,6 +507,11 @@ void TaskEventBufferImpl::FlushEvents(bool forced) {
CreateDataToSend(std::move(status_events_to_send),
std::move(profile_events_to_send),
std::move(dropped_task_attempts_to_send));
if (export_event_write_enabled_) {
WriteExportData(std::move(status_events_to_send),
std::move(dropped_status_events_to_write),
std::move(profile_events_to_send));
}

gcs::TaskInfoAccessor *task_accessor;
{
Expand Down Expand Up @@ -443,11 +590,20 @@ void TaskEventBufferImpl::AddTaskStatusEvent(std::unique_ptr<TaskEvent> status_e
// This task attempt has been dropped before, so we drop this event.
stats_counter_.Increment(
TaskEventBufferCounter::kNumTaskStatusEventDroppedSinceLastFlush);
if (export_event_write_enabled_) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: why don't we just move it to line 577 so that it always happens?

// If dropped_status_events_for_export_ is full, the oldest event will be
// dropped in the circular buffer and replaced with the current event.
if (!dropped_status_events_for_export_.full()) {
stats_counter_.Increment(
TaskEventBufferCounter::kNumDroppedTaskStatusEventsForExportAPIStored);
}
dropped_status_events_for_export_.push_back(std::move(status_event));
}
return;
}

if (status_events_.full()) {
const auto &to_evict = status_events_.front();
auto &to_evict = status_events_.front();
auto inserted = dropped_task_attempts_unreported_.insert(to_evict->GetTaskAttempt());
stats_counter_.Increment(
TaskEventBufferCounter::kNumTaskStatusEventDroppedSinceLastFlush);
Expand All @@ -463,6 +619,15 @@ void TaskEventBufferImpl::AddTaskStatusEvent(std::unique_ptr<TaskEvent> status_e
if (inserted.second) {
stats_counter_.Increment(TaskEventBufferCounter::kNumDroppedTaskAttemptsStored);
}
if (export_event_write_enabled_) {
// If dropped_status_events_for_export_ is full, the oldest event will be
// dropped in the circular buffer and replaced with the current event.
if (!dropped_status_events_for_export_.full()) {
stats_counter_.Increment(
TaskEventBufferCounter::kNumDroppedTaskStatusEventsForExportAPIStored);
}
dropped_status_events_for_export_.push_back(std::move(to_evict));
}
} else {
stats_counter_.Increment(TaskEventBufferCounter::kNumTaskStatusEventsStored);
}
Expand Down
40 changes: 40 additions & 0 deletions src/ray/core_worker/task_event_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "ray/common/task/task_spec.h"
#include "ray/gcs/gcs_client/gcs_client.h"
#include "ray/util/counter_map.h"
#include "src/ray/protobuf/export_api/export_task_event.pb.h"
#include "src/ray/protobuf/gcs.pb.h"

namespace ray {
Expand Down Expand Up @@ -57,6 +58,14 @@ class TaskEvent {
/// \param[out] rpc_task_events The rpc task event to be filled.
virtual void ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) = 0;

/// Convert itself to an rpc::ExportTaskEventData
///
/// NOTE: this method will modify internal states by moving fields to the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove the comment and make sure data is not modified inside this function

/// rpc::ExportTaskEventData.
/// \param[out] rpc_task_export_event_data The rpc export task event data to be filled.
virtual void ToRpcTaskExportEvents(
std::shared_ptr<rpc::ExportTaskEventData> rpc_task_export_event_data) = 0;

/// If it is a profile event.
virtual bool IsProfileEvent() const = 0;

Expand Down Expand Up @@ -126,6 +135,9 @@ class TaskStatusEvent : public TaskEvent {

void ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) override;

void ToRpcTaskExportEvents(
std::shared_ptr<rpc::ExportTaskEventData> rpc_task_export_event_data) override;

bool IsProfileEvent() const override { return false; }

private:
Expand Down Expand Up @@ -153,6 +165,9 @@ class TaskProfileEvent : public TaskEvent {

void ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) override;

void ToRpcTaskExportEvents(
std::shared_ptr<rpc::ExportTaskEventData> rpc_task_export_event_data) override;

bool IsProfileEvent() const override { return true; }

void SetEndTime(int64_t end_time) { end_time_ = end_time; }
Expand All @@ -177,6 +192,7 @@ enum TaskEventBufferCounter {
kNumTaskProfileEventsStored,
kNumTaskStatusEventsStored,
kNumDroppedTaskAttemptsStored,
kNumDroppedTaskStatusEventsForExportAPIStored,
kTotalNumTaskProfileEventDropped,
kTotalNumTaskStatusEventDropped,
kTotalNumTaskAttemptsReported,
Expand Down Expand Up @@ -295,10 +311,14 @@ class TaskEventBufferImpl : public TaskEventBuffer {
/// Get data related to task status events to be sent to GCS.
///
/// \param[out] status_events_to_send Task status events to be sent.
/// \param[out] dropped_status_events_to_write Full task status events that were dropped
/// and will not be sent to GCS. These will only be written to the Export
/// API.
/// \param[out] dropped_task_attempts_to_send Task attempts that were dropped due to
/// status events being dropped.
void GetTaskStatusEventsToSend(
std::vector<std::unique_ptr<TaskEvent>> *status_events_to_send,
std::vector<std::unique_ptr<TaskEvent>> *dropped_status_events_to_write,
absl::flat_hash_set<TaskAttempt> *dropped_task_attempts_to_send)
ABSL_LOCKS_EXCLUDED(mutex_);

Expand All @@ -321,6 +341,17 @@ class TaskEventBufferImpl : public TaskEventBuffer {
std::vector<std::unique_ptr<TaskEvent>> &&profile_events_to_send,
absl::flat_hash_set<TaskAttempt> &&dropped_task_attempts_to_send);

/// Write task events for the Export API.
///
/// \param status_events_to_send Task status events to be written.
/// \param dropped_status_events_to_write Task status events that were dropped and
/// will not be sent to GCS. These will still be written in the Export API.
/// \param profile_events_to_send Task profile events to be written.
void WriteExportData(
std::vector<std::unique_ptr<TaskEvent>> &&status_events_to_send,
std::vector<std::unique_ptr<TaskEvent>> &&dropped_status_events_to_write,
std::vector<std::unique_ptr<TaskEvent>> &&profile_events_to_send);

/// Reset the counters during flushing data to GCS.
void ResetCountersForFlush();

Expand Down Expand Up @@ -390,6 +421,11 @@ class TaskEventBufferImpl : public TaskEventBuffer {
boost::circular_buffer<std::unique_ptr<TaskEvent>> status_events_
ABSL_GUARDED_BY(mutex_);

/// Status events that were dropped but will still be written in
/// the export API. Circular buffer to limit memory for these dropped events.
boost::circular_buffer<std::unique_ptr<TaskEvent>> dropped_status_events_for_export_
ABSL_GUARDED_BY(mutex_);

/// Buffered task attempts that were dropped due to status events being dropped.
/// This will be sent to GCS to surface the dropped task attempts.
absl::flat_hash_set<TaskAttempt> dropped_task_attempts_unreported_
Expand All @@ -407,6 +443,9 @@ class TaskEventBufferImpl : public TaskEventBuffer {
/// process them quick enough.
std::atomic<bool> grpc_in_progress_ = false;

/// Contains value of RayConfig::instance().enable_export_api_write()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Contains value of RayConfig::instance().enable_export_api_write()
/// If true, task events are exported for Export API

bool export_event_write_enabled_ = false;

FRIEND_TEST(TaskEventBufferTestManualStart, TestGcsClientFail);
FRIEND_TEST(TaskEventBufferTestBatchSend, TestBatchedSend);
FRIEND_TEST(TaskEventBufferTest, TestAddEvent);
Expand All @@ -417,6 +456,7 @@ class TaskEventBufferImpl : public TaskEventBuffer {
FRIEND_TEST(TaskEventBufferTestLimitBuffer, TestBufferSizeLimitStatusEvents);
FRIEND_TEST(TaskEventBufferTestLimitProfileEvents, TestBufferSizeLimitProfileEvents);
FRIEND_TEST(TaskEventBufferTestLimitProfileEvents, TestLimitProfileEventsPerTask);
FRIEND_TEST(TaskEventTestWriteExport, TestWriteTaskExportEvents);
};

} // namespace worker
Expand Down
Loading