From eed681d57950fc58b6ca71a45814b8f6d2bb1223 Mon Sep 17 00:00:00 2001 From: drdr xp Date: Wed, 1 Sep 2021 20:54:17 +0800 Subject: [PATCH] fix: race condition of concurrent snapshot-install and apply. Problem: Concurrent snapshot-install and apply mess up `last_applied`. `finalize_snapshot_installation` runs in the `RaftCore` thread. `apply_to_state_machine` runs in a separate tokio task(thread). Thus there is chance the `last_applied` being reset to a previous value: - `apply_to_state_machine` is called and finished in a thread. - `finalize_snapshot_installation` is called in `RaftCore` thread and finished with `last_applied` updated. - `RaftCore` thread finished waiting for `apply_to_state_machine`, and updated `last_applied` to a previous value. ``` RaftCore: -. install-snapshot, .-> replicate_to_sm_handle.next(), | update last_applied=5 | update last_applied=2 | | v | task: apply 2------------------------' --------------------------------------------------------------------> time ``` Solution: Rule: All changes to state machine must be serialized. A temporary simple solution for now is to call all methods that modify state machine in `RaftCore` thread. But this way it blocks `RaftCore` thread. A better way is to move all tasks that modifies state machine to a standalone thread, and send update request back to `RaftCore` to update its fields such as `last_applied` --- async-raft/src/core/append_entries.rs | 94 ++++++------------- async-raft/src/core/client.rs | 12 +-- async-raft/src/core/install_snapshot.rs | 53 +++++++++-- async-raft/src/core/mod.rs | 44 --------- async-raft/src/lib.rs | 1 + async-raft/src/raft_types.rs | 8 ++ async-raft/src/storage.rs | 3 +- .../tests/snapshot_overrides_membership.rs | 2 +- memstore/src/lib.rs | 8 +- 9 files changed, 94 insertions(+), 131 deletions(-) diff --git a/async-raft/src/core/append_entries.rs b/async-raft/src/core/append_entries.rs index f3278be95..1a6f55397 100644 --- a/async-raft/src/core/append_entries.rs +++ b/async-raft/src/core/append_entries.rs @@ -1,5 +1,3 @@ -use tracing::Instrument; - use crate::core::RaftCore; use crate::core::State; use crate::core::UpdateCurrentLeader; @@ -27,7 +25,7 @@ impl, S: RaftStorage> Ra &mut self, msg: AppendEntriesRequest, ) -> RaftResult { - tracing::debug!(%self.last_log_id); + tracing::debug!(%self.last_log_id, %self.last_applied); let mut msg_entries = msg.entries.as_slice(); let mut prev_log_id = msg.prev_log_id; @@ -323,7 +321,7 @@ impl, S: RaftStorage> Ra /// /// Very importantly, this routine must not block the main control loop main task, else it /// may cause the Raft leader to timeout the requests to this node. - #[tracing::instrument(level = "trace", skip(self))] + #[tracing::instrument(level = "debug", skip(self))] async fn replicate_to_state_machine_if_needed(&mut self) -> Result<(), RaftError> { tracing::debug!("replicate_to_sm_if_needed: last_applied: {}", self.last_applied,); @@ -331,13 +329,7 @@ impl, S: RaftStorage> Ra if !self.has_completed_initial_replication_to_sm { // Optimistic update, as failures will cause shutdown. self.has_completed_initial_replication_to_sm = true; - self.initial_replicate_to_state_machine().await; - return Ok(()); - } - - // If we already have an active replication task, then do nothing. - if !self.replicate_to_sm_handle.is_empty() { - tracing::debug!("replicate_to_sm_handle is not empty, return"); + self.initial_replicate_to_state_machine().await?; return Ok(()); } @@ -353,44 +345,27 @@ impl, S: RaftStorage> Ra // Drain entries from the beginning of the cache up to commit index. - // TODO(xp): logs in storage must be consecutive. let entries = self .storage .get_log_entries(self.last_applied.index + 1..=self.commit_index) .await .map_err(|e| self.map_fatal_storage_error(e))?; - let last_log_id = entries.last().map(|x| x.log_id); + let last_log_id = entries.last().map(|x| x.log_id).unwrap(); - tracing::debug!("entries: {:?}", entries.iter().map(|x| x.log_id).collect::>()); + tracing::debug!("entries: {}", entries.as_slice().summary()); tracing::debug!(?last_log_id); - // If we have no data entries to apply, then do nothing. - if entries.is_empty() { - if let Some(log_id) = last_log_id { - self.last_applied = log_id; - self.report_metrics(Update::Ignore); - } - tracing::debug!("entries is empty, return"); - return Ok(()); - } + let entries_refs: Vec<_> = entries.iter().collect(); + self.storage + .apply_to_state_machine(&entries_refs) + .await + .map_err(|e| self.map_fatal_storage_error(e))?; - // Spawn task to replicate these entries to the state machine. - // Linearizability is guaranteed by `replicate_to_sm_handle`, which is the mechanism used - // to ensure that only a single task can replicate data to the state machine, and that is - // owned by a single task, not shared between multiple threads/tasks. - let storage = self.storage.clone(); - let handle = tokio::spawn( - async move { - // Create a new vector of references to the entries data ... might have to change this - // interface a bit before 1.0. - let entries_refs: Vec<_> = entries.iter().collect(); - storage.apply_to_state_machine(&entries_refs).await?; - Ok(last_log_id) - } - .instrument(tracing::debug_span!("spawn")), - ); - self.replicate_to_sm_handle.push(handle); + self.last_applied = last_log_id; + + self.report_metrics(Update::Ignore); + self.trigger_log_compaction_if_needed(false); Ok(()) } @@ -399,42 +374,33 @@ impl, S: RaftStorage> Ra /// /// This will only be executed once, and only in response to its first payload of entries /// from the AppendEntries RPC handler. - #[tracing::instrument(level = "trace", skip(self))] - async fn initial_replicate_to_state_machine(&mut self) { + #[tracing::instrument(level = "debug", skip(self))] + async fn initial_replicate_to_state_machine(&mut self) -> Result<(), RaftError> { let stop = std::cmp::min(self.commit_index, self.last_log_id.index) + 1; let start = self.last_applied.index + 1; let storage = self.storage.clone(); - // If we already have an active replication task, then do nothing. - if !self.replicate_to_sm_handle.is_empty() { - return; - } - tracing::debug!(start, stop, self.commit_index, %self.last_log_id, "start stop"); // when self.commit_index is not initialized, e.g. the first heartbeat from leader always has a commit_index to // be 0, because the leader needs one round of heartbeat to find out the commit index. if start >= stop { - return; + return Ok(()); } // Fetch the series of entries which must be applied to the state machine, then apply them. - let handle = tokio::spawn( - async move { - let mut new_last_applied: Option = None; - let entries = storage.get_log_entries(start..stop).await?; - if let Some(entry) = entries.last() { - new_last_applied = Some(entry.log_id); - } - let data_entries: Vec<_> = entries.iter().collect(); - if data_entries.is_empty() { - return Ok(new_last_applied); - } - storage.apply_to_state_machine(&data_entries).await?; - Ok(new_last_applied) - } - .instrument(tracing::debug_span!("spawn-init-replicate-to-sm")), - ); - self.replicate_to_sm_handle.push(handle); + + let entries = storage.get_log_entries(start..stop).await.map_err(|e| self.map_fatal_storage_error(e))?; + + let new_last_applied = entries.last().unwrap(); + + let data_entries: Vec<_> = entries.iter().collect(); + storage.apply_to_state_machine(&data_entries).await.map_err(|e| self.map_fatal_storage_error(e))?; + + self.last_applied = new_last_applied.log_id; + self.report_metrics(Update::Ignore); + self.trigger_log_compaction_if_needed(false); + + Ok(()) } } diff --git a/async-raft/src/core/client.rs b/async-raft/src/core/client.rs index b1ddb8003..bcf095513 100644 --- a/async-raft/src/core/client.rs +++ b/async-raft/src/core/client.rs @@ -339,7 +339,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage } /// Handle the post-commit logic for a client request. - #[tracing::instrument(level = "trace", skip(self, req))] + #[tracing::instrument(level = "debug", skip(self, req))] pub(super) async fn client_request_post_commit(&mut self, req: ClientRequestEntry) { let entry = &req.entry; @@ -419,7 +419,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage } /// Apply the given log entry to the state machine. - #[tracing::instrument(level = "trace", skip(self, entry))] + #[tracing::instrument(level = "debug", skip(self, entry))] pub(super) async fn apply_entry_to_state_machine(&mut self, entry: &Entry) -> RaftResult { // First, we just ensure that we apply any outstanding up to, but not including, the index // of the given entry. We need to be able to return the data response from applying this @@ -453,14 +453,6 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage } } - // Before we can safely apply this entry to the state machine, we need to ensure there is - // no pending task to replicate entries to the state machine. This is edge case, and would only - // happen once very early in a new leader's term. - if !self.core.replicate_to_sm_handle.is_empty() { - if let Some(Ok(replicate_to_sm_result)) = self.core.replicate_to_sm_handle.next().await { - self.core.handle_replicate_to_sm_result(replicate_to_sm_result)?; - } - } // Apply this entry to the state machine and return its data response. let res = self.core.storage.apply_to_state_machine(&[entry]).await.map_err(|err| { if err.downcast_ref::().is_some() { diff --git a/async-raft/src/core/install_snapshot.rs b/async-raft/src/core/install_snapshot.rs index 663207a43..a38420c26 100644 --- a/async-raft/src/core/install_snapshot.rs +++ b/async-raft/src/core/install_snapshot.rs @@ -181,17 +181,52 @@ impl, S: RaftStorage> Ra ) -> RaftResult<()> { snapshot.as_mut().shutdown().await.map_err(|err| self.map_fatal_storage_error(err.into()))?; - self.storage + // Caveat: All changes to state machine must be serialized + // + // If `finalize_snapshot_installation` is run in RaftCore thread, + // there is chance the last_applied being reset to a previous value: + // + // ``` + // RaftCore: -. install-snapc, .-> replicate_to_sm_handle.next(), + // | update last_applied=5 | update last_applied=2 + // | | + // v | + // task: apply 2------------------------' + // --------------------------------------------------------------------> time + // ``` + + let changes = self + .storage .finalize_snapshot_installation(&req.meta, snapshot) .await - .map_err(|err| self.map_fatal_storage_error(err))?; - - let membership = self.storage.get_membership_config().await.map_err(|err| self.map_fatal_storage_error(err))?; - self.update_membership(membership)?; - self.last_log_id = req.meta.last_log_id; - self.last_applied = req.meta.last_log_id; - self.snapshot_last_log_id = req.meta.last_log_id; - self.report_metrics(Update::Ignore); + .map_err(|e| self.map_fatal_storage_error(e))?; + + tracing::debug!("update after apply or install-snapshot: {:?}", changes); + + // After installing snapshot, no inconsistent log is removed. + // This does not affect raft consistency. + // If you have any question about this, let me know: drdr.xp at gmail.com + + if let Some(last_applied) = changes.last_applied { + // snapshot is installed + self.last_applied = last_applied; + + if self.last_log_id < self.last_applied { + self.last_log_id = self.last_applied; + } + + // There could be unknown membership in the snapshot. + let membership = + self.storage.get_membership_config().await.map_err(|err| self.map_fatal_storage_error(err))?; + + self.update_membership(membership)?; + + self.snapshot_last_log_id = self.last_applied; + self.report_metrics(Update::Ignore); + } else { + // snapshot not installed + } + Ok(()) } } diff --git a/async-raft/src/core/mod.rs b/async-raft/src/core/mod.rs index faf3cb4cb..40553dcb6 100644 --- a/async-raft/src/core/mod.rs +++ b/async-raft/src/core/mod.rs @@ -14,8 +14,6 @@ use std::sync::Arc; use futures::future::AbortHandle; use futures::future::Abortable; -use futures::stream::FuturesOrdered; -use futures::stream::StreamExt; use serde::Deserialize; use serde::Serialize; use tokio::sync::broadcast; @@ -120,13 +118,6 @@ pub struct RaftCore, S: RaftSt /// This is primarily used in making a determination on when a compaction job needs to be triggered. snapshot_last_log_id: LogId, - /// The stream of join handles from state machine replication tasks. There will only ever be - /// a maximum of 1 element at a time. - /// - /// This abstraction is needed to ensure that replicating to the state machine does not block - /// the AppendEntries RPC flow, and to ensure that we have a smooth transition to becoming - /// leader without concern over duplicate application of entries to the state machine. - replicate_to_sm_handle: FuturesOrdered>>>, /// A bool indicating if this system has performed its initial replication of /// outstanding entries to the state machine. has_completed_initial_replication_to_sm: bool, @@ -171,7 +162,6 @@ impl, S: RaftStorage> Ra last_log_id: LogId { term: 0, index: 0 }, snapshot_state: None, snapshot_last_log_id: LogId { term: 0, index: 0 }, - replicate_to_sm_handle: FuturesOrdered::new(), has_completed_initial_replication_to_sm: false, last_heartbeat: None, next_election_timeout: None, @@ -464,22 +454,6 @@ impl, S: RaftStorage> Ra ); } - /// Handle the output of an async task replicating entries to the state machine. - #[tracing::instrument(level = "trace", skip(self, res))] - pub(self) fn handle_replicate_to_sm_result(&mut self, res: anyhow::Result>) -> RaftResult<()> { - let last_applied_opt = res.map_err(|err| self.map_fatal_storage_error(err))?; - - tracing::debug!("last_applied:{:?}", last_applied_opt); - - if let Some(last_applied) = last_applied_opt { - self.last_applied = last_applied; - } - - self.report_metrics(Update::Ignore); - self.trigger_log_compaction_if_needed(false); - Ok(()) - } - /// Reject an init config request due to the Raft node being in a state which prohibits the request. #[tracing::instrument(level = "trace", skip(self, tx))] fn reject_init_with_config(&self, tx: oneshot::Sender>) { @@ -731,12 +705,6 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage let _ent = span.enter(); self.handle_replica_event(event).await; } - Some(Ok(repl_sm_result)) = self.core.replicate_to_sm_handle.next() => { - tracing::info!("leader recv from replicate_to_sm_handle: {:?}", repl_sm_result); - - // Errors herein will trigger shutdown, so no need to process error. - let _ = self.core.handle_replicate_to_sm_result(repl_sm_result); - } Ok(_) = &mut self.core.rx_shutdown => { tracing::info!("leader recv from rx_shudown"); self.core.set_target_state(State::Shutdown); @@ -909,10 +877,6 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage } }, Some(update) = self.core.rx_compaction.recv() => self.core.update_snapshot_state(update), - Some(Ok(repl_sm_result)) = self.core.replicate_to_sm_handle.next() => { - // Errors herein will trigger shutdown, so no need to process error. - let _ = self.core.handle_replicate_to_sm_result(repl_sm_result); - } Ok(_) = &mut self.core.rx_shutdown => self.core.set_target_state(State::Shutdown), } } @@ -980,10 +944,6 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage } }, Some(update) = self.core.rx_compaction.recv() => self.core.update_snapshot_state(update), - Some(Ok(repl_sm_result)) = self.core.replicate_to_sm_handle.next() => { - // Errors herein will trigger shutdown, so no need to process error. - let _ = self.core.handle_replicate_to_sm_result(repl_sm_result); - } Ok(_) = &mut self.core.rx_shutdown => self.core.set_target_state(State::Shutdown), } } @@ -1047,10 +1007,6 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage } }, Some(update) = self.core.rx_compaction.recv() => self.core.update_snapshot_state(update), - Some(Ok(repl_sm_result)) = self.core.replicate_to_sm_handle.next() => { - // Errors herein will trigger shutdown, so no need to process error. - let _ = self.core.handle_replicate_to_sm_result(repl_sm_result); - } Ok(_) = &mut self.core.rx_shutdown => self.core.set_target_state(State::Shutdown), } } diff --git a/async-raft/src/lib.rs b/async-raft/src/lib.rs index e198af61b..8d177bead 100644 --- a/async-raft/src/lib.rs +++ b/async-raft/src/lib.rs @@ -33,6 +33,7 @@ pub use crate::raft::Raft; pub use crate::raft_types::LogId; pub use crate::raft_types::SnapshotId; pub use crate::raft_types::SnapshotSegmentId; +pub use crate::raft_types::StateMachineChanges; pub use crate::raft_types::Update; pub use crate::replication::ReplicationMetrics; pub use crate::storage::RaftStorage; diff --git a/async-raft/src/raft_types.rs b/async-raft/src/raft_types.rs index f4ef8d65f..f07f51bb8 100644 --- a/async-raft/src/raft_types.rs +++ b/async-raft/src/raft_types.rs @@ -55,3 +55,11 @@ pub enum Update { Update(T), Ignore, } + +/// The changes of a state machine. +/// E.g. when applying a log to state machine, or installing a state machine from snapshot. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct StateMachineChanges { + pub last_applied: Option, + pub is_snapshot: bool, +} diff --git a/async-raft/src/storage.rs b/async-raft/src/storage.rs index 53312851a..6c1f2a8be 100644 --- a/async-raft/src/storage.rs +++ b/async-raft/src/storage.rs @@ -15,6 +15,7 @@ use tokio::io::AsyncWrite; use crate::raft::Entry; use crate::raft::MembershipConfig; use crate::raft_types::SnapshotId; +use crate::raft_types::StateMachineChanges; use crate::AppData; use crate::AppDataResponse; use crate::LogId; @@ -269,7 +270,7 @@ where &self, meta: &SnapshotMeta, snapshot: Box, - ) -> Result<()>; + ) -> Result; /// Get a readable handle to the current snapshot, along with its metadata. /// diff --git a/async-raft/tests/snapshot_overrides_membership.rs b/async-raft/tests/snapshot_overrides_membership.rs index 19f011d89..b32138fa5 100644 --- a/async-raft/tests/snapshot_overrides_membership.rs +++ b/async-raft/tests/snapshot_overrides_membership.rs @@ -29,7 +29,7 @@ mod fixtures; /// /// export RUST_LOG=async_raft,memstore,snapshot_overrides_membership=trace /// cargo test -p async-raft --test snapshot_overrides_membership -#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[tokio::test(flavor = "multi_thread", worker_threads = 10)] async fn snapshot_overrides_membership() -> Result<()> { let (_log_guard, ut_span) = init_ut!(); let _ent = ut_span.enter(); diff --git a/memstore/src/lib.rs b/memstore/src/lib.rs index db3f7728d..f755745aa 100644 --- a/memstore/src/lib.rs +++ b/memstore/src/lib.rs @@ -28,6 +28,7 @@ use async_raft::NodeId; use async_raft::RaftStorage; use async_raft::RaftStorageDebug; use async_raft::SnapshotMeta; +use async_raft::StateMachineChanges; use serde::Deserialize; use serde::Serialize; use thiserror::Error; @@ -747,7 +748,7 @@ impl RaftStorage for MemStore { &self, meta: &SnapshotMeta, snapshot: Box, - ) -> Result<()> { + ) -> Result { tracing::info!( { snapshot_size = snapshot.get_ref().len() }, "decoding snapshot for installation" @@ -787,7 +788,10 @@ impl RaftStorage for MemStore { // Update current snapshot. let mut current_snapshot = self.current_snapshot.write().await; *current_snapshot = Some(new_snapshot); - Ok(()) + Ok(StateMachineChanges { + last_applied: Some(meta.last_log_id), + is_snapshot: true, + }) } #[tracing::instrument(level = "trace", skip(self))]