diff --git a/CHANGELOG.md b/CHANGELOG.md index c4a01e00e0..f75e4c4bfb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [#1358](https://github.com/thanos-io/thanos/pull/1358) Added `part_size` configuration option for HTTP multipart requests minimum part size for S3 storage type - [#1363](https://github.com/thanos-io/thanos/pull/1363) Thanos Receive now exposes `thanos_receive_hashring_nodes` and `thanos_receive_hashring_tenants` metrics to monitor status of hash-rings - [#1395](https://github.com/thanos-io/thanos/pull/1395) Added `/-/ready` and `/-/healthy` endpoints to Thanos sidecar. +- [#1059](https://github.com/improbable-eng/thanos/pull/1059) Store: Added label selecting. This allows store gateway to serve only certain TSDB blocks from object store. ### Changed @@ -485,4 +486,4 @@ Initial version to have a stable reference before [gossip protocol removal](/doc - Compact / Downsample offline commands. - Bucket commands. - Downsampling support for UI. -- Grafana dashboards for Thanos components. +- Grafana dashboards for Thanos components. \ No newline at end of file diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 23a97b344a..6e4be396ed 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -11,6 +11,7 @@ import ( opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/tsdb/labels" "github.com/thanos-io/thanos/pkg/model" "github.com/thanos-io/thanos/pkg/objstore/client" "github.com/thanos-io/thanos/pkg/runutil" @@ -56,12 +57,20 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string maxTime := model.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to serve. Thanos Store serves only blocks, which happened eariler than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y."). Default("9999-12-31T23:59:59Z")) + selectorLabels := cmd.Flag("selector-label", "Query selector labels that will be exposed in info endpoint (repeated)."). + PlaceHolder("=\"\"").Strings() + m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error { if minTime.PrometheusTimestamp() > maxTime.PrometheusTimestamp() { return errors.Errorf("invalid argument: --min-time '%s' can't be greater than --max-time '%s'", minTime, maxTime) } + selectorLset, err := parseFlagLabels(*selectorLabels) + if err != nil { + return errors.Wrap(err, "parse selector labels") + } + return runStore(g, logger, reg, @@ -85,6 +94,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string MinTime: *minTime, MaxTime: *maxTime, }, + selectorLset, ) } } @@ -111,6 +121,7 @@ func runStore( syncInterval time.Duration, blockSyncConcurrency int, filterConf *store.FilterConfig, + selectorLset labels.Labels, ) error { { confContentYaml, err := objStoreConfig.Content() @@ -153,6 +164,7 @@ func runStore( verbose, blockSyncConcurrency, filterConf, + selectorLset, ) if err != nil { return errors.Wrap(err, "create object storage store") diff --git a/docs/components/store.md b/docs/components/store.md index 69c9db8de8..eabed556dc 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -98,6 +98,9 @@ Flags: RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y. + --selector-label=="" ... + Query selector labels that will be exposed in + info endpoint (repeated). ``` diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 100b063c41..da0832205c 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -215,7 +215,8 @@ type BucketStore struct { samplesLimiter *Limiter partitioner partitioner - filterConfig *FilterConfig + filterConfig *FilterConfig + selectorLabels labels.Labels } // NewBucketStore creates a new bucket backed store that implements the store API against @@ -232,6 +233,7 @@ func NewBucketStore( debugLogging bool, blockSyncConcurrency int, filterConf *FilterConfig, + selectorLabels labels.Labels, ) (*BucketStore, error) { if logger == nil { logger = log.NewNopLogger() @@ -266,6 +268,7 @@ func NewBucketStore( samplesLimiter: NewLimiter(maxSampleCount, metrics.queriesDropped), partitioner: gapBasedPartitioner{maxGapSize: maxGapSize}, filterConfig: filterConf, + selectorLabels: selectorLabels, } s.metrics = metrics @@ -519,12 +522,19 @@ func (s *BucketStore) TimeRange() (mint, maxt int64) { // Info implements the storepb.StoreServer interface. func (s *BucketStore) Info(context.Context, *storepb.InfoRequest) (*storepb.InfoResponse, error) { mint, maxt := s.TimeRange() - // Store nodes hold global data and thus have no labels. - return &storepb.InfoResponse{ + + infoResp := &storepb.InfoResponse{ StoreType: component.Store.ToProto(), MinTime: mint, MaxTime: maxt, - }, nil + Labels: make([]storepb.Label, 0, len(s.selectorLabels)), + } + + for _, lset := range s.selectorLabels { + infoResp.Labels = append(infoResp.Labels, storepb.Label{Name: lset.Name, Value: lset.Value}) + } + + return infoResp, nil } func (s *BucketStore) limitMinTime(mint int64) int64 { @@ -772,7 +782,7 @@ func debugFoundBlockSetOverview(logger log.Logger, mint, maxt, maxResolutionMill } // Series implements the storepb.StoreServer interface. -func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) (err error) { +func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { { span, _ := tracing.StartSpan(srv.Context(), "store_query_gate_ismyturn") err := s.queryGate.IsMyTurn(srv.Context()) @@ -783,10 +793,19 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie } defer s.queryGate.Done() - matchers, err := translateMatchers(req.Matchers) + match, matchers, err := matchesExternalLabels(req.Matchers, s.selectorLabels) if err != nil { return status.Error(codes.InvalidArgument, err.Error()) } + if !match { + return nil + } + + newMatchers, err := translateMatchers(matchers) + if err != nil { + return status.Error(codes.InvalidArgument, err.Error()) + } + req.MinTime = s.limitMinTime(req.MinTime) req.MaxTime = s.limitMaxTime(req.MaxTime) @@ -799,7 +818,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie s.mtx.RLock() for _, bs := range s.blockSets { - blockMatchers, ok := bs.labelMatchers(matchers...) + blockMatchers, ok := bs.labelMatchers(newMatchers...) if !ok { continue } diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index 6e165d75ca..c4ea8c8dae 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -150,7 +150,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m maxTime: maxTime, } - store, err := NewBucketStore(s.logger, nil, bkt, dir, s.cache, 0, maxSampleCount, 20, false, 20, filterConf) + store, err := NewBucketStore(s.logger, nil, bkt, dir, s.cache, 0, maxSampleCount, 20, false, 20, filterConf, labels.FromStrings("c", "1")) testutil.Ok(t, err) s.store = store @@ -191,6 +191,10 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) { testutil.Ok(t, err) testutil.Equals(t, []string{"1", "2"}, vals.Values) + names, err := s.store.LabelNames(ctx, &storepb.LabelNamesRequest{}) + testutil.Ok(t, err) + testutil.Equals(t, []string{"a", "b", "c"}, names.Names) + // TODO(bwplotka): Add those test cases to TSDB querier_test.go as well, there are no tests for matching. for i, tcase := range []struct { req *storepb.SeriesRequest @@ -367,6 +371,15 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) { MaxTime: maxt, }, }, + { + req: &storepb.SeriesRequest{ + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "c", Value: "2"}, + }, + MinTime: mint, + MaxTime: maxt, + }, + }, } { t.Log("Run ", i) @@ -484,7 +497,7 @@ func TestBucketStore_TimePartitioning_e2e(t *testing.T) { &FilterConfig{ MinTime: minTimeDuration, MaxTime: filterMaxTime, - }) + }, nil) testutil.Ok(t, err) err = store.SyncBlocks(ctx) diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 13fe7e8b92..5a1bb749a5 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -422,12 +422,16 @@ func TestBucketStore_Info(t *testing.T) { dir, err := ioutil.TempDir("", "bucketstore-test") testutil.Ok(t, err) - bucketStore, err := NewBucketStore(nil, nil, nil, dir, noopCache{}, 2e5, 0, 0, false, 20, filterConf) + bucketStore, err := NewBucketStore(nil, nil, nil, dir, noopCache{}, 2e5, 0, 0, false, 20, filterConf, labels.FromStrings("a", "b", "c", "d")) testutil.Ok(t, err) resp, err := bucketStore.Info(ctx, &storepb.InfoRequest{}) testutil.Ok(t, err) + testutil.Equals(t, []storepb.Label{ + {Name: "a", Value: "b"}, + {Name: "c", Value: "d"}, + }, resp.Labels) testutil.Equals(t, storepb.StoreType_STORE, resp.StoreType) testutil.Equals(t, int64(math.MaxInt64), resp.MinTime) testutil.Equals(t, int64(math.MinInt64), resp.MaxTime) @@ -480,7 +484,7 @@ func TestBucketStore_isBlockInMinMaxRange(t *testing.T) { &FilterConfig{ MinTime: minTimeDuration, MaxTime: hourBefore, - }) + }, nil) testutil.Ok(t, err) inRange, err := bucketStore.isBlockInMinMaxRange(context.TODO(), id1)