-
Notifications
You must be signed in to change notification settings - Fork 183
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
PgCat Query Mirroring #341
Changes from 36 commits
393b48c
396ffc7
a6b9df6
993d8ed
4c9025c
226c051
23e75ed
cd8eb9b
8d4af57
7392b3d
53b8422
a78c0c6
d11fd8f
786ba14
31254eb
7d123aa
4947e69
cbe934b
0b3dd37
0604f9a
d22343a
672edc3
035fb8d
d291a1a
8b25b67
92ff1bd
b24c187
fa509c2
781843e
d254c0f
cfc347b
9cde6e0
2ddd1c7
795f13d
476bd43
87bf33c
5e2e205
bd437f3
2dfb6ff
69e0a0a
e7d4114
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,7 +2,7 @@ | |
/target | ||
*.deb | ||
.vscode | ||
.profraw | ||
*.profraw | ||
cov/ | ||
lcov.info | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,3 @@ | ||
use crate::config::Role; | ||
use crate::pool::BanReason; | ||
/// Admin database. | ||
use bytes::{Buf, BufMut, BytesMut}; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,153 @@ | ||
/// A mirrored PostgreSQL client. | ||
/// Packets arrive to us through a channel from the main client and we send them to the server. | ||
use bb8::Pool; | ||
use bytes::{Bytes, BytesMut}; | ||
|
||
use crate::config::{Address, Role, User}; | ||
use crate::pool::{ClientServerMap, ServerPool}; | ||
use crate::stats::get_reporter; | ||
use log::{error, info, trace, warn}; | ||
use tokio::sync::mpsc::{channel, Receiver, Sender}; | ||
|
||
pub struct MirroredClient { | ||
address: Address, | ||
user: User, | ||
database: String, | ||
bytes_rx: Receiver<Bytes>, | ||
disconnect_rx: Receiver<()>, | ||
} | ||
|
||
impl MirroredClient { | ||
async fn create_pool(&self) -> Pool<ServerPool> { | ||
let manager = ServerPool::new( | ||
self.address.clone(), | ||
self.user.clone(), | ||
self.database.as_str(), | ||
ClientServerMap::default(), | ||
get_reporter(), | ||
); | ||
|
||
Pool::builder() | ||
.max_size(1) | ||
.connection_timeout(std::time::Duration::from_millis(10_000)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You might want to use the config values here, so we don't get long timeouts unexpectedly. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
.idle_timeout(Some(std::time::Duration::from_millis(10_000))) | ||
.test_on_check_out(false) | ||
.build(manager) | ||
.await | ||
.unwrap() | ||
} | ||
|
||
pub fn start(mut self) { | ||
tokio::spawn(async move { | ||
let pool = self.create_pool().await; | ||
let address = self.address.clone(); | ||
loop { | ||
let mut server = match pool.get().await { | ||
Ok(server) => server, | ||
Err(err) => { | ||
error!( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
In the non-mirrored version, we ban the server but in the mirrored mode, it doesn't make sense to ban (we don't have banlists and banning in a mirrored setup is not very useful) |
||
"Failed to get connection from pool, Discarding message {:?}, {:?}", | ||
err, | ||
address.clone() | ||
); | ||
continue; | ||
} | ||
}; | ||
|
||
tokio::select! { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: It may be possible to refactor this outer loop into it's own function returning an Option so we can use the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I simplified the logic a ton, I think your concern should be addressed now. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This looks great. 🙇 |
||
// Exit channel events | ||
_ = self.disconnect_rx.recv() => { | ||
info!("Got mirror exit signal, exiting {:?}", address.clone()); | ||
break; | ||
} | ||
|
||
// Incoming data from server (we read to clear the socket buffer and discard the data) | ||
recv_result = server.recv() => { | ||
match recv_result { | ||
Ok(message) => trace!("Received from mirror: {} {:?}", String::from_utf8_lossy(&message[..]), address.clone()), | ||
Err(err) => error!("Failed to receive from mirror {:?} {:?}", err, address.clone()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should you There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I left that up to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you think we should mark_bad to be safe or just leave it up to the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. An extra |
||
} | ||
} | ||
|
||
// Messages to send to the server | ||
message = self.bytes_rx.recv() => { | ||
match message { | ||
Some(bytes) => { | ||
match server.send(&BytesMut::from(&bytes[..])).await { | ||
Ok(_) => trace!("Sent to mirror: {} {:?}", String::from_utf8_lossy(&bytes[..]), address.clone()), | ||
Err(err) => error!("Failed to send to mirror, Discarding message {:?}, {:?}", err, address.clone()) | ||
} | ||
} | ||
None => { | ||
info!("Mirror channel closed, exiting {:?}", address.clone()); | ||
break; | ||
}, | ||
} | ||
} | ||
} | ||
} | ||
}); | ||
} | ||
} | ||
pub struct MirroringManager { | ||
pub byte_senders: Vec<Sender<Bytes>>, | ||
pub disconnect_senders: Vec<Sender<()>>, | ||
} | ||
impl MirroringManager { | ||
pub fn from_addresses( | ||
user: User, | ||
database: String, | ||
addresses: Vec<Address>, | ||
) -> MirroringManager { | ||
let mut byte_senders: Vec<Sender<Bytes>> = vec![]; | ||
let mut exit_senders: Vec<Sender<()>> = vec![]; | ||
|
||
addresses.iter().for_each(|mirror| { | ||
let (bytes_tx, bytes_rx) = channel::<Bytes>(500); | ||
let (exit_tx, exit_rx) = channel::<()>(1); | ||
let mut addr = mirror.clone(); | ||
addr.role = Role::Mirror; | ||
let client = MirroredClient { | ||
user: user.clone(), | ||
database: database.to_owned(), | ||
address: addr, | ||
bytes_rx, | ||
disconnect_rx: exit_rx, | ||
}; | ||
exit_senders.push(exit_tx.clone()); | ||
byte_senders.push(bytes_tx.clone()); | ||
client.start(); | ||
}); | ||
|
||
Self { | ||
byte_senders: byte_senders, | ||
disconnect_senders: exit_senders, | ||
} | ||
} | ||
|
||
pub fn send(self: &mut Self, bytes: &BytesMut) { | ||
let cpy = bytes.clone().freeze(); | ||
self.byte_senders | ||
.iter_mut() | ||
.for_each(|sender| match sender.try_send(cpy.clone()) { | ||
Ok(_) => {} | ||
Err(err) => { | ||
warn!("Failed to send bytes to a mirror channel {}", err); | ||
} | ||
}); | ||
} | ||
|
||
pub fn disconnect(self: &mut Self) { | ||
self.disconnect_senders | ||
.iter_mut() | ||
.for_each(|sender| match sender.try_send(()) { | ||
Ok(_) => {} | ||
Err(err) => { | ||
warn!( | ||
"Failed to send disconnect signal to a mirror channel {}", | ||
err | ||
); | ||
} | ||
}); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -193,7 +193,7 @@ impl ConnectionPool { | |
let config = get_config(); | ||
|
||
let mut new_pools = HashMap::new(); | ||
let mut address_id = 0; | ||
let mut address_id: usize = 0; | ||
|
||
for (pool_name, pool_config) in &config.pools { | ||
let new_pool_hash_value = pool_config.hash_value(); | ||
|
@@ -244,7 +244,31 @@ impl ConnectionPool { | |
let mut servers = Vec::new(); | ||
let mut replica_number = 0; | ||
|
||
// Load Mirror settings | ||
for (address_index, server) in shard.servers.iter().enumerate() { | ||
let mut mirror_addresses: Vec<Address> = vec![]; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You don't need a type annotation typically, Rust should infer it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
if let Some(mirror_settings_vec) = &shard.mirrors { | ||
for mirror_settings in mirror_settings_vec { | ||
if mirror_settings.mirroring_target_index != address_index { | ||
continue; | ||
} | ||
mirror_addresses.push(Address { | ||
id: address_id, | ||
database: shard.database.clone(), | ||
host: mirror_settings.host.clone(), | ||
port: mirror_settings.port, | ||
role: server.role, | ||
address_index: 0, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We send stats from the mirrors , so we should make sure these unique identifiers are unique. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the stats use the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here is the They are both irrelevant to mirrors There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I followed the same pattern we do for the server addresses. For the server addresses, we set the https://github.com/levkk/pgcat/blob/main/src/pool.rs#L247-L254 |
||
replica_number, | ||
shard: shard_idx.parse::<usize>().unwrap(), | ||
username: user.username.clone(), | ||
pool_name: pool_name.clone(), | ||
mirrors: vec![], | ||
}); | ||
address_id += 1; | ||
} | ||
} | ||
|
||
let address = Address { | ||
id: address_id, | ||
database: shard.database.clone(), | ||
|
@@ -256,6 +280,7 @@ impl ConnectionPool { | |
shard: shard_idx.parse::<usize>().unwrap(), | ||
username: user.username.clone(), | ||
pool_name: pool_name.clone(), | ||
mirrors: mirror_addresses, | ||
}; | ||
|
||
address_id += 1; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to update the
dev/docker-compose.yml
as well?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done