From 45fb23a1a01ebb22ea0caef2f952985c017198ad Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Mon, 15 Jul 2024 19:05:29 +0200 Subject: [PATCH 01/12] Retry fetching CloudWatch metrics a few times CloudWatch metrics can take time to appear after we create a brand new resource. --- .../module/aws/sqs/sqs_integration_test.go | 23 ++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/x-pack/metricbeat/module/aws/sqs/sqs_integration_test.go b/x-pack/metricbeat/module/aws/sqs/sqs_integration_test.go index ccab4e3690d..aa36873a109 100644 --- a/x-pack/metricbeat/module/aws/sqs/sqs_integration_test.go +++ b/x-pack/metricbeat/module/aws/sqs/sqs_integration_test.go @@ -8,8 +8,7 @@ package sqs import ( "testing" - - "github.com/stretchr/testify/assert" + "time" _ "github.com/elastic/beats/v7/libbeat/processors/actions" mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" @@ -18,11 +17,23 @@ import ( func TestFetch(t *testing.T) { config := mtest.GetConfigForTest(t, "sqs", "300s") - metricSet := mbtest.NewReportingMetricSetV2Error(t, config) - events, errs := mbtest.ReportingFetchV2Error(metricSet) - if len(errs) > 0 { - t.Fatalf("Expected 0 error, had %d. 
%v\n", len(errs), errs) + } + + if len(events) > 0 { + break + } + + // No metrics yet, wait and retry + time.Sleep(5 * time.Minute) } assert.NotEmpty(t, events) From 7afca5d5d53ca54e24aa4770e6e478ea2fdd04de Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Mon, 15 Jul 2024 20:22:38 +0200 Subject: [PATCH 02/12] fix wrong import --- x-pack/metricbeat/module/aws/sqs/sqs_integration_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/x-pack/metricbeat/module/aws/sqs/sqs_integration_test.go b/x-pack/metricbeat/module/aws/sqs/sqs_integration_test.go index aa36873a109..7418c85fe2d 100644 --- a/x-pack/metricbeat/module/aws/sqs/sqs_integration_test.go +++ b/x-pack/metricbeat/module/aws/sqs/sqs_integration_test.go @@ -10,6 +10,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + _ "github.com/elastic/beats/v7/libbeat/processors/actions" mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" "github.com/elastic/beats/v7/x-pack/metricbeat/module/aws/mtest" From cbf43124e1d334f7f9d08a031b79a724f6c6135d Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Mon, 15 Jul 2024 22:32:46 +0200 Subject: [PATCH 03/12] Fix events --- x-pack/metricbeat/module/aws/sqs/sqs_integration_test.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/x-pack/metricbeat/module/aws/sqs/sqs_integration_test.go b/x-pack/metricbeat/module/aws/sqs/sqs_integration_test.go index 7418c85fe2d..a0b04b9d2fe 100644 --- a/x-pack/metricbeat/module/aws/sqs/sqs_integration_test.go +++ b/x-pack/metricbeat/module/aws/sqs/sqs_integration_test.go @@ -13,11 +13,15 @@ import ( "github.com/stretchr/testify/assert" _ "github.com/elastic/beats/v7/libbeat/processors/actions" + "github.com/elastic/beats/v7/metricbeat/mb" mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" "github.com/elastic/beats/v7/x-pack/metricbeat/module/aws/mtest" ) func TestFetch(t *testing.T) { + var events []mb.Event + var errs []error + config := mtest.GetConfigForTest(t, "sqs", "300s") 
metricSet := mbtest.NewReportingMetricSetV2Error(t, config) @@ -25,7 +29,7 @@ func TestFetch(t *testing.T) { for i := 0; i < retries; i++ { // The CloudWatch metrics can take a few minutes to appear, // so we retry a few times - events, errs := mbtest.ReportingFetchV2Error(metricSet) + events, errs = mbtest.ReportingFetchV2Error(metricSet) if len(errs) > 0 { t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs) } From 3b7c381567824a43e247dec85f902cd9ee9ac700 Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Mon, 15 Jul 2024 23:35:30 +0200 Subject: [PATCH 04/12] Retry 5 times every 60s --- x-pack/metricbeat/module/aws/sqs/sqs_integration_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/metricbeat/module/aws/sqs/sqs_integration_test.go b/x-pack/metricbeat/module/aws/sqs/sqs_integration_test.go index a0b04b9d2fe..57c7dc61f75 100644 --- a/x-pack/metricbeat/module/aws/sqs/sqs_integration_test.go +++ b/x-pack/metricbeat/module/aws/sqs/sqs_integration_test.go @@ -25,7 +25,7 @@ func TestFetch(t *testing.T) { config := mtest.GetConfigForTest(t, "sqs", "300s") metricSet := mbtest.NewReportingMetricSetV2Error(t, config) - retries := 2 + retries := 5 for i := 0; i < retries; i++ { // The CloudWatch metrics can take a few minutes to appear, // so we retry a few times @@ -39,7 +39,7 @@ func TestFetch(t *testing.T) { } // No metrics yet, wait and retry - time.Sleep(5 * time.Minute) + time.Sleep(1 * time.Minute) } assert.NotEmpty(t, events) From 5b39dc1c3de110da5d2a8f6d2630334d8dba7a74 Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Tue, 16 Jul 2024 00:42:37 +0200 Subject: [PATCH 05/12] Switch to PeriodicReportingFetchV2Error --- metricbeat/mb/testing/modules.go | 30 +++++++++++ .../module/aws/sqs/sqs_integration_test.go | 52 ++++++++++++------- 2 files changed, 62 insertions(+), 20 deletions(-) diff --git a/metricbeat/mb/testing/modules.go b/metricbeat/mb/testing/modules.go index 1dcc9b075b8..4d6a7f46fc8 100644 --- 
a/metricbeat/mb/testing/modules.go +++ b/metricbeat/mb/testing/modules.go @@ -57,6 +57,7 @@ package testing import ( "context" + "github.com/elastic/go-concert/timed" "sync" "testing" "time" @@ -259,6 +260,35 @@ func ReportingFetchV2Error(metricSet mb.ReportingMetricSetV2Error) ([]mb.Event, return r.events, r.errs } +// PeriodicReportingFetchV2Error runs the given metricset and returns all the +// events and errors that occur during that period. +func PeriodicReportingFetchV2Error(metricSet mb.ReportingMetricSetV2Error, period time.Duration, timeout time.Duration) ([]mb.Event, []error) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + r := &CapturingReporterV2{} + _ = timed.Periodic(ctx, period, func() error { + // Fetch the metrics and store them in the + // reporter. + if err := metricSet.Fetch(r); err != nil { + r.errs = append(r.errs, err) + return err + } + + if len(r.events) > 0 { + // We have metrics, stop the periodic + // and return the metrics. + cancel() + } + + // No metrics yet, retry again + // in the next period. + return nil + }) + + return r.events, r.errs +} + // ReportingFetchV2WithContext runs the given reporting metricset and returns all of the // events and errors that occur during that period. 
func ReportingFetchV2WithContext(metricSet mb.ReportingMetricSetV2WithContext) ([]mb.Event, []error) { diff --git a/x-pack/metricbeat/module/aws/sqs/sqs_integration_test.go b/x-pack/metricbeat/module/aws/sqs/sqs_integration_test.go index 57c7dc61f75..0ddc8d9af5e 100644 --- a/x-pack/metricbeat/module/aws/sqs/sqs_integration_test.go +++ b/x-pack/metricbeat/module/aws/sqs/sqs_integration_test.go @@ -13,33 +13,45 @@ import ( "github.com/stretchr/testify/assert" _ "github.com/elastic/beats/v7/libbeat/processors/actions" - "github.com/elastic/beats/v7/metricbeat/mb" mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" "github.com/elastic/beats/v7/x-pack/metricbeat/module/aws/mtest" ) -func TestFetch(t *testing.T) { - var events []mb.Event - var errs []error +//func TestFetch(t *testing.T) { +// var events []mb.Event +// var errs []error +// +// config := mtest.GetConfigForTest(t, "sqs", "300s") +// metricSet := mbtest.NewReportingMetricSetV2Error(t, config) +// +// retries := 5 +// for i := 0; i < retries; i++ { +// // The CloudWatch metrics can take a few minutes to appear, +// // so we retry a few times +// events, errs = mbtest.ReportingFetchV2Error(metricSet) +// if len(errs) > 0 { +// t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs) +// } +// +// if len(events) > 0 { +// break +// } +// +// // No metrics yet, wait and retry +// time.Sleep(1 * time.Minute) +// } +// +// assert.NotEmpty(t, events) +// mbtest.TestMetricsetFieldsDocumented(t, metricSet, events) +//} +func TestFetch(t *testing.T) { config := mtest.GetConfigForTest(t, "sqs", "300s") - metricSet := mbtest.NewReportingMetricSetV2Error(t, config) - retries := 5 - for i := 0; i < retries; i++ { - // The CloudWatch metrics can take a few minutes to appear, - // so we retry a few times - events, errs = mbtest.ReportingFetchV2Error(metricSet) - if len(errs) > 0 { - t.Fatalf("Expected 0 error, had %d. 
%v\n", len(errs), errs) - } - - if len(events) > 0 { - break - } - - // No metrics yet, wait and retry - time.Sleep(1 * time.Minute) + metricSet := mbtest.NewReportingMetricSetV2Error(t, config) + events, errs := mbtest.PeriodicReportingFetchV2Error(metricSet, 1*time.Minute, 8*time.Minute) + if len(errs) > 0 { + t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs) } assert.NotEmpty(t, events) From 6ba87cb914425955f3e80894ee5b91d19de0abdf Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Tue, 16 Jul 2024 00:56:24 +0200 Subject: [PATCH 06/12] Fix linter complaints --- metricbeat/mb/testing/modules.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/metricbeat/mb/testing/modules.go b/metricbeat/mb/testing/modules.go index 4d6a7f46fc8..cc6b0480af1 100644 --- a/metricbeat/mb/testing/modules.go +++ b/metricbeat/mb/testing/modules.go @@ -57,11 +57,12 @@ package testing import ( "context" - "github.com/elastic/go-concert/timed" "sync" "testing" "time" + "github.com/elastic/go-concert/timed" + "github.com/elastic/beats/v7/metricbeat/mb" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/mapstr" From 27e2d7a343435209da17e6cdb2f9b5e7c1af966f Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Tue, 16 Jul 2024 00:58:57 +0200 Subject: [PATCH 07/12] Remove unused code --- metricbeat/mb/testing/modules.go | 45 -------------------------------- 1 file changed, 45 deletions(-) diff --git a/metricbeat/mb/testing/modules.go b/metricbeat/mb/testing/modules.go index cc6b0480af1..d177e521bf8 100644 --- a/metricbeat/mb/testing/modules.go +++ b/metricbeat/mb/testing/modules.go @@ -57,7 +57,6 @@ package testing import ( "context" - "sync" "testing" "time" @@ -134,25 +133,6 @@ func NewMetricSetsWithRegistry(t testing.TB, config interface{}, registry *mb.Re return metricsets } -func NewReportingMetricSet(t testing.TB, config interface{}) mb.ReportingMetricSet { - metricSet := NewMetricSet(t, config) - - reportingMetricSet, 
ok := metricSet.(mb.ReportingMetricSet) - if !ok { - t.Fatal("MetricSet does not implement ReportingMetricSet") - } - - return reportingMetricSet -} - -// ReportingFetch runs the given reporting metricset and returns all of the -// events and errors that occur during that period. -func ReportingFetch(metricSet mb.ReportingMetricSet) ([]mapstr.M, []error) { - r := &capturingReporter{} - metricSet.Fetch(r) - return r.events, r.errs -} - // NewReportingMetricSetV2 returns a new ReportingMetricSetV2 instance. Then // you can use ReportingFetchV2 to perform a Fetch operation with the MetricSet. func NewReportingMetricSetV2(t testing.TB, config interface{}) mb.ReportingMetricSetV2 { @@ -341,31 +321,6 @@ func (r *capturingReporter) Done() <-chan struct{} { return r.done } -// RunPushMetricSet run the given push metricset for the specific amount of time -// and returns all of the events and errors that occur during that period. -func RunPushMetricSet(duration time.Duration, metricSet mb.PushMetricSet) ([]mapstr.M, []error) { - r := &capturingReporter{done: make(chan struct{})} - - // Run the metricset. - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - metricSet.Run(r) - }() - - // Let it run for some period, then stop it by closing the done channel. - time.AfterFunc(duration, func() { - close(r.done) - }) - - // Wait for the PushMetricSet to completely stop. - wg.Wait() - - // Return all events and errors that were collected. - return r.events, r.errs -} - // NewPushMetricSetV2 instantiates a new PushMetricSetV2 using the given // configuration. The ModuleFactory and MetricSetFactory are obtained from the // global Registry. 
From 374cfc497fbadef93ab5893d3e6bd7e294289396 Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Tue, 16 Jul 2024 01:01:08 +0200 Subject: [PATCH 08/12] Remove unused code --- metricbeat/mb/testing/modules.go | 41 -------------------------------- 1 file changed, 41 deletions(-) diff --git a/metricbeat/mb/testing/modules.go b/metricbeat/mb/testing/modules.go index d177e521bf8..803e90da14c 100644 --- a/metricbeat/mb/testing/modules.go +++ b/metricbeat/mb/testing/modules.go @@ -64,7 +64,6 @@ import ( "github.com/elastic/beats/v7/metricbeat/mb" conf "github.com/elastic/elastic-agent-libs/config" - "github.com/elastic/elastic-agent-libs/mapstr" ) type TestModule struct { @@ -281,46 +280,6 @@ func ReportingFetchV2WithContext(metricSet mb.ReportingMetricSetV2WithContext) ( return r.events, r.errs } -// NewPushMetricSet instantiates a new PushMetricSet using the given -// configuration. The ModuleFactory and MetricSetFactory are obtained from the -// global Registry. -func NewPushMetricSet(t testing.TB, config interface{}) mb.PushMetricSet { - metricSet := NewMetricSet(t, config) - - pushMetricSet, ok := metricSet.(mb.PushMetricSet) - if !ok { - t.Fatal("MetricSet does not implement PushMetricSet") - } - - return pushMetricSet -} - -type capturingReporter struct { - events []mapstr.M - errs []error - done chan struct{} -} - -func (r *capturingReporter) Event(event mapstr.M) bool { - r.events = append(r.events, event) - return true -} - -func (r *capturingReporter) ErrorWith(err error, meta mapstr.M) bool { - r.events = append(r.events, meta) - r.errs = append(r.errs, err) - return true -} - -func (r *capturingReporter) Error(err error) bool { - r.errs = append(r.errs, err) - return true -} - -func (r *capturingReporter) Done() <-chan struct{} { - return r.done -} - // NewPushMetricSetV2 instantiates a new PushMetricSetV2 using the given // configuration. The ModuleFactory and MetricSetFactory are obtained from the // global Registry. 
From 8fab1db759dbf8b4cfeaa0d41fc660013860eac9 Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Tue, 16 Jul 2024 01:27:49 +0200 Subject: [PATCH 09/12] Address linter complaints --- metricbeat/mb/testing/modules.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/metricbeat/mb/testing/modules.go b/metricbeat/mb/testing/modules.go index 803e90da14c..1712ca27764 100644 --- a/metricbeat/mb/testing/modules.go +++ b/metricbeat/mb/testing/modules.go @@ -167,7 +167,7 @@ func NewReportingMetricSetV2Error(t testing.TB, config interface{}) mb.Reporting // NewReportingMetricSetV2Errors returns an array of new ReportingMetricSetV2 instances. func NewReportingMetricSetV2Errors(t testing.TB, config interface{}) []mb.ReportingMetricSetV2Error { metricSets := NewMetricSets(t, config) - var reportingMetricSets []mb.ReportingMetricSetV2Error + reportingMetricSets := make([]mb.ReportingMetricSetV2Error, 0, len(metricSets)) for _, metricSet := range metricSets { rMS, ok := metricSet.(mb.ReportingMetricSetV2Error) if !ok { @@ -374,15 +374,15 @@ func (r *CapturingPushReporterV2) capture(waitEvents int) []mb.Event { // BlockingCapture blocks until waitEvents n of events are captured func (r *CapturingPushReporterV2) BlockingCapture(waitEvents int) []mb.Event { var events []mb.Event - for { - select { - case e := <-r.eventsC: - events = append(events, e) - if waitEvents > 0 && len(events) >= waitEvents { - return events - } + + for e := range r.eventsC { + events = append(events, e) + if waitEvents > 0 && len(events) >= waitEvents { + return events } } + + return events } // RunPushMetricSetV2 run the given push metricset for the specific amount of From 4a06f1a9ffeb9408fee68867ac8866e216ab51f6 Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Tue, 16 Jul 2024 01:39:43 +0200 Subject: [PATCH 10/12] Address linter complaints --- metricbeat/mb/testing/modules.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/metricbeat/mb/testing/modules.go b/metricbeat/mb/testing/modules.go index 1712ca27764..e9db1dfbe6d 100644 --- a/metricbeat/mb/testing/modules.go +++ b/metricbeat/mb/testing/modules.go @@ -373,7 +373,7 @@ func (r *CapturingPushReporterV2) capture(waitEvents int) []mb.Event { // BlockingCapture blocks until waitEvents n of events are captured func (r *CapturingPushReporterV2) BlockingCapture(waitEvents int) []mb.Event { - var events []mb.Event + events := make([]mb.Event, 0, waitEvents) for e := range r.eventsC { events = append(events, e) From eb285fcc438111481d29b95b81950a4df1e115e5 Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Tue, 16 Jul 2024 18:12:55 +0200 Subject: [PATCH 11/12] Improve function docs --- metricbeat/mb/testing/modules.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/metricbeat/mb/testing/modules.go b/metricbeat/mb/testing/modules.go index e9db1dfbe6d..8c6e09df537 100644 --- a/metricbeat/mb/testing/modules.go +++ b/metricbeat/mb/testing/modules.go @@ -240,8 +240,14 @@ func ReportingFetchV2Error(metricSet mb.ReportingMetricSetV2Error) ([]mb.Event, return r.events, r.errs } -// PeriodicReportingFetchV2Error runs the given metricset and returns all the -// events and errors that occur during that period. +// PeriodicReportingFetchV2Error runs the given metricset and returns +// the first batch of events or errors that occur during that period. +// +// `period` is the time between each fetch. +// `timeout` is the maximum time to wait for the first event. +// +// The function tries to fetch the metrics every `period` until it gets +// the first batch of metrics or the `timeout` is reached. 
func PeriodicReportingFetchV2Error(metricSet mb.ReportingMetricSetV2Error, period time.Duration, timeout time.Duration) ([]mb.Event, []error) { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() From 82fafee8b31d0620efd4c1c05b45c6342e23c0cd Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Tue, 16 Jul 2024 18:13:03 +0200 Subject: [PATCH 12/12] Cleanup --- .../module/aws/sqs/sqs_integration_test.go | 28 ------------------- 1 file changed, 28 deletions(-) diff --git a/x-pack/metricbeat/module/aws/sqs/sqs_integration_test.go b/x-pack/metricbeat/module/aws/sqs/sqs_integration_test.go index 0ddc8d9af5e..4970e481b8f 100644 --- a/x-pack/metricbeat/module/aws/sqs/sqs_integration_test.go +++ b/x-pack/metricbeat/module/aws/sqs/sqs_integration_test.go @@ -17,34 +17,6 @@ import ( "github.com/elastic/beats/v7/x-pack/metricbeat/module/aws/mtest" ) -//func TestFetch(t *testing.T) { -// var events []mb.Event -// var errs []error -// -// config := mtest.GetConfigForTest(t, "sqs", "300s") -// metricSet := mbtest.NewReportingMetricSetV2Error(t, config) -// -// retries := 5 -// for i := 0; i < retries; i++ { -// // The CloudWatch metrics can take a few minutes to appear, -// // so we retry a few times -// events, errs = mbtest.ReportingFetchV2Error(metricSet) -// if len(errs) > 0 { -// t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs) -// } -// -// if len(events) > 0 { -// break -// } -// -// // No metrics yet, wait and retry -// time.Sleep(1 * time.Minute) -// } -// -// assert.NotEmpty(t, events) -// mbtest.TestMetricsetFieldsDocumented(t, metricSet, events) -//} - func TestFetch(t *testing.T) { config := mtest.GetConfigForTest(t, "sqs", "300s")