diff --git a/src/client.rs b/src/client.rs index 06f5a9f6..09612530 100644 --- a/src/client.rs +++ b/src/client.rs @@ -2,7 +2,7 @@ /// We are pretending to the server in this scenario, /// and this module implements that. use bytes::{Buf, BufMut, BytesMut}; -use log::error; +use log::{debug, error}; use tokio::io::{AsyncReadExt, BufReader}; use tokio::net::{ tcp::{OwnedReadHalf, OwnedWriteHalf}, @@ -70,6 +70,8 @@ impl Client { let transaction_mode = config.general.pool_mode.starts_with("t"); drop(config); loop { + debug!("Waiting for StartupMessage"); + // Could be StartupMessage or SSLRequest // which makes this variable length. let len = match stream.read_i32().await { @@ -91,6 +93,8 @@ impl Client { match code { // Client wants SSL. We don't support it at the moment. SSL_REQUEST_CODE => { + debug!("Rejecting SSLRequest"); + let mut no = BytesMut::with_capacity(1); no.put_u8(b'N'); @@ -99,6 +103,8 @@ impl Client { // Regular startup message. PROTOCOL_VERSION_NUMBER => { + debug!("Got StartupMessage"); + // TODO: perform actual auth. let parameters = parse_startup(bytes.clone())?; @@ -110,6 +116,7 @@ impl Client { write_all(&mut stream, server_info).await?; backend_key_data(&mut stream, process_id, secret_key).await?; ready_for_query(&mut stream).await?; + debug!("Startup OK"); // Split the read and write streams // so we can control buffering. @@ -161,6 +168,8 @@ impl Client { pub async fn handle(&mut self, mut pool: ConnectionPool) -> Result<(), Error> { // The client wants to cancel a query it has issued previously. if self.cancel_mode { + debug!("Sending CancelRequest"); + let (process_id, secret_key, address, port) = { let guard = self.client_server_map.lock().unwrap(); @@ -193,6 +202,8 @@ impl Client { // We expect the client to either start a transaction with regular queries // or issue commands for our sharding and server selection protocols. loop { + debug!("Client idle, waiting for message"); + // Client idle, waiting for messages. self.stats.client_idle(self.process_id); @@ -203,6 +214,12 @@ impl Client { // SET SHARDING KEY TO 'bigint'; let mut message = read_message(&mut self.read).await?; + // Avoid taking a server if the client just wants to disconnect. + if message[0] as char == 'X' { + debug!("Client disconnecting"); + return Ok(()); + } + // Handle all custom protocol commands here. match query_router.try_execute_command(message.clone()) { // Normal query @@ -250,9 +267,14 @@ impl Client { // Waiting for server connection. self.stats.client_waiting(self.process_id); + debug!("Waiting for connection from pool"); + // Grab a server from the pool: the client issued a regular query. let connection = match pool.get(query_router.shard(), query_router.role()).await { - Ok(conn) => conn, + Ok(conn) => { + debug!("Got connection from pool"); + conn + } Err(err) => { error!("Could not get connection from pool: {:?}", err); error_response(&mut self.write, "could not get connection from the pool") @@ -272,11 +294,19 @@ impl Client { self.stats.client_active(self.process_id); self.stats.server_active(server.process_id()); + debug!( + "Client {:?} talking to server {:?}", + self.write.peer_addr().unwrap(), + server.address() + ); + // Transaction loop. Multiple queries can be issued by the client here. // The connection belongs to the client until the transaction is over, // or until the client disconnects if we are in session mode. loop { let mut message = if message.len() == 0 { + debug!("Waiting for message inside transaction or in session mode"); + match read_message(&mut self.read).await { Ok(message) => message, Err(err) => { @@ -303,9 +333,13 @@ impl Client { let code = message.get_u8() as char; let _len = message.get_i32() as usize; + debug!("Message: {}", code); + match code { // ReadyForQuery 'Q' => { + debug!("Sending query to server"); + // TODO: implement retries here for read-only transactions. server.send(original).await?; @@ -387,6 +421,8 @@ impl Client { // Sync // Frontend (client) is asking for the query result now. 'S' => { + debug!("Sending query to server"); + self.buffer.put(&original[..]); // TODO: retries for read-only transactions. @@ -471,6 +507,7 @@ impl Client { } // The server is no longer bound to us, we can't cancel it's queries anymore. + debug!("Releasing server back into the pool"); self.release(); } } diff --git a/src/query_router.rs b/src/query_router.rs index 4134af8d..8dddef07 100644 --- a/src/query_router.rs +++ b/src/query_router.rs @@ -194,7 +194,12 @@ impl QueryRouter { let len = buf.get_i32() as usize; let query = match code { - 'Q' => String::from_utf8_lossy(&buf[..len - 5]).to_string(), + 'Q' => { + let query = String::from_utf8_lossy(&buf[..len - 5]).to_string(); + debug!("Query: '{}'", query); + query + } + 'P' => { let mut start = 0; let mut end; @@ -213,6 +218,8 @@ impl QueryRouter { let query = String::from_utf8_lossy(&buf[start..end]).to_string(); + debug!("Prepared statement: '{}'", query); + query.replace("$", "") // Remove placeholders turning them into "values" } _ => return false, @@ -221,7 +228,7 @@ impl QueryRouter { let ast = match Parser::parse_sql(&PostgreSqlDialect {}, &query) { Ok(ast) => ast, Err(err) => { - debug!("{:?}, query: {}", err, query); + debug!("{}", err.to_string()); return false; } }; diff --git a/src/server.rs b/src/server.rs index 18b1f267..f186c0c7 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,7 +1,7 @@ use bytes::{Buf, BufMut, BytesMut}; ///! Implementation of the PostgreSQL server (database) protocol. ///! Here we are pretending to the a Postgres client. -use log::{error, info}; +use log::{debug, error, info}; use tokio::io::{AsyncReadExt, BufReader}; use tokio::net::{ tcp::{OwnedReadHalf, OwnedWriteHalf}, @@ -75,6 +75,8 @@ impl Server { } }; + debug!("Sending StartupMessage"); + // Send the startup packet telling the server we're a normal Postgres client. startup(&mut stream, &user.name, database).await?; @@ -95,6 +97,8 @@ impl Server { Err(_) => return Err(Error::SocketError), }; + debug!("Message: {}", code); + match code { // Authentication 'R' => { @@ -104,6 +108,8 @@ impl Server { Err(_) => return Err(Error::SocketError), }; + debug!("Auth: {}", auth_code); + match auth_code { MD5_ENCRYPTED_PASSWORD => { // The salt is 4 bytes. @@ -135,6 +141,8 @@ impl Server { Err(_) => return Err(Error::SocketError), }; + debug!("Error: {}", error_code); + match error_code { // No error message is present in the message. MESSAGE_TERMINATOR => (), @@ -247,6 +255,8 @@ impl Server { } }; + debug!("Sending CancelRequest"); + let mut bytes = BytesMut::with_capacity(16); bytes.put_i32(16); bytes.put_i32(CANCEL_REQUEST_CODE); @@ -290,6 +300,8 @@ impl Server { let code = message.get_u8() as char; let _len = message.get_i32(); + debug!("Message: {}", code); + match code { // ReadyForQuery 'Z' => {