From 413189cf5fdb852112e4016b6916ca6aaaad4650 Mon Sep 17 00:00:00 2001 From: zipper-meng Date: Thu, 1 Aug 2024 14:21:27 +0800 Subject: [PATCH] feat(parser): add parser opentsdb_json --- plugins/parsers/all/opentsdbjson.go | 5 + plugins/parsers/opentsdb_json/README.md | 34 ++++++ plugins/parsers/opentsdb_json/parser.go | 103 +++++++++++++++++++ plugins/parsers/opentsdb_json/parser_test.go | 71 +++++++++++++ 4 files changed, 213 insertions(+) create mode 100644 plugins/parsers/all/opentsdbjson.go create mode 100644 plugins/parsers/opentsdb_json/README.md create mode 100644 plugins/parsers/opentsdb_json/parser.go create mode 100644 plugins/parsers/opentsdb_json/parser_test.go diff --git a/plugins/parsers/all/opentsdbjson.go b/plugins/parsers/all/opentsdbjson.go new file mode 100644 index 0000000000000..781b190d2f026 --- /dev/null +++ b/plugins/parsers/all/opentsdbjson.go @@ -0,0 +1,5 @@ +//go:build !custom || parsers || parsers.opentsdbtelnet + +package all + +import _ "github.com/influxdata/telegraf/plugins/parsers/opentsdbjson" // register plugin diff --git a/plugins/parsers/opentsdb_json/README.md b/plugins/parsers/opentsdb_json/README.md new file mode 100644 index 0000000000000..f965f9f18e842 --- /dev/null +++ b/plugins/parsers/opentsdb_json/README.md @@ -0,0 +1,34 @@ +# OpenTSDB JSON Style Parser Plugin + +## Configuration + +```toml +[[inputs.file]] + files = ["example"] + data_format = "opentsdb_json" +``` + +## Example + +```json +[ + { + "metric": "sys.cpu.nice", + "timestamp": 1346846400, + "value": 18, + "tags": { + "host": "web01", + "dc": "lga" + } + }, + { + "metric": "sys.cpu.nice", + "timestamp": 1346846400, + "value": 9, + "tags": { + "host": "web02", + "dc": "lga" + } + } +] +``` diff --git a/plugins/parsers/opentsdb_json/parser.go b/plugins/parsers/opentsdb_json/parser.go new file mode 100644 index 0000000000000..502df0a509e1a --- /dev/null +++ b/plugins/parsers/opentsdb_json/parser.go @@ -0,0 +1,103 @@ +package opentsdb + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/parsers" +) + +func init() { + parsers.Add("opentsdb_json", + func(_ string) telegraf.Parser { + return &Parser{} + }, + ) +} + +type point struct { + Metric string `json:"metric"` + Time int64 `json:"timestamp"` + Value float64 `json:"value"` + Tags map[string]string `json:"tags,omitempty"` +} + +type Parser struct { + DefaultTags map[string]string `toml:"-"` +} + +func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) { + var multi bool + switch buf[0] { + case '{': + case '[': + multi = true + default: + return nil, errors.New("expected JSON array or hash") + } + + points := make([]point, 1) + if dec := json.NewDecoder(bytes.NewReader(buf)); multi { + if err := dec.Decode(&points); err != nil { + return nil, errors.New("json array decode error for data format: opentsdb") + } + } else { + if err := dec.Decode(&points[0]); err != nil { + return nil, errors.New("json object decode error for data format: opentsdb") + } + } + + metrics := make([]telegraf.Metric, 0, len(points)) + for i := range points { + pt := points[i] + + // Convert timestamp to Go time. + // If time value is over a billion then it's microseconds. + var ts time.Time + if pt.Time < 10000000000 { + ts = time.Unix(pt.Time, 0) + } else { + ts = time.Unix(pt.Time/1000, (pt.Time%1000)*1000) + } + + var tags map[string]string + if len(p.DefaultTags) > 0 { + tags = make(map[string]string) + for k, v := range p.DefaultTags { + tags[k] = v + } + for k, v := range pt.Tags { + tags[k] = v + } + } else { + tags = pt.Tags + } + + mt := metric.New(pt.Metric, tags, map[string]interface{}{"value": pt.Value}, ts) + metrics = append(metrics, mt) + } + + return metrics, nil +} + +func (p *Parser) ParseLine(line string) (telegraf.Metric, error) { + metrics, err := p.Parse([]byte(line)) + if err != nil { + return nil, err + } + + if len(metrics) < 1 { + return nil, fmt.Errorf("can not parse the line: %s, for data format: opentsdb", line) + } + + return metrics[0], nil +} + +func (p *Parser) SetDefaultTags(tags map[string]string) { + p.DefaultTags = tags +} diff --git a/plugins/parsers/opentsdb_json/parser_test.go b/plugins/parsers/opentsdb_json/parser_test.go new file mode 100644 index 0000000000000..21dc09282ec6b --- /dev/null +++ b/plugins/parsers/opentsdb_json/parser_test.go @@ -0,0 +1,71 @@ +package opentsdb + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +const JsonArray = `[ + { + "metric": "sys.cpu.nice", + "timestamp": 1346846400, + "value": 18, + "tags": { + "host": "web01", + "dc": "lga" + } + }, + { + "metric": "sys.cpu.nice", + "timestamp": 1346846400, + "value": 9, + "tags": { + "host": "web02", + "dc": "lga" + } + } +]` + +const JsonObject = `{ + "metric": "sys.cpu.nice", + "timestamp": 1346846400, + "value": 18, + "tags": { + "host": "web01", + "dc": "lga" + } +}` + +func TestParseJSONArray(t *testing.T) { + parser := &Parser{} + metrics, err := parser.Parse([]byte(JsonArray)) + require.NoError(t, err) + require.Len(t, metrics, 2) + require.Equal(t, "sys.cpu.nice", metrics[0].Name()) + require.Equal(t, map[string]string{ + "host": "web01", + "dc": "lga", + }, metrics[0].Tags()) + require.Equal(t, map[string]interface{}{ + "value": float64(18), + }, metrics[0].Fields()) + require.Equal(t, time.Unix(1346846400, 0), metrics[0].Time()) +} + +func TestParseJSONObject(t *testing.T) { + parser := &Parser{} + metrics, err := parser.Parse([]byte(JsonObject)) + require.NoError(t, err) + require.Len(t, metrics, 1) + require.Equal(t, "sys.cpu.nice", metrics[0].Name()) + require.Equal(t, map[string]string{ + "host": "web01", + "dc": "lga", + }, metrics[0].Tags()) + require.Equal(t, map[string]interface{}{ + "value": float64(18), + }, metrics[0].Fields()) + require.Equal(t, time.Unix(1346846400, 0), metrics[0].Time()) +}