change: RaftStorage::do_log_compaction() does not need to delete logs any more; raft-core will delete them.
drmingdrmer committed Sep 14, 2021
1 parent ac4bf4b commit 74283fd
Showing 11 changed files with 181 additions and 34 deletions.
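
In short: raft-core now prunes applied logs itself. After applying entries to the state machine (and after installing a snapshot), it deletes every log entry at or below last_applied.index - max_applied_log_to_keep, so a RaftStorage implementation's do_log_compaction() no longer has to delete logs on its own. A condensed sketch of the pruning, assembled from the async-raft/src/core/mod.rs hunk shown below (not new public API):

    // Prune applied logs, keeping at most `max_keep` entries at or below `last_applied`.
    async fn delete_applied_logs<D, R, S>(sto: Arc<S>, last_applied: &LogId, max_keep: u64) -> Result<(), StorageError>
    where
        D: AppData,
        R: AppDataResponse,
        S: RaftStorage<D, R>,
    {
        // Everything up to and including `last_applied.index - max_keep` is deleted.
        let upto = last_applied.index.saturating_sub(max_keep);
        sto.delete_logs_from(..=upto).await
    }

Append-entries, client writes, and install-snapshot all route through the new apply_to_state_machine helper, which applies the entries and then calls delete_applied_logs with config.max_applied_log_to_keep.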
11 changes: 9 additions & 2 deletions async-raft/src/core/append_entries.rs
@@ -1,3 +1,4 @@
use crate::core::apply_to_state_machine;
use crate::core::RaftCore;
use crate::core::State;
use crate::core::UpdateCurrentLeader;
@@ -438,7 +439,10 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
tracing::debug!(?last_log_id);

let entries_refs: Vec<_> = entries.iter().collect();
self.storage.apply_to_state_machine(&entries_refs).await.map_err(|e| self.map_storage_error(e))?;

apply_to_state_machine(self.storage.clone(), &entries_refs, self.config.max_applied_log_to_keep)
.await
.map_err(|e| self.map_storage_error(e))?;

self.last_applied = last_log_id;

@@ -473,7 +477,10 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
let new_last_applied = entries.last().unwrap();

let data_entries: Vec<_> = entries.iter().collect();
storage.apply_to_state_machine(&data_entries).await.map_err(|e| self.map_storage_error(e))?;

apply_to_state_machine(storage, &data_entries, self.config.max_applied_log_to_keep)
.await
.map_err(|e| self.map_storage_error(e))?;

self.last_applied = new_last_applied.log_id;
self.report_metrics(Update::Ignore);
23 changes: 17 additions & 6 deletions async-raft/src/core/client.rs
@@ -9,6 +9,7 @@ use tokio::time::timeout;
use tokio::time::Duration;
use tracing::Instrument;

use crate::core::apply_to_state_machine;
use crate::core::LeaderState;
use crate::core::State;
use crate::error::ClientReadError;
@@ -445,20 +446,30 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>

let data_entries: Vec<_> = entries.iter().collect();
if !data_entries.is_empty() {
self.core
.storage
.apply_to_state_machine(&data_entries)
.await
.map_err(|err| self.core.map_storage_error(err))?;
apply_to_state_machine(
self.core.storage.clone(),
&data_entries,
self.core.config.max_applied_log_to_keep,
)
.await
.map_err(|err| self.core.map_storage_error(err))?;
}
}

// Apply this entry to the state machine and return its data response.
let res = self.core.storage.apply_to_state_machine(&[entry]).await.map_err(|err| {
let apply_res = apply_to_state_machine(
self.core.storage.clone(),
&[entry],
self.core.config.max_applied_log_to_keep,
)
.await;

let res = apply_res.map_err(|err| {
if let StorageError::IO { .. } = err {
// If this is an instance of the storage impl's shutdown error, then trigger shutdown.
self.core.map_storage_error(err)
} else {
// TODO(xp): remove this
// Else, we propagate normally.
RaftError::RaftStorage(err.into())
}
7 changes: 6 additions & 1 deletion async-raft/src/core/install_snapshot.rs
@@ -3,6 +3,7 @@ use std::io::SeekFrom;
use tokio::io::AsyncSeekExt;
use tokio::io::AsyncWriteExt;

use crate::core::delete_applied_logs;
use crate::core::RaftCore;
use crate::core::SnapshotState;
use crate::core::State;
@@ -203,14 +204,17 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
.map_err(|e| self.map_storage_error(e))?;

tracing::debug!("update after apply or install-snapshot: {:?}", changes);
println!("update after apply or install-snapshot: {:?}", changes);

// After installing a snapshot, inconsistent logs are not removed.
// This does not affect raft consistency.
// If you have any question about this, let me know: drdr.xp at gmail.com

if let Some(last_applied) = changes.last_applied {
// Applied logs are not needed.
self.storage.delete_logs_from(..=last_applied.index).await.map_err(|e| self.map_storage_error(e))?;
delete_applied_logs(self.storage.clone(), &last_applied, self.config.max_applied_log_to_keep)
.await
.map_err(|e| self.map_storage_error(e))?;

// snapshot is installed
self.last_applied = last_applied;
@@ -221,6 +225,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra

// There could be unknown membership in the snapshot.
let membership = self.storage.get_membership_config().await.map_err(|err| self.map_storage_error(err))?;
println!("storage membership: {:?}", membership);

self.update_membership(membership)?;

37 changes: 37 additions & 0 deletions async-raft/src/core/mod.rs
@@ -41,6 +41,7 @@ use crate::metrics::RaftMetrics;
use crate::raft::ClientReadResponseTx;
use crate::raft::ClientWriteRequest;
use crate::raft::ClientWriteResponseTx;
use crate::raft::Entry;
use crate::raft::EntryPayload;
use crate::raft::MembershipConfig;
use crate::raft::RaftMsg;
@@ -435,6 +436,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
handle,
sender: chan_tx.clone(),
});

tokio::spawn(
async move {
let f = storage.do_log_compaction();
@@ -495,6 +497,41 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
}
}

#[tracing::instrument(level = "debug", skip(sto), fields(entries=%entries.summary()))]
async fn apply_to_state_machine<D, R, S>(
sto: Arc<S>,
entries: &[&Entry<D>],
max_keep: u64,
) -> Result<Vec<R>, StorageError>
where
D: AppData,
R: AppDataResponse,
S: RaftStorage<D, R>,
{
let last = entries.last().map(|x| x.log_id);

if let Some(last_applied) = last {
// TODO(xp): apply_to_state_machine should return the last applied
let res = sto.apply_to_state_machine(entries).await?;
delete_applied_logs(sto, &last_applied, max_keep).await?;
Ok(res)
} else {
Ok(vec![])
}
}

#[tracing::instrument(level = "debug", skip(sto))]
async fn delete_applied_logs<D, R, S>(sto: Arc<S>, last_applied: &LogId, max_keep: u64) -> Result<(), StorageError>
where
D: AppData,
R: AppDataResponse,
S: RaftStorage<D, R>,
{
// TODO(xp): periodically batch delete
let x = last_applied.index.saturating_sub(max_keep);
sto.delete_logs_from(..=x).await
}

/// An enum describing the way the current leader property is to be updated.
#[derive(Debug)]
pub(self) enum UpdateCurrentLeader {
7 changes: 7 additions & 0 deletions async-raft/src/raft.rs
@@ -526,6 +526,13 @@ impl<D: AppData> MessageSummary for Option<Entry<D>> {
}

impl<D: AppData> MessageSummary for &[Entry<D>] {
fn summary(&self) -> String {
let entry_refs: Vec<_> = self.iter().collect();
entry_refs.as_slice().summary()
}
}

impl<D: AppData> MessageSummary for &[&Entry<D>] {
fn summary(&self) -> String {
let mut res = Vec::with_capacity(self.len());
for x in self.iter() {
55 changes: 55 additions & 0 deletions async-raft/tests/clean_applied_logs.rs
@@ -0,0 +1,55 @@
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use async_raft::Config;
use async_raft::RaftStorage;
use fixtures::RaftRouter;
use maplit::btreeset;

#[macro_use]
mod fixtures;

/// Logs should be deleted by raft after applying them, on leader and non-voter.
///
/// - assert logs are deleted on leader after applying them.
/// - assert logs are deleted on replication target after installing a snapshot.
///
/// RUST_LOG=async_raft,memstore,clean_applied_logs=trace cargo test -p async-raft --test clean_applied_logs
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn clean_applied_logs() -> Result<()> {
let (_log_guard, ut_span) = init_ut!();
let _ent = ut_span.enter();

// Setup test dependencies.
let config = Arc::new(
Config {
max_applied_log_to_keep: 2,
..Default::default()
}
.validate()?,
);
let router = Arc::new(RaftRouter::new(config.clone()));

let mut n_logs = router.new_nodes_from_single(btreeset! {0}, btreeset! {1}).await?;

router.client_request_many(0, "0", (10 - n_logs) as usize).await;
n_logs = 10;

router.wait_for_log(&btreeset! {0,1}, n_logs, timeout(), "write up to 10 logs").await?;

tracing::info!("--- logs before max_applied_log_to_keep should be cleaned");
{
for node_id in 0..1 {
let sto = router.get_storage_handle(&node_id).await?;
let logs = sto.get_log_entries(..).await?;
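// 10 logs were written and max_applied_log_to_keep is 2, so raft-core has deleted
// everything up to index 8; only the entries at indexes 9 and 10 should remain.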
assert_eq!(2, logs.len(), "node {} should have only {} logs", node_id, 2);
}
}

Ok(())
}

fn timeout() -> Option<Duration> {
Some(Duration::from_millis(5000))
}
58 changes: 40 additions & 18 deletions async-raft/tests/compaction.rs
@@ -1,9 +1,12 @@
use std::sync::Arc;

use anyhow::Result;
use async_raft::raft::Entry;
use async_raft::raft::EntryPayload;
use async_raft::raft::MembershipConfig;
use async_raft::Config;
use async_raft::LogId;
use async_raft::RaftStorage;
use async_raft::SnapshotPolicy;
use async_raft::State;
use fixtures::RaftRouter;
@@ -32,70 +35,89 @@ async fn compaction() -> Result<()> {
let config = Arc::new(
Config {
snapshot_policy: SnapshotPolicy::LogsSinceLast(snapshot_threshold),
max_applied_log_to_keep: 2,
..Default::default()
}
.validate()?,
);
let router = Arc::new(RaftRouter::new(config.clone()));
router.new_raft_node(0).await;

let mut want = 0;
let mut n_logs = 0;

// Assert all nodes are in non-voter state & have no entries.
router.wait_for_log(&btreeset![0], want, None, "empty").await?;
router.wait_for_log(&btreeset![0], n_logs, None, "empty").await?;
router.wait_for_state(&btreeset![0], State::NonVoter, None, "empty").await?;

router.assert_pristine_cluster().await;

tracing::info!("--- initializing cluster");

router.initialize_from_single_node(0).await?;
want += 1;
n_logs += 1;

router.wait_for_log(&btreeset![0], want, None, "init leader").await?;
router.wait_for_log(&btreeset![0], n_logs, None, "init leader").await?;
router.assert_stable_cluster(Some(1), Some(1)).await;

// Send enough requests to the cluster that compaction on the node should be triggered.
// Puts us exactly at the configured snapshot policy threshold.
router.client_request_many(0, "0", (snapshot_threshold - want) as usize).await;
want = snapshot_threshold;
router.client_request_many(0, "0", (snapshot_threshold - n_logs) as usize).await;
n_logs = snapshot_threshold;

router.wait_for_log(&btreeset![0], n_logs, None, "write").await?;
router.assert_stable_cluster(Some(1), Some(n_logs)).await;
router.wait_for_snapshot(&btreeset![0], LogId { term: 1, index: n_logs }, None, "snapshot").await?;

router.wait_for_log(&btreeset![0], want, None, "write").await?;
router.assert_stable_cluster(Some(1), Some(want)).await;
router.wait_for_snapshot(&btreeset![0], LogId { term: 1, index: want }, None, "snapshot").await?;
router
.assert_storage_state(
1,
want,
n_logs,
Some(0),
LogId { term: 1, index: want },
Some((want.into(), 1, MembershipConfig {
LogId { term: 1, index: n_logs },
Some((n_logs.into(), 1, MembershipConfig {
members: btreeset![0],
members_after_consensus: None,
})),
)
.await;

// Add a new node and assert that it received the same snapshot.
router.new_raft_node(1).await;
let sto1 = router.new_store(1).await;
sto1.append_to_log(&[&Entry {
log_id: LogId { term: 1, index: 1 },
payload: EntryPayload::Blank,
}])
.await?;

router.new_raft_node_with_sto(1, sto1.clone()).await;
router.add_non_voter(0, 1).await.expect("failed to add new node as non-voter");

tracing::info!("--- add 1 log after snapshot");
{
router.client_request_many(0, "0", 1).await;
n_logs += 1;
}

router.wait_for_log(&btreeset![0, 1], n_logs, None, "add follower").await?;

router.client_request_many(0, "0", 1).await;
want += 1;
tracing::info!("--- logs should be deleted after installing snapshot; left only the last one");
{
let sto = router.get_storage_handle(&1).await?;
let logs = sto.get_log_entries(..).await?;
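// Node 1 received everything up to the snapshot (index 50) via install-snapshot rather
// than as log entries, so after log 51 is replicated and applied it is the only entry left.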
assert_eq!(1, logs.len());
assert_eq!(LogId { term: 1, index: 51 }, logs[0].log_id)
}

router.wait_for_log(&btreeset![0, 1], want, None, "add follower").await?;
let expected_snap = Some((snapshot_threshold.into(), 1, MembershipConfig {
members: btreeset![0u64],
members_after_consensus: None,
}));
router
.assert_storage_state(
1,
want,
n_logs,
None, /* non-voter does not vote */
LogId { term: 1, index: want },
LogId { term: 1, index: n_logs },
expected_snap,
)
.await;
1 change: 1 addition & 0 deletions async-raft/tests/snapshot_ge_half_threshold.rs
@@ -36,6 +36,7 @@ async fn snapshot_ge_half_threshold() -> Result<()> {
let config = Arc::new(
Config {
snapshot_policy: SnapshotPolicy::LogsSinceLast(snapshot_threshold),
max_applied_log_to_keep: 6,
..Default::default()
}
.validate()?,
5 changes: 4 additions & 1 deletion async-raft/tests/snapshot_overrides_membership.rs
@@ -39,6 +39,7 @@ async fn snapshot_overrides_membership() -> Result<()> {
let config = Arc::new(
Config {
snapshot_policy: SnapshotPolicy::LogsSinceLast(snapshot_threshold),
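// Keep no applied logs: with this set to 0, raft-core deletes every applied entry.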
max_applied_log_to_keep: 0,
..Default::default()
}
.validate()?,
@@ -130,11 +131,13 @@ async fn snapshot_overrides_membership() -> Result<()> {
tracing::info!("--- DONE add non-voter");

router.wait_for_log(&btreeset![0, 1], want, timeout(), "add non-voter").await?;
router.wait_for_snapshot(&btreeset![1], LogId { term: 1, index: want }, timeout(), "").await?;

let expected_snap = Some((want.into(), 1, MembershipConfig {
members: btreeset![0u64],
members_after_consensus: None,
}));
router.wait_for_snapshot(&btreeset![1], LogId { term: 1, index: want }, timeout(), "").await?;

router
.assert_storage_state(
1,
