Skip to content

Commit

Permalink
[Flow Aggregator] Determine bucket region in S3 uploader (antrea-io#4237)
Browse files Browse the repository at this point in the history

We determine the region programmatically in the S3 uploader, using the
region "hint" provided in the config.
Before this change, there was a mismatch between the description of the
"region" config parameter and its actual implementation in the S3
uploader.

Signed-off-by: Antonin Bas <[email protected]>
  • Loading branch information
antoninbas authored and heanlan committed Mar 29, 2023
1 parent fb88a1f commit d225a4b
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 9 deletions.
8 changes: 2 additions & 6 deletions pkg/flowaggregator/clickhouseclient/clickhouseclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,7 @@ const (
)

// PrepareClickHouseConnection is used for unit testing
var (
PrepareClickHouseConnection = func(input ClickHouseInput) (string, *sql.DB, error) {
return PrepareConnection(input)
}
)
// PrepareClickHouseConnection is a package-level indirection over
// prepareConnection so that unit tests can substitute a stub implementation.
var PrepareClickHouseConnection = prepareConnection

type stopPayload struct {
flushQueue bool
Expand Down Expand Up @@ -380,7 +376,7 @@ func (ch *ClickHouseExportProcess) pushRecordsToFrontOfQueue(records []*flowreco
}
}

func PrepareConnection(input ClickHouseInput) (string, *sql.DB, error) {
func prepareConnection(input ClickHouseInput) (string, *sql.DB, error) {
dsn, err := input.GetDataSourceName()
if err != nil {
return "", nil, fmt.Errorf("error when parsing ClickHouse DSN: %v", err)
Expand Down
8 changes: 8 additions & 0 deletions pkg/flowaggregator/exporter/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package exporter

import (
"context"
"testing"
"time"

Expand All @@ -28,6 +29,13 @@ import (
)

func TestS3_UpdateOptions(t *testing.T) {
GetS3BucketRegionSaved := s3uploader.GetS3BucketRegion
s3uploader.GetS3BucketRegion = func(ctx context.Context, bucket string, regionHint string) (string, error) {
return "us-west-2", nil
}
defer func() {
s3uploader.GetS3BucketRegion = GetS3BucketRegionSaved
}()
compress := true
opt := &options.Options{
Config: &flowaggregator.FlowAggregatorConfig{
Expand Down
32 changes: 29 additions & 3 deletions pkg/flowaggregator/s3uploader/s3uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ const (
maxNumBuffersPendingUpload = 5
)

// GetS3BucketRegion points at getBucketRegion by default; it is a
// package-level variable so that unit tests can substitute a stub
// implementation (see s3_test.go).
var GetS3BucketRegion = getBucketRegion

// stopPayload carries options for stopping the upload process.
type stopPayload struct {
	// flushQueue indicates whether queued data should be flushed/uploaded
	// before stopping. NOTE(review): inferred from the name — the consuming
	// goroutine is not visible in this hunk; confirm against callers.
	flushQueue bool
}
Expand Down Expand Up @@ -100,22 +103,45 @@ func (u *S3Uploader) Upload(ctx context.Context, input *s3.PutObjectInput, awsS3
return awsS3Uploader.Upload(ctx, input, opts...)
}

// getBucketRegion determines the exact region in which the bucket is
// located. regionHint can be any region in the same partition as the one in
// which the bucket is located; it is only used to bootstrap the AWS client
// that performs the region lookup.
// It returns the resolved region name, or an error if the AWS config cannot
// be loaded or the bucket's region cannot be determined.
func getBucketRegion(ctx context.Context, bucket string, regionHint string) (string, error) {
	awsCfg, err := awsconfig.LoadDefaultConfig(ctx, awsconfig.WithRegion(regionHint))
	if err != nil {
		return "", fmt.Errorf("unable to load AWS SDK config: %w", err)
	}
	s3Client := s3.NewFromConfig(awsCfg)
	bucketRegion, err := s3manager.GetBucketRegion(ctx, s3Client, bucket)
	if err != nil {
		return "", fmt.Errorf("unable to determine region for bucket '%s', make sure the bucket exists and set the region parameter appropriately: %w", bucket, err)
	}
	// Success path: return an explicit nil rather than the (necessarily nil)
	// err, per Go convention.
	return bucketRegion, nil
}

func NewS3UploadProcess(input S3Input, clusterUUID string) (*S3UploadProcess, error) {
config := input.Config
cfg, err := awsconfig.LoadDefaultConfig(context.TODO(), awsconfig.WithRegion(config.Region))
region, err := GetS3BucketRegion(context.TODO(), config.BucketName, config.Region)
if err != nil {
return nil, err
}
klog.InfoS("S3 bucket region successfully determined for flow upload", "bucket", config.BucketName, "region", region)
awsCfg, err := awsconfig.LoadDefaultConfig(context.TODO(), awsconfig.WithRegion(region))
if err != nil {
return nil, fmt.Errorf("error when loading AWS config: %w", err)
}
awsS3Client := s3.NewFromConfig(cfg)
awsS3Client := s3.NewFromConfig(awsCfg)
awsS3Uploader := s3manager.NewUploader(awsS3Client)

buf := &bytes.Buffer{}
// #nosec G404: random number generator not used for security purposes
nameRand := rand.New(rand.NewSource(time.Now().UnixNano()))

s3ExportProcess := &S3UploadProcess{
bucketName: config.BucketName,
bucketPrefix: config.BucketPrefix,
region: config.Region,
region: region,
compress: *config.Compress,
maxRecordPerFile: config.MaxRecordsPerFile,
uploadInterval: input.UploadInterval,
Expand Down

0 comments on commit d225a4b

Please sign in to comment.