Skip to content

Commit

Permalink
Merge pull request #784 from drmingdrmer/57-metrics
Browse files Browse the repository at this point in the history
Refator: always flush full metrics
  • Loading branch information
drmingdrmer authored Apr 24, 2023
2 parents ff8c233 + e691ac4 commit 0bdf5b7
Show file tree
Hide file tree
Showing 13 changed files with 5 additions and 274 deletions.
2 changes: 1 addition & 1 deletion cluster_benchmark/tests/benchmark/bench_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ async fn do_bench(bench_config: &BenchConfig) -> anyhow::Result<()> {
handles.push(h)
}

leader.wait(timeout()).log_at_least(Some(total as u64), "commit all written logs").await?;
leader.wait(timeout()).log_at_least(Some(total), "commit all written logs").await?;

let elapsed = now.elapsed();

Expand Down
42 changes: 4 additions & 38 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ use crate::RaftNetworkFactory;
use crate::RaftTypeConfig;
use crate::StorageError;
use crate::StorageIOError;
use crate::Update;
use crate::Vote;

/// A temp struct to hold the data for a node that is being applied.
Expand Down Expand Up @@ -213,7 +212,7 @@ where
let res = self.do_main(rx_shutdown).instrument(span).await;

// Flush buffered metrics
self.report_metrics(Update::AsIs);
self.report_metrics(None);

tracing::info!("update the metrics for shutdown");
{
Expand Down Expand Up @@ -243,7 +242,7 @@ where
self.run_engine_commands().await?;

// Initialize metrics.
self.report_metrics(Update::Update(None));
self.report_metrics(None);

self.runtime_loop(rx_shutdown).await
}
Expand Down Expand Up @@ -495,34 +494,13 @@ where
/// Then clear flags about the cached changes, to avoid unnecessary metrics report.
#[tracing::instrument(level = "debug", skip_all)]
pub fn flush_metrics(&mut self) {
if !self.engine.output.metrics_flags.changed() {
return;
}

let leader_metrics = if self.engine.output.metrics_flags.replication {
let replication_metrics = self.leader_data.as_ref().map(|x| x.replication_metrics.clone());
Update::Update(replication_metrics)
} else {
#[allow(clippy::collapsible_else_if)]
if self.leader_data.is_some() {
Update::AsIs
} else {
Update::Update(None)
}
};

let leader_metrics = self.leader_data.as_ref().map(|x| x.replication_metrics.clone());
self.report_metrics(leader_metrics);
self.engine.output.metrics_flags.reset();
}

/// Report a metrics payload on the current state of the Raft node.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn report_metrics(&self, replication: Update<Option<Versioned<ReplicationMetrics<C::NodeId>>>>) {
let replication = match replication {
Update::Update(v) => v,
Update::AsIs => self.tx_metrics.borrow().replication.clone(),
};

pub(crate) fn report_metrics(&self, replication: Option<Versioned<ReplicationMetrics<C::NodeId>>>) {
let m = RaftMetrics {
running_state: Ok(()),
id: self.id,
Expand All @@ -542,14 +520,6 @@ where
replication,
};

{
let curr = self.tx_metrics.borrow();
if m == *curr {
tracing::debug!("metrics not changed: {}", m.summary());
return;
}
}

tracing::debug!("report_metrics: {}", m.summary());
let res = self.tx_metrics.send(m);

Expand Down Expand Up @@ -1245,12 +1215,10 @@ where
sm::Response::InstallSnapshot(meta) => {
if let Some(meta) = meta {
self.engine.state.io_state_mut().update_applied(meta.last_log_id);
self.engine.output.metrics_flags.set_data_changed();
}
}
sm::Response::Apply(res) => {
self.engine.state.io_state_mut().update_applied(Some(res.last_applied));
self.engine.output.metrics_flags.set_data_changed();

self.handle_apply_result(res);
}
Expand Down Expand Up @@ -1374,8 +1342,6 @@ where
// This method is only called after `update_progress()`.
// And this node may become a non-leader after `update_progress()`
}

self.engine.output.metrics_flags.set_replication_changed()
}

/// If a message is sent by a previous server state but is received by current server state,
Expand Down
29 changes: 0 additions & 29 deletions openraft/src/engine/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use crate::raft::VoteRequest;
use crate::raft::VoteResponse;
use crate::LeaderId;
use crate::LogId;
use crate::MetricsChangeFlags;
use crate::Node;
use crate::NodeId;
use crate::RaftTypeConfig;
Expand Down Expand Up @@ -164,34 +163,6 @@ where
impl<C> Command<C>
where C: RaftTypeConfig
{
/// Update the flag of the metrics that needs to be updated when this command is executed.
pub(crate) fn update_metrics_flags(&self, flags: &mut MetricsChangeFlags) {
match &self {
Command::BecomeLeader { .. } => flags.set_cluster_changed(),
Command::QuitLeader => flags.set_cluster_changed(),
Command::AppendEntry { .. } => flags.set_data_changed(),
Command::AppendInputEntries { entries } => {
debug_assert!(!entries.is_empty());
flags.set_data_changed()
}
Command::AppendBlankLog { .. } => flags.set_data_changed(),
Command::ReplicateCommitted { .. } => {}
Command::LeaderCommit { .. } => flags.set_data_changed(),
Command::FollowerCommit { .. } => flags.set_data_changed(),
Command::Replicate { .. } => {}
Command::RebuildReplicationStreams { .. } => flags.set_replication_changed(),
Command::UpdateProgressMetrics { .. } => flags.set_replication_changed(),
Command::SaveVote { .. } => flags.set_data_changed(),
Command::SendVote { .. } => {}
Command::PurgeLog { .. } => flags.set_data_changed(),
Command::DeleteConflictLog { .. } => flags.set_data_changed(),
Command::BuildSnapshot { .. } => flags.set_data_changed(),
Command::InstallSnapshot { .. } => flags.set_data_changed(),
Command::CancelSnapshot { .. } => {}
Command::Respond { .. } => {}
}
}

#[allow(dead_code)]
#[rustfmt::skip]
pub(crate) fn kind(&self) -> CommandKind {
Expand Down
6 changes: 0 additions & 6 deletions openraft/src/engine/engine_output.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
use std::collections::VecDeque;

use crate::engine::Command;
use crate::MetricsChangeFlags;
use crate::RaftTypeConfig;

/// The entry of output from Engine to the runtime.
#[derive(Debug, Default)]
pub(crate) struct EngineOutput<C>
where C: RaftTypeConfig
{
/// Tracks what kind of metrics changed
pub(crate) metrics_flags: MetricsChangeFlags,

/// Command queue that need to be executed by `RaftRuntime`.
pub(crate) commands: VecDeque<Command<C>>,
}
Expand All @@ -21,14 +17,12 @@ where C: RaftTypeConfig
{
pub(crate) fn new(command_buffer_size: usize) -> Self {
Self {
metrics_flags: MetricsChangeFlags::default(),
commands: VecDeque::with_capacity(command_buffer_size),
}
}

/// Push a command to the queue.
pub(crate) fn push_command(&mut self, cmd: Command<C>) {
cmd.update_metrics_flags(&mut self.metrics_flags);
self.commands.push_back(cmd)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ fn test_leader_append_entries_fast_commit() -> anyhow::Result<()> {
eng.vote_handler().become_leading();

eng.output.clear_commands();
eng.output.metrics_flags.reset();

// log id will be assigned by eng.
eng.leader_handler()?.leader_append_entries(vec![
Expand Down Expand Up @@ -306,7 +305,6 @@ fn test_leader_append_entries_fast_commit_membership_no_voter_change() -> anyhow
eng.state.server_state = eng.calc_server_state();

eng.output.clear_commands();
eng.output.metrics_flags.reset();

// log id will be assigned by eng.
eng.leader_handler()?.leader_append_entries(vec![
Expand Down Expand Up @@ -392,7 +390,6 @@ fn test_leader_append_entries_fast_commit_if_membership_voter_change_to_1() -> a
eng.state.server_state = eng.calc_server_state();

eng.output.clear_commands();
eng.output.metrics_flags.reset();

// log id will be assigned by eng.
eng.leader_handler()?.leader_append_entries(vec![
Expand Down
1 change: 0 additions & 1 deletion openraft/src/engine/handler/snapshot_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ where C: RaftTypeConfig
}

self.state.snapshot_meta = meta;
self.output.metrics_flags.set_data_changed();

true
}
Expand Down
1 change: 0 additions & 1 deletion openraft/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ pub(crate) mod time_state;
#[cfg(test)]
mod tests {
mod append_entries_test;
mod command_test;
mod elect_test;
mod handle_vote_req_test;
mod handle_vote_resp_test;
Expand Down
59 changes: 0 additions & 59 deletions openraft/src/engine/tests/command_test.rs

This file was deleted.

25 changes: 0 additions & 25 deletions openraft/src/engine/tests/elect_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use crate::CommittedLeaderId;
use crate::EffectiveMembership;
use crate::LogId;
use crate::Membership;
use crate::MetricsChangeFlags;
use crate::Vote;

fn m1() -> Membership<u64, ()> {
Expand Down Expand Up @@ -53,14 +52,6 @@ fn test_elect() -> anyhow::Result<()> {
);

assert_eq!(ServerState::Leader, eng.state.server_state);
assert_eq!(
MetricsChangeFlags {
replication: true,
local_data: true,
cluster: true,
},
eng.output.metrics_flags
);

assert_eq!(
vec![
Expand Down Expand Up @@ -116,14 +107,6 @@ fn test_elect() -> anyhow::Result<()> {
);

assert_eq!(ServerState::Leader, eng.state.server_state);
assert_eq!(
MetricsChangeFlags {
replication: true,
local_data: true,
cluster: true,
},
eng.output.metrics_flags
);

assert_eq!(
vec![
Expand Down Expand Up @@ -175,14 +158,6 @@ fn test_elect() -> anyhow::Result<()> {
);

assert_eq!(ServerState::Candidate, eng.state.server_state);
assert_eq!(
MetricsChangeFlags {
replication: false,
local_data: true,
cluster: false,
},
eng.output.metrics_flags
);

assert_eq!(
vec![Command::SaveVote { vote: Vote::new(1, 1) }, Command::SendVote {
Expand Down
Loading

0 comments on commit 0bdf5b7

Please sign in to comment.