Make put_object_stream for large files progressively upload several chunks at a time #404

Open
kskalski opened this issue Oct 14, 2024 · 1 comment

@kskalski

Is your feature request related to a problem? Please describe.
I noticed that the implementation of _put_object_stream_with_content_type, when doing a multipart upload, first reads all the chunks (creating an upload future for each) and only then awaits them.
This blows up memory usage when uploading large files, because every chunk is held in memory before any upload completes.

Additionally, I suspect this might be the cause of the frequent "Broken pipe 32" errors that I'm seeing. Could it be that we initiate the request to S3, but because we only start sending data after reading all chunks from disk into memory, the server closes the connection out of impatience?

Describe the solution you'd like
It should add chunks to the multipart upload incrementally (as their content is read into memory) while awaiting those already created.
This kind of queue scheduling is not entirely trivial in async Rust, but it is doable, e.g. with https://docs.rs/futures/latest/futures/stream/struct.FuturesUnordered.html (or possibly with multi-receiver channels like flume).
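
To make the scheduling idea concrete, here is a minimal sketch of a bounded upload window. It deliberately uses only std threads instead of async and FuturesUnordered, so it is a stand-in for the technique, not the async version proposed above. `UploadedPart`, `upload_part`, `read_chunk`, `join_upload` and `upload_windowed` are hypothetical names for illustration and are not part of rust-s3; `upload_part` only pretends to upload.

```rust
use std::collections::VecDeque;
use std::io::{self, Read};
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for the result of one part upload (not a rust-s3 type).
#[derive(Debug, PartialEq)]
struct UploadedPart {
    part_number: u32,
    len: usize,
}

/// Hypothetical stand-in for a real part upload; it only records the chunk size.
fn upload_part(part_number: u32, chunk: Vec<u8>) -> io::Result<UploadedPart> {
    Ok(UploadedPart { part_number, len: chunk.len() })
}

/// Reads up to `chunk_size` bytes; an empty Vec means end of input.
fn read_chunk<R: Read>(reader: &mut R, chunk_size: usize) -> io::Result<Vec<u8>> {
    let mut buf = Vec::with_capacity(chunk_size);
    reader.by_ref().take(chunk_size as u64).read_to_end(&mut buf)?;
    Ok(buf)
}

/// Waits for one upload thread and flattens a panic into an io::Error.
fn join_upload(handle: JoinHandle<io::Result<UploadedPart>>) -> io::Result<UploadedPart> {
    handle
        .join()
        .map_err(|_| io::Error::new(io::ErrorKind::Other, "upload thread panicked"))?
}

/// Uploads `reader` in chunks, keeping at most `max_in_flight` chunks
/// in memory and in progress at any time.
fn upload_windowed<R: Read>(
    mut reader: R,
    chunk_size: usize,
    max_in_flight: usize,
) -> io::Result<Vec<UploadedPart>> {
    let window = max_in_flight.max(1); // guard against a zero-sized window
    let mut in_flight: VecDeque<JoinHandle<io::Result<UploadedPart>>> = VecDeque::new();
    let mut parts = Vec::new();
    let mut part_number = 0u32;

    loop {
        // If the window is full, wait for the oldest upload before reading
        // more data, so memory use stays bounded by window * chunk_size.
        if in_flight.len() >= window {
            let oldest = in_flight.pop_front().expect("window is non-empty");
            parts.push(join_upload(oldest)?);
        }
        let chunk = read_chunk(&mut reader, chunk_size)?;
        if chunk.is_empty() {
            break;
        }
        part_number += 1; // part numbers start at 1
        let n = part_number;
        in_flight.push_back(thread::spawn(move || upload_part(n, chunk)));
    }

    // Drain the uploads that are still running, oldest first.
    for handle in in_flight {
        parts.push(join_upload(handle)?);
    }
    Ok(parts)
}
```

Calling `upload_windowed(std::io::Cursor::new(vec![0u8; 10]), 4, 2)` splits the input into parts of 4, 4 and 2 bytes while never holding more than two chunks at once. Joining the oldest handle first keeps the parts in ascending order; an async version built on FuturesUnordered would instead receive completions in arbitrary order and would need to sort the parts by number before completing the upload.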

Describe alternatives you've considered
I suppose for now the way to go is to switch to initiate_multipart_upload and put_multipart_chunk on the user side, re-implementing _put_object_stream_with_content_type.

Additional context
Related or not, the main issues I'm fighting right now are Broken pipe and errors like this:

reqwest::Error { kind: Request, url: "https://...amazonaws.com/...", source: hyper_util::client::legacy::Error(SendRequest, hyper::Error(IncompleteMessage)) }

I will report back if I find out more, but I think this is consistent with the hypothesis that we initiate the request but do not send the body fast enough.

@kskalski
Author

Confirmed that putting parts one by one using multi-part upload API for large files fixes the broken pipe and incomplete message errors for me.

So the simplest workaround is:

// Start the multipart upload, then send parts one at a time so that
// only a single chunk is held in memory at any point.
let multi = bucket.initiate_multipart_upload(dst, CONTENT_TYPE).await?;
let mut parts = vec![];
loop {
    let chunk = s3::utils::read_chunk_async(&mut file).await?;
    // An empty chunk signals the end of the file.
    if chunk.is_empty() {
        break;
    }
    // S3 part numbers start at 1.
    let nr = parts.len() as u32 + 1;
    tracing::debug!(len = chunk.len(), multi.upload_id, nr, "Putting part");
    let part = bucket
        .put_multipart_chunk(
            chunk,
            &multi.key,
            nr,
            &multi.upload_id,
            CONTENT_TYPE,
        )
        .await?;
    parts.push(part);
}
// Finish the upload by submitting the collected parts.
tracing::debug!(multi.upload_id, num_parts = parts.len(), "Completing");
let completed = bucket
    .complete_multipart_upload(&multi.key, &multi.upload_id, parts)
    .await?;
