diff --git a/pkg/store/cache/cache.go b/pkg/store/cache/cache.go index 4e5bcf87dd..82bf30625e 100644 --- a/pkg/store/cache/cache.go +++ b/pkg/store/cache/cache.go @@ -7,6 +7,7 @@ import ( "context" "encoding/base64" "strconv" + "strings" "github.com/oklog/ulid" "github.com/prometheus/prometheus/model/labels" @@ -102,7 +103,11 @@ func (c cacheKey) string() string { // which would end up in wrong query results. matchers := c.key.(cacheKeyExpandedPostings) matchersHash := blake2b.Sum256([]byte(matchers)) - return "EP:" + c.block + ":" + base64.RawURLEncoding.EncodeToString(matchersHash[0:]) + key := "EP:" + c.block + ":" + base64.RawURLEncoding.EncodeToString(matchersHash[0:]) + if len(c.compression) > 0 { + key += ":" + c.compression + } + return key case cacheKeySeries: return "S:" + c.block + ":" + strconv.FormatUint(uint64(c.key.(cacheKeySeries)), 10) default: @@ -111,16 +116,16 @@ func (c cacheKey) string() string { } func labelMatchersToString(matchers []*labels.Matcher) string { - matchersString := "" + sb := strings.Builder{} for i, lbl := range matchers { - matchersString += lbl.String() + sb.WriteString(lbl.String()) if i < len(matchers)-1 { - matchersString += ";" + sb.WriteRune(';') } } - return matchersString + return sb.String() } type cacheKeyPostings labels.Label -type cacheKeyExpandedPostings string +type cacheKeyExpandedPostings string // We don't use []*labels.Matcher because it is not a hashable type so fail at inmemory cache. type cacheKeySeries uint64 diff --git a/pkg/store/cache/cache_test.go b/pkg/store/cache/cache_test.go index 778efce1fb..59adf2b8c7 100644 --- a/pkg/store/cache/cache_test.go +++ b/pkg/store/cache/cache_test.go @@ -43,12 +43,21 @@ func TestCacheKey_string(t *testing.T) { return fmt.Sprintf("P:%s:%s", uid.String(), encodedHash) }(), }, + "postings cache key includes compression scheme": { + key: cacheKey{ulidString, cacheKeyPostings(labels.Label{Name: "foo", Value: "bar"}), compressionSchemeStreamedSnappy}, + expected: func() string { + hash := blake2b.Sum256([]byte("foo:bar")) + encodedHash := base64.RawURLEncoding.EncodeToString(hash[0:]) + + return fmt.Sprintf("P:%s:%s:%s", uid.String(), encodedHash, compressionSchemeStreamedSnappy) + }(), + }, "should stringify series cache key": { key: cacheKey{ulidString, cacheKeySeries(12345), ""}, expected: fmt.Sprintf("S:%s:12345", uid.String()), }, "should stringify expanded postings cache key": { - key: cacheKey{ulidString, cacheKeyExpandedPostings(labelMatchersToString([]*labels.Matcher{matcher}))}, + key: cacheKey{ulidString, cacheKeyExpandedPostings(labelMatchersToString([]*labels.Matcher{matcher})), ""}, expected: func() string { hash := blake2b.Sum256([]byte(matcher.String())) encodedHash := base64.RawURLEncoding.EncodeToString(hash[0:]) @@ -57,7 +66,7 @@ func TestCacheKey_string(t *testing.T) { }(), }, "should stringify expanded postings cache key when multiple matchers": { - key: cacheKey{ulidString, cacheKeyExpandedPostings(labelMatchersToString([]*labels.Matcher{matcher, matcher2}))}, + key: cacheKey{ulidString, cacheKeyExpandedPostings(labelMatchersToString([]*labels.Matcher{matcher, matcher2})), ""}, expected: func() string { hash := blake2b.Sum256([]byte(fmt.Sprintf("%s;%s", matcher.String(), matcher2.String()))) encodedHash := base64.RawURLEncoding.EncodeToString(hash[0:]) @@ -65,6 +74,15 @@ func TestCacheKey_string(t *testing.T) { return fmt.Sprintf("EP:%s:%s", uid.String(), encodedHash) }(), }, + "expanded postings cache key includes compression scheme": { + key: cacheKey{ulidString, cacheKeyExpandedPostings(labelMatchersToString([]*labels.Matcher{matcher})), compressionSchemeStreamedSnappy}, + expected: func() string { + hash := blake2b.Sum256([]byte(matcher.String())) + encodedHash := base64.RawURLEncoding.EncodeToString(hash[0:]) + + return fmt.Sprintf("EP:%s:%s:%s", uid.String(), encodedHash, compressionSchemeStreamedSnappy) + }(), + }, } for testName, testData := range tests { @@ -110,7 +128,7 @@ func TestCacheKey_string_ShouldGuaranteeReasonablyShortKeyLength(t *testing.T) { matchers = append(matchers, labels.MustNewMatcher(t, name, value)) } return cacheKeyExpandedPostings(labelMatchersToString(matchers)) - }()}, + }(), ""}, }, }, } diff --git a/pkg/store/cache/inmemory.go b/pkg/store/cache/inmemory.go index a73a5d64df..8e35f4dca3 100644 --- a/pkg/store/cache/inmemory.go +++ b/pkg/store/cache/inmemory.go @@ -326,7 +326,7 @@ func (c *InMemoryIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers [ // FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not. func (c *InMemoryIndexCache) FetchExpandedPostings(_ context.Context, blockID ulid.ULID, matchers []*labels.Matcher) ([]byte, bool) { - if b, ok := c.get(cacheTypeExpandedPostings, cacheKey{blockID.String(), cacheKeyExpandedPostings(labelMatchersToString(matchers))}); ok { + if b, ok := c.get(cacheTypeExpandedPostings, cacheKey{blockID.String(), cacheKeyExpandedPostings(labelMatchersToString(matchers)), ""}); ok { return b, true } return nil, false diff --git a/pkg/store/cache/memcached.go b/pkg/store/cache/memcached.go index 856adb03f3..4f0807ecbd 100644 --- a/pkg/store/cache/memcached.go +++ b/pkg/store/cache/memcached.go @@ -123,7 +123,7 @@ func (c *RemoteIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid. // The function enqueues the request and returns immediately: the entry will be // asynchronously stored in the cache. func (c *RemoteIndexCache) StoreExpandedPostings(blockID ulid.ULID, keys []*labels.Matcher, v []byte) { - key := cacheKey{blockID.String(), cacheKeyExpandedPostings(labelMatchersToString(keys))}.string() + key := cacheKey{blockID.String(), cacheKeyExpandedPostings(labelMatchersToString(keys)), ""}.string() if err := c.memcached.SetAsync(key, v, memcachedDefaultTTL); err != nil { level.Error(c.logger).Log("msg", "failed to cache expanded postings in memcached", "err", err) @@ -134,7 +134,7 @@ func (c *RemoteIndexCache) StoreExpandedPostings(blockID ulid.ULID, keys []*labe // and returns a map containing cache hits, along with a list of missing keys. // In case of error, it logs and return an empty cache hits map. func (c *RemoteIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, lbls []*labels.Matcher) ([]byte, bool) { - key := cacheKey{blockID.String(), cacheKeyExpandedPostings(labelMatchersToString(lbls))}.string() + key := cacheKey{blockID.String(), cacheKeyExpandedPostings(labelMatchersToString(lbls)), ""}.string() // Fetch the keys from memcached in a single request. c.expandedPostingRequests.Add(1)