From 523bb56b8fa352ed6d802391cf18f6bc927b914d Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Fri, 5 Feb 2016 17:36:35 -0700 Subject: [PATCH] Implementing generic parser plugins and documentation This constitutes a large change in how we will parse different data formats going forward (for the plugins that support it) This is working off @henrypfhu's changes. --- DATA_FORMATS.md | 67 +++++++++++ Godeps | 8 +- Godeps_windows | 27 ++--- cmd/telegraf/telegraf.go | 1 + internal/encoding/encoder.go | 31 ----- internal/encoding/json/parser.go | 68 ----------- internal/internal.go | 42 ------- plugins/inputs/elasticsearch/elasticsearch.go | 4 +- plugins/inputs/exec/exec.go | 81 +++++-------- plugins/inputs/httpjson/httpjson.go | 40 +++---- plugins/inputs/statsd/statsd.go | 12 +- .../parsers}/graphite/config.go | 0 .../parsers}/graphite/errors.go | 0 .../parsers}/graphite/parser.go | 66 ++++++----- .../parsers}/influx/parser.go | 18 +-- plugins/parsers/json/parser.go | 109 ++++++++++++++++++ plugins/parsers/registry.go | 38 ++++++ 17 files changed, 314 insertions(+), 298 deletions(-) create mode 100644 DATA_FORMATS.md delete mode 100644 internal/encoding/encoder.go delete mode 100644 internal/encoding/json/parser.go rename {internal/encoding => plugins/parsers}/graphite/config.go (100%) rename {internal/encoding => plugins/parsers}/graphite/errors.go (100%) rename {internal/encoding => plugins/parsers}/graphite/parser.go (91%) rename {internal/encoding => plugins/parsers}/influx/parser.go (56%) create mode 100644 plugins/parsers/json/parser.go create mode 100644 plugins/parsers/registry.go diff --git a/DATA_FORMATS.md b/DATA_FORMATS.md new file mode 100644 index 0000000000000..9c50a0ebe734a --- /dev/null +++ b/DATA_FORMATS.md @@ -0,0 +1,67 @@ +# Telegraf Data Formats + +There are many Telegraf plugins that are able to parse generic text data from +a variety of sources, this includes parsing the stdout of executed scripts +(`exec`) and parsing messages received from message brokers (`kafka_consumer`). + +Up until now, these plugins were statically configured to parse just a single +data format. `exec` mostly only supported parsing JSON, and `kafka_consumer` only +supported data in InfluxDB line-protocol. + +But now we are normalizing the parsing of various data formats across all +plugins that can support it. You will be able to tell a plugin that supports +different data formats by the presence of a `data_format` config option, for +example, in the exec plugin: + +``` +[[inputs.exec]] + ### Commands array + commands = ["/tmp/test.sh", "/usr/bin/mycollector --foo=bar"] + + ### measurement name suffix (for separating different commands) + name_suffix = "_mycollector" + + ### Data format to consume. This can be "json", "influx" or "graphite" (line-protocol) + ### Each data format has it's own unique set of configuration options, read + ### more about them here: + ### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS.md + data_format = "json" +``` + +## Influx line-protocol Options: + +None! + +## JSON Options: + +``` +### List of tag names to extract from top-level of JSON server response +tag_keys = [ + "my_tag_1", + "my_tag_2" +] +``` + +## Graphite Options: + +``` +### Below configuration will be used for data_format = "graphite", can be ignored for other data_format +### If matching multiple measurement files, this string will be used to join the matched values. +separator = "." + +### Each template line requires a template pattern. It can have an optional +### filter before the template and separated by spaces. It can also have optional extra +### tags following the template. Multiple tags should be separated by commas and no spaces +### similar to the line protocol format. The can be only one default template. +### Templates support below format: +### 1. filter + template +### 2. filter + template + extra tag +### 3. filter + template with field key +### 4. default template +templates = [ + "*.app env.service.resource.measurement", + "stats.* .host.measurement* region=us-west,agent=sensu", + "stats2.* .host.measurement.field", + "measurement*" +] +``` diff --git a/Godeps b/Godeps index 7e43ed61084f2..474356d20d7a6 100644 --- a/Godeps +++ b/Godeps @@ -2,10 +2,8 @@ git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git dbd8d5c40a582eb9ad github.com/Shopify/sarama d37c73f2b2bce85f7fa16b6a550d26c5372892ef github.com/Sirupsen/logrus f7f79f729e0fbe2fcc061db48a9ba0263f588252 github.com/amir/raidman 6a8e089bbe32e6b907feae5ba688841974b3c339 -github.com/armon/go-metrics 345426c77237ece5dab0e1605c3e4b35c3f54757 github.com/aws/aws-sdk-go 87b1e60a50b09e4812dee560b33a238f67305804 github.com/beorn7/perks b965b613227fddccbfffe13eae360ed3fa822f8d -github.com/boltdb/bolt ee4a0888a9abe7eefe5a0992ca4cb06864839873 github.com/cenkalti/backoff 4dc77674aceaabba2c7e3da25d4c823edfb73f99 github.com/dancannon/gorethink 6f088135ff288deb9d5546f4c71919207f891a70 github.com/davecgh/go-spew 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d @@ -14,16 +12,12 @@ github.com/eapache/queue ded5959c0d4e360646dc9e9908cff48666781367 github.com/fsouza/go-dockerclient 7b651349f9479f5114913eefbfd3c4eeddd79ab4 github.com/go-ini/ini afbd495e5aaea13597b5e14fe514ddeaa4d76fc3 github.com/go-sql-driver/mysql 7c7f556282622f94213bc028b4d0a7b6151ba239 -github.com/gogo/protobuf e8904f58e872a473a5b91bc9bf3377d223555263 github.com/golang/protobuf 6aaa8d47701fa6cf07e914ec01fde3d4a1fe79c3 github.com/golang/snappy 723cc1e459b8eea2dea4583200fd60757d40097a github.com/gonuts/go-shellquote e842a11b24c6abfb3dd27af69a17f482e4b483c2 github.com/gorilla/context 1c83b3eabd45b6d76072b66b746c20815fb2872d github.com/gorilla/mux 26a6070f849969ba72b72256e9f14cf519751690 github.com/hailocab/go-hostpool e80d13ce29ede4452c43dea11e79b9bc8a15b478 -github.com/hashicorp/go-msgpack fa3f63826f7c23912c15263591e65d54d080b458 -github.com/hashicorp/raft 057b893fd996696719e98b6c44649ea14968c811 -github.com/hashicorp/raft-boltdb d1e82c1ec3f15ee991f7cc7ffd5b67ff6f5bbaee github.com/influxdata/config bae7cb98197d842374d3b8403905924094930f24 github.com/influxdata/influxdb 697f48b4e62e514e701ffec39978b864a3c666e6 github.com/influxdb/influxdb 697f48b4e62e514e701ffec39978b864a3c666e6 @@ -56,4 +50,4 @@ golang.org/x/text 6d3c22c4525a4da167968fa2479be5524d2e8bd0 gopkg.in/dancannon/gorethink.v1 6f088135ff288deb9d5546f4c71919207f891a70 gopkg.in/fatih/pool.v2 cba550ebf9bce999a02e963296d4bc7a486cb715 gopkg.in/mgo.v2 03c9f3ee4c14c8e51ee521a6a7d0425658dd6f64 -gopkg.in/yaml.v2 f7716cbe52baa25d2e9b0d0da546fcf909fc16b4 \ No newline at end of file +gopkg.in/yaml.v2 f7716cbe52baa25d2e9b0d0da546fcf909fc16b4 diff --git a/Godeps_windows b/Godeps_windows index 829e2cb35b818..ce6663260975b 100644 --- a/Godeps_windows +++ b/Godeps_windows @@ -1,34 +1,28 @@ git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git dbd8d5c40a582eb9adacde36b47932b3a3ad0034 -github.com/Shopify/sarama b1da1753dedcf77d053613b7eae907b98a2ddad5 +github.com/Shopify/sarama d37c73f2b2bce85f7fa16b6a550d26c5372892ef github.com/Sirupsen/logrus f7f79f729e0fbe2fcc061db48a9ba0263f588252 github.com/StackExchange/wmi f3e2bae1e0cb5aef83e319133eabfee30013a4a5 github.com/amir/raidman 6a8e089bbe32e6b907feae5ba688841974b3c339 -github.com/armon/go-metrics 345426c77237ece5dab0e1605c3e4b35c3f54757 -github.com/aws/aws-sdk-go 2a34ea8812f32aae75b43400f9424a0559840659 +github.com/aws/aws-sdk-go 87b1e60a50b09e4812dee560b33a238f67305804 github.com/beorn7/perks b965b613227fddccbfffe13eae360ed3fa822f8d -github.com/boltdb/bolt ee4a0888a9abe7eefe5a0992ca4cb06864839873 github.com/cenkalti/backoff 4dc77674aceaabba2c7e3da25d4c823edfb73f99 github.com/dancannon/gorethink 6f088135ff288deb9d5546f4c71919207f891a70 github.com/davecgh/go-spew 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d github.com/eapache/go-resiliency b86b1ec0dd4209a588dc1285cdd471e73525c0b3 github.com/eapache/queue ded5959c0d4e360646dc9e9908cff48666781367 -github.com/fsouza/go-dockerclient 02a8beb401b20e112cff3ea740545960b667eab1 +github.com/fsouza/go-dockerclient 7b651349f9479f5114913eefbfd3c4eeddd79ab4 github.com/go-ini/ini afbd495e5aaea13597b5e14fe514ddeaa4d76fc3 github.com/go-ole/go-ole 50055884d646dd9434f16bbb5c9801749b9bafe4 github.com/go-sql-driver/mysql 7c7f556282622f94213bc028b4d0a7b6151ba239 -github.com/gogo/protobuf e8904f58e872a473a5b91bc9bf3377d223555263 -github.com/golang/protobuf 45bba206dd5270d96bac4942dcfe515726613249 -github.com/golang/snappy 1963d058044b19e16595f80d5050fa54e2070438 +github.com/golang/protobuf 6aaa8d47701fa6cf07e914ec01fde3d4a1fe79c3 +github.com/golang/snappy 723cc1e459b8eea2dea4583200fd60757d40097a github.com/gonuts/go-shellquote e842a11b24c6abfb3dd27af69a17f482e4b483c2 github.com/gorilla/context 1c83b3eabd45b6d76072b66b746c20815fb2872d github.com/gorilla/mux 26a6070f849969ba72b72256e9f14cf519751690 github.com/hailocab/go-hostpool e80d13ce29ede4452c43dea11e79b9bc8a15b478 -github.com/hashicorp/go-msgpack fa3f63826f7c23912c15263591e65d54d080b458 -github.com/hashicorp/raft 057b893fd996696719e98b6c44649ea14968c811 -github.com/hashicorp/raft-boltdb d1e82c1ec3f15ee991f7cc7ffd5b67ff6f5bbaee github.com/influxdata/config bae7cb98197d842374d3b8403905924094930f24 -github.com/influxdata/influxdb 60df13fb566d07ff2cdd07aa23a4796a02b0df3c -github.com/influxdb/influxdb 60df13fb566d07ff2cdd07aa23a4796a02b0df3c +github.com/influxdata/influxdb 697f48b4e62e514e701ffec39978b864a3c666e6 +github.com/influxdb/influxdb 697f48b4e62e514e701ffec39978b864a3c666e6 github.com/jmespath/go-jmespath c01cf91b011868172fdcd9f41838e80c9d716264 github.com/klauspost/crc32 999f3125931f6557b991b2f8472172bdfa578d38 github.com/lib/pq 8ad2b298cadd691a77015666a5372eae5dbfac8f @@ -45,7 +39,7 @@ github.com/prometheus/client_model fa8ad6fec33561be4280a8f0514318c79d7f6cb6 github.com/prometheus/common 14ca1097bbe21584194c15e391a9dab95ad42a59 github.com/prometheus/procfs 406e5b7bfd8201a36e2bb5f7bdae0b03380c2ce8 github.com/samuel/go-zookeeper 218e9c81c0dd8b3b18172b2bbfad92cc7d6db55f -github.com/shirou/gopsutil 9d8191d6a6e17dcf43b10a20084a11e8c1aa92e6 +github.com/shirou/gopsutil 85bf0974ed06e4e668595ae2b4de02e772a2819b github.com/shirou/w32 ada3ba68f000aa1b58580e45c9d308fe0b7fc5c5 github.com/soniah/gosnmp b1b4f885b12c5dcbd021c5cee1c904110de6db7d github.com/streadway/amqp b4f3ceab0337f013208d31348b578d83c0064744 @@ -54,10 +48,9 @@ github.com/stretchr/testify f390dcf405f7b83c997eac1b06768bb9f44dec18 github.com/wvanbergen/kafka 1a8639a45164fcc245d5c7b4bd3ccfbd1a0ffbf3 github.com/wvanbergen/kazoo-go 0f768712ae6f76454f987c3356177e138df258f8 github.com/zensqlmonitor/go-mssqldb ffe5510c6fa5e15e6d983210ab501c815b56b363 -golang.org/x/crypto 1f22c0103821b9390939b6776727195525381532 golang.org/x/net 04b9de9b512f58addf28c9853d50ebef61c3953e -golang.org/x/text 6fc2e00a0d64b1f7fc1212dae5b0c939cf6d9ac4 +golang.org/x/text 6d3c22c4525a4da167968fa2479be5524d2e8bd0 gopkg.in/dancannon/gorethink.v1 6f088135ff288deb9d5546f4c71919207f891a70 gopkg.in/fatih/pool.v2 cba550ebf9bce999a02e963296d4bc7a486cb715 gopkg.in/mgo.v2 03c9f3ee4c14c8e51ee521a6a7d0425658dd6f64 -gopkg.in/yaml.v2 f7716cbe52baa25d2e9b0d0da546fcf909fc16b4 \ No newline at end of file +gopkg.in/yaml.v2 f7716cbe52baa25d2e9b0d0da546fcf909fc16b4 diff --git a/cmd/telegraf/telegraf.go b/cmd/telegraf/telegraf.go index 78687d2867f56..a65c5607c7162 100644 --- a/cmd/telegraf/telegraf.go +++ b/cmd/telegraf/telegraf.go @@ -11,6 +11,7 @@ import ( "github.com/influxdata/telegraf/agent" "github.com/influxdata/telegraf/internal/config" + _ "github.com/influxdata/telegraf/plugins/inputs/all" _ "github.com/influxdata/telegraf/plugins/outputs/all" ) diff --git a/internal/encoding/encoder.go b/internal/encoding/encoder.go deleted file mode 100644 index 129906ce5bda1..0000000000000 --- a/internal/encoding/encoder.go +++ /dev/null @@ -1,31 +0,0 @@ -package encoding - -import ( - "fmt" - - "github.com/influxdata/telegraf" -) - -type Parser interface { - InitConfig(configs map[string]interface{}) error - Parse(buf []byte) ([]telegraf.Metric, error) - ParseLine(line string) (telegraf.Metric, error) -} - -type Creator func() Parser - -var Parsers = map[string]Creator{} - -func Add(name string, creator Creator) { - Parsers[name] = creator -} - -func NewParser(dataFormat string, configs map[string]interface{}) (parser Parser, err error) { - creator := Parsers[dataFormat] - if creator == nil { - return nil, fmt.Errorf("Unsupported data format: %s. ", dataFormat) - } - parser = creator() - err = parser.InitConfig(configs) - return parser, err -} diff --git a/internal/encoding/json/parser.go b/internal/encoding/json/parser.go deleted file mode 100644 index 69a91d14d98f4..0000000000000 --- a/internal/encoding/json/parser.go +++ /dev/null @@ -1,68 +0,0 @@ -package json - -import ( - "encoding/json" - "fmt" - "time" - - "github.com/influxdata/telegraf" - - "github.com/influxdata/telegraf/internal" - "github.com/influxdata/telegraf/internal/encoding" -) - -type JsonParser struct { -} - -func (p *JsonParser) Parse(buf []byte) ([]telegraf.Metric, error) { - - metrics := make([]telegraf.Metric, 0) - - var jsonOut interface{} - err := json.Unmarshal(buf, &jsonOut) - if err != nil { - err = fmt.Errorf("unable to parse out as JSON, %s", err) - return nil, err - } - - f := internal.JSONFlattener{} - err = f.FlattenJSON("", jsonOut) - if err != nil { - return nil, err - } - - metric, err := telegraf.NewMetric("exec", nil, f.Fields, time.Now().UTC()) - - if err != nil { - return nil, err - } - return append(metrics, metric), nil -} - -func (p *JsonParser) ParseLine(line string) (telegraf.Metric, error) { - metrics, err := p.Parse([]byte(line + "\n")) - - if err != nil { - return nil, err - } - - if len(metrics) < 1 { - return nil, fmt.Errorf("Can not parse the line: %s, for data format: influx ", line) - } - - return metrics[0], nil -} - -func NewParser() *JsonParser { - return &JsonParser{} -} - -func (p *JsonParser) InitConfig(configs map[string]interface{}) error { - return nil -} - -func init() { - encoding.Add("json", func() encoding.Parser { - return NewParser() - }) -} diff --git a/internal/internal.go b/internal/internal.go index 27c9d664feffa..82758e5e863e9 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -9,7 +9,6 @@ import ( "fmt" "io/ioutil" "os" - "strconv" "strings" "time" ) @@ -35,47 +34,6 @@ func (d *Duration) UnmarshalTOML(b []byte) error { var NotImplementedError = errors.New("not implemented yet") -type JSONFlattener struct { - Fields map[string]interface{} -} - -// FlattenJSON flattens nested maps/interfaces into a fields map -func (f *JSONFlattener) FlattenJSON( - fieldname string, - v interface{}, -) error { - if f.Fields == nil { - f.Fields = make(map[string]interface{}) - } - fieldname = strings.Trim(fieldname, "_") - switch t := v.(type) { - case map[string]interface{}: - for k, v := range t { - err := f.FlattenJSON(fieldname+"_"+k+"_", v) - if err != nil { - return err - } - } - case []interface{}: - for i, v := range t { - k := strconv.Itoa(i) - err := f.FlattenJSON(fieldname+"_"+k+"_", v) - if err != nil { - return nil - } - } - case float64: - f.Fields[fieldname] = t - case bool, string, nil: - // ignored types - return nil - default: - return fmt.Errorf("JSON Flattener: got unexpected type %T with value %v (%s)", - t, t, fieldname) - } - return nil -} - // ReadLines reads contents from a file and splits them by new lines. // A convenience wrapper to ReadLinesOffsetN(filename, 0, -1). func ReadLines(filename string) ([]string, error) { diff --git a/plugins/inputs/elasticsearch/elasticsearch.go b/plugins/inputs/elasticsearch/elasticsearch.go index 2dbd6f3570cf1..f7341686e1b9d 100644 --- a/plugins/inputs/elasticsearch/elasticsearch.go +++ b/plugins/inputs/elasticsearch/elasticsearch.go @@ -10,8 +10,8 @@ import ( "time" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" + jsonparser "github.com/influxdata/telegraf/plugins/parsers/json" ) const statsPath = "/_nodes/stats" @@ -168,7 +168,7 @@ func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) er now := time.Now() for p, s := range stats { - f := internal.JSONFlattener{} + f := jsonparser.JSONFlattener{} err := f.FlattenJSON("", s) if err != nil { return err diff --git a/plugins/inputs/exec/exec.go b/plugins/inputs/exec/exec.go index a53a6f32d5a88..79bc2681a493b 100644 --- a/plugins/inputs/exec/exec.go +++ b/plugins/inputs/exec/exec.go @@ -9,47 +9,22 @@ import ( "github.com/gonuts/go-shellquote" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/internal/encoding" "github.com/influxdata/telegraf/plugins/inputs" - - _ "github.com/influxdata/telegraf/internal/encoding/graphite" - _ "github.com/influxdata/telegraf/internal/encoding/influx" - _ "github.com/influxdata/telegraf/internal/encoding/json" + "github.com/influxdata/telegraf/plugins/parsers" ) const sampleConfig = ` - # Shell/commands array - # compatible with old version - # we can still use the old command configuration - # command = "/usr/bin/mycollector --foo=bar" - commands = ["/tmp/test.sh","/tmp/test2.sh"] - - # Data format to consume. This can be "json", "influx" or "graphite" (line-protocol) - # NOTE json only reads numerical measurements, strings and booleans are ignored. - data_format = "json" + ### Commands array + commands = ["/tmp/test.sh", "/usr/bin/mycollector --foo=bar"] - # measurement name suffix (for separating different commands) + ### measurement name suffix (for separating different commands) name_suffix = "_mycollector" - ### Below configuration will be used for data_format = "graphite", can be ignored for other data_format - ### If matching multiple measurement files, this string will be used to join the matched values. - separator = "." - - ### Each template line requires a template pattern. It can have an optional - ### filter before the template and separated by spaces. It can also have optional extra - ### tags following the template. Multiple tags should be separated by commas and no spaces - ### similar to the line protocol format. The can be only one default template. - ### Templates support below format: - ### 1. filter + template - ### 2. filter + template + extra tag - ### 3. filter + template with field key - ### 4. default template - templates = [ - "*.app env.service.resource.measurement", - "stats.* .host.measurement* region=us-west,agent=sensu", - "stats2.* .host.measurement.field", - "measurement*" - ] + ### Data format to consume. This can be "json", "influx" or "graphite" (line-protocol) + ### Each data format has it's own unique set of configuration options, read + ### more about them here: + ### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS.md + data_format = "json" ` type Exec struct { @@ -57,12 +32,13 @@ type Exec struct { Command string DataFormat string + // Data Format Arguments: Separator string Templates []string + TagKeys []string - encodingParser encoding.Parser - - initedConfig bool + // data parser + parser parsers.Parser wg sync.WaitGroup sync.Mutex @@ -108,7 +84,7 @@ func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator) { return } - metrics, err := e.encodingParser.Parse(out) + metrics, err := e.parser.Parse(out) if err != nil { e.errc <- err } else { @@ -126,20 +102,21 @@ func (e *Exec) initConfig() error { e.Commands = []string{e.Command} } - if e.DataFormat == "" { - e.DataFormat = "json" - } - var err error - - configs := make(map[string]interface{}) - configs["Separator"] = e.Separator - configs["Templates"] = e.Templates - - e.encodingParser, err = encoding.NewParser(e.DataFormat, configs) + switch e.DataFormat { + case "", "json": + e.parser, err = parsers.NewJSONParser("exec", + e.TagKeys, nil) + case "influx": + e.parser, err = parsers.NewInfluxParser() + case "graphite": + e.parser, err = parsers.NewGraphiteParser(e.Separator, e.Templates) + default: + err = fmt.Errorf("Invalid data format: %s", e.DataFormat) + } if err != nil { - return fmt.Errorf("exec configuration is error: %s ", err.Error()) + return fmt.Errorf("exec configuration error: %s ", err.Error()) } return nil @@ -150,16 +127,14 @@ func (e *Exec) SampleConfig() string { } func (e *Exec) Description() string { - return "Read metrics from one or more commands that can output JSON, influx or graphite line protocol to stdout" + return "Read metrics from one or more commands that can output to stdout" } func (e *Exec) Gather(acc telegraf.Accumulator) error { - - if !e.initedConfig { + if e.parser == nil { if err := e.initConfig(); err != nil { return err } - e.initedConfig = true } e.Lock() diff --git a/plugins/inputs/httpjson/httpjson.go b/plugins/inputs/httpjson/httpjson.go index 3070e6338aeb8..d3955c31a22cd 100644 --- a/plugins/inputs/httpjson/httpjson.go +++ b/plugins/inputs/httpjson/httpjson.go @@ -1,7 +1,6 @@ package httpjson import ( - "encoding/json" "errors" "fmt" "io/ioutil" @@ -12,8 +11,8 @@ import ( "time" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/plugins/parsers" ) type HttpJson struct { @@ -137,39 +136,34 @@ func (h *HttpJson) gatherServer( return err } - var jsonOut map[string]interface{} - if err = json.Unmarshal([]byte(resp), &jsonOut); err != nil { - return errors.New("Error decoding JSON response") + var msrmnt_name string + if h.Name == "" { + msrmnt_name = "httpjson" + } else { + msrmnt_name = "httpjson_" + h.Name } - tags := map[string]string{ "server": serverURL, } - for _, tag := range h.TagKeys { - switch v := jsonOut[tag].(type) { - case string: - tags[tag] = v - } - delete(jsonOut, tag) + parser, err := parsers.NewJSONParser(msrmnt_name, h.TagKeys, tags) + if err != nil { + return err } - if responseTime >= 0 { - jsonOut["response_time"] = responseTime - } - f := internal.JSONFlattener{} - err = f.FlattenJSON("", jsonOut) + metrics, err := parser.Parse([]byte(resp)) if err != nil { return err } - var msrmnt_name string - if h.Name == "" { - msrmnt_name = "httpjson" - } else { - msrmnt_name = "httpjson_" + h.Name + for _, metric := range metrics { + fields := make(map[string]interface{}) + for k, v := range metric.Fields() { + fields[k] = v + } + fields["response_time"] = responseTime + acc.AddFields(metric.Name(), fields, metric.Tags()) } - acc.AddFields(msrmnt_name, f.Fields, tags) return nil } diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go index 2b80442d66435..ddaf4760dcaec 100644 --- a/plugins/inputs/statsd/statsd.go +++ b/plugins/inputs/statsd/statsd.go @@ -11,7 +11,7 @@ import ( "sync" "time" - "github.com/influxdata/influxdb/services/graphite" + "github.com/influxdata/telegraf/plugins/parsers/graphite" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" @@ -418,18 +418,14 @@ func (s *Statsd) parseName(bucket string) (string, string, map[string]string) { } } - o := graphite.Options{ - Separator: "_", - Templates: s.Templates, - DefaultTags: tags, - } - var field string name := bucketparts[0] - p, err := graphite.NewParserWithOptions(o) + p, err := graphite.NewGraphiteParser("_", s.Templates) if err == nil { + p.DefaultTags = tags name, tags, field, _ = p.ApplyTemplate(name) } + if s.ConvertNames { name = strings.Replace(name, ".", "_", -1) name = strings.Replace(name, "-", "__", -1) diff --git a/internal/encoding/graphite/config.go b/plugins/parsers/graphite/config.go similarity index 100% rename from internal/encoding/graphite/config.go rename to plugins/parsers/graphite/config.go diff --git a/internal/encoding/graphite/errors.go b/plugins/parsers/graphite/errors.go similarity index 100% rename from internal/encoding/graphite/errors.go rename to plugins/parsers/graphite/errors.go diff --git a/internal/encoding/graphite/parser.go b/plugins/parsers/graphite/parser.go similarity index 91% rename from internal/encoding/graphite/parser.go rename to plugins/parsers/graphite/parser.go index f43c76b871a39..1588a7114b2db 100644 --- a/internal/encoding/graphite/parser.go +++ b/plugins/parsers/graphite/parser.go @@ -1,6 +1,7 @@ package graphite import ( + "bufio" "bytes" "fmt" "io" @@ -10,11 +11,8 @@ import ( "strings" "time" - "bufio" - "github.com/influxdata/influxdb/models" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/internal/encoding" ) // Minimum and maximum supported dates for timestamps. @@ -23,35 +21,35 @@ var ( MaxDate = time.Date(2038, 1, 19, 0, 0, 0, 0, time.UTC) ) -// Options are configurable values that can be provided to a Parser -type Options struct { - Separator string - Templates []string -} - // Parser encapsulates a Graphite Parser. type GraphiteParser struct { - matcher *matcher -} + Separator string + Templates []string + DefaultTags map[string]string -func NewParser() *GraphiteParser { - return &GraphiteParser{} + matcher *matcher } -func (p *GraphiteParser) InitConfig(configs map[string]interface{}) error { - +func NewGraphiteParser( + separator string, + templates []string, +) (*GraphiteParser, error) { var err error - options := Options{ - Templates: configs["Templates"].([]string), - Separator: configs["Separator"].(string)} + + if separator == "" { + separator = DefaultSeparator + } + p := &GraphiteParser{ + Separator: separator, + Templates: templates, + } matcher := newMatcher() p.matcher = matcher - defaultTemplate, _ := NewTemplate("measurement*", nil, DefaultSeparator) + defaultTemplate, _ := NewTemplate("measurement*", nil, p.Separator) matcher.AddDefaultTemplate(defaultTemplate) - for _, pattern := range options.Templates { - + for _, pattern := range p.Templates { template := pattern filter := "" // Format is [filter]