diff --git a/async-raft/src/core/admin.rs b/async-raft/src/core/admin.rs index 036182b67..ea38d58e1 100644 --- a/async-raft/src/core/admin.rs +++ b/async-raft/src/core/admin.rs @@ -31,8 +31,8 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage &mut self, mut members: HashSet, ) -> Result<(), InitializeError> { - if self.core.last_log_index != 0 || self.core.current_term != 0 { - tracing::error!({self.core.last_log_index, self.core.current_term}, "rejecting init_with_config request as last_log_index or current_term is 0"); + if self.core.last_log.index != 0 || self.core.current_term != 0 { + tracing::error!({self.core.last_log.index, self.core.current_term}, "rejecting init_with_config request as last_log_index or current_term is 0"); return Err(InitializeError::NotAllowed); } diff --git a/async-raft/src/core/append_entries.rs b/async-raft/src/core/append_entries.rs index 4b055df22..8e59a34be 100644 --- a/async-raft/src/core/append_entries.rs +++ b/async-raft/src/core/append_entries.rs @@ -9,6 +9,7 @@ use crate::raft::Entry; use crate::raft::EntryPayload; use crate::AppData; use crate::AppDataResponse; +use crate::LogId; use crate::RaftNetwork; use crate::RaftStorage; use crate::Update; @@ -62,7 +63,7 @@ impl, S: RaftStorage> Ra // log info, then replication is g2g. let msg_prev_index_is_min = msg.prev_log_index == u64::min_value(); let msg_index_and_term_match = - (msg.prev_log_index == self.last_log_index) && (msg.prev_log_term == self.last_log_term); + (msg.prev_log_index == self.last_log.index) && (msg.prev_log_term == self.last_log.term); if msg_prev_index_is_min || msg_index_and_term_match { self.append_log_entries(&msg.entries).await?; self.replicate_to_state_machine_if_needed(msg.entries).await; @@ -99,8 +100,8 @@ impl, S: RaftStorage> Ra term: self.current_term, success: false, conflict_opt: Some(ConflictOpt { - term: self.last_log_term, - index: self.last_log_index, + term: self.last_log.term, + index: self.last_log.index, }), }); } @@ -110,7 +111,7 @@ impl, S: RaftStorage> Ra if target_entry.term == msg.prev_log_term { // We've found a point of agreement with the leader. If we have any logs present // with an index greater than this, then we must delete them per §5.3. - if self.last_log_index > target_entry.index { + if self.last_log.index > target_entry.index { self.storage .delete_logs_from(target_entry.index + 1, None) .await @@ -139,8 +140,8 @@ impl, S: RaftStorage> Ra index: entry.index, }), None => Some(ConflictOpt { - term: self.last_log_term, - index: self.last_log_index, + term: self.last_log.term, + index: self.last_log.index, }), }; if report_metrics { @@ -191,8 +192,10 @@ impl, S: RaftStorage> Ra // Replicate entries to log (same as append, but in follower mode). self.storage.replicate_to_log(entries).await.map_err(|err| self.map_fatal_storage_error(err))?; if let Some(entry) = entries.last() { - self.last_log_index = entry.index; - self.last_log_term = entry.term; + self.last_log = LogId { + term: entry.term, + index: entry.index, + }; } Ok(()) } @@ -272,7 +275,7 @@ impl, S: RaftStorage> Ra /// from the AppendEntries RPC handler. #[tracing::instrument(level = "trace", skip(self))] async fn initial_replicate_to_state_machine(&mut self) { - let stop = std::cmp::min(self.commit_index, self.last_log_index) + 1; + let stop = std::cmp::min(self.commit_index, self.last_log.index) + 1; let start = self.last_applied + 1; let storage = self.storage.clone(); diff --git a/async-raft/src/core/client.rs b/async-raft/src/core/client.rs index 9c8f280d3..4a194239d 100644 --- a/async-raft/src/core/client.rs +++ b/async-raft/src/core/client.rs @@ -62,7 +62,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage pub(super) async fn commit_initial_leader_entry(&mut self) -> RaftResult<()> { // If the cluster has just formed, and the current index is 0, then commit the current // config, else a blank payload. - let req: ClientWriteRequest = if self.core.last_log_index == 0 { + let req: ClientWriteRequest = if self.core.last_log.index == 0 { ClientWriteRequest::new_config(self.core.membership.clone()) } else { ClientWriteRequest::new_blank_payload() @@ -71,8 +71,8 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage // Check to see if we have any config change logs newer than our commit index. If so, then // we need to drive the commitment of the config change to the cluster. let mut pending_config = None; // The inner bool represents `is_in_joint_consensus`. - if self.core.last_log_index > self.core.commit_index { - let (stale_logs_start, stale_logs_stop) = (self.core.commit_index + 1, self.core.last_log_index + 1); + if self.core.last_log.index > self.core.commit_index { + let (stale_logs_start, stale_logs_stop) = (self.core.commit_index + 1, self.core.last_log.index + 1); pending_config = self .core .storage @@ -93,7 +93,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage // Commit the initial payload to the cluster. let (tx_payload_committed, rx_payload_committed) = oneshot::channel(); let entry = self.append_payload_to_log(req.entry).await?; - self.core.last_log_term = self.core.current_term; // This only ever needs to be updated once per term. + self.core.last_log.term = self.core.current_term; // This only ever needs to be updated once per term. let cr_entry = ClientRequestEntry::from_entry(entry, tx_payload_committed); self.replicate_client_request(cr_entry).await; self.leader_report_metrics(); @@ -253,7 +253,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage #[tracing::instrument(level = "trace", skip(self, payload))] pub(super) async fn append_payload_to_log(&mut self, payload: EntryPayload) -> RaftResult> { let entry = Entry { - index: self.core.last_log_index + 1, + index: self.core.last_log.index + 1, term: self.core.current_term, payload, }; @@ -262,7 +262,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage .append_entry_to_log(&entry) .await .map_err(|err| self.core.map_fatal_storage_error(err))?; - self.core.last_log_index = entry.index; + self.core.last_log.index = entry.index; Ok(entry) } diff --git a/async-raft/src/core/install_snapshot.rs b/async-raft/src/core/install_snapshot.rs index 118bd0a01..03a567263 100644 --- a/async-raft/src/core/install_snapshot.rs +++ b/async-raft/src/core/install_snapshot.rs @@ -165,7 +165,7 @@ impl, S: RaftStorage> Ra mut snapshot: Box, ) -> RaftResult<()> { snapshot.as_mut().shutdown().await.map_err(|err| self.map_fatal_storage_error(err.into()))?; - let delete_through = if self.last_log_index > req.last_included.index { + let delete_through = if self.last_log.index > req.last_included.index { Some(req.last_included.index) } else { None @@ -182,8 +182,7 @@ impl, S: RaftStorage> Ra .map_err(|err| self.map_fatal_storage_error(err))?; let membership = self.storage.get_membership_config().await.map_err(|err| self.map_fatal_storage_error(err))?; self.update_membership(membership)?; - self.last_log_index = req.last_included.index; - self.last_log_term = req.last_included.term; + self.last_log = req.last_included; self.last_applied = req.last_included.index; self.snapshot_last_included = req.last_included; self.report_metrics(Update::Ignore); diff --git a/async-raft/src/core/mod.rs b/async-raft/src/core/mod.rs index b392ca2d3..be7acfa7e 100644 --- a/async-raft/src/core/mod.rs +++ b/async-raft/src/core/mod.rs @@ -105,10 +105,8 @@ pub struct RaftCore, S: RaftSt /// first-come-first-served basis. See §5.4.1 for additional restriction on votes. voted_for: Option, - /// The index of the last entry to be appended to the log. - last_log_index: u64, - /// The term of the last entry to be appended to the log. - last_log_term: u64, + /// The last entry to be appended to the log. + last_log: LogId, /// The node's current snapshot state. snapshot_state: Option>, @@ -174,8 +172,7 @@ impl, S: RaftStorage> Ra current_term: 0, current_leader: None, voted_for: None, - last_log_index: 0, - last_log_term: 0, + last_log: LogId { term: 0, index: 0 }, snapshot_state: None, snapshot_last_included: LogId { term: 0, index: 0 }, entries_cache: Default::default(), @@ -197,8 +194,8 @@ impl, S: RaftStorage> Ra async fn main(mut self) -> RaftResult<()> { tracing::trace!("raft node is initializing"); let state = self.storage.get_initial_state().await.map_err(|err| self.map_fatal_storage_error(err))?; - self.last_log_index = state.last_log_index; - self.last_log_term = state.last_log_term; + self.last_log.index = state.last_log_index; + self.last_log.term = state.last_log_term; self.current_term = state.hard_state.current_term; self.voted_for = state.hard_state.voted_for; self.membership = state.membership; @@ -216,7 +213,7 @@ impl, S: RaftStorage> Ra self.report_metrics(Update::Ignore); } - let has_log = self.last_log_index != u64::min_value(); + let has_log = self.last_log.index != u64::min_value(); let single = self.membership.members.len() == 1; let is_candidate = self.membership.contains(&self.id); @@ -284,7 +281,7 @@ impl, S: RaftStorage> Ra id: self.id, state: self.target_state, current_term: self.current_term, - last_log_index: self.last_log_index, + last_log_index: self.last_log.index, last_applied: self.last_applied, current_leader: self.current_leader, membership_config: self.membership.clone(), diff --git a/async-raft/src/core/replication.rs b/async-raft/src/core/replication.rs index 1ddf36407..9d30d2241 100644 --- a/async-raft/src/core/replication.rs +++ b/async-raft/src/core/replication.rs @@ -32,15 +32,15 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage target, self.core.current_term, self.core.config.clone(), - self.core.last_log_index, - self.core.last_log_term, + self.core.last_log.index, + self.core.last_log.term, self.core.commit_index, self.core.network.clone(), self.core.storage.clone(), self.replicationtx.clone(), ); ReplicationState { - matched: (self.core.current_term, self.core.last_log_index).into(), + matched: (self.core.current_term, self.core.last_log.index).into(), replstream, remove_after_commit: None, } @@ -241,7 +241,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage // this node is me, the leader if *id == self.core.id { // TODO: can it be sure that self.core.last_log_term is the term of this leader? - rst.push((self.core.last_log_index, self.core.last_log_term)); + rst.push((self.core.last_log.index, self.core.last_log.term)); continue; } @@ -288,7 +288,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage if let Some(snapshot) = current_snapshot_opt { // If snapshot exists, ensure its distance from the leader's last log index is <= half // of the configured snapshot threshold, else create a new snapshot. - if snapshot_is_within_half_of_threshold(&snapshot.included.index, &self.core.last_log_index, &threshold) { + if snapshot_is_within_half_of_threshold(&snapshot.included.index, &self.core.last_log.index, &threshold) { let _ = tx.send(snapshot); return Ok(()); } diff --git a/async-raft/src/core/vote.rs b/async-raft/src/core/vote.rs index 27b78eace..af33050ea 100644 --- a/async-raft/src/core/vote.rs +++ b/async-raft/src/core/vote.rs @@ -59,7 +59,7 @@ impl, S: RaftStorage> Ra // Check if candidate's log is at least as up-to-date as this node's. // If candidate's log is not at least as up-to-date as this node, then reject. let client_is_uptodate = - (msg.last_log_term >= self.last_log_term) && (msg.last_log_index >= self.last_log_index); + (msg.last_log_term >= self.last_log.term) && (msg.last_log_index >= self.last_log.index); if !client_is_uptodate { tracing::trace!( { candidate = msg.candidate_id }, @@ -154,8 +154,8 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage let rpc = VoteRequest::new( self.core.current_term, self.core.id, - self.core.last_log_index, - self.core.last_log_term, + self.core.last_log.index, + self.core.last_log.term, ); let (network, tx_inner) = (self.core.network.clone(), tx.clone()); let _ = tokio::spawn(