From 5eb9d3afd7a0b4fef13fd4b87b053bed09c91d9e Mon Sep 17 00:00:00 2001 From: drdr xp Date: Fri, 9 Jul 2021 15:47:50 +0800 Subject: [PATCH] change: RaftCore: replace `snapshot_index` with `snapshot_last_included: LogId`. Keep tracks of both snapshot last log term and index. Also `SnapshotUpdate::SnapshotComplete` now contains an LogId instead of an u64 index. --- async-raft/src/core/install_snapshot.rs | 2 +- async-raft/src/core/mod.rs | 21 +++++++++++---------- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/async-raft/src/core/install_snapshot.rs b/async-raft/src/core/install_snapshot.rs index 06e6e3a5f..1abb5805c 100644 --- a/async-raft/src/core/install_snapshot.rs +++ b/async-raft/src/core/install_snapshot.rs @@ -185,7 +185,7 @@ impl, S: RaftStorage> Ra self.last_log_index = req.last_included.index; self.last_log_term = req.last_included.term; self.last_applied = req.last_included.index; - self.snapshot_index = req.last_included.index; + self.snapshot_last_included = req.last_included; Ok(()) } } diff --git a/async-raft/src/core/mod.rs b/async-raft/src/core/mod.rs index be0ba0762..57b5a5132 100644 --- a/async-raft/src/core/mod.rs +++ b/async-raft/src/core/mod.rs @@ -112,10 +112,11 @@ pub struct RaftCore, S: RaftSt /// The node's current snapshot state. snapshot_state: Option>, - /// The index of the current snapshot, if a snapshot exists. + + /// The log id upto which the current snapshot includes, inclusive, if a snapshot exists. /// /// This is primarily used in making a determination on when a compaction job needs to be triggered. - snapshot_index: u64, + snapshot_last_included: LogId, /// A cache of entries which are waiting to be replicated to the state machine. /// @@ -176,7 +177,7 @@ impl, S: RaftStorage> Ra last_log_index: 0, last_log_term: 0, snapshot_state: None, - snapshot_index: 0, + snapshot_last_included: LogId { term: 0, index: 0 }, entries_cache: Default::default(), replicate_to_sm_handle: FuturesOrdered::new(), has_completed_initial_replication_to_sm: false, @@ -211,7 +212,7 @@ impl, S: RaftStorage> Ra if let Some(snapshot) = self.storage.get_current_snapshot().await.map_err(|err| self.map_fatal_storage_error(err))? { - self.snapshot_index = snapshot.included.index; + self.snapshot_last_included = snapshot.included; } let has_log = self.last_log_index != u64::min_value(); @@ -407,8 +408,8 @@ impl, S: RaftStorage> Ra /// Update the system's snapshot state based on the given data. #[tracing::instrument(level = "trace", skip(self))] fn update_snapshot_state(&mut self, update: SnapshotUpdate) { - if let SnapshotUpdate::SnapshotComplete(index) = update { - self.snapshot_index = index + if let SnapshotUpdate::SnapshotComplete(log_id) = update { + self.snapshot_last_included = log_id } // If snapshot state is anything other than streaming, then drop it. if let Some(state @ SnapshotState::Streaming { .. }) = self.snapshot_state.take() { @@ -425,13 +426,13 @@ impl, S: RaftStorage> Ra } let SnapshotPolicy::LogsSinceLast(threshold) = &self.config.snapshot_policy; // Check to ensure we have actual entries for compaction. - if self.last_applied == 0 || self.last_applied < self.snapshot_index { + if self.last_applied == 0 || self.last_applied < self.snapshot_last_included.index { return; } if !force { // If we are below the threshold, then there is nothing to do. - if self.last_applied < self.snapshot_index + *threshold { + if self.last_applied < self.snapshot_last_included.index + *threshold { return; } } @@ -452,7 +453,7 @@ impl, S: RaftStorage> Ra match res { Ok(res) => match res { Ok(snapshot) => { - let _ = tx_compaction.try_send(SnapshotUpdate::SnapshotComplete(snapshot.included.index)); + let _ = tx_compaction.try_send(SnapshotUpdate::SnapshotComplete(snapshot.included)); let _ = chan_tx.send(snapshot.included.index); // This will always succeed. } Err(err) => { @@ -549,7 +550,7 @@ pub(self) enum SnapshotState { #[derive(Debug)] pub(self) enum SnapshotUpdate { /// Snapshot creation has finished successfully and covers the given index. - SnapshotComplete(u64), + SnapshotComplete(LogId), /// Snapshot creation failed. SnapshotFailed, }