From 739e72a413b3c78d1ecd1ecf6861cc5822898cc6 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Thu, 8 Sep 2016 15:22:10 +0100 Subject: [PATCH] Support Filter Plugins closes #1726 --- agent/agent.go | 18 +++++--- cmd/telegraf/telegraf.go | 1 + filter.go | 12 ++++++ internal/config/config.go | 66 ++++++++++++++++++++++++++++++ internal/models/running_filter.go | 37 +++++++++++++++++ plugins/filters/all/all.go | 5 +++ plugins/filters/printer/printer.go | 35 ++++++++++++++++ plugins/filters/registry.go | 11 +++++ 8 files changed, 179 insertions(+), 6 deletions(-) create mode 100644 filter.go create mode 100644 internal/models/running_filter.go create mode 100644 plugins/filters/all/all.go create mode 100644 plugins/filters/printer/printer.go create mode 100644 plugins/filters/registry.go diff --git a/agent/agent.go b/agent/agent.go index d86037e79edeb..e28a5a440e712 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -270,12 +270,18 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er case <-ticker.C: internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown) a.flush() - case m := <-metricC: - for i, o := range a.Config.Outputs { - if i == len(a.Config.Outputs)-1 { - o.AddMetric(m) - } else { - o.AddMetric(copyMetric(m)) + case metric := <-metricC: + mS := []telegraf.Metric{metric} + for _, filter := range a.Config.Filters { + mS = filter.Apply(mS...) + } + for _, m := range mS { + for i, o := range a.Config.Outputs { + if i == len(a.Config.Outputs)-1 { + o.AddMetric(m) + } else { + o.AddMetric(copyMetric(m)) + } } } } diff --git a/cmd/telegraf/telegraf.go b/cmd/telegraf/telegraf.go index f19b127a8538b..671b218065aa1 100644 --- a/cmd/telegraf/telegraf.go +++ b/cmd/telegraf/telegraf.go @@ -12,6 +12,7 @@ import ( "github.com/influxdata/telegraf/agent" "github.com/influxdata/telegraf/internal/config" + _ "github.com/influxdata/telegraf/plugins/filters/all" "github.com/influxdata/telegraf/plugins/inputs" _ "github.com/influxdata/telegraf/plugins/inputs/all" "github.com/influxdata/telegraf/plugins/outputs" diff --git a/filter.go b/filter.go new file mode 100644 index 0000000000000..83e5e2542a367 --- /dev/null +++ b/filter.go @@ -0,0 +1,12 @@ +package telegraf + +type FilterPlugin interface { + // SampleConfig returns the default configuration of the Input + SampleConfig() string + + // Description returns a one-sentence description on the Input + Description() string + + // Apply the filter to the given metric + Apply(in ...Metric) []Metric +} diff --git a/internal/config/config.go b/internal/config/config.go index 30e62789023ec..a60eef6706554 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -17,6 +17,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal/models" + "github.com/influxdata/telegraf/plugins/filters" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/parsers" @@ -50,6 +51,7 @@ type Config struct { Agent *AgentConfig Inputs []*models.RunningInput Outputs []*models.RunningOutput + Filters []*models.RunningFilterPlugin } func NewConfig() *Config { @@ -64,6 +66,7 @@ func NewConfig() *Config { Tags: make(map[string]string), Inputs: make([]*models.RunningInput, 0), Outputs: make([]*models.RunningOutput, 0), + Filters: make([]*models.RunningFilterPlugin, 0), InputFilters: make([]string, 0), OutputFilters: make([]string, 0), } @@ -532,6 +535,24 @@ func (c *Config) LoadConfig(path string) error { pluginName, path) } } + case "filters": + for pluginName, pluginVal := range subTable.Fields { + switch pluginSubTable := pluginVal.(type) { + case *ast.Table: + if err = c.addFilterPlugin(pluginName, pluginSubTable); err != nil { + return fmt.Errorf("Error parsing %s, %s", path, err) + } + case []*ast.Table: + for _, t := range pluginSubTable { + if err = c.addFilterPlugin(pluginName, t); err != nil { + return fmt.Errorf("Error parsing %s, %s", path, err) + } + } + default: + return fmt.Errorf("Unsupported config format: %s, file %s", + pluginName, path) + } + } // Assume it's an input input for legacy config file support if no other // identifiers are present default: @@ -572,6 +593,32 @@ func parseFile(fpath string) (*ast.Table, error) { return toml.Parse(contents) } +func (c *Config) addFilterPlugin(name string, table *ast.Table) error { + creator, ok := filters.Filters[name] + if !ok { + return fmt.Errorf("Undefined but requested filter: %s", name) + } + filter := creator() + + filterConfig, err := buildFilterPlugin(name, table) + if err != nil { + return err + } + + if err := config.UnmarshalTable(table, filter); err != nil { + return err + } + + rf := &models.RunningFilterPlugin{ + Name: name, + FilterPlugin: filter, + Config: filterConfig, + } + + c.Filters = append(c.Filters, rf) + return nil +} + func (c *Config) addOutput(name string, table *ast.Table) error { if len(c.OutputFilters) > 0 && !sliceContains(name, c.OutputFilters) { return nil @@ -652,6 +699,25 @@ func (c *Config) addInput(name string, table *ast.Table) error { return nil } +// buildFilterPlugin TODO doc +func buildFilterPlugin(name string, tbl *ast.Table) (*models.FilterPluginConfig, error) { + conf := &models.FilterPluginConfig{Name: name} + unsupportedFields := []string{"pass", "fieldpass", "drop", "fielddrop", + "tagexclude", "taginclude"} + for _, field := range unsupportedFields { + if _, ok := tbl.Fields[field]; ok { + // TODO raise error because field is not supported + } + } + + var err error + conf.Filter, err = buildFilter(tbl) + if err != nil { + return conf, err + } + return conf, nil +} + // buildFilter builds a Filter // (tagpass/tagdrop/namepass/namedrop/fieldpass/fielddrop) to // be inserted into the models.OutputConfig/models.InputConfig diff --git a/internal/models/running_filter.go b/internal/models/running_filter.go new file mode 100644 index 0000000000000..f0d57615a4ae7 --- /dev/null +++ b/internal/models/running_filter.go @@ -0,0 +1,37 @@ +package models + +import ( + "github.com/influxdata/telegraf" +) + +type RunningFilterPlugin struct { + Name string + FilterPlugin telegraf.FilterPlugin + Config *FilterPluginConfig +} + +// FilterConfig containing a name and filter +type FilterPluginConfig struct { + Name string + Filter Filter +} + +func (rf *RunningFilterPlugin) Apply(in ...telegraf.Metric) []telegraf.Metric { + ret := []telegraf.Metric{} + + for _, metric := range in { + if rf.Config.Filter.IsActive() { + // check if the filter should be applied to this metric + if ok := rf.Config.Filter.Apply(metric.Name(), metric.Fields(), metric.Tags()); !ok { + // this means filter should not be applied + ret = append(ret, metric) + continue + } + } + // This metric should pass through the filter, so call the filter Apply + // function and append results to the output slice. + ret = append(ret, rf.FilterPlugin.Apply(metric)...) + } + + return ret +} diff --git a/plugins/filters/all/all.go b/plugins/filters/all/all.go new file mode 100644 index 0000000000000..aaeb9f2de5ab0 --- /dev/null +++ b/plugins/filters/all/all.go @@ -0,0 +1,5 @@ +package all + +import ( + _ "github.com/influxdata/telegraf/plugins/filters/printer" +) diff --git a/plugins/filters/printer/printer.go b/plugins/filters/printer/printer.go new file mode 100644 index 0000000000000..b4adfb5adc46f --- /dev/null +++ b/plugins/filters/printer/printer.go @@ -0,0 +1,35 @@ +package printer + +import ( + "fmt" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/filters" +) + +type Printer struct { +} + +var sampleConfig = ` +` + +func (p *Printer) SampleConfig() string { + return sampleConfig +} + +func (p *Printer) Description() string { + return "Print all metrics that pass through this filter." +} + +func (p *Printer) Apply(in ...telegraf.Metric) []telegraf.Metric { + for _, metric := range in { + fmt.Println(metric.String()) + } + return in +} + +func init() { + filters.Add("printer", func() telegraf.FilterPlugin { + return &Printer{} + }) +} diff --git a/plugins/filters/registry.go b/plugins/filters/registry.go new file mode 100644 index 0000000000000..53f0174ddc603 --- /dev/null +++ b/plugins/filters/registry.go @@ -0,0 +1,11 @@ +package filters + +import "github.com/influxdata/telegraf" + +type Creator func() telegraf.FilterPlugin + +var Filters = map[string]Creator{} + +func Add(name string, creator Creator) { + Filters[name] = creator +}