Skip to content

Commit

Permalink
Support Filter Plugins
Browse files Browse the repository at this point in the history
closes #1726
  • Loading branch information
sparrc committed Sep 12, 2016
1 parent c3aa43a commit 739e72a
Show file tree
Hide file tree
Showing 8 changed files with 179 additions and 6 deletions.
18 changes: 12 additions & 6 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,12 +270,18 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er
case <-ticker.C:
internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown)
a.flush()
case m := <-metricC:
for i, o := range a.Config.Outputs {
if i == len(a.Config.Outputs)-1 {
o.AddMetric(m)
} else {
o.AddMetric(copyMetric(m))
case metric := <-metricC:
mS := []telegraf.Metric{metric}
for _, filter := range a.Config.Filters {
mS = filter.Apply(mS...)
}
for _, m := range mS {
for i, o := range a.Config.Outputs {
if i == len(a.Config.Outputs)-1 {
o.AddMetric(m)
} else {
o.AddMetric(copyMetric(m))
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions cmd/telegraf/telegraf.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/influxdata/telegraf/agent"
"github.com/influxdata/telegraf/internal/config"
_ "github.com/influxdata/telegraf/plugins/filters/all"
"github.com/influxdata/telegraf/plugins/inputs"
_ "github.com/influxdata/telegraf/plugins/inputs/all"
"github.com/influxdata/telegraf/plugins/outputs"
Expand Down
12 changes: 12 additions & 0 deletions filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package telegraf

type FilterPlugin interface {
// SampleConfig returns the default configuration of the Input
SampleConfig() string

// Description returns a one-sentence description on the Input
Description() string

// Apply the filter to the given metric
Apply(in ...Metric) []Metric
}
66 changes: 66 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/models"
"github.com/influxdata/telegraf/plugins/filters"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/parsers"
Expand Down Expand Up @@ -50,6 +51,7 @@ type Config struct {
Agent *AgentConfig
Inputs []*models.RunningInput
Outputs []*models.RunningOutput
Filters []*models.RunningFilterPlugin
}

func NewConfig() *Config {
Expand All @@ -64,6 +66,7 @@ func NewConfig() *Config {
Tags: make(map[string]string),
Inputs: make([]*models.RunningInput, 0),
Outputs: make([]*models.RunningOutput, 0),
Filters: make([]*models.RunningFilterPlugin, 0),
InputFilters: make([]string, 0),
OutputFilters: make([]string, 0),
}
Expand Down Expand Up @@ -532,6 +535,24 @@ func (c *Config) LoadConfig(path string) error {
pluginName, path)
}
}
case "filters":
for pluginName, pluginVal := range subTable.Fields {
switch pluginSubTable := pluginVal.(type) {
case *ast.Table:
if err = c.addFilterPlugin(pluginName, pluginSubTable); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err)
}
case []*ast.Table:
for _, t := range pluginSubTable {
if err = c.addFilterPlugin(pluginName, t); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err)
}
}
default:
return fmt.Errorf("Unsupported config format: %s, file %s",
pluginName, path)
}
}
// Assume it's an input input for legacy config file support if no other
// identifiers are present
default:
Expand Down Expand Up @@ -572,6 +593,32 @@ func parseFile(fpath string) (*ast.Table, error) {
return toml.Parse(contents)
}

func (c *Config) addFilterPlugin(name string, table *ast.Table) error {
creator, ok := filters.Filters[name]
if !ok {
return fmt.Errorf("Undefined but requested filter: %s", name)
}
filter := creator()

filterConfig, err := buildFilterPlugin(name, table)
if err != nil {
return err
}

if err := config.UnmarshalTable(table, filter); err != nil {
return err
}

rf := &models.RunningFilterPlugin{
Name: name,
FilterPlugin: filter,
Config: filterConfig,
}

c.Filters = append(c.Filters, rf)
return nil
}

func (c *Config) addOutput(name string, table *ast.Table) error {
if len(c.OutputFilters) > 0 && !sliceContains(name, c.OutputFilters) {
return nil
Expand Down Expand Up @@ -652,6 +699,25 @@ func (c *Config) addInput(name string, table *ast.Table) error {
return nil
}

// buildFilterPlugin TODO doc
func buildFilterPlugin(name string, tbl *ast.Table) (*models.FilterPluginConfig, error) {
conf := &models.FilterPluginConfig{Name: name}
unsupportedFields := []string{"pass", "fieldpass", "drop", "fielddrop",
"tagexclude", "taginclude"}
for _, field := range unsupportedFields {
if _, ok := tbl.Fields[field]; ok {
// TODO raise error because field is not supported
}
}

var err error
conf.Filter, err = buildFilter(tbl)
if err != nil {
return conf, err
}
return conf, nil
}

// buildFilter builds a Filter
// (tagpass/tagdrop/namepass/namedrop/fieldpass/fielddrop) to
// be inserted into the models.OutputConfig/models.InputConfig
Expand Down
37 changes: 37 additions & 0 deletions internal/models/running_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package models

import (
"github.com/influxdata/telegraf"
)

type RunningFilterPlugin struct {
Name string
FilterPlugin telegraf.FilterPlugin
Config *FilterPluginConfig
}

// FilterConfig containing a name and filter
type FilterPluginConfig struct {
Name string
Filter Filter
}

func (rf *RunningFilterPlugin) Apply(in ...telegraf.Metric) []telegraf.Metric {
ret := []telegraf.Metric{}

for _, metric := range in {
if rf.Config.Filter.IsActive() {
// check if the filter should be applied to this metric
if ok := rf.Config.Filter.Apply(metric.Name(), metric.Fields(), metric.Tags()); !ok {
// this means filter should not be applied
ret = append(ret, metric)
continue
}
}
// This metric should pass through the filter, so call the filter Apply
// function and append results to the output slice.
ret = append(ret, rf.FilterPlugin.Apply(metric)...)
}

return ret
}
5 changes: 5 additions & 0 deletions plugins/filters/all/all.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package all

import (
_ "github.com/influxdata/telegraf/plugins/filters/printer"
)
35 changes: 35 additions & 0 deletions plugins/filters/printer/printer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package printer

import (
"fmt"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/filters"
)

type Printer struct {
}

var sampleConfig = `
`

func (p *Printer) SampleConfig() string {
return sampleConfig
}

func (p *Printer) Description() string {
return "Print all metrics that pass through this filter."
}

func (p *Printer) Apply(in ...telegraf.Metric) []telegraf.Metric {
for _, metric := range in {
fmt.Println(metric.String())
}
return in
}

func init() {
filters.Add("printer", func() telegraf.FilterPlugin {
return &Printer{}
})
}
11 changes: 11 additions & 0 deletions plugins/filters/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package filters

import "github.com/influxdata/telegraf"

type Creator func() telegraf.FilterPlugin

var Filters = map[string]Creator{}

func Add(name string, creator Creator) {
Filters[name] = creator
}

0 comments on commit 739e72a

Please sign in to comment.