Skip to content

Commit

Permalink
Change: use Vote to define Leader and Candidate
Browse files Browse the repository at this point in the history
`Vote` is a similar concept to paxos proposer-id.
It is defined by a tuple of `(term, uncommitted|committed, node_id)`.

A Candidate creates an uncommitted `Vote` to identify it, as a
replacement of `(term, candidate_id)`.

A Leader has a `Vote` that is committed, i.e., the vote is granted by a
quorum.

The rule for overriding a `Vote` is defined by a **partially ordered**
relation: a node is only allowed to grant a greater vote.
The partial order relation covers all behavior of the `vote` in raft
spec. This way wee make the test very easy to be done.

With `Vote`, checking validity of a request is quite simple: just by
`self.vote <= rpc.vote`.

- Rename: save_hard_state() and read_hard_state() to save_vote() and read_vote().

- Replace `term, node_id` pair with `Vote` in RaftCore and RPC struct-s.

- Fix: request handler for append-entries and install-snapshot should
  save the vote they received.
  • Loading branch information
drmingdrmer committed Jan 25, 2022
1 parent 9a8b750 commit 58f2491
Show file tree
Hide file tree
Showing 32 changed files with 576 additions and 439 deletions.
4 changes: 2 additions & 2 deletions guide/src/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ or a wrapper of a remote sql DB.

- Read/write raft state, e.g., term or vote.
```rust
fn save_hard_state(hs:&HardState)
fn read_hard_state() -> Result<Option<HardState>>
fn save_vote(vote:&Vote)
fn read_vote() -> Result<Option<Vote>>
```

- Read/write logs.
Expand Down
19 changes: 9 additions & 10 deletions memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use anyerror::AnyError;
use openraft::async_trait::async_trait;
use openraft::raft::Entry;
use openraft::raft::EntryPayload;
use openraft::storage::HardState;
use openraft::storage::LogState;
use openraft::storage::Snapshot;
use openraft::AppData;
Expand All @@ -30,6 +29,7 @@ use openraft::SnapshotMeta;
use openraft::StateMachineChanges;
use openraft::StorageError;
use openraft::StorageIOError;
use openraft::Vote;
use serde::Deserialize;
use serde::Serialize;
use tokio::sync::RwLock;
Expand Down Expand Up @@ -93,7 +93,7 @@ pub struct MemStore {
sm: RwLock<MemStoreStateMachine>,

/// The current hard state.
hs: RwLock<Option<HardState>>,
vote: RwLock<Option<Vote>>,

snapshot_idx: Arc<Mutex<u64>>,

Expand All @@ -106,14 +106,13 @@ impl MemStore {
pub async fn new() -> Self {
let log = RwLock::new(BTreeMap::new());
let sm = RwLock::new(MemStoreStateMachine::default());
let hs = RwLock::new(None);
let current_snapshot = RwLock::new(None);

Self {
last_purged_log_id: RwLock::new(None),
log,
sm,
hs,
vote: RwLock::new(None),
snapshot_idx: Arc::new(Mutex::new(0)),
current_snapshot,
}
Expand All @@ -133,16 +132,16 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
type SnapshotData = Cursor<Vec<u8>>;

#[tracing::instrument(level = "trace", skip(self))]
async fn save_hard_state(&self, hs: &HardState) -> Result<(), StorageError> {
tracing::debug!(?hs, "save_hard_state");
let mut h = self.hs.write().await;
async fn save_vote(&self, vote: &Vote) -> Result<(), StorageError> {
tracing::debug!(?vote, "save_vote");
let mut h = self.vote.write().await;

*h = Some(hs.clone());
*h = Some(*vote);
Ok(())
}

async fn read_hard_state(&self) -> Result<Option<HardState>, StorageError> {
Ok(self.hs.read().await.clone())
async fn read_vote(&self) -> Result<Option<Vote>, StorageError> {
Ok(*self.vote.read().await)
}

async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + Send + Sync>(
Expand Down
5 changes: 2 additions & 3 deletions openraft/src/core/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
) -> Result<(), InitializeError> {
// TODO(xp): simplify this condition

if self.core.last_log_id.is_some() || self.core.current_term != 0 {
if self.core.last_log_id.is_some() || self.core.vote.term != 0 {
tracing::error!(
last_log_id=?self.core.last_log_id, self.core.current_term,
last_log_id=?self.core.last_log_id, %self.core.vote,
"rejecting init_with_config request as last_log_index is not None or current_term is not 0");
return Err(InitializeError::NotAllowed);
}
Expand Down Expand Up @@ -249,7 +249,6 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>

// TODO(xp): transfer leadership
self.core.set_target_state(State::Learner);
self.core.current_leader = None;
return;
}

Expand Down
75 changes: 25 additions & 50 deletions openraft/src/core/append_entries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,72 +21,47 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
/// An RPC invoked by the leader to replicate log entries (§5.3); also used as heartbeat (§5.2).
///
/// See `receiver implementation: AppendEntries RPC` in raft-essentials.md in this repo.
#[tracing::instrument(level = "debug", skip(self, msg))]
#[tracing::instrument(level = "debug", skip(self, req))]
pub(super) async fn handle_append_entries_request(
&mut self,
msg: AppendEntriesRequest<D>,
req: AppendEntriesRequest<D>,
) -> Result<AppendEntriesResponse, AppendEntriesError> {
tracing::debug!(last_log_id=?self.last_log_id, ?self.last_applied, msg=%msg.summary(), "handle_append_entries_request");
tracing::debug!(last_log_id=?self.last_log_id, ?self.last_applied, msg=%req.summary(), "handle_append_entries_request");

let msg_entries = msg.entries.as_slice();
let msg_entries = req.entries.as_slice();

// Partial order compare: smaller than or incomparable
#[allow(clippy::neg_cmp_op_on_partial_ord)]
if !(req.vote >= self.vote) {
tracing::debug!(%self.vote, %req.vote, "AppendEntries RPC term is less than current term");

// If message's term is less than most recent term, then we do not honor the request.
if msg.term < self.current_term {
tracing::debug!({self.current_term, rpc_term=msg.term}, "AppendEntries RPC term is less than current term");
return Ok(AppendEntriesResponse {
term: self.current_term,
vote: self.vote,
success: false,
conflict: false,
});
}

self.update_next_election_timeout(true);

// Caveat: Because we can not just delete `log[prev_log_id.index..]`, (which results in loss of committed
// entry), the commit index must be update only after append-entries
// and must point to a log entry that is consistent to leader.
// Or there would be chance applying an uncommitted entry:
//
// ```
// R0 1,1 1,2 3,3
// R1 1,1 1,2 2,3
// R2 1,1 1,2 3,3
// ```
//
// - R0 to R1 append_entries: entries=[{1,2}], prev_log_id = {1,1}, commit_index = 3
// - R1 accepted this append_entries request but was not aware of that entry {2,3} is inconsistent to leader.
// Then it will update commit_index to 3 and apply {2,3}

// TODO(xp): cleanup commit index at sender side.
let valid_commit_index = msg_entries.last().map(|x| Some(x.log_id)).unwrap_or_else(|| msg.prev_log_id);
let valid_committed = std::cmp::min(msg.leader_commit, valid_commit_index);

tracing::debug!("start to check and update to latest term/leader");
{
let mut report_metrics = false;

if msg.term > self.current_term {
self.update_current_term(msg.term, Some(msg.leader_id));
self.save_hard_state().await?;
report_metrics = true;
}
if req.vote > self.vote {
self.vote = req.vote;
self.save_vote().await?;

// Update current leader if needed.
if self.current_leader != Some(msg.leader_id) {
report_metrics = true;
// If not follower, become follower.
if !self.target_state.is_follower() && !self.target_state.is_learner() {
self.set_target_state(State::Follower); // State update will emit metrics.
}

self.current_leader = Some(msg.leader_id);

if report_metrics {
self.report_metrics(Update::AsIs);
}
self.report_metrics(Update::AsIs);
}

// Transition to follower state if needed.
if !self.target_state.is_follower() && !self.target_state.is_learner() {
self.set_target_state(State::Follower);
}
// Caveat: [commit-index must not advance the last known consistent log](https://datafuselabs.github.io/openraft/replication.html#caveat-commit-index-must-not-advance-the-last-known-consistent-log)

// TODO(xp): cleanup commit index at sender side.
let valid_commit_index = msg_entries.last().map(|x| Some(x.log_id)).unwrap_or_else(|| req.prev_log_id);
let valid_committed = std::cmp::min(req.leader_commit, valid_commit_index);

tracing::debug!("begin log consistency check");

Expand All @@ -95,7 +70,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
// +----------------+------------------------+
// ` 0 ` last_applied ` last_log_id

let res = self.append_apply_log_entries(msg.prev_log_id, msg_entries, valid_committed).await?;
let res = self.append_apply_log_entries(req.prev_log_id, msg_entries, valid_committed).await?;

Ok(res)
}
Expand Down Expand Up @@ -220,7 +195,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
}

return Ok(AppendEntriesResponse {
term: self.current_term,
vote: self.vote,
success: false,
conflict: true,
});
Expand Down Expand Up @@ -252,7 +227,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
self.report_metrics(Update::AsIs);

Ok(AppendEntriesResponse {
term: self.current_term,
vote: self.vote,
success: true,
conflict: false,
})
Expand Down
10 changes: 6 additions & 4 deletions openraft/src/core/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::raft::Entry;
use crate::raft::EntryPayload;
use crate::raft::RaftRespTx;
use crate::replication::RaftEvent;
use crate::vote::Vote;
use crate::AppData;
use crate::AppDataResponse;
use crate::MessageSummary;
Expand Down Expand Up @@ -101,8 +102,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}

let rpc = AppendEntriesRequest {
term: self.core.current_term,
leader_id: self.core.id,
vote: self.core.vote,
prev_log_id: node.matched,
entries: vec![],
leader_commit: self.core.committed,
Expand Down Expand Up @@ -157,8 +157,10 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
};

// If we receive a response with a greater term, then revert to follower and abort this request.
if data.term != self.core.current_term {
self.core.update_current_term(data.term, None);
if data.vote.term != self.core.vote.term {
self.core.vote = Vote::new_uncommitted(data.vote.term, None);
// TODO(xp): deal with storage error
self.core.save_vote().await.unwrap();
// TODO(xp): if receives error about a higher term, it should stop at once?
self.core.set_target_state(State::Follower);
}
Expand Down
48 changes: 15 additions & 33 deletions openraft/src/core/install_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,37 +35,25 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
&mut self,
req: InstallSnapshotRequest,
) -> Result<InstallSnapshotResponse, InstallSnapshotError> {
// If message's term is less than most recent term, then we do not honor the request.
if req.term < self.current_term {
return Ok(InstallSnapshotResponse {
term: self.current_term,
});
#[allow(clippy::neg_cmp_op_on_partial_ord)]
if !(req.vote >= self.vote) {
tracing::debug!(%self.vote, %req.vote, "InstallSnapshot RPC term is less than current term");

return Ok(InstallSnapshotResponse { vote: self.vote });
}

// Update election timeout.
self.update_next_election_timeout(true);

// Update current term if needed.
let mut report_metrics = false;
if self.current_term != req.term {
self.update_current_term(req.term, None);
self.save_hard_state().await?;
report_metrics = true;
}

// Update current leader if needed.
if self.current_leader != Some(req.leader_id) {
report_metrics = true;
}
if req.vote > self.vote {
self.vote = req.vote;
self.save_vote().await?;

self.current_leader = Some(req.leader_id);

// If not follower, become follower.
if !self.target_state.is_follower() && !self.target_state.is_learner() {
self.set_target_state(State::Follower); // State update will emit metrics.
}
// If not follower, become follower.
if !self.target_state.is_follower() && !self.target_state.is_learner() {
self.set_target_state(State::Follower); // State update will emit metrics.
}

if report_metrics {
self.report_metrics(Update::AsIs);
}

Expand Down Expand Up @@ -134,9 +122,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
// If this was a small snapshot, and it is already done, then finish up.
if req.done {
self.finalize_snapshot_installation(req, snapshot).await?;
return Ok(InstallSnapshotResponse {
term: self.current_term,
});
return Ok(InstallSnapshotResponse { vote: self.vote });
}

// Else, retain snapshot components for later segments & respond.
Expand All @@ -145,9 +131,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
id,
snapshot,
});
Ok(InstallSnapshotResponse {
term: self.current_term,
})
Ok(InstallSnapshotResponse { vote: self.vote })
}

#[tracing::instrument(level = "debug", skip(self, req, snapshot), fields(req=%req.summary()))]
Expand Down Expand Up @@ -188,9 +172,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
} else {
self.snapshot_state = Some(SnapshotState::Streaming { offset, id, snapshot });
}
Ok(InstallSnapshotResponse {
term: self.current_term,
})
Ok(InstallSnapshotResponse { vote: self.vote })
}

/// Finalize the installation of a new snapshot.
Expand Down
Loading

0 comments on commit 58f2491

Please sign in to comment.