Skip to content

Commit

Permalink
sd: register in service discovery with weight
Browse files Browse the repository at this point in the history
  • Loading branch information
msaf1980 committed Apr 21, 2023
1 parent 423c9c8 commit 2287c31
Show file tree
Hide file tree
Showing 17 changed files with 1,086 additions and 51 deletions.
37 changes: 37 additions & 0 deletions .github/workflows/tests-sd.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
name: Tests register in SD

on:
push:
branches: [ master ]
pull_request:
branches: [ master ]

jobs:

tests:
env:
CGO_ENABLED: 0
name: Test register in SD
runs-on: ubuntu-latest
strategy:
matrix:
go:
- ^1
steps:

- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: ${{ matrix.go }}

- name: Check out code
uses: actions/checkout@v2

- name: Start consul
run: |
./tests/consul.sh 1.15.2 > /tmp/consul.log &
sleep 30
shell: bash

- name: Test
run: go test ./sd/nginx -tags=test_sd -v
48 changes: 48 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,40 @@ import (
"github.com/lomik/zapwriter"
)

type SDType int

const (
SDNginx SDType = iota // https://github.com/weibocom/nginx-upsync-module
)

var sdTypeStrings []string = []string{"nginx"}

func (a *SDType) Set(value string) error {
switch value {
case "nginx":
*a = SDNginx
default:
return fmt.Errorf("invalid sd type %s", value)
}
return nil
}

func (a *SDType) UnmarshalText(data []byte) error {
return a.Set(string(data))
}

func (a *SDType) UnmarshalJSON(data []byte) error {
return a.Set(string(data))
}

func (a *SDType) String() string {
return sdTypeStrings[*a]
}

func (a *SDType) Type() string {
return "service_discovery_type"
}

// Cache config
type CacheConfig struct {
Type string `toml:"type" json:"type" comment:"cache type"`
Expand All @@ -51,6 +85,11 @@ type Common struct {
Blacklist []*regexp.Regexp `toml:"-" json:"-"` // compiled TargetBlacklist
MemoryReturnInterval time.Duration `toml:"memory-return-interval" json:"memory-return-interval" comment:"daemon will return the freed memory to the OS when it>0"`
HeadersToLog []string `toml:"headers-to-log" json:"headers-to-log" comment:"additional request headers to log"`
BaseWeight int `toml:"base_weight" json:"base_weight" comment:"service_discovery base weight (on 0 load)"`
SDType SDType `toml:"service_discovery_type" json:"service_discovery_type" comment:"service_discovery type"`
SD string `toml:"service_discovery" json:"service_discovery" comment:"service_discovery address (consul)"`
SDNamespace string `toml:"service_discovery_ns" json:"service_discovery_ns" comment:"service_discovery namespace (graphite by default)"`
SDDc []string `toml:"service_discovery_ds" json:"service_discovery_ds" comment:"service_discovery datacenters (first - is primary, in other register as backup)"`

FindCacheConfig CacheConfig `toml:"find-cache" json:"find-cache" comment:"find/tags cache config"`

Expand Down Expand Up @@ -650,6 +689,15 @@ func Unmarshal(body []byte, noLog bool) (*Config, error) {

// NeedLoadAvgColect check if load avg collect is neeeded
func (c *Config) NeedLoadAvgColect() bool {
if c.Common.SD != "" {
if c.Common.BaseWeight <= 0 {
c.Common.BaseWeight = 100
}
if c.Common.SDNamespace == "" {
c.Common.SDNamespace = "graphite"
}
return true
}
if c.ClickHouse.RenderAdaptiveQueries > 0 {
return true
}
Expand Down
10 changes: 10 additions & 0 deletions doc/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,16 @@ Only one tag used as filter for index field Tag1, see graphite_tagged table [str
memory-return-interval = "0s"
# additional request headers to log
headers-to-log = []
# service_discovery base weight (on 0 load)
base_weight = 0
# service_discovery type
service_discovery_type = 0
# service_discovery address (consul)
service_discovery = ""
# service_discovery namespace (graphite by default)
service_discovery_ns = ""
# service_discovery datacenters (first - is primary, in other register as backup)
service_discovery_ds = []

# find/tags cache config
[common.find-cache]
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/go-graphite/protocol v1.0.0
github.com/gogo/protobuf v1.3.2
github.com/grafana/regexp v0.0.0-20221005093135-b4c2bcb0a4b6
github.com/json-iterator/go v1.1.12
github.com/lomik/graphite-pickle v0.0.0-20171221213606-614e8df42119
github.com/lomik/og-rek v0.0.0-20170411191824-628eefeb8d80
github.com/lomik/zapwriter v0.0.0-20210624082824-c1161d1eb463
Expand Down Expand Up @@ -59,7 +60,6 @@ require (
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/julienschmidt/httprouter v1.3.0 // indirect
github.com/lomik/stop v0.0.0-20161127103810-188e98d969bd // indirect
github.com/mailru/easyjson v0.7.7 // indirect
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -585,8 +585,6 @@ github.com/msaf1980/go-metrics v0.0.14/go.mod h1:8VcR8MdyvIJpcVLOVFKbhb27+60tXy0
github.com/msaf1980/go-stringutils v0.1.2/go.mod h1:AxmV/6JuQUAtZJg5XmYATB5ZwCWgtpruVHY03dswRf8=
github.com/msaf1980/go-stringutils v0.1.4 h1:UwsIT0hplHVucqbknk3CoNqKkmIuSHhsbBldXxyld5U=
github.com/msaf1980/go-stringutils v0.1.4/go.mod h1:AxmV/6JuQUAtZJg5XmYATB5ZwCWgtpruVHY03dswRf8=
github.com/msaf1980/go-syncutils v0.0.2 h1:F7lTtojuZUHFH9Cs6yRz4SRnvmttSV2qD6nEvseCFVg=
github.com/msaf1980/go-syncutils v0.0.2/go.mod h1:zoZwQNkDATcfKq5lQPK6dmJT7Z01COxw/vd8bcJyC9w=
github.com/msaf1980/go-syncutils v0.0.3 h1:bd6+yTSB8/CmpG7M6j1gq5sJMyPqecjJcBf19s2Y6u4=
github.com/msaf1980/go-syncutils v0.0.3/go.mod h1:zoZwQNkDATcfKq5lQPK6dmJT7Z01COxw/vd8bcJyC9w=
github.com/msaf1980/go-timeutils v0.0.3 h1:c0NIpJBcU6KoMeMCPdnbGFcaP4sm7VCwoW1cdgsmUkU=
Expand Down
38 changes: 28 additions & 10 deletions graphite-clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ import (
"github.com/lomik/graphite-clickhouse/find"
"github.com/lomik/graphite-clickhouse/healthcheck"
"github.com/lomik/graphite-clickhouse/index"
"github.com/lomik/graphite-clickhouse/load_avg"
"github.com/lomik/graphite-clickhouse/logs"
"github.com/lomik/graphite-clickhouse/metrics"
"github.com/lomik/graphite-clickhouse/pkg/scope"
"github.com/lomik/graphite-clickhouse/prometheus"
"github.com/lomik/graphite-clickhouse/render"
"github.com/lomik/graphite-clickhouse/sd"
"github.com/lomik/graphite-clickhouse/sd/nginx"
"github.com/lomik/graphite-clickhouse/tagger"
)

Expand Down Expand Up @@ -93,6 +94,8 @@ func main() {
buildTags := flag.Bool("tags", false, "Build tags table")
pprof := flag.String("pprof", "", "Additional pprof listen addr for non-server modes (tagger, etc..), overrides pprof-listen from common ")

sdList := flag.Bool("sd-list", false, "List registered nodes in SD")

printVersion := flag.Bool("version", false, "Print version")
verbose := flag.Bool("verbose", false, "Verbose (print config on startup)")

Expand Down Expand Up @@ -120,6 +123,27 @@ func main() {
return
}

if *sdList {
if cfg.Common.SD != "" && cfg.NeedLoadAvgColect() {
var sd sd.SD
logger := zapwriter.Default()
switch cfg.Common.SDType {
case config.SDNginx:
sd = nginx.New(cfg.Common.SD, cfg.Common.SDNamespace, logger)
default:
panic("serive discovery type not registered")
}
if nodes, err := sd.Nodes(); err == nil {
for _, node := range nodes {
fmt.Printf("%s: %s\n", node.Key, node.Value)
}
} else {
log.Fatal(err)
}
}
return
}

if err = zapwriter.ApplyConfig(cfg.Logging); err != nil {
log.Fatal(err)
}
Expand All @@ -128,6 +152,7 @@ func main() {
if err != nil {
log.Fatal(err)
}

logger := localManager.Logger("start")
if *verbose {
logger.Info("starting graphite-clickhouse",
Expand Down Expand Up @@ -215,15 +240,8 @@ func main() {
}

if cfg.NeedLoadAvgColect() {
go func() {
for {
load1, err := load_avg.Normalized()
if err == nil {
load_avg.Store(load1)
}
time.Sleep(time.Second * 10)
}
}()
sdLogger := localManager.Logger("service discovery")
go sd.Register(cfg, sdLogger)
}

log.Fatal(http.ListenAndServe(cfg.Common.Listen, mux))
Expand Down
2 changes: 1 addition & 1 deletion limiter/alimiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func getWeighted(n, max int) int {
return 0
}
loadAvg := load_avg.Load()
if loadAvg < 1 {
if loadAvg < 0.6 {
return 0
}

Expand Down
8 changes: 4 additions & 4 deletions limiter/alimiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ func TestNewALimiter(t *testing.T) {
cancel()

// load_avg 0.5
load_avg.Store(0.5)
load_avg.Store(0.7)
k := getWeighted(n, c)
require.Equal(t, n/2, k)
require.Equal(t, n*7/10, k)

time.Sleep(checkDelay * 2)

Expand All @@ -96,13 +96,13 @@ func TestNewALimiter(t *testing.T) {
time.Sleep(checkDelay * 2)

ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*10)
for i = 0; i < c-k; i++ {
for i = 0; i < c-n; i++ {
require.NoError(t, limiter.Enter(ctx, "render"), "try to lock with load_avg = 1 [%d]", i)
}

require.Error(t, limiter.Enter(ctx, "render"))

for i = 0; i < k; i++ {
for i = 0; i < c-n; i++ {
limiter.Leave(ctx, "render")
}

Expand Down
48 changes: 15 additions & 33 deletions load_avg/load_avg.go
Original file line number Diff line number Diff line change
@@ -1,51 +1,33 @@
package load_avg

import (
"os"
"strings"
"syscall"
"math"

"github.com/msaf1980/go-stringutils"
"github.com/msaf1980/go-syncutils/atomic"
)

var (
loadAvgStore atomic.Float64
)

func Normalized() (float64, error) {
var info syscall.Sysinfo_t
err := syscall.Sysinfo(&info)
if err != nil {
return 0, err
}

cpus, err := CpuCount()
if err != nil {
return 0, err
}

const si_load_shift = 16
load1 := float64(info.Loads[0]) / float64(1<<si_load_shift) / float64(cpus)
return load1, nil
}

func CpuCount() (uint64, error) {
b, err := os.ReadFile("/proc/cpuinfo")
if err != nil {
return 0, err
}
s := stringutils.UnsafeString(b)

cpus := strings.Count(s, "processor\t: ")

return uint64(cpus), nil
}

func Load() float64 {
return loadAvgStore.Load()
}

func Store(f float64) {
loadAvgStore.Store(f)
}

func Weight(n int, l float64) int64 {
// (1 / normalized_load_avg - 1)
l = math.Round(10*l) / 10
if l == 0 {
return 2 * int64(n)
}
l = math.Log10(l)
w := int64(n) - int64(float64(n)*l)
if w < 0 {
return 0
}
return w
}
20 changes: 20 additions & 0 deletions load_avg/load_avg_default.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
//go:build !linux
// +build !linux

package load_avg

import (
"os"
"strings"
"syscall"

"github.com/msaf1980/go-stringutils"
)

func Normalized() (float64, error) {
return 0, nil
}

func CpuCount() (uint64, error) {
return 0, nil
}
41 changes: 41 additions & 0 deletions load_avg/load_avg_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
//go:build linux
// +build linux

package load_avg

import (
"os"
"strings"
"syscall"

"github.com/msaf1980/go-stringutils"
)

func Normalized() (float64, error) {
var info syscall.Sysinfo_t
err := syscall.Sysinfo(&info)
if err != nil {
return 0, err
}

cpus, err := CpuCount()
if err != nil {
return 0, err
}

const si_load_shift = 16
load := float64(info.Loads[0]) / float64(1<<si_load_shift) / float64(cpus)
return load, nil
}

func CpuCount() (uint64, error) {
b, err := os.ReadFile("/proc/cpuinfo")
if err != nil {
return 0, err
}
s := stringutils.UnsafeString(b)

cpus := strings.Count(s, "processor\t: ")

return uint64(cpus), nil
}
Loading

0 comments on commit 2287c31

Please sign in to comment.