[consensus] Update proposer metrics #19655

Open · wants to merge 1 commit into base: main

94 changes: 65 additions & 29 deletions consensus/core/src/core.rs
@@ -12,10 +12,7 @@ use mysten_metrics::monitored_mpsc::{unbounded_channel, UnboundedReceiver};
use mysten_metrics::monitored_scope;
use parking_lot::RwLock;
use sui_macros::fail_point;
use tokio::{
sync::{broadcast, watch},
time::Instant,
};
use tokio::sync::{broadcast, watch};
use tracing::{debug, info, warn};

use crate::{
@@ -275,11 +272,34 @@ impl Core {
/// Adds/processes all the newly `accepted_blocks`. We basically try to advance the threshold clock and add them to the
/// pending ancestors list.
fn add_accepted_blocks(&mut self, accepted_blocks: Vec<VerifiedBlock>) {
// Advance the threshold clock. If advanced to a new round then send a signal that a new quorum has been received.
if let Some(new_round) = self
.threshold_clock
.add_blocks(accepted_blocks.iter().map(|b| b.reference()).collect())
// Get max round of accepted blocks. This will be equal to the threshold
// clock round, either by advancing the threshold clock round by being
// greater than current clock round or by equaling the current clock round.
Member

Is this the case? Blocks older than the current threshold clock round can get accepted as well.

Contributor Author

I only added the greater-than and equal cases to the comment, but blocks with rounds less than the clock round are essentially ignored by the threshold clock.

let max_accepted_round = accepted_blocks
.iter()
.map(|b| b.round())
.max()
.unwrap_or(GENESIS_ROUND);
// Therefore the leader round for which proposals will wait will be max accepted round - 1
// or saturate to GENESIS_ROUND.
let accepted_proposal_leader_round = max_accepted_round.saturating_sub(1);

// Ignore checking for leader blocks with rounds less than the current
// threshold clock round - 1.
let proposal_leaders_exist = if accepted_proposal_leader_round
>= self.threshold_clock.get_round().saturating_sub(1)
{
self.leaders_exist(accepted_proposal_leader_round)
} else {
false
};

// Advance the threshold clock. If advanced to a new round then send a
// signal that a new quorum has been received.
if let Some(new_round) = self.threshold_clock.add_blocks(
accepted_blocks.iter().map(|b| b.reference()).collect(),
proposal_leaders_exist,
) {
// notify that threshold clock advanced to new round
self.signals.new_round(new_round);
}
@@ -351,10 +371,22 @@ impl Core {
// There must be a quorum of blocks from the previous round.
let quorum_round = self.threshold_clock.get_round().saturating_sub(1);

let leader_authority = &self
.context
.committee
.authority(self.first_leader(quorum_round))
.hostname;

// Create a new block either because we want to "forcefully" propose a block due to a leader timeout,
// or because we are actually ready to produce the block (leader exists and min delay has passed).
if !force {
if !self.leaders_exist(quorum_round) {
self.context
.metrics
.node_metrics
.block_proposal_leader_wait_count
Member

I think we should use a separate metric for counting the number of times the leader is not found. block_proposal_leader_wait_count is tied to block_proposal_leader_wait_ms, so when the average wait is ~250ms, we know the leader is missing.

Contributor Author

The confusion for me with these metrics is that they don't just include the leader wait time; they also include the quorum receive wait time, which can make the metric a little misleading. Separating them brings more clarity, though I guess we could always subtract this metric from the quorum receive latency.
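A minimal, self-contained sketch of the suggestion above: a counter dedicated to "leader not found", kept separate from the wait-time metrics. The metric name block_proposal_leader_not_found, the helper function, and the label value are hypothetical and not part of this PR, and the sketch registers against the prometheus crate directly rather than through the project's NodeMetrics constructor:

use prometheus::{register_int_counter_vec_with_registry, IntCounterVec, Registry};

// Hypothetical metric: counts how many times proposing was skipped because
// the expected leader block was missing, labeled by the leader's hostname.
fn register_leader_not_found(registry: &Registry) -> IntCounterVec {
    register_int_counter_vec_with_registry!(
        "block_proposal_leader_not_found",
        "Number of times proposing was skipped because the leader block was missing.",
        &["authority"],
        registry,
    )
    .unwrap()
}

fn main() {
    let registry = Registry::new();
    let leader_not_found = register_leader_not_found(&registry);

    // At the proposal site this would replace the block_proposal_leader_wait_count
    // increment, leaving the wait_count/wait_ms pair to measure wait time only.
    leader_not_found.with_label_values(&["validator-0"]).inc();
    assert_eq!(leader_not_found.with_label_values(&["validator-0"]).get(), 1);
}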

.with_label_values(&[leader_authority])
.inc();
return None;
}

@@ -369,27 +401,18 @@
}
}

let leader_authority = &self
.context
.committee
.authority(self.first_leader(quorum_round))
.hostname;
self.context
.metrics
.node_metrics
.block_proposal_leader_wait_ms
.with_label_values(&[leader_authority])
.inc_by(
Instant::now()
.saturating_duration_since(self.threshold_clock.get_quorum_ts())
.as_millis() as u64,
);
self.context
.metrics
.node_metrics
.block_proposal_leader_wait_count
.with_label_values(&[leader_authority])
.inc();
if let Some(leader_ts) = self.threshold_clock.get_proposal_leaders_ts() {
self.context
.metrics
.node_metrics
.block_proposal_leader_wait_ms
.with_label_values(&[leader_authority])
.inc_by(
leader_ts
.saturating_duration_since(self.threshold_clock.get_quorum_ts())
.as_millis() as u64,
);
}

// TODO: produce the block for the clock_round. As the threshold clock can advance many rounds at once (ex
// because we synchronized a bulk of blocks) we can decide here whether we want to produce blocks per round
@@ -477,6 +500,19 @@
// Ensure the new block and its ancestors are persisted, before broadcasting it.
self.dag_state.write().flush();

let current_proposal_duration = Duration::from_millis(verified_block.timestamp_ms());
let previous_proposal_duration = Duration::from_millis(self.last_proposed_timestamp_ms());
self.context
.metrics
.node_metrics
.block_proposal_interval
.observe(
current_proposal_duration
.checked_sub(previous_proposal_duration)
.unwrap_or_else(|| Duration::from_millis(0))
.as_secs_f64(),
);

// Update internal state.
self.last_proposed_block = verified_block.clone();

7 changes: 7 additions & 0 deletions consensus/core/src/metrics.rs
@@ -105,6 +105,7 @@ pub(crate) struct NodeMetrics {
pub(crate) proposed_block_ancestors_depth: HistogramVec,
pub(crate) highest_verified_authority_round: IntGaugeVec,
pub(crate) lowest_verified_authority_round: IntGaugeVec,
pub(crate) block_proposal_interval: Histogram,
pub(crate) block_proposal_leader_wait_ms: IntCounterVec,
pub(crate) block_proposal_leader_wait_count: IntCounterVec,
pub(crate) block_timestamp_drift_wait_ms: IntCounterVec,
@@ -232,6 +233,12 @@ impl NodeMetrics {
&["authority"],
registry,
).unwrap(),
block_proposal_interval: register_histogram_with_registry!(
"block_proposal_interval",
"Intervals (in secs) between block proposals.",
FINE_GRAINED_LATENCY_SEC_BUCKETS.to_vec(),
registry,
).unwrap(),
block_proposal_leader_wait_ms: register_int_counter_vec_with_registry!(
"block_proposal_leader_wait_ms",
"Total time in ms spent waiting for a leader when proposing blocks.",
151 changes: 98 additions & 53 deletions consensus/core/src/threshold_clock.rs
@@ -15,6 +15,8 @@ pub(crate) struct ThresholdClock {
aggregator: StakeAggregator<QuorumThreshold>,
round: Round,
quorum_ts: Instant,
// Records the first time the leaders of round - 1 exist for proposal.
proposal_leaders_ts: Option<Instant>,
context: Arc<Context>,
}

@@ -24,33 +26,59 @@ impl ThresholdClock {
aggregator: StakeAggregator::new(),
round,
quorum_ts: Instant::now(),
proposal_leaders_ts: None,
context,
}
}

/// Add the block references that have been successfully processed and advance the round accordingly. If the round
/// has indeed advanced then the new round is returned, otherwise None is returned.
pub(crate) fn add_blocks(&mut self, blocks: Vec<BlockRef>) -> Option<Round> {
pub(crate) fn add_blocks(
&mut self,
blocks: Vec<BlockRef>,
proposal_leaders_exist: bool,
) -> Option<Round> {
let previous_round = self.round;
for block_ref in blocks {
self.add_block(block_ref);
self.add_block(block_ref, proposal_leaders_exist);
}
(self.round > previous_round).then_some(self.round)
}

pub(crate) fn add_block(&mut self, block: BlockRef) {
pub(crate) fn get_round(&self) -> Round {
self.round
}

pub(crate) fn get_quorum_ts(&self) -> Instant {
self.quorum_ts
}

pub(crate) fn get_proposal_leaders_ts(&self) -> Option<Instant> {
self.proposal_leaders_ts
}

fn add_block(&mut self, block: BlockRef, proposal_leaders_exist: bool) {
match block.round.cmp(&self.round) {
// Blocks with round less than what we currently build are irrelevant here
Ordering::Less => {}
// If we processed block for round r, we also have stored 2f+1 blocks from r-1
Ordering::Greater => {
self.aggregator.clear();
self.aggregator.add(block.author, &self.context.committee);
if proposal_leaders_exist {
self.proposal_leaders_ts = Some(Instant::now());
} else {
self.proposal_leaders_ts = None;
}
self.round = block.round;
}
Ordering::Equal => {
if proposal_leaders_exist && self.proposal_leaders_ts.is_none() {
self.proposal_leaders_ts = Some(Instant::now());
}
if self.aggregator.add(block.author, &self.context.committee) {
self.aggregator.clear();
self.proposal_leaders_ts = None;
// We have seen 2f+1 blocks for current round, advance
self.round = block.round + 1;

@@ -66,74 +94,89 @@ impl ThresholdClock {
}
}
}

pub(crate) fn get_round(&self) -> Round {
self.round
}

pub(crate) fn get_quorum_ts(&self) -> Instant {
self.quorum_ts
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;

use consensus_config::AuthorityIndex;
use tokio::time::sleep;

use super::*;
use crate::block::BlockDigest;
use consensus_config::AuthorityIndex;

#[tokio::test]
async fn test_threshold_clock_add_block() {
let context = Arc::new(Context::new_for_test(4).0);
let mut aggregator = ThresholdClock::new(0, context);

aggregator.add_block(BlockRef::new(
0,
AuthorityIndex::new_for_test(0),
BlockDigest::default(),
));
aggregator.add_block(
BlockRef::new(0, AuthorityIndex::new_for_test(0), BlockDigest::default()),
true,
);
let leader_ts_round_0 = aggregator
.get_proposal_leaders_ts()
.expect("Leader ts should be set");
assert_eq!(aggregator.get_round(), 0);
aggregator.add_block(BlockRef::new(
0,
AuthorityIndex::new_for_test(1),
BlockDigest::default(),
));
aggregator.add_block(
BlockRef::new(0, AuthorityIndex::new_for_test(1), BlockDigest::default()),
true,
);
assert_eq!(aggregator.get_round(), 0);
aggregator.add_block(BlockRef::new(
0,
AuthorityIndex::new_for_test(2),
BlockDigest::default(),
));
assert_eq!(
leader_ts_round_0,
aggregator
.get_proposal_leaders_ts()
.expect("Leader ts should be set")
);
aggregator.add_block(
BlockRef::new(0, AuthorityIndex::new_for_test(2), BlockDigest::default()),
true,
);
assert!(aggregator.get_proposal_leaders_ts().is_none());
assert_eq!(aggregator.get_round(), 1);
aggregator.add_block(BlockRef::new(
1,
AuthorityIndex::new_for_test(0),
BlockDigest::default(),
));
sleep(Duration::from_millis(10)).await;
aggregator.add_block(
BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::default()),
true,
);
let leader_ts_round_1 = aggregator
.get_proposal_leaders_ts()
.expect("Leader ts should be set");
assert!(leader_ts_round_1 > leader_ts_round_0);
assert_eq!(aggregator.get_round(), 1);
aggregator.add_block(BlockRef::new(
1,
AuthorityIndex::new_for_test(3),
BlockDigest::default(),
));
aggregator.add_block(
BlockRef::new(1, AuthorityIndex::new_for_test(3), BlockDigest::default()),
true,
);
assert_eq!(
leader_ts_round_1,
aggregator
.get_proposal_leaders_ts()
.expect("Leader ts should be set")
);
assert_eq!(aggregator.get_round(), 1);
aggregator.add_block(BlockRef::new(
2,
AuthorityIndex::new_for_test(1),
BlockDigest::default(),
));
sleep(Duration::from_millis(10)).await;
aggregator.add_block(
BlockRef::new(2, AuthorityIndex::new_for_test(1), BlockDigest::default()),
true,
);
let leader_ts_round_2 = aggregator
.get_proposal_leaders_ts()
.expect("Leader ts should be set");
assert!(leader_ts_round_2 > leader_ts_round_1);
assert_eq!(aggregator.get_round(), 2);
aggregator.add_block(BlockRef::new(
1,
AuthorityIndex::new_for_test(1),
BlockDigest::default(),
));
aggregator.add_block(
BlockRef::new(1, AuthorityIndex::new_for_test(1), BlockDigest::default()),
true,
);
assert_eq!(aggregator.get_round(), 2);
aggregator.add_block(BlockRef::new(
5,
AuthorityIndex::new_for_test(2),
BlockDigest::default(),
));
aggregator.add_block(
BlockRef::new(5, AuthorityIndex::new_for_test(2), BlockDigest::default()),
false,
);
assert!(aggregator.get_proposal_leaders_ts().is_none());
assert_eq!(aggregator.get_round(), 5);
}

Expand All @@ -150,10 +193,12 @@ mod tests {
BlockRef::new(1, AuthorityIndex::new_for_test(3), BlockDigest::default()),
BlockRef::new(2, AuthorityIndex::new_for_test(1), BlockDigest::default()),
BlockRef::new(1, AuthorityIndex::new_for_test(1), BlockDigest::default()),
BlockRef::new(4, AuthorityIndex::new_for_test(0), BlockDigest::default()),
BlockRef::new(5, AuthorityIndex::new_for_test(2), BlockDigest::default()),
];

let result = aggregator.add_blocks(block_refs);
let result = aggregator.add_blocks(block_refs, true);
assert!(aggregator.get_proposal_leaders_ts().is_some());
assert_eq!(Some(5), result);
}
}