diff --git a/metricbeat/mb/testing/modules.go b/metricbeat/mb/testing/modules.go index 1dcc9b075b8..8c6e09df537 100644 --- a/metricbeat/mb/testing/modules.go +++ b/metricbeat/mb/testing/modules.go @@ -57,13 +57,13 @@ package testing import ( "context" - "sync" "testing" "time" + "github.com/elastic/go-concert/timed" + "github.com/elastic/beats/v7/metricbeat/mb" conf "github.com/elastic/elastic-agent-libs/config" - "github.com/elastic/elastic-agent-libs/mapstr" ) type TestModule struct { @@ -132,25 +132,6 @@ func NewMetricSetsWithRegistry(t testing.TB, config interface{}, registry *mb.Re return metricsets } -func NewReportingMetricSet(t testing.TB, config interface{}) mb.ReportingMetricSet { - metricSet := NewMetricSet(t, config) - - reportingMetricSet, ok := metricSet.(mb.ReportingMetricSet) - if !ok { - t.Fatal("MetricSet does not implement ReportingMetricSet") - } - - return reportingMetricSet -} - -// ReportingFetch runs the given reporting metricset and returns all of the -// events and errors that occur during that period. -func ReportingFetch(metricSet mb.ReportingMetricSet) ([]mapstr.M, []error) { - r := &capturingReporter{} - metricSet.Fetch(r) - return r.events, r.errs -} - // NewReportingMetricSetV2 returns a new ReportingMetricSetV2 instance. Then // you can use ReportingFetchV2 to perform a Fetch operation with the MetricSet. func NewReportingMetricSetV2(t testing.TB, config interface{}) mb.ReportingMetricSetV2 { @@ -186,7 +167,7 @@ func NewReportingMetricSetV2Error(t testing.TB, config interface{}) mb.Reporting // NewReportingMetricSetV2Errors returns an array of new ReportingMetricSetV2 instances. func NewReportingMetricSetV2Errors(t testing.TB, config interface{}) []mb.ReportingMetricSetV2Error { metricSets := NewMetricSets(t, config) - var reportingMetricSets []mb.ReportingMetricSetV2Error + reportingMetricSets := make([]mb.ReportingMetricSetV2Error, 0, len(metricSets)) for _, metricSet := range metricSets { rMS, ok := metricSet.(mb.ReportingMetricSetV2Error) if !ok { @@ -259,6 +240,41 @@ func ReportingFetchV2Error(metricSet mb.ReportingMetricSetV2Error) ([]mb.Event, return r.events, r.errs } +// PeriodicReportingFetchV2Error runs the given metricset and returns +// the first batch of events or errors that occur during that period. +// +// `period` is the time between each fetch. +// `timeout` is the maximum time to wait for the first event. +// +// The function tries to fetch the metrics every `period` until it gets +// the first batch of metrics or the `timeout` is reached. +func PeriodicReportingFetchV2Error(metricSet mb.ReportingMetricSetV2Error, period time.Duration, timeout time.Duration) ([]mb.Event, []error) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + r := &CapturingReporterV2{} + _ = timed.Periodic(ctx, period, func() error { + // Fetch the metrics and store them in the + // reporter. + if err := metricSet.Fetch(r); err != nil { + r.errs = append(r.errs, err) + return err + } + + if len(r.events) > 0 { + // We have metrics, stop the periodic + // and return the metrics. + cancel() + } + + // No metrics yet, retry again + // in the next period. + return nil + }) + + return r.events, r.errs +} + // ReportingFetchV2WithContext runs the given reporting metricset and returns all of the // events and errors that occur during that period. func ReportingFetchV2WithContext(metricSet mb.ReportingMetricSetV2WithContext) ([]mb.Event, []error) { @@ -270,71 +286,6 @@ func ReportingFetchV2WithContext(metricSet mb.ReportingMetricSetV2WithContext) ( return r.events, r.errs } -// NewPushMetricSet instantiates a new PushMetricSet using the given -// configuration. The ModuleFactory and MetricSetFactory are obtained from the -// global Registry. -func NewPushMetricSet(t testing.TB, config interface{}) mb.PushMetricSet { - metricSet := NewMetricSet(t, config) - - pushMetricSet, ok := metricSet.(mb.PushMetricSet) - if !ok { - t.Fatal("MetricSet does not implement PushMetricSet") - } - - return pushMetricSet -} - -type capturingReporter struct { - events []mapstr.M - errs []error - done chan struct{} -} - -func (r *capturingReporter) Event(event mapstr.M) bool { - r.events = append(r.events, event) - return true -} - -func (r *capturingReporter) ErrorWith(err error, meta mapstr.M) bool { - r.events = append(r.events, meta) - r.errs = append(r.errs, err) - return true -} - -func (r *capturingReporter) Error(err error) bool { - r.errs = append(r.errs, err) - return true -} - -func (r *capturingReporter) Done() <-chan struct{} { - return r.done -} - -// RunPushMetricSet run the given push metricset for the specific amount of time -// and returns all of the events and errors that occur during that period. -func RunPushMetricSet(duration time.Duration, metricSet mb.PushMetricSet) ([]mapstr.M, []error) { - r := &capturingReporter{done: make(chan struct{})} - - // Run the metricset. - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - metricSet.Run(r) - }() - - // Let it run for some period, then stop it by closing the done channel. - time.AfterFunc(duration, func() { - close(r.done) - }) - - // Wait for the PushMetricSet to completely stop. - wg.Wait() - - // Return all events and errors that were collected. - return r.events, r.errs -} - // NewPushMetricSetV2 instantiates a new PushMetricSetV2 using the given // configuration. The ModuleFactory and MetricSetFactory are obtained from the // global Registry. @@ -428,16 +379,16 @@ func (r *CapturingPushReporterV2) capture(waitEvents int) []mb.Event { // BlockingCapture blocks until waitEvents n of events are captured func (r *CapturingPushReporterV2) BlockingCapture(waitEvents int) []mb.Event { - var events []mb.Event - for { - select { - case e := <-r.eventsC: - events = append(events, e) - if waitEvents > 0 && len(events) >= waitEvents { - return events - } + events := make([]mb.Event, 0, waitEvents) + + for e := range r.eventsC { + events = append(events, e) + if waitEvents > 0 && len(events) >= waitEvents { + return events } } + + return events } // RunPushMetricSetV2 run the given push metricset for the specific amount of diff --git a/x-pack/metricbeat/module/aws/sqs/sqs_integration_test.go b/x-pack/metricbeat/module/aws/sqs/sqs_integration_test.go index ccab4e3690d..4970e481b8f 100644 --- a/x-pack/metricbeat/module/aws/sqs/sqs_integration_test.go +++ b/x-pack/metricbeat/module/aws/sqs/sqs_integration_test.go @@ -8,6 +8,7 @@ package sqs import ( "testing" + "time" "github.com/stretchr/testify/assert" @@ -20,7 +21,7 @@ func TestFetch(t *testing.T) { config := mtest.GetConfigForTest(t, "sqs", "300s") metricSet := mbtest.NewReportingMetricSetV2Error(t, config) - events, errs := mbtest.ReportingFetchV2Error(metricSet) + events, errs := mbtest.PeriodicReportingFetchV2Error(metricSet, 1*time.Minute, 8*time.Minute) if len(errs) > 0 { t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs) }