Skip to content

Commit

Permalink
[Flow Aggregator] Determine bucket region in S3 uploader (#4237)
Browse files Browse the repository at this point in the history
We determine the region programmatically in the S3 uploader, using the
region "hint" provided in the config.
Before this change, there was a mismatch between the description of the
"region" config parameter and its actual implementation in the S3
uploader.

Signed-off-by: Antonin Bas <[email protected]>
  • Loading branch information
antoninbas authored Sep 20, 2022
1 parent 5ce44ea commit 4caa8c1
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 9 deletions.
8 changes: 2 additions & 6 deletions pkg/flowaggregator/clickhouseclient/clickhouseclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,7 @@ const (
)

// PrepareClickHouseConnection is used for unit testing
var (
PrepareClickHouseConnection = func(input ClickHouseInput) (string, *sql.DB, error) {
return PrepareConnection(input)
}
)
var PrepareClickHouseConnection = prepareConnection

type stopPayload struct {
flushQueue bool
Expand Down Expand Up @@ -380,7 +376,7 @@ func (ch *ClickHouseExportProcess) pushRecordsToFrontOfQueue(records []*flowreco
}
}

func PrepareConnection(input ClickHouseInput) (string, *sql.DB, error) {
func prepareConnection(input ClickHouseInput) (string, *sql.DB, error) {
dsn, err := input.GetDataSourceName()
if err != nil {
return "", nil, fmt.Errorf("error when parsing ClickHouse DSN: %v", err)
Expand Down
8 changes: 8 additions & 0 deletions pkg/flowaggregator/exporter/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package exporter

import (
"context"
"testing"
"time"

Expand All @@ -28,6 +29,13 @@ import (
)

func TestS3_UpdateOptions(t *testing.T) {
GetS3BucketRegionSaved := s3uploader.GetS3BucketRegion
s3uploader.GetS3BucketRegion = func(ctx context.Context, bucket string, regionHint string) (string, error) {
return "us-west-2", nil
}
defer func() {
s3uploader.GetS3BucketRegion = GetS3BucketRegionSaved
}()
compress := true
opt := &options.Options{
Config: &flowaggregator.FlowAggregatorConfig{
Expand Down
32 changes: 29 additions & 3 deletions pkg/flowaggregator/s3uploader/s3uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ const (
maxNumBuffersPendingUpload = 5
)

// GetS3BucketRegion is used for unit testing
var GetS3BucketRegion = getBucketRegion

type stopPayload struct {
flushQueue bool
}
Expand Down Expand Up @@ -100,22 +103,45 @@ func (u *S3Uploader) Upload(ctx context.Context, input *s3.PutObjectInput, awsS3
return awsS3Uploader.Upload(ctx, input, opts...)
}

// getBucketRegion determines the exact region in which the bucket is
// located. regionHint can be any region in the same partition as the one in
// which the bucket is located.
func getBucketRegion(ctx context.Context, bucket string, regionHint string) (string, error) {
awsCfg, err := awsconfig.LoadDefaultConfig(ctx, awsconfig.WithRegion(regionHint))
if err != nil {
return "", fmt.Errorf("unable to load AWS SDK config: %w", err)

}
s3Client := s3.NewFromConfig(awsCfg)
bucketRegion, err := s3manager.GetBucketRegion(ctx, s3Client, bucket)
if err != nil {
return "", fmt.Errorf("unable to determine region for bucket '%s', make sure the bucket exists and set the region parameter appropriately: %w", bucket, err)
}
return bucketRegion, err
}

func NewS3UploadProcess(input S3Input, clusterUUID string) (*S3UploadProcess, error) {
config := input.Config
cfg, err := awsconfig.LoadDefaultConfig(context.TODO(), awsconfig.WithRegion(config.Region))
region, err := GetS3BucketRegion(context.TODO(), config.BucketName, config.Region)
if err != nil {
return nil, err
}
klog.InfoS("S3 bucket region successfully determined for flow upload", "bucket", config.BucketName, "region", region)
awsCfg, err := awsconfig.LoadDefaultConfig(context.TODO(), awsconfig.WithRegion(region))
if err != nil {
return nil, fmt.Errorf("error when loading AWS config: %w", err)
}
awsS3Client := s3.NewFromConfig(cfg)
awsS3Client := s3.NewFromConfig(awsCfg)
awsS3Uploader := s3manager.NewUploader(awsS3Client)

buf := &bytes.Buffer{}
// #nosec G404: random number generator not used for security purposes
nameRand := rand.New(rand.NewSource(time.Now().UnixNano()))

s3ExportProcess := &S3UploadProcess{
bucketName: config.BucketName,
bucketPrefix: config.BucketPrefix,
region: config.Region,
region: region,
compress: *config.Compress,
maxRecordPerFile: config.MaxRecordsPerFile,
uploadInterval: input.UploadInterval,
Expand Down

0 comments on commit 4caa8c1

Please sign in to comment.