From 41f55e5d92aae94de16022a7b50f2aecaa54411d Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Thu, 8 Sep 2016 15:22:10 +0100 Subject: [PATCH] Support Processor & Aggregator Plugins closes #1726 --- agent/agent.go | 59 +++++++++---- aggregator.go | 16 ++++ cmd/telegraf/telegraf.go | 2 + internal/config/config.go | 85 +++++++++++++++++- internal/models/running_filter.go | 37 ++++++++ internal/models/running_filter_test.go | 117 +++++++++++++++++++++++++ internal/models/running_output_test.go | 4 - plugins/aggregators/all/all.go | 5 ++ plugins/aggregators/min/min.go | 35 ++++++++ plugins/aggregators/registry.go | 11 +++ plugins/processors/all/all.go | 5 ++ plugins/processors/printer/printer.go | 35 ++++++++ plugins/processors/registry.go | 11 +++ processor.go | 12 +++ 14 files changed, 409 insertions(+), 25 deletions(-) create mode 100644 aggregator.go create mode 100644 internal/models/running_filter.go create mode 100644 internal/models/running_filter_test.go create mode 100644 plugins/aggregators/all/all.go create mode 100644 plugins/aggregators/min/min.go create mode 100644 plugins/aggregators/registry.go create mode 100644 plugins/processors/all/all.go create mode 100644 plugins/processors/printer/printer.go create mode 100644 plugins/processors/registry.go create mode 100644 processor.go diff --git a/agent/agent.go b/agent/agent.go index d86037e79edeb..95b8bea19a973 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -107,15 +107,13 @@ func (a *Agent) gatherer( input *models.RunningInput, interval time.Duration, metricC chan telegraf.Metric, -) error { +) { defer panicRecover(input) ticker := time.NewTicker(interval) defer ticker.Stop() for { - var outerr error - acc := NewAccumulator(input.Config, metricC) acc.SetDebug(a.Config.Agent.Debug) acc.SetPrecision(a.Config.Agent.Precision.Duration, @@ -128,9 +126,6 @@ func (a *Agent) gatherer( gatherWithTimeout(shutdown, input, acc, interval) elapsed := time.Since(start) - if outerr != nil { - return outerr - } if a.Config.Agent.Debug { log.Printf("Input [%s] gathered metrics, (%s interval) in %s\n", input.Name, interval, elapsed) @@ -138,7 +133,7 @@ func (a *Agent) gatherer( select { case <-shutdown: - return nil + return case <-ticker.C: continue } @@ -259,24 +254,54 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er // the flusher will flush after metrics are collected. time.Sleep(time.Millisecond * 200) - ticker := time.NewTicker(a.Config.Agent.FlushInterval.Duration) + // create an output metric channel and a gorouting that continously passes + // each metric onto the output plugins & aggregators. + outMetricC := make(chan telegraf.Metric, 100) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-shutdown: + // TODO aggregators should get stopped here + if len(outMetricC) > 0 { + // keep going until outMetricC is flushed + continue + } + return + case m := <-outMetricC: + // TODO send metrics to aggregators (copy all) + for i, o := range a.Config.Outputs { + if i == len(a.Config.Outputs)-1 { + o.AddMetric(m) + } else { + o.AddMetric(copyMetric(m)) + } + } + } + } + }() + ticker := time.NewTicker(a.Config.Agent.FlushInterval.Duration) for { select { case <-shutdown: log.Println("Hang on, flushing any cached metrics before shutdown") + // wait for outMetricC to get flushed before flushing outputs + wg.Wait() a.flush() return nil case <-ticker.C: internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown) a.flush() - case m := <-metricC: - for i, o := range a.Config.Outputs { - if i == len(a.Config.Outputs)-1 { - o.AddMetric(m) - } else { - o.AddMetric(copyMetric(m)) - } + case metric := <-metricC: + mS := []telegraf.Metric{metric} + for _, processor := range a.Config.Processors { + mS = processor.Apply(mS...) + } + for _, m := range mS { + outMetricC <- m } } } @@ -353,9 +378,7 @@ func (a *Agent) Run(shutdown chan struct{}) error { } go func(in *models.RunningInput, interv time.Duration) { defer wg.Done() - if err := a.gatherer(shutdown, in, interv, metricC); err != nil { - log.Printf(err.Error()) - } + a.gatherer(shutdown, in, interv, metricC) }(input, interval) } diff --git a/aggregator.go b/aggregator.go new file mode 100644 index 0000000000000..fdd1290364ad9 --- /dev/null +++ b/aggregator.go @@ -0,0 +1,16 @@ +package telegraf + +type Aggregator interface { + // SampleConfig returns the default configuration of the Input + SampleConfig() string + + // Description returns a one-sentence description on the Input + Description() string + + // Apply the metric to the aggregator + Apply(in Metric) + + // Start starts the service filter with the given accumulator + Start(acc Accumulator) + Stop() +} diff --git a/cmd/telegraf/telegraf.go b/cmd/telegraf/telegraf.go index 022280d6bbbe4..57b65fd3ca90f 100644 --- a/cmd/telegraf/telegraf.go +++ b/cmd/telegraf/telegraf.go @@ -12,10 +12,12 @@ import ( "github.com/influxdata/telegraf/agent" "github.com/influxdata/telegraf/internal/config" + _ "github.com/influxdata/telegraf/plugins/aggregators/all" "github.com/influxdata/telegraf/plugins/inputs" _ "github.com/influxdata/telegraf/plugins/inputs/all" "github.com/influxdata/telegraf/plugins/outputs" _ "github.com/influxdata/telegraf/plugins/outputs/all" + _ "github.com/influxdata/telegraf/plugins/processors/all" "github.com/kardianos/service" ) diff --git a/internal/config/config.go b/internal/config/config.go index 30e62789023ec..381d8202d947c 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -20,6 +20,7 @@ import ( "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/plugins/processors" "github.com/influxdata/telegraf/plugins/serializers" "github.com/influxdata/config" @@ -47,9 +48,10 @@ type Config struct { InputFilters []string OutputFilters []string - Agent *AgentConfig - Inputs []*models.RunningInput - Outputs []*models.RunningOutput + Agent *AgentConfig + Inputs []*models.RunningInput + Outputs []*models.RunningOutput + Processors []*models.RunningProcessor } func NewConfig() *Config { @@ -64,6 +66,7 @@ func NewConfig() *Config { Tags: make(map[string]string), Inputs: make([]*models.RunningInput, 0), Outputs: make([]*models.RunningOutput, 0), + Processors: make([]*models.RunningProcessor, 0), InputFilters: make([]string, 0), OutputFilters: make([]string, 0), } @@ -499,6 +502,7 @@ func (c *Config) LoadConfig(path string) error { case "outputs": for pluginName, pluginVal := range subTable.Fields { switch pluginSubTable := pluginVal.(type) { + // legacy [outputs.influxdb] support case *ast.Table: if err = c.addOutput(pluginName, pluginSubTable); err != nil { return fmt.Errorf("Error parsing %s, %s", path, err) @@ -517,6 +521,7 @@ func (c *Config) LoadConfig(path string) error { case "inputs", "plugins": for pluginName, pluginVal := range subTable.Fields { switch pluginSubTable := pluginVal.(type) { + // legacy [inputs.cpu] support case *ast.Table: if err = c.addInput(pluginName, pluginSubTable); err != nil { return fmt.Errorf("Error parsing %s, %s", path, err) @@ -532,6 +537,35 @@ func (c *Config) LoadConfig(path string) error { pluginName, path) } } + case "filters": + for pluginName, pluginVal := range subTable.Fields { + switch pluginSubTable := pluginVal.(type) { + case []*ast.Table: + for _, t := range pluginSubTable { + if err = c.addProcessor(pluginName, t); err != nil { + return fmt.Errorf("Error parsing %s, %s", path, err) + } + } + default: + return fmt.Errorf("Unsupported config format: %s, file %s", + pluginName, path) + } + } + case "aggregators": + // TODO support building aggregator plugins + // for pluginName, pluginVal := range subTable.Fields { + // switch pluginSubTable := pluginVal.(type) { + // case []*ast.Table: + // for _, t := range pluginSubTable { + // if err = c.addProcessor(pluginName, t); err != nil { + // return fmt.Errorf("Error parsing %s, %s", path, err) + // } + // } + // default: + // return fmt.Errorf("Unsupported config format: %s, file %s", + // pluginName, path) + // } + // } // Assume it's an input input for legacy config file support if no other // identifiers are present default: @@ -572,6 +606,32 @@ func parseFile(fpath string) (*ast.Table, error) { return toml.Parse(contents) } +func (c *Config) addProcessor(name string, table *ast.Table) error { + creator, ok := processors.Processors[name] + if !ok { + return fmt.Errorf("Undefined but requested processor: %s", name) + } + processor := creator() + + processorConfig, err := buildProcessor(name, table) + if err != nil { + return err + } + + if err := config.UnmarshalTable(table, processor); err != nil { + return err + } + + rf := &models.RunningProcessor{ + Name: name, + Processor: processor, + Config: processorConfig, + } + + c.Processors = append(c.Processors, rf) + return nil +} + func (c *Config) addOutput(name string, table *ast.Table) error { if len(c.OutputFilters) > 0 && !sliceContains(name, c.OutputFilters) { return nil @@ -652,6 +712,25 @@ func (c *Config) addInput(name string, table *ast.Table) error { return nil } +// buildProcessor TODO doc +func buildProcessor(name string, tbl *ast.Table) (*models.ProcessorConfig, error) { + conf := &models.ProcessorConfig{Name: name} + unsupportedFields := []string{"pass", "fieldpass", "drop", "fielddrop", + "tagexclude", "taginclude"} + for _, field := range unsupportedFields { + if _, ok := tbl.Fields[field]; ok { + // TODO raise error because field is not supported + } + } + + var err error + conf.Filter, err = buildFilter(tbl) + if err != nil { + return conf, err + } + return conf, nil +} + // buildFilter builds a Filter // (tagpass/tagdrop/namepass/namedrop/fieldpass/fielddrop) to // be inserted into the models.OutputConfig/models.InputConfig diff --git a/internal/models/running_filter.go b/internal/models/running_filter.go new file mode 100644 index 0000000000000..617a572ba1514 --- /dev/null +++ b/internal/models/running_filter.go @@ -0,0 +1,37 @@ +package models + +import ( + "github.com/influxdata/telegraf" +) + +type RunningProcessor struct { + Name string + Processor telegraf.Processor + Config *ProcessorConfig +} + +// FilterConfig containing a name and filter +type ProcessorConfig struct { + Name string + Filter Filter +} + +func (rf *RunningProcessor) Apply(in ...telegraf.Metric) []telegraf.Metric { + ret := []telegraf.Metric{} + + for _, metric := range in { + if rf.Config.Filter.IsActive() { + // check if the filter should be applied to this metric + if ok := rf.Config.Filter.Apply(metric.Name(), metric.Fields(), metric.Tags()); !ok { + // this means filter should not be applied + ret = append(ret, metric) + continue + } + } + // This metric should pass through the filter, so call the filter Apply + // function and append results to the output slice. + ret = append(ret, rf.Processor.Apply(metric)...) + } + + return ret +} diff --git a/internal/models/running_filter_test.go b/internal/models/running_filter_test.go new file mode 100644 index 0000000000000..8a691a9b8f54f --- /dev/null +++ b/internal/models/running_filter_test.go @@ -0,0 +1,117 @@ +package models + +import ( + "testing" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/testutil" + + "github.com/stretchr/testify/assert" +) + +type TestProcessor struct { +} + +func (f *TestProcessor) SampleConfig() string { return "" } +func (f *TestProcessor) Description() string { return "" } + +// Apply renames: +// "foo" to "fuz" +// "bar" to "baz" +// And it also drops measurements named "dropme" +func (f *TestProcessor) Apply(in ...telegraf.Metric) []telegraf.Metric { + out := make([]telegraf.Metric, 0) + for _, m := range in { + switch m.Name() { + case "foo": + out = append(out, testutil.TestMetric(1, "fuz")) + case "bar": + out = append(out, testutil.TestMetric(1, "baz")) + case "dropme": + // drop the metric! + default: + out = append(out, m) + } + } + return out +} + +func NewTestRunningProcessor() *RunningProcessor { + out := &RunningProcessor{ + Name: "test", + Processor: &TestProcessor{}, + Config: &ProcessorConfig{Filter: Filter{}}, + } + return out +} + +func TestRunningProcessor(t *testing.T) { + inmetrics := []telegraf.Metric{ + testutil.TestMetric(1, "foo"), + testutil.TestMetric(1, "bar"), + testutil.TestMetric(1, "baz"), + } + + expectedNames := []string{ + "fuz", + "baz", + "baz", + } + rfp := NewTestRunningProcessor() + filteredMetrics := rfp.Apply(inmetrics...) + + actualNames := []string{ + filteredMetrics[0].Name(), + filteredMetrics[1].Name(), + filteredMetrics[2].Name(), + } + assert.Equal(t, expectedNames, actualNames) +} + +func TestRunningProcessor_WithNameDrop(t *testing.T) { + inmetrics := []telegraf.Metric{ + testutil.TestMetric(1, "foo"), + testutil.TestMetric(1, "bar"), + testutil.TestMetric(1, "baz"), + } + + expectedNames := []string{ + "foo", + "baz", + "baz", + } + rfp := NewTestRunningProcessor() + + rfp.Config.Filter.NameDrop = []string{"foo"} + assert.NoError(t, rfp.Config.Filter.Compile()) + + filteredMetrics := rfp.Apply(inmetrics...) + + actualNames := []string{ + filteredMetrics[0].Name(), + filteredMetrics[1].Name(), + filteredMetrics[2].Name(), + } + assert.Equal(t, expectedNames, actualNames) +} + +func TestRunningProcessor_DroppedMetric(t *testing.T) { + inmetrics := []telegraf.Metric{ + testutil.TestMetric(1, "dropme"), + testutil.TestMetric(1, "foo"), + testutil.TestMetric(1, "bar"), + } + + expectedNames := []string{ + "fuz", + "baz", + } + rfp := NewTestRunningProcessor() + filteredMetrics := rfp.Apply(inmetrics...) + + actualNames := []string{ + filteredMetrics[0].Name(), + filteredMetrics[1].Name(), + } + assert.Equal(t, expectedNames, actualNames) +} diff --git a/internal/models/running_output_test.go b/internal/models/running_output_test.go index a42d6fc7e6ff2..2bca79a067b01 100644 --- a/internal/models/running_output_test.go +++ b/internal/models/running_output_test.go @@ -132,7 +132,6 @@ func TestRunningOutput_PassFilter(t *testing.T) { func TestRunningOutput_TagIncludeNoMatch(t *testing.T) { conf := &OutputConfig{ Filter: Filter{ - TagInclude: []string{"nothing*"}, }, } @@ -154,7 +153,6 @@ func TestRunningOutput_TagIncludeNoMatch(t *testing.T) { func TestRunningOutput_TagExcludeMatch(t *testing.T) { conf := &OutputConfig{ Filter: Filter{ - TagExclude: []string{"tag*"}, }, } @@ -176,7 +174,6 @@ func TestRunningOutput_TagExcludeMatch(t *testing.T) { func TestRunningOutput_TagExcludeNoMatch(t *testing.T) { conf := &OutputConfig{ Filter: Filter{ - TagExclude: []string{"nothing*"}, }, } @@ -198,7 +195,6 @@ func TestRunningOutput_TagExcludeNoMatch(t *testing.T) { func TestRunningOutput_TagIncludeMatch(t *testing.T) { conf := &OutputConfig{ Filter: Filter{ - TagInclude: []string{"tag*"}, }, } diff --git a/plugins/aggregators/all/all.go b/plugins/aggregators/all/all.go new file mode 100644 index 0000000000000..a4d92c3f2f4c6 --- /dev/null +++ b/plugins/aggregators/all/all.go @@ -0,0 +1,5 @@ +package all + +import ( + _ "github.com/influxdata/telegraf/plugins/aggregators/min" +) diff --git a/plugins/aggregators/min/min.go b/plugins/aggregators/min/min.go new file mode 100644 index 0000000000000..ed6820ec6fbd1 --- /dev/null +++ b/plugins/aggregators/min/min.go @@ -0,0 +1,35 @@ +package min + +import ( + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/aggregators" +) + +type Min struct { +} + +var sampleConfig = ` +` + +func (m *Min) SampleConfig() string { + return sampleConfig +} + +func (m *Min) Description() string { + return "Aggregate the minimum value of each numerical field." +} + +func (m *Min) Apply(in telegraf.Metric) { +} + +func (m *Min) Start(acc telegraf.Accumulator) { +} + +func (m *Min) Stop() { +} + +func init() { + aggregators.Add("min", func() telegraf.Aggregator { + return &Min{} + }) +} diff --git a/plugins/aggregators/registry.go b/plugins/aggregators/registry.go new file mode 100644 index 0000000000000..77a9c9a643485 --- /dev/null +++ b/plugins/aggregators/registry.go @@ -0,0 +1,11 @@ +package aggregators + +import "github.com/influxdata/telegraf" + +type Creator func() telegraf.Aggregator + +var Aggregators = map[string]Creator{} + +func Add(name string, creator Creator) { + Aggregators[name] = creator +} diff --git a/plugins/processors/all/all.go b/plugins/processors/all/all.go new file mode 100644 index 0000000000000..462298f6bbbd1 --- /dev/null +++ b/plugins/processors/all/all.go @@ -0,0 +1,5 @@ +package all + +import ( + _ "github.com/influxdata/telegraf/plugins/processors/printer" +) diff --git a/plugins/processors/printer/printer.go b/plugins/processors/printer/printer.go new file mode 100644 index 0000000000000..a65a104e618f9 --- /dev/null +++ b/plugins/processors/printer/printer.go @@ -0,0 +1,35 @@ +package printer + +import ( + "fmt" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/processors" +) + +type Printer struct { +} + +var sampleConfig = ` +` + +func (p *Printer) SampleConfig() string { + return sampleConfig +} + +func (p *Printer) Description() string { + return "Print all metrics that pass through this filter." +} + +func (p *Printer) Apply(in ...telegraf.Metric) []telegraf.Metric { + for _, metric := range in { + fmt.Println(metric.String()) + } + return in +} + +func init() { + processors.Add("printer", func() telegraf.Processor { + return &Printer{} + }) +} diff --git a/plugins/processors/registry.go b/plugins/processors/registry.go new file mode 100644 index 0000000000000..592c688f3dc6b --- /dev/null +++ b/plugins/processors/registry.go @@ -0,0 +1,11 @@ +package processors + +import "github.com/influxdata/telegraf" + +type Creator func() telegraf.Processor + +var Processors = map[string]Creator{} + +func Add(name string, creator Creator) { + Processors[name] = creator +} diff --git a/processor.go b/processor.go new file mode 100644 index 0000000000000..f2b5133a5bcf4 --- /dev/null +++ b/processor.go @@ -0,0 +1,12 @@ +package telegraf + +type Processor interface { + // SampleConfig returns the default configuration of the Input + SampleConfig() string + + // Description returns a one-sentence description on the Input + Description() string + + // Apply the filter to the given metric + Apply(in ...Metric) []Metric +}