PgCat Query Mirroring #341

Merged
merged 41 commits into from
Mar 10, 2023

Collaborator

@drdrsh drdrsh commented Mar 4, 2023

This is an implementation of query mirroring in PgCat (outlined in #302).

In configs, we match mirror hosts with the servers handling the traffic. A mirror host will receive the same protocol messages as the main server it was matched with.

This is done by creating an async task for each mirror server. The task communicates with the main server through two channels: one for the protocol messages and one for the exit signal. The mirror server sends the protocol packets to the underlying PostgreSQL server. We receive from the underlying PostgreSQL server as soon as data is available and immediately discard it. We use bb8 to manage the life cycle of the connection, not for pooling, since each mirror server handler is more or less single-threaded.

We don't have any connection pooling in the mirrors. Matching each mirror connection to an actual server connection guarantees that we will not have more connections to any of the mirrors than the parent pool would allow.
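
The two-channel arrangement described above can be sketched with the standard library alone. This is a minimal illustration, not the PR's code: it assumes a plain thread in place of the async task, `std::sync::mpsc` in place of the async channels, and a counter in place of the write to PostgreSQL. `MirrorHandle` and `spawn_mirror` are hypothetical names.

```rust
use std::sync::mpsc::{sync_channel, RecvTimeoutError, SyncSender, TryRecvError};
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical handle kept by the main server for one mirror.
pub struct MirrorHandle {
    pub bytes_tx: SyncSender<Vec<u8>>, // protocol messages
    pub exit_tx: SyncSender<()>,       // exit signal only
    pub worker: JoinHandle<usize>,     // yields the number of messages forwarded
}

/// Spawn one worker per mirror (a thread here, an async task in the PR).
pub fn spawn_mirror(buffer: usize) -> MirrorHandle {
    let (bytes_tx, bytes_rx) = sync_channel::<Vec<u8>>(buffer);
    let (exit_tx, exit_rx) = sync_channel::<()>(1);

    let worker = thread::spawn(move || {
        let mut forwarded = 0usize;
        loop {
            // An explicit exit signal (or a dropped exit sender) stops the worker.
            match exit_rx.try_recv() {
                Ok(()) | Err(TryRecvError::Disconnected) => break,
                Err(TryRecvError::Empty) => {}
            }
            match bytes_rx.recv_timeout(Duration::from_millis(10)) {
                // A real mirror would write `_message` to its PostgreSQL
                // connection here and discard whatever comes back.
                Ok(_message) => forwarded += 1,
                Err(RecvTimeoutError::Timeout) => continue,
                // Every sender is gone and the queue has been drained.
                Err(RecvTimeoutError::Disconnected) => break,
            }
        }
        forwarded
    });

    MirrorHandle { bytes_tx, exit_tx, worker }
}
```

Keeping the exit signal on its own channel means the worker can be told to stop even when the data channel is full or idle.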

The config setup looks something like this:

# Shard 0
[pools.sharded_db.shards.0]
# [ host, port, role ]
servers = [
    [ "127.0.0.1", 5432, "primary"],
    [ "127.0.0.1", 5432, "replica"],
    [ "127.0.0.1", 5432, "replica"],
]
# Database name (e.g. "postgres")
database = "shard0"

mirrors = [
    [ "localhost", 5432, 2], # Mirrors traffic to the second replica
    [ "localhost", 5432, 1], # Mirrors traffic to the first replica
    [ "localhost", 5432, 0]  # Mirrors traffic to the primary
]
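
To make the matching rule concrete, here is a rough sketch of how the third element of each mirror entry could be used to group mirrors under the server they shadow. The struct and function names are hypothetical and the validation step is an assumption; the PR's actual config parsing may differ.

```rust
use std::collections::HashMap;

/// Hypothetical parsed form of one `[host, port, server_index]` mirror entry.
#[derive(Debug, Clone, PartialEq)]
pub struct MirrorEntry {
    pub host: String,
    pub port: u16,
    pub server_index: usize,
}

/// Group mirrors by the index of the server whose traffic they receive.
/// Returns an error if an entry points at a server that does not exist.
pub fn mirrors_by_server(
    mirrors: &[MirrorEntry],
    server_count: usize,
) -> Result<HashMap<usize, Vec<MirrorEntry>>, String> {
    let mut grouped: HashMap<usize, Vec<MirrorEntry>> = HashMap::new();
    for mirror in mirrors {
        if mirror.server_index >= server_count {
            return Err(format!(
                "mirror {}:{} targets server index {}, but only {} servers are configured",
                mirror.host, mirror.port, mirror.server_index, server_count
            ));
        }
        grouped
            .entry(mirror.server_index)
            .or_default()
            .push(mirror.clone());
    }
    Ok(grouped)
}
```

With the example config above, each of the three servers would end up with exactly one mirror.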

Preview

Here I created 3 mirrors, all pointing at the same server, so sending one query through PgCat yields a total of 4 queries made against the database from 4 different connections.

psql: [screenshot: Screen Shot 2023-03-05 at 9 37 17 AM]

Postgres: [screenshot: Screen Shot 2023-03-05 at 9 37 04 AM]

src/mirrors.rs Outdated
}

pub fn send(self: &mut Self, bytes: &BytesMut) {
let cpy = bytes.clone();
Collaborator Author

@drdrsh drdrsh Mar 4, 2023

This is happening on the client<->server task so I would like to avoid cloning all the bytes as much as possible. Not sure how to do it given server.send requires BytesMut

Collaborator Author

I figured it out. I will change send to use BytesMut::freeze, which yields a frozen chunk of memory of type Bytes that is cheap to clone.

Collaborator Author

Well, changing server.send to take Bytes instead of BytesMut will require doing a ton of BytesMut.clone() in the server critical path which is not something I want to do.

Instead, the communication channel uses Bytes, and I make just one clone per server to create a frozen Bytes object. When I fan out, I send that same frozen object across the channel and then create a new BytesMut in the mirror task.
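
The copy-once, share-many idea can be illustrated without the `bytes` crate. In the sketch below `Arc<[u8]>` stands in for a frozen `Bytes` (both are reference-counted, so a clone does not copy the payload) and `Vec<u8>` stands in for `BytesMut`. The function names are hypothetical.

```rust
use std::sync::Arc;

/// One copy on the hot path: turn the outgoing buffer into a shared,
/// immutable chunk (stand-in for producing a frozen `Bytes`).
pub fn freeze(buf: &[u8]) -> Arc<[u8]> {
    Arc::from(buf)
}

/// Fan out to `mirror_count` mirrors. Each clone only bumps a reference
/// count; the payload itself is not copied.
pub fn fan_out(frozen: &Arc<[u8]>, mirror_count: usize) -> Vec<Arc<[u8]>> {
    (0..mirror_count).map(|_| Arc::clone(frozen)).collect()
}

/// Inside the mirror task: build an owned, mutable buffer again
/// (stand-in for creating a new `BytesMut`).
pub fn thaw(shared: &Arc<[u8]>) -> Vec<u8> {
    shared.to_vec()
}
```

The expensive copy back into a mutable buffer happens in the mirror task, off the client<->server path.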

@drdrsh drdrsh changed the title First cut Query Mirroring PgCat Query Mirroring Mar 5, 2023
@drdrsh drdrsh requested a review from levkk March 5, 2023 15:33
src/mirrors.rs Outdated
Comment on lines 51 to 54
let mut delay = Duration::from_secs(0);
let min_backoff = Duration::from_millis(100);
let max_backoff = Duration::from_secs(5);
let mut retries = 0;
Collaborator Author

Avoid too aggressive retries.
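
The diff context only shows the bounds (100 ms minimum, 5 s maximum), not how the delay grows. As one plausible reading, the sketch below assumes the delay doubles on each failed attempt and is clamped to those bounds; `next_delay` is a hypothetical helper, not a function from the PR.

```rust
use std::time::Duration;

/// Compute the delay before the next reconnect attempt.
/// Assumption: exponential growth (doubling), clamped to [min_backoff, max_backoff].
pub fn next_delay(current: Duration, min_backoff: Duration, max_backoff: Duration) -> Duration {
    current.saturating_mul(2).clamp(min_backoff, max_backoff)
}
```

Starting from a zero delay, the first retry waits for the minimum backoff, and later retries never wait longer than the maximum.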

@drdrsh drdrsh marked this pull request as ready for review March 5, 2023 17:11
}

impl Drop for Server {
/// Try to do a clean shut down. Best effort because
/// the socket is in non-blocking mode, so it may not be ready
/// for a write.
fn drop(&mut self) {
self.mirror_disconnect();
Collaborator Author

@drdrsh drdrsh Mar 5, 2023

Dropping the bytes channel sender should be enough to kill the mirror task, but in case the bytes channel is blocked, we also send a signal on the exit channel, whose sole purpose is to transmit exit signals.
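
A small sketch of that shutdown path, assuming `std::sync::mpsc` channels rather than the async ones used in the PR. `MirroredServer` is a hypothetical stand-in for the real `Server`, reduced to the two senders.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};

/// Hypothetical stand-in for the server side that owns both channel senders.
pub struct MirroredServer {
    bytes_tx: SyncSender<Vec<u8>>,
    exit_tx: SyncSender<()>,
}

impl MirroredServer {
    pub fn new(buffer: usize) -> (Self, Receiver<Vec<u8>>, Receiver<()>) {
        let (bytes_tx, bytes_rx) = sync_channel(buffer);
        let (exit_tx, exit_rx) = sync_channel(1);
        (Self { bytes_tx, exit_tx }, bytes_rx, exit_rx)
    }

    /// Best-effort queueing; returns false when the buffer is full.
    pub fn queue(&self, message: Vec<u8>) -> bool {
        self.bytes_tx.try_send(message).is_ok()
    }
}

impl Drop for MirroredServer {
    fn drop(&mut self) {
        // Never block in drop: if the signal cannot be queued, give up.
        let _ = self.exit_tx.try_send(());
        // `bytes_tx` is dropped right after this body runs, which closes
        // the data channel as well.
    }
}
```

Even with the data buffer full, the receiver still sees the exit signal, and once it drains the buffer it also observes that the data channel has closed.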

src/mirrors.rs Outdated
);
stats.server_login(server_id);

match Server::startup(
Contributor

@levkk levkk Mar 7, 2023

Not using bb8? I guess it makes sense because MirrorManager is owned by Server which itself is managed by bb8.

Collaborator Author

Right. But to be honest, I hate the connection management part of this PR to the point that I am okay with creating a bb8 pool with one connection so I don't have to manage the lifetime of the connection.

Collaborator Author

I decided to go with the bb8 route even if the pool is tiny to avoid having to handle connection lifetime


pub fn mirror_send(&mut self, bytes: &BytesMut) {
match self.mirror_manager.as_mut() {
Some(manager) => manager.send(bytes),
Contributor

Mirroring should be best effort imo, this will block if the mirror channel buffer is full because the mirror can't absorb any more traffic, I think?

Collaborator Author

@drdrsh drdrsh Mar 7, 2023

Yes, mirroring is best-effort. manager.send uses try_send under the hood. The reason I have a send method on manager is to handle the fanout because we could have one Server matched with more than one MirroredClient
https://github.com/levkk/pgcat/pull/341/files#diff-453e032b0a6294b617b502297ffd4ffcce47d4f81e06f16b602c0d6f89afebb9R215-R225
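
For readers unfamiliar with the pattern, here is a minimal sketch of a best-effort fan-out built on a non-blocking `try_send`. It uses `std::sync::mpsc::SyncSender::try_send` and `Arc<[u8]>` as stand-ins for the async channel and `Bytes` type in the PR; `MirrorFanOut` is a hypothetical name, not the PR's `MirrorManager`.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::Arc;

/// Hypothetical fan-out helper: one bounded channel per mirror.
pub struct MirrorFanOut {
    mirrors: Vec<SyncSender<Arc<[u8]>>>,
}

impl MirrorFanOut {
    pub fn new(mirror_count: usize, buffer: usize) -> (Self, Vec<Receiver<Arc<[u8]>>>) {
        let mut mirrors = Vec::with_capacity(mirror_count);
        let mut receivers = Vec::with_capacity(mirror_count);
        for _ in 0..mirror_count {
            let (tx, rx) = sync_channel(buffer);
            mirrors.push(tx);
            receivers.push(rx);
        }
        (Self { mirrors }, receivers)
    }

    /// Offer the message to every mirror without blocking. A mirror whose
    /// buffer is full (or whose task has exited) simply misses the message.
    /// Returns how many mirrors accepted it.
    pub fn send(&self, bytes: &[u8]) -> usize {
        let shared: Arc<[u8]> = Arc::from(bytes);
        self.mirrors
            .iter()
            .filter(|tx| tx.try_send(Arc::clone(&shared)).is_ok())
            .count()
    }
}
```

A slow mirror therefore loses traffic instead of stalling the client<->server path.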

@@ -18,28 +18,28 @@ jobs:
RUSTFLAGS: "-Zprofile -Ccodegen-units=1 -Copt-level=0 -Clink-dead-code -Coverflow-checks=off -Zpanic_abort_tests -Cpanic=abort -Cinstrument-coverage"
RUSTDOCFLAGS: "-Cpanic=abort"
- image: postgres:14
command: ["postgres", "-p", "5432", "-c", "shared_preload_libraries=pg_stat_statements"]
command: ["postgres", "-p", "5432", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-c", "pg_stat_statements.max=100000"]
Contributor

Do we need to update the dev/docker-compose.yml as well?

Collaborator Author

Done

src/mirrors.rs Outdated
recv_result = server.recv() => {
match recv_result {
Ok(message) => trace!("Received from mirror: {} {:?}", String::from_utf8_lossy(&message[..]), address.clone()),
Err(err) => error!("Failed to receive from mirror {:?} {:?}", err, address.clone())
Contributor

Should you mark_bad here and make bb8 create you a new server?

Collaborator Author

I left that up to server.recv. It calls mark_bad in a handful of places. Similarly for server.send. Double marking bad should be fine.

Collaborator Author

Do you think we should mark_bad to be safe or just leave it up to the server logic to handle it?

Collaborator Author

Done. An extra mark_bad does not hurt. It documents the behavior here

src/mirrors.rs Outdated

Pool::builder()
.max_size(1)
.connection_timeout(std::time::Duration::from_millis(10_000))
Contributor

You might want to use the config values here, so we don't get long timeouts unexpectedly.

Collaborator Author

Done

let mut server = match pool.get().await {
Ok(server) => server,
Err(err) => {
error!(
Contributor

mark_bad ?

Collaborator Author

@drdrsh drdrsh Mar 10, 2023

mark_bad works if we have a server connection. In this case we failed to checkout a connection from the pool so we have no server to mark_bad.

In the non-mirrored version, we ban the server but in the mirrored mode, it doesn't make sense to ban (we don't have banlists and banning in a mirrored setup is not very useful)

src/pool.rs Outdated
for (address_index, server) in shard.servers.iter().enumerate() {
let mut mirror_addresses: Vec<Address> = vec![];
Contributor

You don't need a type annotation typically, Rust should infer it.

Collaborator Author

Done

src/pool.rs Outdated
host: mirror_settings.host.clone(),
port: mirror_settings.port,
role: server.role,
address_index: 0,
Contributor

We send stats from the mirrors, so we should make sure these unique identifiers are unique.

Collaborator Author

I think the stats use the address_id, not the address_index. The address_id is unique.

Collaborator Author

Here are the address_index call sites:
https://cs.github.com/levkk/pgcat?q=.address_index

They are both irrelevant to mirrors

Collaborator Author

@drdrsh drdrsh Mar 10, 2023

I followed the same pattern we use for the server addresses. For the server addresses, we set the address_index to the index of the server in the config array. We do the same for mirrors: we set the mirror address_index to the index of the mirror in the mirror config array.

https://github.com/levkk/pgcat/blob/main/src/pool.rs#L247-L254

Comment on lines 34 to 43
#
# Ruby integration tests
# These tests create their own PgCat servers so we want to run them after starting toxiproxy
# and before starting PgCat
#
cd tests/ruby
sudo bundle install
bundle exec rspec *_spec.rb --format documentation || exit 1
cd ../..

Collaborator Author

I am wondering if the fact that we have an extra PgCat running in the background is making the specs flake out. So I moved the ruby tests up before we start any PgCats

Collaborator Author

Nope :(

Collaborator Author

drdrsh commented Mar 10, 2023

I disabled another uber flaky test

Contributor

@levkk levkk left a comment

Awesome!

@drdrsh drdrsh merged commit aa89e35 into postgresml:main Mar 10, 2023
mismaah commented May 21, 2024

Can this be a replacement for replication (like repmgr)?
