Skip to content

Commit

Permalink
Fix: consistency issue between ReplicationCore.last_log_id and last_l…
Browse files Browse the repository at this point in the history
…og_state.last_log_id
  • Loading branch information
drmingdrmer committed Feb 28, 2022
1 parent d4e3a66 commit 1219a88
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 1 deletion.
5 changes: 4 additions & 1 deletion openraft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,10 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
}

let start = prev_index.next_index();
let end = std::cmp::min(start + self.config.max_payload_entries, self.last_log_id.next_index());
let end = std::cmp::min(
start + self.config.max_payload_entries,
log_state.last_log_id.next_index(),
);

tracing::debug!(
?self.matched,
Expand Down
1 change: 1 addition & 0 deletions openraft/tests/append_entries/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ mod t30_append_inconsistent_log;
mod t40_append_updates_membership;
mod t50_append_entries_with_bigger_term;
mod t60_large_heartbeat;
mod t90_issue_216_stale_last_log_id;
65 changes: 65 additions & 0 deletions openraft/tests/append_entries/t90_issue_216_stale_last_log_id.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use maplit::btreeset;
use openraft::Config;
use tracing::Instrument;

use crate::fixtures::RaftRouter;

/// Ensures the stale value of ReplicationCore.last_log_id won't affect replication.
/// If `ReplicationCore.last_log_id` is used, the end position of log for loading may underflow the start.
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn stale_last_log_id() -> Result<()> {
let (_log_guard, ut_span) = init_ut!();

async {
// Setup test dependencies.
let config = Arc::new(
Config {
heartbeat_interval: 50,
election_timeout_min: 500,
election_timeout_max: 1000,
max_payload_entries: 1,
max_applied_log_to_keep: 0,
..Default::default()
}
.validate()?,
);
let mut router = RaftRouter::new(config.clone());
router.network_send_delay(5);

let mut log_index = router.new_nodes_from_single(btreeset! {0,1,2}, btreeset! {3,4}).await?;

let n_threads = 4;
let n_ops = 500;
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();

for i in 0..n_threads {
tokio::spawn({
let router = router.clone();
let tx = tx.clone();

async move {
router.client_request_many(0, &format!("{}", i), n_ops).await;
let _ = tx.send(());
}
});
}

for _i in 0..n_threads {
let _ = rx.recv().await;
log_index += n_ops as u64;
}

router.wait(&1, Some(Duration::from_millis(500))).await?.log(Some(log_index), "").await?;
router.wait(&2, Some(Duration::from_millis(500))).await?.log(Some(log_index), "").await?;
router.wait(&3, Some(Duration::from_millis(500))).await?.log(Some(log_index), "").await?;
router.wait(&4, Some(Duration::from_millis(500))).await?.log(Some(log_index), "").await?;

Ok(())
}
.instrument(ut_span)
.await
}

0 comments on commit 1219a88

Please sign in to comment.