Skip to content

Commit

Permalink
[MetricBeat] [AWS] Fix aws metric tags with resourcegroupstaggingapi …
Browse files Browse the repository at this point in the history
…paginator (elastic#26385)
  • Loading branch information
kwinstonix committed Jun 30, 2021
1 parent 076e0a6 commit 3ce2da3
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix metric grouping for windows/perfmon module {issue}23489[23489] {pull}23505[23505]
- Major refactor of system/cpu and system/core metrics. {pull}25771[25771]
- Fix GCP Project ID being ingested as `cloud.account.id` in `gcp.billing` module {issue}26357[26357] {pull}26412[26412]
- Fix aws metric tags with resourcegroupstaggingapi paginator. {issue}26385[26385] {pull}26443[26443]

*Packetbeat*

Expand Down
8 changes: 8 additions & 0 deletions x-pack/metricbeat/module/aws/cloudwatch/cloudwatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1309,7 +1309,15 @@ func (m *MockCloudWatchClientWithoutDim) GetMetricDataRequest(input *cloudwatch.
func (m *MockResourceGroupsTaggingClient) GetResourcesRequest(input *resourcegroupstaggingapi.GetResourcesInput) resourcegroupstaggingapi.GetResourcesRequest {
httpReq, _ := http.NewRequest("", "", nil)
return resourcegroupstaggingapi.GetResourcesRequest{
Input: input,
Copy: m.GetResourcesRequest,
Request: &awssdk.Request{
Operation: &awssdk.Operation{
Name: "GetResources",
HTTPMethod: "POST",
HTTPPath: "/",
Paginator: nil,
},
Data: &resourcegroupstaggingapi.GetResourcesOutput{
PaginationToken: awssdk.String(""),
ResourceTagMappingList: []resourcegroupstaggingapi.ResourceTagMapping{
Expand Down
30 changes: 12 additions & 18 deletions x-pack/metricbeat/module/aws/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,39 +177,33 @@ func GetResourcesTags(svc resourcegroupstaggingapiiface.ClientAPI, resourceTypeF
ResourceTypeFilters: resourceTypeFilters,
}

init := true
for init || *getResourcesInput.PaginationToken != "" {
init = false
getResourcesRequest := svc.GetResourcesRequest(getResourcesInput)
output, err := getResourcesRequest.Send(context.TODO())
if err != nil {
err = errors.Wrap(err, "error GetResources")
return nil, err
}

getResourcesInput.PaginationToken = output.PaginationToken
if resourceTypeFilters == nil || len(output.ResourceTagMappingList) == 0 {
return nil, nil
}

for _, resourceTag := range output.ResourceTagMappingList {
getResourcesRequest := svc.GetResourcesRequest(getResourcesInput)
paginator := resourcegroupstaggingapi.NewGetResourcesPaginator(getResourcesRequest)
for paginator.Next(context.TODO()) {
page := paginator.CurrentPage()
for _, resourceTag := range page.ResourceTagMappingList {
shortIdentifier, err := FindShortIdentifierFromARN(*resourceTag.ResourceARN)
if err == nil {
resourceTagMap[shortIdentifier] = resourceTag.Tags
} else {
err = errors.Wrap(err, "error occurs when proccessing shortIdentifier")
err = errors.Wrap(err, "error occurs when processing shortIdentifier")
return nil, err
}

wholeIdentifier, err := FindWholeIdentifierFromARN(*resourceTag.ResourceARN)
if err == nil {
resourceTagMap[wholeIdentifier] = resourceTag.Tags
} else {
err = errors.Wrap(err, "error occurs when proccessing longIdentifier")
err = errors.Wrap(err, "error occurs when processing longIdentifier")
return nil, err
}
}
}

if err := paginator.Err(); err != nil {
err = errors.Wrap(err, "error GetResources with Paginator")
return nil, err
}
return resourceTagMap, nil
}

Expand Down
34 changes: 32 additions & 2 deletions x-pack/metricbeat/module/aws/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,17 @@ func (m *MockCloudWatchClient) GetMetricDataRequest(input *cloudwatch.GetMetricD

func (m *MockResourceGroupsTaggingClient) GetResourcesRequest(input *resourcegroupstaggingapi.GetResourcesInput) resourcegroupstaggingapi.GetResourcesRequest {
httpReq, _ := http.NewRequest("", "", nil)
return resourcegroupstaggingapi.GetResourcesRequest{
op := &awssdk.Operation{
Name: "GetResources",
HTTPMethod: "POST",
HTTPPath: "/",
Paginator: nil,
}
firstPageResult := resourcegroupstaggingapi.GetResourcesRequest{
Request: &awssdk.Request{
Operation: op,
Data: &resourcegroupstaggingapi.GetResourcesOutput{
PaginationToken: awssdk.String(""),
PaginationToken: awssdk.String("PaginationToken"),
ResourceTagMappingList: []resourcegroupstaggingapi.ResourceTagMapping{
{
ResourceARN: awssdk.String("arn:aws:rds:eu-west-1:123456789012:db:mysql-db-1"),
Expand Down Expand Up @@ -148,6 +155,29 @@ func (m *MockResourceGroupsTaggingClient) GetResourcesRequest(input *resourcegro
},
HTTPRequest: httpReq,
},
Input: input,
Copy: m.GetResourcesRequest,
}

// aws resourcegroupstaggingapi default pagination size is 50, if resource amount is a
// multiple of 50, then last request has an empty result.
lastPageWithEmptyResult := resourcegroupstaggingapi.GetResourcesRequest{
Request: &awssdk.Request{
Data: &resourcegroupstaggingapi.GetResourcesOutput{
PaginationToken: awssdk.String(""),
ResourceTagMappingList: []resourcegroupstaggingapi.ResourceTagMapping{},
},
HTTPRequest: httpReq,
Operation: op,
},
Input: input,
Copy: m.GetResourcesRequest,
}

if input.PaginationToken == nil {
return firstPageResult
} else {
return lastPageWithEmptyResult
}
}

Expand Down

0 comments on commit 3ce2da3

Please sign in to comment.