Skip to content

Commit

Permalink
fix: handle-vote should compare last_log_id in dictionary order, not …
Browse files Browse the repository at this point in the history
…in vector order

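In the Raft spec, log `{term:2, index:1}` is definitely greater than log `{term:1, index:2}`: log ids are compared by term first, and by index only when the terms are equal.
Comparing log ids with `term1 >= term2 && index1 >= index2` can block the election:
with two such logs, each node rejects the other's vote request, so no one can become a leader.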
  • Loading branch information
drmingdrmer committed Sep 9, 2021
1 parent 734eec6 commit a48a328
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 8 deletions.
37 changes: 29 additions & 8 deletions async-raft/src/core/vote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
return Ok(VoteResponse {
term: self.current_term,
vote_granted: false,
last_log_id: self.last_log_id,
});
}

Expand All @@ -44,6 +45,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
return Ok(VoteResponse {
term: self.current_term,
vote_granted: false,
last_log_id: self.last_log_id,
});
}
}
Expand All @@ -58,19 +60,17 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
self.save_hard_state().await?;
}

// TODO: bug: (2,1), (1,2)
// Check if candidate's log is at least as up-to-date as this node's.
// If candidate's log is not at least as up-to-date as this node, then reject.
let client_is_uptodate =
(msg.last_log_id.term >= self.last_log_id.term) && (msg.last_log_id.index >= self.last_log_id.index);
if !client_is_uptodate {
if msg.last_log_id < self.last_log_id {
tracing::debug!(
{ candidate = msg.candidate_id },
"rejecting vote request as candidate's log is not up-to-date"
);
return Ok(VoteResponse {
term: self.current_term,
vote_granted: false,
last_log_id: self.last_log_id,
});
}

Expand All @@ -83,11 +83,13 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
Some(candidate_id) if candidate_id == &msg.candidate_id => Ok(VoteResponse {
term: self.current_term,
vote_granted: true,
last_log_id: self.last_log_id,
}),
// This node has already voted for a different candidate.
Some(_) => Ok(VoteResponse {
term: self.current_term,
vote_granted: false,
last_log_id: self.last_log_id,
}),
// This node has not yet voted for the current term, so vote for the candidate.
None => {
Expand All @@ -99,6 +101,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
Ok(VoteResponse {
term: self.current_term,
vote_granted: true,
last_log_id: self.last_log_id,
})
}
}
Expand All @@ -107,15 +110,33 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra

impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> CandidateState<'a, D, R, N, S> {
/// Handle response from a vote request sent to a peer.
#[tracing::instrument(level = "trace", skip(self, res, target))]
#[tracing::instrument(level = "debug", skip(self))]
pub(super) async fn handle_vote_response(&mut self, res: VoteResponse, target: NodeId) -> RaftResult<()> {
// If peer's term is greater than current term, revert to follower state.

if res.term > self.core.current_term {
self.core.update_current_term(res.term, None);
self.core.update_current_leader(UpdateCurrentLeader::Unknown);
self.core.set_target_state(State::Follower);
self.core.save_hard_state().await?;
tracing::debug!("reverting to follower state due to greater term observed in RequestVote RPC response");

self.core.update_current_leader(UpdateCurrentLeader::Unknown);

// If a quorum of nodes have higher `last_log_id`, I have no chance to become a leader.
// TODO(xp): This is a simplified impl: revert to follower as soon as seeing a higher `last_log_id`.
// When reverted to follower, it waits for heartbeat for 2 second before starting a new round of
// election.
if self.core.last_log_id < res.last_log_id {
self.core.set_target_state(State::Follower);
tracing::debug!("reverting to follower state due to greater term observed in RequestVote RPC response");
} else {
tracing::debug!(
id = %self.core.id,
self_term=%self.core.current_term,
res_term=%res.term,
self_last_log_id=%self.core.last_log_id,
res_last_log_id=%res.last_log_id,
"I have lower term but higher or euqal last_log_id, keep trying to elect"
);
}
return Ok(());
}

Expand Down
4 changes: 4 additions & 0 deletions async-raft/src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -677,8 +677,12 @@ impl VoteRequest {
pub struct VoteResponse {
/// The current term of the responding node, for the candidate to update itself.
///
/// A candidate that sees a term greater than its own adopts that term.
pub term: u64,

/// Will be true if the candidate received a vote from the responder.
pub vote_granted: bool,

/// The last log id stored on the remote voter.
///
/// It is sent with every response, whether or not the vote was granted. When the response
/// also carries a greater term, the candidate reverts to follower only if this id is greater
/// than its own last log id; otherwise it keeps trying to get elected.
pub last_log_id: LogId,
}

//////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down
98 changes: 98 additions & 0 deletions async-raft/tests/elect_compare_last_log.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use async_raft::raft::Entry;
use async_raft::raft::EntryConfigChange;
use async_raft::raft::EntryPayload;
use async_raft::raft::MembershipConfig;
use async_raft::storage::HardState;
use async_raft::Config;
use async_raft::LogId;
use async_raft::RaftStorage;
use async_raft::State;
use fixtures::RaftRouter;
use maplit::btreeset;

#[macro_use]
mod fixtures;

/// A vote request is granted only if the candidate's last log id is greater than or equal to
/// the voter's own last log id.
///
/// - Fake a cluster of two nodes whose last log ids are {2,1} (node 0) and {1,2} (node 1).
/// - Bring up the cluster and expect node 0 to become leader.
///
/// RUST_LOG=async_raft,memstore,elect_compare_last_log=trace cargo test -p async-raft --test elect_compare_last_log
#[tokio::test(flavor = "multi_thread", worker_threads = 6)]
async fn elect_compare_last_log() -> Result<()> {
    let (_log_guard, ut_span) = init_ut!();
    let _ent = ut_span.enter();

    // Setup test dependencies.
    let raft_config = Config::build("test".into()).validate().expect("failed to build Raft config");
    let raft_config = Arc::new(raft_config);
    let router = Arc::new(RaftRouter::new(raft_config.clone()));

    let store_of_node0 = router.new_store(0);
    let store_of_node1 = router.new_store(1);

    // Both stores are seeded with the same hard state: term 10, no vote cast yet.
    let initial_hard_state = HardState {
        current_term: 10,
        voted_for: None,
    };

    // Both logs start with a config change that names nodes 0 and 1 as the members.
    let two_node_membership = || MembershipConfig {
        members: btreeset! {0,1},
        members_after_consensus: None,
    };

    tracing::info!("--- fake store: sto0: last log: 2,1");
    {
        store_of_node0.save_hard_state(&initial_hard_state).await?;

        // A single entry: higher term, lower index.
        let only_entry = Entry {
            log_id: LogId { term: 2, index: 1 },
            payload: EntryPayload::ConfigChange(EntryConfigChange {
                membership: two_node_membership(),
            }),
        };
        store_of_node0.append_to_log(&[&only_entry]).await?;
    }

    tracing::info!("--- fake store: sto1: last log: 1,2");
    {
        store_of_node1.save_hard_state(&initial_hard_state).await?;

        // Two entries: lower term, higher index.
        let membership_entry = Entry {
            log_id: LogId { term: 1, index: 1 },
            payload: EntryPayload::ConfigChange(EntryConfigChange {
                membership: two_node_membership(),
            }),
        };
        let blank_entry = Entry {
            log_id: LogId { term: 1, index: 2 },
            payload: EntryPayload::Blank,
        };
        store_of_node1.append_to_log(&[&membership_entry, &blank_entry]).await?;
    }

    tracing::info!("--- bring up cluster and elect");

    router.new_raft_node_with_sto(0, store_of_node0.clone()).await;
    router.new_raft_node_with_sto(1, store_of_node1.clone()).await;

    let expected_leaders = btreeset! {0};
    router
        .wait_for_state(&expected_leaders, State::Leader, timeout(), "only node 0 becomes leader")
        .await?;

    Ok(())
}

/// Upper bound on how long the test waits for the expected cluster state: 5000 milliseconds.
fn timeout() -> Option<Duration> {
    const WAIT_MILLIS: u64 = 5000;

    let limit = Duration::from_millis(WAIT_MILLIS);
    Some(limit)
}
4 changes: 4 additions & 0 deletions async-raft/tests/fixtures/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,10 @@ impl RaftRouter {
self.new_raft_node_with_sto(id, memstore).await
}

/// Build a new `MemStore` for node `id` and return it behind an `Arc`.
///
/// The router itself is neither read nor modified here; the store is handed back so the caller
/// can populate it before passing it to `new_raft_node_with_sto`.
pub fn new_store(self: &Arc<Self>, id: u64) -> Arc<MemStore> {
    let store = MemStore::new(id);
    Arc::new(store)
}

pub async fn new_raft_node_with_sto(self: &Arc<Self>, id: NodeId, sto: Arc<MemStore>) {
let node = Raft::new(id, self.config.clone(), self.clone(), sto.clone());
let mut rt = self.routing_table.write().await;
Expand Down

0 comments on commit a48a328

Please sign in to comment.