From 1f89a43b3636ab33c639254acd78fe6c9dec2b2b Mon Sep 17 00:00:00 2001 From: beats-jenkins Date: Wed, 13 Feb 2019 14:48:31 +0100 Subject: [PATCH 01/14] [Metricbeat] Add reporting interface with error In Metricbeat modules we often see the pattern that before fetching the actual data, some processing is done and checks for errors happen. If an error happened, we have 3 lines of code: * creating the error with wrap * reporting the error * logging the error By introducing the reporter interface with support for return an error I would like to eliminate the overhead and allow to directly return and error which is then reported and also logged. So far we logged the error on the Error level. I'm now wondering if we should log these errors actually on the Info level as it's normally not a misbehaving of the Beat but the service does not respond as expected. So for the operator of the Beat normally no actions are needed. As an example metricset I took elasticsearch.node. --- metricbeat/mb/builders.go | 7 +++++-- metricbeat/mb/mb.go | 7 +++++++ metricbeat/mb/module/wrapper.go | 9 ++++++++- metricbeat/module/elasticsearch/node/node.go | 16 ++++------------ 4 files changed, 24 insertions(+), 15 deletions(-) diff --git a/metricbeat/mb/builders.go b/metricbeat/mb/builders.go index db00b072eac..deb2e43ec63 100644 --- a/metricbeat/mb/builders.go +++ b/metricbeat/mb/builders.go @@ -237,15 +237,18 @@ func mustImplementFetcher(ms MetricSet) error { ifcs = append(ifcs, "ReportingMetricSetV2") } + if _, ok := ms.(ReportingMetricSetV2Error); ok { + ifcs = append(ifcs, "ReportingMetricSetV2Error") + } + if _, ok := ms.(PushMetricSetV2); ok { ifcs = append(ifcs, "PushMetricSetV2") } - switch len(ifcs) { case 0: return fmt.Errorf("MetricSet '%s/%s' does not implement an event "+ "producing interface (EventFetcher, EventsFetcher, "+ - "ReportingMetricSet, ReportingMetricSetV2, PushMetricSet, or "+ + "ReportingMetricSet, ReportingMetricSetV2, ReportingMetricSetV2Error, PushMetricSet, or "+ "PushMetricSetV2)", ms.Module().Name(), ms.Name()) case 1: diff --git a/metricbeat/mb/mb.go b/metricbeat/mb/mb.go index 0d38a14e5b9..2aba6573e74 100644 --- a/metricbeat/mb/mb.go +++ b/metricbeat/mb/mb.go @@ -201,6 +201,13 @@ type ReportingMetricSetV2 interface { Fetch(r ReporterV2) } +// ReportingMetricSetV2 is a MetricSet that reports events or errors through the +// ReporterV2 interface. Fetch is called periodically to collect events. +type ReportingMetricSetV2Error interface { + MetricSet + Fetch(r ReporterV2) error +} + // PushMetricSetV2 is a MetricSet that pushes events (rather than pulling them // periodically via a Fetch callback). Run is invoked to start the event // subscription and it should block until the MetricSet is ready to stop or diff --git a/metricbeat/mb/module/wrapper.go b/metricbeat/mb/module/wrapper.go index c835447e653..245876cd655 100644 --- a/metricbeat/mb/module/wrapper.go +++ b/metricbeat/mb/module/wrapper.go @@ -192,7 +192,7 @@ func (msw *metricSetWrapper) run(done <-chan struct{}, out chan<- beat.Event) { case mb.PushMetricSetV2: ms.Run(reporter.V2()) case mb.EventFetcher, mb.EventsFetcher, - mb.ReportingMetricSet, mb.ReportingMetricSetV2: + mb.ReportingMetricSet, mb.ReportingMetricSetV2, mb.ReportingMetricSetV2Error: msw.startPeriodicFetching(reporter) default: // Earlier startup stages prevent this from happening. @@ -236,6 +236,13 @@ func (msw *metricSetWrapper) fetch(reporter reporter) { case mb.ReportingMetricSetV2: reporter.StartFetchTimer() fetcher.Fetch(reporter.V2()) + case mb.ReportingMetricSetV2Error: + reporter.StartFetchTimer() + err := fetcher.Fetch(reporter.V2()) + if err != nil { + reporter.V2().Error(err) + logp.Info("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err) + } default: panic(fmt.Sprintf("unexpected fetcher type for %v", msw)) } diff --git a/metricbeat/module/elasticsearch/node/node.go b/metricbeat/module/elasticsearch/node/node.go index 8eb0c635bfe..17337553ce6 100644 --- a/metricbeat/module/elasticsearch/node/node.go +++ b/metricbeat/module/elasticsearch/node/node.go @@ -20,7 +20,6 @@ package node import ( "github.com/pkg/errors" - "github.com/elastic/beats/metricbeat/helper/elastic" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" "github.com/elastic/beats/metricbeat/module/elasticsearch" @@ -64,23 +63,16 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch methods implements the data gathering and data conversion to the right format // It returns the event which is then forward to the output. In case of an error, a // descriptive error must be returned. -func (m *MetricSet) Fetch(r mb.ReporterV2) { +func (m *MetricSet) Fetch(r mb.ReporterV2) error { content, err := m.HTTP.FetchContent() if err != nil { - elastic.ReportAndLogError(err, r, m.Log) - return + return err } info, err := elasticsearch.GetInfo(m.HTTP, m.HostData().SanitizedURI+nodeStatsPath) if err != nil { - err = errors.Wrap(err, "failed to get info from Elasticsearch") - elastic.ReportAndLogError(err, r, m.Log) - return + return errors.Wrap(err, "failed to get info from Elasticsearch") } - err = eventsMapping(r, *info, content) - if err != nil { - elastic.ReportAndLogError(err, r, m.Log) - return - } + return eventsMapping(r, *info, content) } From d79179b85dcf8a5dc177f7078414397ecdb18d2c Mon Sep 17 00:00:00 2001 From: beats-jenkins Date: Fri, 15 Feb 2019 21:53:29 +0100 Subject: [PATCH 02/14] fix tests --- metricbeat/mb/testing/modules.go | 13 +++++++++++++ metricbeat/module/elasticsearch/node/node_test.go | 2 +- 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/metricbeat/mb/testing/modules.go b/metricbeat/mb/testing/modules.go index 88d3ce78e0c..c9a727f20e8 100644 --- a/metricbeat/mb/testing/modules.go +++ b/metricbeat/mb/testing/modules.go @@ -168,6 +168,19 @@ func NewReportingMetricSetV2(t testing.TB, config interface{}) mb.ReportingMetri return reportingMetricSetV2 } +// NewReportingMetricSetV2 returns a new ReportingMetricSetV2 instance. Then +// you can use ReportingFetchV2 to perform a Fetch operation with the MetricSet. +func NewReportingMetricSetV2Error(t testing.TB, config interface{}) mb.ReportingMetricSetV2Error { + metricSet := newMetricSet(t, config) + + reportingMetricSetV2Error, ok := metricSet.(mb.ReportingMetricSetV2Error) + if !ok { + t.Fatal("MetricSet does not implement ReportingMetricSetV2") + } + + return reportingMetricSetV2Error +} + // CapturingReporterV2 is a reporter used for testing which stores all events and errors type CapturingReporterV2 struct { events []mb.Event diff --git a/metricbeat/module/elasticsearch/node/node_test.go b/metricbeat/module/elasticsearch/node/node_test.go index b6aaf020189..2c3001c36ed 100644 --- a/metricbeat/module/elasticsearch/node/node_test.go +++ b/metricbeat/module/elasticsearch/node/node_test.go @@ -71,7 +71,7 @@ func TestFetch(t *testing.T) { } reporter := &mbtest.CapturingReporterV2{} - metricSet := mbtest.NewReportingMetricSetV2(t, config) + metricSet := mbtest.NewReportingMetricSetV2Error(t, config) metricSet.Fetch(reporter) e := mbtest.StandardizeEvent(metricSet, reporter.GetEvents()[0]) From 50b7c7655b4fa1407cb2485940d0c04d3d0a4ae8 Mon Sep 17 00:00:00 2001 From: beats-jenkins Date: Fri, 15 Feb 2019 22:05:48 +0100 Subject: [PATCH 03/14] make hound happy --- metricbeat/mb/testing/modules.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metricbeat/mb/testing/modules.go b/metricbeat/mb/testing/modules.go index c9a727f20e8..8ce0d8ec19b 100644 --- a/metricbeat/mb/testing/modules.go +++ b/metricbeat/mb/testing/modules.go @@ -168,7 +168,7 @@ func NewReportingMetricSetV2(t testing.TB, config interface{}) mb.ReportingMetri return reportingMetricSetV2 } -// NewReportingMetricSetV2 returns a new ReportingMetricSetV2 instance. Then +// NewReportingMetricSetV2Error returns a new ReportingMetricSetV2 instance. Then // you can use ReportingFetchV2 to perform a Fetch operation with the MetricSet. func NewReportingMetricSetV2Error(t testing.TB, config interface{}) mb.ReportingMetricSetV2Error { metricSet := newMetricSet(t, config) From 47ca261fc3fb720c5c7d77767ee8808e035f5005 Mon Sep 17 00:00:00 2001 From: beats-jenkins Date: Mon, 4 Mar 2019 14:51:00 +0100 Subject: [PATCH 04/14] address review feedback --- metricbeat/mb/mb.go | 2 +- metricbeat/mb/testing/modules.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/metricbeat/mb/mb.go b/metricbeat/mb/mb.go index 2aba6573e74..e38655a6907 100644 --- a/metricbeat/mb/mb.go +++ b/metricbeat/mb/mb.go @@ -201,7 +201,7 @@ type ReportingMetricSetV2 interface { Fetch(r ReporterV2) } -// ReportingMetricSetV2 is a MetricSet that reports events or errors through the +// ReportingMetricSetV2Error is a MetricSet that reports events or errors through the // ReporterV2 interface. Fetch is called periodically to collect events. type ReportingMetricSetV2Error interface { MetricSet diff --git a/metricbeat/mb/testing/modules.go b/metricbeat/mb/testing/modules.go index 8ce0d8ec19b..f0154576d96 100644 --- a/metricbeat/mb/testing/modules.go +++ b/metricbeat/mb/testing/modules.go @@ -175,7 +175,7 @@ func NewReportingMetricSetV2Error(t testing.TB, config interface{}) mb.Reporting reportingMetricSetV2Error, ok := metricSet.(mb.ReportingMetricSetV2Error) if !ok { - t.Fatal("MetricSet does not implement ReportingMetricSetV2") + t.Fatal("MetricSet does not implement ReportingMetricSetV2Error") } return reportingMetricSetV2Error From 3cac41b362c330c32f86f26b292af4ce1c3e38a9 Mon Sep 17 00:00:00 2001 From: beats-jenkins Date: Tue, 5 Mar 2019 08:13:41 +0100 Subject: [PATCH 05/14] add more code for testing purpose --- metricbeat/mb/testing/data_generator.go | 35 +++++++++++++- metricbeat/mb/testing/modules.go | 39 ++++++++++++++++ .../elasticsearch_integration_test.go | 46 ++++++++++++++----- 3 files changed, 107 insertions(+), 13 deletions(-) diff --git a/metricbeat/mb/testing/data_generator.go b/metricbeat/mb/testing/data_generator.go index 7677b0dd05c..d50c90c18b7 100644 --- a/metricbeat/mb/testing/data_generator.go +++ b/metricbeat/mb/testing/data_generator.go @@ -93,6 +93,12 @@ func WriteEventsReporterV2(f mb.ReportingMetricSetV2, t testing.TB, path string) return WriteEventsReporterV2Cond(f, t, path, nil) } +// WriteEventsReporterV2Error fetches events and writes the first event to a ./_meta/data.json +// file. +func WriteEventsReporterV2Error(f mb.ReportingMetricSetV2Error, t testing.TB, path string) error { + return WriteEventsReporterV2ErrorCond(f, t, path, nil) +} + // WriteEventsReporterV2Cond fetches events and writes the first event that matches // the condition to a file. func WriteEventsReporterV2Cond(f mb.ReportingMetricSetV2, t testing.TB, path string, cond func(common.MapStr) bool) error { @@ -120,6 +126,33 @@ func WriteEventsReporterV2Cond(f mb.ReportingMetricSetV2, t testing.TB, path str return nil } +// WriteEventsReporterV2Cond fetches events and writes the first event that matches +// the condition to a file. +func WriteEventsReporterV2ErrorCond(f mb.ReportingMetricSetV2Error, t testing.TB, path string, cond func(common.MapStr) bool) error { + if !*dataFlag { + t.Skip("skip data generation tests") + } + + events, errs := ReportingFetchV2Error(f) + if len(errs) > 0 { + return errs[0] + } + + if len(events) == 0 { + return fmt.Errorf("no events were generated") + } + + match, err := SelectEventV2(f, events, cond) + if err != nil { + return err + } + + e := StandardizeEvent(f, match, mb.AddMetricSetInfo) + + WriteEventToDataJSON(t, e, path) + return nil +} + // CreateFullEvent builds a full event given the data generated by a MetricSet. // This simulates the output of Metricbeat as if it were // 2016-05-23T08:05:34.853Z and the hostname is host.example.com. @@ -198,7 +231,7 @@ func SelectEvent(events []common.MapStr, cond func(e common.MapStr) bool) (commo } // SelectEventV2 selects the first event that matches an specific condition -func SelectEventV2(f mb.ReportingMetricSetV2, events []mb.Event, cond func(e common.MapStr) bool) (mb.Event, error) { +func SelectEventV2(f mb.MetricSet, events []mb.Event, cond func(e common.MapStr) bool) (mb.Event, error) { if cond == nil && len(events) > 0 { return events[0], nil } diff --git a/metricbeat/mb/testing/modules.go b/metricbeat/mb/testing/modules.go index f0154576d96..d27abc23f7f 100644 --- a/metricbeat/mb/testing/modules.go +++ b/metricbeat/mb/testing/modules.go @@ -209,6 +209,34 @@ func (r *CapturingReporterV2) GetErrors() []error { return r.errs } +// CapturingReporterV2 is a reporter used for testing which stores all events and errors +type CapturingReporterV2Error struct { + events []mb.Event + errs []error +} + +// Event is used to report an event +func (r *CapturingReporterV2Error) Event(event mb.Event) bool { + r.events = append(r.events, event) + return true +} + +// Error is used to report an error +func (r *CapturingReporterV2Error) Error(err error) bool { + r.errs = append(r.errs, err) + return true +} + +// GetEvents returns all reported events +func (r *CapturingReporterV2Error) GetEvents() []mb.Event { + return r.events +} + +// GetErrors returns all reported errors +func (r *CapturingReporterV2Error) GetErrors() []error { + return r.errs +} + // ReportingFetchV2 runs the given reporting metricset and returns all of the // events and errors that occur during that period. func ReportingFetchV2(metricSet mb.ReportingMetricSetV2) ([]mb.Event, []error) { @@ -217,6 +245,17 @@ func ReportingFetchV2(metricSet mb.ReportingMetricSetV2) ([]mb.Event, []error) { return r.events, r.errs } +// ReportingFetchV2Error runs the given reporting metricset and returns all of the +// events and errors that occur during that period. +func ReportingFetchV2Error(metricSet mb.ReportingMetricSetV2Error) ([]mb.Event, []error) { + r := &CapturingReporterV2{} + err := metricSet.Fetch(r) + if err != nil { + r.errs = append(r.errs, err) + } + return r.events, r.errs +} + // NewPushMetricSet instantiates a new PushMetricSet using the given // configuration. The ModuleFactory and MetricSetFactory are obtained from the // global Registry. diff --git a/metricbeat/module/elasticsearch/elasticsearch_integration_test.go b/metricbeat/module/elasticsearch/elasticsearch_integration_test.go index 6e173a95a4c..90407ed1205 100644 --- a/metricbeat/module/elasticsearch/elasticsearch_integration_test.go +++ b/metricbeat/module/elasticsearch/elasticsearch_integration_test.go @@ -29,6 +29,8 @@ import ( "os" "testing" + "github.com/elastic/beats/metricbeat/mb" + "github.com/pkg/errors" "github.com/stretchr/testify/assert" @@ -85,15 +87,27 @@ func TestFetch(t *testing.T) { for _, metricSet := range metricSets { checkSkip(t, metricSet, version) t.Run(metricSet, func(t *testing.T) { - f := mbtest.NewReportingMetricSetV2(t, getConfig(metricSet)) - events, errs := mbtest.ReportingFetchV2(f) - - assert.Empty(t, errs) - if !assert.NotEmpty(t, events) { - t.FailNow() + var events []mb.Event + var errs []error + if metricSet == "node" { + f := mbtest.NewReportingMetricSetV2Error(t, getConfig(metricSet)) + events, errs = mbtest.ReportingFetchV2Error(f) + assert.Empty(t, errs) + if !assert.NotEmpty(t, events) { + t.FailNow() + } + t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), + events[0].BeatEvent("elasticsearch", metricSet).Fields.StringToPrint()) + } else { + f := mbtest.NewReportingMetricSetV2(t, getConfig(metricSet)) + events, errs = mbtest.ReportingFetchV2(f) + assert.Empty(t, errs) + if !assert.NotEmpty(t, events) { + t.FailNow() + } + t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), + events[0].BeatEvent("elasticsearch", metricSet).Fields.StringToPrint()) } - t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), - events[0].BeatEvent("elasticsearch", metricSet).Fields.StringToPrint()) }) } } @@ -111,10 +125,18 @@ func TestData(t *testing.T) { for _, metricSet := range metricSets { checkSkip(t, metricSet, version) t.Run(metricSet, func(t *testing.T) { - f := mbtest.NewReportingMetricSetV2(t, getConfig(metricSet)) - err := mbtest.WriteEventsReporterV2(f, t, metricSet) - if err != nil { - t.Fatal("write", err) + if metricSet == "node" { + f := mbtest.NewReportingMetricSetV2Error(t, getConfig(metricSet)) + err := mbtest.WriteEventsReporterV2Error(f, t, metricSet) + if err != nil { + t.Fatal("write", err) + } + } else { + f := mbtest.NewReportingMetricSetV2(t, getConfig(metricSet)) + err := mbtest.WriteEventsReporterV2(f, t, metricSet) + if err != nil { + t.Fatal("write", err) + } } }) } From 16df4886aa53a1a677d078ca3c36cd3d9023c3c4 Mon Sep 17 00:00:00 2001 From: beats-jenkins Date: Tue, 5 Mar 2019 08:17:03 +0100 Subject: [PATCH 06/14] make hound happy --- metricbeat/mb/testing/data_generator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metricbeat/mb/testing/data_generator.go b/metricbeat/mb/testing/data_generator.go index d50c90c18b7..ae23018560d 100644 --- a/metricbeat/mb/testing/data_generator.go +++ b/metricbeat/mb/testing/data_generator.go @@ -126,7 +126,7 @@ func WriteEventsReporterV2Cond(f mb.ReportingMetricSetV2, t testing.TB, path str return nil } -// WriteEventsReporterV2Cond fetches events and writes the first event that matches +// WriteEventsReporterV2ErrorCond fetches events and writes the first event that matches // the condition to a file. func WriteEventsReporterV2ErrorCond(f mb.ReportingMetricSetV2Error, t testing.TB, path string, cond func(common.MapStr) bool) error { if !*dataFlag { From accef70e0653b4d11c2ce011539d71362e016175 Mon Sep 17 00:00:00 2001 From: beats-jenkins Date: Tue, 5 Mar 2019 08:21:06 +0100 Subject: [PATCH 07/14] simplify code --- metricbeat/mb/testing/data_generator.go | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/metricbeat/mb/testing/data_generator.go b/metricbeat/mb/testing/data_generator.go index ae23018560d..1bdb6c69e5d 100644 --- a/metricbeat/mb/testing/data_generator.go +++ b/metricbeat/mb/testing/data_generator.go @@ -111,19 +111,7 @@ func WriteEventsReporterV2Cond(f mb.ReportingMetricSetV2, t testing.TB, path str return errs[0] } - if len(events) == 0 { - return fmt.Errorf("no events were generated") - } - - match, err := SelectEventV2(f, events, cond) - if err != nil { - return err - } - - e := StandardizeEvent(f, match, mb.AddMetricSetInfo) - - WriteEventToDataJSON(t, e, path) - return nil + return writeEvent(events, f, t, path, cond) } // WriteEventsReporterV2ErrorCond fetches events and writes the first event that matches @@ -138,6 +126,10 @@ func WriteEventsReporterV2ErrorCond(f mb.ReportingMetricSetV2Error, t testing.TB return errs[0] } + return writeEvent(events, f, t, path, cond) +} + +func writeEvent(events []mb.Event, f mb.MetricSet, t testing.TB, path string, cond func(common.MapStr) bool) error { if len(events) == 0 { return fmt.Errorf("no events were generated") } From 1c9a36ca712d2acc784ce4c7d8c86dcfb17543ca Mon Sep 17 00:00:00 2001 From: beats-jenkins Date: Tue, 5 Mar 2019 08:21:46 +0100 Subject: [PATCH 08/14] make hound happy --- metricbeat/mb/testing/modules.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metricbeat/mb/testing/modules.go b/metricbeat/mb/testing/modules.go index d27abc23f7f..665ddbf0998 100644 --- a/metricbeat/mb/testing/modules.go +++ b/metricbeat/mb/testing/modules.go @@ -209,7 +209,7 @@ func (r *CapturingReporterV2) GetErrors() []error { return r.errs } -// CapturingReporterV2 is a reporter used for testing which stores all events and errors +// CapturingReporterV2Error is a reporter used for testing which stores all events and errors type CapturingReporterV2Error struct { events []mb.Event errs []error From c30e9d431b06670e5093317d40a9b56bc0222b43 Mon Sep 17 00:00:00 2001 From: beats-jenkins Date: Tue, 5 Mar 2019 08:22:19 +0100 Subject: [PATCH 09/14] sort import packages --- .../module/elasticsearch/elasticsearch_integration_test.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/metricbeat/module/elasticsearch/elasticsearch_integration_test.go b/metricbeat/module/elasticsearch/elasticsearch_integration_test.go index 90407ed1205..062d8876a2b 100644 --- a/metricbeat/module/elasticsearch/elasticsearch_integration_test.go +++ b/metricbeat/module/elasticsearch/elasticsearch_integration_test.go @@ -29,15 +29,13 @@ import ( "os" "testing" - "github.com/elastic/beats/metricbeat/mb" - "github.com/pkg/errors" - "github.com/stretchr/testify/assert" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/tests/compose" "github.com/elastic/beats/metricbeat/helper/elastic" + "github.com/elastic/beats/metricbeat/mb" mbtest "github.com/elastic/beats/metricbeat/mb/testing" "github.com/elastic/beats/metricbeat/module/elasticsearch" _ "github.com/elastic/beats/metricbeat/module/elasticsearch/ccr" From a5fbb4818196eba74e86d734e2c12b52b28aa18a Mon Sep 17 00:00:00 2001 From: beats-jenkins Date: Tue, 5 Mar 2019 14:17:08 +0100 Subject: [PATCH 10/14] skip ccr tests --- .../module/elasticsearch/elasticsearch_integration_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metricbeat/module/elasticsearch/elasticsearch_integration_test.go b/metricbeat/module/elasticsearch/elasticsearch_integration_test.go index 062d8876a2b..3112e3c8f32 100644 --- a/metricbeat/module/elasticsearch/elasticsearch_integration_test.go +++ b/metricbeat/module/elasticsearch/elasticsearch_integration_test.go @@ -50,7 +50,7 @@ import ( ) var metricSets = []string{ - "ccr", + //"ccr", "cluster_stats", "index", "index_recovery", From c2da9909794bebf6920b67312dcd13c18702575d Mon Sep 17 00:00:00 2001 From: beats-jenkins Date: Wed, 6 Mar 2019 13:11:13 +0100 Subject: [PATCH 11/14] revert es cahgnes --- .../elasticsearch_integration_test.go | 48 ++++++------------- metricbeat/module/elasticsearch/node/node.go | 16 +++++-- metricbeat/module/php_fpm/process/data.go | 6 +-- metricbeat/module/php_fpm/process/process.go | 10 ++-- .../process/process_integration_test.go | 4 +- 5 files changed, 35 insertions(+), 49 deletions(-) diff --git a/metricbeat/module/elasticsearch/elasticsearch_integration_test.go b/metricbeat/module/elasticsearch/elasticsearch_integration_test.go index 3112e3c8f32..6e173a95a4c 100644 --- a/metricbeat/module/elasticsearch/elasticsearch_integration_test.go +++ b/metricbeat/module/elasticsearch/elasticsearch_integration_test.go @@ -30,12 +30,12 @@ import ( "testing" "github.com/pkg/errors" + "github.com/stretchr/testify/assert" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/tests/compose" "github.com/elastic/beats/metricbeat/helper/elastic" - "github.com/elastic/beats/metricbeat/mb" mbtest "github.com/elastic/beats/metricbeat/mb/testing" "github.com/elastic/beats/metricbeat/module/elasticsearch" _ "github.com/elastic/beats/metricbeat/module/elasticsearch/ccr" @@ -50,7 +50,7 @@ import ( ) var metricSets = []string{ - //"ccr", + "ccr", "cluster_stats", "index", "index_recovery", @@ -85,27 +85,15 @@ func TestFetch(t *testing.T) { for _, metricSet := range metricSets { checkSkip(t, metricSet, version) t.Run(metricSet, func(t *testing.T) { - var events []mb.Event - var errs []error - if metricSet == "node" { - f := mbtest.NewReportingMetricSetV2Error(t, getConfig(metricSet)) - events, errs = mbtest.ReportingFetchV2Error(f) - assert.Empty(t, errs) - if !assert.NotEmpty(t, events) { - t.FailNow() - } - t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), - events[0].BeatEvent("elasticsearch", metricSet).Fields.StringToPrint()) - } else { - f := mbtest.NewReportingMetricSetV2(t, getConfig(metricSet)) - events, errs = mbtest.ReportingFetchV2(f) - assert.Empty(t, errs) - if !assert.NotEmpty(t, events) { - t.FailNow() - } - t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), - events[0].BeatEvent("elasticsearch", metricSet).Fields.StringToPrint()) + f := mbtest.NewReportingMetricSetV2(t, getConfig(metricSet)) + events, errs := mbtest.ReportingFetchV2(f) + + assert.Empty(t, errs) + if !assert.NotEmpty(t, events) { + t.FailNow() } + t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), + events[0].BeatEvent("elasticsearch", metricSet).Fields.StringToPrint()) }) } } @@ -123,18 +111,10 @@ func TestData(t *testing.T) { for _, metricSet := range metricSets { checkSkip(t, metricSet, version) t.Run(metricSet, func(t *testing.T) { - if metricSet == "node" { - f := mbtest.NewReportingMetricSetV2Error(t, getConfig(metricSet)) - err := mbtest.WriteEventsReporterV2Error(f, t, metricSet) - if err != nil { - t.Fatal("write", err) - } - } else { - f := mbtest.NewReportingMetricSetV2(t, getConfig(metricSet)) - err := mbtest.WriteEventsReporterV2(f, t, metricSet) - if err != nil { - t.Fatal("write", err) - } + f := mbtest.NewReportingMetricSetV2(t, getConfig(metricSet)) + err := mbtest.WriteEventsReporterV2(f, t, metricSet) + if err != nil { + t.Fatal("write", err) } }) } diff --git a/metricbeat/module/elasticsearch/node/node.go b/metricbeat/module/elasticsearch/node/node.go index 17337553ce6..8eb0c635bfe 100644 --- a/metricbeat/module/elasticsearch/node/node.go +++ b/metricbeat/module/elasticsearch/node/node.go @@ -20,6 +20,7 @@ package node import ( "github.com/pkg/errors" + "github.com/elastic/beats/metricbeat/helper/elastic" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" "github.com/elastic/beats/metricbeat/module/elasticsearch" @@ -63,16 +64,23 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch methods implements the data gathering and data conversion to the right format // It returns the event which is then forward to the output. In case of an error, a // descriptive error must be returned. -func (m *MetricSet) Fetch(r mb.ReporterV2) error { +func (m *MetricSet) Fetch(r mb.ReporterV2) { content, err := m.HTTP.FetchContent() if err != nil { - return err + elastic.ReportAndLogError(err, r, m.Log) + return } info, err := elasticsearch.GetInfo(m.HTTP, m.HostData().SanitizedURI+nodeStatsPath) if err != nil { - return errors.Wrap(err, "failed to get info from Elasticsearch") + err = errors.Wrap(err, "failed to get info from Elasticsearch") + elastic.ReportAndLogError(err, r, m.Log) + return } - return eventsMapping(r, *info, content) + err = eventsMapping(r, *info, content) + if err != nil { + elastic.ReportAndLogError(err, r, m.Log) + return + } } diff --git a/metricbeat/module/php_fpm/process/data.go b/metricbeat/module/php_fpm/process/data.go index bb8e6c5cacb..705041b101b 100644 --- a/metricbeat/module/php_fpm/process/data.go +++ b/metricbeat/module/php_fpm/process/data.go @@ -47,12 +47,11 @@ type phpFpmProcess struct { LastRequestMemory int `json:"last request memory"` } -func eventsMapping(r mb.ReporterV2, content []byte) { +func eventsMapping(r mb.ReporterV2, content []byte) error { var status phpFpmStatus err := json.Unmarshal(content, &status) if err != nil { - r.Error(err) - return + return err } //remapping process details to match the naming format for _, process := range status.Processes { @@ -94,4 +93,5 @@ func eventsMapping(r mb.ReporterV2, content []byte) { event.ModuleFields.Put("pool.name", status.Name) r.Event(event) } + return nil } diff --git a/metricbeat/module/php_fpm/process/process.go b/metricbeat/module/php_fpm/process/process.go index 6774d91b4f9..85dde711ae3 100644 --- a/metricbeat/module/php_fpm/process/process.go +++ b/metricbeat/module/php_fpm/process/process.go @@ -60,11 +60,10 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch methods implements the data gathering and data conversion to the right // format. It publishes the event which is then forwarded to the output. In case // of an error set the Error field of mb.Event or simply call report.Error(). -func (m *MetricSet) Fetch(report mb.ReporterV2) { +func (m *MetricSet) Fetch(report mb.ReporterV2) error { u, err := url.Parse(m.GetURI()) if err != nil { - report.Error(err) - return + return err } u, err = parse.SetQueryParams(u, "full") if err == nil { @@ -72,8 +71,7 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) { } content, err := m.HTTP.FetchContent() if err != nil { - report.Error(err) - return + return err } - eventsMapping(report, content) + return eventsMapping(report, content) } diff --git a/metricbeat/module/php_fpm/process/process_integration_test.go b/metricbeat/module/php_fpm/process/process_integration_test.go index a523648c9ad..a5f7a1d8d64 100644 --- a/metricbeat/module/php_fpm/process/process_integration_test.go +++ b/metricbeat/module/php_fpm/process/process_integration_test.go @@ -32,8 +32,8 @@ import ( func TestFetch(t *testing.T) { compose.EnsureUp(t, "phpfpm") - f := mbtest.NewReportingMetricSetV2(t, getConfig()) - events, errs := mbtest.ReportingFetchV2(f) + f := mbtest.NewReportingMetricSetV2Error(t, getConfig()) + events, errs := mbtest.ReportingFetchV2Error(f) assert.Empty(t, errs) if !assert.NotEmpty(t, events) { From 5f4dce41ad21482140a71b9d58d5a53b2abd07cf Mon Sep 17 00:00:00 2001 From: beats-jenkins Date: Wed, 6 Mar 2019 13:19:29 +0100 Subject: [PATCH 12/14] make data_test compatible with both interfaces --- metricbeat/mb/testing/data_test.go | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/metricbeat/mb/testing/data_test.go b/metricbeat/mb/testing/data_test.go index aa0c3c6f18b..803bfec6d3c 100644 --- a/metricbeat/mb/testing/data_test.go +++ b/metricbeat/mb/testing/data_test.go @@ -106,8 +106,21 @@ func runTest(t *testing.T, file string, module, metricSetName, url string) { s := server(t, file, url) defer s.Close() - metricSet := NewReportingMetricSetV2(t, getConfig(module, metricSetName, s.URL)) - events, errs := ReportingFetchV2(metricSet) + metricSet := newMetricSet(t, getConfig(module, metricSetName, s.URL)) + + var events []mb.Event + var errs []error + + switch v := metricSet.(type) { + case mb.ReportingMetricSetV2: + metricSet := NewReportingMetricSetV2(t, getConfig(module, metricSetName, s.URL)) + events, errs = ReportingFetchV2(metricSet) + case mb.ReportingMetricSetV2Error: + metricSet := NewReportingMetricSetV2Error(t, getConfig(module, metricSetName, s.URL)) + events, errs = ReportingFetchV2Error(metricSet) + default: + t.Fatalf("unknown type: %T", v) + } // Gather errors to build also error events for _, e := range errs { From 4ee1e127414704668e7de7b9970239066ad516cf Mon Sep 17 00:00:00 2001 From: beats-jenkins Date: Wed, 6 Mar 2019 14:02:22 +0100 Subject: [PATCH 13/14] revert es change --- metricbeat/module/elasticsearch/node/node_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metricbeat/module/elasticsearch/node/node_test.go b/metricbeat/module/elasticsearch/node/node_test.go index 2c3001c36ed..b6aaf020189 100644 --- a/metricbeat/module/elasticsearch/node/node_test.go +++ b/metricbeat/module/elasticsearch/node/node_test.go @@ -71,7 +71,7 @@ func TestFetch(t *testing.T) { } reporter := &mbtest.CapturingReporterV2{} - metricSet := mbtest.NewReportingMetricSetV2Error(t, config) + metricSet := mbtest.NewReportingMetricSetV2(t, config) metricSet.Fetch(reporter) e := mbtest.StandardizeEvent(metricSet, reporter.GetEvents()[0]) From 681f41af935f3fb863d0fc2b02071d9ec65b16c2 Mon Sep 17 00:00:00 2001 From: beats-jenkins Date: Wed, 6 Mar 2019 14:13:32 +0100 Subject: [PATCH 14/14] remove not needed interface --- metricbeat/mb/testing/modules.go | 28 ---------------------------- 1 file changed, 28 deletions(-) diff --git a/metricbeat/mb/testing/modules.go b/metricbeat/mb/testing/modules.go index 665ddbf0998..234feee3568 100644 --- a/metricbeat/mb/testing/modules.go +++ b/metricbeat/mb/testing/modules.go @@ -209,34 +209,6 @@ func (r *CapturingReporterV2) GetErrors() []error { return r.errs } -// CapturingReporterV2Error is a reporter used for testing which stores all events and errors -type CapturingReporterV2Error struct { - events []mb.Event - errs []error -} - -// Event is used to report an event -func (r *CapturingReporterV2Error) Event(event mb.Event) bool { - r.events = append(r.events, event) - return true -} - -// Error is used to report an error -func (r *CapturingReporterV2Error) Error(err error) bool { - r.errs = append(r.errs, err) - return true -} - -// GetEvents returns all reported events -func (r *CapturingReporterV2Error) GetEvents() []mb.Event { - return r.events -} - -// GetErrors returns all reported errors -func (r *CapturingReporterV2Error) GetErrors() []error { - return r.errs -} - // ReportingFetchV2 runs the given reporting metricset and returns all of the // events and errors that occur during that period. func ReportingFetchV2(metricSet mb.ReportingMetricSetV2) ([]mb.Event, []error) {