From e691ac40717b3271e59386523f9f38fbf9290431 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Mon, 24 Apr 2023 11:01:18 +0800 Subject: [PATCH] Refactor: always flush full metrics Metrics reporting has become infrequent; therefore, the "report only changed metrics" mechanism can be removed. --- .../tests/benchmark/bench_cluster.rs | 2 +- openraft/src/core/raft_core.rs | 42 ++----------- openraft/src/engine/command.rs | 29 --------- openraft/src/engine/engine_output.rs | 6 -- .../leader_handler/append_entries_test.rs | 3 - .../engine/handler/snapshot_handler/mod.rs | 1 - openraft/src/engine/mod.rs | 1 - openraft/src/engine/tests/command_test.rs | 59 ------------------- openraft/src/engine/tests/elect_test.rs | 25 -------- .../src/engine/tests/handle_vote_resp_test.rs | 49 --------------- openraft/src/engine/tests/initialize_test.rs | 11 ---- openraft/src/lib.rs | 2 - openraft/src/raft_types.rs | 49 --------------- 13 files changed, 5 insertions(+), 274 deletions(-) delete mode 100644 openraft/src/engine/tests/command_test.rs diff --git a/cluster_benchmark/tests/benchmark/bench_cluster.rs b/cluster_benchmark/tests/benchmark/bench_cluster.rs index 8ff02f644..d3999abdf 100644 --- a/cluster_benchmark/tests/benchmark/bench_cluster.rs +++ b/cluster_benchmark/tests/benchmark/bench_cluster.rs @@ -121,7 +121,7 @@ async fn do_bench(bench_config: &BenchConfig) -> anyhow::Result<()> { handles.push(h) } - leader.wait(timeout()).log_at_least(Some(total as u64), "commit all written logs").await?; + leader.wait(timeout()).log_at_least(Some(total), "commit all written logs").await?; let elapsed = now.elapsed(); diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 747e85aa0..e85bd6d46 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -93,7 +93,6 @@ use crate::RaftNetworkFactory; use crate::RaftTypeConfig; use crate::StorageError; use crate::StorageIOError; -use crate::Update; use crate::Vote; /// A temp struct to hold the data for a node that is being applied. @@ -213,7 +212,7 @@ where let res = self.do_main(rx_shutdown).instrument(span).await; // Flush buffered metrics - self.report_metrics(Update::AsIs); + self.report_metrics(None); tracing::info!("update the metrics for shutdown"); { @@ -243,7 +242,7 @@ where self.run_engine_commands().await?; // Initialize metrics. - self.report_metrics(Update::Update(None)); + self.report_metrics(None); self.runtime_loop(rx_shutdown).await } @@ -495,34 +494,13 @@ where /// Then clear flags about the cached changes, to avoid unnecessary metrics report. #[tracing::instrument(level = "debug", skip_all)] pub fn flush_metrics(&mut self) { - if !self.engine.output.metrics_flags.changed() { - return; - } - - let leader_metrics = if self.engine.output.metrics_flags.replication { - let replication_metrics = self.leader_data.as_ref().map(|x| x.replication_metrics.clone()); - Update::Update(replication_metrics) - } else { - #[allow(clippy::collapsible_else_if)] - if self.leader_data.is_some() { - Update::AsIs - } else { - Update::Update(None) - } - }; - + let leader_metrics = self.leader_data.as_ref().map(|x| x.replication_metrics.clone()); self.report_metrics(leader_metrics); - self.engine.output.metrics_flags.reset(); } /// Report a metrics payload on the current state of the Raft node. #[tracing::instrument(level = "debug", skip_all)] - pub(crate) fn report_metrics(&self, replication: Update>>>) { - let replication = match replication { - Update::Update(v) => v, - Update::AsIs => self.tx_metrics.borrow().replication.clone(), - }; - + pub(crate) fn report_metrics(&self, replication: Option>>) { let m = RaftMetrics { running_state: Ok(()), id: self.id, @@ -542,14 +520,6 @@ where replication, }; - { - let curr = self.tx_metrics.borrow(); - if m == *curr { - tracing::debug!("metrics not changed: {}", m.summary()); - return; - } - } - tracing::debug!("report_metrics: {}", m.summary()); let res = self.tx_metrics.send(m); @@ -1245,12 +1215,10 @@ where sm::Response::InstallSnapshot(meta) => { if let Some(meta) = meta { self.engine.state.io_state_mut().update_applied(meta.last_log_id); - self.engine.output.metrics_flags.set_data_changed(); } } sm::Response::Apply(res) => { self.engine.state.io_state_mut().update_applied(Some(res.last_applied)); - self.engine.output.metrics_flags.set_data_changed(); self.handle_apply_result(res); } @@ -1374,8 +1342,6 @@ where // This method is only called after `update_progress()`. // And this node may become a non-leader after `update_progress()` } - - self.engine.output.metrics_flags.set_replication_changed() } /// If a message is sent by a previous server state but is received by current server state, diff --git a/openraft/src/engine/command.rs b/openraft/src/engine/command.rs index 25ba4f6be..0d7871822 100644 --- a/openraft/src/engine/command.rs +++ b/openraft/src/engine/command.rs @@ -15,7 +15,6 @@ use crate::raft::VoteRequest; use crate::raft::VoteResponse; use crate::LeaderId; use crate::LogId; -use crate::MetricsChangeFlags; use crate::Node; use crate::NodeId; use crate::RaftTypeConfig; @@ -164,34 +163,6 @@ where impl Command where C: RaftTypeConfig { - /// Update the flag of the metrics that needs to be updated when this command is executed. - pub(crate) fn update_metrics_flags(&self, flags: &mut MetricsChangeFlags) { - match &self { - Command::BecomeLeader { .. } => flags.set_cluster_changed(), - Command::QuitLeader => flags.set_cluster_changed(), - Command::AppendEntry { .. } => flags.set_data_changed(), - Command::AppendInputEntries { entries } => { - debug_assert!(!entries.is_empty()); - flags.set_data_changed() - } - Command::AppendBlankLog { .. } => flags.set_data_changed(), - Command::ReplicateCommitted { .. } => {} - Command::LeaderCommit { .. } => flags.set_data_changed(), - Command::FollowerCommit { .. } => flags.set_data_changed(), - Command::Replicate { .. } => {} - Command::RebuildReplicationStreams { .. } => flags.set_replication_changed(), - Command::UpdateProgressMetrics { .. } => flags.set_replication_changed(), - Command::SaveVote { .. } => flags.set_data_changed(), - Command::SendVote { .. } => {} - Command::PurgeLog { .. } => flags.set_data_changed(), - Command::DeleteConflictLog { .. } => flags.set_data_changed(), - Command::BuildSnapshot { .. } => flags.set_data_changed(), - Command::InstallSnapshot { .. } => flags.set_data_changed(), - Command::CancelSnapshot { .. } => {} - Command::Respond { .. } => {} - } - } - #[allow(dead_code)] #[rustfmt::skip] pub(crate) fn kind(&self) -> CommandKind { diff --git a/openraft/src/engine/engine_output.rs b/openraft/src/engine/engine_output.rs index 4bdac7770..143c37d63 100644 --- a/openraft/src/engine/engine_output.rs +++ b/openraft/src/engine/engine_output.rs @@ -1,7 +1,6 @@ use std::collections::VecDeque; use crate::engine::Command; -use crate::MetricsChangeFlags; use crate::RaftTypeConfig; /// The entry of output from Engine to the runtime. @@ -9,9 +8,6 @@ use crate::RaftTypeConfig; pub(crate) struct EngineOutput where C: RaftTypeConfig { - /// Tracks what kind of metrics changed - pub(crate) metrics_flags: MetricsChangeFlags, - /// Command queue that need to be executed by `RaftRuntime`. pub(crate) commands: VecDeque>, } @@ -21,14 +17,12 @@ where C: RaftTypeConfig { pub(crate) fn new(command_buffer_size: usize) -> Self { Self { - metrics_flags: MetricsChangeFlags::default(), commands: VecDeque::with_capacity(command_buffer_size), } } /// Push a command to the queue. pub(crate) fn push_command(&mut self, cmd: Command) { - cmd.update_metrics_flags(&mut self.metrics_flags); self.commands.push_back(cmd) } diff --git a/openraft/src/engine/handler/leader_handler/append_entries_test.rs b/openraft/src/engine/handler/leader_handler/append_entries_test.rs index 34bba1167..abe14c682 100644 --- a/openraft/src/engine/handler/leader_handler/append_entries_test.rs +++ b/openraft/src/engine/handler/leader_handler/append_entries_test.rs @@ -160,7 +160,6 @@ fn test_leader_append_entries_fast_commit() -> anyhow::Result<()> { eng.vote_handler().become_leading(); eng.output.clear_commands(); - eng.output.metrics_flags.reset(); // log id will be assigned by eng. eng.leader_handler()?.leader_append_entries(vec![ @@ -306,7 +305,6 @@ fn test_leader_append_entries_fast_commit_membership_no_voter_change() -> anyhow eng.state.server_state = eng.calc_server_state(); eng.output.clear_commands(); - eng.output.metrics_flags.reset(); // log id will be assigned by eng. eng.leader_handler()?.leader_append_entries(vec![ @@ -392,7 +390,6 @@ fn test_leader_append_entries_fast_commit_if_membership_voter_change_to_1() -> a eng.state.server_state = eng.calc_server_state(); eng.output.clear_commands(); - eng.output.metrics_flags.reset(); // log id will be assigned by eng. eng.leader_handler()?.leader_append_entries(vec![ diff --git a/openraft/src/engine/handler/snapshot_handler/mod.rs b/openraft/src/engine/handler/snapshot_handler/mod.rs index 502f78234..4991b5921 100644 --- a/openraft/src/engine/handler/snapshot_handler/mod.rs +++ b/openraft/src/engine/handler/snapshot_handler/mod.rs @@ -58,7 +58,6 @@ where C: RaftTypeConfig } self.state.snapshot_meta = meta; - self.output.metrics_flags.set_data_changed(); true } diff --git a/openraft/src/engine/mod.rs b/openraft/src/engine/mod.rs index 28fa47c95..d3e5126ce 100644 --- a/openraft/src/engine/mod.rs +++ b/openraft/src/engine/mod.rs @@ -40,7 +40,6 @@ pub(crate) mod time_state; #[cfg(test)] mod tests { mod append_entries_test; - mod command_test; mod elect_test; mod handle_vote_req_test; mod handle_vote_resp_test; diff --git a/openraft/src/engine/tests/command_test.rs b/openraft/src/engine/tests/command_test.rs deleted file mode 100644 index f923fcbbd..000000000 --- a/openraft/src/engine/tests/command_test.rs +++ /dev/null @@ -1,59 +0,0 @@ -use crate::engine::testing::blank_ent; -use crate::engine::testing::UTCfg; -use crate::engine::Command; -use crate::engine::Respond; -use crate::error::InstallSnapshotError; -use crate::progress::Inflight; -use crate::raft::VoteRequest; -use crate::raft_types::MetricsChangeFlags; -use crate::testing::log_id; -use crate::Membership; -use crate::SnapshotMeta; -use crate::StoredMembership; -use crate::Vote; - -#[test] -#[rustfmt::skip] -fn test_command_update_metrics_flags() -> anyhow::Result<()> { - // - fn t(c: Command, repl: bool, data: bool, cluster: bool) { - let mut flags = MetricsChangeFlags::default(); - - c.update_metrics_flags(&mut flags); - assert_eq!(repl, flags.replication, "{:?}", c); - assert_eq!(data, flags.local_data, "{:?}", c); - assert_eq!(cluster, flags.cluster, "{:?}", c); - } - - t(Command::BecomeLeader, false, false, true); - t(Command::QuitLeader, false, false, true); - t(Command::AppendEntry { entry: blank_ent(1, 1) }, false, true, false); - t(Command::AppendInputEntries { entries: vec![blank_ent(1, 1)] }, false, true, false); - t(Command::AppendBlankLog { log_id: log_id(1,1) }, false, true, false); - t(Command::ReplicateCommitted { committed: None }, false, false, false); - t(Command::LeaderCommit { already_committed: None, upto: log_id(1,2) }, false, true, false); - t(Command::FollowerCommit { already_committed: None, upto: log_id(1,2) }, false, true, false); - t(Command::Replicate { target: 3, req: Inflight::None }, false, false, false); - t(Command::RebuildReplicationStreams{ targets: vec![] }, true, false, false); - t(Command::UpdateProgressMetrics{ target: 0, matching: log_id(1,2), }, true, false, false); - t(Command::SaveVote{ vote: Vote::new(1,2) }, false, true, false); - t(Command::SendVote{ vote_req: VoteRequest { vote: Vote::new(1,2), last_log_id: None } }, false, false, false); - t(Command::PurgeLog{ upto: log_id(1, 2) }, false, true, false); - t(Command::DeleteConflictLog{ since: log_id(1,2) }, false, true, false); - t(Command::InstallSnapshot{ snapshot_meta: SnapshotMeta { - last_log_id: None, - last_membership: StoredMembership::new(Some(log_id(1,2)), Membership::new(vec![], ())), - snapshot_id: "".to_string(), - } }, false, true, false); - t(Command::CancelSnapshot{ snapshot_meta: SnapshotMeta { - last_log_id: None, - last_membership: StoredMembership::new(Some(log_id(1,2)), Membership::new(vec![], ())), - snapshot_id: "".to_string(), - } }, false, false, false); - t(Command::BuildSnapshot{} , false, true, false); - - let (tx, _rx) = tokio::sync::oneshot::channel(); - t(Command::Respond { when:None, resp: Respond::::new::>(Ok(()), tx) }, false, false, false); - - Ok(()) -} diff --git a/openraft/src/engine/tests/elect_test.rs b/openraft/src/engine/tests/elect_test.rs index 3c5d7a67a..a634a6474 100644 --- a/openraft/src/engine/tests/elect_test.rs +++ b/openraft/src/engine/tests/elect_test.rs @@ -16,7 +16,6 @@ use crate::CommittedLeaderId; use crate::EffectiveMembership; use crate::LogId; use crate::Membership; -use crate::MetricsChangeFlags; use crate::Vote; fn m1() -> Membership { @@ -53,14 +52,6 @@ fn test_elect() -> anyhow::Result<()> { ); assert_eq!(ServerState::Leader, eng.state.server_state); - assert_eq!( - MetricsChangeFlags { - replication: true, - local_data: true, - cluster: true, - }, - eng.output.metrics_flags - ); assert_eq!( vec![ @@ -116,14 +107,6 @@ fn test_elect() -> anyhow::Result<()> { ); assert_eq!(ServerState::Leader, eng.state.server_state); - assert_eq!( - MetricsChangeFlags { - replication: true, - local_data: true, - cluster: true, - }, - eng.output.metrics_flags - ); assert_eq!( vec![ @@ -175,14 +158,6 @@ fn test_elect() -> anyhow::Result<()> { ); assert_eq!(ServerState::Candidate, eng.state.server_state); - assert_eq!( - MetricsChangeFlags { - replication: false, - local_data: true, - cluster: false, - }, - eng.output.metrics_flags - ); assert_eq!( vec![Command::SaveVote { vote: Vote::new(1, 1) }, Command::SendVote { diff --git a/openraft/src/engine/tests/handle_vote_resp_test.rs b/openraft/src/engine/tests/handle_vote_resp_test.rs index 01a0f4d96..945659577 100644 --- a/openraft/src/engine/tests/handle_vote_resp_test.rs +++ b/openraft/src/engine/tests/handle_vote_resp_test.rs @@ -18,7 +18,6 @@ use crate::CommittedLeaderId; use crate::EffectiveMembership; use crate::LogId; use crate::Membership; -use crate::MetricsChangeFlags; use crate::Vote; fn m12() -> Membership { @@ -58,14 +57,6 @@ fn test_handle_vote_resp() -> anyhow::Result<()> { assert!(eng.internal_server_state.is_following()); assert_eq!(ServerState::Follower, eng.state.server_state); - assert_eq!( - MetricsChangeFlags { - replication: false, - local_data: false, - cluster: false, - }, - eng.output.metrics_flags - ); assert_eq!(0, eng.output.take_commands().len()); } @@ -95,14 +86,6 @@ fn test_handle_vote_resp() -> anyhow::Result<()> { ); assert_eq!(ServerState::Candidate, eng.state.server_state); - assert_eq!( - MetricsChangeFlags { - replication: false, - local_data: false, - cluster: false, - }, - eng.output.metrics_flags - ); assert!(eng.output.take_commands().is_empty()); } @@ -131,14 +114,6 @@ fn test_handle_vote_resp() -> anyhow::Result<()> { assert!(eng.internal_server_state.is_following()); assert_eq!(ServerState::Follower, eng.state.server_state); - assert_eq!( - MetricsChangeFlags { - replication: false, - local_data: true, - cluster: false, - }, - eng.output.metrics_flags - ); assert_eq!( vec![Command::SaveVote { vote: Vote::new(3, 2) },], @@ -171,14 +146,6 @@ fn test_handle_vote_resp() -> anyhow::Result<()> { ); assert_eq!(ServerState::Candidate, eng.state.server_state); - assert_eq!( - MetricsChangeFlags { - replication: false, - local_data: false, - cluster: false, - }, - eng.output.metrics_flags - ); assert!(eng.output.take_commands().is_empty()); } @@ -208,14 +175,6 @@ fn test_handle_vote_resp() -> anyhow::Result<()> { ); assert_eq!(ServerState::Candidate, eng.state.server_state); - assert_eq!( - MetricsChangeFlags { - replication: false, - local_data: false, - cluster: false, - }, - eng.output.metrics_flags - ); assert_eq!(0, eng.output.take_commands().len()); } @@ -245,14 +204,6 @@ fn test_handle_vote_resp() -> anyhow::Result<()> { ); assert_eq!(ServerState::Leader, eng.state.server_state); - assert_eq!( - MetricsChangeFlags { - replication: true, - local_data: true, - cluster: true, - }, - eng.output.metrics_flags - ); assert_eq!( vec![ diff --git a/openraft/src/engine/tests/initialize_test.rs b/openraft/src/engine/tests/initialize_test.rs index 3e3240a7c..c125aeb49 100644 --- a/openraft/src/engine/tests/initialize_test.rs +++ b/openraft/src/engine/tests/initialize_test.rs @@ -18,7 +18,6 @@ use crate::vote::CommittedLeaderId; use crate::Entry; use crate::LogId; use crate::Membership; -use crate::MetricsChangeFlags; use crate::Vote; #[test] @@ -57,16 +56,6 @@ fn test_initialize_single_node() -> anyhow::Result<()> { assert_eq!(Some(&log_id(1, 1)), eng.state.last_log_id()); assert_eq!(ServerState::Leader, eng.state.server_state); - assert_eq!( - MetricsChangeFlags { - // Command::UpdateReplicationStreams will set this flag. - // Although there is no replication to create. - replication: true, - local_data: true, - cluster: true, - }, - eng.output.metrics_flags - ); assert_eq!(&m1(), eng.state.membership_state.effective().membership()); assert_eq!( diff --git a/openraft/src/lib.rs b/openraft/src/lib.rs index c3907e016..3f8bad791 100644 --- a/openraft/src/lib.rs +++ b/openraft/src/lib.rs @@ -107,10 +107,8 @@ pub use crate::raft::Raft; pub use crate::raft::RaftTypeConfig; pub use crate::raft_state::MembershipState; pub use crate::raft_state::RaftState; -pub(crate) use crate::raft_types::MetricsChangeFlags; pub use crate::raft_types::SnapshotId; pub use crate::raft_types::SnapshotSegmentId; -pub use crate::raft_types::Update; pub use crate::storage::LogState; pub use crate::storage::RaftLogReader; pub use crate::storage::RaftSnapshotBuilder; diff --git a/openraft/src/raft_types.rs b/openraft/src/raft_types.rs index cbc228246..5a963b399 100644 --- a/openraft/src/raft_types.rs +++ b/openraft/src/raft_types.rs @@ -32,52 +32,3 @@ impl Display for SnapshotSegmentId { write!(f, "{}+{}", self.id, self.offset) } } - -// An update action with option to update with some value or just leave it as is. -#[derive(Debug, Clone, PartialOrd, PartialEq, Eq)] -pub enum Update { - Update(T), - AsIs, -} - -/// Describes the need to update some aspect of the metrics. -#[derive(Debug, Clone)] -#[derive(Default)] -#[derive(PartialEq, Eq)] -pub(crate) struct MetricsChangeFlags { - /// Replication state changes. E.g., adding/removing replication, progress update. - pub(crate) replication: bool, - - /// Local data changes. Such as vote, log, snapshot. - pub(crate) local_data: bool, - - /// State related to cluster: server state, leader, membership etc. - pub(crate) cluster: bool, -} - -impl MetricsChangeFlags { - pub(crate) fn changed(&self) -> bool { - self.replication || self.local_data || self.cluster - } - - pub(crate) fn reset(&mut self) { - self.replication = false; - self.local_data = false; - self.cluster = false; - } - - /// Includes state of replication to other nodes. - pub(crate) fn set_replication_changed(&mut self) { - self.replication = true - } - - /// Includes raft log, snapshot, state machine etc. - pub(crate) fn set_data_changed(&mut self) { - self.local_data = true - } - - /// Includes node role, membership config, leader node etc. - pub(crate) fn set_cluster_changed(&mut self) { - self.cluster = true - } -}