feat: Add crc32c checksums to S3 Service #4533

Merged
merged 8 commits into from
Apr 26, 2024
1 change: 1 addition & 0 deletions core/Cargo.lock

1 change: 1 addition & 0 deletions core/Cargo.toml
@@ -350,6 +350,7 @@ prometheus-client = { version = "0.22.2", optional = true }
tracing = { version = "0.1", optional = true }
# for layers-dtrace
probe = { version = "0.5.1", optional = true }
crc32c = "0.6.5"
Contributor

This dependency doesn't seem to be grouped cleanly like others.

Member

@Xuanwo May 13, 2024

Nice catch, would you like to send a PR to fix this? Also, I think this dep should be hidden behind the services-s3 feature.


[target.'cfg(target_arch = "wasm32")'.dependencies]
getrandom = { version = "0.2", features = ["js"] }
30 changes: 30 additions & 0 deletions core/src/services/s3/backend.rs
@@ -204,6 +204,12 @@ pub struct S3Config {
    ///
    /// For example, R2 doesn't support stat with `response_content_type` query.
    pub disable_stat_with_override: bool,
    /// Checksum Algorithm to use when sending checksums in HTTP headers.
    /// This is necessary when writing to AWS S3 Buckets with Object Lock enabled for example.
    ///
    /// Available options:
    /// - "crc32c"
    pub checksum_algorithm: Option<String>,
}

impl Debug for S3Config {
@@ -663,6 +669,18 @@ impl S3Builder {
        self
    }

    /// Set checksum algorithm of this backend.
    ///
    /// This is necessary when writing to AWS S3 Buckets with Object Lock enabled for example.
    ///
    /// Available options:
    /// - "crc32c"
    pub fn checksum_algorithm(&mut self, checksum_algorithm: &str) -> &mut Self {
        self.config.checksum_algorithm = Some(checksum_algorithm.to_string());

        self
    }

    /// Detect region of S3 bucket.
    ///
    /// # Args
@@ -858,6 +876,17 @@ impl Builder for S3Builder {
            })?),
        };

        let checksum_algorithm = match self.config.checksum_algorithm.as_deref() {
            Some("crc32c") => Some(ChecksumAlgorithm::Crc32c),
            None => None,
            // Bind the rejected value so the error message actually names it.
            Some(v) => {
                return Err(Error::new(
                    ErrorKind::ConfigInvalid,
                    &format!("{v} is not a supported checksum_algorithm."),
                ))
            }
        };
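
The validation above maps an optional config string to an enum and rejects anything unknown. A minimal std-only sketch of the same mapping follows. It is not the code from this PR: `ParseChecksumError` and `parse_config` are hypothetical names standing in for the crate's own `Error` with `ErrorKind::ConfigInvalid` and for the logic inside `build()`.

```rust
use std::fmt;
use std::str::FromStr;

/// Checksum algorithms understood by this sketch (mirrors the enum in the diff).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChecksumAlgorithm {
    Crc32c,
}

/// Hypothetical error type; the real code returns `ErrorKind::ConfigInvalid`.
#[derive(Debug, PartialEq, Eq)]
pub struct ParseChecksumError(String);

impl fmt::Display for ParseChecksumError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{} is not a supported checksum_algorithm.", self.0)
    }
}

impl std::error::Error for ParseChecksumError {}

impl FromStr for ChecksumAlgorithm {
    type Err = ParseChecksumError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "crc32c" => Ok(Self::Crc32c),
            // Keep the rejected value so the message can name it.
            other => Err(ParseChecksumError(other.to_string())),
        }
    }
}

/// Mirrors the `Option<String>` config field: `None` means "send no checksum".
pub fn parse_config(
    value: Option<&str>,
) -> Result<Option<ChecksumAlgorithm>, ParseChecksumError> {
    value.map(|s| s.parse::<ChecksumAlgorithm>()).transpose()
}

fn main() {
    println!("{:?}", parse_config(Some("crc32c")));
    println!("{:?}", parse_config(None));
    if let Err(e) = parse_config(Some("md5")) {
        println!("{e}");
    }
}
```

Putting the parsing in `FromStr` keeps the list of accepted names next to the enum, so adding a second algorithm would only touch one place.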

        let client = if let Some(client) = self.http_client.take() {
            client
        } else {
@@ -979,6 +1008,7 @@ impl Builder for S3Builder {
                credential_loaded: AtomicBool::new(false),
                client,
                batch_max_operations,
                checksum_algorithm,
            }),
        })
    }
57 changes: 57 additions & 0 deletions core/src/services/s3/core.rs
@@ -23,6 +23,8 @@ use std::sync::atomic;
use std::sync::atomic::AtomicBool;
use std::time::Duration;

use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use bytes::Bytes;
use http::header::HeaderName;
use http::header::CACHE_CONTROL;
@@ -88,6 +90,7 @@ pub struct S3Core {
    pub credential_loaded: AtomicBool,
    pub client: HttpClient,
    pub batch_max_operations: usize,
    pub checksum_algorithm: Option<ChecksumAlgorithm>,
}

impl Debug for S3Core {
@@ -246,6 +249,40 @@ impl S3Core {

        req
    }

    pub fn insert_checksum_header(
        &self,
        mut req: http::request::Builder,
        body: &Buffer,
    ) -> http::request::Builder {
        if let Some(checksum_algorithm) = self.checksum_algorithm.as_ref() {
            let checksum = match checksum_algorithm {
                ChecksumAlgorithm::Crc32c => {
                    let mut crc = 0u32;
                    body.clone()
Contributor Author

This clone should be okay, since it does not do any allocation (?)

Member

Yep, this clone is Arc::clone.

                        .for_each(|b| crc = crc32c::crc32c_append(crc, &b));
                    BASE64_STANDARD.encode(crc.to_be_bytes())
                }
            };
            req = req.header(checksum_algorithm.to_header_key(), checksum);
        }
        req
    }

    pub fn insert_checksum_type_header(
        &self,
        mut req: http::request::Builder,
    ) -> http::request::Builder {
        if let Some(checksum_algorithm) = self.checksum_algorithm.as_ref() {
            req = req.header(
                "x-amz-checksum-algorithm",
                match checksum_algorithm {
                    ChecksumAlgorithm::Crc32c => "CRC32C",
Member

We can implement Display for ChecksumAlgorithm

                },
            );
        }
        req
    }
}
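
The header value built in `insert_checksum_header` is the CRC32C of the body, accumulated chunk by chunk, serialized as four big-endian bytes and then base64-encoded. The std-only sketch below reproduces that pipeline so each step is visible. It is an illustration under assumptions, not the PR's code: `crc32c_append` and `base64_standard` here are simple local reimplementations (bitwise CRC, hand-rolled encoder), not the `crc32c` and `base64` crate APIs, and `checksum_header_value` is a hypothetical helper name.

```rust
/// Reflected CRC-32C (Castagnoli) polynomial.
const POLY: u32 = 0x82F6_3B78;

/// Continue a CRC32C computation over `data`, starting from a previous `crc`
/// (pass 0 for the first chunk). Bitwise, for clarity rather than speed.
fn crc32c_append(crc: u32, data: &[u8]) -> u32 {
    let mut c = !crc;
    for &byte in data {
        c ^= u32::from(byte);
        for _ in 0..8 {
            c = if c & 1 != 0 { (c >> 1) ^ POLY } else { c >> 1 };
        }
    }
    !c
}

const ALPHABET: &[u8; 64] =
    b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

/// Standard base64 (RFC 4648) with `=` padding.
fn base64_standard(input: &[u8]) -> String {
    let mut out = String::with_capacity((input.len() + 2) / 3 * 4);
    for chunk in input.chunks(3) {
        let b0 = u32::from(chunk[0]);
        let b1 = u32::from(*chunk.get(1).unwrap_or(&0));
        let b2 = u32::from(*chunk.get(2).unwrap_or(&0));
        let n = (b0 << 16) | (b1 << 8) | b2;
        out.push(ALPHABET[(n >> 18) as usize & 63] as char);
        out.push(ALPHABET[(n >> 12) as usize & 63] as char);
        out.push(if chunk.len() > 1 {
            ALPHABET[(n >> 6) as usize & 63] as char
        } else {
            '='
        });
        out.push(if chunk.len() > 2 {
            ALPHABET[n as usize & 63] as char
        } else {
            '='
        });
    }
    out
}

/// Build the value for an `x-amz-checksum-crc32c` header from body chunks.
fn checksum_header_value<'a>(chunks: impl IntoIterator<Item = &'a [u8]>) -> String {
    let crc = chunks
        .into_iter()
        .fold(0u32, |crc, chunk| crc32c_append(crc, chunk));
    base64_standard(&crc.to_be_bytes())
}

fn main() {
    // The body may arrive as several non-contiguous chunks.
    let chunks: [&[u8]; 2] = [b"1234", b"56789"];
    println!("x-amz-checksum-crc32c: {}", checksum_header_value(chunks));
}
```

Because the running CRC is threaded through each call, hashing the chunks one at a time gives the same result as hashing the concatenated body, which is why the loop over the buffer's chunks needs no copy.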

impl S3Core {
@@ -408,6 +445,9 @@ impl S3Core {
        // Set SSE headers.
        req = self.insert_sse_headers(req, true);

        // Set Checksum header.
        req = self.insert_checksum_header(req, &body);

        // Set body
        let req = req.body(body).map_err(new_request_build_error)?;

@@ -573,6 +613,9 @@ impl S3Core {
        // Set SSE headers.
        let req = self.insert_sse_headers(req, true);

        // Set checksum algorithm header.
        let req = self.insert_checksum_type_header(req);

        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;

        self.sign(&mut req).await?;
@@ -605,6 +648,9 @@ impl S3Core {
        // Set SSE headers.
        req = self.insert_sse_headers(req, true);

        // Set Checksum header.
        req = self.insert_checksum_header(req, &body);

        // Set body
        let req = req.body(body).map_err(new_request_build_error)?;

@@ -821,6 +867,17 @@ pub struct OutputCommonPrefix {
    pub prefix: String,
}

pub enum ChecksumAlgorithm {
    Crc32c,
}
impl ChecksumAlgorithm {
    pub fn to_header_key(&self) -> &str {
Member

@Xuanwo Apr 26, 2024

How about using to_header_name()? And I think we can use HeaderName here directly.

        match self {
            Self::Crc32c => "x-amz-checksum-crc32c",
        }
    }
}
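
The two review suggestions on this enum (a `Display` impl for the algorithm name, and renaming the accessor to `to_header_name()`) could look roughly like the sketch below. This is an assumption about how they might be applied, not the code that was merged. To stay std-only it returns `&'static str` where the review proposes `http::HeaderName`.

```rust
use std::fmt;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChecksumAlgorithm {
    Crc32c,
}

impl ChecksumAlgorithm {
    /// Name of the header that carries the checksum value itself.
    pub fn to_header_name(&self) -> &'static str {
        match self {
            Self::Crc32c => "x-amz-checksum-crc32c",
        }
    }
}

impl fmt::Display for ChecksumAlgorithm {
    /// Formats the value sent in the `x-amz-checksum-algorithm` header.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            Self::Crc32c => "CRC32C",
        })
    }
}

fn main() {
    let algo = ChecksumAlgorithm::Crc32c;
    println!("x-amz-checksum-algorithm: {algo}");
    println!("{}: <base64 crc>", algo.to_header_name());
}
```

With the `http` crate in scope, `HeaderName::from_static("x-amz-checksum-crc32c")` would give the typed variant the reviewer mentions, and the `match` inside `insert_checksum_type_header` could collapse to `checksum_algorithm.to_string()`.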

#[cfg(test)]
mod tests {
use bytes::Buf;