From 68c938834fc69f57e9b5efdbec46cc6cbcadb1e1 Mon Sep 17 00:00:00 2001 From: yahaa <1477765176@qq.com> Date: Sun, 1 Aug 2021 21:36:49 +0800 Subject: [PATCH] objstore : implement Baidu BOS Signed-off-by: yahaa <1477765176@qq.com> --- .circleci/config.yml | 2 +- CHANGELOG.md | 1 + Makefile | 6 +- docs/storage.md | 15 ++ go.mod | 1 + go.sum | 2 + pkg/objstore/bos/bos.go | 388 +++++++++++++++++++++++++++++ pkg/objstore/client/factory.go | 7 +- pkg/objstore/objstore_test.go | 8 +- pkg/objstore/objtesting/foreach.go | 16 +- pkg/objstore/testing.go | 4 + pkg/testutil/testutil.go | 4 + scripts/cfggen/main.go | 5 +- 13 files changed, 448 insertions(+), 11 deletions(-) create mode 100644 pkg/objstore/bos/bos.go diff --git a/.circleci/config.yml b/.circleci/config.yml index 9eb2f3000c..75f9d47b21 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -35,7 +35,7 @@ jobs: - run: name: "Run unit tests." environment: - THANOS_TEST_OBJSTORE_SKIP: AZURE,COS,ALIYUNOSS + THANOS_TEST_OBJSTORE_SKIP: AZURE,COS,ALIYUNOSS,BOS # Variables for Swift testing. OS_AUTH_URL: http://127.0.0.1:5000/v2.0 OS_PASSWORD: s3cr3t diff --git a/CHANGELOG.md b/CHANGELOG.md index f6718244b9..47a958421e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#4509](https://github.com/thanos-io/thanos/pull/4509) Logging: Adds duration_ms in int64 to the logs. - [#4462](https://github.com/thanos-io/thanos/pull/4462) UI: Add find overlap block UI - [#4469](https://github.com/thanos-io/thanos/pull/4469) Compact: Add flag `compact.skip-block-with-out-of-order-chunks` to skip blocks with out-of-order chunks during compaction instead of halting +- [#4506](https://github.com/thanos-io/thanos/pull/4506) `Baidu BOS` object storage, see [documents](docs/storage.md#baidu-bos) for further information. ### Fixed diff --git a/Makefile b/Makefile index 04ce3a8eca..24b5c64b0b 100644 --- a/Makefile +++ b/Makefile @@ -222,12 +222,12 @@ test: export THANOS_TEST_PROMETHEUS_PATHS= $(PROMETHEUS_ARRAY) test: export THANOS_TEST_ALERTMANAGER_PATH= $(ALERTMANAGER) test: check-git install-deps @echo ">> install thanos GOOPTS=${GOOPTS}" - @echo ">> running unit tests (without /test/e2e). Do export THANOS_TEST_OBJSTORE_SKIP=GCS,S3,AZURE,SWIFT,COS,ALIYUNOSS if you want to skip e2e tests against all real store buckets. Current value: ${THANOS_TEST_OBJSTORE_SKIP}" + @echo ">> running unit tests (without /test/e2e). Do export THANOS_TEST_OBJSTORE_SKIP=GCS,S3,AZURE,SWIFT,COS,ALIYUNOSS,BOS if you want to skip e2e tests against all real store buckets. Current value: ${THANOS_TEST_OBJSTORE_SKIP}" @go test $(shell go list ./... | grep -v /vendor/ | grep -v /test/e2e); .PHONY: test-local test-local: ## Runs test excluding tests for ALL object storage integrations. -test-local: export THANOS_TEST_OBJSTORE_SKIP=GCS,S3,AZURE,SWIFT,COS,ALIYUNOSS +test-local: export THANOS_TEST_OBJSTORE_SKIP=GCS,S3,AZURE,SWIFT,COS,ALIYUNOSS,BOS test-local: $(MAKE) test @@ -245,7 +245,7 @@ test-e2e: docker .PHONY: test-e2e-local test-e2e-local: ## Runs all thanos e2e tests locally. 
-test-e2e-local: export THANOS_TEST_OBJSTORE_SKIP=GCS,S3,AZURE,SWIFT,COS,ALIYUNOSS
+test-e2e-local: export THANOS_TEST_OBJSTORE_SKIP=GCS,S3,AZURE,SWIFT,COS,ALIYUNOSS,BOS
 test-e2e-local:
 	$(MAKE) test-e2e
 
diff --git a/docs/storage.md b/docs/storage.md
index fc974253f0..5fceae5dee 100644
--- a/docs/storage.md
+++ b/docs/storage.md
@@ -412,6 +412,21 @@ config:
 
 Use --objstore.config-file to reference to this configuration file.
 
+#### Baidu BOS
+
+To use Baidu BOS object storage, you first need a Baidu Cloud account and an object storage bucket. Refer to the [Baidu Cloud documentation](https://cloud.baidu.com/doc/BOS/index.html) for more details.
+Then specify the following YAML configuration in the `--objstore.config*` flag.
+
+```yaml mdox-exec="go run scripts/cfggen/main.go --name=bos.Config"
+type: BOS
+config:
+  bucket: ""
+  endpoint: ""
+  access_key: ""
+  secret_key: ""
+```
+
+
 #### Filesystem
 
 This storage type is used when user wants to store and access the bucket in the local filesystem. We treat filesystem the same way we would treat object storage, so all optimization for remote bucket applies even though, we might have the files locally.
diff --git a/go.mod b/go.mod
index a0af77debf..86716fb21c 100644
--- a/go.mod
+++ b/go.mod
@@ -9,6 +9,7 @@ require (
 	github.com/NYTimes/gziphandler v1.1.1
 	github.com/alecthomas/units v0.0.0-20210208195552-ff826a37aa15
 	github.com/aliyun/aliyun-oss-go-sdk v2.0.4+incompatible
+	github.com/baidubce/bce-sdk-go v0.9.81
 	github.com/blang/semver/v4 v4.0.0
 	github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
 	github.com/cespare/xxhash v1.1.0
diff --git a/go.sum b/go.sum
index f43699cb40..2c97c5e3ff 100644
--- a/go.sum
+++ b/go.sum
@@ -185,6 +185,8 @@ github.com/aws/aws-sdk-go v1.38.3/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zK
 github.com/aws/aws-sdk-go v1.38.35 h1:7AlAO0FC+8nFjxiGKEmq0QLpiA8/XFr6eIxgRTwkdTg=
 github.com/aws/aws-sdk-go v1.38.35/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro=
 github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g=
+github.com/baidubce/bce-sdk-go v0.9.81 h1:n8KfThLG9fvGv3A+RtTt/jKhg/FPPRpo+iNnS2r+iPI=
+github.com/baidubce/bce-sdk-go v0.9.81/go.mod h1:zbYJMQwE4IZuyrJiFO8tO8NbtYiKTFTbwh4eIsqjVdg=
 github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f h1:ZNv7On9kyUzm7fvRZumSyy/IUiSC7AzL0I1jKKtwooA=
 github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f/go.mod h1:AuiFmCCPBSrqvVMvuqFuk0qogytodnVFVSN5CeJB8Gc=
 github.com/beevik/ntp v0.2.0/go.mod h1:hIHWr+l3+/clUnF44zdK+CWW7fO8dR5cIylAQ76NRpg=
diff --git a/pkg/objstore/bos/bos.go b/pkg/objstore/bos/bos.go
new file mode 100644
index 0000000000..8fa5f61889
--- /dev/null
+++ b/pkg/objstore/bos/bos.go
@@ -0,0 +1,388 @@
+// Copyright (c) The Thanos Authors.
+// Licensed under the Apache License 2.0.
+
+package bos
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"math"
+	"math/rand"
+	"net/http"
+	"os"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/baidubce/bce-sdk-go/bce"
+	"github.com/baidubce/bce-sdk-go/services/bos"
+	"github.com/baidubce/bce-sdk-go/services/bos/api"
+	"github.com/go-kit/kit/log"
+	"github.com/pkg/errors"
+	"gopkg.in/yaml.v2"
+
+	"github.com/thanos-io/thanos/pkg/objstore"
+)
+
+// partSize is the part size used for multipart uploads (128 MB).
+const partSize = 1024 * 1024 * 128
+
+// Bucket implements the store.Bucket interface against BOS-compatible (Baidu Object Storage) APIs.
+type Bucket struct {
+	logger log.Logger
+	client *bos.Client
+	name   string
+}
+
+// Config encapsulates the necessary config values to instantiate a BOS client.
+type Config struct {
+	Bucket    string `yaml:"bucket"`
+	Endpoint  string `yaml:"endpoint"`
+	AccessKey string `yaml:"access_key"`
+	SecretKey string `yaml:"secret_key"`
+}
+
+func (conf *Config) validate() error {
+	if conf.Bucket == "" ||
+		conf.Endpoint == "" ||
+		conf.AccessKey == "" ||
+		conf.SecretKey == "" {
+		return errors.New("insufficient BOS configuration information")
+	}
+
+	return nil
+}
+
+// parseConfig unmarshals a buffer into a Config.
+func parseConfig(conf []byte) (Config, error) {
+	config := Config{}
+	if err := yaml.Unmarshal(conf, &config); err != nil {
+		return Config{}, err
+	}
+
+	return config, nil
+}
+
+// NewBucket returns a new Bucket using the provided BOS configuration.
+func NewBucket(logger log.Logger, conf []byte, component string) (*Bucket, error) {
+	if logger == nil {
+		logger = log.NewNopLogger()
+	}
+
+	config, err := parseConfig(conf)
+	if err != nil {
+		return nil, errors.Wrap(err, "parsing BOS configuration")
+	}
+
+	if err := config.validate(); err != nil {
+		return nil, errors.Wrap(err, "validating BOS configuration")
+	}
+
+	client, err := bos.NewClient(config.AccessKey, config.SecretKey, config.Endpoint)
+	if err != nil {
+		return nil, errors.Wrap(err, "creating BOS client")
+	}
+
+	client.Config.UserAgent = fmt.Sprintf("thanos-%s", component)
+
+	bkt := &Bucket{
+		logger: logger,
+		client: client,
+		name:   config.Bucket,
+	}
+	return bkt, nil
+}
+
+// Name returns the bucket name for the provider.
+func (b *Bucket) Name() string {
+	return b.name
+}
+
+// Delete removes the object with the given name.
+func (b *Bucket) Delete(_ context.Context, name string) error {
+	return b.client.DeleteObject(b.name, name)
+}
+
+// Upload the contents of the reader as an object into the bucket.
+func (b *Bucket) Upload(_ context.Context, name string, r io.Reader) error { + size, err := objstore.TryToGetSize(r) + if err != nil { + return errors.Wrapf(err, "getting size of %s", name) + } + + partNums, lastSlice := int(math.Floor(float64(size)/partSize)), size%partSize + if partNums == 0 { + body, err := bce.NewBodyFromSizedReader(r, lastSlice) + if err != nil { + return errors.Wrapf(err, "failed to create SizedReader for %s", name) + } + + if _, err := b.client.PutObject(b.name, name, body, nil); err != nil { + return errors.Wrapf(err, "failed to upload %s", name) + } + + return nil + } + + result, err := b.client.BasicInitiateMultipartUpload(b.name, name) + if err != nil { + return errors.Wrapf(err, "failed to initiate MultipartUpload for %s", name) + } + + uploadEveryPart := func(partSize int64, part int, uploadId string) (string, error) { + body, err := bce.NewBodyFromSizedReader(r, partSize) + if err != nil { + return "", err + } + + etag, err := b.client.UploadPart(b.name, name, uploadId, part, body, nil) + if err != nil { + if err := b.client.AbortMultipartUpload(b.name, name, uploadId); err != nil { + return etag, err + } + return etag, err + } + return etag, nil + } + + var parts []api.UploadInfoType + + for part := 1; part <= partNums; part++ { + etag, err := uploadEveryPart(partSize, part, result.UploadId) + if err != nil { + return errors.Wrapf(err, "failed to upload part %d for %s", part, name) + } + parts = append(parts, api.UploadInfoType{PartNumber: part, ETag: etag}) + } + + if lastSlice != 0 { + etag, err := uploadEveryPart(lastSlice, partNums+1, result.UploadId) + if err != nil { + return errors.Wrapf(err, "failed to upload the last part for %s", name) + } + parts = append(parts, api.UploadInfoType{PartNumber: partNums + 1, ETag: etag}) + } + + if _, err := b.client.CompleteMultipartUploadFromStruct(b.name, name, result.UploadId, &api.CompleteMultipartUploadArgs{Parts: parts}); err != nil { + return errors.Wrapf(err, "failed to set %s upload completed", name) + } + return nil +} + +// Iter calls f for each entry in the given directory (not recursive). The argument to f is the full +// object name including the prefix of the inspected directory. +func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt ...objstore.IterOption) error { + if dir != "" { + dir = strings.TrimSuffix(dir, objstore.DirDelim) + objstore.DirDelim + } + + delimiter := objstore.DirDelim + + if objstore.ApplyIterOptions(opt...).Recursive { + delimiter = "" + } + + var marker string + for { + if err := ctx.Err(); err != nil { + return err + } + + objects, err := b.client.ListObjects(b.name, &api.ListObjectsArgs{ + Delimiter: delimiter, + Marker: marker, + MaxKeys: 1000, + Prefix: dir, + }) + if err != nil { + return err + } + + marker = objects.NextMarker + for _, object := range objects.Contents { + if err := f(object.Key); err != nil { + return err + } + } + + for _, object := range objects.CommonPrefixes { + if err := f(object.Prefix); err != nil { + return err + } + } + if !objects.IsTruncated { + break + } + } + return nil +} + +// Get returns a reader for the given object name. +func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { + return b.getRange(ctx, b.name, name, 0, -1) +} + +// GetRange returns a new range reader for the given object name and range. 
+func (b *Bucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
+	return b.getRange(ctx, b.name, name, off, length)
+}
+
+// Exists checks if the given object exists in the bucket.
+func (b *Bucket) Exists(_ context.Context, name string) (bool, error) {
+	_, err := b.client.GetObjectMeta(b.name, name)
+	if err != nil {
+		if b.IsObjNotFoundErr(err) {
+			return false, nil
+		}
+		return false, errors.Wrapf(err, "getting object metadata of %s", name)
+	}
+	return true, nil
+}
+
+func (b *Bucket) Close() error {
+	return nil
+}
+
+// ObjectSize returns the size of the specified object.
+func (b *Bucket) ObjectSize(_ context.Context, name string) (uint64, error) {
+	objMeta, err := b.client.GetObjectMeta(b.name, name)
+	if err != nil {
+		return 0, err
+	}
+	return uint64(objMeta.ContentLength), nil
+}
+
+// Attributes returns information about the specified object.
+func (b *Bucket) Attributes(_ context.Context, name string) (objstore.ObjectAttributes, error) {
+	objMeta, err := b.client.GetObjectMeta(b.name, name)
+	if err != nil {
+		return objstore.ObjectAttributes{}, errors.Wrapf(err, "getting object metadata of %s", name)
+	}
+
+	lastModified, err := time.Parse(time.RFC1123, objMeta.LastModified)
+	if err != nil {
+		return objstore.ObjectAttributes{}, err
+	}
+
+	return objstore.ObjectAttributes{
+		Size:         objMeta.ContentLength,
+		LastModified: lastModified,
+	}, nil
+}
+
+// IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations.
+func (b *Bucket) IsObjNotFoundErr(err error) bool {
+	switch bosErr := errors.Cause(err).(type) {
+	case *bce.BceServiceError:
+		if bosErr.StatusCode == http.StatusNotFound || bosErr.Code == "NoSuchKey" {
+			return true
+		}
+	}
+	return false
+}
+
+func (b *Bucket) getRange(_ context.Context, bucketName, objectKey string, off, length int64) (io.ReadCloser, error) {
+	if len(objectKey) == 0 {
+		return nil, errors.Errorf("given object name should not be empty")
+	}
+
+	ranges := []int64{off}
+	if length != -1 {
+		ranges = append(ranges, off+length-1)
+	}
+
+	obj, err := b.client.GetObject(bucketName, objectKey, map[string]string{}, ranges...)
+	if err != nil {
+		return nil, err
+	}
+
+	return obj.Body, nil
+}
+
+func configFromEnv() Config {
+	c := Config{
+		Bucket:    os.Getenv("BOS_BUCKET"),
+		Endpoint:  os.Getenv("BOS_ENDPOINT"),
+		AccessKey: os.Getenv("BOS_ACCESS_KEY"),
+		SecretKey: os.Getenv("BOS_SECRET_KEY"),
+	}
+	return c
+}
+
+// NewTestBucket creates a test bucket client that creates a temporary bucket before returning.
+// The returned close function empties and deletes that bucket.
+func NewTestBucket(t testing.TB) (objstore.Bucket, func(), error) {
+	c := configFromEnv()
+	if err := validateForTest(c); err != nil {
+		return nil, nil, err
+	}
+
+	if c.Bucket != "" {
+		if os.Getenv("THANOS_ALLOW_EXISTING_BUCKET_USE") == "" {
+			return nil, nil, errors.New("BOS_BUCKET is defined. Normally these tests will create a temporary bucket " +
+				"and delete it after test. Unset BOS_BUCKET env variable to use default logic. If you really want to run " +
+				"tests against provided (NOT USED!) bucket, set THANOS_ALLOW_EXISTING_BUCKET_USE=true. WARNING: That bucket " +
+				"needs to be manually cleared. This means that it is only useful to run one test at a time. 
This is due " + + "to safety (accidentally pointing prod bucket for test) as well as BOS not being fully strong consistent.") + } + + bc, err := yaml.Marshal(c) + if err != nil { + return nil, nil, err + } + + b, err := NewBucket(log.NewNopLogger(), bc, "thanos-e2e-test") + if err != nil { + return nil, nil, err + } + + if err := b.Iter(context.Background(), "", func(f string) error { + return errors.Errorf("bucket %s is not empty", c.Bucket) + }); err != nil { + return nil, nil, errors.Wrapf(err, "checking bucket %s", c.Bucket) + } + + t.Log("WARNING. Reusing", c.Bucket, "BOS bucket for BOS tests. Manual cleanup afterwards is required") + return b, func() {}, nil + } + + src := rand.NewSource(time.Now().UnixNano()) + tmpBucketName := strings.Replace(fmt.Sprintf("test_%x", src.Int63()), "_", "-", -1) + + if len(tmpBucketName) >= 31 { + tmpBucketName = tmpBucketName[:31] + } + + c.Bucket = tmpBucketName + bc, err := yaml.Marshal(c) + if err != nil { + return nil, nil, err + } + + b, err := NewBucket(log.NewNopLogger(), bc, "thanos-e2e-test") + if err != nil { + return nil, nil, err + } + + if _, err := b.client.PutBucket(b.name); err != nil { + return nil, nil, err + } + + t.Log("created temporary BOS bucket for BOS tests with name", tmpBucketName) + return b, func() { + objstore.EmptyBucket(t, context.Background(), b) + if err := b.client.DeleteBucket(b.name); err != nil { + t.Logf("deleting bucket %s failed: %s", tmpBucketName, err) + } + }, nil +} + +func validateForTest(conf Config) error { + if conf.Endpoint == "" || + conf.AccessKey == "" || + conf.SecretKey == "" { + return errors.New("insufficient BOS configuration information") + } + return nil +} diff --git a/pkg/objstore/client/factory.go b/pkg/objstore/client/factory.go index 5c037bc0ed..97a3e47750 100644 --- a/pkg/objstore/client/factory.go +++ b/pkg/objstore/client/factory.go @@ -12,15 +12,17 @@ import ( "github.com/go-kit/kit/log/level" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + yaml "gopkg.in/yaml.v2" + "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/objstore/azure" + "github.com/thanos-io/thanos/pkg/objstore/bos" "github.com/thanos-io/thanos/pkg/objstore/cos" "github.com/thanos-io/thanos/pkg/objstore/filesystem" "github.com/thanos-io/thanos/pkg/objstore/gcs" "github.com/thanos-io/thanos/pkg/objstore/oss" "github.com/thanos-io/thanos/pkg/objstore/s3" "github.com/thanos-io/thanos/pkg/objstore/swift" - yaml "gopkg.in/yaml.v2" ) type ObjProvider string @@ -33,6 +35,7 @@ const ( SWIFT ObjProvider = "SWIFT" COS ObjProvider = "COS" ALIYUNOSS ObjProvider = "ALIYUNOSS" + BOS ObjProvider = "BOS" ) type BucketConfig struct { @@ -70,6 +73,8 @@ func NewBucket(logger log.Logger, confContentYaml []byte, reg prometheus.Registe bucket, err = oss.NewBucket(logger, config, component) case string(FILESYSTEM): bucket, err = filesystem.NewBucketFromConfig(config) + case string(BOS): + bucket, err = bos.NewBucket(logger, config, component) default: return nil, errors.Errorf("bucket with type %s is not supported", bucketConf.Type) } diff --git a/pkg/objstore/objstore_test.go b/pkg/objstore/objstore_test.go index d136e4ab7d..acd46a93f5 100644 --- a/pkg/objstore/objstore_test.go +++ b/pkg/objstore/objstore_test.go @@ -26,8 +26,8 @@ func TestMetricBucket_Close(t *testing.T) { testutil.Equals(t, float64(3), promtest.ToFloat64(bkt.ops.WithLabelValues(OpGet))) testutil.Equals(t, float64(3), promtest.ToFloat64(bkt.ops.WithLabelValues(OpGetRange))) testutil.Equals(t, float64(2), 
promtest.ToFloat64(bkt.ops.WithLabelValues(OpExists))) - testutil.Equals(t, float64(8), promtest.ToFloat64(bkt.ops.WithLabelValues(OpUpload))) - testutil.Equals(t, float64(2), promtest.ToFloat64(bkt.ops.WithLabelValues(OpDelete))) + testutil.Equals(t, float64(9), promtest.ToFloat64(bkt.ops.WithLabelValues(OpUpload))) + testutil.Equals(t, float64(3), promtest.ToFloat64(bkt.ops.WithLabelValues(OpDelete))) testutil.Equals(t, 7, promtest.CollectAndCount(bkt.ops)) testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpIter))) testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpAttributes))) @@ -49,8 +49,8 @@ func TestMetricBucket_Close(t *testing.T) { testutil.Equals(t, float64(6), promtest.ToFloat64(bkt.ops.WithLabelValues(OpGet))) testutil.Equals(t, float64(6), promtest.ToFloat64(bkt.ops.WithLabelValues(OpGetRange))) testutil.Equals(t, float64(4), promtest.ToFloat64(bkt.ops.WithLabelValues(OpExists))) - testutil.Equals(t, float64(16), promtest.ToFloat64(bkt.ops.WithLabelValues(OpUpload))) - testutil.Equals(t, float64(4), promtest.ToFloat64(bkt.ops.WithLabelValues(OpDelete))) + testutil.Equals(t, float64(18), promtest.ToFloat64(bkt.ops.WithLabelValues(OpUpload))) + testutil.Equals(t, float64(6), promtest.ToFloat64(bkt.ops.WithLabelValues(OpDelete))) testutil.Equals(t, 7, promtest.CollectAndCount(bkt.ops)) testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpIter))) // Not expected not found error here. diff --git a/pkg/objstore/objtesting/foreach.go b/pkg/objstore/objtesting/foreach.go index 3cc54a27e0..6d1cad859f 100644 --- a/pkg/objstore/objtesting/foreach.go +++ b/pkg/objstore/objtesting/foreach.go @@ -9,6 +9,7 @@ import ( "strings" "testing" + "github.com/thanos-io/thanos/pkg/objstore/bos" "github.com/thanos-io/thanos/pkg/objstore/client" "github.com/thanos-io/thanos/pkg/objstore/filesystem" @@ -23,7 +24,7 @@ import ( ) // IsObjStoreSkipped returns true if given provider ID is found in THANOS_TEST_OBJSTORE_SKIP array delimited by comma e.g: -// THANOS_TEST_OBJSTORE_SKIP=GCS,S3,AZURE,SWIFT,COS,ALIYUNOSS. +// THANOS_TEST_OBJSTORE_SKIP=GCS,S3,AZURE,SWIFT,COS,ALIYUNOSS,BOS. func IsObjStoreSkipped(t *testing.T, provider client.ObjProvider) bool { if e, ok := os.LookupEnv("THANOS_TEST_OBJSTORE_SKIP"); ok { obstores := strings.Split(e, ",") @@ -148,4 +149,17 @@ func ForeachStore(t *testing.T, testFn func(t *testing.T, bkt objstore.Bucket)) testFn(t, bkt) }) } + + // Optional BOS. 
+ if !IsObjStoreSkipped(t, client.BOS) { + t.Run("Baidu BOS", func(t *testing.T) { + bkt, closeFn, err := bos.NewTestBucket(t) + testutil.Ok(t, err) + + t.Parallel() + defer closeFn() + + testFn(t, bkt) + }) + } } diff --git a/pkg/objstore/testing.go b/pkg/objstore/testing.go index 897772a9d5..c76552c60a 100644 --- a/pkg/objstore/testing.go +++ b/pkg/objstore/testing.go @@ -4,6 +4,7 @@ package objstore import ( + "bytes" "context" "fmt" "io/ioutil" @@ -245,4 +246,7 @@ func AcceptanceTest(t *testing.T, bkt Bucket) { sort.Strings(expected) sort.Strings(seen) testutil.Equals(t, expected, seen) + + testutil.Ok(t, bkt.Upload(ctx, "obj_6.som", bytes.NewReader(make([]byte, 1024*1024*200)))) + testutil.Ok(t, bkt.Delete(ctx, "obj_6.som")) } diff --git a/pkg/testutil/testutil.go b/pkg/testutil/testutil.go index 9668a2f03f..9a649436ea 100644 --- a/pkg/testutil/testutil.go +++ b/pkg/testutil/testutil.go @@ -181,6 +181,8 @@ func TolerantVerifyLeakMain(m *testing.M) { // https://github.com/kubernetes/klog/blob/c85d02d1c76a9ebafa81eb6d35c980734f2c4727/klog.go#L417 goleak.IgnoreTopFunction("k8s.io/klog/v2.(*loggingT).flushDaemon"), goleak.IgnoreTopFunction("k8s.io/klog.(*loggingT).flushDaemon"), + // https://github.com/baidubce/bce-sdk-go/blob/9a8c1139e6a3ad23080b9b8c51dec88df8ce3cda/util/log/logger.go#L359 + goleak.IgnoreTopFunction("github.com/baidubce/bce-sdk-go/util/log.NewLogger.func1"), ) } @@ -193,6 +195,8 @@ func TolerantVerifyLeak(t *testing.T) { // https://github.com/kubernetes/klog/blob/c85d02d1c76a9ebafa81eb6d35c980734f2c4727/klog.go#L417 goleak.IgnoreTopFunction("k8s.io/klog/v2.(*loggingT).flushDaemon"), goleak.IgnoreTopFunction("k8s.io/klog.(*loggingT).flushDaemon"), + // https://github.com/baidubce/bce-sdk-go/blob/9a8c1139e6a3ad23080b9b8c51dec88df8ce3cda/util/log/logger.go#L359 + goleak.IgnoreTopFunction("github.com/baidubce/bce-sdk-go/util/log.NewLogger.func1"), ) } diff --git a/scripts/cfggen/main.go b/scripts/cfggen/main.go index 4388802c92..e58b629670 100644 --- a/scripts/cfggen/main.go +++ b/scripts/cfggen/main.go @@ -15,10 +15,12 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/pkg/errors" - "github.com/thanos-io/thanos/pkg/query" "gopkg.in/alecthomas/kingpin.v2" "gopkg.in/yaml.v2" + "github.com/thanos-io/thanos/pkg/objstore/bos" + "github.com/thanos-io/thanos/pkg/query" + "github.com/thanos-io/thanos/pkg/alert" "github.com/thanos-io/thanos/pkg/cacheutil" http_util "github.com/thanos-io/thanos/pkg/http" @@ -52,6 +54,7 @@ var ( client.COS: cos.DefaultConfig, client.ALIYUNOSS: oss.Config{}, client.FILESYSTEM: filesystem.Config{}, + client.BOS: bos.Config{}, } tracingConfigs = map[trclient.TracingProvider]interface{}{
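Not part of the patch, but as a quick sanity check of the multipart split performed by `Upload` in `pkg/objstore/bos/bos.go`: the sketch below reproduces the `partNums`/`lastSlice` arithmetic for the 200 MiB object that the new acceptance-test case uploads. The `main` package and the variable names are only for this illustration.

```go
package main

import (
	"fmt"
	"math"
)

// partSize mirrors the 128 MB part size constant used by the BOS Upload implementation above.
const partSize = 1024 * 1024 * 128

func main() {
	// The new acceptance-test case uploads a 200 MiB object, which does not fit in a single part.
	size := int64(1024 * 1024 * 200)

	// Same split as Upload: the number of full parts and the size of the trailing slice.
	partNums := int(math.Floor(float64(size) / partSize))
	lastSlice := size % partSize

	// Prints "1 75497472": one full 128 MiB part plus a final 72 MiB part uploaded as part 2.
	fmt.Println(partNums, lastSlice)
}
```

Because `partNums` is non-zero here, `Upload` takes the multipart path rather than the single `PutObject` call, which is presumably what the added 200 MiB acceptance-test object is meant to exercise.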