diff --git a/openraft/src/core/install_snapshot.rs b/openraft/src/core/install_snapshot.rs index d16a2534d..d71fd8f65 100644 --- a/openraft/src/core/install_snapshot.rs +++ b/openraft/src/core/install_snapshot.rs @@ -40,6 +40,7 @@ impl, S: RaftStorage> Ra if req.term < self.current_term { return Ok(InstallSnapshotResponse { term: self.current_term, + last_applied: None, }); } @@ -134,6 +135,7 @@ impl, S: RaftStorage> Ra self.finalize_snapshot_installation(req, snapshot).await?; return Ok(InstallSnapshotResponse { term: self.current_term, + last_applied: self.last_applied, }); } @@ -145,6 +147,7 @@ impl, S: RaftStorage> Ra }); Ok(InstallSnapshotResponse { term: self.current_term, + last_applied: None, }) } @@ -188,6 +191,7 @@ impl, S: RaftStorage> Ra } Ok(InstallSnapshotResponse { term: self.current_term, + last_applied: self.last_applied, }) } @@ -224,7 +228,14 @@ impl, S: RaftStorage> Ra // --------------------------------------------------------------------> time // ``` - // TODO(xp): do not install if self.last_applied >= snapshot.meta.last_applied + if req.meta.last_log_id < self.last_applied { + tracing::info!( + "skip installing snapshot because snapshot_meta.last_log_id({}) <= self.last_applied({})", + req.meta.last_log_id.summary(), + self.last_applied.summary(), + ); + return Ok(()); + } let changes = self.storage.install_snapshot(&req.meta, snapshot).await?; @@ -253,7 +264,7 @@ impl, S: RaftStorage> Ra // There could be unknown membership in the snapshot. let membership = StorageHelper::new(&self.storage).get_membership().await?; - tracing::info!("refetch membership from store: {:?}", membership); + tracing::info!("re-fetch membership from store: {:?}", membership); assert!(membership.is_some()); diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index a7783f6c9..6eafde8f6 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -887,13 +887,20 @@ impl, S: RaftStorage> Re // If we just sent the final chunk of the snapshot, then transition to lagging state. if done { tracing::info!( - "done install snapshot: snapshot last_log_id: {:?}, matched: {:?}", + "done install snapshot: snapshot last_log_id: {:?}, self.matched: {:?}, remote last_applied: {:?}", snapshot.meta.last_log_id, self.matched, + res.last_applied, ); - self.update_matched(snapshot.meta.last_log_id); + // In previous version a node that does return `last_applied`. + let matched = if res.last_applied.is_some() { + res.last_applied + } else { + snapshot.meta.last_log_id + }; + self.update_matched(matched); return Ok(()); } diff --git a/openraft/src/types/v070/log_id.rs b/openraft/src/types/v070/log_id.rs index 4c0348e14..383dc0995 100644 --- a/openraft/src/types/v070/log_id.rs +++ b/openraft/src/types/v070/log_id.rs @@ -1,6 +1,8 @@ use serde::Deserialize; use serde::Serialize; +use crate::MessageSummary; + /// The identity of a raft log. /// A term and an index identifies an log globally. #[derive(Debug, Default, Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Serialize, Deserialize)] @@ -8,3 +10,14 @@ pub struct LogId { pub term: u64, pub index: u64, } + +impl MessageSummary for Option { + fn summary(&self) -> String { + match self { + None => "None".to_string(), + Some(log_id) => { + format!("{}", log_id) + } + } + } +} diff --git a/openraft/src/types/v070/rpc.rs b/openraft/src/types/v070/rpc.rs index 2b67cae37..7720d5c41 100644 --- a/openraft/src/types/v070/rpc.rs +++ b/openraft/src/types/v070/rpc.rs @@ -89,6 +89,12 @@ pub struct InstallSnapshotRequest { pub struct InstallSnapshotResponse { /// The receiving node's current term, for leader to update itself. pub term: u64, + + /// The last applied log id after snapshot being installed. + /// + /// A node may choose not to install a snapshot if it already has a greater `last_applied`. + /// In this case, it just returns the `last_applied`. + pub last_applied: Option, } /// The response to a `ClientRequest`.