Skip to content

Commit

Permalink
Cherry-pick elastic#12913 to 7.3: Fix keyspace configuration in redis…
Browse files Browse the repository at this point in the history
… key metricset (elastic#12999)

Fix incoherent behaviour when keyspace is specified in the redis host
URL and not in some of the key patterns. If it was not specified it used
the default 0 instead of using the one configured in the redis host.

(cherry picked from commit 03d7870)
  • Loading branch information
jsoriano authored Jul 23, 2019
1 parent c687958 commit 020a86f
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Reuse connections in PostgreSQL metricsets. {issue}12504[12504] {pull}12603[12603]
- PdhExpandWildCardPathW will not expand counter paths in 32 bit windows systems, workaround will use a different function.{issue}12590[12590]{pull}12622[12622]
- In the elasticsearch/node_stats metricset, if xpack is enabled, make parsing of ES node load average optional as ES on Windows doesn't report load average. {pull}12866[12866]
- Fix incoherent behaviour in redis key metricset when keyspace is specified both in host URL and key pattern {pull}12913[12913]
- Fix connections leak in redis module {pull}12914[12914] {pull}12950[12950]
- Fix wrong uptime reporting by system/uptime metricset under Windows. {pull}12915[12915]
- Print errors that were being omitted in vSphere metricsets {pull}12816[12816]
Expand Down
21 changes: 14 additions & 7 deletions metricbeat/module/redis/key/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type MetricSet struct {

// KeyPattern contains the information required to query keys
type KeyPattern struct {
Keyspace uint `config:"keyspace"`
Keyspace *uint `config:"keyspace"`
Pattern string `config:"pattern" validate:"required"`
Limit uint `config:"limit"`
}
Expand Down Expand Up @@ -79,16 +79,22 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
}()

for _, p := range m.patterns {
if err := redis.Select(conn, p.Keyspace); err != nil {
msg := errors.Wrapf(err, "Failed to select keyspace %d", p.Keyspace)
var keyspace uint
if p.Keyspace == nil {
keyspace = m.OriginalDBNumber()
} else {
keyspace = *p.Keyspace
}
if err := redis.Select(conn, keyspace); err != nil {
msg := errors.Wrapf(err, "Failed to select keyspace %d", keyspace)
m.Logger().Error(msg)
r.Error(err)
continue
}

keys, err := redis.FetchKeys(conn, p.Pattern, p.Limit)
if err != nil {
msg := errors.Wrapf(err, "Failed to list keys in keyspace %d with pattern '%s'", p.Keyspace, p.Pattern)
msg := errors.Wrapf(err, "Failed to list keys in keyspace %d with pattern '%s'", keyspace, p.Pattern)
m.Logger().Error(msg)
r.Error(err)
continue
Expand All @@ -101,14 +107,15 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
for _, key := range keys {
keyInfo, err := redis.FetchKeyInfo(conn, key)
if err != nil {
msg := fmt.Errorf("Failed to fetch key info for key %s in keyspace %d", key, p.Keyspace)
msg := fmt.Errorf("Failed to fetch key info for key %s in keyspace %d", key, keyspace)
m.Logger().Error(msg)
r.Error(err)
continue
}
event := eventMapping(p.Keyspace, keyInfo)
event := eventMapping(keyspace, keyInfo)
if !r.Event(event) {
return errors.New("metricset has closed")
m.Logger().Debug("Failed to report event, interrupting fetch")
return nil
}
}
}
Expand Down
66 changes: 61 additions & 5 deletions metricbeat/module/redis/key/key_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
package key

import (
"fmt"
"testing"

rd "github.com/garyburd/redigo/redis"
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/tests/compose"
mbtest "github.com/elastic/beats/metricbeat/mb/testing"
"github.com/elastic/beats/metricbeat/module/redis"
Expand All @@ -35,7 +37,7 @@ var host = redis.GetRedisEnvHost() + ":" + redis.GetRedisEnvPort()
func TestFetch(t *testing.T) {
compose.EnsureUp(t, "redis")

addEntry(t)
addEntry(t, "foo", 1)

ms := mbtest.NewReportingMetricSetV2Error(t, getConfig())
events, err := mbtest.ReportingFetchV2Error(ms)
Expand All @@ -50,7 +52,7 @@ func TestFetch(t *testing.T) {
func TestData(t *testing.T) {
compose.EnsureUp(t, "redis")

addEntry(t)
addEntry(t, "foo", 1)

ms := mbtest.NewReportingMetricSetV2Error(t, getConfig())
err := mbtest.WriteEventsReporterV2Error(ms, t, "")
Expand All @@ -59,15 +61,69 @@ func TestData(t *testing.T) {
}
}

func TestFetchMultipleKeyspaces(t *testing.T) {
compose.EnsureUp(t, "redis")

expectedKeyspaces := map[string]uint{
"foo": 0,
"bar": 1,
"baz": 2,
}
expectedEvents := len(expectedKeyspaces)

for name, keyspace := range expectedKeyspaces {
addEntry(t, name, keyspace)
}

config := getConfig()
config["key.patterns"] = []map[string]interface{}{
{
"pattern": "foo",
"keyspace": 0,
},
{
"pattern": "bar",
// keyspace set to 1 in the host url
},
{
"pattern": "baz",
"keyspace": 2,
},
}

ms := mbtest.NewReportingMetricSetV2Error(t, config)
events, err := mbtest.ReportingFetchV2Error(ms)

assert.Len(t, err, 0)
assert.Len(t, events, expectedEvents)

for _, event := range events {
name := event.MetricSetFields["name"].(string)
expectedKeyspace, found := expectedKeyspaces[name]
if !assert.True(t, found, name+" not expected") {
continue
}
id := event.MetricSetFields["id"].(string)
assert.Equal(t, fmt.Sprintf("%d:%s", expectedKeyspace, name), id)
keyspace := event.ModuleFields["keyspace"].(common.MapStr)
keyspaceID := keyspace["id"].(string)
assert.Equal(t, fmt.Sprintf("db%d", expectedKeyspace), keyspaceID)
}
}

// addEntry adds an entry to redis
func addEntry(t *testing.T) {
func addEntry(t *testing.T, key string, keyspace uint) {
// Insert at least one event to make sure db exists
c, err := rd.Dial("tcp", host)
if err != nil {
t.Fatal("connect", err)
}
_, err = c.Do("SELECT", keyspace)
if err != nil {
t.Fatal("select", err)
}
defer c.Close()
_, err = c.Do("SET", "foo", "bar", "EX", "360")
_, err = c.Do("SET", key, "bar", "EX", "360")
if err != nil {
t.Fatal("SET", err)
}
Expand All @@ -77,7 +133,7 @@ func getConfig() map[string]interface{} {
return map[string]interface{}{
"module": "redis",
"metricsets": []string{"key"},
"hosts": []string{host},
"hosts": []string{host + "/1"},
"key.patterns": []map[string]interface{}{
{
"pattern": "foo",
Expand Down
8 changes: 7 additions & 1 deletion metricbeat/module/redis/metricset.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
// MetricSet for fetching Redis server information and statistics.
type MetricSet struct {
mb.BaseMetricSet
pool *rd.Pool
pool *Pool
}

// NewMetricSet creates the base for Redis metricsets
Expand Down Expand Up @@ -74,6 +74,12 @@ func (m *MetricSet) Close() error {
return m.pool.Close()
}

// OriginalDBNumber returns the originally configured database number, this can be used by
// metricsets that change keyspace to go back to the originally configured one
func (m *MetricSet) OriginalDBNumber() uint {
return uint(m.pool.DBNumber())
}

func getPasswordDBNumber(hostData mb.HostData) (string, int, error) {
// If there are more than one place specified password/db-number, use password/db-number in query
uriParsed, err := url.Parse(hostData.URI)
Expand Down
25 changes: 21 additions & 4 deletions metricbeat/module/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import (
"strings"
"time"

"github.com/elastic/beats/libbeat/logp"

rd "github.com/garyburd/redigo/redis"

"github.com/elastic/beats/libbeat/logp"
)

// Redis types
Expand Down Expand Up @@ -171,14 +171,26 @@ func Select(c rd.Conn, keyspace uint) error {
return err
}

// Pool is a redis pool that keeps track of the database number originally configured
type Pool struct {
*rd.Pool

dbNumber int
}

// DBNumber returns the db number originally used to configure this pool
func (p *Pool) DBNumber() int {
return p.dbNumber
}

// CreatePool creates a redis connection pool
func CreatePool(
host, password, network string,
dbNumber int,
maxConn int,
idleTimeout, connTimeout time.Duration,
) *rd.Pool {
return &rd.Pool{
) *Pool {
pool := &rd.Pool{
MaxIdle: maxConn,
IdleTimeout: idleTimeout,
Dial: func() (rd.Conn, error) {
Expand All @@ -190,4 +202,9 @@ func CreatePool(
rd.DialWriteTimeout(connTimeout))
},
}

return &Pool{
Pool: pool,
dbNumber: dbNumber,
}
}

0 comments on commit 020a86f

Please sign in to comment.