diff --git a/agent/agent.go b/agent/agent.go index d86037e79edeb..46169a0769267 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -270,12 +270,18 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er case <-ticker.C: internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown) a.flush() - case m := <-metricC: - for i, o := range a.Config.Outputs { - if i == len(a.Config.Outputs)-1 { - o.AddMetric(m) - } else { - o.AddMetric(copyMetric(m)) + case metric := <-metricC: + mS := []telegraf.Metric{metric} + for _, filter := range a.Config.Filters { + mS = filter.FilterPlugin.Apply(mS...) + } + for _, m := range mS { + for i, o := range a.Config.Outputs { + if i == len(a.Config.Outputs)-1 { + o.AddMetric(m) + } else { + o.AddMetric(copyMetric(m)) + } } } } diff --git a/filter.go b/filter.go new file mode 100644 index 0000000000000..83e5e2542a367 --- /dev/null +++ b/filter.go @@ -0,0 +1,12 @@ +package telegraf + +type FilterPlugin interface { + // SampleConfig returns the default configuration of the Input + SampleConfig() string + + // Description returns a one-sentence description on the Input + Description() string + + // Apply the filter to the given metric + Apply(in ...Metric) []Metric +} diff --git a/internal/config/config.go b/internal/config/config.go index 30e62789023ec..263107a1de927 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -17,6 +17,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal/models" + "github.com/influxdata/telegraf/plugins/filters" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/parsers" @@ -50,6 +51,7 @@ type Config struct { Agent *AgentConfig Inputs []*models.RunningInput Outputs []*models.RunningOutput + Filters []*models.RunningFilterPlugin } func NewConfig() *Config { @@ -64,6 +66,7 @@ func NewConfig() *Config { Tags: make(map[string]string), Inputs: make([]*models.RunningInput, 0), Outputs: make([]*models.RunningOutput, 0), + Filters: make([]*models.RunningFilterPlugin, 0), InputFilters: make([]string, 0), OutputFilters: make([]string, 0), } @@ -532,6 +535,24 @@ func (c *Config) LoadConfig(path string) error { pluginName, path) } } + case "filter": + for pluginName, pluginVal := range subTable.Fields { + switch pluginSubTable := pluginVal.(type) { + case *ast.Table: + if err = c.addFilterPlugin(pluginName, pluginSubTable); err != nil { + return fmt.Errorf("Error parsing %s, %s", path, err) + } + case []*ast.Table: + for _, t := range pluginSubTable { + if err = c.addFilterPlugin(pluginName, t); err != nil { + return fmt.Errorf("Error parsing %s, %s", path, err) + } + } + default: + return fmt.Errorf("Unsupported config format: %s, file %s", + pluginName, path) + } + } // Assume it's an input input for legacy config file support if no other // identifiers are present default: @@ -572,6 +593,32 @@ func parseFile(fpath string) (*ast.Table, error) { return toml.Parse(contents) } +func (c *Config) addFilterPlugin(name string, table *ast.Table) error { + creator, ok := filters.Filters[name] + if !ok { + return fmt.Errorf("Undefined but requested filter: %s", name) + } + filter := creator() + + filterConfig, err := buildFilterPlugin(name, table) + if err != nil { + return err + } + + if err := config.UnmarshalTable(table, filter); err != nil { + return err + } + + rf := &models.RunningFilterPlugin{ + Name: name, + FilterPlugin: filter, + Config: filterConfig, + } + + c.Filters = append(c.Filters, rf) + return nil +} + func (c *Config) addOutput(name string, table *ast.Table) error { if len(c.OutputFilters) > 0 && !sliceContains(name, c.OutputFilters) { return nil @@ -652,6 +699,25 @@ func (c *Config) addInput(name string, table *ast.Table) error { return nil } +// buildFilterPlugin TODO doc +func buildFilterPlugin(name string, tbl *ast.Table) (*models.FilterPluginConfig, error) { + conf := &models.FilterPluginConfig{Name: name} + unsupportedFields := []string{"pass", "fieldpass", "drop", "fielddrop", + "tagexclude", "taginclude"} + for _, field := range unsupportedFields { + if _, ok := tbl.Fields[field]; ok { + // TODO raise error because field is not supported + } + } + + var err error + conf.Filter, err = buildFilter(tbl) + if err != nil { + return conf, err + } + return conf, nil +} + // buildFilter builds a Filter // (tagpass/tagdrop/namepass/namedrop/fieldpass/fielddrop) to // be inserted into the models.OutputConfig/models.InputConfig diff --git a/internal/models/running_filter.go b/internal/models/running_filter.go new file mode 100644 index 0000000000000..67397d2ebfdc6 --- /dev/null +++ b/internal/models/running_filter.go @@ -0,0 +1,17 @@ +package models + +import ( + "github.com/influxdata/telegraf" +) + +type RunningFilterPlugin struct { + Name string + FilterPlugin telegraf.FilterPlugin + Config *FilterPluginConfig +} + +// FilterConfig containing a name and filter +type FilterPluginConfig struct { + Name string + Filter Filter +} diff --git a/plugins/filters/all/all.go b/plugins/filters/all/all.go new file mode 100644 index 0000000000000..a3beb17d4f638 --- /dev/null +++ b/plugins/filters/all/all.go @@ -0,0 +1,3 @@ +package all + +import () diff --git a/plugins/filters/registry.go b/plugins/filters/registry.go new file mode 100644 index 0000000000000..53f0174ddc603 --- /dev/null +++ b/plugins/filters/registry.go @@ -0,0 +1,11 @@ +package filters + +import "github.com/influxdata/telegraf" + +type Creator func() telegraf.FilterPlugin + +var Filters = map[string]Creator{} + +func Add(name string, creator Creator) { + Filters[name] = creator +}