Skip to content

Commit

Permalink
sd: add cleanup for dead hosts
Browse files Browse the repository at this point in the history
  • Loading branch information
msaf1980 committed Jul 27, 2023
1 parent e279534 commit 25f774a
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 23 deletions.
16 changes: 13 additions & 3 deletions graphite-clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func main() {
pprof := flag.String("pprof", "", "Additional pprof listen addr for non-server modes (tagger, etc..), overrides pprof-listen from common ")

sdList := flag.Bool("sd-list", false, "List registered nodes in SD")
sdClean := flag.Bool("sd-clean", false, "Cleanup registered nodes in SD")

printVersion := flag.Bool("version", false, "Print version")
verbose := flag.Bool("verbose", false, "Verbose (print config on startup)")
Expand Down Expand Up @@ -124,19 +125,28 @@ func main() {
return
}

if *sdList {
if *sdList || *sdClean {
if cfg.Common.SD != "" && cfg.NeedLoadAvgColect() {
var sd sd.SD
logger := zapwriter.Default()
switch cfg.Common.SDType {
case config.SDNginx:
sd = nginx.New(cfg.Common.SD, cfg.Common.SDNamespace, "", logger)
default:
panic("serive discovery type not registered")
panic("service discovery type not registered")
}
ts := time.Now().Unix() - 3600
if nodes, err := sd.Nodes(); err == nil {
for _, node := range nodes {
fmt.Printf("%s: %s\n", node.Key, node.Value)
if *sdClean && node.Flags > 0 {
if ts > node.Flags {
fmt.Printf("%s: %s (%s), deleted\n", node.Key, node.Value, time.Unix(node.Flags, 0).UTC().Format(time.RFC3339Nano))
} else {
fmt.Printf("%s: %s (%s)\n", node.Key, node.Value, time.Unix(node.Flags, 0).UTC().Format(time.RFC3339Nano))
}
} else {
fmt.Printf("%s: %s (%s)\n", node.Key, node.Value, time.Unix(node.Flags, 0).UTC().Format(time.RFC3339Nano))
}
}
} else {
log.Fatal(err)
Expand Down
35 changes: 29 additions & 6 deletions sd/nginx/nginx.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"strconv"
"strings"
"time"

"github.com/lomik/graphite-clickhouse/sd/utils"

Expand All @@ -17,6 +18,8 @@ var (
json = jsoniter.ConfigCompatibleWithStandardLibrary
ErrNoKey = errors.New("list key no found")
ErrInvalidKey = errors.New("list key is invalid")

timeNow = time.Now
)

func splitNode(node string) (dc, host, listen string, ok bool) {
Expand Down Expand Up @@ -51,7 +54,7 @@ func New(url, namespace, hostname string, logger *zap.Logger) *Nginx {
sd := &Nginx{
logger: logger,
body: make([]byte, 128),
backupBody: []byte(`{"backup":1, "max_fails":0}`),
backupBody: []byte(`{"backup":1,"max_fails":0}`),
nsEnd: "upstreams/" + namespace + "/",
hostname: hostname,
}
Expand Down Expand Up @@ -188,19 +191,27 @@ func (sd *Nginx) Nodes() (nodes []utils.KV, err error) {
if s, ok := i.(string); ok {
if strings.HasPrefix(s, sd.nsEnd) {
s = s[len(sd.nsEnd):]
kv := utils.KV{Key: s}
if i, ok := jNode["Value"]; ok {
if v, ok := i.(string); ok {
d, err := base64.StdEncoding.DecodeString(v)
if err != nil {
return nil, err
}
nodes = append(nodes, utils.KV{Key: s, Value: stringutils.UnsafeString(d)})
} else {
nodes = append(nodes, utils.KV{Key: s, Value: ""})
kv.Value = stringutils.UnsafeString(d)
}
}
if i, ok := jNode["Flags"]; ok {
switch v := i.(type) {
case float64:
kv.Flags = int64(v)
case int:
kv.Flags = int64(v)
case int64:
kv.Flags = v
}
} else {
nodes = append(nodes, utils.KV{Key: s, Value: ""})
}
nodes = append(nodes, kv)
} else {
return nil, ErrInvalidKey
}
Expand All @@ -227,11 +238,20 @@ func (sd *Nginx) update(ip, port string, dc []string) (err error) {
}
sd.url.WriteString(port)

// add custom query flags
sd.url.WriteByte('?')
sd.url.WriteString("flags=")
sd.url.WriteInt(timeNow().Unix(), 10)

if err = utils.HttpPut(sd.url.String(), sd.body); err != nil {
sd.logger.Error("put", zap.String("address", sd.url.String()[sd.pos:]), zap.Error(err))
return
}
} else {
flags := make([]byte, 0, 32)
flags = append(flags, "?flags="...)
flags = strconv.AppendInt(flags, timeNow().Unix(), 10)

for i := 0; i < len(dc); i++ {
// cfg.Common.SDDc
sd.url.Truncate(sd.pos)
Expand All @@ -245,6 +265,9 @@ func (sd *Nginx) update(ip, port string, dc []string) (err error) {
}
sd.url.WriteString(port)

// add custom query flags
sd.url.Write(flags)

if i == 0 {
if nErr := utils.HttpPut(sd.url.String(), sd.body); nErr != nil {
sd.logger.Error(
Expand Down
65 changes: 51 additions & 14 deletions sd/nginx/nginx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package nginx
import (
"sort"
"testing"
"time"

"github.com/lomik/graphite-clickhouse/sd/utils"
"github.com/lomik/zapwriter"
Expand All @@ -27,6 +28,10 @@ var (
)

func TestNginx(t *testing.T) {
timeNow = func() time.Time {
return time.Unix(1682408721, 0)
}

logger := zapwriter.Default()

sd1 := New("http://127.0.0.1:8500/v1/kv/upstreams", "graphite", hostname1, logger)
Expand Down Expand Up @@ -126,6 +131,17 @@ func TestNginx(t *testing.T) {
"_/test_host2/192.168.0.1:9090": `{"weight":25,"max_fails":0}`,
}, nodesMap,
)

nodesV, err := sd2.Nodes()
require.NoError(t, err)
assert.Equal(
t, []utils.KV{
{Key: "_/test_host1/192.168.0.1:9090", Value: `{"weight":10,"max_fails":0}`, Flags: 1682408721},
{Key: "_/test_host2/192.168.0.1:9090", Value: `{"weight":25,"max_fails":0}`, Flags: 1682408721},
{Key: "_/test_host2/192.168.1.25:9090", Value: `{"weight":25,"max_fails":0}`, Flags: 1682408721},
}, nodesV,
)

require.NoError(t, sd2.Clear(ip2, port))
nodesMap, err = sd2.ListMap()
require.NoError(t, err)
Expand All @@ -148,6 +164,10 @@ func TestNginx(t *testing.T) {
}

func TestNginxDC(t *testing.T) {
timeNow = func() time.Time {
return time.Unix(1682408721, 0)
}

logger := zapwriter.Default()

sd1 := New("http://127.0.0.1:8500/v1/kv/upstreams", "graphite", hostname1, logger)
Expand Down Expand Up @@ -182,8 +202,8 @@ func TestNginxDC(t *testing.T) {
assert.Equal(
t, map[string]string{
"dc1/test_host1/192.168.0.1:9090": `{"weight":10,"max_fails":0}`,
"dc2/test_host1/192.168.0.1:9090": `{"backup":1, "max_fails":0}`,
"dc3/test_host1/192.168.0.1:9090": `{"backup":1, "max_fails":0}`,
"dc2/test_host1/192.168.0.1:9090": `{"backup":1,"max_fails":0}`,
"dc3/test_host1/192.168.0.1:9090": `{"backup":1,"max_fails":0}`,
}, nodesMap,
)

Expand All @@ -205,8 +225,8 @@ func TestNginxDC(t *testing.T) {
assert.Equal(
t, map[string]string{
"dc2/test_host2/192.168.1.25:9090": `{"weight":21,"max_fails":0}`,
"dc1/test_host2/192.168.1.25:9090": `{"backup":1, "max_fails":0}`,
"dc3/test_host2/192.168.1.25:9090": `{"backup":1, "max_fails":0}`,
"dc1/test_host2/192.168.1.25:9090": `{"backup":1,"max_fails":0}`,
"dc3/test_host2/192.168.1.25:9090": `{"backup":1,"max_fails":0}`,
}, nodesMap,
)

Expand All @@ -228,8 +248,8 @@ func TestNginxDC(t *testing.T) {
assert.Equal(
t, map[string]string{
"dc2/test_host2/192.168.1.25:9090": `{"weight":25,"max_fails":0}`,
"dc1/test_host2/192.168.1.25:9090": `{"backup":1, "max_fails":0}`,
"dc3/test_host2/192.168.1.25:9090": `{"backup":1, "max_fails":0}`,
"dc1/test_host2/192.168.1.25:9090": `{"backup":1,"max_fails":0}`,
"dc3/test_host2/192.168.1.25:9090": `{"backup":1,"max_fails":0}`,
}, nodesMap,
)

Expand All @@ -245,8 +265,8 @@ func TestNginxDC(t *testing.T) {
assert.Equal(
t, map[string]string{
"dc1/test_host1/192.168.0.1:9090": `{"weight":10,"max_fails":0}`,
"dc2/test_host1/192.168.0.1:9090": `{"backup":1, "max_fails":0}`,
"dc3/test_host1/192.168.0.1:9090": `{"backup":1, "max_fails":0}`,
"dc2/test_host1/192.168.0.1:9090": `{"backup":1,"max_fails":0}`,
"dc3/test_host1/192.168.0.1:9090": `{"backup":1,"max_fails":0}`,
}, nodesMap,
)

Expand All @@ -258,21 +278,38 @@ func TestNginxDC(t *testing.T) {
assert.Equal(
t, map[string]string{
"dc2/test_host2/192.168.1.25:9090": `{"weight":25,"max_fails":0}`,
"dc1/test_host2/192.168.1.25:9090": `{"backup":1, "max_fails":0}`,
"dc3/test_host2/192.168.1.25:9090": `{"backup":1, "max_fails":0}`,
"dc1/test_host2/192.168.1.25:9090": `{"backup":1,"max_fails":0}`,
"dc3/test_host2/192.168.1.25:9090": `{"backup":1,"max_fails":0}`,
"dc2/test_host2/192.168.0.1:9090": `{"weight":25,"max_fails":0}`,
"dc1/test_host2/192.168.0.1:9090": `{"backup":1, "max_fails":0}`,
"dc3/test_host2/192.168.0.1:9090": `{"backup":1, "max_fails":0}`,
"dc1/test_host2/192.168.0.1:9090": `{"backup":1,"max_fails":0}`,
"dc3/test_host2/192.168.0.1:9090": `{"backup":1,"max_fails":0}`,
}, nodesMap,
)

nodesV, err := sd2.Nodes()
require.NoError(t, err)
assert.Equal(
t, []utils.KV{
{Key: "dc1/test_host1/192.168.0.1:9090", Value: `{"weight":10,"max_fails":0}`, Flags: 1682408721},
{Key: "dc1/test_host2/192.168.0.1:9090", Value: `{"backup":1,"max_fails":0}`, Flags: 1682408721},
{Key: "dc1/test_host2/192.168.1.25:9090", Value: `{"backup":1,"max_fails":0}`, Flags: 1682408721},
{Key: "dc2/test_host1/192.168.0.1:9090", Value: `{"backup":1,"max_fails":0}`, Flags: 1682408721},
{Key: "dc2/test_host2/192.168.0.1:9090", Value: `{"weight":25,"max_fails":0}`, Flags: 1682408721},
{Key: "dc2/test_host2/192.168.1.25:9090", Value: `{"weight":25,"max_fails":0}`, Flags: 1682408721},
{Key: "dc3/test_host1/192.168.0.1:9090", Value: `{"backup":1,"max_fails":0}`, Flags: 1682408721},
{Key: "dc3/test_host2/192.168.0.1:9090", Value: `{"backup":1,"max_fails":0}`, Flags: 1682408721},
{Key: "dc3/test_host2/192.168.1.25:9090", Value: `{"backup":1,"max_fails":0}`, Flags: 1682408721},
}, nodesV,
)

require.NoError(t, sd2.Clear(ip2, port))
nodesMap, err = sd2.ListMap()
require.NoError(t, err)
assert.Equal(
t, map[string]string{
"dc2/test_host2/192.168.1.25:9090": `{"weight":25,"max_fails":0}`,
"dc1/test_host2/192.168.1.25:9090": `{"backup":1, "max_fails":0}`,
"dc3/test_host2/192.168.1.25:9090": `{"backup":1, "max_fails":0}`,
"dc1/test_host2/192.168.1.25:9090": `{"backup":1,"max_fails":0}`,
"dc3/test_host2/192.168.1.25:9090": `{"backup":1,"max_fails":0}`,
}, nodesMap,
)

Expand Down
1 change: 1 addition & 0 deletions sd/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ var (
type KV struct {
Key string
Value string
Flags int64
}

func HttpGet(url string) ([]byte, error) {
Expand Down

0 comments on commit 25f774a

Please sign in to comment.