[observability][export-api] Write task events #47193
Signed-off-by: Nikita Vemuri <[email protected]>
@@ -180,7 +180,9 @@ EventManager &EventManager::Instance() {
   return instance_;
 }

-bool EventManager::IsEmpty() { return reporter_map_.empty(); }
+bool EventManager::IsEmpty() {
+  return reporter_map_.empty() && export_log_reporter_map_.empty();
Followup from #47143
        << ExportEvent_SourceType_Name(export_event.source_type())
        << ". This indicates a bug in the code, and the event will be dropped.";
  }
  RAY_CHECK(element != export_log_reporter_map_.end());
Followup from #47143
…ita/write_task_events
Btw, I am not 100% sure it is a good idea to aggregate events ourselves, because it means the data in the log will be nondeterministic depending on when we flush. But I will just let it go for now (we should definitely ask customers whether this is the semantic they want).
/// Convert rpc::TaskLogInfo to rpc::ExportTaskEventData::TaskLogInfo
inline void TaskLogInfoToExport(const rpc::TaskLogInfo &src,
                                rpc::ExportTaskEventData::TaskLogInfo *dest) {
  dest->set_stdout_file(src.stdout_file());
QQ: can we just swap here? Is the caller still using src after it is passed?
dest->Swap(&src)
This function is called with src being state_update_->task_log_info_.value(), so we probably shouldn't change src because it's a TaskStatusEvent class attribute.
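To illustrate the copy-versus-swap question above, here is a minimal sketch that uses a plain struct as a hypothetical stand-in for the protobuf message. `LogInfo`, `CopyLogInfo`, and `SwapLogInfo` are illustrative names and are not part of Ray or protobuf. It shows that a swap needs a mutable source and leaves the source holding whatever the destination had before, which is why a field-by-field copy is the safer choice when the source is still owned by another object.

```cpp
#include <string>
#include <utility>

// Hypothetical stand-in for a message with two string fields.
struct LogInfo {
  std::string stdout_file;
  std::string stderr_file;
};

// Copying reads from a const source and leaves it untouched.
inline void CopyLogInfo(const LogInfo &src, LogInfo *dest) {
  dest->stdout_file = src.stdout_file;
  dest->stderr_file = src.stderr_file;
}

// Swapping avoids the string copies, but it requires a non-const source
// and leaves the source holding dest's previous contents.
inline void SwapLogInfo(LogInfo *src, LogInfo *dest) {
  std::swap(*src, *dest);
}
```

After `SwapLogInfo(&src, &dest)` with an empty `dest`, `src` ends up empty, so any later reader of `src` would see the data gone.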
    int64_t timestamp,
    rpc::ExportTaskEventData::TaskStateUpdate *state_updates) {
  if (task_status == rpc::TaskStatus::NIL) {
    // Not status change.
Suggested change:
-    // Not status change.
+    // No status change.
QQ: when does this happen?
Looks like NIL is the default value when a TaskStatusEvent is initialized. @rickyyx Do you know if FillTaskStatusUpdateTime is ever called when task_status = rpc::TaskStatus::NIL? This logic is mirrored from that function.
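The NIL guard discussed above can be sketched as follows. This is a simplified illustration under stated assumptions: `Status`, `StateUpdate`, and `FillStatusUpdateTime` are hypothetical names, and the real code operates on protobuf messages rather than a `std::map`. The point is only that a default-initialized (NIL) status records nothing.

```cpp
#include <cstdint>
#include <map>

// Hypothetical status enum; NIL plays the role of the default value.
enum class Status { NIL, PENDING, RUNNING, FINISHED };

// Hypothetical container mapping each observed status to its timestamp.
struct StateUpdate {
  std::map<Status, int64_t> state_ts;
};

// Record the time at which a status was reached, skipping the NIL default.
inline void FillStatusUpdateTime(Status status,
                                 int64_t timestamp,
                                 StateUpdate *update) {
  if (status == Status::NIL) {
    // No status change.
    return;
  }
  update->state_ts[status] = timestamp;
}
```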
@@ -211,6 +289,7 @@ bool TaskEventBufferImpl::Enabled() const { return enabled_; }

 void TaskEventBufferImpl::GetTaskStatusEventsToSend(
     std::vector<std::unique_ptr<TaskEvent>> *status_events_to_send,
+    std::vector<std::unique_ptr<TaskEvent>> *dropped_status_events_to_write,
Instead of maintaining dropped tasks, can we just maintain a new vector just for export events? We can make it 10x the size of the original vector. I think maintaining another buffer of dropped tasks that are not actually dropped is not intuitive, and it can easily screw up ordering depending on the task drop policy.
Sounds good. I needed to make the buffers for these events contain a shared_ptr rather than unique_ptr though, so the same object can be added to multiple buffers.
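As a rough sketch of why shared ownership is needed here: a `std::unique_ptr` can live in only one container at a time, while a `std::shared_ptr` lets the same event sit in both the buffer that feeds GCS and the buffer that feeds the export file. `Event` and `TwoSinkBuffer` below are hypothetical names for illustration, not the actual Ray classes.

```cpp
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical event type.
struct Event {
  std::string task_id;
};

// Holds each event in two buffers without copying the event itself.
class TwoSinkBuffer {
 public:
  void Add(std::shared_ptr<Event> event) {
    // Copying the shared_ptr adds a second owner of the same object.
    export_buffer_.push_back(event);
    // Moving hands the parameter's ownership to the send buffer.
    send_buffer_.push_back(std::move(event));
  }

  const std::vector<std::shared_ptr<Event>> &send_buffer() const {
    return send_buffer_;
  }
  const std::vector<std::shared_ptr<Event>> &export_buffer() const {
    return export_buffer_;
  }

 private:
  std::vector<std::shared_ptr<Event>> send_buffer_;
  std::vector<std::shared_ptr<Event>> export_buffer_;
};
```

Each buffer can then be flushed or trimmed independently; the event is freed only once the last buffer releases it.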
Also, can you run the microbenchmark to see if there's any noticeable regression (I think it shouldn't affect anything, but just in case)? lmk if you don't know how to run it!
Makes sense, we can revisit this logic after this initial prototype. The tradeoff here is minimizing the number of events that get written.
Okay, the microbenchmark result looks good. Is this PR ready to review again?
Yes.
Write task events to file as part of the export API. This logic only runs if RayConfig::instance().enable_export_api_write() is true; the default value is false. All tasks that are added to the task event buffer are written to file. In addition, a dropped_status_events_for_export_ buffer stores status events that were dropped from the buffer to send to GCS, and these dropped events are written to file as well. The size of dropped_status_events_for_export_ is 10x larger than task_events_max_num_status_events_buffer_on_worker to prioritize recording data. The tradeoff is memory on each worker, but this is a relatively small overhead, and the dropped events buffer is unlikely to fill because the sink for export events (write to file) will succeed on each flush. Task events are converted to the export API proto and written to file in a separate thread, which runs this flush operation periodically (every second). Individual task events are aggregated by task attempt before being written. This is consistent with the final event sent to GCS and also reduces the number of events written to file. Signed-off-by: ujjawal-khare <[email protected]>
…7536) Reverts ray-project#47193 Signed-off-by: ujjawal-khare <[email protected]>
- Re-add code changes from [observability][export-api] Write task events ray-project#47193, which was previously reverted due to CI test linux://:task_event_buffer_test is consistently_failing ray-project#47519, CI test windows://:task_event_buffer_test is consistently_failing ray-project#47523, and CI test darwin://:task_event_buffer_test is consistently_failing ray-project#47525.
- Was able to reproduce the failures locally and fixed the test in 07efa6f. The failure was due to a logical merge conflict (the previous PR wasn't rebased off the latest master after other event PRs were merged).
Signed-off-by: Nikita Vemuri <[email protected]> Co-authored-by: Nikita Vemuri <[email protected]> Signed-off-by: ujjawal-khare <[email protected]>
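Why are these changes needed?
- Write task events to file as part of the export API. This logic is only run if RayConfig::instance().enable_export_api_write() is true. Default value is false.
- All tasks that are added to the task event buffer will be written to file. In addition, keep a dropped_status_events_for_export_ buffer which stores status events that were dropped from the buffer to send to GCS, and write these dropped events to file as well.
- The size of dropped_status_events_for_export_ is 10x larger than task_events_max_num_status_events_buffer_on_worker to prioritize recording data. The tradeoff here is memory on each worker, but this is a relatively small overhead, and it is unlikely the dropped events buffer will fill given the sink for export events (write to file) will succeed on each flush.
- Task events are converted to the export API proto and written to file in a separate thread, which runs this flush operation periodically (every second).
- Individual task events will be aggregated by task attempt before being written. This is consistent with the final event sent to GCS, and also helps reduce the number of events written to file.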
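The aggregation step described above can be sketched roughly as below. This is an illustration under assumptions, not the PR's implementation: `StatusEvent`, `AggregatedEvent`, and `AggregateByTaskAttempt` are hypothetical names, and plain structs stand in for the export API protos. Events from one flush batch are grouped by (task ID, attempt number), and each group is collapsed into a single record holding the latest timestamp per status.

```cpp
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical single status event as buffered on a worker.
struct StatusEvent {
  std::string task_id;
  int32_t attempt_number;
  std::string status;
  int64_t timestamp;
};

// Hypothetical aggregated record: one per task attempt.
struct AggregatedEvent {
  std::string task_id;
  int32_t attempt_number = 0;
  std::map<std::string, int64_t> state_ts;  // status -> timestamp
};

// Merge all events in one flush batch that share a task attempt.
inline std::vector<AggregatedEvent> AggregateByTaskAttempt(
    const std::vector<StatusEvent> &events) {
  std::map<std::pair<std::string, int32_t>, AggregatedEvent> by_attempt;
  for (const auto &event : events) {
    auto &agg =
        by_attempt[std::make_pair(event.task_id, event.attempt_number)];
    agg.task_id = event.task_id;
    agg.attempt_number = event.attempt_number;
    // A later event with the same status overwrites the earlier timestamp.
    agg.state_ts[event.status] = event.timestamp;
  }
  std::vector<AggregatedEvent> out;
  out.reserve(by_attempt.size());
  for (auto &entry : by_attempt) {
    out.push_back(std::move(entry.second));
  }
  return out;
}
```

Because only the events present in a given batch are merged, the shape of the written records depends on when the flush happens, which is the nondeterminism concern raised in the review.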