Skip to content

Commit

Permalink
instrumental plugin, rewrite connection retries
Browse files Browse the repository at this point in the history
closes #1412

separate hello and authenticate functions,
force connection close at end of write cycle so we don't
hold open idle connections,
which has the benefit of mostly removing
the chance of getting hopelessly connection lost

bump instrumental agent version

fix test to deal with better better connect/reconnect logic and changed ident & auth handshake

Update CHANGELOG.md

correct URL from instrumental fork to origin and put the change in the correct part of the file

go fmt

undo split hello and auth commands, to reduce roundtrips
  • Loading branch information
janxious authored and sparrc committed Jul 14, 2016
1 parent 4651ab8 commit 21add2c
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ should now look like:
- [#1461](https://github.com/influxdata/telegraf/pull/1461): Prometheus parser, protobuf format header fix.
- [#1334](https://github.com/influxdata/telegraf/issues/1334): Prometheus output, metric refresh and caching fixes.
- [#1432](https://github.com/influxdata/telegraf/issues/1432): Panic fix for multiple graphite outputs under very high load.
- [#1412](https://github.com/influxdata/telegraf/pull/1412): Instrumental output has better reconnect behavior

## v1.0 beta 2 [2016-06-21]

Expand Down
14 changes: 11 additions & 3 deletions plugins/outputs/instrumental/instrumental.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ type Instrumental struct {
}

const (
DefaultHost = "collector.instrumentalapp.com"
AuthFormat = "hello version go/telegraf/1.0\nauthenticate %s\n"
DefaultHost = "collector.instrumentalapp.com"
HelloMessage = "hello version go/telegraf/1.1\n"
AuthFormat = "authenticate %s\n"
HandshakeFormat = HelloMessage + AuthFormat
)

var (
Expand All @@ -52,6 +54,7 @@ var sampleConfig = `

func (i *Instrumental) Connect() error {
connection, err := net.DialTimeout("tcp", i.Host+":8000", i.Timeout.Duration)

if err != nil {
i.conn = nil
return err
Expand Down Expand Up @@ -151,6 +154,11 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error {
return err
}

// force the connection closed after sending data
// to deal with various disconnection scenarios and eschew holding
// open idle connections en masse
i.Close()

return nil
}

Expand All @@ -163,7 +171,7 @@ func (i *Instrumental) SampleConfig() string {
}

func (i *Instrumental) authenticate(conn net.Conn) error {
_, err := fmt.Fprintf(conn, AuthFormat, i.ApiToken)
_, err := fmt.Fprintf(conn, HandshakeFormat, i.ApiToken)
if err != nil {
return err
}
Expand Down
10 changes: 2 additions & 8 deletions plugins/outputs/instrumental/instrumental_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ func TestWrite(t *testing.T) {
ApiToken: "abc123token",
Prefix: "my.prefix",
}
i.Connect()

// Default to gauge
m1, _ := telegraf.NewMetric(
Expand All @@ -40,10 +39,8 @@ func TestWrite(t *testing.T) {
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)

// Simulate a connection close and reconnect.
metrics := []telegraf.Metric{m1, m2}
i.Write(metrics)
i.Close()

// Counter and Histogram are increments
m3, _ := telegraf.NewMetric(
Expand All @@ -70,7 +67,6 @@ func TestWrite(t *testing.T) {
i.Write(metrics)

wg.Wait()
i.Close()
}

func TCPServer(t *testing.T, wg *sync.WaitGroup) {
Expand All @@ -82,10 +78,9 @@ func TCPServer(t *testing.T, wg *sync.WaitGroup) {
tp := textproto.NewReader(reader)

hello, _ := tp.ReadLine()
assert.Equal(t, "hello version go/telegraf/1.0", hello)
assert.Equal(t, "hello version go/telegraf/1.1", hello)
auth, _ := tp.ReadLine()
assert.Equal(t, "authenticate abc123token", auth)

conn.Write([]byte("ok\nok\n"))

data1, _ := tp.ReadLine()
Expand All @@ -99,10 +94,9 @@ func TCPServer(t *testing.T, wg *sync.WaitGroup) {
tp = textproto.NewReader(reader)

hello, _ = tp.ReadLine()
assert.Equal(t, "hello version go/telegraf/1.0", hello)
assert.Equal(t, "hello version go/telegraf/1.1", hello)
auth, _ = tp.ReadLine()
assert.Equal(t, "authenticate abc123token", auth)

conn.Write([]byte("ok\nok\n"))

data3, _ := tp.ReadLine()
Expand Down

0 comments on commit 21add2c

Please sign in to comment.