Skip to content
This repository has been archived by the owner on Aug 16, 2022. It is now read-only.

feat: Generic list and detail #1000

Merged
merged 23 commits into from
Jul 11, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions client/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import (

"github.com/aws/aws-sdk-go-v2/aws/arn"
"github.com/aws/smithy-go"
"github.com/cloudquery/cq-provider-sdk/provider/diag"
"github.com/cloudquery/cq-provider-sdk/provider/schema"
"golang.org/x/sync/semaphore"
)

type AWSService string
Expand All @@ -33,6 +35,10 @@ type SupportedServiceRegionsData struct {
regionVsPartition map[string]string
}

type ListResolver func(ctx context.Context, meta schema.ClientMeta) ([]interface{}, error)
bbernays marked this conversation as resolved.
Show resolved Hide resolved

type DetailResolver func(ctx context.Context, meta schema.ClientMeta, resultsChan chan<- interface{}, errorChan chan<- error, summary interface{})

const (
ApigatewayService AWSService = "apigateway"
Athena AWSService = "athena"
Expand All @@ -51,6 +57,8 @@ const (
WorkspacesService AWSService = "workspaces"
)

const MAX_GOROUTINES = 10

const (
PartitionServiceRegionFile = "data/partition_service_region.json"
defaultPartition = "aws"
Expand Down Expand Up @@ -382,3 +390,39 @@ func TagsToMap(tagSlice interface{}) map[string]string {
TagsIntoMap(tagSlice, ret)
return ret
}

func ListAndDetailResolver(ctx context.Context, meta schema.ClientMeta, res chan<- interface{}, list ListResolver, details DetailResolver) error {
var diags diag.Diagnostics
var wg sync.WaitGroup
sem := semaphore.NewWeighted(int64(MAX_GOROUTINES))
errorChan := make(chan error)
response, err := list(ctx, meta)
bbernays marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return diag.WrapError(err)
}
// All items will be attempted to be fetched, but could return an error
wg.Add(len(response))
for _, item := range response {
if err := sem.Acquire(ctx, 1); err != nil {
return diag.WrapError(err)
}
func(summary interface{}) {
defer wg.Done()
defer sem.Release(1)
details(ctx, meta, res, errorChan, summary)
}(item)
}
// Ensure all items have been attempted to be fetched
wg.Wait()

// empty the error channel and convert to Diags
for len(errorChan) > 0 {
disq marked this conversation as resolved.
Show resolved Hide resolved
diags = diags.Add(diag.FromError(<-errorChan, diag.RESOLVING))
}

// Only return the diags if there is an error
bbernays marked this conversation as resolved.
Show resolved Hide resolved
if diags.HasErrors() {
bbernays marked this conversation as resolved.
Show resolved Hide resolved
return diags
}
return nil
}
53 changes: 22 additions & 31 deletions resources/services/athena/work_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,17 @@ import (
"github.com/cloudquery/cq-provider-aws/client"
"github.com/cloudquery/cq-provider-sdk/provider/diag"
"github.com/cloudquery/cq-provider-sdk/provider/schema"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
)

//go:generate cq-gen --resource work_groups --config gen.hcl --output .
func WorkGroups() *schema.Table {
return &schema.Table{
Name: "aws_athena_work_groups",
Description: "A workgroup, which contains a name, description, creation time, state, and other configuration, listed under WorkGroup$Configuration",
Resolver: fetchAthenaWorkGroups,
Name: "aws_athena_work_groups",
Description: "A workgroup, which contains a name, description, creation time, state, and other configuration, listed under WorkGroup$Configuration",
Resolver: func(ctx context.Context, meta schema.ClientMeta, parent *schema.Resource, res chan<- interface{}) error {
err := diag.WrapError(client.ListAndDetailResolver(ctx, meta, res, listWorkGroups, workGroupDetail))
bbernays marked this conversation as resolved.
Show resolved Hide resolved
return err
},
Multiplex: client.ServiceAccountRegionMultiplexer("athena"),
IgnoreError: client.IgnoreAccessDeniedServiceDisabled,
DeleteFilter: client.DeleteAccountRegionFilter,
Expand Down Expand Up @@ -408,41 +409,30 @@ func WorkGroups() *schema.Table {
// Table Resolver Functions
// ====================================================================================================================

func fetchAthenaWorkGroups(ctx context.Context, meta schema.ClientMeta, parent *schema.Resource, res chan<- interface{}) error {
func listWorkGroups(ctx context.Context, meta schema.ClientMeta) ([]interface{}, error) {
c := meta.(*client.Client)
svc := c.Services().Athena
input := athena.ListWorkGroupsInput{}
var sem = semaphore.NewWeighted(int64(MAX_GOROUTINES))
workGroups := make([]interface{}, 0)
for {
response, err := svc.ListWorkGroups(ctx, &input, func(options *athena.Options) {
options.Region = c.Region
})
if err != nil {
return diag.WrapError(err)
return nil, diag.WrapError(err)
}
errs, ctx := errgroup.WithContext(ctx)
for _, d := range response.WorkGroups {
if err := sem.Acquire(ctx, 1); err != nil {
return diag.WrapError(err)
}
func(summary types.WorkGroupSummary) {
errs.Go(func() error {
defer sem.Release(1)
return fetchWorkGroup(ctx, res, svc, c.Region, summary)
})
}(d)
}
err = errs.Wait()
if err != nil {
return diag.WrapError(err)
for _, item := range response.WorkGroups {
workGroups = append(workGroups, item)
}

if aws.ToString(response.NextToken) == "" {
break
}
input.NextToken = response.NextToken
}
return nil
return workGroups, nil
}

func ResolveAthenaWorkGroupArn(ctx context.Context, meta schema.ClientMeta, resource *schema.Resource, c schema.Column) error {
cl := meta.(*client.Client)
dc := resource.Item.(types.WorkGroup)
Expand Down Expand Up @@ -517,7 +507,6 @@ func fetchAthenaWorkGroupQueryExecutions(ctx context.Context, meta schema.Client
}
for _, d := range response.QueryExecutionIds {
dc, err := svc.GetQueryExecution(ctx, &athena.GetQueryExecutionInput{

QueryExecutionId: aws.String(d),
}, func(options *athena.Options) {
options.Region = c.Region
Expand Down Expand Up @@ -571,17 +560,19 @@ func fetchAthenaWorkGroupNamedQueries(ctx context.Context, meta schema.ClientMet
// User Defined Helpers
// ====================================================================================================================

func fetchWorkGroup(ctx context.Context, res chan<- interface{}, svc client.AthenaClient, region string, groupSummary types.WorkGroupSummary) error {
func workGroupDetail(ctx context.Context, meta schema.ClientMeta, resultsChan chan<- interface{}, errorChan chan<- error, summary interface{}) {
c := meta.(*client.Client)
svc := c.Services().Athena
wg := summary.(types.WorkGroupSummary)
dc, err := svc.GetWorkGroup(ctx, &athena.GetWorkGroupInput{
WorkGroup: groupSummary.Name,
WorkGroup: wg.Name,
}, func(options *athena.Options) {
options.Region = region
options.Region = c.Region
})
if err != nil {
return diag.WrapError(err)
errorChan <- err
}
res <- *dc.WorkGroup
return nil
resultsChan <- *dc.WorkGroup
}

func createWorkGroupArn(cl *client.Client, groupName string) string {
Expand Down