From 317aa84e16e818ecdf7cecb56a48e2526f525778 Mon Sep 17 00:00:00 2001 From: scv119 Date: Wed, 5 Oct 2022 17:56:02 -0700 Subject: [PATCH 1/4] fix the initialization order --- src/ray/core_worker/core_worker.cc | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 03020f64c74e..b6914993ef54 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -220,13 +220,6 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ RAY_CHECK_OK(gcs_client_->Connect(io_service_)); RegisterToGcs(); - // Register a callback to monitor removed nodes. - auto on_node_change = [this](const NodeID &node_id, const rpc::GcsNodeInfo &data) { - if (data.state() == rpc::GcsNodeInfo::DEAD) { - OnNodeRemoved(node_id); - } - }; - RAY_CHECK_OK(gcs_client_->Nodes().AsyncSubscribeToNodeChange(on_node_change, nullptr)); // Initialize profiler. profiler_ = std::make_shared( @@ -272,6 +265,14 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ new rpc::CoreWorkerClient(addr, *client_call_manager_)); }); + // Register a callback to monitor removed nodes. + auto on_node_change = [this](const NodeID &node_id, const rpc::GcsNodeInfo &data) { + if (data.state() == rpc::GcsNodeInfo::DEAD) { + OnNodeRemoved(node_id); + } + }; + RAY_CHECK_OK(gcs_client_->Nodes().AsyncSubscribeToNodeChange(on_node_change, nullptr)); + plasma_store_provider_.reset(new CoreWorkerPlasmaStoreProvider( options_.store_socket, local_raylet_client_, From 26845541fa08f6409e1c474c812eefc8b29cbbb9 Mon Sep 17 00:00:00 2001 From: scv119 Date: Wed, 5 Oct 2022 18:03:49 -0700 Subject: [PATCH 2/4] add --- src/ray/core_worker/core_worker.cc | 1 - src/ray/core_worker/core_worker.h | 6 +++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index b6914993ef54..273bdf343ac7 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -220,7 +220,6 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ RAY_CHECK_OK(gcs_client_->Connect(io_service_)); RegisterToGcs(); - // Initialize profiler. profiler_ = std::make_shared( worker_context_, options_.node_ip_address, io_service_, gcs_client_); diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 2246f16091cf..478ed9520d78 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -1277,9 +1277,6 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// Whether or not this worker is connected to the raylet and GCS. bool connected_ = false; - // Client to the GCS shared by core worker interfaces. - std::shared_ptr gcs_client_; - // Client to the raylet shared by core worker interfaces. This needs to be a // shared_ptr for direct calls because we can lease multiple workers through // one client, and we need to keep the connection alive until we return all @@ -1292,6 +1289,9 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { // Keeps track of object ID reference counts. std::shared_ptr reference_counter_; + // Client to the GCS shared by core worker interfaces. + std::shared_ptr gcs_client_; + /// /// Fields related to storing and retrieving objects. /// From e655517803f04e6c1664838ce5c9aa85430d6137 Mon Sep 17 00:00:00 2001 From: scv119 Date: Wed, 5 Oct 2022 18:29:35 -0700 Subject: [PATCH 3/4] better --- src/ray/core_worker/core_worker.cc | 21 +++++++++------------ src/ray/core_worker/core_worker.h | 3 --- 2 files changed, 9 insertions(+), 15 deletions(-) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 273bdf343ac7..fc9a23bf3cf7 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -265,9 +265,16 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ }); // Register a callback to monitor removed nodes. - auto on_node_change = [this](const NodeID &node_id, const rpc::GcsNodeInfo &data) { + // Note we capture a shared ownership of reference_counter_ + // here to avoid destruction order fiasco between gcs_client and reference_counter_. + auto on_node_change = [](const NodeID &node_id, + const rpc::GcsNodeInfo &data, + reference_counter = reference_counter_) { if (data.state() == rpc::GcsNodeInfo::DEAD) { - OnNodeRemoved(node_id); + RAY_LOG(INFO) << "Node failure from " << node_id + << ". All objects pinned on that node will be lost if object " + "reconstruction is not enabled."; + reference_counter->ResetObjectsOnRemovedNode(node_id); } }; RAY_CHECK_OK(gcs_client_->Nodes().AsyncSubscribeToNodeChange(on_node_change, nullptr)); @@ -722,16 +729,6 @@ void CoreWorker::RunIOService() { RAY_LOG(INFO) << "Core worker main io service stopped."; } -void CoreWorker::OnNodeRemoved(const NodeID &node_id) { - // if (node_id == GetCurrentNodeId()) { - // RAY_LOG(FATAL) << "The raylet for this worker has died. Killing itself..."; - // } - RAY_LOG(INFO) << "Node failure from " << node_id - << ". All objects pinned on that node will be lost if object " - "reconstruction is not enabled."; - reference_counter_->ResetObjectsOnRemovedNode(node_id); -} - const WorkerID &CoreWorker::GetWorkerID() const { return worker_context_.GetWorkerID(); } void CoreWorker::SetCurrentTaskId(const TaskID &task_id, diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 478ed9520d78..bbe15a9ae9b0 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -1216,9 +1216,6 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { } } - /// Handler if a raylet node is removed from the cluster. - void OnNodeRemoved(const NodeID &node_id); - /// Request the spillage of an object that we own from the primary that hosts /// the primary copy to spill. void SpillOwnedObject(const ObjectID &object_id, From c9b357a25e9af0e54891ed647db4f0889506e2f3 Mon Sep 17 00:00:00 2001 From: scv119 Date: Wed, 5 Oct 2022 18:32:55 -0700 Subject: [PATCH 4/4] add --- src/ray/core_worker/core_worker.cc | 5 ++--- src/ray/core_worker/core_worker.h | 6 +++--- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index fc9a23bf3cf7..ecc0d1b1a805 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -267,9 +267,8 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ // Register a callback to monitor removed nodes. // Note we capture a shared ownership of reference_counter_ // here to avoid destruction order fiasco between gcs_client and reference_counter_. - auto on_node_change = [](const NodeID &node_id, - const rpc::GcsNodeInfo &data, - reference_counter = reference_counter_) { + auto on_node_change = [reference_counter = this->reference_counter_]( + const NodeID &node_id, const rpc::GcsNodeInfo &data) { if (data.state() == rpc::GcsNodeInfo::DEAD) { RAY_LOG(INFO) << "Node failure from " << node_id << ". All objects pinned on that node will be lost if object " diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index bbe15a9ae9b0..ae280c3e4055 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -1274,6 +1274,9 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// Whether or not this worker is connected to the raylet and GCS. bool connected_ = false; + // Client to the GCS shared by core worker interfaces. + std::shared_ptr gcs_client_; + // Client to the raylet shared by core worker interfaces. This needs to be a // shared_ptr for direct calls because we can lease multiple workers through // one client, and we need to keep the connection alive until we return all @@ -1286,9 +1289,6 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { // Keeps track of object ID reference counts. std::shared_ptr reference_counter_; - // Client to the GCS shared by core worker interfaces. - std::shared_ptr gcs_client_; - /// /// Fields related to storing and retrieving objects. ///