Skip to content

Commit

Permalink
Refactor endpoint resolver for AWS services (#32921)
Browse files Browse the repository at this point in the history
* Add signing region

* Add changelog entry

* remove awscommon.EnrichAWSConfigWithEndpoint

* add nonAWSBucketResolver when recreating the s3 client for new region

* remove unused functions

* changelog

* fix changelog

* fix docs

Co-authored-by: Andrea Spacca <[email protected]>
(cherry picked from commit d0bc413)

# Conflicts:
#	x-pack/libbeat/common/aws/credentials.go
#	x-pack/libbeat/common/aws/credentials_test.go
  • Loading branch information
sayden authored and mergify[bot] committed Sep 8, 2022
1 parent b821413 commit 023f6f6
Show file tree
Hide file tree
Showing 17 changed files with 168 additions and 86 deletions.
27 changes: 27 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,33 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]

*Affecting all Beats*

- Fix namespacing for agent self-monitoring, CPU no longer reports as zero. {pull}32336[32336]
- Fix namespacing on self-monitoring {pull}32336[32336]
- Expand fields in `decode_json_fields` if target is set. {issue}31712[31712] {pull}32010[32010]
- Fix race condition when stopping runners {pull}32433[32433]
- Fix concurrent map writes when system/process code called from reporter code {pull}32491[32491]
- Fix in AWS related services initialisation relying on custom endpoint resolver. {issue}32888[32888] {pull}32921[32921]

*Auditbeat*

- auditd module: Fix parsing of audit rules where arguments are quoted (like file paths containing spaces). {pull}32421[32421]
- auditd module: Fix minimum AuditStatus length so that library can support kernels from 2.6.32. {pull}32421[32421]
- system/socket: Reduce memory usage of the dataset. {issue}32191[32191] {pull}32192[32192]
- Fix rendering of MAC addresses to conform to ECS. {issue}32621[32621] {pull}32622[32622]
- Fixes a bug with the auditd module where data is corrupted because it was not copied before the byte slice was reused. {issue}32818[32818] {pull}32823[32823]

*Filebeat*

- Fix counter for number of events published in `httpjson` input. {pull}31993[31993]
- Fix handling of Checkpoint event for R81. {issue}32380[32380] {pull}32458[32458]
- Fix a hang on `apt-get update` stage in packaging. {pull}32580[32580]
- gcp-pubsub input: Restart Pub/Sub client on all errors. {issue}32550[32550] {pull}32712[32712]
- Fix not parsing as json when `json` and `ndjson` content types have charset information in `aws-s3` input {pull}32767[32767]
- Update `cloud.region` parsing in cloudtrail fileset. {pull}32763[32763]
- Fix file.path field in cloudtrail fileset to use json.digestS3Object. {pull}32759[32759]
- Fix rendering of MAC addresses to conform to ECS. {issue}32621[32621] {pull}32622[32622]
- Import dashboards from CEF integration. {pull}32766[32766]
- Fix how to handle IPv6 addresses in the fileset `nginx/ingress_controller` for Filebeat. {pull}32989[32989]

*Auditbeat*

Expand Down
3 changes: 1 addition & 2 deletions x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ Listing of the S3 bucket will be polled according the time interval defined by
The `aws-s3` input can also poll 3rd party S3 compatible services such as the self hosted Minio.
Using non-AWS S3 compatible buckets requires the use of `access_key_id` and `secret_access_key` for authentication.
To specify the S3 bucket name, use the `non_aws_bucket_name` config and the `endpoint` must be set to replace the default API endpoint.
`endpoint` should be a full URI in the form of `https(s)://<s3 endpoint>`, that will be used as the API endpoint of the service, or a single domain.
If a domain is provided, the full endpoint URI will be constructed with the region name in the standard form of `https://s3.<region>.<domain>` supported by AWS and several 3rd party providers.
`endpoint` should be a full URI in the form of `https(s)://<s3 endpoint>` in the case of `non_aws_bucket_name`, that will be used as the API endpoint of the service.
No `endpoint` is needed if using the native AWS S3 service hosted at `amazonaws.com`.
Please see <<aws-credentials-config,Configuration parameters>> for alternate AWS domains that require a different endpoint.

Expand Down
8 changes: 5 additions & 3 deletions x-pack/filebeat/input/awscloudwatch/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,11 @@ func (in *cloudwatchInput) Run(inputContext v2.Context, pipeline beat.Pipeline)
}
defer client.Close()

logsServiceName := awscommon.CreateServiceName("logs", in.config.AWSConfig.FIPSEnabled, in.config.RegionName)
cwConfig := awscommon.EnrichAWSConfigWithEndpoint(in.config.AWSConfig.Endpoint, logsServiceName, in.config.RegionName, in.awsConfig)
svc := cloudwatchlogs.NewFromConfig(cwConfig)
svc := cloudwatchlogs.NewFromConfig(in.awsConfig, func(o *cloudwatchlogs.Options) {
if in.config.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
})

logGroupNames, err := getLogGroupNames(svc, in.config.LogGroupNamePrefix, in.config.LogGroupName)
if err != nil {
Expand Down
44 changes: 34 additions & 10 deletions x-pack/filebeat/input/awss3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,27 +155,31 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
}

func (in *s3Input) createSQSReceiver(ctx v2.Context, client beat.Client) (*sqsReader, error) {
s3ServiceName := awscommon.CreateServiceName("s3", in.config.AWSConfig.FIPSEnabled, in.awsConfig.Region)
sqsServiceName := awscommon.CreateServiceName("sqs", in.config.AWSConfig.FIPSEnabled, in.awsConfig.Region)

sqsAPI := &awsSQSAPI{
client: sqs.NewFromConfig(awscommon.EnrichAWSConfigWithEndpoint(in.config.AWSConfig.Endpoint, sqsServiceName, in.awsConfig.Region, in.awsConfig)),
client: sqs.NewFromConfig(in.awsConfig, func(o *sqs.Options) {
if in.config.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
}),
queueURL: in.config.QueueURL,
apiTimeout: in.config.APITimeout,
visibilityTimeout: in.config.VisibilityTimeout,
longPollWaitTime: in.config.SQSWaitTime,
}

s3API := &awsS3API{
client: s3.NewFromConfig(awscommon.EnrichAWSConfigWithEndpoint(in.config.AWSConfig.Endpoint, s3ServiceName, in.awsConfig.Region, in.awsConfig)),
client: s3.NewFromConfig(in.awsConfig, func(o *s3.Options) {
if in.config.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
}),
}

log := ctx.Logger.With("queue_url", in.config.QueueURL)
log.Infof("AWS api_timeout is set to %v.", in.config.APITimeout)
log.Infof("AWS region is set to %v.", in.awsConfig.Region)
log.Infof("AWS SQS visibility_timeout is set to %v.", in.config.VisibilityTimeout)
log.Infof("AWS SQS max_number_of_messages is set to %v.", in.config.MaxNumberOfMessages)
log.Debugf("AWS S3 service name is %v.", s3ServiceName)

metricRegistry := monitoring.GetNamespace("dataset").GetRegistry()
metrics := newInputMetrics(metricRegistry, ctx.ID)
Expand All @@ -195,8 +199,15 @@ func (in *s3Input) createSQSReceiver(ctx v2.Context, client beat.Client) (*sqsRe
return sqsReader, nil
}

type nonAWSBucketResolver struct {
endpoint string
}

func (n nonAWSBucketResolver) ResolveEndpoint(region string, options s3.EndpointResolverOptions) (awssdk.Endpoint, error) {
return awssdk.Endpoint{URL: n.endpoint, SigningRegion: region, HostnameImmutable: true, Source: awssdk.EndpointSourceCustom}, nil
}

func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, client beat.Client, persistentStore *statestore.Store, states *states) (*s3Poller, error) {
s3ServiceName := awscommon.CreateServiceName("s3", in.config.AWSConfig.FIPSEnabled, in.awsConfig.Region)
var bucketName string
var bucketID string
if in.config.NonAWSBucketName != "" {
Expand All @@ -207,7 +218,14 @@ func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, cli
bucketID = in.config.BucketARN
}

s3Client := s3.NewFromConfig(awscommon.EnrichAWSConfigWithEndpoint(in.config.AWSConfig.Endpoint, s3ServiceName, in.awsConfig.Region, in.awsConfig), func(o *s3.Options) {
s3Client := s3.NewFromConfig(in.awsConfig, func(o *s3.Options) {
if in.config.NonAWSBucketName != "" {
o.EndpointResolver = nonAWSBucketResolver{endpoint: in.config.AWSConfig.Endpoint}
}

if in.config.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
o.UsePathStyle = in.config.PathStyle
})
regionName, err := getRegionForBucket(cancelCtx, s3Client, bucketName)
Expand All @@ -220,7 +238,14 @@ func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, cli
in.awsConfig.Region = regionName

if regionName != originalAwsConfigRegion {
s3Client = s3.NewFromConfig(awscommon.EnrichAWSConfigWithEndpoint(in.config.AWSConfig.Endpoint, s3ServiceName, in.awsConfig.Region, in.awsConfig), func(o *s3.Options) {
s3Client = s3.NewFromConfig(in.awsConfig, func(o *s3.Options) {
if in.config.NonAWSBucketName != "" {
o.EndpointResolver = nonAWSBucketResolver{endpoint: in.config.AWSConfig.Endpoint}
}

if in.config.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
o.UsePathStyle = in.config.PathStyle
})
}
Expand All @@ -234,7 +259,6 @@ func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, cli
log.Infof("bucket_list_interval is set to %v.", in.config.BucketListInterval)
log.Infof("bucket_list_prefix is set to %v.", in.config.BucketListPrefix)
log.Infof("AWS region is set to %v.", in.awsConfig.Region)
log.Debugf("AWS S3 service name is %v.", s3ServiceName)

metricRegistry := monitoring.GetNamespace("dataset").GetRegistry()
metrics := newInputMetrics(metricRegistry, ctx.ID)
Expand Down
4 changes: 2 additions & 2 deletions x-pack/filebeat/input/awss3/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ func TestGetRegionForBucketARN(t *testing.T) {
t.Fatal(err)
}

s3Client := s3.NewFromConfig(awscommon.EnrichAWSConfigWithEndpoint("", "s3", "", cfg))
s3Client := s3.NewFromConfig(cfg)

regionName, err := getRegionForBucket(context.Background(), s3Client, getBucketNameFromARN(tfConfig.BucketName))
assert.NoError(t, err)
Expand Down Expand Up @@ -428,7 +428,7 @@ func TestPaginatorListPrefix(t *testing.T) {
t.Fatal(err)
}

s3Client := s3.NewFromConfig(awscommon.EnrichAWSConfigWithEndpoint("", "s3", "", cfg))
s3Client := s3.NewFromConfig(cfg)

s3API := &awsS3API{
client: s3Client,
Expand Down
18 changes: 12 additions & 6 deletions x-pack/libbeat/autodiscover/providers/aws/ec2/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ package ec2
import (
"fmt"

awssdk "github.com/aws/aws-sdk-go-v2/aws"

"github.com/aws/aws-sdk-go-v2/service/ec2"
"github.com/gofrs/uuid"

Expand Down Expand Up @@ -63,9 +65,11 @@ func AutodiscoverBuilder(
if config.Regions == nil {
// set default region to make initial aws api call
awsCfg.Region = "us-west-1"
ec2ServiceName := awscommon.CreateServiceName("ec2", config.AWSConfig.FIPSEnabled, awsCfg.Region)
svcEC2 := ec2.NewFromConfig(awscommon.EnrichAWSConfigWithEndpoint(
config.AWSConfig.Endpoint, ec2ServiceName, awsCfg.Region, awsCfg))
svcEC2 := ec2.NewFromConfig(awsCfg, func(o *ec2.Options) {
if config.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
})

completeRegionsList, err := awsauto.GetRegions(svcEC2)
if err != nil {
Expand All @@ -81,9 +85,11 @@ func AutodiscoverBuilder(
logp.Error(fmt.Errorf("error loading AWS config for aws_ec2 autodiscover provider: %w", err))
}
awsCfg.Region = region
ec2ServiceName := awscommon.CreateServiceName("ec2", config.AWSConfig.FIPSEnabled, region)
clients = append(clients, ec2.NewFromConfig(awscommon.EnrichAWSConfigWithEndpoint(
config.AWSConfig.Endpoint, ec2ServiceName, region, awsCfg)))
clients = append(clients, ec2.NewFromConfig(awsCfg, func(o *ec2.Options) {
if config.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
}))
}

return internalBuilder(uuid, bus, config, newAPIFetcher(clients), keystore)
Expand Down
19 changes: 13 additions & 6 deletions x-pack/libbeat/autodiscover/providers/aws/elb/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package elb

import (
awssdk "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/ec2"
"github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2"
"github.com/gofrs/uuid"
Expand Down Expand Up @@ -64,9 +65,12 @@ func AutodiscoverBuilder(

// Construct MetricSet with a full regions list if there is no region specified.
if config.Regions == nil {
ec2ServiceName := awscommon.CreateServiceName("ec2", config.AWSConfig.FIPSEnabled, awsCfg.Region)
svcEC2 := ec2.NewFromConfig(awscommon.EnrichAWSConfigWithEndpoint(
config.AWSConfig.Endpoint, ec2ServiceName, awsCfg.Region, awsCfg))
svcEC2 := ec2.NewFromConfig(awsCfg, func(o *ec2.Options) {
if config.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}

})

completeRegionsList, err := awsauto.GetRegions(svcEC2)
if err != nil {
Expand All @@ -88,9 +92,12 @@ func AutodiscoverBuilder(
logp.Err("error loading AWS config for aws_elb autodiscover provider: %s", err)
}
awsCfg.Region = region
elbServiceName := awscommon.CreateServiceName("elasticloadbalancing", config.AWSConfig.FIPSEnabled, region)
clients = append(clients, elasticloadbalancingv2.NewFromConfig(awscommon.EnrichAWSConfigWithEndpoint(
config.AWSConfig.Endpoint, elbServiceName, region, awsCfg)))
clients = append(clients, elasticloadbalancingv2.NewFromConfig(awsCfg, func(o *elasticloadbalancingv2.Options) {
if config.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}

}))
}

return internalBuilder(uuid, bus, config, newAPIFetcher(clients), keystore)
Expand Down
4 changes: 3 additions & 1 deletion x-pack/libbeat/common/aws/credentials.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"fmt"
"net/http"
"net/url"
"strings"

"github.com/aws/aws-sdk-go-v2/service/sts"

Expand Down Expand Up @@ -164,6 +163,7 @@ func addStaticCredentialsProviderToAwsConfig(beatsConfig ConfigAWS, awsConfig *a

awsConfig.Credentials = staticCredentialsProvider
}
<<<<<<< HEAD

// EnrichAWSConfigWithEndpoint function enabled endpoint resolver for AWS service clients when endpoint is given in config.
func EnrichAWSConfigWithEndpoint(endpoint string, serviceName string, regionName string, beatsConfig awssdk.Config) awssdk.Config {
Expand Down Expand Up @@ -201,3 +201,5 @@ func CreateServiceName(serviceName string, fipsEnabled bool, region string) stri
}
return serviceName
}
=======
>>>>>>> d0bc413a50 (Refactor endpoint resolver for AWS services (#32921))
4 changes: 3 additions & 1 deletion x-pack/libbeat/common/aws/credentials_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"net/http"
"testing"

awssdk "github.com/aws/aws-sdk-go-v2/aws"
"github.com/stretchr/testify/assert"

"github.com/elastic/elastic-agent-libs/transport/tlscommon"
Expand Down Expand Up @@ -53,6 +52,7 @@ func TestGetAWSCredentials(t *testing.T) {
assert.Equal(t, inputConfig.SessionToken, retrievedAWSConfig.SessionToken)
}

<<<<<<< HEAD
func TestEnrichAWSConfigWithEndpoint(t *testing.T) {
cases := []struct {
title string
Expand Down Expand Up @@ -165,6 +165,8 @@ func TestCreateServiceName(t *testing.T) {
}
}

=======
>>>>>>> d0bc413a50 (Refactor endpoint resolver for AWS services (#32921))
func TestDefaultRegion(t *testing.T) {
cases := []struct {
title string
Expand Down
11 changes: 2 additions & 9 deletions x-pack/libbeat/docs/aws-credentials-config.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,10 @@ To configure AWS credentials, either put the credentials into the {beatname_uc}
* *credential_profile_name*: profile name in shared credentials file.
* *shared_credential_file*: directory of the shared credentials file.
* *role_arn*: AWS IAM Role to assume.
* *endpoint*: URL of the entry point for an AWS web service.
Most AWS services offer a regional endpoint that can be used to make requests.
The general syntax of a regional endpoint is `protocol://service-code.region-code.endpoint-code`.
Some services, such as IAM, do not support regions. The endpoints for these
services do not include a region. In `aws` module, `endpoint` config is to set
the `endpoint-code` part, such as `amazonaws.com`, `amazonaws.com.cn`, `c2s.ic.gov`,
`sc2s.sgov.gov`.
* *proxy_url*: URL of the proxy to use to connect to AWS web services. The syntax is `http(s)://<IP/Hostname>:<port>`
* *fips_enabled*: Enabling this option changes the service names from `s3` to `s3-fips` for connecting to the correct service endpoint. For example: `s3-fips.us-gov-east-1.amazonaws.com`. All services used by Beats are FIPS compatible except for `tagging` but only certain regions are FIPS compatible. See https://aws.amazon.com/compliance/fips/ or the appropriate service page, https://docs.aws.amazon.com/general/latest/gr/aws-service-information.html, for a full list of FIPS endpoints and regions.
* *fips_enabled*: Enabling this option instructs {beatname_uc} to use the FIPS endpoint of a service. All services used by {beatname_uc} are FIPS compatible except for `tagging` but only certain regions are FIPS compatible. See https://aws.amazon.com/compliance/fips/ or the appropriate service page, https://docs.aws.amazon.com/general/latest/gr/aws-service-information.html, for a full list of FIPS endpoints and regions.
* *ssl*: This specifies SSL/TLS configuration. If the ssl section is missing, the host's CAs are used for HTTPS connections. See <<configuration-ssl>> for more information.
* *default_region*: Default region to query if no other region is set.
* *default_region*: Default region to query if no other region is set. Most AWS services offer a regional endpoint that can be used to make requests. Some services, such as IAM, do not support regions. If a region is not provided by any other way (environment variable, credential or instance profile), the value set here will be used.

[float]
==== Supported Formats
Expand Down
31 changes: 16 additions & 15 deletions x-pack/metricbeat/module/aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"context"
"fmt"

"strings"
"time"

awssdk "github.com/aws/aws-sdk-go-v2/aws"
Expand Down Expand Up @@ -110,33 +109,35 @@ func NewMetricSet(base mb.BaseMetricSet) (*MetricSet, error) {
awsConfig.Region = config.Regions[0]
}

stsServiceName := awscommon.CreateServiceName("sts", config.AWSConfig.FIPSEnabled, awsConfig.Region)
iamServiceName := awscommon.CreateServiceName("iam", config.AWSConfig.FIPSEnabled, awsConfig.Region)

// Get IAM account id
svcSts := sts.NewFromConfig(awscommon.EnrichAWSConfigWithEndpoint(
config.AWSConfig.Endpoint, stsServiceName, "", awsConfig))
svcSts := sts.NewFromConfig(awsConfig, func(o *sts.Options) {
if config.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
})
outputIdentity, err := svcSts.GetCallerIdentity(context.TODO(), &sts.GetCallerIdentityInput{})
if err != nil {
base.Logger().Warn("failed to get caller identity, please check permission setting: ", err)
} else {
metricSet.AccountID = *outputIdentity.Account
base.Logger().Debug("AWS Credentials belong to account ID: ", metricSet.AccountID)
}
iamRegion := ""
if strings.HasPrefix(awsConfig.Region, "us-gov-") {
iamRegion = "us-gov"
}
// Get account name/alias
svcIam := iam.NewFromConfig(awscommon.EnrichAWSConfigWithEndpoint(
config.AWSConfig.Endpoint, iamServiceName, iamRegion, awsConfig))
svcIam := iam.NewFromConfig(awsConfig, func(o *iam.Options) {
if config.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}

})
metricSet.AccountName = getAccountName(svcIam, base, metricSet)

// Construct MetricSet with a full regions list
if config.Regions == nil {
ec2ServiceName := awscommon.CreateServiceName("ec2", config.AWSConfig.FIPSEnabled, awsConfig.Region)
svcEC2 := ec2.NewFromConfig(awscommon.EnrichAWSConfigWithEndpoint(
config.AWSConfig.Endpoint, ec2ServiceName, "", awsConfig))
svcEC2 := ec2.NewFromConfig(awsConfig, func(o *ec2.Options) {
if config.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
})
completeRegionsList, err := getRegions(svcEC2)
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit 023f6f6

Please sign in to comment.