From adcd962347016da71e4e325ee635a58a9b4218de Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Thu, 6 Feb 2020 15:20:16 -0700 Subject: [PATCH] Add autodiscover for aws_ec2 (#14823) * Add autodiscover for aws_ec2 * Add aws.ec2.* to autodiscover template --- CHANGELOG.next.asciidoc | 2 +- libbeat/docs/shared-autodiscover.asciidoc | 45 ++++ .../docs/autodiscover-aws-ec2-config.asciidoc | 25 ++ metricbeat/docs/configuring-howto.asciidoc | 2 + .../libbeat/autodiscover/providers/aws/aws.go | 35 +++ .../providers/aws/{elb => }/config.go | 22 +- .../providers/aws/ec2/_meta/fields.yml | 11 + .../autodiscover/providers/aws/ec2/ec2.go | 134 ++++++++++ .../autodiscover/providers/aws/ec2/fetch.go | 230 ++++++++++++++++++ .../providers/aws/ec2/fetch_test.go | 18 ++ .../providers/aws/ec2/mock_ec2_client_test.go | 18 ++ .../providers/aws/ec2/mocks_test.go | 85 +++++++ .../providers/aws/ec2/provider.go | 150 ++++++++++++ .../providers/aws/ec2/provider_test.go | 109 +++++++++ .../autodiscover/providers/aws/ec2/watch.go | 107 ++++++++ .../providers/aws/ec2/watch_test.go | 65 +++++ .../autodiscover/providers/aws/elb/fetch.go | 12 +- .../providers/aws/elb/fetch_test.go | 3 +- .../providers/aws/elb/lblistener.go | 19 +- .../providers/aws/elb/provider.go | 47 ++-- .../providers/aws/elb/provider_test.go | 10 +- .../providers/aws/test/provider.go | 59 +++++ x-pack/libbeat/cmd/inject.go | 1 + 23 files changed, 1145 insertions(+), 64 deletions(-) create mode 100644 metricbeat/docs/autodiscover-aws-ec2-config.asciidoc rename x-pack/libbeat/autodiscover/providers/aws/{elb => }/config.go (52%) create mode 100644 x-pack/libbeat/autodiscover/providers/aws/ec2/_meta/fields.yml create mode 100644 x-pack/libbeat/autodiscover/providers/aws/ec2/ec2.go create mode 100644 x-pack/libbeat/autodiscover/providers/aws/ec2/fetch.go create mode 100644 x-pack/libbeat/autodiscover/providers/aws/ec2/fetch_test.go create mode 100644 x-pack/libbeat/autodiscover/providers/aws/ec2/mock_ec2_client_test.go create mode 100644 x-pack/libbeat/autodiscover/providers/aws/ec2/mocks_test.go create mode 100644 x-pack/libbeat/autodiscover/providers/aws/ec2/provider.go create mode 100644 x-pack/libbeat/autodiscover/providers/aws/ec2/provider_test.go create mode 100644 x-pack/libbeat/autodiscover/providers/aws/ec2/watch.go create mode 100644 x-pack/libbeat/autodiscover/providers/aws/ec2/watch_test.go create mode 100644 x-pack/libbeat/autodiscover/providers/aws/test/provider.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index a2e4c2ad1bd..03fb4fab87f 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -98,7 +98,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d *Affecting all Beats* - Add document_id setting to decode_json_fields processor. {pull}15859[15859] - +- Add `aws_ec2` provider for autodiscover. {issue}12518[12518] {pull}14823[14823] *Auditbeat* diff --git a/libbeat/docs/shared-autodiscover.asciidoc b/libbeat/docs/shared-autodiscover.asciidoc index 9e7087e72be..5ddda476ddd 100644 --- a/libbeat/docs/shared-autodiscover.asciidoc +++ b/libbeat/docs/shared-autodiscover.asciidoc @@ -362,6 +362,51 @@ ifdef::autodiscoverHints[] include::../../{beatname_lc}/docs/autodiscover-hints.asciidoc[] endif::autodiscoverHints[] +ifdef::autodiscoverAWSEC2[] +[float] +===== Amazon EC2s + +*Note: This provider is experimental* + +The Amazon EC2 autodiscover provider discovers https://aws.amazon.com/ec2/[EC2 instances]. +This is useful for users to launch Metricbeat modules to monitor services running on AWS EC2 instances. +For example, to gather MySQL metrics from mysql servers running on EC2 instances with specific tag `service: mysql`. + +This provider will load AWS credentials using the standard AWS environment variables and shared credentials files +see https://docs.aws.amazon.com/general/latest/gr/aws-access-keys-best-practices.html[Best Practices for Managing AWS Access Keys] +for more information. If you do not wish to use these, you may explicitly set the `access_key_id` and +`secret_access_key` variables. + +These are the available fields during within config templating. +The `aws.ec2.*` fields and `cloud.*` fields will be available on each emitted event. + +* cloud.availability_zone +* cloud.instance.id +* cloud.machine.type +* cloud.provider +* cloud.region + +* aws.ec2.architecture +* aws.ec2.image.id +* aws.ec2.kernel.id +* aws.ec2.monitoring.state +* aws.ec2.private.dns_name +* aws.ec2.private.ip +* aws.ec2.public.dns_name +* aws.ec2.public.ip +* aws.ec2.root_device_name +* aws.ec2.state.code +* aws.ec2.state.name +* aws.ec2.subnet.id +* aws.ec2.tags +* aws.ec2.vpc.id + +include::../../{beatname_lc}/docs/autodiscover-aws-ec2-config.asciidoc[] + +This autodiscover provider takes our standard <>. + +endif::autodiscoverAWSEC2[] + [[configuration-autodiscover-advanced]] === Advanced usage diff --git a/metricbeat/docs/autodiscover-aws-ec2-config.asciidoc b/metricbeat/docs/autodiscover-aws-ec2-config.asciidoc new file mode 100644 index 00000000000..42f9497c2aa --- /dev/null +++ b/metricbeat/docs/autodiscover-aws-ec2-config.asciidoc @@ -0,0 +1,25 @@ +{beatname_uc} supports templates for modules: + +["source","yaml",subs="attributes"] +------------------------------------------------------------------------------------- +metricbeat.autodiscover: + providers: + - type: aws_ec2 + period: 1m + credential_profile_name: elastic-beats + templates: + - condition: + equals: + aws.ec2.tags.service: "mysql" + config: + - module: mysql + metricsets: ["status", "galera_status"] + period: 10s + hosts: ["root:password@tcp(${data.aws.ec2.public.ip}:3306)/"] + username: root + password: password +------------------------------------------------------------------------------------- + +This autodiscover provider takes our standard AWS credentials options. +With this configuration, `mysql` metricbeat module will be launched for all EC2 +instances that have `service: mysql` as a tag. diff --git a/metricbeat/docs/configuring-howto.asciidoc b/metricbeat/docs/configuring-howto.asciidoc index d532b8a5a3e..b282976c097 100644 --- a/metricbeat/docs/configuring-howto.asciidoc +++ b/metricbeat/docs/configuring-howto.asciidoc @@ -77,7 +77,9 @@ include::{libbeat-dir}/shared-env-vars.asciidoc[] :autodiscoverJolokia: :autodiscoverHints: +:autodiscoverAWSEC2: include::{libbeat-dir}/shared-autodiscover.asciidoc[] +:autodiscoverAWSEC2!: :standalone: include::{libbeat-dir}/yaml.asciidoc[] diff --git a/x-pack/libbeat/autodiscover/providers/aws/aws.go b/x-pack/libbeat/autodiscover/providers/aws/aws.go index 34e92c6addb..413866acc9e 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/aws.go +++ b/x-pack/libbeat/autodiscover/providers/aws/aws.go @@ -3,3 +3,38 @@ // you may not use this file except in compliance with the Elastic License. package aws + +import ( + "context" + + "github.com/aws/aws-sdk-go-v2/service/ec2" + "github.com/aws/aws-sdk-go-v2/service/ec2/ec2iface" + "github.com/pkg/errors" +) + +// SafeString makes handling AWS *string types easier. +// The AWS lib never returns plain strings, always using pointers, probably for memory efficiency reasons. +// This is a bit odd, because strings are just pointers into byte arrays, however this is the choice they've made. +// This will return the plain version of the given string or an empty string if the pointer is null +func SafeString(str *string) string { + if str == nil { + return "" + } + + return *str +} + +// GetRegions makes DescribeRegions API call to list all regions from AWS +func GetRegions(svc ec2iface.ClientAPI) (completeRegionsList []string, err error) { + input := &ec2.DescribeRegionsInput{} + req := svc.DescribeRegionsRequest(input) + output, err := req.Send(context.TODO()) + if err != nil { + err = errors.Wrap(err, "Failed DescribeRegions") + return + } + for _, region := range output.Regions { + completeRegionsList = append(completeRegionsList, *region.RegionName) + } + return +} diff --git a/x-pack/libbeat/autodiscover/providers/aws/elb/config.go b/x-pack/libbeat/autodiscover/providers/aws/config.go similarity index 52% rename from x-pack/libbeat/autodiscover/providers/aws/elb/config.go rename to x-pack/libbeat/autodiscover/providers/aws/config.go index 0eb8fb73c0c..e3a0873734e 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/elb/config.go +++ b/x-pack/libbeat/autodiscover/providers/aws/config.go @@ -2,7 +2,7 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -package elb +package aws import ( "time" @@ -10,31 +10,23 @@ import ( "github.com/elastic/beats/x-pack/libbeat/common/aws" "github.com/elastic/beats/libbeat/autodiscover/template" - "github.com/elastic/beats/libbeat/common" ) -// Config for the aws_elb autodiscover provider. +// Config for all aws autodiscover providers. type Config struct { - Type string `config:"type"` - - // Standard autodiscover fields. - - // Hints are currently not supported, but may be implemented in a later release - HintsEnabled bool `config:"hints.enabled"` - Builders []*common.Config `config:"builders"` - Appenders []*common.Config `config:"appenders"` - Templates template.MapperSettings `config:"templates"` + Type string `config:"type"` + Templates template.MapperSettings `config:"templates"` // Period defines how often to poll the AWS API. Period time.Duration `config:"period" validate:"nonzero,required"` // AWS Specific autodiscover fields - - Regions []string `config:"regions" validate:"required"` + Regions []string `config:"regions"` AWSConfig aws.ConfigAWS `config:",inline"` } -func defaultConfig() *Config { +// DefaultConfig for all aws autodiscover providers. +func DefaultConfig() *Config { return &Config{ Period: time.Minute, } diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/_meta/fields.yml b/x-pack/libbeat/autodiscover/providers/aws/ec2/_meta/fields.yml new file mode 100644 index 00000000000..d634a6697df --- /dev/null +++ b/x-pack/libbeat/autodiscover/providers/aws/ec2/_meta/fields.yml @@ -0,0 +1,11 @@ +- key: ec2_listener + title: "EC2 Listener" + description: > + AWS EC2 Listeners + short_config: false + release: experimental + fields: + - name: ec2_listener + type: group + description: > + Represents an AWS EC2 Listener, e.g. state of an EC2. diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/ec2.go b/x-pack/libbeat/autodiscover/providers/aws/ec2/ec2.go new file mode 100644 index 00000000000..3a81acb4d5a --- /dev/null +++ b/x-pack/libbeat/autodiscover/providers/aws/ec2/ec2.go @@ -0,0 +1,134 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package ec2 + +import ( + "github.com/aws/aws-sdk-go-v2/service/ec2" + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + awsauto "github.com/elastic/beats/x-pack/libbeat/autodiscover/providers/aws" +) + +type ec2Instance struct { + ec2Instance ec2.Instance +} + +// toMap converts this ec2Instance into the form consumed as metadata in the autodiscovery process. +func (i *ec2Instance) toMap() common.MapStr { + architecture, err := i.ec2Instance.Architecture.MarshalValue() + if err != nil { + logp.Error(errors.Wrap(err, "MarshalValue failed for architecture: ")) + } + + m := common.MapStr{ + "image": i.toImage(), + "vpc": i.toVpc(), + "subnet": i.toSubnet(), + "private": i.toPrivate(), + "public": i.toPublic(), + "monitoring": i.toMonitoringState(), + "kernel": i.toKernel(), + "state": i.stateMap(), + "architecture": architecture, + "root_device_name": awsauto.SafeString(i.ec2Instance.RootDeviceName), + } + + for _, tag := range i.ec2Instance.Tags { + m.Put("tags."+awsauto.SafeString(tag.Key), awsauto.SafeString(tag.Value)) + } + return m +} + +func (i *ec2Instance) instanceID() string { + return awsauto.SafeString(i.ec2Instance.InstanceId) +} + +func (i *ec2Instance) toImage() common.MapStr { + m := common.MapStr{} + m["id"] = awsauto.SafeString(i.ec2Instance.ImageId) + return m +} + +func (i *ec2Instance) toMonitoringState() common.MapStr { + monitoringState, err := i.ec2Instance.Monitoring.State.MarshalValue() + if err != nil { + logp.Error(errors.Wrap(err, "MarshalValue failed for monitoring state: ")) + } + + m := common.MapStr{} + m["state"] = monitoringState + return m +} + +func (i *ec2Instance) toPrivate() common.MapStr { + m := common.MapStr{} + m["ip"] = awsauto.SafeString(i.ec2Instance.PrivateIpAddress) + m["dns_name"] = awsauto.SafeString(i.ec2Instance.PrivateDnsName) + return m +} + +func (i *ec2Instance) toPublic() common.MapStr { + m := common.MapStr{} + m["ip"] = awsauto.SafeString(i.ec2Instance.PublicIpAddress) + m["dns_name"] = awsauto.SafeString(i.ec2Instance.PublicDnsName) + return m +} + +func (i *ec2Instance) toVpc() common.MapStr { + m := common.MapStr{} + m["id"] = awsauto.SafeString(i.ec2Instance.VpcId) + return m +} + +func (i *ec2Instance) toSubnet() common.MapStr { + m := common.MapStr{} + m["id"] = awsauto.SafeString(i.ec2Instance.SubnetId) + return m +} + +func (i *ec2Instance) toKernel() common.MapStr { + m := common.MapStr{} + m["id"] = awsauto.SafeString(i.ec2Instance.KernelId) + return m +} + +func (i *ec2Instance) toCloudMap() common.MapStr { + m := common.MapStr{} + availabilityZone := awsauto.SafeString(i.ec2Instance.Placement.AvailabilityZone) + m["availability_zone"] = availabilityZone + m["provider"] = "aws" + + // The region is just an AZ with the last character removed + m["region"] = availabilityZone[:len(availabilityZone)-1] + + instance := common.MapStr{} + instance["id"] = i.instanceID() + m["instance"] = instance + + instanceType, err := i.ec2Instance.InstanceType.MarshalValue() + if err != nil { + logp.Error(errors.Wrap(err, "MarshalValue failed for instance type: ")) + } + machine := common.MapStr{} + machine["type"] = instanceType + m["machine"] = machine + return m +} + +// stateMap converts the State part of the ec2 struct into a friendlier map with 'reason' and 'code' fields. +func (i *ec2Instance) stateMap() (stateMap common.MapStr) { + state := i.ec2Instance.State + stateMap = common.MapStr{} + nameString, err := state.Name.MarshalValue() + if err != nil { + logp.Error(errors.Wrap(err, "MarshalValue failed for instance state name: ")) + } + + stateMap["name"] = nameString + stateMap["code"] = state.Code + return stateMap +} diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/fetch.go b/x-pack/libbeat/autodiscover/providers/aws/ec2/fetch.go new file mode 100644 index 00000000000..6f1e6fd38fb --- /dev/null +++ b/x-pack/libbeat/autodiscover/providers/aws/ec2/fetch.go @@ -0,0 +1,230 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package ec2 + +import ( + "context" + "sync" + + "github.com/aws/aws-sdk-go-v2/service/ec2" + "github.com/aws/aws-sdk-go-v2/service/ec2/ec2iface" + "go.uber.org/multierr" + + "github.com/elastic/beats/libbeat/logp" + awsauto "github.com/elastic/beats/x-pack/libbeat/autodiscover/providers/aws" +) + +// fetcher is an interface that can fetch a list of ec2Instance objects without pagination being necessary. +type fetcher interface { + fetch(ctx context.Context) ([]*ec2Instance, error) +} + +// apiMultiFetcher fetches results from multiple clients concatenating their results together +// Useful since we have a fetcher per region, this combines them. +type apiMultiFetcher struct { + fetchers []fetcher +} + +func (amf *apiMultiFetcher) fetch(ctx context.Context) ([]*ec2Instance, error) { + fetchResults := make(chan []*ec2Instance) + fetchErr := make(chan error) + + // Simultaneously fetch all from each region + for _, f := range amf.fetchers { + go func(f fetcher) { + res, err := f.fetch(ctx) + if err != nil { + fetchErr <- err + } else { + fetchResults <- res + } + }(f) + } + + var results []*ec2Instance + var errs []error + + for pending := len(amf.fetchers); pending > 0; pending-- { + select { + case r := <-fetchResults: + results = append(results, r...) + case e := <-fetchErr: + errs = append(errs, e) + } + } + + return results, multierr.Combine(errs...) +} + +// apiFetcher is a concrete implementation of fetcher that hits the real AWS API. +type apiFetcher struct { + client ec2iface.ClientAPI +} + +func newAPIFetcher(clients []ec2iface.ClientAPI) fetcher { + fetchers := make([]fetcher, len(clients)) + for idx, client := range clients { + fetchers[idx] = &apiFetcher{client} + } + return &apiMultiFetcher{fetchers} +} + +// fetch attempts to request the full list of ec2Instance objects. +// It accomplishes this by fetching a page of EC2 instances, then one go routine +// per listener API request. Each page of results has O(n)+1 perf since we need that +// additional fetch per EC2. We let the goroutine scheduler sort things out, and use +// a sync.Pool to limit the number of in-flight requests. +func (f *apiFetcher) fetch(ctx context.Context) ([]*ec2Instance, error) { + var MaxResults int64 = 50 + + describeInstanceInput := &ec2.DescribeInstancesInput{MaxResults: &MaxResults} + req := f.client.DescribeInstancesRequest(describeInstanceInput) + + ctx, cancel := context.WithCancel(ctx) + ir := &fetchRequest{ + paginator: ec2.NewDescribeInstancesPaginator(req), + client: f.client, + taskPool: sync.Pool{}, + context: ctx, + cancel: cancel, + logger: logp.NewLogger("autodiscover-ec2-fetch"), + } + + // Limit concurrency against the AWS API by creating a pool of objects + // This is hard coded for now. The concurrency limit of 10 was set semi-arbitrarily. + for i := 0; i < 10; i++ { + ir.taskPool.Put(nil) + } + + return ir.fetch() +} + +// fetchRequest provides a way to get all pages from a +// ec2.DescribeInstancesPaginator and all listeners for the given EC2 instance. +type fetchRequest struct { + paginator ec2.DescribeInstancesPaginator + client ec2iface.ClientAPI + ec2Instances []*ec2Instance + errs []error + resultsLock sync.Mutex + taskPool sync.Pool + pendingTasks sync.WaitGroup + context context.Context + cancel func() + logger *logp.Logger +} + +func (p *fetchRequest) fetch() ([]*ec2Instance, error) { + p.dispatch(p.fetchAllPages) + + // Only fetch future pages when there are no longer requests in-flight from a previous page + p.pendingTasks.Wait() + + // Acquire the results lock to ensure memory + // consistency between the last write and this read + p.resultsLock.Lock() + defer p.resultsLock.Unlock() + + // Since everything is async we have to retrieve any errors that occurred from here + if len(p.errs) > 0 { + return nil, multierr.Combine(p.errs...) + } + + return p.ec2Instances, nil +} + +func (p *fetchRequest) fetchAllPages() { + // Keep fetching pages unless we're stopped OR there are no pages left + for { + select { + case <-p.context.Done(): + p.logger.Debug("done fetching EC2 instances, context cancelled") + return + default: + if !p.fetchNextPage() { + p.logger.Debug("fetched all EC2 instances") + return + } + p.logger.Debug("fetched EC2 instance") + } + } +} + +func (p *fetchRequest) fetchNextPage() (more bool) { + success := p.paginator.Next(p.context) + + if success { + for _, reservation := range p.paginator.CurrentPage().Reservations { + for _, instance := range reservation.Instances { + p.dispatch(func() { p.fetchInstances(instance) }) + } + } + } + + if p.paginator.Err() != nil { + p.recordErrResult(p.paginator.Err()) + } + + return success +} + +// dispatch runs the given func in a new goroutine, properly throttling requests +// with the taskPool and also managing the pendingTasks waitGroup to ensure all +// results are accumulated. +func (p *fetchRequest) dispatch(fn func()) { + p.pendingTasks.Add(1) + + go func() { + slot := p.taskPool.Get() + defer p.taskPool.Put(slot) + defer p.pendingTasks.Done() + + fn() + }() +} + +func (p *fetchRequest) fetchInstances(instance ec2.Instance) { + describeInstancesInput := &ec2.DescribeInstancesInput{InstanceIds: []string{awsauto.SafeString(instance.InstanceId)}} + req := p.client.DescribeInstancesRequest(describeInstancesInput) + listen := ec2.NewDescribeInstancesPaginator(req) + + if listen.Err() != nil { + p.recordErrResult(listen.Err()) + } + + for { + select { + case <-p.context.Done(): + return + default: + if !listen.Next(p.context) { + return + } + + for _, reservation := range listen.CurrentPage().Reservations { + for _, instance := range reservation.Instances { + p.recordGoodResult(instance) + } + } + } + + } +} + +func (p *fetchRequest) recordGoodResult(instance ec2.Instance) { + p.resultsLock.Lock() + defer p.resultsLock.Unlock() + + p.ec2Instances = append(p.ec2Instances, &ec2Instance{instance}) +} + +func (p *fetchRequest) recordErrResult(err error) { + p.resultsLock.Lock() + defer p.resultsLock.Unlock() + + p.errs = append(p.errs, err) + + p.cancel() +} diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/fetch_test.go b/x-pack/libbeat/autodiscover/providers/aws/ec2/fetch_test.go new file mode 100644 index 00000000000..084322e06e8 --- /dev/null +++ b/x-pack/libbeat/autodiscover/providers/aws/ec2/fetch_test.go @@ -0,0 +1,18 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package ec2 + +import ( + "testing" + + "github.com/aws/aws-sdk-go-v2/service/ec2/ec2iface" + "github.com/stretchr/testify/require" +) + +func Test_newAPIFetcher(t *testing.T) { + client := newMockEC2Client(0) + fetcher := newAPIFetcher([]ec2iface.ClientAPI{client}) + require.NotNil(t, fetcher) +} diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/mock_ec2_client_test.go b/x-pack/libbeat/autodiscover/providers/aws/ec2/mock_ec2_client_test.go new file mode 100644 index 00000000000..66c9cf6c6c2 --- /dev/null +++ b/x-pack/libbeat/autodiscover/providers/aws/ec2/mock_ec2_client_test.go @@ -0,0 +1,18 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package ec2 + +import ( + "github.com/aws/aws-sdk-go-v2/service/ec2/ec2iface" +) + +func newMockEC2Client(numResults int) mockEC2Client { + return mockEC2Client{numResults: numResults} +} + +type mockEC2Client struct { + ec2iface.ClientAPI + numResults int +} diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/mocks_test.go b/x-pack/libbeat/autodiscover/providers/aws/ec2/mocks_test.go new file mode 100644 index 00000000000..542ddf4925e --- /dev/null +++ b/x-pack/libbeat/autodiscover/providers/aws/ec2/mocks_test.go @@ -0,0 +1,85 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package ec2 + +import ( + "context" + "sync" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/ec2" +) + +// mockFetcher is a fetcher that returns a customizable list of results, useful for testing. +type mockFetcher struct { + ec2Instances []*ec2Instance + err error + lock sync.Mutex +} + +func newMockFetcher(lbListeners []*ec2Instance, err error) *mockFetcher { + return &mockFetcher{ec2Instances: lbListeners, err: err} +} + +func (f *mockFetcher) fetch(ctx context.Context) ([]*ec2Instance, error) { + f.lock.Lock() + defer f.lock.Unlock() + + result := make([]*ec2Instance, len(f.ec2Instances)) + copy(result, f.ec2Instances) + + return result, f.err +} + +func (f *mockFetcher) setEC2s(newEC2s []*ec2Instance) { + f.lock.Lock() + defer f.lock.Unlock() + + f.ec2Instances = newEC2s +} + +func (f *mockFetcher) setError(err error) { + f.lock.Lock() + defer f.lock.Unlock() + + f.ec2Instances = []*ec2Instance{} + f.err = err +} + +func fakeEC2Instance() *ec2Instance { + runningCode := int64(16) + coreCount := int64(1) + threadsPerCore := int64(1) + publicDNSName := "ec2-1-2-3-4.us-west-1.compute.amazonaws.com" + publicIP := "1.2.3.4" + privateDNSName := "ip-5-6-7-8.us-west-1.compute.internal" + privateIP := "5.6.7.8" + instanceID := "i-123" + + instance := ec2.Instance{ + InstanceId: aws.String(instanceID), + InstanceType: ec2.InstanceTypeT2Medium, + Placement: &ec2.Placement{ + AvailabilityZone: aws.String("us-west-1a"), + }, + ImageId: aws.String("image-123"), + State: &ec2.InstanceState{ + Name: ec2.InstanceStateNameRunning, + Code: &runningCode, + }, + Monitoring: &ec2.Monitoring{ + State: ec2.MonitoringStateDisabled, + }, + CpuOptions: &ec2.CpuOptions{ + CoreCount: &coreCount, + ThreadsPerCore: &threadsPerCore, + }, + PublicDnsName: &publicDNSName, + PublicIpAddress: &publicIP, + PrivateDnsName: &privateDNSName, + PrivateIpAddress: &privateIP, + } + return &ec2Instance{ec2Instance: instance} +} diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/provider.go b/x-pack/libbeat/autodiscover/providers/aws/ec2/provider.go new file mode 100644 index 00000000000..4c9aa3e8b43 --- /dev/null +++ b/x-pack/libbeat/autodiscover/providers/aws/ec2/provider.go @@ -0,0 +1,150 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package ec2 + +import ( + "github.com/aws/aws-sdk-go-v2/service/ec2" + "github.com/aws/aws-sdk-go-v2/service/ec2/ec2iface" + "github.com/gofrs/uuid" + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/autodiscover" + "github.com/elastic/beats/libbeat/autodiscover/template" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/bus" + "github.com/elastic/beats/libbeat/common/cfgwarn" + "github.com/elastic/beats/libbeat/logp" + awsauto "github.com/elastic/beats/x-pack/libbeat/autodiscover/providers/aws" + awscommon "github.com/elastic/beats/x-pack/libbeat/common/aws" +) + +func init() { + autodiscover.Registry.AddProvider("aws_ec2", AutodiscoverBuilder) +} + +// Provider implements autodiscover provider for aws EC2s. +type Provider struct { + config *awsauto.Config + bus bus.Bus + templates *template.Mapper + startListener bus.Listener + stopListener bus.Listener + watcher *watcher + uuid uuid.UUID +} + +// AutodiscoverBuilder is the main builder for this provider. +func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config) (autodiscover.Provider, error) { + cfgwarn.Experimental("aws_ec2 autodiscover is experimental") + + config := awsauto.DefaultConfig() + err := c.Unpack(&config) + if err != nil { + return nil, err + } + + awsCfg, err := awscommon.GetAWSCredentials( + awscommon.ConfigAWS{ + AccessKeyID: config.AWSConfig.AccessKeyID, + SecretAccessKey: config.AWSConfig.SecretAccessKey, + SessionToken: config.AWSConfig.SessionToken, + ProfileName: config.AWSConfig.ProfileName, + }) + + // Construct MetricSet with a full regions list if there is no region specified. + if config.Regions == nil { + // set default region to make initial aws api call + awsCfg.Region = "us-west-1" + svcEC2 := ec2.New(awsCfg) + completeRegionsList, err := awsauto.GetRegions(svcEC2) + if err != nil { + return nil, err + } + + config.Regions = completeRegionsList + } + + var clients []ec2iface.ClientAPI + for _, region := range config.Regions { + if err != nil { + logp.Error(errors.Wrap(err, "error loading AWS config for aws_ec2 autodiscover provider")) + } + awsCfg.Region = region + clients = append(clients, ec2.New(awsCfg)) + } + + return internalBuilder(uuid, bus, config, newAPIFetcher(clients)) +} + +// internalBuilder is mainly intended for testing via mocks and stubs. +// it can be configured to use a fetcher that doesn't actually hit the AWS API. +func internalBuilder(uuid uuid.UUID, bus bus.Bus, config *awsauto.Config, fetcher fetcher) (*Provider, error) { + mapper, err := template.NewConfigMapper(config.Templates) + if err != nil { + return nil, err + } + + p := &Provider{ + config: config, + bus: bus, + templates: &mapper, + uuid: uuid, + } + + p.watcher = newWatcher( + fetcher, + config.Period, + p.onWatcherStart, + p.onWatcherStop, + ) + + return p, nil +} + +// Start the autodiscover process. +func (p *Provider) Start() { + p.watcher.start() +} + +// Stop the autodiscover process. +func (p *Provider) Stop() { + p.watcher.stop() +} + +func (p *Provider) onWatcherStart(instanceID string, instance *ec2Instance) { + e := bus.Event{ + "start": true, + "provider": p.uuid, + "id": instanceID, + "aws": common.MapStr{ + "ec2": instance.toMap(), + }, + "cloud": instance.toCloudMap(), + "meta": common.MapStr{ + "aws": common.MapStr{ + "ec2": instance.toMap(), + }, + "cloud": instance.toCloudMap(), + }, + } + + if configs := p.templates.GetConfig(e); configs != nil { + e["config"] = configs + } + p.bus.Publish(e) +} + +func (p *Provider) onWatcherStop(instanceID string) { + e := bus.Event{ + "stop": true, + "id": instanceID, + "provider": p.uuid, + } + p.bus.Publish(e) +} + +func (p *Provider) String() string { + return "aws_ec2" +} diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/provider_test.go b/x-pack/libbeat/autodiscover/providers/aws/ec2/provider_test.go new file mode 100644 index 00000000000..b9c3acc1748 --- /dev/null +++ b/x-pack/libbeat/autodiscover/providers/aws/ec2/provider_test.go @@ -0,0 +1,109 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package ec2 + +import ( + "testing" + "time" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/bus" + "github.com/elastic/beats/libbeat/logp" + awsauto "github.com/elastic/beats/x-pack/libbeat/autodiscover/providers/aws" + "github.com/elastic/beats/x-pack/libbeat/autodiscover/providers/aws/test" + "github.com/gofrs/uuid" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func Test_internalBuilder(t *testing.T) { + instance := fakeEC2Instance() + instances := []*ec2Instance{instance} + fetcher := newMockFetcher(instances, nil) + log := logp.NewLogger("ec2") + pBus := bus.New(log, "test") + + cfg := &awsauto.Config{ + Regions: []string{"us-east-1a", "us-west-1b"}, + Period: time.Nanosecond, + } + + uuid, _ := uuid.NewV4() + provider, err := internalBuilder(uuid, pBus, cfg, fetcher) + require.NoError(t, err) + + startListener := pBus.Subscribe("start") + stopListener := pBus.Subscribe("stop") + listenerDone := make(chan struct{}) + defer close(listenerDone) + + var events test.TestEventAccumulator + go func() { + for { + select { + case e := <-startListener.Events(): + events.Add(e) + case e := <-stopListener.Events(): + events.Add(e) + case <-listenerDone: + return + } + } + }() + + // Let run twice to ensure that duplicates don't create two start events + // Since we're turning a list of assets into a list of changes the second once() call should be a noop + provider.watcher.once() + provider.watcher.once() + events.WaitForNumEvents(t, 1, time.Second) + + assert.Equal(t, 1, events.Len()) + + expectedStartEvent := bus.Event{ + "id": instance.instanceID(), + "provider": uuid, + "start": true, + "aws": common.MapStr{ + "ec2": instance.toMap(), + }, + "cloud": instance.toCloudMap(), + "meta": common.MapStr{ + "aws": common.MapStr{ + "ec2": instance.toMap(), + }, + "cloud": instance.toCloudMap(), + }, + } + + require.Equal(t, expectedStartEvent, events.Get()[0]) + + fetcher.setEC2s([]*ec2Instance{}) + + // Let run twice to ensure that duplicates don't cause an issue + provider.watcher.once() + provider.watcher.once() + events.WaitForNumEvents(t, 2, time.Second) + + require.Equal(t, 2, events.Len()) + + expectedStopEvent := bus.Event{ + "stop": true, + "id": awsauto.SafeString(instance.ec2Instance.InstanceId), + "provider": uuid, + } + + require.Equal(t, expectedStopEvent, events.Get()[1]) + + // Test that in an error situation nothing changes. + preErrorEventCount := events.Len() + fetcher.setError(errors.New("oops")) + + // Let run twice to ensure that duplicates don't cause an issue + provider.watcher.once() + provider.watcher.once() + + assert.Equal(t, preErrorEventCount, events.Len()) +} diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/watch.go b/x-pack/libbeat/autodiscover/providers/aws/ec2/watch.go new file mode 100644 index 00000000000..4ae56465d9c --- /dev/null +++ b/x-pack/libbeat/autodiscover/providers/aws/ec2/watch.go @@ -0,0 +1,107 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package ec2 + +import ( + "context" + "time" + + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/logp" + awsauto "github.com/elastic/beats/x-pack/libbeat/autodiscover/providers/aws" +) + +type watcher struct { + // gen tracks changes we increment the 'generation' of each entry in the map. + gen uint64 + fetcher fetcher + onStart func(uuid string, lblMap *ec2Instance) + onStop func(uuid string) + done chan struct{} + ticker *time.Ticker + period time.Duration + ec2Instances map[string]uint64 + logger *logp.Logger +} + +func newWatcher( + fetcher fetcher, + period time.Duration, + onStart func(uuid string, instanceMap *ec2Instance), + onStop func(uuid string)) *watcher { + return &watcher{ + fetcher: fetcher, + onStart: onStart, + onStop: onStop, + done: make(chan struct{}), + ticker: time.NewTicker(period), + period: period, + ec2Instances: map[string]uint64{}, + logger: logp.NewLogger("autodiscover-ec2-watcher"), + } +} + +func (w *watcher) start() { + go w.forever() +} + +func (w *watcher) stop() { + close(w.done) +} + +func (w *watcher) forever() { + for { + select { + case <-w.done: + w.ticker.Stop() + return + case <-w.ticker.C: + err := w.once() + if err != nil { + logp.Error(errors.Wrap(err, "error while fetching AWS EC2s")) + } + } + } +} + +// once executes the watch loop a single time. +// This is mostly useful for testing. +func (w *watcher) once() error { + ctx, cancelCtx := context.WithTimeout(context.Background(), w.period) + defer cancelCtx() // Always cancel to avoid leak + + fetchedEC2s, err := w.fetcher.fetch(ctx) + if err != nil { + return err + } + w.logger.Debugf("fetched %d ec2 instances from AWS for autodiscover", len(fetchedEC2s)) + + oldGen := w.gen + w.gen++ + + // Increment the generation of all EC2s returned by the API request + for _, instance := range fetchedEC2s { + instanceID := awsauto.SafeString(instance.ec2Instance.InstanceId) + if _, exists := w.ec2Instances[instanceID]; !exists { + if w.onStart != nil { + w.onStart(instanceID, instance) + } + } + w.ec2Instances[instanceID] = w.gen + } + + // EC2s not seen in the API request get deleted + for uuid, entryGen := range w.ec2Instances { + if entryGen == oldGen { + if w.onStop != nil { + w.onStop(uuid) + delete(w.ec2Instances, uuid) + } + } + } + + return nil +} diff --git a/x-pack/libbeat/autodiscover/providers/aws/ec2/watch_test.go b/x-pack/libbeat/autodiscover/providers/aws/ec2/watch_test.go new file mode 100644 index 00000000000..83cae46efed --- /dev/null +++ b/x-pack/libbeat/autodiscover/providers/aws/ec2/watch_test.go @@ -0,0 +1,65 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package ec2 + +import ( + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/stretchr/testify/assert" +) + +func TestWatchTicks(t *testing.T) { + instances := []*ec2Instance{fakeEC2Instance()} + + lock := sync.Mutex{} + var startUUIDs []string + var startEC2s []*ec2Instance + var stopUUIDs []string + + fetcher := newMockFetcher(instances, nil) + watcher := newWatcher( + fetcher, + time.Millisecond, + func(uuid string, lbListener *ec2Instance) { + lock.Lock() + defer lock.Unlock() + + startUUIDs = append(startUUIDs, uuid) + startEC2s = append(startEC2s, lbListener) + }, + func(uuid string) { + lock.Lock() + defer lock.Unlock() + + stopUUIDs = append(stopUUIDs, uuid) + }) + defer watcher.stop() + + // Run through 10 ticks + for i := 0; i < 10; i++ { + err := watcher.once() + require.NoError(t, err) + } + + // The instanceID is the unique identifier used. + instanceIDs := []string{*instances[0].ec2Instance.InstanceId} + + // Test that we've seen one ec2 start, but none stop + assert.Equal(t, instanceIDs, startUUIDs) + assert.Len(t, stopUUIDs, 0) + assert.Equal(t, instances, startEC2s) + + // Stop the ec2 and test that we see a single stop + // and no change to starts + fetcher.setEC2s(nil) + watcher.once() + + assert.Equal(t, instanceIDs, startUUIDs) + assert.Equal(t, instanceIDs, stopUUIDs) +} diff --git a/x-pack/libbeat/autodiscover/providers/aws/elb/fetch.go b/x-pack/libbeat/autodiscover/providers/aws/elb/fetch.go index 1cdb8e89984..33a7e8bb56a 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/elb/fetch.go +++ b/x-pack/libbeat/autodiscover/providers/aws/elb/fetch.go @@ -15,8 +15,6 @@ import ( "github.com/elastic/beats/libbeat/logp" ) -const logSelector = "autodiscover-elb-fetch" - // fetcher is an interface that can fetch a list of lbListener (load balancer + listener) objects without pagination being necessary. type fetcher interface { fetch(ctx context.Context) ([]*lbListener, error) @@ -64,7 +62,7 @@ type apiFetcher struct { client elasticloadbalancingv2iface.ClientAPI } -func newAPIFetcher(ctx context.Context, clients []elasticloadbalancingv2iface.ClientAPI) fetcher { +func newAPIFetcher(clients []elasticloadbalancingv2iface.ClientAPI) fetcher { fetchers := make([]fetcher, len(clients)) for idx, client := range clients { fetchers[idx] = &apiFetcher{client} @@ -89,6 +87,7 @@ func (f *apiFetcher) fetch(ctx context.Context) ([]*lbListener, error) { taskPool: sync.Pool{}, context: ctx, cancel: cancel, + logger: logp.NewLogger("autodiscover-elb-fetch"), } // Limit concurrency against the AWS API by creating a pool of objects @@ -112,6 +111,7 @@ type fetchRequest struct { pendingTasks sync.WaitGroup context context.Context cancel func() + logger *logp.Logger } func (p *fetchRequest) fetch() ([]*lbListener, error) { @@ -138,14 +138,14 @@ func (p *fetchRequest) fetchAllPages() { for { select { case <-p.context.Done(): - logp.Debug(logSelector, "done fetching ELB pages, context cancelled") + p.logger.Debug("done fetching ELB pages, context cancelled") return default: if !p.fetchNextPage() { - logp.Debug(logSelector, "fetched all ELB pages") + p.logger.Debug("fetched all ELB pages") return } - logp.Debug(logSelector, "fetched ELB page") + p.logger.Debug("fetched ELB page") } } } diff --git a/x-pack/libbeat/autodiscover/providers/aws/elb/fetch_test.go b/x-pack/libbeat/autodiscover/providers/aws/elb/fetch_test.go index 747c9738ed3..c1eadd70a3a 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/elb/fetch_test.go +++ b/x-pack/libbeat/autodiscover/providers/aws/elb/fetch_test.go @@ -5,7 +5,6 @@ package elb import ( - "context" "testing" "github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2/elasticloadbalancingv2iface" @@ -14,6 +13,6 @@ import ( func Test_newAPIFetcher(t *testing.T) { client := newMockELBClient(0) - fetcher := newAPIFetcher(context.TODO(), []elasticloadbalancingv2iface.ClientAPI{client}) + fetcher := newAPIFetcher([]elasticloadbalancingv2iface.ClientAPI{client}) require.NotNil(t, fetcher) } diff --git a/x-pack/libbeat/autodiscover/providers/aws/elb/lblistener.go b/x-pack/libbeat/autodiscover/providers/aws/elb/lblistener.go index 94e49867781..b673113872d 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/elb/lblistener.go +++ b/x-pack/libbeat/autodiscover/providers/aws/elb/lblistener.go @@ -8,6 +8,7 @@ import ( "github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2" "github.com/elastic/beats/libbeat/common" + awsauto "github.com/elastic/beats/x-pack/libbeat/autodiscover/providers/aws" ) // lbListener is a tuple type representing an elasticloadbalancingv2.Listener and its associated elasticloadbalancingv2.LoadBalancer. @@ -21,8 +22,8 @@ func (l *lbListener) toMap() common.MapStr { // We fully spell out listener_arn to avoid confusion with the ARN for the whole ELB m := common.MapStr{ "listener_arn": l.listener.ListenerArn, - "load_balancer_arn": safeStrp(l.lb.LoadBalancerArn), - "host": safeStrp(l.lb.DNSName), + "load_balancer_arn": awsauto.SafeString(l.lb.LoadBalancerArn), + "host": awsauto.SafeString(l.lb.DNSName), "protocol": l.listener.Protocol, "type": string(l.lb.Type), "scheme": l.lb.Scheme, @@ -31,7 +32,7 @@ func (l *lbListener) toMap() common.MapStr { "state": l.stateMap(), "ip_address_type": string(l.lb.IpAddressType), "security_groups": l.lb.SecurityGroups, - "vpc_id": safeStrp(l.lb.VpcId), + "vpc_id": awsauto.SafeString(l.lb.VpcId), "ssl_policy": l.listener.SslPolicy, } @@ -42,18 +43,6 @@ func (l *lbListener) toMap() common.MapStr { return m } -// safeStrp makes handling AWS *string types easier. -// The AWS lib never returns plain strings, always using pointers, probably for memory efficiency reasons. -// This is a bit odd, because strings are just pointers into byte arrays, however this is the choice they've made. -// This will return the plain version of the given string or an empty string if the pointer is null -func safeStrp(strp *string) string { - if strp == nil { - return "" - } - - return *strp -} - func (l *lbListener) toCloudMap() common.MapStr { m := common.MapStr{} diff --git a/x-pack/libbeat/autodiscover/providers/aws/elb/provider.go b/x-pack/libbeat/autodiscover/providers/aws/elb/provider.go index bad7fae4b15..b965f9ee1e2 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/elb/provider.go +++ b/x-pack/libbeat/autodiscover/providers/aws/elb/provider.go @@ -5,10 +5,7 @@ package elb import ( - "context" - - awscommon "github.com/elastic/beats/x-pack/libbeat/common/aws" - + "github.com/aws/aws-sdk-go-v2/service/ec2" "github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2" "github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2/elasticloadbalancingv2iface" "github.com/gofrs/uuid" @@ -19,6 +16,8 @@ import ( "github.com/elastic/beats/libbeat/common/bus" "github.com/elastic/beats/libbeat/common/cfgwarn" "github.com/elastic/beats/libbeat/logp" + awsauto "github.com/elastic/beats/x-pack/libbeat/autodiscover/providers/aws" + awscommon "github.com/elastic/beats/x-pack/libbeat/common/aws" ) func init() { @@ -27,7 +26,7 @@ func init() { // Provider implements autodiscover provider for aws ELBs. type Provider struct { - config *Config + config *awsauto.Config bus bus.Bus builders autodiscover.Builders appenders autodiscover.Appenders @@ -42,12 +41,32 @@ type Provider struct { func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config) (autodiscover.Provider, error) { cfgwarn.Experimental("aws_elb autodiscover is experimental") - config := defaultConfig() + config := awsauto.DefaultConfig() err := c.Unpack(&config) if err != nil { return nil, err } + awsCfg, err := awscommon.GetAWSCredentials(awscommon.ConfigAWS{ + AccessKeyID: config.AWSConfig.AccessKeyID, + SecretAccessKey: config.AWSConfig.SecretAccessKey, + SessionToken: config.AWSConfig.SessionToken, + ProfileName: config.AWSConfig.ProfileName, + }) + + // Construct MetricSet with a full regions list if there is no region specified. + if config.Regions == nil { + // set default region to make initial aws api call + awsCfg.Region = "us-west-1" + svcEC2 := ec2.New(awsCfg) + completeRegionsList, err := awsauto.GetRegions(svcEC2) + if err != nil { + return nil, err + } + + config.Regions = completeRegionsList + } + var clients []elasticloadbalancingv2iface.ClientAPI for _, region := range config.Regions { awsCfg, err := awscommon.GetAWSCredentials(awscommon.ConfigAWS{ @@ -63,32 +82,20 @@ func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config) (autodis clients = append(clients, elasticloadbalancingv2.New(awsCfg)) } - return internalBuilder(uuid, bus, config, newAPIFetcher(context.TODO(), clients)) + return internalBuilder(uuid, bus, config, newAPIFetcher(clients)) } // internalBuilder is mainly intended for testing via mocks and stubs. // it can be configured to use a fetcher that doesn't actually hit the AWS API. -func internalBuilder(uuid uuid.UUID, bus bus.Bus, config *Config, fetcher fetcher) (*Provider, error) { +func internalBuilder(uuid uuid.UUID, bus bus.Bus, config *awsauto.Config, fetcher fetcher) (*Provider, error) { mapper, err := template.NewConfigMapper(config.Templates) if err != nil { return nil, err } - builders, err := autodiscover.NewBuilders(config.Builders, nil) - if err != nil { - return nil, err - } - - appenders, err := autodiscover.NewAppenders(config.Appenders) - if err != nil { - return nil, err - } - p := &Provider{ config: config, bus: bus, - builders: builders, - appenders: appenders, templates: &mapper, uuid: uuid, } diff --git a/x-pack/libbeat/autodiscover/providers/aws/elb/provider_test.go b/x-pack/libbeat/autodiscover/providers/aws/elb/provider_test.go index 0378bb7f93e..3447d595330 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/elb/provider_test.go +++ b/x-pack/libbeat/autodiscover/providers/aws/elb/provider_test.go @@ -9,14 +9,14 @@ import ( "testing" "time" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/bus" + "github.com/elastic/beats/libbeat/logp" + awsauto "github.com/elastic/beats/x-pack/libbeat/autodiscover/providers/aws" "github.com/gofrs/uuid" "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - - "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/common/bus" - "github.com/elastic/beats/libbeat/logp" ) type testEventAccumulator struct { @@ -67,7 +67,7 @@ func Test_internalBuilder(t *testing.T) { fetcher := newMockFetcher(lbls, nil) pBus := bus.New(log, "test") - cfg := &Config{ + cfg := &awsauto.Config{ Regions: []string{"us-east-1a", "us-west-1b"}, Period: time.Nanosecond, } diff --git a/x-pack/libbeat/autodiscover/providers/aws/test/provider.go b/x-pack/libbeat/autodiscover/providers/aws/test/provider.go new file mode 100644 index 00000000000..353b6d62631 --- /dev/null +++ b/x-pack/libbeat/autodiscover/providers/aws/test/provider.go @@ -0,0 +1,59 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package test + +import ( + "sync" + "testing" + "time" + + "github.com/elastic/beats/libbeat/common/bus" +) + +// TestEventAccumulator defined a list of events for testing +type TestEventAccumulator struct { + events []bus.Event + lock sync.Mutex +} + +// Add expends events +func (tea *TestEventAccumulator) Add(e bus.Event) { + tea.lock.Lock() + defer tea.lock.Unlock() + + tea.events = append(tea.events, e) +} + +// Len returns length of events +func (tea *TestEventAccumulator) Len() int { + tea.lock.Lock() + defer tea.lock.Unlock() + + return len(tea.events) +} + +// Get copies the event and return it +func (tea *TestEventAccumulator) Get() []bus.Event { + tea.lock.Lock() + defer tea.lock.Unlock() + + res := make([]bus.Event, len(tea.events)) + copy(res, tea.events) + return res +} + +// WaitForNumEvents waits to get target length of events +func (tea *TestEventAccumulator) WaitForNumEvents(t *testing.T, targetLen int, timeout time.Duration) { + start := time.Now() + + for time.Now().Sub(start) < timeout { + if tea.Len() >= targetLen { + return + } + time.Sleep(time.Millisecond) + } + + t.Fatalf("Timed out waiting for num events to be %d", targetLen) +} diff --git a/x-pack/libbeat/cmd/inject.go b/x-pack/libbeat/cmd/inject.go index 71d72a6c7c2..658209bb99b 100644 --- a/x-pack/libbeat/cmd/inject.go +++ b/x-pack/libbeat/cmd/inject.go @@ -12,6 +12,7 @@ import ( _ "github.com/elastic/beats/x-pack/libbeat/management" // register autodiscover providers + _ "github.com/elastic/beats/x-pack/libbeat/autodiscover/providers/aws/ec2" _ "github.com/elastic/beats/x-pack/libbeat/autodiscover/providers/aws/elb" )