Skip to content

Commit

Permalink
Change: InstallSnapshotResponse: replies the last applied log id; Do …
Browse files Browse the repository at this point in the history
…not install a smaller snapshot

A snapshot may not be installed by a follower if it already has a higher
`last_applied` log id locally.
In such a case, it just ignores the snapshot and respond with its local
`last_applied` log id.

This way the applied state(i.e., `last_applied`) will never revert back.
  • Loading branch information
drmingdrmer committed Sep 22, 2022
1 parent 6057abb commit 25e94c3
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 4 deletions.
15 changes: 13 additions & 2 deletions openraft/src/core/install_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
if req.term < self.current_term {
return Ok(InstallSnapshotResponse {
term: self.current_term,
last_applied: None,
});
}

Expand Down Expand Up @@ -134,6 +135,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
self.finalize_snapshot_installation(req, snapshot).await?;
return Ok(InstallSnapshotResponse {
term: self.current_term,
last_applied: self.last_applied,
});
}

Expand All @@ -145,6 +147,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
});
Ok(InstallSnapshotResponse {
term: self.current_term,
last_applied: None,
})
}

Expand Down Expand Up @@ -188,6 +191,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
}
Ok(InstallSnapshotResponse {
term: self.current_term,
last_applied: self.last_applied,
})
}

Expand Down Expand Up @@ -224,7 +228,14 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
// --------------------------------------------------------------------> time
// ```

// TODO(xp): do not install if self.last_applied >= snapshot.meta.last_applied
if req.meta.last_log_id < self.last_applied {
tracing::info!(
"skip installing snapshot because snapshot_meta.last_log_id({}) <= self.last_applied({})",
req.meta.last_log_id.summary(),
self.last_applied.summary(),
);
return Ok(());
}

let changes = self.storage.install_snapshot(&req.meta, snapshot).await?;

Expand Down Expand Up @@ -253,7 +264,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra

// There could be unknown membership in the snapshot.
let membership = StorageHelper::new(&self.storage).get_membership().await?;
tracing::info!("refetch membership from store: {:?}", membership);
tracing::info!("re-fetch membership from store: {:?}", membership);

assert!(membership.is_some());

Expand Down
11 changes: 9 additions & 2 deletions openraft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -887,13 +887,20 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
// If we just sent the final chunk of the snapshot, then transition to lagging state.
if done {
tracing::info!(
"done install snapshot: snapshot last_log_id: {:?}, matched: {:?}",
"done install snapshot: snapshot last_log_id: {:?}, self.matched: {:?}, remote last_applied: {:?}",
snapshot.meta.last_log_id,
self.matched,
res.last_applied,
);

self.update_matched(snapshot.meta.last_log_id);
// In previous version a node that does return `last_applied`.
let matched = if res.last_applied.is_some() {
res.last_applied
} else {
snapshot.meta.last_log_id
};

self.update_matched(matched);
return Ok(());
}

Expand Down
13 changes: 13 additions & 0 deletions openraft/src/types/v070/log_id.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,23 @@
use serde::Deserialize;
use serde::Serialize;

use crate::MessageSummary;

/// The identity of a raft log.
/// A term and an index identifies an log globally.
#[derive(Debug, Default, Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Serialize, Deserialize)]
pub struct LogId {
pub term: u64,
pub index: u64,
}

impl MessageSummary for Option<LogId> {
fn summary(&self) -> String {
match self {
None => "None".to_string(),
Some(log_id) => {
format!("{}", log_id)
}
}
}
}
6 changes: 6 additions & 0 deletions openraft/src/types/v070/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ pub struct InstallSnapshotRequest {
pub struct InstallSnapshotResponse {
/// The receiving node's current term, for leader to update itself.
pub term: u64,

/// The last applied log id after snapshot being installed.
///
/// A node may choose not to install a snapshot if it already has a greater `last_applied`.
/// In this case, it just returns the `last_applied`.
pub last_applied: Option<LogId>,
}

/// The response to a `ClientRequest`.
Expand Down

0 comments on commit 25e94c3

Please sign in to comment.