From caec581e84b2962e3ff81a65198fef07afc23eed Mon Sep 17 00:00:00 2001 From: Nikita Vemuri Date: Thu, 5 Sep 2024 16:21:14 -0700 Subject: [PATCH] [observability][export-api] Write task events (#47193) Write task events to file as part of the export API. This logic is only run if RayConfig::instance().enable_export_api_write() is true. The default value is false. All tasks that are added to the task event buffer will be written to file. In addition, keep a dropped_status_events_for_export_ buffer which stores status events that were dropped from the buffer to send to GCS, and write these dropped events to file as well. The size of dropped_status_events_for_export_ is 10x larger than task_events_max_num_status_events_buffer_on_worker to prioritize recording data. The tradeoff here is memory on each worker, but this is a relatively small overhead, and it is unlikely the dropped events buffer will fill, given that the sink for export events (write to file) will succeed on each flush. Task events are converted to the export API proto and written to file in a separate thread, which runs this flush operation periodically (every second). Individual task events will be aggregated by task attempt before being written. This is consistent with the final event sent to GCS, and also helps reduce the number of events written to file. 
Signed-off-by: ujjawal-khare --- src/ray/util/event.cc | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/src/ray/util/event.cc b/src/ray/util/event.cc index 6703c4f167c0..cbba851c5802 100644 --- a/src/ray/util/event.cc +++ b/src/ray/util/event.cc @@ -206,14 +206,8 @@ void EventManager::Publish(const rpc::Event &event, const json &custom_fields) { void EventManager::PublishExportEvent(const rpc::ExportEvent &export_event) { auto element = export_log_reporter_map_.find(export_event.source_type()); - if (element != export_log_reporter_map_.end()) { - (element->second)->ReportExportEvent(export_event); - } else { - RAY_LOG(FATAL) - << "RayEventInit wasn't called with the necessary source type " - << ExportEvent_SourceType_Name(export_event.source_type()) - << ". This indicates a bug in the code, and the event will be dropped."; - } + RAY_CHECK(element != export_log_reporter_map_.end()); + (element->second)->ReportExportEvent(export_event); } void EventManager::AddReporter(std::shared_ptr reporter) { @@ -430,7 +424,6 @@ void RayExportEvent::SendEvent() { rpc::ExportEvent export_event; export_event.set_event_id(event_id); export_event.set_timestamp(current_sys_time_s()); - if (auto ptr_to_task_event_data_ptr = std::get_if>(&event_data_ptr_)) { export_event.mutable_task_event_data()->CopyFrom(*(*ptr_to_task_event_data_ptr)); @@ -458,7 +451,6 @@ void RayExportEvent::SendEvent() { RAY_LOG(FATAL) << "Invalid event_data type."; return; } - EventManager::Instance().PublishExportEvent(export_event); }