Skip to content

Commit

Permalink
Merge pull request #468 from drmingdrmer/7-rm-internal-msg
Browse files Browse the repository at this point in the history
Refactor: merge InternalMessage into RaftMsg
  • Loading branch information
mergify[bot] authored Jul 22, 2022
2 parents caaf2f9 + d621835 commit db047b9
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 108 deletions.
10 changes: 0 additions & 10 deletions openraft/src/core/internal_msg.rs

This file was deleted.

6 changes: 2 additions & 4 deletions openraft/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
//! messages to other raft nodes.

mod install_snapshot;
mod internal_msg;
mod raft_core;
pub(crate) mod replication;
mod replication_expectation;
Expand All @@ -14,12 +13,11 @@ mod server_state;
mod snapshot_state;
mod tick;

pub(crate) use internal_msg::InternalMessage;
pub use raft_core::RaftCore;
pub(crate) use replication_expectation::Expectation;
pub(crate) use replication_state::replication_lag;
pub use server_state::ServerState;
use snapshot_state::SnapshotState;
use snapshot_state::SnapshotUpdate;
pub(crate) use snapshot_state::SnapshotState;
pub(crate) use snapshot_state::SnapshotUpdate;
pub(crate) use tick::Tick;
pub(crate) use tick::VoteWiseTime;
123 changes: 30 additions & 93 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use crate::config::SnapshotPolicy;
use crate::core::replication::snapshot_is_within_half_of_threshold;
use crate::core::replication_lag;
use crate::core::Expectation;
use crate::core::InternalMessage;
use crate::core::ServerState;
use crate::core::SnapshotState;
use crate::core::SnapshotUpdate;
Expand Down Expand Up @@ -147,10 +146,6 @@ pub struct RaftCore<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<
/// The time to elect if a follower does not receive any append-entry message.
pub(crate) next_election_time: VoteWiseTime<C::NodeId>,

tx_internal: mpsc::Sender<InternalMessage<C::NodeId>>,

pub(crate) rx_internal: mpsc::Receiver<InternalMessage<C::NodeId>>,

pub(crate) tx_api: mpsc::UnboundedSender<RaftMsg<C, N, S>>,
pub(crate) rx_api: mpsc::UnboundedReceiver<RaftMsg<C, N, S>>,

Expand All @@ -172,8 +167,6 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
tx_metrics: watch::Sender<RaftMetrics<C::NodeId>>,
rx_shutdown: oneshot::Receiver<()>,
) -> JoinHandle<Result<(), Fatal<C::NodeId>>> {
let (tx_internal, rx_internal) = mpsc::channel(1024);

let span = tracing::span!(
parent: tracing::Span::current(),
Level::DEBUG,
Expand All @@ -195,9 +188,6 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
last_heartbeat: None,
next_election_time: VoteWiseTime::new(Vote::default(), Instant::now() + Duration::from_secs(86400)),

tx_internal,
rx_internal,

tx_api,
rx_api,

Expand Down Expand Up @@ -315,9 +305,11 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
self.leader_data = Some(LeaderData::new());
self.leader_loop().await?;
}
ServerState::Candidate => self.candidate_loop().await?,
ServerState::Follower => self.follower_learner_loop(ServerState::Follower).await?,
ServerState::Learner => self.follower_learner_loop(ServerState::Learner).await?,
ServerState::Candidate | ServerState::Follower | ServerState::Learner => {
// report the new metrics for last metrics change.
self.report_metrics(Update::Update(None));
self.runtime_loop(self.engine.state.server_state).await?
}
ServerState::Shutdown => {
tracing::info!("node has shutdown");
return Ok(());
Expand Down Expand Up @@ -839,19 +831,6 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
self.last_heartbeat = Some(now);
}

#[tracing::instrument(level = "trace", skip(self))]
pub(crate) async fn handle_internal_msg(
&mut self,
msg: InternalMessage<C::NodeId>,
) -> Result<(), StorageError<C::NodeId>> {
match msg {
InternalMessage::SnapshotUpdate(update) => {
self.update_snapshot_state(update);
}
}
Ok(())
}

/// Update the system's snapshot state based on the given data.
#[tracing::instrument(level = "trace", skip(self))]
pub(crate) fn update_snapshot_state(&mut self, update: SnapshotUpdate<C::NodeId>) {
Expand Down Expand Up @@ -898,7 +877,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
let mut builder = self.storage.get_snapshot_builder().await;
let (handle, reg) = AbortHandle::new_pair();
let (chan_tx, _) = broadcast::channel(1);
let tx_internal = self.tx_internal.clone();
let tx_api = self.tx_api.clone();
self.snapshot_state = Some(SnapshotState::Snapshotting {
handle,
sender: chan_tx.clone(),
Expand All @@ -911,20 +890,23 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
match res {
Ok(res) => match res {
Ok(snapshot) => {
let _ = tx_internal.try_send(InternalMessage::SnapshotUpdate(
SnapshotUpdate::SnapshotComplete(snapshot.meta.last_log_id),
));
let _ = tx_api.send(RaftMsg::SnapshotUpdate {
update: SnapshotUpdate::SnapshotComplete(snapshot.meta.last_log_id),
});
// This will always succeed.
let _ = chan_tx.send(snapshot.meta.last_log_id.index);
}
Err(err) => {
tracing::error!({error=%err}, "error while generating snapshot");
let _ =
tx_internal.try_send(InternalMessage::SnapshotUpdate(SnapshotUpdate::SnapshotFailed));
let _ = tx_api.send(RaftMsg::SnapshotUpdate {
update: SnapshotUpdate::SnapshotFailed,
});
}
},
Err(_aborted) => {
let _ = tx_internal.try_send(InternalMessage::SnapshotUpdate(SnapshotUpdate::SnapshotFailed));
let _ = tx_api.send(RaftMsg::SnapshotUpdate {
update: SnapshotUpdate::SnapshotFailed,
});
}
}
}
Expand Down Expand Up @@ -1232,68 +1214,19 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
};
self.report_metrics(Update::Update(Some(replication_metrics)));

loop {
if !self.engine.state.server_state.is_leader() {
tracing::info!("id={} state becomes: {:?}", self.id, self.engine.state.server_state);

// implicit drop replication_rx
// notify to all nodes DO NOT send replication event any more.
return Ok(());
}

self.flush_metrics();

tokio::select! {
Some(msg) = self.rx_api.recv() => {
self.handle_api_msg(msg).await?;
},

Some(internal_msg) = self.rx_internal.recv() => {
tracing::info!("leader recv from rx_internal: {:?}", internal_msg);
self.handle_internal_msg(internal_msg).await?;
}

Ok(_) = &mut self.rx_shutdown => {
tracing::info!("leader recv from rx_shutdown");
self.set_target_state(ServerState::Shutdown);
}
}
}
}

#[tracing::instrument(level="debug", skip(self), fields(id=display(self.id), raft_state="candidate"))]
async fn candidate_loop(&mut self) -> Result<(), Fatal<C::NodeId>> {
// report the new state before enter the loop
self.report_metrics(Update::Update(None));

loop {
if !self.engine.state.server_state.is_candidate() {
return Ok(());
}

self.flush_metrics();

tokio::select! {
Some(msg) = self.rx_api.recv() => {
self.handle_api_msg(msg).await?;
},

Some(internal_msg) = self.rx_internal.recv() => {
self.handle_internal_msg(internal_msg).await?;
},

Ok(_) = &mut self.rx_shutdown => self.set_target_state(ServerState::Shutdown),
}
}
self.runtime_loop(ServerState::Leader).await
}

/// Run an event handling loop until server state changes
#[tracing::instrument(level="debug", skip(self), fields(id=display(self.id)))]
async fn follower_learner_loop(&mut self, server_state: ServerState) -> Result<(), Fatal<C::NodeId>> {
// report the new state before enter the loop
self.report_metrics(Update::Update(None));

async fn runtime_loop(&mut self, server_state: ServerState) -> Result<(), Fatal<C::NodeId>> {
loop {
if self.engine.state.server_state != server_state {
tracing::info!(
"id={} server_state becomes: {:?}",
self.id,
self.engine.state.server_state
);
return Ok(());
}

Expand All @@ -1304,9 +1237,10 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
self.handle_api_msg(msg).await?;
},

Some(internal_msg) = self.rx_internal.recv() => self.handle_internal_msg(internal_msg).await?,

Ok(_) = &mut self.rx_shutdown => self.set_target_state(ServerState::Shutdown),
Ok(_) = &mut self.rx_shutdown => {
tracing::info!("recv rx_shutdown");
self.set_target_state(ServerState::Shutdown);
}
}
}
}
Expand Down Expand Up @@ -1427,6 +1361,9 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
RaftMsg::InstallSnapshot { rpc, tx } => {
let _ = tx.send(self.handle_install_snapshot_request(rpc).await.extract_fatal()?);
}
RaftMsg::SnapshotUpdate { update } => {
self.update_snapshot_state(update);
}
RaftMsg::CheckIsLeaderRequest { tx } => {
if is_leader() {
self.handle_check_is_leader_request(tx).await;
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/core/snapshot_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub(crate) enum SnapshotState<S> {
Snapshotting {
/// A handle to abort the compaction process early if needed.
handle: AbortHandle,
/// A sender for notifiying any other tasks of the completion of this compaction.
/// A sender for notifying any other tasks of the completion of this compaction.
sender: broadcast::Sender<u64>,
},
/// The Raft node is streaming in a snapshot from the leader.
Expand Down
8 changes: 8 additions & 0 deletions openraft/src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::config::Config;
use crate::core::replication_lag;
use crate::core::Expectation;
use crate::core::RaftCore;
use crate::core::SnapshotUpdate;
use crate::core::Tick;
use crate::error::AddLearnerError;
use crate::error::AppendEntriesError;
Expand Down Expand Up @@ -731,6 +732,10 @@ pub(crate) enum RaftMsg<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStor
tx: RaftRespTx<InstallSnapshotResponse<C::NodeId>, InstallSnapshotError<C::NodeId>>,
},

SnapshotUpdate {
update: SnapshotUpdate<C::NodeId>,
},

ClientWriteRequest {
rpc: ClientWriteRequest<C>,
tx: RaftRespTx<ClientWriteResponse<C>, ClientWriteError<C::NodeId>>,
Expand Down Expand Up @@ -845,6 +850,9 @@ where
RaftMsg::InstallSnapshot { rpc, .. } => {
format!("InstallSnapshot: {}", rpc.summary())
}
RaftMsg::SnapshotUpdate { update } => {
format!("SnapshotUpdate: {:?}", update)
}
RaftMsg::ClientWriteRequest { rpc, .. } => {
format!("ClientWriteRequest: {}", rpc.summary())
}
Expand Down

0 comments on commit db047b9

Please sign in to comment.