diff --git a/src/admin.rs b/src/admin.rs index 5c820ee7f..b82b29401 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -174,6 +174,7 @@ where let columns = vec![ ("database", DataType::Text), ("user", DataType::Text), + ("cl_idle", DataType::Numeric), ("cl_active", DataType::Numeric), ("cl_waiting", DataType::Numeric), ("cl_cancel_req", DataType::Numeric), diff --git a/src/main.rs b/src/main.rs index db934747b..0b2e1d59f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -133,7 +133,7 @@ async fn main() { let client_server_map: ClientServerMap = Arc::new(Mutex::new(HashMap::new())); // Statistics reporting. - let (tx, rx) = mpsc::channel(100); + let (tx, rx) = mpsc::channel(100_000); REPORTER.store(Arc::new(Reporter::new(tx.clone()))); // Connection pool that allows to query all shards and replicas. diff --git a/src/stats.rs b/src/stats.rs index 83aa9cb5b..fde4071b8 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -1,9 +1,10 @@ use arc_swap::ArcSwap; /// Statistics and reporting. -use log::info; +use log::{error, info, trace}; use once_cell::sync::Lazy; use parking_lot::Mutex; use std::collections::HashMap; +use tokio::sync::mpsc::error::TrySendError; use tokio::sync::mpsc::{channel, Receiver, Sender}; use crate::pool::get_number_of_addresses; @@ -43,7 +44,7 @@ enum EventName { /// Event data sent to the collector /// from clients and servers. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Event { /// The name of the event being reported. name: EventName, @@ -79,6 +80,25 @@ impl Reporter { Reporter { tx: tx } } + /// Send statistics to the task keeping track of stats. + fn send(&self, event: Event) { + let name = event.name; + let result = self.tx.try_send(event); + + match result { + Ok(_) => trace!( + "{:?} event reported successfully, capacity: {}", + name, + self.tx.capacity() + ), + + Err(err) => match err { + TrySendError::Full { .. } => error!("{:?} event dropped, buffer full", name), + TrySendError::Closed { .. } => error!("{:?} event dropped, channel closed", name), + }, + }; + } + /// Report a query executed by a client against /// a server identified by the `address_id`. pub fn query(&self, process_id: i32, address_id: usize) { @@ -89,7 +109,7 @@ impl Reporter { address_id: address_id, }; - let _ = self.tx.try_send(event); + self.send(event); } /// Report a transaction executed by a client against @@ -102,7 +122,7 @@ impl Reporter { address_id: address_id, }; - let _ = self.tx.try_send(event); + self.send(event) } /// Report data sent to a server identified by `address_id`. @@ -115,7 +135,7 @@ impl Reporter { address_id: address_id, }; - let _ = self.tx.try_send(event); + self.send(event) } /// Report data received from a server identified by `address_id`. @@ -128,7 +148,7 @@ impl Reporter { address_id: address_id, }; - let _ = self.tx.try_send(event); + self.send(event) } /// Time spent waiting to get a healthy connection from the pool @@ -142,7 +162,7 @@ impl Reporter { address_id: address_id, }; - let _ = self.tx.try_send(event); + self.send(event) } /// Reports a client identified by `process_id` waiting for a connection @@ -155,7 +175,7 @@ impl Reporter { address_id: address_id, }; - let _ = self.tx.try_send(event); + self.send(event) } /// Reports a client identified by `process_id` is done waiting for a connection @@ -168,7 +188,7 @@ impl Reporter { address_id: address_id, }; - let _ = self.tx.try_send(event); + self.send(event) } /// Reports a client identified by `process_id` is done querying the server @@ -181,7 +201,7 @@ impl Reporter { address_id: address_id, }; - let _ = self.tx.try_send(event); + self.send(event) } /// Reports a client identified by `process_id` is disconecting from the pooler. @@ -194,7 +214,7 @@ impl Reporter { address_id: address_id, }; - let _ = self.tx.try_send(event); + self.send(event) } /// Reports a server connection identified by `process_id` for @@ -208,7 +228,7 @@ impl Reporter { address_id: address_id, }; - let _ = self.tx.try_send(event); + self.send(event) } /// Reports a server connection identified by `process_id` for @@ -222,7 +242,7 @@ impl Reporter { address_id: address_id, }; - let _ = self.tx.try_send(event); + self.send(event) } /// Reports a server connection identified by `process_id` for @@ -236,7 +256,7 @@ impl Reporter { address_id: address_id, }; - let _ = self.tx.try_send(event); + self.send(event) } /// Reports a server connection identified by `process_id` for @@ -250,7 +270,7 @@ impl Reporter { address_id: address_id, }; - let _ = self.tx.try_send(event); + self.send(event) } /// Reports a server connection identified by `process_id` is disconecting from the pooler. @@ -263,7 +283,7 @@ impl Reporter { address_id: address_id, }; - let _ = self.tx.try_send(event); + self.send(event) } }