diff --git a/Cargo.lock b/Cargo.lock index c1c5124091..fa89992f8e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1921,9 +1921,11 @@ version = "0.0.1" dependencies = [ "clarity", "hashbrown 0.14.3", + "lazy_static", "libc", "libstackerdb", "mutants", + "prometheus", "rand 0.8.5", "rand_core 0.6.4", "secp256k1", @@ -3451,10 +3453,12 @@ dependencies = [ "clap 4.5.0", "clarity", "hashbrown 0.14.3", + "lazy_static", "libsigner", "libstackerdb", "num-traits", "polynomial", + "prometheus", "rand 0.8.5", "rand_core 0.6.4", "reqwest", @@ -3470,6 +3474,7 @@ dependencies = [ "stacks-common", "stackslib", "thiserror", + "tiny_http", "toml 0.5.11", "tracing", "tracing-subscriber", diff --git a/libsigner/Cargo.toml b/libsigner/Cargo.toml index 8a0e5fc657..7da9801674 100644 --- a/libsigner/Cargo.toml +++ b/libsigner/Cargo.toml @@ -18,8 +18,10 @@ path = "./src/libsigner.rs" [dependencies] clarity = { path = "../clarity" } hashbrown = { workspace = true } +lazy_static = "1.4.0" libc = "0.2" libstackerdb = { path = "../libstackerdb" } +prometheus = { version = "0.9", optional = true } serde = "1" serde_derive = "1" serde_stacker = "0.1" @@ -50,3 +52,6 @@ sha2 = { version = "0.10", features = ["asm"] } [target.'cfg(any(not(any(target_arch = "x86_64", target_arch = "x86", target_arch = "aarch64")), any(target_os = "windows")))'.dependencies] sha2 = { version = "0.10" } + +[features] +monitoring_prom = ["prometheus"] \ No newline at end of file diff --git a/stacks-signer/Cargo.toml b/stacks-signer/Cargo.toml index 57b2e80804..087a0a4472 100644 --- a/stacks-signer/Cargo.toml +++ b/stacks-signer/Cargo.toml @@ -24,8 +24,10 @@ backoff = "0.4" clarity = { path = "../clarity" } clap = { version = "4.1.1", features = ["derive", "env"] } hashbrown = { workspace = true } +lazy_static = "1.4.0" libsigner = { path = "../libsigner" } libstackerdb = { path = "../libstackerdb" } +prometheus = { version = "0.9", optional = true } rand_core = "0.6" reqwest = { version = "0.11.22", default-features = 
false, features = ["blocking", "json", "rustls-tls"] } serde = "1" @@ -37,6 +39,7 @@ slog-term = "2.6.0" stacks-common = { path = "../stacks-common" } stackslib = { path = "../stackslib" } thiserror = "1.0" +tiny_http = { version = "0.12", optional = true } toml = "0.5.6" tracing = "0.1.37" tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } @@ -60,3 +63,6 @@ features = ["arbitrary_precision", "unbounded_depth"] [dependencies.secp256k1] version = "0.24.3" features = ["serde", "recovery"] + +[features] +monitoring_prom = ["libsigner/monitoring_prom", "prometheus", "tiny_http"] \ No newline at end of file diff --git a/stacks-signer/src/client/stacks_client.rs b/stacks-signer/src/client/stacks_client.rs index cae3126394..9cef3da9ac 100644 --- a/stacks-signer/src/client/stacks_client.rs +++ b/stacks-signer/src/client/stacks_client.rs @@ -207,6 +207,8 @@ impl StacksClient { estimated_len: Some(tx.tx_len()), transaction_payload: to_hex(&tx.payload.serialize_to_vec()), }; + let timer = + crate::monitoring::new_rpc_call_timer(&self.fees_transaction_path(), &self.http_origin); let send_request = || { self.stacks_node_client .post(self.fees_transaction_path()) @@ -219,6 +221,7 @@ impl StacksClient { if !response.status().is_success() { return Err(ClientError::RequestFailure(response.status())); } + timer.stop_and_record(); let fee_estimate_response = response.json::()?; let fee = fee_estimate_response .estimations @@ -268,6 +271,8 @@ impl StacksClient { block, chain_id: self.chain_id, }; + let timer = + crate::monitoring::new_rpc_call_timer(&self.block_proposal_path(), &self.http_origin); let send_request = || { self.stacks_node_client .post(self.block_proposal_path()) @@ -279,6 +284,7 @@ impl StacksClient { }; let response = retry_with_exponential_backoff(send_request)?; + timer.stop_and_record(); if !response.status().is_success() { return Err(ClientError::RequestFailure(response.status())); } @@ -355,6 +361,8 @@ impl StacksClient { /// Get the current 
peer info data from the stacks node pub fn get_peer_info(&self) -> Result { debug!("Getting stacks node info..."); + let timer = + crate::monitoring::new_rpc_call_timer(&self.core_info_path(), &self.http_origin); let send_request = || { self.stacks_node_client .get(self.core_info_path()) @@ -362,6 +370,7 @@ impl StacksClient { .map_err(backoff::Error::transient) }; let response = retry_with_exponential_backoff(send_request)?; + timer.stop_and_record(); if !response.status().is_success() { return Err(ClientError::RequestFailure(response.status())); } @@ -402,6 +411,10 @@ impl StacksClient { reward_cycle: u64, ) -> Result>, ClientError> { debug!("Getting reward set for reward cycle {reward_cycle}..."); + let timer = crate::monitoring::new_rpc_call_timer( + &self.reward_set_path(reward_cycle), + &self.http_origin, + ); let send_request = || { self.stacks_node_client .get(self.reward_set_path(reward_cycle)) @@ -409,6 +422,7 @@ impl StacksClient { .map_err(backoff::Error::transient) }; let response = retry_with_exponential_backoff(send_request)?; + timer.stop_and_record(); if !response.status().is_success() { return Err(ClientError::RequestFailure(response.status())); } @@ -419,6 +433,8 @@ impl StacksClient { /// Retreive the current pox data from the stacks node pub fn get_pox_data(&self) -> Result { debug!("Getting pox data..."); + #[cfg(feature = "monitoring_prom")] + let timer = crate::monitoring::new_rpc_call_timer(&self.pox_path(), &self.http_origin); let send_request = || { self.stacks_node_client .get(self.pox_path()) @@ -426,6 +442,8 @@ impl StacksClient { .map_err(backoff::Error::transient) }; let response = retry_with_exponential_backoff(send_request)?; + #[cfg(feature = "monitoring_prom")] + timer.stop_and_record(); if !response.status().is_success() { return Err(ClientError::RequestFailure(response.status())); } @@ -458,11 +476,13 @@ impl StacksClient { } /// Helper function to retrieve the account info from the stacks node for a specific address - fn 
get_account_entry( + pub fn get_account_entry( &self, address: &StacksAddress, ) -> Result { debug!("Getting account info..."); + let timer = + crate::monitoring::new_rpc_call_timer(&self.accounts_path(address), &self.http_origin); let send_request = || { self.stacks_node_client .get(self.accounts_path(address)) @@ -470,6 +490,7 @@ impl StacksClient { .map_err(backoff::Error::transient) }; let response = retry_with_exponential_backoff(send_request)?; + timer.stop_and_record(); if !response.status().is_success() { return Err(ClientError::RequestFailure(response.status())); } @@ -536,6 +557,8 @@ impl StacksClient { pub fn submit_transaction(&self, tx: &StacksTransaction) -> Result { let txid = tx.txid(); let tx = tx.serialize_to_vec(); + let timer = + crate::monitoring::new_rpc_call_timer(&self.transaction_path(), &self.http_origin); let send_request = || { self.stacks_node_client .post(self.transaction_path()) @@ -548,6 +571,7 @@ impl StacksClient { }) }; let response = retry_with_exponential_backoff(send_request)?; + timer.stop_and_record(); if !response.status().is_success() { return Err(ClientError::RequestFailure(response.status())); } @@ -576,12 +600,14 @@ impl StacksClient { let body = json!({"sender": self.stacks_address.to_string(), "arguments": args}).to_string(); let path = self.read_only_path(contract_addr, contract_name, function_name); + let timer = crate::monitoring::new_rpc_call_timer(&path, &self.http_origin); let response = self .stacks_node_client .post(path) .header("Content-Type", "application/json") .body(body) .send()?; + timer.stop_and_record(); if !response.status().is_success() { return Err(ClientError::RequestFailure(response.status())); } diff --git a/stacks-signer/src/config.rs b/stacks-signer/src/config.rs index a028d190d8..819eeadb39 100644 --- a/stacks-signer/src/config.rs +++ b/stacks-signer/src/config.rs @@ -186,6 +186,8 @@ pub struct GlobalConfig { pub auth_password: String, /// The path to the signer's database file pub db_path: 
PathBuf, + /// Metrics endpoint + pub metrics_endpoint: Option, } /// Internal struct for loading up the config file @@ -221,6 +223,8 @@ struct RawConfigFile { pub auth_password: String, /// The path to the signer's database file or :memory: for an in-memory database pub db_path: String, + /// Metrics endpoint + pub metrics_endpoint: Option, } impl RawConfigFile { @@ -298,6 +302,19 @@ impl TryFrom for GlobalConfig { let sign_timeout = raw_data.sign_timeout_ms.map(Duration::from_millis); let db_path = raw_data.db_path.into(); + let metrics_endpoint = match raw_data.metrics_endpoint { + Some(endpoint) => Some( + endpoint + .to_socket_addrs() + .map_err(|_| ConfigError::BadField("endpoint".to_string(), endpoint.clone()))? + .next() + .ok_or_else(|| { + ConfigError::BadField("endpoint".to_string(), endpoint.clone()) + })?, + ), + None => None, + }; + Ok(Self { node_host: raw_data.node_host, endpoint, @@ -315,6 +332,7 @@ impl TryFrom for GlobalConfig { max_tx_fee_ustx: raw_data.max_tx_fee_ustx, auth_password: raw_data.auth_password, db_path, + metrics_endpoint, }) } } @@ -345,6 +363,10 @@ impl GlobalConfig { 0 => "default".to_string(), _ => (self.tx_fee_ustx as f64 / 1_000_000.0).to_string(), }; + let metrics_endpoint = match &self.metrics_endpoint { + Some(endpoint) => endpoint.to_string(), + None => "None".to_string(), + }; format!( r#" Stacks node host: {node_host} @@ -354,6 +376,7 @@ Public key: {public_key} Network: {network} Database path: {db_path} DKG transaction fee: {tx_fee} uSTX +Metrics endpoint: {metrics_endpoint} "#, node_host = self.node_host, endpoint = self.endpoint, @@ -361,7 +384,8 @@ DKG transaction fee: {tx_fee} uSTX public_key = StacksPublicKey::from_private(&self.stacks_private_key).to_hex(), network = self.network, db_path = self.db_path.to_str().unwrap_or_default(), - tx_fee = tx_fee + tx_fee = tx_fee, + metrics_endpoint = metrics_endpoint, ) } } @@ -384,6 +408,7 @@ pub fn build_signer_config_tomls( mut port_start: usize, max_tx_fee_ustx: 
Option, tx_fee_ustx: Option, + mut metrics_port_start: Option, ) -> Vec { let mut signer_config_tomls = vec![]; @@ -438,6 +463,17 @@ tx_fee_ustx = {tx_fee_ustx} ) } + if let Some(metrics_port) = metrics_port_start { + let metrics_endpoint = format!("localhost:{}", metrics_port); + signer_config_toml = format!( + r#" +{signer_config_toml} +metrics_endpoint = "{metrics_endpoint}" +"# + ); + metrics_port_start = Some(metrics_port + 1); + } + signer_config_tomls.push(signer_config_toml); } @@ -469,6 +505,7 @@ mod tests { 3000, None, None, + Some(4000), ); let config = @@ -477,6 +514,7 @@ mod tests { assert_eq!(config.auth_password, "melon"); assert!(config.max_tx_fee_ustx.is_none()); assert!(config.tx_fee_ustx.is_none()); + assert_eq!(config.metrics_endpoint, Some("localhost:4000".to_string())); } #[test] @@ -501,6 +539,7 @@ mod tests { 3000, None, None, + None, ); let config = @@ -526,6 +565,7 @@ mod tests { 3000, max_tx_fee_ustx, tx_fee_ustx, + None, ); let config = @@ -546,6 +586,7 @@ mod tests { 3000, max_tx_fee_ustx, None, + None, ); let config = @@ -570,6 +611,7 @@ mod tests { 3000, None, tx_fee_ustx, + None, ); let config = @@ -598,6 +640,7 @@ Public key: 03bc489f27da3701d9f9e577c88de5567cf4023111b7577042d55cde4d823a3505 Network: testnet Database path: :memory: DKG transaction fee: 0.01 uSTX +Metrics endpoint: 0.0.0.0:9090 "# ) ); diff --git a/stacks-signer/src/lib.rs b/stacks-signer/src/lib.rs index 9dcd0a069f..d326eef10b 100644 --- a/stacks-signer/src/lib.rs +++ b/stacks-signer/src/lib.rs @@ -34,3 +34,6 @@ pub mod runloop; pub mod signer; /// The state module for the signer pub mod signerdb; + +/// The monitoring server for the signer +pub mod monitoring; diff --git a/stacks-signer/src/main.rs b/stacks-signer/src/main.rs index 858ea9c66d..3573ac1d61 100644 --- a/stacks-signer/src/main.rs +++ b/stacks-signer/src/main.rs @@ -93,6 +93,10 @@ fn spawn_running_signer(path: &PathBuf) -> SpawnedSigner { let (cmd_send, cmd_recv) = channel(); let (res_send, res_recv) = 
channel(); let ev = SignerEventReceiver::new(config.network.is_mainnet()); + #[cfg(feature = "monitoring_prom")] + { + stacks_signer::monitoring::start_serving_monitoring_metrics(config.clone()).ok(); + } let runloop = RunLoop::from(config); let mut signer: Signer, RunLoop, SignerEventReceiver> = Signer::new(runloop, ev, cmd_recv, res_send); @@ -305,6 +309,7 @@ fn handle_generate_files(args: GenerateFilesArgs) { 3000, None, None, + None, ); debug!("Built {:?} signer config tomls.", signer_config_tomls.len()); for (i, file_contents) in signer_config_tomls.iter().enumerate() { diff --git a/stacks-signer/src/monitoring/mod.rs b/stacks-signer/src/monitoring/mod.rs new file mode 100644 index 0000000000..484a09d77c --- /dev/null +++ b/stacks-signer/src/monitoring/mod.rs @@ -0,0 +1,188 @@ +// Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation +// Copyright (C) 2020-2024 Stacks Open Internet Foundation +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . 
+ +#[cfg(feature = "monitoring_prom")] +use ::prometheus::HistogramTimer; +#[cfg(feature = "monitoring_prom")] +use slog::slog_error; +#[cfg(not(feature = "monitoring_prom"))] +use slog::slog_warn; +#[cfg(feature = "monitoring_prom")] +use stacks_common::error; +#[cfg(not(feature = "monitoring_prom"))] +use stacks_common::warn; + +use crate::config::GlobalConfig; + +#[cfg(feature = "monitoring_prom")] +mod prometheus; + +#[cfg(feature = "monitoring_prom")] +mod server; + +/// Update stacks tip height gauge +#[allow(unused_variables)] +pub fn update_stacks_tip_height(height: i64) { + #[cfg(feature = "monitoring_prom")] + prometheus::STACKS_TIP_HEIGHT_GAUGE.set(height); +} + +/// Update the current reward cycle +#[allow(unused_variables)] +pub fn update_reward_cycle(reward_cycle: i64) { + #[cfg(feature = "monitoring_prom")] + prometheus::CURRENT_REWARD_CYCLE.set(reward_cycle); +} + +/// Increment the block validation responses counter +#[allow(unused_variables)] +pub fn increment_block_validation_responses(accepted: bool) { + #[cfg(feature = "monitoring_prom")] + { + let label_value = if accepted { "accepted" } else { "rejected" }; + prometheus::BLOCK_VALIDATION_RESPONSES + .with_label_values(&[label_value]) + .inc(); + } +} + +/// Increment the block responses sent counter +#[allow(unused_variables)] +pub fn increment_block_responses_sent(accepted: bool) { + #[cfg(feature = "monitoring_prom")] + { + let label_value = if accepted { "accepted" } else { "rejected" }; + prometheus::BLOCK_RESPONSES_SENT + .with_label_values(&[label_value]) + .inc(); + } +} + +/// Increment the signer inbound messages counter +#[allow(unused_variables)] +pub fn increment_signer_inbound_messages(amount: i64) { + #[cfg(feature = "monitoring_prom")] + prometheus::SIGNER_INBOUND_MESSAGES.inc_by(amount); +} + +/// Increment the coordinator inbound messages counter +#[allow(unused_variables)] +pub fn increment_coordinator_inbound_messages(amount: i64) { + #[cfg(feature = "monitoring_prom")] + 
prometheus::COORDINATOR_INBOUND_MESSAGES.inc_by(amount); +} + +/// Increment the number of inbound packets received +#[allow(unused_variables)] +pub fn increment_inbound_packets(amount: i64) { + #[cfg(feature = "monitoring_prom")] + prometheus::INBOUND_PACKETS_RECEIVED.inc_by(amount); +} + +/// Increment the number of commands processed +#[allow(unused_variables)] +pub fn increment_commands_processed(command_type: &str) { + #[cfg(feature = "monitoring_prom")] + prometheus::COMMANDS_PROCESSED + .with_label_values(&[command_type]) + .inc(); +} + +/// Increment the number of DKG votes submitted +#[allow(unused_variables)] +pub fn increment_dkg_votes_submitted() { + #[cfg(feature = "monitoring_prom")] + prometheus::DGK_VOTES_SUBMITTED.inc(); +} + +/// Increment the operation results counter for the given operation type +#[allow(unused_variables)] +pub fn increment_operation_results(operation_type: &str) { + #[cfg(feature = "monitoring_prom")] + prometheus::OPERATION_RESULTS + .with_label_values(&[operation_type]) + .inc(); +} + +/// Increment the number of block proposals received +#[allow(unused_variables)] +pub fn increment_block_proposals_received() { + #[cfg(feature = "monitoring_prom")] + prometheus::BLOCK_PROPOSALS_RECEIVED.inc(); +} + +/// Update the stx balance of the signer +#[allow(unused_variables)] +pub fn update_signer_stx_balance(balance: i64) { + #[cfg(feature = "monitoring_prom")] + prometheus::SIGNER_STX_BALANCE.set(balance); +} + +/// Update the signer nonce metric +#[allow(unused_variables)] +pub fn update_signer_nonce(nonce: u64) { + #[cfg(feature = "monitoring_prom")] + prometheus::SIGNER_NONCE.set(nonce as i64); +} + +/// Start a new RPC call timer. +/// The `origin` parameter is the base path of the RPC call, e.g. `http://node.com`. +/// The `origin` parameter is removed from `full_path` when storing in prometheus. 
+#[cfg(feature = "monitoring_prom")] +pub fn new_rpc_call_timer(full_path: &str, origin: &String) -> HistogramTimer { + let path = &full_path[origin.len()..]; + let histogram = prometheus::SIGNER_RPC_CALL_LATENCIES_HISTOGRAM.with_label_values(&[path]); + histogram.start_timer() +} + +/// NoOp timer used for monitoring when the monitoring feature is not enabled. +pub struct NoOpTimer; +impl NoOpTimer { + /// NoOp method to stop recording when the monitoring feature is not enabled. + pub fn stop_and_record(&self) {} +} + +/// Create a no-op RPC call timer when the monitoring feature is not enabled. +#[cfg(not(feature = "monitoring_prom"))] +pub fn new_rpc_call_timer(_full_path: &str, _origin: &String) -> NoOpTimer { + NoOpTimer +} + +/// Start serving monitoring metrics. +/// This will only serve the metrics if the `monitoring_prom` feature is enabled. +#[allow(unused_variables)] +pub fn start_serving_monitoring_metrics(config: GlobalConfig) -> Result<(), String> { + #[cfg(feature = "monitoring_prom")] + { + if config.metrics_endpoint.is_none() { + return Ok(()); + } + let thread = std::thread::Builder::new() + .name("signer_metrics".to_string()) + .spawn(move || { + if let Err(monitoring_err) = server::MonitoringServer::start(&config) { + error!("Monitoring: Error in metrics server: {:?}", monitoring_err); + } + }); + } + #[cfg(not(feature = "monitoring_prom"))] + { + if config.metrics_endpoint.is_some() { + warn!("Not starting monitoring metrics server as the monitoring_prom feature is not enabled"); + } + } + Ok(()) +} diff --git a/stacks-signer/src/monitoring/prometheus.rs b/stacks-signer/src/monitoring/prometheus.rs new file mode 100644 index 0000000000..c78db1299d --- /dev/null +++ b/stacks-signer/src/monitoring/prometheus.rs @@ -0,0 +1,105 @@ +// Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation +// Copyright (C) 2020-2024 Stacks Open Internet Foundation +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public 
License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use lazy_static::lazy_static; +use prometheus::{ + gather, histogram_opts, opts, register_histogram_vec, register_int_counter, + register_int_counter_vec, register_int_gauge, Encoder, HistogramVec, IntCounter, IntCounterVec, + IntGauge, TextEncoder, +}; + +lazy_static! { + pub static ref STACKS_TIP_HEIGHT_GAUGE: IntGauge = register_int_gauge!(opts!( + "stacks_signer_stacks_node_height", + "The current height of the Stacks node" + )) + .unwrap(); + pub static ref BLOCK_VALIDATION_RESPONSES: IntCounterVec = register_int_counter_vec!( + "stacks_signer_block_validation_responses", + "The number of block validation responses. `response_type` is either 'accepted' or 'rejected'", + &["response_type"] + ) + .unwrap(); + pub static ref BLOCK_RESPONSES_SENT: IntCounterVec = register_int_counter_vec!( + "stacks_signer_block_responses_sent", + "The number of block responses sent. 
`response_type` is either 'accepted' or 'rejected'", + &["response_type"] + ) + .unwrap(); + pub static ref SIGNER_INBOUND_MESSAGES: IntCounter = register_int_counter!(opts!( + "stacks_signer_inbound_messages", + "The number of inbound messages received by the signer" + )) + .unwrap(); + pub static ref COORDINATOR_INBOUND_MESSAGES: IntCounter = register_int_counter!(opts!( + "stacks_signer_coordinator_inbound_messages", + "The number of inbound messages received as a coordinator" + )) + .unwrap(); + pub static ref INBOUND_PACKETS_RECEIVED: IntCounter = register_int_counter!(opts!( + "stacks_signer_inbound_packets_received", + "The number of inbound packets received by the signer" + )) + .unwrap(); + pub static ref COMMANDS_PROCESSED: IntCounterVec = register_int_counter_vec!( + "stacks_signer_commands_processed", + "The number of commands processed by the signer", + &["command_type"] + ) + .unwrap(); + pub static ref DGK_VOTES_SUBMITTED: IntCounter = register_int_counter!(opts!( + "stacks_signer_dgk_votes_submitted", + "The number of DGK votes submitted by the signer" + )) + .unwrap(); + pub static ref OPERATION_RESULTS: IntCounterVec = register_int_counter_vec!( + "stacks_signer_operation_results_dkg", + "The number of DKG operation results", + &["operation_type"] + ) + .unwrap(); + pub static ref BLOCK_PROPOSALS_RECEIVED: IntCounter = register_int_counter!(opts!( + "stacks_signer_block_proposals_received", + "The number of block proposals received by the signer" + )) + .unwrap(); + pub static ref CURRENT_REWARD_CYCLE: IntGauge = register_int_gauge!(opts!( + "stacks_signer_current_reward_cycle", + "The current reward cycle" + )).unwrap(); + pub static ref SIGNER_STX_BALANCE: IntGauge = register_int_gauge!(opts!( + "stacks_signer_stx_balance", + "The current STX balance of the signer" + )).unwrap(); + pub static ref SIGNER_NONCE: IntGauge = register_int_gauge!(opts!( + "stacks_signer_nonce", + "The current nonce of the signer" + )).unwrap(); + + pub static ref 
SIGNER_RPC_CALL_LATENCIES_HISTOGRAM: HistogramVec = register_histogram_vec!(histogram_opts!( + "stacks_signer_node_rpc_call_latencies_histogram", + "Time (seconds) measuring round-trip RPC call latency to the Stacks node" + // Will use DEFAULT_BUCKETS = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0] by default + ), &["path"]).unwrap(); +} + +pub fn gather_metrics_string() -> String { + let mut buffer = Vec::new(); + let encoder = TextEncoder::new(); + let metrics_families = gather(); + encoder.encode(&metrics_families, &mut buffer).unwrap(); + String::from_utf8(buffer).unwrap() +} diff --git a/stacks-signer/src/monitoring/server.rs b/stacks-signer/src/monitoring/server.rs new file mode 100644 index 0000000000..4330f2f8d7 --- /dev/null +++ b/stacks-signer/src/monitoring/server.rs @@ -0,0 +1,254 @@ +// Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation +// Copyright (C) 2020-2024 Stacks Open Internet Foundation +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . 
+ +use std::net::SocketAddr; +use std::time::Instant; + +use clarity::util::hash::to_hex; +use clarity::util::secp256k1::Secp256k1PublicKey; +use slog::{slog_debug, slog_error, slog_info, slog_warn}; +use stacks_common::{debug, error, info, warn}; +use tiny_http::{Response as HttpResponse, Server as HttpServer}; + +use super::{update_reward_cycle, update_signer_stx_balance}; +use crate::client::{ClientError, StacksClient}; +use crate::config::{GlobalConfig, Network}; +use crate::monitoring::prometheus::gather_metrics_string; +use crate::monitoring::{update_signer_nonce, update_stacks_tip_height}; + +#[derive(Debug)] +/// Monitoring server errors +pub enum MonitoringError { + /// Already bound to an address + AlreadyBound, + /// Server terminated + Terminated, + /// No endpoint configured + EndpointNotConfigured, + /// Error fetching metrics from stacks node + FetchError(ClientError), +} + +/// Metrics and monitoring server +pub struct MonitoringServer { + http_server: HttpServer, + local_addr: SocketAddr, + stacks_client: StacksClient, + last_metrics_poll: Instant, + network: Network, + public_key: Secp256k1PublicKey, + stacks_node_client: reqwest::blocking::Client, + stacks_node_origin: String, +} + +impl MonitoringServer { + pub fn new( + http_server: HttpServer, + local_addr: SocketAddr, + stacks_client: StacksClient, + network: Network, + public_key: Secp256k1PublicKey, + stacks_node_origin: String, + ) -> Self { + Self { + http_server, + local_addr, + stacks_client, + last_metrics_poll: Instant::now(), + network, + public_key, + stacks_node_client: reqwest::blocking::Client::new(), + stacks_node_origin, + } + } + + /// Start and run the metrics server + pub fn start(config: &GlobalConfig) -> Result<(), MonitoringError> { + let Some(endpoint) = config.metrics_endpoint else { + return Err(MonitoringError::EndpointNotConfigured); + }; + let stacks_client = StacksClient::from(config); + let http_server = HttpServer::http(endpoint).map_err(|_| 
MonitoringError::AlreadyBound)?; + let public_key = Secp256k1PublicKey::from_private(&config.stacks_private_key); + let mut server = MonitoringServer::new( + http_server, + endpoint, + stacks_client, + config.network.clone(), + public_key, + format!("http://{}", config.node_host), + ); + server.update_metrics()?; + server.main_loop() + } + + // /// Start and run the metrics server + // pub fn run(endpoint: SocketAddr, stacks_client: StacksClient) -> Result<(), MonitoringError> { + // let http_server = HttpServer::http(endpoint).map_err(|_| MonitoringError::AlreadyBound)?; + // let mut server = PrometheusMetrics::new(http_server, endpoint, stacks_client); + // server.main_loop() + // } + + /// Main listener loop of metrics server + pub fn main_loop(&mut self) -> Result<(), MonitoringError> { + info!("{}: Starting Prometheus metrics server", self); + loop { + if let Err(err) = self.refresh_metrics() { + error!("Monitoring: Error refreshing metrics: {:?}", err); + } + let request = match self.http_server.recv() { + Ok(request) => request, + Err(err) => { + error!("Monitoring: Error receiving request: {:?}", err); + return Err(MonitoringError::Terminated); + } + }; + + debug!("{}: received request {}", self, request.url()); + + if request.url() == "/metrics" { + let response = HttpResponse::from_string(gather_metrics_string()); + request.respond(response).expect("Failed to send response"); + continue; + } + + if request.url() == "/info" { + request + .respond(HttpResponse::from_string(self.get_info_response())) + .expect("Failed to respond to request"); + continue; + } + + // return 200 OK for "/" + if request.url() == "/" { + request + .respond(HttpResponse::from_string("OK")) + .expect("Failed to respond to request"); + continue; + } + + // Run heartbeat check to test connection to the node + if request.url() == "/heartbeat" { + let (msg, status) = if self.heartbeat() { + ("OK", 200) + } else { + ("Failed", 500) + }; + request + 
.respond(HttpResponse::from_string(msg).with_status_code(status)) + .expect("Failed to respond to request"); + continue; + } + + // unknown request, return 404 + request + .respond(HttpResponse::from_string("Not Found").with_status_code(404)) + .expect("Failed to respond to request"); + } + } + + /// Check to see if metrics need to be refreshed + fn refresh_metrics(&mut self) -> Result<(), MonitoringError> { + let now = Instant::now(); + if now.duration_since(self.last_metrics_poll).as_secs() > 60 { + self.last_metrics_poll = now; + self.update_metrics()?; + } + Ok(()) + } + + /// Update metrics by making RPC calls to the Stacks node + fn update_metrics(&self) -> Result<(), MonitoringError> { + debug!("{}: Updating metrics", self); + let peer_info = self + .stacks_client + .get_peer_info() + .map_err(|e| MonitoringError::FetchError(e))?; + if let Ok(height) = i64::try_from(peer_info.stacks_tip_height) { + update_stacks_tip_height(height); + } else { + warn!( + "Failed to parse stacks tip height: {}", + peer_info.stacks_tip_height + ); + } + let pox_info = self + .stacks_client + .get_pox_data() + .map_err(|e| MonitoringError::FetchError(e))?; + if let Ok(reward_cycle) = i64::try_from(pox_info.reward_cycle_id) { + update_reward_cycle(reward_cycle); + } + let signer_stx_addr = self.stacks_client.get_signer_address(); + let account_entry = self + .stacks_client + .get_account_entry(&signer_stx_addr) + .map_err(|e| MonitoringError::FetchError(e))?; + let balance = i64::from_str_radix(&account_entry.balance[2..], 16).map_err(|e| { + MonitoringError::FetchError(ClientError::MalformedClarityValue(format!( + "Failed to parse balance: {} with err: {}", + &account_entry.balance, e, + ))) + })?; + if let Ok(nonce) = u64::try_from(account_entry.nonce) { + update_signer_nonce(nonce); + } else { + warn!("Failed to parse nonce: {}", account_entry.nonce); + } + update_signer_stx_balance(balance); + Ok(()) + } + + /// Build a JSON response for non-metrics requests + fn 
get_info_response(&self) -> String { + // let public_key = Secp256k1PublicKey::from_private(&self.stacks_client.publ); + serde_json::to_string(&serde_json::json!({ + "signerPublicKey": to_hex(&self.public_key.to_bytes_compressed()), + "network": self.network.to_string(), + "stxAddress": self.stacks_client.get_signer_address().to_string(), + })) + .expect("Failed to serialize JSON") + } + + /// Poll the Stacks node's `v2/info` endpoint to validate the connection + fn heartbeat(&self) -> bool { + let url = format!("{}/v2/info", self.stacks_node_origin); + let response = self.stacks_node_client.get(&url).send(); + match response { + Ok(response) => { + if response.status().is_success() { + true + } else { + warn!( + "Monitoring: Heartbeat failed with status: {}", + response.status() + ); + false + } + } + Err(err) => { + warn!("Monitoring: Heartbeat failed with error: {:?}", err); + false + } + } + } +} + +impl std::fmt::Display for MonitoringServer { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Signer monitoring server ({})", self.local_addr) + } +} diff --git a/stacks-signer/src/signer.rs b/stacks-signer/src/signer.rs index 4908965c7c..4f0b6d254d 100644 --- a/stacks-signer/src/signer.rs +++ b/stacks-signer/src/signer.rs @@ -413,6 +413,7 @@ impl Signer { fn execute_command(&mut self, stacks_client: &StacksClient, command: &Command) { match command { Command::Dkg => { + crate::monitoring::increment_commands_processed(&"dkg"); if self.approved_aggregate_public_key.is_some() { debug!("Reward cycle #{} Signer #{}: Already have an aggregate key. Ignoring DKG command.", self.reward_cycle, self.signer_id); return; @@ -449,6 +450,7 @@ impl Signer { is_taproot, merkle_root, } => { + crate::monitoring::increment_commands_processed(&"sign"); if self.approved_aggregate_public_key.is_none() { debug!("{self}: Cannot sign a block without an approved aggregate public key. 
Ignore it."); return; @@ -548,6 +550,7 @@ impl Signer { ) { let mut block_info = match block_validate_response { BlockValidateResponse::Ok(block_validate_ok) => { + crate::monitoring::increment_block_validation_responses(true); let signer_signature_hash = block_validate_ok.signer_signature_hash; // For mutability reasons, we need to take the block_info out of the map and add it back after processing let mut block_info = match self @@ -578,6 +581,7 @@ impl Signer { block_info } BlockValidateResponse::Reject(block_validate_reject) => { + crate::monitoring::increment_block_validation_responses(false); let signer_signature_hash = block_validate_reject.signer_signature_hash; let mut block_info = match self .signer_db @@ -680,6 +684,9 @@ impl Signer { packets: &[Packet], current_reward_cycle: u64, ) { + if let Ok(packets_len) = packets.len().try_into() { + crate::monitoring::increment_inbound_packets(packets_len); + } let signer_outbound_messages = self .state_machine .process_inbound_messages(packets) @@ -1036,20 +1043,25 @@ impl Signer { // Signers only every trigger non-taproot signing rounds over blocks. Ignore SignTaproot results match operation_result { OperationResult::Sign(signature) => { + crate::monitoring::increment_operation_results(&"sign"); debug!("{self}: Received signature result"); self.process_signature(signature); } OperationResult::SignTaproot(_) => { + crate::monitoring::increment_operation_results(&"sign_taproot"); debug!("{self}: Received a signature result for a taproot signature. 
Nothing to broadcast as we currently sign blocks with a FROST signature."); } OperationResult::Dkg(aggregate_key) => { + crate::monitoring::increment_operation_results(&"dkg"); self.process_dkg(stacks_client, aggregate_key); } OperationResult::SignError(e) => { + crate::monitoring::increment_operation_results(&"sign_error"); warn!("{self}: Received a Sign error: {e:?}"); self.process_sign_error(e); } OperationResult::DkgError(e) => { + crate::monitoring::increment_operation_results(&"dkg_error"); warn!("{self}: Received a DKG error: {e:?}"); // TODO: process these errors and track malicious signers to report } @@ -1204,6 +1216,7 @@ impl Signer { signer_transactions.push(new_transaction); let signer_message = SignerMessage::Transactions(signer_transactions); self.stackerdb.send_message_with_retry(signer_message)?; + crate::monitoring::increment_dkg_votes_submitted(); info!("{self}: Broadcasted DKG vote transaction ({txid}) to stacker DB"); Ok(()) } @@ -1219,9 +1232,11 @@ impl Signer { }; let block_submission = if block_vote.rejected { + crate::monitoring::increment_block_responses_sent(false); // We signed a rejection message. Return a rejection message BlockResponse::rejected(block_vote.signer_signature_hash, signature.clone()) } else { + crate::monitoring::increment_block_responses_sent(true); // we agreed to sign the block hash. 
Return an approval message BlockResponse::accepted(block_vote.signer_signature_hash, signature.clone()) }; diff --git a/stacks-signer/src/tests/conf/signer-0.toml b/stacks-signer/src/tests/conf/signer-0.toml index 32183e0e79..19002c1914 100644 --- a/stacks-signer/src/tests/conf/signer-0.toml +++ b/stacks-signer/src/tests/conf/signer-0.toml @@ -4,3 +4,4 @@ endpoint = "localhost:30000" network = "testnet" auth_password = "12345" db_path = ":memory:" +metrics_endpoint = "0.0.0.0:9090" \ No newline at end of file diff --git a/testnet/stacks-node/Cargo.toml b/testnet/stacks-node/Cargo.toml index 72cc8d2491..bceb484cd7 100644 --- a/testnet/stacks-node/Cargo.toml +++ b/testnet/stacks-node/Cargo.toml @@ -62,7 +62,7 @@ name = "stacks-events" path = "src/stacks_events.rs" [features] -monitoring_prom = ["stacks/monitoring_prom"] +monitoring_prom = ["stacks/monitoring_prom", "libsigner/monitoring_prom"] slog_json = ["stacks/slog_json", "stacks-common/slog_json", "clarity/slog_json"] prod-genesis-chainstate = [] default = [] diff --git a/testnet/stacks-node/src/tests/signer.rs b/testnet/stacks-node/src/tests/signer.rs index 68515d130c..c6edf9bce9 100644 --- a/testnet/stacks-node/src/tests/signer.rs +++ b/testnet/stacks-node/src/tests/signer.rs @@ -125,6 +125,7 @@ impl SignerTest { 3000, Some(100_000), None, + Some(9000), ); let mut running_signers = Vec::new(); @@ -511,6 +512,23 @@ impl SignerTest { entries.public_keys } + fn get_signer_metrics(&self) -> String { + #[cfg(feature = "monitoring_prom")] + { + let client = reqwest::blocking::Client::new(); + let res = client + .get("http://localhost:9000/metrics") + .send() + .unwrap() + .text() + .unwrap(); + + return res; + } + #[cfg(not(feature = "monitoring_prom"))] + return String::new(); + } + fn generate_invalid_transactions(&self) -> Vec { let host = self .running_nodes @@ -741,6 +759,7 @@ impl SignerTest { 3000 + signer_idx, Some(100_000), None, + Some(9000 + signer_idx), ) .pop() .unwrap(); @@ -782,6 +801,10 @@ fn 
spawn_signer( let config = SignerConfig::load_from_str(data).unwrap(); let ev = SignerEventReceiver::new(config.network.is_mainnet()); let endpoint = config.endpoint; + #[cfg(feature = "monitoring_prom")] + { + stacks_signer::monitoring::start_serving_monitoring_metrics(config.clone()).ok(); + } let runloop: stacks_signer::runloop::RunLoop = stacks_signer::runloop::RunLoop::from(config); let mut signer: Signer< RunLoopCommand, @@ -1329,7 +1352,8 @@ fn stackerdb_block_proposal() { .init(); info!("------------------------- Test Setup -------------------------"); - let mut signer_test = SignerTest::new(5); + let num_signers = 5; + let mut signer_test = SignerTest::new(num_signers); let timeout = Duration::from_secs(200); let short_timeout = Duration::from_secs(30); @@ -1347,6 +1371,17 @@ fn stackerdb_block_proposal() { .0 .verify(&key, proposed_signer_signature_hash.as_bytes())); + // Test prometheus metrics response + #[cfg(feature = "monitoring_prom")] + { + let metrics_response = signer_test.get_signer_metrics(); + + // Because 5 signers are running in the same process, the prometheus metrics + // are incremented once for every signer. This is why we expect the metric to be + // `5`, even though there is only one block proposed. + let expected_result = format!("stacks_signer_block_proposals_received {}", num_signers); + assert!(metrics_response.contains(&expected_result)); + } signer_test.shutdown(); }