Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add PeriodicReportingFetchV2Error to fetch metrics multiple times #40251

Merged
merged 12 commits into from
Jul 17, 2024
141 changes: 46 additions & 95 deletions metricbeat/mb/testing/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,13 @@ package testing

import (
"context"
"sync"
"testing"
"time"

"github.com/elastic/go-concert/timed"

"github.com/elastic/beats/v7/metricbeat/mb"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/mapstr"
)

type TestModule struct {
Expand Down Expand Up @@ -132,25 +132,6 @@ func NewMetricSetsWithRegistry(t testing.TB, config interface{}, registry *mb.Re
return metricsets
}

// NewReportingMetricSet builds a MetricSet from config via NewMetricSet and
// returns it as an mb.ReportingMetricSet. If the MetricSet does not implement
// that interface the test is failed through t.Fatal.
func NewReportingMetricSet(t testing.TB, config interface{}) mb.ReportingMetricSet {
	if rms, ok := NewMetricSet(t, config).(mb.ReportingMetricSet); ok {
		return rms
	}

	t.Fatal("MetricSet does not implement ReportingMetricSet")

	// Only reached if t.Fatal returns; mirrors the zero value a failed
	// type assertion yields.
	return nil
}

// ReportingFetch invokes Fetch once on the given reporting metricset with a
// capturing reporter and returns the events and errors that reporter
// collected during the call.
func ReportingFetch(metricSet mb.ReportingMetricSet) ([]mapstr.M, []error) {
	var reporter capturingReporter
	metricSet.Fetch(&reporter)

	return reporter.events, reporter.errs
}

// NewReportingMetricSetV2 returns a new ReportingMetricSetV2 instance. Then
// you can use ReportingFetchV2 to perform a Fetch operation with the MetricSet.
func NewReportingMetricSetV2(t testing.TB, config interface{}) mb.ReportingMetricSetV2 {
Expand Down Expand Up @@ -186,7 +167,7 @@ func NewReportingMetricSetV2Error(t testing.TB, config interface{}) mb.Reporting
// NewReportingMetricSetV2Errors returns an array of new ReportingMetricSetV2 instances.
func NewReportingMetricSetV2Errors(t testing.TB, config interface{}) []mb.ReportingMetricSetV2Error {
metricSets := NewMetricSets(t, config)
var reportingMetricSets []mb.ReportingMetricSetV2Error
reportingMetricSets := make([]mb.ReportingMetricSetV2Error, 0, len(metricSets))
for _, metricSet := range metricSets {
rMS, ok := metricSet.(mb.ReportingMetricSetV2Error)
if !ok {
Expand Down Expand Up @@ -259,6 +240,41 @@ func ReportingFetchV2Error(metricSet mb.ReportingMetricSetV2Error) ([]mb.Event,
return r.events, r.errs
}

// PeriodicReportingFetchV2Error repeatedly fetches from the given metricset
// until it has something to return.
//
// `period` is the interval handed to timed.Periodic between fetch attempts.
// `timeout` bounds the whole operation through a context deadline.
//
// The loop stops as soon as one of the following happens:
//   - Fetch returns an error (the error is appended to the returned errors),
//   - the reporter holds at least one event after a Fetch call,
//   - the timeout context is done.
//
// It returns whatever events and errors the reporter has accumulated at
// that point.
func PeriodicReportingFetchV2Error(metricSet mb.ReportingMetricSetV2Error, period time.Duration, timeout time.Duration) ([]mb.Event, []error) {
	ctx, stop := context.WithTimeout(context.Background(), timeout)
	defer stop()

	reporter := new(CapturingReporterV2)

	// attempt performs a single Fetch against the shared reporter.
	attempt := func() error {
		err := metricSet.Fetch(reporter)
		if err != nil {
			// Keep the failure alongside anything the reporter captured
			// and hand it back so the periodic loop ends.
			reporter.errs = append(reporter.errs, err)
			return err
		}

		if len(reporter.events) != 0 {
			// Metrics arrived: cancel the context so no further
			// attempts are scheduled.
			stop()
		}

		// Nothing yet (or already cancelled); a nil result lets the
		// loop decide whether to try again on the next period.
		return nil
	}

	// The result of Periodic is discarded on purpose: a Fetch error has
	// already been recorded on the reporter, and the context ending is the
	// expected way out of the loop.
	_ = timed.Periodic(ctx, period, attempt)

	return reporter.events, reporter.errs
}

// ReportingFetchV2WithContext runs the given reporting metricset and returns all of the
// events and errors that occur during that period.
func ReportingFetchV2WithContext(metricSet mb.ReportingMetricSetV2WithContext) ([]mb.Event, []error) {
Expand All @@ -270,71 +286,6 @@ func ReportingFetchV2WithContext(metricSet mb.ReportingMetricSetV2WithContext) (
return r.events, r.errs
}

// NewPushMetricSet creates a MetricSet from the given configuration via
// NewMetricSet (which, per the original documentation, resolves the
// ModuleFactory and MetricSetFactory from the global Registry) and returns it
// as an mb.PushMetricSet. The test is failed through t.Fatal when the
// MetricSet does not implement that interface.
func NewPushMetricSet(t testing.TB, config interface{}) mb.PushMetricSet {
	if pms, ok := NewMetricSet(t, config).(mb.PushMetricSet); ok {
		return pms
	}

	t.Fatal("MetricSet does not implement PushMetricSet")

	// Only reached if t.Fatal returns; mirrors the zero value a failed
	// type assertion yields.
	return nil
}

// capturingReporter is a reporter for tests that simply remembers every
// event and error handed to it.
type capturingReporter struct {
	// events holds everything passed to Event, plus the meta values
	// passed to ErrorWith, in arrival order.
	events []mapstr.M
	// errs holds every error passed to Error or ErrorWith.
	errs []error
	// done is the channel exposed through Done; it is nil unless the
	// creator sets it.
	done chan struct{}
}

// Event records the event and always reports true.
func (cr *capturingReporter) Event(event mapstr.M) bool {
	cr.events = append(cr.events, event)
	return true
}

// ErrorWith records meta as an event and err as an error, and always
// reports true.
func (cr *capturingReporter) ErrorWith(err error, meta mapstr.M) bool {
	cr.Event(meta)
	return cr.Error(err)
}

// Error records err and always reports true.
func (cr *capturingReporter) Error(err error) bool {
	cr.errs = append(cr.errs, err)
	return true
}

// Done exposes the reporter's done channel as receive-only.
func (cr *capturingReporter) Done() <-chan struct{} {
	return cr.done
}

// RunPushMetricSet runs the given push metricset, asks it to stop after
// duration by closing the reporter's done channel, waits for Run to return,
// and then hands back every event and error the reporter captured.
func RunPushMetricSet(duration time.Duration, metricSet mb.PushMetricSet) ([]mapstr.M, []error) {
	stop := make(chan struct{})
	reporter := &capturingReporter{done: stop}

	// Start the metricset in the background and track its completion.
	var running sync.WaitGroup
	running.Add(1)
	go func() {
		defer running.Done()
		metricSet.Run(reporter)
	}()

	// Once the requested duration has elapsed, signal the metricset to
	// stop through the done channel.
	time.AfterFunc(duration, func() { close(stop) })

	// Block until Run has fully returned so the captured slices are no
	// longer being written to.
	running.Wait()

	return reporter.events, reporter.errs
}

// NewPushMetricSetV2 instantiates a new PushMetricSetV2 using the given
// configuration. The ModuleFactory and MetricSetFactory are obtained from the
// global Registry.
Expand Down Expand Up @@ -428,16 +379,16 @@ func (r *CapturingPushReporterV2) capture(waitEvents int) []mb.Event {

// BlockingCapture blocks until waitEvents n of events are captured
func (r *CapturingPushReporterV2) BlockingCapture(waitEvents int) []mb.Event {
var events []mb.Event
for {
select {
case e := <-r.eventsC:
events = append(events, e)
if waitEvents > 0 && len(events) >= waitEvents {
return events
}
events := make([]mb.Event, 0, waitEvents)

for e := range r.eventsC {
events = append(events, e)
if waitEvents > 0 && len(events) >= waitEvents {
return events
}
}

return events
}

// RunPushMetricSetV2 run the given push metricset for the specific amount of
Expand Down
3 changes: 2 additions & 1 deletion x-pack/metricbeat/module/aws/sqs/sqs_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package sqs

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

Expand All @@ -20,7 +21,7 @@ func TestFetch(t *testing.T) {
config := mtest.GetConfigForTest(t, "sqs", "300s")

metricSet := mbtest.NewReportingMetricSetV2Error(t, config)
events, errs := mbtest.ReportingFetchV2Error(metricSet)
events, errs := mbtest.PeriodicReportingFetchV2Error(metricSet, 1*time.Minute, 8*time.Minute)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we planning on replacing ReportingFetchV2Error with PeriodicReportingFetchV2Error in other metricsets?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to focus this PR on removing the flakiness from the test suite so that other teams can continue with their tasks.

Do you know of other tests that need more than one call to Fetch() to succeed due to latency or something similar?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But if this solution proves effective, we can adopt it for more tests that need more than one fetch.

if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}
Expand Down
Loading