Skip to content

Commit

Permalink
add memcached autodiscovery support
Browse files Browse the repository at this point in the history
Signed-off-by: Roy Chiang <[email protected]>
  • Loading branch information
roystchiang committed Jul 26, 2021
1 parent 986c3cc commit 3644367
Show file tree
Hide file tree
Showing 6 changed files with 415 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#3970](https://github.com/thanos-io/thanos/pull/3970) Azure: Adds more configuration options for Azure blob storage. This allows for pipeline and reader specific configuration. Implements HTTP transport configuration options. These options allows for more fine-grained control on timeouts and retries. Implements MSI authentication as second method of authentication via a service principal token.
- [#4406](https://github.com/thanos-io/thanos/pull/4406) Tools: Add retention command for applying retention policy on the bucket.
- [#4430](https://github.com/thanos-io/thanos/pull/4430) Compact: Add flag `downsample.concurrency` to specify the concurrency of downsampling blocks.
- [#4487](https://github.com/thanos-io/thanos/pull/4487) Query: Add memcached auto discovery support.

### Fixed

Expand Down
52 changes: 36 additions & 16 deletions pkg/cacheutil/memcached_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"gopkg.in/yaml.v2"

"github.com/thanos-io/thanos/pkg/discovery/dns"
memcacheDiscovery "github.com/thanos-io/thanos/pkg/discovery/memcache"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/model"
Expand Down Expand Up @@ -53,6 +54,7 @@ var (
MaxGetMultiConcurrency: 100,
MaxGetMultiBatchSize: 0,
DNSProviderUpdateInterval: 10 * time.Second,
AutoDiscovery: false,
}
)

Expand Down Expand Up @@ -114,6 +116,9 @@ type MemcachedClientConfig struct {

// DNSProviderUpdateInterval specifies the DNS discovery update interval.
DNSProviderUpdateInterval time.Duration `yaml:"dns_provider_update_interval"`

// AutoDiscovery configures memached client to perform auto-discovery instead of DNS resolution
AutoDiscovery bool `yaml:"auto_discovery"`
}

func (c *MemcachedClientConfig) validate() error {
Expand Down Expand Up @@ -153,8 +158,8 @@ type memcachedClient struct {
// Name provides an identifier for the instantiated Client
name string

// DNS provider used to keep the memcached servers list updated.
dnsProvider *dns.Provider
// Address provider used to keep the memcached servers list updated.
addressProvider AddressProvider

// Channel used to notify internal goroutines when they should quit.
stop chan struct{}
Expand All @@ -177,6 +182,11 @@ type memcachedClient struct {
dataSize *prometheus.HistogramVec
}

type AddressProvider interface {
Resolve(context.Context, []string) error
Addresses() []string
}

type memcachedGetMultiResult struct {
items map[string]*memcache.Item
err error
Expand Down Expand Up @@ -220,20 +230,30 @@ func newMemcachedClient(
reg prometheus.Registerer,
name string,
) (*memcachedClient, error) {
dnsProvider := dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_memcached_", reg),
dns.GolangResolverType,
)
promRegisterer := extprom.WrapRegistererWithPrefix("thanos_memcached_", reg)

var addressProvider AddressProvider
if config.AutoDiscovery {
addressProvider = memcacheDiscovery.NewProvider(
logger,
promRegisterer,
2*time.Second)
} else {
addressProvider = dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_memcached_", reg),
dns.GolangResolverType,
)
}

c := &memcachedClient{
logger: log.With(logger, "name", name),
config: config,
client: client,
selector: selector,
dnsProvider: dnsProvider,
asyncQueue: make(chan func(), config.MaxAsyncBufferSize),
stop: make(chan struct{}, 1),
logger: log.With(logger, "name", name),
config: config,
client: client,
selector: selector,
addressProvider: addressProvider,
asyncQueue: make(chan func(), config.MaxAsyncBufferSize),
stop: make(chan struct{}, 1),
getMultiGate: gate.New(
extprom.WrapRegistererWithPrefix("thanos_memcached_getmulti_", reg),
config.MaxGetMultiConcurrency,
Expand Down Expand Up @@ -561,11 +581,11 @@ func (c *memcachedClient) resolveAddrs() error {
defer cancel()

// If some of the dns resolution fails, log the error.
if err := c.dnsProvider.Resolve(ctx, c.config.Addresses); err != nil {
if err := c.addressProvider.Resolve(ctx, c.config.Addresses); err != nil {
level.Error(c.logger).Log("msg", "failed to resolve addresses for memcached", "addresses", strings.Join(c.config.Addresses, ","), "err", err)
}
// Fail in case no server address is resolved.
servers := c.dnsProvider.Addresses()
servers := c.addressProvider.Addresses()
if len(servers) == 0 {
return fmt.Errorf("no server address resolved for %s", c.name)
}
Expand Down
103 changes: 103 additions & 0 deletions pkg/discovery/memcache/provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package memcache

import (
"context"
"fmt"
"sync"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/thanos-io/thanos/pkg/errutil"
"github.com/thanos-io/thanos/pkg/extprom"
)

type Provider struct {
sync.RWMutex
resolver Resolver
clusterConfigs map[string]*ClusterConfig
logger log.Logger

configVersion *extprom.TxGaugeVec
resolvedAddresses *extprom.TxGaugeVec
resolverFailuresCount prometheus.Counter
resolverLookupsCount prometheus.Counter
}

func NewProvider(logger log.Logger, reg prometheus.Registerer, dialTimeout time.Duration) *Provider {
p := &Provider{
resolver: &memcachedAutoDiscovery{dialTimeout: dialTimeout},
clusterConfigs: map[string]*ClusterConfig{},
configVersion: extprom.NewTxGaugeVec(reg, prometheus.GaugeOpts{
Name: "auto_discovery_config_version",
Help: "The current auto discovery config version",
}, []string{"addr"}),
resolvedAddresses: extprom.NewTxGaugeVec(reg, prometheus.GaugeOpts{
Name: "auto_discovery_resolved_addresses",
Help: "The number of memcached nodes found via auto discovery",
}, []string{"addr"}),
resolverLookupsCount: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "auto_discovery_total",
Help: "The number of memcache auto discovery attempts",
}),
resolverFailuresCount: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "auto_discovery_failures_total",
Help: "The number of memcache auto discovery failures",
}),
}
return p
}

func (p *Provider) Resolve(ctx context.Context, addresses []string) error {
clusterConfigs := map[string]*ClusterConfig{}
errs := errutil.MultiError{}

for _, address := range addresses {
clusterConfig, err := p.resolver.Resolve(ctx, address)
p.resolverLookupsCount.Inc()

if err != nil {
level.Warn(p.logger).Log(
"msg", "failed to perform auto-discovery for memcached",
"address", address,
)
errs.Add(err)
p.resolverFailuresCount.Inc()

// Use cached values.
p.RLock()
clusterConfigs[address] = p.clusterConfigs[address]
p.RUnlock()
} else {
clusterConfigs[address] = clusterConfig
}
}

p.Lock()
defer p.Unlock()

p.resolvedAddresses.ResetTx()
p.configVersion.ResetTx()
for address, config := range clusterConfigs {
p.resolvedAddresses.WithLabelValues(address).Set(float64(len(config.nodes)))
p.configVersion.WithLabelValues(address).Set(float64(config.version))
}
p.resolvedAddresses.Submit()
p.configVersion.Submit()

p.clusterConfigs = clusterConfigs

return errs.Err()
}

func (p *Provider) Addresses() []string {
var result []string
for _, config := range p.clusterConfigs {
for _, node := range config.nodes {
result = append(result, fmt.Sprintf("%s:%d", node.dns, node.port))
}
}
return result
}
84 changes: 84 additions & 0 deletions pkg/discovery/memcache/provider_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package memcache

import (
"context"
"errors"
"sort"
"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/thanos-io/thanos/pkg/testutil"
)

func TestProviderUpdatesAddresses(t *testing.T) {
ctx := context.TODO()
clusters := []string{"memcached-cluster-1", "memcached-cluster-2"}
provider := NewProvider(log.NewNopLogger(), nil, 5*time.Second)
resolver := mockResolver{
configs: map[string]*ClusterConfig{
"memcached-cluster-1": {nodes: []Node{{dns: "dns-1", ip: "ip-1", port: 11211}}},
"memcached-cluster-2": {nodes: []Node{{dns: "dns-2", ip: "ip-2", port: 8080}}},
},
}
provider.resolver = &resolver

err := provider.Resolve(ctx, clusters)
addresses := provider.Addresses()
sort.Strings(addresses)

testutil.Ok(t, err)
testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080"}, addresses)

resolver.configs = map[string]*ClusterConfig{
"memcached-cluster-1": {nodes: []Node{{dns: "dns-1", ip: "ip-1", port: 11211}, {dns: "dns-3", ip: "ip-3", port: 11211}}},
"memcached-cluster-2": {nodes: []Node{{dns: "dns-2", ip: "ip-2", port: 8080}}},
}
err = provider.Resolve(ctx, clusters)
addresses = provider.Addresses()
sort.Strings(addresses)

testutil.Ok(t, err)
testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080", "dns-3:11211"}, addresses)
}

func TestProviderDoesNotUpdateAddressIfFailed(t *testing.T) {
ctx := context.TODO()
clusters := []string{"memcached-cluster-1", "memcached-cluster-2"}
provider := NewProvider(log.NewNopLogger(), nil, 5*time.Second)
resolver := mockResolver{
configs: map[string]*ClusterConfig{
"memcached-cluster-1": {nodes: []Node{{dns: "dns-1", ip: "ip-1", port: 11211}}},
"memcached-cluster-2": {nodes: []Node{{dns: "dns-2", ip: "ip-2", port: 8080}}},
},
}
provider.resolver = &resolver

err := provider.Resolve(ctx, clusters)
addresses := provider.Addresses()
sort.Strings(addresses)

testutil.Ok(t, err)
testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080"}, addresses)

resolver.configs = nil
resolver.err = errors.New("oops")
err = provider.Resolve(ctx, clusters)
addresses = provider.Addresses()
sort.Strings(addresses)

testutil.NotOk(t, err)
testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080"}, addresses)
}

type mockResolver struct {
configs map[string]*ClusterConfig
err error
}

func (r *mockResolver) Resolve(_ context.Context, address string) (*ClusterConfig, error) {
if r.err != nil {
return nil, r.err
}
return r.configs[address], nil
}
Loading

0 comments on commit 3644367

Please sign in to comment.