Run compaction over existing bloom blocks (grafana#11486)
**What this PR does / why we need it**:
Upon changes to TSDB indexes, existing blooms need to be amended with newly
added chunks, or new series need to be added to blocks.
This PR compares the existing meta information with the most up-to-date TSDB
information in the job. If the indexes are unchanged, compaction is skipped;
if any block needs amending, it is amended and all blocks in the fingerprint
range are merged; if there is no previous compaction, the job compacts from
scratch.
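A minimal, self-contained sketch of that decision flow is shown below; the `decideCompaction` helper and its `meta`/`block` types are hypothetical stand-ins for illustration, not code from this PR:

```go
package main

import "fmt"

// Hypothetical stand-ins for the compactor's meta and block reference types,
// used only to illustrate the branching added to runCompact in this PR.
type meta struct{}
type block struct{}

// decideCompaction mirrors the branching described above: skip when the TSDB
// indexes are unchanged, compact from scratch when no prior metas exist,
// otherwise amend and merge the existing blocks in the job's fingerprint range.
func decideCompaction(metasMatchingJob []meta, blocksMatchingJob []block) string {
	switch {
	case len(blocksMatchingJob) == 0 && len(metasMatchingJob) > 0:
		return "no changes to TSDB indexes, skip compaction"
	case len(metasMatchingJob) == 0:
		return "no previous compaction, compact all series from scratch"
	default:
		return "amend blocks and merge all blocks in the fingerprint range"
	}
}

func main() {
	fmt.Println(decideCompaction(nil, nil))                // compact from scratch
	fmt.Println(decideCompaction([]meta{{}}, nil))         // skip, nothing changed
	fmt.Println(decideCompaction([]meta{{}}, []block{{}})) // amend and merge
}
```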

**Special notes for your reviewer**:
This logic does not support cutting blocks upon topology changes to
bloom-compactors. It can create blocks with series outside of the fp
range of the compactor. Cutting blocks will be addressed in a follow-up
PR.
poyzannur authored and pull[bot] committed Mar 22, 2024
1 parent 42eb3be commit fa5faa6
Showing 10 changed files with 377 additions and 117 deletions.
3 changes: 0 additions & 3 deletions pkg/bloomcompactor/TODO.md

This file was deleted.

154 changes: 81 additions & 73 deletions pkg/bloomcompactor/bloomcompactor.go
@@ -463,115 +463,123 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
}
var metas []bloomshipper.Meta
//TODO Configure pool for these to avoid allocations
var bloomBlocksRefs []bloomshipper.BlockRef
var tombstonedBlockRefs []bloomshipper.BlockRef
var activeBloomBlocksRefs []bloomshipper.BlockRef

metas, err := c.bloomShipperClient.GetMetas(ctx, metaSearchParams)
if err != nil {
return err
}

if len(metas) == 0 {
localDst := createLocalDirName(c.cfg.WorkingDirectory, job)
defer func() {
//clean up the bloom directory
if err := os.RemoveAll(localDst); err != nil {
level.Error(logger).Log("msg", "failed to remove block directory", "dir", localDst, "err", err)
}
}()
// TODO This logic currently is NOT concerned with cutting blocks upon topology changes to bloom-compactors.
// It may create blocks with series outside of the fp range of the compactor. Cutting blocks will be addressed in a follow-up PR.
metasMatchingJob, blocksMatchingJob := matchingBlocks(metas, job)

localDst := createLocalDirName(c.cfg.WorkingDirectory, job)
blockOptions := v1.NewBlockOptions(bt.GetNGramLength(), bt.GetNGramSkip())

defer func() {
//clean up the bloom directory
if err := os.RemoveAll(localDst); err != nil {
level.Error(logger).Log("msg", "failed to remove block directory", "dir", localDst, "err", err)
}
}()

var resultingBlock bloomshipper.Block
defer func() {
if resultingBlock.Data != nil {
_ = resultingBlock.Data.Close()
}
}()

if len(blocksMatchingJob) == 0 && len(metasMatchingJob) > 0 {
// There is no change to any blocks, no compaction needed
level.Info(logger).Log("msg", "No changes to tsdb, no compaction needed")
return nil
} else if len(metasMatchingJob) == 0 {
// No matching existing blocks for this job, compact all series from scratch

blockOptions := v1.NewBlockOptions(bt.GetNGramLength(), bt.GetNGramSkip())
builder, err := NewPersistentBlockBuilder(localDst, blockOptions)
if err != nil {
level.Error(logger).Log("msg", "creating block builder", "err", err)
level.Error(logger).Log("msg", "failed creating block builder", "err", err)
return err
}

fpRate := c.limits.BloomFalsePositiveRate(job.tenantID)
storedBlock, err := compactNewChunks(ctx, logger, job, fpRate, bt, storeClient.chunk, builder)
resultingBlock, err = compactNewChunks(ctx, logger, job, fpRate, bt, storeClient.chunk, builder)
if err != nil {
return level.Error(logger).Log("msg", "failed to compact new chunks", "err", err)
return level.Error(logger).Log("msg", "failed compacting new chunks", "err", err)
}

archivePath := filepath.Join(c.cfg.WorkingDirectory, uuid.New().String())
} else if len(blocksMatchingJob) > 0 {
// When already-compacted metas exist, we need to merge all blocks, amending their blooms with the new series

blockToUpload, err := c.compressBloomBlock(storedBlock, archivePath, localDst, logger)
if err != nil {
level.Error(logger).Log("msg", "putting blocks to storage", "err", err)
return err
}
var populate = createPopulateFunc(ctx, logger, job, storeClient, bt)

seriesIter := makeSeriesIterFromSeriesMeta(job)

blockIters, blockPaths, err := makeBlockIterFromBlocks(ctx, logger, c.bloomShipperClient, blocksMatchingJob, c.cfg.WorkingDirectory)
defer func() {
err = os.Remove(archivePath)
if err != nil {
level.Error(logger).Log("msg", "removing archive file", "err", err, "file", archivePath)
for _, path := range blockPaths {
if err := os.RemoveAll(path); err != nil {
level.Error(logger).Log("msg", "failed removing uncompressed bloomDir", "dir", path, "err", err)
}
}
}()

// Do not change the signature of PutBlocks yet.
// Once block size is limited potentially, compactNewChunks will return multiple blocks, hence a list is appropriate.
storedBlocks, err := c.bloomShipperClient.PutBlocks(ctx, []bloomshipper.Block{blockToUpload})
if err != nil {
level.Error(logger).Log("msg", "putting blocks to storage", "err", err)
return err
}

// all blocks are new and active blocks
for _, block := range storedBlocks {
bloomBlocksRefs = append(bloomBlocksRefs, block.BlockRef)
}
} else {
// TODO complete part 2 - periodic compaction for delta from previous period
// When already compacted metas exists

// Take the seriesFP, query the org_chunks from storage and query the blooms.
// compare the checksums of the indexes
// if they match - all good nothing to do
//else {
//get all chunks
//}

// Deduplicate index paths
uniqueIndexPaths := make(map[string]struct{})

for _, meta := range metas {
for _, blockRef := range meta.Blocks {
uniqueIndexPaths[blockRef.IndexPath] = struct{}{}
// ...

// the result should return a list of active
// blocks and tombstoned bloom blocks.
}
mergeBlockBuilder, err := NewPersistentBlockBuilder(localDst, blockOptions)
if err != nil {
level.Error(logger).Log("msg", "failed creating block builder", "err", err)
return err
}

resultingBlock, err = mergeCompactChunks(logger, populate, mergeBlockBuilder, blockIters, seriesIter, job)
if err != nil {
level.Error(logger).Log("msg", "failed merging existing blocks with new chunks", "err", err)
return err
}
}

// After all is done, create one meta file and upload to storage
meta := bloomshipper.Meta{
Tombstones: tombstonedBlockRefs,
Blocks: bloomBlocksRefs,
}
err = c.bloomShipperClient.PutMeta(ctx, meta)
archivePath := filepath.Join(c.cfg.WorkingDirectory, uuid.New().String())

blockToUpload, err := bloomshipper.CompressBloomBlock(resultingBlock.BlockRef, archivePath, localDst, logger)
if err != nil {
level.Error(logger).Log("msg", "putting meta.json to storage", "err", err)
level.Error(logger).Log("msg", "failed compressing bloom blocks into tar file", "err", err)
return err
}
return nil
}
defer func() {
err = os.Remove(archivePath)
if err != nil {
level.Error(logger).Log("msg", "failed removing archive file", "err", err, "file", archivePath)
}
}()

func (c *Compactor) compressBloomBlock(storedBlock bloomshipper.Block, archivePath, localDst string, logger log.Logger) (bloomshipper.Block, error) {
blockToUpload := bloomshipper.Block{}
archiveFile, err := os.Create(archivePath)
// Do not change the signature of PutBlocks yet.
// Once block size is limited potentially, compactNewChunks will return multiple blocks, hence a list is appropriate.
storedBlocks, err := c.bloomShipperClient.PutBlocks(ctx, []bloomshipper.Block{blockToUpload})
if err != nil {
return blockToUpload, err
level.Error(logger).Log("msg", "failed uploading blocks to storage", "err", err)
return err
}

err = v1.TarGz(archiveFile, v1.NewDirectoryBlockReader(localDst))
if err != nil {
level.Error(logger).Log("msg", "creating bloom block archive file", "err", err)
return blockToUpload, err
// all blocks are new and active blocks
for _, block := range storedBlocks {
activeBloomBlocksRefs = append(activeBloomBlocksRefs, block.BlockRef)
}

blockToUpload.BlockRef = storedBlock.BlockRef
blockToUpload.Data = archiveFile
return blockToUpload, nil
// TODO delete old metas in later compactions
// After all is done, create one meta file and upload to storage
meta := bloomshipper.Meta{
Tombstones: blocksMatchingJob,
Blocks: activeBloomBlocksRefs,
}
err = c.bloomShipperClient.PutMeta(ctx, meta)
if err != nil {
level.Error(logger).Log("msg", "failed uploading meta.json to storage", "err", err)
return err
}
return nil
}
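After the merged block is compressed and uploaded, the code above writes a single meta that tombstones the blocks it replaced and records the freshly uploaded blocks as active. A minimal sketch of that bookkeeping, using simplified stand-in types rather than the real `bloomshipper` ones:

```go
package main

import "fmt"

// BlockRef and Meta are simplified stand-ins for the bloomshipper types;
// they only illustrate the tombstone/active bookkeeping, not the real API.
type BlockRef struct{ Path string }

type Meta struct {
	Tombstones []BlockRef // blocks superseded by this compaction run
	Blocks     []BlockRef // blocks that are active after this run
}

// buildMeta mirrors the final step of runCompact: the blocks that were merged
// into the new output are tombstoned, and the uploaded blocks become active.
func buildMeta(merged, uploaded []BlockRef) Meta {
	return Meta{Tombstones: merged, Blocks: uploaded}
}

func main() {
	superseded := []BlockRef{{Path: "bloom/old-block-1"}, {Path: "bloom/old-block-2"}}
	active := []BlockRef{{Path: "bloom/merged-block"}}
	fmt.Printf("%+v\n", buildMeta(superseded, active))
}
```

Older metas are left in place for now; the TODO in the diff notes that deleting them is deferred to later compactions.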
22 changes: 17 additions & 5 deletions pkg/bloomcompactor/chunkcompactor.go
@@ -55,6 +55,10 @@ func (p *PersistentBlockBuilder) BuildFrom(itr v1.Iterator[v1.SeriesWithBloom])
return p.builder.BuildFrom(itr)
}

func (p *PersistentBlockBuilder) mergeBuild(builder *v1.MergeBuilder) (uint32, error) {
return builder.Build(p.builder)
}

func (p *PersistentBlockBuilder) Data() (io.ReadSeekCloser, error) {
blockFile, err := os.Open(filepath.Join(p.localDst, v1.BloomFileName))
if err != nil {
@@ -80,7 +84,7 @@ func makeChunkRefs(chksMetas []tsdbindex.ChunkMeta, tenant string, fp model.Fing
return chunkRefs
}

func buildBloomFromSeries(seriesMeta seriesMeta, fpRate float64, tokenizer compactorTokenizer, chunks []chunk.Chunk) v1.SeriesWithBloom {
func buildBloomFromSeries(seriesMeta seriesMeta, fpRate float64, tokenizer compactorTokenizer, chunks []chunk.Chunk) (v1.SeriesWithBloom, error) {
// Create a bloom for this series
bloomForChks := v1.SeriesWithBloom{
Series: &v1.Series{
@@ -92,8 +96,11 @@ func buildBloomFromSeries(seriesMeta seriesMeta, fpRate float64, tokenizer compa
}

// Tokenize data into n-grams
_ = tokenizer.PopulateSeriesWithBloom(&bloomForChks, chunks)
return bloomForChks
err := tokenizer.PopulateSeriesWithBloom(&bloomForChks, chunks)
if err != nil {
return v1.SeriesWithBloom{}, err
}
return bloomForChks, nil
}

// TODO Test this when bloom block size check is implemented
@@ -165,7 +172,7 @@ func compactNewChunks(
// Build and upload bloomBlock to storage
block, err := buildBlockFromBlooms(ctx, logger, builder, bloomIter, job)
if err != nil {
level.Error(logger).Log("msg", "building bloomBlocks", "err", err)
level.Error(logger).Log("msg", "failed building bloomBlocks", "err", err)
return bloomshipper.Block{}, err
}

@@ -215,7 +222,12 @@ func (it *lazyBloomBuilder) Next() bool {
return false
}

it.cur = buildBloomFromSeries(meta, it.fpRate, it.bt, chks)
it.cur, err = buildBloomFromSeries(meta, it.fpRate, it.bt, chks)
if err != nil {
it.err = err
it.cur = v1.SeriesWithBloom{}
return false
}
return true
}

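The chunkcompactor changes above make `buildBloomFromSeries` return an error and have `lazyBloomBuilder.Next` record that error and stop iterating instead of silently yielding a zero value. A generic, self-contained sketch of this lazy-iterator error-propagation pattern, with hypothetical types unrelated to this PR:

```go
package main

import (
	"errors"
	"fmt"
)

// lazyIter builds items on demand; when building fails it records the error
// and halts iteration, mirroring how lazyBloomBuilder.Next now behaves.
type lazyIter struct {
	inputs []int
	idx    int
	cur    int
	err    error
}

// buildItem is a stand-in for buildBloomFromSeries: it can fail.
func buildItem(v int) (int, error) {
	if v < 0 {
		return 0, errors.New("cannot build item from negative input")
	}
	return v * v, nil
}

func (it *lazyIter) Next() bool {
	if it.err != nil || it.idx >= len(it.inputs) {
		return false
	}
	item, err := buildItem(it.inputs[it.idx])
	it.idx++
	if err != nil {
		it.err = err // surfaced later via Err()
		it.cur = 0   // reset to the zero value, as the diff does
		return false
	}
	it.cur = item
	return true
}

func (it *lazyIter) At() int    { return it.cur }
func (it *lazyIter) Err() error { return it.err }

func main() {
	it := &lazyIter{inputs: []int{2, 3, -1, 4}}
	for it.Next() {
		fmt.Println(it.At()) // prints 4, then 9; stops at the failing input
	}
	if err := it.Err(); err != nil {
		fmt.Println("iteration stopped:", err)
	}
}
```

Callers range with `for it.Next() { ... }` and check `it.Err()` once the loop exits, which is the consumption pattern the tests below exercise.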
5 changes: 3 additions & 2 deletions pkg/bloomcompactor/chunkcompactor_test.go
@@ -59,7 +59,8 @@ func TestChunkCompactor_BuildBloomFromSeries(t *testing.T) {
chunks := []chunk.Chunk{createTestChunk(fp, label)}

mbt := mockBloomTokenizer{}
bloom := buildBloomFromSeries(seriesMeta, fpRate, &mbt, chunks)
bloom, err := buildBloomFromSeries(seriesMeta, fpRate, &mbt, chunks)
require.NoError(t, err)
require.Equal(t, seriesMeta.seriesFP, bloom.Series.Fingerprint)
require.Equal(t, chunks, mbt.chunks)
}
@@ -186,7 +187,7 @@ func TestLazyBloomBuilder(t *testing.T) {
require.Equal(t, 6, mcc.chunkCount)
require.Equal(t, fp3, it.At().Series.Fingerprint)

// interator is done
// iterator is done
require.False(t, it.Next())
require.Error(t, io.EOF, it.Err())
require.Equal(t, v1.SeriesWithBloom{}, it.At())
