diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index a789b70842b..e0bcc21755a 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -110,6 +110,10 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri compactionConcurrency := cmd.Flag("compact.concurrency", "Number of goroutines to use when compacting groups."). Default("1").Int() + includeShards := cmd.Flag("shard.include", "Prometheus shard to compact. May be specified multiple times. Cannot be used with --shards.exclude.").Strings() + + excludeShards := cmd.Flag("shard.exclude", "Prometheus shard to ignore. May be specified multiple times. Cannot be used with --shards.include.").Strings() + m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error { return runCompact(g, logger, reg, *httpAddr, @@ -130,6 +134,8 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri *maxCompactionLevel, *blockSyncConcurrency, *compactionConcurrency, + *includeShards, + *excludeShards, ) } } @@ -152,6 +158,8 @@ func runCompact( maxCompactionLevel int, blockSyncConcurrency int, concurrency int, + includeShards []string, + excludeShards []string, ) error { halted := prometheus.NewGauge(prometheus.GaugeOpts{ Name: "thanos_compactor_halted", @@ -166,6 +174,10 @@ func runCompact( reg.MustRegister(halted) reg.MustRegister(retried) + if len(includeShards) > 0 && len(excludeShards) > 0 { + return errors.New("Only one of --shard.include and --shard.exclude should be specified") + } + confContentYaml, err := objStoreConfig.Content() if err != nil { return err @@ -184,7 +196,7 @@ func runCompact( }() sy, err := compact.NewSyncer(logger, reg, bkt, consistencyDelay, - blockSyncConcurrency, acceptMalformedIndex) + blockSyncConcurrency, acceptMalformedIndex, includeShards, excludeShards) if err != nil { return errors.Wrap(err, "create syncer") } diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 61a9f0be1dc..0803f3a9659 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -50,6 +50,8 @@ type Syncer struct { blockSyncConcurrency int metrics *syncerMetrics acceptMalformedIndex bool + includeShards []string + excludeShards []string } type syncerMetrics struct { @@ -134,7 +136,8 @@ func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics { // NewSyncer returns a new Syncer for the given Bucket and directory. // Blocks must be at least as old as the sync delay for being considered. -func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, consistencyDelay time.Duration, blockSyncConcurrency int, acceptMalformedIndex bool) (*Syncer, error) { +func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, consistencyDelay time.Duration, + blockSyncConcurrency int, acceptMalformedIndex bool, includeShards []string, excludeShards []string) (*Syncer, error) { if logger == nil { logger = log.NewNopLogger() } @@ -147,6 +150,8 @@ func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket metrics: newSyncerMetrics(reg), blockSyncConcurrency: blockSyncConcurrency, acceptMalformedIndex: acceptMalformedIndex, + includeShards: includeShards, + excludeShards: excludeShards, }, nil } @@ -204,6 +209,13 @@ func (c *Syncer) syncMetas(ctx context.Context) error { return } + if shardLabel, ok := meta.Thanos.Labels["prometheus_shard"]; ok { + if c.shouldSkipShard(shardLabel) { + level.Debug(c.logger).Log("msg", "skipping shard", "label", shardLabel) + continue + } + } + c.blocksMtx.Lock() c.blocks[id] = meta c.blocksMtx.Unlock() @@ -308,6 +320,32 @@ func (c *Syncer) removeIfMetaMalformed(ctx context.Context, id ulid.ULID) (remov return true } +func (c *Syncer) shouldSkipShard(shardLabel string) bool { + + // If specified, shard must be present + if len(c.includeShards) > 0 { + for _, shard := range c.includeShards { + if shardLabel == shard { + return false + } + } + return true + } + + // If specified, shard must *not* be present + if len(c.excludeShards) > 0 { + for _, shard := range c.excludeShards { + if shardLabel == shard { + return true + } + } + return false + } + + // No shards specified, default is to include all. + return false +} + // GroupKey returns a unique identifier for the group the block belongs to. It considers // the downsampling resolution and the block's labels. func GroupKey(meta metadata.Meta) string { diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go index f58862c0ac0..0618785d6f5 100644 --- a/pkg/compact/compact_e2e_test.go +++ b/pkg/compact/compact_e2e_test.go @@ -32,7 +32,7 @@ func TestSyncer_SyncMetas_e2e(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) defer cancel() - sy, err := NewSyncer(nil, nil, bkt, 0, 1, false) + sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, []string{}, []string{}) testutil.Ok(t, err) // Generate 15 blocks. Initially the first 10 are synced into memory and only the last @@ -134,7 +134,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) { } // Do one initial synchronization with the bucket. - sy, err := NewSyncer(nil, nil, bkt, 0, 1, false) + sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, []string{}, []string{}) testutil.Ok(t, err) testutil.Ok(t, sy.SyncMetas(ctx)) diff --git a/pkg/compact/compact_test.go b/pkg/compact/compact_test.go index 4639fff67af..b0ad8674c8b 100644 --- a/pkg/compact/compact_test.go +++ b/pkg/compact/compact_test.go @@ -75,7 +75,7 @@ func TestSyncer_SyncMetas_HandlesMalformedBlocks(t *testing.T) { defer cancel() bkt := inmem.NewBucket() - sy, err := NewSyncer(nil, nil, bkt, 10*time.Second, 1, false) + sy, err := NewSyncer(nil, nil, bkt, 10*time.Second, 1, false, []string{}, []string{}) testutil.Ok(t, err) // Generate 1 block which is older than MinimumAgeForRemoval which has chunk data but no meta. Compactor should delete it. @@ -103,3 +103,25 @@ func TestSyncer_SyncMetas_HandlesMalformedBlocks(t *testing.T) { testutil.Ok(t, err) testutil.Equals(t, true, exists) } + +func TestSyncer_ShouldSkipShard_SkipCorrectly(t *testing.T) { + + sy, err := NewSyncer(nil, nil, nil, time.Second, 1, false, []string{}, []string{}) + testutil.Ok(t, err) + + testutil.Equals(t, false, sy.shouldSkipShard("prom-1")) + + include := []string{"prom-2"} + sy, err = NewSyncer(nil, nil, nil, time.Second, 1, false, include, []string{}) + testutil.Ok(t, err) + + testutil.Equals(t, true, sy.shouldSkipShard("prom-1")) + testutil.Equals(t, false, sy.shouldSkipShard("prom-2")) + + exclude := []string{"prom-3"} + sy, err = NewSyncer(nil, nil, nil, time.Second, 1, false, []string{}, exclude) + testutil.Ok(t, err) + + testutil.Equals(t, true, sy.shouldSkipShard("prom-3")) + testutil.Equals(t, false, sy.shouldSkipShard("prom-1")) +}