Skip to content

Commit

Permalink
More metrics on PP loop
Browse files Browse the repository at this point in the history
  • Loading branch information
AhmedSoliman committed May 14, 2024
1 parent cfe427d commit 2dfb9e9
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 4 deletions.
17 changes: 16 additions & 1 deletion crates/worker/src/metric_definitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,19 @@

/// Describing metrics is optional, but doing so attaches a description/help
/// message to each metric emitted to the metrics sink.
use metrics::{describe_counter, Unit};
use metrics::{describe_counter, describe_histogram, Unit};

// Metric names with the `.total` suffix. `PARTITION_APPLY_COMMAND` is registered
// via `describe_counter!` in `describe_metrics`; the remaining ones are presumably
// counters as well (not all registrations are visible here — TODO confirm).
pub const PARTITION_APPLY_COMMAND: &str = "restate.partition.apply_command.total";
pub const PARTITION_ACTUATOR_HANDLED: &str = "restate.partition.actuator_handled.total";
pub const PARTITION_TIMER_DUE_HANDLED: &str = "restate.partition.timer_due_handled.total";
pub const PARTITION_STORAGE_TX_CREATED: &str = "restate.partition.storage_tx_created.total";
pub const PARTITION_STORAGE_TX_COMMITTED: &str = "restate.partition.storage_tx_committed.total";

// Histogram metric names; both are described with `Unit::Seconds` in `describe_metrics`.
/// Histogram name: time spent processing a single bifrost message.
pub const PP_APPLY_RECORD_DURATION: &str = "restate.partition.apply_record_duration.seconds";
/// Histogram name: time spent applying actions/effects in a single iteration.
pub const PP_APPLY_ACTIONS_DURATION: &str = "restate.partition.apply_actions_duration.seconds";

/// Label key under which the partition processor attaches its partition id
/// (as a string) to the metrics it records.
pub const PARTITION_LABEL: &str = "partition";

pub(crate) fn describe_metrics() {
describe_counter!(
PARTITION_APPLY_COMMAND,
Expand All @@ -44,4 +49,14 @@ pub(crate) fn describe_metrics() {
Unit::Count,
"Storage transactions committed by applying partition state machine commands"
);
describe_histogram!(
PP_APPLY_RECORD_DURATION,
Unit::Seconds,
"Time spent processing a single bifrost message"
);
describe_histogram!(
PP_APPLY_ACTIONS_DURATION,
Unit::Seconds,
"Time spent applying actions/effects in a single iteration"
);
}
17 changes: 14 additions & 3 deletions crates/worker/src/partition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,24 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::metric_definitions::{PARTITION_ACTUATOR_HANDLED, PARTITION_TIMER_DUE_HANDLED};
use crate::metric_definitions::{
PARTITION_ACTUATOR_HANDLED, PARTITION_LABEL, PARTITION_TIMER_DUE_HANDLED,
PP_APPLY_ACTIONS_DURATION, PP_APPLY_RECORD_DURATION,
};
use crate::partition::leadership::{ActionEffect, LeadershipState};
use crate::partition::state_machine::{ActionCollector, Effects, StateMachine};
use crate::partition::storage::{DedupSequenceNumberResolver, PartitionStorage, Transaction};
use assert2::let_assert;
use futures::StreamExt;
use metrics::counter;
use metrics::{counter, histogram};
use restate_core::metadata;
use restate_network::Networking;
use restate_partition_store::{PartitionStore, RocksDBTransaction};
use restate_types::identifiers::{PartitionId, PartitionKey};
use std::fmt::Debug;
use std::marker::PhantomData;
use std::ops::RangeInclusive;
use std::time::Instant;
use tracing::{debug, instrument, trace, Span};

mod action_effect_handler;
Expand Down Expand Up @@ -132,10 +136,13 @@ where
networking,
);

let mut cancellation = std::pin::pin!(cancellation_watcher());
let partition_id_str: &'static str = Box::leak(Box::new(self.partition_id.to_string()));
loop {
tokio::select! {
_ = cancellation_watcher() => break,
_ = &mut cancellation => break,
record = log_reader.read_next() => {
let command_start = Instant::now();
let record = record?;
trace!(lsn = %record.0, "Processing bifrost record for '{}': {:?}", record.1.command.name(), record.1.header);

Expand Down Expand Up @@ -182,10 +189,14 @@ where
debug!(leader_epoch = %new_esn.leader_epoch, "Partition leadership lost to {}", announce_leader.node_id);
}
}
histogram!(PP_APPLY_RECORD_DURATION, PARTITION_LABEL => partition_id_str).record(command_start.elapsed());
} else {
// Commit our changes and notify actuators about actions if we are the leader
transaction.commit().await?;
histogram!(PP_APPLY_RECORD_DURATION, PARTITION_LABEL => partition_id_str).record(command_start.elapsed());
let actions_start = Instant::now();
state.handle_actions(action_collector.drain(..)).await?;
histogram!(PP_APPLY_ACTIONS_DURATION).record(actions_start.elapsed());
}
},
action_effect = action_effect_stream.next() => {
Expand Down

0 comments on commit 2dfb9e9

Please sign in to comment.