Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Raft::trigger()::allow_next_revert() allow to reset replication for next detected follower log revert #1259

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1312,6 +1312,13 @@ where
ExternalCommand::TriggerTransferLeader { to } => {
self.engine.trigger_transfer_leader(to);
}
ExternalCommand::AllowNextRevert { to, allow } => {
if let Ok(mut l) = self.engine.leader_handler() {
l.replication_handler().allow_next_revert(to, allow);
} else {
tracing::warn!("AllowNextRevert: current node is not a Leader");
}
}
ExternalCommand::StateMachineCommand { sm_cmd } => {
let res = self.sm_handle.send(sm_cmd);
if let Err(e) = res {
Expand Down
11 changes: 11 additions & 0 deletions openraft/src/core/raft_msg/external_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ pub(crate) enum ExternalCommand<C: RaftTypeConfig> {
/// Submit a command to inform RaftCore to transfer leadership to the specified node.
TriggerTransferLeader { to: C::NodeId },

/// Allow or disallow resetting replication to the specified node on its next detected log revert.
AllowNextRevert { to: C::NodeId, allow: bool },

/// Send a [`sm::Command`] to [`sm::worker::Worker`].
/// This command is run in the sm task.
StateMachineCommand { sm_cmd: sm::Command<C> },
Expand Down Expand Up @@ -72,6 +75,14 @@ where C: RaftTypeConfig
ExternalCommand::TriggerTransferLeader { to } => {
write!(f, "TriggerTransferLeader: to {}", to)
}
ExternalCommand::AllowNextRevert { to, allow } => {
write!(
f,
"{}-on-next-log-revert: to {}",
if *allow { "AllowReset" } else { "Panic" },
to
)
}
ExternalCommand::StateMachineCommand { sm_cmd } => {
write!(f, "StateMachineCommand: {}", sm_cmd)
}
Expand Down
25 changes: 25 additions & 0 deletions openraft/src/engine/handler/replication_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,31 @@ where C: RaftTypeConfig
prog_entry.update_conflicting(conflict.index);
}

/// Allow or forbid a one-time replication reset for `target` when its log is found reverted.
///
/// A follower whose log has unexpectedly gone back to an earlier state is normally treated as
/// an error. This method records, for one specific node, whether the next such detection may
/// instead reset replication to that node.
///
/// # Behavior
///
/// - Stores `allow` in the `reset_on_reversion` flag of `target`'s entry in the leader's
///   progress tracker; passing `false` withdraws a previously granted permission.
/// - The flag is meant to be consumed by the next detected log reversion, so it permits at
///   most one reset.
/// - If `target` has no entry in the progress tracker, a warning is logged and nothing is
///   changed.
pub(crate) fn allow_next_revert(&mut self, target: C::NodeId, allow: bool) {
    match self.leader.progress.get_mut(&target) {
        Some(entry) => entry.reset_on_reversion = allow,
        None => {
            // Unknown target: report it and leave every progress entry untouched.
            tracing::warn!(
                "target node {} not found in progress tracker, when {}",
                target,
                func_name!()
            );
        }
    }
}

/// Update replication progress when a response is received.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn update_progress(&mut self, target: C::NodeId, repl_res: Result<ReplicationResult<C>, String>) {
Expand Down
6 changes: 4 additions & 2 deletions openraft/src/engine/tests/startup_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ fn test_startup_as_leader_without_logs() -> anyhow::Result<()> {
targets: vec![ReplicationProgress(3, ProgressEntry {
matching: None,
inflight: Inflight::None,
searching_end: 4
searching_end: 4,
reset_on_reversion: false,
})]
},
Command::AppendInputEntries {
Expand Down Expand Up @@ -128,7 +129,8 @@ fn test_startup_as_leader_with_proposed_logs() -> anyhow::Result<()> {
targets: vec![ReplicationProgress(3, ProgressEntry {
matching: None,
inflight: Inflight::None,
searching_end: 7
searching_end: 7,
reset_on_reversion: false,
})]
},
Command::Replicate {
Expand Down
24 changes: 21 additions & 3 deletions openraft/src/progress/entry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@ where C: RaftTypeConfig

/// One plus the max log index on the following node that might match the leader log.
pub(crate) searching_end: u64,

/// If true, reset the progress by setting [`Self::matching`] to `None` when the follower's
/// log is found to have reverted to an earlier state.
///
/// This allows the target node to clean its data and wait for the leader to replicate all data
/// to it.
///
/// This flag will be cleared after the progress entry is reset.
pub(crate) reset_on_reversion: bool,
}

impl<C> ProgressEntry<C>
Expand All @@ -40,6 +49,7 @@ where C: RaftTypeConfig
matching: matching.clone(),
inflight: Inflight::None,
searching_end: matching.next_index(),
reset_on_reversion: false,
}
}

Expand All @@ -51,6 +61,7 @@ where C: RaftTypeConfig
matching: None,
inflight: Inflight::None,
searching_end: end,
reset_on_reversion: false,
}
}

Expand Down Expand Up @@ -117,8 +128,14 @@ where C: RaftTypeConfig
//
// - If log reversion is allowed, just restart the binary search from the beginning.
// - Otherwise, panic it.
{
#[cfg(feature = "loosen-follower-log-revert")]

let allow_reset = if cfg!(feature = "loosen-follower-log-revert") {
true
} else {
self.reset_on_reversion
};

if allow_reset {
if conflict < self.matching.next_index() {
tracing::warn!(
"conflict {} < last matching {}: follower log is reverted; with 'loosen-follower-log-revert' enabled, this is allowed.",
Expand All @@ -127,8 +144,9 @@ where C: RaftTypeConfig
);

self.matching = None;
self.reset_on_reversion = false;
}

} else {
debug_assert!(
conflict >= self.matching.next_index(),
"follower log reversion is not allowed \
Expand Down
3 changes: 2 additions & 1 deletion openraft/src/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,8 @@ where C: RaftTypeConfig
&self.inner.config
}

/// Return a handle to manually trigger raft actions, such as elect or build snapshot.
/// Return a [`Trigger`] handle to manually trigger raft actions, such as elect or build
/// snapshot.
///
/// Example:
/// ```ignore
Expand Down
37 changes: 37 additions & 0 deletions openraft/src/raft/trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,41 @@ where C: RaftTypeConfig
.send_external_command(ExternalCommand::TriggerTransferLeader { to }, "transfer_leader")
.await
}

/// Request the RaftCore to allow or forbid resetting replication to a specific node the next
/// time a log revert is detected on it.
///
/// - `allow=true`: instructs the RaftCore to allow the target node's log to revert to a
///   previous state, one time.
/// - `allow=false`: instructs the RaftCore to panic if the target node's log reverts.
///
/// ### Behavior
///
/// - If this node is the Leader, it will attempt to replicate logs to the target node from the
///   beginning once the revert is detected.
/// - If this node is not the Leader, the request is ignored.
/// - If the target node is not found, the request is ignored.
///
/// ### Automatic Replication Reset
///
/// When the
/// [`loosen-follower-log-revert`](`crate::docs::feature_flags#feature-flag-loosen-follower-log-revert`)
/// feature flag is enabled, the Leader automatically resets replication if it detects that the
/// target node's log has reverted. This feature is primarily useful in testing environments.
///
/// ### Production Considerations
///
/// In production environments, state reversion is a critical issue that should not be
/// automatically handled. However, there may be scenarios where a Follower's data is
/// intentionally removed and the node needs to rejoin the cluster (without membership
/// changes). In such cases, the Leader should reinitialize replication for that node with the
/// following steps:
/// - Shut down the target node.
/// - Call [`Self::allow_next_revert`] on the Leader.
/// - Clear the target node's data directory.
/// - Restart the target node.
///
/// ### Errors
///
/// Returns whatever [`Fatal`] error sending the external command to the RaftCore produces.
pub async fn allow_next_revert(&self, to: &C::NodeId, allow: bool) -> Result<(), Fatal<C>> {
    self.raft_inner
        .send_external_command(ExternalCommand::AllowNextRevert { to: to.clone(), allow }, func_name!())
        .await
}
}
27 changes: 0 additions & 27 deletions openraft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,6 @@ where

match &replication_result.0 {
Ok(matching) => {
self.validate_matching(matching);
self.matching = matching.clone();
}
Err(_conflict) => {
Expand All @@ -569,32 +568,6 @@ where
});
}

/// Check a new matching log id against the one currently recorded.
///
/// A new value smaller than the recorded one means the follower's log went backwards:
/// - with the [`loosen-follower-log-revert`] feature flag enabled, only a warning is logged;
/// - otherwise it is considered a bug and a debug assertion fails.
///
/// [`loosen-follower-log-revert`]: crate::docs::feature_flags#feature_flag_loosen_follower_log_revert
fn validate_matching(&self, matching: &Option<LogId<C::NodeId>>) {
    // Strict mode: any regression of the matching log id is a bug.
    if !cfg!(feature = "loosen-follower-log-revert") {
        debug_assert!(
            &self.matching <= matching,
            "follower log is reverted from {} to {}",
            self.matching.display(),
            matching.display(),
        );
        return;
    }

    // Loosened mode: tolerate the regression, but leave a trace of it in the log.
    if &self.matching > matching {
        tracing::warn!(
            "follower log is reverted from {} to {}; with 'loosen-follower-log-revert' enabled, this is allowed",
            self.matching.display(),
            matching.display(),
        );
    }
}

/// Drain all events in the channel in backoff mode, i.e., there was an un-retry-able error and
/// should not send out anything before backoff interval expired.
///
Expand Down
1 change: 1 addition & 0 deletions tests/tests/replication/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ mod t50_append_entries_backoff_rejoin;
mod t51_append_entries_too_large;
#[cfg(feature = "loosen-follower-log-revert")]
mod t60_feature_loosen_follower_log_revert;
mod t61_allow_follower_log_revert;
68 changes: 68 additions & 0 deletions tests/tests/replication/t61_allow_follower_log_revert.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use maplit::btreeset;
use openraft::Config;

use crate::fixtures::ut_harness;
use crate::fixtures::RaftRouter;

/// After `Trigger::allow_next_revert()` is called on the leader, the leader tolerates one
/// revert of the follower's log to an earlier state.
///
/// Scenario: build a cluster of node 0 and node 1, write logs, grant the one-time permission
/// for node 1 on node 0, replace node 1 with a node on a newly created store, then write more
/// logs and expect both nodes to apply them.
#[tracing::instrument]
#[test_harness::test(harness = ut_harness)]
async fn allow_follower_log_revert() -> Result<()> {
let config = Arc::new(
Config {
enable_tick: false,
enable_heartbeat: false,
// Make sure the replication is done in more than one step
max_payload_entries: 1,
..Default::default()
}
.validate()?,
);

let mut router = RaftRouter::new(config.clone());

tracing::info!("--- initializing cluster");
// `log_index` tracks the last log index the cluster is expected to have reached.
let mut log_index = router.new_cluster(btreeset! {0}, btreeset! {1}).await?;

tracing::info!(log_index, "--- write 10 logs");
{
log_index += router.client_request_many(0, "0", 10).await?;
// Both nodes must have applied everything before node 1 is erased.
for i in [0, 1] {
router.wait(&i, timeout()).applied_index(Some(log_index), format!("{} writes", 10)).await?;
}
}
tracing::info!(log_index, "--- allow next detected log revert");
{
// Grant the one-time permission on node 0 for target node 1.
let n0 = router.get_raft_handle(&0)?;
n0.trigger().allow_next_revert(&1, true).await?;
}

tracing::info!(log_index, "--- erase Learner-1 and restart");
{
// Drop the old node 1 and register a new node 1 on a newly created mem store.
let (_raft, _ls, _sm) = router.remove_node(1).unwrap();
let (log, sm) = openraft_memstore::new_mem_store();

router.new_raft_node_with_sto(1, log, sm).await;
router.add_learner(0, 1).await?;
log_index += 1; // add learner
}

tracing::info!(log_index, "--- write another 10 logs, leader should not panic");
{
log_index += router.client_request_many(0, "0", 10).await?;
// The replaced node 1 is expected to catch up to the same applied index as node 0.
for i in [0, 1] {
router.wait(&i, timeout()).applied_index(Some(log_index), format!("{} writes", 10)).await?;
}
}

Ok(())
}

/// Upper bound used by this test when waiting for a node to reach an expected state.
fn timeout() -> Option<Duration> {
    let limit = Duration::from_millis(1_000);
    Some(limit)
}
Loading