Skip to content

Commit

Permalink
Fix: when handling append-entries, if prev_log_id is purged, it shoul…
Browse files Browse the repository at this point in the history
…d not delete any logs.

When handling append-entries, if the local log at `prev_log_id.index` is
purged, a follower should not believe it is a **conflict** and should
not delete all logs. It will get committed log lost.

To fix this issue, use `last_applied` instead of `committed`:
`last_applied` is always the committed log id, while `committed` is not
persisted and may be smaller than the actually applied, when a follower
is restarted.
  • Loading branch information
drmingdrmer committed Aug 14, 2022
1 parent 0355a60 commit 44381b0
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 3 deletions.
9 changes: 6 additions & 3 deletions openraft/src/core/append_entries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,10 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
// - keep track of last_log_id, first_log_id,
// RaftStorage should only provides the least basic APIs.

self.storage.delete_conflict_logs_since(start).await?;
let res = self.storage.delete_conflict_logs_since(start).await;
tracing::debug!("delete_conflict_logs_since res: {:?}", res);

res?;

self.last_log_id = self.storage.get_log_state().await?.last_log_id;

Expand Down Expand Up @@ -278,7 +281,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
for i in 0..l {
let log_id = entries[i].log_id;

if Some(log_id) <= self.committed {
if Some(log_id) <= self.last_applied {
continue;
}

Expand Down Expand Up @@ -312,7 +315,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
};

// Committed entries are always safe and are consistent to a valid leader.
if remote_log_id <= self.committed {
if remote_log_id <= self.last_applied {
return Ok(None);
}

Expand Down
4 changes: 4 additions & 0 deletions openraft/src/defensive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ where
}

async fn defensive_delete_conflict_gt_last_applied(&self, since: LogId) -> Result<(), StorageError> {
if !self.is_defensive() {
return Ok(());
}

let (last_applied, _) = self.inner().last_applied_state().await?;
if Some(since.index) <= last_applied.index() {
return Err(
Expand Down
1 change: 1 addition & 0 deletions openraft/tests/append_entries/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ mod fixtures;
mod t10_conflict_with_empty_entries;
mod t20_append_conflicts;
mod t30_append_inconsistent_log;
mod t31_append_prev_is_purged;
mod t40_append_updates_membership;
mod t50_append_entries_with_bigger_term;
mod t60_large_heartbeat;
97 changes: 97 additions & 0 deletions openraft/tests/append_entries/t31_append_prev_is_purged.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
use std::sync::Arc;

use anyhow::Result;
use maplit::btreeset;
use openraft::raft::Entry;
use openraft::raft::EntryPayload;
use openraft::AppendEntriesRequest;
use openraft::Config;
use openraft::DefensiveCheck;
use openraft::LogId;
use openraft::Membership;
use openraft::Raft;
use openraft::RaftStorage;

use crate::fixtures::blank;
use crate::fixtures::RaftRouter;

/// When handling append-entries, if the local log at `prev_log_id.index` is purged, a follower should not believe it is
/// a **conflict** and should not delete all logs. Which will get committed log lost.
///
/// Fake a raft node with one log (1,3) and set last-applied to (1,2).
/// Then an append-entries with `prev_log_id=(1,2)` should not be considered as **conflict**.
#[tokio::test(flavor = "multi_thread", worker_threads = 6)]
async fn append_prev_is_purged() -> Result<()> {
let (_log_guard, ut_span) = init_ut!();
let _ent = ut_span.enter();

let config = Arc::new(
Config {
max_applied_log_to_keep: 2,
..Default::default()
}
.validate()?,
);
let router = Arc::new(RaftRouter::new(config.clone()));

tracing::info!("--- fake store: logs: (1,3), last_applied == last_purged == (1,2)");
let sto0 = {
let sto0 = router.new_store().await;

// With defensive==true, it will panic.
sto0.set_defensive(false);

let entries = [
&Entry {
log_id: LogId { term: 0, index: 0 },
payload: EntryPayload::Membership(Membership::new_single(btreeset! {0,1})),
},
&blank(1, 1),
&blank(1, 2),
&blank(1, 3),
];

sto0.append_to_log(&entries).await?;
sto0.apply_to_state_machine(&entries[0..3]).await?;
sto0.purge_logs_upto(LogId::new(1, 2)).await?;

let logs = sto0.try_get_log_entries(..).await?;
tracing::debug!("logs left after purge: {:?}", logs);
assert_eq!(LogId::new(1, 3), logs[0].log_id);

sto0
};

tracing::info!("--- new node with faked sto");
let node0 = {
let config0 = Arc::new(
Config {
max_applied_log_to_keep: 1,
..Default::default()
}
.validate()?,
);
let node0 = Raft::new(0, config0.clone(), router.clone(), sto0.clone());
router.add_raft_node(0, node0.clone(), sto0.clone()).await;
node0
};

tracing::info!("--- append-entries with prev_log_id=(1,2), should not erase any logs");
{
node0
.append_entries(AppendEntriesRequest {
term: 1,
leader_id: 1,
prev_log_id: Some(LogId::new(1, 2)),
entries: vec![],
leader_commit: None,
})
.await?;

let logs = sto0.try_get_log_entries(..).await?;
tracing::debug!("logs left after append: {:?}", logs);
assert_eq!(LogId::new(1, 3), logs[0].log_id);
}

Ok(())
}
5 changes: 5 additions & 0 deletions openraft/tests/fixtures/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,11 @@ impl RaftRouter {
rt.insert(id, (node, sto));
}

pub async fn add_raft_node(self: &Arc<Self>, id: NodeId, node: MemRaft, sto: Arc<StoreWithDefensive>) {
let mut rt = self.routing_table.write().await;
rt.insert(id, (node, sto));
}

/// Remove the target node from the routing table & isolation.
pub async fn remove_node(&self, id: NodeId) -> Option<(MemRaft, Arc<StoreWithDefensive>)> {
let mut rt = self.routing_table.write().await;
Expand Down

0 comments on commit 44381b0

Please sign in to comment.