fix: RaftCore.entries_cache is inconsistent with storage. removed it.
- When the leader changes, `entries_cache` is cleared.
  Thus there may be cached entries that will never be applied to the state machine.

- When applying finishes, the applied entries are not removed from the
  cache.
  Thus entries could be applied more than once.

The fix removes the cache and reads the entries to apply directly from
storage; see the sketch after the changed-files summary below.
drmingdrmer committed Aug 22, 2021
1 parent eee8e53 commit 8cd24ba
Showing 5 changed files with 93 additions and 72 deletions.
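
The heart of the change is that `replicate_to_state_machine_if_needed` now reads the committed-but-unapplied range `(last_applied, commit_index]` straight from `RaftStorage::get_log_entries` rather than from a per-core cache. A minimal, self-contained sketch of that invariant (the `Core` type, its fields, and the data below are made up for illustration and are not the crate's actual API):

```rust
use std::collections::BTreeMap;

// Toy model of the new apply path: committed entries are read from the log in
// storage as the range (last_applied, commit_index], so nothing can be skipped
// on a leader change and nothing can be applied twice.
struct Core {
    log: BTreeMap<u64, String>, // stands in for RaftStorage::get_log_entries
    last_applied: u64,
    commit_index: u64,
    state_machine: Vec<(u64, String)>,
}

impl Core {
    fn apply_committed(&mut self) {
        if self.commit_index <= self.last_applied {
            return;
        }
        for idx in (self.last_applied + 1)..=self.commit_index {
            // Logs in storage are assumed to be consecutive, as the commit notes.
            let entry = self.log.get(&idx).expect("consecutive log").clone();
            self.state_machine.push((idx, entry));
        }
        self.last_applied = self.commit_index;
    }
}

fn main() {
    let mut core = Core {
        log: (1..=5).map(|i| (i, format!("payload-{}", i))).collect(),
        last_applied: 2,
        commit_index: 4,
        state_machine: (1..=2).map(|i| (i, format!("payload-{}", i))).collect(),
    };

    core.apply_committed();
    assert_eq!(core.last_applied, 4);
    assert_eq!(core.state_machine.len(), 4); // entries 3 and 4 applied exactly once

    core.apply_committed(); // no-op: nothing is applied a second time
    assert_eq!(core.state_machine.len(), 4);
}
```

Because the applied range is derived from `last_applied` and `commit_index` alone, a leader change cannot drop pending entries and a repeated call cannot re-apply them.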
80 changes: 46 additions & 34 deletions async-raft/src/core/append_entries.rs
@@ -13,6 +13,7 @@ use crate::AppData;
use crate::AppDataResponse;
use crate::LogId;
use crate::MessageSummary;
use crate::RaftError;
use crate::RaftNetwork;
use crate::RaftStorage;
use crate::Update;
@@ -61,15 +62,19 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra

// If RPC's `prev_log_index` is 0, or the RPC's previous log info matches the local
// log info, then replication is g2g.
let msg_prev_index_is_min = msg.prev_log_id.index == u64::min_value();
let msg_index_and_term_match =
(msg.prev_log_id.index == self.last_log_id.index) && (msg.prev_log_id.term == self.last_log_id.term);
let msg_prev_index_is_min = msg.prev_log_id.index == u64::MIN;
let msg_index_and_term_match = msg.prev_log_id == self.last_log_id;

if msg_prev_index_is_min || msg_index_and_term_match {
self.append_log_entries(&msg.entries).await?;
self.replicate_to_state_machine_if_needed(msg.entries).await;
if !msg.entries.is_empty() {
self.append_log_entries(&msg.entries).await?;
}
self.replicate_to_state_machine_if_needed().await?;

if report_metrics {
self.report_metrics(Update::Ignore);
}

return Ok(AppendEntriesResponse {
term: self.current_term,
success: true,
@@ -154,7 +159,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
tracing::debug!("end log consistency check");

self.append_log_entries(&msg.entries).await?;
self.replicate_to_state_machine_if_needed(msg.entries).await;
self.replicate_to_state_machine_if_needed().await?;
if report_metrics {
self.report_metrics(Update::Ignore);
}
@@ -196,53 +201,58 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
///
/// Very importantly, this routine must not block the main control loop main task, else it
/// may cause the Raft leader to timeout the requests to this node.
#[tracing::instrument(level = "trace", skip(self, entries))]
async fn replicate_to_state_machine_if_needed(&mut self, entries: Vec<Entry<D>>) {
// Update cache. Always.
for entry in entries {
self.entries_cache.insert(entry.log_id.index, entry);
}
#[tracing::instrument(level = "trace", skip(self))]
async fn replicate_to_state_machine_if_needed(&mut self) -> Result<(), RaftError> {
tracing::debug!("replicate_to_sm_if_needed: last_applied: {}", self.last_applied,);

// Perform initial replication to state machine if needed.
if !self.has_completed_initial_replication_to_sm {
// Optimistic update, as failures will cause shutdown.
self.has_completed_initial_replication_to_sm = true;
return self.initial_replicate_to_state_machine().await;
self.initial_replicate_to_state_machine().await;
return Ok(());
}

// If we already have an active replication task, then do nothing.
if !self.replicate_to_sm_handle.is_empty() {
return;
tracing::debug!("replicate_to_sm_handle is not empty, return");
return Ok(());
}

// If we don't have any new entries to replicate, then do nothing.
if self.commit_index <= self.last_applied.index {
return;
tracing::debug!(
"commit_index({}) <= last_applied({}), return",
self.commit_index,
self.last_applied
);
return Ok(());
}
// If we have no cached entries, then do nothing.
let first_idx = match self.entries_cache.iter().next() {
Some((_, entry)) => entry.log_id.index,
None => return,
};

// Drain entries from the beginning of the cache up to commit index.
let mut last_entry_seen: Option<LogId> = None;
let entries: Vec<_> = (first_idx..=self.commit_index)
.filter_map(|idx| {
if let Some(entry) = self.entries_cache.remove(&idx) {
last_entry_seen = Some(entry.log_id);
Some(entry)
} else {
None
}
})
.collect();

// TODO(xp): logs in storage must be consecutive.
let entries = self
.storage
.get_log_entries(self.last_applied.index + 1, self.commit_index + 1)
.await
.map_err(|e| self.map_fatal_storage_error(e))?;

let last_log_id = entries.last().map(|x| x.log_id);

tracing::debug!("entries: {:?}", entries.iter().map(|x| x.log_id).collect::<Vec<_>>());
tracing::debug!(?last_log_id);

// If we have no data entries to apply, then do nothing.
if entries.is_empty() {
if let Some(log_id) = last_entry_seen {
if let Some(log_id) = last_log_id {
self.last_applied = log_id;
self.report_metrics(Update::Ignore);
}
return;
tracing::debug!("entries is empty, return");
return Ok(());
}

// Spawn task to replicate these entries to the state machine.
// Linearizability is guaranteed by `replicate_to_sm_handle`, which is the mechanism used
// to ensure that only a single task can replicate data to the state machine, and that is
@@ -254,11 +264,13 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
// interface a bit before 1.0.
let entries_refs: Vec<_> = entries.iter().collect();
storage.replicate_to_state_machine(&entries_refs).await?;
Ok(last_entry_seen)
Ok(last_log_id)
}
.instrument(tracing::debug_span!("spawn")),
);
self.replicate_to_sm_handle.push(handle);

Ok(())
}

/// Perform an initial replication of outstanding entries to the state machine.
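
As the linearizability comment above notes, `replicate_to_sm_handle` ensures at most one apply task is in flight, and its result is fed to `handle_replicate_to_sm_result`, which folds the returned `LogId` back into `last_applied`. A rough, hypothetical model of that pattern using `futures::stream::FuturesOrdered` (the `ApplyDriver` type and its method names are invented for illustration):

```rust
use futures::stream::{FuturesOrdered, StreamExt};
use tokio::task::JoinHandle;

// Hypothetical mini-model of the single in-flight apply task: a new task is
// spawned only when the previous one has been drained, and its result carries
// the highest applied log index back to the main loop.
struct ApplyDriver {
    last_applied: u64,
    in_flight: FuturesOrdered<JoinHandle<u64>>,
}

impl ApplyDriver {
    fn maybe_spawn_apply(&mut self, commit_index: u64) {
        if !self.in_flight.is_empty() {
            return; // an apply task is already running
        }
        if commit_index <= self.last_applied {
            return; // nothing new to apply
        }
        let from = self.last_applied + 1;
        self.in_flight.push_back(tokio::spawn(async move {
            // Apply entries in [from, commit_index] to the state machine (elided).
            println!("applying {}..={}", from, commit_index);
            commit_index
        }));
    }

    async fn drain_apply_result(&mut self) {
        if let Some(Ok(applied)) = self.in_flight.next().await {
            self.last_applied = applied;
        }
    }
}

#[tokio::main]
async fn main() {
    let mut driver = ApplyDriver { last_applied: 0, in_flight: FuturesOrdered::new() };
    driver.maybe_spawn_apply(3);
    driver.maybe_spawn_apply(5); // ignored: a task is still in flight
    driver.drain_apply_result().await;
    assert_eq!(driver.last_applied, 3);
}
```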
15 changes: 4 additions & 11 deletions async-raft/src/core/mod.rs
@@ -43,7 +43,6 @@ use crate::metrics::RaftMetrics;
use crate::raft::ClientReadResponseTx;
use crate::raft::ClientWriteRequest;
use crate::raft::ClientWriteResponseTx;
use crate::raft::Entry;
use crate::raft::EntryPayload;
use crate::raft::MembershipConfig;
use crate::raft::RaftMsg;
@@ -121,14 +120,6 @@ pub struct RaftCore<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftSt
/// This is primarily used in making a determination on when a compaction job needs to be triggered.
snapshot_last_log_id: LogId,

/// A cache of entries which are waiting to be replicated to the state machine.
///
/// It is important to note that this cache must only be populated from the AppendEntries RPC
/// handler, as these values must only ever represent the entries which have been sent from
/// the current cluster leader.
///
/// Whenever there is a leadership change, this cache will be cleared.
entries_cache: BTreeMap<u64, Entry<D>>,
/// The stream of join handles from state machine replication tasks. There will only ever be
/// a maximum of 1 element at a time.
///
@@ -180,7 +171,6 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
last_log_id: LogId { term: 0, index: 0 },
snapshot_state: None,
snapshot_last_log_id: LogId { term: 0, index: 0 },
entries_cache: Default::default(),
replicate_to_sm_handle: FuturesOrdered::new(),
has_completed_initial_replication_to_sm: false,
last_heartbeat: None,
@@ -353,7 +343,6 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
/// Update the value of the `current_leader` property.
#[tracing::instrument(level = "trace", skip(self))]
fn update_current_leader(&mut self, update: UpdateCurrentLeader) {
self.entries_cache.clear();
match update {
UpdateCurrentLeader::ThisNode => {
self.current_leader = Some(self.id);
@@ -479,9 +468,13 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
#[tracing::instrument(level = "trace", skip(self, res))]
pub(self) fn handle_replicate_to_sm_result(&mut self, res: anyhow::Result<Option<LogId>>) -> RaftResult<()> {
let last_applied_opt = res.map_err(|err| self.map_fatal_storage_error(err))?;

tracing::debug!("last_applied:{:?}", last_applied_opt);

if let Some(last_applied) = last_applied_opt {
self.last_applied = last_applied;
}

self.report_metrics(Update::Ignore);
self.trigger_log_compaction_if_needed(false);
Ok(())
15 changes: 2 additions & 13 deletions async-raft/src/replication/mod.rs
@@ -25,7 +25,6 @@ use crate::config::SnapshotPolicy;
use crate::error::RaftResult;
use crate::raft::AppendEntriesRequest;
use crate::raft::Entry;
use crate::raft::EntryPayload;
use crate::raft::InstallSnapshotRequest;
use crate::storage::CurrentSnapshotData;
use crate::AppData;
@@ -684,12 +683,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
return;
}
};
for entry in entries.iter() {
if let EntryPayload::SnapshotPointer(_) = entry.payload {
self.replication_core.target_state = TargetReplState::Snapshotting;
return;
}
}

// Prepend.
self.replication_core.outbound_buffer.reverse();
self.replication_core.outbound_buffer.extend(entries.into_iter().rev().map(OutboundEntry::Raw));
@@ -795,12 +789,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
return;
}
};
for entry in entries.iter() {
if let EntryPayload::SnapshotPointer(_) = entry.payload {
self.replication_core.target_state = TargetReplState::Snapshotting;
return;
}
}

self.replication_core.outbound_buffer.extend(entries.into_iter().map(OutboundEntry::Raw));
}
}
23 changes: 18 additions & 5 deletions async-raft/tests/snapshot_uses_prev_snap_membership.rs
@@ -1,4 +1,5 @@
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use async_raft::raft::MembershipConfig;
@@ -62,17 +63,21 @@ async fn snapshot_uses_prev_snap_membership() -> Result<()> {
router.wait_for_log(&btreeset![0, 1], want, None, "cluster of 2").await?;
}

let sto = router.get_storage_handle(&0).await?;
let sto0 = router.get_storage_handle(&0).await?;

tracing::info!("--- send just enough logs to trigger snapshot");
{
router.client_request_many(0, "0", (snapshot_threshold - want) as usize).await;
want = snapshot_threshold;

router.wait_for_log(&btreeset![0, 1], want, None, "send log to trigger snapshot").await?;
router.wait_for_snapshot(&btreeset![0], LogId { term: 1, index: want }, None, "snapshot").await?;
router.wait_for_log(&btreeset![0, 1], want, to(), "send log to trigger snapshot").await?;
router.wait_for_snapshot(&btreeset![0], LogId { term: 1, index: want }, to(), "snapshot").await?;

let m = sto.get_membership_config().await?;
{
let logs = sto0.get_log().await;
assert_eq!(1, logs.len(), "only one snapshot pointer log");
}
let m = sto0.get_membership_config().await?;
assert_eq!(
MembershipConfig {
members: btreeset![0, 1],
@@ -111,7 +116,11 @@ async fn snapshot_uses_prev_snap_membership() -> Result<()> {

tracing::info!("--- check membership");
{
let m = sto.get_membership_config().await?;
{
let logs = sto0.get_log().await;
assert_eq!(1, logs.len(), "only one snapshot pointer log");
}
let m = sto0.get_membership_config().await?;
assert_eq!(
MembershipConfig {
members: btreeset![0, 1],
@@ -124,3 +133,7 @@ async fn snapshot_uses_prev_snap_membership() -> Result<()> {

Ok(())
}

fn to() -> Option<Duration> {
Some(Duration::from_millis(500))
}
32 changes: 23 additions & 9 deletions memstore/src/lib.rs
@@ -171,16 +171,21 @@ impl MemStore {
/// Go backwards through the log to find the most recent membership config <= `upto_index`.
#[tracing::instrument(level = "trace", skip(self))]
pub async fn get_membership_from_log(&self, upto_index: Option<u64>) -> Result<MembershipConfig> {
let log = self.log.read().await;

let reversed_logs = log.values().rev();
let membership = match upto_index {
Some(upto) => {
let skipped = reversed_logs.skip_while(|entry| entry.log_id.index > upto);
Self::find_first_membership_log(skipped)
let membership = {
let log = self.log.read().await;

let reversed_logs = log.values().rev();
match upto_index {
Some(upto) => {
let skipped = reversed_logs.skip_while(|entry| entry.log_id.index > upto);
Self::find_first_membership_log(skipped)
}
None => Self::find_first_membership_log(reversed_logs),
}
None => Self::find_first_membership_log(reversed_logs),
};

// Otherwise, create a default one.

Ok(match membership {
Some(cfg) => cfg,
None => MembershipConfig::new_initial(self.id),
@@ -282,6 +287,10 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
#[tracing::instrument(level = "trace", skip(self, entry))]
async fn apply_entry_to_state_machine(&self, entry: &Entry<ClientRequest>) -> Result<ClientResponse> {
let mut sm = self.sm.write().await;

tracing::debug!("id:{} apply to sm index:{}", self.id, entry.log_id.index);
assert_eq!(sm.last_applied_log.index + 1, entry.log_id.index);

sm.last_applied_log = entry.log_id;

return match entry.payload {
@@ -309,6 +318,11 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
async fn replicate_to_state_machine(&self, entries: &[&Entry<ClientRequest>]) -> Result<()> {
let mut sm = self.sm.write().await;
for entry in entries {
tracing::debug!("id:{} replicate to sm index:{}", self.id, entry.log_id.index);

// TODO(xp) return error if there is out of order apply
assert_eq!(sm.last_applied_log.index + 1, entry.log_id.index);

sm.last_applied_log = entry.log_id;

match entry.payload {
@@ -356,7 +370,7 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
{
let mut log = self.log.write().await;
let mut current_snapshot = self.current_snapshot.write().await;
*log = log.split_off(&last_applied_log.index);
*log = log.split_off(&(last_applied_log.index + 1));

let snapshot_id = format!("{}-{}-{}", last_applied_log.term, last_applied_log.index, snapshot_idx);

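
The one-line `split_off` fix in `do_log_compaction` above relies on `BTreeMap::split_off(&k)` leaving keys `< k` behind and returning the entries with keys `>= k`; assigning the result back with `last_applied_log.index + 1` therefore drops every entry up to and including the last applied one, which matches the new test assertion that only the snapshot pointer entry remains in the log. A standalone illustration of the `split_off` semantics with made-up values:

```rust
use std::collections::BTreeMap;

fn main() {
    // Pretend the log holds entries 1..=5 and the snapshot covers up to index 3.
    let mut log: BTreeMap<u64, &str> = (1..=5).map(|i| (i, "entry")).collect();
    let last_applied_index: u64 = 3;

    // split_off(&k) keeps keys < k in `log` and returns keys >= k; we keep the
    // returned part, i.e. everything after the last applied entry.
    log = log.split_off(&(last_applied_index + 1));

    assert_eq!(log.keys().copied().collect::<Vec<_>>(), vec![4, 5]);
    println!("remaining log indexes: {:?}", log.keys().collect::<Vec<_>>());
}
```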
