Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add debug logging #39

Merged
merged 3 commits into from
Feb 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 39 additions & 2 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
/// We are pretending to the server in this scenario,
/// and this module implements that.
use bytes::{Buf, BufMut, BytesMut};
use log::error;
use log::{debug, error};
use tokio::io::{AsyncReadExt, BufReader};
use tokio::net::{
tcp::{OwnedReadHalf, OwnedWriteHalf},
Expand Down Expand Up @@ -70,6 +70,8 @@ impl Client {
let transaction_mode = config.general.pool_mode.starts_with("t");
drop(config);
loop {
debug!("Waiting for StartupMessage");

// Could be StartupMessage or SSLRequest
// which makes this variable length.
let len = match stream.read_i32().await {
Expand All @@ -91,6 +93,8 @@ impl Client {
match code {
// Client wants SSL. We don't support it at the moment.
SSL_REQUEST_CODE => {
debug!("Rejecting SSLRequest");

let mut no = BytesMut::with_capacity(1);
no.put_u8(b'N');

Expand All @@ -99,6 +103,8 @@ impl Client {

// Regular startup message.
PROTOCOL_VERSION_NUMBER => {
debug!("Got StartupMessage");

// TODO: perform actual auth.
let parameters = parse_startup(bytes.clone())?;

Expand All @@ -110,6 +116,7 @@ impl Client {
write_all(&mut stream, server_info).await?;
backend_key_data(&mut stream, process_id, secret_key).await?;
ready_for_query(&mut stream).await?;
debug!("Startup OK");

// Split the read and write streams
// so we can control buffering.
Expand Down Expand Up @@ -161,6 +168,8 @@ impl Client {
pub async fn handle(&mut self, mut pool: ConnectionPool) -> Result<(), Error> {
// The client wants to cancel a query it has issued previously.
if self.cancel_mode {
debug!("Sending CancelRequest");

let (process_id, secret_key, address, port) = {
let guard = self.client_server_map.lock().unwrap();

Expand Down Expand Up @@ -193,6 +202,8 @@ impl Client {
// We expect the client to either start a transaction with regular queries
// or issue commands for our sharding and server selection protocols.
loop {
debug!("Client idle, waiting for message");

// Client idle, waiting for messages.
self.stats.client_idle(self.process_id);

Expand All @@ -203,6 +214,12 @@ impl Client {
// SET SHARDING KEY TO 'bigint';
let mut message = read_message(&mut self.read).await?;

// Avoid taking a server if the client just wants to disconnect.
if message[0] as char == 'X' {
debug!("Client disconnecting");
return Ok(());
}

// Handle all custom protocol commands here.
match query_router.try_execute_command(message.clone()) {
// Normal query
Expand Down Expand Up @@ -250,9 +267,14 @@ impl Client {
// Waiting for server connection.
self.stats.client_waiting(self.process_id);

debug!("Waiting for connection from pool");

// Grab a server from the pool: the client issued a regular query.
let connection = match pool.get(query_router.shard(), query_router.role()).await {
Ok(conn) => conn,
Ok(conn) => {
debug!("Got connection from pool");
conn
}
Err(err) => {
error!("Could not get connection from pool: {:?}", err);
error_response(&mut self.write, "could not get connection from the pool")
Expand All @@ -272,11 +294,19 @@ impl Client {
self.stats.client_active(self.process_id);
self.stats.server_active(server.process_id());

debug!(
"Client {:?} talking to server {:?}",
self.write.peer_addr().unwrap(),
server.address()
);

// Transaction loop. Multiple queries can be issued by the client here.
// The connection belongs to the client until the transaction is over,
// or until the client disconnects if we are in session mode.
loop {
let mut message = if message.len() == 0 {
debug!("Waiting for message inside transaction or in session mode");

match read_message(&mut self.read).await {
Ok(message) => message,
Err(err) => {
Expand All @@ -303,9 +333,13 @@ impl Client {
let code = message.get_u8() as char;
let _len = message.get_i32() as usize;

debug!("Message: {}", code);

match code {
// ReadyForQuery
'Q' => {
debug!("Sending query to server");

// TODO: implement retries here for read-only transactions.
server.send(original).await?;

Expand Down Expand Up @@ -387,6 +421,8 @@ impl Client {
// Sync
// Frontend (client) is asking for the query result now.
'S' => {
debug!("Sending query to server");

self.buffer.put(&original[..]);

// TODO: retries for read-only transactions.
Expand Down Expand Up @@ -471,6 +507,7 @@ impl Client {
}

// The server is no longer bound to us, we can't cancel it's queries anymore.
debug!("Releasing server back into the pool");
self.release();
}
}
Expand Down
11 changes: 9 additions & 2 deletions src/query_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,12 @@ impl QueryRouter {
let len = buf.get_i32() as usize;

let query = match code {
'Q' => String::from_utf8_lossy(&buf[..len - 5]).to_string(),
'Q' => {
let query = String::from_utf8_lossy(&buf[..len - 5]).to_string();
debug!("Query: '{}'", query);
query
}

'P' => {
let mut start = 0;
let mut end;
Expand All @@ -213,6 +218,8 @@ impl QueryRouter {

let query = String::from_utf8_lossy(&buf[start..end]).to_string();

debug!("Prepared statement: '{}'", query);

query.replace("$", "") // Remove placeholders turning them into "values"
}
_ => return false,
Expand All @@ -221,7 +228,7 @@ impl QueryRouter {
let ast = match Parser::parse_sql(&PostgreSqlDialect {}, &query) {
Ok(ast) => ast,
Err(err) => {
debug!("{:?}, query: {}", err, query);
debug!("{}", err.to_string());
return false;
}
};
Expand Down
14 changes: 13 additions & 1 deletion src/server.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use bytes::{Buf, BufMut, BytesMut};
///! Implementation of the PostgreSQL server (database) protocol.
///! Here we are pretending to the a Postgres client.
use log::{error, info};
use log::{debug, error, info};
use tokio::io::{AsyncReadExt, BufReader};
use tokio::net::{
tcp::{OwnedReadHalf, OwnedWriteHalf},
Expand Down Expand Up @@ -75,6 +75,8 @@ impl Server {
}
};

debug!("Sending StartupMessage");

// Send the startup packet telling the server we're a normal Postgres client.
startup(&mut stream, &user.name, database).await?;

Expand All @@ -95,6 +97,8 @@ impl Server {
Err(_) => return Err(Error::SocketError),
};

debug!("Message: {}", code);

match code {
// Authentication
'R' => {
Expand All @@ -104,6 +108,8 @@ impl Server {
Err(_) => return Err(Error::SocketError),
};

debug!("Auth: {}", auth_code);

match auth_code {
MD5_ENCRYPTED_PASSWORD => {
// The salt is 4 bytes.
Expand Down Expand Up @@ -135,6 +141,8 @@ impl Server {
Err(_) => return Err(Error::SocketError),
};

debug!("Error: {}", error_code);

match error_code {
// No error message is present in the message.
MESSAGE_TERMINATOR => (),
Expand Down Expand Up @@ -247,6 +255,8 @@ impl Server {
}
};

debug!("Sending CancelRequest");

let mut bytes = BytesMut::with_capacity(16);
bytes.put_i32(16);
bytes.put_i32(CANCEL_REQUEST_CODE);
Expand Down Expand Up @@ -290,6 +300,8 @@ impl Server {
let code = message.get_u8() as char;
let _len = message.get_i32();

debug!("Message: {}", code);

match code {
// ReadyForQuery
'Z' => {
Expand Down