Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Output stats to the Instrumental TCP Collector #1139

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions plugins/outputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/outputs/file"
_ "github.com/influxdata/telegraf/plugins/outputs/graphite"
_ "github.com/influxdata/telegraf/plugins/outputs/influxdb"
_ "github.com/influxdata/telegraf/plugins/outputs/instrumental"
_ "github.com/influxdata/telegraf/plugins/outputs/kafka"
_ "github.com/influxdata/telegraf/plugins/outputs/kinesis"
_ "github.com/influxdata/telegraf/plugins/outputs/librato"
Expand Down
25 changes: 25 additions & 0 deletions plugins/outputs/instrumental/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Instrumental Output Plugin

This plugin writes to the [Instrumental Collector API](https://instrumentalapp.com/docs/tcp-collector)
and requires a Project-specific API token.

Instrumental accepts stats in a format very close to Graphite, with the only difference being that
the type of stat (gauge, increment) is the first token, separated from the metric itself
by whitespace. The `increment` type is only used if the metric comes in as a counter or histogram through `[[inputs.statsd]]`; everything else is sent as a `gauge`.

## Configuration:

```toml
[[outputs.instrumental]]
## Project API Token (required)
api_token = "API Token" # required
## Prefix the metrics with a given name
prefix = ""
## Stats output template (Graphite formatting)
## see https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md#graphite
template = "host.tags.measurement.field"
## Timeout in seconds to connect
timeout = "2s"
## Debug true - Print communication to Instrumental
debug = false
```
192 changes: 192 additions & 0 deletions plugins/outputs/instrumental/instrumental.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
package instrumental

import (
"fmt"
"io"
"log"
"net"
"regexp"
"strings"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/plugins/serializers/graphite"
)

// Instrumental is the output plugin state: the user-supplied configuration
// plus the TCP connection to the collector.
type Instrumental struct {
// Host is the collector host name; Connect dials it on port 8000.
Host string
// ApiToken is sent in the "authenticate" line of the handshake.
ApiToken string
// Prefix is handed to the Graphite serializer when building stat names.
Prefix string
// DataFormat is not referenced by the code visible in this file.
DataFormat string
// Template is the Graphite serializer template used to lay out stat names.
Template string
// Timeout bounds the TCP dial performed in Connect.
Timeout internal.Duration
// Debug enables logging of the points sent and of stats that were dropped.
Debug bool

// conn is the authenticated collector connection; nil means "not connected"
// and makes Write reconnect before sending.
conn net.Conn
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it better to keep an open connection at all times or to open a connection for every Write call?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, best to keep an open connection.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Do I also need to check for closed connections and re-connect or is that something Telegraf will notice and re-call Connect?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you will need to check for that

}

const (
// DefaultHost is the collector host used when the plugin is registered
// without an explicit Host.
DefaultHost = "collector.instrumentalapp.com"
// AuthFormat is the handshake sent right after connecting: a "hello" line
// followed by an "authenticate" line whose %s verb receives the API token.
AuthFormat = "hello version go/telegraf/1.0\nauthenticate %s\n"
)

var (
// StatIncludesBadChar matches any character other than alphanumerics,
// blanks, '-', '_' and '.'. Write drops every serialized stat it matches.
StatIncludesBadChar = regexp.MustCompile("[^[:alnum:][:blank:]-_.]")
)

// sampleConfig is the TOML snippet returned by SampleConfig.
var sampleConfig = `
## Project API Token (required)
api_token = "API Token" # required
## Prefix the metrics with a given name
prefix = ""
## Stats output template (Graphite formatting)
## see https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md#graphite
template = "host.tags.measurement.field"
## Timeout in seconds to connect
timeout = "2s"
## Display Communcation to Instrumental
debug = false
`

// Connect dials the collector on port 8000 (bounded by i.Timeout) and performs
// the authentication handshake. On success the connection is stored in i.conn
// by authenticate; on any failure i.conn is reset to nil and the error is
// returned.
func (i *Instrumental) Connect() error {
	connection, err := net.DialTimeout("tcp", i.Host+":8000", i.Timeout.Duration)
	if err != nil {
		i.conn = nil
		return err
	}

	if err = i.authenticate(connection); err != nil {
		// The socket was opened successfully but is useless without
		// authentication; close it so it is not leaked on every retry.
		connection.Close()
		i.conn = nil
		return err
	}

	return nil
}

// Close shuts down the collector connection, if there is one, and marks the
// plugin as disconnected. It is safe to call when no connection is open (for
// example after a failed Connect or a second Close) and returns the error
// reported by the underlying connection, if any.
func (i *Instrumental) Close() error {
	if i.conn == nil {
		return nil
	}

	err := i.conn.Close()
	i.conn = nil
	return err
}

// Write serializes metrics in Graphite format, prefixes every resulting line
// with its Instrumental stat type ("increment" or "gauge") and sends the whole
// batch over the collector connection, (re)connecting first when there is no
// open connection.
//
// Metrics that cannot be rebuilt or serialized, and stats containing characters
// matched by StatIncludesBadChar, are dropped instead of failing the batch.
// Any write error closes the connection so that the next call reconnects, and
// the error is returned to the caller.
func (i *Instrumental) Write(metrics []telegraf.Metric) error {
	if i.conn == nil {
		err := i.Connect()
		if err != nil {
			return fmt.Errorf("FAILED to (re)connect to Instrumental. Error: %s\n", err)
		}
	}

	s, err := serializers.NewGraphiteSerializer(i.Prefix, i.Template)
	if err != nil {
		return err
	}

	var points []string

	for _, metric := range metrics {
		// Pull the metric_type out of the metric's tags. We don't want the type
		// to show up with the other tags pulled from the system, as they go in the
		// beginning of the line instead.
		// e.g we want:
		//
		//   increment some_prefix.host.tag1.tag2.tag3.field value timestamp
		//
		// vs
		//
		//   increment some_prefix.host.tag1.tag2.tag3.counter.field value timestamp
		//
		// The tags are copied rather than edited in place: Tags() may hand back
		// the metric's own map, and the same metric can be shared with other
		// outputs.
		metricType := ""
		newTags := make(map[string]string)
		for key, value := range metric.Tags() {
			if key == "metric_type" {
				metricType = value
				continue
			}
			newTags[key] = value
		}

		toSerialize, err := telegraf.NewMetric(
			metric.Name(),
			newTags,
			metric.Fields(),
			metric.Time(),
		)
		if err != nil {
			log.Printf("Error building a metric for Instrumental: %s", err)
			continue
		}

		stats, err := s.Serialize(toSerialize)
		if err != nil {
			log.Printf("Error serializing a metric to Instrumental: %s", err)
			continue
		}

		switch metricType {
		case "counter", "histogram":
			metricType = "increment"
		default:
			metricType = "gauge"
		}

		for _, stat := range stats {
			if !StatIncludesBadChar.MatchString(stat) {
				points = append(points, fmt.Sprintf("%s %s", metricType, stat))
			} else if i.Debug {
				log.Printf("Unable to send bad stat: %s", stat)
			}
		}
	}

	// Nothing survived filtering: don't send a lone blank line to the collector.
	if len(points) == 0 {
		return nil
	}

	allPoints := strings.Join(points, "\n") + "\n"
	// The payload is data, not a format string, so write it verbatim.
	_, err = io.WriteString(i.conn, allPoints)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe you can check here if err == io.EOF, which would indicate that the connection has been closed


	if i.Debug {
		log.Println(allPoints)
	}

	if err != nil {
		// A failed write on a TCP connection normally surfaces as a network
		// error rather than io.EOF, so drop the connection on any error to
		// force a reconnect on the next Write.
		i.Close()
		return err
	}

	return nil
}

// Description returns the one-line summary of this output plugin.
func (i *Instrumental) Description() string {
	const summary = "Configuration for sending metrics to an Instrumental project"
	return summary
}

// SampleConfig returns the example TOML configuration held in sampleConfig.
func (i *Instrumental) SampleConfig() string {
	config := sampleConfig
	return config
}

// authenticate sends the hello/authenticate handshake (AuthFormat with the API
// token) on conn and waits for the collector's reply. The reply must start
// with "ok\nok\n" (one "ok" per handshake line); anything else is reported as
// an authentication failure. On success conn is stored in i.conn.
//
// The reply is accumulated across reads because a single Read on a TCP
// connection is not guaranteed to deliver both "ok" lines at once.
func (i *Instrumental) authenticate(conn net.Conn) error {
	_, err := fmt.Fprintf(conn, AuthFormat, i.ApiToken)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After authenticating, you should get two "ok\n"s back from the socket, one for the HELLO and one for the AUTHENTICATE. You should not send any metrics before receiving those responses.

	if err != nil {
		return err
	}

	// The response here will either be two "ok"s or an error message.
	const expected = "ok\nok\n"
	responses := make([]byte, 512)
	received := 0

	for received < len(expected) {
		n, readErr := conn.Read(responses[received:])
		received += n

		// Compare only the bytes actually received so far, and fail as soon
		// as they diverge from the expected reply.
		compared := received
		if compared > len(expected) {
			compared = len(expected)
		}
		if string(responses[:compared]) != expected[:compared] {
			return fmt.Errorf("Authentication failed: %s", responses[:received])
		}

		if readErr != nil {
			return readErr
		}
	}

	i.conn = conn
	return nil
}

// init registers the plugin under the name "instrumental". The factory
// returns an Instrumental pre-populated with DefaultHost and the Graphite
// default template; per the review discussion below, these defaults are
// overwritten by values parsed from the config file.
func init() {
outputs.Add("instrumental", func() telegraf.Output {
return &Instrumental{
Host: DefaultHost,
Template: graphite.DEFAULT_TEMPLATE,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't sure what the best way to provide default values is here. Does this get over-written by the Config parsing or does any already-set value take precedence?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this will get overwritten by the config file parsing

}
})
}
114 changes: 114 additions & 0 deletions plugins/outputs/instrumental/instrumental_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package instrumental

import (
"bufio"
"net"
"net/textproto"
"sync"
"testing"
"time"

"github.com/influxdata/telegraf"
"github.com/stretchr/testify/assert"
)

// TestWrite drives the plugin against the fake collector started by TCPServer:
// it writes one batch, closes the connection to force a reconnect, writes a
// second batch (including a stat that must be dropped) and then waits for the
// server goroutine to finish its assertions.
func TestWrite(t *testing.T) {
	var wg sync.WaitGroup
	wg.Add(1)
	go TCPServer(t, &wg)
	// Give the fake TCP server some time to start:
	time.Sleep(time.Millisecond * 100)

	i := Instrumental{
		Host:     "127.0.0.1",
		ApiToken: "abc123token",
		Prefix:   "my.prefix",
	}
	if err := i.Connect(); err != nil {
		t.Fatalf("unable to connect to the fake collector: %s", err)
	}

	timestamp := time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC)
	// newMetric builds a metric at the shared timestamp and aborts the test
	// if construction fails, so later assertions never run on a nil metric.
	newMetric := func(name string, tags map[string]string, fields map[string]interface{}) telegraf.Metric {
		m, err := telegraf.NewMetric(name, tags, fields, timestamp)
		if err != nil {
			t.Fatalf("unable to build metric %q: %s", name, err)
		}
		return m
	}

	// Default to gauge
	m1 := newMetric(
		"mymeasurement",
		map[string]string{"host": "192.168.0.1"},
		map[string]interface{}{"myfield": float64(3.14)},
	)
	m2 := newMetric(
		"mymeasurement",
		map[string]string{"host": "192.168.0.1", "metric_type": "set"},
		map[string]interface{}{"value": float64(3.14)},
	)

	// Simulate a connection close and reconnect.
	metrics := []telegraf.Metric{m1, m2}
	if err := i.Write(metrics); err != nil {
		t.Fatalf("first write failed: %s", err)
	}
	assert.NoError(t, i.Close())

	// Counter and Histogram are increments
	m3 := newMetric(
		"my_histogram",
		map[string]string{"host": "192.168.0.1", "metric_type": "histogram"},
		map[string]interface{}{"value": float64(3.14)},
	)
	// We will drop metrics that simply won't be accepted by Instrumental
	m4 := newMetric(
		"bad_values",
		map[string]string{"host": "192.168.0.1", "metric_type": "counter"},
		map[string]interface{}{"value": "\" 3:30\""},
	)
	m5 := newMetric(
		"my_counter",
		map[string]string{"host": "192.168.0.1", "metric_type": "counter"},
		map[string]interface{}{"value": float64(3.14)},
	)

	metrics = []telegraf.Metric{m3, m4, m5}
	if err := i.Write(metrics); err != nil {
		t.Fatalf("second write (after reconnect) failed: %s", err)
	}

	wg.Wait()
	assert.NoError(t, i.Close())
}

// TCPServer is a fake Instrumental collector listening on 127.0.0.1:8000. It
// serves exactly two consecutive connections (the initial one and the one made
// after the client reconnects), checks the handshake and the stat lines
// received on each, and calls wg.Done when finished.
//
// It runs on its own goroutine, so failures are reported with t.Errorf /
// assert rather than t.Fatal.
func TCPServer(t *testing.T, wg *sync.WaitGroup) {
	defer wg.Done()

	tcpServer, err := net.Listen("tcp", "127.0.0.1:8000")
	if err != nil {
		t.Errorf("unable to start the fake collector: %s", err)
		return
	}
	defer tcpServer.Close()

	serveCollectorConn(t, tcpServer, []string{
		"gauge my.prefix.192_168_0_1.mymeasurement.myfield 3.14 1289430000",
		"gauge my.prefix.192_168_0_1.mymeasurement 3.14 1289430000",
	})

	serveCollectorConn(t, tcpServer, []string{
		"increment my.prefix.192_168_0_1.my_histogram 3.14 1289430000",
		"increment my.prefix.192_168_0_1.my_counter 3.14 1289430000",
	})
}

// serveCollectorConn accepts one connection, verifies the hello/authenticate
// handshake, answers with two "ok" lines and then asserts that the client
// sends exactly the expected stat lines, in order.
func serveCollectorConn(t *testing.T, listener net.Listener, expected []string) {
	conn, err := listener.Accept()
	if err != nil {
		t.Errorf("unable to accept a connection: %s", err)
		return
	}
	defer conn.Close()

	// Bound every read so a misbehaving client fails the test instead of
	// hanging it.
	conn.SetDeadline(time.Now().Add(1 * time.Second))
	tp := textproto.NewReader(bufio.NewReader(conn))

	hello, err := tp.ReadLine()
	assert.NoError(t, err)
	assert.Equal(t, "hello version go/telegraf/1.0", hello)

	auth, err := tp.ReadLine()
	assert.NoError(t, err)
	assert.Equal(t, "authenticate abc123token", auth)

	if _, err = conn.Write([]byte("ok\nok\n")); err != nil {
		t.Errorf("unable to acknowledge the handshake: %s", err)
		return
	}

	for _, want := range expected {
		got, err := tp.ReadLine()
		assert.NoError(t, err)
		assert.Equal(t, want, got)
	}
}