Skip to content

Commit

Permalink
querier: Add /api/v1/labels support (#905)
Browse files Browse the repository at this point in the history
* Feature: add /api/v1/labels support

Signed-off-by: jojohappy <[email protected]>
  • Loading branch information
jojohappy authored and bwplotka committed Apr 18, 2019
1 parent f847bfe commit 3b9afb4
Show file tree
Hide file tree
Showing 8 changed files with 331 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#811](https://github.com/improbable-eng/thanos/pull/811) Remote write receiver
- [#798](https://github.com/improbable-eng/thanos/pull/798) Ability to limit the maximum concurrent about of Series() calls in Thanos Store and the maximum amount of samples.
- [#910](https://github.com/improbable-eng/thanos/pull/910) Query's stores UI page is now sorted by type and old DNS or File SD stores are removed after 5 minutes (configurable via the new `--store.unhealthy-timeout=5m` flag).
- [#905](https://github.com/improbable-eng/thanos/pull/905) New Query API: /api/v1/labels. Noticed that the API was added in Prometheus v2.6.

New options:

Expand Down
34 changes: 34 additions & 0 deletions pkg/query/api/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ func (api *API) Register(r *route.Router, tracer opentracing.Tracer, logger log.
r.Get("/label/:name/values", instr("label_values", api.labelValues))

r.Get("/series", instr("series", api.series))

r.Get("/labels", instr("label_names", api.labelNames))
}

type queryData struct {
Expand Down Expand Up @@ -614,3 +616,35 @@ func parseDuration(s string) (time.Duration, error) {
}
return 0, fmt.Errorf("cannot parse %q to a valid duration", s)
}

func (api *API) labelNames(r *http.Request) (interface{}, []error, *ApiError) {
ctx := r.Context()

enablePartialResponse, apiErr := api.parsePartialResponseParam(r)
if apiErr != nil {
return nil, nil, apiErr
}

var (
warnmtx sync.Mutex
warnings []error
)
warningReporter := func(err error) {
warnmtx.Lock()
warnings = append(warnings, err)
warnmtx.Unlock()
}

q, err := api.queryableCreate(true, 0, enablePartialResponse, warningReporter).Querier(ctx, math.MinInt64, math.MaxInt64)
if err != nil {
return nil, nil, &ApiError{errorExec, err}
}
defer runutil.CloseWithLogOnErr(api.logger, q, "queryable labelNames")

names, err := q.LabelNames()
if err != nil {
return nil, nil, &ApiError{errorExec, err}
}

return names, warnings, nil
}
15 changes: 13 additions & 2 deletions pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,20 @@ func (q *querier) LabelValues(name string) ([]string, error) {
}

// LabelNames returns all the unique label names present in the block in sorted order.
// TODO(bwplotka): Consider adding labelNames to thanos Query API https://github.com/improbable-eng/thanos/issues/702.
func (q *querier) LabelNames() ([]string, error) {
return nil, errors.New("not implemented")
span, ctx := tracing.StartSpan(q.ctx, "querier_label_names")
defer span.Finish()

resp, err := q.proxy.LabelNames(ctx, &storepb.LabelNamesRequest{PartialResponseDisabled: !q.partialResponse})
if err != nil {
return nil, errors.Wrap(err, "proxy LabelNames()")
}

for _, w := range resp.Warnings {
q.warningReporter(errors.New(w))
}

return resp.Names, nil
}

func (q *querier) Close() error {
Expand Down
43 changes: 41 additions & 2 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -856,8 +856,38 @@ func chunksSize(chks []storepb.AggrChunk) (size int) {
}

// LabelNames implements the storepb.StoreServer interface.
func (s *BucketStore) LabelNames(context.Context, *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
return nil, status.Error(codes.Unimplemented, "not implemented")
func (s *BucketStore) LabelNames(ctx context.Context, _ *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
g, gctx := errgroup.WithContext(ctx)

s.mtx.RLock()

var mtx sync.Mutex
var sets [][]string

for _, b := range s.blocks {
indexr := b.indexReader(gctx)
g.Go(func() error {
defer runutil.CloseWithLogOnErr(s.logger, indexr, "label names")

res := indexr.LabelNames()
sort.Strings(res)

mtx.Lock()
sets = append(sets, res)
mtx.Unlock()

return nil
})
}

s.mtx.RUnlock()

if err := g.Wait(); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return &storepb.LabelNamesResponse{
Names: strutil.MergeSlices(sets...),
}, nil
}

// LabelValues implements the storepb.StoreServer interface.
Expand Down Expand Up @@ -1616,6 +1646,15 @@ func (r *bucketIndexReader) LabelValues(name string) []string {
return res
}

// LabelNames returns a list of label names.
func (r *bucketIndexReader) LabelNames() []string {
res := make([]string, 0, len(r.block.lvals))
for ln, _ := range r.block.lvals {
res = append(res, ln)
}
return res
}

// Close released the underlying resources of the reader.
func (r *bucketIndexReader) Close() error {
r.block.pendingReaders.Done()
Expand Down
81 changes: 75 additions & 6 deletions pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ import (
"google.golang.org/grpc/status"
)

var statusToCode = map[int]codes.Code{
http.StatusBadRequest: codes.InvalidArgument,
http.StatusNotFound: codes.NotFound,
http.StatusUnprocessableEntity: codes.Internal,
http.StatusServiceUnavailable: codes.Unavailable,
http.StatusInternalServerError: codes.Internal,
}

// PrometheusStore implements the store node API on top of the Prometheus remote read API.
type PrometheusStore struct {
logger log.Logger
Expand Down Expand Up @@ -336,10 +344,52 @@ func extendLset(lset []storepb.Label, extend labels.Labels) []storepb.Label {
}

// LabelNames returns all known label names.
func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) (
func (p *PrometheusStore) LabelNames(ctx context.Context, _ *storepb.LabelNamesRequest) (
*storepb.LabelNamesResponse, error,
) {
return nil, status.Error(codes.Unimplemented, "not implemented")
u := *p.base
u.Path = path.Join(u.Path, "/api/v1/labels")

req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

span, ctx := tracing.StartSpan(ctx, "/prom_label_names HTTP[client]")
defer span.Finish()

resp, err := p.client.Do(req.WithContext(ctx))
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
defer runutil.CloseWithLogOnErr(p.logger, resp.Body, "label names request body")

if resp.StatusCode/100 != 2 {
return nil, status.Error(codes.Internal, fmt.Sprintf("request Prometheus server failed, code %s", resp.Status))
}

if resp.StatusCode == http.StatusNoContent {
return &storepb.LabelNamesResponse{Names: []string{}}, nil
}

var m struct {
Data []string `json:"data"`
Status string `json:"status"`
Error string `json:"error"`
}
if err := json.NewDecoder(resp.Body).Decode(&m); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

if m.Status != "success" {
code, exists := statusToCode[resp.StatusCode]
if !exists {
return nil, status.Error(codes.Internal, m.Error)
}
return nil, status.Error(code, m.Error)
}

return &storepb.LabelNamesResponse{Names: m.Data}, nil
}

// LabelValues returns all known label values for a given label name.
Expand All @@ -356,25 +406,44 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue

req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}

span, ctx := tracing.StartSpan(ctx, "/prom_label_values HTTP[client]")
defer span.Finish()

resp, err := p.client.Do(req.WithContext(ctx))
if err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
defer runutil.CloseWithLogOnErr(p.logger, resp.Body, "label values request body")

if resp.StatusCode/100 != 2 {
return nil, status.Error(codes.Internal, fmt.Sprintf("request Prometheus server failed, code %s", resp.Status))
}

if resp.StatusCode == http.StatusNoContent {
return &storepb.LabelValuesResponse{Values: []string{}}, nil
}

var m struct {
Data []string `json:"data"`
Data []string `json:"data"`
Status string `json:"status"`
Error string `json:"error"`
}
if err := json.NewDecoder(resp.Body).Decode(&m); err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}

sort.Strings(m.Data)

if m.Status != "success" {
code, exists := statusToCode[resp.StatusCode]
if !exists {
return nil, status.Error(codes.Internal, m.Error)
}
return nil, status.Error(code, m.Error)
}

return &storepb.LabelValuesResponse{Values: m.Data}, nil
}
45 changes: 43 additions & 2 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,48 @@ func storeMatches(s Client, mint, maxt int64, matchers ...storepb.LabelMatcher)
func (s *ProxyStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) (
*storepb.LabelNamesResponse, error,
) {
return nil, status.Error(codes.Unimplemented, "not implemented")
var (
warnings []string
names [][]string
mtx sync.Mutex
g, gctx = errgroup.WithContext(ctx)
)

for _, st := range s.stores() {
st := st
g.Go(func() error {
resp, err := st.LabelNames(gctx, &storepb.LabelNamesRequest{
PartialResponseDisabled: r.PartialResponseDisabled,
})
if err != nil {
err = errors.Wrapf(err, "fetch label names from store %s", st)
if r.PartialResponseDisabled {
return err
}

mtx.Lock()
warnings = append(warnings, err.Error())
mtx.Unlock()
return nil
}

mtx.Lock()
warnings = append(warnings, resp.Warnings...)
names = append(names, resp.Names)
mtx.Unlock()

return nil
})
}

if err := g.Wait(); err != nil {
return nil, err
}

return &storepb.LabelNamesResponse{
Names: strutil.MergeUnsortedSlices(names...),
Warnings: warnings,
}, nil
}

// LabelValues returns all known label values for a given label name.
Expand All @@ -415,7 +456,7 @@ func (s *ProxyStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequ
store := st
g.Go(func() error {
resp, err := store.LabelValues(gctx, &storepb.LabelValuesRequest{
Label: r.Label,
Label: r.Label,
PartialResponseDisabled: r.PartialResponseDisabled,
})
if err != nil {
Expand Down
Loading

0 comments on commit 3b9afb4

Please sign in to comment.