Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: get rid of store info api #7704

Merged
merged 1 commit into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

- [#7494](https://github.com/thanos-io/thanos/pull/7494) Ruler: remove trailing period from SRV records returned by discovery `dnsnosrva` lookups
- [#7567](https://github.com/thanos-io/thanos/pull/7565) Query: Use thanos resolver for endpoint groups.
- [#7704](https://github.com/thanos-io/thanos/pull/7704) *: *breaking :warning:* remove Store gRPC Info function. This has been deprecated for 3 years, its time to remove it.

### Removed

Expand Down
3 changes: 1 addition & 2 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,6 @@ func registerQuery(app *extkingpin.App) {
*webDisableCORS,
*alertQueryURL,
*grpcProxyStrategy,
component.Query,
*queryTelemetryDurationQuantiles,
*queryTelemetrySamplesQuantiles,
*queryTelemetrySeriesQuantiles,
Expand Down Expand Up @@ -437,7 +436,6 @@ func runQuery(
disableCORS bool,
alertQueryURL string,
grpcProxyStrategy string,
comp component.Component,
queryTelemetryDurationQuantiles []float64,
queryTelemetrySamplesQuantiles []float64,
queryTelemetrySeriesQuantiles []float64,
Expand All @@ -452,6 +450,7 @@ func runQuery(
enforceTenancy bool,
tenantLabel string,
) error {
comp := component.Query
if alertQueryURL == "" {
lastColon := strings.LastIndex(httpBindAddr, ":")
if lastColon != -1 {
Expand Down
56 changes: 20 additions & 36 deletions pkg/component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,6 @@

package component

import (
"strings"

"github.com/thanos-io/thanos/pkg/store/storepb"
)

// Component is a generic component interface.
type Component interface {
String() string
Expand All @@ -18,7 +12,6 @@ type Component interface {
type StoreAPI interface {
implementsStoreAPI()
String() string
ToProto() storepb.StoreType
}

// Source is a Thanos component that produce blocks of metrics.
Expand All @@ -33,7 +26,6 @@ type SourceStoreAPI interface {
implementsStoreAPI()
producesBlocks()
String() string
ToProto() storepb.StoreType
}

type component struct {
Expand All @@ -48,14 +40,6 @@ type storeAPI struct {

func (storeAPI) implementsStoreAPI() {}

func (s sourceStoreAPI) ToProto() storepb.StoreType {
return storepb.StoreType(storepb.StoreType_value[strings.ToUpper(s.String())])
}

func (s storeAPI) ToProto() storepb.StoreType {
return storepb.StoreType(storepb.StoreType_value[strings.ToUpper(s.String())])
}

type source struct {
component
}
Expand All @@ -68,26 +52,6 @@ type sourceStoreAPI struct {
storeAPI
}

// FromProto converts from a gRPC StoreType to StoreAPI.
func FromProto(storeType storepb.StoreType) StoreAPI {
switch storeType {
case storepb.StoreType_QUERY:
return Query
case storepb.StoreType_RULE:
return Rule
case storepb.StoreType_SIDECAR:
return Sidecar
case storepb.StoreType_STORE:
return Store
case storepb.StoreType_RECEIVE:
return Receive
case storepb.StoreType_DEBUG:
return Debug
default:
return UnknownStoreAPI
}
}

func FromString(storeType string) StoreAPI {
switch storeType {
case "query":
Expand Down Expand Up @@ -125,4 +89,24 @@ var (
Store = storeAPI{component: component{name: "store"}}
UnknownStoreAPI = storeAPI{component: component{name: "unknown-store-api"}}
Query = storeAPI{component: component{name: "query"}}

All = []Component{
Bucket,
Cleanup,
Mark,
Upload,
Rewrite,
Retention,
Compact,
Downsample,
Replicate,
QueryFrontend,
Debug,
Receive,
Rule,
Sidecar,
Store,
UnknownStoreAPI,
Query,
}
)
112 changes: 13 additions & 99 deletions pkg/query/endpointset.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,91 +83,9 @@ func (es *endpointRef) Metadata(ctx context.Context, infoClient infopb.InfoClien
return &endpointMetadata{resp}, nil
}
}

// Call Info method of StoreAPI, this way querier will be able to discovery old components not exposing InfoAPI.
if storeClient != nil {
metadata, err := es.getMetadataUsingStoreAPI(ctx, storeClient)
if err != nil {
return nil, errors.Wrapf(err, "fallback fetching info from %s", es.addr)
}
return metadata, nil
}

return nil, errors.New(noMetadataEndpointMessage)
}

func (es *endpointRef) getMetadataUsingStoreAPI(ctx context.Context, client storepb.StoreClient) (*endpointMetadata, error) {
resp, err := client.Info(ctx, &storepb.InfoRequest{})
if err != nil {
return nil, err
}

infoResp := fillExpectedAPIs(component.FromProto(resp.StoreType), resp.MinTime, resp.MaxTime)
infoResp.LabelSets = resp.LabelSets
infoResp.ComponentType = component.FromProto(resp.StoreType).String()

return &endpointMetadata{
&infoResp,
}, nil
}

func fillExpectedAPIs(componentType component.Component, mintime, maxTime int64) infopb.InfoResponse {
switch componentType {
case component.Sidecar:
return infopb.InfoResponse{
Store: &infopb.StoreInfo{
MinTime: mintime,
MaxTime: maxTime,
},
Rules: &infopb.RulesInfo{},
Targets: &infopb.TargetsInfo{},
MetricMetadata: &infopb.MetricMetadataInfo{},
Exemplars: &infopb.ExemplarsInfo{},
}
case component.Query:
{
return infopb.InfoResponse{
Store: &infopb.StoreInfo{
MinTime: mintime,
MaxTime: maxTime,
},
Rules: &infopb.RulesInfo{},
Targets: &infopb.TargetsInfo{},
MetricMetadata: &infopb.MetricMetadataInfo{},
Exemplars: &infopb.ExemplarsInfo{},
Query: &infopb.QueryAPIInfo{},
}
}
case component.Receive:
{
return infopb.InfoResponse{
Store: &infopb.StoreInfo{
MinTime: mintime,
MaxTime: maxTime,
},
Exemplars: &infopb.ExemplarsInfo{},
}
}
case component.Store:
return infopb.InfoResponse{
Store: &infopb.StoreInfo{
MinTime: mintime,
MaxTime: maxTime,
},
}
case component.Rule:
return infopb.InfoResponse{
Store: &infopb.StoreInfo{
MinTime: mintime,
MaxTime: maxTime,
},
Rules: &infopb.RulesInfo{},
}
default:
return infopb.InfoResponse{}
}
}

// stringError forces the error to be a string
// when marshaled into a JSON.
type stringError struct {
Expand Down Expand Up @@ -199,7 +117,7 @@ type EndpointStatus struct {
// TODO(hitanshu-mehta) Currently,only collecting metrics of storeEndpoints. Make this struct generic.
type endpointSetNodeCollector struct {
mtx sync.Mutex
storeNodes map[component.Component]map[string]int
storeNodes map[string]map[string]int
storePerExtLset map[string]int

logger log.Logger
Expand All @@ -213,7 +131,7 @@ func newEndpointSetNodeCollector(logger log.Logger, labels ...string) *endpointS
}
return &endpointSetNodeCollector{
logger: logger,
storeNodes: map[component.Component]map[string]int{},
storeNodes: map[string]map[string]int{},
connectionsDesc: prometheus.NewDesc(
"thanos_store_nodes_grpc_connections",
"Number of gRPC connection to Store APIs. Opened connection means healthy store APIs available for Querier.",
Expand All @@ -236,8 +154,8 @@ func truncateExtLabels(s string, threshold int) string {
}
return s
}
func (c *endpointSetNodeCollector) Update(nodes map[component.Component]map[string]int) {
storeNodes := make(map[component.Component]map[string]int, len(nodes))
func (c *endpointSetNodeCollector) Update(nodes map[string]map[string]int) {
storeNodes := make(map[string]map[string]int, len(nodes))
storePerExtLset := map[string]int{}

for storeType, occurrencesPerExtLset := range nodes {
Expand All @@ -263,20 +181,16 @@ func (c *endpointSetNodeCollector) Collect(ch chan<- prometheus.Metric) {
c.mtx.Lock()
defer c.mtx.Unlock()

for storeType, occurrencesPerExtLset := range c.storeNodes {
for k, occurrencesPerExtLset := range c.storeNodes {
for externalLabels, occurrences := range occurrencesPerExtLset {
var storeTypeStr string
if storeType != nil {
storeTypeStr = storeType.String()
}
// Select only required labels.
lbls := []string{}
for _, lbl := range c.labels {
switch lbl {
case string(ExternalLabels):
lbls = append(lbls, externalLabels)
case string(StoreType):
lbls = append(lbls, storeTypeStr)
lbls = append(lbls, k)
}
}
select {
Expand Down Expand Up @@ -454,12 +368,12 @@ func (e *EndpointSet) Update(ctx context.Context) {

// All producers that expose StoreAPI should have unique external labels. Check all which connect to our Querier.
if er.HasStoreAPI() && (er.ComponentType() == component.Sidecar || er.ComponentType() == component.Rule) &&
stats[component.Sidecar][extLset]+stats[component.Rule][extLset] > 0 {
stats[component.Sidecar.String()][extLset]+stats[component.Rule.String()][extLset] > 0 {

level.Warn(e.logger).Log("msg", "found duplicate storeEndpoints producer (sidecar or ruler). This is not advices as it will malform data in in the same bucket",
"address", addr, "extLset", extLset, "duplicates", fmt.Sprintf("%v", stats[component.Sidecar][extLset]+stats[component.Rule][extLset]+1))
"address", addr, "extLset", extLset, "duplicates", fmt.Sprintf("%v", stats[component.Sidecar.String()][extLset]+stats[component.Rule.String()][extLset]+1))
}
stats[er.ComponentType()][extLset]++
stats[er.ComponentType().String()][extLset]++
}

e.endpointsMetric.Update(stats)
Expand Down Expand Up @@ -905,10 +819,10 @@ type endpointMetadata struct {
*infopb.InfoResponse
}

func newEndpointAPIStats() map[component.Component]map[string]int {
nodes := make(map[component.Component]map[string]int, len(storepb.StoreType_name))
for i := range storepb.StoreType_name {
nodes[component.FromProto(storepb.StoreType(i))] = map[string]int{}
func newEndpointAPIStats() map[string]map[string]int {
nodes := make(map[string]map[string]int, len(component.All))
for _, comp := range component.All {
nodes[comp.String()] = map[string]int{}
}
return nodes
}
Expand Down
Loading
Loading