From 8b2578b475f796fa08dd46064179b5384506bd22 Mon Sep 17 00:00:00 2001 From: "Dragostin Yanev (netixen)" Date: Thu, 11 Feb 2016 01:28:52 +0200 Subject: [PATCH 1/2] Add NATS consumer input plugin. --- Godeps | 1 + plugins/inputs/all/all.go | 1 + plugins/inputs/nats_consumer/README.md | 38 ++++ plugins/inputs/nats_consumer/nats_consumer.go | 202 ++++++++++++++++++ .../nats_consumer/nats_consumer_test.go | 152 +++++++++++++ 5 files changed, 394 insertions(+) create mode 100644 plugins/inputs/nats_consumer/README.md create mode 100644 plugins/inputs/nats_consumer/nats_consumer.go create mode 100644 plugins/inputs/nats_consumer/nats_consumer_test.go diff --git a/Godeps b/Godeps index 5cdfecbe72c7d..0b9a1672761dd 100644 --- a/Godeps +++ b/Godeps @@ -28,6 +28,7 @@ github.com/matttproud/golang_protobuf_extensions d0c3fe89de86839aecf2e0579c40ba3 github.com/mreiferson/go-snappystream 028eae7ab5c4c9e2d1cb4c4ca1e53259bbe7e504 github.com/naoina/go-stringutil 6b638e95a32d0c1131db0e7fe83775cbea4a0d0b github.com/naoina/toml 751171607256bb66e64c9f0220c00662420c38e9 +github.com/nats-io/nats 6a83f1a633cfbfd90aa648ac99fb38c06a8b40df github.com/nsqio/go-nsq 2118015c120962edc5d03325c680daf3163a8b5f github.com/pborman/uuid dee7705ef7b324f27ceb85a121c61f2c2e8ce988 github.com/pmezard/go-difflib 792786c7400a136282c1664665ae0a8db921c6c2 diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index e7329b042d990..794885129f252 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -22,6 +22,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/memcached" _ "github.com/influxdata/telegraf/plugins/inputs/mongodb" _ "github.com/influxdata/telegraf/plugins/inputs/mysql" + _ "github.com/influxdata/telegraf/plugins/inputs/nats_consumer" _ "github.com/influxdata/telegraf/plugins/inputs/nginx" _ "github.com/influxdata/telegraf/plugins/inputs/nsq" _ "github.com/influxdata/telegraf/plugins/inputs/passenger" diff --git a/plugins/inputs/nats_consumer/README.md b/plugins/inputs/nats_consumer/README.md new file mode 100644 index 0000000000000..f3b67c9d54a7f --- /dev/null +++ b/plugins/inputs/nats_consumer/README.md @@ -0,0 +1,38 @@ +# NATS Consumer + +The [NATS](http://www.nats.io/about/) consumer plugin reads from +specified NATS subjects and adds messages to InfluxDB. The plugin expects messages +in the [Telegraf Input Data Formats](https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md). +A [Queue Group](http://www.nats.io/documentation/concepts/nats-queueing/) +is used when subscribing to subjects so multiple instances of telegraf can read +from a NATS cluster in parallel. + +## Configuration +``` +# Read metrics from NATS subject(s) +[[inputs.nats_consumer]] + ### urls of NATS servers + servers = ["nats://localhost:4222"] + ### Use Transport Layer Security + secure = false + ### subject(s) to consume + subjects = ["telegraf"] + ### name a queue group + queue_group = "telegraf_consumers" + ### Maximum number of points to buffer between collection intervals + point_buffer = 100000 + + ### Data format to consume. This can be "json", "influx" or "graphite" + ### Each data format has it's own unique set of configuration options, read + ### more about them here: + ### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md + data_format = "influx" +``` + +## Testing + +To run tests: + +``` +go test +``` \ No newline at end of file diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go new file mode 100644 index 0000000000000..4b25fa0a1941f --- /dev/null +++ b/plugins/inputs/nats_consumer/nats_consumer.go @@ -0,0 +1,202 @@ +package natsconsumer + +import ( + "fmt" + "log" + "sync" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/plugins/parsers" + "github.com/nats-io/nats" +) + +type natsError struct { + conn *nats.Conn + sub *nats.Subscription + err error +} + +func (e natsError) Error() string { + return fmt.Sprintf("%s url:%s id:%s sub:%s queue:%s", + e.err.Error(), e.conn.ConnectedUrl(), e.conn.ConnectedServerId(), e.sub.Subject, e.sub.Queue) +} + +type natsConsumer struct { + QueueGroup string + Subjects []string + Servers []string + Secure bool + + PointBuffer int + parser parsers.Parser + + sync.Mutex + Conn *nats.Conn + Subs []*nats.Subscription + + // channel for all incoming NATS messages + in chan *nats.Msg + // channel for all NATS read errors + errs chan error + // channel for all incoming parsed points + metricC chan telegraf.Metric + done chan struct{} +} + +var sampleConfig = ` + ### urls of NATS servers + servers = ["nats://localhost:4222"] + ### Use Transport Layer Security + secure = false + ### subject(s) to consume + subjects = ["telegraf"] + ### name a queue group + queue_group = "telegraf_consumers" + ### Maximum number of points to buffer between collection intervals + point_buffer = 100000 + + ### Data format to consume. This can be "json", "influx" or "graphite" + ### Each data format has it's own unique set of configuration options, read + ### more about them here: + ### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md + data_format = "influx" +` + +func (n *natsConsumer) SampleConfig() string { + return sampleConfig +} + +func (n *natsConsumer) Description() string { + return "Read metrics from NATS subject(s)" +} + +func (n *natsConsumer) SetParser(parser parsers.Parser) { + n.parser = parser +} + +func (n *natsConsumer) natsErrHandler(c *nats.Conn, s *nats.Subscription, e error) { + select { + case n.errs <- natsError{conn: c, sub: s, err: e}: + default: + return + } +} + +// Start the nats consumer. Caller must call *natsConsumer.Stop() to clean up. +func (n *natsConsumer) Start() error { + n.Lock() + defer n.Unlock() + + var connectErr error + + opts := nats.DefaultOptions + opts.Servers = n.Servers + opts.Secure = n.Secure + + if n.Conn == nil || n.Conn.IsClosed() { + n.Conn, connectErr = opts.Connect() + if connectErr != nil { + return connectErr + } + + // Setup message and error channels + n.errs = make(chan error) + n.Conn.SetErrorHandler(n.natsErrHandler) + + n.in = make(chan *nats.Msg) + for _, subj := range n.Subjects { + sub, err := n.Conn.ChanQueueSubscribe(subj, n.QueueGroup, n.in) + if err != nil { + return err + } + n.Subs = append(n.Subs, sub) + } + } + + n.done = make(chan struct{}) + if n.PointBuffer == 0 { + n.PointBuffer = 100000 + } + + n.metricC = make(chan telegraf.Metric, n.PointBuffer) + + // Start the message reader + go n.receiver() + log.Printf("Started the NATS consumer service, nats: %v, subjects: %v, queue: %v\n", + n.Conn.ConnectedUrl(), n.Subjects, n.QueueGroup) + + return nil +} + +// receiver() reads all incoming messages from NATS, and parses them into +// influxdb metric points. +func (n *natsConsumer) receiver() { + defer n.clean() + for { + select { + case <-n.done: + return + case err := <-n.errs: + log.Printf("error reading from %s\n", err.Error()) + case msg := <-n.in: + metrics, err := n.parser.Parse(msg.Data) + if err != nil { + log.Printf("subject: %s, error: %s", msg.Subject, err.Error()) + } + + for _, metric := range metrics { + select { + case n.metricC <- metric: + continue + default: + log.Printf("NATS Consumer buffer is full, dropping a metric." + + " You may want to increase the point_buffer setting") + } + } + + } + } +} + +func (n *natsConsumer) clean() { + n.Lock() + defer n.Unlock() + close(n.in) + close(n.metricC) + close(n.errs) + + for _, sub := range n.Subs { + if err := sub.Unsubscribe(); err != nil { + log.Printf("Error unsubscribing from subject %s in queue %s: %s\n", + sub.Subject, sub.Queue, err.Error()) + } + } + + if n.Conn != nil && !n.Conn.IsClosed() { + n.Conn.Close() + } +} + +func (n *natsConsumer) Stop() { + n.Lock() + close(n.done) + n.Unlock() +} + +func (n *natsConsumer) Gather(acc telegraf.Accumulator) error { + n.Lock() + defer n.Unlock() + npoints := len(n.metricC) + for i := 0; i < npoints; i++ { + point := <-n.metricC + acc.AddFields(point.Name(), point.Fields(), point.Tags(), point.Time()) + } + return nil +} + +func init() { + inputs.Add("nats_consumer", func() telegraf.Input { + return &natsConsumer{} + }) +} diff --git a/plugins/inputs/nats_consumer/nats_consumer_test.go b/plugins/inputs/nats_consumer/nats_consumer_test.go new file mode 100644 index 0000000000000..50c663cb4a99b --- /dev/null +++ b/plugins/inputs/nats_consumer/nats_consumer_test.go @@ -0,0 +1,152 @@ +package natsconsumer + +import ( + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/testutil" + "github.com/nats-io/nats" +) + +const ( + testMsg = "cpu_load_short,host=server01 value=23422.0 1422568543702900257" + testMsgGraphite = "cpu.load.short.graphite 23422 1454780029" + testMsgJSON = "{\"a\": 5, \"b\": {\"c\": 6}}\n" + invalidMsg = "cpu_load_short,host=server01 1422568543702900257" + pointBuffer = 5 +) + +func newTestNatsConsumer() (*natsConsumer, chan *nats.Msg) { + in := make(chan *nats.Msg, pointBuffer) + n := &natsConsumer{ + QueueGroup: "test", + Subjects: []string{"telegraf"}, + Servers: []string{"nats://localhost:4222"}, + Secure: false, + PointBuffer: pointBuffer, + in: in, + errs: make(chan error, pointBuffer), + done: make(chan struct{}), + metricC: make(chan telegraf.Metric, pointBuffer), + } + return n, in +} + +// Test that the parser parses NATS messages into points +func TestRunParser(t *testing.T) { + n, in := newTestNatsConsumer() + defer close(n.done) + + n.parser, _ = parsers.NewInfluxParser() + go n.receiver() + in <- natsMsg(testMsg) + time.Sleep(time.Millisecond) + + if a := len(n.metricC); a != 1 { + t.Errorf("got %v, expected %v", a, 1) + } +} + +// Test that the parser ignores invalid messages +func TestRunParserInvalidMsg(t *testing.T) { + n, in := newTestNatsConsumer() + defer close(n.done) + + n.parser, _ = parsers.NewInfluxParser() + go n.receiver() + in <- natsMsg(invalidMsg) + time.Sleep(time.Millisecond) + + if a := len(n.metricC); a != 0 { + t.Errorf("got %v, expected %v", a, 0) + } +} + +// Test that points are dropped when we hit the buffer limit +func TestRunParserRespectsBuffer(t *testing.T) { + n, in := newTestNatsConsumer() + defer close(n.done) + + n.parser, _ = parsers.NewInfluxParser() + go n.receiver() + for i := 0; i < pointBuffer+1; i++ { + in <- natsMsg(testMsg) + } + time.Sleep(time.Millisecond) + + if a := len(n.metricC); a != pointBuffer { + t.Errorf("got %v, expected %v", a, pointBuffer) + } +} + +// Test that the parser parses nats messages into points +func TestRunParserAndGather(t *testing.T) { + n, in := newTestNatsConsumer() + defer close(n.done) + + n.parser, _ = parsers.NewInfluxParser() + go n.receiver() + in <- natsMsg(testMsg) + time.Sleep(time.Millisecond) + + acc := testutil.Accumulator{} + n.Gather(&acc) + + if a := len(acc.Metrics); a != 1 { + t.Errorf("got %v, expected %v", a, 1) + } + acc.AssertContainsFields(t, "cpu_load_short", + map[string]interface{}{"value": float64(23422)}) +} + +// Test that the parser parses nats messages into points +func TestRunParserAndGatherGraphite(t *testing.T) { + n, in := newTestNatsConsumer() + defer close(n.done) + + n.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil) + go n.receiver() + in <- natsMsg(testMsgGraphite) + time.Sleep(time.Millisecond) + + acc := testutil.Accumulator{} + n.Gather(&acc) + + if a := len(acc.Metrics); a != 1 { + t.Errorf("got %v, expected %v", a, 1) + } + acc.AssertContainsFields(t, "cpu_load_short_graphite", + map[string]interface{}{"value": float64(23422)}) +} + +// Test that the parser parses nats messages into points +func TestRunParserAndGatherJSON(t *testing.T) { + n, in := newTestNatsConsumer() + defer close(n.done) + + n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil) + go n.receiver() + in <- natsMsg(testMsgJSON) + time.Sleep(time.Millisecond) + + acc := testutil.Accumulator{} + n.Gather(&acc) + + if a := len(acc.Metrics); a != 1 { + t.Errorf("got %v, expected %v", a, 1) + } + acc.AssertContainsFields(t, "nats_json_test", + map[string]interface{}{ + "a": float64(5), + "b_c": float64(6), + }) +} + +func natsMsg(val string) *nats.Msg { + return &nats.Msg{ + Subject: "telegraf", + Data: []byte(val), + } +} From 5c4146fe68df2d7c453e9c1c0b7195f75643bb1e Mon Sep 17 00:00:00 2001 From: "Dragostin Yanev (netixen)" Date: Fri, 12 Feb 2016 12:05:33 +0200 Subject: [PATCH 2/2] Change point_buffer to metric_buffer to conform will changes in https://github.com/influxdata/telegraf/pull/676 --- plugins/inputs/nats_consumer/README.md | 4 +- plugins/inputs/nats_consumer/nats_consumer.go | 28 +++++++------- .../nats_consumer/nats_consumer_test.go | 38 +++++++++---------- 3 files changed, 35 insertions(+), 35 deletions(-) diff --git a/plugins/inputs/nats_consumer/README.md b/plugins/inputs/nats_consumer/README.md index f3b67c9d54a7f..b2d027039f86f 100644 --- a/plugins/inputs/nats_consumer/README.md +++ b/plugins/inputs/nats_consumer/README.md @@ -19,8 +19,8 @@ from a NATS cluster in parallel. subjects = ["telegraf"] ### name a queue group queue_group = "telegraf_consumers" - ### Maximum number of points to buffer between collection intervals - point_buffer = 100000 + ### Maximum number of metrics to buffer between collection intervals + metric_buffer = 100000 ### Data format to consume. This can be "json", "influx" or "graphite" ### Each data format has it's own unique set of configuration options, read diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go index 4b25fa0a1941f..56d56990f27a8 100644 --- a/plugins/inputs/nats_consumer/nats_consumer.go +++ b/plugins/inputs/nats_consumer/nats_consumer.go @@ -28,8 +28,8 @@ type natsConsumer struct { Servers []string Secure bool - PointBuffer int - parser parsers.Parser + MetricBuffer int + parser parsers.Parser sync.Mutex Conn *nats.Conn @@ -39,7 +39,7 @@ type natsConsumer struct { in chan *nats.Msg // channel for all NATS read errors errs chan error - // channel for all incoming parsed points + // channel for all incoming parsed metrics metricC chan telegraf.Metric done chan struct{} } @@ -53,8 +53,8 @@ var sampleConfig = ` subjects = ["telegraf"] ### name a queue group queue_group = "telegraf_consumers" - ### Maximum number of points to buffer between collection intervals - point_buffer = 100000 + ### Maximum number of metrics to buffer between collection intervals + metric_buffer = 100000 ### Data format to consume. This can be "json", "influx" or "graphite" ### Each data format has it's own unique set of configuration options, read @@ -115,11 +115,11 @@ func (n *natsConsumer) Start() error { } n.done = make(chan struct{}) - if n.PointBuffer == 0 { - n.PointBuffer = 100000 + if n.MetricBuffer == 0 { + n.MetricBuffer = 100000 } - n.metricC = make(chan telegraf.Metric, n.PointBuffer) + n.metricC = make(chan telegraf.Metric, n.MetricBuffer) // Start the message reader go n.receiver() @@ -130,7 +130,7 @@ func (n *natsConsumer) Start() error { } // receiver() reads all incoming messages from NATS, and parses them into -// influxdb metric points. +// telegraf metrics. func (n *natsConsumer) receiver() { defer n.clean() for { @@ -151,7 +151,7 @@ func (n *natsConsumer) receiver() { continue default: log.Printf("NATS Consumer buffer is full, dropping a metric." + - " You may want to increase the point_buffer setting") + " You may want to increase the metric_buffer setting") } } @@ -187,10 +187,10 @@ func (n *natsConsumer) Stop() { func (n *natsConsumer) Gather(acc telegraf.Accumulator) error { n.Lock() defer n.Unlock() - npoints := len(n.metricC) - for i := 0; i < npoints; i++ { - point := <-n.metricC - acc.AddFields(point.Name(), point.Fields(), point.Tags(), point.Time()) + nmetrics := len(n.metricC) + for i := 0; i < nmetrics; i++ { + metric := <-n.metricC + acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time()) } return nil } diff --git a/plugins/inputs/nats_consumer/nats_consumer_test.go b/plugins/inputs/nats_consumer/nats_consumer_test.go index 50c663cb4a99b..214695d91a672 100644 --- a/plugins/inputs/nats_consumer/nats_consumer_test.go +++ b/plugins/inputs/nats_consumer/nats_consumer_test.go @@ -15,26 +15,26 @@ const ( testMsgGraphite = "cpu.load.short.graphite 23422 1454780029" testMsgJSON = "{\"a\": 5, \"b\": {\"c\": 6}}\n" invalidMsg = "cpu_load_short,host=server01 1422568543702900257" - pointBuffer = 5 + metricBuffer = 5 ) func newTestNatsConsumer() (*natsConsumer, chan *nats.Msg) { - in := make(chan *nats.Msg, pointBuffer) + in := make(chan *nats.Msg, metricBuffer) n := &natsConsumer{ - QueueGroup: "test", - Subjects: []string{"telegraf"}, - Servers: []string{"nats://localhost:4222"}, - Secure: false, - PointBuffer: pointBuffer, - in: in, - errs: make(chan error, pointBuffer), - done: make(chan struct{}), - metricC: make(chan telegraf.Metric, pointBuffer), + QueueGroup: "test", + Subjects: []string{"telegraf"}, + Servers: []string{"nats://localhost:4222"}, + Secure: false, + MetricBuffer: metricBuffer, + in: in, + errs: make(chan error, metricBuffer), + done: make(chan struct{}), + metricC: make(chan telegraf.Metric, metricBuffer), } return n, in } -// Test that the parser parses NATS messages into points +// Test that the parser parses NATS messages into metrics func TestRunParser(t *testing.T) { n, in := newTestNatsConsumer() defer close(n.done) @@ -64,24 +64,24 @@ func TestRunParserInvalidMsg(t *testing.T) { } } -// Test that points are dropped when we hit the buffer limit +// Test that metrics are dropped when we hit the buffer limit func TestRunParserRespectsBuffer(t *testing.T) { n, in := newTestNatsConsumer() defer close(n.done) n.parser, _ = parsers.NewInfluxParser() go n.receiver() - for i := 0; i < pointBuffer+1; i++ { + for i := 0; i < metricBuffer+1; i++ { in <- natsMsg(testMsg) } time.Sleep(time.Millisecond) - if a := len(n.metricC); a != pointBuffer { - t.Errorf("got %v, expected %v", a, pointBuffer) + if a := len(n.metricC); a != metricBuffer { + t.Errorf("got %v, expected %v", a, metricBuffer) } } -// Test that the parser parses nats messages into points +// Test that the parser parses line format messages into metrics func TestRunParserAndGather(t *testing.T) { n, in := newTestNatsConsumer() defer close(n.done) @@ -101,7 +101,7 @@ func TestRunParserAndGather(t *testing.T) { map[string]interface{}{"value": float64(23422)}) } -// Test that the parser parses nats messages into points +// Test that the parser parses graphite format messages into metrics func TestRunParserAndGatherGraphite(t *testing.T) { n, in := newTestNatsConsumer() defer close(n.done) @@ -121,7 +121,7 @@ func TestRunParserAndGatherGraphite(t *testing.T) { map[string]interface{}{"value": float64(23422)}) } -// Test that the parser parses nats messages into points +// Test that the parser parses json format messages into metrics func TestRunParserAndGatherJSON(t *testing.T) { n, in := newTestNatsConsumer() defer close(n.done)