Skip to content

Commit

Permalink
Change: A restarted leader should enter leader state at once, without…
Browse files Browse the repository at this point in the history
… another round of election

- Test: the single-node restart test no longer expects the node to run
  an election.

- Refactor: add VoteHandler to handle vote related operations.

- Change: make ServerState default value `Learner`.

- Fix: #607
  • Loading branch information
drmingdrmer committed Jan 4, 2023
1 parent 4332722 commit 3d5e001
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 32 deletions.
12 changes: 8 additions & 4 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,12 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
tracing::debug!("raft node is initializing");

self.engine.startup();
// No output commands
self.run_engine_commands::<Entry<C>>(&[]).await?;

// To ensure that restarted nodes don't disrupt a stable cluster.
self.set_next_election_time(false);
if self.engine.state.server_state == ServerState::Follower {
self.set_next_election_time(false);
}

// Initialize metrics.
self.report_metrics(Update::Update(None));
Expand Down Expand Up @@ -976,16 +978,18 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
&'e Ent: Into<Entry<C>>,
{
if tracing::enabled!(Level::DEBUG) {
tracing::debug!("run command: start...");
tracing::debug!("commands: start...");
for c in self.engine.output.commands.iter() {
tracing::debug!("run command: {:?}", c);
tracing::debug!("commands: {:?}", c);
}
tracing::debug!("commands: end...");
}

let mut curr = 0;
let mut commands = vec![];
swap(&mut self.engine.output.commands, &mut commands);
for cmd in commands {
tracing::debug!("run command: {:?}", cmd);
self.run_command(input_entries, &mut curr, &cmd).await?;
}

Expand Down
10 changes: 3 additions & 7 deletions openraft/src/core/server_state.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
/// All possible states of a Raft node.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, Default)]
#[derive(PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub enum ServerState {
/// The node is completely passive; replicating entries, but neither voting nor timing out.
#[default]
Learner,
/// The node is replicating logs from the leader.
Follower,
Expand All @@ -14,12 +16,6 @@ pub enum ServerState {
Shutdown,
}

impl Default for ServerState {
fn default() -> Self {
Self::Follower
}
}

impl ServerState {
/// Check if currently in learner state.
pub fn is_learner(&self) -> bool {
Expand Down
51 changes: 38 additions & 13 deletions openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::Arc;

use crate::core::ServerState;
use crate::engine::vote_handler::VoteHandler;
use crate::engine::Command;
use crate::entry::RaftEntry;
use crate::error::InitializeError;
Expand Down Expand Up @@ -74,6 +75,17 @@ where
pub(crate) commands: Vec<Command<NID, N>>,
}

impl<NID, N> EngineOutput<NID, N>
where
    NID: NodeId,
    N: Node,
{
    /// Append `cmd` to the list of output commands.
    ///
    /// Before it is queued, the command is asked to update `metrics_flags`.
    pub(crate) fn push_command(&mut self, cmd: Command<NID, N>) {
        // Update the flags first: `cmd` is moved into `commands` right after.
        cmd.update_metrics_flags(&mut self.metrics_flags);
        self.commands.push(cmd);
    }
}

/// Raft protocol algorithm.
///
/// It implements the complete raft algorithm, except that it does not actually update any state.
Expand Down Expand Up @@ -119,22 +131,29 @@ where
// TODO: test it
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn startup(&mut self) {
// On startup, do not assume a leader. Becoming a leader require initialization on several fields.
// TODO: allows starting up as a leader. After `server_state` is removed from Engine.
// Allows starting up as a leader.

// Previously it is a leader. restore it as leader at once
if self.is_leader() {
self.switch_internal_server_state();
self.update_server_state_if_changed();
self.update_replications();
return;
}

let server_state = if self.state.membership_state.effective.is_voter(&self.config.id) {
ServerState::Follower
} else {
ServerState::Learner
};

self.state.server_state = server_state;

tracing::debug!(
"startup: id={} target_state: {:?}",
self.config.id,
self.state.server_state
);

self.state.server_state = server_state;
}

/// Initialize a node by appending the first log.
Expand Down Expand Up @@ -702,7 +721,10 @@ where
}

// A leader that is removed will be shut down when this membership log is committed.
if self.internal_server_state.is_leading() {
// TODO: currently only a leader has replication setup.
// It's better to setup replication for both leader and candidate.
// e.g.: if self.internal_server_state.is_leading() {
if self.is_leader() {
self.update_replications()
}

Expand Down Expand Up @@ -1010,12 +1032,7 @@ where

/// Vote is granted by a quorum, leader established.
fn establish_leader(&mut self) {
self.state.vote.commit();
// Saving the vote that is granted by a quorum, AKA committed vote, is not necessary by original raft.
// Openraft insists doing this because:
// - Voting is not in the hot path, thus no performance penalty.
// - Leadership won't be lost if a leader restarted quick enough.
self.push_command(Command::SaveVote { vote: self.state.vote });
self.vote_handler().commit();

self.update_server_state_if_changed();
self.update_replications();
Expand Down Expand Up @@ -1297,7 +1314,15 @@ where
}

fn push_command(&mut self, cmd: Command<NID, N>) {
cmd.update_metrics_flags(&mut self.output.metrics_flags);
self.output.commands.push(cmd)
self.output.push_command(cmd)
}

// --- handlers ---

fn vote_handler(&mut self) -> VoteHandler<NID, N> {
VoteHandler {
state: &mut self.state,
output: &mut self.output,
}
}
}
1 change: 1 addition & 0 deletions openraft/src/engine/handle_vote_resp_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ fn test_handle_vote_resp() -> anyhow::Result<()> {
tracing::info!("--- not in election. just ignore");
{
let mut eng = eng();
eng.state.server_state = ServerState::Follower;
eng.state.vote = Vote::new(2, 1);
eng.state.membership_state.effective = Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m12()));

Expand Down
1 change: 1 addition & 0 deletions openraft/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
mod command;
mod engine_impl;
mod log_id_list;
mod vote_handler;

#[cfg(test)] mod calc_purge_upto_test;
#[cfg(test)] mod elect_test;
Expand Down
70 changes: 68 additions & 2 deletions openraft/src/engine/startup_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ use std::sync::Arc;

use maplit::btreeset;

use crate::engine::Command;
use crate::engine::Engine;
use crate::progress::entry::ProgressEntry;
use crate::EffectiveMembership;
use crate::LeaderId;
use crate::LogId;
use crate::Membership;
use crate::MetricsChangeFlags;
use crate::ServerState;
use crate::Vote;

fn log_id(term: u64, index: u64) -> LogId<u64> {
LogId::<u64> {
Expand All @@ -28,11 +31,74 @@ fn m34() -> Membership<u64, ()> {
fn eng() -> Engine<u64, ()> {
let mut eng = Engine::default();
eng.config.id = 2;
// This will be overrided
eng.state.server_state = ServerState::Leader;
// This will be overridden
eng.state.server_state = ServerState::default();
eng
}

/// A node that restarts with a committed vote for itself enters `Leader` state at once
/// and emits the commands that set up leadership and replication.
#[test]
fn test_startup_as_leader() -> anyhow::Result<()> {
    let mut eng = eng();
    // self.id==2 is a voter:
    let membership = EffectiveMembership::new(Some(log_id(2, 3)), m23());
    eng.state.membership_state.effective = Arc::new(membership);
    // Committed vote makes it a leader at startup.
    eng.state.vote = Vote::new_committed(1, 2);

    eng.startup();

    assert_eq!(ServerState::Leader, eng.state.server_state);

    let expected_flags = MetricsChangeFlags {
        replication: true,
        local_data: false,
        cluster: true,
    };
    assert_eq!(expected_flags, eng.output.metrics_flags);

    // Replication is set up only for the other voter, node 3.
    let replication_targets = vec![(3, ProgressEntry {
        matching: None,
        searching: None,
    })];
    let expected_commands = vec![
        Command::BecomeLeader,
        Command::UpdateReplicationStreams {
            targets: replication_targets,
        },
    ];
    assert_eq!(expected_commands, eng.output.commands);

    Ok(())
}

/// A voter that restarts with a non-committed vote for itself starts as `Follower`
/// and produces no commands or metrics changes.
#[test]
fn test_startup_candidate_becomes_follower() -> anyhow::Result<()> {
    let mut eng = eng();
    // self.id==2 is a voter:
    let membership = EffectiveMembership::new(Some(log_id(2, 3)), m23());
    eng.state.membership_state.effective = Arc::new(membership);
    // Non-committed vote makes it a candidate at startup.
    eng.state.vote = Vote::new(1, 2);

    eng.startup();

    assert_eq!(ServerState::Follower, eng.state.server_state);

    let expected_flags = MetricsChangeFlags {
        replication: false,
        local_data: false,
        cluster: false,
    };
    assert_eq!(expected_flags, eng.output.metrics_flags);

    assert!(eng.output.commands.is_empty());

    Ok(())
}
#[test]
fn test_startup_as_follower() -> anyhow::Result<()> {
let mut eng = eng();
Expand Down
34 changes: 34 additions & 0 deletions openraft/src/engine/vote_handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use crate::engine::engine_impl::EngineOutput;
use crate::engine::Command;
use crate::Node;
use crate::NodeId;
use crate::RaftState;

/// Handle raft vote related operations
///
/// The handler owns nothing: it holds mutable borrows of the raft state and of
/// the engine output, each with its own lifetime.
pub(crate) struct VoteHandler<'st, 'out, NID, N>
where
NID: NodeId,
N: Node,
{
/// Raft state whose `vote` this handler reads and modifies.
pub(crate) state: &'st mut RaftState<NID, N>,
/// Engine output that receives the commands this handler emits.
pub(crate) output: &'out mut EngineOutput<NID, N>,
}

impl<'st, 'out, NID, N> VoteHandler<'st, 'out, NID, N>
where
    NID: NodeId,
    N: Node,
{
    /// Mark the current vote as committed, i.e. granted and saved by a quorum,
    /// then queue a `SaveVote` command carrying the updated vote.
    ///
    /// Saving a committed vote is not required by the original raft.
    /// Openraft insists on doing it because:
    /// - Voting is not in the hot path, so there is no performance penalty.
    /// - Leadership is not lost if a leader restarts quickly enough.
    ///
    /// In debug builds this asserts that the vote is not already committed.
    pub(crate) fn commit(&mut self) {
        debug_assert!(!self.state.vote.committed);

        self.state.vote.commit();

        // Copy the vote out only after it has been marked as committed.
        let vote = self.state.vote;
        self.output.push_command(Command::SaveVote { vote });
    }
}
6 changes: 0 additions & 6 deletions openraft/tests/life_cycle/t90_issue_607_single_restart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::time::Duration;

use maplit::btreeset;
use openraft::Config;
use openraft::ServerState;

use crate::fixtures::init_default_ut_tracing;
use crate::fixtures::RaftRouter;
Expand Down Expand Up @@ -39,11 +38,6 @@ async fn single_restart() -> anyhow::Result<()> {
node.shutdown().await?;

router.new_raft_node_with_sto(0, sto).await;
// leader appends a blank log.
log_index += 1;

router.wait(&0, timeout()).state(ServerState::Leader, "node-0 is leader").await?;
router.wait(&0, timeout()).log(Some(log_index), "node-0 restarted").await?;
}

tracing::info!("--- write to 1 log after restart");
Expand Down

0 comments on commit 3d5e001

Please sign in to comment.