diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index bcc960158a07..3ba00f7b776e 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -498,20 +498,10 @@ RAY_CONFIG(int64_t, task_events_dropped_task_attempts_gc_threshold_s, 15 * 60) /// workers. Events will be evicted based on a FIFO order. RAY_CONFIG(uint64_t, task_events_max_num_status_events_buffer_on_worker, 100 * 1000) -/// Max number of task status events that will be stored to export -/// for the export API. Events will be evicted based on a FIFO order. -RAY_CONFIG(uint64_t, - task_events_max_num_export_status_events_buffer_on_worker, - 1000 * 1000) - /// Max number of task events to be send in a single message to GCS. This caps both /// the message size, and also the processing work on GCS. RAY_CONFIG(uint64_t, task_events_send_batch_size, 10 * 1000) -/// Max number of task events to be written in a single flush iteration. This -/// caps the number of file writes per iteration. -RAY_CONFIG(uint64_t, export_task_events_write_batch_size, 10 * 1000) - /// Max number of profile events allowed to be tracked for a single task. /// Setting the value to -1 allows unlimited profile events to be tracked. RAY_CONFIG(int64_t, task_events_max_num_profile_events_per_task, 1000) diff --git a/src/ray/core_worker/core_worker_process.cc b/src/ray/core_worker/core_worker_process.cc index 191788e7e045..d587fcee8b7e 100644 --- a/src/ray/core_worker/core_worker_process.cc +++ b/src/ray/core_worker/core_worker_process.cc @@ -137,8 +137,7 @@ CoreWorkerProcessImpl::CoreWorkerProcessImpl(const CoreWorkerOptions &options) // Initialize event framework. if (RayConfig::instance().event_log_reporter_enabled() && !options_.log_dir.empty()) { const std::vector source_types = { - ray::rpc::Event_SourceType::Event_SourceType_CORE_WORKER, - ray::rpc::ExportEvent_SourceType::ExportEvent_SourceType_EXPORT_TASK}; + ray::rpc::Event_SourceType::Event_SourceType_CORE_WORKER}; RayEventInit(source_types, absl::flat_hash_map(), options_.log_dir, diff --git a/src/ray/core_worker/task_event_buffer.cc b/src/ray/core_worker/task_event_buffer.cc index 1971b5efd563..115811d3a999 100644 --- a/src/ray/core_worker/task_event_buffer.cc +++ b/src/ray/core_worker/task_event_buffer.cc @@ -15,7 +15,6 @@ #include "ray/core_worker/task_event_buffer.h" #include "ray/gcs/pb_util.h" -#include "ray/util/event.h" namespace ray { namespace core { @@ -109,62 +108,6 @@ void TaskStatusEvent::ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) { } } -void TaskStatusEvent::ToRpcTaskExportEvents( - std::shared_ptr rpc_task_export_event_data) { - // Base fields - rpc_task_export_event_data->set_task_id(task_id_.Binary()); - rpc_task_export_event_data->set_job_id(job_id_.Binary()); - rpc_task_export_event_data->set_attempt_number(attempt_number_); - - // Task info. - if (task_spec_) { - gcs::FillExportTaskInfo(rpc_task_export_event_data->mutable_task_info(), *task_spec_); - } - - // Task status update. - auto dst_state_update = rpc_task_export_event_data->mutable_state_updates(); - gcs::FillExportTaskStatusUpdateTime(task_status_, timestamp_, dst_state_update); - - if (!state_update_.has_value()) { - return; - } - - if (state_update_->node_id_.has_value()) { - RAY_CHECK(task_status_ == rpc::TaskStatus::SUBMITTED_TO_WORKER) - << "Node ID should be included when task status changes to " - "SUBMITTED_TO_WORKER."; - dst_state_update->set_node_id(state_update_->node_id_->Binary()); - } - - if (state_update_->worker_id_.has_value()) { - RAY_CHECK(task_status_ == rpc::TaskStatus::SUBMITTED_TO_WORKER) - << "Worker ID should be included when task status changes to " - "SUBMITTED_TO_WORKER."; - dst_state_update->set_worker_id(state_update_->worker_id_->Binary()); - } - - if (state_update_->error_info_.has_value()) { - auto error_info = dst_state_update->mutable_error_info(); - error_info->set_error_message((*state_update_->error_info_).error_message()); - error_info->set_error_type((*state_update_->error_info_).error_type()); - } - - if (state_update_->task_log_info_.has_value()) { - rpc::ExportTaskEventData::TaskLogInfo export_task_log_info; - gcs::TaskLogInfoToExport(state_update_->task_log_info_.value(), - &export_task_log_info); - dst_state_update->mutable_task_log_info()->MergeFrom(export_task_log_info); - } - - if (state_update_->pid_.has_value()) { - dst_state_update->set_worker_pid(state_update_->pid_.value()); - } - - if (state_update_->is_debugger_paused_.has_value()) { - dst_state_update->set_is_debugger_paused(state_update_->is_debugger_paused_.value()); - } -} - void TaskProfileEvent::ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) { // Rate limit on the number of profiling events from the task. This is especially the // case if a driver has many profiling events when submitting tasks @@ -184,24 +127,6 @@ void TaskProfileEvent::ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) { event_entry->set_extra_data(std::move(extra_data_)); } -void TaskProfileEvent::ToRpcTaskExportEvents( - std::shared_ptr rpc_task_export_event_data) { - auto profile_events = rpc_task_export_event_data->mutable_profile_events(); - - // Base fields - rpc_task_export_event_data->set_task_id(task_id_.Binary()); - rpc_task_export_event_data->set_job_id(job_id_.Binary()); - rpc_task_export_event_data->set_attempt_number(attempt_number_); - profile_events->set_component_type(std::move(component_type_)); - profile_events->set_component_id(std::move(component_id_)); - profile_events->set_node_ip_address(std::move(node_ip_address_)); - auto event_entry = profile_events->add_events(); - event_entry->set_event_name(std::move(event_name_)); - event_entry->set_start_time(start_time_); - event_entry->set_end_time(end_time_); - event_entry->set_extra_data(std::move(extra_data_)); -} - TaskEventBufferImpl::TaskEventBufferImpl(std::shared_ptr gcs_client) : work_guard_(boost::asio::make_work_guard(io_service_)), periodical_runner_(io_service_), @@ -212,15 +137,12 @@ TaskEventBufferImpl::~TaskEventBufferImpl() { Stop(); } Status TaskEventBufferImpl::Start(bool auto_flush) { absl::MutexLock lock(&mutex_); - export_event_write_enabled_ = RayConfig::instance().enable_export_api_write(); auto report_interval_ms = RayConfig::instance().task_events_report_interval_ms(); RAY_CHECK(report_interval_ms > 0) << "RAY_task_events_report_interval_ms should be > 0 to use TaskEventBuffer."; status_events_.set_capacity( RayConfig::instance().task_events_max_num_status_events_buffer_on_worker()); - status_events_for_export_.set_capacity( - RayConfig::instance().task_events_max_num_export_status_events_buffer_on_worker()); io_thread_ = std::thread([this]() { #ifndef _WIN32 @@ -288,27 +210,10 @@ void TaskEventBufferImpl::Stop() { bool TaskEventBufferImpl::Enabled() const { return enabled_; } void TaskEventBufferImpl::GetTaskStatusEventsToSend( - std::vector> *status_events_to_send, - std::vector> *status_events_to_write_for_export, + std::vector> *status_events_to_send, absl::flat_hash_set *dropped_task_attempts_to_send) { absl::MutexLock lock(&mutex_); - // Get the export events data to write. - if (export_event_write_enabled_) { - size_t num_to_write = std::min( - static_cast(RayConfig::instance().export_task_events_write_batch_size()), - static_cast(status_events_for_export_.size())); - status_events_to_write_for_export->insert( - status_events_to_write_for_export->end(), - std::make_move_iterator(status_events_for_export_.begin()), - std::make_move_iterator(status_events_for_export_.begin() + num_to_write)); - status_events_for_export_.erase(status_events_for_export_.begin(), - status_events_for_export_.begin() + num_to_write); - stats_counter_.Decrement( - TaskEventBufferCounter::kNumTaskStatusEventsForExportAPIStored, - status_events_to_write_for_export->size()); - } - // No data to send. if (status_events_.empty() && dropped_task_attempts_unreported_.empty()) { return; @@ -347,7 +252,7 @@ void TaskEventBufferImpl::GetTaskStatusEventsToSend( } void TaskEventBufferImpl::GetTaskProfileEventsToSend( - std::vector> *profile_events_to_send) { + std::vector> *profile_events_to_send) { absl::MutexLock lock(&profile_mutex_); size_t batch_size = @@ -373,13 +278,13 @@ void TaskEventBufferImpl::GetTaskProfileEventsToSend( } std::unique_ptr TaskEventBufferImpl::CreateDataToSend( - std::vector> &&status_events_to_send, - std::vector> &&profile_events_to_send, + std::vector> &&status_events_to_send, + std::vector> &&profile_events_to_send, absl::flat_hash_set &&dropped_task_attempts_to_send) { // Aggregate the task events by TaskAttempt. absl::flat_hash_map agg_task_events; auto to_rpc_event_fn = [this, &agg_task_events, &dropped_task_attempts_to_send]( - std::shared_ptr &event) { + std::unique_ptr &event) { if (dropped_task_attempts_to_send.count(event->GetTaskAttempt())) { // We are marking this as data loss due to some missing task status updates. // We will not send this event to GCS. @@ -426,45 +331,6 @@ std::unique_ptr TaskEventBufferImpl::CreateDataToSend( return data; } -void TaskEventBufferImpl::WriteExportData( - std::vector> &&status_events_to_write_for_export, - std::vector> &&profile_events_to_send) { - absl::flat_hash_map> - agg_task_events; - // Maintain insertion order to agg_task_events so events are written - // in the same order as the buffer. - std::vector agg_task_event_insertion_order; - auto to_rpc_event_fn = [&agg_task_events, &agg_task_event_insertion_order]( - std::shared_ptr &event) { - // Aggregate events by task attempt before converting to proto - auto itr = agg_task_events.find(event->GetTaskAttempt()); - if (itr == agg_task_events.end()) { - // Insert event into agg_task_events if the task attempt of that - // event wasn't already added. - auto event_for_attempt = std::make_shared(); - auto inserted = - agg_task_events.insert({event->GetTaskAttempt(), event_for_attempt}); - RAY_CHECK(inserted.second); - agg_task_event_insertion_order.push_back(event->GetTaskAttempt()); - event->ToRpcTaskExportEvents(event_for_attempt); - } else { - event->ToRpcTaskExportEvents(itr->second); - } - }; - - std::for_each(status_events_to_write_for_export.begin(), - status_events_to_write_for_export.end(), - to_rpc_event_fn); - std::for_each( - profile_events_to_send.begin(), profile_events_to_send.end(), to_rpc_event_fn); - - for (const auto &task_attempt : agg_task_event_insertion_order) { - auto it = agg_task_events.find(task_attempt); - RAY_CHECK(it != agg_task_events.end()); - RayExportEvent(it->second).SendEvent(); - } -} - void TaskEventBufferImpl::FlushEvents(bool forced) { if (!enabled_) { return; @@ -484,16 +350,13 @@ void TaskEventBufferImpl::FlushEvents(bool forced) { } // Take out status events from the buffer. - std::vector> status_events_to_send; - std::vector> status_events_to_write_for_export; + std::vector> status_events_to_send; absl::flat_hash_set dropped_task_attempts_to_send; status_events_to_send.reserve(RayConfig::instance().task_events_send_batch_size()); - GetTaskStatusEventsToSend(&status_events_to_send, - &status_events_to_write_for_export, - &dropped_task_attempts_to_send); + GetTaskStatusEventsToSend(&status_events_to_send, &dropped_task_attempts_to_send); // Take profile events from the status events. - std::vector> profile_events_to_send; + std::vector> profile_events_to_send; profile_events_to_send.reserve(RayConfig::instance().task_events_send_batch_size()); GetTaskProfileEventsToSend(&profile_events_to_send); @@ -502,10 +365,6 @@ void TaskEventBufferImpl::FlushEvents(bool forced) { CreateDataToSend(std::move(status_events_to_send), std::move(profile_events_to_send), std::move(dropped_task_attempts_to_send)); - if (export_event_write_enabled_) { - WriteExportData(std::move(status_events_to_write_for_export), - std::move(profile_events_to_send)); - } gcs::TaskInfoAccessor *task_accessor; { @@ -579,19 +438,8 @@ void TaskEventBufferImpl::AddTaskStatusEvent(std::unique_ptr status_e if (!enabled_) { return; } - std::shared_ptr status_event_shared_ptr = std::move(status_event); - if (export_event_write_enabled_) { - // If status_events_for_export_ is full, the oldest event will be - // dropped in the circular buffer and replaced with the current event. - if (!status_events_for_export_.full()) { - stats_counter_.Increment( - TaskEventBufferCounter::kNumTaskStatusEventsForExportAPIStored); - } - status_events_for_export_.push_back(status_event_shared_ptr); - } - if (dropped_task_attempts_unreported_.count( - status_event_shared_ptr->GetTaskAttempt())) { + if (dropped_task_attempts_unreported_.count(status_event->GetTaskAttempt())) { // This task attempt has been dropped before, so we drop this event. stats_counter_.Increment( TaskEventBufferCounter::kNumTaskStatusEventDroppedSinceLastFlush); @@ -606,7 +454,7 @@ void TaskEventBufferImpl::AddTaskStatusEvent(std::unique_ptr status_e RAY_LOG_EVERY_N(WARNING, 100000) << "Dropping task status events for task: " - << status_event_shared_ptr->GetTaskAttempt().first + << status_event->GetTaskAttempt().first << ", set a higher value for " "RAY_task_events_max_num_status_events_buffer_on_worker(" << RayConfig::instance().task_events_max_num_status_events_buffer_on_worker() @@ -618,7 +466,7 @@ void TaskEventBufferImpl::AddTaskStatusEvent(std::unique_ptr status_e } else { stats_counter_.Increment(TaskEventBufferCounter::kNumTaskStatusEventsStored); } - status_events_.push_back(status_event_shared_ptr); + status_events_.push_back(std::move(status_event)); } void TaskEventBufferImpl::AddTaskProfileEvent(std::unique_ptr profile_event) { @@ -626,12 +474,10 @@ void TaskEventBufferImpl::AddTaskProfileEvent(std::unique_ptr profile if (!enabled_) { return; } - std::shared_ptr profile_event_shared_ptr = std::move(profile_event); - auto profile_events_itr = - profile_events_.find(profile_event_shared_ptr->GetTaskAttempt()); + auto profile_events_itr = profile_events_.find(profile_event->GetTaskAttempt()); if (profile_events_itr == profile_events_.end()) { - auto inserted = profile_events_.insert({profile_event_shared_ptr->GetTaskAttempt(), - std::vector>()}); + auto inserted = profile_events_.insert( + {profile_event->GetTaskAttempt(), std::vector>()}); RAY_CHECK(inserted.second); profile_events_itr = inserted.first; } @@ -655,8 +501,7 @@ void TaskEventBufferImpl::AddTaskProfileEvent(std::unique_ptr profile // driver task id and it could generate large number of profile events when submitting // many tasks. RAY_LOG_EVERY_N(WARNING, 100000) - << "Dropping profiling events for task: " - << profile_event_shared_ptr->GetTaskAttempt().first + << "Dropping profiling events for task: " << profile_event->GetTaskAttempt().first << ", set a higher value for RAY_task_events_max_num_profile_events_per_task(" << max_num_profile_event_per_task << "), or RAY_task_events_max_num_profile_events_buffer_on_worker (" @@ -665,7 +510,7 @@ void TaskEventBufferImpl::AddTaskProfileEvent(std::unique_ptr profile } stats_counter_.Increment(TaskEventBufferCounter::kNumTaskProfileEventsStored); - profile_events_itr->second.push_back(profile_event_shared_ptr); + profile_events_itr->second.push_back(std::move(profile_event)); } const std::string TaskEventBufferImpl::DebugString() { diff --git a/src/ray/core_worker/task_event_buffer.h b/src/ray/core_worker/task_event_buffer.h index f19b4d466ca2..675cbdc3475e 100644 --- a/src/ray/core_worker/task_event_buffer.h +++ b/src/ray/core_worker/task_event_buffer.h @@ -27,7 +27,6 @@ #include "ray/common/task/task_spec.h" #include "ray/gcs/gcs_client/gcs_client.h" #include "ray/util/counter_map.h" -#include "src/ray/protobuf/export_api/export_task_event.pb.h" #include "src/ray/protobuf/gcs.pb.h" namespace ray { @@ -58,12 +57,6 @@ class TaskEvent { /// \param[out] rpc_task_events The rpc task event to be filled. virtual void ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) = 0; - /// Convert itself a rpc::ExportTaskEventData - /// - /// \param[out] rpc_task_export_event_data The rpc export task event data to be filled. - virtual void ToRpcTaskExportEvents( - std::shared_ptr rpc_task_export_event_data) = 0; - /// If it is a profile event. virtual bool IsProfileEvent() const = 0; @@ -133,9 +126,6 @@ class TaskStatusEvent : public TaskEvent { void ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) override; - void ToRpcTaskExportEvents( - std::shared_ptr rpc_task_export_event_data) override; - bool IsProfileEvent() const override { return false; } private: @@ -163,9 +153,6 @@ class TaskProfileEvent : public TaskEvent { void ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) override; - void ToRpcTaskExportEvents( - std::shared_ptr rpc_task_export_event_data) override; - bool IsProfileEvent() const override { return true; } void SetEndTime(int64_t end_time) { end_time_ = end_time; } @@ -190,7 +177,6 @@ enum TaskEventBufferCounter { kNumTaskProfileEventsStored, kNumTaskStatusEventsStored, kNumDroppedTaskAttemptsStored, - kNumTaskStatusEventsForExportAPIStored, kTotalNumTaskProfileEventDropped, kTotalNumTaskStatusEventDropped, kTotalNumTaskAttemptsReported, @@ -309,15 +295,10 @@ class TaskEventBufferImpl : public TaskEventBuffer { /// Get data related to task status events to be send to GCS. /// /// \param[out] status_events_to_send Task status events to be sent. - /// \param[out] status_events_to_write_for_export Task status events that will - /// be written to the Export API. This includes both status events - /// that are sent to GCS, and as many dropped status events that - /// fit in the buffer. /// \param[out] dropped_task_attempts_to_send Task attempts that were dropped due to /// status events being dropped. void GetTaskStatusEventsToSend( - std::vector> *status_events_to_send, - std::vector> *status_events_to_write_for_export, + std::vector> *status_events_to_send, absl::flat_hash_set *dropped_task_attempts_to_send) ABSL_LOCKS_EXCLUDED(mutex_); @@ -325,7 +306,7 @@ class TaskEventBufferImpl : public TaskEventBuffer { /// /// \param[out] profile_events_to_send Task profile events to be sent. void GetTaskProfileEventsToSend( - std::vector> *profile_events_to_send) + std::vector> *profile_events_to_send) ABSL_LOCKS_EXCLUDED(profile_mutex_); /// Get the task events to GCS. @@ -336,21 +317,10 @@ class TaskEventBufferImpl : public TaskEventBuffer { /// status events being dropped. /// \return A unique_ptr to rpc::TaskEvents to be sent to GCS. std::unique_ptr CreateDataToSend( - std::vector> &&status_events_to_send, - std::vector> &&profile_events_to_send, + std::vector> &&status_events_to_send, + std::vector> &&profile_events_to_send, absl::flat_hash_set &&dropped_task_attempts_to_send); - /// Write task events for the Export API. - /// - /// \param status_events_to_write_for_export Task status events that will - /// be written to the Export API. This includes both status events - /// that are sent to GCS, and as many dropped status events that - /// fit in the buffer. - /// \param profile_events_to_send Task profile events to be written. - void WriteExportData( - std::vector> &&status_events_to_write_for_export, - std::vector> &&profile_events_to_send); - /// Reset the counters during flushing data to GCS. void ResetCountersForFlush(); @@ -417,13 +387,7 @@ class TaskEventBufferImpl : public TaskEventBuffer { std::atomic enabled_ = false; /// Circular buffered task status events. - boost::circular_buffer> status_events_ - ABSL_GUARDED_BY(mutex_); - - /// Status events that will be written for the export API. This could - /// contain events that were dropped from being sent to GCS. A circular - /// buffer is used to limit memory. - boost::circular_buffer> status_events_for_export_ + boost::circular_buffer> status_events_ ABSL_GUARDED_BY(mutex_); /// Status events that will be written for the export API. This could @@ -437,7 +401,7 @@ class TaskEventBufferImpl : public TaskEventBuffer { ABSL_GUARDED_BY(mutex_); /// Buffered task profile events. A FIFO queue to be sent to GCS. - absl::flat_hash_map>> + absl::flat_hash_map>> profile_events_ ABSL_GUARDED_BY(profile_mutex_); /// Stats counter map. @@ -448,9 +412,6 @@ class TaskEventBufferImpl : public TaskEventBuffer { /// process them quick enough. std::atomic grpc_in_progress_ = false; - /// If true, task events are exported for Export API - bool export_event_write_enabled_ = false; - FRIEND_TEST(TaskEventBufferTestManualStart, TestGcsClientFail); FRIEND_TEST(TaskEventBufferTestBatchSend, TestBatchedSend); FRIEND_TEST(TaskEventBufferTest, TestAddEvent); @@ -461,7 +422,6 @@ class TaskEventBufferImpl : public TaskEventBuffer { FRIEND_TEST(TaskEventBufferTestLimitBuffer, TestBufferSizeLimitStatusEvents); FRIEND_TEST(TaskEventBufferTestLimitProfileEvents, TestBufferSizeLimitProfileEvents); FRIEND_TEST(TaskEventBufferTestLimitProfileEvents, TestLimitProfileEventsPerTask); - FRIEND_TEST(TaskEventTestWriteExport, TestWriteTaskExportEvents); }; } // namespace worker diff --git a/src/ray/core_worker/test/task_event_buffer_test.cc b/src/ray/core_worker/test/task_event_buffer_test.cc index 02dd448fa921..63e24485ec09 100644 --- a/src/ray/core_worker/test/task_event_buffer_test.cc +++ b/src/ray/core_worker/test/task_event_buffer_test.cc @@ -16,9 +16,6 @@ #include -#include -#include - #include "absl/base/thread_annotations.h" #include "absl/synchronization/mutex.h" #include "absl/types/optional.h" @@ -27,7 +24,6 @@ #include "mock/ray/gcs/gcs_client/gcs_client.h" #include "ray/common/task/task_spec.h" #include "ray/common/test_util.h" -#include "ray/util/event.h" using ::testing::_; using ::testing::Return; @@ -58,7 +54,6 @@ class TaskEventBufferTest : public ::testing::Test { virtual void TearDown() { if (task_event_buffer_) task_event_buffer_->Stop(); - std::filesystem::remove_all(log_dir_.c_str()); }; std::vector GenTaskIDs(size_t num_tasks) { @@ -129,7 +124,6 @@ class TaskEventBufferTest : public ::testing::Test { } std::unique_ptr task_event_buffer_ = nullptr; - std::string log_dir_ = "event_123"; }; class TaskEventBufferTestManualStart : public TaskEventBufferTest { @@ -180,38 +174,6 @@ class TaskEventBufferTestLimitProfileEvents : public TaskEventBufferTest { } }; -class TaskEventTestWriteExport : public TaskEventBufferTest { - public: - TaskEventTestWriteExport() : TaskEventBufferTest() { - RayConfig::instance().initialize( - R"( -{ - "task_events_report_interval_ms": 1000, - "task_events_max_num_status_events_buffer_on_worker": 10, - "task_events_max_num_profile_events_buffer_on_worker": 5, - "task_events_send_batch_size": 100, - "export_task_events_write_batch_size": 1, - "task_events_max_num_export_status_events_buffer_on_worker": 15, - "enable_export_api_write": true -} - )"); - } -}; - -void ReadContentFromFile(std::vector &vc, - std::string log_file, - std::string filter = "") { - std::string line; - std::ifstream read_file; - read_file.open(log_file, std::ios::binary); - while (std::getline(read_file, line)) { - if (filter.empty() || line.find(filter) != std::string::npos) { - vc.push_back(line); - } - } - read_file.close(); -} - TEST_F(TaskEventBufferTestManualStart, TestGcsClientFail) { ASSERT_NE(task_event_buffer_, nullptr); @@ -287,106 +249,6 @@ TEST_F(TaskEventBufferTest, TestFlushEvents) { ASSERT_EQ(task_event_buffer_->GetNumTaskEventsStored(), 0); } -TEST_F(TaskEventTestWriteExport, TestWriteTaskExportEvents) { - /* - Test writing task events to event_EXPORT_TASK_123.log as part of the export API. - This test verifies the following cases: - 1. Task events that are dropped from being sent to GCS (because more events - added than task_events_max_num_status_events_buffer_on_worker) will still - be written for the export API, if the number of events is less than - task_events_max_num_export_status_events_buffer_on_worker. - 2. Task events over task_events_max_num_status_events_buffer_on_worker - will be dropped and not written for the export API. - 3. Each Flush() call only writes a max of export_task_events_write_batch_size - export events. - In this test, 20 events are added which is greater than the max of 10 status events - that can be stored in buffer. The max export status events in buffer is 15, - so 15 events will be written to event_EXPORT_TASK_123.log. Each Flush() - call will write 1 new event because the batch size is 1. - */ - - // {"task_events_max_num_status_events_buffer_on_worker": 10} and - // {"task_events_max_num_export_status_events_buffer_on_worker": 15} - // in TaskEventTestWriteExport so set num_events > 15. - size_t num_events = 20; - // Value of export_task_events_write_batch_size - size_t batch_size = 1; - // Value of task_events_max_num_export_status_events_buffer_on_worker - size_t max_export_events_on_buffer = 15; - auto task_ids = GenTaskIDs(num_events); - google::protobuf::util::JsonPrintOptions options; - options.preserve_proto_field_names = true; - - std::vector source_types = { - rpc::ExportEvent_SourceType::ExportEvent_SourceType_EXPORT_TASK}; - RayEventInit_(source_types, - absl::flat_hash_map(), - log_dir_, - "warning", - false); - - std::vector> task_events; - for (const auto &task_id : task_ids) { - task_events.push_back(GenStatusTaskEvent(task_id, 0)); - } - - // Convert all task_events that were added with AddTaskEvent - // to the ExportTaskEventData proto message - std::vector> task_event_data_protos; - for (const auto &task_event : task_events) { - std::shared_ptr event = - std::make_shared(); - task_event->ToRpcTaskExportEvents(event); - task_event_data_protos.push_back(event); - } - - // Add all num_events tasks - for (auto &task_event : task_events) { - task_event_buffer_->AddTaskEvent(std::move(task_event)); - } - - // Verify that batch_size events are being written for each flush - std::vector vc; - for (int i = 0; i * batch_size < max_export_events_on_buffer; i++) { - task_event_buffer_->FlushEvents(true); - ReadContentFromFile( - vc, log_dir_ + "/events/event_EXPORT_TASK_" + std::to_string(getpid()) + ".log"); - EXPECT_EQ((int)vc.size(), (i + 1) * batch_size); - vc.clear(); - } - - // Verify that all max_export_events_on_buffer events are written to file even though - // max_export_events_on_buffer > task_events_max_num_status_events_buffer_on_worker - vc.clear(); - ReadContentFromFile( - vc, log_dir_ + "/events/event_EXPORT_TASK_" + std::to_string(getpid()) + ".log"); - EXPECT_EQ((int)vc.size(), max_export_events_on_buffer); - for (size_t i = 0; i < max_export_events_on_buffer; i++) { - json export_event_as_json = json::parse(vc[i]); - EXPECT_EQ(export_event_as_json["source_type"].get(), "EXPORT_TASK"); - EXPECT_EQ(export_event_as_json.contains("event_id"), true); - EXPECT_EQ(export_event_as_json.contains("timestamp"), true); - EXPECT_EQ(export_event_as_json.contains("event_data"), true); - - json event_data = export_event_as_json["event_data"].get(); - - // The events written are the last max_export_events_on_buffer added - // in AddTaskEvent because (num_events-max_export_events_on_buffer) - // were dropped. - std::string expected_event_data_str; - RAY_CHECK(google::protobuf::util::MessageToJsonString( - *task_event_data_protos[i + (num_events - max_export_events_on_buffer)], - &expected_event_data_str, - options) - .ok()); - json expected_event_data = json::parse(expected_event_data_str); - EXPECT_EQ(event_data, expected_event_data); - } - - // Expect no more events. - ASSERT_EQ(task_event_buffer_->GetNumTaskEventsStored(), 0); -} - TEST_F(TaskEventBufferTest, TestFailedFlush) { size_t num_status_events = 20; size_t num_profile_events = 20; diff --git a/src/ray/gcs/pb_util.h b/src/ray/gcs/pb_util.h index cb3c518072b2..b97c0c75bf2c 100644 --- a/src/ray/gcs/pb_util.h +++ b/src/ray/gcs/pb_util.h @@ -21,7 +21,6 @@ #include "ray/common/ray_config.h" #include "ray/common/task/task_spec.h" #include "src/ray/protobuf/autoscaler.pb.h" -#include "src/ray/protobuf/export_api/export_task_event.pb.h" #include "src/ray/protobuf/gcs.pb.h" namespace ray { @@ -257,59 +256,6 @@ inline void FillTaskInfo(rpc::TaskInfoEntry *task_info, } } -// Fill task_info for the export API with task specification from task_spec -inline void FillExportTaskInfo(rpc::ExportTaskEventData::TaskInfoEntry *task_info, - const TaskSpecification &task_spec) { - rpc::TaskType type; - if (task_spec.IsNormalTask()) { - type = rpc::TaskType::NORMAL_TASK; - } else if (task_spec.IsDriverTask()) { - type = rpc::TaskType::DRIVER_TASK; - } else if (task_spec.IsActorCreationTask()) { - type = rpc::TaskType::ACTOR_CREATION_TASK; - task_info->set_actor_id(task_spec.ActorCreationId().Binary()); - } else { - RAY_CHECK(task_spec.IsActorTask()); - type = rpc::TaskType::ACTOR_TASK; - task_info->set_actor_id(task_spec.ActorId().Binary()); - } - task_info->set_type(type); - task_info->set_language(task_spec.GetLanguage()); - task_info->set_func_or_class_name(task_spec.FunctionDescriptor()->CallString()); - - task_info->set_task_id(task_spec.TaskId().Binary()); - // NOTE: we set the parent task id of a task to be submitter's task id, where - // the submitter depends on the owner coreworker's: - // - if the owner coreworker runs a normal task, the submitter's task id is the task id. - // - if the owner coreworker runs an actor, the submitter's task id will be the actor's - // creation task id. - task_info->set_parent_task_id(task_spec.SubmitterTaskId().Binary()); - const auto &resources_map = task_spec.GetRequiredResources().GetResourceMap(); - task_info->mutable_required_resources()->insert(resources_map.begin(), - resources_map.end()); - - auto export_runtime_env_info = task_info->mutable_runtime_env_info(); - export_runtime_env_info->set_serialized_runtime_env( - task_spec.RuntimeEnvInfo().serialized_runtime_env()); - auto export_runtime_env_uris = export_runtime_env_info->mutable_uris(); - export_runtime_env_uris->set_working_dir_uri( - task_spec.RuntimeEnvInfo().uris().working_dir_uri()); - export_runtime_env_uris->mutable_py_modules_uris()->CopyFrom( - task_spec.RuntimeEnvInfo().uris().py_modules_uris()); - auto export_runtime_env_config = export_runtime_env_info->mutable_runtime_env_config(); - export_runtime_env_config->set_setup_timeout_seconds( - task_spec.RuntimeEnvInfo().runtime_env_config().setup_timeout_seconds()); - export_runtime_env_config->set_eager_install( - task_spec.RuntimeEnvInfo().runtime_env_config().eager_install()); - export_runtime_env_config->mutable_log_files()->CopyFrom( - task_spec.RuntimeEnvInfo().runtime_env_config().log_files()); - - const auto &pg_id = task_spec.PlacementGroupBundleId().first; - if (!pg_id.IsNil()) { - task_info->set_placement_group_id(pg_id.Binary()); - } -} - /// Generate a RayErrorInfo from ErrorType inline rpc::RayErrorInfo GetRayErrorInfo(const rpc::ErrorType &error_type, const std::string &error_msg = "") { @@ -390,34 +336,6 @@ inline void FillTaskStatusUpdateTime(const ray::rpc::TaskStatus &task_status, (*state_updates->mutable_state_ts_ns())[task_status] = timestamp; } -/// Fill the rpc::ExportTaskEventData::TaskStateUpdate with the timestamps -/// according to the status change. -/// -/// \param task_status The task status. -/// \param timestamp The timestamp. -/// \param[out] state_updates The state updates with timestamp to be updated. -inline void FillExportTaskStatusUpdateTime( - const ray::rpc::TaskStatus &task_status, - int64_t timestamp, - rpc::ExportTaskEventData::TaskStateUpdate *state_updates) { - if (task_status == rpc::TaskStatus::NIL) { - // Not status change. - return; - } - (*state_updates->mutable_state_ts_ns())[task_status] = timestamp; -} - -/// Convert rpc::TaskLogInfo to rpc::ExportTaskEventData::TaskLogInfo -inline void TaskLogInfoToExport(const rpc::TaskLogInfo &src, - rpc::ExportTaskEventData::TaskLogInfo *dest) { - dest->set_stdout_file(src.stdout_file()); - dest->set_stderr_file(src.stderr_file()); - dest->set_stdout_start(src.stdout_start()); - dest->set_stdout_end(src.stdout_end()); - dest->set_stderr_start(src.stderr_start()); - dest->set_stderr_end(src.stderr_end()); -} - inline std::string FormatPlacementGroupLabelName(const std::string &pg_id) { return kPlacementGroupConstraintKeyPrefix + pg_id; } diff --git a/src/ray/util/event.cc b/src/ray/util/event.cc index 0df740274602..90bdf335eb70 100644 --- a/src/ray/util/event.cc +++ b/src/ray/util/event.cc @@ -198,8 +198,14 @@ void EventManager::Publish(const rpc::Event &event, const json &custom_fields) { void EventManager::PublishExportEvent(const rpc::ExportEvent &export_event) { auto element = export_log_reporter_map_.find(export_event.source_type()); - RAY_CHECK(element != export_log_reporter_map_.end()); - (element->second)->ReportExportEvent(export_event); + if (element != export_log_reporter_map_.end()) { + (element->second)->ReportExportEvent(export_event); + } else { + RAY_LOG(FATAL) + << "RayEventInit wasn't called with the necessary source type " + << ExportEvent_SourceType_Name(export_event.source_type()) + << ". This indicates a bug in the code, and the event will be dropped."; + } } void EventManager::AddReporter(std::shared_ptr reporter) { @@ -416,6 +422,7 @@ void RayExportEvent::SendEvent() { rpc::ExportEvent export_event; export_event.set_event_id(event_id); export_event.set_timestamp(current_sys_time_s()); + if (auto ptr_to_task_event_data_ptr = std::get_if>(&event_data_ptr_)) { export_event.mutable_task_event_data()->CopyFrom(*(*ptr_to_task_event_data_ptr)); @@ -443,6 +450,7 @@ void RayExportEvent::SendEvent() { RAY_LOG(FATAL) << "Invalid event_data type."; return; } + EventManager::Instance().PublishExportEvent(export_event); }