Skip to content

Commit

Permalink
fix postgresql 'name', and 'oid' data types by switching to a driver
Browse files Browse the repository at this point in the history
that handles them properly
  • Loading branch information
james-lawrence authored and James Lawrence committed Aug 11, 2016
1 parent 1989a58 commit bccef14
Show file tree
Hide file tree
Showing 7 changed files with 291 additions and 22 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,12 @@ should now look like:
evaluated at every flush interval, rather than once at startup. This makes it
consistent with the behavior of `collection_jitter`.

- postgresql plugins switched drivers to better handle oid and name data types.
allowed removal of special casing data types by column name.

### Features

- [#1617](https://github.com/influxdata/telegraf/pull/1617): postgresql_extensible now handles name and oid types correctly.
- [#1413](https://github.com/influxdata/telegraf/issues/1413): Separate container_version from container_image tag.
- [#1525](https://github.com/influxdata/telegraf/pull/1525): Support setting per-device and total metrics for Docker network and blockio.
- [#1466](https://github.com/influxdata/telegraf/pull/1466): MongoDB input plugin: adding per DB stats from db.stats()
Expand Down
2 changes: 1 addition & 1 deletion Godeps
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ github.com/influxdata/toml af4df43894b16e3fd2b788d01bd27ad0776ef2d0
github.com/kardianos/osext 29ae4ffbc9a6fe9fb2bc5029050ce6996ea1d3bc
github.com/kardianos/service 5e335590050d6d00f3aa270217d288dda1c94d0a
github.com/klauspost/crc32 19b0b332c9e4516a6370a0456e6182c3b5036720
github.com/lib/pq e182dc4027e2ded4b19396d638610f2653295f36
github.com/matttproud/golang_protobuf_extensions d0c3fe89de86839aecf2e0579c40ba3bb336a453
github.com/miekg/dns cce6c130cdb92c752850880fd285bea1d64439dd
github.com/mreiferson/go-snappystream 028eae7ab5c4c9e2d1cb4c4ca1e53259bbe7e504
Expand Down Expand Up @@ -63,3 +62,4 @@ gopkg.in/dancannon/gorethink.v1 7d1af5be49cb5ecc7b177bf387d232050299d6ef
gopkg.in/fatih/pool.v2 cba550ebf9bce999a02e963296d4bc7a486cb715
gopkg.in/mgo.v2 d90005c5262a3463800497ea5a89aed5fe22c886
gopkg.in/yaml.v2 a83829b6f1293c91addabc89d0571c246397bbf4
github.com/jackc/pgx bb73d8427902891bbad7b949b9c60b32949d935f
102 changes: 94 additions & 8 deletions plugins/inputs/postgresql/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@ import (
"bytes"
"database/sql"
"fmt"
"net"
"net/url"
"regexp"
"sort"
"strings"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"

"github.com/lib/pq"
"github.com/jackc/pgx"
"github.com/jackc/pgx/stdlib"
)

type Postgresql struct {
Expand All @@ -22,7 +25,7 @@ type Postgresql struct {
sanitizedAddress string
}

var ignoredColumns = map[string]bool{"datid": true, "datname": true, "stats_reset": true}
var ignoredColumns = map[string]bool{"stats_reset": true}

var sampleConfig = `
## specify address via a url matching:
Expand Down Expand Up @@ -66,7 +69,7 @@ func (p *Postgresql) Gather(acc telegraf.Accumulator) error {
p.Address = localhost
}

db, err := sql.Open("postgres", p.Address)
db, err := connect(p.Address)
if err != nil {
return err
}
Expand Down Expand Up @@ -141,7 +144,7 @@ var passwordKVMatcher, _ = regexp.Compile("password=\\S+ ?")
func (p *Postgresql) SanitizedAddress() (_ string, err error) {
var canonicalizedAddress string
if strings.HasPrefix(p.Address, "postgres://") || strings.HasPrefix(p.Address, "postgresql://") {
canonicalizedAddress, err = pq.ParseURL(p.Address)
canonicalizedAddress, err = parseURL(p.Address)
if err != nil {
return p.sanitizedAddress, err
}
Expand Down Expand Up @@ -177,10 +180,7 @@ func (p *Postgresql) accRow(row scanner, acc telegraf.Accumulator) error {
}
if columnMap["datname"] != nil {
// extract the database name from the column map
dbnameChars := (*columnMap["datname"]).([]uint8)
for i := 0; i < len(dbnameChars); i++ {
dbname.WriteString(string(dbnameChars[i]))
}
dbname.WriteString((*columnMap["datname"]).(string))
} else {
dbname.WriteString("postgres")
}
Expand Down Expand Up @@ -210,3 +210,89 @@ func init() {
return &Postgresql{}
})
}

// pulled from lib/pq
// ParseURL no longer needs to be used by clients of this library since supplying a URL as a
// connection string to sql.Open() is now supported:
//
// sql.Open("postgres", "postgres://bob:[email protected]:5432/mydb?sslmode=verify-full")
//
// It remains exported here for backwards-compatibility.
//
// ParseURL converts a url to a connection string for driver.Open.
// Example:
//
// "postgres://bob:[email protected]:5432/mydb?sslmode=verify-full"
//
// converts to:
//
// "user=bob password=secret host=1.2.3.4 port=5432 dbname=mydb sslmode=verify-full"
//
// A minimal example:
//
// "postgres://"
//
// This will be blank, causing driver.Open to use all of the defaults
func parseURL(uri string) (string, error) {
u, err := url.Parse(uri)
if err != nil {
return "", err
}

if u.Scheme != "postgres" && u.Scheme != "postgresql" {
return "", fmt.Errorf("invalid connection protocol: %s", u.Scheme)
}

var kvs []string
escaper := strings.NewReplacer(` `, `\ `, `'`, `\'`, `\`, `\\`)
accrue := func(k, v string) {
if v != "" {
kvs = append(kvs, k+"="+escaper.Replace(v))
}
}

if u.User != nil {
v := u.User.Username()
accrue("user", v)

v, _ = u.User.Password()
accrue("password", v)
}

if host, port, err := net.SplitHostPort(u.Host); err != nil {
accrue("host", u.Host)
} else {
accrue("host", host)
accrue("port", port)
}

if u.Path != "" {
accrue("dbname", u.Path[1:])
}

q := u.Query()
for k := range q {
accrue(k, q.Get(k))
}

sort.Strings(kvs) // Makes testing easier (not a performance concern)
return strings.Join(kvs, " "), nil
}

func connect(address string) (*sql.DB, error) {
if strings.HasPrefix(address, "postgres://") || strings.HasPrefix(address, "postgresql://") {
return sql.Open("pgx", address)
}

config, err := pgx.ParseDSN(address)
if err != nil {
return nil, err
}

pool, err := pgx.NewConnPool(pgx.ConnPoolConfig{ConnConfig: config})
if err != nil {
return nil, err
}

return stdlib.OpenFromConnPool(pool)
}
31 changes: 29 additions & 2 deletions plugins/inputs/postgresql/postgresql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) {
for _, col := range p.AllColumns {
availableColumns[col] = true
}

intMetrics := []string{
"xact_commit",
"xact_rollback",
Expand All @@ -42,7 +43,6 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) {
"temp_files",
"temp_bytes",
"deadlocks",
"numbackends",
"buffers_alloc",
"buffers_backend",
"buffers_backend_fsync",
Expand All @@ -53,9 +53,20 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) {
"maxwritten_clean",
}

int32Metrics := []string{
"numbackends",
}

floatMetrics := []string{
"blk_read_time",
"blk_write_time",
"checkpoint_write_time",
"checkpoint_sync_time",
}

stringMetrics := []string{
"datname",
"datid",
}

metricsCounted := 0
Expand All @@ -68,6 +79,14 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) {
}
}

for _, metric := range int32Metrics {
_, ok := availableColumns[metric]
if ok {
assert.True(t, acc.HasInt32Field("postgresql", metric))
metricsCounted++
}
}

for _, metric := range floatMetrics {
_, ok := availableColumns[metric]
if ok {
Expand All @@ -76,8 +95,16 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) {
}
}

for _, metric := range stringMetrics {
_, ok := availableColumns[metric]
if ok {
assert.True(t, acc.HasStringField("postgresql", metric))
metricsCounted++
}
}

assert.True(t, metricsCounted > 0)
//assert.Equal(t, len(availableColumns)-len(p.IgnoredColumns()), metricsCounted)
assert.Equal(t, len(availableColumns)-len(p.IgnoredColumns()), metricsCounted)
}

func TestPostgresqlTagsMetricsWithDatabaseName(t *testing.T) {
Expand Down
Loading

0 comments on commit bccef14

Please sign in to comment.