Skip to content

Commit

Permalink
*: get rid of store info api (thanos-io#7704)
Browse files Browse the repository at this point in the history
We support the Info gRPC api for 3 years now. We used to use Store API
Info as fallback if we encounter an endpoint that does not implement
Info gRPC but that should not happen now anymore.

Signed-off-by: Michael Hoffmann <[email protected]>
  • Loading branch information
MichaHoffmann authored and fpetkovski committed Oct 7, 2024
1 parent 64818ba commit f13720a
Show file tree
Hide file tree
Showing 21 changed files with 135 additions and 2,046 deletions.
3 changes: 1 addition & 2 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,6 @@ func registerQuery(app *extkingpin.App) {
enableQueryPushdown,
*alertQueryURL,
*grpcProxyStrategy,
component.Query,
*queryTelemetryDurationQuantiles,
*queryTelemetrySamplesQuantiles,
*queryTelemetrySeriesQuantiles,
Expand Down Expand Up @@ -414,7 +413,6 @@ func runQuery(
enableQueryPushdown bool,
alertQueryURL string,
grpcProxyStrategy string,
comp component.Component,
queryTelemetryDurationQuantiles []float64,
queryTelemetrySamplesQuantiles []float64,
queryTelemetrySeriesQuantiles []float64,
Expand All @@ -424,6 +422,7 @@ func runQuery(
queryMode queryMode,
enableExtendedFunctions bool,
) error {
comp := component.Query
if alertQueryURL == "" {
lastColon := strings.LastIndex(httpBindAddr, ":")
if lastColon != -1 {
Expand Down
57 changes: 21 additions & 36 deletions pkg/component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,6 @@

package component

import (
"strings"

"github.com/thanos-io/thanos/pkg/store/storepb"
)

// Component is a generic component interface.
type Component interface {
String() string
Expand All @@ -18,7 +12,6 @@ type Component interface {
type StoreAPI interface {
implementsStoreAPI()
String() string
ToProto() storepb.StoreType
}

// Source is a Thanos component that produce blocks of metrics.
Expand All @@ -33,7 +26,6 @@ type SourceStoreAPI interface {
implementsStoreAPI()
producesBlocks()
String() string
ToProto() storepb.StoreType
}

type component struct {
Expand All @@ -48,14 +40,6 @@ type storeAPI struct {

func (storeAPI) implementsStoreAPI() {}

func (s sourceStoreAPI) ToProto() storepb.StoreType {
return storepb.StoreType(storepb.StoreType_value[strings.ToUpper(s.String())])
}

func (s storeAPI) ToProto() storepb.StoreType {
return storepb.StoreType(storepb.StoreType_value[strings.ToUpper(s.String())])
}

type source struct {
component
}
Expand All @@ -68,26 +52,6 @@ type sourceStoreAPI struct {
storeAPI
}

// FromProto converts from a gRPC StoreType to StoreAPI.
func FromProto(storeType storepb.StoreType) StoreAPI {
switch storeType {
case storepb.StoreType_QUERY:
return Query
case storepb.StoreType_RULE:
return Rule
case storepb.StoreType_SIDECAR:
return Sidecar
case storepb.StoreType_STORE:
return Store
case storepb.StoreType_RECEIVE:
return Receive
case storepb.StoreType_DEBUG:
return Debug
default:
return UnknownStoreAPI
}
}

func FromString(storeType string) StoreAPI {
switch storeType {
case "query":
Expand All @@ -111,6 +75,7 @@ var (
Bucket = source{component: component{name: "bucket"}}
Cleanup = source{component: component{name: "cleanup"}}
Mark = source{component: component{name: "mark"}}
Upload = source{component: component{name: "upload"}}
Rewrite = source{component: component{name: "rewrite"}}
Retention = source{component: component{name: "retention"}}
Compact = source{component: component{name: "compact"}}
Expand All @@ -124,4 +89,24 @@ var (
Store = storeAPI{component: component{name: "store"}}
UnknownStoreAPI = storeAPI{component: component{name: "unknown-store-api"}}
Query = storeAPI{component: component{name: "query"}}

All = []Component{
Bucket,
Cleanup,
Mark,
Upload,
Rewrite,
Retention,
Compact,
Downsample,
Replicate,
QueryFrontend,
Debug,
Receive,
Rule,
Sidecar,
Store,
UnknownStoreAPI,
Query,
}
)
112 changes: 13 additions & 99 deletions pkg/query/endpointset.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,91 +78,9 @@ func (es *endpointRef) Metadata(ctx context.Context, infoClient infopb.InfoClien
return &endpointMetadata{resp}, nil
}
}

// Call Info method of StoreAPI, this way querier will be able to discovery old components not exposing InfoAPI.
if storeClient != nil {
metadata, err := es.getMetadataUsingStoreAPI(ctx, storeClient)
if err != nil {
return nil, errors.Wrapf(err, "fallback fetching info from %s", es.addr)
}
return metadata, nil
}

return nil, errors.New(noMetadataEndpointMessage)
}

func (es *endpointRef) getMetadataUsingStoreAPI(ctx context.Context, client storepb.StoreClient) (*endpointMetadata, error) {
resp, err := client.Info(ctx, &storepb.InfoRequest{})
if err != nil {
return nil, err
}

infoResp := fillExpectedAPIs(component.FromProto(resp.StoreType), resp.MinTime, resp.MaxTime)
infoResp.LabelSets = resp.LabelSets
infoResp.ComponentType = component.FromProto(resp.StoreType).String()

return &endpointMetadata{
&infoResp,
}, nil
}

func fillExpectedAPIs(componentType component.Component, mintime, maxTime int64) infopb.InfoResponse {
switch componentType {
case component.Sidecar:
return infopb.InfoResponse{
Store: &infopb.StoreInfo{
MinTime: mintime,
MaxTime: maxTime,
},
Rules: &infopb.RulesInfo{},
Targets: &infopb.TargetsInfo{},
MetricMetadata: &infopb.MetricMetadataInfo{},
Exemplars: &infopb.ExemplarsInfo{},
}
case component.Query:
{
return infopb.InfoResponse{
Store: &infopb.StoreInfo{
MinTime: mintime,
MaxTime: maxTime,
},
Rules: &infopb.RulesInfo{},
Targets: &infopb.TargetsInfo{},
MetricMetadata: &infopb.MetricMetadataInfo{},
Exemplars: &infopb.ExemplarsInfo{},
Query: &infopb.QueryAPIInfo{},
}
}
case component.Receive:
{
return infopb.InfoResponse{
Store: &infopb.StoreInfo{
MinTime: mintime,
MaxTime: maxTime,
},
Exemplars: &infopb.ExemplarsInfo{},
}
}
case component.Store:
return infopb.InfoResponse{
Store: &infopb.StoreInfo{
MinTime: mintime,
MaxTime: maxTime,
},
}
case component.Rule:
return infopb.InfoResponse{
Store: &infopb.StoreInfo{
MinTime: mintime,
MaxTime: maxTime,
},
Rules: &infopb.RulesInfo{},
}
default:
return infopb.InfoResponse{}
}
}

// stringError forces the error to be a string
// when marshaled into a JSON.
type stringError struct {
Expand Down Expand Up @@ -195,7 +113,7 @@ type EndpointStatus struct {
// TODO(hitanshu-mehta) Currently,only collecting metrics of storeEndpoints. Make this struct generic.
type endpointSetNodeCollector struct {
mtx sync.Mutex
storeNodes map[component.Component]map[string]int
storeNodes map[string]map[string]int
storePerExtLset map[string]int

logger log.Logger
Expand All @@ -209,7 +127,7 @@ func newEndpointSetNodeCollector(logger log.Logger, labels ...string) *endpointS
}
return &endpointSetNodeCollector{
logger: logger,
storeNodes: map[component.Component]map[string]int{},
storeNodes: map[string]map[string]int{},
connectionsDesc: prometheus.NewDesc(
"thanos_store_nodes_grpc_connections",
"Number of gRPC connection to Store APIs. Opened connection means healthy store APIs available for Querier.",
Expand All @@ -232,8 +150,8 @@ func truncateExtLabels(s string, threshold int) string {
}
return s
}
func (c *endpointSetNodeCollector) Update(nodes map[component.Component]map[string]int) {
storeNodes := make(map[component.Component]map[string]int, len(nodes))
func (c *endpointSetNodeCollector) Update(nodes map[string]map[string]int) {
storeNodes := make(map[string]map[string]int, len(nodes))
storePerExtLset := map[string]int{}

for storeType, occurrencesPerExtLset := range nodes {
Expand All @@ -259,20 +177,16 @@ func (c *endpointSetNodeCollector) Collect(ch chan<- prometheus.Metric) {
c.mtx.Lock()
defer c.mtx.Unlock()

for storeType, occurrencesPerExtLset := range c.storeNodes {
for k, occurrencesPerExtLset := range c.storeNodes {
for externalLabels, occurrences := range occurrencesPerExtLset {
var storeTypeStr string
if storeType != nil {
storeTypeStr = storeType.String()
}
// Select only required labels.
lbls := []string{}
for _, lbl := range c.labels {
switch lbl {
case string(ExternalLabels):
lbls = append(lbls, externalLabels)
case string(StoreType):
lbls = append(lbls, storeTypeStr)
lbls = append(lbls, k)
}
}
select {
Expand Down Expand Up @@ -451,12 +365,12 @@ func (e *EndpointSet) Update(ctx context.Context) {

// All producers that expose StoreAPI should have unique external labels. Check all which connect to our Querier.
if er.HasStoreAPI() && (er.ComponentType() == component.Sidecar || er.ComponentType() == component.Rule) &&
stats[component.Sidecar][extLset]+stats[component.Rule][extLset] > 0 {
stats[component.Sidecar.String()][extLset]+stats[component.Rule.String()][extLset] > 0 {

level.Warn(e.logger).Log("msg", "found duplicate storeEndpoints producer (sidecar or ruler). This is not advices as it will malform data in in the same bucket",
"address", addr, "extLset", extLset, "duplicates", fmt.Sprintf("%v", stats[component.Sidecar][extLset]+stats[component.Rule][extLset]+1))
"address", addr, "extLset", extLset, "duplicates", fmt.Sprintf("%v", stats[component.Sidecar.String()][extLset]+stats[component.Rule.String()][extLset]+1))
}
stats[er.ComponentType()][extLset]++
stats[er.ComponentType().String()][extLset]++
}

e.endpointsMetric.Update(stats)
Expand Down Expand Up @@ -918,10 +832,10 @@ type endpointMetadata struct {
*infopb.InfoResponse
}

func newEndpointAPIStats() map[component.Component]map[string]int {
nodes := make(map[component.Component]map[string]int, len(storepb.StoreType_name))
for i := range storepb.StoreType_name {
nodes[component.FromProto(storepb.StoreType(i))] = map[string]int{}
func newEndpointAPIStats() map[string]map[string]int {
nodes := make(map[string]map[string]int, len(component.All))
for _, comp := range component.All {
nodes[comp.String()] = map[string]int{}
}
return nodes
}
Expand Down
Loading

0 comments on commit f13720a

Please sign in to comment.