From 74283fda9c18510ed4cdb0435929180a7dc97585 Mon Sep 17 00:00:00 2001
From: drdr xp <drdr.xp@gmail.com>
Date: Tue, 14 Sep 2021 23:32:05 +0800
Subject: [PATCH] change: RaftStorage::do_log_compaction() does not need to
 delete logs any more

raft-core will delete them.
---
 async-raft/src/core/append_entries.rs      | 11 +++-
 async-raft/src/core/client.rs              | 23 ++++++--
 async-raft/src/core/install_snapshot.rs    |  7 ++-
 async-raft/src/core/mod.rs                 | 37 ++++++++++++
 async-raft/src/raft.rs                     |  7 +++
 async-raft/tests/clean_applied_logs.rs     | 55 ++++++++++++++++++
 async-raft/tests/compaction.rs             | 58 +++++++++++++------
 .../tests/snapshot_ge_half_threshold.rs    |  1 +
 .../tests/snapshot_overrides_membership.rs |  5 +-
 .../snapshot_uses_prev_snap_membership.rs  |  7 ++-
 memstore/src/lib.rs                        |  4 --
 11 files changed, 181 insertions(+), 34 deletions(-)
 create mode 100644 async-raft/tests/clean_applied_logs.rs

diff --git a/async-raft/src/core/append_entries.rs b/async-raft/src/core/append_entries.rs
index 02cd7f5b7..5b77281a4 100644
--- a/async-raft/src/core/append_entries.rs
+++ b/async-raft/src/core/append_entries.rs
@@ -1,3 +1,4 @@
+use crate::core::apply_to_state_machine;
 use crate::core::RaftCore;
 use crate::core::State;
 use crate::core::UpdateCurrentLeader;
@@ -438,7 +439,10 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
         tracing::debug!(?last_log_id);
 
         let entries_refs: Vec<_> = entries.iter().collect();
-        self.storage.apply_to_state_machine(&entries_refs).await.map_err(|e| self.map_storage_error(e))?;
+
+        apply_to_state_machine(self.storage.clone(), &entries_refs, self.config.max_applied_log_to_keep)
+            .await
+            .map_err(|e| self.map_storage_error(e))?;
 
         self.last_applied = last_log_id;
 
@@ -473,7 +477,10 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
         let new_last_applied = entries.last().unwrap();
 
         let data_entries: Vec<_> = entries.iter().collect();
-        storage.apply_to_state_machine(&data_entries).await.map_err(|e| self.map_storage_error(e))?;
+
+        apply_to_state_machine(storage, &data_entries, self.config.max_applied_log_to_keep)
+            .await
+            .map_err(|e| self.map_storage_error(e))?;
 
         self.last_applied = new_last_applied.log_id;
         self.report_metrics(Update::Ignore);
diff --git a/async-raft/src/core/client.rs b/async-raft/src/core/client.rs
index 6fb87dd6c..5f45bf4bf 100644
--- a/async-raft/src/core/client.rs
+++ b/async-raft/src/core/client.rs
@@ -9,6 +9,7 @@ use tokio::time::timeout;
 use tokio::time::Duration;
 use tracing::Instrument;
 
+use crate::core::apply_to_state_machine;
 use crate::core::LeaderState;
 use crate::core::State;
 use crate::error::ClientReadError;
@@ -445,20 +446,30 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>>
             let data_entries: Vec<_> = entries.iter().collect();
             if !data_entries.is_empty() {
-                self.core
-                    .storage
-                    .apply_to_state_machine(&data_entries)
-                    .await
-                    .map_err(|err| self.core.map_storage_error(err))?;
+                apply_to_state_machine(
+                    self.core.storage.clone(),
+                    &data_entries,
+                    self.core.config.max_applied_log_to_keep,
+                )
+                .await
+                .map_err(|err| self.core.map_storage_error(err))?;
             }
         }
 
         // Apply this entry to the state machine and return its data response.
-        let res = self.core.storage.apply_to_state_machine(&[entry]).await.map_err(|err| {
+        let apply_res = apply_to_state_machine(
+            self.core.storage.clone(),
+            &[entry],
+            self.core.config.max_applied_log_to_keep,
+        )
+        .await;
+
+        let res = apply_res.map_err(|err| {
             if let StorageError::IO { .. } = err {
                 // If this is an instance of the storage impl's shutdown error, then trigger shutdown.
                 self.core.map_storage_error(err)
             } else {
+                // TODO(xp): remove this
                 // Else, we propagate normally.
                 RaftError::RaftStorage(err.into())
             }
         }
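Both apply paths above now route through a single helper, `apply_to_state_machine()` (added in `core/mod.rs` below), so the retention policy is enforced at every call site. For orientation, this is how an application opts into the retention window; a minimal sketch modeled on the test setups later in this patch (the value `1000` is an arbitrary example, not from the patch):

```rust
use std::sync::Arc;

use anyhow::Result;
use async_raft::Config;

fn main() -> Result<()> {
    // Keep at most the last 1000 applied log entries; raft-core now deletes
    // anything older each time entries are applied to the state machine.
    let config = Arc::new(
        Config {
            max_applied_log_to_keep: 1000,
            ..Default::default()
        }
        .validate()?,
    );
    let _raft_config = config; // passed to Raft::new(...) as usual
    Ok(())
}
```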
diff --git a/async-raft/src/core/install_snapshot.rs b/async-raft/src/core/install_snapshot.rs
index 62fe1aa8f..e51282236 100644
--- a/async-raft/src/core/install_snapshot.rs
+++ b/async-raft/src/core/install_snapshot.rs
@@ -3,6 +3,7 @@ use std::io::SeekFrom;
 use tokio::io::AsyncSeekExt;
 use tokio::io::AsyncWriteExt;
 
+use crate::core::delete_applied_logs;
 use crate::core::RaftCore;
 use crate::core::SnapshotState;
 use crate::core::State;
@@ -203,6 +204,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
             .map_err(|e| self.map_storage_error(e))?;
 
         tracing::debug!("update after apply or install-snapshot: {:?}", changes);
+        println!("update after apply or install-snapshot: {:?}", changes);
 
         // After installing snapshot, no inconsistent log is removed.
         // This does not affect raft consistency.
@@ -210,7 +212,9 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
 
         if let Some(last_applied) = changes.last_applied {
             // Applied logs are not needed.
-            self.storage.delete_logs_from(..=last_applied.index).await.map_err(|e| self.map_storage_error(e))?;
+            delete_applied_logs(self.storage.clone(), &last_applied, self.config.max_applied_log_to_keep)
+                .await
+                .map_err(|e| self.map_storage_error(e))?;
 
             // snapshot is installed
             self.last_applied = last_applied;
@@ -221,6 +225,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
 
         // There could be unknown membership in the snapshot.
         let membership = self.storage.get_membership_config().await.map_err(|err| self.map_storage_error(err))?;
+        println!("storage membership: {:?}", membership);
 
         self.update_membership(membership)?;
diff --git a/async-raft/src/core/mod.rs b/async-raft/src/core/mod.rs
index fc93e649f..54e3f6237 100644
--- a/async-raft/src/core/mod.rs
+++ b/async-raft/src/core/mod.rs
@@ -41,6 +41,7 @@ use crate::metrics::RaftMetrics;
 use crate::raft::ClientReadResponseTx;
 use crate::raft::ClientWriteRequest;
 use crate::raft::ClientWriteResponseTx;
+use crate::raft::Entry;
 use crate::raft::EntryPayload;
 use crate::raft::MembershipConfig;
 use crate::raft::RaftMsg;
@@ -435,6 +436,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
             handle,
             sender: chan_tx.clone(),
         });
+
         tokio::spawn(
             async move {
                 let f = storage.do_log_compaction();
@@ -495,6 +497,41 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
     }
 }
 
+#[tracing::instrument(level = "debug", skip(sto), fields(entries=%entries.summary()))]
+async fn apply_to_state_machine<D, R, S>(
+    sto: Arc<S>,
+    entries: &[&Entry<D>],
+    max_keep: u64,
+) -> Result<Vec<R>, StorageError>
+where
+    D: AppData,
+    R: AppDataResponse,
+    S: RaftStorage<D, R>,
+{
+    let last = entries.last().map(|x| x.log_id);
+
+    if let Some(last_applied) = last {
+        // TODO(xp): apply_to_state_machine should return the last applied
+        let res = sto.apply_to_state_machine(entries).await?;
+        delete_applied_logs(sto, &last_applied, max_keep).await?;
+        Ok(res)
+    } else {
+        Ok(vec![])
+    }
+}
+
+#[tracing::instrument(level = "debug", skip(sto))]
+async fn delete_applied_logs<D, R, S>(sto: Arc<S>, last_applied: &LogId, max_keep: u64) -> Result<(), StorageError>
+where
+    D: AppData,
+    R: AppDataResponse,
+    S: RaftStorage<D, R>,
+{
+    // TODO(xp): periodically batch delete
+    let x = last_applied.index.saturating_sub(max_keep);
+    sto.delete_logs_from(..=x).await
+}
+
 /// An enum describing the way the current leader property is to be updated.
 #[derive(Debug)]
 pub(self) enum UpdateCurrentLeader {
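The bound computed in `delete_applied_logs()` is the heart of the change: once entries up to `last_applied` are applied, everything at or below `last_applied.index - max_keep` is deleted. A worked example of that arithmetic in plain Rust (no async-raft types involved):

```rust
/// Mirrors the bound in delete_applied_logs():
/// last_applied.index.saturating_sub(max_keep)
fn cutoff(last_applied_index: u64, max_keep: u64) -> u64 {
    last_applied_index.saturating_sub(max_keep)
}

fn main() {
    // With 10 entries applied and max_applied_log_to_keep = 2,
    // delete_logs_from(..=8) runs and logs 9 and 10 survive,
    // which is exactly what clean_applied_logs.rs below asserts.
    assert_eq!(8, cutoff(10, 2));

    // Early in the log, saturating_sub pins the bound at 0 instead of
    // underflowing, so nothing is deleted until the index exceeds max_keep.
    assert_eq!(0, cutoff(1, 2));
}
```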
diff --git a/async-raft/src/raft.rs b/async-raft/src/raft.rs
index 12d9676dc..827185bcc 100644
--- a/async-raft/src/raft.rs
+++ b/async-raft/src/raft.rs
@@ -526,6 +526,13 @@ impl<D: AppData> MessageSummary for Option<Entry<D>> {
 }
 
 impl<D: AppData> MessageSummary for &[Entry<D>] {
+    fn summary(&self) -> String {
+        let entry_refs: Vec<_> = self.iter().collect();
+        entry_refs.as_slice().summary()
+    }
+}
+
+impl<D: AppData> MessageSummary for &[&Entry<D>] {
     fn summary(&self) -> String {
         let mut res = Vec::with_capacity(self.len());
         for x in self.iter() {
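The pre-existing `&[Entry<D>]` impl above keeps its signature but now borrows its elements and delegates to the new `&[&Entry<D>]` impl, which does the real formatting. A self-contained sketch of that delegation pattern (toy `Entry` and trait, not the async-raft definitions):

```rust
trait MessageSummary {
    fn summary(&self) -> String;
}

struct Entry(u64);

// The slice-of-references impl does the real work.
impl MessageSummary for &[&Entry] {
    fn summary(&self) -> String {
        self.iter().map(|e| e.0.to_string()).collect::<Vec<_>>().join(",")
    }
}

// The owned-slice impl borrows each element and delegates.
impl MessageSummary for &[Entry] {
    fn summary(&self) -> String {
        let entry_refs: Vec<_> = self.iter().collect();
        entry_refs.as_slice().summary()
    }
}

fn main() {
    let entries = vec![Entry(1), Entry(2)];
    assert_eq!("1,2", entries.as_slice().summary());
}
```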
diff --git a/async-raft/tests/clean_applied_logs.rs b/async-raft/tests/clean_applied_logs.rs
new file mode 100644
index 000000000..0642aed8a
--- /dev/null
+++ b/async-raft/tests/clean_applied_logs.rs
@@ -0,0 +1,55 @@
+use std::sync::Arc;
+use std::time::Duration;
+
+use anyhow::Result;
+use async_raft::Config;
+use async_raft::RaftStorage;
+use fixtures::RaftRouter;
+use maplit::btreeset;
+
+#[macro_use]
+mod fixtures;
+
+/// Logs should be deleted by raft after applying them, on leader and non-voter.
+///
+/// - assert logs are deleted on the leader after applying them.
+/// - assert logs are deleted on a replication target after installing a snapshot.
+///
+/// RUST_LOG=async_raft,memstore,clean_applied_logs=trace cargo test -p async-raft --test clean_applied_logs
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+async fn clean_applied_logs() -> Result<()> {
+    let (_log_guard, ut_span) = init_ut!();
+    let _ent = ut_span.enter();
+
+    // Setup test dependencies.
+    let config = Arc::new(
+        Config {
+            max_applied_log_to_keep: 2,
+            ..Default::default()
+        }
+        .validate()?,
+    );
+    let router = Arc::new(RaftRouter::new(config.clone()));
+
+    let mut n_logs = router.new_nodes_from_single(btreeset! {0}, btreeset! {1}).await?;
+
+    router.client_request_many(0, "0", (10 - n_logs) as usize).await;
+    n_logs = 10;
+
+    router.wait_for_log(&btreeset! {0,1}, n_logs, timeout(), "write up to 10 logs").await?;
+
+    tracing::info!("--- logs before max_applied_log_to_keep should be cleaned");
+    {
+        for node_id in 0..=1 {
+            let sto = router.get_storage_handle(&node_id).await?;
+            let logs = sto.get_log_entries(..).await?;
+            assert_eq!(2, logs.len(), "node {} should have only {} logs", node_id, 2);
+        }
+    }
+
+    Ok(())
+}
+
+fn timeout() -> Option<Duration> {
+    Some(Duration::from_millis(5000))
+}
diff --git a/async-raft/tests/compaction.rs b/async-raft/tests/compaction.rs
index 6ef524b56..2656bb252 100644
--- a/async-raft/tests/compaction.rs
+++ b/async-raft/tests/compaction.rs
@@ -1,9 +1,12 @@
 use std::sync::Arc;
 
 use anyhow::Result;
+use async_raft::raft::Entry;
+use async_raft::raft::EntryPayload;
 use async_raft::raft::MembershipConfig;
 use async_raft::Config;
 use async_raft::LogId;
+use async_raft::RaftStorage;
 use async_raft::SnapshotPolicy;
 use async_raft::State;
 use fixtures::RaftRouter;
@@ -32,6 +35,7 @@ async fn compaction() -> Result<()> {
     let config = Arc::new(
         Config {
             snapshot_policy: SnapshotPolicy::LogsSinceLast(snapshot_threshold),
+            max_applied_log_to_keep: 2,
             ..Default::default()
         }
         .validate()?,
@@ -39,10 +43,10 @@ async fn compaction() -> Result<()> {
     let router = Arc::new(RaftRouter::new(config.clone()));
     router.new_raft_node(0).await;
 
-    let mut want = 0;
+    let mut n_logs = 0;
 
     // Assert all nodes are in non-voter state & have no entries.
-    router.wait_for_log(&btreeset![0], want, None, "empty").await?;
+    router.wait_for_log(&btreeset![0], n_logs, None, "empty").await?;
     router.wait_for_state(&btreeset![0], State::NonVoter, None, "empty").await?;
     router.assert_pristine_cluster().await;
 
     tracing::info!("--- initializing cluster");
     router.initialize_from_single_node(0).await?;
-    want += 1;
+    n_logs += 1;
 
-    router.wait_for_log(&btreeset![0], want, None, "init leader").await?;
+    router.wait_for_log(&btreeset![0], n_logs, None, "init leader").await?;
     router.assert_stable_cluster(Some(1), Some(1)).await;
 
     // Send enough requests to the cluster that compaction on the node should be triggered.
     // Puts us exactly at the configured snapshot policy threshold.
-    router.client_request_many(0, "0", (snapshot_threshold - want) as usize).await;
-    want = snapshot_threshold;
+    router.client_request_many(0, "0", (snapshot_threshold - n_logs) as usize).await;
+    n_logs = snapshot_threshold;
+
+    router.wait_for_log(&btreeset![0], n_logs, None, "write").await?;
+    router.assert_stable_cluster(Some(1), Some(n_logs)).await;
+    router.wait_for_snapshot(&btreeset![0], LogId { term: 1, index: n_logs }, None, "snapshot").await?;
 
-    router.wait_for_log(&btreeset![0], want, None, "write").await?;
-    router.assert_stable_cluster(Some(1), Some(want)).await;
-    router.wait_for_snapshot(&btreeset![0], LogId { term: 1, index: want }, None, "snapshot").await?;
     router
         .assert_storage_state(
             1,
-            want,
+            n_logs,
             Some(0),
-            LogId { term: 1, index: want },
-            Some((want.into(), 1, MembershipConfig {
+            LogId { term: 1, index: n_logs },
+            Some((n_logs.into(), 1, MembershipConfig {
                 members: btreeset![0],
                 members_after_consensus: None,
             })),
         )
         .await;
 
     // Add a new node and assert that it received the same snapshot.
-    router.new_raft_node(1).await;
+    let sto1 = router.new_store(1).await;
+    sto1.append_to_log(&[&Entry {
+        log_id: LogId { term: 1, index: 1 },
+        payload: EntryPayload::Blank,
+    }])
+    .await?;
+
+    router.new_raft_node_with_sto(1, sto1.clone()).await;
     router.add_non_voter(0, 1).await.expect("failed to add new node as non-voter");
 
     tracing::info!("--- add 1 log after snapshot");
+    {
+        router.client_request_many(0, "0", 1).await;
+        n_logs += 1;
+    }
+
+    router.wait_for_log(&btreeset![0, 1], n_logs, None, "add follower").await?;
 
-    router.client_request_many(0, "0", 1).await;
-    want += 1;
+    tracing::info!("--- logs should be deleted after installing snapshot; leaving only the last one");
+    {
+        let sto = router.get_storage_handle(&1).await?;
+        let logs = sto.get_log_entries(..).await?;
+        assert_eq!(1, logs.len());
+        assert_eq!(LogId { term: 1, index: 51 }, logs[0].log_id);
+    }
 
-    router.wait_for_log(&btreeset![0, 1], want, None, "add follower").await?;
     let expected_snap = Some((snapshot_threshold.into(), 1, MembershipConfig {
         members: btreeset![0u64],
         members_after_consensus: None,
     }));
     router
         .assert_storage_state(
             1,
-            want,
+            n_logs,
             None, /* non-voter does not vote */
-            LogId { term: 1, index: want },
+            LogId { term: 1, index: n_logs },
             expected_snap,
         )
         .await;
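Why exactly one log, at index 51, survives on node 1 in the test above: the snapshot covers entries up to index 50 (so logs 49 and 50 are never stored on node 1), installing it removes the stray entry appended at index 1, and the one extra client write lands at index 51. A sketch of the bounds with those numbers and `max_applied_log_to_keep = 2`:

```rust
fn main() {
    let max_keep: u64 = 2;

    // Installing the snapshot sets last_applied = 50; logs up to 48 are
    // deleted, which removes the manually appended entry at index 1.
    assert_eq!(48, 50u64.saturating_sub(max_keep));

    // Applying the extra write sets last_applied = 51 and moves the bound
    // to 49. Indexes 49 and 50 only ever existed inside the snapshot, so
    // index 51 is the sole remaining log entry on node 1.
    assert_eq!(49, 51u64.saturating_sub(max_keep));
}
```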
diff --git a/async-raft/tests/snapshot_ge_half_threshold.rs b/async-raft/tests/snapshot_ge_half_threshold.rs
index 65b4d983e..4c2ff17ed 100644
--- a/async-raft/tests/snapshot_ge_half_threshold.rs
+++ b/async-raft/tests/snapshot_ge_half_threshold.rs
@@ -36,6 +36,7 @@ async fn snapshot_ge_half_threshold() -> Result<()> {
     let config = Arc::new(
         Config {
             snapshot_policy: SnapshotPolicy::LogsSinceLast(snapshot_threshold),
+            max_applied_log_to_keep: 6,
             ..Default::default()
         }
         .validate()?,
diff --git a/async-raft/tests/snapshot_overrides_membership.rs b/async-raft/tests/snapshot_overrides_membership.rs
index adf3db9d2..55dbb73f2 100644
--- a/async-raft/tests/snapshot_overrides_membership.rs
+++ b/async-raft/tests/snapshot_overrides_membership.rs
@@ -39,6 +39,7 @@ async fn snapshot_overrides_membership() -> Result<()> {
     let config = Arc::new(
         Config {
             snapshot_policy: SnapshotPolicy::LogsSinceLast(snapshot_threshold),
+            max_applied_log_to_keep: 0,
             ..Default::default()
         }
         .validate()?,
@@ -130,11 +131,13 @@ async fn snapshot_overrides_membership() -> Result<()> {
         tracing::info!("--- DONE add non-voter");
 
         router.wait_for_log(&btreeset![0, 1], want, timeout(), "add non-voter").await?;
+        router.wait_for_snapshot(&btreeset![1], LogId { term: 1, index: want }, timeout(), "").await?;
+
         let expected_snap = Some((want.into(), 1, MembershipConfig {
             members: btreeset![0u64],
             members_after_consensus: None,
         }));
-        router.wait_for_snapshot(&btreeset![1], LogId { term: 1, index: want }, timeout(), "").await?;
+
         router
             .assert_storage_state(
                 1,
diff --git a/async-raft/tests/snapshot_uses_prev_snap_membership.rs b/async-raft/tests/snapshot_uses_prev_snap_membership.rs
index bbb021fe0..bbf60c98a 100644
--- a/async-raft/tests/snapshot_uses_prev_snap_membership.rs
+++ b/async-raft/tests/snapshot_uses_prev_snap_membership.rs
@@ -5,6 +5,7 @@ use anyhow::Result;
 use async_raft::raft::MembershipConfig;
 use async_raft::Config;
 use async_raft::LogId;
+use async_raft::MessageSummary;
 use async_raft::RaftStorage;
 use async_raft::SnapshotPolicy;
 use async_raft::State;
@@ -35,6 +36,7 @@ async fn snapshot_uses_prev_snap_membership() -> Result<()> {
     let config = Arc::new(
         Config {
             snapshot_policy: SnapshotPolicy::LogsSinceLast(snapshot_threshold),
+            max_applied_log_to_keep: 1,
             ..Default::default()
         }
         .validate()?,
@@ -76,7 +78,8 @@ async fn snapshot_uses_prev_snap_membership() -> Result<()> {
     {
         let logs = sto0.get_log_entries(..).await?;
-        assert_eq!(1, logs.len(), "only one snapshot pointer log");
+        println!("{}", logs.as_slice().summary());
+        assert_eq!(1, logs.len(), "only one applied log is kept");
     }
     let m = sto0.get_membership_config().await?;
     assert_eq!(
@@ -119,7 +122,7 @@ async fn snapshot_uses_prev_snap_membership() -> Result<()> {
     {
         {
             let logs = sto0.get_log_entries(..).await?;
-            assert_eq!(1, logs.len(), "only one snapshot pointer log");
+            assert_eq!(1, logs.len(), "only one applied log");
         }
         let m = sto0.get_membership_config().await?;
         assert_eq!(
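The three tests above pin different retention windows: 6, 0, and 1. With `max_applied_log_to_keep: 0` the bound equals `last_applied.index`, so an entry may be deleted as soon as it is applied; with 1, only the most recently applied entry survives, which is why the old "only one snapshot pointer log" assertions became "only one applied log". The bounds at an arbitrary example point, `last_applied = 10`:

```rust
fn main() {
    let last_applied: u64 = 10;

    assert_eq!(4, last_applied.saturating_sub(6)); // keeps 5..=10: six entries
    assert_eq!(10, last_applied.saturating_sub(0)); // keeps no applied entries
    assert_eq!(9, last_applied.saturating_sub(1)); // keeps only entry 10
}
```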
diff --git a/memstore/src/lib.rs b/memstore/src/lib.rs
index c3c993858..4b488f388 100644
--- a/memstore/src/lib.rs
+++ b/memstore/src/lib.rs
@@ -747,12 +747,8 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
         let meta;
 
         {
-            let mut log = self.log.write().await;
             let mut current_snapshot = self.current_snapshot.write().await;
 
-            // Leaves at least one log or replication can not find out the mismatched log.
-            *log = log.split_off(&last_applied_log.index);
-
             let snapshot_id = format!("{}-{}-{}", last_applied_log.term, last_applied_log.index, snapshot_idx);
 
             meta = SnapshotMeta {
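With raft-core issuing `delete_logs_from(..=cutoff)` itself, `MemStore::do_log_compaction()` no longer truncates the log. For reference, a self-contained sketch of what the removed `split_off` call did (toy `&str` entries in place of memstore's `Entry<ClientRequest>`):

```rust
use std::collections::BTreeMap;

fn main() {
    // A log of ten entries keyed by index, like MemStore's BTreeMap log.
    let mut log: BTreeMap<u64, &str> = (1..=10).map(|i| (i, "entry")).collect();

    // The removed code: keep last_applied_log.index.. and drop everything
    // before it, leaving at least the entry at the snapshot's last index.
    let last_applied_index = 8u64;
    log = log.split_off(&last_applied_index);

    assert_eq!(vec![8, 9, 10], log.keys().copied().collect::<Vec<_>>());
}
```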