diff --git a/pkg/flowaggregator/clickhouseclient/clickhouseclient.go b/pkg/flowaggregator/clickhouseclient/clickhouseclient.go
index 14ca353eb13..1e9865b81cb 100644
--- a/pkg/flowaggregator/clickhouseclient/clickhouseclient.go
+++ b/pkg/flowaggregator/clickhouseclient/clickhouseclient.go
@@ -88,11 +88,7 @@ const (
 )
 
 // PrepareClickHouseConnection is used for unit testing
-var (
-	PrepareClickHouseConnection = func(input ClickHouseInput) (string, *sql.DB, error) {
-		return PrepareConnection(input)
-	}
-)
+var PrepareClickHouseConnection = prepareConnection
 
 type stopPayload struct {
 	flushQueue bool
@@ -380,7 +376,7 @@ func (ch *ClickHouseExportProcess) pushRecordsToFrontOfQueue(records []*flowreco
 	}
 }
 
-func PrepareConnection(input ClickHouseInput) (string, *sql.DB, error) {
+func prepareConnection(input ClickHouseInput) (string, *sql.DB, error) {
 	dsn, err := input.GetDataSourceName()
 	if err != nil {
 		return "", nil, fmt.Errorf("error when parsing ClickHouse DSN: %v", err)
diff --git a/pkg/flowaggregator/exporter/s3_test.go b/pkg/flowaggregator/exporter/s3_test.go
index 427b5340f71..5b7ea3081b3 100644
--- a/pkg/flowaggregator/exporter/s3_test.go
+++ b/pkg/flowaggregator/exporter/s3_test.go
@@ -15,6 +15,7 @@
 package exporter
 
 import (
+	"context"
 	"testing"
 	"time"
 
@@ -28,6 +29,13 @@ import (
 )
 
 func TestS3_UpdateOptions(t *testing.T) {
+	GetS3BucketRegionSaved := s3uploader.GetS3BucketRegion
+	s3uploader.GetS3BucketRegion = func(ctx context.Context, bucket string, regionHint string) (string, error) {
+		return "us-west-2", nil
+	}
+	defer func() {
+		s3uploader.GetS3BucketRegion = GetS3BucketRegionSaved
+	}()
 	compress := true
 	opt := &options.Options{
 		Config: &flowaggregator.FlowAggregatorConfig{
diff --git a/pkg/flowaggregator/s3uploader/s3uploader.go b/pkg/flowaggregator/s3uploader/s3uploader.go
index 2dab3481e0d..2f6d98417ea 100644
--- a/pkg/flowaggregator/s3uploader/s3uploader.go
+++ b/pkg/flowaggregator/s3uploader/s3uploader.go
@@ -40,6 +40,9 @@ const (
 	maxNumBuffersPendingUpload = 5
 )
 
+// GetS3BucketRegion is used for unit testing
+var GetS3BucketRegion = getBucketRegion
+
 type stopPayload struct {
 	flushQueue bool
 }
@@ -100,14 +103,37 @@ func (u *S3Uploader) Upload(ctx context.Context, input *s3.PutObjectInput, awsS3
 	return awsS3Uploader.Upload(ctx, input, opts...)
 }
 
+// getBucketRegion determines the exact region in which the bucket is
+// located. regionHint can be any region in the same partition as the one in
+// which the bucket is located.
+func getBucketRegion(ctx context.Context, bucket string, regionHint string) (string, error) {
+	awsCfg, err := awsconfig.LoadDefaultConfig(ctx, awsconfig.WithRegion(regionHint))
+	if err != nil {
+		return "", fmt.Errorf("unable to load AWS SDK config: %w", err)
+
+	}
+	s3Client := s3.NewFromConfig(awsCfg)
+	bucketRegion, err := s3manager.GetBucketRegion(ctx, s3Client, bucket)
+	if err != nil {
+		return "", fmt.Errorf("unable to determine region for bucket '%s', make sure the bucket exists and set the region parameter appropriately: %w", bucket, err)
+	}
+	return bucketRegion, err
+}
+
 func NewS3UploadProcess(input S3Input, clusterUUID string) (*S3UploadProcess, error) {
 	config := input.Config
-	cfg, err := awsconfig.LoadDefaultConfig(context.TODO(), awsconfig.WithRegion(config.Region))
+	region, err := GetS3BucketRegion(context.TODO(), config.BucketName, config.Region)
+	if err != nil {
+		return nil, err
+	}
+	klog.InfoS("S3 bucket region successfully determined for flow upload", "bucket", config.BucketName, "region", region)
+	awsCfg, err := awsconfig.LoadDefaultConfig(context.TODO(), awsconfig.WithRegion(region))
 	if err != nil {
 		return nil, fmt.Errorf("error when loading AWS config: %w", err)
 	}
-	awsS3Client := s3.NewFromConfig(cfg)
+	awsS3Client := s3.NewFromConfig(awsCfg)
 	awsS3Uploader := s3manager.NewUploader(awsS3Client)
+
 	buf := &bytes.Buffer{}
 	// #nosec G404: random number generator not used for security purposes
 	nameRand := rand.New(rand.NewSource(time.Now().UnixNano()))
@@ -115,7 +141,7 @@ func NewS3UploadProcess(input S3Input, clusterUUID string) (*S3UploadProcess, er
 	s3ExportProcess := &S3UploadProcess{
 		bucketName: config.BucketName,
 		bucketPrefix: config.BucketPrefix,
-		region: config.Region,
+		region: region,
 		compress: *config.Compress,
 		maxRecordPerFile: config.MaxRecordsPerFile,
 		uploadInterval: input.UploadInterval,
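
For context, the region lookup added in this diff is built on GetBucketRegion from the aws-sdk-go-v2 S3 transfer manager (feature/s3/manager), which resolves the bucket's actual region from the service response rather than trusting the configured region. Below is a minimal standalone sketch of that flow under the same assumptions as the patch; the bucket name and region hint are placeholders, and the hint only needs to be some region in the same AWS partition as the bucket.

package main

import (
	"context"
	"fmt"
	"log"

	awsconfig "github.com/aws/aws-sdk-go-v2/config"
	s3manager "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

func main() {
	ctx := context.TODO()
	// Placeholder values: any region in the bucket's partition works as a hint.
	const bucket, regionHint = "my-flow-bucket", "us-west-2"

	// Load default credentials/config, seeded with the hint region.
	cfg, err := awsconfig.LoadDefaultConfig(ctx, awsconfig.WithRegion(regionHint))
	if err != nil {
		log.Fatalf("unable to load AWS SDK config: %v", err)
	}
	client := s3.NewFromConfig(cfg)

	// GetBucketRegion returns the region the bucket actually lives in,
	// even if it differs from the region the client was configured with.
	region, err := s3manager.GetBucketRegion(ctx, client, bucket)
	if err != nil {
		log.Fatalf("unable to determine region for bucket %q: %v", bucket, err)
	}
	fmt.Println("bucket region:", region)
}

Exporting the lookup through the GetS3BucketRegion variable is what lets the unit test above stub it out and avoid any real AWS calls.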