diff --git a/resources/services/s3/buckets.go b/resources/services/s3/buckets.go index 6b6b44170..424dea104 100644 --- a/resources/services/s3/buckets.go +++ b/resources/services/s3/buckets.go @@ -3,6 +3,7 @@ package s3 import ( "context" "encoding/json" + "sync" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" @@ -13,16 +14,18 @@ import ( "github.com/cloudquery/cq-provider-sdk/provider/schema" ) +// fetchS3BucketsPoolSize describes the amount of go routines that resolve the S3 buckets +const fetchS3BucketsPoolSize = 10 + func S3Buckets() *schema.Table { return &schema.Table{ - Name: "aws_s3_buckets", - Description: "An Amazon S3 bucket is a public cloud storage resource available in Amazon Web Services' (AWS) Simple Storage Service (S3)", - Resolver: fetchS3Buckets, - Multiplex: client.AccountMultiplex, - IgnoreError: client.IgnoreAccessDeniedServiceDisabled, - DeleteFilter: client.DeleteAccountFilter, - PostResourceResolver: resolveS3BucketsAttributes, - Options: schema.TableCreationOptions{PrimaryKeys: []string{"account_id", "name"}}, + Name: "aws_s3_buckets", + Description: "An Amazon S3 bucket is a public cloud storage resource available in Amazon Web Services' (AWS) Simple Storage Service (S3)", + Resolver: fetchS3Buckets, + Multiplex: client.AccountMultiplex, + IgnoreError: client.IgnoreAccessDeniedServiceDisabled, + DeleteFilter: client.DeleteAccountFilter, + Options: schema.TableCreationOptions{PrimaryKeys: []string{"account_id", "name"}}, Columns: []schema.Column{ { Name: "account_id", @@ -97,7 +100,6 @@ func S3Buckets() *schema.Table { Name: "replication_role", Description: "The Amazon Resource Name (ARN) of the AWS Identity and Access Management (IAM) role that Amazon S3 assumes when replicating objects", Type: schema.TypeString, - Resolver: schema.PathResolver("Role"), IgnoreInTests: true, }, { @@ -451,27 +453,62 @@ func S3Buckets() *schema.Table { // ==================================================================================================================== // Table Resolver Functions // ==================================================================================================================== + func fetchS3Buckets(ctx context.Context, meta schema.ClientMeta, parent *schema.Resource, res chan<- interface{}) error { svc := meta.(*client.Client).Services().S3 response, err := svc.ListBuckets(ctx, nil) if err != nil { return diag.WrapError(err) } - wb := make([]*WrappedBucket, len(response.Buckets)) - for i, b := range response.Buckets { - wb[i] = &WrappedBucket{b, nil, nil} - } - res <- wb - return nil + var wg sync.WaitGroup + buckets := make(chan types.Bucket) + errs := make(chan error) + for i := 0; i < fetchS3BucketsPoolSize; i++ { + wg.Add(1) + go fetchS3BucketsWorker(ctx, meta, buckets, errs, res, &wg) + } + go func() { + defer close(buckets) + for _, bucket := range response.Buckets { + select { + case <-ctx.Done(): + return + case buckets <- bucket: + } + } + }() + var diags diag.Diagnostics + done := make(chan struct{}) + go func() { + for err = range errs { + diags = diags.Add(err) + } + close(done) + }() + wg.Wait() + close(errs) + <-done + + return diags +} + +func fetchS3BucketsWorker(ctx context.Context, meta schema.ClientMeta, buckets <-chan types.Bucket, err chan<- error, res chan<- interface{}, wg *sync.WaitGroup) { + defer wg.Done() + for bucket := range buckets { + wb := &WrappedBucket{Bucket: bucket} + e := resolveS3BucketsAttributes(ctx, meta, wb) + res <- wb + err <- e + } } -func resolveS3BucketsAttributes(ctx context.Context, meta schema.ClientMeta, resource *schema.Resource) error { + +func resolveS3BucketsAttributes(ctx context.Context, meta schema.ClientMeta, resource *WrappedBucket) error { log := meta.Logger() - r := resource.Item.(*WrappedBucket) - log.Debug("fetching bucket attributes", "bucket", aws.ToString(r.Name)) + log.Debug("fetching bucket attributes", "bucket", aws.ToString(resource.Name)) c := meta.(*client.Client) mgr := c.Services().S3Manager - output, err := mgr.GetBucketRegion(ctx, *r.Name) + output, err := mgr.GetBucketRegion(ctx, *resource.Name) if err != nil { if c.IsNotFoundError(err) { return nil @@ -483,37 +520,35 @@ func resolveS3BucketsAttributes(ctx context.Context, meta schema.ClientMeta, res // This is a weird corner case by AWS API https://github.com/aws/aws-sdk-net/issues/323#issuecomment-196584538 bucketRegion = output } - if err := resource.Set("region", bucketRegion); err != nil { - return err - } - if err := resolveBucketLogging(ctx, meta, resource, *r.Name, bucketRegion); err != nil { + resource.Region = bucketRegion + if err = resolveBucketLogging(ctx, meta, resource, bucketRegion); err != nil { if c.IsNotFoundError(err) { return nil } return err } - if err := resolveBucketPolicy(ctx, meta, resource, *r.Name, bucketRegion); err != nil { + if err = resolveBucketPolicy(ctx, meta, resource, bucketRegion); err != nil { return err } - if err := resolveBucketVersioning(ctx, meta, resource, *r.Name, bucketRegion); err != nil { + if err = resolveBucketVersioning(ctx, meta, resource, bucketRegion); err != nil { return err } - if err := resolveBucketPublicAccessBlock(ctx, meta, resource, *r.Name, bucketRegion); err != nil { + if err = resolveBucketPublicAccessBlock(ctx, meta, resource, bucketRegion); err != nil { return err } - if err := resolveBucketReplication(ctx, meta, resource, *r.Name, bucketRegion); err != nil { + if err = resolveBucketReplication(ctx, meta, resource, bucketRegion); err != nil { return err } - if err := resolveBucketTagging(ctx, meta, resource, *r.Name, bucketRegion); err != nil { + if err = resolveBucketTagging(ctx, meta, resource, bucketRegion); err != nil { return err } - if err := resolveBucketOwnershipControls(ctx, meta, resource, *r.Name, bucketRegion); err != nil { + if err = resolveBucketOwnershipControls(ctx, meta, resource, bucketRegion); err != nil { return err } @@ -527,6 +562,9 @@ func fetchS3BucketGrants(ctx context.Context, meta schema.ClientMeta, parent *sc options.Region = parent.Get("region").(string) }) if err != nil { + if client.IsAWSError(err, "NoSuchBucket") { + return nil + } return diag.WrapError(err) } res <- aclOutput.Grants @@ -540,7 +578,7 @@ func fetchS3BucketCorsRules(ctx context.Context, meta schema.ClientMeta, parent options.Region = parent.Get("region").(string) }) if err != nil { - if client.IsAWSError(err, "NoSuchCORSConfiguration") { + if client.IsAWSError(err, "NoSuchCORSConfiguration", "NoSuchBucket") { return nil } return err @@ -640,18 +678,30 @@ func resolveS3BucketLifecycleTransitions(ctx context.Context, meta schema.Client type WrappedBucket struct { types.Bucket - ReplicationRole *string - ReplicationRules []types.ReplicationRule + ReplicationRole *string + ReplicationRules []types.ReplicationRule + Region string + LoggingTargetBucket *string + LoggingTargetPrefix *string + Policy *string + VersioningStatus types.BucketVersioningStatus + VersioningMfaDelete types.MFADeleteStatus + BlockPublicAcls bool + BlockPublicPolicy bool + IgnorePublicAcls bool + RestrictPublicBuckets bool + Tags *string + OwnershipControls []string } -func resolveBucketLogging(ctx context.Context, meta schema.ClientMeta, resource *schema.Resource, bucketName, bucketRegion string) error { +func resolveBucketLogging(ctx context.Context, meta schema.ClientMeta, resource *WrappedBucket, bucketRegion string) error { svc := meta.(*client.Client).Services().S3 - loggingOutput, err := svc.GetBucketLogging(ctx, &s3.GetBucketLoggingInput{Bucket: aws.String(bucketName)}, func(options *s3.Options) { + loggingOutput, err := svc.GetBucketLogging(ctx, &s3.GetBucketLoggingInput{Bucket: resource.Name}, func(options *s3.Options) { options.Region = bucketRegion }) if err != nil { if client.IgnoreAccessDeniedServiceDisabled(err) { - meta.Logger().Warn("received access denied on GetBucketLogging", "bucket", bucketName, "err", err) + meta.Logger().Warn("received access denied on GetBucketLogging", "bucket", resource.Name, "err", err) return nil } return err @@ -659,29 +709,25 @@ func resolveBucketLogging(ctx context.Context, meta schema.ClientMeta, resource if loggingOutput.LoggingEnabled == nil { return nil } - if err := resource.Set("logging_target_bucket", loggingOutput.LoggingEnabled.TargetBucket); err != nil { - return err - } - if err := resource.Set("logging_target_prefix", loggingOutput.LoggingEnabled.TargetPrefix); err != nil { - return err - } + resource.LoggingTargetBucket = loggingOutput.LoggingEnabled.TargetBucket + resource.LoggingTargetPrefix = loggingOutput.LoggingEnabled.TargetPrefix return nil } -func resolveBucketPolicy(ctx context.Context, meta schema.ClientMeta, resource *schema.Resource, bucketName, bucketRegion string) error { +func resolveBucketPolicy(ctx context.Context, meta schema.ClientMeta, resource *WrappedBucket, bucketRegion string) error { c := meta.(*client.Client) svc := c.Services().S3 - policyOutput, err := svc.GetBucketPolicy(ctx, &s3.GetBucketPolicyInput{Bucket: aws.String(bucketName)}, func(options *s3.Options) { + policyOutput, err := svc.GetBucketPolicy(ctx, &s3.GetBucketPolicyInput{Bucket: resource.Name}, func(options *s3.Options) { options.Region = bucketRegion }) // check if we got an error but its access denied we can continue if err != nil { - // if we got an error and its not a NoSuchBucketError, return err + // if we got an error, and it's not a NoSuchBucketError, return err if client.IsAWSError(err, "NoSuchBucketPolicy") { return nil } if client.IgnoreAccessDeniedServiceDisabled(err) { - meta.Logger().Warn("received access denied on GetBucketPolicy", "bucket", bucketName, "err", err) + meta.Logger().Warn("received access denied on GetBucketPolicy", "bucket", resource.Name, "err", err) return nil } return err @@ -689,35 +735,32 @@ func resolveBucketPolicy(ctx context.Context, meta schema.ClientMeta, resource * if policyOutput == nil { return nil } - return resource.Set("policy", policyOutput.Policy) + resource.Policy = policyOutput.Policy + return nil } -func resolveBucketVersioning(ctx context.Context, meta schema.ClientMeta, resource *schema.Resource, bucketName, bucketRegion string) error { +func resolveBucketVersioning(ctx context.Context, meta schema.ClientMeta, resource *WrappedBucket, bucketRegion string) error { c := meta.(*client.Client) svc := c.Services().S3 - versioningOutput, err := svc.GetBucketVersioning(ctx, &s3.GetBucketVersioningInput{Bucket: aws.String(bucketName)}, func(options *s3.Options) { + versioningOutput, err := svc.GetBucketVersioning(ctx, &s3.GetBucketVersioningInput{Bucket: resource.Name}, func(options *s3.Options) { options.Region = bucketRegion }) if err != nil { if client.IgnoreAccessDeniedServiceDisabled(err) { - meta.Logger().Warn("received access denied on GetBucketVersioning", "bucket", bucketName, "err", err) + meta.Logger().Warn("received access denied on GetBucketVersioning", "bucket", resource.Name, "err", err) return nil } return err } - if err := resource.Set("versioning_status", versioningOutput.Status); err != nil { - return err - } - if err := resource.Set("versioning_mfa_delete", versioningOutput.MFADelete); err != nil { - return err - } + resource.VersioningStatus = versioningOutput.Status + resource.VersioningMfaDelete = versioningOutput.MFADelete return nil } -func resolveBucketPublicAccessBlock(ctx context.Context, meta schema.ClientMeta, resource *schema.Resource, bucketName, bucketRegion string) error { +func resolveBucketPublicAccessBlock(ctx context.Context, meta schema.ClientMeta, resource *WrappedBucket, bucketRegion string) error { c := meta.(*client.Client) svc := c.Services().S3 - publicAccessOutput, err := svc.GetPublicAccessBlock(ctx, &s3.GetPublicAccessBlockInput{Bucket: aws.String(bucketName)}, func(options *s3.Options) { + publicAccessOutput, err := svc.GetPublicAccessBlock(ctx, &s3.GetPublicAccessBlockInput{Bucket: resource.Name}, func(options *s3.Options) { options.Region = bucketRegion }) if err != nil { @@ -726,30 +769,22 @@ func resolveBucketPublicAccessBlock(ctx context.Context, meta schema.ClientMeta, return nil } if client.IgnoreAccessDeniedServiceDisabled(err) { - meta.Logger().Warn("received access denied on GetPublicAccessBlock", "bucket", bucketName, "err", err) + meta.Logger().Warn("received access denied on GetPublicAccessBlock", "bucket", resource.Name, "err", err) return nil } return err } - if err := resource.Set("block_public_acls", publicAccessOutput.PublicAccessBlockConfiguration.BlockPublicAcls); err != nil { - return err - } - if err := resource.Set("block_public_policy", publicAccessOutput.PublicAccessBlockConfiguration.BlockPublicPolicy); err != nil { - return err - } - if err := resource.Set("ignore_public_acls", publicAccessOutput.PublicAccessBlockConfiguration.IgnorePublicAcls); err != nil { - return err - } - if err := resource.Set("restrict_public_buckets", publicAccessOutput.PublicAccessBlockConfiguration.RestrictPublicBuckets); err != nil { - return err - } + resource.BlockPublicAcls = publicAccessOutput.PublicAccessBlockConfiguration.BlockPublicAcls + resource.BlockPublicPolicy = publicAccessOutput.PublicAccessBlockConfiguration.BlockPublicPolicy + resource.IgnorePublicAcls = publicAccessOutput.PublicAccessBlockConfiguration.IgnorePublicAcls + resource.RestrictPublicBuckets = publicAccessOutput.PublicAccessBlockConfiguration.RestrictPublicBuckets return nil } -func resolveBucketReplication(ctx context.Context, meta schema.ClientMeta, resource *schema.Resource, bucketName, bucketRegion string) error { +func resolveBucketReplication(ctx context.Context, meta schema.ClientMeta, resource *WrappedBucket, bucketRegion string) error { c := meta.(*client.Client) svc := c.Services().S3 - replicationOutput, err := svc.GetBucketReplication(ctx, &s3.GetBucketReplicationInput{Bucket: aws.String(bucketName)}, func(options *s3.Options) { + replicationOutput, err := svc.GetBucketReplication(ctx, &s3.GetBucketReplicationInput{Bucket: resource.Name}, func(options *s3.Options) { options.Region = bucketRegion }) @@ -759,7 +794,7 @@ func resolveBucketReplication(ctx context.Context, meta schema.ClientMeta, resou return nil } if client.IgnoreAccessDeniedServiceDisabled(err) { - meta.Logger().Warn("received access denied on GetBucketReplication", "bucket", bucketName, "err", err) + meta.Logger().Warn("received access denied on GetBucketReplication", "bucket", resource.Name, "err", err) return nil } return err @@ -767,18 +802,15 @@ func resolveBucketReplication(ctx context.Context, meta schema.ClientMeta, resou if replicationOutput.ReplicationConfiguration == nil { return nil } - if err := resource.Set("replication_role", replicationOutput.ReplicationConfiguration.Role); err != nil { - return err - } - // We set this here for fetchReplicationRules to get and insert - resource.Item.(*WrappedBucket).ReplicationRules = replicationOutput.ReplicationConfiguration.Rules + resource.ReplicationRole = replicationOutput.ReplicationConfiguration.Role + resource.ReplicationRules = replicationOutput.ReplicationConfiguration.Rules return nil } -func resolveBucketTagging(ctx context.Context, meta schema.ClientMeta, resource *schema.Resource, bucketName, bucketRegion string) error { +func resolveBucketTagging(ctx context.Context, meta schema.ClientMeta, resource *WrappedBucket, bucketRegion string) error { c := meta.(*client.Client) svc := c.Services().S3 - taggingOutput, err := svc.GetBucketTagging(ctx, &s3.GetBucketTaggingInput{Bucket: aws.String(bucketName)}, func(options *s3.Options) { + taggingOutput, err := svc.GetBucketTagging(ctx, &s3.GetBucketTaggingInput{Bucket: resource.Name}, func(options *s3.Options) { options.Region = bucketRegion }) if err != nil { @@ -787,7 +819,7 @@ func resolveBucketTagging(ctx context.Context, meta schema.ClientMeta, resource return nil } if client.IgnoreAccessDeniedServiceDisabled(err) { - meta.Logger().Warn("received access denied on GetBucketTagging", "bucket", bucketName, "err", err) + meta.Logger().Warn("received access denied on GetBucketTagging", "bucket", resource.Name, "err", err) return nil } return err @@ -799,14 +831,21 @@ func resolveBucketTagging(ctx context.Context, meta schema.ClientMeta, resource for _, t := range taggingOutput.TagSet { tags[*t.Key] = t.Value } - return resource.Set("tags", tags) + + b, err := json.Marshal(tags) + if err != nil { + return err + } + t := string(b) + resource.Tags = &t + return nil } -func resolveBucketOwnershipControls(ctx context.Context, meta schema.ClientMeta, resource *schema.Resource, bucketName, bucketRegion string) error { +func resolveBucketOwnershipControls(ctx context.Context, meta schema.ClientMeta, resource *WrappedBucket, bucketRegion string) error { c := meta.(*client.Client) svc := c.Services().S3 - getBucketOwnershipControlOutput, err := svc.GetBucketOwnershipControls(ctx, &s3.GetBucketOwnershipControlsInput{Bucket: aws.String(bucketName)}, func(options *s3.Options) { + getBucketOwnershipControlOutput, err := svc.GetBucketOwnershipControls(ctx, &s3.GetBucketOwnershipControlsInput{Bucket: resource.Name}, func(options *s3.Options) { options.Region = bucketRegion }) @@ -817,7 +856,7 @@ func resolveBucketOwnershipControls(ctx context.Context, meta schema.ClientMeta, } if client.IgnoreAccessDeniedServiceDisabled(err) { - meta.Logger().Warn("received access denied on GetBucketOwnershipControls", "bucket", bucketName, "err", err) + meta.Logger().Warn("received access denied on GetBucketOwnershipControls", "bucket", resource.Name, "err", err) return nil } @@ -840,5 +879,6 @@ func resolveBucketOwnershipControls(ctx context.Context, meta schema.ClientMeta, stringArray = append(stringArray, string(ownershipControlRule.ObjectOwnership)) } - return resource.Set("ownership_controls", stringArray) + resource.OwnershipControls = stringArray + return nil }