Skip to content

Commit

Permalink
fix(redis): go-redis v9 regression missing metrics and reconnect hook (
Browse files Browse the repository at this point in the history
…#13415) (#15275)

* fix(redis): go-redis v9 regression missing metrics and reconnect hook

Signed-off-by: phanama <[email protected]>

* fix: golangci lint return values not checked in tests

Signed-off-by: phanama <[email protected]>

* chore: move dnsError var locally into func

Signed-off-by: phanama <[email protected]>

---------

Signed-off-by: phanama <[email protected]>
  • Loading branch information
phanama authored Jan 29, 2024
1 parent 2b45cc8 commit 9e941af
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 62 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ require (
github.com/pjbgf/sha1cd v0.3.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/client_model v0.3.0
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.10.1 // indirect
github.com/rivo/uniseg v0.4.4 // indirect
Expand Down
41 changes: 14 additions & 27 deletions util/cache/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/json"
"fmt"
"io"
"net"
"time"

ioutil "github.com/argoproj/argo-cd/v2/util/io"
Expand Down Expand Up @@ -155,41 +156,27 @@ type MetricsRegistry interface {
ObserveRedisRequestDuration(duration time.Duration)
}

var metricStartTimeKey = struct{}{}

type redisHook struct {
registry MetricsRegistry
}

func (rh *redisHook) BeforeProcess(ctx context.Context, cmd redis.Cmder) (context.Context, error) {
return context.WithValue(ctx, metricStartTimeKey, time.Now()), nil
}

func (rh *redisHook) AfterProcess(ctx context.Context, cmd redis.Cmder) error {
cmdErr := cmd.Err()
rh.registry.IncRedisRequest(cmdErr != nil && cmdErr != redis.Nil)

startTime := ctx.Value(metricStartTimeKey).(time.Time)
duration := time.Since(startTime)
rh.registry.ObserveRedisRequestDuration(duration)

return nil
}

func (redisHook) BeforeProcessPipeline(ctx context.Context, _ []redis.Cmder) (context.Context, error) {
return ctx, nil
func (rh *redisHook) DialHook(next redis.DialHook) redis.DialHook {
return func(ctx context.Context, network, addr string) (net.Conn, error) {
conn, err := next(ctx, network, addr)
return conn, err
}
}

func (redisHook) AfterProcessPipeline(_ context.Context, _ []redis.Cmder) error {
return nil
}
func (rh *redisHook) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
return func(ctx context.Context, cmd redis.Cmder) error {
startTime := time.Now()

func (redisHook) DialHook(next redis.DialHook) redis.DialHook {
return nil
}
err := next(ctx, cmd)
rh.registry.IncRedisRequest(err != nil && err != redis.Nil)
rh.registry.ObserveRedisRequestDuration(time.Since(startTime))

func (redisHook) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
return nil
return err
}
}

func (redisHook) ProcessPipelineHook(next redis.ProcessPipelineHook) redis.ProcessPipelineHook {
Expand Down
40 changes: 15 additions & 25 deletions util/cache/redis_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@ package cache

import (
"context"
"strings"
"errors"
"net"

"github.com/redis/go-redis/v9"
log "github.com/sirupsen/logrus"
)

const NoSuchHostErr = "no such host"

type argoRedisHooks struct {
reconnectCallback func()
}
Expand All @@ -18,32 +17,23 @@ func NewArgoRedisHook(reconnectCallback func()) *argoRedisHooks {
return &argoRedisHooks{reconnectCallback: reconnectCallback}
}

func (hook *argoRedisHooks) BeforeProcess(ctx context.Context, cmd redis.Cmder) (context.Context, error) {
return ctx, nil
}

func (hook *argoRedisHooks) AfterProcess(ctx context.Context, cmd redis.Cmder) error {
if cmd.Err() != nil && strings.Contains(cmd.Err().Error(), NoSuchHostErr) {
log.Warnf("Reconnect to redis because error: \"%v\"", cmd.Err())
hook.reconnectCallback()
}
return nil
}

func (hook *argoRedisHooks) BeforeProcessPipeline(ctx context.Context, cmds []redis.Cmder) (context.Context, error) {
return ctx, nil
}

func (hook *argoRedisHooks) AfterProcessPipeline(ctx context.Context, cmds []redis.Cmder) error {
return nil
}

func (hook *argoRedisHooks) DialHook(next redis.DialHook) redis.DialHook {
return nil
return func(ctx context.Context, network, addr string) (net.Conn, error) {
conn, err := next(ctx, network, addr)
return conn, err
}
}

func (hook *argoRedisHooks) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
return nil
return func(ctx context.Context, cmd redis.Cmder) error {
var dnsError *net.DNSError
err := next(ctx, cmd)
if err != nil && errors.As(err, &dnsError) {
log.Warnf("Reconnect to redis because error: \"%v\"", err)
hook.reconnectCallback()
}
return err
}
}

func (hook *argoRedisHooks) ProcessPipelineHook(next redis.ProcessPipelineHook) redis.ProcessPipelineHook {
Expand Down
33 changes: 24 additions & 9 deletions util/cache/redis_hook_test.go
Original file line number Diff line number Diff line change
@@ -1,38 +1,53 @@
package cache

import (
"context"
"errors"
"testing"
"time"

"github.com/alicebob/miniredis/v2"
"github.com/stretchr/testify/assert"

"github.com/redis/go-redis/v9"
)

func Test_ReconnectCallbackHookCalled(t *testing.T) {
mr, err := miniredis.Run()
if err != nil {
panic(err)
}
defer mr.Close()

called := false
hook := NewArgoRedisHook(func() {
called = true
})

cmd := &redis.StringCmd{}
cmd.SetErr(errors.New("Failed to resync revoked tokens. retrying again in 1 minute: dial tcp: lookup argocd-redis on 10.179.0.10:53: no such host"))

_ = hook.AfterProcess(context.Background(), cmd)
faultyDNSRedisClient := redis.NewClient(&redis.Options{Addr: "invalidredishost.invalid:12345"})
faultyDNSRedisClient.AddHook(hook)

faultyDNSClient := NewRedisCache(faultyDNSRedisClient, 60*time.Second, RedisCompressionNone)
err = faultyDNSClient.Set(&Item{Key: "baz", Object: "foo"})
assert.Equal(t, called, true)
assert.Error(t, err)
}

func Test_ReconnectCallbackHookNotCalled(t *testing.T) {
mr, err := miniredis.Run()
if err != nil {
panic(err)
}
defer mr.Close()

called := false
hook := NewArgoRedisHook(func() {
called = true
})
cmd := &redis.StringCmd{}
cmd.SetErr(errors.New("Something wrong"))

_ = hook.AfterProcess(context.Background(), cmd)
redisClient := redis.NewClient(&redis.Options{Addr: mr.Addr()})
redisClient.AddHook(hook)
client := NewRedisCache(redisClient, 60*time.Second, RedisCompressionNone)

err = client.Set(&Item{Key: "foo", Object: "bar"})
assert.Equal(t, called, false)
assert.NoError(t, err)
}
92 changes: 92 additions & 0 deletions util/cache/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,59 @@ package cache

import (
"context"
"strconv"
"testing"
"time"

promcm "github.com/prometheus/client_model/go"

"github.com/alicebob/miniredis/v2"
"github.com/prometheus/client_golang/prometheus"
"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/assert"
)

var (
redisRequestCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "argocd_redis_request_total",
},
[]string{"initiator", "failed"},
)
redisRequestHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "argocd_redis_request_duration",
Buckets: []float64{0.1, 0.25, .5, 1, 2},
},
[]string{"initiator"},
)
)

type MockMetricsServer struct {
registry *prometheus.Registry
redisRequestCounter *prometheus.CounterVec
redisRequestHistogram *prometheus.HistogramVec
}

func NewMockMetricsServer() *MockMetricsServer {
registry := prometheus.NewRegistry()
registry.MustRegister(redisRequestCounter)
registry.MustRegister(redisRequestHistogram)
return &MockMetricsServer{
registry: registry,
redisRequestCounter: redisRequestCounter,
redisRequestHistogram: redisRequestHistogram,
}
}

func (m *MockMetricsServer) IncRedisRequest(failed bool) {
m.redisRequestCounter.WithLabelValues("mock", strconv.FormatBool(failed)).Inc()
}

func (m *MockMetricsServer) ObserveRedisRequestDuration(duration time.Duration) {
m.redisRequestHistogram.WithLabelValues("mock").Observe(duration.Seconds())
}

func TestRedisSetCache(t *testing.T) {
mr, err := miniredis.Run()
if err != nil {
Expand Down Expand Up @@ -70,3 +115,50 @@ func TestRedisSetCacheCompressed(t *testing.T) {

assert.Equal(t, testValue, result)
}

func TestRedisMetrics(t *testing.T) {
mr, err := miniredis.Run()
if err != nil {
panic(err)
}
defer mr.Close()

metric := &promcm.Metric{}
ms := NewMockMetricsServer()
redisClient := redis.NewClient(&redis.Options{Addr: mr.Addr()})
faultyRedisClient := redis.NewClient(&redis.Options{Addr: "invalidredishost.invalid:12345"})
CollectMetrics(redisClient, ms)
CollectMetrics(faultyRedisClient, ms)

client := NewRedisCache(redisClient, 60*time.Second, RedisCompressionNone)
faultyClient := NewRedisCache(faultyRedisClient, 60*time.Second, RedisCompressionNone)
var res string

//client successful request
err = client.Set(&Item{Key: "foo", Object: "bar"})
assert.NoError(t, err)
err = client.Get("foo", &res)
assert.NoError(t, err)

c, err := ms.redisRequestCounter.GetMetricWithLabelValues("mock", "false")
assert.NoError(t, err)
err = c.Write(metric)
assert.NoError(t, err)
assert.Equal(t, metric.Counter.GetValue(), float64(2))

//faulty client failed request
err = faultyClient.Get("foo", &res)
assert.Error(t, err)
c, err = ms.redisRequestCounter.GetMetricWithLabelValues("mock", "true")
assert.NoError(t, err)
err = c.Write(metric)
assert.NoError(t, err)
assert.Equal(t, metric.Counter.GetValue(), float64(1))

//both clients histogram count
o, err := ms.redisRequestHistogram.GetMetricWithLabelValues("mock")
assert.NoError(t, err)
err = o.(prometheus.Metric).Write(metric)
assert.NoError(t, err)
assert.Equal(t, int(metric.Histogram.GetSampleCount()), 3)
}

0 comments on commit 9e941af

Please sign in to comment.