Commit
Address frequent errors logged by compactor regarding meta not found (#327)

* Update blocklist in memory for blocks added/removed/compacted by the compactor, to reduce compaction noise/errors (see the sketch below)

* Remove unused parameter to updateBlocklist

* Test to ensure traces are still Find'able after compaction

* Change updateBlocklist to compare blockmeta by id instead of pointer. Add tests

* Ignore .vscode

* Update changelog
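For context on the failure mode behind this change: the compactor selects work from an in-memory blocklist that is normally refreshed only by polling, so after it compacts blocks away in the backend, a stale list can keep offering block IDs whose metas no longer exist. A minimal standalone sketch of that mechanism; backendStore, readMeta, and errMetaNotFound are illustrative stand-ins, not Tempo's types:

package main

import (
	"errors"
	"fmt"

	"github.com/google/uuid"
)

var errMetaNotFound = errors.New("meta not found")

// backendStore is an illustrative stand-in for the object store backend;
// it only tracks which block metas currently exist.
type backendStore struct {
	metas map[uuid.UUID]bool
}

func (s *backendStore) readMeta(id uuid.UUID) error {
	if !s.metas[id] {
		return errMetaNotFound
	}
	return nil
}

func main() {
	id := uuid.New()
	store := &backendStore{metas: map[uuid.UUID]bool{id: true}}

	// Blocklist polled before compaction ran.
	staleBlocklist := []uuid.UUID{id}

	// Compaction replaces the block in the backend; its old meta is gone.
	delete(store.metas, id)

	// Without the in-memory update this commit adds, the next cycle still
	// selects from the stale list and logs the error.
	for _, b := range staleBlocklist {
		fmt.Println(store.readMeta(b)) // meta not found
	}
}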
mdisibio authored Nov 10, 2020
1 parent 7ff3853 commit 78f3554
Showing 6 changed files with 433 additions and 7 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -3,5 +3,6 @@
/tempo-vulture
/bin
.idea
+.vscode
/dist
/example/docker-compose/example-data/tempo
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,7 @@
## master / unreleased

+* [BUGFIX] Frequent errors logged by compactor regarding meta not found [#327](https://github.com/grafana/tempo/pull/327)
+
## v0.3.0

* [CHANGE] Bloom filters are now sharded to reduce size and improve caching, as blocks grow. This is a **breaking change** and all data stored before this change will **not** be queryable. [#192](https://github.com/grafana/tempo/pull/192)
34 changes: 27 additions & 7 deletions tempodb/compactor.go
@@ -59,6 +59,7 @@ func (rw *readerWriter) doCompaction() {
	rand.Seed(time.Now().Unix())
	tenantID := tenants[rand.Intn(len(tenants))].(string)
	blocklist := rw.blocklist(tenantID)
+
	blockSelector := newTimeWindowBlockSelector(blocklist, rw.compactorCfg.MaxCompactionRange, rw.compactorCfg.MaxCompactionObjects)

	start := time.Now()
@@ -67,7 +68,7 @@
	for {
		toBeCompacted, hashString := blockSelector.BlocksToCompact()
		if len(toBeCompacted) == 0 {
-			level.Info(rw.logger).Log("msg", "failed to find any blocks to compact", "tenantID", tenantID)
+			level.Info(rw.logger).Log("msg", "compaction cycle complete. No more blocks to compact", "tenantID", tenantID)
			break
		}
		if !rw.compactorSharder.Owns(hashString) {
@@ -132,6 +133,7 @@ func (rw *readerWriter) compact(blockMetas []*encoding.BlockMeta, tenantID string) error {
	}

	recordsPerBlock := (totalRecords / outputBlocks)
+	var newCompactedBlocks []*encoding.BlockMeta
	var currentBlock *wal.CompactorBlock
	var tracker backend.AppendTracker

@@ -170,6 +172,7 @@ func (rw *readerWriter) compact(blockMetas []*encoding.BlockMeta, tenantID string) error {
			return errors.Wrap(err, "error making new compacted block")
		}
		currentBlock.BlockMeta().CompactionLevel = nextCompactionLevel
+		newCompactedBlocks = append(newCompactedBlocks, currentBlock.BlockMeta())
	}

	// writing to the current block will cause the id to escape the iterator so we need to make a copy of it
@@ -208,12 +211,7 @@ func (rw *readerWriter) compact(blockMetas []*encoding.BlockMeta, tenantID string) error {
	}

	// mark old blocks compacted so they don't show up in polling
-	for _, meta := range blockMetas {
-		if err := rw.c.MarkBlockCompacted(meta.BlockID, tenantID); err != nil {
-			level.Error(rw.logger).Log("msg", "unable to mark block compacted", "blockID", meta.BlockID, "tenantID", tenantID, "err", err)
-			metricCompactionErrors.Inc()
-		}
-	}
+	markCompacted(rw, tenantID, blockMetas, newCompactedBlocks)

	return nil
}
@@ -269,3 +267,25 @@ func compactionLevelForBlocks(blockMetas []*encoding.BlockMeta) uint8 {

	return level
}
+
+func markCompacted(rw *readerWriter, tenantID string, oldBlocks []*encoding.BlockMeta, newBlocks []*encoding.BlockMeta) {
+	for _, meta := range oldBlocks {
+		// Mark in the backend
+		if err := rw.c.MarkBlockCompacted(meta.BlockID, tenantID); err != nil {
+			level.Error(rw.logger).Log("msg", "unable to mark block compacted", "blockID", meta.BlockID, "tenantID", tenantID, "err", err)
+			metricCompactionErrors.Inc()
+		}
+	}
+
+	// Convert outgoing blocks into compacted entries.
+	newCompactions := make([]*encoding.CompactedBlockMeta, 0, len(oldBlocks))
+	for _, oldBlock := range oldBlocks {
+		newCompactions = append(newCompactions, &encoding.CompactedBlockMeta{
+			BlockMeta:     *oldBlock,
+			CompactedTime: time.Now(),
+		})
+	}
+
+	// Update blocklist in memory
+	rw.updateBlocklist(tenantID, newBlocks, oldBlocks, newCompactions)
+}
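A note on the CompactedTime stamp recorded above: combined with the CompactedBlockRetention setting (visible in the test config below), it lets compacted blocks be aged out on their own schedule rather than deleted immediately. A minimal sketch of that style of age check; the names and the comparison are assumptions for illustration, not Tempo's retention code:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Stamped when the block was marked compacted (see markCompacted above).
	compactedTime := time.Now().Add(-2 * time.Hour)

	// Assumed to play the role of CompactedBlockRetention.
	retention := time.Hour

	// Once the compacted block's age exceeds the retention window it
	// becomes eligible for deletion from the backend.
	fmt.Println(time.Since(compactedTime) > retention) // true: eligible
}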
88 changes: 88 additions & 0 deletions tempodb/compactor_test.go
@@ -240,3 +240,91 @@ func TestSameIDCompaction(t *testing.T) {
	}
	assert.Equal(t, blockCount-blocksPerCompaction, records)
}
+
+func TestCompactionUpdatesBlocklist(t *testing.T) {
+	tempDir, err := ioutil.TempDir("/tmp", "")
+	defer os.RemoveAll(tempDir)
+	assert.NoError(t, err, "unexpected error creating temp dir")
+
+	r, w, c, err := New(&Config{
+		Backend: "local",
+		Pool: &pool.Config{
+			MaxWorkers: 10,
+			QueueDepth: 100,
+		},
+		Local: &local.Config{
+			Path: path.Join(tempDir, "traces"),
+		},
+		WAL: &wal.Config{
+			Filepath:        path.Join(tempDir, "wal"),
+			IndexDownsample: rand.Int()%20 + 1,
+			BloomFP:         .01,
+		},
+		BlocklistPoll: 0,
+	}, log.NewNopLogger())
+	assert.NoError(t, err)
+
+	c.EnableCompaction(&CompactorConfig{
+		ChunkSizeBytes:          10,
+		MaxCompactionRange:      24 * time.Hour,
+		BlockRetention:          0,
+		CompactedBlockRetention: 0,
+	}, &mockSharder{})
+
+	// Cut x blocks with y records each
+	blockCount := 5
+	recordCount := 1
+	cutTestBlocks(t, w, blockCount, recordCount)
+
+	rw := r.(*readerWriter)
+	rw.pollBlocklist()
+
+	// compact everything
+	err = rw.compact(rw.blocklist(testTenantID), testTenantID)
+	assert.NoError(t, err)
+
+	// New blocklist contains 1 compacted block with everything
+	blocks := rw.blocklist(testTenantID)
+	assert.Equal(t, 1, len(blocks))
+	assert.Equal(t, uint8(1), blocks[0].CompactionLevel)
+	assert.Equal(t, blockCount*recordCount, blocks[0].TotalObjects)
+
+	// Compacted list contains all old blocks
+	assert.Equal(t, blockCount, len(rw.compactedBlocklist(testTenantID)))
+
+	// Make sure all expected traces are found.
+	for i := 0; i < blockCount; i++ {
+		for j := 0; j < recordCount; j++ {
+			trace, _, err := rw.Find(context.TODO(), testTenantID, makeTraceID(i, j))
+			assert.NotNil(t, trace)
+			assert.Greater(t, len(trace), 0)
+			assert.NoError(t, err)
+		}
+	}
+}
+
+func cutTestBlocks(t *testing.T, w Writer, blockCount int, recordCount int) {
+	wal := w.WAL()
+	for i := 0; i < blockCount; i++ {
+		head, err := wal.NewBlock(uuid.New(), testTenantID)
+		assert.NoError(t, err)
+
+		for j := 0; j < recordCount; j++ {
+			// Use i and j to ensure unique ids
+			err = head.Write(
+				makeTraceID(i, j),
+				[]byte{0x01, 0x02, 0x03})
+			assert.NoError(t, err, "unexpected error writing rec")
+		}
+
+		complete, err := head.Complete(wal, &mockSharder{})
+		assert.NoError(t, err)
+
+		err = w.WriteBlock(context.Background(), complete)
+		assert.NoError(t, err)
+	}
+}
+
+func makeTraceID(i int, j int) []byte {
+	return []byte{byte(i), byte(j), 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
+}
35 changes: 35 additions & 0 deletions tempodb/tempodb.go
@@ -571,3 +571,38 @@ func (rw *readerWriter) cleanMissingTenants(tenants []string) {
		}
	}
}
+
+// updateBlocklist adds and removes regular or compacted blocks from the in-memory blocklist.
+// Changes are temporary and will be overwritten on the next poll.
+func (rw *readerWriter) updateBlocklist(tenantID string, add []*encoding.BlockMeta, remove []*encoding.BlockMeta, compactedAdd []*encoding.CompactedBlockMeta) {
+	if tenantID == "" {
+		return
+	}
+
+	rw.blockListsMtx.Lock()
+	defer rw.blockListsMtx.Unlock()
+
+	// ******** Regular blocks ********
+	blocklist := rw.blockLists[tenantID]
+
+	matchedRemovals := make(map[uuid.UUID]struct{})
+	for _, b := range blocklist {
+		for _, rem := range remove {
+			if b.BlockID == rem.BlockID {
+				matchedRemovals[rem.BlockID] = struct{}{}
+			}
+		}
+	}
+
+	newblocklist := make([]*encoding.BlockMeta, 0, len(blocklist)-len(matchedRemovals)+len(add))
+	for _, b := range blocklist {
+		if _, ok := matchedRemovals[b.BlockID]; !ok {
+			newblocklist = append(newblocklist, b)
+		}
+	}
+	newblocklist = append(newblocklist, add...)
+	rw.blockLists[tenantID] = newblocklist
+
+	// ******** Compacted blocks ********
+	rw.compactedBlockLists[tenantID] = append(rw.compactedBlockLists[tenantID], compactedAdd...)
+}
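The matching loop above is where the commit switches from pointer comparison to ID comparison (per the commit message): the metas handed to the compactor and the entries in the polled blocklist describe the same blocks but are distinct allocations, so pointer equality never matches. A self-contained sketch, with blockMeta as a stand-in for encoding.BlockMeta:

package main

import (
	"fmt"

	"github.com/google/uuid"
)

// blockMeta is a stand-in for encoding.BlockMeta; illustrative only.
type blockMeta struct {
	BlockID uuid.UUID
}

func main() {
	id := uuid.New()

	// Two metas for the same block: one from the polled blocklist, one
	// passed in by the compactor.
	fromPoll := &blockMeta{BlockID: id}
	fromCompactor := &blockMeta{BlockID: id}

	fmt.Println(fromPoll == fromCompactor)                 // false: distinct pointers
	fmt.Println(fromPoll.BlockID == fromCompactor.BlockID) // true: same block
}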