feat(blooms)!: Index structured metadata into blooms (#14061)
Instead of indexing n-grams of the log line content, we index the plain structured metadata keys and values.

Resulting tokens are:
* `name`
* `chunkPrefix + name`
* `value`
* `chunkPrefix + value`
* `name + '=' + value`
* `chunkPrefix + name + '=' + value`

Indexed fields (structured metadata names) are also extracted into the series metadata.
Indexed metadata values cannot be queried with the bloom gateways yet.
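
For illustration, here is a minimal sketch of that token expansion for a single structured metadata pair. The standalone helper and its name are hypothetical (the real logic lives in the `StructuredMetadataTokenizer` used in the diff below); `chunkPrefix` stands for the per-chunk byte prefix built from the chunk's `From`/`Through`/`Checksum`.

```go
package main

import "fmt"

// metadataTokens is an illustrative stand-in for the structured metadata tokenizer:
// it lists the six token forms described above for a single key/value pair.
func metadataTokens(chunkPrefix, name, value string) []string {
	return []string{
		name,
		chunkPrefix + name,
		value,
		chunkPrefix + value,
		name + "=" + value,
		chunkPrefix + name + "=" + value,
	}
}

func main() {
	// Hypothetical prefix and metadata pair, for demonstration only.
	for _, tok := range metadataTokens("chk:", "traceID", "3c0e3dcd") {
		fmt.Println(tok)
	}
}
```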

This PR does not clean up the now-unused n-gram tokenization code; that is scoped for a follow-up.

---

Signed-off-by: Christian Haudum <[email protected]>
chaudum authored Sep 10, 2024
1 parent 5f78784 commit a2fbaa8
Showing 10 changed files with 264 additions and 177 deletions.
136 changes: 55 additions & 81 deletions pkg/storage/bloom/v1/bloom_tokenizer.go
@@ -2,7 +2,6 @@ package v1
 
 import (
 	"math"
-	"unsafe"
 
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
@@ -92,19 +91,15 @@ func estimatedCount(m uint, p float64) uint {
 
 // Populates a bloom filter(s) with the tokens from the given chunks.
 // Called once per series
-func (bt *BloomTokenizer) Populate(
-	blooms v2iter.SizedIterator[*Bloom],
-	chks v2iter.Iterator[ChunkRefWithIter],
-	ch chan *BloomCreation,
-) {
+func (bt *BloomTokenizer) Populate(blooms v2iter.SizedIterator[*Bloom], chks v2iter.Iterator[ChunkRefWithIter], ch chan *BloomCreation) {
 	clear(bt.cache) // MUST always clear the cache before starting a new series
 	var next bool
 
 	// All but the last bloom are considered full -- send back unaltered
 	for next = blooms.Next(); next && blooms.Remaining() > 0; next = blooms.Next() {
 		ch <- &BloomCreation{
-			Bloom:            blooms.At(),
-			SourceBytesAdded: 0,
+			Bloom: blooms.At(),
+			Info:  newIndexingInfo(),
 		}
 	}
 
@@ -118,35 +113,29 @@ func (bt *BloomTokenizer) Populate(
 		// We have the feeling that the empty blooms may be reused from old blocks.
 		// Here we log an error if we find an empty bloom.
 		if bloom.Count() == 0 {
-			level.Warn(bt.logger).Log(
-				"msg", "found existing empty bloom",
-			)
+			level.Warn(bt.logger).Log("msg", "found existing empty bloom")
 		}
 	} else {
 		bloom = NewBloom()
 	}
 
-	var bytesAdded int
+	info := newIndexingInfo()
 
 	for chks.Next() {
 		chk := chks.At()
 		itr := v2iter.NewPeekIter(chk.Itr)
 
 		for {
-			full, newBytes := bt.addChunkToBloom(
-				bloom,
-				chk.Ref,
-				itr,
-			)
-			bytesAdded += newBytes
+			full, chunkStats := bt.addChunkToBloom(bloom, chk.Ref, itr)
+			info = info.merge(chunkStats)
 
 			// If a bloom is full, the chunk wasn't completely added
 			// so we'll submit this bloom, start a new one, and continue indexing
 			if full {
-				bt.sendBloom(ch, bloom, bytesAdded)
+				bt.sendBloom(ch, bloom, info)
 
-				// start a new bloom + reset bytesAdded counter
-				bytesAdded = 0
+				// start a new bloom + reset stats
+				info = newIndexingInfo()
 				bloom = NewBloom()
 
 				// cache _MUST_ be cleared when a new bloom is created to ensure that all tokens from
@@ -161,21 +150,15 @@ func (bt *BloomTokenizer) Populate(
 
 	// TODO(salvacorts): Delete this once we solve the correctness bug
 	if bloom.Count() == 0 {
-		level.Warn(bt.logger).Log(
-			"msg", "resulting bloom is empty",
-		)
+		level.Warn(bt.logger).Log("msg", "resulting bloom is empty")
 	}
 
 	// Send the last bloom
-	bt.sendBloom(ch, bloom, bytesAdded)
+	bt.sendBloom(ch, bloom, info)
 	close(ch)
 }
 
-func (bt *BloomTokenizer) sendBloom(
-	ch chan<- *BloomCreation,
-	bloom *Bloom,
-	bytesAdded int,
-) {
+func (bt *BloomTokenizer) sendBloom(ch chan<- *BloomCreation, bloom *Bloom, info indexingInfo) {
 	fillRatio := bloom.ScalableBloomFilter.FillRatio()
 	bt.metrics.hammingWeightRatio.Observe(fillRatio)
 	bt.metrics.estimatedCount.Observe(
@@ -184,70 +167,57 @@ func (bt *BloomTokenizer) sendBloom(
 	bt.metrics.bloomSize.Observe(float64(bloom.ScalableBloomFilter.Capacity() / eightBits))
 	bt.metrics.bloomsTotal.Inc()
 	ch <- &BloomCreation{
-		Bloom:            bloom,
-		SourceBytesAdded: bytesAdded,
+		Bloom: bloom,
+		Info:  info,
 	}
 }
 
-// addChunkToBloom adds the tokens from the given chunk to the given bloom.
-// It continues until the chunk is exhausted or the bloom is full.
-// NB(owen-d): We ensure the invariant that each line is indexed entirely into at least one bloom.
-// This includes both raw ngrams and chunk-prefixed ngrams and is why we use a peeking iterator --
-// so we can advance the iterator only after we're sure the bloom has accepted the line.
-// This is because the _line_ is the atom in Loki's data model and a query must either match (or not) an individual line.
-// Therefore, we index entire lines into a bloom to ensure a lookups are accurate.
-func (bt *BloomTokenizer) addChunkToBloom(bloom *Bloom, ref ChunkRef, entryIter v2iter.PeekIterator[push.Entry]) (full bool, bytesAdded int) {
+func prefixForChunkRef(chk ChunkRef) []byte {
+	enc := encoding.EncWith(make([]byte, 0, 20))
+	enc.PutBE64(uint64(chk.From))    // 8 bytes
+	enc.PutBE64(uint64(chk.Through)) // 8 bytes
+	enc.PutBE32(chk.Checksum)        // 4 bytes
+	return enc.Get()
+}
+
+// addChunkToBloom adds the values from structured metadata from the entries of the given chunk to the given bloom.
+// addChunkToBloom returns true if the bloom has been completely filled, and may not have consumed the entire iterator.
+// addChunkToBloom must be called multiple times until returning false with new blooms until the iterator has been fully consumed.
+func (bt *BloomTokenizer) addChunkToBloom(bloom *Bloom, ref ChunkRef, entryIter v2iter.PeekIterator[push.Entry]) (bool, indexingInfo) {
 	var (
-		tokenBuf, prefixLn = prefixedToken(bt.lineTokenizer.N(), ref, nil)
-		tokens             int
-		successfulInserts  int
-		cachedInserts      int
-		collisionInserts   int
-		chunkBytes         int
-		linesAdded         int
+		tokens            int
+		successfulInserts int
+		cachedInserts     int
+		collisionInserts  int
+		linesAdded        int
+
+		collision bool
 	)
 
+	// return values
+	full, info := false, newIndexingInfo()
+
+	tokenizer := NewStructuredMetadataTokenizer(string(prefixForChunkRef(ref)))
+
 	// We use a peeking iterator to avoid advancing the iterator until we're sure the bloom has accepted the line.
-outer:
 	for entry, ok := entryIter.Peek(); ok; entry, ok = entryIter.Peek() {
-		line := entry.Line
-		chunkBytes += len(line)
-
-		tokenItrs := []v2iter.Iterator[[]byte]{
-			// two iterators, one for the raw tokens and one for the chunk prefixed tokens.
-			// Warning: the underlying line tokenizer (used in both iterators) uses the same buffer for tokens.
-			// They are NOT SAFE for concurrent use.
-			NewPrefixedTokenIter(tokenBuf, prefixLn, bt.lineTokenizer.Tokens(line)),
-			bt.lineTokenizer.Tokens(line),
-		}
+		for _, kv := range entry.StructuredMetadata {
+			info.sourceBytes += len(kv.Name) + len(kv.Value)
+			info.indexedFields.Add(Field(kv.Name))
 
-		for _, itr := range tokenItrs {
-			for itr.Next() {
-				tok := itr.At()
+			tokenItr := tokenizer.Tokens(kv)
+			for tokenItr.Next() {
+				tok := tokenItr.At()
 				tokens++
 
-				// TODO[owen-d]: [n]byte this
-				// To avoid allocations, an unsafe string can be used to check ownership in cache.
-				str := unsafe.String(unsafe.SliceData(tok), len(tok))
 				// A cache is used ahead of the SBF, as it cuts out the costly operations of scaling bloom filters
-				if _, found := bt.cache[str]; found {
+				if _, found := bt.cache[tok]; found {
 					cachedInserts++
 					continue
 				}
 
 				// maxBloomSize is in bytes, but blooms operate at the bit level; adjust
-				var collision bool
-				collision, full = bloom.ScalableBloomFilter.TestAndAddWithMaxSize(tok, bt.maxBloomSize*eightBits)
-
-				if full {
-					// edge case: one line maxed out the bloom size -- retrying is futile
-					// (and will loop endlessly), so we'll just skip indexing it
-					if linesAdded == 0 {
-						_ = entryIter.Next()
-					}
-
-					break outer
-				}
+				collision, full = bloom.ScalableBloomFilter.TestAndAddWithMaxSize([]byte(tok), bt.maxBloomSize*eightBits)
 
 				if collision {
 					collisionInserts++
@@ -257,8 +227,7 @@ outer:
 
 				// only register the key in the cache if it was successfully added to the bloom
 				// as can prevent us from trying subsequent copies
-				str = string(tok)
-				bt.cache[str] = nil
+				bt.cache[tok] = nil
 				if len(bt.cache) >= cacheSize { // While crude, this has proven efficient in performance testing. This speaks to the similarity in log lines near each other
 					clear(bt.cache)
 				}
@@ -268,14 +237,19 @@
 		// Only advance the iterator once we're sure the bloom has accepted the line
 		linesAdded++
 		_ = entryIter.Next()
+
+		// Only break out of the loop if the bloom filter is full after indexing all structured metadata of an entry.
+		if full {
+			break
+		}
 	}
 
 	// update metrics after each chunk added for more consistent reporting
 	bt.metrics.tokensTotal.Add(float64(tokens))
 	bt.metrics.insertsTotal.WithLabelValues(collisionTypeFalse).Add(float64(successfulInserts))
 	bt.metrics.insertsTotal.WithLabelValues(collisionTypeCache).Add(float64(cachedInserts))
 	bt.metrics.insertsTotal.WithLabelValues(collisionTypeTrue).Add(float64(collisionInserts))
-	bt.metrics.sourceBytesAdded.Add(float64(chunkBytes))
+	bt.metrics.sourceBytesAdded.Add(float64(info.sourceBytes))
 
-	return full, chunkBytes
+	return full, info
 }
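
As a side note, the 20-byte chunk prefix built by `prefixForChunkRef` above can be reproduced with the standard library alone. The following sketch is an illustrative equivalent, not the code used in Loki (which goes through its internal `encoding` package):

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// chunkPrefix mirrors the layout of prefixForChunkRef: big-endian From (8 bytes),
// Through (8 bytes), and Checksum (4 bytes), 20 bytes total, prepended to tokens
// so that bloom lookups can be scoped to a single chunk.
func chunkPrefix(from, through int64, checksum uint32) []byte {
	buf := make([]byte, 0, 20)
	buf = binary.BigEndian.AppendUint64(buf, uint64(from))
	buf = binary.BigEndian.AppendUint64(buf, uint64(through))
	buf = binary.BigEndian.AppendUint32(buf, checksum)
	return buf
}

func main() {
	// Example timestamps (milliseconds) and checksum; values are illustrative only.
	fmt.Printf("%x\n", chunkPrefix(1725955200000, 1725958800000, 0xdeadbeef))
}
```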
