Skip to content

Commit

Permalink
Redis: include per-db keyspace info
Browse files Browse the repository at this point in the history
Closes #205
  • Loading branch information
sparrc committed Sep 23, 2015
1 parent b92a0d5 commit f8d64a7
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 202 deletions.
13 changes: 12 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
## v0.1.10 [unreleased]

### Release Notes

### Features
- [#205](https://github.com/influxdb/telegraf/issues/205): Include per-db redis keyspace info

### Bugfixes

## v0.1.9 [2015-09-22]

### Release Notes
Expand All @@ -14,7 +23,9 @@ file with only the cpu plugin defined, and the influxdb output defined.
- **Breaking Change**: The CPU collection plugin has been refactored to fix some
bugs and outdated dependency issues. At the same time, I also decided to fix
a naming consistency issue, so cpu_percentageIdle will become cpu_usage_idle.
Also, all CPU time measurements now have it indicated in their name, so cpu_idle will become cpu_time_idle. Additionally, cpu_time measurements are going to be dropped in the default config.
Also, all CPU time measurements now have it indicated in their name, so cpu_idle
will become cpu_time_idle. Additionally, cpu_time measurements are going to be
dropped in the default config.
- **Breaking Change**: The memory plugin has been refactored and some measurements
have been renamed for consistency. Some measurements have also been removed from being outputted. They are still being collected by gopsutil, and could easily be
re-added in a "verbose" mode if there is demand for it.
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ ifeq ($(UNAME), Linux)
endif

test: prepare docker-compose
$(GOBIN)/godep go test -v ./...
$(GOBIN)/godep go test ./...

test-short: prepare
$(GOBIN)/godep go test -short ./...
Expand Down
152 changes: 77 additions & 75 deletions plugins/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"fmt"
"net"
"net/url"
// "strconv"
"strconv"
"strings"
"sync"

Expand Down Expand Up @@ -137,88 +137,90 @@ func (r *Redis) gatherServer(addr *url.URL, acc plugins.Accumulator) error {
}
}

c.Write([]byte("info\r\n"))

c.Write([]byte("INFO\r\n"))
c.Write([]byte("EOF\r\n"))
rdr := bufio.NewReader(c)

// Setup tags for all redis metrics
_, rPort, err := net.SplitHostPort(addr.Host)
if err != nil {
rPort = defaultPort
}
tags := map[string]string{"host": addr.String(), "port": rPort}

return gatherInfoOutput(rdr, acc, tags)
}

// gatherInfoOutput gathers
func gatherInfoOutput(
rdr *bufio.Reader,
acc plugins.Accumulator,
tags map[string]string,
) error {
scanner := bufio.NewScanner(rdr)
for scanner.Scan() {
fmt.Println(scanner.Text())
}
if err := scanner.Err(); err != nil {
fmt.Println("reading standard input:", err)
}
line := scanner.Text()
if strings.Contains(line, "ERR") {
break
}

if len(line) == 0 || line[0] == '#' {
continue
}

parts := strings.SplitN(line, ":", 2)
if len(parts) < 2 {
continue
}

name := string(parts[0])
metric, ok := Tracking[name]
if !ok {
kline := strings.TrimSpace(string(parts[1]))
gatherKeyspaceLine(name, kline, acc, tags)
continue
}

// line, err := rdr.ReadString('\n')
// if err != nil {
// return err
// }

// if line[0] != '$' {
// return fmt.Errorf("bad line start: %s", ErrProtocolError)
// }

// line = strings.TrimSpace(line)

// szStr := line[0:]

// sz, err := strconv.Atoi(szStr)
// if err != nil {
// return fmt.Errorf("bad size string <<%s>>: %s", szStr, ErrProtocolError)
// }

// var read int

// for read < sz {
// line, err := rdr.ReadString('\n')
// fmt.Printf(line)
// if err != nil {
// return err
// }

// read += len(line)
// if len(line) == 1 || line[0] == '#' {
// continue
// }

// _, rPort, err := net.SplitHostPort(addr.Host)
// if err != nil {
// rPort = defaultPort
// }
// tags := map[string]string{"host": addr.String(), "port": rPort}

// parts := strings.SplitN(line, ":", 2)
// if len(parts) < 2 {
// continue
// }
// name := string(parts[0])
// metric, ok := Tracking[name]
// if !ok {
// // See if this is the keyspace line
// if strings.Contains(string(parts[1]), "keys=") {
// tags["database"] = name
// acc.Add("foo", 999, tags)
// }
// continue
// }

// val := strings.TrimSpace(parts[1])
// ival, err := strconv.ParseUint(val, 10, 64)
// if err == nil {
// acc.Add(metric, ival, tags)
// continue
// }

// fval, err := strconv.ParseFloat(val, 64)
// if err != nil {
// return err
// }

// acc.Add(metric, fval, tags)
// }
val := strings.TrimSpace(parts[1])
ival, err := strconv.ParseUint(val, 10, 64)
if err == nil {
acc.Add(metric, ival, tags)
continue
}

fval, err := strconv.ParseFloat(val, 64)
if err != nil {
return err
}

acc.Add(metric, fval, tags)
}
return nil
}

// Parse the special Keyspace line at end of redis stats
// This is a special line that looks something like:
// db0:keys=2,expires=0,avg_ttl=0
// And there is one for each db on the redis instance
func gatherKeyspaceLine(
name string,
line string,
acc plugins.Accumulator,
tags map[string]string,
) {
if strings.Contains(line, "keys=") {
tags["database"] = name
dbparts := strings.Split(line, ",")
for _, dbp := range dbparts {
kv := strings.Split(dbp, "=")
ival, err := strconv.ParseUint(kv[1], 10, 64)
if err == nil {
acc.Add(kv[0], ival, tags)
}
}
}
}

func init() {
plugins.Add("redis", func() plugins.Plugin {
return &Redis{}
Expand Down
135 changes: 13 additions & 122 deletions plugins/redis/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,151 +3,37 @@ package redis
import (
"bufio"
"fmt"
"net"
"strings"
"testing"

"github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestRedisGeneratesMetrics(t *testing.T) {
func TestRedisConnect(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

l, err := net.Listen("tcp", ":0")
require.NoError(t, err)

defer l.Close()

go func() {
c, err := l.Accept()
if err != nil {
return
}

buf := bufio.NewReader(c)

for {
line, err := buf.ReadString('\n')
if err != nil {
return
}

if line != "info\r\n" {
return
}

fmt.Fprintf(c, "$%d\n", len(testOutput))
c.Write([]byte(testOutput))
}
}()

addr := fmt.Sprintf("redis://%s", l.Addr().String())
addr := fmt.Sprintf(testutil.GetLocalHost() + ":6379")

r := &Redis{
Servers: []string{addr},
}

var acc testutil.Accumulator

err = r.Gather(&acc)
err := r.Gather(&acc)
require.NoError(t, err)

checkInt := []struct {
name string
value uint64
}{
{"uptime", 238},
{"clients", 1},
{"used_memory", 1003936},
{"used_memory_rss", 811008},
{"used_memory_peak", 1003936},
{"used_memory_lua", 33792},
{"rdb_changes_since_last_save", 0},
{"total_connections_received", 2},
{"total_commands_processed", 1},
{"instantaneous_ops_per_sec", 0},
{"sync_full", 0},
{"sync_partial_ok", 0},
{"sync_partial_err", 0},
{"expired_keys", 0},
{"evicted_keys", 0},
{"keyspace_hits", 0},
{"keyspace_misses", 0},
{"pubsub_channels", 0},
{"pubsub_patterns", 0},
{"latest_fork_usec", 0},
{"connected_slaves", 0},
{"master_repl_offset", 0},
{"repl_backlog_active", 0},
{"repl_backlog_size", 1048576},
{"repl_backlog_histlen", 0},
}

for _, c := range checkInt {
assert.True(t, acc.CheckValue(c.name, c.value))
}

checkFloat := []struct {
name string
value float64
}{
{"mem_fragmentation_ratio", 0.81},
{"used_cpu_sys", 0.14},
{"used_cpu_user", 0.05},
{"used_cpu_sys_children", 0.00},
{"used_cpu_user_children", 0.00},
}

for _, c := range checkFloat {
assert.True(t, acc.CheckValue(c.name, c.value))
}
}

func TestRedisCanPullStatsFromMultipleServers(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

l, err := net.Listen("tcp", ":0")
require.NoError(t, err)

defer l.Close()

go func() {
c, err := l.Accept()
if err != nil {
return
}

buf := bufio.NewReader(c)

for {
line, err := buf.ReadString('\n')
if err != nil {
return
}

if line != "info\r\n" {
return
}

fmt.Fprintf(c, "$%d\n", len(testOutput))
c.Write([]byte(testOutput))
}
}()

addr := fmt.Sprintf("redis://%s", l.Addr().String())

r := &Redis{
Servers: []string{addr},
}

func TestRedis_ParseMetrics(t *testing.T) {
var acc testutil.Accumulator
tags := map[string]string{"host": "redis.net"}
rdr := bufio.NewReader(strings.NewReader(testOutput))

err = r.Gather(&acc)
err := gatherInfoOutput(rdr, &acc, tags)
require.NoError(t, err)

checkInt := []struct {
Expand Down Expand Up @@ -179,6 +65,9 @@ func TestRedisCanPullStatsFromMultipleServers(t *testing.T) {
{"repl_backlog_active", 0},
{"repl_backlog_size", 1048576},
{"repl_backlog_histlen", 0},
{"keys", 2},
{"expires", 0},
{"avg_ttl", 0},
}

for _, c := range checkInt {
Expand Down Expand Up @@ -284,5 +173,7 @@ used_cpu_sys_children:0.00
used_cpu_user_children:0.00
# Keyspace
db0:keys=2,expires=0,avg_ttl=0
(error) ERR unknown command 'eof'
`
2 changes: 1 addition & 1 deletion scripts/circle-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ exit_if_fail godep go install -v ./...

# Run the tests
exit_if_fail godep go vet ./...
exit_if_fail godep go test -v -short ./...
exit_if_fail godep go test -short ./...

# Build binaries
build "linux" "amd64" $VERSION
Expand Down
Loading

0 comments on commit f8d64a7

Please sign in to comment.