Skip to content

Commit

Permalink
rebase main and added compression name to key
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 committed Jun 7, 2023
1 parent aa52507 commit 5064ead
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 12 deletions.
17 changes: 11 additions & 6 deletions pkg/store/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"encoding/base64"
"strconv"
"strings"

"github.com/oklog/ulid"
"github.com/prometheus/prometheus/model/labels"
Expand Down Expand Up @@ -102,7 +103,11 @@ func (c cacheKey) string() string {
// which would end up in wrong query results.
matchers := c.key.(cacheKeyExpandedPostings)
matchersHash := blake2b.Sum256([]byte(matchers))
return "EP:" + c.block + ":" + base64.RawURLEncoding.EncodeToString(matchersHash[0:])
key := "EP:" + c.block + ":" + base64.RawURLEncoding.EncodeToString(matchersHash[0:])
if len(c.compression) > 0 {
key += ":" + c.compression
}
return key
case cacheKeySeries:
return "S:" + c.block + ":" + strconv.FormatUint(uint64(c.key.(cacheKeySeries)), 10)
default:
Expand All @@ -111,16 +116,16 @@ func (c cacheKey) string() string {
}

func labelMatchersToString(matchers []*labels.Matcher) string {
matchersString := ""
sb := strings.Builder{}
for i, lbl := range matchers {
matchersString += lbl.String()
sb.WriteString(lbl.String())
if i < len(matchers)-1 {
matchersString += ";"
sb.WriteRune(';')
}
}
return matchersString
return sb.String()
}

type cacheKeyPostings labels.Label
type cacheKeyExpandedPostings string
type cacheKeyExpandedPostings string // We don't use []*labels.Matcher because it is not a hashable type so fail at inmemory cache.
type cacheKeySeries uint64
24 changes: 21 additions & 3 deletions pkg/store/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,21 @@ func TestCacheKey_string(t *testing.T) {
return fmt.Sprintf("P:%s:%s", uid.String(), encodedHash)
}(),
},
"postings cache key includes compression scheme": {
key: cacheKey{ulidString, cacheKeyPostings(labels.Label{Name: "foo", Value: "bar"}), compressionSchemeStreamedSnappy},
expected: func() string {
hash := blake2b.Sum256([]byte("foo:bar"))
encodedHash := base64.RawURLEncoding.EncodeToString(hash[0:])

return fmt.Sprintf("P:%s:%s:%s", uid.String(), encodedHash, compressionSchemeStreamedSnappy)
}(),
},
"should stringify series cache key": {
key: cacheKey{ulidString, cacheKeySeries(12345), ""},
expected: fmt.Sprintf("S:%s:12345", uid.String()),
},
"should stringify expanded postings cache key": {
key: cacheKey{ulidString, cacheKeyExpandedPostings(labelMatchersToString([]*labels.Matcher{matcher}))},
key: cacheKey{ulidString, cacheKeyExpandedPostings(labelMatchersToString([]*labels.Matcher{matcher})), ""},
expected: func() string {
hash := blake2b.Sum256([]byte(matcher.String()))
encodedHash := base64.RawURLEncoding.EncodeToString(hash[0:])
Expand All @@ -57,14 +66,23 @@ func TestCacheKey_string(t *testing.T) {
}(),
},
"should stringify expanded postings cache key when multiple matchers": {
key: cacheKey{ulidString, cacheKeyExpandedPostings(labelMatchersToString([]*labels.Matcher{matcher, matcher2}))},
key: cacheKey{ulidString, cacheKeyExpandedPostings(labelMatchersToString([]*labels.Matcher{matcher, matcher2})), ""},
expected: func() string {
hash := blake2b.Sum256([]byte(fmt.Sprintf("%s;%s", matcher.String(), matcher2.String())))
encodedHash := base64.RawURLEncoding.EncodeToString(hash[0:])

return fmt.Sprintf("EP:%s:%s", uid.String(), encodedHash)
}(),
},
"expanded postings cache key includes compression scheme": {
key: cacheKey{ulidString, cacheKeyExpandedPostings(labelMatchersToString([]*labels.Matcher{matcher})), compressionSchemeStreamedSnappy},
expected: func() string {
hash := blake2b.Sum256([]byte(matcher.String()))
encodedHash := base64.RawURLEncoding.EncodeToString(hash[0:])

return fmt.Sprintf("EP:%s:%s:%s", uid.String(), encodedHash, compressionSchemeStreamedSnappy)
}(),
},
}

for testName, testData := range tests {
Expand Down Expand Up @@ -110,7 +128,7 @@ func TestCacheKey_string_ShouldGuaranteeReasonablyShortKeyLength(t *testing.T) {
matchers = append(matchers, labels.MustNewMatcher(t, name, value))
}
return cacheKeyExpandedPostings(labelMatchersToString(matchers))
}()},
}(), ""},
},
},
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/cache/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func (c *InMemoryIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers [

// FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not.
func (c *InMemoryIndexCache) FetchExpandedPostings(_ context.Context, blockID ulid.ULID, matchers []*labels.Matcher) ([]byte, bool) {
if b, ok := c.get(cacheTypeExpandedPostings, cacheKey{blockID.String(), cacheKeyExpandedPostings(labelMatchersToString(matchers))}); ok {
if b, ok := c.get(cacheTypeExpandedPostings, cacheKey{blockID.String(), cacheKeyExpandedPostings(labelMatchersToString(matchers)), ""}); ok {
return b, true
}
return nil, false
Expand Down
4 changes: 2 additions & 2 deletions pkg/store/cache/memcached.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (c *RemoteIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.
// The function enqueues the request and returns immediately: the entry will be
// asynchronously stored in the cache.
func (c *RemoteIndexCache) StoreExpandedPostings(blockID ulid.ULID, keys []*labels.Matcher, v []byte) {
key := cacheKey{blockID.String(), cacheKeyExpandedPostings(labelMatchersToString(keys))}.string()
key := cacheKey{blockID.String(), cacheKeyExpandedPostings(labelMatchersToString(keys)), ""}.string()

if err := c.memcached.SetAsync(key, v, memcachedDefaultTTL); err != nil {
level.Error(c.logger).Log("msg", "failed to cache expanded postings in memcached", "err", err)
Expand All @@ -134,7 +134,7 @@ func (c *RemoteIndexCache) StoreExpandedPostings(blockID ulid.ULID, keys []*labe
// and returns a map containing cache hits, along with a list of missing keys.
// In case of error, it logs and return an empty cache hits map.
func (c *RemoteIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, lbls []*labels.Matcher) ([]byte, bool) {
key := cacheKey{blockID.String(), cacheKeyExpandedPostings(labelMatchersToString(lbls))}.string()
key := cacheKey{blockID.String(), cacheKeyExpandedPostings(labelMatchersToString(lbls)), ""}.string()

// Fetch the keys from memcached in a single request.
c.expandedPostingRequests.Add(1)
Expand Down

0 comments on commit 5064ead

Please sign in to comment.