Skip to content

Commit

Permalink
change: RaftCore: replace snapshot_index with `snapshot_last_includ…
Browse files Browse the repository at this point in the history
…ed: LogId`. Keep tracks of both snapshot last log term and index.

Also `SnapshotUpdate::SnapshotComplete` now contains an LogId instead of an u64 index.
  • Loading branch information
drmingdrmer committed Jul 9, 2021
1 parent 85859d0 commit 5eb9d3a
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 11 deletions.
2 changes: 1 addition & 1 deletion async-raft/src/core/install_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
self.last_log_index = req.last_included.index;
self.last_log_term = req.last_included.term;
self.last_applied = req.last_included.index;
self.snapshot_index = req.last_included.index;
self.snapshot_last_included = req.last_included;
Ok(())
}
}
21 changes: 11 additions & 10 deletions async-raft/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,11 @@ pub struct RaftCore<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftSt

/// The node's current snapshot state.
snapshot_state: Option<SnapshotState<S::Snapshot>>,
/// The index of the current snapshot, if a snapshot exists.

/// The log id upto which the current snapshot includes, inclusive, if a snapshot exists.
///
/// This is primarily used in making a determination on when a compaction job needs to be triggered.
snapshot_index: u64,
snapshot_last_included: LogId,

/// A cache of entries which are waiting to be replicated to the state machine.
///
Expand Down Expand Up @@ -176,7 +177,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
last_log_index: 0,
last_log_term: 0,
snapshot_state: None,
snapshot_index: 0,
snapshot_last_included: LogId { term: 0, index: 0 },
entries_cache: Default::default(),
replicate_to_sm_handle: FuturesOrdered::new(),
has_completed_initial_replication_to_sm: false,
Expand Down Expand Up @@ -211,7 +212,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
if let Some(snapshot) =
self.storage.get_current_snapshot().await.map_err(|err| self.map_fatal_storage_error(err))?
{
self.snapshot_index = snapshot.included.index;
self.snapshot_last_included = snapshot.included;
}

let has_log = self.last_log_index != u64::min_value();
Expand Down Expand Up @@ -407,8 +408,8 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
/// Update the system's snapshot state based on the given data.
#[tracing::instrument(level = "trace", skip(self))]
fn update_snapshot_state(&mut self, update: SnapshotUpdate) {
if let SnapshotUpdate::SnapshotComplete(index) = update {
self.snapshot_index = index
if let SnapshotUpdate::SnapshotComplete(log_id) = update {
self.snapshot_last_included = log_id
}
// If snapshot state is anything other than streaming, then drop it.
if let Some(state @ SnapshotState::Streaming { .. }) = self.snapshot_state.take() {
Expand All @@ -425,13 +426,13 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
}
let SnapshotPolicy::LogsSinceLast(threshold) = &self.config.snapshot_policy;
// Check to ensure we have actual entries for compaction.
if self.last_applied == 0 || self.last_applied < self.snapshot_index {
if self.last_applied == 0 || self.last_applied < self.snapshot_last_included.index {
return;
}

if !force {
// If we are below the threshold, then there is nothing to do.
if self.last_applied < self.snapshot_index + *threshold {
if self.last_applied < self.snapshot_last_included.index + *threshold {
return;
}
}
Expand All @@ -452,7 +453,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
match res {
Ok(res) => match res {
Ok(snapshot) => {
let _ = tx_compaction.try_send(SnapshotUpdate::SnapshotComplete(snapshot.included.index));
let _ = tx_compaction.try_send(SnapshotUpdate::SnapshotComplete(snapshot.included));
let _ = chan_tx.send(snapshot.included.index); // This will always succeed.
}
Err(err) => {
Expand Down Expand Up @@ -549,7 +550,7 @@ pub(self) enum SnapshotState<S> {
#[derive(Debug)]
pub(self) enum SnapshotUpdate {
/// Snapshot creation has finished successfully and covers the given index.
SnapshotComplete(u64),
SnapshotComplete(LogId),
/// Snapshot creation failed.
SnapshotFailed,
}
Expand Down

0 comments on commit 5eb9d3a

Please sign in to comment.