Skip to content
This repository has been archived by the owner on Aug 16, 2022. It is now read-only.

feat: Generic list and detail #1000

Merged
merged 23 commits into from
Jul 11, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions client/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ import (

"github.com/aws/aws-sdk-go-v2/aws/arn"
"github.com/aws/smithy-go"
"github.com/cloudquery/cq-provider-sdk/provider/diag"
"github.com/cloudquery/cq-provider-sdk/provider/schema"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
)

type AWSService string
Expand All @@ -33,6 +36,10 @@ type SupportedServiceRegionsData struct {
regionVsPartition map[string]string
}

type ListResolver func(ctx context.Context, meta schema.ClientMeta) ([]interface{}, error)
bbernays marked this conversation as resolved.
Show resolved Hide resolved

type DetailResolver func(ctx context.Context, meta schema.ClientMeta, res chan<- interface{}, summary interface{}) error

const (
ApigatewayService AWSService = "apigateway"
Athena AWSService = "athena"
Expand All @@ -51,6 +58,8 @@ const (
WorkspacesService AWSService = "workspaces"
)

const MAX_GOROUTINES = 10

const (
PartitionServiceRegionFile = "data/partition_service_region.json"
defaultPartition = "aws"
Expand Down Expand Up @@ -382,3 +391,28 @@ func TagsToMap(tagSlice interface{}) map[string]string {
TagsIntoMap(tagSlice, ret)
return ret
}

func ListAndDetailResolver(ctx context.Context, meta schema.ClientMeta, res chan<- interface{}, list ListResolver, details DetailResolver) error {
sem := semaphore.NewWeighted(int64(MAX_GOROUTINES))
errs, ctx := errgroup.WithContext(ctx)
response, err := list(ctx, meta)
bbernays marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return diag.WrapError(err)
}
for _, item := range response {
if err := sem.Acquire(ctx, 1); err != nil {
return diag.WrapError(err)
}
func(summary interface{}) {
errs.Go(func() error {
defer sem.Release(1)
return details(ctx, meta, res, summary)
bbernays marked this conversation as resolved.
Show resolved Hide resolved
})
}(item)
}
err = errs.Wait()
if err != nil {
return diag.WrapError(err)
}
return nil
}
47 changes: 19 additions & 28 deletions resources/services/athena/work_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@ import (
"github.com/cloudquery/cq-provider-aws/client"
"github.com/cloudquery/cq-provider-sdk/provider/diag"
"github.com/cloudquery/cq-provider-sdk/provider/schema"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
)

//go:generate cq-gen --resource work_groups --config gen.hcl --output .
func WorkGroups() *schema.Table {
return &schema.Table{
Name: "aws_athena_work_groups",
Description: "A workgroup, which contains a name, description, creation time, state, and other configuration, listed under WorkGroup$Configuration",
Resolver: fetchAthenaWorkGroups,
Name: "aws_athena_work_groups",
Description: "A workgroup, which contains a name, description, creation time, state, and other configuration, listed under WorkGroup$Configuration",
Resolver: func(ctx context.Context, meta schema.ClientMeta, parent *schema.Resource, res chan<- interface{}) error {
return diag.WrapError(client.ListAndDetailResolver(ctx, meta, res, listWorkGroups, getWorkGroupDetail))
},
Multiplex: client.ServiceAccountRegionMultiplexer("athena"),
IgnoreError: client.IgnoreAccessDeniedServiceDisabled,
DeleteFilter: client.DeleteAccountRegionFilter,
Expand Down Expand Up @@ -408,41 +408,30 @@ func WorkGroups() *schema.Table {
// Table Resolver Functions
// ====================================================================================================================

func fetchAthenaWorkGroups(ctx context.Context, meta schema.ClientMeta, parent *schema.Resource, res chan<- interface{}) error {
func listWorkGroups(ctx context.Context, meta schema.ClientMeta) ([]interface{}, error) {
c := meta.(*client.Client)
svc := c.Services().Athena
input := athena.ListWorkGroupsInput{}
var sem = semaphore.NewWeighted(int64(MAX_GOROUTINES))
workGroups := make([]interface{}, 0)
for {
response, err := svc.ListWorkGroups(ctx, &input, func(options *athena.Options) {
options.Region = c.Region
})
if err != nil {
return diag.WrapError(err)
return nil, diag.WrapError(err)
}
errs, ctx := errgroup.WithContext(ctx)
for _, d := range response.WorkGroups {
if err := sem.Acquire(ctx, 1); err != nil {
return diag.WrapError(err)
}
func(summary types.WorkGroupSummary) {
errs.Go(func() error {
defer sem.Release(1)
return fetchWorkGroup(ctx, res, svc, c.Region, summary)
})
}(d)
}
err = errs.Wait()
if err != nil {
return diag.WrapError(err)
for _, item := range response.WorkGroups {
workGroups = append(workGroups, item)
}

if aws.ToString(response.NextToken) == "" {
break
}
input.NextToken = response.NextToken
}
return nil
return workGroups, nil
}

func ResolveAthenaWorkGroupArn(ctx context.Context, meta schema.ClientMeta, resource *schema.Resource, c schema.Column) error {
cl := meta.(*client.Client)
dc := resource.Item.(types.WorkGroup)
Expand Down Expand Up @@ -517,7 +506,6 @@ func fetchAthenaWorkGroupQueryExecutions(ctx context.Context, meta schema.Client
}
for _, d := range response.QueryExecutionIds {
dc, err := svc.GetQueryExecution(ctx, &athena.GetQueryExecutionInput{

QueryExecutionId: aws.String(d),
}, func(options *athena.Options) {
options.Region = c.Region
Expand Down Expand Up @@ -571,11 +559,14 @@ func fetchAthenaWorkGroupNamedQueries(ctx context.Context, meta schema.ClientMet
// User Defined Helpers
// ====================================================================================================================

func fetchWorkGroup(ctx context.Context, res chan<- interface{}, svc client.AthenaClient, region string, groupSummary types.WorkGroupSummary) error {
func getWorkGroupDetail(ctx context.Context, meta schema.ClientMeta, res chan<- interface{}, summary interface{}) error {
c := meta.(*client.Client)
svc := c.Services().Athena
wg := summary.(types.WorkGroupSummary)
dc, err := svc.GetWorkGroup(ctx, &athena.GetWorkGroupInput{
WorkGroup: groupSummary.Name,
WorkGroup: wg.Name,
}, func(options *athena.Options) {
options.Region = region
options.Region = c.Region
})
if err != nil {
return diag.WrapError(err)
Expand Down