From d62183517be4b6e38360cc107d10f2488123641b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Thu, 21 Jul 2022 23:31:39 +0800 Subject: [PATCH] Refactor: merge InternalMessage into RaftMsg - Refactor: move internal communication message `SnapshotUpdate` from `InternalMessage` to `RaftMsg`. So that the RaftCore event loop only needs two channel: message channel and shutdown channel. Removes `InternalMessage` channel. - Refactor: merge server state specific loop into one `runtime_loop()`, since every server state has the same logic now. --- openraft/src/core/internal_msg.rs | 10 --- openraft/src/core/mod.rs | 6 +- openraft/src/core/raft_core.rs | 123 +++++++--------------------- openraft/src/core/snapshot_state.rs | 2 +- openraft/src/raft.rs | 8 ++ 5 files changed, 41 insertions(+), 108 deletions(-) delete mode 100644 openraft/src/core/internal_msg.rs diff --git a/openraft/src/core/internal_msg.rs b/openraft/src/core/internal_msg.rs deleted file mode 100644 index 53494fb94..000000000 --- a/openraft/src/core/internal_msg.rs +++ /dev/null @@ -1,10 +0,0 @@ -use crate::core::snapshot_state::SnapshotUpdate; -use crate::NodeId; - -/// Message for communication between internal tasks. -/// -/// Such as log compaction task, RaftCore task, replication tasks etc. -#[derive(Debug, Clone)] -pub(crate) enum InternalMessage { - SnapshotUpdate(SnapshotUpdate), -} diff --git a/openraft/src/core/mod.rs b/openraft/src/core/mod.rs index 8ae8a402a..73c2969a9 100644 --- a/openraft/src/core/mod.rs +++ b/openraft/src/core/mod.rs @@ -5,7 +5,6 @@ //! messages to other raft nodes. mod install_snapshot; -mod internal_msg; mod raft_core; pub(crate) mod replication; mod replication_expectation; @@ -14,12 +13,11 @@ mod server_state; mod snapshot_state; mod tick; -pub(crate) use internal_msg::InternalMessage; pub use raft_core::RaftCore; pub(crate) use replication_expectation::Expectation; pub(crate) use replication_state::replication_lag; pub use server_state::ServerState; -use snapshot_state::SnapshotState; -use snapshot_state::SnapshotUpdate; +pub(crate) use snapshot_state::SnapshotState; +pub(crate) use snapshot_state::SnapshotUpdate; pub(crate) use tick::Tick; pub(crate) use tick::VoteWiseTime; diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index ee3cde0be..29737ffbb 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -28,7 +28,6 @@ use crate::config::SnapshotPolicy; use crate::core::replication::snapshot_is_within_half_of_threshold; use crate::core::replication_lag; use crate::core::Expectation; -use crate::core::InternalMessage; use crate::core::ServerState; use crate::core::SnapshotState; use crate::core::SnapshotUpdate; @@ -147,10 +146,6 @@ pub struct RaftCore, S: RaftStorage< /// The time to elect if a follower does not receive any append-entry message. pub(crate) next_election_time: VoteWiseTime, - tx_internal: mpsc::Sender>, - - pub(crate) rx_internal: mpsc::Receiver>, - pub(crate) tx_api: mpsc::UnboundedSender>, pub(crate) rx_api: mpsc::UnboundedReceiver>, @@ -172,8 +167,6 @@ impl, S: RaftStorage> RaftCore>, rx_shutdown: oneshot::Receiver<()>, ) -> JoinHandle>> { - let (tx_internal, rx_internal) = mpsc::channel(1024); - let span = tracing::span!( parent: tracing::Span::current(), Level::DEBUG, @@ -195,9 +188,6 @@ impl, S: RaftStorage> RaftCore, S: RaftStorage> RaftCore self.candidate_loop().await?, - ServerState::Follower => self.follower_learner_loop(ServerState::Follower).await?, - ServerState::Learner => self.follower_learner_loop(ServerState::Learner).await?, + ServerState::Candidate | ServerState::Follower | ServerState::Learner => { + // report the new metrics for last metrics change. + self.report_metrics(Update::Update(None)); + self.runtime_loop(self.engine.state.server_state).await? + } ServerState::Shutdown => { tracing::info!("node has shutdown"); return Ok(()); @@ -839,19 +831,6 @@ impl, S: RaftStorage> RaftCore, - ) -> Result<(), StorageError> { - match msg { - InternalMessage::SnapshotUpdate(update) => { - self.update_snapshot_state(update); - } - } - Ok(()) - } - /// Update the system's snapshot state based on the given data. #[tracing::instrument(level = "trace", skip(self))] pub(crate) fn update_snapshot_state(&mut self, update: SnapshotUpdate) { @@ -898,7 +877,7 @@ impl, S: RaftStorage> RaftCore, S: RaftStorage> RaftCore match res { Ok(snapshot) => { - let _ = tx_internal.try_send(InternalMessage::SnapshotUpdate( - SnapshotUpdate::SnapshotComplete(snapshot.meta.last_log_id), - )); + let _ = tx_api.send(RaftMsg::SnapshotUpdate { + update: SnapshotUpdate::SnapshotComplete(snapshot.meta.last_log_id), + }); // This will always succeed. let _ = chan_tx.send(snapshot.meta.last_log_id.index); } Err(err) => { tracing::error!({error=%err}, "error while generating snapshot"); - let _ = - tx_internal.try_send(InternalMessage::SnapshotUpdate(SnapshotUpdate::SnapshotFailed)); + let _ = tx_api.send(RaftMsg::SnapshotUpdate { + update: SnapshotUpdate::SnapshotFailed, + }); } }, Err(_aborted) => { - let _ = tx_internal.try_send(InternalMessage::SnapshotUpdate(SnapshotUpdate::SnapshotFailed)); + let _ = tx_api.send(RaftMsg::SnapshotUpdate { + update: SnapshotUpdate::SnapshotFailed, + }); } } } @@ -1232,68 +1214,19 @@ impl, S: RaftStorage> RaftCore { - self.handle_api_msg(msg).await?; - }, - - Some(internal_msg) = self.rx_internal.recv() => { - tracing::info!("leader recv from rx_internal: {:?}", internal_msg); - self.handle_internal_msg(internal_msg).await?; - } - - Ok(_) = &mut self.rx_shutdown => { - tracing::info!("leader recv from rx_shutdown"); - self.set_target_state(ServerState::Shutdown); - } - } - } - } - - #[tracing::instrument(level="debug", skip(self), fields(id=display(self.id), raft_state="candidate"))] - async fn candidate_loop(&mut self) -> Result<(), Fatal> { - // report the new state before enter the loop - self.report_metrics(Update::Update(None)); - - loop { - if !self.engine.state.server_state.is_candidate() { - return Ok(()); - } - - self.flush_metrics(); - - tokio::select! { - Some(msg) = self.rx_api.recv() => { - self.handle_api_msg(msg).await?; - }, - - Some(internal_msg) = self.rx_internal.recv() => { - self.handle_internal_msg(internal_msg).await?; - }, - - Ok(_) = &mut self.rx_shutdown => self.set_target_state(ServerState::Shutdown), - } - } + self.runtime_loop(ServerState::Leader).await } + /// Run an event handling loop until server state changes #[tracing::instrument(level="debug", skip(self), fields(id=display(self.id)))] - async fn follower_learner_loop(&mut self, server_state: ServerState) -> Result<(), Fatal> { - // report the new state before enter the loop - self.report_metrics(Update::Update(None)); - + async fn runtime_loop(&mut self, server_state: ServerState) -> Result<(), Fatal> { loop { if self.engine.state.server_state != server_state { + tracing::info!( + "id={} server_state becomes: {:?}", + self.id, + self.engine.state.server_state + ); return Ok(()); } @@ -1304,9 +1237,10 @@ impl, S: RaftStorage> RaftCore self.handle_internal_msg(internal_msg).await?, - - Ok(_) = &mut self.rx_shutdown => self.set_target_state(ServerState::Shutdown), + Ok(_) = &mut self.rx_shutdown => { + tracing::info!("recv rx_shutdown"); + self.set_target_state(ServerState::Shutdown); + } } } } @@ -1427,6 +1361,9 @@ impl, S: RaftStorage> RaftCore { let _ = tx.send(self.handle_install_snapshot_request(rpc).await.extract_fatal()?); } + RaftMsg::SnapshotUpdate { update } => { + self.update_snapshot_state(update); + } RaftMsg::CheckIsLeaderRequest { tx } => { if is_leader() { self.handle_check_is_leader_request(tx).await; diff --git a/openraft/src/core/snapshot_state.rs b/openraft/src/core/snapshot_state.rs index e15cdb824..f7306364c 100644 --- a/openraft/src/core/snapshot_state.rs +++ b/openraft/src/core/snapshot_state.rs @@ -10,7 +10,7 @@ pub(crate) enum SnapshotState { Snapshotting { /// A handle to abort the compaction process early if needed. handle: AbortHandle, - /// A sender for notifiying any other tasks of the completion of this compaction. + /// A sender for notifying any other tasks of the completion of this compaction. sender: broadcast::Sender, }, /// The Raft node is streaming in a snapshot from the leader. diff --git a/openraft/src/raft.rs b/openraft/src/raft.rs index 966452120..bf6fadfeb 100644 --- a/openraft/src/raft.rs +++ b/openraft/src/raft.rs @@ -18,6 +18,7 @@ use crate::config::Config; use crate::core::replication_lag; use crate::core::Expectation; use crate::core::RaftCore; +use crate::core::SnapshotUpdate; use crate::core::Tick; use crate::error::AddLearnerError; use crate::error::AppendEntriesError; @@ -731,6 +732,10 @@ pub(crate) enum RaftMsg, S: RaftStor tx: RaftRespTx, InstallSnapshotError>, }, + SnapshotUpdate { + update: SnapshotUpdate, + }, + ClientWriteRequest { rpc: ClientWriteRequest, tx: RaftRespTx, ClientWriteError>, @@ -845,6 +850,9 @@ where RaftMsg::InstallSnapshot { rpc, .. } => { format!("InstallSnapshot: {}", rpc.summary()) } + RaftMsg::SnapshotUpdate { update } => { + format!("SnapshotUpdate: {:?}", update) + } RaftMsg::ClientWriteRequest { rpc, .. } => { format!("ClientWriteRequest: {}", rpc.summary()) }