Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement federated exemplar API #3846

Merged
merged 15 commits into from
Mar 22, 2021
29 changes: 29 additions & 0 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/discovery/cache"
"github.com/thanos-io/thanos/pkg/discovery/dns"
"github.com/thanos-io/thanos/pkg/exemplars"
"github.com/thanos-io/thanos/pkg/extgrpc"
"github.com/thanos-io/thanos/pkg/extkingpin"
"github.com/thanos-io/thanos/pkg/extprom"
Expand Down Expand Up @@ -103,6 +104,9 @@ func registerQuery(app *extkingpin.App) {
metadataEndpoints := cmd.Flag("metadata", "Experimental: Addresses of statically configured metadata API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect metadata API servers through respective DNS lookups.").
Hidden().PlaceHolder("<metadata>").Strings()

exemplarEndpoints := cmd.Flag("exemplar", "Experimental: Addresses of statically configured exemplars API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect exemplars API servers through respective DNS lookups.").
Hidden().PlaceHolder("<exemplar>").Strings()

strictStores := cmd.Flag("store-strict", "Addresses of only statically configured store API servers that are always used, even if the health check fails. Useful if you have a caching layer on top.").
PlaceHolder("<staticstore>").Strings()

Expand Down Expand Up @@ -159,6 +163,10 @@ func registerQuery(app *extkingpin.App) {
return errors.Errorf("Address %s is duplicated for --metadata flag.", dup)
}

if dup := firstDuplicate(*exemplarEndpoints); dup != "" {
return errors.Errorf("Address %s is duplicated for --exemplar flag.", dup)
}

httpLogOpts, err := logging.ParseHTTPOptions(*reqLogDecision, reqLogConfig)
if err != nil {
return errors.Wrap(err, "error while parsing config for request logging")
Expand Down Expand Up @@ -223,6 +231,7 @@ func registerQuery(app *extkingpin.App) {
*stores,
*ruleEndpoints,
*metadataEndpoints,
*exemplarEndpoints,
*enableAutodownsampling,
*enableQueryPartialResponse,
*enableRulePartialResponse,
Expand Down Expand Up @@ -279,6 +288,7 @@ func runQuery(
storeAddrs []string,
ruleAddrs []string,
metadataAddrs []string,
exemplarAddrs []string,
enableAutodownsampling bool,
enableQueryPartialResponse bool,
enableRulePartialResponse bool,
Expand Down Expand Up @@ -329,6 +339,12 @@ func runQuery(
dns.ResolverType(dnsSDResolver),
)

dnsExemplarProvider := dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_query_exemplar_apis_", reg),
dns.ResolverType(dnsSDResolver),
)

var (
stores = query.NewStoreSet(
logger,
Expand Down Expand Up @@ -362,12 +378,20 @@ func runQuery(

return specs
},
func() (specs []query.ExemplarSpec) {
for _, addr := range dnsExemplarProvider.Addresses() {
specs = append(specs, query.NewGRPCStoreSpec(addr, false))
}

return specs
},
dialOpts,
unhealthyStoreTimeout,
)
proxy = store.NewProxyStore(logger, reg, stores.Get, component.Query, selectorLset, storeResponseTimeout)
rulesProxy = rules.NewProxy(logger, stores.GetRulesClients)
metadataProxy = metadata.NewProxy(logger, stores.GetMetadataClients)
exemplarsProxy = exemplars.NewProxy(logger, stores.GetExemplarsClients)
queryableCreator = query.NewQueryableCreator(
logger,
extprom.WrapRegistererWithPrefix("thanos_query_", reg),
Expand Down Expand Up @@ -457,6 +481,9 @@ func runQuery(
if err := dnsMetadataProvider.Resolve(resolveCtx, metadataAddrs); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for metadataAPIs", "err", err)
}
if err := dnsExemplarProvider.Resolve(resolveCtx, exemplarAddrs); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for exemplarsAPI", "err", err)
}
return nil
})
}, func(error) {
Expand Down Expand Up @@ -505,6 +532,7 @@ func runQuery(
// NOTE: Will share the same replica label as the query for now.
rules.NewGRPCClientWithDedup(rulesProxy, queryReplicaLabels),
metadata.NewGRPCClient(metadataProxy),
exemplars.NewGRPCClientWithDedup(exemplarsProxy, queryReplicaLabels),
enableAutodownsampling,
enableQueryPartialResponse,
enableRulePartialResponse,
Expand Down Expand Up @@ -551,6 +579,7 @@ func runQuery(
grpcserver.WithServer(store.RegisterStoreServer(proxy)),
grpcserver.WithServer(rules.RegisterRulesServer(rulesProxy)),
grpcserver.WithServer(metadata.RegisterMetadataServer(metadataProxy)),
grpcserver.WithServer(exemplars.RegisterExemplarsServer(exemplarsProxy)),
grpcserver.WithListen(grpcBindAddr),
grpcserver.WithGracePeriod(grpcGracePeriod),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down
2 changes: 2 additions & 0 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/prometheus/prometheus/pkg/labels"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/exemplars"
"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/exthttp"
"github.com/thanos-io/thanos/pkg/extkingpin"
Expand Down Expand Up @@ -230,6 +231,7 @@ func runSidecar(
grpcserver.WithServer(store.RegisterStoreServer(promStore)),
grpcserver.WithServer(rules.RegisterRulesServer(rules.NewPrometheus(conf.prometheus.url, c, m.Labels))),
grpcserver.WithServer(meta.RegisterMetadataServer(meta.NewPrometheus(conf.prometheus.url, c))),
grpcserver.WithServer(exemplars.RegisterExemplarsServer(exemplars.NewPrometheus(conf.prometheus.url, c, m.Labels))),
grpcserver.WithListen(conf.grpc.bindAddress),
grpcserver.WithGracePeriod(time.Duration(conf.grpc.gracePeriod)),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down
38 changes: 38 additions & 0 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"strings"
"time"

cortexutil "github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
Expand All @@ -40,6 +41,8 @@ import (
"github.com/prometheus/prometheus/storage"

"github.com/thanos-io/thanos/pkg/api"
"github.com/thanos-io/thanos/pkg/exemplars"
"github.com/thanos-io/thanos/pkg/exemplars/exemplarspb"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/logging"
Expand Down Expand Up @@ -73,11 +76,13 @@ type QueryAPI struct {
queryEngine func(int64) *promql.Engine
ruleGroups rules.UnaryClient
metadatas metadata.UnaryClient
exemplars exemplars.UnaryClient

enableAutodownsampling bool
enableQueryPartialResponse bool
enableRulePartialResponse bool
enableMetricMetadataPartialResponse bool
enableExemplarPartialResponse bool
disableCORS bool

replicaLabels []string
Expand All @@ -96,6 +101,7 @@ func NewQueryAPI(
c query.QueryableCreator,
ruleGroups rules.UnaryClient,
metadatas metadata.UnaryClient,
exemplars exemplars.UnaryClient,
enableAutodownsampling bool,
enableQueryPartialResponse bool,
enableRulePartialResponse bool,
Expand All @@ -116,6 +122,7 @@ func NewQueryAPI(
gate: gate,
ruleGroups: ruleGroups,
metadatas: metadatas,
exemplars: exemplars,

enableAutodownsampling: enableAutodownsampling,
enableQueryPartialResponse: enableQueryPartialResponse,
Expand Down Expand Up @@ -155,6 +162,8 @@ func (qapi *QueryAPI) Register(r *route.Router, tracer opentracing.Tracer, logge
r.Get("/rules", instr("rules", NewRulesHandler(qapi.ruleGroups, qapi.enableRulePartialResponse)))

r.Get("/metadata", instr("metadata", NewMetricMetadataHandler(qapi.metadatas, qapi.enableMetricMetadataPartialResponse)))

r.Get("/query_exemplars", instr("exemplars", NewExemplarsHandler(qapi.exemplars, qapi.enableExemplarPartialResponse)))
}

type queryData struct {
Expand Down Expand Up @@ -683,6 +692,35 @@ func NewRulesHandler(client rules.UnaryClient, enablePartialResponse bool) func(
}
}

// NewExemplarsHandler creates handler compatible with HTTP /api/v1/exemplars [link-to-be-added]
// which uses gRPC Unary Rules API.
func NewExemplarsHandler(client exemplars.UnaryClient, enablePartialResponse bool) func(*http.Request) (interface{}, []error, *api.ApiError) {
ps := storepb.PartialResponseStrategy_ABORT
if enablePartialResponse {
ps = storepb.PartialResponseStrategy_WARN
}

return func(r *http.Request) (interface{}, []error, *api.ApiError) {
_, err := cortexutil.ParseTime(r.FormValue("start"))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
_, err = cortexutil.ParseTime(r.FormValue("end"))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}

req := &exemplarspb.ExemplarsRequest{
yeya24 marked this conversation as resolved.
Show resolved Hide resolved
PartialResponseStrategy: ps,
}
exemplarsData, warnings, err := client.Exemplars(r.Context(), req)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorInternal, Err: errors.Wrap(err, "retrieving exemplars")}
}
return exemplarsData, warnings, nil
}
}

var (
infMinTime = time.Unix(math.MinInt64/1000+62135596801, 0)
infMaxTime = time.Unix(math.MaxInt64/1000-62135596801, 999999999)
Expand Down
160 changes: 160 additions & 0 deletions pkg/exemplars/exemplars.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package exemplars

import (
"context"
"sort"

"github.com/pkg/errors"
"github.com/prometheus/prometheus/storage"
"github.com/thanos-io/thanos/pkg/exemplars/exemplarspb"
"github.com/thanos-io/thanos/pkg/store/labelpb"
)

var _ UnaryClient = &GRPCClient{}

// UnaryClient is gRPC exemplarspb.Exemplars client which expands streaming exemplars API. Useful for consumers that does not
// support streaming.
type UnaryClient interface {
Exemplars(ctx context.Context, req *exemplarspb.ExemplarsRequest) ([]*exemplarspb.ExemplarData, storage.Warnings, error)
}

// GRPCClient allows to retrieve exemplars from local gRPC streaming server implementation.
// TODO(bwplotka): Switch to native gRPC transparent client->server adapter once available.
type GRPCClient struct {
proxy exemplarspb.ExemplarsServer

replicaLabels map[string]struct{}
}

type exemplarsServer struct {
// This field just exist to pseudo-implement the unused methods of the interface.
exemplarspb.Exemplars_ExemplarsServer
ctx context.Context

warnings []error
data []*exemplarspb.ExemplarData
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add two additional methods for it yeya24@1b6abfc#diff-d09c4014e056aee5c67a76aca788de273aecb440edefc8a3b040bc77e27f2c38R39-R55, otherwise it will panic.


func (srv *exemplarsServer) Send(res *exemplarspb.ExemplarsResponse) error {
if res.GetWarning() != "" {
srv.warnings = append(srv.warnings, errors.New(res.GetWarning()))
return nil
}

if res.GetData() == nil {
return errors.New("empty exemplars data")
}

srv.data = append(srv.data, res.GetData())
return nil
}

func (srv *exemplarsServer) Context() context.Context {
return srv.ctx
}

func NewGRPCClient(es exemplarspb.ExemplarsServer) *GRPCClient {
return NewGRPCClientWithDedup(es, nil)
}

func NewGRPCClientWithDedup(es exemplarspb.ExemplarsServer, replicaLabels []string) *GRPCClient {
c := &GRPCClient{
proxy: es,
replicaLabels: map[string]struct{}{},
}

for _, label := range replicaLabels {
c.replicaLabels[label] = struct{}{}
}
return c
}

func (rr *GRPCClient) Exemplars(ctx context.Context, req *exemplarspb.ExemplarsRequest) ([]*exemplarspb.ExemplarData, storage.Warnings, error) {
resp := &exemplarsServer{ctx: ctx}

if err := rr.proxy.Exemplars(req, resp); err != nil {
return nil, nil, errors.Wrap(err, "proxy Exemplars")
}

resp.data = dedupExemplarsData(resp.data, rr.replicaLabels)
for _, d := range resp.data {
d.Exemplars = dedupExemplars(d.Exemplars, rr.replicaLabels)
}

return resp.data, resp.warnings, nil
}

func dedupExemplarsData(exemplarsData []*exemplarspb.ExemplarData, replicaLabels map[string]struct{}) []*exemplarspb.ExemplarData {
if len(exemplarsData) == 0 {
return exemplarsData
}

// Sort each exemplar's label names such that they are comparable.
for _, d := range exemplarsData {
sort.Slice(d.SeriesLabels.Labels, func(i, j int) bool {
return d.SeriesLabels.Labels[i].Name < d.SeriesLabels.Labels[j].Name
})
}

// Sort exemplars data such that they appear next to each other.
sort.Slice(exemplarsData, func(i, j int) bool {
return exemplarsData[i].Compare(exemplarsData[j]) < 0
})

i := 0
removeReplicaLabels(exemplarsData[i].SeriesLabels, replicaLabels)
for j := 1; j < len(exemplarsData); j++ {
removeReplicaLabels(exemplarsData[j].SeriesLabels, replicaLabels)
if exemplarsData[i].Compare(exemplarsData[j]) != 0 {
// Effectively retain exemplarsData[j] in the resulting slice.
i++
exemplarsData[i] = exemplarsData[j]
continue
}
}

return exemplarsData[:i+1]
}

func dedupExemplars(exemplars []*exemplarspb.Exemplar, replicaLabels map[string]struct{}) []*exemplarspb.Exemplar {
if len(exemplars) == 0 {
return exemplars
}

for _, e := range exemplars {
sort.Slice(e.Labels.Labels, func(i, j int) bool {
return e.Labels.Labels[i].Name < e.Labels.Labels[j].Name
})
}

sort.Slice(exemplars, func(i, j int) bool {
return exemplars[i].Compare(exemplars[j]) < 0
})

i := 0
removeReplicaLabels(exemplars[i].Labels, replicaLabels)
for j := 1; j < len(exemplars); j++ {
removeReplicaLabels(exemplars[j].Labels, replicaLabels)
if exemplars[i].Compare(exemplars[j]) != 0 {
// Effectively retain exemplars[j] in the resulting slice.
i++
exemplars[i] = exemplars[j]
}
}

return exemplars[:i+1]
}

func removeReplicaLabels(labelSet labelpb.ZLabelSet, replicaLabels map[string]struct{}) {
newLabels := make([]labelpb.ZLabel, 0, len(labelSet.Labels))
for _, l := range labelSet.Labels {
if _, ok := replicaLabels[l.Name]; !ok {
newLabels = append(newLabels, l)
}
}

labelSet.Labels = newLabels
}
Loading