diff --git a/.circleci/pgcat.toml b/.circleci/pgcat.toml index d7249f15..943eff92 100644 --- a/.circleci/pgcat.toml +++ b/.circleci/pgcat.toml @@ -32,6 +32,9 @@ ban_time = 60 # Seconds # autoreload = true +tls_certificate = ".circleci/server.cert" +tls_private_key = ".circleci/server.key" + # # User to use for authentication against the server. [user] diff --git a/.circleci/server.cert b/.circleci/server.cert new file mode 100644 index 00000000..a24847a7 --- /dev/null +++ b/.circleci/server.cert @@ -0,0 +1,21 @@ +-----BEGIN CERTIFICATE----- +MIIDazCCAlOgAwIBAgIUChIvUGFJGJe5EDch32rchqoxER0wDQYJKoZIhvcNAQEL +BQAwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM +GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDAeFw0yMjA2MjcyMjI2MDZaFw0yMjA3 +MjcyMjI2MDZaMEUxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEw +HwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQwggEiMA0GCSqGSIb3DQEB +AQUAA4IBDwAwggEKAoIBAQDdTwrBzV1v79faVckFvIn/9V4fypYs4vDi3X+h3wGn +AjEh6mmizlKCwSwAam07D9Q5zKiXFrzNJqzSioOv5zsOAvObwrnzbtKSwfs3aP5g +eEh2clHCZYx9p06WszPcgSB5nTz1NeY4XAwvGn3A+SVCLyPMTNwnem48+ONh2F9u +FHtSuIsEVvTjMlH09O7LjwJlODxy3HNv2JHYM5Hx9tzc+NVYdERPtaVcX8ycw1Eh +9hgGSgfaNM52/JfRMIDhENrsn0S1omRUtcJe72loreiwrECUOLAnAfp9Xqc+rMPP +aLA6ElzmYef1+ZEC0p6isCHPhxY5ESVhKYhE9nQvksjnAgMBAAGjUzBRMB0GA1Ud +DgQWBBQLDtzexqjx7xPtUZuZB/angU9oSDAfBgNVHSMEGDAWgBQLDtzexqjx7xPt +UZuZB/angU9oSDAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQC/ +mxY/a/WeLENVj2Gg9EUH0CKzfqeTey1mb6YfPGxzrD7oq1m0Vn2MmTbjZrJgh/Ob +QckO3ElF4kC9+6XP+iDPmabGpjeLgllBboT5l2aqnD1syMrf61WPLzgRzRfplYGy +cjBQDDKPu8Lu0QRMWU28tHYN0bMxJoCuXysGGX5WsuFnKCA6f/V+nycJJXxJH3eB +eLjTueD9/RE3OXhi6m8A29Q1E9AE5EF4uRxYXrr91BmYnk4aFvSmBxhUEzE12eSN +lHB/uSc0+Dp+UVmVr6wW8AQfd16UBA0BUf3kSW3aSvirYPYH0rXiOOpEJgOwOMnR +f5+XAbN1Y+3OsFz/ZmP9 +-----END CERTIFICATE----- diff --git a/.circleci/server.key b/.circleci/server.key new file mode 100644 index 00000000..14e4fd68 --- /dev/null +++ b/.circleci/server.key @@ -0,0 +1,28 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDdTwrBzV1v79fa +VckFvIn/9V4fypYs4vDi3X+h3wGnAjEh6mmizlKCwSwAam07D9Q5zKiXFrzNJqzS +ioOv5zsOAvObwrnzbtKSwfs3aP5geEh2clHCZYx9p06WszPcgSB5nTz1NeY4XAwv +Gn3A+SVCLyPMTNwnem48+ONh2F9uFHtSuIsEVvTjMlH09O7LjwJlODxy3HNv2JHY +M5Hx9tzc+NVYdERPtaVcX8ycw1Eh9hgGSgfaNM52/JfRMIDhENrsn0S1omRUtcJe +72loreiwrECUOLAnAfp9Xqc+rMPPaLA6ElzmYef1+ZEC0p6isCHPhxY5ESVhKYhE +9nQvksjnAgMBAAECggEAbnvddO9frFhivJ+DIhgEFQKcIOb0nigV9kx6QYehvYy8 +lp/+aMb0Lk7d9r8rFQdL/icMK5GwZALg2KNKJvEbbF1Q3PwT9VHoUlgBYKJMDEFA +e9GKu7ASuVBjTZzdUUItwkkbe5eS/aQGeSWSjlpTnX0HNCFS72qRymK+scRhsAQf +ZoHyZHDslkvPR3Pos+sndWBYCDHag5/KoPhsMt1+5S9NQcOUHx9Ac0gLHjau3N+P +0FhODHFFGnnpyQvLvj6u3ZOR34ladMgoBglE0O3vPFhckn92EK4teeTWOsUMotiz +qM3QIJTOJjtiY6VDGY93bIa4pFvt7Zi4vIerenKt0QKBgQD/UMFqfevTAMrk10AC +bOa4+cM07ORY4ZwVj5ILhZn+8crDEEtBsUyuEU2FTINtnoEq1yGc/IXpsyS1BHjL +L1xSml5LN3jInbi8z5XQfY5Sj3VOMtwY6yD20jcdeDC44rz3nStXdkcMWxbTMapx +iOPsap5ciUKOMS7LyMidPEG/LQKBgQDd5vHgrLN0FBIIm+vZg6MEm4QyobstVp4l +7V/GZsdL+M8AQv1Rx+5wSUSWKomOIv5lglis7f6g0c9O7Qkr78/wzoyoKC2RRqPp +I90GjY2Iv22N4GIkRrDAgMZbkTitzIB6tbXEVeLAOh3frFJ8IwauRCOiXIjrZdJ4 +FvV86+nU4wKBgQDdWTP2kWkMrBk7QOp7r9Jv+AmnLuHhtOdPQgOJ/bA++X2ik9PL +Bl3GY7XjpSwks1CkxZKcucmXjPp7/X6EGXFfI/owF82dkDADca0e7lufdERtIWb0 +K5WOpz2lTPhgsiLGQfq7fw2lxqsJOnvcpqOD6gOVkmKjSDyb7F0RBJazmQKBgQDD +a8PQTcesjpBjLI3EfX1vbVY7ENu6zfFxDV+vZoxVh8UlQdm90AlYse3JIaUKnB7W +Xrihcucv0hZ0N6RAIW5LcFvHK7sVmdR4WbEpODhRGeTtcZJ8yBSZM898jKQRy2vK +pYRyaADNsWDlvujVkjMr/a40KrIaPQ3h3LZNUaYYaQKBgQD1x8A5S5SiE1cN1vFr +aACkmA2WqEDKKhUsUigJdwW6WB/B9kWlIlz/iV1H9uwBXtSIYG4VqCSTAvh0z4gX +Qu2SrdPm5PYnKzpdynpz78OnGdflD1RKWFGHItR6GN6tj/VmulO6mlFvT4jzBQ7j ++Hf8m2TcD4U3ksz3xw+YOD+cmA== +-----END RSA PRIVATE KEY----- diff --git a/Cargo.lock b/Cargo.lock index b8b4bfd9..cfaa0fbd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -79,12 +79,24 @@ dependencies = [ "generic-array", ] +[[package]] +name = "bumpalo" +version = "3.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37ccbd214614c6783386c1af30caf03192f17891059cecc394b4fb119e363de3" + [[package]] name = "bytes" version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" +[[package]] +name = "cc" +version = "1.0.73" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fff2a6927b3bb87f9595d67196a70493f627687a71d87a0d692242c33f58c11" + [[package]] name = "cfg-if" version = "1.0.0" @@ -236,6 +248,21 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "js-sys" +version = "0.3.58" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3fac17f7123a73ca62df411b1bf727ccc805daa070338fda671c86dac1bdc27" +dependencies = [ + "wasm-bindgen", +] + +[[package]] +name = "lazy_static" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" + [[package]] name = "libc" version = "0.2.126" @@ -385,6 +412,7 @@ dependencies = [ "parking_lot", "rand", "regex", + "rustls-pemfile", "serde", "serde_derive", "sha-1", @@ -392,6 +420,7 @@ dependencies = [ "sqlparser", "stringprep", "tokio", + "tokio-rustls", "toml", ] @@ -497,12 +526,58 @@ version = "0.6.25" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b" +[[package]] +name = "ring" +version = "0.16.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc" +dependencies = [ + "cc", + "libc", + "once_cell", + "spin", + "untrusted", + "web-sys", + "winapi", +] + +[[package]] +name = "rustls" +version = "0.20.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aab8ee6c7097ed6057f43c187a62418d0c05a4bd5f18b3571db50ee0f9ce033" +dependencies = [ + "log", + "ring", + "sct", + "webpki", +] + +[[package]] +name = "rustls-pemfile" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7522c9de787ff061458fe9a829dc790a3f5b22dc571694fc5883f448b94d9a9" +dependencies = [ + "base64", +] + [[package]] name = "scopeguard" version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" +[[package]] +name = "sct" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "serde" version = "1.0.136" @@ -563,6 +638,12 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83" +[[package]] +name = "spin" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" + [[package]] name = "sqlparser" version = "0.14.0" @@ -664,6 +745,17 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-rustls" +version = "0.23.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" +dependencies = [ + "rustls", + "tokio", + "webpki", +] + [[package]] name = "toml" version = "0.5.8" @@ -700,6 +792,12 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3" +[[package]] +name = "untrusted" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" + [[package]] name = "version_check" version = "0.9.4" @@ -712,6 +810,80 @@ version = "0.10.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" +[[package]] +name = "wasm-bindgen" +version = "0.2.81" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c53b543413a17a202f4be280a7e5c62a1c69345f5de525ee64f8cfdbc954994" +dependencies = [ + "cfg-if", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.81" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5491a68ab4500fa6b4d726bd67408630c3dbe9c4fe7bda16d5c82a1fd8c7340a" +dependencies = [ + "bumpalo", + "lazy_static", + "log", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.81" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c441e177922bc58f1e12c022624b6216378e5febc2f0533e41ba443d505b80aa" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.81" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d94ac45fcf608c1f45ef53e748d35660f168490c10b23704c7779ab8f5c3048" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.81" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a89911bd99e5f3659ec4acf9c4d93b0a90fe4a2a11f15328472058edc5261be" + +[[package]] +name = "web-sys" +version = "0.3.58" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fed94beee57daf8dd7d51f2b15dc2bcde92d7a72304cdf662a4371008b71b90" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "webpki" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "winapi" version = "0.3.9" diff --git a/Cargo.toml b/Cargo.toml index fa63c0e8..3f65e90c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,3 +29,5 @@ hmac = "0.12" sha2 = "0.10" base64 = "0.13" stringprep = "0.1" +tokio-rustls = "0.23" +rustls-pemfile = "1" diff --git a/pgcat.toml b/pgcat.toml index 70b2fae0..e9dbf075 100644 --- a/pgcat.toml +++ b/pgcat.toml @@ -32,6 +32,10 @@ ban_time = 60 # Seconds # Reload config automatically if it changes. autoreload = false +# TLS +# tls_certificate = "server.cert" +# tls_private_key = "server.key" + # # User to use for authentication against the server. [user] diff --git a/src/admin.rs b/src/admin.rs index b7a5b6fd..74acf151 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -2,7 +2,6 @@ use bytes::{Buf, BufMut, BytesMut}; use log::{info, trace}; use std::collections::HashMap; -use tokio::net::tcp::OwnedWriteHalf; use crate::config::{get_config, reload_config}; use crate::errors::Error; @@ -12,12 +11,15 @@ use crate::stats::get_stats; use crate::ClientServerMap; /// Handle admin client. -pub async fn handle_admin( - stream: &mut OwnedWriteHalf, +pub async fn handle_admin( + stream: &mut T, mut query: BytesMut, pool: ConnectionPool, client_server_map: ClientServerMap, -) -> Result<(), Error> { +) -> Result<(), Error> +where + T: tokio::io::AsyncWrite + std::marker::Unpin, +{ let code = query.get_u8() as char; if code != 'Q' { @@ -61,7 +63,10 @@ pub async fn handle_admin( } /// Column-oriented statistics. -async fn show_lists(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> { +async fn show_lists(stream: &mut T, pool: &ConnectionPool) -> Result<(), Error> +where + T: tokio::io::AsyncWrite + std::marker::Unpin, +{ let stats = get_stats(); let columns = vec![("list", DataType::Text), ("items", DataType::Int4)]; @@ -128,7 +133,10 @@ async fn show_lists(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Resul } /// Show PgCat version. -async fn show_version(stream: &mut OwnedWriteHalf) -> Result<(), Error> { +async fn show_version(stream: &mut T) -> Result<(), Error> +where + T: tokio::io::AsyncWrite + std::marker::Unpin, +{ let mut res = BytesMut::new(); res.put(row_description(&vec![("version", DataType::Text)])); @@ -143,7 +151,10 @@ async fn show_version(stream: &mut OwnedWriteHalf) -> Result<(), Error> { } /// Show utilization of connection pools for each shard and replicas. -async fn show_pools(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> { +async fn show_pools(stream: &mut T, pool: &ConnectionPool) -> Result<(), Error> +where + T: tokio::io::AsyncWrite + std::marker::Unpin, +{ let stats = get_stats(); let config = get_config(); @@ -197,7 +208,10 @@ async fn show_pools(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Resul } /// Show shards and replicas. -async fn show_databases(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> { +async fn show_databases(stream: &mut T, pool: &ConnectionPool) -> Result<(), Error> +where + T: tokio::io::AsyncWrite + std::marker::Unpin, +{ let config = get_config(); // Columns @@ -258,15 +272,18 @@ async fn show_databases(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> R /// Ignore any SET commands the client sends. /// This is common initialization done by ORMs. -async fn ignore_set(stream: &mut OwnedWriteHalf) -> Result<(), Error> { +async fn ignore_set(stream: &mut T) -> Result<(), Error> +where + T: tokio::io::AsyncWrite + std::marker::Unpin, +{ custom_protocol_response_ok(stream, "SET").await } /// Reload the configuration file without restarting the process. -async fn reload( - stream: &mut OwnedWriteHalf, - client_server_map: ClientServerMap, -) -> Result<(), Error> { +async fn reload(stream: &mut T, client_server_map: ClientServerMap) -> Result<(), Error> +where + T: tokio::io::AsyncWrite + std::marker::Unpin, +{ info!("Reloading config"); reload_config(client_server_map).await?; @@ -286,7 +303,10 @@ async fn reload( } /// Shows current configuration. -async fn show_config(stream: &mut OwnedWriteHalf) -> Result<(), Error> { +async fn show_config(stream: &mut T) -> Result<(), Error> +where + T: tokio::io::AsyncWrite + std::marker::Unpin, +{ let config = &get_config(); let config: HashMap = config.into(); @@ -329,7 +349,10 @@ async fn show_config(stream: &mut OwnedWriteHalf) -> Result<(), Error> { } /// Show shard and replicas statistics. -async fn show_stats(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> { +async fn show_stats(stream: &mut T, pool: &ConnectionPool) -> Result<(), Error> +where + T: tokio::io::AsyncWrite + std::marker::Unpin, +{ let columns = vec![ ("database", DataType::Text), ("total_xact_count", DataType::Numeric), diff --git a/src/client.rs b/src/client.rs index ea1bf019..05895b6d 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,12 +1,9 @@ /// Handle clients by pretending to be a PostgreSQL server. use bytes::{Buf, BufMut, BytesMut}; -use log::{debug, error, trace}; +use log::{debug, error, info, trace}; use std::collections::HashMap; -use tokio::io::{AsyncReadExt, BufReader}; -use tokio::net::{ - tcp::{OwnedReadHalf, OwnedWriteHalf}, - TcpStream, -}; +use tokio::io::{split, AsyncReadExt, BufReader, ReadHalf, WriteHalf}; +use tokio::net::TcpStream; use crate::admin::handle_admin; use crate::config::get_config; @@ -17,20 +14,33 @@ use crate::pool::{get_pool, ClientServerMap}; use crate::query_router::{Command, QueryRouter}; use crate::server::Server; use crate::stats::{get_reporter, Reporter}; +use crate::tls::Tls; + +use tokio_rustls::server::TlsStream; + +/// Type of connection received from client. +enum ClientConnectionType { + Startup, + Tls, + CancelQuery, +} /// The client state. One of these is created per client. -pub struct Client { +pub struct Client { /// The reads are buffered (8K by default). - read: BufReader, + read: BufReader, /// We buffer the writes ourselves because we know the protocol /// better than a stock buffer. - write: OwnedWriteHalf, + write: T, /// Internal buffer, where we place messages until we have to flush /// them to the backend. buffer: BytesMut, + /// Address + addr: std::net::SocketAddr, + /// The client was started with the sole reason to cancel another running query. cancel_mode: bool, @@ -63,161 +73,307 @@ pub struct Client { last_server_id: Option, } -impl Client { - /// Perform client startup sequence. - /// See docs: - pub async fn startup( - mut stream: TcpStream, - client_server_map: ClientServerMap, - ) -> Result { - let config = get_config(); - let transaction_mode = config.general.pool_mode == "transaction"; - let stats = get_reporter(); +/// Client entrypoint. +pub async fn client_entrypoint( + mut stream: TcpStream, + client_server_map: ClientServerMap, +) -> Result<(), Error> { + // Figure out if the client wants TLS or not. + let addr = stream.peer_addr().unwrap(); - loop { - trace!("Waiting for StartupMessage"); + match get_startup::(&mut stream).await { + // Client requested a TLS connection. + Ok((ClientConnectionType::Tls, _)) => { + let config = get_config(); - // Could be StartupMessage, SSLRequest or CancelRequest. - let len = match stream.read_i32().await { - Ok(len) => len, - Err(_) => return Err(Error::ClientBadStartup), - }; + // TLS settings are configured, will setup TLS now. + if config.general.tls_certificate != None { + debug!("Accepting TLS request"); - let mut startup = vec![0u8; len as usize - 4]; + let mut yes = BytesMut::new(); + yes.put_u8(b'S'); + write_all(&mut stream, yes).await?; - match stream.read_exact(&mut startup).await { - Ok(_) => (), - Err(_) => return Err(Error::ClientBadStartup), - }; + // Negotiate TLS. + match startup_tls(stream, client_server_map).await { + Ok(mut client) => { + info!("Client {:?} connected (TLS)", addr); - let mut bytes = BytesMut::from(&startup[..]); - let code = bytes.get_i32(); + client.handle().await + } + Err(err) => Err(err), + } + } + // TLS is not configured, we cannot offer it. + else { + // Rejecting client request for TLS. + let mut no = BytesMut::new(); + no.put_u8(b'N'); + write_all(&mut stream, no).await?; + + // Attempting regular startup. Client can disconnect now + // if they choose. + match get_startup::(&mut stream).await { + // Client accepted unencrypted connection. + Ok((ClientConnectionType::Startup, bytes)) => { + let (read, write) = split(stream); + + // Continue with regular startup. + match Client::startup(read, write, addr, bytes, client_server_map).await { + Ok(mut client) => { + info!("Client {:?} connected (plain)", addr); + + client.handle().await + } + Err(err) => Err(err), + } + } + + // Client probably disconnected rejecting our plain text connection. + _ => Err(Error::ProtocolSyncError), + } + } + } - match code { - // Client wants SSL. We don't support it at the moment. - SSL_REQUEST_CODE => { - trace!("Rejecting SSLRequest"); + // Client wants to use plain connection without encryption. + Ok((ClientConnectionType::Startup, bytes)) => { + let (read, write) = split(stream); - let mut no = BytesMut::with_capacity(1); - no.put_u8(b'N'); + // Continue with regular startup. + match Client::startup(read, write, addr, bytes, client_server_map).await { + Ok(mut client) => { + info!("Client {:?} connected (plain)", addr); - write_all(&mut stream, no).await?; + client.handle().await } + Err(err) => Err(err), + } + } - // Regular startup message. - PROTOCOL_VERSION_NUMBER => { - trace!("Got StartupMessage"); - let parameters = parse_startup(bytes.clone())?; + // Client wants to cancel a query. + Ok((ClientConnectionType::CancelQuery, bytes)) => { + let (read, write) = split(stream); - // Generate random backend ID and secret key - let process_id: i32 = rand::random(); - let secret_key: i32 = rand::random(); + // Continue with cancel query request. + match Client::cancel(read, write, addr, bytes, client_server_map).await { + Ok(mut client) => { + info!("Client {:?} issued a cancel query request", addr); - // Perform MD5 authentication. - // TODO: Add SASL support. - let salt = md5_challenge(&mut stream).await?; + client.handle().await + } - let code = match stream.read_u8().await { - Ok(p) => p, - Err(_) => return Err(Error::SocketError), - }; + Err(err) => Err(err), + } + } - // PasswordMessage - if code as char != 'p' { - debug!("Expected p, got {}", code as char); - return Err(Error::ProtocolSyncError); - } + // Something failed, probably the socket. + Err(err) => Err(err), + } +} + +/// Handle the first message the client sends. +async fn get_startup(stream: &mut S) -> Result<(ClientConnectionType, BytesMut), Error> +where + S: tokio::io::AsyncRead + std::marker::Unpin + tokio::io::AsyncWrite, +{ + // Get startup message length. + let len = match stream.read_i32().await { + Ok(len) => len, + Err(_) => return Err(Error::ClientBadStartup), + }; + + // Get the rest of the message. + let mut startup = vec![0u8; len as usize - 4]; + match stream.read_exact(&mut startup).await { + Ok(_) => (), + Err(_) => return Err(Error::ClientBadStartup), + }; + + let mut bytes = BytesMut::from(&startup[..]); + let code = bytes.get_i32(); + + match code { + // Client is requesting SSL (TLS). + SSL_REQUEST_CODE => Ok((ClientConnectionType::Tls, bytes)), + + // Client wants to use plain text, requesting regular startup. + PROTOCOL_VERSION_NUMBER => Ok((ClientConnectionType::Startup, bytes)), + + // Client is requesting to cancel a running query (plain text connection). + CANCEL_REQUEST_CODE => Ok((ClientConnectionType::CancelQuery, bytes)), + + // Something else, probably something is wrong and it's not our fault, + // e.g. badly implemented Postgres client. + _ => Err(Error::ProtocolSyncError), + } +} + +/// Handle TLS connection negotation. +pub async fn startup_tls( + stream: TcpStream, + client_server_map: ClientServerMap, +) -> Result>, WriteHalf>>, Error> { + // Negotiate TLS. + let tls = Tls::new()?; + let addr = stream.peer_addr().unwrap(); + + let mut stream = match tls.acceptor.accept(stream).await { + Ok(stream) => stream, + + // TLS negotitation failed. + Err(err) => { + error!("TLS negotiation failed: {:?}", err); + return Err(Error::TlsError); + } + }; - let len = match stream.read_i32().await { - Ok(len) => len, - Err(_) => return Err(Error::SocketError), - }; + // TLS negotitation successful. + // Continue with regular startup using encrypted connection. + match get_startup::>(&mut stream).await { + // Got good startup message, proceeding like normal except we + // are encrypted now. + Ok((ClientConnectionType::Startup, bytes)) => { + let (read, write) = split(stream); - let mut password_response = vec![0u8; (len - 4) as usize]; + Client::startup(read, write, addr, bytes, client_server_map).await + } - match stream.read_exact(&mut password_response).await { - Ok(_) => (), - Err(_) => return Err(Error::SocketError), - }; + // Bad Postgres client. + _ => Err(Error::ProtocolSyncError), + } +} + +impl Client +where + S: tokio::io::AsyncRead + std::marker::Unpin, + T: tokio::io::AsyncWrite + std::marker::Unpin, +{ + /// Handle Postgres client startup after TLS negotiation is complete + /// or over plain text. + pub async fn startup( + mut read: S, + mut write: T, + addr: std::net::SocketAddr, + bytes: BytesMut, // The rest of the startup message. + client_server_map: ClientServerMap, + ) -> Result, Error> { + let config = get_config(); + let transaction_mode = config.general.pool_mode == "transaction"; + let stats = get_reporter(); - // Compare server and client hashes. - let password_hash = - md5_hash_password(&config.user.name, &config.user.password, &salt); + trace!("Got StartupMessage"); + let parameters = parse_startup(bytes.clone())?; - if password_hash != password_response { - debug!("Password authentication failed"); - wrong_password(&mut stream, &config.user.name).await?; - return Err(Error::ClientError); - } + // Generate random backend ID and secret key + let process_id: i32 = rand::random(); + let secret_key: i32 = rand::random(); - debug!("Password authentication successful"); - - auth_ok(&mut stream).await?; - write_all(&mut stream, get_pool().server_info()).await?; - backend_key_data(&mut stream, process_id, secret_key).await?; - ready_for_query(&mut stream).await?; - - trace!("Startup OK"); - - let database = parameters - .get("database") - .unwrap_or(parameters.get("user").unwrap()); - let admin = ["pgcat", "pgbouncer"] - .iter() - .filter(|db| *db == &database) - .count() - == 1; - - // Split the read and write streams - // so we can control buffering. - let (read, write) = stream.into_split(); - - return Ok(Client { - read: BufReader::new(read), - write: write, - buffer: BytesMut::with_capacity(8196), - cancel_mode: false, - transaction_mode: transaction_mode, - process_id: process_id, - secret_key: secret_key, - client_server_map: client_server_map, - parameters: parameters, - stats: stats, - admin: admin, - last_address_id: None, - last_server_id: None, - }); - } + // Perform MD5 authentication. + // TODO: Add SASL support. + let salt = md5_challenge(&mut write).await?; - // Query cancel request. - CANCEL_REQUEST_CODE => { - let (read, write) = stream.into_split(); - - let process_id = bytes.get_i32(); - let secret_key = bytes.get_i32(); - - return Ok(Client { - read: BufReader::new(read), - write: write, - buffer: BytesMut::with_capacity(8196), - cancel_mode: true, - transaction_mode: transaction_mode, - process_id: process_id, - secret_key: secret_key, - client_server_map: client_server_map, - parameters: HashMap::new(), - stats: stats, - admin: false, - last_address_id: None, - last_server_id: None, - }); - } + let code = match read.read_u8().await { + Ok(p) => p, + Err(_) => return Err(Error::SocketError), + }; - _ => { - return Err(Error::ProtocolSyncError); - } - }; + // PasswordMessage + if code as char != 'p' { + debug!("Expected p, got {}", code as char); + return Err(Error::ProtocolSyncError); } + + let len = match read.read_i32().await { + Ok(len) => len, + Err(_) => return Err(Error::SocketError), + }; + + let mut password_response = vec![0u8; (len - 4) as usize]; + + match read.read_exact(&mut password_response).await { + Ok(_) => (), + Err(_) => return Err(Error::SocketError), + }; + + // Compare server and client hashes. + let password_hash = md5_hash_password(&config.user.name, &config.user.password, &salt); + + if password_hash != password_response { + debug!("Password authentication failed"); + wrong_password(&mut write, &config.user.name).await?; + return Err(Error::ClientError); + } + + debug!("Password authentication successful"); + + auth_ok(&mut write).await?; + write_all(&mut write, get_pool().server_info()).await?; + backend_key_data(&mut write, process_id, secret_key).await?; + ready_for_query(&mut write).await?; + + trace!("Startup OK"); + + let database = parameters + .get("database") + .unwrap_or(parameters.get("user").unwrap()); + let admin = ["pgcat", "pgbouncer"] + .iter() + .filter(|db| *db == &database) + .count() + == 1; + + // Split the read and write streams + // so we can control buffering. + + return Ok(Client { + read: BufReader::new(read), + write: write, + addr, + buffer: BytesMut::with_capacity(8196), + cancel_mode: false, + transaction_mode: transaction_mode, + process_id: process_id, + secret_key: secret_key, + client_server_map: client_server_map, + parameters: parameters, + stats: stats, + admin: admin, + last_address_id: None, + last_server_id: None, + }); + } + + /// Handle cancel request. + pub async fn cancel( + read: S, + write: T, + addr: std::net::SocketAddr, + mut bytes: BytesMut, // The rest of the startup message. + client_server_map: ClientServerMap, + ) -> Result, Error> { + let process_id = bytes.get_i32(); + let secret_key = bytes.get_i32(); + + let config = get_config(); + let transaction_mode = config.general.pool_mode == "transaction"; + let stats = get_reporter(); + + return Ok(Client { + read: BufReader::new(read), + write: write, + addr, + buffer: BytesMut::with_capacity(8196), + cancel_mode: true, + transaction_mode: transaction_mode, + process_id: process_id, + secret_key: secret_key, + client_server_map: client_server_map, + parameters: HashMap::new(), + stats: stats, + admin: false, + last_address_id: None, + last_server_id: None, + }); } /// Handle a connected and authenticated client. @@ -415,7 +571,7 @@ impl Client { debug!( "Client {:?} talking to server {:?}", - self.write.peer_addr().unwrap(), + self.addr, server.address() ); @@ -650,8 +806,11 @@ impl Client { } } -impl Drop for Client { +impl Drop for Client { fn drop(&mut self) { + let mut guard = self.client_server_map.lock(); + guard.remove(&(self.process_id, self.secret_key)); + // Update statistics. if let Some(address_id) = self.last_address_id { self.stats.client_disconnecting(self.process_id, address_id); @@ -660,5 +819,7 @@ impl Drop for Client { self.stats.server_idle(process_id, address_id); } } + + // self.release(); } } diff --git a/src/config.rs b/src/config.rs index f6fa129c..da59d2ae 100644 --- a/src/config.rs +++ b/src/config.rs @@ -4,12 +4,14 @@ use log::{error, info}; use once_cell::sync::Lazy; use serde_derive::Deserialize; use std::collections::{HashMap, HashSet}; +use std::path::Path; use std::sync::Arc; use tokio::fs::File; use tokio::io::AsyncReadExt; use toml; use crate::errors::Error; +use crate::tls::{load_certs, load_keys}; use crate::{ClientServerMap, ConnectionPool}; /// Globally available configuration. @@ -111,6 +113,8 @@ pub struct General { pub healthcheck_timeout: u64, pub ban_time: i64, pub autoreload: bool, + pub tls_certificate: Option, + pub tls_private_key: Option, } impl Default for General { @@ -124,6 +128,8 @@ impl Default for General { healthcheck_timeout: 1000, ban_time: 60, autoreload: false, + tls_certificate: None, + tls_private_key: None, } } } @@ -249,6 +255,25 @@ impl Config { info!("Primary reads: {}", self.query_router.primary_reads_enabled); info!("Query router: {}", self.query_router.query_parser_enabled); info!("Number of shards: {}", self.shards.len()); + + match self.general.tls_certificate.clone() { + Some(tls_certificate) => { + info!("TLS certificate: {}", tls_certificate); + + match self.general.tls_private_key.clone() { + Some(tls_private_key) => { + info!("TLS private key: {}", tls_private_key); + info!("TLS support is enabled"); + } + + None => (), + } + } + + None => { + info!("TLS support is disabled"); + } + }; } } @@ -368,6 +393,37 @@ pub async fn parse(path: &str) -> Result<(), Error> { } }; + // Validate TLS! + match config.general.tls_certificate.clone() { + Some(tls_certificate) => { + match load_certs(&Path::new(&tls_certificate)) { + Ok(_) => { + // Cert is okay, but what about the private key? + match config.general.tls_private_key.clone() { + Some(tls_private_key) => match load_keys(&Path::new(&tls_private_key)) { + Ok(_) => (), + Err(err) => { + error!("tls_private_key is incorrectly configured: {:?}", err); + return Err(Error::BadConfig); + } + }, + + None => { + error!("tls_certificate is set, but the tls_private_key is not"); + return Err(Error::BadConfig); + } + }; + } + + Err(err) => { + error!("tls_certificate is incorrectly configured: {:?}", err); + return Err(Error::BadConfig); + } + } + } + None => (), + }; + config.path = path.to_string(); // Update the configuration globally. diff --git a/src/errors.rs b/src/errors.rs index b07d5088..cc8f65d0 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -10,4 +10,5 @@ pub enum Error { BadConfig, AllServersDown, ClientError, + TlsError, } diff --git a/src/main.rs b/src/main.rs index 7b78e5b1..49795b70 100644 --- a/src/main.rs +++ b/src/main.rs @@ -28,10 +28,12 @@ extern crate log; extern crate md5; extern crate num_cpus; extern crate once_cell; +extern crate rustls_pemfile; extern crate serde; extern crate serde_derive; extern crate sqlparser; extern crate tokio; +extern crate tokio_rustls; extern crate toml; use log::{debug, error, info}; @@ -58,6 +60,7 @@ mod scram; mod server; mod sharding; mod stats; +mod tls; use config::{get_config, reload_config}; use pool::{ClientServerMap, ConnectionPool}; @@ -150,30 +153,20 @@ async fn main() { // Handle client. tokio::task::spawn(async move { let start = chrono::offset::Utc::now().naive_utc(); - match client::Client::startup(socket, client_server_map).await { - Ok(mut client) => { - info!("Client {:?} connected", addr); - - match client.handle().await { - Ok(()) => { - let duration = chrono::offset::Utc::now().naive_utc() - start; - - info!( - "Client {:?} disconnected, session duration: {}", - addr, - format_duration(&duration) - ); - } - - Err(err) => { - error!("Client disconnected with error: {:?}", err); - client.release(); - } - } + + match client::client_entrypoint(socket, client_server_map).await { + Ok(_) => { + let duration = chrono::offset::Utc::now().naive_utc() - start; + + info!( + "Client {:?} disconnected, session duration: {}", + addr, + format_duration(&duration) + ); } Err(err) => { - debug!("Client failed to login: {:?}", err); + debug!("Client disconnected with error {:?}", err); } }; }); diff --git a/src/messages.rs b/src/messages.rs index 993545bb..89795c65 100644 --- a/src/messages.rs +++ b/src/messages.rs @@ -2,11 +2,8 @@ /// and handle TcpStream (TCP socket). use bytes::{Buf, BufMut, BytesMut}; use md5::{Digest, Md5}; -use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader}; -use tokio::net::{ - tcp::{OwnedReadHalf, OwnedWriteHalf}, - TcpStream, -}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpStream; use crate::errors::Error; use std::collections::HashMap; @@ -30,7 +27,10 @@ impl From<&DataType> for i32 { } /// Tell the client that authentication handshake completed successfully. -pub async fn auth_ok(stream: &mut TcpStream) -> Result<(), Error> { +pub async fn auth_ok(stream: &mut S) -> Result<(), Error> +where + S: tokio::io::AsyncWrite + std::marker::Unpin, +{ let mut auth_ok = BytesMut::with_capacity(9); auth_ok.put_u8(b'R'); @@ -41,7 +41,10 @@ pub async fn auth_ok(stream: &mut TcpStream) -> Result<(), Error> { } /// Generate md5 password challenge. -pub async fn md5_challenge(stream: &mut TcpStream) -> Result<[u8; 4], Error> { +pub async fn md5_challenge(stream: &mut S) -> Result<[u8; 4], Error> +where + S: tokio::io::AsyncWrite + std::marker::Unpin, +{ // let mut rng = rand::thread_rng(); let salt: [u8; 4] = [ rand::random(), @@ -62,11 +65,14 @@ pub async fn md5_challenge(stream: &mut TcpStream) -> Result<[u8; 4], Error> { /// Give the client the process_id and secret we generated /// used in query cancellation. -pub async fn backend_key_data( - stream: &mut TcpStream, +pub async fn backend_key_data( + stream: &mut S, backend_id: i32, secret_key: i32, -) -> Result<(), Error> { +) -> Result<(), Error> +where + S: tokio::io::AsyncWrite + std::marker::Unpin, +{ let mut key_data = BytesMut::from(&b"K"[..]); key_data.put_i32(12); key_data.put_i32(backend_id); @@ -87,7 +93,10 @@ pub fn simple_query(query: &str) -> BytesMut { } /// Tell the client we're ready for another query. -pub async fn ready_for_query(stream: &mut TcpStream) -> Result<(), Error> { +pub async fn ready_for_query(stream: &mut S) -> Result<(), Error> +where + S: tokio::io::AsyncWrite + std::marker::Unpin, +{ let mut bytes = BytesMut::with_capacity(5); bytes.put_u8(b'Z'); @@ -205,12 +214,15 @@ pub fn md5_hash_password(user: &str, password: &str, salt: &[u8]) -> Vec { /// Send password challenge response to the server. /// This is the MD5 challenge. -pub async fn md5_password( - stream: &mut TcpStream, +pub async fn md5_password( + stream: &mut S, user: &str, password: &str, salt: &[u8], -) -> Result<(), Error> { +) -> Result<(), Error> +where + S: tokio::io::AsyncWrite + std::marker::Unpin, +{ let password = md5_hash_password(user, password, salt); let mut message = BytesMut::with_capacity(password.len() as usize + 5); @@ -225,10 +237,10 @@ pub async fn md5_password( /// Implements a response to our custom `SET SHARDING KEY` /// and `SET SERVER ROLE` commands. /// This tells the client we're ready for the next query. -pub async fn custom_protocol_response_ok( - stream: &mut OwnedWriteHalf, - message: &str, -) -> Result<(), Error> { +pub async fn custom_protocol_response_ok(stream: &mut S, message: &str) -> Result<(), Error> +where + S: tokio::io::AsyncWrite + std::marker::Unpin, +{ let mut res = BytesMut::with_capacity(25); let set_complete = BytesMut::from(&format!("{}\0", message)[..]); @@ -250,7 +262,10 @@ pub async fn custom_protocol_response_ok( /// Send a custom error message to the client. /// Tell the client we are ready for the next query and no rollback is necessary. /// Docs on error codes: . -pub async fn error_response(stream: &mut OwnedWriteHalf, message: &str) -> Result<(), Error> { +pub async fn error_response(stream: &mut S, message: &str) -> Result<(), Error> +where + S: tokio::io::AsyncWrite + std::marker::Unpin, +{ let mut error = BytesMut::new(); // Error level @@ -291,7 +306,10 @@ pub async fn error_response(stream: &mut OwnedWriteHalf, message: &str) -> Resul Ok(write_all_half(stream, res).await?) } -pub async fn wrong_password(stream: &mut TcpStream, user: &str) -> Result<(), Error> { +pub async fn wrong_password(stream: &mut S, user: &str) -> Result<(), Error> +where + S: tokio::io::AsyncWrite + std::marker::Unpin, +{ let mut error = BytesMut::new(); // Error level @@ -325,11 +343,10 @@ pub async fn wrong_password(stream: &mut TcpStream, user: &str) -> Result<(), Er } /// Respond to a SHOW SHARD command. -pub async fn show_response( - stream: &mut OwnedWriteHalf, - name: &str, - value: &str, -) -> Result<(), Error> { +pub async fn show_response(stream: &mut S, name: &str, value: &str) -> Result<(), Error> +where + S: tokio::io::AsyncWrite + std::marker::Unpin, +{ // A SELECT response consists of: // 1. RowDescription // 2. One or more DataRow @@ -430,7 +447,10 @@ pub fn command_complete(command: &str) -> BytesMut { } /// Write all data in the buffer to the TcpStream. -pub async fn write_all(stream: &mut TcpStream, buf: BytesMut) -> Result<(), Error> { +pub async fn write_all(stream: &mut S, buf: BytesMut) -> Result<(), Error> +where + S: tokio::io::AsyncWrite + std::marker::Unpin, +{ match stream.write_all(&buf).await { Ok(_) => Ok(()), Err(_) => return Err(Error::SocketError), @@ -438,7 +458,10 @@ pub async fn write_all(stream: &mut TcpStream, buf: BytesMut) -> Result<(), Erro } /// Write all the data in the buffer to the TcpStream, write owned half (see mpsc). -pub async fn write_all_half(stream: &mut OwnedWriteHalf, buf: BytesMut) -> Result<(), Error> { +pub async fn write_all_half(stream: &mut S, buf: BytesMut) -> Result<(), Error> +where + S: tokio::io::AsyncWrite + std::marker::Unpin, +{ match stream.write_all(&buf).await { Ok(_) => Ok(()), Err(_) => return Err(Error::SocketError), @@ -446,7 +469,10 @@ pub async fn write_all_half(stream: &mut OwnedWriteHalf, buf: BytesMut) -> Resul } /// Read a complete message from the socket. -pub async fn read_message(stream: &mut BufReader) -> Result { +pub async fn read_message(stream: &mut S) -> Result +where + S: tokio::io::AsyncRead + std::marker::Unpin, +{ let code = match stream.read_u8().await { Ok(code) => code, Err(_) => return Err(Error::SocketError), diff --git a/src/tls.rs b/src/tls.rs new file mode 100644 index 00000000..3bc4a6a0 --- /dev/null +++ b/src/tls.rs @@ -0,0 +1,57 @@ +// Stream wrapper. + +use rustls_pemfile::{certs, rsa_private_keys}; +use std::path::Path; +use std::sync::Arc; +use tokio_rustls::rustls::{self, Certificate, PrivateKey}; +use tokio_rustls::TlsAcceptor; + +use crate::config::get_config; +use crate::errors::Error; + +// TLS +pub fn load_certs(path: &Path) -> std::io::Result> { + certs(&mut std::io::BufReader::new(std::fs::File::open(path)?)) + .map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidInput, "invalid cert")) + .map(|mut certs| certs.drain(..).map(Certificate).collect()) +} + +pub fn load_keys(path: &Path) -> std::io::Result> { + rsa_private_keys(&mut std::io::BufReader::new(std::fs::File::open(path)?)) + .map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidInput, "invalid key")) + .map(|mut keys| keys.drain(..).map(PrivateKey).collect()) +} + +pub struct Tls { + pub acceptor: TlsAcceptor, +} + +impl Tls { + pub fn new() -> Result { + let config = get_config(); + + let certs = match load_certs(&Path::new(&config.general.tls_certificate.unwrap())) { + Ok(certs) => certs, + Err(_) => return Err(Error::TlsError), + }; + + let mut keys = match load_keys(&Path::new(&config.general.tls_private_key.unwrap())) { + Ok(keys) => keys, + Err(_) => return Err(Error::TlsError), + }; + + let config = match rustls::ServerConfig::builder() + .with_safe_defaults() + .with_no_client_auth() + .with_single_cert(certs, keys.remove(0)) + .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err)) + { + Ok(c) => c, + Err(_) => return Err(Error::TlsError), + }; + + Ok(Tls { + acceptor: TlsAcceptor::from(Arc::new(config)), + }) + } +}