Skip to content

Commit

Permalink
Add PeriodicReportingFetchV2Error to fetch metrics multiple times (#4…
Browse files Browse the repository at this point in the history
…0251)

* Retry fetching CloudWatch metrics a few times

CloudWatch metrics can take time to appear after a we create a brand
new resource.

* fix wrong import

* Fix events

* Retry 5 times every 60s

* Switch to PeriodicReportingFetchV2Error

* Fix linter complaints

* Remove unused code

* Remove unused code

* Address linter complaints

* Address linter complaints

* Improve function docs

* Cleanup

(cherry picked from commit 532133b)
  • Loading branch information
zmoog authored and mergify[bot] committed Jul 18, 2024
1 parent 8af2d57 commit 1a15b3b
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 96 deletions.
141 changes: 46 additions & 95 deletions metricbeat/mb/testing/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,13 @@ package testing

import (
"context"
"sync"
"testing"
"time"

"github.com/elastic/go-concert/timed"

"github.com/elastic/beats/v7/metricbeat/mb"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/mapstr"
)

type TestModule struct {
Expand Down Expand Up @@ -132,25 +132,6 @@ func NewMetricSetsWithRegistry(t testing.TB, config interface{}, registry *mb.Re
return metricsets
}

func NewReportingMetricSet(t testing.TB, config interface{}) mb.ReportingMetricSet {
metricSet := NewMetricSet(t, config)

reportingMetricSet, ok := metricSet.(mb.ReportingMetricSet)
if !ok {
t.Fatal("MetricSet does not implement ReportingMetricSet")
}

return reportingMetricSet
}

// ReportingFetch runs the given reporting metricset and returns all of the
// events and errors that occur during that period.
func ReportingFetch(metricSet mb.ReportingMetricSet) ([]mapstr.M, []error) {
r := &capturingReporter{}
metricSet.Fetch(r)
return r.events, r.errs
}

// NewReportingMetricSetV2 returns a new ReportingMetricSetV2 instance. Then
// you can use ReportingFetchV2 to perform a Fetch operation with the MetricSet.
func NewReportingMetricSetV2(t testing.TB, config interface{}) mb.ReportingMetricSetV2 {
Expand Down Expand Up @@ -186,7 +167,7 @@ func NewReportingMetricSetV2Error(t testing.TB, config interface{}) mb.Reporting
// NewReportingMetricSetV2Errors returns an array of new ReportingMetricSetV2 instances.
func NewReportingMetricSetV2Errors(t testing.TB, config interface{}) []mb.ReportingMetricSetV2Error {
metricSets := NewMetricSets(t, config)
var reportingMetricSets []mb.ReportingMetricSetV2Error
reportingMetricSets := make([]mb.ReportingMetricSetV2Error, 0, len(metricSets))
for _, metricSet := range metricSets {
rMS, ok := metricSet.(mb.ReportingMetricSetV2Error)
if !ok {
Expand Down Expand Up @@ -259,6 +240,41 @@ func ReportingFetchV2Error(metricSet mb.ReportingMetricSetV2Error) ([]mb.Event,
return r.events, r.errs
}

// PeriodicReportingFetchV2Error runs the given metricset and returns
// the first batch of events or errors that occur during that period.
//
// `period` is the time between each fetch.
// `timeout` is the maximum time to wait for the first event.
//
// The function tries to fetch the metrics every `period` until it gets
// the first batch of metrics or the `timeout` is reached.
func PeriodicReportingFetchV2Error(metricSet mb.ReportingMetricSetV2Error, period time.Duration, timeout time.Duration) ([]mb.Event, []error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

r := &CapturingReporterV2{}
_ = timed.Periodic(ctx, period, func() error {
// Fetch the metrics and store them in the
// reporter.
if err := metricSet.Fetch(r); err != nil {
r.errs = append(r.errs, err)
return err
}

if len(r.events) > 0 {
// We have metrics, stop the periodic
// and return the metrics.
cancel()
}

// No metrics yet, retry again
// in the next period.
return nil
})

return r.events, r.errs
}

// ReportingFetchV2WithContext runs the given reporting metricset and returns all of the
// events and errors that occur during that period.
func ReportingFetchV2WithContext(metricSet mb.ReportingMetricSetV2WithContext) ([]mb.Event, []error) {
Expand All @@ -270,71 +286,6 @@ func ReportingFetchV2WithContext(metricSet mb.ReportingMetricSetV2WithContext) (
return r.events, r.errs
}

// NewPushMetricSet instantiates a new PushMetricSet using the given
// configuration. The ModuleFactory and MetricSetFactory are obtained from the
// global Registry.
func NewPushMetricSet(t testing.TB, config interface{}) mb.PushMetricSet {
metricSet := NewMetricSet(t, config)

pushMetricSet, ok := metricSet.(mb.PushMetricSet)
if !ok {
t.Fatal("MetricSet does not implement PushMetricSet")
}

return pushMetricSet
}

type capturingReporter struct {
events []mapstr.M
errs []error
done chan struct{}
}

func (r *capturingReporter) Event(event mapstr.M) bool {
r.events = append(r.events, event)
return true
}

func (r *capturingReporter) ErrorWith(err error, meta mapstr.M) bool {
r.events = append(r.events, meta)
r.errs = append(r.errs, err)
return true
}

func (r *capturingReporter) Error(err error) bool {
r.errs = append(r.errs, err)
return true
}

func (r *capturingReporter) Done() <-chan struct{} {
return r.done
}

// RunPushMetricSet run the given push metricset for the specific amount of time
// and returns all of the events and errors that occur during that period.
func RunPushMetricSet(duration time.Duration, metricSet mb.PushMetricSet) ([]mapstr.M, []error) {
r := &capturingReporter{done: make(chan struct{})}

// Run the metricset.
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
metricSet.Run(r)
}()

// Let it run for some period, then stop it by closing the done channel.
time.AfterFunc(duration, func() {
close(r.done)
})

// Wait for the PushMetricSet to completely stop.
wg.Wait()

// Return all events and errors that were collected.
return r.events, r.errs
}

// NewPushMetricSetV2 instantiates a new PushMetricSetV2 using the given
// configuration. The ModuleFactory and MetricSetFactory are obtained from the
// global Registry.
Expand Down Expand Up @@ -428,16 +379,16 @@ func (r *CapturingPushReporterV2) capture(waitEvents int) []mb.Event {

// BlockingCapture blocks until waitEvents n of events are captured
func (r *CapturingPushReporterV2) BlockingCapture(waitEvents int) []mb.Event {
var events []mb.Event
for {
select {
case e := <-r.eventsC:
events = append(events, e)
if waitEvents > 0 && len(events) >= waitEvents {
return events
}
events := make([]mb.Event, 0, waitEvents)

for e := range r.eventsC {
events = append(events, e)
if waitEvents > 0 && len(events) >= waitEvents {
return events
}
}

return events
}

// RunPushMetricSetV2 run the given push metricset for the specific amount of
Expand Down
3 changes: 2 additions & 1 deletion x-pack/metricbeat/module/aws/sqs/sqs_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package sqs

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

Expand All @@ -20,7 +21,7 @@ func TestFetch(t *testing.T) {
config := mtest.GetConfigForTest(t, "sqs", "300s")

metricSet := mbtest.NewReportingMetricSetV2Error(t, config)
events, errs := mbtest.ReportingFetchV2Error(metricSet)
events, errs := mbtest.PeriodicReportingFetchV2Error(metricSet, 1*time.Minute, 8*time.Minute)
if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}
Expand Down

0 comments on commit 1a15b3b

Please sign in to comment.