From 78f3554ca30bd5a4dec01629b8b7b2b0b2b489be Mon Sep 17 00:00:00 2001
From: Martin Disibio
Date: Tue, 10 Nov 2020 17:03:32 -0500
Subject: [PATCH] Address frequent errors logged by compactor regarding meta not found (#327)

* Update blocklist in memory for blocks added/removed/compacted by the compactor, to reduce compaction noise/errors

* Remove unused parameter to updateBlocklist

* Test to ensure traces are still Find'able after compaction

* Change updateBlocklist to compare blockmeta by id instead of pointer. Add tests

* Ignore .vscode

* Update changelog
---
 .gitignore                |   1 +
 CHANGELOG.md              |   2 +
 tempodb/compactor.go      |  34 ++++-
 tempodb/compactor_test.go |  88 ++++++++++++
 tempodb/tempodb.go        |  35 +++++
 tempodb/tempodb_test.go   | 280 ++++++++++++++++++++++++++++++++++++++
 6 files changed, 433 insertions(+), 7 deletions(-)

diff --git a/.gitignore b/.gitignore
index 89360de28d4..09e54314380 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,5 +3,6 @@
 /tempo-vulture
 /bin
 .idea
+.vscode
 /dist
 /example/docker-compose/example-data/tempo
\ No newline at end of file
diff --git a/CHANGELOG.md b/CHANGELOG.md
index ebd87253dc9..e76d4056adc 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,7 @@
 ## master / unreleased
 
+* [BUGFIX] Frequent errors logged by compactor regarding meta not found [#327](https://github.com/grafana/tempo/pull/327)
+
 ## v0.3.0
 
 * [CHANGE] Bloom filters are now sharded to reduce size and improve caching, as blocks grow. This is a **breaking change** and all data stored before this change will **not** be queryable. [#192](https://github.com/grafana/tempo/pull/192)
diff --git a/tempodb/compactor.go b/tempodb/compactor.go
index 415883a55f0..30972348133 100644
--- a/tempodb/compactor.go
+++ b/tempodb/compactor.go
@@ -59,6 +59,7 @@ func (rw *readerWriter) doCompaction() {
 	rand.Seed(time.Now().Unix())
 	tenantID := tenants[rand.Intn(len(tenants))].(string)
 	blocklist := rw.blocklist(tenantID)
+
 	blockSelector := newTimeWindowBlockSelector(blocklist, rw.compactorCfg.MaxCompactionRange, rw.compactorCfg.MaxCompactionObjects)
 
 	start := time.Now()
@@ -67,7 +68,7 @@ func (rw *readerWriter) doCompaction() {
 	for {
 		toBeCompacted, hashString := blockSelector.BlocksToCompact()
 		if len(toBeCompacted) == 0 {
-			level.Info(rw.logger).Log("msg", "failed to find any blocks to compact", "tenantID", tenantID)
+			level.Info(rw.logger).Log("msg", "compaction cycle complete. No more blocks to compact", "tenantID", tenantID)
No more blocks to compact", "tenantID", tenantID) break } if !rw.compactorSharder.Owns(hashString) { @@ -132,6 +133,7 @@ func (rw *readerWriter) compact(blockMetas []*encoding.BlockMeta, tenantID strin } recordsPerBlock := (totalRecords / outputBlocks) + var newCompactedBlocks []*encoding.BlockMeta var currentBlock *wal.CompactorBlock var tracker backend.AppendTracker @@ -170,6 +172,7 @@ func (rw *readerWriter) compact(blockMetas []*encoding.BlockMeta, tenantID strin return errors.Wrap(err, "error making new compacted block") } currentBlock.BlockMeta().CompactionLevel = nextCompactionLevel + newCompactedBlocks = append(newCompactedBlocks, currentBlock.BlockMeta()) } // writing to the current block will cause the id to escape the iterator so we need to make a copy of it @@ -208,12 +211,7 @@ func (rw *readerWriter) compact(blockMetas []*encoding.BlockMeta, tenantID strin } // mark old blocks compacted so they don't show up in polling - for _, meta := range blockMetas { - if err := rw.c.MarkBlockCompacted(meta.BlockID, tenantID); err != nil { - level.Error(rw.logger).Log("msg", "unable to mark block compacted", "blockID", meta.BlockID, "tenantID", tenantID, "err", err) - metricCompactionErrors.Inc() - } - } + markCompacted(rw, tenantID, blockMetas, newCompactedBlocks) return nil } @@ -269,3 +267,25 @@ func compactionLevelForBlocks(blockMetas []*encoding.BlockMeta) uint8 { return level } + +func markCompacted(rw *readerWriter, tenantID string, oldBlocks []*encoding.BlockMeta, newBlocks []*encoding.BlockMeta) { + for _, meta := range oldBlocks { + // Mark in the backend + if err := rw.c.MarkBlockCompacted(meta.BlockID, tenantID); err != nil { + level.Error(rw.logger).Log("msg", "unable to mark block compacted", "blockID", meta.BlockID, "tenantID", tenantID, "err", err) + metricCompactionErrors.Inc() + } + } + + // Converted outgoing blocks into compacted entries. 
+	newCompactions := make([]*encoding.CompactedBlockMeta, 0, len(oldBlocks))
+	for _, oldBlock := range oldBlocks {
+		newCompactions = append(newCompactions, &encoding.CompactedBlockMeta{
+			BlockMeta:     *oldBlock,
+			CompactedTime: time.Now(),
+		})
+	}
+
+	// Update blocklist in memory
+	rw.updateBlocklist(tenantID, newBlocks, oldBlocks, newCompactions)
+}
diff --git a/tempodb/compactor_test.go b/tempodb/compactor_test.go
index 32677a5028c..e3952835cbd 100644
--- a/tempodb/compactor_test.go
+++ b/tempodb/compactor_test.go
@@ -240,3 +240,91 @@ func TestSameIDCompaction(t *testing.T) {
 	}
 	assert.Equal(t, blockCount-blocksPerCompaction, records)
 }
+
+func TestCompactionUpdatesBlocklist(t *testing.T) {
+	tempDir, err := ioutil.TempDir("/tmp", "")
+	defer os.RemoveAll(tempDir)
+	assert.NoError(t, err, "unexpected error creating temp dir")
+
+	r, w, c, err := New(&Config{
+		Backend: "local",
+		Pool: &pool.Config{
+			MaxWorkers: 10,
+			QueueDepth: 100,
+		},
+		Local: &local.Config{
+			Path: path.Join(tempDir, "traces"),
+		},
+		WAL: &wal.Config{
+			Filepath:        path.Join(tempDir, "wal"),
+			IndexDownsample: rand.Int()%20 + 1,
+			BloomFP:         .01,
+		},
+		BlocklistPoll: 0,
+	}, log.NewNopLogger())
+	assert.NoError(t, err)
+
+	c.EnableCompaction(&CompactorConfig{
+		ChunkSizeBytes:          10,
+		MaxCompactionRange:      24 * time.Hour,
+		BlockRetention:          0,
+		CompactedBlockRetention: 0,
+	}, &mockSharder{})
+
+	// Cut x blocks with y records each
+	blockCount := 5
+	recordCount := 1
+	cutTestBlocks(t, w, blockCount, recordCount)
+
+	rw := r.(*readerWriter)
+	rw.pollBlocklist()
+
+	// compact everything
+	err = rw.compact(rw.blocklist(testTenantID), testTenantID)
+	assert.NoError(t, err)
+
+	// New blocklist contains 1 compacted block with everything
+	blocks := rw.blocklist(testTenantID)
+	assert.Equal(t, 1, len(blocks))
+	assert.Equal(t, uint8(1), blocks[0].CompactionLevel)
+	assert.Equal(t, blockCount*recordCount, blocks[0].TotalObjects)
+
+	// Compacted list contains all old blocks
+	assert.Equal(t, blockCount, len(rw.compactedBlocklist(testTenantID)))
+
+	// Make sure all expected traces are found.
+	for i := 0; i < blockCount; i++ {
+		for j := 0; j < recordCount; j++ {
+			trace, _, err := rw.Find(context.TODO(), testTenantID, makeTraceID(i, j))
+			assert.NotNil(t, trace)
+			assert.Greater(t, len(trace), 0)
+			assert.NoError(t, err)
+		}
+	}
+}
+
+func cutTestBlocks(t *testing.T, w Writer, blockCount int, recordCount int) {
+	wal := w.WAL()
+	for i := 0; i < blockCount; i++ {
+		head, err := wal.NewBlock(uuid.New(), testTenantID)
+		assert.NoError(t, err)
+
+		for j := 0; j < recordCount; j++ {
+			// Use i and j to ensure unique ids
+			err = head.Write(
+				makeTraceID(i, j),
+				[]byte{0x01, 0x02, 0x03})
+			assert.NoError(t, err, "unexpected error writing rec")
+		}
+
+		complete, err := head.Complete(wal, &mockSharder{})
+		assert.NoError(t, err)
+
+		err = w.WriteBlock(context.Background(), complete)
+		assert.NoError(t, err)
+	}
+}
+
+func makeTraceID(i int, j int) []byte {
+	return []byte{byte(i), byte(j), 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
+}
diff --git a/tempodb/tempodb.go b/tempodb/tempodb.go
index 912888cd674..0b82654c46a 100644
--- a/tempodb/tempodb.go
+++ b/tempodb/tempodb.go
@@ -571,3 +571,38 @@ func (rw *readerWriter) cleanMissingTenants(tenants []string) {
 		}
 	}
 }
+
+// updateBlocklist adds and removes regular or compacted blocks from the in-memory blocklist.
+// Changes are temporary and will be overwritten on the next poll.
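+// Removals are matched by BlockID rather than pointer equality (see tests below).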
+func (rw *readerWriter) updateBlocklist(tenantID string, add []*encoding.BlockMeta, remove []*encoding.BlockMeta, compactedAdd []*encoding.CompactedBlockMeta) {
+	if tenantID == "" {
+		return
+	}
+
+	rw.blockListsMtx.Lock()
+	defer rw.blockListsMtx.Unlock()
+
+	// ******** Regular blocks ********
+	blocklist := rw.blockLists[tenantID]
+
+	matchedRemovals := make(map[uuid.UUID]struct{})
+	for _, b := range blocklist {
+		for _, rem := range remove {
+			if b.BlockID == rem.BlockID {
+				matchedRemovals[rem.BlockID] = struct{}{}
+			}
+		}
+	}
+
+	newblocklist := make([]*encoding.BlockMeta, 0, len(blocklist)-len(matchedRemovals)+len(add))
+	for _, b := range blocklist {
+		if _, ok := matchedRemovals[b.BlockID]; !ok {
+			newblocklist = append(newblocklist, b)
+		}
+	}
+	newblocklist = append(newblocklist, add...)
+	rw.blockLists[tenantID] = newblocklist
+
+	// ******** Compacted blocks ********
+	rw.compactedBlockLists[tenantID] = append(rw.compactedBlockLists[tenantID], compactedAdd...)
+}
diff --git a/tempodb/tempodb_test.go b/tempodb/tempodb_test.go
index eb771eeeac7..4f0fb4cd6f3 100644
--- a/tempodb/tempodb_test.go
+++ b/tempodb/tempodb_test.go
@@ -311,3 +311,283 @@ func checkBlocklists(t *testing.T, expectedID uuid.UUID, expectedB int, expected
 		lastTime = b.StartTime
 	}
 }
+
+func TestUpdateBlocklist(t *testing.T) {
+	tempDir, err := ioutil.TempDir("/tmp", "")
+	defer os.RemoveAll(tempDir)
+	assert.NoError(t, err, "unexpected error creating temp dir")
+
+	r, _, _, err := New(&Config{
+		Backend: "local",
+		Local: &local.Config{
+			Path: path.Join(tempDir, "traces"),
+		},
+		WAL: &wal.Config{
+			Filepath:        path.Join(tempDir, "wal"),
+			IndexDownsample: 17,
+			BloomFP:         .01,
+		},
+		BlocklistPoll: 0,
+	}, log.NewNopLogger())
+	assert.NoError(t, err)
+
+	rw := r.(*readerWriter)
+
+	tests := []struct {
+		name     string
+		existing []*encoding.BlockMeta
+		add      []*encoding.BlockMeta
+		remove   []*encoding.BlockMeta
+		expected []*encoding.BlockMeta
+	}{
+		{
+			name:     "all nil",
+			existing: nil,
+			add:      nil,
+			remove:   nil,
+			expected: nil,
+		},
+		{
+			name:     "add to nil",
+			existing: nil,
+			add: []*encoding.BlockMeta{
+				{
+					BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"),
+				},
+			},
+			remove: nil,
+			expected: []*encoding.BlockMeta{
+				{
+					BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"),
+				},
+			},
+		},
+		{
+			name: "add to existing",
+			existing: []*encoding.BlockMeta{
+				{
+					BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"),
+				},
+			},
+			add: []*encoding.BlockMeta{
+				{
+					BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"),
+				},
+			},
+			remove: nil,
+			expected: []*encoding.BlockMeta{
+				{
+					BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"),
+				},
+				{
+					BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"),
+				},
+			},
+		},
+		{
+			name:     "remove from nil",
+			existing: nil,
+			add:      nil,
+			remove: []*encoding.BlockMeta{
+				{
+					BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"),
+				},
+			},
+			expected: nil,
+		},
+		{
+			name: "remove nil",
+			existing: []*encoding.BlockMeta{
+				{
+					BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"),
+				},
+			},
+			add:    nil,
+			remove: nil,
+			expected: []*encoding.BlockMeta{
+				{
+					BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"),
+				},
+			},
+		},
+		{
+			name: "remove existing",
+			existing: []*encoding.BlockMeta{
+				{
+					BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"),
+				},
+				{
+					BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"),
+				},
+			},
+			add: nil,
+			remove: []*encoding.BlockMeta{
+				{
+					BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"),
+				},
+			},
+			expected: []*encoding.BlockMeta{
+				{
+					BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"),
+				},
+			},
+		},
+		{
+			name: "remove no match",
+			existing: []*encoding.BlockMeta{
+				{
+					BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"),
+				},
+			},
+			add: nil,
+			remove: []*encoding.BlockMeta{
+				{
+					BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"),
+				},
+			},
+			expected: []*encoding.BlockMeta{
+				{
+					BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"),
+				},
+			},
+		},
+		{
+			name: "add and remove",
+			existing: []*encoding.BlockMeta{
+				{
+					BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"),
+				},
+				{
+					BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"),
+				},
+			},
+			add: []*encoding.BlockMeta{
+				{
+					BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000003"),
+				},
+			},
+			remove: []*encoding.BlockMeta{
+				{
+					BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"),
+				},
+			},
+			expected: []*encoding.BlockMeta{
+				{
+					BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"),
+				},
+				{
+					BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000003"),
+				},
+			},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			rw.blockLists[testTenantID] = tt.existing
+			rw.updateBlocklist(testTenantID, tt.add, tt.remove, nil)
+
+			assert.Equal(t, len(tt.expected), len(rw.blockLists[testTenantID]))
+
+			for i := range tt.expected {
+				assert.Equal(t, tt.expected[i].BlockID, rw.blockLists[testTenantID][i].BlockID)
+			}
+		})
+	}
+}
+
+func TestUpdateBlocklistCompacted(t *testing.T) {
+	tempDir, err := ioutil.TempDir("/tmp", "")
+	defer os.RemoveAll(tempDir)
+	assert.NoError(t, err, "unexpected error creating temp dir")
+
+	r, _, _, err := New(&Config{
+		Backend: "local",
+		Local: &local.Config{
+			Path: path.Join(tempDir, "traces"),
+		},
+		WAL: &wal.Config{
+			Filepath:        path.Join(tempDir, "wal"),
+			IndexDownsample: 17,
+			BloomFP:         .01,
+		},
+		BlocklistPoll: 0,
+	}, log.NewNopLogger())
+	assert.NoError(t, err)
+
+	rw := r.(*readerWriter)
+
+	tests := []struct {
+		name     string
+		existing []*encoding.CompactedBlockMeta
+		add      []*encoding.CompactedBlockMeta
+		expected []*encoding.CompactedBlockMeta
+	}{
+		{
+			name:     "all nil",
+			existing: nil,
+			add:      nil,
+			expected: nil,
+		},
+		{
+			name:     "add to nil",
+			existing: nil,
+			add: []*encoding.CompactedBlockMeta{
+				{
+					BlockMeta: encoding.BlockMeta{
+						BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"),
+					},
+				},
+			},
+			expected: []*encoding.CompactedBlockMeta{
+				{
+					BlockMeta: encoding.BlockMeta{
+						BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"),
+					},
+				},
+			},
+		},
+		{
+			name: "add to existing",
+			existing: []*encoding.CompactedBlockMeta{
+				{
+					BlockMeta: encoding.BlockMeta{
+						BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"),
+					},
+				},
+			},
+			add: []*encoding.CompactedBlockMeta{
+				{
+					BlockMeta: encoding.BlockMeta{
+						BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"),
+					},
+				},
+			},
+			expected: []*encoding.CompactedBlockMeta{
+				{
+					BlockMeta: encoding.BlockMeta{
+						BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"),
+					},
+				},
+				{
+					BlockMeta: encoding.BlockMeta{
+						BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"),
+					},
+				},
+			},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			rw.compactedBlockLists[testTenantID] = tt.existing
+			rw.updateBlocklist(testTenantID, nil, nil, tt.add)
+
+			assert.Equal(t, len(tt.expected), len(rw.compactedBlockLists[testTenantID]))
+
+			for i := range tt.expected {
+				assert.Equal(t, tt.expected[i].BlockID, rw.compactedBlockLists[testTenantID][i].BlockID)
+			}
+		})
+	}
+}
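The heart of the change is updateBlocklist's remove-by-ID-then-append pass. As a self-contained illustration of those semantics, here is a minimal Go sketch; the blockMeta type and updateList helper are simplified stand-ins for this note only, not Tempo APIs. It shows that removal matches on BlockID even when the caller holds a different pointer, which is what the "compare blockmeta by id instead of pointer" commit bullet refers to.

package main

import (
	"fmt"

	"github.com/google/uuid"
)

// blockMeta stands in for encoding.BlockMeta, reduced to the one field the
// update logic compares on.
type blockMeta struct {
	BlockID uuid.UUID
}

// updateList mirrors the patch's approach: first collect the IDs of existing
// entries that match a removal, then rebuild the list without them and
// append the additions.
func updateList(existing, add, remove []*blockMeta) []*blockMeta {
	matched := make(map[uuid.UUID]struct{})
	for _, b := range existing {
		for _, rem := range remove {
			if b.BlockID == rem.BlockID {
				matched[rem.BlockID] = struct{}{}
			}
		}
	}

	out := make([]*blockMeta, 0, len(existing)-len(matched)+len(add))
	for _, b := range existing {
		if _, ok := matched[b.BlockID]; !ok {
			out = append(out, b)
		}
	}
	return append(out, add...)
}

func main() {
	id1 := uuid.MustParse("00000000-0000-0000-0000-000000000001")
	id2 := uuid.MustParse("00000000-0000-0000-0000-000000000002")
	id3 := uuid.MustParse("00000000-0000-0000-0000-000000000003")

	existing := []*blockMeta{{BlockID: id1}, {BlockID: id2}}

	// Remove id2 via a *different* pointer and add id3; the match is by ID,
	// so the stale entry is still dropped.
	result := updateList(existing, []*blockMeta{{BlockID: id3}}, []*blockMeta{{BlockID: id2}})

	for _, m := range result {
		fmt.Println(m.BlockID) // prints ...0001 then ...0003
	}
}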