Skip to content

Commit

Permalink
influxdb output Close connections before leaking them
Browse files Browse the repository at this point in the history
closes #1058
  • Loading branch information
sparrc committed Apr 19, 2016
1 parent 92e57ee commit 4d3719e
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 15 deletions.
2 changes: 1 addition & 1 deletion Godeps
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ github.com/gorilla/context 1ea25387ff6f684839d82767c1733ff4d4d15d0a
github.com/gorilla/mux c9e326e2bdec29039a3761c07bece13133863e1e
github.com/hailocab/go-hostpool e80d13ce29ede4452c43dea11e79b9bc8a15b478
github.com/influxdata/config b79f6829346b8d6e78ba73544b1e1038f1f1c9da
github.com/influxdata/influxdb e3fef5593c21644f2b43af55d6e17e70910b0e48
github.com/influxdata/influxdb 21db76b3374c733f37ed16ad93f3484020034351
github.com/influxdata/toml af4df43894b16e3fd2b788d01bd27ad0776ef2d0
github.com/klauspost/crc32 19b0b332c9e4516a6370a0456e6182c3b5036720
github.com/lib/pq e182dc4027e2ded4b19396d638610f2653295f36
Expand Down
43 changes: 29 additions & 14 deletions plugins/outputs/influxdb/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,9 @@ func (i *InfluxDB) Connect() error {
return err
}

// Create Database if it doesn't exist
_, e := c.Query(client.Query{
Command: fmt.Sprintf("CREATE DATABASE IF NOT EXISTS \"%s\"", i.Database),
})

if e != nil {
log.Println("Database creation failed: " + e.Error())
err = createDatabase(c, i.Database)
if err != nil {
log.Println("Database creation failed: " + err.Error())
continue
}

Expand All @@ -144,8 +140,24 @@ func (i *InfluxDB) Connect() error {
return nil
}

func createDatabase(c client.Client, database string) error {
// Create Database if it doesn't exist
_, err := c.Query(client.Query{
Command: fmt.Sprintf("CREATE DATABASE IF NOT EXISTS \"%s\"", database),
})
return err
}

func (i *InfluxDB) Close() error {
// InfluxDB client does not provide a Close() function
var errS string
for j, _ := range i.conns {
if err := i.conns[j].Close(); err != nil {
errS += err.Error()
}
}
if errS != "" {
return fmt.Errorf("output influxdb close failed: %s", errS)
}
return nil
}

Expand Down Expand Up @@ -185,18 +197,21 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
p := rand.Perm(len(i.conns))
for _, n := range p {
if e := i.conns[n].Write(bp); e != nil {
log.Println("ERROR: " + e.Error())
// Log write failure
log.Printf("ERROR: %s", e)
// If the database was not found, try to recreate it
if strings.Contains(e.Error(), "database not found") {
if errc := createDatabase(i.conns[n], i.Database); errc != nil {
log.Printf("ERROR: Database %s not found and failed to recreate\n",
i.Database)
}
}
} else {
err = nil
break
}
}

// If all of the writes failed, create a new connection array so that
// i.Connect() will be called on the next gather.
if err != nil {
i.conns = make([]client.Client, 0)
}
return err
}

Expand Down

0 comments on commit 4d3719e

Please sign in to comment.