Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement MultipartStore for ThrottledStore #5533

Merged
merged 3 commits into from
Mar 29, 2024

Conversation

tustvold
Copy link
Contributor

Which issue does this PR close?

Closes #.

Rationale for this change

Follow up to #5500

What changes are included in this PR?

Are there any user-facing changes?

Limit concurrency in BufWriter

Tweak WriteMultipart
@github-actions github-actions bot added the object-store Object Store Interface label Mar 20, 2024
/// Create a new [`WriteMultipart`] that will upload in fixed `capacity` sized chunks
pub fn new_with_capacity(upload: Box<dyn MultipartUpload>, capacity: usize) -> Self {
/// Create a new [`WriteMultipart`] that will upload in fixed `chunk_size` sized chunks
pub fn new_with_chunk_size(upload: Box<dyn MultipartUpload>, chunk_size: usize) -> Self {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this naming is a bit more obvious

pub async fn wait_for_capacity(&mut self, max_concurrency: usize) -> Result<()> {
while self.tasks.len() > max_concurrency {
self.tasks.join_next().await.unwrap()??;
/// Polls for there to be less than `max_concurrency` [`UploadPart`] in progress
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've changed this to be less than instead of equal as I think it means if you want to limit to x requests you poll for x requests, as opposed to x - 1

@@ -216,6 +216,7 @@ impl AsyncBufRead for BufReader {
/// streamed using [`ObjectStore::put_multipart`]
pub struct BufWriter {
capacity: usize,
max_concurrency: usize,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prior to #5500 this was an implementation detail of the stores, now it is user configurable 🎉

Comment on lines +259 to +267
/// Override the maximum number of in-flight requests for this writer
///
/// Defaults to 8
pub fn with_max_concurrency(self, max_concurrency: usize) -> Self {
Self {
max_concurrency,
..self
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if 0 is passed in?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will apply backpressure if there is any in-flight request

@tustvold tustvold merged commit 9f36c88 into apache:master Mar 29, 2024
13 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
object-store Object Store Interface
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants