From f7c73b890d154624a6648f3852ed7d4a622b787d Mon Sep 17 00:00:00 2001 From: dingxiaoshuai123 <2486016589@qq.com> Date: Tue, 19 Dec 2023 14:00:48 +0800 Subject: [PATCH] Dynamically adjust refresh time. --- codis/cmd/proxy/main.go | 2 + codis/config/proxy.toml | 2 + codis/pkg/proxy/config.go | 9 + codis/pkg/proxy/proxy.go | 18 ++ codis/pkg/proxy/stats.go | 9 +- tools/pika_exporter/discovery/discovery.go | 7 +- tools/pika_exporter/exporter/future.go | 13 +- .../pika_exporter/exporter/metrics/parser.go | 12 +- .../exporter/mockCodisStats.json | 194 ------------------ .../exporter/mockProxyStats.json | 88 ++++++++ tools/pika_exporter/exporter/parser_test.go | 2 +- tools/pika_exporter/exporter/proxy.go | 13 +- 12 files changed, 151 insertions(+), 218 deletions(-) delete mode 100644 tools/pika_exporter/exporter/mockCodisStats.json create mode 100644 tools/pika_exporter/exporter/mockProxyStats.json diff --git a/codis/cmd/proxy/main.go b/codis/cmd/proxy/main.go index 8d7da2381f..26cfb37197 100644 --- a/codis/cmd/proxy/main.go +++ b/codis/cmd/proxy/main.go @@ -193,6 +193,8 @@ Options: } defer s.Close() + proxy.RefreshPeriod.Set(config.MaxDelayRefreshTimeInterval.Int64()) + log.Warnf("create proxy with config\n%s", config) if s, ok := utils.Argument(d, "--pidfile"); ok { diff --git a/codis/config/proxy.toml b/codis/config/proxy.toml index 07f5a3126a..1cc55ea27a 100644 --- a/codis/config/proxy.toml +++ b/codis/config/proxy.toml @@ -118,3 +118,5 @@ metrics_report_statsd_server = "" metrics_report_statsd_period = "1s" metrics_report_statsd_prefix = "" +# Maximum delay statistical time interval.(This value must be greater than 0.) +max_delay_refresh_time_interval = "15s" diff --git a/codis/pkg/proxy/config.go b/codis/pkg/proxy/config.go index c531d4ab53..62051c9017 100644 --- a/codis/pkg/proxy/config.go +++ b/codis/pkg/proxy/config.go @@ -133,6 +133,9 @@ metrics_report_influxdb_database = "" metrics_report_statsd_server = "" metrics_report_statsd_period = "1s" metrics_report_statsd_prefix = "" + +# Maximum delay statistical time interval.(This value must be greater than 0.) +max_delay_refresh_time_interval = "15s" ` type Config struct { @@ -192,6 +195,8 @@ type Config struct { MetricsReportStatsdPeriod timesize.Duration `toml:"metrics_report_statsd_period" json:"metrics_report_statsd_period"` MetricsReportStatsdPrefix string `toml:"metrics_report_statsd_prefix" json:"metrics_report_statsd_prefix"` + MaxDelayRefreshTimeInterval timesize.Duration `toml:"max_delay_refresh_time_interval" json:"max_delay_refresh_time_interval"` + ConfigFileName string `toml:"-" json:"config_file_name"` } @@ -323,5 +328,9 @@ func (c *Config) Validate() error { return errors.New("invalid metrics_report_statsd_period") } + if c.MaxDelayRefreshTimeInterval <= 0 { + return errors.New("max_delay_refresh_time_interval must be greater than 0") + } + return nil } diff --git a/codis/pkg/proxy/proxy.go b/codis/pkg/proxy/proxy.go index 379e883ee4..ba6115cd4a 100644 --- a/codis/pkg/proxy/proxy.go +++ b/codis/pkg/proxy/proxy.go @@ -308,6 +308,12 @@ func (p *Proxy) ConfigGet(key string) *redis.Resp { redis.NewBulkBytes([]byte("metrics_report_statsd_prefix")), redis.NewBulkBytes([]byte(p.config.MetricsReportStatsdPrefix)), }) + case "max_delay_refresh_time_interval": + if text, err := p.config.MaxDelayRefreshTimeInterval.MarshalText(); err != nil { + return redis.NewErrorf("cant get max_delay_refresh_time_interval value.") + } else { + return redis.NewBulkBytes(text) + } default: return redis.NewErrorf("unsupported key: %s", key) } @@ -342,6 +348,18 @@ func (p *Proxy) ConfigSet(key, value string) *redis.Resp { } p.config.SlowlogLogSlowerThan = n return redis.NewString([]byte("OK")) + case "max_delay_refresh_time_interval": + s := &(p.config.MaxDelayRefreshTimeInterval) + err := s.UnmarshalText([]byte(value)) + if err != nil { + return redis.NewErrorf("err:%s.", err) + } + if d := p.config.MaxDelayRefreshTimeInterval.Duration(); d <= 0 { + return redis.NewErrorf("max_delay_refresh_time_interval must be greater than 0") + } else { + RefreshPeriod.Set(int64(d)) + return redis.NewString([]byte("OK")) + } default: return redis.NewErrorf("unsupported key: %s", key) } diff --git a/codis/pkg/proxy/stats.go b/codis/pkg/proxy/stats.go index eb46539550..06a2b67aa2 100644 --- a/codis/pkg/proxy/stats.go +++ b/codis/pkg/proxy/stats.go @@ -14,7 +14,10 @@ import ( "pika/codis/v2/pkg/utils/sync2/atomic2" ) -var SlowCmdCount atomic2.Int64 // Cumulative count of slow log +var ( + SlowCmdCount atomic2.Int64 // Cumulative count of slow log + RefreshPeriod atomic2.Int64 +) type opStats struct { opstr string @@ -79,10 +82,10 @@ func init() { } }() - // Clear the accumulated maximum delay to 0 every 15 seconds. + // Clear the accumulated maximum delay to 0 go func() { for { - time.Sleep(15 * time.Second) + time.Sleep(time.Duration(RefreshPeriod.Int64())) for _, s := range cmdstats.opmap { s.maxDelay.Set(0) } diff --git a/tools/pika_exporter/discovery/discovery.go b/tools/pika_exporter/discovery/discovery.go index be4b155f96..fd16a0979d 100644 --- a/tools/pika_exporter/discovery/discovery.go +++ b/tools/pika_exporter/discovery/discovery.go @@ -206,8 +206,11 @@ func (d *codisDiscovery) CheckUpdate(updatechan chan int, codisaddr string) { } func (d *codisDiscovery) comparedis(new_instance *codisDiscovery) bool { - var addrs, addrsProxy []string - var diff bool = false + var ( + addrs []string + addrsProxy []string + diff bool + ) for _, instance := range new_instance.instances { addrs = append(addrs, instance.Addr) } diff --git a/tools/pika_exporter/exporter/future.go b/tools/pika_exporter/exporter/future.go index 6bb39fe5e7..3ca3afbc4a 100644 --- a/tools/pika_exporter/exporter/future.go +++ b/tools/pika_exporter/exporter/future.go @@ -3,7 +3,8 @@ package exporter import "sync" type futureKey struct { - addr, alias string + addr string + alias string } type future struct { @@ -38,7 +39,9 @@ func (f *future) Wait() map[futureKey]error { } type futureKeyForProxy struct { - addr, ID, productName string + addr string + ID string + productName string } type futureForProxy struct { @@ -60,9 +63,11 @@ func (f *futureForProxy) Add() { func (f *futureForProxy) Done(key futureKeyForProxy, val error) { f.Lock() - defer f.Unlock() + defer func() { + f.Unlock() + f.wait.Done() + }() f.m[key] = val - f.wait.Done() } func (f *futureForProxy) Wait() map[futureKeyForProxy]error { diff --git a/tools/pika_exporter/exporter/metrics/parser.go b/tools/pika_exporter/exporter/metrics/parser.go index 6488dbd336..15d7691732 100644 --- a/tools/pika_exporter/exporter/metrics/parser.go +++ b/tools/pika_exporter/exporter/metrics/parser.go @@ -348,14 +348,10 @@ func StructToMap(obj interface{}) (map[string]string, map[string][]int64, error) for j := 0; j < field.Len(); j++ { elemType := field.Index(j).Type() elemValue := field.Index(j) - if elemType.Kind() == reflect.Struct { - var key string - for p := 0; p < elemValue.NumField(); p++ { - if p == 0 { - key = elemValue.Field(p).String() - } else { - cmdResult[key] = append(cmdResult[key], elemValue.Field(p).Int()) - } + if elemType.Kind() == reflect.Struct && elemValue.NumField() > 0 { + var key string = elemValue.Field(0).String() + for p := 1; p < elemValue.NumField(); p++ { + cmdResult[key] = append(cmdResult[key], elemValue.Field(p).Int()) } } else { result[strings.ToLower(jsonName)] = fmt.Sprintf("%v", value) diff --git a/tools/pika_exporter/exporter/mockCodisStats.json b/tools/pika_exporter/exporter/mockCodisStats.json deleted file mode 100644 index 2dd3fbd646..0000000000 --- a/tools/pika_exporter/exporter/mockCodisStats.json +++ /dev/null @@ -1,194 +0,0 @@ -{ - "version": "2018-11-04 16:22:35 +0800 @de1ad026e329561c22e2a3035fbfe89dc7fef764 @3.2.2-12-gde1ad026", - "compile": "2023-02-23 11:25:09 +0800 by go version go1.19.6 linux/amd64", - "config": { - "coordinator_name": "zookeeper", - "coordinator_addr": "1.1.1.1:2181,1.1.1.2:2181,1.1.1.3:2181", - "coordinator_auth": "", - "admin_addr": "0.0.0.0:1234", - "product_name": "test", - "migration_method": "semi-async", - "migration_parallel_slots": 100, - "migration_async_maxbulks": 200, - "migration_async_maxbytes": "32mb", - "migration_async_numkeys": 500, - "migration_timeout": "30s", - "sentinel_client_timeout": "10s", - "sentinel_quorum": 2, - "sentinel_parallel_syncs": 1, - "sentinel_down_after": "30s", - "sentinel_failover_timeout": "5m", - "sentinel_notification_script": "", - "sentinel_client_reconfig_script": "" - }, - "model": { - "token": "sdf123sdf112xx4", - "start_time": "2021-06-28 11:32:49.497871229 +0800 CST", - "admin_addr": "1.1.1.4:12345", - "product_name": "test", - "pid": 1740, - "pwd": "/data/codis", - "sys": "Linux localhost.localdomain 3.10.0-693.el7.x86_64 #1 SMP Tue Aug 22 21:09:27 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux" - }, - "stats": { - "closed": false, - "slots": [ - { - "id": 0, - "group_id": 1, - "action": {} - }, - { - "id": 1, - "group_id": 1, - "action": {} - } - ], - "group": { - "models": [ - { - "id": 1, - "servers": [ - { - "server": "127.0.0.1:9221", - "datacenter": "", - "action": {}, - "role": "master", - "reply_offset": 0, - "state": 2, - "recall_times": 33, - "replica_group": false - } - ], - "promoting": {}, - "out_of_sync": false - } - ], - "stats": { - "1.1.1.6:1100": { - "stats": { - "active_defrag_hits": "0" - }, - "unixtime": 1693281655 - }, - "1.1.1.7:1100": { - "stats": { - "active_defrag_hits": "0" - }, - "unixtime": 1693281655 - }, - "unixtime": 1693281655 - } - }, - "proxy": { - "models": [ - { - "id": 1, - "token": "722fc570089c135c34f668996c12aa40", - "start_time": "2023-12-12 15:10:18.645485 +0800 CST m=+0.036954501", - "admin_addr": "1.2.3.4:1234", - "proto_type": "tcp4", - "proxy_addr": "1.2.3.4:4321", - "product_name": "codis-demo", - "pid": 60611, - "pwd": "/Users/admin", - "sys": "Darwin MacBook-Pro.local 22.6.0 Darwin Kernel Version 22.6.0; root:xnu-8796.141.3~6/RELEASE_ARM64_T8112 arm64", - "max_slot_num": 1024, - "hostname": "MacBook-Pro.local", - "datacenter": "" - } - ], - "stats": { - "4337153c64e1babd3b5c4df6e363d281": { - "stats": { - "online": true, - "closed": false, - "sentinels": {}, - "ops": { - "total": 791976, - "fails": 6, - "redis": { - "errors": 1 - }, - "qps": 11 - }, - "sessions": { - "total": 528784, - "alive": 34 - }, - "rusage": { - "now": "2023-08-29 12:00:53.605229492 +0800 CST", - "cpu": 0.009997645624438964, - "mem": 162881512312336, - "raw": { - "utime": 4474490510000000, - "stime": 4125310130000000, - "cutime": 0, - "cstime": 0, - "num_threads": 358, - "vm_size": 29351325696, - "vm_rss": 162881536 - } - }, - "backend": { - "primary_only": false - } - }, - "unixtime": 1693281654 - }, - "ef87e1ad5422ab8e2816b6694750e856": { - "stats": { - "online": true, - "closed": false, - "sentinels": {}, - "ops": { - "total": 27570566, - "fails": 31, - "redis": { - "errors": 2 - }, - "qps": 8 - }, - "sessions": { - "total": 6349, - "alive": 4 - }, - "rusage": { - "now": "2023-08-29 12:00:54.516379647 +0800 CST", - "cpu": 0, - "mem": 19081124412160, - "raw": { - "utime": 1666690710000000, - "stime": 2573457670000000, - "cutime": 0, - "cstime": 0, - "num_threads": 51, - "vm_size": 8284008448, - "vm_rss": 190812160 - } - }, - "backend": { - "primary_only": false - } - }, - "unixtime": 1693281654 - } - } - }, - "slot_action": { - "interval": 0, - "disabled": false, - "progress": { - "status": "" - }, - "executor": 0 - }, - "sentinels": { - "model": { - "out_of_sync": false - }, - "stats": {}, - "masters": {} - } - } -} \ No newline at end of file diff --git a/tools/pika_exporter/exporter/mockProxyStats.json b/tools/pika_exporter/exporter/mockProxyStats.json new file mode 100644 index 0000000000..79cd43d1a5 --- /dev/null +++ b/tools/pika_exporter/exporter/mockProxyStats.json @@ -0,0 +1,88 @@ +{ + "online": true, + "closed": false, + "sentinels": {}, + "ops": { + "total": 6, + "fails": 0, + "redis": { + "errors": 0 + }, + "qps": 0, + "cmd": [ + { + "opstr": "GET", + "calls": 1, + "usecs": 1540, + "usecs_percall": 1540, + "fails": 0, + "redis_errtype": 0, + "max_delay": 0 + }, + { + "opstr": "PCONFIG", + "calls": 4, + "usecs": 435, + "usecs_percall": 108, + "fails": 0, + "redis_errtype": 0, + "max_delay": 0 + }, + { + "opstr": "SET", + "calls": 1, + "usecs": 4706, + "usecs_percall": 4706, + "fails": 0, + "redis_errtype": 0, + "max_delay": 0 + } + ] + }, + "sessions": { + "total": 1, + "alive": 1 + }, + "rusage": { + "now": "2023-12-19 13:34:12.039062 +0800 CST m=+743.815706668", + "cpu": 0.001923812544336962, + "mem": 0, + "raw": { + "utime": 936053000, + "stime": 559258000, + "max_rss": 126533632, + "ix_rss": 0, + "id_rss": 0, + "is_rss": 0 + } + }, + "backend": { + "primary_only": false + }, + "runtime": { + "general": { + "alloc": 288695704, + "sys": 389611864, + "lookups": 0, + "mallocs": 180635, + "frees": 174876 + }, + "heap": { + "alloc": 288695704, + "sys": 376864768, + "idle": 86532096, + "inuse": 290332672, + "objects": 5759 + }, + "gc": { + "num": 7, + "cpu_fraction": 0.000007142915997134459, + "total_pausems": 2 + }, + "num_procs": 4, + "num_goroutines": 21, + "num_cgo_call": 2979, + "mem_offheap": 458752 + }, + "slow_cmd_count": 0 +} \ No newline at end of file diff --git a/tools/pika_exporter/exporter/parser_test.go b/tools/pika_exporter/exporter/parser_test.go index c7e425ae2b..77a4e016b5 100644 --- a/tools/pika_exporter/exporter/parser_test.go +++ b/tools/pika_exporter/exporter/parser_test.go @@ -97,7 +97,7 @@ func Benchmark_Parse(b *testing.B) { } func Test_Parse_Proxy_Stats(t *testing.T) { - jsonFile := "mockCodisStats.json" + jsonFile := "mockProxyStats.json" jsonData, err := ioutil.ReadFile(jsonFile) if err != nil { t.Fatalf("failed to read test data: %v", err) diff --git a/tools/pika_exporter/exporter/proxy.go b/tools/pika_exporter/exporter/proxy.go index 13c43fb9c4..56355e54e9 100644 --- a/tools/pika_exporter/exporter/proxy.go +++ b/tools/pika_exporter/exporter/proxy.go @@ -2,14 +2,16 @@ package exporter import ( "encoding/json" - "github.com/OpenAtomFoundation/pika/tools/pika_exporter/discovery" - "github.com/OpenAtomFoundation/pika/tools/pika_exporter/exporter/metrics" - "github.com/prometheus/client_golang/prometheus" - log "github.com/sirupsen/logrus" "net/http" "strconv" "sync" "time" + + log "github.com/sirupsen/logrus" + + "github.com/OpenAtomFoundation/pika/tools/pika_exporter/discovery" + "github.com/OpenAtomFoundation/pika/tools/pika_exporter/exporter/metrics" + "github.com/prometheus/client_golang/prometheus" ) const ( @@ -120,14 +122,13 @@ func (p *exporterProxy) Describe(ch chan<- *prometheus.Desc) { func (p *exporterProxy) Collect(ch chan<- prometheus.Metric) { p.mutex.Lock() - defer p.mutex.Unlock() - startTime := time.Now() defer func() { p.collectCount.Inc() p.collectDuration.Observe(time.Since(startTime).Seconds()) ch <- p.collectCount ch <- p.collectDuration + p.mutex.Unlock() }() p.scrape(ch)