diff --git a/pkg/query/endpointset.go b/pkg/query/endpointset.go index e23dbf9cd78..bd145f8eca9 100644 --- a/pkg/query/endpointset.go +++ b/pkg/query/endpointset.go @@ -48,24 +48,24 @@ type EndpointSpec interface { // given store connection. Metadata(ctx context.Context, client infopb.InfoClient) (*endpointMetadata, error) - // StrictStatic returns true if the endpoint has been statically defined and it is under a strict mode. - StrictStatic() bool + // IsStrictStatic returns true if the endpoint has been statically defined and it is under a strict mode. + IsStrictStatic() bool } type grpcEndpointSpec struct { - addr string - strictstatic bool + addr string + isStrictStatic bool } // NewGRPCEndpointSpec creates gRPC endpoint spec. // It uses InfoAPI to get Metadata. -func NewGRPCEndpointSpec(addr string, strictstatic bool) EndpointSpec { - return &grpcEndpointSpec{addr: addr, strictstatic: strictstatic} +func NewGRPCEndpointSpec(addr string, isStrictStatic bool) EndpointSpec { + return &grpcEndpointSpec{addr: addr, isStrictStatic: isStrictStatic} } -// StrictStatic returns true if the endpoint has been statically defined and it is under a strict mode. -func (es *grpcEndpointSpec) StrictStatic() bool { - return es.strictstatic +// IsStrictStatic returns true if the endpoint has been statically defined and it is under a strict mode. +func (es *grpcEndpointSpec) IsStrictStatic() bool { + return es.isStrictStatic } func (es *grpcEndpointSpec) Addr() string { @@ -262,7 +262,7 @@ func (e *EndpointSet) Update(ctx context.Context) { level.Info(er.logger).Log("msg", unhealthyEndpointMessage, "address", addr, "extLset", labelpb.PromLabelSetsToString(er.LabelSets())) } - // Add stores that are not yet in stores. + // Add endpoints that are not yet in activeEndpoints map. for addr, er := range activeEndpoints { if _, ok := endpoints[addr]; ok { continue @@ -270,9 +270,8 @@ func (e *EndpointSet) Update(ctx context.Context) { extLset := labelpb.PromLabelSetsToString(er.LabelSets()) - // All producers should have unique external labels. While this does not check only StoreAPIs connected to - // this querier this allows to notify early user about misconfiguration. Warn only. This is also detectable from metric. - if (er.ComponentType() == component.Sidecar || er.ComponentType() == component.Rule) && + // All producers that expose StoreAPI should have unique external labels. Check all which connect to our Querier. + if er.HasStoreAPI() && (er.ComponentType() == component.Sidecar || er.ComponentType() == component.Rule) && stats[component.Sidecar][extLset]+stats[component.Rule][extLset] > 0 { level.Warn(e.logger).Log("msg", "found duplicate storeAPI producer (sidecar or ruler). This is not advices as it will malform data in in the same bucket", @@ -283,25 +282,7 @@ func (e *EndpointSet) Update(ctx context.Context) { endpoints[addr] = er e.updateEndpointStatus(er, nil) - if er.HasStoreAPI() { - level.Info(e.logger).Log("msg", "adding new storeAPI to query endpointset", "address", addr, "extLset", extLset) - } - - if er.HasRulesAPI() { - level.Info(e.logger).Log("msg", "adding new rulesAPI to query endpointset", "address", addr) - } - - if er.HasExemplarsAPI() { - level.Info(e.logger).Log("msg", "adding new exemplarsAPI to query endpointset", "address", addr) - } - - if er.HasTargetsAPI() { - level.Info(e.logger).Log("msg", "adding new targetsAPI to query endpointset", "address", addr) - } - - if er.HasMetricMetadataAPI() { - level.Info(e.logger).Log("msg", "adding new MetricMetadataAPI to query endpointset", "address", addr) - } + level.Info(e.logger).Log("msg", fmt.Sprintf("adding new %v with %+v", er.ComponentType(), er.apisPresent()), "address", addr, "extLset", extLset) } e.endpointsMetric.Update(stats) @@ -312,7 +293,7 @@ func (e *EndpointSet) Update(ctx context.Context) { e.cleanUpStoreStatuses(endpoints) } -// Get returns a list of all active stores. +// GetStoreClients returns a list of all active stores. func (e *EndpointSet) GetStoreClients() []storepb.StoreClient { e.endpointsMtx.RLock() defer e.endpointsMtx.RUnlock() @@ -443,7 +424,7 @@ func (e *EndpointSet) getActiveEndpoints(ctx context.Context, endpoints map[stri metadata, err := spec.Metadata(ctx, er.clients.info) if err != nil { - if !seenAlready && !spec.StrictStatic() { + if !seenAlready && !spec.IsStrictStatic() { // Close only if new and not a strict static node. // Unactive `e.endpoints` will be closed later on. er.Close() @@ -452,7 +433,7 @@ func (e *EndpointSet) getActiveEndpoints(ctx context.Context, endpoints map[stri e.updateEndpointStatus(er, err) level.Warn(e.logger).Log("msg", "update of node failed", "err", errors.Wrap(err, "getting metadata"), "address", addr) - if !spec.StrictStatic() { + if !spec.IsStrictStatic() { return } @@ -600,39 +581,46 @@ func (er *endpointRef) ComponentType() component.Component { return component.FromString(er.metadata.ComponentType) } +func (er *endpointRef) HasClients() bool { + er.mtx.RLock() + defer er.mtx.RUnlock() + + return er.clients != nil +} + func (er *endpointRef) HasStoreAPI() bool { er.mtx.RLock() defer er.mtx.RUnlock() - return er.clients.store != nil + return er.HasClients() && er.clients.store != nil } func (er *endpointRef) HasRulesAPI() bool { er.mtx.RLock() defer er.mtx.RUnlock() - return er.clients.rule != nil + return er.HasClients() && er.clients.rule != nil } func (er *endpointRef) HasTargetsAPI() bool { er.mtx.RLock() defer er.mtx.RUnlock() - return er.clients.target != nil + return er.HasClients() && er.clients.target != nil } func (er *endpointRef) HasMetricMetadataAPI() bool { er.mtx.RLock() defer er.mtx.RUnlock() - return er.clients.metricMetadata != nil + return er.HasClients() && er.clients.metricMetadata != nil } func (er *endpointRef) HasExemplarsAPI() bool { er.mtx.RLock() defer er.mtx.RUnlock() - return er.clients.exemplar != nil + return er.HasClients() && er.clients.exemplar != nil } func (er *endpointRef) LabelSets() []labels.Labels { @@ -682,6 +670,32 @@ func (er *endpointRef) Close() { runutil.CloseWithLogOnErr(er.logger, er.cc, fmt.Sprintf("endpoint %v connection closed", er.addr)) } +func (er *endpointRef) apisPresent() []string { + var apisPresent []string + + if er.HasStoreAPI() { + apisPresent = append(apisPresent, "storeAPI") + } + + if er.HasRulesAPI() { + apisPresent = append(apisPresent, "rulesAPI") + } + + if er.HasExemplarsAPI() { + apisPresent = append(apisPresent, "exemplarsAPI") + } + + if er.HasTargetsAPI() { + apisPresent = append(apisPresent, "targetsAPI") + } + + if er.HasMetricMetadataAPI() { + apisPresent = append(apisPresent, "MetricMetadataAPI") + } + + return apisPresent +} + type endpointClients struct { store storepb.StoreClient rule rulespb.RulesClient