Skip to content

Commit

Permalink
Enable sharding compact instances by prometheus shard
Browse files Browse the repository at this point in the history
  • Loading branch information
mattrco committed Jun 11, 2019
1 parent f2356eb commit 412ca02
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 5 deletions.
14 changes: 13 additions & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
compactionConcurrency := cmd.Flag("compact.concurrency", "Number of goroutines to use when compacting groups.").
Default("1").Int()

// Shard filter flags. The registered names are shard.include / shard.exclude
// (singular "shard"); the help text must reference the same names, otherwise
// users are pointed at flags that do not exist.
includeShards := cmd.Flag("shard.include", "Prometheus shard to compact. May be specified multiple times. Cannot be used with --shard.exclude.").Strings()

excludeShards := cmd.Flag("shard.exclude", "Prometheus shard to ignore. May be specified multiple times. Cannot be used with --shard.include.").Strings()

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
return runCompact(g, logger, reg,
*httpAddr,
Expand All @@ -130,6 +134,8 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
*maxCompactionLevel,
*blockSyncConcurrency,
*compactionConcurrency,
*includeShards,
*excludeShards,
)
}
}
Expand All @@ -152,6 +158,8 @@ func runCompact(
maxCompactionLevel int,
blockSyncConcurrency int,
concurrency int,
includeShards []string,
excludeShards []string,
) error {
halted := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "thanos_compactor_halted",
Expand All @@ -166,6 +174,10 @@ func runCompact(
reg.MustRegister(halted)
reg.MustRegister(retried)

// --shard.include and --shard.exclude are mutually exclusive filters;
// reject the configuration up front rather than guessing which one wins.
// Go convention (staticcheck ST1005): error strings are not capitalized.
if len(includeShards) > 0 && len(excludeShards) > 0 {
	return errors.New("only one of --shard.include and --shard.exclude should be specified")
}

confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
Expand All @@ -184,7 +196,7 @@ func runCompact(
}()

sy, err := compact.NewSyncer(logger, reg, bkt, consistencyDelay,
blockSyncConcurrency, acceptMalformedIndex)
blockSyncConcurrency, acceptMalformedIndex, includeShards, excludeShards)
if err != nil {
return errors.Wrap(err, "create syncer")
}
Expand Down
40 changes: 39 additions & 1 deletion pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type Syncer struct {
blockSyncConcurrency int
metrics *syncerMetrics
acceptMalformedIndex bool
includeShards []string
excludeShards []string
}

type syncerMetrics struct {
Expand Down Expand Up @@ -134,7 +136,8 @@ func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics {

// NewSyncer returns a new Syncer for the given Bucket and directory.
// Blocks must be at least as old as the sync delay for being considered.
func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, consistencyDelay time.Duration, blockSyncConcurrency int, acceptMalformedIndex bool) (*Syncer, error) {
func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, consistencyDelay time.Duration,
blockSyncConcurrency int, acceptMalformedIndex bool, includeShards []string, excludeShards []string) (*Syncer, error) {
if logger == nil {
logger = log.NewNopLogger()
}
Expand All @@ -147,6 +150,8 @@ func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket
metrics: newSyncerMetrics(reg),
blockSyncConcurrency: blockSyncConcurrency,
acceptMalformedIndex: acceptMalformedIndex,
includeShards: includeShards,
excludeShards: excludeShards,
}, nil
}

Expand Down Expand Up @@ -204,6 +209,13 @@ func (c *Syncer) syncMetas(ctx context.Context) error {
return
}

// Honor the configured include/exclude shard lists: blocks carrying a
// "prometheus_shard" Thanos label (presumably set by sharded Prometheus
// deployments — confirm against the block producer) are dropped from the
// sync entirely when filtered, so they never enter c.blocks.
if shardLabel, ok := meta.Thanos.Labels["prometheus_shard"]; ok {
if c.shouldSkipShard(shardLabel) {
// Debug-level only: skipping foreign shards is expected in normal operation.
level.Debug(c.logger).Log("msg", "skipping shard", "label", shardLabel)
continue
}
}

c.blocksMtx.Lock()
c.blocks[id] = meta
c.blocksMtx.Unlock()
Expand Down Expand Up @@ -308,6 +320,32 @@ func (c *Syncer) removeIfMetaMalformed(ctx context.Context, id ulid.ULID) (remov
return true
}

// shouldSkipShard reports whether a block carrying the given
// prometheus_shard label value should be excluded from compaction,
// based on the syncer's include/exclude shard lists.
func (c *Syncer) shouldSkipShard(shardLabel string) bool {
	listed := func(shards []string) bool {
		for _, s := range shards {
			if s == shardLabel {
				return true
			}
		}
		return false
	}

	switch {
	case len(c.includeShards) > 0:
		// An include list is in effect: skip unless the shard is listed.
		return !listed(c.includeShards)
	case len(c.excludeShards) > 0:
		// An exclude list is in effect: skip only if the shard is listed.
		return listed(c.excludeShards)
	default:
		// No shard lists configured: include everything.
		return false
	}
}

// GroupKey returns a unique identifier for the group the block belongs to. It considers
// the downsampling resolution and the block's labels.
func GroupKey(meta metadata.Meta) string {
Expand Down
4 changes: 2 additions & 2 deletions pkg/compact/compact_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestSyncer_SyncMetas_e2e(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
defer cancel()

sy, err := NewSyncer(nil, nil, bkt, 0, 1, false)
sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, []string{}, []string{})
testutil.Ok(t, err)

// Generate 15 blocks. Initially the first 10 are synced into memory and only the last
Expand Down Expand Up @@ -134,7 +134,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
}

// Do one initial synchronization with the bucket.
sy, err := NewSyncer(nil, nil, bkt, 0, 1, false)
sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, []string{}, []string{})
testutil.Ok(t, err)
testutil.Ok(t, sy.SyncMetas(ctx))

Expand Down
24 changes: 23 additions & 1 deletion pkg/compact/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestSyncer_SyncMetas_HandlesMalformedBlocks(t *testing.T) {
defer cancel()

bkt := inmem.NewBucket()
sy, err := NewSyncer(nil, nil, bkt, 10*time.Second, 1, false)
sy, err := NewSyncer(nil, nil, bkt, 10*time.Second, 1, false, []string{}, []string{})
testutil.Ok(t, err)

// Generate 1 block which is older than MinimumAgeForRemoval which has chunk data but no meta. Compactor should delete it.
Expand Down Expand Up @@ -103,3 +103,25 @@ func TestSyncer_SyncMetas_HandlesMalformedBlocks(t *testing.T) {
testutil.Ok(t, err)
testutil.Equals(t, true, exists)
}

// TestSyncer_ShouldSkipShard_SkipCorrectly verifies shard filtering:
// no filters keeps every shard, an include list skips unlisted shards,
// and an exclude list skips only the listed shards.
func TestSyncer_ShouldSkipShard_SkipCorrectly(t *testing.T) {
	for _, tc := range []struct {
		include []string
		exclude []string
		want    map[string]bool // shard label -> expected shouldSkipShard result
	}{
		// No filters configured: nothing is skipped.
		{include: []string{}, exclude: []string{}, want: map[string]bool{"prom-1": false}},
		// Include list: only listed shards are kept.
		{include: []string{"prom-2"}, exclude: []string{}, want: map[string]bool{"prom-1": true, "prom-2": false}},
		// Exclude list: listed shards are dropped, everything else kept.
		{include: []string{}, exclude: []string{"prom-3"}, want: map[string]bool{"prom-3": true, "prom-1": false}},
	} {
		sy, err := NewSyncer(nil, nil, nil, time.Second, 1, false, tc.include, tc.exclude)
		testutil.Ok(t, err)

		for shard, skip := range tc.want {
			testutil.Equals(t, skip, sy.shouldSkipShard(shard))
		}
	}
}

0 comments on commit 412ca02

Please sign in to comment.