From dae758a359c2350e597756529d8f8821f19ef6e3 Mon Sep 17 00:00:00 2001 From: Michael Hoffmann Date: Tue, 23 Jul 2024 19:55:09 +0200 Subject: [PATCH] discovery: use thanos resolver for endpoint groups Signed-off-by: Michael Hoffmann --- CHANGELOG.md | 1 + cmd/thanos/query.go | 16 +++++-- pkg/discovery/dns/grpc.go | 88 +++++++++++++++++++++++++++++++++++ pkg/discovery/dns/provider.go | 13 ++++++ 4 files changed, 114 insertions(+), 4 deletions(-) create mode 100644 pkg/discovery/dns/grpc.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 67d8e8714b3..c3f4b162995 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Changed - [#7494](https://github.com/thanos-io/thanos/pull/7494) Ruler: remove trailing period from SRV records returned by discovery `dnsnosrva` lookups +- [#7567](https://github.com/thanos-io/thanos/pull/7565) Query: Use thanos resolver for endpoint groups. ### Removed diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 4b27dd29432..e4311647e13 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -492,6 +492,16 @@ func runQuery( } } + // Register resolver for the "thanos:///" scheme for endpoint-groups + dns.RegisterGRPCResolver( + dns.NewProvider( + logger, + extprom.WrapRegistererWithPrefix("thanos_query_endpoint_groups_", reg), + dns.ResolverType(dnsSDResolver), + ), + dnsSDInterval, + ) + dnsEndpointProvider := dns.NewProvider( logger, extprom.WrapRegistererWithPrefix("thanos_query_endpoints_", reg), @@ -891,14 +901,12 @@ func prepareEndpointSet( } for _, eg := range endpointGroupAddrs { - addr := fmt.Sprintf("dns:///%s", eg) - spec := query.NewGRPCEndpointSpec(addr, false, extgrpc.EndpointGroupGRPCOpts()...) + spec := query.NewGRPCEndpointSpec(fmt.Sprintf("thanos:///%s", eg), false, extgrpc.EndpointGroupGRPCOpts()...) specs = append(specs, spec) } for _, eg := range strictEndpointGroups { - addr := fmt.Sprintf("dns:///%s", eg) - spec := query.NewGRPCEndpointSpec(addr, true, extgrpc.EndpointGroupGRPCOpts()...) + spec := query.NewGRPCEndpointSpec(fmt.Sprintf("thanos:///%s", eg), true, extgrpc.EndpointGroupGRPCOpts()...) specs = append(specs, spec) } diff --git a/pkg/discovery/dns/grpc.go b/pkg/discovery/dns/grpc.go new file mode 100644 index 00000000000..b1fe0771bd0 --- /dev/null +++ b/pkg/discovery/dns/grpc.go @@ -0,0 +1,88 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package dns + +import ( + "context" + "sync" + "time" + + grpcresolver "google.golang.org/grpc/resolver" +) + +var ( + _ grpcresolver.Builder = &builder{} + _ grpcresolver.Resolver = &resolver{} +) + +type builder struct { + resolveInterval time.Duration + provider *Provider +} + +func RegisterGRPCResolver(provider *Provider, interval time.Duration) { + grpcresolver.Register(&builder{ + resolveInterval: interval, + provider: provider, + }) +} + +func (b *builder) Scheme() string { return "thanos" } + +func (b *builder) Build(t grpcresolver.Target, cc grpcresolver.ClientConn, _ grpcresolver.BuildOptions) (grpcresolver.Resolver, error) { + ctx, cancel := context.WithCancel(context.Background()) + r := &resolver{ + provider: b.provider, + target: t.Endpoint(), + ctx: ctx, + cancel: cancel, + cc: cc, + interval: b.resolveInterval, + } + r.wg.Add(1) + go r.run() + + return r, nil +} + +type resolver struct { + provider *Provider + + target string + ctx context.Context + cancel context.CancelFunc + cc grpcresolver.ClientConn + interval time.Duration + + wg sync.WaitGroup +} + +func (r *resolver) Close() { + r.cancel() + r.wg.Wait() +} + +func (r *resolver) ResolveNow(_ grpcresolver.ResolveNowOptions) {} + +func (r *resolver) run() { + defer r.wg.Done() + for { + err := r.provider.Resolve(r.ctx, []string{r.target}) + if err != nil { + r.cc.ReportError(err) + } else { + state := grpcresolver.State{} + for _, addr := range r.provider.AddressesForHost(r.target) { + raddr := grpcresolver.Address{Addr: addr} + state.Addresses = append(state.Addresses, raddr) + } + _ = r.cc.UpdateState(state) + } + select { + case <-r.ctx.Done(): + return + case <-time.After(r.interval): + } + } +} diff --git a/pkg/discovery/dns/provider.go b/pkg/discovery/dns/provider.go index 3ec032a654d..8f42bf4d269 100644 --- a/pkg/discovery/dns/provider.go +++ b/pkg/discovery/dns/provider.go @@ -164,3 +164,16 @@ func (p *Provider) Addresses() []string { } return result } + +// AddressesForHost returns the latest addresses present for the host in the Provider. +func (p *Provider) AddressesForHost(host string) []string { + p.RLock() + defer p.RUnlock() + + addrs := p.resolved[host] + + res := make([]string, len(addrs)) + copy(res, addrs) + + return res +}