Skip to content

Commit

Permalink
Store,Compactor: Thanos sharding (#1583)
Browse files Browse the repository at this point in the history
* Add selector.relabel-config flag && expose advertised label for store gateway

Signed-off-by: jojohappy <[email protected]>

Signed-off-by: Giedrius Statkevičius <[email protected]>
  • Loading branch information
jojohappy authored and GiedriusS committed Oct 28, 2019
1 parent 1d15a9a commit 274eec2
Show file tree
Hide file tree
Showing 16 changed files with 430 additions and 28 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#1534](https://github.com/thanos-io/thanos/pull/1534) Thanos Query Added `/-/ready` and `/-/healthy` endpoints.
- [#1533](https://github.com/thanos-io/thanos/pull/1533) Thanos inspect now supports the timeout flag.
- [#1362](https://github.com/thanos-io/thanos/pull/1362) Optional `replicaLabels` param for `/query` and `/query_range` querier endpoints. When provided overwrite the `query.replica-label` cli flags.
- [#1583](https://github.com/thanos-io/thanos/pull/1583) Thanos sharding:
- Add relabel config (`--selector.relabel-config-file` and `selector.relabel-config`) into Thanos Store and Compact components.
- For store gateway, advertise labels from "approved" blocks.
- Selecting blocks to serve depends on the result of block labels relabeling.

### Changed

Expand Down
16 changes: 15 additions & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {
compactionConcurrency := cmd.Flag("compact.concurrency", "Number of goroutines to use when compacting groups.").
Default("1").Int()

selectorRelabelConf := regSelectorRelabelFlags(cmd)

m[component.Compact.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
return runCompact(g, logger, reg,
*httpAddr,
Expand All @@ -131,6 +133,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {
*maxCompactionLevel,
*blockSyncConcurrency,
*compactionConcurrency,
selectorRelabelConf,
)
}
}
Expand All @@ -153,6 +156,7 @@ func runCompact(
maxCompactionLevel int,
blockSyncConcurrency int,
concurrency int,
selectorRelabelConf *extflag.PathOrContent,
) error {
halted := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "thanos_compactor_halted",
Expand Down Expand Up @@ -185,6 +189,16 @@ func runCompact(
return err
}

relabelContentYaml, err := selectorRelabelConf.Content()
if err != nil {
return errors.Wrap(err, "get content of relabel configuration")
}

relabelConfig, err := parseRelabelConfig(relabelContentYaml)
if err != nil {
return err
}

// Ensure we close up everything properly.
defer func() {
if err != nil {
Expand All @@ -193,7 +207,7 @@ func runCompact(
}()

sy, err := compact.NewSyncer(logger, reg, bkt, consistencyDelay,
blockSyncConcurrency, acceptMalformedIndex)
blockSyncConcurrency, acceptMalformedIndex, relabelConfig)
if err != nil {
return errors.Wrap(err, "create syncer")
}
Expand Down
9 changes: 9 additions & 0 deletions cmd/thanos/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,12 @@ func regCommonTracingFlags(app *kingpin.Application) *extflag.PathOrContent {
false,
)
}

func regSelectorRelabelFlags(cmd *kingpin.CmdClause) *extflag.PathOrContent {
return extflag.RegisterPathOrContent(
cmd,
"selector.relabel-config",
"YAML file that contains relabeling configuration that allows selecting blocks. It follows native Prometheus relabel-config syntax. See format details: https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config ",
false,
)
}
26 changes: 26 additions & 0 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore/client"
Expand All @@ -21,6 +22,7 @@ import (
"github.com/thanos-io/thanos/pkg/store"
storecache "github.com/thanos-io/thanos/pkg/store/cache"
"gopkg.in/alecthomas/kingpin.v2"
yaml "gopkg.in/yaml.v2"
)

// registerStore registers a store command.
Expand Down Expand Up @@ -58,6 +60,8 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
maxTime := model.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to serve. Thanos Store serves only blocks, which happened eariler than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("9999-12-31T23:59:59Z"))

selectorRelabelConf := regSelectorRelabelFlags(cmd)

m[component.Store.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error {
if minTime.PrometheusTimestamp() > maxTime.PrometheusTimestamp() {
return errors.Errorf("invalid argument: --min-time '%s' can't be greater than --max-time '%s'",
Expand Down Expand Up @@ -87,6 +91,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
MinTime: *minTime,
MaxTime: *maxTime,
},
selectorRelabelConf,
)
}
}
Expand All @@ -113,6 +118,7 @@ func runStore(
syncInterval time.Duration,
blockSyncConcurrency int,
filterConf *store.FilterConfig,
selectorRelabelConf *extflag.PathOrContent,
) error {
statusProber := prober.NewProber(component, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))

Expand All @@ -126,6 +132,16 @@ func runStore(
return errors.Wrap(err, "create bucket client")
}

relabelContentYaml, err := selectorRelabelConf.Content()
if err != nil {
return errors.Wrap(err, "get content of relabel configuration")
}

relabelConfig, err := parseRelabelConfig(relabelContentYaml)
if err != nil {
return err
}

// Ensure we close up everything properly.
defer func() {
if err != nil {
Expand Down Expand Up @@ -156,6 +172,7 @@ func runStore(
verbose,
blockSyncConcurrency,
filterConf,
relabelConfig,
)
if err != nil {
return errors.Wrap(err, "create object storage store")
Expand Down Expand Up @@ -211,3 +228,12 @@ func runStore(
level.Info(logger).Log("msg", "starting store node")
return nil
}

func parseRelabelConfig(contentYaml []byte) ([]*relabel.Config, error) {
var relabelConfig []*relabel.Config
if err := yaml.Unmarshal(contentYaml, &relabelConfig); err != nil {
return nil, errors.Wrap(err, "parsing relabel configuration")
}

return relabelConfig, nil
}
13 changes: 13 additions & 0 deletions docs/components/compact.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,5 +120,18 @@ Flags:
metadata from object storage.
--compact.concurrency=1 Number of goroutines to use when compacting
groups.
--selector.relabel-config-file=<file-path>
Path to YAML file that contains relabeling
configuration that allows selecting blocks. It
follows native Prometheus relabel-config syntax.
See format details:
https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config
--selector.relabel-config=<content>
Alternative to 'selector.relabel-config-file'
flag (lower priority). Content of YAML file that
contains relabeling configuration that allows
selecting blocks. It follows native Prometheus
relabel-config syntax. See format details:
https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config
```
14 changes: 14 additions & 0 deletions docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,20 @@ Flags:
RFC3339 format or time duration relative to
current time, such as -1d or 2h45m. Valid
duration units are ms, s, m, h, d, w, y.
--selector.relabel-config-file=<file-path>
Path to YAML file that contains relabeling
configuration that allows selecting blocks. It
follows native Prometheus relabel-config
syntax. See format details:
https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config
--selector.relabel-config=<content>
Alternative to 'selector.relabel-config-file'
flag (lower priority). Content of YAML file
that contains relabeling configuration that
allows selecting blocks. It follows native
Prometheus relabel-config syntax. See format
details:
https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config

```

Expand Down
16 changes: 15 additions & 1 deletion pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
promlables "github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/prometheus/tsdb"
terrors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/tsdb/labels"
Expand Down Expand Up @@ -55,6 +57,7 @@ type Syncer struct {
blockSyncConcurrency int
metrics *syncerMetrics
acceptMalformedIndex bool
relabelConfig []*relabel.Config
}

type syncerMetrics struct {
Expand Down Expand Up @@ -136,7 +139,7 @@ func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics {

// NewSyncer returns a new Syncer for the given Bucket and directory.
// Blocks must be at least as old as the sync delay for being considered.
func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, consistencyDelay time.Duration, blockSyncConcurrency int, acceptMalformedIndex bool) (*Syncer, error) {
func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, consistencyDelay time.Duration, blockSyncConcurrency int, acceptMalformedIndex bool, relabelConfig []*relabel.Config) (*Syncer, error) {
if logger == nil {
logger = log.NewNopLogger()
}
Expand All @@ -149,6 +152,7 @@ func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket
metrics: newSyncerMetrics(reg),
blockSyncConcurrency: blockSyncConcurrency,
acceptMalformedIndex: acceptMalformedIndex,
relabelConfig: relabelConfig,
}, nil
}

Expand Down Expand Up @@ -213,6 +217,7 @@ func (c *Syncer) syncMetas(ctx context.Context) error {
if err == blockTooFreshSentinelError {
continue
}

if err != nil {
if removedOrIgnored := c.removeIfMetaMalformed(workCtx, id); removedOrIgnored {
continue
Expand All @@ -221,6 +226,15 @@ func (c *Syncer) syncMetas(ctx context.Context) error {
return
}

// Check for block labels by relabeling.
// If output is empty, the block will be dropped.
lset := promlables.FromMap(meta.Thanos.Labels)
processedLabels := relabel.Process(lset, c.relabelConfig...)
if processedLabels == nil {
level.Debug(c.logger).Log("msg", "dropping block(drop in relabeling)", "block", id)
continue
}

c.blocksMtx.Lock()
c.blocks[id] = meta
c.blocksMtx.Unlock()
Expand Down
93 changes: 90 additions & 3 deletions pkg/compact/compact_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/prometheus/prometheus/tsdb/labels"
Expand All @@ -25,14 +26,16 @@ import (
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/objtesting"
"github.com/thanos-io/thanos/pkg/testutil"
"gopkg.in/yaml.v2"
)

func TestSyncer_SyncMetas_e2e(t *testing.T) {
objtesting.ForeachStore(t, func(t testing.TB, bkt objstore.Bucket) {
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
defer cancel()

sy, err := NewSyncer(nil, nil, bkt, 0, 1, false)
relabelConfig := make([]*relabel.Config, 0)
sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, relabelConfig)
testutil.Ok(t, err)

// Generate 15 blocks. Initially the first 10 are synced into memory and only the last
Expand Down Expand Up @@ -72,7 +75,6 @@ func TestSyncer_SyncMetas_e2e(t *testing.T) {
testutil.Ok(t, err)
testutil.Equals(t, ids[5:], groups[0].IDs())
})

}

func TestSyncer_GarbageCollect_e2e(t *testing.T) {
Expand All @@ -85,6 +87,8 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
var metas []*metadata.Meta
var ids []ulid.ULID

relabelConfig := make([]*relabel.Config, 0)

for i := 0; i < 10; i++ {
var m metadata.Meta

Expand Down Expand Up @@ -134,7 +138,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
}

// Do one initial synchronization with the bucket.
sy, err := NewSyncer(nil, nil, bkt, 0, 1, false)
sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, relabelConfig)
testutil.Ok(t, err)
testutil.Ok(t, sy.SyncMetas(ctx))

Expand Down Expand Up @@ -354,3 +358,86 @@ func createEmptyBlock(dir string, mint int64, maxt int64, extLset labels.Labels,

return uid, nil
}

func TestSyncer_SyncMetasFilter_e2e(t *testing.T) {
var err error

relabelContentYaml := `
- action: drop
regex: "A"
source_labels:
- cluster
`
var relabelConfig []*relabel.Config
err = yaml.Unmarshal([]byte(relabelContentYaml), &relabelConfig)
testutil.Ok(t, err)

extLsets := []labels.Labels{{{Name: "cluster", Value: "A"}}, {{Name: "cluster", Value: "B"}}}

objtesting.ForeachStore(t, func(t testing.TB, bkt objstore.Bucket) {
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
defer cancel()

sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, relabelConfig)
testutil.Ok(t, err)

var ids []ulid.ULID
var metas []*metadata.Meta

for i := 0; i < 16; i++ {
id, err := ulid.New(uint64(i), nil)
testutil.Ok(t, err)

var meta metadata.Meta
meta.Version = 1
meta.ULID = id
meta.Thanos = metadata.Thanos{
Labels: extLsets[i%2].Map(),
}

ids = append(ids, id)
metas = append(metas, &meta)
}
for _, m := range metas[:10] {
var buf bytes.Buffer
testutil.Ok(t, json.NewEncoder(&buf).Encode(&m))
testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), metadata.MetaFilename), &buf))
}

testutil.Ok(t, sy.SyncMetas(ctx))

groups, err := sy.Groups()
testutil.Ok(t, err)
var evenIds []ulid.ULID
for i := 0; i < 10; i++ {
if i%2 != 0 {
evenIds = append(evenIds, ids[i])
}
}
testutil.Equals(t, evenIds, groups[0].IDs())

// Upload last 6 blocks.
for _, m := range metas[10:] {
var buf bytes.Buffer
testutil.Ok(t, json.NewEncoder(&buf).Encode(&m))
testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), metadata.MetaFilename), &buf))
}

// Delete first 4 blocks.
for _, m := range metas[:4] {
testutil.Ok(t, block.Delete(ctx, log.NewNopLogger(), bkt, m.ULID))
}

testutil.Ok(t, sy.SyncMetas(ctx))

groups, err = sy.Groups()
testutil.Ok(t, err)
evenIds = make([]ulid.ULID, 0)
for i := 4; i < 16; i++ {
if i%2 != 0 {
evenIds = append(evenIds, ids[i])
}
}
testutil.Equals(t, evenIds, groups[0].IDs())
})
}
Loading

0 comments on commit 274eec2

Please sign in to comment.