diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 03020f64c74e..ecc0d1b1a805 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -220,14 +220,6 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ RAY_CHECK_OK(gcs_client_->Connect(io_service_)); RegisterToGcs(); - // Register a callback to monitor removed nodes. - auto on_node_change = [this](const NodeID &node_id, const rpc::GcsNodeInfo &data) { - if (data.state() == rpc::GcsNodeInfo::DEAD) { - OnNodeRemoved(node_id); - } - }; - RAY_CHECK_OK(gcs_client_->Nodes().AsyncSubscribeToNodeChange(on_node_change, nullptr)); - // Initialize profiler. profiler_ = std::make_shared( worker_context_, options_.node_ip_address, io_service_, gcs_client_); @@ -272,6 +264,20 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ new rpc::CoreWorkerClient(addr, *client_call_manager_)); }); + // Register a callback to monitor removed nodes. + // Note we capture a shared ownership of reference_counter_ + // here to avoid destruction order fiasco between gcs_client and reference_counter_. + auto on_node_change = [reference_counter = this->reference_counter_]( + const NodeID &node_id, const rpc::GcsNodeInfo &data) { + if (data.state() == rpc::GcsNodeInfo::DEAD) { + RAY_LOG(INFO) << "Node failure from " << node_id + << ". All objects pinned on that node will be lost if object " + "reconstruction is not enabled."; + reference_counter->ResetObjectsOnRemovedNode(node_id); + } + }; + RAY_CHECK_OK(gcs_client_->Nodes().AsyncSubscribeToNodeChange(on_node_change, nullptr)); + plasma_store_provider_.reset(new CoreWorkerPlasmaStoreProvider( options_.store_socket, local_raylet_client_, @@ -722,16 +728,6 @@ void CoreWorker::RunIOService() { RAY_LOG(INFO) << "Core worker main io service stopped."; } -void CoreWorker::OnNodeRemoved(const NodeID &node_id) { - // if (node_id == GetCurrentNodeId()) { - // RAY_LOG(FATAL) << "The raylet for this worker has died. Killing itself..."; - // } - RAY_LOG(INFO) << "Node failure from " << node_id - << ". All objects pinned on that node will be lost if object " - "reconstruction is not enabled."; - reference_counter_->ResetObjectsOnRemovedNode(node_id); -} - const WorkerID &CoreWorker::GetWorkerID() const { return worker_context_.GetWorkerID(); } void CoreWorker::SetCurrentTaskId(const TaskID &task_id, diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 2246f16091cf..ae280c3e4055 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -1216,9 +1216,6 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { } } - /// Handler if a raylet node is removed from the cluster. - void OnNodeRemoved(const NodeID &node_id); - /// Request the spillage of an object that we own from the primary that hosts /// the primary copy to spill. void SpillOwnedObject(const ObjectID &object_id,