Skip to content

Commit

Permalink
Fix: last_purged_log_id is not loaded correctly
Browse files Browse the repository at this point in the history
- Fix: `last_purged_log_id` should be `None`, but not `LogId{index=0,
  ..}` when raft startup with a store with log at index 0.

  This is fixed by adding another field `next_purge` to distinguish
  `last_purged_log_id` value `None` and `LogId{index=0, ..}`, because
  `RaftState.log_ids` stores `LogId` but not `Option<LogId>`.

- Add a wrapper `Valid<RaftState>` of `RaftState` to check if the state
  is valid every time accessing it. This check is done only when
  `debug_assertions` is turned on.
  • Loading branch information
drmingdrmer committed Jan 9, 2023
1 parent 465bc63 commit cc8af8c
Show file tree
Hide file tree
Showing 26 changed files with 379 additions and 10 deletions.
7 changes: 5 additions & 2 deletions openraft/src/engine/calc_purge_upto_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ fn log_id(term: u64, index: u64) -> LogId<u64> {

fn eng() -> Engine<u64, ()> {
let mut eng = Engine::default();
eng.state.enable_validate = false; // Disable validation for incomplete state

eng.state.log_ids = LogIdList::new(vec![
//
log_id(0, 0),
Expand All @@ -32,7 +34,7 @@ fn test_calc_purge_upto() -> anyhow::Result<()> {
(None, None, 1, None),
//
(None, Some(log_id(1, 1)), 0, Some(log_id(1, 1))),
(None, Some(log_id(1, 1)), 1, None),
(None, Some(log_id(1, 1)), 1, Some(log_id(0, 0))),
(None, Some(log_id(1, 1)), 2, None),
//
(Some(log_id(0, 0)), Some(log_id(1, 1)), 0, Some(log_id(1, 1))),
Expand All @@ -43,7 +45,7 @@ fn test_calc_purge_upto() -> anyhow::Result<()> {
(None, Some(log_id(3, 4)), 1, Some(log_id(3, 3))),
(None, Some(log_id(3, 4)), 2, Some(log_id(1, 2))),
(None, Some(log_id(3, 4)), 3, Some(log_id(1, 1))),
(None, Some(log_id(3, 4)), 4, None),
(None, Some(log_id(3, 4)), 4, Some(log_id(0, 0))),
(None, Some(log_id(3, 4)), 5, None),
//
(Some(log_id(1, 2)), Some(log_id(3, 4)), 0, Some(log_id(3, 4))),
Expand All @@ -61,6 +63,7 @@ fn test_calc_purge_upto() -> anyhow::Result<()> {

if let Some(last_purged) = last_purged {
eng.state.log_ids.purge(&last_purged);
eng.state.next_purge = last_purged.index + 1;
}
eng.state.snapshot_meta.last_log_id = snapshot_last_log_id;
let got = eng.calc_purge_upto();
Expand Down
4 changes: 3 additions & 1 deletion openraft/src/engine/elect_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ fn m12() -> Membership<u64, ()> {
}

fn eng() -> Engine<u64, ()> {
Engine::default()
let mut eng = Engine::default();
eng.state.enable_validate = false; // Disable validation for incomplete state
eng
}

#[test]
Expand Down
23 changes: 18 additions & 5 deletions openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::raft_state::LogStateReader;
use crate::raft_state::RaftState;
use crate::raft_types::RaftLogId;
use crate::summary::MessageSummary;
use crate::valid::Valid;
use crate::LogId;
use crate::LogIdOptionExt;
use crate::Membership;
Expand Down Expand Up @@ -106,7 +107,7 @@ where
pub(crate) config: EngineConfig<NID>,

/// The state of this raft node.
pub(crate) state: RaftState<NID, N>,
pub(crate) state: Valid<RaftState<NID, N>>,

/// The internal server state used by Engine.
pub(crate) internal_server_state: InternalServerState<NID>,
Expand All @@ -120,10 +121,10 @@ where
N: Node,
NID: NodeId,
{
pub(crate) fn new(init_state: &RaftState<NID, N>, config: EngineConfig<NID>) -> Self {
pub(crate) fn new(init_state: RaftState<NID, N>, config: EngineConfig<NID>) -> Self {
Self {
config,
state: init_state.clone(),
state: Valid::new(init_state),
internal_server_state: InternalServerState::default(),
output: EngineOutput::default(),
}
Expand Down Expand Up @@ -657,7 +658,7 @@ where
return;
}

st.log_ids.purge(&upto);
st.purge_log(&upto);

self.output.push_command(Command::PurgeLog { upto });
}
Expand Down Expand Up @@ -859,6 +860,11 @@ where

tracing::info!("install_snapshot: meta:{:?}", meta);

// TODO: temp solution: committed is updated after snapshot_last_log_id.
// committed should be updated first or together with snapshot_last_log_id(i.e., extract `state` first).
let old_validate = self.state.enable_validate;
self.state.enable_validate = false;

let snap_last_log_id = meta.last_log_id;

if snap_last_log_id <= self.state.committed {
Expand All @@ -868,6 +874,8 @@ where
self.state.committed.summary()
);
self.output.push_command(Command::CancelSnapshot { snapshot_meta: meta });
// TODO: temp solution: committed is updated after snapshot_last_log_id.
self.state.enable_validate = old_validate;
return;
}

Expand All @@ -877,6 +885,8 @@ where
let mut snap_handler = self.snapshot_handler();
let updated = snap_handler.update_snapshot(meta.clone());
if !updated {
// TODO: temp solution: committed is updated after snapshot_last_log_id.
self.state.enable_validate = old_validate;
return;
}

Expand Down Expand Up @@ -924,7 +934,10 @@ where
// In the second case, if local-last-log-id is smaller than snapshot-last-log-id,
// and this node crashes after installing snapshot and before purging logs,
// the log will be purged the next start up, in [`RaftState::get_initial_state`].
self.purge_log(snap_last_log_id)
self.purge_log(snap_last_log_id);

// TODO: temp solution: committed is updated after snapshot_last_log_id.
self.state.enable_validate = old_validate;
}

#[tracing::instrument(level = "debug", skip_all)]
Expand Down
2 changes: 2 additions & 0 deletions openraft/src/engine/follower_commit_entries_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ fn m23() -> Membership<u64, ()> {

fn eng() -> Engine<u64, ()> {
let mut eng = Engine::default();
eng.state.enable_validate = false; // Disable validation for incomplete state

eng.state.committed = Some(log_id(1, 1));
eng.state.membership_state.committed = Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m01()));
eng.state.membership_state.effective = Arc::new(EffectiveMembership::new(Some(log_id(2, 3)), m23()));
Expand Down
2 changes: 2 additions & 0 deletions openraft/src/engine/follower_do_append_entries_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ fn m45() -> Membership<u64, ()> {

fn eng() -> Engine<u64, ()> {
let mut eng = Engine::default();
eng.state.enable_validate = false; // Disable validation for incomplete state

eng.config.id = 2;
eng.state.log_ids.append(log_id(1, 1));
eng.state.log_ids.append(log_id(2, 3));
Expand Down
2 changes: 2 additions & 0 deletions openraft/src/engine/handle_append_entries_req_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ fn m34() -> Membership<u64, ()> {

fn eng() -> Engine<u64, ()> {
let mut eng = Engine::default();
eng.state.enable_validate = false; // Disable validation for incomplete state

eng.config.id = 2;
eng.state.vote = Vote::new(2, 1);
eng.state.log_ids.append(log_id(1, 1));
Expand Down
2 changes: 2 additions & 0 deletions openraft/src/engine/handle_vote_req_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ fn m01() -> Membership<u64, ()> {

fn eng() -> Engine<u64, ()> {
let mut eng = Engine::<u64, ()>::default();
eng.state.enable_validate = false; // Disable validation for incomplete state

eng.state.vote = Vote::new(2, 1);
eng.state.server_state = ServerState::Candidate;
eng.state.membership_state.effective = Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m01()));
Expand Down
5 changes: 4 additions & 1 deletion openraft/src/engine/handle_vote_resp_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ fn m1234() -> Membership<u64, ()> {
}

fn eng() -> Engine<u64, ()> {
Engine::<u64, ()>::default()
let mut eng = Engine::<u64, ()>::default();
eng.state.enable_validate = false; // Disable validation for incomplete state

eng
}

#[test]
Expand Down
1 change: 1 addition & 0 deletions openraft/src/engine/handler/snapshot_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ mod tests {

fn eng() -> Engine<u64, ()> {
let mut eng = Engine::<u64, ()> { ..Default::default() };
eng.state.enable_validate = false; // Disable validation for incomplete state

eng.state.snapshot_meta = SnapshotMeta {
last_log_id: Some(log_id(2, 2)),
Expand Down
4 changes: 4 additions & 0 deletions openraft/src/engine/initialize_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ use crate::Vote;
fn test_initialize_single_node() -> anyhow::Result<()> {
let eng = || {
let mut eng = Engine::<u64, ()>::default();
eng.state.enable_validate = false; // Disable validation for incomplete state

eng.state.server_state = eng.calc_server_state();
eng
};
Expand Down Expand Up @@ -130,6 +132,8 @@ fn test_initialize_single_node() -> anyhow::Result<()> {
fn test_initialize() -> anyhow::Result<()> {
let eng = || {
let mut eng = Engine::<u64, ()>::default();
eng.state.enable_validate = false; // Disable validation for incomplete state

eng.state.server_state = eng.calc_server_state();
eng
};
Expand Down
2 changes: 2 additions & 0 deletions openraft/src/engine/install_snapshot_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ fn m1234() -> Membership<u64, ()> {

fn eng() -> Engine<u64, ()> {
let mut eng = Engine::<u64, ()> { ..Default::default() };
eng.state.enable_validate = false; // Disable validation for incomplete state

eng.state.committed = Some(log_id(4, 5));
eng.state.log_ids = LogIdList::new(vec![
Expand Down Expand Up @@ -199,6 +200,7 @@ fn test_install_snapshot_conflict() -> anyhow::Result<()> {
// And there should be no conflicting logs left.
let mut eng = {
let mut eng = Engine::<u64, ()> { ..Default::default() };
eng.state.enable_validate = false; // Disable validation for incomplete state

eng.state.committed = Some(log_id(2, 3));
eng.state.log_ids = LogIdList::new(vec![
Expand Down
2 changes: 2 additions & 0 deletions openraft/src/engine/internal_handle_vote_req_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ fn m01() -> Membership<u64, ()> {

fn eng() -> Engine<u64, ()> {
let mut eng = Engine::<u64, ()>::default();
eng.state.enable_validate = false; // Disable validation for incomplete state

eng.state.vote = Vote::new(2, 1);
eng.state.server_state = ServerState::Candidate;
eng.state.membership_state.effective = Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m01()));
Expand Down
2 changes: 2 additions & 0 deletions openraft/src/engine/leader_append_entries_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ fn m34() -> Membership<u64, ()> {

fn eng() -> Engine<u64, ()> {
let mut eng = Engine::default();
eng.state.enable_validate = false; // Disable validation for incomplete state

eng.config.id = 1;
eng.state.committed = Some(log_id(0, 0));
eng.state.vote = Vote::new_committed(3, 1);
Expand Down
3 changes: 3 additions & 0 deletions openraft/src/engine/purge_log_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ fn log_id(term: u64, index: u64) -> LogId<u64> {

fn eng() -> Engine<u64, ()> {
let mut eng = Engine::<u64, ()>::default();
eng.state.enable_validate = false; // Disable validation for incomplete state

eng.state.log_ids = LogIdList::new(vec![log_id(2, 2), log_id(4, 4), log_id(4, 6)]);
eng.state.next_purge = 3;
eng
}

Expand Down
2 changes: 2 additions & 0 deletions openraft/src/engine/truncate_logs_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ fn m23() -> Membership<u64, ()> {

fn eng() -> Engine<u64, ()> {
let mut eng = Engine::default();
eng.state.enable_validate = false; // Disable validation for incomplete state

eng.config.id = 2;
eng.state.log_ids = LogIdList::new(vec![
log_id(2, 2), //
Expand Down
2 changes: 2 additions & 0 deletions openraft/src/engine/update_progress_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ fn m123() -> Membership<u64, ()> {

fn eng() -> Engine<u64, ()> {
let mut eng = Engine::default();
eng.state.enable_validate = false; // Disable validation for incomplete state

eng.config.id = 2;
eng.state.vote = Vote::new_committed(2, 1);
eng.state.membership_state.committed = Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m01()));
Expand Down
1 change: 1 addition & 0 deletions openraft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ mod runtime;
pub mod storage;
pub mod testing;
pub mod timer;
pub(crate) mod valid;
pub mod versioned;

#[cfg(test)] mod raft_state_test;
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Raft<C, N,
// TODO(xp): this is not necessary.
storage.save_vote(&state.vote).await?;

let engine = Engine::new(&state, EngineConfig {
let engine = Engine::new(state, EngineConfig {
id,
max_in_snapshot_log_to_keep: config.max_in_snapshot_log_to_keep,
purge_batch_size: config.purge_batch_size,
Expand Down
35 changes: 35 additions & 0 deletions openraft/src/raft_state.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
use std::error::Error;

use crate::engine::LogIdList;
use crate::equal;
use crate::less_equal;
use crate::node::Node;
use crate::raft_types::RaftLogId;
use crate::valid::Validate;
use crate::LogId;
use crate::LogIdOptionExt;
use crate::MembershipState;
Expand Down Expand Up @@ -57,6 +62,8 @@ where
/// - A quorum could be a uniform quorum or joint quorum.
pub committed: Option<LogId<NID>>,

pub(crate) next_purge: u64,

/// All log ids this node has.
pub log_ids: LogIdList<NID>,

Expand Down Expand Up @@ -94,10 +101,33 @@ where
}

fn last_purged_log_id(&self) -> Option<&LogId<NID>> {
if self.next_purge == 0 {
return None;
}
self.log_ids.first()
}
}

impl<NID, N> Validate for RaftState<NID, N>
where
NID: NodeId,
N: Node,
{
fn validate(&self) -> Result<(), Box<dyn Error>> {
if self.next_purge == 0 {
less_equal!(self.log_ids.first().index(), Some(0));
} else {
equal!(self.next_purge, self.log_ids.first().next_index());
}

less_equal!(self.last_purged_log_id(), self.snapshot_last_log_id());
less_equal!(self.snapshot_last_log_id(), self.committed());
less_equal!(self.committed(), self.last_log_id());

Ok(())
}
}

impl<NID, N> RaftState<NID, N>
where
NID: NodeId,
Expand Down Expand Up @@ -156,4 +186,9 @@ where
None
}
}

pub(crate) fn purge_log(&mut self, upto: &LogId<NID>) {
self.next_purge = upto.index + 1;
self.log_ids.purge(upto);
}
}
2 changes: 2 additions & 0 deletions openraft/src/raft_state_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,14 @@ fn test_raft_state_last_purged_log_id() -> anyhow::Result<()> {

let rs = RaftState::<u64, ()> {
log_ids: LogIdList::new(vec![log_id(1, 2)]),
next_purge: 3,
..Default::default()
};
assert_eq!(Some(log_id(1, 2)), rs.last_purged_log_id().copied());

let rs = RaftState::<u64, ()> {
log_ids: LogIdList::new(vec![log_id(1, 2), log_id(3, 4)]),
next_purge: 3,
..Default::default()
};
assert_eq!(Some(log_id(1, 2)), rs.last_purged_log_id().copied());
Expand Down
4 changes: 4 additions & 0 deletions openraft/src/storage/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ where
last_purged_log_id = last_applied;
}

println!("purged: {:?}", last_purged_log_id);
println!("last: {:?}", last_log_id);
let log_ids = LogIdList::load_log_ids(last_purged_log_id, last_log_id, self).await?;
println!("log_ids: {:?}", log_ids);

let snapshot_meta = self.sto.get_current_snapshot().await?.map(|x| x.meta).unwrap_or_default();

Expand All @@ -62,6 +65,7 @@ where
// The initial value for `vote` is the minimal possible value.
// See: [Conditions for initialization](https://datafuselabs.github.io/openraft/cluster-formation.html#conditions-for-initialization)
vote: vote.unwrap_or_default(),
next_purge: last_purged_log_id.next_index(),
log_ids,
membership_state: mem_state,
snapshot_meta,
Expand Down
5 changes: 5 additions & 0 deletions openraft/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ where
snapshot_id: self.snapshot_id.clone(),
}
}

/// Returns a ref to the id of the last log that is included in this snasphot.
pub fn last_log_id(&self) -> Option<&LogId<NID>> {
self.last_log_id.as_ref()
}
}

/// The data associated with the current snapshot.
Expand Down
1 change: 1 addition & 0 deletions openraft/src/valid/bench/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
mod valid_deref;
Loading

0 comments on commit cc8af8c

Please sign in to comment.