From e983f0c723c3eb148c8cf682f4b897b6f7887a74 Mon Sep 17 00:00:00 2001 From: Nikita Vemuri Date: Mon, 9 Sep 2024 10:13:21 -0700 Subject: [PATCH] [observability][export-api] Write task events (#47538) - Re add code changes from [observability][export-api] Write task events #47193, which was previous reverted due to CI test linux://:task_event_buffer_test is consistently_failing #47519, CI test windows://:task_event_buffer_test is consistently_failing #47523 and CI test darwin://:task_event_buffer_test is consistently_failing #47525 - Was able to reproduce the failures locally and fixed test in 07efa6f. Failure was due to logical merge conflict (previous PR wasn't re-based off latest master after other event PRs were merged). Signed-off-by: Nikita Vemuri Co-authored-by: Nikita Vemuri Signed-off-by: ujjawal-khare --- src/ray/common/ray_config_def.h | 10 ++ src/ray/core_worker/core_worker_process.cc | 3 +- src/ray/core_worker/task_event_buffer.cc | 187 +++++++++++++++++++-- src/ray/gcs/pb_util.h | 82 +++++++++ 4 files changed, 265 insertions(+), 17 deletions(-) diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 3ba00f7b776e..bcc960158a07 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -498,10 +498,20 @@ RAY_CONFIG(int64_t, task_events_dropped_task_attempts_gc_threshold_s, 15 * 60) /// workers. Events will be evicted based on a FIFO order. RAY_CONFIG(uint64_t, task_events_max_num_status_events_buffer_on_worker, 100 * 1000) +/// Max number of task status events that will be stored to export +/// for the export API. Events will be evicted based on a FIFO order. +RAY_CONFIG(uint64_t, + task_events_max_num_export_status_events_buffer_on_worker, + 1000 * 1000) + /// Max number of task events to be send in a single message to GCS. This caps both /// the message size, and also the processing work on GCS. RAY_CONFIG(uint64_t, task_events_send_batch_size, 10 * 1000) +/// Max number of task events to be written in a single flush iteration. This +/// caps the number of file writes per iteration. +RAY_CONFIG(uint64_t, export_task_events_write_batch_size, 10 * 1000) + /// Max number of profile events allowed to be tracked for a single task. /// Setting the value to -1 allows unlimited profile events to be tracked. RAY_CONFIG(int64_t, task_events_max_num_profile_events_per_task, 1000) diff --git a/src/ray/core_worker/core_worker_process.cc b/src/ray/core_worker/core_worker_process.cc index d587fcee8b7e..191788e7e045 100644 --- a/src/ray/core_worker/core_worker_process.cc +++ b/src/ray/core_worker/core_worker_process.cc @@ -137,7 +137,8 @@ CoreWorkerProcessImpl::CoreWorkerProcessImpl(const CoreWorkerOptions &options) // Initialize event framework. if (RayConfig::instance().event_log_reporter_enabled() && !options_.log_dir.empty()) { const std::vector source_types = { - ray::rpc::Event_SourceType::Event_SourceType_CORE_WORKER}; + ray::rpc::Event_SourceType::Event_SourceType_CORE_WORKER, + ray::rpc::ExportEvent_SourceType::ExportEvent_SourceType_EXPORT_TASK}; RayEventInit(source_types, absl::flat_hash_map(), options_.log_dir, diff --git a/src/ray/core_worker/task_event_buffer.cc b/src/ray/core_worker/task_event_buffer.cc index 115811d3a999..1971b5efd563 100644 --- a/src/ray/core_worker/task_event_buffer.cc +++ b/src/ray/core_worker/task_event_buffer.cc @@ -15,6 +15,7 @@ #include "ray/core_worker/task_event_buffer.h" #include "ray/gcs/pb_util.h" +#include "ray/util/event.h" namespace ray { namespace core { @@ -108,6 +109,62 @@ void TaskStatusEvent::ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) { } } +void TaskStatusEvent::ToRpcTaskExportEvents( + std::shared_ptr rpc_task_export_event_data) { + // Base fields + rpc_task_export_event_data->set_task_id(task_id_.Binary()); + rpc_task_export_event_data->set_job_id(job_id_.Binary()); + rpc_task_export_event_data->set_attempt_number(attempt_number_); + + // Task info. + if (task_spec_) { + gcs::FillExportTaskInfo(rpc_task_export_event_data->mutable_task_info(), *task_spec_); + } + + // Task status update. + auto dst_state_update = rpc_task_export_event_data->mutable_state_updates(); + gcs::FillExportTaskStatusUpdateTime(task_status_, timestamp_, dst_state_update); + + if (!state_update_.has_value()) { + return; + } + + if (state_update_->node_id_.has_value()) { + RAY_CHECK(task_status_ == rpc::TaskStatus::SUBMITTED_TO_WORKER) + << "Node ID should be included when task status changes to " + "SUBMITTED_TO_WORKER."; + dst_state_update->set_node_id(state_update_->node_id_->Binary()); + } + + if (state_update_->worker_id_.has_value()) { + RAY_CHECK(task_status_ == rpc::TaskStatus::SUBMITTED_TO_WORKER) + << "Worker ID should be included when task status changes to " + "SUBMITTED_TO_WORKER."; + dst_state_update->set_worker_id(state_update_->worker_id_->Binary()); + } + + if (state_update_->error_info_.has_value()) { + auto error_info = dst_state_update->mutable_error_info(); + error_info->set_error_message((*state_update_->error_info_).error_message()); + error_info->set_error_type((*state_update_->error_info_).error_type()); + } + + if (state_update_->task_log_info_.has_value()) { + rpc::ExportTaskEventData::TaskLogInfo export_task_log_info; + gcs::TaskLogInfoToExport(state_update_->task_log_info_.value(), + &export_task_log_info); + dst_state_update->mutable_task_log_info()->MergeFrom(export_task_log_info); + } + + if (state_update_->pid_.has_value()) { + dst_state_update->set_worker_pid(state_update_->pid_.value()); + } + + if (state_update_->is_debugger_paused_.has_value()) { + dst_state_update->set_is_debugger_paused(state_update_->is_debugger_paused_.value()); + } +} + void TaskProfileEvent::ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) { // Rate limit on the number of profiling events from the task. This is especially the // case if a driver has many profiling events when submitting tasks @@ -127,6 +184,24 @@ void TaskProfileEvent::ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) { event_entry->set_extra_data(std::move(extra_data_)); } +void TaskProfileEvent::ToRpcTaskExportEvents( + std::shared_ptr rpc_task_export_event_data) { + auto profile_events = rpc_task_export_event_data->mutable_profile_events(); + + // Base fields + rpc_task_export_event_data->set_task_id(task_id_.Binary()); + rpc_task_export_event_data->set_job_id(job_id_.Binary()); + rpc_task_export_event_data->set_attempt_number(attempt_number_); + profile_events->set_component_type(std::move(component_type_)); + profile_events->set_component_id(std::move(component_id_)); + profile_events->set_node_ip_address(std::move(node_ip_address_)); + auto event_entry = profile_events->add_events(); + event_entry->set_event_name(std::move(event_name_)); + event_entry->set_start_time(start_time_); + event_entry->set_end_time(end_time_); + event_entry->set_extra_data(std::move(extra_data_)); +} + TaskEventBufferImpl::TaskEventBufferImpl(std::shared_ptr gcs_client) : work_guard_(boost::asio::make_work_guard(io_service_)), periodical_runner_(io_service_), @@ -137,12 +212,15 @@ TaskEventBufferImpl::~TaskEventBufferImpl() { Stop(); } Status TaskEventBufferImpl::Start(bool auto_flush) { absl::MutexLock lock(&mutex_); + export_event_write_enabled_ = RayConfig::instance().enable_export_api_write(); auto report_interval_ms = RayConfig::instance().task_events_report_interval_ms(); RAY_CHECK(report_interval_ms > 0) << "RAY_task_events_report_interval_ms should be > 0 to use TaskEventBuffer."; status_events_.set_capacity( RayConfig::instance().task_events_max_num_status_events_buffer_on_worker()); + status_events_for_export_.set_capacity( + RayConfig::instance().task_events_max_num_export_status_events_buffer_on_worker()); io_thread_ = std::thread([this]() { #ifndef _WIN32 @@ -210,10 +288,27 @@ void TaskEventBufferImpl::Stop() { bool TaskEventBufferImpl::Enabled() const { return enabled_; } void TaskEventBufferImpl::GetTaskStatusEventsToSend( - std::vector> *status_events_to_send, + std::vector> *status_events_to_send, + std::vector> *status_events_to_write_for_export, absl::flat_hash_set *dropped_task_attempts_to_send) { absl::MutexLock lock(&mutex_); + // Get the export events data to write. + if (export_event_write_enabled_) { + size_t num_to_write = std::min( + static_cast(RayConfig::instance().export_task_events_write_batch_size()), + static_cast(status_events_for_export_.size())); + status_events_to_write_for_export->insert( + status_events_to_write_for_export->end(), + std::make_move_iterator(status_events_for_export_.begin()), + std::make_move_iterator(status_events_for_export_.begin() + num_to_write)); + status_events_for_export_.erase(status_events_for_export_.begin(), + status_events_for_export_.begin() + num_to_write); + stats_counter_.Decrement( + TaskEventBufferCounter::kNumTaskStatusEventsForExportAPIStored, + status_events_to_write_for_export->size()); + } + // No data to send. if (status_events_.empty() && dropped_task_attempts_unreported_.empty()) { return; @@ -252,7 +347,7 @@ void TaskEventBufferImpl::GetTaskStatusEventsToSend( } void TaskEventBufferImpl::GetTaskProfileEventsToSend( - std::vector> *profile_events_to_send) { + std::vector> *profile_events_to_send) { absl::MutexLock lock(&profile_mutex_); size_t batch_size = @@ -278,13 +373,13 @@ void TaskEventBufferImpl::GetTaskProfileEventsToSend( } std::unique_ptr TaskEventBufferImpl::CreateDataToSend( - std::vector> &&status_events_to_send, - std::vector> &&profile_events_to_send, + std::vector> &&status_events_to_send, + std::vector> &&profile_events_to_send, absl::flat_hash_set &&dropped_task_attempts_to_send) { // Aggregate the task events by TaskAttempt. absl::flat_hash_map agg_task_events; auto to_rpc_event_fn = [this, &agg_task_events, &dropped_task_attempts_to_send]( - std::unique_ptr &event) { + std::shared_ptr &event) { if (dropped_task_attempts_to_send.count(event->GetTaskAttempt())) { // We are marking this as data loss due to some missing task status updates. // We will not send this event to GCS. @@ -331,6 +426,45 @@ std::unique_ptr TaskEventBufferImpl::CreateDataToSend( return data; } +void TaskEventBufferImpl::WriteExportData( + std::vector> &&status_events_to_write_for_export, + std::vector> &&profile_events_to_send) { + absl::flat_hash_map> + agg_task_events; + // Maintain insertion order to agg_task_events so events are written + // in the same order as the buffer. + std::vector agg_task_event_insertion_order; + auto to_rpc_event_fn = [&agg_task_events, &agg_task_event_insertion_order]( + std::shared_ptr &event) { + // Aggregate events by task attempt before converting to proto + auto itr = agg_task_events.find(event->GetTaskAttempt()); + if (itr == agg_task_events.end()) { + // Insert event into agg_task_events if the task attempt of that + // event wasn't already added. + auto event_for_attempt = std::make_shared(); + auto inserted = + agg_task_events.insert({event->GetTaskAttempt(), event_for_attempt}); + RAY_CHECK(inserted.second); + agg_task_event_insertion_order.push_back(event->GetTaskAttempt()); + event->ToRpcTaskExportEvents(event_for_attempt); + } else { + event->ToRpcTaskExportEvents(itr->second); + } + }; + + std::for_each(status_events_to_write_for_export.begin(), + status_events_to_write_for_export.end(), + to_rpc_event_fn); + std::for_each( + profile_events_to_send.begin(), profile_events_to_send.end(), to_rpc_event_fn); + + for (const auto &task_attempt : agg_task_event_insertion_order) { + auto it = agg_task_events.find(task_attempt); + RAY_CHECK(it != agg_task_events.end()); + RayExportEvent(it->second).SendEvent(); + } +} + void TaskEventBufferImpl::FlushEvents(bool forced) { if (!enabled_) { return; @@ -350,13 +484,16 @@ void TaskEventBufferImpl::FlushEvents(bool forced) { } // Take out status events from the buffer. - std::vector> status_events_to_send; + std::vector> status_events_to_send; + std::vector> status_events_to_write_for_export; absl::flat_hash_set dropped_task_attempts_to_send; status_events_to_send.reserve(RayConfig::instance().task_events_send_batch_size()); - GetTaskStatusEventsToSend(&status_events_to_send, &dropped_task_attempts_to_send); + GetTaskStatusEventsToSend(&status_events_to_send, + &status_events_to_write_for_export, + &dropped_task_attempts_to_send); // Take profile events from the status events. - std::vector> profile_events_to_send; + std::vector> profile_events_to_send; profile_events_to_send.reserve(RayConfig::instance().task_events_send_batch_size()); GetTaskProfileEventsToSend(&profile_events_to_send); @@ -365,6 +502,10 @@ void TaskEventBufferImpl::FlushEvents(bool forced) { CreateDataToSend(std::move(status_events_to_send), std::move(profile_events_to_send), std::move(dropped_task_attempts_to_send)); + if (export_event_write_enabled_) { + WriteExportData(std::move(status_events_to_write_for_export), + std::move(profile_events_to_send)); + } gcs::TaskInfoAccessor *task_accessor; { @@ -438,8 +579,19 @@ void TaskEventBufferImpl::AddTaskStatusEvent(std::unique_ptr status_e if (!enabled_) { return; } + std::shared_ptr status_event_shared_ptr = std::move(status_event); - if (dropped_task_attempts_unreported_.count(status_event->GetTaskAttempt())) { + if (export_event_write_enabled_) { + // If status_events_for_export_ is full, the oldest event will be + // dropped in the circular buffer and replaced with the current event. + if (!status_events_for_export_.full()) { + stats_counter_.Increment( + TaskEventBufferCounter::kNumTaskStatusEventsForExportAPIStored); + } + status_events_for_export_.push_back(status_event_shared_ptr); + } + if (dropped_task_attempts_unreported_.count( + status_event_shared_ptr->GetTaskAttempt())) { // This task attempt has been dropped before, so we drop this event. stats_counter_.Increment( TaskEventBufferCounter::kNumTaskStatusEventDroppedSinceLastFlush); @@ -454,7 +606,7 @@ void TaskEventBufferImpl::AddTaskStatusEvent(std::unique_ptr status_e RAY_LOG_EVERY_N(WARNING, 100000) << "Dropping task status events for task: " - << status_event->GetTaskAttempt().first + << status_event_shared_ptr->GetTaskAttempt().first << ", set a higher value for " "RAY_task_events_max_num_status_events_buffer_on_worker(" << RayConfig::instance().task_events_max_num_status_events_buffer_on_worker() @@ -466,7 +618,7 @@ void TaskEventBufferImpl::AddTaskStatusEvent(std::unique_ptr status_e } else { stats_counter_.Increment(TaskEventBufferCounter::kNumTaskStatusEventsStored); } - status_events_.push_back(std::move(status_event)); + status_events_.push_back(status_event_shared_ptr); } void TaskEventBufferImpl::AddTaskProfileEvent(std::unique_ptr profile_event) { @@ -474,10 +626,12 @@ void TaskEventBufferImpl::AddTaskProfileEvent(std::unique_ptr profile if (!enabled_) { return; } - auto profile_events_itr = profile_events_.find(profile_event->GetTaskAttempt()); + std::shared_ptr profile_event_shared_ptr = std::move(profile_event); + auto profile_events_itr = + profile_events_.find(profile_event_shared_ptr->GetTaskAttempt()); if (profile_events_itr == profile_events_.end()) { - auto inserted = profile_events_.insert( - {profile_event->GetTaskAttempt(), std::vector>()}); + auto inserted = profile_events_.insert({profile_event_shared_ptr->GetTaskAttempt(), + std::vector>()}); RAY_CHECK(inserted.second); profile_events_itr = inserted.first; } @@ -501,7 +655,8 @@ void TaskEventBufferImpl::AddTaskProfileEvent(std::unique_ptr profile // driver task id and it could generate large number of profile events when submitting // many tasks. RAY_LOG_EVERY_N(WARNING, 100000) - << "Dropping profiling events for task: " << profile_event->GetTaskAttempt().first + << "Dropping profiling events for task: " + << profile_event_shared_ptr->GetTaskAttempt().first << ", set a higher value for RAY_task_events_max_num_profile_events_per_task(" << max_num_profile_event_per_task << "), or RAY_task_events_max_num_profile_events_buffer_on_worker (" @@ -510,7 +665,7 @@ void TaskEventBufferImpl::AddTaskProfileEvent(std::unique_ptr profile } stats_counter_.Increment(TaskEventBufferCounter::kNumTaskProfileEventsStored); - profile_events_itr->second.push_back(std::move(profile_event)); + profile_events_itr->second.push_back(profile_event_shared_ptr); } const std::string TaskEventBufferImpl::DebugString() { diff --git a/src/ray/gcs/pb_util.h b/src/ray/gcs/pb_util.h index b97c0c75bf2c..cb3c518072b2 100644 --- a/src/ray/gcs/pb_util.h +++ b/src/ray/gcs/pb_util.h @@ -21,6 +21,7 @@ #include "ray/common/ray_config.h" #include "ray/common/task/task_spec.h" #include "src/ray/protobuf/autoscaler.pb.h" +#include "src/ray/protobuf/export_api/export_task_event.pb.h" #include "src/ray/protobuf/gcs.pb.h" namespace ray { @@ -256,6 +257,59 @@ inline void FillTaskInfo(rpc::TaskInfoEntry *task_info, } } +// Fill task_info for the export API with task specification from task_spec +inline void FillExportTaskInfo(rpc::ExportTaskEventData::TaskInfoEntry *task_info, + const TaskSpecification &task_spec) { + rpc::TaskType type; + if (task_spec.IsNormalTask()) { + type = rpc::TaskType::NORMAL_TASK; + } else if (task_spec.IsDriverTask()) { + type = rpc::TaskType::DRIVER_TASK; + } else if (task_spec.IsActorCreationTask()) { + type = rpc::TaskType::ACTOR_CREATION_TASK; + task_info->set_actor_id(task_spec.ActorCreationId().Binary()); + } else { + RAY_CHECK(task_spec.IsActorTask()); + type = rpc::TaskType::ACTOR_TASK; + task_info->set_actor_id(task_spec.ActorId().Binary()); + } + task_info->set_type(type); + task_info->set_language(task_spec.GetLanguage()); + task_info->set_func_or_class_name(task_spec.FunctionDescriptor()->CallString()); + + task_info->set_task_id(task_spec.TaskId().Binary()); + // NOTE: we set the parent task id of a task to be submitter's task id, where + // the submitter depends on the owner coreworker's: + // - if the owner coreworker runs a normal task, the submitter's task id is the task id. + // - if the owner coreworker runs an actor, the submitter's task id will be the actor's + // creation task id. + task_info->set_parent_task_id(task_spec.SubmitterTaskId().Binary()); + const auto &resources_map = task_spec.GetRequiredResources().GetResourceMap(); + task_info->mutable_required_resources()->insert(resources_map.begin(), + resources_map.end()); + + auto export_runtime_env_info = task_info->mutable_runtime_env_info(); + export_runtime_env_info->set_serialized_runtime_env( + task_spec.RuntimeEnvInfo().serialized_runtime_env()); + auto export_runtime_env_uris = export_runtime_env_info->mutable_uris(); + export_runtime_env_uris->set_working_dir_uri( + task_spec.RuntimeEnvInfo().uris().working_dir_uri()); + export_runtime_env_uris->mutable_py_modules_uris()->CopyFrom( + task_spec.RuntimeEnvInfo().uris().py_modules_uris()); + auto export_runtime_env_config = export_runtime_env_info->mutable_runtime_env_config(); + export_runtime_env_config->set_setup_timeout_seconds( + task_spec.RuntimeEnvInfo().runtime_env_config().setup_timeout_seconds()); + export_runtime_env_config->set_eager_install( + task_spec.RuntimeEnvInfo().runtime_env_config().eager_install()); + export_runtime_env_config->mutable_log_files()->CopyFrom( + task_spec.RuntimeEnvInfo().runtime_env_config().log_files()); + + const auto &pg_id = task_spec.PlacementGroupBundleId().first; + if (!pg_id.IsNil()) { + task_info->set_placement_group_id(pg_id.Binary()); + } +} + /// Generate a RayErrorInfo from ErrorType inline rpc::RayErrorInfo GetRayErrorInfo(const rpc::ErrorType &error_type, const std::string &error_msg = "") { @@ -336,6 +390,34 @@ inline void FillTaskStatusUpdateTime(const ray::rpc::TaskStatus &task_status, (*state_updates->mutable_state_ts_ns())[task_status] = timestamp; } +/// Fill the rpc::ExportTaskEventData::TaskStateUpdate with the timestamps +/// according to the status change. +/// +/// \param task_status The task status. +/// \param timestamp The timestamp. +/// \param[out] state_updates The state updates with timestamp to be updated. +inline void FillExportTaskStatusUpdateTime( + const ray::rpc::TaskStatus &task_status, + int64_t timestamp, + rpc::ExportTaskEventData::TaskStateUpdate *state_updates) { + if (task_status == rpc::TaskStatus::NIL) { + // Not status change. + return; + } + (*state_updates->mutable_state_ts_ns())[task_status] = timestamp; +} + +/// Convert rpc::TaskLogInfo to rpc::ExportTaskEventData::TaskLogInfo +inline void TaskLogInfoToExport(const rpc::TaskLogInfo &src, + rpc::ExportTaskEventData::TaskLogInfo *dest) { + dest->set_stdout_file(src.stdout_file()); + dest->set_stderr_file(src.stderr_file()); + dest->set_stdout_start(src.stdout_start()); + dest->set_stdout_end(src.stdout_end()); + dest->set_stderr_start(src.stderr_start()); + dest->set_stderr_end(src.stderr_end()); +} + inline std::string FormatPlacementGroupLabelName(const std::string &pg_id) { return kPlacementGroupConstraintKeyPrefix + pg_id; }