Skip to content

Commit

Permalink
discovery: use thanos resolver for endpoint groups
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Hoffmann <[email protected]>
  • Loading branch information
MichaHoffmann committed Jul 23, 2024
1 parent 639bf8f commit 01fe986
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 4 deletions.
21 changes: 17 additions & 4 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

extflag "github.com/efficientgo/tools/extkingpin"
"google.golang.org/grpc"
"google.golang.org/grpc/resolver"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand Down Expand Up @@ -492,6 +493,20 @@ func runQuery(
}
}

dnsEndpointGroupProvider := dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_query_endpoint_groups_", reg),
dns.ResolverType(dnsSDResolver),
)

// Register resolver for the "thanos://" scheme for grpc
resolver.Register(
dns.NewBuilder(
dnsEndpointGroupProvider,
dnsSDInterval,
),
)

dnsEndpointProvider := dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_query_endpoints_", reg),
Expand Down Expand Up @@ -891,14 +906,12 @@ func prepareEndpointSet(
}

for _, eg := range endpointGroupAddrs {
addr := fmt.Sprintf("dns:///%s", eg)
spec := query.NewGRPCEndpointSpec(addr, false, extgrpc.EndpointGroupGRPCOpts()...)
spec := query.NewGRPCEndpointSpec(eg, false, extgrpc.EndpointGroupGRPCOpts()...)
specs = append(specs, spec)
}

for _, eg := range strictEndpointGroups {
addr := fmt.Sprintf("dns:///%s", eg)
spec := query.NewGRPCEndpointSpec(addr, true, extgrpc.EndpointGroupGRPCOpts()...)
spec := query.NewGRPCEndpointSpec(eg, true, extgrpc.EndpointGroupGRPCOpts()...)
specs = append(specs, spec)
}

Expand Down
92 changes: 92 additions & 0 deletions pkg/discovery/dns/grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package dns

import (
"context"
"sync"
"time"

grpcresolver "google.golang.org/grpc/resolver"
)

const (
schemeThanos = "thanos"
)

var (
_ grpcresolver.Builder = &builder{}
_ grpcresolver.Resolver = &resolver{}
)

type builder struct {
resolveInterval time.Duration
provider *Provider
}

func NewBuilder(provider *Provider, interval time.Duration) grpcresolver.Builder {
return &builder{
resolveInterval: interval,
provider: provider,
}
}

func (b *builder) Scheme() string { return schemeThanos }

func (b *builder) Build(t grpcresolver.Target, cc grpcresolver.ClientConn, _ grpcresolver.BuildOptions) (grpcresolver.Resolver, error) {
ctx, cancel := context.WithCancel(context.Background())
r := &resolver{
provider: b.provider,
target: t.Endpoint(),
ctx: ctx,
cancel: cancel,
cc: cc,
interval: b.resolveInterval,
}
r.wg.Add(1)
go r.run()

return r, nil
}

type resolver struct {
provider *Provider

target string
ctx context.Context
cancel context.CancelFunc
cc grpcresolver.ClientConn
interval time.Duration

wg sync.WaitGroup
}

func (r *resolver) Close() {
r.cancel()
r.wg.Wait()
}

func (r *resolver) ResolveNow(_ grpcresolver.ResolveNowOptions) {}

func (r *resolver) run() {
defer r.wg.Done()
for {
err := r.provider.Resolve(r.ctx, []string{r.target})
if err != nil {
r.cc.ReportError(err)
} else {
state := grpcresolver.State{}
for _, addr := range r.provider.AddressesForHost(r.target) {
raddr := grpcresolver.Address{Addr: addr}
state.Addresses = append(state.Addresses, raddr)
}
_ = r.cc.UpdateState(state)
}
select {
case <-r.ctx.Done():
return
case <-time.After(r.interval):
}
}
}
13 changes: 13 additions & 0 deletions pkg/discovery/dns/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,16 @@ func (p *Provider) Addresses() []string {
}
return result
}

// AddressesForHost returns the latest addresses present for the host in the Provider.
func (p *Provider) AddressesForHost(host string) []string {
p.RLock()
defer p.RUnlock()

addrs := p.resolved[host]

res := make([]string, len(addrs))
copy(res, addrs)

return res
}

0 comments on commit 01fe986

Please sign in to comment.