Skip to content
This repository has been archived by the owner on Aug 16, 2022. It is now read-only.

feat: Generic list and detail #1000

Merged
merged 23 commits into from
Jul 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions client/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import (

"github.com/aws/aws-sdk-go-v2/aws/arn"
"github.com/aws/smithy-go"
"github.com/cloudquery/cq-provider-sdk/provider/diag"
"github.com/cloudquery/cq-provider-sdk/provider/schema"
"golang.org/x/sync/semaphore"
)

type AWSService string
Expand All @@ -33,6 +35,12 @@ type SupportedServiceRegionsData struct {
regionVsPartition map[string]string
}

// ListResolver is responsible for iterating through entire list of resources that should be grabbed (if API is paginated). It should send list of items via the `resultsChan` so that the DetailResolver can grab the details of each item. All errors should be sent to the error channel.
type ListResolverFunc func(ctx context.Context, meta schema.ClientMeta, detailChan chan<- interface{}) error

// DetailResolveFunc is responsible for grabbing any and all metadata for a resource. All errors should be sent to the error channel.
type DetailResolverFunc func(ctx context.Context, meta schema.ClientMeta, resultsChan chan<- interface{}, errorChan chan<- error, summary interface{})

const (
ApigatewayService AWSService = "apigateway"
Athena AWSService = "athena"
Expand All @@ -51,6 +59,8 @@ const (
WorkspacesService AWSService = "workspaces"
)

const MAX_GOROUTINES = 10

const (
PartitionServiceRegionFile = "data/partition_service_region.json"
defaultPartition = "aws"
Expand Down Expand Up @@ -377,3 +387,46 @@ func TagsToMap(tagSlice interface{}) map[string]string {
TagsIntoMap(tagSlice, ret)
return ret
}

func ListAndDetailResolver(ctx context.Context, meta schema.ClientMeta, res chan<- interface{}, list ListResolverFunc, details DetailResolverFunc) error {
var diags diag.Diagnostics

errorChan := make(chan error)
detailChan := make(chan interface{})
// Channel that will communicate with goroutine that is aggregating the errors
done := make(chan struct{})
go func() {
defer close(done)
for detailError := range errorChan {
diags = diags.Add(diag.FromError(detailError, diag.RESOLVING))
}
}()
sem := semaphore.NewWeighted(int64(MAX_GOROUTINES))

go func() {
defer close(errorChan)
for item := range detailChan {
if err := sem.Acquire(ctx, 1); err != nil {
continue
}
func(summary interface{}) {
defer sem.Release(1)
details(ctx, meta, res, errorChan, summary)
}(item)
}
}()

err := list(ctx, meta, detailChan)
close(detailChan)
if err != nil {
return diag.WrapError(err)
}

// All items will be attempted to be fetched, and all errors will be aggregated
<-done

if diags.HasDiags() {
return diags
}
return nil
}
47 changes: 19 additions & 28 deletions resources/services/athena/work_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@ import (
"github.com/cloudquery/cq-provider-aws/client"
"github.com/cloudquery/cq-provider-sdk/provider/diag"
"github.com/cloudquery/cq-provider-sdk/provider/schema"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
)

//go:generate cq-gen --resource work_groups --config gen.hcl --output .
func WorkGroups() *schema.Table {
return &schema.Table{
Name: "aws_athena_work_groups",
Description: "A workgroup, which contains a name, description, creation time, state, and other configuration, listed under WorkGroup$Configuration",
Resolver: fetchAthenaWorkGroups,
Name: "aws_athena_work_groups",
Description: "A workgroup, which contains a name, description, creation time, state, and other configuration, listed under WorkGroup$Configuration",
Resolver: func(ctx context.Context, meta schema.ClientMeta, parent *schema.Resource, res chan<- interface{}) error {
return diag.WrapError(client.ListAndDetailResolver(ctx, meta, res, listWorkGroups, workGroupDetail))
},
Multiplex: client.ServiceAccountRegionMultiplexer("athena"),
IgnoreError: client.IgnoreCommonErrors,
DeleteFilter: client.DeleteAccountRegionFilter,
Expand Down Expand Up @@ -411,41 +411,30 @@ func WorkGroups() *schema.Table {
// Table Resolver Functions
// ====================================================================================================================

func fetchAthenaWorkGroups(ctx context.Context, meta schema.ClientMeta, parent *schema.Resource, res chan<- interface{}) error {
func listWorkGroups(ctx context.Context, meta schema.ClientMeta, detailChan chan<- interface{}) error {
c := meta.(*client.Client)
svc := c.Services().Athena
input := athena.ListWorkGroupsInput{}
var sem = semaphore.NewWeighted(int64(MAX_GOROUTINES))
for {
response, err := svc.ListWorkGroups(ctx, &input, func(options *athena.Options) {
options.Region = c.Region
})
if err != nil {
return diag.WrapError(err)
}
errs, ctx := errgroup.WithContext(ctx)
for _, d := range response.WorkGroups {
if err := sem.Acquire(ctx, 1); err != nil {
return diag.WrapError(err)
}
func(summary types.WorkGroupSummary) {
errs.Go(func() error {
defer sem.Release(1)
return fetchWorkGroup(ctx, res, c, summary)
})
}(d)
}
err = errs.Wait()
if err != nil {
return diag.WrapError(err)
for _, item := range response.WorkGroups {
detailChan <- item
}

if aws.ToString(response.NextToken) == "" {
break
}
input.NextToken = response.NextToken
}

return nil
}

func ResolveAthenaWorkGroupArn(ctx context.Context, meta schema.ClientMeta, resource *schema.Resource, c schema.Column) error {
cl := meta.(*client.Client)
dc := resource.Item.(types.WorkGroup)
Expand Down Expand Up @@ -582,21 +571,23 @@ func fetchAthenaWorkGroupNamedQueries(ctx context.Context, meta schema.ClientMet
// User Defined Helpers
// ====================================================================================================================

func fetchWorkGroup(ctx context.Context, res chan<- interface{}, c *client.Client, groupSummary types.WorkGroupSummary) error {
func workGroupDetail(ctx context.Context, meta schema.ClientMeta, resultsChan chan<- interface{}, errorChan chan<- error, summary interface{}) {
c := meta.(*client.Client)
svc := c.Services().Athena
wg := summary.(types.WorkGroupSummary)
dc, err := svc.GetWorkGroup(ctx, &athena.GetWorkGroupInput{
WorkGroup: groupSummary.Name,
WorkGroup: wg.Name,
}, func(options *athena.Options) {
options.Region = c.Region
})
if err != nil {
if c.IsNotFoundError(err) {
return nil
return
}
return diag.WrapError(err)
errorChan <- diag.WrapError(err)
bbernays marked this conversation as resolved.
Show resolved Hide resolved
return
}
res <- *dc.WorkGroup
return nil
resultsChan <- *dc.WorkGroup
}

func createWorkGroupArn(cl *client.Client, groupName string) string {
Expand Down