Commit f60a77d

convert cache to a ttl cache
christopherzli committed Aug 28, 2024
1 parent eddbbb3 commit f60a77d
Showing 4 changed files with 31 additions and 40 deletions.
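
This commit swaps the failed-query cache's backing store from the untyped LRU in github.com/hashicorp/golang-lru to the generic, TTL-aware LRU in github.com/hashicorp/golang-lru/v2/expirable, so cached failed queries age out on their own instead of lingering until capacity eviction. Below is a minimal sketch of how that expirable cache behaves; the capacity, TTL, and key values are illustrative and not taken from this commit.

package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/golang-lru/v2/expirable"
)

func main() {
	// Capacity 128, no eviction callback, entries expire 5 minutes after insertion.
	cache := expirable.NewLRU[string, int](128, nil, 5*time.Minute)

	// Key: normalized query expression; value: query range length in seconds.
	cache.Add("sum(rate(http_requests_total[5m]))", 3600)

	if rangeSeconds, ok := cache.Get("sum(rate(http_requests_total[5m]))"); ok {
		fmt.Println("cached range seconds:", rangeSeconds)
	}
	// Once the TTL elapses, Get misses and the entry is dropped without any
	// manual cleanup by the caller.
}
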
1 change: 0 additions & 1 deletion go.mod
@@ -116,7 +116,6 @@ require (

require (
github.com/cortexproject/promqlsmith v0.0.0-20240326071418-c2a9ca1e89f5
github.com/hashicorp/golang-lru v0.6.0
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/mitchellh/go-ps v1.0.0
github.com/onsi/gomega v1.29.0
8 changes: 3 additions & 5 deletions internal/cortex/frontend/transport/handler.go
@@ -48,6 +48,7 @@ type HandlerConfig struct {
QueryStatsEnabled bool `yaml:"query_stats_enabled"`
LogFailedQueries bool `yaml:"log_failed_queries"`
FailedQueryCacheCapacity int `yaml:"failed_query_cache_capacity"`
FailedQueryTTL time.Duration `yaml:"failed_query_ttl"`
}

// Handler accepts queries and forwards them to RoundTripper. It can log slow queries,
@@ -76,11 +77,8 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge
}

if cfg.FailedQueryCacheCapacity > 0 {
level.Info(log).Log("msg", "Creating failed query cache", "capacity", cfg.FailedQueryCacheCapacity)
FailedQueryCache, errQueryCache := utils.NewFailedQueryCache(cfg.FailedQueryCacheCapacity, reg)
if errQueryCache != nil {
level.Warn(log).Log(errQueryCache.Error())
}
level.Info(log).Log("msg", "Creating failed query cache", "capacity", cfg.FailedQueryCacheCapacity, "ttl", cfg.FailedQueryTTL.String())
FailedQueryCache := utils.NewFailedQueryCache(cfg.FailedQueryCacheCapacity, cfg.FailedQueryTTL, reg)
h.failedQueryCache = FailedQueryCache
}

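With the TTL carried on HandlerConfig, the handler now passes it straight into the cache constructor, which no longer returns an error (the v2 expirable constructor cannot fail), so the warning branch disappears. A hedged sketch of the new call site, using illustrative config values rather than anything from this commit:

// Illustrative values only; the real handler reads these from the YAML
// fields failed_query_cache_capacity and failed_query_ttl shown above.
cfg := HandlerConfig{
	FailedQueryCacheCapacity: 1000,
	FailedQueryTTL:           15 * time.Minute,
}
if cfg.FailedQueryCacheCapacity > 0 {
	// Single return value: the expirable constructor has no error to check.
	h.failedQueryCache = utils.NewFailedQueryCache(cfg.FailedQueryCacheCapacity, cfg.FailedQueryTTL, reg)
}
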
41 changes: 19 additions & 22 deletions internal/cortex/frontend/transport/utils/utils.go
@@ -6,13 +6,13 @@ package utils

import (
"fmt"
"github.com/hashicorp/golang-lru/v2/expirable"
"net/http"
"net/url"
"regexp"
"strconv"
"time"

lru "github.com/hashicorp/golang-lru"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
@@ -25,39 +25,35 @@ var (
type FailedQueryCache struct {
regex *regexp.Regexp
errorExtract *regexp.Regexp
lruCache *lru.Cache
lruCache *expirable.LRU[string, int]
cachedHits prometheus.Counter
cachedQueries prometheus.Gauge
}

func NewFailedQueryCache(capacity int, reg prometheus.Registerer) (*FailedQueryCache, error) {
func NewFailedQueryCache(capacity int, ttlDuration time.Duration, reg prometheus.Registerer) *FailedQueryCache {
regex := regexp.MustCompile(`[\s\n\t]+`)
errorExtract := regexp.MustCompile(`Code\((\d+)\)`)
lruCache, err := lru.New(capacity)
if err != nil {
lruCache = nil
err = fmt.Errorf("failed to create lru cache: %s", err)
return nil, err
}
lruCacheWithTTL := expirable.NewLRU[string, int](capacity, nil, ttlDuration)

cachedHits := promauto.With(reg).NewCounter(prometheus.CounterOpts{
Namespace: "cortex",
Name: "cached_failed_queries_count",
Help: "Total number of queries that hit the failed query cache.",
Name: "cached_failed_queries_count",
Help: "Total number of queries that hit the failed query cache.",
})
cachedQueries := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Namespace: "cortex",
Name: "failed_query_cache_size",
Help: "How many queries are cached in the failed query cache.",
Name: "failed_query_cache_size",
Help: "How many queries are cached in the failed query cache.",
})
cachedQueries.Set(0)

return &FailedQueryCache{
regex: regex,
errorExtract: errorExtract,
lruCache: lruCache,
lruCache: lruCacheWithTTL,
cachedHits: cachedHits,
cachedQueries: cachedQueries,
}, err
}
}

// UpdateFailedQueryCache returns true if query is cached so that callsite can increase counter, returns message as a string for callsite to log outcome
@@ -92,19 +88,20 @@ func (f *FailedQueryCache) updateFailedQueryCache(err error, queryExpressionNorm

func (f *FailedQueryCache) addCacheEntry(queryExpressionNormalized string, queryExpressionRangeLength int) {
// Checks if queryExpression is already in cache, and updates time range length value to min of stored and new value.
if contains, _ := f.lruCache.ContainsOrAdd(queryExpressionNormalized, queryExpressionRangeLength); contains {
if contains := f.lruCache.Contains(queryExpressionNormalized); contains {
if oldValue, ok := f.lruCache.Get(queryExpressionNormalized); ok {
queryExpressionRangeLength = min(queryExpressionRangeLength, oldValue.(int))
queryExpressionRangeLength = min(queryExpressionRangeLength, oldValue)
}
f.lruCache.Add(queryExpressionNormalized, queryExpressionRangeLength)
}
f.lruCache.Add(queryExpressionNormalized, queryExpressionRangeLength)

f.cachedQueries.Set(float64(f.lruCache.Len()))
}

// QueryHitCache checks if the lru cache is hit and returns whether to increment counter for cache hits along with appropriate message.
func queryHitCache(queryExpressionNormalized string, queryExpressionRangeLength int, lruCache *lru.Cache, cachedHits prometheus.Counter) (bool, string) {
if value, ok := lruCache.Get(queryExpressionNormalized); ok && value.(int) <= queryExpressionRangeLength {
cachedQueryRangeSeconds := value.(int)
func queryHitCache(queryExpressionNormalized string, queryExpressionRangeLength int, lruCache *expirable.LRU[string, int], cachedHits prometheus.Counter) (bool, string) {
if value, ok := lruCache.Get(queryExpressionNormalized); ok && value <= queryExpressionRangeLength {
cachedQueryRangeSeconds := value
message := createLogMessage("Retrieved query from cache", queryExpressionNormalized, cachedQueryRangeSeconds, queryExpressionRangeLength, nil)
cachedHits.Inc()
return true, message
Expand Down Expand Up @@ -159,7 +156,7 @@ func (f *FailedQueryCache) UpdateFailedQueryCache(err error, query url.Values, q
queryExpressionRangeLength := getQueryRangeSeconds(query)
// TODO(hc.zhu): add a flag for the threshold
// The current gateway timeout is 5 minutes, so we cache the failed query running longer than 5 minutes - 10 seconds.
if queryResponseTime > time.Second * (60 * 5 - 10) {
if queryResponseTime > time.Second*(60*5-10) {
// Cache long running queries regardless of the error code. The most common case is "context canceled".
f.addCacheEntry(queryExpressionNormalized, queryExpressionRangeLength)
message := createLogMessage("Cached a failed long running query", queryExpressionNormalized, -1, queryExpressionRangeLength, err)
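
The cache keys stay normalized query expressions and the values stay the smallest time-range length (in seconds) observed failing, but the generic expirable.LRU[string, int] removes the old value.(int) assertions and the ContainsOrAdd dance. A hedged sketch of the two operations as they work after this change, using illustrative helper names rather than the repository's own functions:

// addFailure keeps the smallest failing range length seen for an expression;
// TTL-based eviction is handled entirely by the expirable LRU.
func addFailure(cache *expirable.LRU[string, int], normalizedQuery string, rangeSeconds int) {
	if old, ok := cache.Get(normalizedQuery); ok && old < rangeSeconds {
		rangeSeconds = old
	}
	cache.Add(normalizedQuery, rangeSeconds)
}

// isCachedFailure counts a hit only when the incoming query covers at least
// the range that previously failed; values are plain ints, so no type assertion.
func isCachedFailure(cache *expirable.LRU[string, int], normalizedQuery string, rangeSeconds int) bool {
	cachedRange, ok := cache.Get(normalizedQuery)
	return ok && cachedRange <= rangeSeconds
}
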
21 changes: 9 additions & 12 deletions internal/cortex/frontend/transport/utils/utils_test.go
@@ -34,19 +34,16 @@ func verifyMetricCount(t *testing.T, reg *prometheus.Registry, expectedCount int

func TestNewFailedQueryCache(t *testing.T) {
reg := prometheus.NewRegistry()
cache, err := NewFailedQueryCache(2, reg)
cache := NewFailedQueryCache(2, 0, reg)
if cache == nil {
t.Fatalf("Expected cache to be created, but got nil")
}
if err != nil {
t.Fatalf("Expected no error message, but got: %s", err.Error())
}
verifyMetricCount(t, reg, 2)
}

func TestUpdateFailedQueryCache(t *testing.T) {
reg := prometheus.NewRegistry()
cache, _ := NewFailedQueryCache(3, reg)
cache := NewFailedQueryCache(3, 0, reg)

tests := []struct {
name string
@@ -206,7 +203,7 @@
// TestQueryHitCache tests the QueryHitCache method
func TestQueryHitCache(t *testing.T) {
reg := prometheus.NewRegistry()
cache, _ := NewFailedQueryCache(2, reg)
cache := NewFailedQueryCache(2, 0, reg)
lruCache := cache.lruCache

lruCache.Add("test_query", 100)
@@ -289,7 +286,7 @@

func TestCacheCounterVec(t *testing.T) {
reg := prometheus.NewRegistry()
cache, _ := NewFailedQueryCache(2, reg)
cache := NewFailedQueryCache(2, 0, reg)
lruCache := cache.lruCache

lruCache.Add("test_query", 100)
@@ -371,12 +368,12 @@ func TestCacheCounterVec(t *testing.T) {

func TestCacheLongRunningFailedQuery(t *testing.T) {
reg := prometheus.NewRegistry()
cache, _ := NewFailedQueryCache(3, reg)
cache := NewFailedQueryCache(3, 0, reg)

tests := []struct {
name string
err error
query url.Values
name string
err error
query url.Values
}{
{
name: "No error code in error message",
@@ -401,7 +398,7 @@ func TestCacheLongRunningFailedQuery(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Long running failed query without an error code
cached, _ := cache.UpdateFailedQueryCache(tt.err, tt.query, time.Second*(5 * 60 - 1))
cached, _ := cache.UpdateFailedQueryCache(tt.err, tt.query, time.Second*(5*60-1))
if !cached {
t.Errorf("Should cache short running failed query without an error code")
}
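
The updated tests call the single-return constructor and pass a TTL of 0, which the expirable package treats as no expiry, so the existing assertions are unchanged. A hypothetical extra test, not part of this commit, sketching how actual expiry could be exercised with a short TTL:

func TestFailedQueryCacheTTLExpiry(t *testing.T) {
	reg := prometheus.NewRegistry()
	// Short TTL so the entry expires within the test.
	cache := NewFailedQueryCache(2, 50*time.Millisecond, reg)

	cache.lruCache.Add("test_query", 100)
	if _, ok := cache.lruCache.Get("test_query"); !ok {
		t.Fatalf("expected entry to be present before the TTL elapses")
	}

	time.Sleep(100 * time.Millisecond)
	if _, ok := cache.lruCache.Get("test_query"); ok {
		t.Errorf("expected entry to be gone after the TTL elapsed")
	}
}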
