From 9e7f8600864a859efaeeebd58423f06ff11508fd Mon Sep 17 00:00:00 2001 From: Sven Rebhan <36194019+srebhan@users.noreply.github.com> Date: Wed, 29 Jun 2022 23:31:22 +0200 Subject: [PATCH] feat: Migrate logfmt parser to new style (#11366) --- config/config_test.go | 6 ---- plugins/parsers/all/all.go | 1 + plugins/parsers/logfmt/parser.go | 46 +++++++++++++++------------ plugins/parsers/logfmt/parser_test.go | 39 ++++++++--------------- plugins/parsers/registry.go | 10 ------ 5 files changed, 40 insertions(+), 62 deletions(-) diff --git a/config/config_test.go b/config/config_test.go index 9ee7aa21c903d..b8a7dfc3b6bdd 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -415,9 +415,6 @@ func TestConfig_ParserInterfaceNewFormat(t *testing.T) { }, mask: []string{"TimeFunc"}, }, - "logfmt": { - mask: []string{"Now"}, - }, "xpath_protobuf": { param: map[string]interface{}{ "ProtobufMessageDef": "testdata/addressbook.proto", @@ -555,9 +552,6 @@ func TestConfig_ParserInterfaceOldFormat(t *testing.T) { }, mask: []string{"TimeFunc"}, }, - "logfmt": { - mask: []string{"Now"}, - }, "xpath_protobuf": { param: map[string]interface{}{ "ProtobufMessageDef": "testdata/addressbook.proto", diff --git a/plugins/parsers/all/all.go b/plugins/parsers/all/all.go index b953730713bc3..cd9cb6c6f4775 100644 --- a/plugins/parsers/all/all.go +++ b/plugins/parsers/all/all.go @@ -8,6 +8,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/parsers/graphite" _ "github.com/influxdata/telegraf/plugins/parsers/json" _ "github.com/influxdata/telegraf/plugins/parsers/json_v2" + _ "github.com/influxdata/telegraf/plugins/parsers/logfmt" _ "github.com/influxdata/telegraf/plugins/parsers/value" _ "github.com/influxdata/telegraf/plugins/parsers/wavefront" _ "github.com/influxdata/telegraf/plugins/parsers/xpath" diff --git a/plugins/parsers/logfmt/parser.go b/plugins/parsers/logfmt/parser.go index f612c8e2e72a9..2cd518545610a 100644 --- a/plugins/parsers/logfmt/parser.go +++ b/plugins/parsers/logfmt/parser.go @@ -2,6 +2,7 @@ package logfmt import ( "bytes" + "errors" "fmt" "strconv" "time" @@ -10,31 +11,18 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/filter" "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/parsers" ) -var ( - ErrNoMetric = fmt.Errorf("no metric in line") -) +var ErrNoMetric = errors.New("no metric in line") // Parser decodes logfmt formatted messages into metrics. type Parser struct { - TagKeys []string `toml:"logfmt_tag_keys"` - - MetricName string - DefaultTags map[string]string - Now func() time.Time + TagKeys []string `toml:"logfmt_tag_keys"` + DefaultTags map[string]string `toml:"-"` - tagFilter filter.Filter -} - -// NewParser creates a parser. -func NewParser(metricName string, defaultTags map[string]string, tagKeys []string) *Parser { - return &Parser{ - MetricName: metricName, - DefaultTags: defaultTags, - Now: time.Now, - TagKeys: tagKeys, - } + metricName string + tagFilter filter.Filter } // Parse converts a slice of bytes in logfmt format to metrics. @@ -76,7 +64,7 @@ func (p *Parser) Parse(b []byte) ([]telegraf.Metric, error) { continue } - m := metric.New(p.MetricName, tags, fields, p.Now()) + m := metric.New(p.metricName, tags, fields, time.Now()) metrics = append(metrics, m) } @@ -126,3 +114,21 @@ func (p *Parser) Init() error { return nil } + +func init() { + // Register parser + parsers.Add("logfmt", + func(defaultMetricName string) telegraf.Parser { + return &Parser{metricName: defaultMetricName} + }, + ) +} + +// InitFromConfig is a compatibility function to construct the parser the old way +func (p *Parser) InitFromConfig(config *parsers.Config) error { + p.metricName = config.MetricName + p.DefaultTags = config.DefaultTags + p.TagKeys = append(p.TagKeys, config.LogFmtTagKeys...) + + return p.Init() +} diff --git a/plugins/parsers/logfmt/parser_test.go b/plugins/parsers/logfmt/parser_test.go index a2ee8178f6072..169b238bc68a3 100644 --- a/plugins/parsers/logfmt/parser_test.go +++ b/plugins/parsers/logfmt/parser_test.go @@ -6,27 +6,24 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestParse(t *testing.T) { tests := []struct { name string measurement string - now func() time.Time bytes []byte want []telegraf.Metric wantErr bool }{ { name: "no bytes returns no metrics", - now: func() time.Time { return time.Unix(0, 0) }, want: []telegraf.Metric{}, }, { name: "test without trailing end", bytes: []byte("foo=\"bar\""), - now: func() time.Time { return time.Unix(0, 0) }, measurement: "testlog", want: []telegraf.Metric{ testutil.MustMetric( @@ -42,7 +39,6 @@ func TestParse(t *testing.T) { { name: "test with trailing end", bytes: []byte("foo=\"bar\"\n"), - now: func() time.Time { return time.Unix(0, 0) }, measurement: "testlog", want: []telegraf.Metric{ testutil.MustMetric( @@ -58,7 +54,6 @@ func TestParse(t *testing.T) { { name: "logfmt parser returns all the fields", bytes: []byte(`ts=2018-07-24T19:43:40.275Z lvl=info msg="http request" method=POST`), - now: func() time.Time { return time.Unix(0, 0) }, measurement: "testlog", want: []telegraf.Metric{ testutil.MustMetric( @@ -77,7 +72,6 @@ func TestParse(t *testing.T) { { name: "logfmt parser parses every line", bytes: []byte("ts=2018-07-24T19:43:40.275Z lvl=info msg=\"http request\" method=POST\nparent_id=088876RL000 duration=7.45 log_id=09R4e4Rl000"), - now: func() time.Time { return time.Unix(0, 0) }, measurement: "testlog", want: []telegraf.Metric{ testutil.MustMetric( @@ -105,21 +99,18 @@ func TestParse(t *testing.T) { }, { name: "keys without = or values are ignored", - now: func() time.Time { return time.Unix(0, 0) }, bytes: []byte(`i am no data.`), want: []telegraf.Metric{}, wantErr: false, }, { name: "keys without values are ignored", - now: func() time.Time { return time.Unix(0, 0) }, bytes: []byte(`foo="" bar=`), want: []telegraf.Metric{}, wantErr: false, }, { name: "unterminated quote produces error", - now: func() time.Time { return time.Unix(0, 0) }, measurement: "testlog", bytes: []byte(`bar=baz foo="bar`), want: []telegraf.Metric{}, @@ -127,7 +118,6 @@ func TestParse(t *testing.T) { }, { name: "malformed key", - now: func() time.Time { return time.Unix(0, 0) }, measurement: "testlog", bytes: []byte(`"foo=" bar=baz`), want: []telegraf.Metric{}, @@ -137,8 +127,7 @@ func TestParse(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { l := Parser{ - MetricName: tt.measurement, - Now: tt.now, + metricName: tt.measurement, } got, err := l.Parse(tt.bytes) if (err != nil) != tt.wantErr { @@ -146,7 +135,7 @@ func TestParse(t *testing.T) { return } - testutil.RequireMetricsEqual(t, tt.want, got) + testutil.RequireMetricsEqual(t, tt.want, got, testutil.IgnoreTime()) }) } } @@ -156,19 +145,16 @@ func TestParseLine(t *testing.T) { name string s string measurement string - now func() time.Time want telegraf.Metric wantErr bool }{ { name: "No Metric In line", - now: func() time.Time { return time.Unix(0, 0) }, want: nil, wantErr: true, }, { name: "Log parser fmt returns all fields", - now: func() time.Time { return time.Unix(0, 0) }, measurement: "testlog", s: `ts=2018-07-24T19:43:35.207268Z lvl=5 msg="Write failed" log_id=09R4e4Rl000`, want: testutil.MustMetric( @@ -185,7 +171,6 @@ func TestParseLine(t *testing.T) { }, { name: "ParseLine only returns metrics from first string", - now: func() time.Time { return time.Unix(0, 0) }, measurement: "testlog", s: "ts=2018-07-24T19:43:35.207268Z lvl=5 msg=\"Write failed\" log_id=09R4e4Rl000\nmethod=POST parent_id=088876RL000 duration=7.45 log_id=09R4e4Rl000", want: testutil.MustMetric( @@ -204,14 +189,13 @@ func TestParseLine(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { l := Parser{ - MetricName: tt.measurement, - Now: tt.now, + metricName: tt.measurement, } got, err := l.ParseLine(tt.s) if (err != nil) != tt.wantErr { t.Fatalf("Logfmt.Parse error = %v, wantErr %v", err, tt.wantErr) } - testutil.RequireMetricEqual(t, tt.want, got) + testutil.RequireMetricEqual(t, tt.want, got, testutil.IgnoreTime()) }) } } @@ -277,15 +261,18 @@ func TestTags(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - l := NewParser(tt.measurement, map[string]string{}, tt.tagKeys) - assert.NoError(t, l.Init()) + l := &Parser{ + metricName: tt.measurement, + DefaultTags: map[string]string{}, + TagKeys: tt.tagKeys, + } + require.NoError(t, l.Init()) got, err := l.ParseLine(tt.s) - if tt.wantErr { - assert.Error(t, err) + require.Error(t, err) } else { - assert.NoError(t, err) + require.NoError(t, err) } testutil.RequireMetricEqual(t, tt.want, got, testutil.IgnoreTime()) }) diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go index 59603bdbdc984..3391288eebedf 100644 --- a/plugins/parsers/registry.go +++ b/plugins/parsers/registry.go @@ -8,7 +8,6 @@ import ( "github.com/influxdata/telegraf/plugins/parsers/grok" "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/parsers/influx/influx_upstream" - "github.com/influxdata/telegraf/plugins/parsers/logfmt" "github.com/influxdata/telegraf/plugins/parsers/nagios" "github.com/influxdata/telegraf/plugins/parsers/prometheus" "github.com/influxdata/telegraf/plugins/parsers/prometheusremotewrite" @@ -227,8 +226,6 @@ func NewParser(config *Config) (Parser, error) { config.GrokCustomPatternFiles, config.GrokTimezone, config.GrokUniqueTimestamp) - case "logfmt": - parser, err = NewLogFmtParser(config.MetricName, config.DefaultTags, config.LogFmtTagKeys) case "prometheus": parser, err = NewPrometheusParser( config.DefaultTags, @@ -310,13 +307,6 @@ func NewDropwizardParser( return parser, err } -// NewLogFmtParser returns a logfmt parser with the default options. -func NewLogFmtParser(metricName string, defaultTags map[string]string, tagKeys []string) (Parser, error) { - parser := logfmt.NewParser(metricName, defaultTags, tagKeys) - err := parser.Init() - return parser, err -} - func NewPrometheusParser(defaultTags map[string]string, ignoreTimestamp bool) (Parser, error) { return &prometheus.Parser{ DefaultTags: defaultTags,