diff --git a/BUILD.bazel b/BUILD.bazel index 10b14b4d0624..8f348be2c0ab 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -1637,25 +1637,6 @@ ray_cc_test( ], ) -ray_cc_test( - name = "gcs_node_manager_export_event_test", - size = "small", - srcs = [ - "src/ray/gcs/gcs_server/test/gcs_node_manager_export_event_test.cc", - ], - tags = [ - "no_windows", - "team:core" - ], - deps = [ - ":gcs_server_lib", - ":gcs_server_test_util", - ":gcs_test_util_lib", - ":ray_mock", - "@com_google_googletest//:gtest_main", - ], -) - ray_cc_test( name = "gcs_job_manager_test", size = "small", @@ -2363,6 +2344,25 @@ ray_cc_test( ], ) +ray_cc_test( + name = "gcs_export_event_test", + size = "small", + srcs = glob([ + "src/ray/gcs/gcs_server/test/export_api/*.cc", + ]), + tags = [ + "no_windows", + "team:core" + ], + deps = [ + ":gcs_server_lib", + ":gcs_server_test_util", + ":gcs_test_util_lib", + ":ray_mock", + "@com_google_googletest//:gtest_main", + ], +) + flatbuffer_cc_library( name = "node_manager_fbs", srcs = ["src/ray/raylet/format/node_manager.fbs"], diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.cc b/src/ray/gcs/gcs_server/gcs_actor_manager.cc index 7e7d361c6af7..5b4c93655707 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.cc @@ -224,6 +224,36 @@ const rpc::ActorTableData &GcsActor::GetActorTableData() const { rpc::ActorTableData *GcsActor::GetMutableActorTableData() { return &actor_table_data_; } +void GcsActor::WriteActorExportEvent() const { + /// Write actor_table_data_ as a export actor event if + /// enable_export_api_write() is enabled. + if (!RayConfig::instance().enable_export_api_write()) { + return; + } + std::shared_ptr export_actor_data_ptr = + std::make_shared(); + + export_actor_data_ptr->set_actor_id(actor_table_data_.actor_id()); + export_actor_data_ptr->set_job_id(actor_table_data_.job_id()); + export_actor_data_ptr->set_state(ConvertActorStateToExport(actor_table_data_.state())); + export_actor_data_ptr->set_is_detached(actor_table_data_.is_detached()); + export_actor_data_ptr->set_name(actor_table_data_.name()); + export_actor_data_ptr->set_pid(actor_table_data_.pid()); + export_actor_data_ptr->set_ray_namespace(actor_table_data_.ray_namespace()); + export_actor_data_ptr->set_serialized_runtime_env( + actor_table_data_.serialized_runtime_env()); + export_actor_data_ptr->set_class_name(actor_table_data_.class_name()); + export_actor_data_ptr->mutable_death_cause()->CopyFrom(actor_table_data_.death_cause()); + export_actor_data_ptr->mutable_required_resources()->insert( + actor_table_data_.required_resources().begin(), + actor_table_data_.required_resources().end()); + export_actor_data_ptr->set_node_id(actor_table_data_.node_id()); + export_actor_data_ptr->set_placement_group_id(actor_table_data_.placement_group_id()); + export_actor_data_ptr->set_repr_name(actor_table_data_.repr_name()); + + RayExportEvent(export_actor_data_ptr).SendEvent(); +} + rpc::TaskSpec *GcsActor::GetMutableTaskSpec() { return task_spec_.get(); } const ResourceRequest &GcsActor::GetAcquiredResources() const { @@ -660,6 +690,7 @@ Status GcsActorManager::RegisterActor(const ray::rpc::RegisterActorRequest &requ [this, actor](const Status &status) { // The backend storage is supposed to be reliable, so the status must be ok. RAY_CHECK_OK(status); + actor->WriteActorExportEvent(); // If a creator dies before this callback is called, the actor could have // been already destroyed. It is okay not to invoke a callback because we // don't need to reply to the creator as it is already dead. @@ -756,6 +787,7 @@ Status GcsActorManager::CreateActor(const ray::rpc::CreateActorRequest &request, // Pub this state for dashboard showing. RAY_CHECK_OK(gcs_publisher_->PublishActor(actor_id, actor_table_data, nullptr)); + actor->WriteActorExportEvent(); RemoveUnresolvedActor(actor); // Update the registered actor as its creation task specification may have changed due @@ -947,10 +979,11 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id, RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put( actor->GetActorID(), *actor_table_data, - [this, actor_id, actor_table_data](Status status) { + [this, actor, actor_id, actor_table_data](Status status) { RAY_CHECK_OK(gcs_publisher_->PublishActor( actor_id, *GenActorDataOnlyWithStates(*actor_table_data), nullptr)); RAY_CHECK_OK(gcs_table_storage_->ActorTaskSpecTable().Delete(actor_id, nullptr)); + actor->WriteActorExportEvent(); // Destroy placement group owned by this actor. destroy_owned_placement_group_if_needed_(actor_id); })); @@ -1234,9 +1267,10 @@ void GcsActorManager::ReconstructActor(const ActorID &actor_id, RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put( actor_id, *mutable_actor_table_data, - [this, actor_id, mutable_actor_table_data](Status status) { + [this, actor, actor_id, mutable_actor_table_data](Status status) { RAY_CHECK_OK(gcs_publisher_->PublishActor( actor_id, *GenActorDataOnlyWithStates(*mutable_actor_table_data), nullptr)); + actor->WriteActorExportEvent(); })); gcs_actor_scheduler_->Schedule(actor); } else { @@ -1262,6 +1296,7 @@ void GcsActorManager::ReconstructActor(const ActorID &actor_id, actor_id, *GenActorDataOnlyWithStates(*mutable_actor_table_data), nullptr)); RAY_CHECK_OK( gcs_table_storage_->ActorTaskSpecTable().Delete(actor_id, nullptr)); + actor->WriteActorExportEvent(); })); // The actor is dead, but we should not remove the entry from the // registered actors yet. If the actor is owned, we will destroy the actor @@ -1368,6 +1403,7 @@ void GcsActorManager::OnActorCreationSuccess(const std::shared_ptr &ac [this, actor_id, actor_table_data, actor, reply](Status status) { RAY_CHECK_OK(gcs_publisher_->PublishActor( actor_id, *GenActorDataOnlyWithStates(actor_table_data), nullptr)); + actor->WriteActorExportEvent(); // Invoke all callbacks for all registration requests of this actor (duplicated // requests are included) and remove all of them from // actor_to_create_callbacks_. diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.h b/src/ray/gcs/gcs_server/gcs_actor_manager.h index 2469724047a2..c26834599671 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.h +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.h @@ -30,6 +30,7 @@ #include "ray/rpc/gcs_server/gcs_rpc_server.h" #include "ray/rpc/worker/core_worker_client.h" #include "ray/util/counter_map.h" +#include "ray/util/event.h" #include "src/ray/protobuf/gcs_service.pb.h" namespace ray { @@ -187,6 +188,9 @@ class GcsActor { /// Get the mutable ActorTableData of this actor. rpc::ActorTableData *GetMutableActorTableData(); rpc::TaskSpec *GetMutableTaskSpec(); + /// Write an event containing this actor's ActorTableData + /// to file for the Export API. + void WriteActorExportEvent() const; const ResourceRequest &GetAcquiredResources() const; void SetAcquiredResources(ResourceRequest &&resource_request); @@ -213,6 +217,27 @@ class GcsActor { last_metric_state_ = cur_state; } + rpc::ExportActorData::ActorState ConvertActorStateToExport( + rpc::ActorTableData::ActorState actor_state) const { + switch (actor_state) { + case rpc::ActorTableData::DEPENDENCIES_UNREADY: + return rpc::ExportActorData::DEPENDENCIES_UNREADY; + case rpc::ActorTableData::PENDING_CREATION: + return rpc::ExportActorData::PENDING_CREATION; + case rpc::ActorTableData::ALIVE: + return rpc::ExportActorData::ALIVE; + case rpc::ActorTableData::RESTARTING: + return rpc::ExportActorData::RESTARTING; + case rpc::ActorTableData::DEAD: + return rpc::ExportActorData::DEAD; + default: + // Unknown rpc::ActorTableData::ActorState value + RAY_LOG(FATAL) << "Invalid value for rpc::ActorTableData::ActorState" + << rpc::ActorTableData::ActorState_Name(actor_state); + return rpc::ExportActorData::DEAD; + } + } + /// The actor meta data which contains the task specification as well as the state of /// the gcs actor and so on (see gcs.proto). rpc::ActorTableData actor_table_data_; diff --git a/src/ray/gcs/gcs_server/gcs_server_main.cc b/src/ray/gcs/gcs_server/gcs_server_main.cc index e2e3da236c8e..947f1c640ac7 100644 --- a/src/ray/gcs/gcs_server/gcs_server_main.cc +++ b/src/ray/gcs/gcs_server/gcs_server_main.cc @@ -88,12 +88,13 @@ int main(int argc, char *argv[]) { // Initialize event framework. if (RayConfig::instance().event_log_reporter_enabled() && !log_dir.empty()) { - // This GCS server process emits GCS standard events, and Node export events + // This GCS server process emits GCS standard events, and Node and Actor export events // so the various source types are passed to RayEventInit. The type of an // event is determined by the schema of its event data. const std::vector source_types = { ray::rpc::Event_SourceType::Event_SourceType_GCS, - ray::rpc::ExportEvent_SourceType::ExportEvent_SourceType_EXPORT_NODE}; + ray::rpc::ExportEvent_SourceType::ExportEvent_SourceType_EXPORT_NODE, + ray::rpc::ExportEvent_SourceType_EXPORT_ACTOR}; ray::RayEventInit(source_types, absl::flat_hash_map(), log_dir, diff --git a/src/ray/gcs/gcs_server/test/export_api/gcs_actor_manager_export_event_test.cc b/src/ray/gcs/gcs_server/test/export_api/gcs_actor_manager_export_event_test.cc new file mode 100644 index 000000000000..0d2dc48cc763 --- /dev/null +++ b/src/ray/gcs/gcs_server/test/export_api/gcs_actor_manager_export_event_test.cc @@ -0,0 +1,331 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +// clang-format off +#include "gtest/gtest.h" +#include "ray/common/asio/instrumented_io_context.h" +#include "ray/common/test_util.h" +#include "ray/gcs/gcs_server/test/gcs_server_test_util.h" +#include "ray/gcs/test/gcs_test_util.h" +#include "ray/gcs/gcs_server/gcs_kv_manager.h" +#include "mock/ray/gcs/gcs_server/gcs_kv_manager.h" +#include "mock/ray/gcs/gcs_server/gcs_node_manager.h" +#include "mock/ray/pubsub/publisher.h" +#include "ray/util/event.h" +// clang-format on + +namespace ray { + +using ::testing::_; +using ::testing::Return; + +class MockActorScheduler : public gcs::GcsActorSchedulerInterface { + public: + MockActorScheduler() {} + + void Schedule(std::shared_ptr actor) { actors.push_back(actor); } + void Reschedule(std::shared_ptr actor) {} + void ReleaseUnusedActorWorkers( + const absl::flat_hash_map> &node_to_workers) {} + void OnActorDestruction(std::shared_ptr actor) { + const auto &actor_id = actor->GetActorID(); + auto pending_it = + std::find_if(actors.begin(), + actors.end(), + [actor_id](const std::shared_ptr &actor) { + return actor->GetActorID() == actor_id; + }); + if (pending_it != actors.end()) { + actors.erase(pending_it); + } + } + + size_t GetPendingActorsCount() const { return 0; } + bool CancelInFlightActorScheduling(const std::shared_ptr &actor) { + return false; + } + + MOCK_CONST_METHOD0(DebugString, std::string()); + MOCK_METHOD1(CancelOnNode, std::vector(const NodeID &node_id)); + MOCK_METHOD2(CancelOnWorker, ActorID(const NodeID &node_id, const WorkerID &worker_id)); + MOCK_METHOD3(CancelOnLeasing, + void(const NodeID &node_id, + const ActorID &actor_id, + const TaskID &task_id)); + + std::vector> actors; +}; + +class MockWorkerClient : public rpc::CoreWorkerClientInterface { + public: + MockWorkerClient(instrumented_io_context &io_service) : io_service_(io_service) {} + + void WaitForActorOutOfScope( + const rpc::WaitForActorOutOfScopeRequest &request, + const rpc::ClientCallback &callback) override { + callbacks_.push_back(callback); + } + + void KillActor(const rpc::KillActorRequest &request, + const rpc::ClientCallback &callback) override { + killed_actors_.push_back(ActorID::FromBinary(request.intended_actor_id())); + } + + bool Reply(Status status = Status::OK()) { + if (callbacks_.size() == 0) { + return false; + } + + // The created_actors_ of gcs actor manager will be modified in io_service thread. + // In order to avoid multithreading reading and writing created_actors_, we also + // send the `WaitForActorOutOfScope` callback operation to io_service thread. + std::promise promise; + io_service_.post( + [this, status, &promise]() { + auto callback = callbacks_.front(); + auto reply = rpc::WaitForActorOutOfScopeReply(); + callback(status, std::move(reply)); + promise.set_value(false); + }, + "test"); + promise.get_future().get(); + + callbacks_.pop_front(); + return true; + } + + std::list> callbacks_; + std::vector killed_actors_; + instrumented_io_context &io_service_; +}; + +class GcsActorManagerTest : public ::testing::Test { + public: + GcsActorManagerTest() + : mock_actor_scheduler_(new MockActorScheduler()), periodical_runner_(io_service_) { + RayConfig::instance().initialize( + R"( +{ + "maximum_gcs_destroyed_actor_cached_count": 10, + "enable_export_api_write": true +} + )"); + std::promise promise; + thread_io_service_.reset(new std::thread([this, &promise] { + std::unique_ptr work( + new boost::asio::io_service::work(io_service_)); + promise.set_value(true); + io_service_.run(); + })); + promise.get_future().get(); + worker_client_ = std::make_shared(io_service_); + runtime_env_mgr_ = + std::make_unique([](auto, auto f) { f(true); }); + std::vector channels = {rpc::ChannelType::GCS_ACTOR_CHANNEL}; + auto publisher = std::make_unique( + std::vector{ + rpc::ChannelType::GCS_ACTOR_CHANNEL, + }, + /*periodic_runner=*/&periodical_runner_, + /*get_time_ms=*/[]() -> double { return absl::ToUnixMicros(absl::Now()); }, + /*subscriber_timeout_ms=*/absl::ToInt64Microseconds(absl::Seconds(30)), + /*batch_size=*/100); + + gcs_publisher_ = std::make_shared(std::move(publisher)); + store_client_ = std::make_shared(io_service_); + gcs_table_storage_ = std::make_shared(io_service_); + kv_ = std::make_unique(); + function_manager_ = std::make_unique(*kv_); + gcs_actor_manager_ = std::make_unique( + mock_actor_scheduler_, + gcs_table_storage_, + gcs_publisher_, + *runtime_env_mgr_, + *function_manager_, + [](const ActorID &actor_id) {}, + [this](const rpc::Address &addr) { return worker_client_; }); + + for (int i = 1; i <= 10; i++) { + auto job_id = JobID::FromInt(i); + job_namespace_table_[job_id] = ""; + } + log_dir_ = "event_123"; + } + + virtual ~GcsActorManagerTest() { + io_service_.stop(); + thread_io_service_->join(); + std::filesystem::remove_all(log_dir_.c_str()); + } + + void WaitActorCreated(const ActorID &actor_id) { + auto condition = [this, actor_id]() { + // The created_actors_ of gcs actor manager will be modified in io_service thread. + // In order to avoid multithreading reading and writing created_actors_, we also + // send the read operation to io_service thread. + std::promise promise; + io_service_.post( + [this, actor_id, &promise]() { + const auto &created_actors = gcs_actor_manager_->GetCreatedActors(); + for (auto &node_iter : created_actors) { + for (auto &actor_iter : node_iter.second) { + if (actor_iter.second == actor_id) { + promise.set_value(true); + return; + } + } + } + promise.set_value(false); + }, + "test"); + return promise.get_future().get(); + }; + EXPECT_TRUE(WaitForCondition(condition, timeout_ms_.count())); + } + + rpc::Address RandomAddress() const { + rpc::Address address; + auto node_id = NodeID::FromRandom(); + auto worker_id = WorkerID::FromRandom(); + address.set_raylet_id(node_id.Binary()); + address.set_worker_id(worker_id.Binary()); + return address; + } + + std::shared_ptr RegisterActor( + const JobID &job_id, + int max_restarts = 0, + bool detached = false, + const std::string &name = "", + const std::string &ray_namespace = "test") { + std::promise> promise; + auto request = Mocker::GenRegisterActorRequest( + job_id, max_restarts, detached, name, ray_namespace); + // `DestroyActor` triggers some asynchronous operations. + // If we register an actor after destroying an actor, it may result in multithreading + // reading and writing the same variable. In order to avoid the problem of + // multithreading, we put `RegisterActor` to io_service thread. + io_service_.post( + [this, request, &promise]() { + auto status = gcs_actor_manager_->RegisterActor( + request, [&promise](std::shared_ptr actor) { + promise.set_value(std::move(actor)); + }); + if (!status.ok()) { + promise.set_value(nullptr); + } + }, + "test"); + return promise.get_future().get(); + } + + instrumented_io_context io_service_; + std::unique_ptr thread_io_service_; + std::shared_ptr store_client_; + std::shared_ptr gcs_table_storage_; + std::shared_ptr mock_actor_scheduler_; + std::shared_ptr worker_client_; + absl::flat_hash_map job_namespace_table_; + std::unique_ptr gcs_actor_manager_; + std::shared_ptr gcs_publisher_; + std::unique_ptr runtime_env_mgr_; + const std::chrono::milliseconds timeout_ms_{2000}; + absl::Mutex mutex_; + std::unique_ptr function_manager_; + std::unique_ptr kv_; + PeriodicalRunner periodical_runner_; + std::string log_dir_; +}; + +TEST_F(GcsActorManagerTest, TestBasic) { + std::vector source_types = { + rpc::ExportEvent_SourceType::ExportEvent_SourceType_EXPORT_ACTOR}; + RayEventInit(source_types, absl::flat_hash_map(), log_dir_); + auto job_id = JobID::FromInt(1); + auto registered_actor = RegisterActor(job_id); + rpc::CreateActorRequest create_actor_request; + create_actor_request.mutable_task_spec()->CopyFrom( + registered_actor->GetCreationTaskSpecification().GetMessage()); + RAY_CHECK_EQ( + gcs_actor_manager_->CountFor(rpc::ActorTableData::DEPENDENCIES_UNREADY, ""), 1); + + std::vector> finished_actors; + Status status = gcs_actor_manager_->CreateActor( + create_actor_request, + [&finished_actors](const std::shared_ptr &actor, + const rpc::PushTaskReply &reply, + const Status &status) { finished_actors.emplace_back(actor); }); + RAY_CHECK_OK(status); + RAY_CHECK_EQ(gcs_actor_manager_->CountFor(rpc::ActorTableData::PENDING_CREATION, ""), + 1); + + ASSERT_EQ(finished_actors.size(), 0); + ASSERT_EQ(mock_actor_scheduler_->actors.size(), 1); + auto actor = mock_actor_scheduler_->actors.back(); + mock_actor_scheduler_->actors.pop_back(); + + // Check that the actor is in state `ALIVE`. + actor->UpdateAddress(RandomAddress()); + gcs_actor_manager_->OnActorCreationSuccess(actor, rpc::PushTaskReply()); + WaitActorCreated(actor->GetActorID()); + ASSERT_EQ(finished_actors.size(), 1); + RAY_CHECK_EQ(gcs_actor_manager_->CountFor(rpc::ActorTableData::ALIVE, ""), 1); + + ASSERT_TRUE(worker_client_->Reply()); + ASSERT_EQ(actor->GetState(), rpc::ActorTableData::DEAD); + RAY_CHECK_EQ(gcs_actor_manager_->CountFor(rpc::ActorTableData::ALIVE, ""), 0); + RAY_CHECK_EQ(gcs_actor_manager_->CountFor(rpc::ActorTableData::DEAD, ""), 1); + + // Check correct export events are written for each of the 4 state transitions + int num_retry = 5; + int num_export_events = 4; + std::vector expected_states = { + "DEPENDENCIES_UNREADY", "PENDING_CREATION", "ALIVE", "DEAD"}; + std::vector vc; + for (int i = 0; i < num_retry; i++) { + Mocker::ReadContentFromFile(vc, log_dir_ + "/events/event_EXPORT_ACTOR.log"); + if ((int)vc.size() == num_export_events) { + for (int event_idx = 0; event_idx < num_export_events; event_idx++) { + json export_event_as_json = json::parse(vc[event_idx]); + json event_data = export_event_as_json["event_data"].get(); + ASSERT_EQ(event_data["state"], expected_states[event_idx]); + if (event_idx == num_export_events - 1) { + // Verify death cause for last actor DEAD event + ASSERT_EQ( + event_data["death_cause"]["actor_died_error_context"]["error_message"], + "The actor is dead because all references to the actor were removed."); + } + } + return; + } else { + // Sleep and retry + std::this_thread::sleep_for(std::chrono::seconds(1)); + vc.clear(); + } + } + Mocker::ReadContentFromFile(vc, log_dir_ + "/events/event_EXPORT_ACTOR.log"); + std::ostringstream lines; + for (auto line : vc) { + lines << line << "\n"; + } + ASSERT_TRUE(false) << "Export API wrote " << (int)vc.size() << " lines, but expecting " + << num_export_events << ".\nLines:\n" + << lines.str(); +} + +} // namespace ray diff --git a/src/ray/gcs/gcs_server/test/gcs_node_manager_export_event_test.cc b/src/ray/gcs/gcs_server/test/export_api/gcs_node_manager_export_event_test.cc similarity index 100% rename from src/ray/gcs/gcs_server/test/gcs_node_manager_export_event_test.cc rename to src/ray/gcs/gcs_server/test/export_api/gcs_node_manager_export_event_test.cc diff --git a/src/ray/util/event.cc b/src/ray/util/event.cc index c49a42d30a20..2d149f6635ef 100644 --- a/src/ray/util/event.cc +++ b/src/ray/util/event.cc @@ -140,6 +140,10 @@ std::string LogEventReporter::ExportEventToString(const rpc::ExportEvent &export RAY_CHECK(google::protobuf::util::MessageToJsonString( export_event.node_event_data(), &event_data_as_string, options) .ok()); + } else if (export_event.has_actor_event_data()) { + RAY_CHECK(google::protobuf::util::MessageToJsonString( + export_event.actor_event_data(), &event_data_as_string, options) + .ok()); } else { RAY_LOG(FATAL) << "event_data missing from export event with id " << export_event.event_id()