PostgreSQL pooler (like PgBouncer) with sharding, load balancing and failover support.
Beta: looking for beta testers, see #35.
Feature | Status | Comments |
---|---|---|
Transaction pooling | ✅ | Identical to PgBouncer. |
Session pooling | ✅ | Identical to PgBouncer. |
COPY support | ✅ | Both COPY TO and COPY FROM are supported. |
Query cancellation | ✅ | Supported in both transaction and session pooling modes. |
Load balancing of read queries | ✅ | Random selection between replicas. The primary is included when primary_reads_enabled is enabled (default). |
Sharding | ✅ | Transactions are sharded using SET SHARD TO and SET SHARDING KEY TO syntax extensions; see examples below. |
Failover | ✅ | Replicas are tested with a health check. If a health check fails, remaining replicas are attempted; see below for algorithm description and examples. |
Statistics | ✅ | Statistics are available in the admin database (pgcat and pgbouncer) with SHOW STATS, SHOW POOLS and others. |
Live configuration reloading | ✅ | Reload supported settings by sending SIGHUP to the process, e.g. kill -s SIGHUP $(pgrep pgcat), or by issuing a RELOAD query to the admin database. |
Client authentication | ✅ 🔧 | MD5 password authentication is supported; SCRAM is on the roadmap. One user is used to connect to Postgres, with both SCRAM and MD5 supported. |
Admin database | ✅ | The admin database, similar to PgBouncer's, allows querying statistics and reloading the configuration. |
See Dockerfile for an example deployment using Docker. The pooler is configured to spawn 4 workers, so 4 CPUs are recommended for optimal performance. That setting can be adjusted to spawn as many (or as few) workers as needed.
For a quick local example, use the Docker Compose environment provided:
docker-compose up
# In a new terminal:
PGPASSWORD=postgres psql -h 127.0.0.1 -p 6432 -U postgres -c 'SELECT 1'
Name | Description | Examples |
---|---|---|
general | | |
host | The pooler will run on this host; 0.0.0.0 means accessible from everywhere. | 0.0.0.0 |
port | The pooler will run on this port. | 6432 |
enable_prometheus_exporter | Enable the Prometheus exporter, which exports metrics in the Prometheus exposition format. | true |
prometheus_exporter_port | Port on which the Prometheus exporter listens. | 9930 |
pool_size | Maximum allowed server connections per pool. Pools are separated for each user/shard/server role. The connections are allocated as needed. | 15 |
pool_mode | The pool mode to use, i.e. session or transaction. | transaction |
connect_timeout | Maximum time to establish a connection to a server (milliseconds). If reached, the server is banned and the next target is attempted. | 5000 |
healthcheck_timeout | Maximum time to pass a health check (SELECT 1, milliseconds). If reached, the server is banned and the next target is attempted. | 1000 |
shutdown_timeout | Maximum time to give clients during shutdown before forcibly killing client connections (milliseconds). | 60000 |
healthcheck_delay | How long to keep a connection available for immediate re-use without running a health check query on it. | 30000 |
ban_time | Ban time for a server (seconds). It won't be allowed to serve transactions until the ban expires; failover targets will be used instead. | 60 |
autoreload | Enable auto-reload of the config after a fixed time interval. | false |
user | | |
name | The user name. | sharding_user |
password | The user password in plaintext. | hunter2 |
statement_timeout | Timeout in milliseconds for how long a query may take to execute. | 0 (disabled) |
shards | Shards are numbered starting from 0; the order in the config is preserved by the pooler to route queries accordingly. | [shards.0] |
servers | List of servers to connect to and their roles. A server is [host, port, role], where role is either primary or replica. | ["127.0.0.1", 5432, "primary"] |
database | The name of the database to connect to. This is the same on all servers that are part of one shard. | |
query_router | | |
default_role | Traffic is routed to this role by default (random), unless the client specifies otherwise. Default is any, for any role available. | any, primary, replica |
query_parser_enabled | Enable the query parser, which will inspect incoming queries and route them to a primary or replicas. | false |
primary_reads_enabled | Enable this to allow read queries on the primary; otherwise read queries are routed to the replicas. | true |
- Install Rust (latest stable will work great).
- cargo build --release (to get better benchmarks).
- Change the config in pgcat.toml to fit your setup (optional given next step).
- Install Postgres and run psql -f tests/sharding/query_routing_setup.sql (user/password may be required depending on your setup).
- RUST_LOG=info cargo run --release

You're ready to go!
The quickest way to test your changes is to use pgbench:
pgbench -i -h 127.0.0.1 -p 6432 && \
pgbench -t 1000 -p 6432 -h 127.0.0.1 --protocol simple && \
pgbench -t 1000 -p 6432 -h 127.0.0.1 --protocol extended
See sharding README for sharding logic testing.
Run cargo test to run Rust tests.
Run the following commands to run integration tests locally.
cd tests/docker/
docker compose up --exit-code-from main # This will also produce coverage report under ./cov/
Feature | Tested in CI | Tested manually | Comments |
---|---|---|---|
Transaction pooling | ✅ | ✅ | Used by default for all tests. |
Session pooling | ✅ | ✅ | Tested by running pgbench with --protocol prepared, which only works in session mode. |
COPY | ✅ | ✅ | pgbench -i uses COPY. COPY FROM is tested as well. |
Query cancellation | ✅ | ✅ | psql -c 'SELECT pg_sleep(1000);' and press Ctrl-C. |
Load balancing | ✅ | ✅ | We could test this by emitting statistics for each replica and comparing them. |
Failover | ✅ | ✅ | Misconfigure a replica in pgcat.toml and watch it forward queries to spares. CI testing uses Toxiproxy. |
Sharding | ✅ | ✅ | See tests/sharding and tests/ruby for a Rails/ActiveRecord example. |
Statistics | ✅ | ✅ | Query the admin database with psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW STATS'. |
Live config reloading | ✅ | ✅ | Run kill -s SIGHUP $(pgrep pgcat) and watch the config reload. |
In session mode, a client talks to one server for the duration of the connection. Prepared statements, SET, and advisory locks are supported. In terms of supported features, there is very little, if any, difference between session mode and talking directly to the server.
To use session mode, set pool_mode = "session".
In transaction mode, a client talks to one server for the duration of a single transaction; once it's over, the server is returned to the pool. Prepared statements, SET, and advisory locks are not supported; alternatives are to use SET LOCAL and pg_advisory_xact_lock, which are scoped to the transaction.
This mode is enabled by default.
All queries are load balanced against the configured servers using the random algorithm. The most straightforward configuration would be to put this pooler in front of several replicas and let it load balance all queries.
If the configuration includes a primary and replicas, the queries can be separated with the built-in query parser. The query parser will interpret the query and route all SELECT queries to a replica, while all other queries, including explicit transactions, will be routed to the primary.
The query parser is disabled by default.
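The routing rule described above can be illustrated with a small sketch. This is not PgCat's parser (which is written in Rust and inspects the query properly); it is a keyword-based approximation, and the function names `first_keyword` and `route_statements` are hypothetical. It sends a plain SELECT to a replica and everything else, including every statement inside an explicit transaction, to the primary.

```python
def first_keyword(statement: str) -> str:
    """Return the upper-cased first word of a statement, ignoring a trailing ';'."""
    parts = statement.strip().rstrip(";").split(None, 1)
    return parts[0].upper() if parts else ""


def route_statements(statements):
    """Assumed, simplified routing: SELECT -> replica, anything else -> primary.

    Statements between BEGIN/START and COMMIT/ROLLBACK/END are all pinned
    to the primary, mirroring "explicit transactions go to the primary".
    """
    in_transaction = False
    targets = []
    for statement in statements:
        keyword = first_keyword(statement)
        if keyword in ("BEGIN", "START"):
            in_transaction = True
        if in_transaction or keyword != "SELECT":
            targets.append("primary")
        else:
            targets.append("replica")
        if keyword in ("COMMIT", "ROLLBACK", "END"):
            in_transaction = False
    return targets
```

A keyword check like this misclassifies statements such as WITH queries or SELECT ... FOR UPDATE, which is one reason a client may need to pick the server explicitly.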
The query parser will do its best to determine where the query should go, but sometimes that's not possible. In that case, the client can select which server it wants using this custom SQL syntax:
-- To talk to the primary for the duration of the next transaction:
SET SERVER ROLE TO 'primary';
-- To talk to the replica for the duration of the next transaction:
SET SERVER ROLE TO 'replica';
-- Let the query parser decide
SET SERVER ROLE TO 'auto';
-- Pick any server at random
SET SERVER ROLE TO 'any';
-- Reset to default configured settings
SET SERVER ROLE TO 'default';
The setting will persist until it's changed again or the client disconnects.
By default, all queries are routed to the first available server; the default_role setting controls this behavior.
All servers are checked with a SELECT 1 query before being given to a client. If the server is not reachable, it will be banned and cannot serve any more transactions for the duration of the ban. The queries are routed to the remaining servers. If all servers become banned, the ban list is cleared: this is a safety precaution against false positives. The primary can never be banned.
The ban time can be changed with ban_time. The default is 60 seconds.
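The ban rules above can be sketched as a small data structure. This is an illustration only, not PgCat's implementation: the class name `BanList`, its methods and the injectable `clock` are hypothetical, and "all servers become banned" is interpreted here as "every bannable server (every replica) is banned", since the primary is never banned.

```python
import time


class BanList:
    """Assumed model of the ban list: replicas are banned for ban_time seconds."""

    def __init__(self, servers, ban_time=60.0, clock=time.monotonic):
        self.servers = list(servers)   # list of (name, role) tuples
        self.ban_time = ban_time       # seconds, like the ban_time setting
        self.clock = clock             # injectable so the sketch is testable
        self.banned_until = {}         # server name -> expiry timestamp

    def is_banned(self, name):
        expires = self.banned_until.get(name)
        if expires is None:
            return False
        if self.clock() >= expires:
            del self.banned_until[name]  # the ban has expired
            return False
        return True

    def ban(self, name):
        if dict(self.servers)[name] == "primary":
            return  # the primary can never be banned
        self.banned_until[name] = self.clock() + self.ban_time
        bannable = [n for n, role in self.servers if role != "primary"]
        if all(self.is_banned(n) for n in bannable):
            # Safety precaution against false positives: start over.
            self.banned_until.clear()

    def available(self):
        return [n for n, _ in self.servers if not self.is_banned(n)]
```

Passing a fake clock makes it easy to check that a ban expires after ban_time and that banning the last remaining replica clears the list.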
Failover behavior can get pretty interesting (read: complex) when multiple configurations and factors are involved. The table below explains what PgCat does in each scenario:
Query | SET SERVER ROLE TO | query_parser_enabled | primary_reads_enabled | Target state | Outcome |
---|---|---|---|---|---|
Read query, i.e. SELECT | unset (any) | false | false | up | Query is routed to the first instance in the random loop. |
Read query | unset (any) | true | false | up | Query is routed to the first replica instance in the random loop. |
Read query | unset (any) | true | true | up | Query is routed to the first instance in the random loop. |
Read query | replica | false | false | up | Query is routed to the first replica instance in the random loop. |
Read query | primary | false | false | up | Query is routed to the primary. |
Read query | unset (any) | false | false | down | First instance is banned for reads. Next target in the random loop is attempted. |
Read query | unset (any) | true | false | down | First replica instance is banned. Next replica instance is attempted in the random loop. |
Read query | unset (any) | true | true | down | First instance (even if primary) is banned for reads. Next instance is attempted in the random loop. |
Read query | replica | false | false | down | First replica instance is banned. Next replica instance is attempted in the random loop. |
Read query | primary | false | false | down | The query is attempted against the primary and fails. The client receives an error. |
Write query e.g. INSERT | unset (any) | false | false | up | The query is attempted against the first available instance in the random loop. If the instance is a replica, the query fails and the client receives an error. |
Write query | unset (any) | true | false | up | The query is routed to the primary. |
Write query | unset (any) | true | true | up | The query is routed to the primary. |
Write query | primary | false | false | up | The query is routed to the primary. |
Write query | replica | false | false | up | The query is routed to the replica and fails. The client receives an error. |
Write query | unset (any) | true | false | down | The query is routed to the primary and fails. The client receives an error. |
Write query | unset (any) | true | true | down | The query is routed to the primary and fails. The client receives an error. |
Write query | primary | false | false | down | The query is routed to the primary and fails. The client receives an error. |
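The "up" rows of the table can be condensed into one function that returns which server roles are eligible for a query. This is a reading of the table, not PgCat code: the function name `target_roles` is hypothetical, and it assumes an explicit SET SERVER ROLE always wins regardless of the parser settings (the table only lists explicit roles with the parser disabled).

```python
def target_roles(is_read, server_role=None,
                 query_parser_enabled=False, primary_reads_enabled=False):
    """Return the set of roles a query may be sent to, per the table above.

    server_role is None for "unset (any)", or "primary" / "replica" when the
    client issued SET SERVER ROLE TO.
    """
    if server_role in ("primary", "replica"):
        # Assumption: an explicit role overrides the query parser.
        return {server_role}
    if not query_parser_enabled:
        # No parsing: any instance in the random loop may be picked,
        # so a write can land on a replica and fail.
        return {"primary", "replica"}
    if not is_read:
        return {"primary"}
    if primary_reads_enabled:
        return {"primary", "replica"}
    return {"replica"}
```

For example, a write with the parser disabled and no explicit role yields both roles, which matches the row where the query may reach a replica and return an error to the client.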
We use the PARTITION BY HASH hashing function, the same one Postgres uses for declarative partitioning. This makes it possible to shard the database using Postgres partitions and place the partitions on different servers (shards). Both read and write queries can be routed to the shards using this pooler.
To route queries to a particular shard, we use this custom SQL syntax:
-- To talk to a shard explicitly
SET SHARD TO '1';
-- To let the pooler choose based on a value
SET SHARDING KEY TO '1234';
The active shard will last until it's changed again or the client disconnects. By default, the queries are routed to shard 0.
For the hash function implementation, see src/sharding.rs and tests/sharding/partition_hash_test_setup.sql.
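The per-client shard state described above can be sketched as follows. The hash used here is a stand-in (CRC32 of the key's decimal string), chosen only to keep the example self-contained; it is not the Postgres PARTITION BY HASH function and will not agree with PgCat's shard assignment. The names `shard_for_key` and `ShardState` are hypothetical.

```python
import zlib


def shard_for_key(sharding_key: int, shard_count: int) -> int:
    """Map a key to a shard as hash(key) % shards.

    Stand-in hash only: CRC32, NOT the hash Postgres uses for partitioning.
    """
    digest = zlib.crc32(str(sharding_key).encode("ascii"))
    return digest % shard_count


class ShardState:
    """Assumed model of what the pooler remembers for one client connection."""

    def __init__(self, shard_count: int):
        self.shard_count = shard_count
        self.shard = 0  # by default, queries are routed to shard 0

    def set_shard(self, shard: int) -> None:
        """Equivalent of SET SHARD TO '<n>'."""
        if not 0 <= shard < self.shard_count:
            raise ValueError(f"shard {shard} is out of range")
        self.shard = shard

    def set_sharding_key(self, key: int) -> None:
        """Equivalent of SET SHARDING KEY TO '<key>'."""
        self.shard = shard_for_key(key, self.shard_count)
```

The chosen shard stays in effect until one of the two setters is called again, which mirrors the "lasts until it's changed again" behavior.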
class User < ActiveRecord::Base
end
# Metadata will be fetched from shard 0
ActiveRecord::Base.establish_connection
# Grab a bunch of users from shard 1
User.connection.execute "SET SHARD TO '1'"
User.take(10)
# Using id as the sharding key
User.connection.execute "SET SHARDING KEY TO '1234'"
User.find_by_id(1234)
# Using geographical sharding
User.connection.execute "SET SERVER ROLE TO 'primary'"
User.connection.execute "SET SHARDING KEY TO '85'"
User.create(name: "test user", email: "test@example.com", zone_id: 85)
# Let the query parser figure out where the query should go.
# We are still on shard = hash(85) % shards.
User.connection.execute "SET SERVER ROLE TO 'auto'"
User.find_by_email("test@example.com")
-- Grab a bunch of users from shard 1
SET SHARD TO '1';
SELECT * FROM users LIMIT 10;
-- Find by id
SET SHARDING KEY TO '1234';
SELECT * FROM users WHERE id = 1234;
-- Writing in a primary/replicas configuration.
SET SERVER ROLE TO 'primary';
SET SHARDING KEY TO '85';
INSERT INTO users (name, email, zone_id) VALUES ('test user', 'test@example.com', 85);
SET SERVER ROLE TO 'auto'; -- let the query router figure out where the query should go
SELECT * FROM users WHERE email = 'test@example.com'; -- shard setting lasts until set again; we are reading from the primary
The stats are very similar to what PgBouncer reports, and the names are kept the same so they are comparable. They are accessible by querying the admin database pgcat, or pgbouncer for compatibility.
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW DATABASES'
The config can be reloaded by sending SIGHUP to the process (kill -s SIGHUP) or by issuing a RELOAD query to the admin database. Not all settings are currently supported by live reload:
Config | Requires restart |
---|---|
host | yes |
port | yes |
pool_mode | no |
connect_timeout | yes |
healthcheck_timeout | no |
shutdown_timeout | no |
healthcheck_delay | no |
ban_time | no |
user | yes |
shards | yes |
default_role | no |
primary_reads_enabled | no |
query_parser_enabled | no |
You can set up pgbench locally through PgCat:
pgbench -h 127.0.0.1 -p 6432 -i
Coincidentally, this uses COPY, so you can test whether that works. Additionally, we'll be running the following pgbench configurations:
- 16 clients, 2 threads
- 32 clients, 2 threads
- 64 clients, 2 threads
- 128 clients, 2 threads
All queries will be SELECT only (-S) so that disks don't get in the way, since the dataset will effectively be all in RAM.
My setup:
- 8 cores, 16 hyperthreaded (AMD Ryzen 5800X)
- 32GB RAM (doesn't matter for this benchmark, except to prove that Postgres will fit the whole dataset into RAM)
[databases]
shard0 = host=localhost port=5432 user=sharding_user password=sharding_user
[pgbouncer]
pool_mode = transaction
max_client_conn = 1000
Everything else stays default.
$ pgbench -t 1000 -c 16 -j 2 -p 6432 -h 127.0.0.1 -S --protocol extended shard0
starting vacuum...end.
transaction type: <builtin: select only>
scaling factor: 1
query mode: extended
number of clients: 16
number of threads: 2
number of transactions per client: 1000
number of transactions actually processed: 16000/16000
latency average = 0.155 ms
tps = 103417.377469 (including connections establishing)
tps = 103510.639935 (excluding connections establishing)
$ pgbench -t 1000 -c 32 -j 2 -p 6432 -h 127.0.0.1 -S --protocol extended shard0
starting vacuum...end.
transaction type: <builtin: select only>
scaling factor: 1
query mode: extended
number of clients: 32
number of threads: 2
number of transactions per client: 1000
number of transactions actually processed: 32000/32000
latency average = 0.290 ms
tps = 110325.939785 (including connections establishing)
tps = 110386.513435 (excluding connections establishing)
$ pgbench -t 1000 -c 64 -j 2 -p 6432 -h 127.0.0.1 -S --protocol extended shard0
starting vacuum...end.
transaction type: <builtin: select only>
scaling factor: 1
query mode: extended
number of clients: 64
number of threads: 2
number of transactions per client: 1000
number of transactions actually processed: 64000/64000
latency average = 0.692 ms
tps = 92470.427412 (including connections establishing)
tps = 92618.389350 (excluding connections establishing)
$ pgbench -t 1000 -c 128 -j 2 -p 6432 -h 127.0.0.1 -S --protocol extended shard0
starting vacuum...end.
transaction type: <builtin: select only>
scaling factor: 1
query mode: extended
number of clients: 128
number of threads: 2
number of transactions per client: 1000
number of transactions actually processed: 128000/128000
latency average = 1.406 ms
tps = 91013.429985 (including connections establishing)
tps = 91067.583928 (excluding connections establishing)
The only thing that matters here is the number of workers in the Tokio pool. Set it to less than the number of your CPU cores. Also account for hyper-threading: if you have it, take the number of CPUs your system reports and divide it by two, so that only "real" cores are serving requests.
My setup is 16 threads, 8 cores (htop shows 16 CPUs), so I set max_workers in Tokio to 4. Too many, and it starts competing with pgbench, which is running on the same system.
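The sizing guidance above amounts to a small calculation. The sketch below only restates that rule of thumb; the function name `max_worker_threads` is hypothetical and nothing here is read from or written to PgCat's configuration. It returns an upper bound, and a lower value (such as the 4 used here) is reasonable when the benchmark client shares the machine.

```python
def max_worker_threads(logical_cpus: int, hyperthreading: bool = True) -> int:
    """Upper bound for worker threads: strictly fewer than the physical cores.

    With hyper-threading, the OS reports two logical CPUs per physical core,
    so the reported count is halved first.
    """
    physical_cores = logical_cpus // 2 if hyperthreading else logical_cpus
    return max(1, physical_cores - 1)


# 16 logical CPUs with hyper-threading are 8 physical cores.
upper_bound = max_worker_threads(16, hyperthreading=True)
print(f"use at most {upper_bound} workers")
```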
$ pgbench -t 1000 -c 16 -j 2 -p 6432 -h 127.0.0.1 -S --protocol extended
starting vacuum...end.
transaction type: <builtin: select only>
scaling factor: 1
query mode: extended
number of clients: 16
number of threads: 2
number of transactions per client: 1000
number of transactions actually processed: 16000/16000
latency average = 0.164 ms
tps = 97705.088232 (including connections establishing)
tps = 97872.216045 (excluding connections establishing)
$ pgbench -t 1000 -c 32 -j 2 -p 6432 -h 127.0.0.1 -S --protocol extended
starting vacuum...end.
transaction type: <builtin: select only>
scaling factor: 1
query mode: extended
number of clients: 32
number of threads: 2
number of transactions per client: 1000
number of transactions actually processed: 32000/32000
latency average = 0.288 ms
tps = 111300.488119 (including connections establishing)
tps = 111413.107800 (excluding connections establishing)
$ pgbench -t 1000 -c 64 -j 2 -p 6432 -h 127.0.0.1 -S --protocol extended
starting vacuum...end.
transaction type: <builtin: select only>
scaling factor: 1
query mode: extended
number of clients: 64
number of threads: 2
number of transactions per client: 1000
number of transactions actually processed: 64000/64000
latency average = 0.556 ms
tps = 115190.496139 (including connections establishing)
tps = 115247.521295 (excluding connections establishing)
$ pgbench -t 1000 -c 128 -j 2 -p 6432 -h 127.0.0.1 -S --protocol extended
starting vacuum...end.
transaction type: <builtin: select only>
scaling factor: 1
query mode: extended
number of clients: 128
number of threads: 2
number of transactions per client: 1000
number of transactions actually processed: 128000/128000
latency average = 1.135 ms
tps = 112770.562239 (including connections establishing)
tps = 112796.502381 (excluding connections establishing)
It's always good to have a baseline.
$ pgbench -t 1000 -c 16 -j 2 -p 5432 -h 127.0.0.1 -S --protocol extended shard0
Password:
starting vacuum...end.
transaction type: <builtin: select only>
scaling factor: 1
query mode: extended
number of clients: 16
number of threads: 2
number of transactions per client: 1000
number of transactions actually processed: 16000/16000
latency average = 0.115 ms
tps = 139443.955722 (including connections establishing)
tps = 142314.859075 (excluding connections establishing)
$ pgbench -t 1000 -c 32 -j 2 -p 5432 -h 127.0.0.1 -S --protocol extended shard0
Password:
starting vacuum...end.
transaction type: <builtin: select only>
scaling factor: 1
query mode: extended
number of clients: 32
number of threads: 2
number of transactions per client: 1000
number of transactions actually processed: 32000/32000
latency average = 0.212 ms
tps = 150644.840891 (including connections establishing)
tps = 152218.499430 (excluding connections establishing)
$ pgbench -t 1000 -c 64 -j 2 -p 5432 -h 127.0.0.1 -S --protocol extended shard0
Password:
starting vacuum...end.
transaction type: <builtin: select only>
scaling factor: 1
query mode: extended
number of clients: 64
number of threads: 2
number of transactions per client: 1000
number of transactions actually processed: 64000/64000
latency average = 0.420 ms
tps = 152517.663404 (including connections establishing)
tps = 153319.188482 (excluding connections establishing)
$ pgbench -t 1000 -c 128 -j 2 -p 5432 -h 127.0.0.1 -S --protocol extended shard0
Password:
starting vacuum...end.
transaction type: <builtin: select only>
scaling factor: 1
query mode: extended
number of clients: 128
number of threads: 2
number of transactions per client: 1000
number of transactions actually processed: 128000/128000
latency average = 0.854 ms
tps = 149818.594087 (including connections establishing)
tps = 150200.603049 (excluding connections establishing)
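To compare the three sets of results, the "excluding connections establishing" figures can be expressed as a fraction of the direct Postgres baseline. The sketch below assumes the result sets above are, in order, PgBouncer, PgCat and direct Postgres (inferred from the surrounding text and the port used); the numbers are copied from the pgbench output and the names `TPS` and `relative_throughput` are hypothetical.

```python
CLIENTS = [16, 32, 64, 128]

# tps (excluding connections establishing), copied from the runs above.
TPS = {
    "pgbouncer": [103510.639935, 110386.513435, 92618.389350, 91067.583928],
    "pgcat": [97872.216045, 111413.107800, 115247.521295, 112796.502381],
    "postgres": [142314.859075, 152218.499430, 153319.188482, 150200.603049],
}


def relative_throughput(pooler: str):
    """Return pooler tps divided by direct Postgres tps, per client count."""
    return [p / d for p, d in zip(TPS[pooler], TPS["postgres"])]


for pooler in ("pgbouncer", "pgcat"):
    for clients, ratio in zip(CLIENTS, relative_throughput(pooler)):
        print(f"{pooler:9s} {clients:3d} clients: {ratio:.0%} of direct Postgres")
```

These are single runs on one machine with pgbench competing for the same CPUs, so the ratios are indicative rather than precise.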