Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new interfaces for ReporterV2 with context #11981

Merged
merged 10 commits into from
Jun 26, 2019
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,5 @@ The list below covers the major changes between 7.0.0-rc2 and master only.
- Add new option `IgnoreAllErrors` to `libbeat.common.schema` for skipping fields that failed while converting. {pull}12089[12089]
- Deprecate setup cmds for `template` and `ilm-policy`. Add new setup cmd for `index-management`. {pull}12132[12132]
- Use the go-lookslike library for testing in heartbeat. Eventually the mapval package will be replaced with it. {pull}12540[12540]
- New ReporterV2 interfaces that can receive a context on `Fetch(ctx, reporter)`, or `Run(ctx, reporter)`. {pull}11981[11981]
- Generate configuration from `mage` for all Beats. {pull}12618[12618]
13 changes: 11 additions & 2 deletions metricbeat/mb/builders.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,15 +241,24 @@ func mustImplementFetcher(ms MetricSet) error {
ifcs = append(ifcs, "ReportingMetricSetV2Error")
}

if _, ok := ms.(ReportingMetricSetV2WithContext); ok {
ifcs = append(ifcs, "ReportingMetricSetV2WithContext")
}

if _, ok := ms.(PushMetricSetV2); ok {
ifcs = append(ifcs, "PushMetricSetV2")
}

if _, ok := ms.(PushMetricSetV2WithContext); ok {
ifcs = append(ifcs, "PushMetricSetV2WithContext")
}

switch len(ifcs) {
case 0:
return fmt.Errorf("MetricSet '%s/%s' does not implement an event "+
"producing interface (EventFetcher, EventsFetcher, "+
"ReportingMetricSet, ReportingMetricSetV2, ReportingMetricSetV2Error, PushMetricSet, or "+
"PushMetricSetV2)",
"ReportingMetricSet, ReportingMetricSetV2, ReportingMetricSetV2Error, ReportingMetricSetV2WithContext"+
"PushMetricSet, PushMetricSetV2, or PushMetricSetV2WithContext)",
ms.Module().Name(), ms.Name())
case 1:
return nil
Expand Down
17 changes: 17 additions & 0 deletions metricbeat/mb/mb.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ to implement Modules and their associated MetricSets.
package mb

import (
"context"
"fmt"
"net/url"
"time"
Expand Down Expand Up @@ -208,6 +209,13 @@ type ReportingMetricSetV2Error interface {
Fetch(r ReporterV2) error
}

// ReportingMetricSetV2WithContext is a MetricSet that reports events or errors through the
// ReporterV2 interface. Fetch is called periodically to collect events.
type ReportingMetricSetV2WithContext interface {
MetricSet
Fetch(ctx context.Context, r ReporterV2) error
}

// PushMetricSetV2 is a MetricSet that pushes events (rather than pulling them
// periodically via a Fetch callback). Run is invoked to start the event
// subscription and it should block until the MetricSet is ready to stop or
Expand All @@ -217,6 +225,15 @@ type PushMetricSetV2 interface {
Run(r PushReporterV2)
}

// PushMetricSetV2WithContext is a MetricSet that pushes events (rather than pulling them
// periodically via a Fetch callback). Run is invoked to start the event
// subscription and it should block until the MetricSet is ready to stop or
// the context is closed.
type PushMetricSetV2WithContext interface {
MetricSet
Run(ctx context.Context, r ReporterV2)
}

// HostData contains values parsed from the 'host' configuration. Other
// configuration data like protocols, usernames, and passwords may also be
// used to construct this HostData data.
Expand Down
39 changes: 33 additions & 6 deletions metricbeat/mb/module/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package module

import (
"context"
"fmt"
"math/rand"
"sync"
Expand Down Expand Up @@ -191,9 +192,11 @@ func (msw *metricSetWrapper) run(done <-chan struct{}, out chan<- beat.Event) {
ms.Run(reporter.V1())
case mb.PushMetricSetV2:
ms.Run(reporter.V2())
case mb.PushMetricSetV2WithContext:
ms.Run(&channelContext{done}, reporter.V2())
case mb.EventFetcher, mb.EventsFetcher,
mb.ReportingMetricSet, mb.ReportingMetricSetV2, mb.ReportingMetricSetV2Error:
msw.startPeriodicFetching(reporter)
mb.ReportingMetricSet, mb.ReportingMetricSetV2, mb.ReportingMetricSetV2Error, mb.ReportingMetricSetV2WithContext:
msw.startPeriodicFetching(&channelContext{done}, reporter)
default:
// Earlier startup stages prevent this from happening.
logp.Err("MetricSet '%s/%s' does not implement an event producing interface",
Expand All @@ -204,9 +207,9 @@ func (msw *metricSetWrapper) run(done <-chan struct{}, out chan<- beat.Event) {
// startPeriodicFetching performs an immediate fetch for the MetricSet then it
// begins a continuous timer scheduled loop to fetch data. To stop the loop the
// done channel should be closed.
func (msw *metricSetWrapper) startPeriodicFetching(reporter reporter) {
func (msw *metricSetWrapper) startPeriodicFetching(ctx context.Context, reporter reporter) {
// Fetch immediately.
msw.fetch(reporter)
msw.fetch(ctx, reporter)

// Start timer for future fetches.
t := time.NewTicker(msw.Module().Config().Period)
Expand All @@ -216,15 +219,15 @@ func (msw *metricSetWrapper) startPeriodicFetching(reporter reporter) {
case <-reporter.V2().Done():
return
case <-t.C:
msw.fetch(reporter)
msw.fetch(ctx, reporter)
}
}
}

// fetch invokes the appropriate Fetch method for the MetricSet and publishes
// the result using the publisher client. This method will recover from panics
// and log a stack track if one occurs.
func (msw *metricSetWrapper) fetch(reporter reporter) {
func (msw *metricSetWrapper) fetch(ctx context.Context, reporter reporter) {
switch fetcher := msw.MetricSet.(type) {
case mb.EventFetcher:
msw.singleEventFetch(fetcher, reporter)
Expand All @@ -243,6 +246,13 @@ func (msw *metricSetWrapper) fetch(reporter reporter) {
reporter.V2().Error(err)
logp.Info("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err)
}
case mb.ReportingMetricSetV2WithContext:
reporter.StartFetchTimer()
err := fetcher.Fetch(ctx, reporter.V2())
if err != nil {
reporter.V2().Error(err)
logp.Info("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err)
}
default:
panic(fmt.Sprintf("unexpected fetcher type for %v", msw))
}
Expand Down Expand Up @@ -313,6 +323,23 @@ func (r *eventReporter) V1() mb.PushReporter {
}
func (r *eventReporter) V2() mb.PushReporterV2 { return reporterV2{r} }

// channelContext implements context.Context by wrapping a channel
type channelContext struct {
done <-chan struct{}
}

func (r *channelContext) Deadline() (time.Time, bool) { return time.Time{}, false }
func (r *channelContext) Done() <-chan struct{} { return r.done }
func (r *channelContext) Err() error {
select {
case <-r.done:
return context.Canceled
default:
return nil
}
}
func (r *channelContext) Value(key interface{}) interface{} { return nil }

// reporterV1 wraps V2 to provide a v1 interface.
type reporterV1 struct {
v2 mb.PushReporterV2
Expand Down
21 changes: 21 additions & 0 deletions metricbeat/mb/testing/data_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ func WriteEventsReporterV2Error(f mb.ReportingMetricSetV2Error, t testing.TB, pa
return WriteEventsReporterV2ErrorCond(f, t, path, nil)
}

// WriteEventsReporterV2WithContext fetches events and writes the first event to a ./_meta/data.json
// file.
func WriteEventsReporterV2WithContext(f mb.ReportingMetricSetV2WithContext, t testing.TB, path string) error {
return WriteEventsReporterV2WithContextCond(f, t, path, nil)
}

// WriteEventsReporterV2Cond fetches events and writes the first event that matches
// the condition to a file.
func WriteEventsReporterV2Cond(f mb.ReportingMetricSetV2, t testing.TB, path string, cond func(common.MapStr) bool) error {
Expand Down Expand Up @@ -129,6 +135,21 @@ func WriteEventsReporterV2ErrorCond(f mb.ReportingMetricSetV2Error, t testing.TB
return writeEvent(events, f, t, path, cond)
}

// WriteEventsReporterV2WithContextCond fetches events and writes the first event that matches
// the condition to a file.
func WriteEventsReporterV2WithContextCond(f mb.ReportingMetricSetV2WithContext, t testing.TB, path string, cond func(common.MapStr) bool) error {
if !*dataFlag {
t.Skip("skip data generation tests")
}

events, errs := ReportingFetchV2WithContext(f)
if len(errs) > 0 {
return errs[0]
}

return writeEvent(events, f, t, path, cond)
}

func writeEvent(events []mb.Event, f mb.MetricSet, t testing.TB, path string, cond func(common.MapStr) bool) error {
if len(events) == 0 {
return fmt.Errorf("no events were generated")
Expand Down
121 changes: 76 additions & 45 deletions metricbeat/mb/testing/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ that Metricbeat does it and with the same validations.
package testing

import (
"context"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -181,6 +182,19 @@ func NewReportingMetricSetV2Error(t testing.TB, config interface{}) mb.Reporting
return reportingMetricSetV2Error
}

// NewReportingMetricSetV2WithContext returns a new ReportingMetricSetV2WithContext instance. Then
// you can use ReportingFetchV2 to perform a Fetch operation with the MetricSet.
func NewReportingMetricSetV2WithContext(t testing.TB, config interface{}) mb.ReportingMetricSetV2WithContext {
metricSet := NewMetricSet(t, config)

reportingMetricSet, ok := metricSet.(mb.ReportingMetricSetV2WithContext)
if !ok {
t.Fatal("MetricSet does not implement ReportingMetricSetV2WithContext")
}

return reportingMetricSet
}

// CapturingReporterV2 is a reporter used for testing which stores all events and errors
type CapturingReporterV2 struct {
events []mb.Event
Expand Down Expand Up @@ -228,6 +242,17 @@ func ReportingFetchV2Error(metricSet mb.ReportingMetricSetV2Error) ([]mb.Event,
return r.events, r.errs
}

// ReportingFetchV2WithContext runs the given reporting metricset and returns all of the
// events and errors that occur during that period.
func ReportingFetchV2WithContext(metricSet mb.ReportingMetricSetV2WithContext) ([]mb.Event, []error) {
r := &CapturingReporterV2{}
err := metricSet.Fetch(context.Background(), r)
if err != nil {
r.errs = append(r.errs, err)
}
return r.events, r.errs
}

// NewPushMetricSet instantiates a new PushMetricSet using the given
// configuration. The ModuleFactory and MetricSetFactory are obtained from the
// global Registry.
Expand Down Expand Up @@ -301,7 +326,21 @@ func NewPushMetricSetV2(t testing.TB, config interface{}) mb.PushMetricSetV2 {

pushMetricSet, ok := metricSet.(mb.PushMetricSetV2)
if !ok {
t.Fatal("MetricSet does not implement PushMetricSet")
t.Fatal("MetricSet does not implement PushMetricSetV2")
}

return pushMetricSet
}

// NewPushMetricSetV2WithContext instantiates a new PushMetricSetV2WithContext
// using the given configuration. The ModuleFactory and MetricSetFactory are
// obtained from the global Registry.
func NewPushMetricSetV2WithContext(t testing.TB, config interface{}) mb.PushMetricSetV2WithContext {
metricSet := NewMetricSet(t, config)

pushMetricSet, ok := metricSet.(mb.PushMetricSetV2WithContext)
if !ok {
t.Fatal("MetricSet does not implement PushMetricSetV2WithContext")
}

return pushMetricSet
Expand All @@ -310,15 +349,19 @@ func NewPushMetricSetV2(t testing.TB, config interface{}) mb.PushMetricSetV2 {
// capturingPushReporterV2 stores all the events and errors from a metricset's
// Run method.
type capturingPushReporterV2 struct {
doneC chan struct{}
context.Context
eventsC chan mb.Event
}

func newCapturingPushReporterV2(ctx context.Context) *capturingPushReporterV2 {
return &capturingPushReporterV2{Context: ctx, eventsC: make(chan mb.Event)}
}

// report writes an event to the output channel and returns true. If the output
// is closed it returns false.
func (r *capturingPushReporterV2) report(event mb.Event) bool {
select {
case <-r.doneC:
case <-r.Done():
// Publisher is stopped.
return false
case r.eventsC <- event:
Expand All @@ -336,54 +379,42 @@ func (r *capturingPushReporterV2) Error(err error) bool {
return r.report(mb.Event{Error: err})
}

// Done returns the Done channel for this reporter.
func (r *capturingPushReporterV2) Done() <-chan struct{} {
return r.doneC
func (r *capturingPushReporterV2) capture(waitEvents int) []mb.Event {
var events []mb.Event
for {
select {
case <-r.Done():
// Timeout
return events
case e := <-r.eventsC:
events = append(events, e)
if waitEvents > 0 && len(events) >= waitEvents {
return events
}
}
}
}

// RunPushMetricSetV2 run the given push metricset for the specific amount of
// time and returns all of the events and errors that occur during that period.
func RunPushMetricSetV2(timeout time.Duration, waitEvents int, metricSet mb.PushMetricSetV2) []mb.Event {
var (
r = &capturingPushReporterV2{doneC: make(chan struct{}), eventsC: make(chan mb.Event)}
wg sync.WaitGroup
events []mb.Event
)
wg.Add(2)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

// Producer
go func() {
defer wg.Done()
defer close(r.eventsC)
if closer, ok := metricSet.(mb.Closer); ok {
defer closer.Close()
}
metricSet.Run(r)
}()
r := newCapturingPushReporterV2(ctx)

// Consumer
go func() {
defer wg.Done()
defer close(r.doneC)

timer := time.NewTimer(timeout)
defer timer.Stop()
for {
select {
case <-timer.C:
return
case e, ok := <-r.eventsC:
if !ok {
return
}
events = append(events, e)
if waitEvents > 0 && waitEvents <= len(events) {
return
}
}
}
}()
go metricSet.Run(r)
return r.capture(waitEvents)
}

wg.Wait()
return events
// RunPushMetricSetV2WithContext run the given push metricset for the specific amount of
// time and returns all of the events that occur during that period.
func RunPushMetricSetV2WithContext(timeout time.Duration, waitEvents int, metricSet mb.PushMetricSetV2WithContext) []mb.Event {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

r := newCapturingPushReporterV2(ctx)

go metricSet.Run(ctx, r)
return r.capture(waitEvents)
}
4 changes: 2 additions & 2 deletions metricbeat/module/docker/container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {

// Fetch returns a list of all containers as events.
// This is based on https://docs.docker.com/engine/reference/api/docker_remote_api_v1.24/#/list-containers.
func (m *MetricSet) Fetch(r mb.ReporterV2) error {
func (m *MetricSet) Fetch(ctx context.Context, r mb.ReporterV2) error {
// Fetch a list of all containers.
containers, err := m.dockerClient.ContainerList(context.Background(), types.ContainerListOptions{})
containers, err := m.dockerClient.ContainerList(ctx, types.ContainerListOptions{})
if err != nil {
return errors.Wrap(err, "failed to get docker containers list")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (
)

func TestData(t *testing.T) {
f := mbtest.NewReportingMetricSetV2Error(t, getConfig())
if err := mbtest.WriteEventsReporterV2Error(f, t, ""); err != nil {
f := mbtest.NewReportingMetricSetV2WithContext(t, getConfig())
if err := mbtest.WriteEventsReporterV2WithContext(f, t, ""); err != nil {
t.Fatal("write", err)
}
}
Expand Down
Loading