From 6daafad9a4b8e0b8beee544c0479b142f41b1b23 Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Fri, 23 Sep 2022 16:24:15 -0400 Subject: [PATCH 01/11] Change sharding config to enum and move validation of configs into public functions --- src/config.rs | 243 +++++++++++++++++++++++++----------------------- src/pool.rs | 6 +- src/sharding.rs | 14 ++- 3 files changed, 140 insertions(+), 123 deletions(-) diff --git a/src/config.rs b/src/config.rs index d13bee62..55548795 100644 --- a/src/config.rs +++ b/src/config.rs @@ -13,6 +13,7 @@ use toml; use crate::errors::Error; use crate::pool::{ClientServerMap, ConnectionPool}; +use crate::sharding::ShardingFunction; use crate::tls::{load_certs, load_keys}; pub const VERSION: &str = env!("CARGO_PKG_VERSION"); @@ -179,31 +180,31 @@ pub struct General { } impl General { - fn default_host() -> String { + pub fn default_host() -> String { "0.0.0.0".into() } - fn default_port() -> i16 { + pub fn default_port() -> i16 { 5432 } - fn default_connect_timeout() -> u64 { + pub fn default_connect_timeout() -> u64 { 1000 } - fn default_shutdown_timeout() -> u64 { + pub fn default_shutdown_timeout() -> u64 { 60000 } - fn default_healthcheck_timeout() -> u64 { + pub fn default_healthcheck_timeout() -> u64 { 1000 } - fn default_healthcheck_delay() -> u64 { + pub fn default_healthcheck_delay() -> u64 { 30000 } - fn default_ban_time() -> i64 { + pub fn default_ban_time() -> i64 { 60 } } @@ -266,15 +267,47 @@ pub struct Pool { #[serde(default = "General::default_connect_timeout")] pub connect_timeout: u64, - pub sharding_function: String, + pub sharding_function: ShardingFunction, pub shards: BTreeMap, pub users: BTreeMap, } impl Pool { - fn default_pool_mode() -> PoolMode { + pub fn default_pool_mode() -> PoolMode { PoolMode::Transaction } + + pub fn validate(&self) -> Result<(), Error> { + match self.default_role.as_ref() { + "any" => (), + "primary" => (), + "replica" => (), + other => { + error!( + "Query router default_role must be 'primary', 'replica', or 'any', got: '{}'", + other + ); + return Err(Error::BadConfig); + } + }; + + for (shard_idx, shard) in &self.shards { + match shard_idx.parse::() { + Ok(_) => (), + Err(_) => { + error!( + "Shard '{}' is not a valid number, shards must be numbered starting at 0", + shard_idx + ); + return Err(Error::BadConfig); + } + }; + + shard.validate()?; + } + + Ok(()) + } } impl Default for Pool { @@ -286,7 +319,7 @@ impl Default for Pool { default_role: String::from("any"), query_parser_enabled: false, primary_reads_enabled: false, - sharding_function: "pg_bigint_hash".to_string(), + sharding_function: ShardingFunction::PgBigintHash, connect_timeout: General::default_connect_timeout(), } } @@ -306,6 +339,45 @@ pub struct Shard { pub servers: Vec, } +impl Shard { + pub fn validate(&self) -> Result<(), Error> { + // We use addresses as unique identifiers, + // let's make sure they are unique in the config as well. + let mut dup_check = HashSet::new(); + let mut primary_count = 0; + + if self.servers.len() == 0 { + error!("Shard {} has no servers configured", self.database); + return Err(Error::BadConfig); + } + + for server in &self.servers { + dup_check.insert(server); + + // Check that we define only zero or one primary. + match server.role { + Role::Primary => primary_count += 1, + _ => (), + }; + } + + if primary_count > 1 { + error!( + "Shard {} has more than on primary configured", + self.database + ); + return Err(Error::BadConfig); + } + + if dup_check.len() != self.servers.len() { + error!("Shard {} contains duplicate server configs", self.database); + return Err(Error::BadConfig); + } + + Ok(()) + } +} + impl Default for Shard { fn default() -> Shard { Shard { @@ -341,7 +413,7 @@ pub struct Config { } impl Config { - fn default_path() -> String { + pub fn default_path() -> String { String::from("pgcat.toml") } } @@ -381,7 +453,7 @@ impl From<&Config> for std::collections::HashMap { ), ( format!("pools.{}.sharding_function", pool_name), - pool.sharding_function.clone(), + pool.sharding_function.to_string(), ), ( format!("pools.{:?}.shard_count", pool_name), @@ -479,7 +551,8 @@ impl Config { ); info!( "[pool: {}] Sharding function: {}", - pool_name, pool_config.sharding_function + pool_name, + pool_config.sharding_function.to_string() ); info!( "[pool: {}] Primary reads: {}", @@ -512,6 +585,45 @@ impl Config { } } } + + pub fn validate(&self) -> Result<(), Error> { + // Validate TLS! + match self.general.tls_certificate.clone() { + Some(tls_certificate) => { + match load_certs(&Path::new(&tls_certificate)) { + Ok(_) => { + // Cert is okay, but what about the private key? + match self.general.tls_private_key.clone() { + Some(tls_private_key) => match load_keys(&Path::new(&tls_private_key)) { + Ok(_) => (), + Err(err) => { + error!("tls_private_key is incorrectly configured: {:?}", err); + return Err(Error::BadConfig); + } + }, + + None => { + error!("tls_certificate is set, but the tls_private_key is not"); + return Err(Error::BadConfig); + } + }; + } + + Err(err) => { + error!("tls_certificate is incorrectly configured: {:?}", err); + return Err(Error::BadConfig); + } + } + } + None => (), + }; + + for (_, pool) in &self.pools { + pool.validate()?; + } + + Ok(()) + } } /// Get a read-only instance of the configuration @@ -548,110 +660,7 @@ pub async fn parse(path: &str) -> Result<(), Error> { } }; - // Validate TLS! - match config.general.tls_certificate.clone() { - Some(tls_certificate) => { - match load_certs(&Path::new(&tls_certificate)) { - Ok(_) => { - // Cert is okay, but what about the private key? - match config.general.tls_private_key.clone() { - Some(tls_private_key) => match load_keys(&Path::new(&tls_private_key)) { - Ok(_) => (), - Err(err) => { - error!("tls_private_key is incorrectly configured: {:?}", err); - return Err(Error::BadConfig); - } - }, - - None => { - error!("tls_certificate is set, but the tls_private_key is not"); - return Err(Error::BadConfig); - } - }; - } - - Err(err) => { - error!("tls_certificate is incorrectly configured: {:?}", err); - return Err(Error::BadConfig); - } - } - } - None => (), - }; - - for (pool_name, mut pool) in &mut config.pools { - // Copy the connect timeout over for hashing. - pool.connect_timeout = config.general.connect_timeout; - - match pool.sharding_function.as_ref() { - "pg_bigint_hash" => (), - "sha1" => (), - _ => { - error!( - "Supported sharding functions are: 'pg_bigint_hash', 'sha1', got: '{}' in pool {} settings", - pool.sharding_function, - pool_name - ); - return Err(Error::BadConfig); - } - }; - - match pool.default_role.as_ref() { - "any" => (), - "primary" => (), - "replica" => (), - other => { - error!( - "Query router default_role must be 'primary', 'replica', or 'any', got: '{}'", - other - ); - return Err(Error::BadConfig); - } - }; - - for shard in &pool.shards { - // We use addresses as unique identifiers, - // let's make sure they are unique in the config as well. - let mut dup_check = HashSet::new(); - let mut primary_count = 0; - - match shard.0.parse::() { - Ok(_) => (), - Err(_) => { - error!( - "Shard '{}' is not a valid number, shards must be numbered starting at 0", - shard.0 - ); - return Err(Error::BadConfig); - } - }; - - if shard.1.servers.len() == 0 { - error!("Shard {} has no servers configured", shard.0); - return Err(Error::BadConfig); - } - - for server in &shard.1.servers { - dup_check.insert(server); - - // Check that we define only zero or one primary. - match server.role { - Role::Primary => primary_count += 1, - _ => (), - }; - } - - if primary_count > 1 { - error!("Shard {} has more than on primary configured", &shard.0); - return Err(Error::BadConfig); - } - - if dup_check.len() != shard.1.servers.len() { - error!("Shard {} contains duplicate server configs", &shard.0); - return Err(Error::BadConfig); - } - } - } + config.validate()?; config.path = path.to_string(); diff --git a/src/pool.rs b/src/pool.rs index d9c9e7d6..9289740c 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -221,11 +221,7 @@ impl ConnectionPool { }, query_parser_enabled: pool_config.query_parser_enabled.clone(), primary_reads_enabled: pool_config.primary_reads_enabled, - sharding_function: match pool_config.sharding_function.as_str() { - "pg_bigint_hash" => ShardingFunction::PgBigintHash, - "sha1" => ShardingFunction::Sha1, - _ => unreachable!(), - }, + sharding_function: pool_config.sharding_function, }, }; diff --git a/src/sharding.rs b/src/sharding.rs index c332c601..f1316f72 100644 --- a/src/sharding.rs +++ b/src/sharding.rs @@ -1,3 +1,4 @@ +use serde_derive::{Serialize, Deserialize}; /// Implements various sharding functions. use sha1::{Digest, Sha1}; @@ -5,12 +6,23 @@ use sha1::{Digest, Sha1}; const PARTITION_HASH_SEED: u64 = 0x7A5B22367996DCFD; /// The sharding functions we support. -#[derive(Debug, PartialEq, Copy, Clone)] +#[derive(Debug, PartialEq, Copy, Clone, Serialize, Deserialize, Hash, std::cmp::Eq)] pub enum ShardingFunction { + #[serde(alias = "pg_bigint_hash", alias = "PgBigintHash")] PgBigintHash, + #[serde(alias = "sha1", alias = "Sha1")] Sha1, } +impl ToString for ShardingFunction { + fn to_string(&self) -> String { + match *self { + ShardingFunction::PgBigintHash => "pg_bigint_hash".to_string(), + ShardingFunction::Sha1 => "sha1".to_string(), + } + } +} + /// The sharder. pub struct Sharder { /// Number of shards in the cluster. From e0ae98fa6587a546ca284e80c87c63cf19da9fee Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Fri, 23 Sep 2022 16:28:05 -0400 Subject: [PATCH 02/11] fnt --- src/config.rs | 25 +++++++++++++++---------- src/sharding.rs | 2 +- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/src/config.rs b/src/config.rs index 55548795..8b621a2f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -594,21 +594,26 @@ impl Config { Ok(_) => { // Cert is okay, but what about the private key? match self.general.tls_private_key.clone() { - Some(tls_private_key) => match load_keys(&Path::new(&tls_private_key)) { - Ok(_) => (), - Err(err) => { - error!("tls_private_key is incorrectly configured: {:?}", err); - return Err(Error::BadConfig); + Some(tls_private_key) => { + match load_keys(&Path::new(&tls_private_key)) { + Ok(_) => (), + Err(err) => { + error!( + "tls_private_key is incorrectly configured: {:?}", + err + ); + return Err(Error::BadConfig); + } } - }, - + } + None => { error!("tls_certificate is set, but the tls_private_key is not"); return Err(Error::BadConfig); } }; } - + Err(err) => { error!("tls_certificate is incorrectly configured: {:?}", err); return Err(Error::BadConfig); @@ -617,11 +622,11 @@ impl Config { } None => (), }; - + for (_, pool) in &self.pools { pool.validate()?; } - + Ok(()) } } diff --git a/src/sharding.rs b/src/sharding.rs index f1316f72..c5ab45e7 100644 --- a/src/sharding.rs +++ b/src/sharding.rs @@ -1,4 +1,4 @@ -use serde_derive::{Serialize, Deserialize}; +use serde_derive::{Deserialize, Serialize}; /// Implements various sharding functions. use sha1::{Digest, Sha1}; From ecb0e797f0b518cd6d43059032fdc1abcd4bf03e Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Fri, 23 Sep 2022 16:47:45 -0400 Subject: [PATCH 03/11] flakey test? --- src/config.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/config.rs b/src/config.rs index 8b621a2f..63ad295e 100644 --- a/src/config.rs +++ b/src/config.rs @@ -302,7 +302,6 @@ impl Pool { return Err(Error::BadConfig); } }; - shard.validate()?; } From 86350b57e2462e4e8dc699ec6df0789ab1cfcb41 Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Fri, 23 Sep 2022 17:17:55 -0400 Subject: [PATCH 04/11] test --- .circleci/pgcat.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/pgcat.toml b/.circleci/pgcat.toml index 56aa1ddc..433220e1 100644 --- a/.circleci/pgcat.toml +++ b/.circleci/pgcat.toml @@ -33,7 +33,7 @@ shutdown_timeout = 5000 ban_time = 60 # Seconds # Reload config automatically if it changes. -autoreload = true +autoreload = false # TLS tls_certificate = ".circleci/server.cert" From 38f4d49317b84f8de2de4e77816b241fbc834ac4 Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Fri, 23 Sep 2022 18:08:30 -0400 Subject: [PATCH 05/11] undo --- .circleci/pgcat.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/pgcat.toml b/.circleci/pgcat.toml index 433220e1..56aa1ddc 100644 --- a/.circleci/pgcat.toml +++ b/.circleci/pgcat.toml @@ -33,7 +33,7 @@ shutdown_timeout = 5000 ban_time = 60 # Seconds # Reload config automatically if it changes. -autoreload = false +autoreload = true # TLS tls_certificate = ".circleci/server.cert" From 85f6fd5c82ecd30c14e875d89f000c8f27a40f29 Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Sat, 24 Sep 2022 00:19:40 -0400 Subject: [PATCH 06/11] fix connect timeout bug --- src/config.rs | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/src/config.rs b/src/config.rs index 63ad295e..94f736e5 100644 --- a/src/config.rs +++ b/src/config.rs @@ -212,15 +212,15 @@ impl General { impl Default for General { fn default() -> General { General { - host: General::default_host(), - port: General::default_port(), + host: Self::default_host(), + port: Self::default_port(), enable_prometheus_exporter: Some(false), prometheus_exporter_port: 9930, connect_timeout: General::default_connect_timeout(), - shutdown_timeout: General::default_shutdown_timeout(), - healthcheck_timeout: General::default_healthcheck_timeout(), - healthcheck_delay: General::default_healthcheck_delay(), - ban_time: General::default_ban_time(), + shutdown_timeout: Self::default_shutdown_timeout(), + healthcheck_timeout: Self::default_healthcheck_timeout(), + healthcheck_delay: Self::default_healthcheck_delay(), + ban_time: Self::default_ban_time(), autoreload: false, tls_certificate: None, tls_private_key: None, @@ -312,7 +312,7 @@ impl Pool { impl Default for Pool { fn default() -> Pool { Pool { - pool_mode: Pool::default_pool_mode(), + pool_mode: Self::default_pool_mode(), shards: BTreeMap::from([(String::from("1"), Shard::default())]), users: BTreeMap::default(), default_role: String::from("any"), @@ -420,7 +420,7 @@ impl Config { impl Default for Config { fn default() -> Config { Config { - path: Config::default_path(), + path: Self::default_path(), general: General::default(), pools: HashMap::default(), } @@ -548,6 +548,10 @@ impl Config { "[pool: {}] Pool mode: {:?}", pool_name, pool_config.pool_mode ); + info!( + "[pool: {}] Connection timeout: {}ms", + pool_name, pool_config.connect_timeout + ); info!( "[pool: {}] Sharding function: {}", pool_name, @@ -585,7 +589,7 @@ impl Config { } } - pub fn validate(&self) -> Result<(), Error> { + pub fn validate(&mut self, connect_timeout: u64) -> Result<(), Error> { // Validate TLS! match self.general.tls_certificate.clone() { Some(tls_certificate) => { @@ -622,7 +626,8 @@ impl Config { None => (), }; - for (_, pool) in &self.pools { + for (_, pool) in &mut self.pools { + pool.connect_timeout = connect_timeout; pool.validate()?; } @@ -664,7 +669,7 @@ pub async fn parse(path: &str) -> Result<(), Error> { } }; - config.validate()?; + config.validate(config.general.connect_timeout)?; config.path = path.to_string(); From de99e749e32a3eba3189dcdc2d13cbbb8f90c6d0 Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Sat, 24 Sep 2022 00:27:32 -0400 Subject: [PATCH 07/11] override stuff --- src/config.rs | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/config.rs b/src/config.rs index 94f736e5..776fbe49 100644 --- a/src/config.rs +++ b/src/config.rs @@ -264,7 +264,7 @@ pub struct Pool { #[serde(default)] // False pub primary_reads_enabled: bool, - #[serde(default = "General::default_connect_timeout")] + #[serde(default = "Pool::default_connect_timeout")] pub connect_timeout: u64, pub sharding_function: ShardingFunction, @@ -277,6 +277,10 @@ impl Pool { PoolMode::Transaction } + pub fn default_connect_timeout() -> u64 { + General::default_connect_timeout() + } + pub fn validate(&self) -> Result<(), Error> { match self.default_role.as_ref() { "any" => (), @@ -319,7 +323,7 @@ impl Default for Pool { query_parser_enabled: false, primary_reads_enabled: false, sharding_function: ShardingFunction::PgBigintHash, - connect_timeout: General::default_connect_timeout(), + connect_timeout: Self::default_connect_timeout(), } } } @@ -397,7 +401,7 @@ pub struct Config { // so we should always put simple fields before nested fields // in all serializable structs to avoid ValueAfterTable errors // These errors occur when the toml serializer is about to produce - // ambigous toml structure like the one below + // ambiguous toml structure like the one below // [main] // field1_under_main = 1 // field2_under_main = 2 @@ -627,7 +631,11 @@ impl Config { }; for (_, pool) in &mut self.pools { - pool.connect_timeout = connect_timeout; + // Don't override general connect_timeout the pool connect_timeout is non-default + if pool.connect_timeout != Pool::default_connect_timeout() { + pool.connect_timeout = connect_timeout; + } + pool.validate()?; } From ddea5870790c72f0d259334a3a37495c8000063f Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Sat, 24 Sep 2022 00:30:49 -0400 Subject: [PATCH 08/11] typo --- src/config.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/config.rs b/src/config.rs index 776fbe49..d404765a 100644 --- a/src/config.rs +++ b/src/config.rs @@ -631,8 +631,8 @@ impl Config { }; for (_, pool) in &mut self.pools { - // Don't override general connect_timeout the pool connect_timeout is non-default - if pool.connect_timeout != Pool::default_connect_timeout() { + // Don't override with general connect_timeout if the pool connect_timeout is non-default + if pool.connect_timeout == Pool::default_connect_timeout() { pool.connect_timeout = connect_timeout; } From ad6fa98b39fd4765d9926d4c97f57fc987a84d75 Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Sat, 24 Sep 2022 00:39:41 -0400 Subject: [PATCH 09/11] revert override stuff --- src/config.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/config.rs b/src/config.rs index d404765a..5e03e709 100644 --- a/src/config.rs +++ b/src/config.rs @@ -631,10 +631,7 @@ impl Config { }; for (_, pool) in &mut self.pools { - // Don't override with general connect_timeout if the pool connect_timeout is non-default - if pool.connect_timeout == Pool::default_connect_timeout() { - pool.connect_timeout = connect_timeout; - } + pool.connect_timeout = connect_timeout; pool.validate()?; } From d7bc7673beff76ca121ed27d528b560acf0ede62 Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Mon, 26 Sep 2022 14:39:28 -0400 Subject: [PATCH 10/11] Change connect timeout config usage to option --- src/config.rs | 21 +++++++++------------ src/pool.rs | 9 ++++++--- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/src/config.rs b/src/config.rs index 5e03e709..7ef5aca1 100644 --- a/src/config.rs +++ b/src/config.rs @@ -264,8 +264,7 @@ pub struct Pool { #[serde(default)] // False pub primary_reads_enabled: bool, - #[serde(default = "Pool::default_connect_timeout")] - pub connect_timeout: u64, + pub connect_timeout: Option, pub sharding_function: ShardingFunction, pub shards: BTreeMap, @@ -277,10 +276,6 @@ impl Pool { PoolMode::Transaction } - pub fn default_connect_timeout() -> u64 { - General::default_connect_timeout() - } - pub fn validate(&self) -> Result<(), Error> { match self.default_role.as_ref() { "any" => (), @@ -323,7 +318,7 @@ impl Default for Pool { query_parser_enabled: false, primary_reads_enabled: false, sharding_function: ShardingFunction::PgBigintHash, - connect_timeout: Self::default_connect_timeout(), + connect_timeout: None, } } } @@ -552,9 +547,13 @@ impl Config { "[pool: {}] Pool mode: {:?}", pool_name, pool_config.pool_mode ); + let connect_timeout = match pool_config.connect_timeout { + Some(connect_timeout) => connect_timeout, + None => self.general.connect_timeout + }; info!( "[pool: {}] Connection timeout: {}ms", - pool_name, pool_config.connect_timeout + pool_name, connect_timeout ); info!( "[pool: {}] Sharding function: {}", @@ -593,7 +592,7 @@ impl Config { } } - pub fn validate(&mut self, connect_timeout: u64) -> Result<(), Error> { + pub fn validate(&mut self) -> Result<(), Error> { // Validate TLS! match self.general.tls_certificate.clone() { Some(tls_certificate) => { @@ -631,8 +630,6 @@ impl Config { }; for (_, pool) in &mut self.pools { - pool.connect_timeout = connect_timeout; - pool.validate()?; } @@ -674,7 +671,7 @@ pub async fn parse(path: &str) -> Result<(), Error> { } }; - config.validate(config.general.connect_timeout)?; + config.validate()?; config.path = path.to_string(); diff --git a/src/pool.rs b/src/pool.rs index 9289740c..815a2b8b 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -181,11 +181,14 @@ impl ConnectionPool { get_reporter(), ); + let connect_timeout = match pool_config.connect_timeout { + Some(connect_timeout) => connect_timeout, + None => config.general.connect_timeout, + }; + let pool = Pool::builder() .max_size(user.pool_size) - .connection_timeout(std::time::Duration::from_millis( - pool_config.connect_timeout, - )) + .connection_timeout(std::time::Duration::from_millis(connect_timeout)) .test_on_check_out(false) .build(manager) .await From 11800656eee8deb137a31cd455c87eda3431b245 Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Mon, 26 Sep 2022 15:46:51 -0400 Subject: [PATCH 11/11] fmt --- src/config.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/config.rs b/src/config.rs index 7ef5aca1..1cb37595 100644 --- a/src/config.rs +++ b/src/config.rs @@ -547,9 +547,9 @@ impl Config { "[pool: {}] Pool mode: {:?}", pool_name, pool_config.pool_mode ); - let connect_timeout = match pool_config.connect_timeout { + let connect_timeout = match pool_config.connect_timeout { Some(connect_timeout) => connect_timeout, - None => self.general.connect_timeout + None => self.general.connect_timeout, }; info!( "[pool: {}] Connection timeout: {}ms",