Enable sharding compact instances by prometheus shard #1245

Closed · wants to merge 1 commit
14 changes: 13 additions & 1 deletion cmd/thanos/compact.go
@@ -110,6 +110,10 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
compactionConcurrency := cmd.Flag("compact.concurrency", "Number of goroutines to use when compacting groups.").
Default("1").Int()

includeShards := cmd.Flag("shard.include", "Prometheus shard to compact. May be specified multiple times. Cannot be used with --shard.exclude.").Strings()
Member
I think something like this is great, but I have 2 suggestions:

  • I would not tie it to Prometheus. There can be many sources of blocks, including Thanos receiver, so it should be agnostic to Prometheus. It should be agnostic to sharding as well, since that again unnecessarily ties things together. I would literally match on the external labels of the source blocks, because that is literally what we do on the compactor; it's tied to TSDB blocks. So e.g. a repeated block-external-label-matcher flag would be nice?
  • I would not reinvent the wheel in terms of how to specify this "string". In particular, this form does not specify what exactly we include/exclude on. Pod name? Replica name? External labels? UIDs? (: I would stick to matchers, because that is what we do: we query the blocks like metrics (where each block is a single series). We can even reuse the internal labels.Matcher code from github.com/prometheus/[email protected]/labels/selector.go. So the matcher flag could take literally {cluster=~"monzo-super-cluster-(east|west)", zone="abc"} (see the sketch below). What do you think?

In this manner we don't need include/exclude as well (:
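
For illustration only, a minimal, self-contained sketch of the matcher-based filtering this comment proposes. The Matcher type and shouldSkipBlock helper here are hypothetical stand-ins, not the actual tsdb labels.Matcher API; anchoring the regexp mirrors how PromQL matchers behave.

package main

import (
    "fmt"
    "regexp"
)

// Matcher matches one external label value against an anchored regular
// expression, e.g. {cluster=~"monzo-super-cluster-(east|west)"}.
type Matcher struct {
    Name string
    re   *regexp.Regexp
}

func NewMatcher(name, pattern string) (*Matcher, error) {
    re, err := regexp.Compile("^(?:" + pattern + ")$") // anchored, PromQL-style
    if err != nil {
        return nil, err
    }
    return &Matcher{Name: name, re: re}, nil
}

// shouldSkipBlock reports whether a block's external labels fail any matcher;
// a block is kept only if every matcher matches its corresponding label.
func shouldSkipBlock(extLabels map[string]string, matchers []*Matcher) bool {
    for _, m := range matchers {
        if !m.re.MatchString(extLabels[m.Name]) {
            return true
        }
    }
    return false
}

func main() {
    m, err := NewMatcher("cluster", "monzo-super-cluster-(east|west)")
    if err != nil {
        panic(err)
    }
    fmt.Println(shouldSkipBlock(map[string]string{"cluster": "monzo-super-cluster-east"}, []*Matcher{m})) // false: kept
    fmt.Println(shouldSkipBlock(map[string]string{"cluster": "eu-central"}, []*Matcher{m}))               // true: skipped
}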

Contributor Author
This sounds good - I'm happy not to tie it to the Prometheus shard and to reuse the matcher code 👍


excludeShards := cmd.Flag("shard.exclude", "Prometheus shard to ignore. May be specified multiple times. Cannot be used with --shard.include.").Strings()

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
return runCompact(g, logger, reg,
*httpAddr,
@@ -130,6 +134,8 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
*maxCompactionLevel,
*blockSyncConcurrency,
*compactionConcurrency,
*includeShards,
*excludeShards,
)
}
}
@@ -152,6 +158,8 @@ func runCompact(
maxCompactionLevel int,
blockSyncConcurrency int,
concurrency int,
includeShards []string,
excludeShards []string,
) error {
halted := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "thanos_compactor_halted",
@@ -168,6 +176,10 @@

downsampleMetrics := newDownsampleMetrics(reg)

if len(includeShards) > 0 && len(excludeShards) > 0 {
return errors.New("Only one of --shard.include and --shard.exclude should be specified")
}

confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
@@ -186,7 +198,7 @@
}()

sy, err := compact.NewSyncer(logger, reg, bkt, consistencyDelay,
blockSyncConcurrency, acceptMalformedIndex)
blockSyncConcurrency, acceptMalformedIndex, includeShards, excludeShards)
if err != nil {
return errors.Wrap(err, "create syncer")
}
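
Assuming the flags land exactly as defined above, a compactor pinned to two shards might be started as follows; the bucket config file name is made up, and --objstore.config-file is the existing Thanos object-storage flag:

thanos compact \
    --objstore.config-file=bucket.yml \
    --shard.include=prom-1 \
    --shard.include=prom-2

Because the flags are declared with Strings(), they are repeatable, so each shard gets its own --shard.include; passing both --shard.include and --shard.exclude makes runCompact return an error at startup.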
40 changes: 39 additions & 1 deletion pkg/compact/compact.go
@@ -50,6 +50,8 @@ type Syncer struct {
blockSyncConcurrency int
metrics *syncerMetrics
acceptMalformedIndex bool
includeShards []string
excludeShards []string
}

type syncerMetrics struct {
@@ -134,7 +136,8 @@ func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics {

// NewSyncer returns a new Syncer for the given Bucket and directory.
// Blocks must be at least as old as the sync delay for being considered.
func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, consistencyDelay time.Duration, blockSyncConcurrency int, acceptMalformedIndex bool) (*Syncer, error) {
func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, consistencyDelay time.Duration,
blockSyncConcurrency int, acceptMalformedIndex bool, includeShards []string, excludeShards []string) (*Syncer, error) {
if logger == nil {
logger = log.NewNopLogger()
}
@@ -147,6 +150,8 @@ func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket
metrics: newSyncerMetrics(reg),
blockSyncConcurrency: blockSyncConcurrency,
acceptMalformedIndex: acceptMalformedIndex,
includeShards: includeShards,
excludeShards: excludeShards,
}, nil
}

@@ -204,6 +209,13 @@ func (c *Syncer) syncMetas(ctx context.Context) error {
return
}

if shardLabel, ok := meta.Thanos.Labels["prometheus_shard"]; ok {
if c.shouldSkipShard(shardLabel) {
level.Debug(c.logger).Log("msg", "skipping shard", "label", shardLabel)
continue
}
}

c.blocksMtx.Lock()
c.blocks[id] = meta
c.blocksMtx.Unlock()
@@ -308,6 +320,32 @@ func (c *Syncer) removeIfMetaMalformed(ctx context.Context, id ulid.ULID) (remov
return true
}

func (c *Syncer) shouldSkipShard(shardLabel string) bool {
// If an include list is specified, the shard must be present in it.
if len(c.includeShards) > 0 {
for _, shard := range c.includeShards {
if shardLabel == shard {
return false
}
}
return true
}

// If an exclude list is specified, the shard must *not* be present in it.
if len(c.excludeShards) > 0 {
for _, shard := range c.excludeShards {
if shardLabel == shard {
return true
}
}
return false
}

// Neither list specified; default is to include all shards.
return false
}

// GroupKey returns a unique identifier for the group the block belongs to. It considers
// the downsampling resolution and the block's labels.
func GroupKey(meta metadata.Meta) string {
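
To see the new pieces end to end: syncMetas reads the shard name from the block's external labels (meta.Thanos.Labels in its meta.json) and shouldSkipShard applies the include/exclude decision. The following self-contained sketch (not part of the PR) replays that logic; the prometheus_shard key follows the convention above, and the shard names are made up:

package main

import "fmt"

// shouldSkipShard mirrors the logic added above: an include list is checked
// first, an exclude list second, and empty configuration keeps every shard.
func shouldSkipShard(shardLabel string, include, exclude []string) bool {
    if len(include) > 0 {
        for _, s := range include {
            if shardLabel == s {
                return false
            }
        }
        return true
    }
    for _, s := range exclude {
        if shardLabel == s {
            return true
        }
    }
    return false
}

func main() {
    labels := map[string]string{"prometheus_shard": "prom-1"} // block's external labels
    if shard, ok := labels["prometheus_shard"]; ok {
        fmt.Println(shouldSkipShard(shard, []string{"prom-2"}, nil)) // true: not on the include list
        fmt.Println(shouldSkipShard(shard, nil, []string{"prom-3"})) // false: not excluded
    }
}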
4 changes: 2 additions & 2 deletions pkg/compact/compact_e2e_test.go
@@ -32,7 +32,7 @@ func TestSyncer_SyncMetas_e2e(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
defer cancel()

sy, err := NewSyncer(nil, nil, bkt, 0, 1, false)
sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, []string{}, []string{})
testutil.Ok(t, err)

// Generate 15 blocks. Initially the first 10 are synced into memory and only the last
@@ -134,7 +134,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
}

// Do one initial synchronization with the bucket.
sy, err := NewSyncer(nil, nil, bkt, 0, 1, false)
sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, []string{}, []string{})
testutil.Ok(t, err)
testutil.Ok(t, sy.SyncMetas(ctx))

24 changes: 23 additions & 1 deletion pkg/compact/compact_test.go
@@ -75,7 +75,7 @@ func TestSyncer_SyncMetas_HandlesMalformedBlocks(t *testing.T) {
defer cancel()

bkt := inmem.NewBucket()
sy, err := NewSyncer(nil, nil, bkt, 10*time.Second, 1, false)
sy, err := NewSyncer(nil, nil, bkt, 10*time.Second, 1, false, []string{}, []string{})
testutil.Ok(t, err)

// Generate 1 block which is older than MinimumAgeForRemoval which has chunk data but no meta. Compactor should delete it.
@@ -103,3 +103,25 @@
testutil.Ok(t, err)
testutil.Equals(t, true, exists)
}

func TestSyncer_ShouldSkipShard_SkipCorrectly(t *testing.T) {
// Neither list set: nothing is skipped.
sy, err := NewSyncer(nil, nil, nil, time.Second, 1, false, []string{}, []string{})
testutil.Ok(t, err)

testutil.Equals(t, false, sy.shouldSkipShard("prom-1"))

// With an include list, only listed shards are kept.
include := []string{"prom-2"}
sy, err = NewSyncer(nil, nil, nil, time.Second, 1, false, include, []string{})
testutil.Ok(t, err)

testutil.Equals(t, true, sy.shouldSkipShard("prom-1"))
testutil.Equals(t, false, sy.shouldSkipShard("prom-2"))

// With an exclude list, listed shards are skipped.
exclude := []string{"prom-3"}
sy, err = NewSyncer(nil, nil, nil, time.Second, 1, false, []string{}, exclude)
testutil.Ok(t, err)

testutil.Equals(t, true, sy.shouldSkipShard("prom-3"))
testutil.Equals(t, false, sy.shouldSkipShard("prom-1"))
}