Add a simple config for compaction planning
aleks-p committed Jul 16, 2024
1 parent fc4a911 commit 2a5f2b6
Showing 1 changed file with 77 additions and 5 deletions.
82 changes: 77 additions & 5 deletions pkg/metastore/metastore_compaction_planner.go
@@ -13,10 +13,60 @@ import (
	metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
)

type compactionPlan struct {
	jobsMutex           sync.Mutex
	jobsByName          map[string]*compactorv1.CompactionJob
	queuedBlocksByLevel map[uint32][]*metastorev1.BlockMeta
var (
	// TODO aleks: for illustration purposes, to be moved externally
	globalCompactionStrategy = compactionStrategy{
		levels: map[uint32]compactionLevelStrategy{
			0: {
				minBlocks:         10,
				maxBlocks:         20,
				minTotalSizeBytes: 2 << 18, // 512KB
				maxTotalSizeBytes: 2 << 20, // 2MB
			},
		},
		defaultStrategy: compactionLevelStrategy{
			minBlocks:         10,
			maxBlocks:         0,
			minTotalSizeBytes: 0,
			maxTotalSizeBytes: 0,
		},
		maxCompactionLevel: 10,
	}
)

type compactionStrategy struct {
	levels             map[uint32]compactionLevelStrategy
	defaultStrategy    compactionLevelStrategy
	maxCompactionLevel uint32
}

type compactionLevelStrategy struct {
	minBlocks         int
	maxBlocks         int
	minTotalSizeBytes uint64
	maxTotalSizeBytes uint64
}

func getStrategyForLevel(compactionLevel uint32) compactionLevelStrategy {
	strategy, ok := globalCompactionStrategy.levels[compactionLevel]
	if !ok {
		strategy = globalCompactionStrategy.defaultStrategy
	}
	return strategy
}

func (s compactionLevelStrategy) shouldCreateJob(blocks []*metastorev1.BlockMeta) bool {
	totalSizeBytes := getTotalSize(blocks)
	enoughBlocks := len(blocks) >= s.minBlocks
	enoughData := totalSizeBytes >= s.minTotalSizeBytes
	if enoughBlocks && enoughData {
		return true
	} else if enoughBlocks {
		return len(blocks) >= s.maxBlocks
	} else if enoughData {
		return totalSizeBytes >= s.maxTotalSizeBytes
	}
	return false
}
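
Aside (not part of the commit): a minimal, self-contained sketch of how the level-0 thresholds above combine in shouldCreateJob. It mirrors the decision on plain byte sizes instead of *metastorev1.BlockMeta; the levelStrategy type, the repeat helper, and the example sizes are hypothetical, while the threshold values are taken from globalCompactionStrategy.

package main

import "fmt"

// levelStrategy mirrors compactionLevelStrategy from the diff; illustrative only.
type levelStrategy struct {
	minBlocks, maxBlocks                 int
	minTotalSizeBytes, maxTotalSizeBytes uint64
}

// shouldCreateJob mirrors the decision in the diff, operating on raw sizes.
func (s levelStrategy) shouldCreateJob(sizes []uint64) bool {
	var total uint64
	for _, b := range sizes {
		total += b
	}
	enoughBlocks := len(sizes) >= s.minBlocks
	enoughData := total >= s.minTotalSizeBytes
	switch {
	case enoughBlocks && enoughData:
		return true // both minimums met
	case enoughBlocks:
		return len(sizes) >= s.maxBlocks // many blocks, little data
	case enoughData:
		return total >= s.maxTotalSizeBytes // few blocks, lots of data
	}
	return false
}

// repeat builds n blocks of the same size, for the demo below.
func repeat(n int, size uint64) []uint64 {
	sizes := make([]uint64, n)
	for i := range sizes {
		sizes[i] = size
	}
	return sizes
}

func main() {
	l0 := levelStrategy{minBlocks: 10, maxBlocks: 20, minTotalSizeBytes: 2 << 18, maxTotalSizeBytes: 2 << 20}

	fmt.Println(l0.shouldCreateJob(repeat(10, 64<<10))) // true: 10 blocks, 640KiB >= 512KiB
	fmt.Println(l0.shouldCreateJob(repeat(10, 1<<10)))  // false: 10 blocks but only 10KiB
	fmt.Println(l0.shouldCreateJob(repeat(20, 1<<10)))  // true: maxBlocks reached despite little data
	fmt.Println(l0.shouldCreateJob(repeat(3, 1<<20)))   // true: 3MiB exceeds maxTotalSizeBytes
}

In short, with the level-0 values a job is cut once both minimums are met, or earlier if either the block count reaches maxBlocks or the total size reaches maxTotalSizeBytes.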

type Planner struct {
@@ -31,6 +81,12 @@ func NewPlanner(state *metastoreState, logger log.Logger) *Planner {
	}
}

type compactionPlan struct {
	jobsMutex           sync.Mutex
	jobsByName          map[string]*compactorv1.CompactionJob
	queuedBlocksByLevel map[uint32][]*metastorev1.BlockMeta
}

func (m *Metastore) GetCompactionJobs(_ context.Context, req *compactorv1.GetCompactionRequest) (*compactorv1.GetCompactionResponse, error) {
	m.state.compactionPlansMutex.Lock()
	defer m.state.compactionPlansMutex.Unlock()
@@ -56,6 +112,11 @@ func (m *metastoreState) addForCompaction(block *metastorev1.BlockMeta) *compact
	plan.jobsMutex.Lock()
	defer plan.jobsMutex.Unlock()

	if block.CompactionLevel > globalCompactionStrategy.maxCompactionLevel {
		level.Info(m.logger).Log("msg", "skipping block at max compaction level", "block", block.Id, "compaction_level", block.CompactionLevel)
		return nil
	}

	queuedBlocks := append(plan.queuedBlocksByLevel[block.CompactionLevel], block)
	plan.queuedBlocksByLevel[block.CompactionLevel] = queuedBlocks

@@ -68,8 +129,10 @@ func (m *metastoreState) addForCompaction(block *metastorev1.BlockMeta) *compact
"size", block.Size,
"queue_size", len(queuedBlocks))

	strategy := getStrategyForLevel(block.CompactionLevel)

	var job *compactorv1.CompactionJob
	if len(queuedBlocks) >= 10 { // TODO aleks: add block size sum to the condition
	if strategy.shouldCreateJob(queuedBlocks) {
		job = &compactorv1.CompactionJob{
			Name: fmt.Sprintf("L%d-S%d-%d", block.CompactionLevel, block.Shard, calculateHash(queuedBlocks)),
			Blocks: queuedBlocks,
@@ -81,6 +144,7 @@ func (m *metastoreState) addForCompaction(block *metastorev1.BlockMeta) *compact
"msg", "created compaction job",
"job", job.Name,
"blocks", len(queuedBlocks),
"blocks_bytes", getTotalSize(queuedBlocks),
"shard", block.Shard,
"tenant", block.TenantId,
"compaction_level", block.CompactionLevel)
@@ -90,6 +154,14 @@ func (m *metastoreState) addForCompaction(block *metastorev1.BlockMeta) *compact
	return job
}

func getTotalSize(blocks []*metastorev1.BlockMeta) uint64 {
	totalSizeBytes := uint64(0)
	for _, block := range blocks {
		totalSizeBytes += block.Size
	}
	return totalSizeBytes
}

func calculateHash(blocks []*metastorev1.BlockMeta) uint64 {
	b := make([]byte, 0, 1024)
	for _, blk := range blocks {
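A second aside, also not part of the commit: levels with no entry in globalCompactionStrategy.levels fall back to defaultStrategy, whose minTotalSizeBytes and maxTotalSizeBytes are both zero. With the shouldCreateJob logic shown above, the size condition is then always satisfied, so as written a single queued block at such a level is already enough to create a job. A small sketch of that fallback path, again on plain sizes and with hypothetical names:

package main

import "fmt"

// defaultLevelShouldCreateJob mirrors shouldCreateJob with the defaultStrategy
// values from the diff (minBlocks=10, maxBlocks=0, minTotalSizeBytes=0,
// maxTotalSizeBytes=0); illustrative only.
func defaultLevelShouldCreateJob(sizes []uint64) bool {
	var total uint64
	for _, s := range sizes {
		total += s
	}
	enoughBlocks := len(sizes) >= 10
	enoughData := total >= 0 // minTotalSizeBytes is 0, so this is always true
	if enoughBlocks && enoughData {
		return true
	} else if enoughBlocks {
		return len(sizes) >= 0 // maxBlocks is 0
	} else if enoughData {
		return total >= 0 // maxTotalSizeBytes is 0
	}
	return false
}

func main() {
	fmt.Println(defaultLevelShouldCreateJob([]uint64{4 << 10})) // true: one 4KiB block already triggers a job
}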
