Skip to content

Commit

Permalink
[observability][export-api] Write task events (ray-project#47193)
Browse files Browse the repository at this point in the history
Write task events to file as part of the export API. This logic is only run if RayConfig::instance().enable_export_api_write() is true. Default value is false.
All tasks that are added to the task event buffer will be written to file. In addition, keep a dropped_status_events_for_export_ buffer which stores status events that were dropped from the buffer to send to GCS, and write these dropped events to file as well.
The size of dropped_status_events_for_export_ is 10x larger than task_events_max_num_status_events_buffer_on_worker to prioritize recording data. The tradeoff here is memory on each worker, but this is a relatively small overhead, and it is unlikely the dropped events buffer will fill given the sink for export events (write to file) will succeed on each flush.
Task events converted to the export API proto and written to file in a separate thread, which runs this flush operation periodically (every second).
Individual task events will be aggregated by task attempt before being written. This is consistent with the final event sent to GCS, and also helps reduce the number of events written to file.

Signed-off-by: ujjawal-khare <[email protected]>
  • Loading branch information
nikitavemuri authored and ujjawal-khare committed Oct 15, 2024
1 parent 80025e8 commit a6f382c
Showing 1 changed file with 2 additions and 10 deletions.
12 changes: 2 additions & 10 deletions src/ray/util/event.cc
Original file line number Diff line number Diff line change
Expand Up @@ -206,14 +206,8 @@ void EventManager::Publish(const rpc::Event &event, const json &custom_fields) {

void EventManager::PublishExportEvent(const rpc::ExportEvent &export_event) {
auto element = export_log_reporter_map_.find(export_event.source_type());
if (element != export_log_reporter_map_.end()) {
(element->second)->ReportExportEvent(export_event);
} else {
RAY_LOG(FATAL)
<< "RayEventInit wasn't called with the necessary source type "
<< ExportEvent_SourceType_Name(export_event.source_type())
<< ". This indicates a bug in the code, and the event will be dropped.";
}
RAY_CHECK(element != export_log_reporter_map_.end());
(element->second)->ReportExportEvent(export_event);
}

void EventManager::AddReporter(std::shared_ptr<BaseEventReporter> reporter) {
Expand Down Expand Up @@ -430,7 +424,6 @@ void RayExportEvent::SendEvent() {
rpc::ExportEvent export_event;
export_event.set_event_id(event_id);
export_event.set_timestamp(current_sys_time_s());

if (auto ptr_to_task_event_data_ptr =
std::get_if<std::shared_ptr<rpc::ExportTaskEventData>>(&event_data_ptr_)) {
export_event.mutable_task_event_data()->CopyFrom(*(*ptr_to_task_event_data_ptr));
Expand Down Expand Up @@ -458,7 +451,6 @@ void RayExportEvent::SendEvent() {
RAY_LOG(FATAL) << "Invalid event_data type.";
return;
}

EventManager::Instance().PublishExportEvent(export_event);
}

Expand Down

0 comments on commit a6f382c

Please sign in to comment.