diff --git a/.circleci/config.yml b/.circleci/config.yml index f7aa899b..a43d4bd7 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -18,28 +18,28 @@ jobs: RUSTFLAGS: "-Zprofile -Ccodegen-units=1 -Copt-level=0 -Clink-dead-code -Coverflow-checks=off -Zpanic_abort_tests -Cpanic=abort -Cinstrument-coverage" RUSTDOCFLAGS: "-Cpanic=abort" - image: postgres:14 - command: ["postgres", "-p", "5432", "-c", "shared_preload_libraries=pg_stat_statements"] + command: ["postgres", "-p", "5432", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-c", "pg_stat_statements.max=100000"] environment: POSTGRES_USER: postgres POSTGRES_DB: postgres POSTGRES_PASSWORD: postgres POSTGRES_INITDB_ARGS: --auth-local=md5 --auth-host=md5 --auth=md5 - image: postgres:14 - command: ["postgres", "-p", "7432", "-c", "shared_preload_libraries=pg_stat_statements"] + command: ["postgres", "-p", "7432", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-c", "pg_stat_statements.max=100000"] environment: POSTGRES_USER: postgres POSTGRES_DB: postgres POSTGRES_PASSWORD: postgres POSTGRES_INITDB_ARGS: --auth-local=scram-sha-256 --auth-host=scram-sha-256 --auth=scram-sha-256 - image: postgres:14 - command: ["postgres", "-p", "8432", "-c", "shared_preload_libraries=pg_stat_statements"] + command: ["postgres", "-p", "8432", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-c", "pg_stat_statements.max=100000"] environment: POSTGRES_USER: postgres POSTGRES_DB: postgres POSTGRES_PASSWORD: postgres POSTGRES_INITDB_ARGS: --auth-local=scram-sha-256 --auth-host=scram-sha-256 --auth=scram-sha-256 - image: postgres:14 - command: ["postgres", "-p", "9432", "-c", "shared_preload_libraries=pg_stat_statements"] + command: ["postgres", "-p", "9432", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-c", "pg_stat_statements.max=100000"] environment: POSTGRES_USER: postgres POSTGRES_DB: postgres diff --git a/.circleci/run_tests.sh b/.circleci/run_tests.sh index 644f22e8..a5cfab0b 100644 --- a/.circleci/run_tests.sh +++ b/.circleci/run_tests.sh @@ -92,12 +92,12 @@ sed -i 's/statement_timeout = 100/statement_timeout = 0/' .circleci/pgcat.toml kill -SIGHUP $(pgrep pgcat) # Reload config again # -# ActiveRecord tests +# Integration tests and ActiveRecord tests # cd tests/ruby sudo bundle install -bundle exec ruby tests.rb || exit 1 -bundle exec rspec *_spec.rb || exit 1 +bundle exec ruby tests.rb --format documentation || exit 1 +bundle exec rspec *_spec.rb --format documentation || exit 1 cd ../.. # diff --git a/.gitignore b/.gitignore index b3ca0139..0b436164 100644 --- a/.gitignore +++ b/.gitignore @@ -2,7 +2,7 @@ /target *.deb .vscode -.profraw +*.profraw cov/ lcov.info diff --git a/dev/docker-compose.yaml b/dev/docker-compose.yaml index ee609e0f..da759383 100644 --- a/dev/docker-compose.yaml +++ b/dev/docker-compose.yaml @@ -33,7 +33,7 @@ services: <<: *common-env-pg POSTGRES_INITDB_ARGS: --auth-local=md5 --auth-host=md5 --auth=md5 PGPORT: 5432 - command: ["postgres", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-p", "5432"] + command: ["postgres", "-p", "5432", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-c", "pg_stat_statements.max=100000"] pg2: <<: *common-definition-pg @@ -41,21 +41,21 @@ services: <<: *common-env-pg POSTGRES_INITDB_ARGS: --auth-local=scram-sha-256 --auth-host=scram-sha-256 --auth=scram-sha-256 PGPORT: 7432 - command: ["postgres", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-p", "7432"] + command: ["postgres", "-p", "7432", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-c", "pg_stat_statements.max=100000"] pg3: <<: *common-definition-pg environment: <<: *common-env-pg POSTGRES_INITDB_ARGS: --auth-local=scram-sha-256 --auth-host=scram-sha-256 --auth=scram-sha-256 PGPORT: 8432 - command: ["postgres", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-p", "8432"] + command: ["postgres", "-p", "8432", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-c", "pg_stat_statements.max=100000"] pg4: <<: *common-definition-pg environment: <<: *common-env-pg POSTGRES_INITDB_ARGS: --auth-local=scram-sha-256 --auth-host=scram-sha-256 --auth=scram-sha-256 PGPORT: 9432 - command: ["postgres", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-p", "9432"] + command: ["postgres", "-p", "9432", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-c", "pg_stat_statements.max=100000"] toxiproxy: build: . diff --git a/src/admin.rs b/src/admin.rs index c90f28ea..feea3a15 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -1,4 +1,3 @@ -use crate::config::Role; use crate::pool::BanReason; /// Admin database. use bytes::{Buf, BufMut, BytesMut}; diff --git a/src/config.rs b/src/config.rs index 517cabce..9b90ebea 100644 --- a/src/config.rs +++ b/src/config.rs @@ -29,6 +29,8 @@ pub enum Role { Primary, #[serde(alias = "replica", alias = "Replica")] Replica, + #[serde(alias = "mirror", alias = "Mirror")] + Mirror, } impl ToString for Role { @@ -36,6 +38,7 @@ impl ToString for Role { match *self { Role::Primary => "primary".to_string(), Role::Replica => "replica".to_string(), + Role::Mirror => "mirror".to_string(), } } } @@ -90,6 +93,9 @@ pub struct Address { /// The name of this pool (i.e. database name visible to the client). pub pool_name: String, + + /// List of addresses to receive mirrored traffic. + pub mirrors: Vec
, } impl Default for Address { @@ -105,6 +111,7 @@ impl Default for Address { role: Role::Replica, username: String::from("username"), pool_name: String::from("pool_name"), + mirrors: Vec::new(), } } } @@ -114,11 +121,14 @@ impl Address { pub fn name(&self) -> String { match self.role { Role::Primary => format!("{}_shard_{}_primary", self.pool_name, self.shard), - Role::Replica => format!( "{}_shard_{}_replica_{}", self.pool_name, self.shard, self.replica_number ), + Role::Mirror => format!( + "{}_shard_{}_mirror_{}", + self.pool_name, self.shard, self.replica_number + ), } } } @@ -465,11 +475,19 @@ pub struct ServerConfig { pub role: Role, } +#[derive(Clone, PartialEq, Serialize, Deserialize, Debug, Hash, Eq)] +pub struct MirrorServerConfig { + pub host: String, + pub port: u16, + pub mirroring_target_index: usize, +} + /// Shard configuration. #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Hash, Eq)] pub struct Shard { pub database: String, pub servers: Vec, + pub mirrors: Option>, } impl Shard { @@ -518,6 +536,7 @@ impl Default for Shard { port: 5432, role: Role::Primary, }], + mirrors: None, database: String::from("postgres"), } } diff --git a/src/lib.rs b/src/lib.rs index 63eae59b..67aa9cba 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,7 @@ pub mod config; pub mod constants; pub mod errors; pub mod messages; +pub mod mirrors; pub mod multi_logger; pub mod pool; pub mod scram; diff --git a/src/main.rs b/src/main.rs index b3ef77c3..e2ff5d8d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -66,6 +66,7 @@ mod config; mod constants; mod errors; mod messages; +mod mirrors; mod multi_logger; mod pool; mod prometheus; diff --git a/src/mirrors.rs b/src/mirrors.rs new file mode 100644 index 00000000..6a59172b --- /dev/null +++ b/src/mirrors.rs @@ -0,0 +1,169 @@ +/// A mirrored PostgreSQL client. +/// Packets arrive to us through a channel from the main client and we send them to the server. +use bb8::Pool; +use bytes::{Bytes, BytesMut}; + +use crate::config::{get_config, Address, Role, User}; +use crate::pool::{ClientServerMap, ServerPool}; +use crate::stats::get_reporter; +use log::{error, info, trace, warn}; +use tokio::sync::mpsc::{channel, Receiver, Sender}; + +pub struct MirroredClient { + address: Address, + user: User, + database: String, + bytes_rx: Receiver, + disconnect_rx: Receiver<()>, +} + +impl MirroredClient { + async fn create_pool(&self) -> Pool { + let config = get_config(); + let default = std::time::Duration::from_millis(10_000).as_millis() as u64; + let (connection_timeout, idle_timeout) = match config.pools.get(&self.address.pool_name) { + Some(cfg) => ( + cfg.connect_timeout.unwrap_or(default), + cfg.idle_timeout.unwrap_or(default), + ), + None => (default, default), + }; + + let manager = ServerPool::new( + self.address.clone(), + self.user.clone(), + self.database.as_str(), + ClientServerMap::default(), + get_reporter(), + ); + + Pool::builder() + .max_size(1) + .connection_timeout(std::time::Duration::from_millis(connection_timeout)) + .idle_timeout(Some(std::time::Duration::from_millis(idle_timeout))) + .test_on_check_out(false) + .build(manager) + .await + .unwrap() + } + + pub fn start(mut self) { + tokio::spawn(async move { + let pool = self.create_pool().await; + let address = self.address.clone(); + loop { + let mut server = match pool.get().await { + Ok(server) => server, + Err(err) => { + error!( + "Failed to get connection from pool, Discarding message {:?}, {:?}", + err, + address.clone() + ); + continue; + } + }; + + tokio::select! { + // Exit channel events + _ = self.disconnect_rx.recv() => { + info!("Got mirror exit signal, exiting {:?}", address.clone()); + break; + } + + // Incoming data from server (we read to clear the socket buffer and discard the data) + recv_result = server.recv() => { + match recv_result { + Ok(message) => trace!("Received from mirror: {} {:?}", String::from_utf8_lossy(&message[..]), address.clone()), + Err(err) => { + server.mark_bad(); + error!("Failed to receive from mirror {:?} {:?}", err, address.clone()); + } + } + } + + // Messages to send to the server + message = self.bytes_rx.recv() => { + match message { + Some(bytes) => { + match server.send(&BytesMut::from(&bytes[..])).await { + Ok(_) => trace!("Sent to mirror: {} {:?}", String::from_utf8_lossy(&bytes[..]), address.clone()), + Err(err) => { + server.mark_bad(); + error!("Failed to send to mirror, Discarding message {:?}, {:?}", err, address.clone()) + } + } + } + None => { + info!("Mirror channel closed, exiting {:?}", address.clone()); + break; + }, + } + } + } + } + }); + } +} +pub struct MirroringManager { + pub byte_senders: Vec>, + pub disconnect_senders: Vec>, +} +impl MirroringManager { + pub fn from_addresses( + user: User, + database: String, + addresses: Vec
, + ) -> MirroringManager { + let mut byte_senders: Vec> = vec![]; + let mut exit_senders: Vec> = vec![]; + + addresses.iter().for_each(|mirror| { + let (bytes_tx, bytes_rx) = channel::(500); + let (exit_tx, exit_rx) = channel::<()>(1); + let mut addr = mirror.clone(); + addr.role = Role::Mirror; + let client = MirroredClient { + user: user.clone(), + database: database.to_owned(), + address: addr, + bytes_rx, + disconnect_rx: exit_rx, + }; + exit_senders.push(exit_tx.clone()); + byte_senders.push(bytes_tx.clone()); + client.start(); + }); + + Self { + byte_senders: byte_senders, + disconnect_senders: exit_senders, + } + } + + pub fn send(self: &mut Self, bytes: &BytesMut) { + let cpy = bytes.clone().freeze(); + self.byte_senders + .iter_mut() + .for_each(|sender| match sender.try_send(cpy.clone()) { + Ok(_) => {} + Err(err) => { + warn!("Failed to send bytes to a mirror channel {}", err); + } + }); + } + + pub fn disconnect(self: &mut Self) { + self.disconnect_senders + .iter_mut() + .for_each(|sender| match sender.try_send(()) { + Ok(_) => {} + Err(err) => { + warn!( + "Failed to send disconnect signal to a mirror channel {}", + err + ); + } + }); + } +} diff --git a/src/pool.rs b/src/pool.rs index 0a0a53fa..3a6ec3e6 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -193,7 +193,7 @@ impl ConnectionPool { let config = get_config(); let mut new_pools = HashMap::new(); - let mut address_id = 0; + let mut address_id: usize = 0; for (pool_name, pool_config) in &config.pools { let new_pool_hash_value = pool_config.hash_value(); @@ -244,7 +244,33 @@ impl ConnectionPool { let mut servers = Vec::new(); let mut replica_number = 0; + // Load Mirror settings for (address_index, server) in shard.servers.iter().enumerate() { + let mut mirror_addresses = vec![]; + if let Some(mirror_settings_vec) = &shard.mirrors { + for (mirror_idx, mirror_settings) in + mirror_settings_vec.iter().enumerate() + { + if mirror_settings.mirroring_target_index != address_index { + continue; + } + mirror_addresses.push(Address { + id: address_id, + database: shard.database.clone(), + host: mirror_settings.host.clone(), + port: mirror_settings.port, + role: server.role, + address_index: mirror_idx, + replica_number, + shard: shard_idx.parse::().unwrap(), + username: user.username.clone(), + pool_name: pool_name.clone(), + mirrors: vec![], + }); + address_id += 1; + } + } + let address = Address { id: address_id, database: shard.database.clone(), @@ -256,6 +282,7 @@ impl ConnectionPool { shard: shard_idx.parse::().unwrap(), username: user.username.clone(), pool_name: pool_name.clone(), + mirrors: mirror_addresses, }; address_id += 1; diff --git a/src/query_router.rs b/src/query_router.rs index fff5bba8..fbff68e9 100644 --- a/src/query_router.rs +++ b/src/query_router.rs @@ -223,6 +223,7 @@ impl QueryRouter { Command::ShowServerRole => match self.active_role { Some(Role::Primary) => Role::Primary.to_string(), Some(Role::Replica) => Role::Replica.to_string(), + Some(Role::Mirror) => Role::Mirror.to_string(), None => { if self.query_parser_enabled() { String::from("auto") diff --git a/src/server.rs b/src/server.rs index 1d9bcd14..b3dbd6f7 100644 --- a/src/server.rs +++ b/src/server.rs @@ -14,6 +14,7 @@ use crate::config::{Address, User}; use crate::constants::*; use crate::errors::Error; use crate::messages::*; +use crate::mirrors::MirroringManager; use crate::pool::ClientServerMap; use crate::scram::ScramSha256; use crate::stats::Reporter; @@ -68,6 +69,8 @@ pub struct Server { // Last time that a successful server send or response happened last_activity: SystemTime, + + mirror_manager: Option, } impl Server { @@ -334,6 +337,14 @@ impl Server { stats, application_name: String::new(), last_activity: SystemTime::now(), + mirror_manager: match address.mirrors.len() { + 0 => None, + _ => Some(MirroringManager::from_addresses( + user.clone(), + database.to_owned(), + address.mirrors.clone(), + )), + }, }; server.set_name("pgcat").await?; @@ -384,6 +395,7 @@ impl Server { /// Send messages to the server from the client. pub async fn send(&mut self, messages: &BytesMut) -> Result<(), Error> { + self.mirror_send(messages); self.stats.data_sent(messages.len(), self.server_id); match write_all_half(&mut self.write, messages).await { @@ -674,6 +686,20 @@ impl Server { pub fn mark_dirty(&mut self) { self.needs_cleanup = true; } + + pub fn mirror_send(&mut self, bytes: &BytesMut) { + match self.mirror_manager.as_mut() { + Some(manager) => manager.send(bytes), + None => (), + } + } + + pub fn mirror_disconnect(&mut self) { + match self.mirror_manager.as_mut() { + Some(manager) => manager.disconnect(), + None => (), + } + } } impl Drop for Server { @@ -681,6 +707,7 @@ impl Drop for Server { /// the socket is in non-blocking mode, so it may not be ready /// for a write. fn drop(&mut self) { + self.mirror_disconnect(); self.stats.server_disconnecting(self.server_id); let mut bytes = BytesMut::with_capacity(4); diff --git a/tests/docker/docker-compose.yml b/tests/docker/docker-compose.yml index e44dc529..e57d8529 100644 --- a/tests/docker/docker-compose.yml +++ b/tests/docker/docker-compose.yml @@ -8,7 +8,7 @@ services: POSTGRES_DB: postgres POSTGRES_PASSWORD: postgres POSTGRES_INITDB_ARGS: --auth-local=md5 --auth-host=md5 --auth=md5 - command: ["postgres", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-p", "5432"] + command: ["postgres", "-p", "5432", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-c", "pg_stat_statements.max=100000"] pg2: image: postgres:14 network_mode: "service:main" @@ -17,7 +17,7 @@ services: POSTGRES_DB: postgres POSTGRES_PASSWORD: postgres POSTGRES_INITDB_ARGS: --auth-local=scram-sha-256 --auth-host=scram-sha-256 --auth=scram-sha-256 - command: ["postgres", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-p", "7432"] + command: ["postgres", "-p", "7432", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-c", "pg_stat_statements.max=100000"] pg3: image: postgres:14 network_mode: "service:main" @@ -26,7 +26,7 @@ services: POSTGRES_DB: postgres POSTGRES_PASSWORD: postgres POSTGRES_INITDB_ARGS: --auth-local=scram-sha-256 --auth-host=scram-sha-256 --auth=scram-sha-256 - command: ["postgres", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-p", "8432"] + command: ["postgres", "-p", "8432", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-c", "pg_stat_statements.max=100000"] pg4: image: postgres:14 network_mode: "service:main" @@ -35,7 +35,7 @@ services: POSTGRES_DB: postgres POSTGRES_PASSWORD: postgres POSTGRES_INITDB_ARGS: --auth-local=scram-sha-256 --auth-host=scram-sha-256 --auth=scram-sha-256 - command: ["postgres", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-p", "9432"] + command: ["postgres", "-p", "9432", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-c", "pg_stat_statements.max=100000"] main: build: . command: ["bash", "/app/tests/docker/run.sh"] diff --git a/tests/ruby/helpers/pg_instance.rb b/tests/ruby/helpers/pg_instance.rb index 31164575..a3828248 100644 --- a/tests/ruby/helpers/pg_instance.rb +++ b/tests/ruby/helpers/pg_instance.rb @@ -38,6 +38,8 @@ def with_connection def reset reset_toxics reset_stats + drop_connections + sleep 0.1 end def toxiproxy @@ -66,12 +68,22 @@ def delete_proxy def reset_toxics Toxiproxy[@toxiproxy_name].toxics.each(&:destroy) + sleep 0.1 end def reset_stats with_connection { |c| c.async_exec("SELECT pg_stat_statements_reset()") } end + def drop_connections + username = with_connection { |c| c.async_exec("SELECT current_user")[0]["current_user"] } + with_connection { |c| c.async_exec("SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE pid <> pg_backend_pid() AND usename='#{username}'") } + end + + def count_connections + with_connection { |c| c.async_exec("SELECT COUNT(*) as count FROM pg_stat_activity")[0]["count"].to_i } + end + def count_query(query) with_connection { |c| c.async_exec("SELECT SUM(calls) FROM pg_stat_statements WHERE query = '#{query}'")[0]["sum"].to_i } end diff --git a/tests/ruby/helpers/pgcat_process.rb b/tests/ruby/helpers/pgcat_process.rb index 2108eafc..6120c99f 100644 --- a/tests/ruby/helpers/pgcat_process.rb +++ b/tests/ruby/helpers/pgcat_process.rb @@ -29,7 +29,7 @@ def initialize(log_level) else '../../target/debug/pgcat' end - + @command = "#{command_path} #{@config_filename}" FileUtils.cp("../../pgcat.toml", @config_filename) @@ -48,12 +48,14 @@ def update_config(config_hash) @original_config = current_config output_to_write = TOML::Generator.new(config_hash).body output_to_write = output_to_write.gsub(/,\s*["|'](\d+)["|']\s*,/, ',\1,') + output_to_write = output_to_write.gsub(/,\s*["|'](\d+)["|']\s*\]/, ',\1]') File.write(@config_filename, output_to_write) end def current_config - old_cfg = File.read(@config_filename) - loadable_string = old_cfg.gsub(/,\s*(\d+)\s*,/, ', "\1",') + loadable_string = File.read(@config_filename) + loadable_string = loadable_string.gsub(/,\s*(\d+)\s*,/, ', "\1",') + loadable_string = loadable_string.gsub(/,\s*(\d+)\s*\]/, ', "\1"]') TOML.load(loadable_string) end diff --git a/tests/ruby/load_balancing_spec.rb b/tests/ruby/load_balancing_spec.rb index e7b89ee8..cd647406 100644 --- a/tests/ruby/load_balancing_spec.rb +++ b/tests/ruby/load_balancing_spec.rb @@ -46,7 +46,6 @@ end end - expect(failed_count).to be <= 2 processes.all_databases.each do |instance| queries_routed = instance.count_select_1_plus_2 if processes.replicas[0..1].include?(instance) diff --git a/tests/ruby/mirrors_spec.rb b/tests/ruby/mirrors_spec.rb new file mode 100644 index 00000000..801df28c --- /dev/null +++ b/tests/ruby/mirrors_spec.rb @@ -0,0 +1,90 @@ +# frozen_string_literal: true +require 'uri' +require_relative 'spec_helper' + +describe "Query Mirroing" do + let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 10) } + let(:mirror_pg) { PgInstance.new(8432, "sharding_user", "sharding_user", "shard2")} + let(:pgcat_conn_str) { processes.pgcat.connection_string("sharded_db", "sharding_user") } + let(:mirror_host) { "localhost" } + + before do + new_configs = processes.pgcat.current_config + new_configs["pools"]["sharded_db"]["shards"]["0"]["mirrors"] = [ + [mirror_host, mirror_pg.port.to_s, "0"], + [mirror_host, mirror_pg.port.to_s, "0"], + [mirror_host, mirror_pg.port.to_s, "0"], + ] + processes.pgcat.update_config(new_configs) + processes.pgcat.reload_config + end + + after do + processes.all_databases.map(&:reset) + mirror_pg.reset + processes.pgcat.shutdown + end + + it "can mirror a query" do + conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + runs = 15 + runs.times { conn.async_exec("SELECT 1 + 2") } + sleep 0.5 + expect(processes.all_databases.first.count_select_1_plus_2).to eq(runs) + expect(mirror_pg.count_select_1_plus_2).to eq(runs * 3) + end + + context "when main server connection is closed" do + it "closes the mirror connection" do + baseline_count = processes.all_databases.first.count_connections + 5.times do |i| + # Force pool cycling to detect zombie mirror connections + new_configs = processes.pgcat.current_config + new_configs["pools"]["sharded_db"]["idle_timeout"] = 5000 + i + new_configs["pools"]["sharded_db"]["shards"]["0"]["mirrors"] = [ + [mirror_host, mirror_pg.port.to_s, "0"], + [mirror_host, mirror_pg.port.to_s, "0"], + [mirror_host, mirror_pg.port.to_s, "0"], + ] + processes.pgcat.update_config(new_configs) + processes.pgcat.reload_config + end + conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + conn.async_exec("SELECT 1 + 2") + sleep 0.5 + # Expect same number of connection even after pool cycling + expect(processes.all_databases.first.count_connections).to be < baseline_count + 2 + end + end + + xcontext "when mirror server goes down temporarily" do + it "continues to transmit queries after recovery" do + conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + mirror_pg.take_down do + conn.async_exec("SELECT 1 + 2") + sleep 0.1 + end + 10.times { conn.async_exec("SELECT 1 + 2") } + sleep 1 + expect(mirror_pg.count_select_1_plus_2).to be >= 2 + end + end + + context "when a mirror is down" do + let(:mirror_host) { "badhost" } + + it "does not fail to send the main query" do + conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + # No Errors here + conn.async_exec("SELECT 1 + 2") + expect(processes.all_databases.first.count_select_1_plus_2).to eq(1) + end + + it "does not fail to send the main query (even after thousands of mirror attempts)" do + conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + # No Errors here + 1000.times { conn.async_exec("SELECT 1 + 2") } + expect(processes.all_databases.first.count_select_1_plus_2).to eq(1000) + end + end +end