api-put-object-streaming: Removed unnecessary partBuf copy to save allocations; no race detected #1673
Conversation
Signed-off-by: bwplotka <[email protected]>
The race is basically a single file being Seeked at different locations in parallel - this is not supported.

```go
			return UploadInfo{}, uploadRes.Error
		}

		// Update the totalUploadedSize.
		totalUploadedSize += uploadRes.Size
		// Store the parts to be completed in order.
```
Keep the comments @bwplotka
```suggestion
// Store the parts to be completed in upload order.
```
Yeah, this comment was quite confusing: we don't store things in order, we only sort them later. Happy to make that clear in the comment. 🤗
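For future readers, here is a minimal sketch of the pattern described above (hypothetical names, not the actual minio-go internals): parts are appended in whatever order the workers complete them, and are only sorted by part number right before the multipart upload is completed.

```go
package main

import (
	"fmt"
	"sort"
)

// uploadedPart is illustrative only, not the actual minio-go type.
type uploadedPart struct {
	PartNumber int
	ETag       string
}

func main() {
	results := make(chan uploadedPart, 3)
	// Workers finish in arbitrary order.
	results <- uploadedPart{PartNumber: 3, ETag: "c"}
	results <- uploadedPart{PartNumber: 1, ETag: "a"}
	results <- uploadedPart{PartNumber: 2, ETag: "b"}
	close(results)

	// Append in completion order...
	var parts []uploadedPart
	for p := range results {
		parts = append(parts, p)
	}
	// ...and only sort right before completing the multipart upload.
	sort.Slice(parts, func(i, j int) bool { return parts[i].PartNumber < parts[j].PartNumber })
	fmt.Println(parts) // [{1 a} {2 b} {3 c}]
}
```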
The reproducer is very simple:

```go
package main

import (
	"context"
	"fmt"
	"os"
	"strconv"
	"sync"

	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
)

var (
	endpoint        = "localhost:9001"
	accessKeyID     = "some_key"
	secretAccessKey = "some_secret"
	useSSL          = false
	bName           = "test"
	bLocation       = ""
	toUpload        = "1GB_file.img"
	workers         = 5
)

// createMC initializes the MinIO client (v7 API).
func createMC() (*minio.Client, error) {
	return minio.New(endpoint, &minio.Options{
		Creds:  credentials.NewStaticV4(accessKeyID, secretAccessKey, ""),
		Secure: useSSL,
	})
}

// createBucket creates the test bucket if it does not exist yet.
func createBucket(mc *minio.Client) error {
	exists, err := mc.BucketExists(context.Background(), bName)
	if err != nil {
		return err
	}
	if exists {
		return nil
	}
	return mc.MakeBucket(context.Background(), bName, minio.MakeBucketOptions{Region: bLocation})
}

// chanPopulate queues the same file for upload 20 times.
func chanPopulate(ch chan<- string) {
	defer close(ch)
	for i := 0; i < 20; i++ {
		fmt.Printf("populating ch ID %d\n", i)
		ch <- toUpload
	}
}

func main() {
	mc, err := createMC()
	if err != nil {
		fmt.Printf("failed to create MinIO client, %v\n", err)
		os.Exit(1)
	}
	if err := createBucket(mc); err != nil {
		fmt.Printf("failed to create bucket, %v\n", err)
		os.Exit(1)
	}

	files := make(chan string, workers)
	go chanPopulate(files)

	var wg sync.WaitGroup
	for i := 1; i <= workers; i++ {
		fmt.Printf("worker is starting ID %d\n", i)
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			fmt.Printf("worker started ID %d\n", i)
			for p := range files {
				fInfo, err := os.Stat(p)
				if err != nil {
					fmt.Printf("failed to stat file, path %s, %v\n", p, err)
					return
				}
				f, err := os.Open(p)
				if err != nil {
					fmt.Printf("failed to open file, path %s, %v\n", p, err)
					return
				}
				info, err := mc.PutObject(
					context.Background(),
					bName,
					toUpload+strconv.Itoa(i),
					f,
					fInfo.Size(),
					minio.PutObjectOptions{},
				)
				f.Close()
				if err != nil {
					fmt.Printf("failed to upload file, path %s, %v\n", p, err)
					return
				}
				fmt.Printf("file uploaded, size '%d'\n", info.Size)
			}
		}(i)
	}
	wg.Wait()
}
```

This change in the PR is basically not doing anything other than making the entire upload operation serial @bwplotka - what we wanted essentially was to be able to upload x parts in parallel, which we cannot do in your PR because the part reads are serialized on the single file's offset.
Of course, this is why you won't see the race at all. Since the "length" of the stream must be known in the S3 API, there is no way to truly "stream", and we can't read random offsets and allow them to be "safely" accessed without serializing on the single file. From what I can see, parallel uploads of multiple parts within an object never really worked properly without buffering, because of S3's length requirement - so simply using io.NewSectionReader() doesn't preserve that.
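For what it's worth, here is a hedged sketch of the non-buffering split under discussion (my own helper and a placeholder path, not library code): each part gets an io.SectionReader with a known length, which is what the S3 multipart API needs, and every read goes through ReadAt so the shared file offset is never moved.

```go
package main

import (
	"fmt"
	"io"
	"os"
)

// partReaders is a hypothetical helper: it splits f into fixed-size
// sections, one per part. Each io.SectionReader has a known length
// (what the S3 multipart API requires) and reads via ReadAt, so it
// never touches f's shared seek offset.
func partReaders(f *os.File, size, partSize int64) []*io.SectionReader {
	var parts []*io.SectionReader
	for off := int64(0); off < size; off += partSize {
		n := partSize
		if size-off < n {
			n = size - off
		}
		parts = append(parts, io.NewSectionReader(f, off, n))
	}
	return parts
}

func main() {
	f, err := os.Open("1GB_file.img") // placeholder path
	if err != nil {
		panic(err)
	}
	defer f.Close()
	st, err := f.Stat()
	if err != nil {
		panic(err)
	}
	for i, p := range partReaders(f, st.Size(), 64*1024*1024) {
		fmt.Printf("part %d: %d bytes\n", i+1, p.Size())
	}
}
```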
Let me think about this more.
Thanks for even considering my PR; let's unpack this. 🤗
That's incorrect: they are not blocked, because the main goroutine actively consumes the results into our slice. Benchmarks prove it (no regression in latency for this PR).
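A minimal, stand-alone sketch of the consumption pattern I mean (hypothetical names, not the PR's exact code): as long as the main goroutine keeps draining the channel into a slice, senders block only momentarily, so the part uploads stay parallel.

```go
package main

import "fmt"

func main() {
	results := make(chan int) // even an unbuffered channel works here
	const workers = 4

	for w := 0; w < workers; w++ {
		go func(w int) {
			results <- w // send completes as soon as main receives
		}(w)
	}

	// The main goroutine actively drains into a slice, so senders
	// are never blocked for long.
	var collected []int
	for i := 0; i < workers; i++ {
		collected = append(collected, <-results)
	}
	fmt.Println(collected)
}
```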
That's not the problem; there is concurrency here. I also tested with the race detector on the exact commit before your fix commit. There must be something I do differently if there is a race somewhere.
Again, non-buffering does not change anything here. Even with buffering, I could not reproduce the race you mentioned. Sorry for adding that extra complexity to the discussion; it's a red herring here. The true source of this problem is indeed something else. Here is my test:

```go
package file

import (
	"bytes"
	"crypto/rand"
	"io"
	"os"
	"path/filepath"
	"sync"
	"testing"

	"github.com/efficientgo/tools/core/pkg/errcapture"
	"github.com/efficientgo/tools/core/pkg/testutil"
	"github.com/pkg/errors"
)

func createTestInput(fn string, numBytes int) (err error) {
	if err := os.MkdirAll(filepath.Dir(fn), os.ModePerm); err != nil {
		return err
	}
	f, err := os.OpenFile(fn, os.O_CREATE|os.O_WRONLY, os.ModePerm)
	if err != nil {
		return errors.Wrap(err, "open")
	}
	defer func() {
		if err != nil {
			errcapture.Do(&err, func() error { return os.Remove(fn) }, "remove failed file")
		}
	}()

	b := make([]byte, 10*1024*1024)
	for i := 0; i < numBytes/len(b); i++ {
		if _, err := rand.Read(b); err != nil {
			return errors.Wrap(err, "read urandom")
		}
		if _, err := f.Write(b); err != nil {
			return err
		}
	}
	return f.Close()
}

const partSize int64 = 1 * 1024 * 1024

func readThreeSections(t *testing.T, f *os.File) ([]byte, []byte, []byte) {
	s1 := io.NewSectionReader(f, 20, partSize)
	s2 := io.NewSectionReader(f, 2*partSize, partSize)
	s3 := io.NewSectionReader(f, 10*partSize, partSize)

	s1read := make([]byte, partSize)
	_, err := s1.Read(s1read)
	testutil.Ok(t, err)

	s3read := make([]byte, partSize)
	_, err = s3.Read(s3read)
	testutil.Ok(t, err)

	s2read := make([]byte, partSize)
	_, err = s2.Read(s2read)
	testutil.Ok(t, err)

	return s1read, s2read, s3read
}

func TestFile_SectionReader(t *testing.T) {
	fn := filepath.Join(t.TempDir(), "test.file")
	testutil.Ok(t, createTestInput(fn, 100*1024*1024))

	f, err := os.Open(fn)
	testutil.Ok(t, err)
	defer func() { testutil.Ok(t, f.Close()) }()

	s1read, s2read, s3read := readThreeSections(t, f)
	testutil.Assert(t, bytes.Compare(s1read, s2read) != 0)
	testutil.Assert(t, bytes.Compare(s2read, s3read) != 0)
	testutil.Assert(t, bytes.Compare(s1read, s3read) != 0)

	// We can reuse the same file descriptor for new section reads - they are not touching the "current offset".
	wg := sync.WaitGroup{}
	wg.Add(100)
	for i := 0; i < 100; i++ {
		go func() {
			defer wg.Done()
			s1, s2, s3 := readThreeSections(t, f)
			testutil.Equals(t, s1read, s1)
			testutil.Equals(t, s2read, s2)
			testutil.Equals(t, s3read, s3)
		}()
	}
	wg.Wait()
}
```

Thanks for providing the repro you used back then, but does it actually trigger a race warning on my PR? I will test it later, but my test is similar and it was fine. Interestingly enough, in my test I use exactly the same file descriptor, which should trigger even more races if our code were racy. It proves to be fine, though.
Therefore, assuming that it is can lead to subtle bugs.
Ack. Would you accept an explicit option to reuse the reader in the options then? (:
What I would suggest: instead of a generic ReaderAt, make that ReaderAt an explicit *os.File - the rest pay the penalty of being treated as io.Reader, since we can't reliably guarantee that concurrent access is possible.
```go
func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketName, objectName string,
	reader *os.File, size int64, opts PutObjectOptions,
```

Just this change would provide the necessary guarantees.
The actual issue is because of a "retry" - I reproduced the problem even with this PR.
You are not able to reproduce this because you are not seeing the issue with retrying. To avoid that Seek, we have to make sure that Seeking is not allowed if you are going to read the parts in parallel. Instead, the Seek should be converted into a ReaderAt call again, to avoid moving the offset in parallel. I reproduced it by taking the server up and down multiple times - this triggers the race.
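To picture the suggested direction (a sketch with invented names and a placeholder path, not the committed fix): instead of Seeking the shared *os.File back to the part's start on retry, the retry path would get a fresh ReaderAt-backed reader, so no goroutine ever moves the shared offset.

```go
package main

import (
	"io"
	"os"
)

// partBody is a hypothetical helper: every retry gets a brand-new
// reader for the same byte range. io.NewSectionReader reads via
// ReadAt, so "rewinding" is just constructing a new section; no
// Seek is ever issued on the shared *os.File.
func partBody(f *os.File, off, length int64) func() io.Reader {
	return func() io.Reader {
		return io.NewSectionReader(f, off, length)
	}
}

func main() {
	f, err := os.Open("1GB_file.img") // placeholder path
	if err != nil {
		panic(err)
	}
	defer f.Close()

	newBody := partBody(f, 0, 64*1024*1024)
	for attempt := 0; attempt < 3; attempt++ {
		body := newBody() // fresh reader per retry; no Seek on f
		_ = body          // the upload attempt would consume body here
	}
}
```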
Ack. I manually injected a fault into "executeMethod" to trigger the race, but unsuccessfully. Thanks for the repro; it seems we know what we are looking for now!
Let me fix it in your PR and send a PR to your fork - we can then merge it.
LGTM and tested
Amazing 😱 You are the best, all! <3
FYI: PR to remove the fork: thanos-io/thanos#5474
Hi and thanks for the great work so far. 👋🏽
Following our discussion in minio/mc#3376 (comment), I performed extended testing, benchmarking, and profiling to figure out what's best for minio-go and the Thanos issue. Proving tests:
No matter how hard I tried, I could not reproduce ANY race the fix was made for. I even tried to trigger retries, and all works fine for Linux *os.File. I tried to look for any repro tests on your side, but none were provided to prove that there is indeed any race. Perhaps the race was detected on a certain OS (Windows/macOS?). The benchmarks show significant allocation problems while copying large files (PartSize 64MB). I also optimized naive chunk preallocations.
I know you reduced the default PartSize to 16MB (from 128MB) to reduce the default allocation spread, but this is still unacceptable, and a 16MB part size has other tradeoffs (latency). So in this PR I propose to revert that fix and optimize allocations instead. I would love to be able to use minio-go in Thanos and related projects, but we need this path to be leaner. One of the components, the Thanos sidecar, is usually deployed with limited memory (100-200 MB), so optimizations in this library matter.
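As a side note on the tradeoff above: callers can already pin the part size per upload via the PartSize field of PutObjectOptions in minio-go v7. A short example (function and argument names are mine):

```go
package example

import (
	"context"
	"io"

	"github.com/minio/minio-go/v7"
)

// uploadWithLargeParts pins a 64 MiB part size for one upload instead
// of relying on the library default, trading memory for fewer parts
// and round trips (the latency tradeoff mentioned above).
func uploadWithLargeParts(ctx context.Context, mc *minio.Client, bucket, object string, r io.Reader, size int64) (minio.UploadInfo, error) {
	return mc.PutObject(ctx, bucket, object, r, size, minio.PutObjectOptions{
		PartSize: 64 * 1024 * 1024,
	})
}
```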
Perhaps, if you don't want to accept this PR because you are afraid of (IMO unproven) potential races, you would be happy with adding an extra option to the Put API, like ReuseReader? I am happy to be proven wrong that we indeed have a race here (e.g. I did not test any OS other than Linux), but we can't accept a 60x allocation regression for a nonexistent race. Let me know what you think, thanks! 🤗
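For concreteness, the proposed opt-in could look something like the sketch below; ReuseReader is purely hypothetical and does not exist in minio-go today.

```go
package minio // sketch only: a stand-in, not the real package

// PutObjectOptions here just shows where the hypothetical field
// would live; minio-go has no ReuseReader option today.
type PutObjectOptions struct {
	// ...existing fields elided...

	// ReuseReader, when true, would let the library issue concurrent
	// ReadAt calls on the caller's reader without buffering copies.
	// The caller guarantees the reader tolerates concurrent ReadAt
	// (e.g. *os.File on Linux).
	ReuseReader bool
}
```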
This version is also proposed to be used in Thanos: thanos-io/thanos#5474
Fixes: thanos-io/thanos#3967
Fixes: thanos-io/thanos#5393