Support for io.Reader Interface in S3 Transfer Manager's Downloader #2247
Comments
Hi @yacchi, this seems like a reasonable request. We will likely work on this when we re-implement the Downloader. For now we cannot prioritize it, but I will add it to our backlog. Thanks!
To @yacchi or anyone else who may be watching this issue -- I haven't tested this, but I believe you can achieve "sequential" I/O by setting download concurrency to 1, which is spec'd to guarantee sequential in-order multipart downloads. If that's the case, your WriterAt implementation can simply adapt each WriteAt() call into a sequential write:

// sequentialWriterAt adapts WriteAt() calls to a sequential I/O implementation
type sequentialWriterAt struct {
    w   io.Writer // or copy to another reader, etc.
    off int64
}

func (v *sequentialWriterAt) WriteAt(p []byte, off int64) (int, error) {
    if off != v.off {
        return 0, fmt.Errorf("broken write sequence")
    }
    n, err := v.w.Write(p)
    if err != nil {
        return n, fmt.Errorf("write: %v", err)
    }
    v.off += int64(n)
    return n, nil
}

This is definitely something we'd like to support for concurrent downloads, though.
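For illustration, here is a minimal sketch (not from the original comment) of how the adapter above could be wired up with an io.Pipe so the download is exposed as an io.Reader; the bucket and key names are placeholders.

package main

import (
    "context"
    "io"
    "log"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
    "github.com/aws/aws-sdk-go-v2/service/s3"
)

func main() {
    cfg, err := config.LoadDefaultConfig(context.TODO())
    if err != nil {
        log.Fatal(err)
    }

    // Concurrency of 1 is what makes the in-order WriteAt() sequence possible.
    downloader := manager.NewDownloader(s3.NewFromConfig(cfg), func(d *manager.Downloader) {
        d.Concurrency = 1
    })

    pr, pw := io.Pipe()
    go func() {
        _, err := downloader.Download(context.TODO(), &sequentialWriterAt{w: pw},
            &s3.GetObjectInput{
                Bucket: aws.String("my-bucket"), // placeholder
                Key:    aws.String("my-key"),    // placeholder
            })
        pw.CloseWithError(err) // propagate success or failure to the reader side
    }()

    // pr now behaves as an io.Reader over the object's bytes, in order.
    if _, err := io.Copy(io.Discard, pr); err != nil {
        log.Fatal(err)
    }
}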
Thanks for the helpful code. It looks like I can certainly do the sequentialization that way. I implemented it, and with that code it is possible to download a tar file at an average speed of 800 MiB/s or more in an environment with sufficient memory.
@lucix-aws @RanVaknin I opened a PR for a concurrent io.Reader/io.WriteCloser for this issue. See #2622
Would definitely appreciate having a way to handle this as part of the SDK, as this brings quite a lot of complexity to usage. Also, the current proposed approach (setting the download concurrency to 1) gives up the concurrency that makes the Transfer Manager fast in the first place.

If this can be of help to anyone, I have implemented an io.WriterAt adapter that forwards chunks, in order, to an underlying io.WriteCloser. This might use additional memory, since chunks that arrive out of order are buffered in an internal map until the expected offset is reached.

import (
    "io"
    "sync"
)

// WriteAtWriter is an implementation that handles write concurrency from the AWS S3 download manager.
// It works by storing pending bytes in its internal buffer and flushing them when the expected offset is reached.
type WriteAtWriter struct {
    writer  io.WriteCloser
    offset  int64
    mu      sync.Mutex
    pending map[int64][]byte
}

func NewWriteAtWriter(writer io.WriteCloser) *WriteAtWriter {
    return &WriteAtWriter{writer: writer, offset: 0, pending: make(map[int64][]byte)}
}

func (fw *WriteAtWriter) WriteAt(p []byte, offset int64) (n int, err error) {
    fw.mu.Lock()
    defer fw.mu.Unlock()

    // If the chunk is the currently expected chunk, write it and flush pending chunks.
    if offset == fw.offset {
        written, err := fw.SendAll(p)
        fw.offset += int64(written)
        if err != nil {
            return written, err
        }
        err = fw.flush()
        if err != nil {
            return written, err
        }
        return written, nil
    }

    // Otherwise, queue the chunk.
    // A copy is required, because the AWS SDK re-uses the `p` slice under the hood.
    copied := make([]byte, len(p))
    copy(copied, p)
    fw.pending[offset] = copied
    return len(p), nil
}

func (fw *WriteAtWriter) flush() error {
    for {
        p, ok := fw.pending[fw.offset]
        if !ok {
            break
        }
        written, err := fw.SendAll(p)
        if err != nil {
            return err
        }
        delete(fw.pending, fw.offset)
        fw.offset += int64(written)
    }
    return nil
}

func (fw *WriteAtWriter) SendAll(bytes []byte) (int, error) {
    written := 0
    for written < len(bytes) {
        n, err := fw.writer.Write(bytes[written:])
        if err != nil {
            return written, err
        }
        written += n
    }
    return written, nil
}
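As an illustration (not part of the original comment), here is a minimal sketch of how this wrapper could be combined with an io.Pipe so the consumer sees a plain io.Reader while the download stays concurrent; the bucket and key names are placeholders.

package main

import (
    "context"
    "io"
    "log"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
    "github.com/aws/aws-sdk-go-v2/service/s3"
)

func main() {
    cfg, err := config.LoadDefaultConfig(context.TODO())
    if err != nil {
        log.Fatal(err)
    }
    // Default (concurrent) downloader; WriteAtWriter restores the byte order.
    downloader := manager.NewDownloader(s3.NewFromConfig(cfg))

    pr, pw := io.Pipe()
    go func() {
        _, err := downloader.Download(context.TODO(), NewWriteAtWriter(pw), &s3.GetObjectInput{
            Bucket: aws.String("my-bucket"), // placeholder
            Key:    aws.String("my-key"),    // placeholder
        })
        pw.CloseWithError(err) // unblock the reader when the download finishes or fails
    }()

    // pr yields the object bytes strictly in order and can be passed to tar, gzip, etc.
    if _, err := io.Copy(io.Discard, pr); err != nil {
        log.Fatal(err)
    }
}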
@pelletier197 you can use my s3io package if you want to eliminate that complexity. |
Describe the feature
Currently, the Download function implemented in the Transfer Manager's Downloader accepts an io.WriterAt. Because of this, after writing to a file or buffer, a separate step is needed to create an io.Reader.
When working with files in Go, the io.Reader interface is commonly required. I believe that if the Downloader could directly produce an io.Reader, it would significantly improve usability.
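For reference, that current pattern looks roughly like the following sketch (not part of the original issue): the whole object is first written into a manager.WriteAtBuffer, and only then can an io.Reader be created. Bucket and key names are placeholders.

package main

import (
    "bytes"
    "context"
    "log"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
    "github.com/aws/aws-sdk-go-v2/service/s3"
)

func main() {
    cfg, err := config.LoadDefaultConfig(context.TODO())
    if err != nil {
        log.Fatal(err)
    }
    downloader := manager.NewDownloader(s3.NewFromConfig(cfg))

    // The whole object is buffered in memory before any byte can be read.
    buf := manager.NewWriteAtBuffer([]byte{})
    if _, err := downloader.Download(context.TODO(), buf, &s3.GetObjectInput{
        Bucket: aws.String("my-bucket"), // placeholder
        Key:    aws.String("my-key"),    // placeholder
    }); err != nil {
        log.Fatal(err)
    }

    // Only now can downstream code get the io.Reader it needs.
    reader := bytes.NewReader(buf.Bytes())
    _ = reader
}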
Use Case
Decompressing archive files (e.g., tar, zip) stored in S3
These operations are typically built around the io.Reader interface. With this feature, it would be possible to directly load and decompress files from S3 (see the sketch after this list).
Stream processing large files in environments with limited memory or storage
By using an io.Reader to process streams directly, it becomes possible to handle large files without using storage and with minimal memory consumption.
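As an illustration of the first use case, here is a minimal sketch (assumed code, not from the issue) that consumes the io.Reader the Downloader would produce to stream-extract a .tar.gz object without touching local storage.

import (
    "archive/tar"
    "compress/gzip"
    "io"
    "log"
)

// extractTarGz streams a .tar.gz archive from r; r stands in for the io.Reader
// that a future Downloader API (or one of the adapters above) would provide.
func extractTarGz(r io.Reader) error {
    gz, err := gzip.NewReader(r)
    if err != nil {
        return err
    }
    defer gz.Close()

    tr := tar.NewReader(gz)
    for {
        hdr, err := tr.Next()
        if err == io.EOF {
            return nil // end of archive
        }
        if err != nil {
            return err
        }
        log.Printf("entry %s (%d bytes)", hdr.Name, hdr.Size)
        // io.Discard keeps the sketch minimal; a real caller would write the entry somewhere.
        if _, err := io.Copy(io.Discard, tr); err != nil {
            return err
        }
    }
}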
Proposed Solution
The behavior of the AWS CLI's cp command closely aligns with my expectations. For instance, it can be used as follows:
aws s3 cp s3://BUCKET/key.tar.gz - | tar zxf -
Internally, it appears to use a heap to output chunks sequentially from the beginning.
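To make that chunk-ordering idea concrete, here is a small sketch (assumed types, not the CLI's actual code) that uses container/heap to release buffered parts in offset order:

import (
    "container/heap"
    "io"
)

// chunk is a downloaded part identified by its starting offset in the object.
type chunk struct {
    off  int64
    data []byte
}

// chunkHeap is a min-heap of chunks ordered by offset.
type chunkHeap []chunk

func (h chunkHeap) Len() int           { return len(h) }
func (h chunkHeap) Less(i, j int) bool { return h[i].off < h[j].off }
func (h chunkHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *chunkHeap) Push(x any)        { *h = append(*h, x.(chunk)) }
func (h *chunkHeap) Pop() any {
    old := *h
    c := old[len(old)-1]
    *h = old[:len(old)-1]
    return c
}

// sequencer writes chunks to w in offset order, parking out-of-order arrivals on the heap.
type sequencer struct {
    w    io.Writer
    next int64
    h    chunkHeap
}

func (s *sequencer) add(off int64, data []byte) error {
    heap.Push(&s.h, chunk{off: off, data: data})
    // Drain every buffered chunk whose turn has come.
    for s.h.Len() > 0 && s.h[0].off == s.next {
        c := heap.Pop(&s.h).(chunk)
        if _, err := s.w.Write(c.data); err != nil {
            return err
        }
        s.next += int64(len(c.data))
    }
    return nil
}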
I've created code in my repository that operates in a similar manner using the current Downloader.
https://github.com/yacchi/s3-fast-reader
Other Information
No response
Acknowledgements
AWS Go SDK V2 Module Versions Used
github.com/aws/aws-sdk-go-v2 v1.20.1
github.com/aws/aws-sdk-go-v2/config v1.18.33
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.77
github.com/aws/aws-sdk-go-v2/service/s3 v1.38.2
Go version used
1.20.4