diff --git a/Cargo.lock b/Cargo.lock index 9124996f..dbf0a14b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -37,6 +37,17 @@ dependencies = [ "syn", ] +[[package]] +name = "atomic_enum" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6227a8d6fdb862bcb100c4314d0d9579e5cd73fa6df31a2e6f6e1acd3c5f1207" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "autocfg" version = "1.1.0" @@ -720,6 +731,7 @@ version = "1.0.0" dependencies = [ "arc-swap", "async-trait", + "atomic_enum", "base64", "bb8", "bytes", diff --git a/Cargo.toml b/Cargo.toml index e5b1de66..3f0cbec6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,7 @@ exitcode = "1.1.2" futures = "0.3" socket2 = { version = "0.4.7", features = ["all"] } nix = "0.26.2" +atomic_enum = "0.2.0" [target.'cfg(not(target_env = "msvc"))'.dependencies] jemallocator = "0.5.0" diff --git a/dev/docker-compose.yaml b/dev/docker-compose.yaml index da759383..15621e87 100644 --- a/dev/docker-compose.yaml +++ b/dev/docker-compose.yaml @@ -26,6 +26,8 @@ x-common-env-pg: services: main: image: kubernetes/pause + ports: + - 6432 pg1: <<: *common-definition-pg diff --git a/src/admin.rs b/src/admin.rs index feea3a15..03af755c 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -1,10 +1,11 @@ use crate::pool::BanReason; -/// Admin database. use bytes::{Buf, BufMut, BytesMut}; use log::{error, info, trace}; use nix::sys::signal::{self, Signal}; use nix::unistd::Pid; use std::collections::HashMap; +/// Admin database. +use std::sync::atomic::Ordering; use std::time::{SystemTime, UNIX_EPOCH}; use tokio::time::Instant; @@ -12,9 +13,7 @@ use crate::config::{get_config, reload_config, VERSION}; use crate::errors::Error; use crate::messages::*; use crate::pool::{get_all_pools, get_pool}; -use crate::stats::{ - get_address_stats, get_client_stats, get_pool_stats, get_server_stats, ClientState, ServerState, -}; +use crate::stats::{get_client_stats, get_pool_stats, get_server_stats, ClientState, ServerState}; use crate::ClientServerMap; pub fn generate_server_info_for_admin() -> BytesMut { @@ -158,7 +157,14 @@ where "free_clients".to_string(), client_stats .keys() - .filter(|client_id| client_stats.get(client_id).unwrap().state == ClientState::Idle) + .filter(|client_id| { + client_stats + .get(client_id) + .unwrap() + .state + .load(Ordering::Relaxed) + == ClientState::Idle + }) .count() .to_string(), ])); @@ -166,7 +172,14 @@ where "used_clients".to_string(), client_stats .keys() - .filter(|client_id| client_stats.get(client_id).unwrap().state == ClientState::Active) + .filter(|client_id| { + client_stats + .get(client_id) + .unwrap() + .state + .load(Ordering::Relaxed) + == ClientState::Active + }) .count() .to_string(), ])); @@ -178,7 +191,14 @@ where "free_servers".to_string(), server_stats .keys() - .filter(|server_id| server_stats.get(server_id).unwrap().state == ServerState::Idle) + .filter(|server_id| { + server_stats + .get(server_id) + .unwrap() + .state + .load(Ordering::Relaxed) + == ServerState::Idle + }) .count() .to_string(), ])); @@ -186,7 +206,14 @@ where "used_servers".to_string(), server_stats .keys() - .filter(|server_id| server_stats.get(server_id).unwrap().state == ServerState::Active) + .filter(|server_id| { + server_stats + .get(server_id) + .unwrap() + .state + .load(Ordering::Relaxed) + == ServerState::Active + }) .count() .to_string(), ])); @@ -248,28 +275,15 @@ where let mut res = BytesMut::new(); res.put(row_description(&columns)); - for (user_pool, pool) in get_all_pools() { - let def = HashMap::default(); - let pool_stats = all_pool_stats - .get(&(user_pool.db.clone(), user_pool.user.clone())) - .unwrap_or(&def); - let pool_config = &pool.settings; + for ((_user_pool, _pool), pool_stats) in all_pool_stats { let mut row = vec![ - user_pool.db.clone(), - user_pool.user.clone(), - pool_config.pool_mode.to_string(), + pool_stats.database(), + pool_stats.user(), + pool_stats.pool_mode().to_string(), ]; - for column in &columns[3..columns.len()] { - let value = match column.0 { - "maxwait" => (pool_stats.get("maxwait_us").unwrap_or(&0) / 1_000_000).to_string(), - "maxwait_us" => { - (pool_stats.get("maxwait_us").unwrap_or(&0) % 1_000_000).to_string() - } - _other_values => pool_stats.get(column.0).unwrap_or(&0).to_string(), - }; - row.push(value); - } + pool_stats.populate_row(&mut row); + pool_stats.clear_maxwait(); res.put(data_row(&row)); } @@ -400,7 +414,7 @@ where for (id, pool) in get_all_pools().iter() { for address in pool.get_addresses_from_host(host) { if !pool.is_banned(&address) { - pool.ban(&address, BanReason::AdminBan(duration_seconds), -1); + pool.ban(&address, BanReason::AdminBan(duration_seconds), None); res.put(data_row(&vec![ id.db.clone(), id.user.clone(), @@ -617,7 +631,6 @@ where ("avg_wait_time", DataType::Numeric), ]; - let all_stats = get_address_stats(); let mut res = BytesMut::new(); res.put(row_description(&columns)); @@ -625,15 +638,10 @@ where for shard in 0..pool.shards() { for server in 0..pool.servers(shard) { let address = pool.address(shard, server); - let stats = match all_stats.get(&address.id) { - Some(stats) => stats.clone(), - None => HashMap::new(), - }; let mut row = vec![address.name(), user_pool.db.clone(), user_pool.user.clone()]; - for column in &columns[3..] { - row.push(stats.get(column.0).unwrap_or(&0).to_string()); - } + let stats = address.stats.clone(); + stats.populate_row(&mut row); res.put(data_row(&row)); } @@ -673,16 +681,16 @@ where for (_, client) in new_map { let row = vec![ - format!("{:#010X}", client.client_id), - client.pool_name, - client.username, - client.application_name.clone(), - client.state.to_string(), - client.transaction_count.to_string(), - client.query_count.to_string(), - client.error_count.to_string(), + format!("{:#010X}", client.client_id()), + client.pool_name(), + client.username(), + client.application_name(), + client.state.load(Ordering::Relaxed).to_string(), + client.transaction_count.load(Ordering::Relaxed).to_string(), + client.query_count.load(Ordering::Relaxed).to_string(), + client.error_count.load(Ordering::Relaxed).to_string(), Instant::now() - .duration_since(client.connect_time) + .duration_since(client.connect_time()) .as_secs() .to_string(), ]; @@ -724,19 +732,20 @@ where res.put(row_description(&columns)); for (_, server) in new_map { + let application_name = server.application_name.read(); let row = vec![ - format!("{:#010X}", server.server_id), - server.pool_name, - server.username, - server.address_name, - server.application_name, - server.state.to_string(), - server.transaction_count.to_string(), - server.query_count.to_string(), - server.bytes_sent.to_string(), - server.bytes_received.to_string(), + format!("{:#010X}", server.server_id()), + server.pool_name(), + server.username(), + server.address_name(), + application_name.clone(), + server.state.load(Ordering::Relaxed).to_string(), + server.transaction_count.load(Ordering::Relaxed).to_string(), + server.query_count.load(Ordering::Relaxed).to_string(), + server.bytes_sent.load(Ordering::Relaxed).to_string(), + server.bytes_received.load(Ordering::Relaxed).to_string(), Instant::now() - .duration_since(server.connect_time) + .duration_since(server.connect_time()) .as_secs() .to_string(), ]; diff --git a/src/client.rs b/src/client.rs index 68f6f575..f9f4e015 100644 --- a/src/client.rs +++ b/src/client.rs @@ -3,8 +3,8 @@ use crate::pool::BanReason; /// Handle clients by pretending to be a PostgreSQL server. use bytes::{Buf, BufMut, BytesMut}; use log::{debug, error, info, trace, warn}; - use std::collections::HashMap; +use std::sync::Arc; use std::time::Instant; use tokio::io::{split, AsyncReadExt, BufReader, ReadHalf, WriteHalf}; use tokio::net::TcpStream; @@ -19,7 +19,7 @@ use crate::messages::*; use crate::pool::{get_pool, ClientServerMap, ConnectionPool}; use crate::query_router::{Command, QueryRouter}; use crate::server::Server; -use crate::stats::{get_reporter, Reporter}; +use crate::stats::{ClientStats, PoolStats, ServerStats}; use crate::tls::Tls; use tokio_rustls::server::TlsStream; @@ -66,8 +66,8 @@ pub struct Client { #[allow(dead_code)] parameters: HashMap, - /// Statistics - stats: Reporter, + /// Statistics related to this client + stats: Arc, /// Clients want to talk to admin database. admin: bool, @@ -75,8 +75,8 @@ pub struct Client { /// Last address the client talked to. last_address_id: Option, - /// Last server process id we talked to. - last_server_id: Option, + /// Last server process stats we talked to. + last_server_stats: Option>, /// Connected to server connected_to_server: bool, @@ -135,6 +135,10 @@ pub async fn client_entrypoint( if !client.is_admin() { let _ = drain.send(-1).await; + + if result.is_err() { + client.stats.disconnect(); + } } result @@ -183,6 +187,10 @@ pub async fn client_entrypoint( if !client.is_admin() { let _ = drain.send(-1).await; + + if result.is_err() { + client.stats.disconnect(); + } } result @@ -233,6 +241,10 @@ pub async fn client_entrypoint( if !client.is_admin() { let _ = drain.send(-1).await; + + if result.is_err() { + client.stats.disconnect(); + } } result @@ -258,8 +270,11 @@ pub async fn client_entrypoint( if !client.is_admin() { let _ = drain.send(-1).await; - } + if result.is_err() { + client.stats.disconnect(); + } + } result } @@ -382,7 +397,6 @@ where shutdown: Receiver<()>, admin_only: bool, ) -> Result, Error> { - let stats = get_reporter(); let parameters = parse_startup(bytes.clone())?; // This parameter is mandatory by the protocol. @@ -537,6 +551,25 @@ where ready_for_query(&mut write).await?; trace!("Startup OK"); + let pool_stats = match get_pool(pool_name, username) { + Some(pool) => { + if !admin { + pool.stats + } else { + Arc::new(PoolStats::default()) + } + } + None => Arc::new(PoolStats::default()), + }; + + let stats = Arc::new(ClientStats::new( + process_id, + application_name, + username, + pool_name, + tokio::time::Instant::now(), + pool_stats, + )); Ok(Client { read: BufReader::new(read), @@ -552,7 +585,7 @@ where stats, admin, last_address_id: None, - last_server_id: None, + last_server_stats: None, pool_name: pool_name.clone(), username: username.clone(), application_name: application_name.to_string(), @@ -583,10 +616,10 @@ where secret_key, client_server_map, parameters: HashMap::new(), - stats: get_reporter(), + stats: Arc::new(ClientStats::default()), admin: false, last_address_id: None, - last_server_id: None, + last_server_stats: None, pool_name: String::from("undefined"), username: String::from("undefined"), application_name: String::from("undefined"), @@ -627,12 +660,8 @@ where // The query router determines where the query is going to go, // e.g. primary, replica, which shard. let mut query_router = QueryRouter::new(); - self.stats.client_register( - self.process_id, - self.pool_name.clone(), - self.username.clone(), - self.application_name.clone(), - ); + + self.stats.register(self.stats.clone()); // Our custom protocol loop. // We expect the client to either start a transaction with regular queries @@ -656,6 +685,8 @@ where &mut self.write, "terminating connection due to administrator command" ).await?; + self.stats.disconnect(); + return Ok(()) } @@ -708,6 +739,9 @@ where 'X' => { debug!("Client disconnecting"); + + self.stats.disconnect(); + return Ok(()); } @@ -757,7 +791,7 @@ where current_shard, ), ) - .await?; + .await?; } else { custom_protocol_response_ok(&mut self.write, "SET SHARD").await?; } @@ -802,10 +836,13 @@ where }; debug!("Waiting for connection from pool"); + if !self.admin { + self.stats.waiting(); + } // Grab a server from the pool. let connection = match pool - .get(query_router.shard(), query_router.role(), self.process_id) + .get(query_router.shard(), query_router.role(), &self.stats) .await { Ok(conn) => { @@ -817,6 +854,8 @@ where // but we were unable to grab a connection from the pool // We'll send back an error message and clean the extended // protocol buffer + self.stats.idle(); + if message[0] as char == 'S' { error!("Got Sync message but failed to get a connection from the pool"); self.buffer.clear(); @@ -825,7 +864,7 @@ where .await?; error!("Could not get connection from pool: {{ pool_name: {:?}, username: {:?}, shard: {:?}, role: \"{:?}\", error: \"{:?}\" }}", - self.pool_name.clone(), self.username.clone(), query_router.shard(), query_router.role(), err); + self.pool_name.clone(), self.username.clone(), query_router.shard(), query_router.role(), err); continue; } }; @@ -840,11 +879,10 @@ where self.connected_to_server = true; // Update statistics - self.stats - .client_active(self.process_id, server.server_id()); + self.stats.active(); self.last_address_id = Some(address.id); - self.last_server_id = Some(server.server_id()); + self.last_server_stats = Some(server.stats()); debug!( "Client {:?} talking to server {:?}", @@ -885,6 +923,7 @@ where Ok(Err(err)) => { // Client disconnected inside a transaction. // Clean up the server and re-use it. + self.stats.disconnect(); server.checkin_cleanup().await?; return Err(err); @@ -917,16 +956,26 @@ where 'Q' => { debug!("Sending query to server"); - self.send_and_receive_loop(code, Some(&message), server, &address, &pool) - .await?; + self.send_and_receive_loop( + code, + Some(&message), + server, + &address, + &pool, + &self.stats.clone(), + ) + .await?; if !server.in_transaction() { // Report transaction executed statistics. - self.stats.transaction(self.process_id, server.server_id()); + self.stats.transaction(); + server.stats().transaction(&self.application_name); // Release server back to the pool if we are in transaction mode. // If we are in session mode, we keep the server until the client disconnects. if self.transaction_mode { + self.stats.idle(); + break; } } @@ -935,6 +984,7 @@ where // Terminate 'X' => { server.checkin_cleanup().await?; + self.stats.disconnect(); self.release(); return Ok(()); @@ -987,13 +1037,21 @@ where } } - self.send_and_receive_loop(code, None, server, &address, &pool) - .await?; + self.send_and_receive_loop( + code, + None, + server, + &address, + &pool, + &self.stats.clone(), + ) + .await?; self.buffer.clear(); if !server.in_transaction() { - self.stats.transaction(self.process_id, server.server_id()); + self.stats.transaction(); + server.stats().transaction(&self.application_name); // Release server back to the pool if we are in transaction mode. // If we are in session mode, we keep the server until the client disconnects. @@ -1028,7 +1086,9 @@ where // Clear the buffer self.buffer.clear(); - let response = self.receive_server_message(server, &address, &pool).await?; + let response = self + .receive_server_message(server, &address, &pool, &self.stats.clone()) + .await?; match write_all_half(&mut self.write, &response).await { Ok(_) => (), @@ -1039,7 +1099,8 @@ where }; if !server.in_transaction() { - self.stats.transaction(self.process_id, server.server_id()); + self.stats.transaction(); + server.stats().transaction(&self.application_name); // Release server back to the pool if we are in transaction mode. // If we are in session mode, we keep the server until the client disconnects. @@ -1060,11 +1121,11 @@ where // The server is no longer bound to us, we can't cancel it's queries anymore. debug!("Releasing server back into the pool"); server.checkin_cleanup().await?; - self.stats.server_idle(server.server_id()); + server.stats().idle(); self.connected_to_server = false; self.release(); - self.stats.client_idle(self.process_id); + self.stats.idle(); } } @@ -1104,6 +1165,7 @@ where server: &mut Server, address: &Address, pool: &ConnectionPool, + client_stats: &ClientStats, ) -> Result<(), Error> { debug!("Sending {} to server", code); @@ -1119,7 +1181,9 @@ where // Read all data the server has to offer, which can be multiple messages // buffered in 8196 bytes chunks. loop { - let response = self.receive_server_message(server, address, pool).await?; + let response = self + .receive_server_message(server, address, pool, client_stats) + .await?; match write_all_half(&mut self.write, &response).await { Ok(_) => (), @@ -1135,10 +1199,10 @@ where } // Report query executed statistics. - self.stats.query( - self.process_id, - server.server_id(), - Instant::now().duration_since(query_start).as_millis(), + client_stats.query(); + server.stats().query( + Instant::now().duration_since(query_start).as_millis() as u64, + &self.application_name, ); Ok(()) @@ -1154,7 +1218,7 @@ where match server.send(message).await { Ok(_) => Ok(()), Err(err) => { - pool.ban(address, BanReason::MessageSendFailed, self.process_id); + pool.ban(address, BanReason::MessageSendFailed, Some(&self.stats)); Err(err) } } @@ -1165,6 +1229,7 @@ where server: &mut Server, address: &Address, pool: &ConnectionPool, + client_stats: &ClientStats, ) -> Result { if pool.settings.user.statement_timeout > 0 { match tokio::time::timeout( @@ -1176,7 +1241,7 @@ where Ok(result) => match result { Ok(message) => Ok(message), Err(err) => { - pool.ban(address, BanReason::MessageReceiveFailed, self.process_id); + pool.ban(address, BanReason::MessageReceiveFailed, Some(client_stats)); error_response_terminal( &mut self.write, &format!("error receiving data from server: {:?}", err), @@ -1191,7 +1256,7 @@ where address, pool.settings.user.username ); server.mark_bad(); - pool.ban(address, BanReason::StatementTimeout, self.process_id); + pool.ban(address, BanReason::StatementTimeout, Some(client_stats)); error_response_terminal(&mut self.write, "pool statement timeout").await?; Err(Error::StatementTimeout) } @@ -1200,7 +1265,7 @@ where match server.recv().await { Ok(message) => Ok(message), Err(err) => { - pool.ban(address, BanReason::MessageReceiveFailed, self.process_id); + pool.ban(address, BanReason::MessageReceiveFailed, Some(client_stats)); error_response_terminal( &mut self.write, &format!("error receiving data from server: {:?}", err), @@ -1220,9 +1285,9 @@ impl Drop for Client { // Dirty shutdown // TODO: refactor, this is not the best way to handle state management. - self.stats.client_disconnecting(self.process_id); - if self.connected_to_server && self.last_server_id.is_some() { - self.stats.server_idle(self.last_server_id.unwrap()); + + if self.connected_to_server && self.last_server_stats.is_some() { + self.last_server_stats.as_ref().unwrap().idle(); } } } diff --git a/src/config.rs b/src/config.rs index a3182e2b..644532ac 100644 --- a/src/config.rs +++ b/src/config.rs @@ -15,6 +15,7 @@ use tokio::io::AsyncReadExt; use crate::errors::Error; use crate::pool::{ClientServerMap, ConnectionPool}; use crate::sharding::ShardingFunction; +use crate::stats::AddressStats; use crate::tls::{load_certs, load_keys}; pub const VERSION: &str = env!("CARGO_PKG_VERSION"); @@ -62,7 +63,7 @@ impl PartialEq for Option { } /// Address identifying a PostgreSQL server uniquely. -#[derive(Clone, PartialEq, Hash, std::cmp::Eq, Debug)] +#[derive(Clone, Debug)] pub struct Address { /// Unique ID per addressable Postgres server. pub id: usize, @@ -96,6 +97,9 @@ pub struct Address { /// List of addresses to receive mirrored traffic. pub mirrors: Vec
, + + /// Address stats + pub stats: Arc, } impl Default for Address { @@ -112,10 +116,46 @@ impl Default for Address { username: String::from("username"), pool_name: String::from("pool_name"), mirrors: Vec::new(), + stats: Arc::new(AddressStats::default()), } } } +// We need to implement PartialEq by ourselves so we skip stats in the comparison +impl PartialEq for Address { + fn eq(&self, other: &Self) -> bool { + self.id == other.id + && self.host == other.host + && self.port == other.port + && self.shard == other.shard + && self.address_index == other.address_index + && self.replica_number == other.replica_number + && self.database == other.database + && self.role == other.role + && self.username == other.username + && self.pool_name == other.pool_name + && self.mirrors == other.mirrors + } +} +impl Eq for Address {} + +// We need to implement Hash by ourselves so we skip stats in the comparison +impl Hash for Address { + fn hash(&self, state: &mut H) { + self.id.hash(state); + self.host.hash(state); + self.port.hash(state); + self.shard.hash(state); + self.address_index.hash(state); + self.replica_number.hash(state); + self.database.hash(state); + self.role.hash(state); + self.username.hash(state); + self.pool_name.hash(state); + self.mirrors.hash(state); + } +} + impl Address { /// Address name (aka database) used in `SHOW STATS`, `SHOW DATABASES`, and `SHOW POOLS`. pub fn name(&self) -> String { diff --git a/src/main.rs b/src/main.rs index e2ff5d8d..a59da210 100644 --- a/src/main.rs +++ b/src/main.rs @@ -162,8 +162,7 @@ fn main() -> Result<(), Box> { let client_server_map: ClientServerMap = Arc::new(Mutex::new(HashMap::new())); // Statistics reporting. - let (stats_tx, stats_rx) = mpsc::channel(500_000); - REPORTER.store(Arc::new(Reporter::new(stats_tx.clone()))); + REPORTER.store(Arc::new(Reporter::default())); // Connection pool that allows to query all shards and replicas. match ConnectionPool::from_config(client_server_map.clone()).await { @@ -175,7 +174,7 @@ fn main() -> Result<(), Box> { }; tokio::task::spawn(async move { - let mut stats_collector = Collector::new(stats_rx, stats_tx.clone()); + let mut stats_collector = Collector::default(); stats_collector.collect().await; }); diff --git a/src/mirrors.rs b/src/mirrors.rs index ab2b2dc4..128fe220 100644 --- a/src/mirrors.rs +++ b/src/mirrors.rs @@ -1,11 +1,13 @@ +use std::sync::Arc; + /// A mirrored PostgreSQL client. /// Packets arrive to us through a channel from the main client and we send them to the server. use bb8::Pool; use bytes::{Bytes, BytesMut}; use crate::config::{get_config, Address, Role, User}; -use crate::pool::{ClientServerMap, ServerPool}; -use crate::stats::get_reporter; +use crate::pool::{ClientServerMap, PoolIdentifier, ServerPool}; +use crate::stats::PoolStats; use log::{error, info, trace, warn}; use tokio::sync::mpsc::{channel, Receiver, Sender}; @@ -21,20 +23,24 @@ impl MirroredClient { async fn create_pool(&self) -> Pool { let config = get_config(); let default = std::time::Duration::from_millis(10_000).as_millis() as u64; - let (connection_timeout, idle_timeout) = match config.pools.get(&self.address.pool_name) { - Some(cfg) => ( - cfg.connect_timeout.unwrap_or(default), - cfg.idle_timeout.unwrap_or(default), - ), - None => (default, default), - }; + let (connection_timeout, idle_timeout, cfg) = + match config.pools.get(&self.address.pool_name) { + Some(cfg) => ( + cfg.connect_timeout.unwrap_or(default), + cfg.idle_timeout.unwrap_or(default), + cfg.clone(), + ), + None => (default, default, crate::config::Pool::default()), + }; + + let identifier = PoolIdentifier::new(&self.database, &self.user.username); let manager = ServerPool::new( self.address.clone(), self.user.clone(), self.database.as_str(), ClientServerMap::default(), - get_reporter(), + Arc::new(PoolStats::new(identifier, cfg.clone())), ); Pool::builder() diff --git a/src/pool.rs b/src/pool.rs index 3a6ec3e6..f6f9118b 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -22,7 +22,7 @@ use crate::errors::Error; use crate::server::Server; use crate::sharding::ShardingFunction; -use crate::stats::{get_reporter, Reporter}; +use crate::stats::{AddressStats, ClientStats, PoolStats, ServerStats}; pub type ProcessId = i32; pub type SecretKey = i32; @@ -51,7 +51,7 @@ pub enum BanReason { /// An identifier for a PgCat pool, /// a database visible to clients. -#[derive(Hash, Debug, Clone, PartialEq, Eq)] +#[derive(Hash, Debug, Clone, PartialEq, Eq, Default)] pub struct PoolIdentifier { // The name of the database clients want to connect to. pub db: String, @@ -161,10 +161,6 @@ pub struct ConnectionPool { /// that should not be queried. banlist: BanList, - /// The statistics aggregator runs in a separate task - /// and receives stats from clients, servers, and the pool. - stats: Reporter, - /// The server information (K messages) have to be passed to the /// clients on startup. We pre-connect to all shards and replicas /// on pool creation and save the K messages here. @@ -185,6 +181,8 @@ pub struct ConnectionPool { /// If the pool has been paused or not. paused: Arc, paused_waiter: Arc, + + pub stats: Arc, } impl ConnectionPool { @@ -201,6 +199,7 @@ impl ConnectionPool { // There is one pool per database/user pair. for user in pool_config.users.values() { let old_pool_ref = get_pool(pool_name, &user.username); + let identifier = PoolIdentifier::new(pool_name, &user.username); match old_pool_ref { Some(pool) => { @@ -211,10 +210,7 @@ impl ConnectionPool { "[pool: {}][user: {}] has not changed", pool_name, user.username ); - new_pools.insert( - PoolIdentifier::new(pool_name, &user.username), - pool.clone(), - ); + new_pools.insert(identifier.clone(), pool.clone()); continue; } } @@ -234,6 +230,10 @@ impl ConnectionPool { .clone() .into_keys() .collect::>(); + let pool_stats = Arc::new(PoolStats::new(identifier, pool_config.clone())); + + // Allow the pool to be seen in statistics + pool_stats.register(pool_stats.clone()); // Sort by shard number to ensure consistency. shard_ids.sort_by_key(|k| k.parse::().unwrap()); @@ -266,6 +266,7 @@ impl ConnectionPool { username: user.username.clone(), pool_name: pool_name.clone(), mirrors: vec![], + stats: Arc::new(AddressStats::default()), }); address_id += 1; } @@ -283,6 +284,7 @@ impl ConnectionPool { username: user.username.clone(), pool_name: pool_name.clone(), mirrors: mirror_addresses, + stats: Arc::new(AddressStats::default()), }; address_id += 1; @@ -296,7 +298,7 @@ impl ConnectionPool { user.clone(), &shard.database, client_server_map.clone(), - get_reporter(), + pool_stats.clone(), ); let connect_timeout = match pool_config.connect_timeout { @@ -331,9 +333,9 @@ impl ConnectionPool { let pool = ConnectionPool { databases: shards, + stats: pool_stats, addresses, banlist: Arc::new(RwLock::new(banlist)), - stats: get_reporter(), config_hash: new_pool_hash_value, server_info: Arc::new(RwLock::new(BytesMut::new())), settings: PoolSettings { @@ -476,9 +478,9 @@ impl ConnectionPool { /// Get a connection from the pool. pub async fn get( &self, - shard: usize, // shard number - role: Option, // primary or replica - client_process_id: i32, // client id + shard: usize, // shard number + role: Option, // primary or replica + client_stats: &ClientStats, // client id ) -> Result<(PooledConnection<'_, ServerPool>, Address), Error> { let mut candidates: Vec<&Address> = self.addresses[shard] .iter() @@ -517,7 +519,7 @@ impl ConnectionPool { // Indicate we're waiting on a server connection from a pool. let now = Instant::now(); - self.stats.client_waiting(client_process_id); + client_stats.waiting(); // Check if we can connect let mut conn = match self.databases[address.shard][address.address_index] @@ -527,9 +529,10 @@ impl ConnectionPool { Ok(conn) => conn, Err(err) => { error!("Banning instance {:?}, error: {:?}", address, err); - self.ban(address, BanReason::FailedCheckout, client_process_id); - self.stats - .client_checkout_error(client_process_id, address.id); + self.ban(address, BanReason::FailedCheckout, Some(client_stats)); + address.stats.error(); + client_stats.idle(); + client_stats.checkout_error(); continue; } }; @@ -546,18 +549,18 @@ impl ConnectionPool { // since we last checked the server is ok. // Health checks are pretty expensive. if !require_healthcheck { - self.stats.checkout_time( - now.elapsed().as_micros(), - client_process_id, - server.server_id(), - ); - self.stats - .server_active(client_process_id, server.server_id()); + let checkout_time: u64 = now.elapsed().as_micros() as u64; + client_stats.checkout_time(checkout_time); + server + .stats() + .checkout_time(checkout_time, client_stats.application_name()); + server.stats().active(client_stats.application_name()); + return Ok((conn, address.clone())); } if self - .run_health_check(address, server, now, client_process_id) + .run_health_check(address, server, now, client_stats) .await { return Ok((conn, address.clone())); @@ -565,7 +568,6 @@ impl ConnectionPool { continue; } } - Err(Error::AllServersDown) } @@ -574,11 +576,11 @@ impl ConnectionPool { address: &Address, server: &mut Server, start: Instant, - client_process_id: i32, + client_info: &ClientStats, ) -> bool { debug!("Running health check on server {:?}", address); - self.stats.server_tested(server.server_id()); + server.stats().tested(); match tokio::time::timeout( tokio::time::Duration::from_millis(self.settings.healthcheck_timeout), @@ -589,13 +591,13 @@ impl ConnectionPool { // Check if health check succeeded. Ok(res) => match res { Ok(_) => { - self.stats.checkout_time( - start.elapsed().as_micros(), - client_process_id, - server.server_id(), - ); - self.stats - .server_active(client_process_id, server.server_id()); + let checkout_time: u64 = start.elapsed().as_micros() as u64; + client_info.checkout_time(checkout_time); + server + .stats() + .checkout_time(checkout_time, client_info.application_name()); + server.stats().active(client_info.application_name()); + return true; } @@ -620,14 +622,14 @@ impl ConnectionPool { // Don't leave a bad connection in the pool. server.mark_bad(); - self.ban(&address, BanReason::FailedHealthCheck, client_process_id); + self.ban(&address, BanReason::FailedHealthCheck, Some(client_info)); return false; } /// Ban an address (i.e. replica). It no longer will serve /// traffic for any new transactions. Existing transactions on that replica /// will finish successfully or error out to the clients. - pub fn ban(&self, address: &Address, reason: BanReason, client_id: i32) { + pub fn ban(&self, address: &Address, reason: BanReason, client_info: Option<&ClientStats>) { // Primary can never be banned if address.role == Role::Primary { return; @@ -636,7 +638,10 @@ impl ConnectionPool { let now = chrono::offset::Utc::now().naive_utc(); let mut guard = self.banlist.write(); error!("Banning {:?}", address); - self.stats.client_ban_error(client_id, address.id); + if let Some(client_info) = client_info { + client_info.ban_error(); + address.stats.error(); + } guard[address.shard].insert(address.clone(), (reason, now)); } @@ -797,7 +802,7 @@ pub struct ServerPool { user: User, database: String, client_server_map: ClientServerMap, - stats: Reporter, + stats: Arc, } impl ServerPool { @@ -806,11 +811,11 @@ impl ServerPool { user: User, database: &str, client_server_map: ClientServerMap, - stats: Reporter, + stats: Arc, ) -> ServerPool { ServerPool { address, - user, + user: user.clone(), database: database.to_string(), client_server_map, stats, @@ -826,34 +831,31 @@ impl ManageConnection for ServerPool { /// Attempts to create a new connection. async fn connect(&self) -> Result { info!("Creating a new server connection {:?}", self.address); - let server_id = rand::random::(); - self.stats.server_register( - server_id, - self.address.id, - self.address.name(), - self.address.pool_name.clone(), - self.address.username.clone(), - ); - self.stats.server_login(server_id); + let stats = Arc::new(ServerStats::new( + self.address.clone(), + self.stats.clone(), + tokio::time::Instant::now(), + )); + + stats.register(stats.clone()); // Connect to the PostgreSQL server. match Server::startup( - server_id, &self.address, &self.user, &self.database, self.client_server_map.clone(), - self.stats.clone(), + stats.clone(), ) .await { Ok(conn) => { - self.stats.server_idle(server_id); + stats.idle(); Ok(conn) } Err(err) => { - self.stats.server_disconnecting(server_id); + stats.disconnect(); Err(err) } } @@ -881,11 +883,3 @@ pub fn get_pool(db: &str, user: &str) -> Option { pub fn get_all_pools() -> HashMap { (*(*POOLS.load())).clone() } - -/// How many total servers we have in the config. -pub fn get_number_of_addresses() -> usize { - get_all_pools() - .iter() - .map(|(_, pool)| pool.databases()) - .sum() -} diff --git a/src/prometheus.rs b/src/prometheus.rs index e596f9f9..6e578bf0 100644 --- a/src/prometheus.rs +++ b/src/prometheus.rs @@ -5,10 +5,12 @@ use phf::phf_map; use std::collections::HashMap; use std::fmt; use std::net::SocketAddr; +use std::sync::atomic::Ordering; +use std::sync::Arc; use crate::config::Address; use crate::pool::get_all_pools; -use crate::stats::{get_address_stats, get_pool_stats, get_server_stats, ServerInformation}; +use crate::stats::{get_pool_stats, get_server_stats, ServerStats}; struct MetricHelpType { help: &'static str, @@ -220,7 +222,7 @@ impl PrometheusMetric { Self::from_name(&format!("servers_{}", name), value, labels) } - fn from_address(address: &Address, name: &str, value: i64) -> Option> { + fn from_address(address: &Address, name: &str, value: u64) -> Option> { let mut labels = HashMap::new(); labels.insert("host", address.host.clone()); labels.insert("shard", address.shard.to_string()); @@ -231,7 +233,7 @@ impl PrometheusMetric { Self::from_name(&format!("stats_{}", name), value, labels) } - fn from_pool(pool: &(String, String), name: &str, value: i64) -> Option> { + fn from_pool(pool: &(String, String), name: &str, value: u64) -> Option> { let mut labels = HashMap::new(); labels.insert("pool", pool.0.clone()); labels.insert("user", pool.1.clone()); @@ -261,20 +263,18 @@ async fn prometheus_stats(request: Request) -> Result, hype // Adds metrics shown in a SHOW STATS admin command. fn push_address_stats(lines: &mut Vec) { - let address_stats: HashMap> = get_address_stats(); for (_, pool) in get_all_pools() { for shard in 0..pool.shards() { for server in 0..pool.servers(shard) { let address = pool.address(shard, server); - if let Some(address_stats) = address_stats.get(&address.id) { - for (key, value) in address_stats.iter() { - if let Some(prometheus_metric) = - PrometheusMetric::::from_address(address, key, *value) - { - lines.push(prometheus_metric.to_string()); - } else { - warn!("Metric {} not implemented for {}", key, address.name()); - } + let stats = &*address.stats; + for (key, value) in stats.clone() { + if let Some(prometheus_metric) = + PrometheusMetric::::from_address(address, &key, value) + { + lines.push(prometheus_metric.to_string()); + } else { + warn!("Metric {} not implemented for {}", key, address.name()); } } } @@ -286,8 +286,9 @@ fn push_address_stats(lines: &mut Vec) { fn push_pool_stats(lines: &mut Vec) { let pool_stats = get_pool_stats(); for (pool, stats) in pool_stats.iter() { - for (name, value) in stats.iter() { - if let Some(prometheus_metric) = PrometheusMetric::::from_pool(pool, name, *value) + let stats = &**stats; + for (name, value) in stats.clone() { + if let Some(prometheus_metric) = PrometheusMetric::::from_pool(pool, &name, value) { lines.push(prometheus_metric.to_string()); } else { @@ -330,9 +331,9 @@ fn push_database_stats(lines: &mut Vec) { // Adds relevant metrics shown in a SHOW SERVERS admin command. fn push_server_stats(lines: &mut Vec) { let server_stats = get_server_stats(); - let mut server_stats_by_addresses = HashMap::::new(); - for (_, info) in server_stats { - server_stats_by_addresses.insert(info.address_name.clone(), info); + let mut server_stats_by_addresses = HashMap::>::new(); + for (_, stats) in server_stats { + server_stats_by_addresses.insert(stats.address_name(), stats); } for (_, pool) in get_all_pools() { @@ -341,11 +342,23 @@ fn push_server_stats(lines: &mut Vec) { let address = pool.address(shard, server); if let Some(server_info) = server_stats_by_addresses.get(&address.name()) { let metrics = [ - ("bytes_received", server_info.bytes_received), - ("bytes_sent", server_info.bytes_sent), - ("transaction_count", server_info.transaction_count), - ("query_count", server_info.query_count), - ("error_count", server_info.error_count), + ( + "bytes_received", + server_info.bytes_received.load(Ordering::Relaxed), + ), + ("bytes_sent", server_info.bytes_sent.load(Ordering::Relaxed)), + ( + "transaction_count", + server_info.transaction_count.load(Ordering::Relaxed), + ), + ( + "query_count", + server_info.query_count.load(Ordering::Relaxed), + ), + ( + "error_count", + server_info.error_count.load(Ordering::Relaxed), + ), ]; for (key, value) in metrics { if let Some(prometheus_metric) = diff --git a/src/server.rs b/src/server.rs index b3dbd6f7..d09313ec 100644 --- a/src/server.rs +++ b/src/server.rs @@ -3,6 +3,7 @@ use bytes::{Buf, BufMut, BytesMut}; use log::{debug, error, info, trace, warn}; use std::io::Read; +use std::sync::Arc; use std::time::SystemTime; use tokio::io::{AsyncReadExt, BufReader}; use tokio::net::{ @@ -17,12 +18,10 @@ use crate::messages::*; use crate::mirrors::MirroringManager; use crate::pool::ClientServerMap; use crate::scram::ScramSha256; -use crate::stats::Reporter; +use crate::stats::ServerStats; /// Server state. pub struct Server { - server_id: i32, - /// Server host, e.g. localhost, /// port, e.g. 5432, and role, e.g. primary or replica. address: Address, @@ -62,7 +61,7 @@ pub struct Server { connected_at: chrono::naive::NaiveDateTime, /// Reports various metrics, e.g. data sent & received. - stats: Reporter, + stats: Arc, /// Application name using the server at the moment. application_name: String, @@ -77,12 +76,11 @@ impl Server { /// Pretend to be the Postgres client and connect to the server given host, port and credentials. /// Perform the authentication and return the server in a ready for query state. pub async fn startup( - server_id: i32, address: &Address, user: &User, database: &str, client_server_map: ClientServerMap, - stats: Reporter, + stats: Arc, ) -> Result { let mut stream = match TcpStream::connect(&format!("{}:{}", &address.host, address.port)).await { @@ -325,7 +323,6 @@ impl Server { write, buffer: BytesMut::with_capacity(8196), server_info, - server_id, process_id, secret_key, in_transaction: false, @@ -396,7 +393,7 @@ impl Server { /// Send messages to the server from the client. pub async fn send(&mut self, messages: &BytesMut) -> Result<(), Error> { self.mirror_send(messages); - self.stats.data_sent(messages.len(), self.server_id); + self.stats().data_sent(messages.len()); match write_all_half(&mut self.write, messages).await { Ok(_) => { @@ -545,7 +542,7 @@ impl Server { let bytes = self.buffer.clone(); // Keep track of how much data we got from the server for stats. - self.stats.data_received(bytes.len(), self.server_id); + self.stats().data_received(bytes.len()); // Clear the buffer for next query. self.buffer.clear(); @@ -665,18 +662,17 @@ impl Server { } } + /// get Server stats + pub fn stats(&self) -> Arc { + self.stats.clone() + } + /// Get the servers address. #[allow(dead_code)] pub fn address(&self) -> Address { self.address.clone() } - /// Get the server connection identifier - /// Used to uniquely identify connection in statistics - pub fn server_id(&self) -> i32 { - self.server_id - } - // Get server's latest response timestamp pub fn last_activity(&self) -> SystemTime { self.last_activity @@ -708,7 +704,9 @@ impl Drop for Server { /// for a write. fn drop(&mut self) { self.mirror_disconnect(); - self.stats.server_disconnecting(self.server_id); + + // Update statistics + self.stats.disconnect(); let mut bytes = BytesMut::with_capacity(4); bytes.put_u8(b'X'); diff --git a/src/stats.rs b/src/stats.rs index 3f7e9d61..5b7895b4 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -1,43 +1,46 @@ -use arc_swap::ArcSwap; +use crate::pool::PoolIdentifier; /// Statistics and reporting. -use log::{error, info, trace, warn}; +use arc_swap::ArcSwap; + +use log::{info, warn}; use once_cell::sync::Lazy; +use parking_lot::RwLock; use std::collections::HashMap; + use std::sync::Arc; -use tokio::sync::mpsc::error::TrySendError; -use tokio::sync::mpsc::{channel, Receiver, Sender}; -use tokio::time::Instant; -use crate::pool::{get_all_pools, get_number_of_addresses}; +// Structs that hold stats for different resources +pub mod address; +pub mod client; +pub mod pool; +pub mod server; +pub use address::AddressStats; +pub use client::{ClientState, ClientStats}; +pub use pool::PoolStats; +pub use server::{ServerState, ServerStats}; /// Convenience types for various stats -type ClientStatesLookup = HashMap; -type ServerStatesLookup = HashMap; -type PoolStatsLookup = HashMap<(String, String), HashMap>; -type AddressStatsLookup = HashMap>; +type ClientStatesLookup = HashMap>; +type ServerStatesLookup = HashMap>; +type PoolStatsLookup = HashMap<(String, String), Arc>; -/// Stats for individual client connections updated every second +/// Stats for individual client connections /// Used in SHOW CLIENTS. -static LATEST_CLIENT_STATS: Lazy> = - Lazy::new(|| ArcSwap::from_pointee(ClientStatesLookup::default())); +static CLIENT_STATS: Lazy>> = + Lazy::new(|| Arc::new(RwLock::new(ClientStatesLookup::default()))); -/// Stats for individual server connections updated every second +/// Stats for individual server connections /// Used in SHOW SERVERS. -static LATEST_SERVER_STATS: Lazy> = - Lazy::new(|| ArcSwap::from_pointee(ServerStatesLookup::default())); +static SERVER_STATS: Lazy>> = + Lazy::new(|| Arc::new(RwLock::new(ServerStatesLookup::default()))); -/// Aggregate stats for each pool (a pool is identified by database name and username) updated every second +/// Aggregate stats for each pool (a pool is identified by database name and username) /// Used in SHOW POOLS. -static LATEST_POOL_STATS: Lazy> = - Lazy::new(|| ArcSwap::from_pointee(PoolStatsLookup::default())); - -/// Aggregate stats for individual database instances, updated every second, averages are calculated every 15 -/// Used in SHOW STATS. -static LATEST_ADDRESS_STATS: Lazy> = - Lazy::new(|| ArcSwap::from_pointee(AddressStatsLookup::default())); +static POOL_STATS: Lazy>> = + Lazy::new(|| Arc::new(RwLock::new(PoolStatsLookup::default()))); /// The statistics reporter. An instance is given to each possible source of statistics, -/// e.g. clients, servers, connection pool. +/// e.g. client stats, server stats, connection pool stats. pub static REPORTER: Lazy> = Lazy::new(|| ArcSwap::from_pointee(Reporter::default())); @@ -45,989 +48,89 @@ pub static REPORTER: Lazy> = /// 15 seconds. static STAT_PERIOD: u64 = 15000; -/// The various states that a client can be in -#[derive(Debug, Clone, Copy, PartialEq)] -pub enum ClientState { - Idle, - Waiting, - Active, -} -impl std::fmt::Display for ClientState { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - match *self { - ClientState::Idle => write!(f, "idle"), - ClientState::Waiting => write!(f, "waiting"), - ClientState::Active => write!(f, "active"), - } - } -} - -/// The various states that a server can be in -#[derive(Debug, Clone, Copy, PartialEq)] -pub enum ServerState { - Login, - Active, - Tested, - Idle, -} -impl std::fmt::Display for ServerState { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - match *self { - ServerState::Login => write!(f, "login"), - ServerState::Active => write!(f, "active"), - ServerState::Tested => write!(f, "tested"), - ServerState::Idle => write!(f, "idle"), - } - } -} - -/// Information we keep track off which can be queried by SHOW CLIENTS -#[derive(Debug, Clone)] -pub struct ClientInformation { - pub state: ClientState, - pub connect_time: Instant, - - /// A random integer assigned to the client and used by stats to track the client - pub client_id: i32, - - pub application_name: String, - pub username: String, - pub pool_name: String, - - /// Total time spent waiting for a connection from pool, measures in microseconds - pub total_wait_time: u64, - - pub transaction_count: u64, - pub query_count: u64, - pub error_count: u64, -} - -/// Information we keep track off which can be queried by SHOW SERVERS -#[derive(Debug, Clone)] -pub struct ServerInformation { - pub state: ServerState, - pub connect_time: Instant, - - /// A random integer assigned to the server and used by stats to track the server - pub server_id: i32, - - pub address_name: String, - pub address_id: usize, - - pub username: String, - pub pool_name: String, - pub application_name: String, - - pub bytes_sent: u64, - pub bytes_received: u64, - - pub transaction_count: u64, - pub query_count: u64, - pub error_count: u64, -} - -/// The names for the events reported -/// to the statistics collector. -#[derive(Debug, Clone)] -enum EventName { - CheckoutTime { - client_id: i32, - server_id: i32, - }, - Query { - client_id: i32, - server_id: i32, - duration_ms: u128, - }, - Transaction { - client_id: i32, - server_id: i32, - }, - - DataSentToServer { - server_id: i32, - }, - DataReceivedFromServer { - server_id: i32, - }, - - ClientRegistered { - client_id: i32, - pool_name: String, - username: String, - application_name: String, - }, - ClientIdle { - client_id: i32, - }, - ClientWaiting { - client_id: i32, - }, - ClientActive { - client_id: i32, - #[allow(dead_code)] - server_id: i32, - }, - ClientDisconnecting { - client_id: i32, - }, - ClientCheckoutError { - client_id: i32, - #[allow(dead_code)] - address_id: usize, - }, - ClientBanError { - client_id: i32, - #[allow(dead_code)] - address_id: usize, - }, - - ServerRegistered { - server_id: i32, - address_id: usize, - address_name: String, - pool_name: String, - username: String, - }, - ServerLogin { - server_id: i32, - }, - ServerIdle { - server_id: i32, - }, - ServerTested { - server_id: i32, - }, - ServerActive { - #[allow(dead_code)] - client_id: i32, - server_id: i32, - }, - ServerDisconnecting { - server_id: i32, - }, - - UpdateStats { - pool_name: String, - username: String, - }, - UpdateAverages { - address_id: usize, - }, -} - -/// Event data sent to the collector -/// from clients and servers. -#[derive(Debug, Clone)] -pub struct Event { - /// The name of the event being reported. - name: EventName, - - /// The value being reported. Meaning differs based on event name. - value: i64, -} - /// The statistics reporter. An instance is given /// to each possible source of statistics, /// e.g. clients, servers, connection pool. -#[derive(Clone, Debug)] -pub struct Reporter { - tx: Sender, -} - -impl Default for Reporter { - fn default() -> Reporter { - let (tx, _rx) = channel(5); - Reporter { tx } - } -} +#[derive(Clone, Debug, Default)] +pub struct Reporter {} impl Reporter { - /// Create a new Reporter instance. - pub fn new(tx: Sender) -> Reporter { - Reporter { tx } - } - - /// Send statistics to the task keeping track of stats. - fn send(&self, event: Event) { - let name = event.name.clone(); - let result = self.tx.try_send(event.clone()); - - match result { - Ok(_) => trace!( - "{:?} event reported successfully, capacity: {} {:?}", - name, - self.tx.capacity(), - event - ), - - Err(err) => match err { - TrySendError::Full { .. } => error!("{:?} event dropped, buffer full", name), - TrySendError::Closed { .. } => error!("{:?} event dropped, channel closed", name), - }, - }; - } - - /// Report a query executed by a client against a server - pub fn query(&self, client_id: i32, server_id: i32, duration_ms: u128) { - let event = Event { - name: EventName::Query { - client_id, - server_id, - duration_ms, - }, - value: 1, - }; - self.send(event); - } - - /// Report a transaction executed by a client a server - /// we report each individual queries outside a transaction as a transaction - /// We only count the initial BEGIN as a transaction, all queries within do not - /// count as transactions - pub fn transaction(&self, client_id: i32, server_id: i32) { - let event = Event { - name: EventName::Transaction { - client_id, - server_id, - }, - value: 1, - }; - self.send(event); - } - - /// Report data sent to a server - pub fn data_sent(&self, amount_bytes: usize, server_id: i32) { - let event = Event { - name: EventName::DataSentToServer { server_id }, - value: amount_bytes as i64, - }; - self.send(event) - } - - /// Report data received from a server - pub fn data_received(&self, amount_bytes: usize, server_id: i32) { - let event = Event { - name: EventName::DataReceivedFromServer { server_id }, - value: amount_bytes as i64, - }; - self.send(event) - } - - /// Reportes the time spent by a client waiting to get a healthy connection from the pool - pub fn checkout_time(&self, microseconds: u128, client_id: i32, server_id: i32) { - let event = Event { - name: EventName::CheckoutTime { - client_id, - server_id, - }, - value: microseconds as i64, - }; - self.send(event) - } - /// Register a client with the stats system. The stats system uses client_id /// to track and aggregate statistics from all source that relate to that client - pub fn client_register( - &self, - client_id: i32, - pool_name: String, - username: String, - app_name: String, - ) { - let event = Event { - name: EventName::ClientRegistered { - client_id, - pool_name, - username, - application_name: app_name, - }, - value: 1, - }; - self.send(event); - } - - /// Reports a client is waiting for a connection - pub fn client_waiting(&self, client_id: i32) { - let event = Event { - name: EventName::ClientWaiting { client_id }, - value: 1, - }; - self.send(event) - } - - /// Reports a client has had the server assigned to it be banned - pub fn client_ban_error(&self, client_id: i32, address_id: usize) { - let event = Event { - name: EventName::ClientBanError { - client_id, - address_id, - }, - value: 1, - }; - self.send(event) - } - - /// Reports a client has failed to obtain a connection from a connection pool - pub fn client_checkout_error(&self, client_id: i32, address_id: usize) { - let event = Event { - name: EventName::ClientCheckoutError { - client_id, - address_id, - }, - value: 1, - }; - self.send(event) - } - - /// Reports a client is done waiting for a connection and is about to query the server. - pub fn client_active(&self, client_id: i32, server_id: i32) { - let event = Event { - name: EventName::ClientActive { - client_id, - server_id, - }, - value: 1, - }; - self.send(event) - } + fn client_register(&self, client_id: i32, stats: Arc) { + if CLIENT_STATS.read().get(&client_id).is_some() { + warn!("Client {:?} was double registered!", client_id); + return; + } - /// Reports a client is done querying the server and is no longer assigned a server connection - pub fn client_idle(&self, client_id: i32) { - let event = Event { - name: EventName::ClientIdle { client_id }, - value: 1, - }; - self.send(event) + CLIENT_STATS.write().insert(client_id, stats); } /// Reports a client is disconecting from the pooler. - pub fn client_disconnecting(&self, client_id: i32) { - let event = Event { - name: EventName::ClientDisconnecting { client_id }, - value: 1, - }; - self.send(event) + fn client_disconnecting(&self, client_id: i32) { + CLIENT_STATS.write().remove(&client_id); } /// Register a server connection with the stats system. The stats system uses server_id /// to track and aggregate statistics from all source that relate to that server - pub fn server_register( - &self, - server_id: i32, - address_id: usize, - address_name: String, - pool_name: String, - username: String, - ) { - let event = Event { - name: EventName::ServerRegistered { - server_id, - address_id, - address_name, - pool_name, - username, - }, - value: 1, - }; - self.send(event); - } - - /// Reports a server connection has been assigned to a client that - /// is about to query the server - pub fn server_active(&self, client_id: i32, server_id: i32) { - let event = Event { - name: EventName::ServerActive { - client_id, - server_id, - }, - value: 1, - }; - self.send(event) - } - - /// Reports a server connection is no longer assigned to a client - /// and is available for the next client to pick it up - pub fn server_idle(&self, server_id: i32) { - let event = Event { - name: EventName::ServerIdle { server_id }, - value: 1, - }; - self.send(event) - } - - /// Reports a server connection is attempting to login. - pub fn server_login(&self, server_id: i32) { - let event = Event { - name: EventName::ServerLogin { server_id }, - value: 1, - }; - self.send(event) + fn server_register(&self, server_id: i32, stats: Arc) { + SERVER_STATS.write().insert(server_id, stats); } - - /// Reports a server connection is being tested before being given to a client. - pub fn server_tested(&self, server_id: i32) { - let event = Event { - name: EventName::ServerTested { server_id }, - value: 1, - }; - - self.send(event) + /// Reports a server connection is disconecting from the pooler. + fn server_disconnecting(&self, server_id: i32) { + SERVER_STATS.write().remove(&server_id); } - /// Reports a server connection is disconecting from the pooler. - pub fn server_disconnecting(&self, server_id: i32) { - let event = Event { - name: EventName::ServerDisconnecting { server_id }, - value: 1, - }; - self.send(event) + /// Register a pool with the stats system. + fn pool_register(&self, identifier: PoolIdentifier, stats: Arc) { + POOL_STATS + .write() + .insert((identifier.db, identifier.user), stats); } } -/// The statistics collector which is receiving statistics -/// from clients, servers, and the connection pool. There is -/// only one collector (kind of like a singleton). -/// The collector can trigger events on its own, e.g. -/// it updates aggregates every second and averages every -/// 15 seconds. -pub struct Collector { - rx: Receiver, - tx: Sender, -} +/// The statistics collector which used for calculating averages +/// There is only one collector (kind of like a singleton) +/// it updates averages every 15 seconds. +#[derive(Default)] +pub struct Collector {} impl Collector { - /// Create a new collector instance. There should only be one instance - /// at a time. This is ensured by mpsc which allows only one receiver. - pub fn new(rx: Receiver, tx: Sender) -> Collector { - Collector { rx, tx } - } - /// The statistics collection handler. It will collect statistics /// for `address_id`s starting at 0 up to `addresses`. pub async fn collect(&mut self) { info!("Events reporter started"); - let mut client_states = ClientStatesLookup::default(); - let mut server_states = ServerStatesLookup::default(); - let mut pool_stat_lookup = PoolStatsLookup::default(); - - let mut address_stat_lookup = AddressStatsLookup::default(); - let mut address_old_stat_lookup = AddressStatsLookup::default(); - - let tx = self.tx.clone(); - tokio::task::spawn(async move { - let mut interval = - tokio::time::interval(tokio::time::Duration::from_millis(STAT_PERIOD / 15)); - loop { - interval.tick().await; - for (user_pool, _) in get_all_pools() { - let _ = tx.try_send(Event { - name: EventName::UpdateStats { - pool_name: user_pool.db, - username: user_pool.user, - }, - value: 0, - }); - } - } - }); - - let tx = self.tx.clone(); tokio::task::spawn(async move { let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(STAT_PERIOD)); loop { interval.tick().await; - for address_id in 0..get_number_of_addresses() { - let _ = tx.try_send(Event { - name: EventName::UpdateAverages { address_id }, - value: 0, - }); + + for stats in SERVER_STATS.read().values() { + stats.address_stats().update_averages(); } } }); - - // The collector loop - loop { - let stat = match self.rx.recv().await { - Some(stat) => stat, - None => { - info!("Events collector is shutting down"); - return; - } - }; - - // Some are counters, some are gauges... - match stat.name { - EventName::Query { - client_id, - server_id, - duration_ms, - } => { - // Update client stats - let app_name = match client_states.get_mut(&client_id) { - Some(client_info) => { - client_info.query_count += stat.value as u64; - client_info.application_name.to_string() - } - None => String::from("Undefined"), - }; - - // Update server stats and pool aggergation stats - match server_states.get_mut(&server_id) { - Some(server_info) => { - server_info.query_count += stat.value as u64; - server_info.application_name = app_name; - - let address_stats = address_stat_lookup - .entry(server_info.address_id) - .or_insert_with(HashMap::default); - let counter = address_stats - .entry("total_query_count".to_string()) - .or_insert(0); - *counter += stat.value; - - let duration = address_stats - .entry("total_query_time".to_string()) - .or_insert(0); - *duration += duration_ms as i64; - } - None => (), - } - } - - EventName::Transaction { - client_id, - server_id, - } => { - // Update client stats - let app_name = match client_states.get_mut(&client_id) { - Some(client_info) => { - client_info.transaction_count += stat.value as u64; - client_info.application_name.to_string() - } - None => String::from("Undefined"), - }; - - // Update server stats and pool aggergation stats - match server_states.get_mut(&server_id) { - Some(server_info) => { - server_info.transaction_count += stat.value as u64; - server_info.application_name = app_name; - - let address_stats = address_stat_lookup - .entry(server_info.address_id) - .or_insert_with(HashMap::default); - let counter = address_stats - .entry("total_xact_count".to_string()) - .or_insert(0); - *counter += stat.value; - } - None => (), - } - } - - EventName::DataSentToServer { server_id } => { - // Update server stats and address aggergation stats - match server_states.get_mut(&server_id) { - Some(server_info) => { - server_info.bytes_sent += stat.value as u64; - - let address_stats = address_stat_lookup - .entry(server_info.address_id) - .or_insert_with(HashMap::default); - let counter = - address_stats.entry("total_sent".to_string()).or_insert(0); - *counter += stat.value; - } - None => (), - } - } - - EventName::DataReceivedFromServer { server_id } => { - // Update server states and address aggergation stats - match server_states.get_mut(&server_id) { - Some(server_info) => { - server_info.bytes_received += stat.value as u64; - - let address_stats = address_stat_lookup - .entry(server_info.address_id) - .or_insert_with(HashMap::default); - let counter = address_stats - .entry("total_received".to_string()) - .or_insert(0); - *counter += stat.value; - } - None => (), - } - } - - EventName::CheckoutTime { - client_id, - server_id, - } => { - // Update client stats - let app_name = match client_states.get_mut(&client_id) { - Some(client_info) => { - client_info.total_wait_time += stat.value as u64; - client_info.application_name.to_string() - } - None => String::from("Undefined"), - }; - - // Update server stats and address aggergation stats - match server_states.get_mut(&server_id) { - Some(server_info) => { - server_info.application_name = app_name; - - let address_stats = address_stat_lookup - .entry(server_info.address_id) - .or_insert_with(HashMap::default); - let counter = address_stats - .entry("total_wait_time".to_string()) - .or_insert(0); - *counter += stat.value; - - let pool_stats = pool_stat_lookup - .entry(( - server_info.pool_name.clone(), - server_info.username.clone(), - )) - .or_insert_with(HashMap::default); - - // We record max wait in microseconds, we do the pgbouncer second/microsecond split on admin - let old_microseconds = - pool_stats.entry("maxwait_us".to_string()).or_insert(0); - if stat.value > *old_microseconds { - *old_microseconds = stat.value; - } - } - None => (), - } - } - - EventName::ClientRegistered { - client_id, - pool_name, - username, - application_name, - } => { - match client_states.get_mut(&client_id) { - Some(_) => warn!("Client {:?} was double registered!", client_id), - None => { - client_states.insert( - client_id, - ClientInformation { - state: ClientState::Idle, - connect_time: Instant::now(), - client_id, - pool_name: pool_name.clone(), - username: username.clone(), - application_name: application_name.clone(), - total_wait_time: 0, - transaction_count: 0, - query_count: 0, - error_count: 0, - }, - ); - } - }; - } - - EventName::ClientBanError { - client_id, - address_id, - } => { - match client_states.get_mut(&client_id) { - Some(client_info) => { - client_info.state = ClientState::Idle; - client_info.error_count += stat.value as u64; - } - None => warn!("Got event {:?} for unregistered client", stat.name), - } - - // Update address aggregation stats - let address_stats = address_stat_lookup - .entry(address_id) - .or_insert_with(HashMap::default); - let counter = address_stats.entry("total_errors".to_string()).or_insert(0); - *counter += stat.value; - } - - EventName::ClientCheckoutError { - client_id, - address_id, - } => { - match client_states.get_mut(&client_id) { - Some(client_info) => { - client_info.state = ClientState::Idle; - client_info.error_count += stat.value as u64; - } - None => warn!("Got event {:?} for unregistered client", stat.name), - } - - // Update address aggregation stats - let address_stats = address_stat_lookup - .entry(address_id) - .or_insert_with(HashMap::default); - let counter = address_stats.entry("total_errors".to_string()).or_insert(0); - *counter += stat.value; - } - - EventName::ClientIdle { client_id } => { - match client_states.get_mut(&client_id) { - Some(client_state) => client_state.state = ClientState::Idle, - None => warn!("Got event {:?} for unregistered client", stat.name), - }; - } - - EventName::ClientWaiting { client_id } => { - match client_states.get_mut(&client_id) { - Some(client_state) => client_state.state = ClientState::Waiting, - None => warn!("Got event {:?} for unregistered client", stat.name), - }; - } - - EventName::ClientActive { - client_id, - server_id: _, - } => { - match client_states.get_mut(&client_id) { - Some(client_state) => client_state.state = ClientState::Active, - None => warn!("Got event {:?} for unregistered client", stat.name), - }; - } - - EventName::ClientDisconnecting { client_id } => { - client_states.remove(&client_id); - } - - EventName::ServerRegistered { - address_name, - server_id, - address_id, - pool_name, - username, - } => { - server_states.insert( - server_id, - ServerInformation { - address_id, - address_name, - server_id, - username, - pool_name, - - state: ServerState::Idle, - application_name: String::from("Undefined"), - connect_time: Instant::now(), - bytes_sent: 0, - bytes_received: 0, - transaction_count: 0, - query_count: 0, - error_count: 0, - }, - ); - } - - EventName::ServerLogin { server_id } => { - match server_states.get_mut(&server_id) { - Some(server_state) => { - server_state.state = ServerState::Login; - server_state.application_name = String::from("Undefined"); - } - None => warn!("Got event {:?} for unregistered server", stat.name), - }; - } - - EventName::ServerTested { server_id } => { - match server_states.get_mut(&server_id) { - Some(server_state) => { - server_state.state = ServerState::Tested; - server_state.application_name = String::from("Undefined"); - } - None => warn!("Got event {:?} for unregistered server", stat.name), - }; - } - - EventName::ServerIdle { server_id } => { - match server_states.get_mut(&server_id) { - Some(server_state) => { - server_state.state = ServerState::Idle; - server_state.application_name = String::from("Undefined"); - } - None => warn!("Got event {:?} for unregistered server", stat.name), - }; - } - - EventName::ServerActive { - client_id, - server_id, - } => { - // Update client stats - let app_name = match client_states.get_mut(&client_id) { - Some(client_info) => client_info.application_name.to_string(), - None => String::from("Undefined"), - }; - - // Update server stats - match server_states.get_mut(&server_id) { - Some(server_state) => { - server_state.state = ServerState::Active; - server_state.application_name = app_name; - } - None => warn!("Got event {:?} for unregistered server", stat.name), - }; - } - - EventName::ServerDisconnecting { server_id } => { - server_states.remove(&server_id); - } - - EventName::UpdateStats { - pool_name, - username, - } => { - let pool_stats = pool_stat_lookup - .entry((pool_name.clone(), username.clone())) - .or_insert_with(HashMap::default); - - // These are re-calculated every iteration of the loop, so we don't want to add values - // from the last iteration. - for stat in &[ - "cl_active", - "cl_waiting", - "cl_idle", - "sv_idle", - "sv_active", - "sv_tested", - "sv_login", - ] { - pool_stats.insert(stat.to_string(), 0); - } - - for (_, client_info) in client_states.iter() { - if client_info.pool_name != pool_name || client_info.username != username { - continue; - } - match client_info.state { - ClientState::Idle => { - let counter = pool_stats.entry("cl_idle".to_string()).or_insert(0); - *counter += 1; - } - ClientState::Waiting => { - let counter = - pool_stats.entry("cl_waiting".to_string()).or_insert(0); - *counter += 1; - } - ClientState::Active => { - let counter = - pool_stats.entry("cl_active".to_string()).or_insert(0); - *counter += 1; - } - }; - } - - for (_, server_info) in server_states.iter() { - if server_info.pool_name != pool_name || server_info.username != username { - continue; - } - match server_info.state { - ServerState::Login => { - let counter = pool_stats.entry("sv_login".to_string()).or_insert(0); - *counter += 1; - } - ServerState::Tested => { - let counter = - pool_stats.entry("sv_tested".to_string()).or_insert(0); - *counter += 1; - } - ServerState::Active => { - let counter = - pool_stats.entry("sv_active".to_string()).or_insert(0); - *counter += 1; - } - ServerState::Idle => { - let counter = pool_stats.entry("sv_idle".to_string()).or_insert(0); - *counter += 1; - } - }; - } - - // The following calls publish the internal stats making it visible - // to clients using admin database to issue queries like `SHOW STATS` - LATEST_CLIENT_STATS.store(Arc::new(client_states.clone())); - LATEST_SERVER_STATS.store(Arc::new(server_states.clone())); - LATEST_POOL_STATS.store(Arc::new(pool_stat_lookup.clone())); - - // Clear maxwait after reporting - pool_stat_lookup - .entry((pool_name.clone(), username.clone())) - .or_insert_with(HashMap::default) - .insert("maxwait_us".to_string(), 0); - } - - EventName::UpdateAverages { address_id } => { - let stats = address_stat_lookup - .entry(address_id) - .or_insert_with(HashMap::default); - let old_stats = address_old_stat_lookup - .entry(address_id) - .or_insert_with(HashMap::default); - - // Calculate averages - for stat in &[ - "avg_query_count", - "avg_query_time", - "avg_recv", - "avg_sent", - "avg_errors", - "avg_xact_time", - "avg_xact_count", - "avg_wait_time", - ] { - let total_name = match stat { - &"avg_recv" => "total_received".to_string(), // Because PgBouncer is saving bytes - _ => stat.replace("avg_", "total_"), - }; - - let old_value = old_stats.entry(total_name.clone()).or_insert(0); - let new_value = stats.get(total_name.as_str()).unwrap_or(&0).to_owned(); - let avg = (new_value - *old_value) / (STAT_PERIOD as i64 / 1_000); // Avg / second - - stats.insert(stat.to_string(), avg); - *old_value = new_value; - } - LATEST_ADDRESS_STATS.store(Arc::new(address_stat_lookup.clone())); - } - }; - } } } -/// Get a snapshot of client statistics. Updated once a second +/// Get a snapshot of client statistics. /// by the `Collector`. pub fn get_client_stats() -> ClientStatesLookup { - (*(*LATEST_CLIENT_STATS.load())).clone() + CLIENT_STATS.read().clone() } -/// Get a snapshot of server statistics. Updated once a second +/// Get a snapshot of server statistics. /// by the `Collector`. pub fn get_server_stats() -> ServerStatesLookup { - (*(*LATEST_SERVER_STATS.load())).clone() + SERVER_STATS.read().clone() } -/// Get a snapshot of pool statistics. Updated once a second +/// Get a snapshot of pool statistics. /// by the `Collector`. pub fn get_pool_stats() -> PoolStatsLookup { - (*(*LATEST_POOL_STATS.load())).clone() -} - -/// Get a snapshot of address statistics. Updated once a second -/// by the `Collector`. -pub fn get_address_stats() -> AddressStatsLookup { - (*(*LATEST_ADDRESS_STATS.load())).clone() + POOL_STATS.read().clone() } /// Get the statistics reporter used to update stats across the pools/clients. diff --git a/src/stats/address.rs b/src/stats/address.rs new file mode 100644 index 00000000..a5759e10 --- /dev/null +++ b/src/stats/address.rs @@ -0,0 +1,149 @@ +use log::warn; +use std::sync::atomic::*; +use std::sync::Arc; + +/// Internal address stats +#[derive(Debug, Clone, Default)] +pub struct AddressStats { + pub total_xact_count: Arc, + pub total_query_count: Arc, + pub total_received: Arc, + pub total_sent: Arc, + pub total_xact_time: Arc, + pub total_query_time: Arc, + pub total_wait_time: Arc, + pub total_errors: Arc, + pub avg_query_count: Arc, + pub avg_query_time: Arc, + pub avg_recv: Arc, + pub avg_sent: Arc, + pub avg_errors: Arc, + pub avg_xact_time: Arc, + pub avg_xact_count: Arc, + pub avg_wait_time: Arc, +} + +impl IntoIterator for AddressStats { + type Item = (String, u64); + type IntoIter = std::vec::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + vec![ + ( + "total_xact_count".to_string(), + self.total_xact_count.load(Ordering::Relaxed), + ), + ( + "total_query_count".to_string(), + self.total_query_count.load(Ordering::Relaxed), + ), + ( + "total_received".to_string(), + self.total_received.load(Ordering::Relaxed), + ), + ( + "total_sent".to_string(), + self.total_sent.load(Ordering::Relaxed), + ), + ( + "total_xact_time".to_string(), + self.total_xact_time.load(Ordering::Relaxed), + ), + ( + "total_query_time".to_string(), + self.total_query_time.load(Ordering::Relaxed), + ), + ( + "total_wait_time".to_string(), + self.total_wait_time.load(Ordering::Relaxed), + ), + ( + "total_errors".to_string(), + self.total_errors.load(Ordering::Relaxed), + ), + ( + "avg_xact_count".to_string(), + self.avg_xact_count.load(Ordering::Relaxed), + ), + ( + "avg_query_count".to_string(), + self.avg_query_count.load(Ordering::Relaxed), + ), + ( + "avg_recv".to_string(), + self.avg_recv.load(Ordering::Relaxed), + ), + ( + "avg_sent".to_string(), + self.avg_sent.load(Ordering::Relaxed), + ), + ( + "avg_errors".to_string(), + self.avg_errors.load(Ordering::Relaxed), + ), + ( + "avg_xact_time".to_string(), + self.avg_xact_time.load(Ordering::Relaxed), + ), + ( + "avg_query_time".to_string(), + self.avg_query_time.load(Ordering::Relaxed), + ), + ( + "avg_wait_time".to_string(), + self.avg_wait_time.load(Ordering::Relaxed), + ), + ] + .into_iter() + } +} + +impl AddressStats { + pub fn error(&self) { + self.total_errors.fetch_add(1, Ordering::Relaxed); + } + + pub fn update_averages(&self) { + let (totals, averages) = self.fields_iterators(); + for data in totals.iter().zip(averages.iter()) { + let (total, average) = data; + if let Err(err) = average.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |avg| { + let total = total.load(Ordering::Relaxed); + let avg = (total - avg) / (crate::stats::STAT_PERIOD / 1_000); // Avg / second + Some(avg) + }) { + warn!("Could not update averages for addresses stats, {:?}", err); + } + } + } + + pub fn populate_row(&self, row: &mut Vec) { + for (_key, value) in self.clone() { + row.push(value.to_string()); + } + } + + fn fields_iterators(&self) -> (Vec>, Vec>) { + let mut totals: Vec> = Vec::new(); + let mut averages: Vec> = Vec::new(); + + totals.push(self.total_xact_count.clone()); + averages.push(self.avg_xact_count.clone()); + totals.push(self.total_query_count.clone()); + averages.push(self.avg_query_count.clone()); + totals.push(self.total_received.clone()); + averages.push(self.avg_recv.clone()); + totals.push(self.total_sent.clone()); + averages.push(self.avg_sent.clone()); + totals.push(self.total_xact_time.clone()); + averages.push(self.avg_xact_time.clone()); + totals.push(self.total_query_time.clone()); + averages.push(self.avg_query_time.clone()); + totals.push(self.total_wait_time.clone()); + averages.push(self.avg_wait_time.clone()); + totals.push(self.total_errors.clone()); + averages.push(self.avg_errors.clone()); + + (totals, averages) + } +} diff --git a/src/stats/client.rs b/src/stats/client.rs new file mode 100644 index 00000000..89235068 --- /dev/null +++ b/src/stats/client.rs @@ -0,0 +1,182 @@ +use super::PoolStats; +use super::{get_reporter, Reporter}; +use atomic_enum::atomic_enum; +use std::sync::atomic::*; +use std::sync::Arc; +use tokio::time::Instant; +/// The various states that a client can be in +#[atomic_enum] +#[derive(PartialEq)] +pub enum ClientState { + Idle = 0, + Waiting, + Active, +} +impl std::fmt::Display for ClientState { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match *self { + ClientState::Idle => write!(f, "idle"), + ClientState::Waiting => write!(f, "waiting"), + ClientState::Active => write!(f, "active"), + } + } +} + +#[derive(Debug, Clone)] +/// Information we keep track of which can be queried by SHOW CLIENTS +pub struct ClientStats { + /// A random integer assigned to the client and used by stats to track the client + client_id: i32, + + /// Data associated with the client, not writable, only set when we construct the ClientStat + application_name: String, + username: String, + pool_name: String, + connect_time: Instant, + + pool_stats: Arc, + reporter: Reporter, + + /// Total time spent waiting for a connection from pool, measures in microseconds + pub total_wait_time: Arc, + + /// Current state of the client + pub state: Arc, + + /// Number of transactions executed by this client + pub transaction_count: Arc, + + /// Number of queries executed by this client + pub query_count: Arc, + + /// Number of errors made by this client + pub error_count: Arc, +} + +impl Default for ClientStats { + fn default() -> Self { + ClientStats { + client_id: 0, + connect_time: Instant::now(), + application_name: String::new(), + username: String::new(), + pool_name: String::new(), + pool_stats: Arc::new(PoolStats::default()), + total_wait_time: Arc::new(AtomicU64::new(0)), + state: Arc::new(AtomicClientState::new(ClientState::Idle)), + transaction_count: Arc::new(AtomicU64::new(0)), + query_count: Arc::new(AtomicU64::new(0)), + error_count: Arc::new(AtomicU64::new(0)), + reporter: get_reporter(), + } + } +} + +impl ClientStats { + pub fn new( + client_id: i32, + application_name: &str, + username: &str, + pool_name: &str, + connect_time: Instant, + pool_stats: Arc, + ) -> Self { + Self { + client_id, + pool_stats, + connect_time, + application_name: application_name.to_string(), + username: username.to_string(), + pool_name: pool_name.to_string(), + ..Default::default() + } + } + + /// Reports a client is disconecting from the pooler and + /// update metrics on the corresponding pool. + pub fn disconnect(&self) { + self.reporter.client_disconnecting(self.client_id); + self.pool_stats + .client_disconnect(self.state.load(Ordering::Relaxed)) + } + + /// Register a client with the stats system. The stats system uses client_id + /// to track and aggregate statistics from all source that relate to that client + pub fn register(&self, stats: Arc) { + self.reporter.client_register(self.client_id, stats); + self.state.store(ClientState::Idle, Ordering::Relaxed); + self.pool_stats.cl_idle.fetch_add(1, Ordering::Relaxed); + } + + /// Reports a client is done querying the server and is no longer assigned a server connection + pub fn idle(&self) { + self.pool_stats + .client_idle(self.state.load(Ordering::Relaxed)); + self.state.store(ClientState::Idle, Ordering::Relaxed); + } + + /// Reports a client is waiting for a connection + pub fn waiting(&self) { + self.pool_stats + .client_waiting(self.state.load(Ordering::Relaxed)); + self.state.store(ClientState::Waiting, Ordering::Relaxed); + } + + /// Reports a client is done waiting for a connection and is about to query the server. + pub fn active(&self) { + self.pool_stats + .client_active(self.state.load(Ordering::Relaxed)); + self.state.store(ClientState::Active, Ordering::Relaxed); + } + + /// Reports a client has failed to obtain a connection from a connection pool + pub fn checkout_error(&self) { + self.state.store(ClientState::Idle, Ordering::Relaxed); + } + + /// Reports a client has had the server assigned to it be banned + pub fn ban_error(&self) { + self.state.store(ClientState::Idle, Ordering::Relaxed); + self.error_count.fetch_add(1, Ordering::Relaxed); + } + + /// Reportes the time spent by a client waiting to get a healthy connection from the pool + pub fn checkout_time(&self, microseconds: u64) { + self.total_wait_time + .fetch_add(microseconds, Ordering::Relaxed); + } + + /// Report a query executed by a client against a server + pub fn query(&self) { + self.query_count.fetch_add(1, Ordering::Relaxed); + } + + /// Report a transaction executed by a client a server + /// we report each individual queries outside a transaction as a transaction + /// We only count the initial BEGIN as a transaction, all queries within do not + /// count as transactions + pub fn transaction(&self) { + self.transaction_count.fetch_add(1, Ordering::Relaxed); + } + + // Helper methods for show clients + pub fn connect_time(&self) -> Instant { + self.connect_time + } + + pub fn client_id(&self) -> i32 { + self.client_id + } + + pub fn application_name(&self) -> String { + self.application_name.clone() + } + + pub fn username(&self) -> String { + self.username.clone() + } + + pub fn pool_name(&self) -> String { + self.pool_name.clone() + } +} diff --git a/src/stats/pool.rs b/src/stats/pool.rs new file mode 100644 index 00000000..1b01ef2e --- /dev/null +++ b/src/stats/pool.rs @@ -0,0 +1,274 @@ +use crate::config::Pool; +use crate::config::PoolMode; +use crate::pool::PoolIdentifier; +use std::sync::atomic::*; +use std::sync::Arc; + +use super::get_reporter; +use super::Reporter; +use super::{ClientState, ServerState}; + +#[derive(Debug, Clone, Default)] +/// A struct that holds information about a Pool . +pub struct PoolStats { + // Pool identifier, cannot be changed after creating the instance + identifier: PoolIdentifier, + + // Pool Config, cannot be changed after creating the instance + config: Pool, + + // A reference to the global reporter. + reporter: Reporter, + + /// Counters (atomics) + pub cl_idle: Arc, + pub cl_active: Arc, + pub cl_waiting: Arc, + pub cl_cancel_req: Arc, + pub sv_active: Arc, + pub sv_idle: Arc, + pub sv_used: Arc, + pub sv_tested: Arc, + pub sv_login: Arc, + pub maxwait: Arc, +} + +impl IntoIterator for PoolStats { + type Item = (String, u64); + type IntoIter = std::vec::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + vec![ + ("cl_idle".to_string(), self.cl_idle.load(Ordering::Relaxed)), + ( + "cl_active".to_string(), + self.cl_active.load(Ordering::Relaxed), + ), + ( + "cl_waiting".to_string(), + self.cl_waiting.load(Ordering::Relaxed), + ), + ( + "cl_cancel_req".to_string(), + self.cl_cancel_req.load(Ordering::Relaxed), + ), + ( + "sv_active".to_string(), + self.sv_active.load(Ordering::Relaxed), + ), + ("sv_idle".to_string(), self.sv_idle.load(Ordering::Relaxed)), + ("sv_used".to_string(), self.sv_used.load(Ordering::Relaxed)), + ( + "sv_tested".to_string(), + self.sv_tested.load(Ordering::Relaxed), + ), + ( + "sv_login".to_string(), + self.sv_login.load(Ordering::Relaxed), + ), + ( + "maxwait".to_string(), + self.maxwait.load(Ordering::Relaxed) / 1_000_000, + ), + ( + "maxwait_us".to_string(), + self.maxwait.load(Ordering::Relaxed) % 1_000_000, + ), + ] + .into_iter() + } +} + +impl PoolStats { + pub fn new(identifier: PoolIdentifier, config: Pool) -> Self { + Self { + identifier, + config, + reporter: get_reporter(), + ..Default::default() + } + } + + // Getters + pub fn register(&self, stats: Arc) { + self.reporter.pool_register(self.identifier.clone(), stats); + } + + pub fn database(&self) -> String { + self.identifier.db.clone() + } + + pub fn user(&self) -> String { + self.identifier.user.clone() + } + + pub fn pool_mode(&self) -> PoolMode { + self.config.pool_mode + } + + /// Populates an array of strings with counters (used by admin in show pools) + pub fn populate_row(&self, row: &mut Vec) { + for (_key, value) in self.clone() { + row.push(value.to_string()); + } + } + + /// Deletes the maxwait counter, this is done everytime we obtain metrics + pub fn clear_maxwait(&self) { + self.maxwait.store(0, Ordering::Relaxed); + } + + /// Notified when a server of the pool enters login state. + /// + /// Arguments: + /// + /// `from`: The state of the server that notifies. + pub fn server_login(&self, from: ServerState) { + self.sv_login.fetch_add(1, Ordering::Relaxed); + if from != ServerState::Login { + self.decrease_from_server_state(from); + } + } + + /// Notified when a server of the pool become 'active' + /// + /// Arguments: + /// + /// `from`: The state of the server that notifies. + pub fn server_active(&self, from: ServerState) { + self.sv_active.fetch_add(1, Ordering::Relaxed); + if from != ServerState::Active { + self.decrease_from_server_state(from); + } + } + + /// Notified when a server of the pool become 'tested' + /// + /// Arguments: + /// + /// `from`: The state of the server that notifies. + pub fn server_tested(&self, from: ServerState) { + self.sv_tested.fetch_add(1, Ordering::Relaxed); + if from != ServerState::Tested { + self.decrease_from_server_state(from); + } + } + + /// Notified when a server of the pool become 'idle' + /// + /// Arguments: + /// + /// `from`: The state of the server that notifies. + pub fn server_idle(&self, from: ServerState) { + self.sv_idle.fetch_add(1, Ordering::Relaxed); + if from != ServerState::Idle { + self.decrease_from_server_state(from); + } + } + + /// Notified when a client of the pool become 'waiting' + /// + /// Arguments: + /// + /// `from`: The state of the client that notifies. + pub fn client_waiting(&self, from: ClientState) { + if from != ClientState::Waiting { + self.cl_waiting.fetch_add(1, Ordering::Relaxed); + self.decrease_from_client_state(from); + } + } + + /// Notified when a client of the pool become 'active' + /// + /// Arguments: + /// + /// `from`: The state of the client that notifies. + pub fn client_active(&self, from: ClientState) { + if from != ClientState::Active { + self.cl_active.fetch_add(1, Ordering::Relaxed); + self.decrease_from_client_state(from); + } + } + + /// Notified when a client of the pool become 'idle' + /// + /// Arguments: + /// + /// `from`: The state of the client that notifies. + pub fn client_idle(&self, from: ClientState) { + if from != ClientState::Idle { + self.cl_idle.fetch_add(1, Ordering::Relaxed); + self.decrease_from_client_state(from); + } + } + + /// Notified when a client disconnects. + /// + /// Arguments: + /// + /// `from`: The state of the client that notifies. + pub fn client_disconnect(&self, from: ClientState) { + let counter = match from { + ClientState::Idle => &self.cl_idle, + ClientState::Waiting => &self.cl_waiting, + ClientState::Active => &self.cl_active, + }; + + Self::decrease_counter(counter.clone()); + } + + /// Notified when a server disconnects. + /// + /// Arguments: + /// + /// `from`: The state of the client that notifies. + pub fn server_disconnect(&self, from: ServerState) { + let counter = match from { + ServerState::Active => &self.sv_active, + ServerState::Idle => &self.sv_idle, + ServerState::Login => &self.sv_login, + ServerState::Tested => &self.sv_tested, + }; + Self::decrease_counter(counter.clone()); + } + + // helpers for counter decrease + fn decrease_from_server_state(&self, from: ServerState) { + let counter = match from { + ServerState::Tested => &self.sv_tested, + ServerState::Active => &self.sv_active, + ServerState::Idle => &self.sv_idle, + ServerState::Login => &self.sv_login, + }; + Self::decrease_counter(counter.clone()); + } + + fn decrease_from_client_state(&self, from: ClientState) { + let counter = match from { + ClientState::Active => &self.cl_active, + ClientState::Idle => &self.cl_idle, + ClientState::Waiting => &self.cl_waiting, + }; + Self::decrease_counter(counter.clone()); + } + + fn decrease_counter(value: Arc) { + if value.load(Ordering::Relaxed) > 0 { + value.fetch_sub(1, Ordering::Relaxed); + } + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_decrease() { + let stat: PoolStats = PoolStats::default(); + stat.server_login(ServerState::Login); + stat.server_idle(ServerState::Login); + assert_eq!(stat.sv_login.load(Ordering::Relaxed), 0); + assert_eq!(stat.sv_idle.load(Ordering::Relaxed), 1); + } +} diff --git a/src/stats/server.rs b/src/stats/server.rs new file mode 100644 index 00000000..009e9b57 --- /dev/null +++ b/src/stats/server.rs @@ -0,0 +1,226 @@ +use super::AddressStats; +use super::PoolStats; +use super::{get_reporter, Reporter}; +use crate::config::Address; +use atomic_enum::atomic_enum; +use parking_lot::RwLock; +use std::sync::atomic::*; +use std::sync::Arc; +use tokio::time::Instant; + +/// The various states that a server can be in +#[atomic_enum] +#[derive(PartialEq)] +pub enum ServerState { + Login = 0, + Active, + Tested, + Idle, +} +impl std::fmt::Display for ServerState { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match *self { + ServerState::Login => write!(f, "login"), + ServerState::Active => write!(f, "active"), + ServerState::Tested => write!(f, "tested"), + ServerState::Idle => write!(f, "idle"), + } + } +} + +/// Information we keep track of which can be queried by SHOW SERVERS +#[derive(Debug, Clone)] +pub struct ServerStats { + /// A random integer assigned to the server and used by stats to track the server + server_id: i32, + + /// Context information, only to be read + address: Address, + connect_time: Instant, + + pool_stats: Arc, + reporter: Reporter, + + /// Data + pub application_name: Arc>, + pub state: Arc, + pub bytes_sent: Arc, + pub bytes_received: Arc, + pub transaction_count: Arc, + pub query_count: Arc, + pub error_count: Arc, +} + +impl Default for ServerStats { + fn default() -> Self { + ServerStats { + server_id: 0, + application_name: Arc::new(RwLock::new(String::new())), + address: Address::default(), + pool_stats: Arc::new(PoolStats::default()), + connect_time: Instant::now(), + state: Arc::new(AtomicServerState::new(ServerState::Login)), + bytes_sent: Arc::new(AtomicU64::new(0)), + bytes_received: Arc::new(AtomicU64::new(0)), + transaction_count: Arc::new(AtomicU64::new(0)), + query_count: Arc::new(AtomicU64::new(0)), + error_count: Arc::new(AtomicU64::new(0)), + reporter: get_reporter(), + } + } +} + +impl ServerStats { + pub fn new(address: Address, pool_stats: Arc, connect_time: Instant) -> Self { + Self { + address, + pool_stats, + connect_time, + server_id: rand::random::(), + ..Default::default() + } + } + + pub fn server_id(&self) -> i32 { + self.server_id + } + + /// Register a server connection with the stats system. The stats system uses server_id + /// to track and aggregate statistics from all source that relate to that server + // Delegates to reporter + pub fn register(&self, stats: Arc) { + self.reporter.server_register(self.server_id, stats); + self.login(); + } + + /// Reports a server connection is no longer assigned to a client + /// and is available for the next client to pick it up + pub fn idle(&self) { + self.pool_stats + .server_idle(self.state.load(Ordering::Relaxed)); + + self.state.store(ServerState::Idle, Ordering::Relaxed); + self.set_undefined_application(); + } + + /// Reports a server connection is disconecting from the pooler. + /// Also updates metrics on the pool regarding server usage. + pub fn disconnect(&self) { + self.reporter.server_disconnecting(self.server_id); + self.pool_stats + .server_disconnect(self.state.load(Ordering::Relaxed)) + } + + /// Reports a server connection is being tested before being given to a client. + pub fn tested(&self) { + self.set_undefined_application(); + self.pool_stats + .server_tested(self.state.load(Ordering::Relaxed)); + self.state.store(ServerState::Tested, Ordering::Relaxed); + } + + /// Reports a server connection is attempting to login. + pub fn login(&self) { + self.pool_stats + .server_login(self.state.load(Ordering::Relaxed)); + self.state.store(ServerState::Login, Ordering::Relaxed); + self.set_undefined_application(); + } + + /// Reports a server connection has been assigned to a client that + /// is about to query the server + pub fn active(&self, application_name: String) { + self.pool_stats + .server_active(self.state.load(Ordering::Relaxed)); + self.state.store(ServerState::Active, Ordering::Relaxed); + self.set_application(application_name); + } + + pub fn address_stats(&self) -> Arc { + self.address.stats.clone() + } + + // Helper methods for show_servers + pub fn pool_name(&self) -> String { + self.pool_stats.database() + } + + pub fn username(&self) -> String { + self.pool_stats.user() + } + + pub fn address_name(&self) -> String { + self.address.name() + } + + pub fn connect_time(&self) -> Instant { + self.connect_time + } + + fn set_application(&self, name: String) { + let mut application_name = self.application_name.write(); + *application_name = name; + } + + fn set_undefined_application(&self) { + self.set_application(String::from("Undefined")) + } + + pub fn checkout_time(&self, microseconds: u64, application_name: String) { + // Update server stats and address aggergation stats + self.set_application(application_name); + self.address + .stats + .total_wait_time + .fetch_add(microseconds, Ordering::Relaxed); + self.pool_stats + .maxwait + .fetch_max(microseconds, Ordering::Relaxed); + } + + /// Report a query executed by a client against a server + pub fn query(&self, milliseconds: u64, application_name: &str) { + self.set_application(application_name.to_string()); + let address_stats = self.address_stats(); + address_stats + .total_query_count + .fetch_add(1, Ordering::Relaxed); + address_stats + .total_query_time + .fetch_add(milliseconds, Ordering::Relaxed); + } + + /// Report a transaction executed by a client a server + /// we report each individual queries outside a transaction as a transaction + /// We only count the initial BEGIN as a transaction, all queries within do not + /// count as transactions + pub fn transaction(&self, application_name: &str) { + self.set_application(application_name.to_string()); + + self.transaction_count.fetch_add(1, Ordering::Relaxed); + self.address + .stats + .total_xact_count + .fetch_add(1, Ordering::Relaxed); + } + + /// Report data sent to a server + pub fn data_sent(&self, amount_bytes: usize) { + self.bytes_sent + .fetch_add(amount_bytes as u64, Ordering::Relaxed); + self.address + .stats + .total_sent + .fetch_add(amount_bytes as u64, Ordering::Relaxed); + } + + /// Report data received from a server + pub fn data_received(&self, amount_bytes: usize) { + self.bytes_received + .fetch_add(amount_bytes as u64, Ordering::Relaxed); + self.address + .stats + .total_received + .fetch_add(amount_bytes as u64, Ordering::Relaxed); + } +} diff --git a/tests/ruby/admin_spec.rb b/tests/ruby/admin_spec.rb index f69c3df6..b611e8cb 100644 --- a/tests/ruby/admin_spec.rb +++ b/tests/ruby/admin_spec.rb @@ -176,6 +176,47 @@ end end + context "clients connects and disconnect normally" do + let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 2) } + + it 'shows the same number of clients before and after' do + clients_before = clients_connected_to_pool(processes: processes) + threads = [] + connections = Array.new(4) { PG::connect("#{pgcat_conn_str}?application_name=one_query") } + connections.each do |c| + threads << Thread.new { c.async_exec("SELECT 1") } + end + clients_between = clients_connected_to_pool(processes: processes) + expect(clients_before).not_to eq(clients_between) + connections.each(&:close) + clients_after = clients_connected_to_pool(processes: processes) + expect(clients_before).to eq(clients_after) + end + end + + context "clients connects and disconnect abruptly" do + let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 10) } + + it 'shows the same number of clients before and after' do + threads = [] + connections = Array.new(2) { PG::connect("#{pgcat_conn_str}?application_name=one_query") } + connections.each do |c| + threads << Thread.new { c.async_exec("SELECT 1") } + end + clients_before = clients_connected_to_pool(processes: processes) + random_string = (0...8).map { (65 + rand(26)).chr }.join + connection_string = "#{pgcat_conn_str}?application_name=#{random_string}" + faulty_client = Process.spawn("psql -Atx #{connection_string} >/dev/null") + sleep(1) + # psql starts two processes, we only know the pid of the parent, this + # ensure both are killed + `pkill -9 -f '#{random_string}'` + Process.wait(faulty_client) + clients_after = clients_connected_to_pool(processes: processes) + expect(clients_before).to eq(clients_after) + end + end + context "clients overwhelm server pools" do let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 2) } @@ -199,7 +240,7 @@ sleep(2.5) # Allow time for stats to update results = admin_conn.async_exec("SHOW POOLS")[0] - %w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s| + %w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login].each do |s| raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" end expect(results["cl_idle"]).to eq("4") diff --git a/tests/ruby/capture b/tests/ruby/capture new file mode 100644 index 00000000..268680ff Binary files /dev/null and b/tests/ruby/capture differ diff --git a/tests/ruby/spec_helper.rb b/tests/ruby/spec_helper.rb index d6796401..a95969fd 100644 --- a/tests/ruby/spec_helper.rb +++ b/tests/ruby/spec_helper.rb @@ -19,3 +19,10 @@ def with_captured_stdout_stderr STDOUT.reopen(sout) STDERR.reopen(serr) end + +def clients_connected_to_pool(pool_index: 0, processes:) + admin_conn = PG::connect(processes.pgcat.admin_connection_string) + results = admin_conn.async_exec("SHOW POOLS")[pool_index] + admin_conn.close + results['cl_idle'].to_i + results['cl_active'].to_i + results['cl_waiting'].to_i +end