From 7a01f4dd5e08b3a8afdf4c13b339ec6683bfce6e Mon Sep 17 00:00:00 2001 From: Jan Wackerbauer Date: Thu, 25 Apr 2024 18:37:27 +0200 Subject: [PATCH 1/8] Add crc32c checksums to S3 --- core/Cargo.lock | 1 + core/Cargo.toml | 1 + core/src/services/s3/backend.rs | 26 +++++++++++++++++++++++++ core/src/services/s3/core.rs | 34 +++++++++++++++++++++++++++++++++ 4 files changed, 62 insertions(+) diff --git a/core/Cargo.lock b/core/Cargo.lock index 19abf6f5277..7faf93611c7 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -4763,6 +4763,7 @@ dependencies = [ "cacache", "chrono", "compio", + "crc32c", "criterion", "dashmap", "dotenvy", diff --git a/core/Cargo.toml b/core/Cargo.toml index 20943924847..75f26150e56 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -350,6 +350,7 @@ prometheus-client = { version = "0.22.2", optional = true } tracing = { version = "0.1", optional = true } # for layers-dtrace probe = { version = "0.5.1", optional = true } +crc32c = "0.6.5" [target.'cfg(target_arch = "wasm32")'.dependencies] getrandom = { version = "0.2", features = ["js"] } diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs index 6a43746d6fb..1ae9a5ff834 100644 --- a/core/src/services/s3/backend.rs +++ b/core/src/services/s3/backend.rs @@ -204,6 +204,14 @@ pub struct S3Config { /// /// For example, R2 doesn't support stat with `response_content_type` query. pub disable_stat_with_override: bool, + /// Checksum Algorithm to use when sending checksums in HTTP headers. + /// This is necessary when writing to AWS S3 Buckets with Object Lock enabled for example. + /// + /// Available options: + /// | Option | HTTP Header | + /// | -------- | ----------------------| + /// | "crc32c" | x-amz-checksum-crc32c | + pub checksum_algorithm: Option, } impl Debug for S3Config { @@ -663,6 +671,13 @@ impl S3Builder { self } + /// Set checksum algorithm of this backend. + pub fn checksum_algorithm(&mut self, checksum_algorithm: &str) -> &mut Self { + self.config.checksum_algorithm = Some(checksum_algorithm.to_string()); + + self + } + /// Detect region of S3 bucket. /// /// # Args @@ -858,6 +873,16 @@ impl Builder for S3Builder { })?), }; + let checksum_algorithm = match self.config.checksum_algorithm.as_deref() { + Some("crc32c") => Some(S3ChecksumAlgorithm::Crc32c), + None => None, + _ => return Err(Error::new( + ErrorKind::ConfigInvalid, + "{v} is not a supported checksum_algorithm.", + )), + }; + + let client = if let Some(client) = self.http_client.take() { client } else { @@ -979,6 +1004,7 @@ impl Builder for S3Builder { credential_loaded: AtomicBool::new(false), client, batch_max_operations, + checksum_algorithm, }), }) } diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs index a22c4af96d7..ce915129d79 100644 --- a/core/src/services/s3/core.rs +++ b/core/src/services/s3/core.rs @@ -88,6 +88,7 @@ pub struct S3Core { pub credential_loaded: AtomicBool, pub client: HttpClient, pub batch_max_operations: usize, + pub checksum_algorithm: Option, } impl Debug for S3Core { @@ -246,6 +247,22 @@ impl S3Core { req } + + pub fn insert_checksum_header( + &self, + mut req: http::request::Builder, + body: &Buffer, + ) -> http::request::Builder { + if let Some(checksum_algorithm) = self.checksum_algorithm.as_ref() { + let checksum = match checksum_algorithm { + S3ChecksumAlgorithm::Crc32c => { + format!("{}", crc32c::crc32c(body.to_vec().as_slice())) + } + }; + req = req.header(checksum_algorithm.to_header_key(), checksum); + } + req + } } impl S3Core { @@ -408,6 +425,9 @@ impl S3Core { // Set SSE headers. req = self.insert_sse_headers(req, true); + // Set Checksum header. + req = self.insert_checksum_header(req, &body); + // Set body let req = req.body(body).map_err(new_request_build_error)?; @@ -605,6 +625,9 @@ impl S3Core { // Set SSE headers. req = self.insert_sse_headers(req, true); + // Set Checksum header. + req = self.insert_checksum_header(req, &body); + // Set body let req = req.body(body).map_err(new_request_build_error)?; @@ -821,6 +844,17 @@ pub struct OutputCommonPrefix { pub prefix: String, } +pub enum S3ChecksumAlgorithm { + Crc32c, +} +impl S3ChecksumAlgorithm { + pub fn to_header_key(&self) -> &str { + match self { + Self::Crc32c => "x-amz-checksum-crc32c", + } + } +} + #[cfg(test)] mod tests { use bytes::Buf; From f5819e04a2b0271348c3c7083a2b940573c6d7b6 Mon Sep 17 00:00:00 2001 From: Jan Wackerbauer Date: Fri, 26 Apr 2024 00:50:32 +0200 Subject: [PATCH 2/8] Properly encode checksum based on https://docs.aws.amazon.com/AmazonS3/latest/API/API_Checksum.html --- core/src/services/s3/core.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs index ce915129d79..7518a8fa0e8 100644 --- a/core/src/services/s3/core.rs +++ b/core/src/services/s3/core.rs @@ -23,6 +23,8 @@ use std::sync::atomic; use std::sync::atomic::AtomicBool; use std::time::Duration; +use base64::prelude::BASE64_STANDARD; +use base64::Engine; use bytes::Bytes; use http::header::HeaderName; use http::header::CACHE_CONTROL; @@ -256,7 +258,7 @@ impl S3Core { if let Some(checksum_algorithm) = self.checksum_algorithm.as_ref() { let checksum = match checksum_algorithm { S3ChecksumAlgorithm::Crc32c => { - format!("{}", crc32c::crc32c(body.to_vec().as_slice())) + BASE64_STANDARD.encode(crc32c::crc32c(body.to_vec().as_slice()).to_be_bytes()) } }; req = req.header(checksum_algorithm.to_header_key(), checksum); From 30cf0488a9dddecb32ede45f7d1624e005869428 Mon Sep 17 00:00:00 2001 From: Jan Wackerbauer Date: Fri, 26 Apr 2024 00:55:08 +0200 Subject: [PATCH 3/8] Fix formatting --- core/src/services/s3/backend.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs index 1ae9a5ff834..aa5a43c7b4b 100644 --- a/core/src/services/s3/backend.rs +++ b/core/src/services/s3/backend.rs @@ -876,13 +876,14 @@ impl Builder for S3Builder { let checksum_algorithm = match self.config.checksum_algorithm.as_deref() { Some("crc32c") => Some(S3ChecksumAlgorithm::Crc32c), None => None, - _ => return Err(Error::new( - ErrorKind::ConfigInvalid, - "{v} is not a supported checksum_algorithm.", - )), + _ => { + return Err(Error::new( + ErrorKind::ConfigInvalid, + "{v} is not a supported checksum_algorithm.", + )) + } }; - let client = if let Some(client) = self.http_client.take() { client } else { From 3678539c1c56ea68c26e87f2c667a4127a375de0 Mon Sep 17 00:00:00 2001 From: Jan Wackerbauer Date: Fri, 26 Apr 2024 09:35:24 +0200 Subject: [PATCH 4/8] Remove S3 prefix, don't copy body & declare checksum_algorithm in initiate MultipartUpload --- core/src/services/s3/core.rs | 31 ++++++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs index 7518a8fa0e8..13e77555faa 100644 --- a/core/src/services/s3/core.rs +++ b/core/src/services/s3/core.rs @@ -90,7 +90,7 @@ pub struct S3Core { pub credential_loaded: AtomicBool, pub client: HttpClient, pub batch_max_operations: usize, - pub checksum_algorithm: Option, + pub checksum_algorithm: Option, } impl Debug for S3Core { @@ -257,14 +257,32 @@ impl S3Core { ) -> http::request::Builder { if let Some(checksum_algorithm) = self.checksum_algorithm.as_ref() { let checksum = match checksum_algorithm { - S3ChecksumAlgorithm::Crc32c => { - BASE64_STANDARD.encode(crc32c::crc32c(body.to_vec().as_slice()).to_be_bytes()) + ChecksumAlgorithm::Crc32c => { + let mut crc = 0u32; + body.clone() + .for_each(|b| crc = crc32c::crc32c_append(crc, &b)); + BASE64_STANDARD.encode(crc.to_be_bytes()) } }; req = req.header(checksum_algorithm.to_header_key(), checksum); } req } + + pub fn insert_checksum_type_header( + &self, + mut req: http::request::Builder, + ) -> http::request::Builder { + if let Some(checksum_algorithm) = self.checksum_algorithm.as_ref() { + req = req.header( + "x-amz-checksum-algorithm", + match checksum_algorithm { + ChecksumAlgorithm::Crc32c => "CRC32C", + }, + ); + } + req + } } impl S3Core { @@ -595,6 +613,9 @@ impl S3Core { // Set SSE headers. let req = self.insert_sse_headers(req, true); + // Set SSE headers. + let req = self.insert_checksum_type_header(req); + let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?; self.sign(&mut req).await?; @@ -846,10 +867,10 @@ pub struct OutputCommonPrefix { pub prefix: String, } -pub enum S3ChecksumAlgorithm { +pub enum ChecksumAlgorithm { Crc32c, } -impl S3ChecksumAlgorithm { +impl ChecksumAlgorithm { pub fn to_header_key(&self) -> &str { match self { Self::Crc32c => "x-amz-checksum-crc32c", From 6ebfd55d27e60b612e1e3cef67f44f20de11d6c6 Mon Sep 17 00:00:00 2001 From: Jan Wackerbauer Date: Fri, 26 Apr 2024 09:35:55 +0200 Subject: [PATCH 5/8] Improve documentation --- core/src/services/s3/backend.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs index aa5a43c7b4b..d464eff6d1f 100644 --- a/core/src/services/s3/backend.rs +++ b/core/src/services/s3/backend.rs @@ -208,9 +208,7 @@ pub struct S3Config { /// This is necessary when writing to AWS S3 Buckets with Object Lock enabled for example. /// /// Available options: - /// | Option | HTTP Header | - /// | -------- | ----------------------| - /// | "crc32c" | x-amz-checksum-crc32c | + /// - "crc32c" pub checksum_algorithm: Option, } @@ -672,6 +670,11 @@ impl S3Builder { } /// Set checksum algorithm of this backend. + /// + /// This is necessary when writing to AWS S3 Buckets with Object Lock enabled for example. + /// + /// Available options: + /// - "crc32c" pub fn checksum_algorithm(&mut self, checksum_algorithm: &str) -> &mut Self { self.config.checksum_algorithm = Some(checksum_algorithm.to_string()); @@ -874,7 +877,7 @@ impl Builder for S3Builder { }; let checksum_algorithm = match self.config.checksum_algorithm.as_deref() { - Some("crc32c") => Some(S3ChecksumAlgorithm::Crc32c), + Some("crc32c") => Some(ChecksumAlgorithm::Crc32c), None => None, _ => { return Err(Error::new( From da1c104f3d970927461bcdc5799b4e1db0640f31 Mon Sep 17 00:00:00 2001 From: Jan Wackerbauer Date: Fri, 26 Apr 2024 09:51:04 +0200 Subject: [PATCH 6/8] Fix formatting --- core/src/services/s3/backend.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs index d464eff6d1f..13f68061b07 100644 --- a/core/src/services/s3/backend.rs +++ b/core/src/services/s3/backend.rs @@ -670,7 +670,6 @@ impl S3Builder { } /// Set checksum algorithm of this backend. - /// /// This is necessary when writing to AWS S3 Buckets with Object Lock enabled for example. /// /// Available options: From d6aef7ccafde78a65417cc6bfb0de037d8d8bfd5 Mon Sep 17 00:00:00 2001 From: Jan Wackerbauer Date: Fri, 26 Apr 2024 10:08:14 +0200 Subject: [PATCH 7/8] Refactor ChecksumAlgorithm --- core/src/services/s3/core.rs | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs index 13e77555faa..e3df761b878 100644 --- a/core/src/services/s3/core.rs +++ b/core/src/services/s3/core.rs @@ -17,6 +17,7 @@ use std::fmt; use std::fmt::Debug; +use std::fmt::Display; use std::fmt::Formatter; use std::fmt::Write; use std::sync::atomic; @@ -264,7 +265,7 @@ impl S3Core { BASE64_STANDARD.encode(crc.to_be_bytes()) } }; - req = req.header(checksum_algorithm.to_header_key(), checksum); + req = req.header(checksum_algorithm.to_header_name(), checksum); } req } @@ -276,9 +277,7 @@ impl S3Core { if let Some(checksum_algorithm) = self.checksum_algorithm.as_ref() { req = req.header( "x-amz-checksum-algorithm", - match checksum_algorithm { - ChecksumAlgorithm::Crc32c => "CRC32C", - }, + checksum_algorithm.to_string(), ); } req @@ -871,12 +870,23 @@ pub enum ChecksumAlgorithm { Crc32c, } impl ChecksumAlgorithm { - pub fn to_header_key(&self) -> &str { + pub fn to_header_name(&self) -> HeaderName { match self { - Self::Crc32c => "x-amz-checksum-crc32c", + Self::Crc32c => HeaderName::from_static("x-amz-checksum-crc32c"), } } } +impl Display for ChecksumAlgorithm { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + write!( + f, + "{}", + match self { + Self::Crc32c => "CRC32C", + } + ) + } +} #[cfg(test)] mod tests { From de44c269d07f3386c5fd190022d875b0a92460ea Mon Sep 17 00:00:00 2001 From: Jan Wackerbauer Date: Fri, 26 Apr 2024 10:10:28 +0200 Subject: [PATCH 8/8] Fix formatting --- core/src/services/s3/core.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs index e3df761b878..67b49d76efa 100644 --- a/core/src/services/s3/core.rs +++ b/core/src/services/s3/core.rs @@ -275,10 +275,7 @@ impl S3Core { mut req: http::request::Builder, ) -> http::request::Builder { if let Some(checksum_algorithm) = self.checksum_algorithm.as_ref() { - req = req.header( - "x-amz-checksum-algorithm", - checksum_algorithm.to_string(), - ); + req = req.header("x-amz-checksum-algorithm", checksum_algorithm.to_string()); } req }