Skip to content

Commit

Permalink
change: Entry: merge term and index to log_id: LogId
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Jul 11, 2021
1 parent 9e4fb64 commit 24e3813
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 103 deletions.
33 changes: 12 additions & 21 deletions async-raft/src/core/append_entries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use crate::raft::Entry;
use crate::raft::EntryPayload;
use crate::AppData;
use crate::AppDataResponse;
use crate::LogId;
use crate::RaftNetwork;
use crate::RaftStorage;
use crate::Update;
Expand Down Expand Up @@ -105,12 +104,12 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
};

// The target entry was found. Compare its term with target term to ensure everything is consistent.
if target_entry.term == msg.prev_log.term {
if target_entry.log_id.term == msg.prev_log.term {
// We've found a point of agreement with the leader. If we have any logs present
// with an index greater than this, then we must delete them per §5.3.
if self.last_log.index > target_entry.index {
if self.last_log.index > target_entry.log_id.index {
self.storage
.delete_logs_from(target_entry.index + 1, None)
.delete_logs_from(target_entry.log_id.index + 1, None)
.await
.map_err(|err| self.map_fatal_storage_error(err))?;
let membership =
Expand All @@ -131,13 +130,8 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
.get_log_entries(start, msg.prev_log.index)
.await
.map_err(|err| self.map_fatal_storage_error(err))?;
let opt = match old_entries.iter().find(|entry| entry.term == msg.prev_log.term) {
Some(entry) => Some(ConflictOpt {
log_id: LogId {
term: entry.term,
index: entry.index,
},
}),
let opt = match old_entries.iter().find(|entry| entry.log_id.term == msg.prev_log.term) {
Some(entry) => Some(ConflictOpt { log_id: entry.log_id }),
None => Some(ConflictOpt { log_id: self.last_log }),
};
if report_metrics {
Expand Down Expand Up @@ -188,10 +182,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
// Replicate entries to log (same as append, but in follower mode).
self.storage.replicate_to_log(entries).await.map_err(|err| self.map_fatal_storage_error(err))?;
if let Some(entry) = entries.last() {
self.last_log = LogId {
term: entry.term,
index: entry.index,
};
self.last_log = entry.log_id;
}
Ok(())
}
Expand All @@ -204,7 +195,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
async fn replicate_to_state_machine_if_needed(&mut self, entries: Vec<Entry<D>>) {
// Update cache. Always.
for entry in entries {
self.entries_cache.insert(entry.index, entry);
self.entries_cache.insert(entry.log_id.index, entry);
}
// Perform initial replication to state machine if needed.
if !self.has_completed_initial_replication_to_sm {
Expand All @@ -222,7 +213,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
}
// If we have no cached entries, then do nothing.
let first_idx = match self.entries_cache.iter().next() {
Some((_, entry)) => entry.index,
Some((_, entry)) => entry.log_id.index,
None => return,
};

Expand All @@ -231,9 +222,9 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
let entries: Vec<_> = (first_idx..=self.commit_index)
.filter_map(|idx| {
if let Some(entry) = self.entries_cache.remove(&idx) {
last_entry_seen = Some(entry.index);
last_entry_seen = Some(entry.log_id.index);
match entry.payload {
EntryPayload::Normal(inner) => Some((entry.index, inner.data)),
EntryPayload::Normal(inner) => Some((entry.log_id.index, inner.data)),
_ => None,
}
} else {
Expand Down Expand Up @@ -285,12 +276,12 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
let mut new_last_applied: Option<u64> = None;
let entries = storage.get_log_entries(start, stop).await?;
if let Some(entry) = entries.last() {
new_last_applied = Some(entry.index);
new_last_applied = Some(entry.log_id.index);
}
let data_entries: Vec<_> = entries
.iter()
.filter_map(|entry| match &entry.payload {
EntryPayload::Normal(inner) => Some((&entry.index, &inner.data)),
EntryPayload::Normal(inner) => Some((&entry.log_id.index, &inner.data)),
_ => None,
})
.collect();
Expand Down
23 changes: 13 additions & 10 deletions async-raft/src/core/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::raft::EntryPayload;
use crate::replication::RaftEvent;
use crate::AppData;
use crate::AppDataResponse;
use crate::LogId;
use crate::RaftNetwork;
use crate::RaftStorage;

Expand Down Expand Up @@ -252,16 +253,18 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
#[tracing::instrument(level = "trace", skip(self, payload))]
pub(super) async fn append_payload_to_log(&mut self, payload: EntryPayload<D>) -> RaftResult<Entry<D>> {
let entry = Entry {
index: self.core.last_log.index + 1,
term: self.core.current_term,
log_id: LogId {
index: self.core.last_log.index + 1,
term: self.core.current_term,
},
payload,
};
self.core
.storage
.append_entry_to_log(&entry)
.await
.map_err(|err| self.core.map_fatal_storage_error(err))?;
self.core.last_log.index = entry.index;
self.core.last_log.index = entry.log_id.index;
Ok(entry)
}

Expand All @@ -285,7 +288,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}
} else {
// Else, there are no voting nodes for replication, so the payload is now committed.
self.core.commit_index = entry_arc.index;
self.core.commit_index = entry_arc.log_id.index;
self.leader_report_metrics();
self.client_request_post_commit(req).await;
}
Expand All @@ -308,10 +311,10 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
ClientOrInternalResponseTx::Client(tx) => {
match &req.entry.payload {
EntryPayload::Normal(inner) => {
match self.apply_entry_to_state_machine(&req.entry.index, &inner.data).await {
match self.apply_entry_to_state_machine(&req.entry.log_id.index, &inner.data).await {
Ok(data) => {
let _ = tx.send(Ok(ClientWriteResponse {
index: req.entry.index,
index: req.entry.log_id.index,
data,
}));
}
Expand All @@ -331,9 +334,9 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}
}
ClientOrInternalResponseTx::Internal(tx) => {
self.core.last_applied = req.entry.index;
self.core.last_applied = req.entry.log_id.index;
self.leader_report_metrics();
let _ = tx.send(Ok(req.entry.index));
let _ = tx.send(Ok(req.entry.log_id.index));
}
}

Expand All @@ -358,12 +361,12 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
.await
.map_err(|err| self.core.map_fatal_storage_error(err))?;
if let Some(entry) = entries.last() {
self.core.last_applied = entry.index;
self.core.last_applied = entry.log_id.index;
}
let data_entries: Vec<_> = entries
.iter()
.filter_map(|entry| match &entry.payload {
EntryPayload::Normal(inner) => Some((&entry.index, &inner.data)),
EntryPayload::Normal(inner) => Some((&entry.log_id.index, &inner.data)),
_ => None,
})
.collect();
Expand Down
2 changes: 1 addition & 1 deletion async-raft/src/core/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
.awaiting_committed
.iter()
.enumerate()
.take_while(|(_idx, elem)| elem.entry.index <= self.core.commit_index)
.take_while(|(_idx, elem)| elem.entry.log_id.index <= self.core.commit_index)
.last()
.map(|(idx, _)| idx);

Expand Down
9 changes: 3 additions & 6 deletions async-raft/src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,10 +405,8 @@ pub struct ConflictOpt {
/// A Raft log entry.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct Entry<D: AppData> {
/// This entry's term.
pub term: u64,
/// This entry's index.
pub index: u64,
pub log_id: LogId,

/// This entry's payload.
#[serde(bound = "D: AppData")]
pub payload: EntryPayload<D>,
Expand All @@ -428,8 +426,7 @@ impl<D: AppData> Entry<D> {
/// latest membership covered by the snapshot.
pub fn new_snapshot_pointer(index: u64, term: u64, id: String, membership: MembershipConfig) -> Self {
Entry {
term,
index,
log_id: LogId { term, index },
payload: EntryPayload::SnapshotPointer(EntrySnapshotPointer { id, membership }),
}
}
Expand Down
22 changes: 11 additions & 11 deletions async-raft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,24 +264,24 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
return;
}
};
let last_index_and_term = self.outbound_buffer.last().map(|last| (last.as_ref().index, last.as_ref().term));
let last_log_id = self.outbound_buffer.last().map(|last| last.as_ref().log_id);

// Once we've successfully sent a payload of entries, don't send them again.
self.outbound_buffer.clear();

tracing::debug!("append_entries last: {:?}", last_index_and_term);
tracing::debug!("append_entries last: {:?}", last_log_id);

// Handle success conditions.
if res.success {
tracing::debug!("append entries succeeded to {:?}", last_index_and_term);
tracing::debug!("append entries succeeded to {:?}", last_log_id);

// If this was a proper replication event (last index & term were provided), then update state.
if let Some((index, term)) = last_index_and_term {
self.next_index = index + 1; // This should always be the next expected index.
self.matched = (term, index).into();
if let Some(log_id) = last_log_id {
self.next_index = log_id.index + 1; // This should always be the next expected index.
self.matched = log_id;
let _ = self.raft_tx.send(ReplicaEvent::UpdateMatchIndex {
target: self.target,
matched: (term, index).into(),
matched: log_id,
});

// If running at line rate, and our buffered outbound requests have accumulated too
Expand Down Expand Up @@ -338,7 +338,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
.storage
.get_log_entries(conflict.log_id.index, conflict.log_id.index + 1)
.await
.map(|entries| entries.get(0).map(|entry| entry.term))
.map(|entries| entries.get(0).map(|entry| entry.log_id.term))
{
Ok(Some(term)) => {
self.matched.term = term; // If we have the specified log, ensure we use its term.
Expand Down Expand Up @@ -425,7 +425,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re

RaftEvent::Replicate { entry, commit_index } => {
self.commit_index = commit_index;
self.last_log_index = entry.index;
self.last_log_index = entry.log_id.index;
if self.target_state == TargetReplState::LineRate {
self.replication_buffer.push(entry);
}
Expand Down Expand Up @@ -568,8 +568,8 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
.core
.outbound_buffer
.first()
.map(|entry| entry.as_ref().index)
.or_else(|| self.core.replication_buffer.first().map(|entry| entry.index));
.map(|entry| entry.as_ref().log_id.index)
.or_else(|| self.core.replication_buffer.first().map(|entry| entry.log_id.index));

// When converting to `LaggingState`, `outbound_buffer` and `replication_buffer` is cleared,
// in which there may be uncommitted logs.
Expand Down
6 changes: 2 additions & 4 deletions async-raft/tests/conflict_with_empty_entries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,11 @@ async fn conflict_with_empty_entries() -> Result<()> {
prev_log: LogId { term: 1, index: 0 },
entries: vec![
Entry {
term: 1,
index: 1,
log_id: (1, 1).into(),
payload: EntryPayload::Blank,
},
Entry {
term: 1,
index: 2,
log_id: (1, 2).into(),
payload: EntryPayload::Normal(EntryNormal {
data: ClientRequest {
client: "foo".to_string(),
Expand Down
22 changes: 9 additions & 13 deletions memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use async_raft::storage::HardState;
use async_raft::storage::InitialState;
use async_raft::AppData;
use async_raft::AppDataResponse;
use async_raft::LogId;
use async_raft::NodeId;
use async_raft::RaftStorage;
use async_raft::SnapshotId;
Expand Down Expand Up @@ -188,16 +187,13 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
let sm = self.sm.read().await;
match &mut *hs {
Some(inner) => {
let (last_log_index, last_log_term) = match log.values().rev().next() {
Some(log) => (log.index, log.term),
None => (0, 0),
let last_log_id = match log.values().rev().next() {
Some(log) => log.log_id,
None => (0, 0).into(),
};
let last_applied_log = sm.last_applied_log;
Ok(InitialState {
last_log: LogId {
term: last_log_term,
index: last_log_index,
},
last_log: last_log_id,
last_applied_log,
hard_state: inner.clone(),
membership,
Expand Down Expand Up @@ -251,15 +247,15 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
#[tracing::instrument(level = "trace", skip(self, entry))]
async fn append_entry_to_log(&self, entry: &Entry<ClientRequest>) -> Result<()> {
let mut log = self.log.write().await;
log.insert(entry.index, entry.clone());
log.insert(entry.log_id.index, entry.clone());
Ok(())
}

#[tracing::instrument(level = "trace", skip(self, entries))]
async fn replicate_to_log(&self, entries: &[Entry<ClientRequest>]) -> Result<()> {
let mut log = self.log.write().await;
for entry in entries {
log.insert(entry.index, entry.clone());
log.insert(entry.log_id.index, entry.clone());
}
Ok(())
}
Expand Down Expand Up @@ -311,7 +307,7 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
membership_config = log
.values()
.rev()
.skip_while(|entry| entry.index > last_applied_log)
.skip_while(|entry| entry.log_id.index > last_applied_log)
.find_map(|entry| match &entry.payload {
EntryPayload::ConfigChange(cfg) => Some(cfg.membership.clone()),
_ => None,
Expand All @@ -333,7 +329,7 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
let mut current_snapshot = self.current_snapshot.write().await;
term = log
.get(&last_applied_log)
.map(|entry| entry.term)
.map(|entry| entry.log_id.term)
.ok_or_else(|| anyhow::anyhow!(ERR_INCONSISTENT_LOG))?;
*log = log.split_off(&last_applied_log);
log.insert(
Expand Down Expand Up @@ -402,7 +398,7 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
let membership_config = log
.values()
.rev()
.skip_while(|entry| entry.index > index)
.skip_while(|entry| entry.log_id.index > index)
.find_map(|entry| match &entry.payload {
EntryPayload::ConfigChange(cfg) => Some(cfg.membership.clone()),
_ => None,
Expand Down
Loading

0 comments on commit 24e3813

Please sign in to comment.