diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index ac971b7a974..114684f908b 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -96,6 +96,9 @@ func registerQuery(app *extkingpin.App) { selectorLabels := cmd.Flag("selector-label", "Query selector labels that will be exposed in info endpoint (repeated)."). PlaceHolder("=\"\"").Strings() + endpoints := cmd.Flag("endpoint", "Addresses of statically configured Thanos API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect Thanos API servers through respective DNS lookups."). + PlaceHolder("").Strings() + stores := cmd.Flag("store", "Addresses of statically configured store API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect store API servers through respective DNS lookups."). PlaceHolder("").Strings() @@ -243,6 +246,7 @@ func registerQuery(app *extkingpin.App) { *queryReplicaLabels, selectorLset, getFlagsMap(cmd.Flags()), + *endpoints, *stores, *ruleEndpoints, *targetEndpoints, @@ -304,6 +308,7 @@ func runQuery( queryReplicaLabels []string, selectorLset labels.Labels, flagsMap map[string]string, + endpoints []string, storeAddrs []string, ruleAddrs []string, targetAddrs []string, @@ -348,6 +353,12 @@ func runQuery( } } + dnsInfoProvider := dns.NewProvider( + logger, + extprom.WrapRegistererWithPrefix("thanos_query_info_apis_", reg), + dns.ResolverType(dnsSDResolver), + ) + dnsRuleProvider := dns.NewProvider( logger, extprom.WrapRegistererWithPrefix("thanos_query_rule_apis_", reg), @@ -419,6 +430,13 @@ func runQuery( return specs }, + func() (specs []query.InfoSpec) { + for _, addr := range dnsInfoProvider.Addresses() { + specs = append(specs, query.NewGRPCStoreSpec(addr, false)) + } + + return specs + }, dialOpts, unhealthyStoreTimeout, ) @@ -522,6 +540,10 @@ func runQuery( if err := dnsExemplarProvider.Resolve(resolveCtx, exemplarAddrs); err != nil { level.Error(logger).Log("msg", "failed to resolve addresses for exemplarsAPI", "err", err) } + if err := dnsInfoProvider.Resolve(resolveCtx, endpoints); err != nil { + level.Error(logger).Log("msg", "failed to resolve addresses passed using endpoint flag", "err", err) + + } return nil }) }, func(error) { diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 6dfd6740ddd..94d5702ada4 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -253,7 +253,7 @@ func runSidecar( examplarSrv := exemplars.NewPrometheus(conf.prometheus.url, c, m.Labels) infoSrv := info.NewInfoServer( - infopb.ComponentType_SIDECAR, + component.Sidecar.String(), func() []labelpb.ZLabelSet { return promStore.LabelSet() }, @@ -272,6 +272,15 @@ func runSidecar( MaxTime: time.Unix(math.MaxInt64/1000-62135596801, 999999999).UTC().Unix(), } }, + func() *infopb.RulesInfo { + return &infopb.RulesInfo{} + }, + func() *infopb.TargetsInfo { + return &infopb.TargetsInfo{} + }, + func() *infopb.MetricMetadataInfo { + return &infopb.MetricMetadataInfo{} + }, ) s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe, diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index d39cf5d1a66..c505f8d3be2 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -384,7 +384,7 @@ func runStore( } infoSrv := info.NewInfoServer( - infopb.ComponentType_STORE, + component.Store.String(), func() []labelpb.ZLabelSet { return bs.LabelSet() }, @@ -396,6 +396,9 @@ func runStore( } }, func() *infopb.ExemplarsInfo { return nil }, + nil, + nil, + nil, ) // Start query (proxy) gRPC StoreAPI. diff --git a/pkg/info/info.go b/pkg/info/info.go index 6c78135f281..f2408edce45 100644 --- a/pkg/info/info.go +++ b/pkg/info/info.go @@ -6,7 +6,6 @@ package info import ( "context" - "github.com/go-kit/kit/log" "github.com/thanos-io/thanos/pkg/info/infopb" "github.com/thanos-io/thanos/pkg/store/labelpb" "google.golang.org/grpc" @@ -15,25 +14,33 @@ import ( type InfoServer struct { infopb.UnimplementedInfoServer - logger log.Logger - component infopb.ComponentType + component string - getLabelSet func() []labelpb.ZLabelSet - getStoreInfo func() *infopb.StoreInfo - getExemplarsInfo func() *infopb.ExemplarsInfo + getLabelSet func() []labelpb.ZLabelSet + getStoreInfo func() *infopb.StoreInfo + getExemplarsInfo func() *infopb.ExemplarsInfo + getRulesInfo func() *infopb.RulesInfo + getTargetsInfo func() *infopb.TargetsInfo + getMetricMetadataInfo func() *infopb.MetricMetadataInfo } func NewInfoServer( - component infopb.ComponentType, + component string, getLabelSet func() []labelpb.ZLabelSet, getStoreInfo func() *infopb.StoreInfo, getExemplarsInfo func() *infopb.ExemplarsInfo, + getRulesInfo func() *infopb.RulesInfo, + getTargetsInfo func() *infopb.TargetsInfo, + getMetricMetadataInfo func() *infopb.MetricMetadataInfo, ) *InfoServer { return &InfoServer{ - component: component, - getLabelSet: getLabelSet, - getStoreInfo: getStoreInfo, - getExemplarsInfo: getExemplarsInfo, + component: component, + getLabelSet: getLabelSet, + getStoreInfo: getStoreInfo, + getExemplarsInfo: getExemplarsInfo, + getRulesInfo: getRulesInfo, + getTargetsInfo: getTargetsInfo, + getMetricMetadataInfo: getMetricMetadataInfo, } } @@ -45,10 +52,28 @@ func RegisterInfoServer(infoSrv infopb.InfoServer) func(*grpc.Server) { } func (srv *InfoServer) Info(ctx context.Context, req *infopb.InfoRequest) (*infopb.InfoResponse, error) { - return &infopb.InfoResponse{ - LabelSets: srv.getLabelSet(), - ComponentType: srv.component, - StoreInfo: srv.getStoreInfo(), - ExemplarsInfo: srv.getExemplarsInfo(), - }, nil + + if srv.getRulesInfo == nil { + srv.getRulesInfo = func() *infopb.RulesInfo { return nil } + } + + if srv.getTargetsInfo == nil { + srv.getTargetsInfo = func() *infopb.TargetsInfo { return nil } + } + + if srv.getMetricMetadataInfo == nil { + srv.getMetricMetadataInfo = func() *infopb.MetricMetadataInfo { return nil } + } + + resp := &infopb.InfoResponse{ + LabelSets: srv.getLabelSet(), + ComponentType: srv.component, + Store: srv.getStoreInfo(), + Exemplars: srv.getExemplarsInfo(), + Rules: srv.getRulesInfo(), + Targets: srv.getTargetsInfo(), + MetricMetadata: srv.getMetricMetadataInfo(), + } + + return resp, nil } diff --git a/pkg/info/infopb/rpc.pb.go b/pkg/info/infopb/rpc.pb.go index c511f16e695..fb6e664f19a 100644 --- a/pkg/info/infopb/rpc.pb.go +++ b/pkg/info/infopb/rpc.pb.go @@ -29,47 +29,6 @@ var _ = math.Inf // proto package needs to be updated. const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package -type ComponentType int32 - -const ( - ComponentType_UNKNOWN ComponentType = 0 - ComponentType_QUERY ComponentType = 1 - ComponentType_RULE ComponentType = 2 - ComponentType_SIDECAR ComponentType = 3 - ComponentType_STORE ComponentType = 4 - ComponentType_RECEIVE ComponentType = 5 - // DEBUG represents some debug StoreAPI components e.g. thanos tools store-api-serve. - ComponentType_DEBUG ComponentType = 6 -) - -var ComponentType_name = map[int32]string{ - 0: "UNKNOWN", - 1: "QUERY", - 2: "RULE", - 3: "SIDECAR", - 4: "STORE", - 5: "RECEIVE", - 6: "DEBUG", -} - -var ComponentType_value = map[string]int32{ - "UNKNOWN": 0, - "QUERY": 1, - "RULE": 2, - "SIDECAR": 3, - "STORE": 4, - "RECEIVE": 5, - "DEBUG": 6, -} - -func (x ComponentType) String() string { - return proto.EnumName(ComponentType_name, int32(x)) -} - -func (ComponentType) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_a1214ec45d2bf952, []int{0} -} - type InfoRequest struct { } @@ -108,9 +67,17 @@ var xxx_messageInfo_InfoRequest proto.InternalMessageInfo type InfoResponse struct { LabelSets []labelpb.ZLabelSet `protobuf:"bytes,1,rep,name=label_sets,json=labelSets,proto3" json:"label_sets"` - ComponentType ComponentType `protobuf:"varint,2,opt,name=ComponentType,proto3,enum=thanos.ComponentType" json:"ComponentType,omitempty"` - StoreInfo *StoreInfo `protobuf:"bytes,3,opt,name=storeInfo,proto3" json:"storeInfo,omitempty"` - ExemplarsInfo *ExemplarsInfo `protobuf:"bytes,6,opt,name=exemplarsInfo,proto3" json:"exemplarsInfo,omitempty"` + ComponentType string `protobuf:"bytes,2,opt,name=ComponentType,proto3" json:"ComponentType,omitempty"` + /// StoreInfo holds the metadata related to Store API if exposed by the component otherwise it will be null. + Store *StoreInfo `protobuf:"bytes,3,opt,name=store,proto3" json:"store,omitempty"` + /// RulesInfo holds the metadata related to Rules API if exposed by the component otherwise it will be null. + Rules *RulesInfo `protobuf:"bytes,4,opt,name=rules,proto3" json:"rules,omitempty"` + /// MetricMetadataInfo holds the metadata related to Metadata API if exposed by the component otherwise it will be null. + MetricMetadata *MetricMetadataInfo `protobuf:"bytes,5,opt,name=metric_metadata,json=metricMetadata,proto3" json:"metric_metadata,omitempty"` + /// TargetsInfo holds the metadata related to Targets API if exposed by the component otherwise it will be null. + Targets *TargetsInfo `protobuf:"bytes,6,opt,name=targets,proto3" json:"targets,omitempty"` + /// ExemplarsInfo holds the metadata related to Exemplars API if exposed by the component otherwise it will be null. + Exemplars *ExemplarsInfo `protobuf:"bytes,7,opt,name=exemplars,proto3" json:"exemplars,omitempty"` } func (m *InfoResponse) Reset() { *m = InfoResponse{} } @@ -146,6 +113,7 @@ func (m *InfoResponse) XXX_DiscardUnknown() { var xxx_messageInfo_InfoResponse proto.InternalMessageInfo +/// StoreInfo holds the metadata related to Store API exposed by the component. type StoreInfo struct { MinTime int64 `protobuf:"varint,1,opt,name=min_time,json=minTime,proto3" json:"min_time,omitempty"` MaxTime int64 `protobuf:"varint,2,opt,name=max_time,json=maxTime,proto3" json:"max_time,omitempty"` @@ -184,6 +152,118 @@ func (m *StoreInfo) XXX_DiscardUnknown() { var xxx_messageInfo_StoreInfo proto.InternalMessageInfo +/// RulesInfo holds the metadata related to Rules API exposed by the component. +type RulesInfo struct { +} + +func (m *RulesInfo) Reset() { *m = RulesInfo{} } +func (m *RulesInfo) String() string { return proto.CompactTextString(m) } +func (*RulesInfo) ProtoMessage() {} +func (*RulesInfo) Descriptor() ([]byte, []int) { + return fileDescriptor_a1214ec45d2bf952, []int{3} +} +func (m *RulesInfo) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *RulesInfo) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_RulesInfo.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *RulesInfo) XXX_Merge(src proto.Message) { + xxx_messageInfo_RulesInfo.Merge(m, src) +} +func (m *RulesInfo) XXX_Size() int { + return m.Size() +} +func (m *RulesInfo) XXX_DiscardUnknown() { + xxx_messageInfo_RulesInfo.DiscardUnknown(m) +} + +var xxx_messageInfo_RulesInfo proto.InternalMessageInfo + +/// MetricMetadataInfo holds the metadata related to Metadata API exposed by the component. +type MetricMetadataInfo struct { +} + +func (m *MetricMetadataInfo) Reset() { *m = MetricMetadataInfo{} } +func (m *MetricMetadataInfo) String() string { return proto.CompactTextString(m) } +func (*MetricMetadataInfo) ProtoMessage() {} +func (*MetricMetadataInfo) Descriptor() ([]byte, []int) { + return fileDescriptor_a1214ec45d2bf952, []int{4} +} +func (m *MetricMetadataInfo) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *MetricMetadataInfo) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_MetricMetadataInfo.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *MetricMetadataInfo) XXX_Merge(src proto.Message) { + xxx_messageInfo_MetricMetadataInfo.Merge(m, src) +} +func (m *MetricMetadataInfo) XXX_Size() int { + return m.Size() +} +func (m *MetricMetadataInfo) XXX_DiscardUnknown() { + xxx_messageInfo_MetricMetadataInfo.DiscardUnknown(m) +} + +var xxx_messageInfo_MetricMetadataInfo proto.InternalMessageInfo + +/// TargetsInfo holds the metadata related to Targets API exposed by the component. +type TargetsInfo struct { +} + +func (m *TargetsInfo) Reset() { *m = TargetsInfo{} } +func (m *TargetsInfo) String() string { return proto.CompactTextString(m) } +func (*TargetsInfo) ProtoMessage() {} +func (*TargetsInfo) Descriptor() ([]byte, []int) { + return fileDescriptor_a1214ec45d2bf952, []int{5} +} +func (m *TargetsInfo) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *TargetsInfo) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_TargetsInfo.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *TargetsInfo) XXX_Merge(src proto.Message) { + xxx_messageInfo_TargetsInfo.Merge(m, src) +} +func (m *TargetsInfo) XXX_Size() int { + return m.Size() +} +func (m *TargetsInfo) XXX_DiscardUnknown() { + xxx_messageInfo_TargetsInfo.DiscardUnknown(m) +} + +var xxx_messageInfo_TargetsInfo proto.InternalMessageInfo + +/// EXemplarsInfo holds the metadata related to Exemplars API exposed by the component. type ExemplarsInfo struct { MinTime int64 `protobuf:"varint,1,opt,name=min_time,json=minTime,proto3" json:"min_time,omitempty"` MaxTime int64 `protobuf:"varint,2,opt,name=max_time,json=maxTime,proto3" json:"max_time,omitempty"` @@ -193,7 +273,7 @@ func (m *ExemplarsInfo) Reset() { *m = ExemplarsInfo{} } func (m *ExemplarsInfo) String() string { return proto.CompactTextString(m) } func (*ExemplarsInfo) ProtoMessage() {} func (*ExemplarsInfo) Descriptor() ([]byte, []int) { - return fileDescriptor_a1214ec45d2bf952, []int{3} + return fileDescriptor_a1214ec45d2bf952, []int{6} } func (m *ExemplarsInfo) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -223,44 +303,46 @@ func (m *ExemplarsInfo) XXX_DiscardUnknown() { var xxx_messageInfo_ExemplarsInfo proto.InternalMessageInfo func init() { - proto.RegisterEnum("thanos.ComponentType", ComponentType_name, ComponentType_value) proto.RegisterType((*InfoRequest)(nil), "thanos.InfoRequest") proto.RegisterType((*InfoResponse)(nil), "thanos.InfoResponse") proto.RegisterType((*StoreInfo)(nil), "thanos.StoreInfo") + proto.RegisterType((*RulesInfo)(nil), "thanos.RulesInfo") + proto.RegisterType((*MetricMetadataInfo)(nil), "thanos.MetricMetadataInfo") + proto.RegisterType((*TargetsInfo)(nil), "thanos.TargetsInfo") proto.RegisterType((*ExemplarsInfo)(nil), "thanos.ExemplarsInfo") } func init() { proto.RegisterFile("info/infopb/rpc.proto", fileDescriptor_a1214ec45d2bf952) } var fileDescriptor_a1214ec45d2bf952 = []byte{ - // 424 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x92, 0xc1, 0x6e, 0xd3, 0x30, - 0x18, 0xc7, 0xe3, 0xb6, 0xcb, 0x16, 0x97, 0xa2, 0x60, 0x36, 0x29, 0xeb, 0x21, 0x44, 0x11, 0x87, - 0x8a, 0x43, 0x23, 0x8a, 0x84, 0x84, 0x38, 0xad, 0x9d, 0x85, 0x2a, 0xa6, 0x4e, 0x38, 0x0d, 0x88, - 0x5d, 0xa6, 0x04, 0x79, 0x23, 0x52, 0x62, 0x9b, 0xd8, 0x48, 0xdd, 0x5b, 0xf0, 0x58, 0x3d, 0xee, - 0xc8, 0x09, 0x41, 0xfb, 0x04, 0xbc, 0x01, 0xb2, 0xd3, 0x96, 0x86, 0x23, 0x97, 0xc4, 0xfe, 0x7e, - 0xff, 0xef, 0xfb, 0xdb, 0xd6, 0x1f, 0x9e, 0xe4, 0xec, 0x86, 0x47, 0xfa, 0x23, 0xb2, 0xa8, 0x12, - 0x9f, 0x86, 0xa2, 0xe2, 0x8a, 0x23, 0x5b, 0x7d, 0x4e, 0x19, 0x97, 0xfd, 0x53, 0xa9, 0x78, 0x45, - 0xa3, 0x22, 0xcd, 0x68, 0x21, 0xb2, 0x48, 0xdd, 0x09, 0x2a, 0x6b, 0x49, 0xff, 0xf8, 0x96, 0xdf, - 0x72, 0xb3, 0x8c, 0xf4, 0xaa, 0xae, 0x86, 0x3d, 0xd8, 0x9d, 0xb2, 0x1b, 0x4e, 0xe8, 0x97, 0xaf, - 0x54, 0xaa, 0xf0, 0x37, 0x80, 0x0f, 0xea, 0xbd, 0x14, 0x9c, 0x49, 0x8a, 0x5e, 0x42, 0x68, 0x86, - 0x5d, 0x4b, 0xaa, 0xa4, 0x07, 0x82, 0xf6, 0xa0, 0x3b, 0x7a, 0x34, 0xac, 0xdd, 0x86, 0x57, 0x17, - 0x1a, 0xc5, 0x54, 0x8d, 0x3b, 0xcb, 0x1f, 0x4f, 0x2c, 0xe2, 0x14, 0x9b, 0xbd, 0x44, 0xaf, 0x61, - 0x6f, 0xc2, 0x4b, 0xc1, 0x19, 0x65, 0x6a, 0x7e, 0x27, 0xa8, 0xd7, 0x0a, 0xc0, 0xe0, 0xe1, 0xe8, - 0x64, 0xdb, 0xda, 0x80, 0xa4, 0xa9, 0x45, 0x11, 0x74, 0xcc, 0x3d, 0xf4, 0x49, 0xbc, 0x76, 0x00, - 0xf6, 0x3d, 0xe3, 0x2d, 0x20, 0x7f, 0x35, 0xda, 0x8d, 0x2e, 0x68, 0x29, 0x8a, 0xb4, 0x92, 0xa6, - 0xc9, 0x36, 0x4d, 0x3b, 0x37, 0xbc, 0x0f, 0x49, 0x53, 0x1b, 0x9e, 0x41, 0x67, 0x37, 0x14, 0x9d, - 0xc2, 0xa3, 0x32, 0x67, 0xd7, 0x2a, 0x2f, 0xa9, 0x07, 0x02, 0x30, 0x68, 0x93, 0xc3, 0x32, 0x67, - 0xf3, 0xbc, 0xa4, 0x06, 0xa5, 0x8b, 0x1a, 0xb5, 0x36, 0x28, 0x5d, 0x68, 0x14, 0x62, 0xd8, 0x6b, - 0x58, 0xfc, 0xdf, 0x98, 0x67, 0xe9, 0x3f, 0x8f, 0x86, 0xba, 0xf0, 0x30, 0x99, 0xbd, 0x9d, 0x5d, - 0x7e, 0x98, 0xb9, 0x16, 0x72, 0xe0, 0xc1, 0xbb, 0x04, 0x93, 0x8f, 0x2e, 0x40, 0x47, 0xb0, 0x43, - 0x92, 0x0b, 0xec, 0xb6, 0xb4, 0x22, 0x9e, 0x9e, 0xe3, 0xc9, 0x19, 0x71, 0xdb, 0x5a, 0x11, 0xcf, - 0x2f, 0x09, 0x76, 0x3b, 0xba, 0x4e, 0xf0, 0x04, 0x4f, 0xdf, 0x63, 0xf7, 0x40, 0xd7, 0xcf, 0xf1, - 0x38, 0x79, 0xe3, 0xda, 0xa3, 0x57, 0xb0, 0x63, 0x0e, 0xf8, 0x7c, 0xf3, 0x7f, 0xbc, 0x7d, 0xa2, - 0xbd, 0x14, 0xf4, 0x8f, 0x9b, 0xc5, 0x3a, 0x0a, 0xe3, 0xa7, 0xcb, 0x5f, 0xbe, 0xb5, 0x5c, 0xf9, - 0xe0, 0x7e, 0xe5, 0x83, 0x9f, 0x2b, 0x1f, 0x7c, 0x5b, 0xfb, 0xd6, 0xfd, 0xda, 0xb7, 0xbe, 0xaf, - 0x7d, 0xeb, 0xca, 0xae, 0x33, 0x99, 0xd9, 0x26, 0x57, 0x2f, 0xfe, 0x04, 0x00, 0x00, 0xff, 0xff, - 0x6d, 0xd9, 0xce, 0xb7, 0xa9, 0x02, 0x00, 0x00, + // 426 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x92, 0x3f, 0x6f, 0xd3, 0x40, + 0x18, 0xc6, 0x7d, 0x49, 0x9a, 0xe0, 0x37, 0x04, 0xc4, 0x91, 0x4a, 0x6e, 0x06, 0x13, 0x59, 0x95, + 0xc8, 0x42, 0x2c, 0x5a, 0x09, 0x89, 0x91, 0x56, 0x1d, 0x90, 0xe8, 0xe2, 0x66, 0xea, 0x12, 0x5d, + 0xca, 0xdb, 0x60, 0xc9, 0xf7, 0x07, 0xdf, 0x55, 0x4a, 0xbf, 0x05, 0x1f, 0x2b, 0x63, 0x46, 0x26, + 0x04, 0xc9, 0xc0, 0xd7, 0x40, 0x77, 0x67, 0x87, 0x58, 0x6c, 0x2c, 0xf6, 0xdd, 0xf3, 0x3c, 0xbf, + 0xbb, 0xf7, 0xb5, 0x5f, 0x38, 0xce, 0xc5, 0xbd, 0x4c, 0xed, 0x43, 0x2d, 0xd2, 0x52, 0xdd, 0x4d, + 0x55, 0x29, 0x8d, 0xa4, 0x5d, 0xf3, 0x85, 0x09, 0xa9, 0x47, 0x27, 0xda, 0xc8, 0x12, 0xd3, 0x82, + 0x2d, 0xb0, 0x50, 0x8b, 0xd4, 0x3c, 0x2a, 0xd4, 0x3e, 0x32, 0x1a, 0x2e, 0xe5, 0x52, 0xba, 0x65, + 0x6a, 0x57, 0x5e, 0x4d, 0x06, 0xd0, 0xff, 0x28, 0xee, 0x65, 0x86, 0x5f, 0x1f, 0x50, 0x9b, 0xe4, + 0x77, 0x0b, 0x9e, 0xfa, 0xbd, 0x56, 0x52, 0x68, 0xa4, 0xef, 0x00, 0xdc, 0x61, 0x73, 0x8d, 0x46, + 0x47, 0x64, 0xdc, 0x9e, 0xf4, 0xcf, 0x5e, 0x4c, 0xfd, 0x6d, 0xd3, 0xdb, 0x4f, 0xd6, 0xba, 0x41, + 0x73, 0xd1, 0x59, 0xff, 0x78, 0x15, 0x64, 0x61, 0x51, 0xed, 0x35, 0x3d, 0x85, 0xc1, 0xa5, 0xe4, + 0x4a, 0x0a, 0x14, 0x66, 0xf6, 0xa8, 0x30, 0x6a, 0x8d, 0xc9, 0x24, 0xcc, 0x9a, 0x22, 0x7d, 0x0d, + 0x47, 0xae, 0xe0, 0xa8, 0x3d, 0x26, 0x87, 0x07, 0xdf, 0x58, 0xd1, 0xd5, 0xe1, 0x7d, 0x1b, 0x2c, + 0x1f, 0x0a, 0xd4, 0x51, 0xa7, 0x19, 0xcc, 0xac, 0xe8, 0x83, 0xce, 0xa7, 0x97, 0xf0, 0x9c, 0xa3, + 0x29, 0xf3, 0xbb, 0x39, 0x47, 0xc3, 0x3e, 0x33, 0xc3, 0xa2, 0x23, 0x87, 0x8c, 0x6a, 0xe4, 0xda, + 0xd9, 0xd7, 0x95, 0xeb, 0xd8, 0x67, 0xbc, 0xa1, 0xd1, 0x37, 0xd0, 0x33, 0xac, 0x5c, 0xda, 0x8e, + 0xbb, 0x0e, 0x7e, 0x59, 0xc3, 0x33, 0x2f, 0x3b, 0xaa, 0xce, 0xd0, 0x73, 0x08, 0x71, 0x85, 0x5c, + 0x15, 0xac, 0xd4, 0x51, 0xcf, 0x01, 0xc7, 0x35, 0x70, 0x55, 0x1b, 0x0e, 0xf9, 0x9b, 0x4b, 0x3e, + 0x40, 0xb8, 0xef, 0x92, 0x9e, 0xc0, 0x13, 0x9e, 0x8b, 0xb9, 0xc9, 0x39, 0x46, 0x64, 0x4c, 0x26, + 0xed, 0xac, 0xc7, 0x73, 0x31, 0xcb, 0x39, 0x3a, 0x8b, 0xad, 0xbc, 0xd5, 0xaa, 0x2c, 0xb6, 0xb2, + 0x56, 0xd2, 0x87, 0x70, 0xdf, 0x7f, 0x32, 0x04, 0xfa, 0x6f, 0x67, 0xf6, 0xf7, 0x1e, 0x94, 0x9c, + 0x5c, 0xc1, 0xa0, 0x51, 0xd0, 0xff, 0x5d, 0x7c, 0xf6, 0x1e, 0x3a, 0x8e, 0x7e, 0x5b, 0xbd, 0xf7, + 0x9f, 0xe7, 0x60, 0x94, 0x46, 0xc3, 0xa6, 0xe8, 0xe7, 0xe9, 0xe2, 0x74, 0xfd, 0x2b, 0x0e, 0xd6, + 0xdb, 0x98, 0x6c, 0xb6, 0x31, 0xf9, 0xb9, 0x8d, 0xc9, 0xb7, 0x5d, 0x1c, 0x6c, 0x76, 0x71, 0xf0, + 0x7d, 0x17, 0x07, 0xb7, 0x5d, 0x3f, 0xd8, 0x8b, 0xae, 0x1b, 0xce, 0xf3, 0x3f, 0x01, 0x00, 0x00, + 0xff, 0xff, 0x23, 0x43, 0x4e, 0x7f, 0xee, 0x02, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -275,7 +357,7 @@ const _ = grpc.SupportPackageIsVersion4 // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. type InfoClient interface { - // Info returns the metadata (Eg. LabelSets, Min/Max time) about all the APIs the component supports. + /// Info returns the metadata (Eg. LabelSets, Min/Max time) about all the APIs the component supports. Info(ctx context.Context, in *InfoRequest, opts ...grpc.CallOption) (*InfoResponse, error) } @@ -298,7 +380,7 @@ func (c *infoClient) Info(ctx context.Context, in *InfoRequest, opts ...grpc.Cal // InfoServer is the server API for Info service. type InfoServer interface { - // Info returns the metadata (Eg. LabelSets, Min/Max time) about all the APIs the component supports. + /// Info returns the metadata (Eg. LabelSets, Min/Max time) about all the APIs the component supports. Info(context.Context, *InfoRequest) (*InfoResponse, error) } @@ -388,9 +470,21 @@ func (m *InfoResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l - if m.ExemplarsInfo != nil { + if m.Exemplars != nil { { - size, err := m.ExemplarsInfo.MarshalToSizedBuffer(dAtA[:i]) + size, err := m.Exemplars.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintRpc(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x3a + } + if m.Targets != nil { + { + size, err := m.Targets.MarshalToSizedBuffer(dAtA[:i]) if err != nil { return 0, err } @@ -400,9 +494,33 @@ func (m *InfoResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { i-- dAtA[i] = 0x32 } - if m.StoreInfo != nil { + if m.MetricMetadata != nil { + { + size, err := m.MetricMetadata.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintRpc(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x2a + } + if m.Rules != nil { { - size, err := m.StoreInfo.MarshalToSizedBuffer(dAtA[:i]) + size, err := m.Rules.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintRpc(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x22 + } + if m.Store != nil { + { + size, err := m.Store.MarshalToSizedBuffer(dAtA[:i]) if err != nil { return 0, err } @@ -412,10 +530,12 @@ func (m *InfoResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { i-- dAtA[i] = 0x1a } - if m.ComponentType != 0 { - i = encodeVarintRpc(dAtA, i, uint64(m.ComponentType)) + if len(m.ComponentType) > 0 { + i -= len(m.ComponentType) + copy(dAtA[i:], m.ComponentType) + i = encodeVarintRpc(dAtA, i, uint64(len(m.ComponentType))) i-- - dAtA[i] = 0x10 + dAtA[i] = 0x12 } if len(m.LabelSets) > 0 { for iNdEx := len(m.LabelSets) - 1; iNdEx >= 0; iNdEx-- { @@ -467,6 +587,75 @@ func (m *StoreInfo) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *RulesInfo) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *RulesInfo) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *RulesInfo) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + return len(dAtA) - i, nil +} + +func (m *MetricMetadataInfo) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *MetricMetadataInfo) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *MetricMetadataInfo) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + return len(dAtA) - i, nil +} + +func (m *TargetsInfo) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *TargetsInfo) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *TargetsInfo) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + return len(dAtA) - i, nil +} + func (m *ExemplarsInfo) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -532,15 +721,28 @@ func (m *InfoResponse) Size() (n int) { n += 1 + l + sovRpc(uint64(l)) } } - if m.ComponentType != 0 { - n += 1 + sovRpc(uint64(m.ComponentType)) + l = len(m.ComponentType) + if l > 0 { + n += 1 + l + sovRpc(uint64(l)) + } + if m.Store != nil { + l = m.Store.Size() + n += 1 + l + sovRpc(uint64(l)) + } + if m.Rules != nil { + l = m.Rules.Size() + n += 1 + l + sovRpc(uint64(l)) } - if m.StoreInfo != nil { - l = m.StoreInfo.Size() + if m.MetricMetadata != nil { + l = m.MetricMetadata.Size() n += 1 + l + sovRpc(uint64(l)) } - if m.ExemplarsInfo != nil { - l = m.ExemplarsInfo.Size() + if m.Targets != nil { + l = m.Targets.Size() + n += 1 + l + sovRpc(uint64(l)) + } + if m.Exemplars != nil { + l = m.Exemplars.Size() n += 1 + l + sovRpc(uint64(l)) } return n @@ -561,6 +763,33 @@ func (m *StoreInfo) Size() (n int) { return n } +func (m *RulesInfo) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + +func (m *MetricMetadataInfo) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + +func (m *TargetsInfo) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + func (m *ExemplarsInfo) Size() (n int) { if m == nil { return 0 @@ -696,10 +925,10 @@ func (m *InfoResponse) Unmarshal(dAtA []byte) error { } iNdEx = postIndex case 2: - if wireType != 0 { + if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field ComponentType", wireType) } - m.ComponentType = 0 + var stringLen uint64 for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowRpc @@ -709,14 +938,99 @@ func (m *InfoResponse) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.ComponentType |= ComponentType(b&0x7F) << shift + stringLen |= uint64(b&0x7F) << shift if b < 0x80 { break } } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ComponentType = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex case 3: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field StoreInfo", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field Store", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Store == nil { + m.Store = &StoreInfo{} + } + if err := m.Store.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Rules", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Rules == nil { + m.Rules = &RulesInfo{} + } + if err := m.Rules.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field MetricMetadata", wireType) } var msglen int for shift := uint(0); ; shift += 7 { @@ -743,16 +1057,16 @@ func (m *InfoResponse) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - if m.StoreInfo == nil { - m.StoreInfo = &StoreInfo{} + if m.MetricMetadata == nil { + m.MetricMetadata = &MetricMetadataInfo{} } - if err := m.StoreInfo.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + if err := m.MetricMetadata.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { return err } iNdEx = postIndex case 6: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field ExemplarsInfo", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field Targets", wireType) } var msglen int for shift := uint(0); ; shift += 7 { @@ -779,10 +1093,46 @@ func (m *InfoResponse) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - if m.ExemplarsInfo == nil { - m.ExemplarsInfo = &ExemplarsInfo{} + if m.Targets == nil { + m.Targets = &TargetsInfo{} } - if err := m.ExemplarsInfo.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + if err := m.Targets.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 7: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Exemplars", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Exemplars == nil { + m.Exemplars = &ExemplarsInfo{} + } + if err := m.Exemplars.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { return err } iNdEx = postIndex @@ -895,6 +1245,156 @@ func (m *StoreInfo) Unmarshal(dAtA []byte) error { } return nil } +func (m *RulesInfo) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: RulesInfo: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: RulesInfo: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipRpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *MetricMetadataInfo) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: MetricMetadataInfo: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: MetricMetadataInfo: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipRpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *TargetsInfo) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: TargetsInfo: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: TargetsInfo: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipRpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *ExemplarsInfo) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 diff --git a/pkg/query/storeset.go b/pkg/query/storeset.go index 58a4d2d16eb..911b6c1922d 100644 --- a/pkg/query/storeset.go +++ b/pkg/query/storeset.go @@ -18,6 +18,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/pkg/labels" "github.com/thanos-io/thanos/pkg/exemplars/exemplarspb" + "github.com/thanos-io/thanos/pkg/info/infopb" "google.golang.org/grpc" "github.com/thanos-io/thanos/pkg/component" @@ -68,6 +69,11 @@ type ExemplarSpec interface { Addr() string } +type InfoSpec interface { + // Addr returns InfoAPI Address for the info spec. It is used as its ID. + Addr() string +} + // stringError forces the error to be a string // when marshaled into a JSON. type stringError struct { @@ -198,6 +204,7 @@ type StoreSet struct { // Store specifications can change dynamically. If some store is missing from the list, we assuming it is no longer // accessible and we close gRPC client for it. + infoSpec func() []InfoSpec storeSpecs func() []StoreSpec ruleSpecs func() []RuleSpec targetSpecs func() []TargetSpec @@ -228,6 +235,7 @@ func NewStoreSet( targetSpecs func() []TargetSpec, metadataSpecs func() []MetadataSpec, exemplarSpecs func() []ExemplarSpec, + infoSpecs func() []InfoSpec, dialOpts []grpc.DialOption, unhealthyStoreTimeout time.Duration, ) *StoreSet { @@ -239,6 +247,11 @@ func NewStoreSet( if logger == nil { logger = log.NewNopLogger() } + + if infoSpecs == nil { + infoSpecs = func() []InfoSpec { return nil } + } + if storeSpecs == nil { storeSpecs = func() []StoreSpec { return nil } } @@ -257,6 +270,7 @@ func NewStoreSet( ss := &StoreSet{ logger: log.With(logger, "component", "storeset"), + infoSpec: infoSpecs, storeSpecs: storeSpecs, ruleSpecs: ruleSpecs, targetSpecs: targetSpecs, @@ -289,6 +303,9 @@ type storeRef struct { // If target is not nil, then this store also supports targets API. target targetspb.TargetsClient + // If info is not nil, then this store also supports Info API. + info infopb.InfoClient + // Meta (can change during runtime). labelSets []labels.Labels storeType component.StoreAPI @@ -312,6 +329,21 @@ func (s *storeRef) Update(labelSets []labels.Labels, minTime int64, maxTime int6 s.exemplar = exemplar } +func (s *storeRef) UpdateWithStore(labelSets []labels.Labels, minTime int64, maxTime int64, storeType component.StoreAPI, store storepb.StoreClient, rule rulespb.RulesClient, target targetspb.TargetsClient, metadata metadatapb.MetadataClient, exemplar exemplarspb.ExemplarsClient) { + s.mtx.Lock() + defer s.mtx.Unlock() + + s.storeType = storeType + s.labelSets = labelSets + s.minTime = minTime + s.maxTime = maxTime + s.StoreClient = store + s.rule = rule + s.target = target + s.metadata = metadata + s.exemplar = exemplar +} + func (s *storeRef) StoreType() component.StoreAPI { s.mtx.RLock() defer s.mtx.RUnlock() @@ -483,6 +515,7 @@ func (s *StoreSet) getActiveStores(ctx context.Context, stores map[string]*store targetAddrSet = make(map[string]struct{}) metadataAddrSet = make(map[string]struct{}) exemplarAddrSet = make(map[string]struct{}) + infoAddrSet = make(map[string]struct{}) ) // Gather active stores map concurrently. Build new store if does not exist already. @@ -597,6 +630,105 @@ func (s *StoreSet) getActiveStores(ctx context.Context, stores map[string]*store level.Warn(s.logger).Log("msg", "ignored rule store", "address", ruleAddr) } } + + // Gather healthy stores map concurrently using info addresses. Build new store if does not exist already. + for _, infoSpec := range s.infoSpec() { + if _, ok := infoAddrSet[infoSpec.Addr()]; ok { + level.Warn(s.logger).Log("msg", "duplicated address in info nodes", "address", infoSpec.Addr()) + continue + } + infoAddrSet[infoSpec.Addr()] = struct{}{} + + wg.Add(1) + go func(spec InfoSpec) { + defer wg.Done() + + addr := spec.Addr() + + ctx, cancel := context.WithTimeout(ctx, s.gRPCInfoCallTimeout) + defer cancel() + + st, seenAlready := stores[addr] + if !seenAlready { + // New store or was unactive and was removed in the past - create the new one. + conn, err := grpc.DialContext(ctx, addr, s.dialOpts...) + if err != nil { + s.updateStoreStatus(&storeRef{addr: addr}, err) + level.Warn(s.logger).Log("msg", "update of store node failed", "err", errors.Wrap(err, "dialing connection"), "address", addr) + return + } + + st = &storeRef{ + info: infopb.NewInfoClient(conn), + storeType: component.UnknownStoreAPI, + cc: conn, + addr: addr, + logger: s.logger, + } + } + + info, err := st.info.Info(ctx, &infopb.InfoRequest{}, grpc.WaitForReady(true)) + if err != nil { + if !seenAlready { + // Close only if new + // Unactive `s.stores` will be closed later on. + st.Close() + } + + s.updateStoreStatus(st, err) + level.Warn(s.logger).Log("msg", "update of node failed", "err", errors.Wrap(err, "getting metadata"), "address", addr) + + return + } + + s.updateStoreStatus(st, nil) + + labelSets := make([]labels.Labels, 0, len(info.LabelSets)) + for _, ls := range info.LabelSets { + labelSets = append(labelSets, ls.PromLabels()) + } + + var minTime, maxTime int64 + var store storepb.StoreClient + if info.Store != nil { + store = storepb.NewStoreClient(st.cc) + minTime = info.Store.MinTime + maxTime = info.Store.MinTime + } + + var rule rulespb.RulesClient + if info.Rules != nil { + rule = rulespb.NewRulesClient(st.cc) + } + + var target targetspb.TargetsClient + if info.Targets != nil { + target = targetspb.NewTargetsClient(st.cc) + } + + var metadata metadatapb.MetadataClient + if info.MetricMetadata != nil { + metadata = metadatapb.NewMetadataClient(st.cc) + } + + var exemplar exemplarspb.ExemplarsClient + if info.Exemplars != nil { + // min/max range is also provided by in the response of Info rpc call + // but we are not using this metadata anywhere right now so ignoring. + exemplar = exemplarspb.NewExemplarsClient(st.cc) + } + + s.updateStoreStatus(st, nil) + st.UpdateWithStore(labelSets, minTime, maxTime, component.Sidecar, store, rule, target, metadata, exemplar) + + mtx.Lock() + defer mtx.Unlock() + + activeStores[addr] = st + }(infoSpec) + } + wg.Wait() + return activeStores } diff --git a/pkg/query/storeset_test.go b/pkg/query/storeset_test.go index be58b7fb384..922a9fd89be 100644 --- a/pkg/query/storeset_test.go +++ b/pkg/query/storeset_test.go @@ -18,6 +18,7 @@ import ( "google.golang.org/grpc/status" "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/info/infopb" "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" @@ -130,6 +131,93 @@ func (s *testStores) CloseOne(addr string) { delete(s.srvs, addr) } +type mockedInfo struct { + info infopb.InfoResponse +} + +func (s *mockedInfo) Info(ctx context.Context, r *infopb.InfoRequest) (*infopb.InfoResponse, error) { + return &s.info, nil +} + +type testInfoMeta struct { + extlsetFn func(add string) []labelpb.ZLabelSet + storeType component.StoreAPI + store infopb.StoreInfo + rule infopb.RulesInfo + metadata infopb.MetricMetadataInfo + target infopb.TargetsInfo + exemplar infopb.ExemplarsInfo +} + +type testInfoSrvs struct { + srvs map[string]*grpc.Server + orderAddrs []string +} + +func startInfoSrvs(infoMetas []testInfoMeta) (*testInfoSrvs, error) { + info := &testInfoSrvs{ + srvs: map[string]*grpc.Server{}, + } + + for _, meta := range infoMetas { + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + // Close the servers + info.Close() + return nil, err + } + + srv := grpc.NewServer() + + infoSrv := &mockedInfo{ + info: infopb.InfoResponse{ + LabelSets: meta.extlsetFn(listener.Addr().String()), + Store: &meta.store, + MetricMetadata: &meta.metadata, + Rules: &meta.rule, + Targets: &meta.target, + Exemplars: &meta.exemplar, + }, + } + + if meta.storeType != nil { + infoSrv.info.ComponentType = meta.storeType.String() + } + infopb.RegisterInfoServer(srv, infoSrv) + go func() { + _ = srv.Serve(listener) + }() + + info.srvs[listener.Addr().String()] = srv + info.orderAddrs = append(info.orderAddrs, listener.Addr().String()) + } + + return info, nil +} + +func (s *testInfoSrvs) Close() { + for _, srv := range s.srvs { + srv.Stop() + } + s.srvs = nil +} + +func (s *testInfoSrvs) CloseOne(addr string) { + srv, ok := s.srvs[addr] + if !ok { + return + } + + srv.Stop() + delete(s.srvs, addr) +} + +func (s *testInfoSrvs) InfoAddresses() []string { + var stores []string + stores = append(stores, s.orderAddrs...) + return stores +} + func TestStoreSet_Update(t *testing.T) { stores, err := startTestStores([]testStoreMeta{ { @@ -184,8 +272,95 @@ func TestStoreSet_Update(t *testing.T) { discoveredStoreAddr := stores.StoreAddresses() + infoSrvs, err := startInfoSrvs([]testInfoMeta{ + { + storeType: component.Sidecar, + extlsetFn: func(addr string) []labelpb.ZLabelSet { + return []labelpb.ZLabelSet{ + { + Labels: []labelpb.ZLabel{ + {Name: "addr", Value: addr}, + }, + }, + { + Labels: []labelpb.ZLabel{ + {Name: "a", Value: "b"}, + }, + }, + } + }, + store: infopb.StoreInfo{ + MinTime: math.MaxInt64, + MaxTime: math.MinInt64, + }, + exemplar: infopb.ExemplarsInfo{ + MinTime: math.MaxInt64, + MaxTime: math.MinInt64, + }, + rule: infopb.RulesInfo{}, + metadata: infopb.MetricMetadataInfo{}, + target: infopb.TargetsInfo{}, + }, + { + storeType: component.Sidecar, + extlsetFn: func(addr string) []labelpb.ZLabelSet { + return []labelpb.ZLabelSet{ + { + Labels: []labelpb.ZLabel{ + {Name: "addr", Value: addr}, + }, + }, + { + Labels: []labelpb.ZLabel{ + {Name: "a", Value: "b"}, + }, + }, + } + }, + store: infopb.StoreInfo{ + MinTime: math.MaxInt64, + MaxTime: math.MinInt64, + }, + exemplar: infopb.ExemplarsInfo{ + MinTime: math.MaxInt64, + MaxTime: math.MinInt64, + }, + rule: infopb.RulesInfo{}, + metadata: infopb.MetricMetadataInfo{}, + target: infopb.TargetsInfo{}, + }, + { + storeType: component.Query, + extlsetFn: func(addr string) []labelpb.ZLabelSet { + return []labelpb.ZLabelSet{ + { + Labels: []labelpb.ZLabel{ + {Name: "addr", Value: "broken"}, + }, + }, + } + }, + store: infopb.StoreInfo{ + MinTime: math.MaxInt64, + MaxTime: math.MinInt64, + }, + exemplar: infopb.ExemplarsInfo{ + MinTime: math.MaxInt64, + MaxTime: math.MinInt64, + }, + rule: infopb.RulesInfo{}, + metadata: infopb.MetricMetadataInfo{}, + target: infopb.TargetsInfo{}, + }, + }) + testutil.Ok(t, err) + defer infoSrvs.Close() + + discoveredInfoAddr := infoSrvs.InfoAddresses() + // Testing if duplicates can cause weird results. discoveredStoreAddr = append(discoveredStoreAddr, discoveredStoreAddr[0]) + discoveredInfoAddr = append(discoveredInfoAddr, discoveredInfoAddr[0]) storeSet := NewStoreSet(nil, nil, func() (specs []StoreSpec) { for _, addr := range discoveredStoreAddr { @@ -205,6 +380,12 @@ func TestStoreSet_Update(t *testing.T) { func() (specs []ExemplarSpec) { return nil }, + func() (specs []InfoSpec) { + for _, addr := range discoveredInfoAddr { + specs = append(specs, NewGRPCStoreSpec(addr, false)) + } + return specs + }, testGRPCOpts, time.Minute) storeSet.gRPCInfoCallTimeout = 2 * time.Second defer storeSet.Close() @@ -215,11 +396,14 @@ func TestStoreSet_Update(t *testing.T) { // Start with one not available. stores.CloseOne(discoveredStoreAddr[2]) + // Make one address discovered by Info Servers unavailable. + infoSrvs.CloseOne(discoveredInfoAddr[2]) + // Should not matter how many of these we run. storeSet.Update(context.Background()) storeSet.Update(context.Background()) - testutil.Equals(t, 2, len(storeSet.stores)) - testutil.Equals(t, 3, len(storeSet.storeStatuses)) + testutil.Equals(t, 4, len(storeSet.stores)) + testutil.Equals(t, 6, len(storeSet.storeStatuses)) for addr, st := range storeSet.stores { testutil.Equals(t, addr, st.addr) @@ -237,22 +421,28 @@ func TestStoreSet_Update(t *testing.T) { expected[component.Sidecar] = map[string]int{ fmt.Sprintf("{a=\"b\"},{addr=\"%s\"}", discoveredStoreAddr[0]): 1, fmt.Sprintf("{a=\"b\"},{addr=\"%s\"}", discoveredStoreAddr[1]): 1, + fmt.Sprintf("{a=\"b\"},{addr=\"%s\"}", discoveredInfoAddr[0]): 1, + fmt.Sprintf("{a=\"b\"},{addr=\"%s\"}", discoveredInfoAddr[1]): 1, } testutil.Equals(t, expected, storeSet.storesMetric.storeNodes) // Remove address from discovered and reset last check, which should ensure cleanup of status on next update. storeSet.storeStatuses[discoveredStoreAddr[2]].LastCheck = time.Now().Add(-4 * time.Minute) discoveredStoreAddr = discoveredStoreAddr[:len(discoveredStoreAddr)-2] + storeSet.storeStatuses[discoveredInfoAddr[2]].LastCheck = time.Now().Add(-4 * time.Minute) + discoveredInfoAddr = discoveredInfoAddr[:len(discoveredInfoAddr)-2] storeSet.Update(context.Background()) - testutil.Equals(t, 2, len(storeSet.storeStatuses)) + testutil.Equals(t, 4, len(storeSet.storeStatuses)) stores.CloseOne(discoveredStoreAddr[0]) delete(expected[component.Sidecar], fmt.Sprintf("{a=\"b\"},{addr=\"%s\"}", discoveredStoreAddr[0])) + infoSrvs.CloseOne(discoveredInfoAddr[0]) + delete(expected[component.Sidecar], fmt.Sprintf("{a=\"b\"},{addr=\"%s\"}", discoveredInfoAddr[0])) // We expect Update to tear down store client for closed store server. storeSet.Update(context.Background()) - testutil.Equals(t, 1, len(storeSet.stores), "only one service should respond just fine, so we expect one client to be ready.") - testutil.Equals(t, 2, len(storeSet.storeStatuses)) + testutil.Equals(t, 2, len(storeSet.stores), "only two service should respond just fine, so we expect one client to be ready.") + testutil.Equals(t, 4, len(storeSet.storeStatuses)) addr := discoveredStoreAddr[1] st, ok := storeSet.stores[addr] @@ -478,7 +668,7 @@ func TestStoreSet_Update(t *testing.T) { // New stores should be loaded. storeSet.Update(context.Background()) - testutil.Equals(t, 1+len(stores2.srvs), len(storeSet.stores)) + testutil.Equals(t, 2+len(stores2.srvs), len(storeSet.stores)) // Check stats. expected = newStoreAPIStats() @@ -494,6 +684,7 @@ func TestStoreSet_Update(t *testing.T) { } expected[component.Sidecar] = map[string]int{ fmt.Sprintf("{a=\"b\"},{addr=\"%s\"}", discoveredStoreAddr[1]): 1, + fmt.Sprintf("{a=\"b\"},{addr=\"%s\"}", discoveredInfoAddr[1]): 1, "{l1=\"v2\", l2=\"v3\"}": 2, } expected[component.Store] = map[string]int{ @@ -503,7 +694,7 @@ func TestStoreSet_Update(t *testing.T) { testutil.Equals(t, expected, storeSet.storesMetric.storeNodes) // Check statuses. - testutil.Equals(t, 2+len(stores2.srvs), len(storeSet.storeStatuses)) + testutil.Equals(t, 4+len(stores2.srvs), len(storeSet.storeStatuses)) } func TestStoreSet_Update_NoneAvailable(t *testing.T) { @@ -542,10 +733,43 @@ func TestStoreSet_Update_NoneAvailable(t *testing.T) { testutil.Ok(t, err) defer st.Close() + infoSrvs, err := startInfoSrvs([]testInfoMeta{ + { + storeType: component.Sidecar, + extlsetFn: func(addr string) []labelpb.ZLabelSet { + return []labelpb.ZLabelSet{ + { + Labels: []labelpb.ZLabel{ + {Name: "addr", Value: addr}, + }, + }, + } + }, + }, + { + storeType: component.Sidecar, + extlsetFn: func(addr string) []labelpb.ZLabelSet { + return []labelpb.ZLabelSet{ + { + Labels: []labelpb.ZLabel{ + {Name: "addr", Value: addr}, + }, + }, + } + }, + }, + }) + testutil.Ok(t, err) + defer infoSrvs.Close() + initialStoreAddr := st.StoreAddresses() st.CloseOne(initialStoreAddr[0]) st.CloseOne(initialStoreAddr[1]) + initialInfoAddr := infoSrvs.InfoAddresses() + infoSrvs.CloseOne(initialInfoAddr[0]) + infoSrvs.CloseOne(initialInfoAddr[1]) + storeSet := NewStoreSet(nil, nil, func() (specs []StoreSpec) { for _, addr := range initialStoreAddr { @@ -557,6 +781,12 @@ func TestStoreSet_Update_NoneAvailable(t *testing.T) { func() (specs []TargetSpec) { return nil }, func() (specs []MetadataSpec) { return nil }, func() (specs []ExemplarSpec) { return nil }, + func() (specs []InfoSpec) { + for _, addr := range initialInfoAddr { + specs = append(specs, NewGRPCStoreSpec(addr, false)) + } + return specs + }, testGRPCOpts, time.Minute) storeSet.gRPCInfoCallTimeout = 2 * time.Second @@ -648,6 +878,8 @@ func TestQuerierStrict(t *testing.T) { return nil }, func() []ExemplarSpec { return nil + }, func() []InfoSpec { + return nil }, testGRPCOpts, time.Minute) defer storeSet.Close() storeSet.gRPCInfoCallTimeout = 1 * time.Second @@ -802,6 +1034,7 @@ func TestStoreSet_Update_Rules(t *testing.T) { func() []TargetSpec { return nil }, func() []MetadataSpec { return nil }, tc.exemplarSpecs, + func() []InfoSpec { return nil }, testGRPCOpts, time.Minute) t.Run(tc.name, func(t *testing.T) { @@ -979,6 +1212,9 @@ func TestStoreSet_Rules_Discovery(t *testing.T) { return nil }, func() []ExemplarSpec { return nil }, + func() []InfoSpec { + return nil + }, testGRPCOpts, time.Minute) defer storeSet.Close() diff --git a/scripts/insecure_grpcurl_info.sh b/scripts/insecure_grpcurl_info.sh deleted file mode 100755 index 0d75b02053f..00000000000 --- a/scripts/insecure_grpcurl_info.sh +++ /dev/null @@ -1,32 +0,0 @@ -#!/usr/bin/env bash - -DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" - -HELP=' -insecure_grpcurl_info.sh allows you to use call StoreAPI.Series gRPC method and receive streamed series in JSON format. - -Usage: - # Start some example Thanos component that exposes gRPC or use existing one. To start example one run: `thanos query &` - bash scripts/insecure_grpcurl_info.sh localhost:10901 '"'"'[{"type": 0, "name": "__name__", "value":"go_goroutines"}]'"'"' 0 10 -' - -INFO_API_HOSTPORT=$1 -if [ -z "${INFO_API_HOSTPORT}" ]; then - echo '$1 is missing (INFO_API_HOSTPORT). Expected host:port string for the target StoreAPI to grpcurl against, e.g. localhost:10901' - echo "${HELP}" - exit 1 -fi - -go install github.com/fullstorydev/grpcurl/cmd/grpcurl - -INFO_REQUEST='{}' - -GOGOPROTO_ROOT="$(GO111MODULE=on go list -f '{{ .Dir }}' -m github.com/gogo/protobuf)" - -cd $DIR/../pkg/ || exit -grpcurl \ - -import-path="${GOGOPROTO_ROOT}" \ - -import-path=. \ - -proto=info/infopb/rpc.proto \ - -plaintext \ - -d="${INFO_REQUEST}" "${INFO_API_HOSTPORT}" thanos.Info/Info diff --git a/test/e2e/info_api_test.go b/test/e2e/info_api_test.go index 208ef2ee3ab..d02157680c0 100644 --- a/test/e2e/info_api_test.go +++ b/test/e2e/info_api_test.go @@ -5,54 +5,44 @@ package e2e_test import ( "context" + "path/filepath" "testing" + "time" "github.com/cortexproject/cortex/integration/e2e" - "github.com/thanos-io/thanos/pkg/info/infopb" - "github.com/thanos-io/thanos/pkg/store/labelpb" + "github.com/prometheus/common/model" + "github.com/thanos-io/thanos/pkg/promclient" "github.com/thanos-io/thanos/pkg/testutil" "github.com/thanos-io/thanos/test/e2e/e2ethanos" - "google.golang.org/grpc" ) -func TestInfoAPI_WithSidecar(t *testing.T) { +func TestInfo(t *testing.T) { t.Parallel() - netName := "e2e_test_info_with_sidecar" - - s, err := e2e.NewScenario(netName) + s, err := e2e.NewScenario("e2e_test_info") testutil.Ok(t, err) t.Cleanup(e2ethanos.CleanScenario(t, s)) - prom, sidecar, err := e2ethanos.NewPrometheusWithSidecar( - s.SharedDir(), - netName, - "prom", - defaultPromConfig("ha", 0, "", ""), - e2ethanos.DefaultPrometheusImage(), - ) + prom1, sidecar1, err := e2ethanos.NewPrometheusWithSidecar(s.SharedDir(), "e2e_test_info", "alone1", defaultPromConfig("prom-alone1", 0, "", ""), e2ethanos.DefaultPrometheusImage()) + testutil.Ok(t, err) + prom2, sidecar2, err := e2ethanos.NewPrometheusWithSidecar(s.SharedDir(), "e2e_test_info", "alone2", defaultPromConfig("prom-alone2", 0, "", ""), e2ethanos.DefaultPrometheusImage()) + testutil.Ok(t, err) + prom3, sidecar3, err := e2ethanos.NewPrometheusWithSidecar(s.SharedDir(), "e2e_test_query", "ha1", defaultPromConfig("prom-ha", 0, "", filepath.Join(e2e.ContainerSharedDir, "", "*.yaml")), e2ethanos.DefaultPrometheusImage()) testutil.Ok(t, err) - testutil.Ok(t, s.StartAndWaitReady(prom, sidecar)) + prom4, sidecar4, err := e2ethanos.NewPrometheusWithSidecar(s.SharedDir(), "e2e_test_query", "ha2", defaultPromConfig("prom-ha", 1, "", filepath.Join(e2e.ContainerSharedDir, "", "*.yaml")), e2ethanos.DefaultPrometheusImage()) + testutil.Ok(t, err) + testutil.Ok(t, s.StartAndWaitReady(prom1, sidecar1, prom2, sidecar2, prom3, sidecar3, prom4, sidecar4)) - // Create grpc Client - conn, err := grpc.Dial(sidecar.GRPCEndpoint(), grpc.WithInsecure()) + q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1", []string{}). + WithEndpoints([]string{sidecar1.GRPCNetworkEndpoint(), sidecar2.GRPCNetworkEndpoint(), sidecar3.GRPCNetworkEndpoint(), sidecar4.GRPCNetworkEndpoint()}). + Build() testutil.Ok(t, err) - defer conn.Close() + testutil.Ok(t, s.StartAndWaitReady(q)) - client := infopb.NewInfoClient(conn) + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + t.Cleanup(cancel) - res, err := client.Info(context.Background(), &infopb.InfoRequest{}) - testutil.Ok(t, err) - testutil.Equals(t, res, infopb.InfoResponse{ - LabelSets: []labelpb.ZLabelSet{}, - ComponentType: infopb.ComponentType_SIDECAR, - StoreInfo: &infopb.StoreInfo{ - MinTime: -62167219200000, - MaxTime: 9223372036854775807, - }, - ExemplarsInfo: &infopb.ExemplarsInfo{ - MinTime: -9223309901257974, - MaxTime: 9223309901257974, - }, - }) + queryAndAssertSeries(t, ctx, q.HTTPEndpoint(), queryUpWithoutInstance, promclient.QueryOptions{ + Deduplicate: false, + }, []model.Metric{}) }