[Core][Core-worker] fix the initialization/destruction order between reference_counter_ and node change subscription. (#29108)

Why are these changes needed?
We recently saw segfaults in the core worker in some of our nightly tests. Our investigation suggests that when the GCS client delivers a node-change callback, the underlying reference_counter_ that handles the callback is either not yet initialized (the likely case) or has already been destructed.
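
The failure mode is easiest to see in isolation. Below is a minimal, self-contained C++ sketch (not Ray code; Subscription, ReferenceCounter, and Worker are hypothetical stand-ins) showing how registering a `this`-capturing callback around a member that is initialized later, and destroyed earlier, than the subscription leads to a null or dangling dereference:

```cpp
#include <functional>
#include <iostream>
#include <memory>
#include <string>

// Stand-in for the GCS node-change subscription: it stores the callback and
// may invoke it at any later point, independent of the subscriber's lifetime.
class Subscription {
 public:
  void Subscribe(std::function<void(const std::string &)> cb) { cb_ = std::move(cb); }
  void NotifyNodeDead(const std::string &node_id) {
    if (cb_) cb_(node_id);
  }

 private:
  std::function<void(const std::string &)> cb_;
};

// Stand-in for reference_counter_.
class ReferenceCounter {
 public:
  void ResetObjectsOnRemovedNode(const std::string &node_id) {
    std::cout << "Resetting objects on removed node " << node_id << "\n";
  }
};

class Worker {
 public:
  explicit Worker(Subscription &sub) {
    // BUG 1: the callback is registered before reference_counter_ is
    // initialized, so an early notification dereferences a null pointer.
    // BUG 2: the callback captures a raw `this` and takes no ownership, so a
    // late notification dereferences an already-destructed Worker.
    sub.Subscribe([this](const std::string &node_id) {
      reference_counter_->ResetObjectsOnRemovedNode(node_id);
    });
    reference_counter_ = std::make_shared<ReferenceCounter>();
  }

 private:
  std::shared_ptr<ReferenceCounter> reference_counter_;
};

int main() {
  Subscription sub;
  {
    Worker worker(sub);
  }  // `worker` (and its reference_counter_) is gone; `sub` still holds the callback.
  sub.NotifyNodeDead("node-1");  // Undefined behavior: dangling `this`.
}
```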

Fix:

- ensure reference_counter_ is initialized before the callback is registered, and
- capture shared ownership of reference_counter_ in the callback.

Together, these two changes should fix the bug; a sketch of the corrected pattern follows.
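
Continuing the hypothetical sketch above, a corrected Worker constructor initializes first, registers second, and captures the shared_ptr by value so the callback co-owns the ReferenceCounter:

```cpp
// Corrected Worker constructor for the sketch above: initialize before
// registering, and capture shared ownership so the callback keeps the
// ReferenceCounter alive even after the Worker itself is destroyed.
explicit Worker(Subscription &sub) {
  reference_counter_ = std::make_shared<ReferenceCounter>();
  // Capturing the shared_ptr by value extends its lifetime to that of the
  // callback, mirroring `[reference_counter = this->reference_counter_]`
  // in the actual change below.
  sub.Subscribe([reference_counter = reference_counter_](const std::string &node_id) {
    reference_counter->ResetObjectsOnRemovedNode(node_id);  // Always valid.
  });
}
```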
scv119 authored Oct 6, 2022
1 parent 889f769 commit 207efc9
Showing 2 changed files with 14 additions and 21 deletions.
32 changes: 14 additions & 18 deletions src/ray/core_worker/core_worker.cc
@@ -221,14 +221,6 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
   RAY_CHECK_OK(gcs_client_->Connect(io_service_));
   RegisterToGcs();
 
-  // Register a callback to monitor removed nodes.
-  auto on_node_change = [this](const NodeID &node_id, const rpc::GcsNodeInfo &data) {
-    if (data.state() == rpc::GcsNodeInfo::DEAD) {
-      OnNodeRemoved(node_id);
-    }
-  };
-  RAY_CHECK_OK(gcs_client_->Nodes().AsyncSubscribeToNodeChange(on_node_change, nullptr));
-
   // Initialize profiler.
   profiler_ = std::make_shared<worker::Profiler>(
       worker_context_, options_.node_ip_address, io_service_, gcs_client_);
@@ -273,6 +265,20 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
             new rpc::CoreWorkerClient(addr, *client_call_manager_));
       });
 
+  // Register a callback to monitor removed nodes.
+  // Note we capture a shared ownership of reference_counter_
+  // here to avoid destruction order fiasco between gcs_client and reference_counter_.
+  auto on_node_change = [reference_counter = this->reference_counter_](
+                            const NodeID &node_id, const rpc::GcsNodeInfo &data) {
+    if (data.state() == rpc::GcsNodeInfo::DEAD) {
+      RAY_LOG(INFO) << "Node failure from " << node_id
+                    << ". All objects pinned on that node will be lost if object "
+                       "reconstruction is not enabled.";
+      reference_counter->ResetObjectsOnRemovedNode(node_id);
+    }
+  };
+  RAY_CHECK_OK(gcs_client_->Nodes().AsyncSubscribeToNodeChange(on_node_change, nullptr));
+
   plasma_store_provider_.reset(new CoreWorkerPlasmaStoreProvider(
       options_.store_socket,
       local_raylet_client_,
@@ -723,16 +729,6 @@ void CoreWorker::RunIOService() {
   RAY_LOG(INFO) << "Core worker main io service stopped.";
 }
 
-void CoreWorker::OnNodeRemoved(const NodeID &node_id) {
-  // if (node_id == GetCurrentNodeId()) {
-  //   RAY_LOG(FATAL) << "The raylet for this worker has died. Killing itself...";
-  // }
-  RAY_LOG(INFO) << "Node failure from " << node_id
-                << ". All objects pinned on that node will be lost if object "
-                   "reconstruction is not enabled.";
-  reference_counter_->ResetObjectsOnRemovedNode(node_id);
-}
-
 const WorkerID &CoreWorker::GetWorkerID() const { return worker_context_.GetWorkerID(); }
 
 void CoreWorker::SetCurrentTaskId(const TaskID &task_id,
3 changes: 0 additions & 3 deletions src/ray/core_worker/core_worker.h
@@ -1217,9 +1217,6 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
     }
   }
 
-  /// Handler if a raylet node is removed from the cluster.
-  void OnNodeRemoved(const NodeID &node_id);
-
   /// Request the spillage of an object that we own from the primary that hosts
   /// the primary copy to spill.
   void SpillOwnedObject(const ObjectID &object_id,
