Skip to content

Commit

Permalink
Fix: when handling append-entries, if prev_log_id is purged, it sho…
Browse files Browse the repository at this point in the history
…uld not treat it as a **conflict**

- Fix: when handling append-entries, if `prev_log_id` is purged, it
  should not treat it as a **conflict** log and should not delete any
  log.

  This bug is caused by using `committed` as `last_applied`.
  `committed` may be smaller than `last_applied` when a follower just
  starts up.

  The solution is merging `committed` and `last_applied` into one field:
  `committed`, which is always greater than or equal the actually
  committed(applied).

- Change: remove `RaftState.last_applied`, use `committed` to represent
  the already committed and applied log id.

- Refactor: Command::LeaderCommit and Command::FollowerCommit
  • Loading branch information
drmingdrmer committed Aug 14, 2022
1 parent aee0ff0 commit 71a290c
Show file tree
Hide file tree
Showing 14 changed files with 114 additions and 69 deletions.
3 changes: 0 additions & 3 deletions openraft/src/core/install_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,9 +254,6 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
if st.committed < Some(last_applied) {
st.committed = Some(last_applied);
}
if st.last_applied < Some(last_applied) {
st.last_applied = Some(last_applied);
}

debug_assert!(st.last_purged_log_id() <= Some(last_applied));

Expand Down
46 changes: 20 additions & 26 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,6 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
keep_unsnapshoted_log: self.config.keep_unsnapshoted_log,
});

self.engine.state.last_applied = state.last_applied;

// Fetch the most recent snapshot in the system.
if let Some(snapshot) = self.storage.get_current_snapshot().await? {
self.engine.snapshot_last_log_id = Some(snapshot.meta.last_log_id);
Expand Down Expand Up @@ -728,7 +726,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
// --- data ---
current_term: self.engine.state.vote.term,
last_log_index: self.engine.state.last_log_id().map(|id| id.index),
last_applied: self.engine.state.last_applied,
last_applied: self.engine.state.committed,
snapshot: self.engine.snapshot_last_log_id,

// --- cluster ---
Expand Down Expand Up @@ -856,7 +854,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
}
let SnapshotPolicy::LogsSinceLast(threshold) = &self.config.snapshot_policy;

let last_applied = match self.engine.state.last_applied {
let last_applied = match self.engine.state.committed {
None => {
return;
}
Expand All @@ -870,8 +868,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,

if !force {
// If we are below the threshold, then there is nothing to do.
if self.engine.state.last_applied.next_index() - self.engine.snapshot_last_log_id.next_index() < *threshold
{
if self.engine.state.committed.next_index() - self.engine.snapshot_last_log_id.next_index() < *threshold {
return;
}
}
Expand Down Expand Up @@ -954,10 +951,13 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
}

#[tracing::instrument(level = "debug", skip_all)]
pub(crate) async fn apply_to_state_machine(&mut self, upto_index: u64) -> Result<(), StorageError<C::NodeId>> {
pub(crate) async fn apply_to_state_machine(
&mut self,
since: u64,
upto_index: u64,
) -> Result<(), StorageError<C::NodeId>> {
tracing::debug!(upto_index = display(upto_index), "apply_to_state_machine");

let since = self.engine.state.last_applied.next_index();
let end = upto_index + 1;

debug_assert!(
Expand All @@ -978,8 +978,6 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
let apply_results = self.storage.apply_to_state_machine(&entry_refs).await?;

let last_applied = entries[entries.len() - 1].log_id;
self.engine.state.last_applied = Some(last_applied);

tracing::debug!(last_applied = display(last_applied), "update last_applied");

if let Some(l) = &mut self.leader_data {
Expand Down Expand Up @@ -1029,16 +1027,6 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
);
}

/// Handle the post-commit logic for a client request.
#[tracing::instrument(level = "debug", skip(self))]
pub(super) async fn leader_commit(&mut self, log_index: u64) -> Result<(), StorageError<C::NodeId>> {
self.leader_step_down();

self.apply_to_state_machine(log_index).await?;

Ok(())
}

/// Spawn a new replication stream returning its replication state handle.
#[tracing::instrument(level = "debug", skip(self))]
#[allow(clippy::type_complexity)]
Expand Down Expand Up @@ -1656,13 +1644,19 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftRuntime
unreachable!("it has to be a leader!!!");
}
}
Command::LeaderCommit { ref upto, .. } => {
for i in self.engine.state.last_applied.next_index()..(upto.index + 1) {
self.leader_commit(i).await?;
}
Command::LeaderCommit {
already_committed: ref committed,
ref upto,
} => {
self.apply_to_state_machine(committed.next_index(), upto.index).await?;
// Stepping down should be controlled by Engine.
self.leader_step_down();
}
Command::FollowerCommit { upto, .. } => {
self.apply_to_state_machine(upto.index).await?;
Command::FollowerCommit {
already_committed: ref committed,
ref upto,
} => {
self.apply_to_state_machine(committed.next_index(), upto.index).await?;
}
Command::ReplicateEntries { upto } => {
if let Some(l) = &self.leader_data {
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/engine/calc_purge_upto_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ fn eng() -> Engine<u64, ()> {

#[test]
fn test_calc_purge_upto() -> anyhow::Result<()> {
// last_purged_log_id, last_applied, max_keep, want
// last_purged_log_id, committed, max_keep, want
let cases = vec![
//
(None, None, 0, None),
Expand Down
6 changes: 4 additions & 2 deletions openraft/src/engine/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@ where
/// Commit entries that are already in the store, upto `upto`, inclusive.
/// And send applied result to the client that proposed the entry.
LeaderCommit {
since: Option<LogId<NID>>,
// TODO: pass the log id list?
// TODO: merge LeaderCommit and FollowerCommit
already_committed: Option<LogId<NID>>,
upto: LogId<NID>,
},

/// Commit entries that are already in the store, upto `upto`, inclusive.
FollowerCommit {
since: Option<LogId<NID>>,
already_committed: Option<LogId<NID>>,
upto: LogId<NID>,
},

Expand Down
4 changes: 2 additions & 2 deletions openraft/src/engine/elect_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ fn test_elect() -> anyhow::Result<()> {
},),
},
Command::LeaderCommit {
since: None,
already_committed: None,
upto: LogId {
leader_id: LeaderId { term: 1, node_id: 1 },
index: 0,
Expand Down Expand Up @@ -152,7 +152,7 @@ fn test_elect() -> anyhow::Result<()> {
},),
},
Command::LeaderCommit {
since: None,
already_committed: None,
upto: LogId {
leader_id: LeaderId { term: 2, node_id: 1 },
index: 0,
Expand Down
6 changes: 3 additions & 3 deletions openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ where
tracing::debug!(
my_vote = display(self.state.vote),
my_last_log_id = display(self.state.last_log_id().summary()),
my_last_applied = display(self.state.last_applied.summary()),
my_committed = display(self.state.committed.summary()),
"local state"
);

Expand Down Expand Up @@ -431,7 +431,7 @@ where
if let Some(prev_committed) = self.state.update_committed(&committed) {
self.push_command(Command::FollowerCommit {
// TODO(xp): when restart, commit is reset to None. Use last_applied instead.
since: prev_committed,
already_committed: prev_committed,
upto: committed.unwrap(),
});
self.purge_applied_log();
Expand Down Expand Up @@ -741,7 +741,7 @@ where
committed: self.state.committed,
});
self.push_command(Command::LeaderCommit {
since: prev_committed,
already_committed: prev_committed,
upto: self.state.committed.unwrap(),
});
self.purge_applied_log();
Expand Down
8 changes: 4 additions & 4 deletions openraft/src/engine/follower_commit_entries_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ fn test_follower_commit_entries_lt_last_entry() -> anyhow::Result<()> {

assert_eq!(
vec![Command::FollowerCommit {
since: Some(log_id(1, 1)),
already_committed: Some(log_id(1, 1)),
upto: log_id(2, 3)
}],
eng.commands
Expand Down Expand Up @@ -167,7 +167,7 @@ fn test_follower_commit_entries_gt_last_entry() -> anyhow::Result<()> {

assert_eq!(
vec![Command::FollowerCommit {
since: Some(log_id(1, 1)),
already_committed: Some(log_id(1, 1)),
upto: log_id(2, 3)
}],
eng.commands
Expand All @@ -191,7 +191,7 @@ fn test_follower_commit_entries_purge_to_committed() -> anyhow::Result<()> {
assert_eq!(
vec![
Command::FollowerCommit {
since: Some(log_id(1, 1)),
already_committed: Some(log_id(1, 1)),
upto: log_id(2, 3)
},
Command::PurgeLog { upto: log_id(2, 3) },
Expand All @@ -217,7 +217,7 @@ fn test_follower_commit_entries_purge_to_committed_minus_1() -> anyhow::Result<(
assert_eq!(
vec![
Command::FollowerCommit {
since: Some(log_id(1, 1)),
already_committed: Some(log_id(1, 1)),
upto: log_id(2, 3)
},
Command::PurgeLog { upto: log_id(1, 2) },
Expand Down
64 changes: 61 additions & 3 deletions openraft/src/engine/handle_append_entries_req_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ fn eng() -> Engine<u64, ()> {
id: 2, // make it a member
..Default::default()
};
eng.state.last_applied = Some(log_id(0, 0));
eng.state.vote = Vote::new(2, 1);
eng.state.server_state = ServerState::Candidate;
eng.state.log_ids.append(log_id(1, 1));
Expand Down Expand Up @@ -103,6 +102,65 @@ fn test_handle_append_entries_req_vote_is_rejected() -> anyhow::Result<()> {
Ok(())
}

#[test]
fn test_handle_append_entries_req_prev_log_id_is_applied() -> anyhow::Result<()> {
// An applied log id has to be committed thus
let mut eng = eng();
eng.state.vote = Vote::new(1, 2);
eng.enter_leading();

let resp = eng.handle_append_entries_req(
&Vote::new_committed(2, 1),
Some(log_id(0, 0)),
&Vec::<Entry<Foo>>::new(),
None,
);

assert_eq!(AppendEntriesResponse::Success, resp);
assert_eq!(
&[
log_id(1, 1), //
log_id(2, 3), //
],
eng.state.log_ids.key_log_ids()
);
assert_eq!(Vote::new_committed(2, 1), eng.state.vote);
assert_eq!(Some(log_id(2, 3)), eng.state.last_log_id());
assert_eq!(Some(log_id(0, 0)), eng.state.committed);
assert_eq!(
MembershipState {
committed: Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m01())),
effective: Arc::new(EffectiveMembership::new(Some(log_id(2, 3)), m23()))
},
eng.state.membership_state
);
assert_eq!(ServerState::Follower, eng.state.server_state);

assert_eq!(
MetricsChangeFlags {
replication: false,
local_data: true,
cluster: true,
},
eng.metrics_flags
);

assert_eq!(
vec![
Command::SaveVote {
vote: Vote::new_committed(2, 1)
},
Command::InstallElectionTimer { can_be_leader: false },
Command::UpdateServerState {
server_state: ServerState::Follower
},
],
eng.commands
);

Ok(())
}

#[test]
fn test_handle_append_entries_req_prev_log_id_conflict() -> anyhow::Result<()> {
let mut eng = eng();
Expand Down Expand Up @@ -218,7 +276,7 @@ fn test_handle_append_entries_req_prev_log_id_is_committed() -> anyhow::Result<(
Command::AppendInputEntries { range: 1..2 },
Command::MoveInputCursorBy { n: 2 },
Command::FollowerCommit {
since: Some(log_id(0, 0)),
already_committed: Some(log_id(0, 0)),
upto: log_id(1, 1)
},
],
Expand Down Expand Up @@ -353,7 +411,7 @@ fn test_handle_append_entries_req_entries_conflict() -> anyhow::Result<()> {
},
Command::MoveInputCursorBy { n: 2 },
Command::FollowerCommit {
since: Some(log_id(0, 0)),
already_committed: Some(log_id(0, 0)),
upto: log_id(3, 3)
},
],
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/engine/initialize_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ fn test_initialize_single_node() -> anyhow::Result<()> {
},),
},
Command::LeaderCommit {
since: None,
already_committed: None,
upto: LogId {
leader_id: LeaderId { term: 1, node_id: 1 },
index: 1,
Expand Down
12 changes: 6 additions & 6 deletions openraft/src/engine/leader_append_entries_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ fn eng() -> Engine<u64, ()> {
id: 1, // make it a member
..Default::default()
};
eng.state.last_applied = Some(log_id(0, 0));
eng.state.committed = Some(log_id(0, 0));
eng.state.vote = Vote::new_committed(3, 1);
eng.state.log_ids.append(log_id(1, 1));
eng.state.log_ids.append(log_id(2, 3));
Expand Down Expand Up @@ -210,7 +210,7 @@ fn test_leader_append_entries_fast_commit() -> anyhow::Result<()> {
committed: Some(log_id(3, 6))
},
Command::LeaderCommit {
since: None,
already_committed: Some(log_id(0, 0)),
upto: LogId::new(LeaderId::new(3, 1), 6)
},
Command::ReplicateEntries {
Expand Down Expand Up @@ -282,7 +282,7 @@ fn test_leader_append_entries_fast_commit_upto_membership_entry() -> anyhow::Res
committed: Some(log_id(3, 4))
},
Command::LeaderCommit {
since: None,
already_committed: Some(log_id(0, 0)),
upto: LogId::new(LeaderId::new(3, 1), 4)
},
Command::UpdateMembership {
Expand Down Expand Up @@ -364,7 +364,7 @@ fn test_leader_append_entries_fast_commit_membership_no_voter_change() -> anyhow
committed: Some(log_id(3, 4))
},
Command::LeaderCommit {
since: None,
already_committed: Some(log_id(0, 0)),
upto: LogId::new(LeaderId::new(3, 1), 4)
},
Command::UpdateMembership {
Expand All @@ -381,7 +381,7 @@ fn test_leader_append_entries_fast_commit_membership_no_voter_change() -> anyhow
committed: Some(log_id(3, 6))
},
Command::LeaderCommit {
since: Some(LogId::new(LeaderId::new(3, 1), 4)),
already_committed: Some(LogId::new(LeaderId::new(3, 1), 4)),
upto: LogId::new(LeaderId::new(3, 1), 6)
},
Command::ReplicateEntries {
Expand Down Expand Up @@ -465,7 +465,7 @@ fn test_leader_append_entries_fast_commit_if_membership_voter_change_to_1() -> a
committed: Some(log_id(3, 6))
},
Command::LeaderCommit {
since: None,
already_committed: Some(log_id(0, 0)),
upto: LogId::new(LeaderId::new(3, 1), 6)
},
Command::ReplicateEntries {
Expand Down
Loading

0 comments on commit 71a290c

Please sign in to comment.