Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store: selector labels from store #1059

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#1358](https://github.com/thanos-io/thanos/pull/1358) Added `part_size` configuration option for HTTP multipart requests minimum part size for S3 storage type
- [#1363](https://github.com/thanos-io/thanos/pull/1363) Thanos Receive now exposes `thanos_receive_hashring_nodes` and `thanos_receive_hashring_tenants` metrics to monitor status of hash-rings
- [#1395](https://github.com/thanos-io/thanos/pull/1395) Added `/-/ready` and `/-/healthy` endpoints to Thanos sidecar.
- [#1059](https://github.com/thanos-io/thanos/pull/1059) Store: Added label selecting. This allows store gateway to serve only certain TSDB blocks from object store.

### Changed

Expand Down Expand Up @@ -485,4 +486,4 @@ Initial version to have a stable reference before [gossip protocol removal](/doc
- Compact / Downsample offline commands.
- Bucket commands.
- Downsampling support for UI.
- Grafana dashboards for Thanos components.
- Grafana dashboards for Thanos components.
12 changes: 12 additions & 0 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/tsdb/labels"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/runutil"
Expand Down Expand Up @@ -56,12 +57,20 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
maxTime := model.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to serve. Thanos Store serves only blocks, which happened earlier than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("9999-12-31T23:59:59Z"))

selectorLabels := cmd.Flag("selector-label", "Query selector labels that will be exposed in info endpoint (repeated).").
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On each component that works on the object storage (e.g Store GW and Compactor), add --selector.relabel-config (and corresponding --selector.relabel-config-file) as in proposal

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On top of this we want to make sure Info() endpoint has exact labels of blocks that we shard.

PlaceHolder("<name>=\"<value>\"").Strings()

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error {
if minTime.PrometheusTimestamp() > maxTime.PrometheusTimestamp() {
return errors.Errorf("invalid argument: --min-time '%s' can't be greater than --max-time '%s'",
minTime, maxTime)
}

selectorLset, err := parseFlagLabels(*selectorLabels)
if err != nil {
return errors.Wrap(err, "parse selector labels")
}

return runStore(g,
logger,
reg,
Expand All @@ -85,6 +94,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
MinTime: *minTime,
MaxTime: *maxTime,
},
selectorLset,
)
}
}
Expand All @@ -111,6 +121,7 @@ func runStore(
syncInterval time.Duration,
blockSyncConcurrency int,
filterConf *store.FilterConfig,
selectorLset labels.Labels,
) error {
{
confContentYaml, err := objStoreConfig.Content()
Expand Down Expand Up @@ -153,6 +164,7 @@ func runStore(
verbose,
blockSyncConcurrency,
filterConf,
selectorLset,
)
if err != nil {
return errors.Wrap(err, "create object storage store")
Expand Down
3 changes: 3 additions & 0 deletions docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ Flags:
RFC3339 format or time duration relative to
current time, such as -1d or 2h45m. Valid
duration units are ms, s, m, h, d, w, y.
--selector-label=<name>="<value>" ...
Query selector labels that will be exposed in
info endpoint (repeated).

```

Expand Down
33 changes: 26 additions & 7 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ type BucketStore struct {
samplesLimiter *Limiter
partitioner partitioner

filterConfig *FilterConfig
filterConfig *FilterConfig
selectorLabels labels.Labels
}

// NewBucketStore creates a new bucket backed store that implements the store API against
Expand All @@ -232,6 +233,7 @@ func NewBucketStore(
debugLogging bool,
blockSyncConcurrency int,
filterConf *FilterConfig,
selectorLabels labels.Labels,
) (*BucketStore, error) {
if logger == nil {
logger = log.NewNopLogger()
Expand Down Expand Up @@ -266,6 +268,7 @@ func NewBucketStore(
samplesLimiter: NewLimiter(maxSampleCount, metrics.queriesDropped),
partitioner: gapBasedPartitioner{maxGapSize: maxGapSize},
filterConfig: filterConf,
selectorLabels: selectorLabels,
}
s.metrics = metrics

Expand Down Expand Up @@ -519,12 +522,19 @@ func (s *BucketStore) TimeRange() (mint, maxt int64) {
// Info implements the storepb.StoreServer interface.
func (s *BucketStore) Info(context.Context, *storepb.InfoRequest) (*storepb.InfoResponse, error) {
mint, maxt := s.TimeRange()
// Store nodes hold global data and thus have no labels.
return &storepb.InfoResponse{

infoResp := &storepb.InfoResponse{
StoreType: component.Store.ToProto(),
MinTime: mint,
MaxTime: maxt,
}, nil
Labels: make([]storepb.Label, 0, len(s.selectorLabels)),
}

for _, lset := range s.selectorLabels {
infoResp.Labels = append(infoResp.Labels, storepb.Label{Name: lset.Name, Value: lset.Value})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before this change we need to adjust Querier logic:

// Check if the store has some external labels specified and if any if there are duplicates.

Otherwise, sidecar + store gateway will be assumed a "duplicate" in Querier. Let's think about how we can fix this.

The storeAPI duplication detection logic is embedded in Querier to make sure user doesn't put many sidecars that upload blocks with the same external labels (common bug). I think we might want to change storeset behavior to look on StoreAPI InfoResponse.StoreType

I only don't like the fact that for Querier all of this is no longer some StoreAPI but actually Sidecar, Store...

I might think we actually would need to change StoreType to Producer, Browser as that would be a good abstraction over different implementations.

Thoughts @povilasv @GiedriusS @brancz ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Am I missing something or shouldn't this just be the set of all labelsets seen in all blocks?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you elaborate?

Or better: Let's maybe quickly write up some proposal as we want consistent thing everywhere we do selectors + maybe selecting prefixes as well.

cc @jojohappy

}

return infoResp, nil
}

func (s *BucketStore) limitMinTime(mint int64) int64 {
Expand Down Expand Up @@ -772,7 +782,7 @@ func debugFoundBlockSetOverview(logger log.Logger, mint, maxt, maxResolutionMill
}

// Series implements the storepb.StoreServer interface.
func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) (err error) {
func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
{
span, _ := tracing.StartSpan(srv.Context(), "store_query_gate_ismyturn")
err := s.queryGate.IsMyTurn(srv.Context())
Expand All @@ -783,10 +793,19 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
}
defer s.queryGate.Done()

matchers, err := translateMatchers(req.Matchers)
match, matchers, err := matchesExternalLabels(req.Matchers, s.selectorLabels)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}
if !match {
return nil
}

newMatchers, err := translateMatchers(matchers)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}

req.MinTime = s.limitMinTime(req.MinTime)
req.MaxTime = s.limitMaxTime(req.MaxTime)

Expand All @@ -799,7 +818,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
s.mtx.RLock()

for _, bs := range s.blockSets {
blockMatchers, ok := bs.labelMatchers(matchers...)
blockMatchers, ok := bs.labelMatchers(newMatchers...)
if !ok {
continue
}
Expand Down
17 changes: 15 additions & 2 deletions pkg/store/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
maxTime: maxTime,
}

store, err := NewBucketStore(s.logger, nil, bkt, dir, s.cache, 0, maxSampleCount, 20, false, 20, filterConf)
store, err := NewBucketStore(s.logger, nil, bkt, dir, s.cache, 0, maxSampleCount, 20, false, 20, filterConf, labels.FromStrings("c", "1"))
testutil.Ok(t, err)
s.store = store

Expand Down Expand Up @@ -191,6 +191,10 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
testutil.Ok(t, err)
testutil.Equals(t, []string{"1", "2"}, vals.Values)

names, err := s.store.LabelNames(ctx, &storepb.LabelNamesRequest{})
testutil.Ok(t, err)
testutil.Equals(t, []string{"a", "b", "c"}, names.Names)

// TODO(bwplotka): Add those test cases to TSDB querier_test.go as well, there are no tests for matching.
for i, tcase := range []struct {
req *storepb.SeriesRequest
Expand Down Expand Up @@ -367,6 +371,15 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
MaxTime: maxt,
},
},
{
req: &storepb.SeriesRequest{
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "c", Value: "2"},
},
MinTime: mint,
MaxTime: maxt,
},
},
} {
t.Log("Run ", i)

Expand Down Expand Up @@ -484,7 +497,7 @@ func TestBucketStore_TimePartitioning_e2e(t *testing.T) {
&FilterConfig{
MinTime: minTimeDuration,
MaxTime: filterMaxTime,
})
}, nil)
testutil.Ok(t, err)

err = store.SyncBlocks(ctx)
Expand Down
8 changes: 6 additions & 2 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,12 +422,16 @@ func TestBucketStore_Info(t *testing.T) {
dir, err := ioutil.TempDir("", "bucketstore-test")
testutil.Ok(t, err)

bucketStore, err := NewBucketStore(nil, nil, nil, dir, noopCache{}, 2e5, 0, 0, false, 20, filterConf)
bucketStore, err := NewBucketStore(nil, nil, nil, dir, noopCache{}, 2e5, 0, 0, false, 20, filterConf, labels.FromStrings("a", "b", "c", "d"))
testutil.Ok(t, err)

resp, err := bucketStore.Info(ctx, &storepb.InfoRequest{})
testutil.Ok(t, err)

testutil.Equals(t, []storepb.Label{
{Name: "a", Value: "b"},
{Name: "c", Value: "d"},
}, resp.Labels)
testutil.Equals(t, storepb.StoreType_STORE, resp.StoreType)
testutil.Equals(t, int64(math.MaxInt64), resp.MinTime)
testutil.Equals(t, int64(math.MinInt64), resp.MaxTime)
Expand Down Expand Up @@ -480,7 +484,7 @@ func TestBucketStore_isBlockInMinMaxRange(t *testing.T) {
&FilterConfig{
MinTime: minTimeDuration,
MaxTime: hourBefore,
})
}, nil)
testutil.Ok(t, err)

inRange, err := bucketStore.isBlockInMinMaxRange(context.TODO(), id1)
Expand Down