diff --git a/cmd/thanos/flags.go b/cmd/thanos/flags.go index 36f7b7db32..184fbfadd5 100644 --- a/cmd/thanos/flags.go +++ b/cmd/thanos/flags.go @@ -3,16 +3,9 @@ package main import ( "fmt" "io/ioutil" - "net" - "strconv" "strings" - "time" - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/log/level" - "github.com/improbable-eng/thanos/pkg/cluster" "github.com/pkg/errors" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" kingpin "gopkg.in/alecthomas/kingpin.v2" ) @@ -36,102 +29,22 @@ func regGRPCFlags(cmd *kingpin.CmdClause) ( grpcTLSSrvClientCA } +// TODO(povilasv): we don't need this anymore. func regCommonServerFlags(cmd *kingpin.CmdClause) ( grpcBindAddr *string, httpBindAddr *string, grpcTLSSrvCert *string, grpcTLSSrvKey *string, - grpcTLSSrvClientCA *string, - peerFunc func(log.Logger, *prometheus.Registry, bool, string, bool) (cluster.Peer, error)) { + grpcTLSSrvClientCA *string) { httpBindAddr = regHTTPAddrFlag(cmd) grpcBindAddr, grpcTLSSrvCert, grpcTLSSrvKey, grpcTLSSrvClientCA = regGRPCFlags(cmd) - grpcAdvertiseAddr := cmd.Flag("grpc-advertise-address", "Deprecated(gossip will be removed from v0.5.0): Explicit (external) host:port address to advertise for gRPC StoreAPI in gossip cluster. If empty, 'grpc-address' will be used."). - String() - - clusterBindAddr := cmd.Flag("cluster.address", "Deprecated(gossip will be removed from v0.5.0): Listen ip:port address for gossip cluster."). - Default("0.0.0.0:10900").String() - - clusterAdvertiseAddr := cmd.Flag("cluster.advertise-address", "Deprecated(gossip will be removed from v0.5.0): Explicit (external) ip:port address to advertise for gossip in gossip cluster. Used internally for membership only."). - String() - - peers := cmd.Flag("cluster.peers", "Deprecated(gossip will be removed from v0.5.0): Initial peers to join the cluster. It can be either , or . A lookup resolution is done only at the startup.").Strings() - - gossipInterval := modelDuration(cmd.Flag("cluster.gossip-interval", "Deprecated(gossip will be removed from v0.5.0): Interval between sending gossip messages. By lowering this value (more frequent) gossip messages are propagated across the cluster more quickly at the expense of increased bandwidth. Default is used from a specified network-type."). - PlaceHolder("")) - - pushPullInterval := modelDuration(cmd.Flag("cluster.pushpull-interval", "Deprecated(gossip will be removed from v0.5.0): Interval for gossip state syncs. Setting this interval lower (more frequent) will increase convergence speeds across larger clusters at the expense of increased bandwidth usage. Default is used from a specified network-type."). - PlaceHolder("")) - - refreshInterval := modelDuration(cmd.Flag("cluster.refresh-interval", "Deprecated(gossip will be removed from v0.5.0): Interval for membership to refresh cluster.peers state, 0 disables refresh.").Default(cluster.DefaultRefreshInterval.String())) - - secretKey := cmd.Flag("cluster.secret-key", "Deprecated(gossip will be removed from v0.5.0): Initial secret key to encrypt cluster gossip. Can be one of AES-128, AES-192, or AES-256 in hexadecimal format.").HexBytes() - - networkType := cmd.Flag("cluster.network-type", - fmt.Sprintf("Deprecated(gossip will be removed from v0.5.0): Network type with predefined peers configurations. Sets of configurations accounting the latency differences between network types: %s.", - strings.Join(cluster.NetworkPeerTypes, ", "), - ), - ). - Default(cluster.LanNetworkPeerType). - Enum(cluster.NetworkPeerTypes...) - - gossipDisabled := cmd.Flag("cluster.disable", "Deprecated(gossip will be removed from v0.5.0): If true gossip will be disabled and no cluster related server will be started.").Default("true").Bool() return grpcBindAddr, httpBindAddr, grpcTLSSrvCert, grpcTLSSrvKey, - grpcTLSSrvClientCA, - func(logger log.Logger, reg *prometheus.Registry, waitIfEmpty bool, httpAdvertiseAddr string, queryAPIEnabled bool) (cluster.Peer, error) { - if *gossipDisabled { - level.Info(logger).Log("msg", "gossip is disabled") - return cluster.NewNoop(), nil - } - - host, port, err := cluster.CalculateAdvertiseAddress(*grpcBindAddr, *grpcAdvertiseAddr) - if err != nil { - return nil, errors.Wrapf(err, "calculate advertise StoreAPI addr for gossip based on bindAddr: %s and advAddr: %s", *grpcBindAddr, *grpcAdvertiseAddr) - } - - advStoreAPIAddress := net.JoinHostPort(host, strconv.Itoa(port)) - if cluster.IsUnroutable(advStoreAPIAddress) { - level.Warn(logger).Log("msg", "this component advertises its gRPC StoreAPI on an unroutable address. This will not work cross-cluster", "addr", advStoreAPIAddress) - level.Warn(logger).Log("msg", "provide --grpc-address as routable ip:port or --grpc-advertise-address as a routable host:port") - } - - level.Info(logger).Log("msg", "StoreAPI address that will be propagated through gossip", "address", advStoreAPIAddress) - - advQueryAPIAddress := httpAdvertiseAddr - if queryAPIEnabled { - host, port, err := cluster.CalculateAdvertiseAddress(*httpBindAddr, advQueryAPIAddress) - if err != nil { - return nil, errors.Wrapf(err, "calculate advertise QueryAPI addr for gossip based on bindAddr: %s and advAddr: %s", *httpBindAddr, advQueryAPIAddress) - } - - advQueryAPIAddress = net.JoinHostPort(host, strconv.Itoa(port)) - if cluster.IsUnroutable(advQueryAPIAddress) { - level.Warn(logger).Log("msg", "this component advertises its HTTP QueryAPI on an unroutable address. This will not work cross-cluster", "addr", advQueryAPIAddress) - level.Warn(logger).Log("msg", "provide --http-address as routable ip:port or --http-advertise-address as a routable host:port") - } - - level.Info(logger).Log("msg", "QueryAPI address that will be propagated through gossip", "address", advQueryAPIAddress) - } - - return cluster.New(logger, - reg, - *clusterBindAddr, - *clusterAdvertiseAddr, - advStoreAPIAddress, - advQueryAPIAddress, - *peers, - waitIfEmpty, - time.Duration(*gossipInterval), - time.Duration(*pushPullInterval), - time.Duration(*refreshInterval), - *secretKey, - *networkType, - ) - } + grpcTLSSrvClientCA } func regHTTPAddrFlag(cmd *kingpin.CmdClause) *string { diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 1e229bc789..e5b4291fb7 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -16,7 +16,6 @@ import ( "github.com/go-kit/kit/log/level" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - "github.com/improbable-eng/thanos/pkg/cluster" "github.com/improbable-eng/thanos/pkg/component" "github.com/improbable-eng/thanos/pkg/discovery/cache" "github.com/improbable-eng/thanos/pkg/discovery/dns" @@ -46,10 +45,7 @@ import ( func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string) { cmd := app.Command(name, "query node exposing PromQL enabled Query API with data retrieved from multiple store nodes") - grpcBindAddr, httpBindAddr, srvCert, srvKey, srvClientCA, newPeerFn := regCommonServerFlags(cmd) - - httpAdvertiseAddr := cmd.Flag("http-advertise-address", "Explicit (external) host:port address to advertise for HTTP QueryAPI in gossip cluster. If empty, 'http-address' will be used."). - String() + grpcBindAddr, httpBindAddr, srvCert, srvKey, srvClientCA := regCommonServerFlags(cmd) secure := cmd.Flag("grpc-client-tls-secure", "Use TLS when talking to the gRPC server").Default("false").Bool() cert := cmd.Flag("grpc-client-tls-cert", "TLS Certificates to use to identify this client to the server").Default("").String() @@ -102,10 +98,6 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string storeResponseTimeout := modelDuration(cmd.Flag("store.response-timeout", "If a Store doesn't send any data in this specified duration then a Store will be ignored and partial data will be returned if it's enabled. 0 disables timeout.").Default("0ms")) m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error { - peer, err := newPeerFn(logger, reg, true, *httpAdvertiseAddr, true) - if err != nil { - return errors.Wrap(err, "new cluster peer") - } selectorLset, err := parseFlagLabels(*selectorLabels) if err != nil { return errors.Wrap(err, "parse federation labels") @@ -153,7 +145,6 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string time.Duration(*queryTimeout), time.Duration(*storeResponseTimeout), *replicaLabel, - peer, selectorLset, *stores, *enableAutodownsampling, @@ -271,7 +262,6 @@ func runQuery( queryTimeout time.Duration, storeResponseTimeout time.Duration, replicaLabel string, - peer cluster.Peer, selectorLset labels.Labels, storeAddrs []string, enableAutodownsampling bool, @@ -305,16 +295,6 @@ func runQuery( logger, reg, func() (specs []query.StoreSpec) { - // Add store specs from gossip. - for id, ps := range peer.PeerStates(cluster.PeerTypesStoreAPIs()...) { - if ps.StoreAPIAddr == "" { - level.Error(logger).Log("msg", "Gossip found peer that propagates empty address, ignoring.", "lset", fmt.Sprintf("%v", ps.Metadata.Labels)) - continue - } - - specs = append(specs, &gossipSpec{id: id, addr: ps.StoreAPIAddr, stateFetcher: peer}) - } - // Add DNS resolved addresses from static flags and file SD. for _, addr := range dnsProvider.Addresses() { specs = append(specs, query.NewGRPCStoreSpec(addr)) @@ -388,21 +368,6 @@ func runQuery( close(fileSDUpdates) }) } - { - ctx, cancel := context.WithCancel(context.Background()) - g.Add(func() error { - // New gossip cluster. - if err := peer.Join(cluster.PeerTypeQuery, cluster.PeerMetadata{}); err != nil { - return errors.Wrap(err, "join cluster") - } - - <-ctx.Done() - return nil - }, func(error) { - cancel() - peer.Close(5 * time.Second) - }) - } // Periodically update the addresses from static flags and file SD by resolving them using DNS SD if necessary. { ctx, cancel := context.WithCancel(context.Background()) @@ -507,23 +472,3 @@ func removeDuplicateStoreSpecs(logger log.Logger, duplicatedStores prometheus.Co } return deduplicated } - -type gossipSpec struct { - id string - addr string - - stateFetcher cluster.PeerStateFetcher -} - -func (s *gossipSpec) Addr() string { - return s.addr -} - -// Metadata method for gossip store tries get current peer state. -func (s *gossipSpec) Metadata(_ context.Context, _ storepb.StoreClient) (labels []storepb.Label, mint int64, maxt int64, err error) { - state, ok := s.stateFetcher.PeerState(s.id) - if !ok { - return nil, 0, 0, errors.Errorf("peer %s is no longer in gossip cluster", s.id) - } - return state.Metadata.Labels, state.Metadata.MinTime, state.Metadata.MaxTime, nil -} diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index e4085d9caa..737aad81ff 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -3,7 +3,6 @@ package main import ( "context" "fmt" - "math" "math/rand" "net" "net/http" @@ -12,7 +11,6 @@ import ( "os/signal" "path" "path/filepath" - "sort" "strconv" "strings" "sync" @@ -23,7 +21,6 @@ import ( "github.com/go-kit/kit/log/level" "github.com/improbable-eng/thanos/pkg/alert" "github.com/improbable-eng/thanos/pkg/block/metadata" - "github.com/improbable-eng/thanos/pkg/cluster" "github.com/improbable-eng/thanos/pkg/component" "github.com/improbable-eng/thanos/pkg/discovery/cache" "github.com/improbable-eng/thanos/pkg/discovery/dns" @@ -60,7 +57,7 @@ import ( func registerRule(m map[string]setupFunc, app *kingpin.Application, name string) { cmd := app.Command(name, "ruler evaluating Prometheus rules against given Query nodes, exposing Store API and storing old blocks in bucket") - grpcBindAddr, httpBindAddr, cert, key, clientCA, newPeerFn := regCommonServerFlags(cmd) + grpcBindAddr, httpBindAddr, cert, key, clientCA := regCommonServerFlags(cmd) labelStrs := cmd.Flag("label", "Labels to be applied to all generated metrics (repeated). Similar to external labels for Prometheus, used to identify ruler and its blocks as unique source."). PlaceHolder("=\"\"").Strings() @@ -112,10 +109,6 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application, name string) if err != nil { return errors.Wrap(err, "parse labels") } - peer, err := newPeerFn(logger, reg, false, "", false) - if err != nil { - return errors.Wrap(err, "new cluster peer") - } alertQueryURL, err := url.Parse(*alertQueryURL) if err != nil { return errors.Wrap(err, "parse alert query url") @@ -146,8 +139,8 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application, name string) fileSD = file.NewDiscovery(conf, logger) } - if len(*queries) < 1 && peer.Name() == "no gossip" && fileSD == nil { - return errors.Errorf("Gossip is disabled and no --query parameter was given.") + if fileSD == nil { + return errors.Errorf("No --query parameter was given.") } return runRule(g, @@ -168,7 +161,6 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application, name string) time.Duration(*evalInterval), *dataDir, *ruleFiles, - peer, objStoreConfig, tsdbOpts, alertQueryURL, @@ -202,7 +194,6 @@ func runRule( evalInterval time.Duration, dataDir string, ruleFiles []string, - peer cluster.Peer, objStoreConfig *pathOrContent, tsdbOpts *tsdb.Options, alertQueryURL *url.URL, @@ -326,7 +317,7 @@ func runRule( opts := opts opts.Registerer = extprom.WrapRegistererWith(prometheus.Labels{"strategy": strings.ToLower(s.String())}, reg) opts.Context = ctx - opts.QueryFunc = queryFunc(logger, peer, dnsProvider, duplicatedQuery, ruleEvalWarnings, s) + opts.QueryFunc = queryFunc(logger, dnsProvider, duplicatedQuery, ruleEvalWarnings, s) ruleMgrs[s] = rules.NewManager(&opts) g.Add(func() error { @@ -340,32 +331,6 @@ func runRule( }) } } - { - var storeLset []storepb.Label - for _, l := range lset { - storeLset = append(storeLset, storepb.Label{Name: l.Name, Value: l.Value}) - } - - ctx, cancel := context.WithCancel(context.Background()) - g.Add(func() error { - // New gossip cluster. - if err = peer.Join(cluster.PeerTypeSource, cluster.PeerMetadata{ - Labels: storeLset, - // Start out with the full time range. The shipper will constrain it later. - // TODO(fabxc): minimum timestamp is never adjusted if shipping is disabled. - MinTime: 0, - MaxTime: math.MaxInt64, - }); err != nil { - return errors.Wrap(err, "join cluster") - } - - <-ctx.Done() - return nil - }, func(error) { - cancel() - peer.Close(5 * time.Second) - }) - } { // TODO(bwplotka): https://github.com/improbable-eng/thanos/issues/660 sdr := alert.NewSender(logger, reg, alertmgrs.get, nil, alertmgrsTimeout) @@ -622,13 +587,6 @@ func runRule( if _, err := s.Sync(ctx); err != nil { level.Warn(logger).Log("err", err) } - - minTime, _, err := s.Timestamps() - if err != nil { - level.Warn(logger).Log("msg", "reading timestamps failed", "err", err) - } else { - peer.SetTimestamps(minTime, math.MaxInt64) - } return nil }) }, func(error) { @@ -636,7 +594,7 @@ func runRule( }) } - level.Info(logger).Log("msg", "starting rule node", "peer", peer.Name()) + level.Info(logger).Log("msg", "starting rule node") return nil } @@ -765,7 +723,6 @@ func removeDuplicateQueryAddrs(logger log.Logger, duplicatedQueriers prometheus. // back or the context get canceled. func queryFunc( logger log.Logger, - peer cluster.Peer, dnsProvider *dns.Provider, duplicatedQuery prometheus.Counter, ruleEvalWarnings *prometheus.CounterVec, @@ -784,24 +741,9 @@ func queryFunc( } return func(ctx context.Context, q string, t time.Time) (promql.Vector, error) { - var addrs []string - - // Add addresses from gossip. - peers := peer.PeerStates(cluster.PeerTypeQuery) - var ids []string - for id := range peers { - ids = append(ids, id) - } - sort.Slice(ids, func(i int, j int) bool { - return strings.Compare(ids[i], ids[j]) < 0 - }) - for _, id := range ids { - addrs = append(addrs, peers[id].QueryAPIAddr) - } - // Add DNS resolved addresses from static flags and file SD. // TODO(bwplotka): Consider generating addresses in *url.URL - addrs = append(addrs, dnsProvider.Addresses()...) + addrs := dnsProvider.Addresses() removeDuplicateQueryAddrs(logger, duplicatedQuery, addrs) diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 58087b2f8c..f8fc4750ec 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -13,7 +13,6 @@ import ( "github.com/go-kit/kit/log/level" version "github.com/hashicorp/go-version" "github.com/improbable-eng/thanos/pkg/block/metadata" - "github.com/improbable-eng/thanos/pkg/cluster" "github.com/improbable-eng/thanos/pkg/component" "github.com/improbable-eng/thanos/pkg/objstore/client" "github.com/improbable-eng/thanos/pkg/promclient" @@ -35,7 +34,7 @@ import ( func registerSidecar(m map[string]setupFunc, app *kingpin.Application, name string) { cmd := app.Command(name, "sidecar for Prometheus server") - grpcBindAddr, httpBindAddr, cert, key, clientCA, newPeerFn := regCommonServerFlags(cmd) + grpcBindAddr, httpBindAddr, cert, key, clientCA := regCommonServerFlags(cmd) promURL := cmd.Flag("prometheus.url", "URL at which to reach Prometheus's API. For better performance use local network."). Default("http://localhost:9090").URL() @@ -63,10 +62,6 @@ func registerSidecar(m map[string]setupFunc, app *kingpin.Application, name stri *reloaderCfgOutputFile, *reloaderRuleDirs, ) - peer, err := newPeerFn(logger, reg, false, "", false) - if err != nil { - return errors.Wrap(err, "new cluster peer") - } return runSidecar( g, logger, @@ -80,7 +75,6 @@ func registerSidecar(m map[string]setupFunc, app *kingpin.Application, name stri *promURL, *dataDir, objStoreConfig, - peer, rl, *uploadCompacted, ) @@ -100,7 +94,6 @@ func runSidecar( promURL *url.URL, dataDir string, objStoreConfig *pathOrContent, - peer cluster.Peer, reloader *reloader.Reloader, uploadCompacted bool, ) error { @@ -189,16 +182,6 @@ func runSidecar( return errors.New("no external labels configured on Prometheus server, uniquely identifying external labels must be configured") } - // New gossip cluster. - mint, maxt := m.Timestamps() - if err = peer.Join(cluster.PeerTypeSource, cluster.PeerMetadata{ - Labels: m.LabelsPB(), - MinTime: mint, - MaxTime: maxt, - }); err != nil { - return errors.Wrap(err, "join cluster") - } - // Periodically query the Prometheus config. We use this as a heartbeat as well as for updating // the external labels we apply. return runutil.Repeat(30*time.Second, ctx.Done(), func() error { @@ -209,9 +192,6 @@ func runSidecar( level.Warn(logger).Log("msg", "heartbeat failed", "err", err) promUp.Set(0) } else { - // Update gossip. - peer.SetLabels(m.LabelsPB()) - promUp.Set(1) lastHeartbeat.Set(float64(time.Now().UnixNano()) / 1e9) } @@ -220,7 +200,6 @@ func runSidecar( }) }, func(error) { cancel() - peer.Close(2 * time.Second) }) } { @@ -305,9 +284,6 @@ func runSidecar( level.Warn(logger).Log("msg", "reading timestamps failed", "err", err) } else { m.UpdateTimestamps(minTime, math.MaxInt64) - - mint, maxt := m.Timestamps() - peer.SetTimestamps(mint, maxt) } return nil }) @@ -316,7 +292,7 @@ func runSidecar( }) } - level.Info(logger).Log("msg", "starting sidecar", "peer", peer.Name()) + level.Info(logger).Log("msg", "starting sidecar") return nil } diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index c135956403..8a7cfcf144 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -2,13 +2,11 @@ package main import ( "context" - "math" "net" "time" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" - "github.com/improbable-eng/thanos/pkg/cluster" "github.com/improbable-eng/thanos/pkg/objstore/client" "github.com/improbable-eng/thanos/pkg/runutil" "github.com/improbable-eng/thanos/pkg/store" @@ -26,7 +24,7 @@ import ( func registerStore(m map[string]setupFunc, app *kingpin.Application, name string) { cmd := app.Command(name, "store node giving access to blocks in a bucket provider. Now supported GCS, S3, Azure, Swift and Tencent COS.") - grpcBindAddr, httpBindAddr, cert, key, clientCA, newPeerFn := regCommonServerFlags(cmd) + grpcBindAddr, httpBindAddr, cert, key, clientCA := regCommonServerFlags(cmd) dataDir := cmd.Flag("data-dir", "Data directory in which to cache remote blocks."). Default("./data").String() @@ -52,10 +50,6 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string Default("20").Int() m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error { - peer, err := newPeerFn(logger, reg, false, "", false) - if err != nil { - return errors.Wrap(err, "new cluster peer") - } return runStore(g, logger, reg, @@ -67,7 +61,6 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string *key, *clientCA, *httpBindAddr, - peer, uint64(*indexCacheSize), uint64(*chunkPoolSize), uint64(*maxSampleCount), @@ -93,7 +86,6 @@ func runStore( key string, clientCA string, httpBindAddr string, - peer cluster.Peer, indexCacheSizeBytes uint64, chunkPoolSizeBytes uint64, maxSampleCount uint64, @@ -163,7 +155,6 @@ func runStore( if err := bs.SyncBlocks(ctx); err != nil { level.Warn(logger).Log("msg", "syncing blocks failed", "err", err) } - peer.SetTimestamps(bs.TimeRange()) return nil }) @@ -193,27 +184,6 @@ func runStore( runutil.CloseWithLogOnErr(logger, l, "store gRPC listener") }) } - { - ctx, cancel := context.WithCancel(context.Background()) - g.Add(func() error { - // New gossip cluster. - if err := peer.Join( - cluster.PeerTypeStore, - cluster.PeerMetadata{ - MinTime: math.MinInt64, - MaxTime: math.MaxInt64, - }, - ); err != nil { - return errors.Wrap(err, "join cluster") - } - - <-ctx.Done() - return nil - }, func(error) { - cancel() - peer.Close(5 * time.Second) - }) - } if err := metricHTTPListenGroup(g, logger, reg, httpBindAddr); err != nil { return err } diff --git a/go.mod b/go.mod index 5ed4dedc09..5df9b578e1 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,6 @@ require ( github.com/gophercloud/gophercloud v0.0.0-20181206160319-9d88c34913a9 github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 github.com/grpc-ecosystem/go-grpc-prometheus v0.0.0-20181025070259-68e3a13e4117 - github.com/hashicorp/go-sockaddr v1.0.0 github.com/hashicorp/go-version v1.2.0 github.com/hashicorp/golang-lru v0.5.1 github.com/hashicorp/memberlist v0.1.3 diff --git a/go.sum b/go.sum index 4b637c57d4..5a651f3395 100644 --- a/go.sum +++ b/go.sum @@ -9,7 +9,8 @@ github.com/Azure/azure-pipeline-go v0.1.8/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9a github.com/Azure/azure-sdk-for-go v23.2.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/azure-storage-blob-go v0.0.0-20181022225951-5152f14ace1c h1:Y5ueznoCekgCWBytF1Q9lTpZ3tJeX37dQtCcGjMCLYI= github.com/Azure/azure-storage-blob-go v0.0.0-20181022225951-5152f14ace1c/go.mod h1:oGfmITT1V6x//CswqY2gtAHND+xIP64/qL7a5QJix0Y= -github.com/Azure/go-autorest v11.2.8+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24= +github.com/Azure/go-autorest v10.8.1+incompatible h1:u0jVQf+a6k6x8A+sT60l6EY9XZu+kHdnZVPAYqpVRo0= +github.com/Azure/go-autorest v10.8.1+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24= github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cqUQ3I= github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c= github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= @@ -163,12 +164,18 @@ github.com/mattn/go-runewidth v0.0.3 h1:a+kO+98RDGEfo6asOGMmpodZq4FNtnGP54yps8Bz github.com/mattn/go-runewidth v0.0.3/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +<<<<<<< HEAD github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= github.com/miekg/dns v1.1.4/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= github.com/miekg/dns v1.1.8 h1:1QYRAKU3lN5cRfLCkPU08hwvLJFhvjP6MqNMmQz6ZVI= github.com/miekg/dns v1.1.8/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= github.com/minio/minio-go v0.0.0-20200511070425-f33eae714a28 h1:SOCqV8mmu2Aqo2MktoZkHq17+jklWGlXsy0uf5q0ckM= github.com/minio/minio-go v0.0.0-20200511070425-f33eae714a28/go.mod h1:/haSOWG8hQNx2+JOfLJ9GKp61EAmgPwRVw/Sac0NzaM= +======= +github.com/miekg/dns v1.0.4/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= +github.com/minio/minio-go v0.0.0-20190131015406-c8a261de75c1 h1:jw16EimP5oAEM/2wt+SiEUov/YDyTCTDuPtIKgQIvk0= +github.com/minio/minio-go v0.0.0-20190131015406-c8a261de75c1/go.mod h1:vuvdOZLJuf5HmJAJrKV64MmozrSsk+or0PB5dzdfspg= +>>>>>>> Remove gossip github.com/mitchellh/go-homedir v0.0.0-20180523094522-3864e76763d9/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= @@ -183,6 +190,7 @@ github.com/mozillazg/go-httpheader v0.2.1 h1:geV7TrjbL8KXSyvghnFm+NyTux/hxwueTSr github.com/mozillazg/go-httpheader v0.2.1/go.mod h1:jJ8xECTlalr6ValeXYdOF8fFUISeBAdw6E61aqQma60= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223 h1:F9x/1yl3T2AeKLr2AMdilSD8+f9bvMnNN8VS5iDtovc= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/oklog/oklog v0.0.0-20170918173356-f857583a70c3 h1:K/4JIfLQRyyHKJGd6ZfHply0GYxMuiqLCGuA8904lJk= github.com/oklog/oklog v0.0.0-20170918173356-f857583a70c3/go.mod h1:FCV+B7mhrz4o+ueLpx+KqkyXRGMWOYEvfiXtdGtbWGs= github.com/oklog/run v1.0.0 h1:Ru7dDtJNOyC66gQ5dQmaCa0qIsAUFY3sFpK1Xk8igrw= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go deleted file mode 100644 index 67470b0596..0000000000 --- a/pkg/cluster/cluster.go +++ /dev/null @@ -1,569 +0,0 @@ -package cluster - -import ( - "context" - stdlog "log" - "math/rand" - "net" - "strconv" - "strings" - "sync" - "time" - - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/log/level" - "github.com/hashicorp/memberlist" - "github.com/improbable-eng/thanos/pkg/runutil" - "github.com/improbable-eng/thanos/pkg/store/storepb" - "github.com/oklog/ulid" - "github.com/pkg/errors" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/model" -) - -type PeerStateFetcher interface { - PeerState(id string) (PeerState, bool) -} - -type Peer interface { - PeerStateFetcher - - Name() string - SetLabels(labels []storepb.Label) - SetTimestamps(mint int64, maxt int64) - Join(peerType PeerType, initialMetadata PeerMetadata) error - PeerStates(types ...PeerType) map[string]PeerState - Close(timeout time.Duration) -} - -// Peer is a single peer in a gossip cluster. -type peer struct { - logger log.Logger - mlistMtx sync.RWMutex - mlist *memberlist.Memberlist - stopc chan struct{} - - cfg *memberlist.Config - knownPeers []string - advertiseAddr string - refreshInterval time.Duration - - data *data - gossipMsgsReceived prometheus.Counter - gossipClusterMembers prometheus.Gauge - - // Own External gRPC StoreAPI host:port (if any) to propagate to other peers. - advertiseStoreAPIAddr string - // Own External HTTP QueryAPI host:port (if any) to propagate to other peers. - advertiseQueryAPIAddress string -} - -const ( - DefaultRefreshInterval = model.Duration(60 * time.Second) - - // Peer's network types. These are used as a predefined peer configurations for a specified network type. - LocalNetworkPeerType = "local" - LanNetworkPeerType = "lan" - WanNetworkPeerType = "wan" -) - -var ( - // NetworkPeerTypes is a list of available peers' network types. - NetworkPeerTypes = []string{LocalNetworkPeerType, LanNetworkPeerType, WanNetworkPeerType} -) - -// PeerType describes a peer's role in the cluster. -type PeerType string - -// Constants holding valid PeerType values. -const ( - // PeerTypeStore is for peers that implements StoreAPI and are used for browsing historical data. - PeerTypeStore = "store" - // PeerTypeSource is for peers that implements StoreAPI and are used for scraping data. They tend to - // have data accessible only for short period. - PeerTypeSource = "source" - - // PeerTypeQuery is for peers that implements QueryAPI and are used for querying the metrics. - PeerTypeQuery = "query" -) - -// PeerState contains state for the peer. -type PeerState struct { - // Type represents type of the peer holding the state. - Type PeerType - - // StoreAPIAddr is a host:port address of gRPC StoreAPI of the peer holding the state. Required for PeerTypeSource and PeerTypeStore. - StoreAPIAddr string - // QueryAPIAddr is a host:port address of HTTP QueryAPI of the peer holding the state. Required for PeerTypeQuery type only. - QueryAPIAddr string - - // Metadata holds metadata of the peer holding the state. - Metadata PeerMetadata -} - -// PeerMetadata are the information that can change in runtime of the peer. -type PeerMetadata struct { - // Labels represents external labels for the peer. Only relevant for PeerTypeSource. Empty for other types. - Labels []storepb.Label - - // MinTime indicates the minTime of the oldest block available from this peer. - MinTime int64 - // MaxTime indicates the maxTime of the youngest block available from this peer. - MaxTime int64 -} - -// New returns "alone" peer that is ready to join. -func New( - l log.Logger, - reg *prometheus.Registry, - bindAddr string, - advertiseAddr string, - advertiseStoreAPIAddr string, - advertiseQueryAPIAddress string, - knownPeers []string, - waitIfEmpty bool, - pushPullInterval time.Duration, - gossipInterval time.Duration, - refreshInterval time.Duration, - secretKey []byte, - networkType string, -) (*peer, error) { - l = log.With(l, "component", "cluster") - - bindHost, bindPortStr, err := net.SplitHostPort(bindAddr) - if err != nil { - return nil, err - } - bindPort, err := strconv.Atoi(bindPortStr) - if err != nil { - return nil, errors.Wrap(err, "invalid listen address") - } - - // Best-effort deduction of advertise address. - advertiseHost, advertisePort, err := CalculateAdvertiseAddress(bindAddr, advertiseAddr) - if err != nil { - level.Warn(l).Log("err", "couldn't deduce an advertise address: "+err.Error()) - } - - if IsUnroutable(advertiseHost) { - level.Warn(l).Log("err", "this node advertises itself on an unroutable address", "host", advertiseHost, "port", advertisePort) - level.Warn(l).Log("err", "this node will be unreachable in the cluster") - level.Warn(l).Log("err", "provide --cluster.advertise-address as a routable IP address or hostname") - } - - resolvedPeers, err := resolvePeers(context.Background(), knownPeers, advertiseAddr, *net.DefaultResolver, waitIfEmpty) - if err != nil { - return nil, errors.Wrap(err, "resolve peers") - } - level.Debug(l).Log("msg", "resolved peers to following addresses", "peers", strings.Join(resolvedPeers, ",")) - - // TODO(fabxc): generate human-readable but random names? - name, err := ulid.New(ulid.Now(), rand.New(rand.NewSource(time.Now().UnixNano()))) - if err != nil { - return nil, err - } - - cfg, err := parseNetworkConfig(networkType) - if err != nil { - return nil, err - } - cfg.Name = name.String() - cfg.BindAddr = bindHost - cfg.BindPort = bindPort - if gossipInterval != 0 { - cfg.GossipInterval = gossipInterval - } - if pushPullInterval != 0 { - cfg.PushPullInterval = pushPullInterval - } - cfg.Logger = stdlog.New(log.NewStdlibAdapter(level.Debug(l)), "peers", stdlog.LstdFlags) - cfg.SecretKey = secretKey - if advertiseAddr != "" { - cfg.AdvertiseAddr = advertiseHost - cfg.AdvertisePort = advertisePort - } - - gossipMsgsReceived := prometheus.NewCounter(prometheus.CounterOpts{ - Name: "thanos_gossip_messages_received_total", - Help: "Total gossip NotifyMsg calls.", - }) - gossipClusterMembers := prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "thanos_cluster_members", - Help: "Number indicating current number of members in cluster.", - }) - - reg.MustRegister(gossipMsgsReceived) - reg.MustRegister(gossipClusterMembers) - - return &peer{ - logger: l, - knownPeers: knownPeers, - cfg: cfg, - refreshInterval: refreshInterval, - gossipMsgsReceived: gossipMsgsReceived, - gossipClusterMembers: gossipClusterMembers, - stopc: make(chan struct{}), - data: &data{data: map[string]PeerState{}}, - advertiseAddr: advertiseAddr, - advertiseStoreAPIAddr: advertiseStoreAPIAddr, - advertiseQueryAPIAddress: advertiseQueryAPIAddress, - }, nil -} - -// Join joins to the memberlist gossip cluster using knownPeers and given peerType and initialMetadata. -func (p *peer) Join(peerType PeerType, initialMetadata PeerMetadata) error { - if p.hasJoined() { - return errors.New("peer already joined; close it first to rejoin") - } - - var ml *memberlist.Memberlist - d := newDelegate(p.logger, ml.NumMembers, p.data, p.gossipMsgsReceived, p.gossipClusterMembers) - p.cfg.Delegate = d - p.cfg.Events = d - - ml, err := memberlist.Create(p.cfg) - if err != nil { - return errors.Wrap(err, "create memberlist") - } - - n, err := ml.Join(p.knownPeers) - if err != nil { - level.Error(p.logger).Log("msg", "none of the peers was can be reached", "peerType", peerType, "knownPeers", strings.Join(p.knownPeers, ","), "err", err) - } else { - level.Debug(p.logger).Log("msg", "joined cluster", "peerType", peerType, "knownPeers", strings.Join(p.knownPeers, ",")) - } - - if n > 0 { - go warnIfAlone(p.logger, 10*time.Second, p.stopc, ml.NumMembers) - } - - p.mlistMtx.Lock() - p.mlist = ml - p.mlistMtx.Unlock() - - // Initialize state with ourselves. - p.data.Set(p.Name(), PeerState{ - Type: peerType, - StoreAPIAddr: p.advertiseStoreAPIAddr, - QueryAPIAddr: p.advertiseQueryAPIAddress, - Metadata: initialMetadata, - }) - - if p.refreshInterval != 0 { - go p.periodicallyRefresh() - } - - return nil -} - -func (p *peer) periodicallyRefresh() { - tick := time.NewTicker(p.refreshInterval) - defer tick.Stop() - - for { - select { - case <-p.stopc: - return - case <-tick.C: - if err := p.Refresh(); err != nil { - level.Error(p.logger).Log("msg", "Refreshing memberlist", "err", err) - } - } - } -} - -// Refresh renews membership cluster, this will refresh DNS names and join newly added members -func (p *peer) Refresh() error { - p.mlistMtx.Lock() - defer p.mlistMtx.Unlock() - - if p.mlist == nil { - return nil - } - - resolvedPeers, err := resolvePeers(context.Background(), p.knownPeers, p.advertiseAddr, *net.DefaultResolver, false) - if err != nil { - return errors.Wrapf(err, "refresh cluster could not resolve peers: %v", resolvedPeers) - } - - currMembers := p.mlist.Members() - var notConnected []string - for _, peer := range resolvedPeers { - var isPeerFound bool - - for _, mem := range currMembers { - if mem.Address() == peer { - isPeerFound = true - break - } - } - - if !isPeerFound { - notConnected = append(notConnected, peer) - } - } - - if len(notConnected) == 0 { - level.Debug(p.logger).Log("msg", "refresh cluster done", "peers", strings.Join(p.knownPeers, ","), "resolvedPeers", strings.Join(resolvedPeers, ",")) - return nil - } - - curr, err := p.mlist.Join(notConnected) - if err != nil { - return errors.Wrapf(err, "join peers %s ", strings.Join(notConnected, ",")) - } - - level.Debug(p.logger).Log("msg", "refresh cluster done, peers joined", "peers", strings.Join(notConnected, ","), "before", len(currMembers), "after", curr) - return nil -} - -func (p *peer) hasJoined() bool { - p.mlistMtx.RLock() - defer p.mlistMtx.RUnlock() - - return p.mlist != nil -} - -func warnIfAlone(logger log.Logger, d time.Duration, stopc chan struct{}, numNodes func() int) { - tick := time.NewTicker(d) - defer tick.Stop() - - for { - select { - case <-stopc: - return - case <-tick.C: - if n := numNodes(); n <= 1 { - level.Warn(logger).Log("NumMembers", n, "msg", "I appear to be alone in the cluster") - } - } - } -} - -// SetLabels updates internal metadata's labels stored in PeerState for this peer. -// Note that this data will be propagated based on gossipInterval we set. -func (p *peer) SetLabels(labels []storepb.Label) { - if !p.hasJoined() { - return - } - - s, _ := p.data.Get(p.Name()) - s.Metadata.Labels = labels - p.data.Set(p.Name(), s) -} - -// SetTimestamps updates internal metadata's timestamps stored in PeerState for this peer. -// Note that this data will be propagated based on gossipInterval we set. -func (p *peer) SetTimestamps(mint int64, maxt int64) { - if !p.hasJoined() { - return - } - - s, _ := p.data.Get(p.Name()) - s.Metadata.MinTime = mint - s.Metadata.MaxTime = maxt - p.data.Set(p.Name(), s) -} - -// Close leaves the cluster waiting up to timeout and shutdowns peer if cluster left. -// TODO(bplotka): Add this method into run.Group closing logic for each command. This will improve graceful shutdown. -func (p *peer) Close(timeout time.Duration) { - if !p.hasJoined() { - return - } - - if err := p.mlist.Leave(timeout); err != nil { - level.Error(p.logger).Log("msg", "memberlist leave failed", "err", err) - } - close(p.stopc) - if err := p.mlist.Shutdown(); err != nil { - level.Error(p.logger).Log("msg", "memberlist shutdown failed", "err", err) - } - p.mlist = nil -} - -// Name returns the unique ID of this peer in the cluster. -func (p *peer) Name() string { - if !p.hasJoined() { - return "" - } - - return p.mlist.LocalNode().Name -} - -// PeerTypesStoreAPIs gives a PeerType that allows all types that exposes StoreAPI. -func PeerTypesStoreAPIs() []PeerType { - return []PeerType{PeerTypeStore, PeerTypeSource} -} - -// PeerStates returns the custom state information for each peer by memberlist peer id (name). -func (p *peer) PeerStates(types ...PeerType) map[string]PeerState { - if !p.hasJoined() { - return nil - } - - ps := map[string]PeerState{} - for _, o := range p.mlist.Members() { - os, ok := p.data.Get(o.Name) - if !ok { - continue - } - - if len(types) == 0 { - ps[o.Name] = os - continue - } - for _, t := range types { - if os.Type == t { - ps[o.Name] = os - break - } - } - } - return ps -} - -// PeerState returns the custom state information by memberlist peer name. -func (p *peer) PeerState(id string) (PeerState, bool) { - if !p.hasJoined() { - return PeerState{}, false - } - - ps, ok := p.data.Get(id) - if !ok { - return PeerState{}, false - } - return ps, true -} - -// Info returns a JSON-serializable dump of cluster state. -// Useful for debug. -func (p *peer) Info() map[string]interface{} { - if !p.hasJoined() { - return nil - } - - d := map[string]PeerState{} - for k, v := range p.data.Data() { - d[k] = v - } - - return map[string]interface{}{ - "self": p.mlist.LocalNode(), - "members": p.mlist.Members(), - "n": p.mlist.NumMembers(), - "state": d, - } -} - -func resolvePeers(ctx context.Context, peers []string, myAddress string, res net.Resolver, waitIfEmpty bool) ([]string, error) { - var resolvedPeers []string - - for _, peer := range peers { - host, port, err := net.SplitHostPort(peer) - if err != nil { - return nil, errors.Wrapf(err, "split host/port for peer %s", peer) - } - - ips, err := res.LookupIPAddr(ctx, host) - if err != nil { - // Assume direct address. - resolvedPeers = append(resolvedPeers, peer) - continue - } - - if len(ips) == 0 { - var lookupErrSpotted bool - retryCtx, cancel := context.WithCancel(ctx) - defer cancel() - - err := runutil.Retry(2*time.Second, retryCtx.Done(), func() error { - if lookupErrSpotted { - // We need to invoke cancel in next run of retry when lookupErrSpotted to preserve LookupIPAddr error. - cancel() - } - - ips, err = res.LookupIPAddr(retryCtx, host) - if err != nil { - lookupErrSpotted = true - return errors.Wrapf(err, "IP Addr lookup for peer %s", peer) - } - - ips = removeMyAddr(ips, port, myAddress) - if len(ips) == 0 { - if !waitIfEmpty { - return nil - } - return errors.New("empty IPAddr result. Retrying") - } - - return nil - }) - if err != nil { - return nil, err - } - } - - for _, ip := range ips { - resolvedPeers = append(resolvedPeers, net.JoinHostPort(ip.String(), port)) - } - } - - return resolvedPeers, nil -} - -func removeMyAddr(ips []net.IPAddr, targetPort string, myAddr string) []net.IPAddr { - var result []net.IPAddr - - for _, ip := range ips { - if net.JoinHostPort(ip.String(), targetPort) == myAddr { - continue - } - result = append(result, ip) - } - - return result -} - -func IsUnroutable(host string) bool { - if ip := net.ParseIP(host); ip != nil && (ip.IsUnspecified() || ip.IsLoopback()) { - return true // typically 0.0.0.0 or localhost - } else if ip == nil && strings.ToLower(host) == "localhost" { - return true - } - return false -} - -func parseNetworkConfig(networkType string) (*memberlist.Config, error) { - var mc *memberlist.Config - - switch networkType { - case LanNetworkPeerType: - mc = memberlist.DefaultLANConfig() - case WanNetworkPeerType: - mc = memberlist.DefaultWANConfig() - case LocalNetworkPeerType: - mc = memberlist.DefaultLocalConfig() - default: - return nil, errors.Errorf("unexpected network type %s, should be one of: %s", - networkType, - strings.Join(NetworkPeerTypes, ", "), - ) - } - - return mc, nil -} - -func NewNoop() Peer { - return noopPeer{} -} - -type noopPeer struct{} - -func (n noopPeer) Name() string { return "no gossip" } -func (n noopPeer) SetLabels(labels []storepb.Label) {} -func (n noopPeer) SetTimestamps(mint int64, maxt int64) {} -func (n noopPeer) PeerState(id string) (PeerState, bool) { return PeerState{}, false } -func (n noopPeer) Join(peerType PeerType, initialMetadata PeerMetadata) error { return nil } -func (n noopPeer) PeerStates(types ...PeerType) map[string]PeerState { return nil } -func (n noopPeer) Close(timeout time.Duration) {} diff --git a/pkg/cluster/data.go b/pkg/cluster/data.go deleted file mode 100644 index 189fb3727c..0000000000 --- a/pkg/cluster/data.go +++ /dev/null @@ -1,42 +0,0 @@ -package cluster - -import "sync" - -type data struct { - mtx sync.RWMutex - data map[string]PeerState -} - -func (d *data) Set(k string, v PeerState) { - d.mtx.Lock() - defer d.mtx.Unlock() - - d.data[k] = v -} - -func (d *data) Del(k string) { - d.mtx.Lock() - defer d.mtx.Unlock() - - delete(d.data, k) -} - -func (d *data) Get(k string) (PeerState, bool) { - d.mtx.RLock() - defer d.mtx.RUnlock() - - p, ok := d.data[k] - return p, ok -} - -func (d *data) Data() map[string]PeerState { - d.mtx.RLock() - defer d.mtx.RUnlock() - - res := map[string]PeerState{} - for k, v := range d.data { - res[k] = v - } - - return res -} diff --git a/pkg/cluster/delegate.go b/pkg/cluster/delegate.go deleted file mode 100644 index 1a7a1a175b..0000000000 --- a/pkg/cluster/delegate.go +++ /dev/null @@ -1,95 +0,0 @@ -package cluster - -import ( - "encoding/json" - "strings" - - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/log/level" - "github.com/hashicorp/memberlist" - "github.com/prometheus/client_golang/prometheus" -) - -// delegate implements memberlist.Delegate and memberlist.EventDelegate -// and broadcasts its peer's state in the cluster. -type delegate struct { - *memberlist.TransmitLimitedQueue - - logger log.Logger - data *data - - gossipMsgsReceived prometheus.Counter - gossipClusterMembers prometheus.Gauge -} - -func newDelegate(l log.Logger, numNodes func() int, data *data, gossipMsgsReceived prometheus.Counter, gossipClusterMembers prometheus.Gauge) *delegate { - return &delegate{ - TransmitLimitedQueue: &memberlist.TransmitLimitedQueue{ - NumNodes: numNodes, - RetransmitMult: 3, - }, - logger: l, - data: data, - gossipMsgsReceived: gossipMsgsReceived, - gossipClusterMembers: gossipClusterMembers, - } -} - -// NodeMeta retrieves meta-data about the current node when broadcasting an alive message. -func (d *delegate) NodeMeta(limit int) []byte { - return []byte{} -} - -// NotifyMsg is the callback invoked when a user-level gossip message is received. -func (d *delegate) NotifyMsg(b []byte) { - var data map[string]PeerState - if err := json.Unmarshal(b, &data); err != nil { - level.Error(d.logger).Log("method", "NotifyMsg", "b", strings.TrimSpace(string(b)), "err", err) - return - } - d.gossipMsgsReceived.Inc() - - for k, v := range data { - // Removing data is handled by NotifyLeave - d.data.Set(k, v) - } -} - -// LocalState is called when gossip fetches local state. -func (d *delegate) LocalState(_ bool) []byte { - b, err := json.Marshal(d.data.Data()) - if err != nil { - panic(err) - } - return b -} - -func (d *delegate) MergeRemoteState(buf []byte, _ bool) { - var data map[string]PeerState - if err := json.Unmarshal(buf, &data); err != nil { - level.Error(d.logger).Log("method", "MergeRemoteState", "err", err) - return - } - for k, v := range data { - // Removing data is handled by NotifyLeave - d.data.Set(k, v) - } -} - -// NotifyJoin is called if a peer joins the cluster. -func (d *delegate) NotifyJoin(n *memberlist.Node) { - d.gossipClusterMembers.Inc() - level.Debug(d.logger).Log("received", "NotifyJoin", "node", n.Name, "addr", n.Address()) -} - -// NotifyLeave is called if a peer leaves the cluster. -func (d *delegate) NotifyLeave(n *memberlist.Node) { - d.gossipClusterMembers.Dec() - level.Debug(d.logger).Log("received", "NotifyLeave", "node", n.Name, "addr", n.Address()) - d.data.Del(n.Name) -} - -// NotifyUpdate is called if a cluster peer gets updated. -func (d *delegate) NotifyUpdate(n *memberlist.Node) { - level.Debug(d.logger).Log("received", "NotifyUpdate", "node", n.Name, "addr", n.Address()) -}