Skip to content

Commit

Permalink
added roundtripper wrapper as param
Browse files Browse the repository at this point in the history
Signed-off-by: milinddethe15 <[email protected]>
  • Loading branch information
milinddethe15 committed Oct 21, 2024
1 parent 5f04b8b commit 7e645b1
Show file tree
Hide file tree
Showing 17 changed files with 101 additions and 87 deletions.
16 changes: 8 additions & 8 deletions client/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type BucketConfig struct {

// NewBucket initializes and returns new object storage clients.
// NOTE: confContentYaml can contain secrets.
func NewBucket(logger log.Logger, confContentYaml []byte, component string, rt http.RoundTripper) (objstore.Bucket, error) {
func NewBucket(logger log.Logger, confContentYaml []byte, component string, wrapRoundtripper func(http.RoundTripper) http.RoundTripper) (objstore.Bucket, error) {
level.Info(logger).Log("msg", "loading bucket configuration")
bucketConf := &BucketConfig{}
if err := yaml.UnmarshalStrict(confContentYaml, bucketConf); err != nil {
Expand All @@ -65,23 +65,23 @@ func NewBucket(logger log.Logger, confContentYaml []byte, component string, rt h
var bucket objstore.Bucket
switch strings.ToUpper(string(bucketConf.Type)) {
case string(GCS):
bucket, err = gcs.NewBucket(context.Background(), logger, config, component, rt)
bucket, err = gcs.NewBucket(context.Background(), logger, config, component, wrapRoundtripper)
case string(S3):
bucket, err = s3.NewBucket(logger, config, component, rt)
bucket, err = s3.NewBucket(logger, config, component, wrapRoundtripper)
case string(AZURE):
bucket, err = azure.NewBucket(logger, config, component, rt)
bucket, err = azure.NewBucket(logger, config, component, wrapRoundtripper)
case string(SWIFT):
bucket, err = swift.NewContainer(logger, config, rt)
bucket, err = swift.NewContainer(logger, config, wrapRoundtripper)
case string(COS):
bucket, err = cos.NewBucket(logger, config, component, rt)
bucket, err = cos.NewBucket(logger, config, component, wrapRoundtripper)
case string(ALIYUNOSS):
bucket, err = oss.NewBucket(logger, config, component, rt)
bucket, err = oss.NewBucket(logger, config, component, wrapRoundtripper)
case string(FILESYSTEM):
bucket, err = filesystem.NewBucketFromConfig(config)
case string(BOS):
bucket, err = bos.NewBucket(logger, config, component)
case string(OCI):
bucket, err = oci.NewBucket(logger, config, rt)
bucket, err = oci.NewBucket(logger, config, wrapRoundtripper)
case string(OBS):
bucket, err = obs.NewBucket(logger, config)
default:
Expand Down
11 changes: 10 additions & 1 deletion errutil/rt_error.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package errutil

import "net/http"
import (
"errors"

Check failure on line 4 in errutil/rt_error.go

View workflow job for this annotation

GitHub Actions / Linters (Static Analysis) for Go

package "errors" shouldn't be imported, suggested: "github.com/pkg/errors"
"net/http"
)

var Rt_err = errors.New("RoundTripper error")

// ErrorRoundTripper is a custom RoundTripper that always returns an error.
type ErrorRoundTripper struct {
Expand All @@ -10,3 +15,7 @@ type ErrorRoundTripper struct {
func (ert *ErrorRoundTripper) RoundTrip(*http.Request) (*http.Response, error) {
return nil, ert.Err
}

func WrapRoundtripper(rt http.RoundTripper) http.RoundTripper {
return &ErrorRoundTripper{Err: Rt_err}
}
11 changes: 4 additions & 7 deletions providers/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ type Bucket struct {
}

// NewBucket returns a new Bucket using the provided Azure config.
func NewBucket(logger log.Logger, azureConfig []byte, component string, rt http.RoundTripper) (*Bucket, error) {
func NewBucket(logger log.Logger, azureConfig []byte, component string, wrapRoundtripper func(http.RoundTripper) http.RoundTripper) (*Bucket, error) {
level.Debug(logger).Log("msg", "creating new Azure bucket connection", "component", component)
conf, err := parseConfig(azureConfig)
if err != nil {
Expand All @@ -155,19 +155,16 @@ func NewBucket(logger log.Logger, azureConfig []byte, component string, rt http.
if conf.MSIResource != "" {
level.Warn(logger).Log("msg", "The field msi_resource has been deprecated and should no longer be set")
}
return NewBucketWithConfig(logger, conf, component, rt)
return NewBucketWithConfig(logger, conf, component, wrapRoundtripper)
}

// NewBucketWithConfig returns a new Bucket using the provided Azure config struct.
func NewBucketWithConfig(logger log.Logger, conf Config, component string, rt http.RoundTripper) (*Bucket, error) {
if rt != nil {
conf.HTTPConfig.Transport = rt
}
func NewBucketWithConfig(logger log.Logger, conf Config, component string, wrapRoundtripper func(http.RoundTripper) http.RoundTripper) (*Bucket, error) {
if err := conf.validate(); err != nil {
return nil, err
}

containerClient, err := getContainerClient(conf)
containerClient, err := getContainerClient(conf, wrapRoundtripper)
if err != nil {
return nil, err
}
Expand Down
6 changes: 2 additions & 4 deletions providers/azure/azure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,11 +230,9 @@ func TestNewBucketWithErrorRoundTripper(t *testing.T) {
cfg, err := parseConfig(validConfig)
testutil.Ok(t, err)

rt := &errutil.ErrorRoundTripper{Err: errors.New("RoundTripper error")}

_, err = NewBucketWithConfig(log.NewNopLogger(), cfg, "test", rt)
_, err = NewBucketWithConfig(log.NewNopLogger(), cfg, "test", errutil.WrapRoundtripper)

// We expect an error from the RoundTripper
testutil.NotOk(t, err)
testutil.Assert(t, errors.Is(err, rt.Err), "Expected RoundTripper error, got: %v", err)
testutil.Assert(t, errors.Is(err, errutil.Rt_err), "Expected RoundTripper error, got: %v", err)
}
5 changes: 4 additions & 1 deletion providers/azure/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
// DirDelim is the delimiter used to model a directory structure in an object store bucket.
const DirDelim = "/"

func getContainerClient(conf Config) (*container.Client, error) {
func getContainerClient(conf Config, wrapRoundtripper func(http.RoundTripper) http.RoundTripper) (*container.Client, error) {
var rt http.RoundTripper
rt, err := exthttp.DefaultTransport(conf.HTTPConfig)
if err != nil {
Expand All @@ -28,6 +28,9 @@ func getContainerClient(conf Config) (*container.Client, error) {
if conf.HTTPConfig.Transport != nil {
rt = conf.HTTPConfig.Transport
}
if wrapRoundtripper != nil {
rt = wrapRoundtripper(rt)
}
opt := &container.ClientOptions{
ClientOptions: azcore.ClientOptions{
Retry: policy.RetryOptions{
Expand Down
19 changes: 11 additions & 8 deletions providers/cos/cos.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func parseConfig(conf []byte) (Config, error) {
}

// NewBucket returns a new Bucket using the provided cos configuration.
func NewBucket(logger log.Logger, conf []byte, component string, rt http.RoundTripper) (*Bucket, error) {
func NewBucket(logger log.Logger, conf []byte, component string, wrapRoundtripper func(http.RoundTripper) http.RoundTripper) (*Bucket, error) {
if logger == nil {
logger = log.NewNopLogger()
}
Expand All @@ -104,11 +104,11 @@ func NewBucket(logger log.Logger, conf []byte, component string, rt http.RoundTr
if err != nil {
return nil, errors.Wrap(err, "parsing cos configuration")
}
return NewBucketWithConfig(logger, config, component, rt)
return NewBucketWithConfig(logger, config, component, wrapRoundtripper)
}

// NewBucketWithConfig returns a new Bucket using the provided cos config values.
func NewBucketWithConfig(logger log.Logger, config Config, component string, rt http.RoundTripper) (*Bucket, error) {
func NewBucketWithConfig(logger log.Logger, config Config, component string, wrapRoundtripper func(http.RoundTripper) http.RoundTripper) (*Bucket, error) {
if err := config.validate(); err != nil {
return nil, errors.Wrap(err, "validate cos configuration")
}
Expand All @@ -127,19 +127,22 @@ func NewBucketWithConfig(logger log.Logger, config Config, component string, rt
}
}
b := &cos.BaseURL{BucketURL: bucketURL}
var tpt http.RoundTripper
tpt, err = exthttp.DefaultTransport(config.HTTPConfig)
var rt http.RoundTripper
rt, err = exthttp.DefaultTransport(config.HTTPConfig)
if err != nil {
return nil, err
}
if rt != nil {
tpt = rt
if config.HTTPConfig.Transport != nil {
rt = config.HTTPConfig.Transport
}
if wrapRoundtripper != nil {
rt = wrapRoundtripper(rt)
}
client := cos.NewClient(b, &http.Client{
Transport: &cos.AuthorizationTransport{
SecretID: config.SecretId,
SecretKey: config.SecretKey,
Transport: tpt,
Transport: rt,
},
})

Expand Down
5 changes: 2 additions & 3 deletions providers/cos/cos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,11 @@ func TestNewBucketWithErrorRoundTripper(t *testing.T) {
SecretId: "sid",
SecretKey: "skey",
}
rt := &errutil.ErrorRoundTripper{Err: errors.New("RoundTripper error")}

bkt, err := NewBucketWithConfig(log.NewNopLogger(), config, "test", rt)
bkt, err := NewBucketWithConfig(log.NewNopLogger(), config, "test", errutil.WrapRoundtripper)
testutil.Ok(t, err)
_, err = bkt.Get(context.Background(), "Test")
// We expect an error from the RoundTripper
testutil.NotOk(t, err)
testutil.Assert(t, errors.Is(err, rt.Err), "Expected RoundTripper error, got: %v", err)
testutil.Assert(t, errors.Is(err, errutil.Rt_err), "Expected RoundTripper error, got: %v", err)
}
17 changes: 9 additions & 8 deletions providers/gcs/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,22 +77,20 @@ func parseConfig(conf []byte) (Config, error) {
}

// NewBucket returns a new Bucket against the given bucket handle.
func NewBucket(ctx context.Context, logger log.Logger, conf []byte, component string, rt http.RoundTripper) (*Bucket, error) {
func NewBucket(ctx context.Context, logger log.Logger, conf []byte, component string, wrapRoundtripper func(http.RoundTripper) http.RoundTripper) (*Bucket, error) {
config, err := parseConfig(conf)
if err != nil {
return nil, err
}
return NewBucketWithConfig(ctx, logger, config, component, rt)
return NewBucketWithConfig(ctx, logger, config, component, wrapRoundtripper)
}

// NewBucketWithConfig returns a new Bucket with gcs Config struct.
func NewBucketWithConfig(ctx context.Context, logger log.Logger, gc Config, component string, rt http.RoundTripper) (*Bucket, error) {
func NewBucketWithConfig(ctx context.Context, logger log.Logger, gc Config, component string, wrapRoundtripper func(http.RoundTripper) http.RoundTripper) (*Bucket, error) {
if gc.Bucket == "" {
return nil, errors.New("missing Google Cloud Storage bucket name for stored blocks")
}
if rt != nil {
gc.HTTPConfig.Transport = rt
}

var opts []option.ClientOption

// If ServiceAccount is provided, use them in GCS client, otherwise fallback to Google default logic.
Expand All @@ -112,7 +110,7 @@ func NewBucketWithConfig(ctx context.Context, logger log.Logger, gc Config, comp

if !gc.UseGRPC {
var err error
opts, err = appendHttpOptions(gc, opts)
opts, err = appendHttpOptions(gc, opts, wrapRoundtripper)
if err != nil {
return nil, err
}
Expand All @@ -121,7 +119,7 @@ func NewBucketWithConfig(ctx context.Context, logger log.Logger, gc Config, comp
return newBucket(ctx, logger, gc, opts)
}

func appendHttpOptions(gc Config, opts []option.ClientOption) ([]option.ClientOption, error) {
func appendHttpOptions(gc Config, opts []option.ClientOption, wrapRoundtripper func(http.RoundTripper) http.RoundTripper) ([]option.ClientOption, error) {
// Check if a roundtripper has been set in the config
// otherwise build the default transport.
var rt http.RoundTripper
Expand All @@ -132,6 +130,9 @@ func appendHttpOptions(gc Config, opts []option.ClientOption) ([]option.ClientOp
if gc.HTTPConfig.Transport != nil {
rt = gc.HTTPConfig.Transport
}
if wrapRoundtripper != nil {
rt = wrapRoundtripper(rt)
}

// GCS uses some defaults when "options.WithHTTPClient" is not used that are important when we call
// htransport.NewTransport namely the scopes that are then used for OAth authentication. So to build our own
Expand Down
5 changes: 2 additions & 3 deletions providers/gcs/gcs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ http_config:
}

func TestNewBucketWithErrorRoundTripper(t *testing.T) {
rt := &errutil.ErrorRoundTripper{Err: errors.New("RoundTripper error")}
cfg := Config{
Bucket: "test-bucket",
ServiceAccount: "",
Expand All @@ -174,9 +173,9 @@ func TestNewBucketWithErrorRoundTripper(t *testing.T) {
err = os.Setenv("STORAGE_EMULATOR_HOST", svr.Addr)
testutil.Ok(t, err)

bkt, err := NewBucketWithConfig(context.Background(), log.NewNopLogger(), cfg, "test-bucket", rt)
bkt, err := NewBucketWithConfig(context.Background(), log.NewNopLogger(), cfg, "test-bucket", errutil.WrapRoundtripper)
testutil.Ok(t, err)
_, err = bkt.Get(context.Background(), "test-bucket")
testutil.NotOk(t, err)
testutil.Assert(t, errors.Is(err, rt.Err), "Expected RoundTripper error, got: %v", err)
testutil.Assert(t, errors.Is(err, errutil.Rt_err), "Expected RoundTripper error, got: %v", err)
}
15 changes: 9 additions & 6 deletions providers/oci/oci.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ func (b *Bucket) deleteBucket(ctx context.Context) (err error) {
}

// NewBucket returns a new Bucket using the provided oci config values.
func NewBucket(logger log.Logger, ociConfig []byte, rt http.RoundTripper) (*Bucket, error) {
func NewBucket(logger log.Logger, ociConfig []byte, wrapRoundtripper func(http.RoundTripper) http.RoundTripper) (*Bucket, error) {
level.Debug(logger).Log("msg", "creating new oci bucket connection")
var config = DefaultConfig
var configurationProvider common.ConfigurationProvider
Expand Down Expand Up @@ -344,13 +344,16 @@ func NewBucket(logger log.Logger, ociConfig []byte, rt http.RoundTripper) (*Buck
if err != nil {
return nil, errors.Wrapf(err, "unable to create ObjectStorage client with the given oci configurations")
}

config.HTTPConfig.Transport = CustomTransport(config)
if rt != nil {
config.HTTPConfig.Transport = rt
var rt http.RoundTripper
rt = CustomTransport(config)
if config.HTTPConfig.Transport != nil {
rt = config.HTTPConfig.Transport
}
if wrapRoundtripper != nil {
rt = wrapRoundtripper(rt)
}
httpClient := http.Client{
Transport: config.HTTPConfig.Transport,
Transport: rt,
Timeout: config.HTTPConfig.ClientTimeout,
}
client.HTTPClient = &httpClient
Expand Down
6 changes: 2 additions & 4 deletions providers/oci/oci_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,8 @@ G6aFKaqQfOXKCyWoUiVknQJAXrlgySFci/2ueKlIE1QqIiLSZ8V8OlpFLRnb1pzI
ociConfig, err := yaml.Marshal(config)
testutil.Ok(t, err)

rt := &errutil.ErrorRoundTripper{Err: errors.New("RoundTripper error")}

_, err = NewBucket(log.NewNopLogger(), ociConfig, rt)
_, err = NewBucket(log.NewNopLogger(), ociConfig, errutil.WrapRoundtripper)
// We expect an error from the RoundTripper
testutil.NotOk(t, err)
testutil.Assert(t, errors.Is(err, rt.Err), "Expected RoundTripper error, got: %v", err)
testutil.Assert(t, errors.Is(err, errutil.Rt_err), "Expected RoundTripper error, got: %v", err)
}
23 changes: 15 additions & 8 deletions providers/oss/oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"gopkg.in/yaml.v2"

"github.com/thanos-io/objstore/clientutil"
"github.com/thanos-io/objstore/exthttp"

"github.com/thanos-io/objstore"
)
Expand Down Expand Up @@ -159,26 +160,32 @@ func (b *Bucket) Attributes(ctx context.Context, name string) (objstore.ObjectAt
}

// NewBucket returns a new Bucket using the provided oss config values.
func NewBucket(logger log.Logger, conf []byte, component string, rt http.RoundTripper) (*Bucket, error) {
func NewBucket(logger log.Logger, conf []byte, component string, wrapRoundtripper func(http.RoundTripper) http.RoundTripper) (*Bucket, error) {
var config Config
if err := yaml.Unmarshal(conf, &config); err != nil {
return nil, errors.Wrap(err, "parse aliyun oss config file failed")
}
return NewBucketWithConfig(logger, config, component, rt)
return NewBucketWithConfig(logger, config, component, wrapRoundtripper)
}

// NewBucketWithConfig returns a new Bucket using the provided oss config struct.
func NewBucketWithConfig(logger log.Logger, config Config, component string, rt http.RoundTripper) (*Bucket, error) {
func NewBucketWithConfig(logger log.Logger, config Config, component string, wrapRoundtripper func(http.RoundTripper) http.RoundTripper) (*Bucket, error) {
if err := validate(config); err != nil {
return nil, err
}
client, err := alioss.New(config.Endpoint, config.AccessKeyID, config.AccessKeySecret)
if rt != nil {
clientOption := func(client *alioss.Client) {
client.HTTPClient = &http.Client{Transport: rt}
var clientOptions []alioss.ClientOption
if wrapRoundtripper != nil {
rt, err := exthttp.DefaultTransport(exthttp.DefaultHTTPConfig)
if err != nil {
return nil, err
}
client, err = alioss.New(config.Endpoint, config.AccessKeyID, config.AccessKeySecret, clientOption)
clientOptions = append(clientOptions, func(client *alioss.Client) {
client.HTTPClient = &http.Client{
Transport: wrapRoundtripper(rt),
}
})
}
client, err := alioss.New(config.Endpoint, config.AccessKeyID, config.AccessKeySecret, clientOptions...)
if err != nil {
return nil, errors.Wrap(err, "create aliyun oss client failed")
}
Expand Down
5 changes: 2 additions & 3 deletions providers/oss/oss_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@ func TestNewBucketWithErrorRoundTripper(t *testing.T) {
AccessKeySecret: "123",
Bucket: "test",
}
rt := &errutil.ErrorRoundTripper{Err: errors.New("RoundTripper error")}

bkt, err := NewBucketWithConfig(log.NewNopLogger(), config, "test", rt)
bkt, err := NewBucketWithConfig(log.NewNopLogger(), config, "test", errutil.WrapRoundtripper)
// We expect an error from the RoundTripper
testutil.Ok(t, err)
_, err = bkt.Get(context.Background(), "test")
testutil.NotOk(t, err)
testutil.Assert(t, errors.Is(err, rt.Err), "Expected RoundTripper error, got: %v", err)
testutil.Assert(t, errors.Is(err, errutil.Rt_err), "Expected RoundTripper error, got: %v", err)
}
Loading

0 comments on commit 7e645b1

Please sign in to comment.