Skip to content

Commit

Permalink
feat: Migrate graphite parser to new style (#11405)
Browse files Browse the repository at this point in the history
  • Loading branch information
srebhan authored and MyaLongmire committed Jul 6, 2022
1 parent bcc3de8 commit 003a45d
Show file tree
Hide file tree
Showing 8 changed files with 213 additions and 235 deletions.
2 changes: 2 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ func TestConfig_ParserInterfaceNewFormat(t *testing.T) {
require.True(t, ok)
// Get the parser set with 'SetParser()'
if p, ok := input.Parser.(*models.RunningParser); ok {
require.NoError(t, p.Init())
actual = append(actual, p.Parser)
} else {
actual = append(actual, input.Parser)
Expand Down Expand Up @@ -614,6 +615,7 @@ func TestConfig_ParserInterfaceOldFormat(t *testing.T) {
require.True(t, ok)
// Get the parser set with 'SetParser()'
if p, ok := input.Parser.(*models.RunningParser); ok {
require.NoError(t, p.Init())
actual = append(actual, p.Parser)
} else {
actual = append(actual, input.Parser)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/Shopify/sarama"

"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/graphite"
"github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/testutil"

Expand Down Expand Up @@ -115,9 +116,9 @@ func TestRunParserAndGatherGraphite(t *testing.T) {
k.acc = &acc
defer close(k.done)

var err error
k.parser, err = parsers.NewGraphiteParser("_", []string{}, nil)
require.NoError(t, err)
p := graphite.Parser{Separator: "_", Templates: []string{}}
require.NoError(t, p.Init())
k.parser = &p
go k.receiver()
in <- saramaMsg(testMsgGraphite)
acc.Wait(1)
Expand Down
5 changes: 3 additions & 2 deletions plugins/inputs/statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ type Statsd struct {
// Max duration for each metric to stay cached without being updated.
MaxTTL config.Duration `toml:"max_ttl"`

graphiteParser *graphite.GraphiteParser
graphiteParser *graphite.Parser

acc telegraf.Accumulator

Expand Down Expand Up @@ -713,7 +713,8 @@ func (s *Statsd) parseName(bucket string) (name string, field string, tags map[s
var err error

if p == nil || s.graphiteParser.Separator != s.MetricSeparator {
p, err = graphite.NewGraphiteParser(s.MetricSeparator, s.Templates, nil)
p = &graphite.Parser{Separator: s.MetricSeparator, Templates: s.Templates}
err = p.Init()
s.graphiteParser = p
}

Expand Down
7 changes: 4 additions & 3 deletions plugins/inputs/tcp_listener/tcp_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/graphite"
"github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/testutil"
)
Expand Down Expand Up @@ -293,9 +294,9 @@ func TestRunParserGraphiteMsg(t *testing.T) {
listener.acc = &acc
defer close(listener.done)

var err error
listener.parser, err = parsers.NewGraphiteParser("_", []string{}, nil)
require.NoError(t, err)
p := graphite.Parser{Separator: "_", Templates: []string{}}
require.NoError(t, p.Init())
listener.parser = &p
listener.wg.Add(1)
go listener.tcpParser()

Expand Down
1 change: 1 addition & 0 deletions plugins/parsers/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/parsers/collectd"
_ "github.com/influxdata/telegraf/plugins/parsers/csv"
_ "github.com/influxdata/telegraf/plugins/parsers/form_urlencoded"
_ "github.com/influxdata/telegraf/plugins/parsers/graphite"
_ "github.com/influxdata/telegraf/plugins/parsers/json"
_ "github.com/influxdata/telegraf/plugins/parsers/json_v2"
_ "github.com/influxdata/telegraf/plugins/parsers/value"
Expand Down
67 changes: 37 additions & 30 deletions plugins/parsers/graphite/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/templating"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/parsers"
)

// Minimum and maximum supported dates for timestamps.
Expand All @@ -20,45 +21,34 @@ var (
MaxDate = time.Date(2038, 1, 19, 0, 0, 0, 0, time.UTC)
)

type GraphiteParser struct {
Separator string
Templates []string
DefaultTags map[string]string
templateEngine *templating.Engine
}
type Parser struct {
Separator string `toml:"separator"`
Templates []string `toml:"templates"`
DefaultTags map[string]string ` toml:"-"`

func (p *GraphiteParser) SetDefaultTags(tags map[string]string) {
p.DefaultTags = tags
templateEngine *templating.Engine
}

func NewGraphiteParser(
separator string,
templates []string,
defaultTags map[string]string,
) (*GraphiteParser, error) {
var err error

if separator == "" {
separator = DefaultSeparator
}
p := &GraphiteParser{
Separator: separator,
Templates: templates,
func (p *Parser) Init() error {
// Set defaults
if p.Separator == "" {
p.Separator = DefaultSeparator
}

if defaultTags != nil {
p.DefaultTags = defaultTags
defaultTemplate, err := templating.NewDefaultTemplateWithPattern("measurement*")
if err != nil {
return fmt.Errorf("creating template failed: %w", err)
}
defaultTemplate, _ := templating.NewDefaultTemplateWithPattern("measurement*")
p.templateEngine, err = templating.NewEngine(p.Separator, defaultTemplate, p.Templates)

p.templateEngine, err = templating.NewEngine(p.Separator, defaultTemplate, p.Templates)
if err != nil {
return p, fmt.Errorf("exec input parser config is error: %s ", err.Error())
return fmt.Errorf("creating template engine failed: %w ", err)
}
return p, nil

return nil
}

func (p *GraphiteParser) Parse(buf []byte) ([]telegraf.Metric, error) {
func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
// parse even if the buffer begins with a newline
if len(buf) != 0 && buf[0] == '\n' {
buf = buf[1:]
Expand Down Expand Up @@ -95,7 +85,7 @@ func (p *GraphiteParser) Parse(buf []byte) ([]telegraf.Metric, error) {
}

// ParseLine performs Graphite parsing of a single line.
func (p *GraphiteParser) ParseLine(line string) (telegraf.Metric, error) {
func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
// Break into 3 fields (name, value, timestamp).
fields := strings.Fields(line)
if len(fields) != 2 && len(fields) != 3 {
Expand Down Expand Up @@ -178,7 +168,8 @@ func (p *GraphiteParser) ParseLine(line string) (telegraf.Metric, error) {

// ApplyTemplate extracts the template fields from the given line and
// returns the measurement name and tags.
func (p *GraphiteParser) ApplyTemplate(line string) (string, map[string]string, string, error) {
//nolint:revive // This function should be eliminated anyway
func (p *Parser) ApplyTemplate(line string) (string, map[string]string, string, error) {
// Break line into fields (name, value, timestamp), only name is used
fields := strings.Fields(line)
if len(fields) == 0 {
Expand All @@ -196,3 +187,19 @@ func (p *GraphiteParser) ApplyTemplate(line string) (string, map[string]string,

return name, tags, field, err
}

func (p *Parser) SetDefaultTags(tags map[string]string) {
p.DefaultTags = tags
}

func init() {
parsers.Add("graphite", func(_ string) telegraf.Parser { return &Parser{} })
}

func (p *Parser) InitFromConfig(config *parsers.Config) error {
p.Templates = append(p.Templates, config.Templates...)
p.Separator = config.Separator
p.DefaultTags = config.DefaultTags

return p.Init()
}
Loading

0 comments on commit 003a45d

Please sign in to comment.