From 7ecf162f6a44533d523f1add7cfd07b21ecb7400 Mon Sep 17 00:00:00 2001 From: dingxiaoshuai123 <2486016589@qq.com> Date: Mon, 11 Dec 2023 11:16:52 +0800 Subject: [PATCH 1/4] First edition --- .../discovery/codis_dashboard.go | 67 ++++++ tools/pika_exporter/discovery/discovery.go | 48 +++- tools/pika_exporter/exporter/future.go | 35 +++ .../pika_exporter/exporter/metrics/metrics.go | 13 ++ .../pika_exporter/exporter/metrics/parser.go | 25 ++ tools/pika_exporter/exporter/metrics/proxy.go | 38 +++ tools/pika_exporter/exporter/proxy.go | 219 ++++++++++++++++++ tools/pika_exporter/main.go | 33 ++- 8 files changed, 471 insertions(+), 7 deletions(-) create mode 100644 tools/pika_exporter/exporter/metrics/proxy.go create mode 100644 tools/pika_exporter/exporter/proxy.go diff --git a/tools/pika_exporter/discovery/codis_dashboard.go b/tools/pika_exporter/discovery/codis_dashboard.go index 1084a1aaf7..6f9992f7ef 100644 --- a/tools/pika_exporter/discovery/codis_dashboard.go +++ b/tools/pika_exporter/discovery/codis_dashboard.go @@ -16,14 +16,81 @@ type CodisModelInfo struct { Servers []CodisServerInfo `json:"servers"` } +type CodisProxyModelInfo struct { + Id int `json:"id"` + AdminAddr string `json:"admin_addr"` + ProductName string `json:"product_name"` + DataCenter string `json:"data_center"` +} + type CodisGroupInfo struct { Models []CodisModelInfo `json:"models"` } +type CodisProxyInfo struct { + Models []CodisProxyModelInfo `json:"models"` +} + type CodisStatsInfo struct { Group CodisGroupInfo `json:"group"` + Proxy CodisProxyInfo `json:"proxy"` } type CodisTopomInfo struct { Stats CodisStatsInfo `json:"stats"` } + +type RedisInfo struct { + Errors int `json:"errors"` +} + +type ProxyOpsInfo struct { + Total int `json:"total"` + Fails int `json:"fails"` + Redis RedisInfo `json:"redis"` + Qps int `json:"qps"` +} + +type RowInfo struct { + Utime int64 `json:"utime"` + Stime int64 `json:"stime"` + MaxRss int64 `json:"max_rss"` + IxRss int64 `json:"ix_rss"` + IdRss int64 `json:"id_rss"` + IsRss int64 `json:"is_rss"` +} + +type RusageInfo struct { + Now string `json:"now"` + Cpu float64 `json:"cpu"` + Mem float64 `json:"mem"` + Raw RowInfo `json:"raw"` +} + +type GeneralInfo struct { + Alloc int64 `json:"alloc"` + Sys int64 `json:"sys"` + Lookups int64 `json:"lookups"` + Mallocs int64 `json:"mallocs"` + Frees int64 `json:"frees"` +} + +type HeapInfo struct { + Alloc int64 `json:"alloc"` + Sys int64 `json:"sys"` + Idle int64 `json:"idle"` + Inuse int64 `json:"inuse"` + Objects int64 `json:"objects"` +} + +type RunTimeInfo struct { + General GeneralInfo `json:"general"` + Heap HeapInfo `json:"heap"` +} + +type ProxyStats struct { + Ops ProxyOpsInfo `json:"ops"` + Rusage RusageInfo `json:"rusage"` + RunTime RunTimeInfo `json:"runtime"` + TimeoutCmdNumber int64 `json:"timeout_cmd_number"` +} diff --git a/tools/pika_exporter/discovery/discovery.go b/tools/pika_exporter/discovery/discovery.go index a7e236b577..d6f8011b18 100644 --- a/tools/pika_exporter/discovery/discovery.go +++ b/tools/pika_exporter/discovery/discovery.go @@ -20,8 +20,15 @@ type Instance struct { Alias string } +type InstanceProxy struct { + ID int + Addr string + ProductName string +} + type Discovery interface { GetInstances() []Instance + GetInstancesProxy() []InstanceProxy CheckUpdate(chan int, string) } @@ -58,6 +65,10 @@ func (d *cmdArgsDiscovery) GetInstances() []Instance { return d.instances } +func (d *cmdArgsDiscovery) GetInstancesProxy() []InstanceProxy { + return nil +} + func (d *cmdArgsDiscovery) CheckUpdate(chan int, 
string) {} type fileDiscovery struct { @@ -107,10 +118,15 @@ func (d *fileDiscovery) GetInstances() []Instance { return d.instances } +func (d *fileDiscovery) GetInstancesProxy() []InstanceProxy { + return nil +} + func (d *fileDiscovery) CheckUpdate(chan int, string) {} type codisDiscovery struct { - instances []Instance + instances []Instance + instanceProxy []InstanceProxy } func NewCodisDiscovery(url, password, alias string) (*codisDiscovery, error) { @@ -155,12 +171,27 @@ func NewCodisDiscovery(url, password, alias string) (*codisDiscovery, error) { Alias: aliases[i], } } - return &codisDiscovery{instances: instances}, nil + + instancesproxy := make([]InstanceProxy, len(result.Stats.Proxy.Models)) + for i := range result.Stats.Proxy.Models { + instancesproxy[i] = InstanceProxy{ + ID: result.Stats.Proxy.Models[i].Id, + Addr: result.Stats.Proxy.Models[i].AdminAddr, + ProductName: result.Stats.Proxy.Models[i].ProductName, + } + } + return &codisDiscovery{ + instances: instances, + instanceProxy: instancesproxy, + }, nil } func (d *codisDiscovery) GetInstances() []Instance { return d.instances } +func (d *codisDiscovery) GetInstancesProxy() []InstanceProxy { + return d.instanceProxy +} func (d *codisDiscovery) CheckUpdate(updatechan chan int, codisaddr string) { newdis, err := NewCodisDiscovery(codisaddr, "", "") @@ -174,7 +205,7 @@ func (d *codisDiscovery) CheckUpdate(updatechan chan int, codisaddr string) { } func (d *codisDiscovery) comparedis(new_instance *codisDiscovery) bool { - var addrs []string + var addrs, addrsProxy []string var diff bool = false for _, instance := range new_instance.instances { addrs = append(addrs, instance.Addr) @@ -185,7 +216,16 @@ func (d *codisDiscovery) comparedis(new_instance *codisDiscovery) bool { return false } } - if !diff && len(new_instance.instances) == len(d.instances) { + for _, instance := range new_instance.instanceProxy { + addrsProxy = append(addrsProxy, instance.Addr) + } + for _, instance := range d.instanceProxy { + if !contains(instance.Addr, addrsProxy) { + diff = true + return false + } + } + if !diff && len(new_instance.instances) == len(d.instances) && len(new_instance.instanceProxy) == len(d.instanceProxy) { return true } return false diff --git a/tools/pika_exporter/exporter/future.go b/tools/pika_exporter/exporter/future.go index 8b393682ec..e75ff999f0 100644 --- a/tools/pika_exporter/exporter/future.go +++ b/tools/pika_exporter/exporter/future.go @@ -36,3 +36,38 @@ func (f *future) Wait() map[futureKey]error { defer f.Unlock() return f.m } + +type futureKeyForProxy struct { + addr, instance, ID, productName string +} + +type futureForProxy struct { + *sync.Mutex + wait sync.WaitGroup + m map[futureKeyForProxy]error +} + +func newFutureForProxy() *futureForProxy { + return &futureForProxy{ + Mutex: new(sync.Mutex), + m: make(map[futureKeyForProxy]error), + } +} + +func (f *futureForProxy) Add() { + f.wait.Add(1) +} + +func (f *futureForProxy) Done(key futureKeyForProxy, val error) { + f.Lock() + defer f.Unlock() + f.m[key] = val + f.wait.Done() +} + +func (f *futureForProxy) Wait() map[futureKeyForProxy]error { + f.wait.Wait() + f.Lock() + defer f.Unlock() + return f.m +} diff --git a/tools/pika_exporter/exporter/metrics/metrics.go b/tools/pika_exporter/exporter/metrics/metrics.go index d571a1787c..c0cd55ea9e 100644 --- a/tools/pika_exporter/exporter/metrics/metrics.go +++ b/tools/pika_exporter/exporter/metrics/metrics.go @@ -16,6 +16,9 @@ const ( LabelNameAlias = "alias" LabelInstanceMode = "instance-mode" LabelConsensusLevel = 
"consensus-level" + LabelInstance = "instance" + LabelID = "id" + LabelProductName = "product_name" ) type Describer interface { @@ -96,6 +99,7 @@ type MetricConfig struct { } var MetricConfigs = make(map[string]MetricConfig) +var MetricConfigsProxy = make(map[string]MetricConfig) func Register(mcs map[string]MetricConfig) { for k, mc := range mcs { @@ -105,3 +109,12 @@ func Register(mcs map[string]MetricConfig) { MetricConfigs[k] = mc } } + +func RegisterProxy(mcs map[string]MetricConfig) { + for k, mc := range mcs { + if _, ok := MetricConfigsProxy[k]; ok { + panic(fmt.Sprintf("register metrics config error. metricConfigProxyName:%s existed", k)) + } + MetricConfigsProxy[k] = mc + } +} diff --git a/tools/pika_exporter/exporter/metrics/parser.go b/tools/pika_exporter/exporter/metrics/parser.go index d37184d0c3..c34129f6d5 100644 --- a/tools/pika_exporter/exporter/metrics/parser.go +++ b/tools/pika_exporter/exporter/metrics/parser.go @@ -1,6 +1,8 @@ package metrics import ( + "fmt" + "reflect" "regexp" "strconv" "strings" @@ -271,3 +273,26 @@ func convertTimeToUnix(ts string) (int64, error) { } return t.Unix(), nil } + +func StructToMap(obj interface{}) (map[string]string, error) { + objValue := reflect.ValueOf(obj) + objType := objValue.Type() + + data := make(map[string]string) + + for i := 0; i < objValue.NumField(); i++ { + field := objValue.Field(i) + fieldName := objType.Field(i).Name + + if field.Kind() == reflect.Struct { + innerData, _ := StructToMap(field.Interface()) + for k, v := range innerData { + data[k] = v + } + } else { + data[fieldName] = fmt.Sprintf("%v", field.Interface()) + } + } + + return data, nil +} diff --git a/tools/pika_exporter/exporter/metrics/proxy.go b/tools/pika_exporter/exporter/metrics/proxy.go new file mode 100644 index 0000000000..1566475c74 --- /dev/null +++ b/tools/pika_exporter/exporter/metrics/proxy.go @@ -0,0 +1,38 @@ +package metrics + +func RegisterForProxy() { + RegisterProxy(collectProxyMetrics) +} + +var collectProxyMetrics map[string]MetricConfig = map[string]MetricConfig{ + "ops_total": { + Parser: &normalParser{}, + MetricMeta: &MetaData{ + Name: "ops_total", + Help: "proxy total ops", + Type: metricTypeCounter, + Labels: []string{LabelNameAddr, LabelInstance, LabelID, LabelProductName}, + ValueName: "Total", + }, + }, + "ops_fails": { + Parser: &normalParser{}, + MetricMeta: &MetaData{ + Name: "ops_fails", + Help: "proxy fails counter", + Type: metricTypeCounter, + Labels: []string{LabelNameAddr, LabelInstance, LabelID, LabelProductName}, + ValueName: "Fails", + }, + }, + "qps": { + Parser: &normalParser{}, + MetricMeta: &MetaData{ + Name: "qps", + Help: "proxy qps", + Type: metricTypeGauge, + Labels: []string{LabelNameAddr, LabelInstance, LabelID, LabelProductName}, + ValueName: "Qps", + }, + }, +} diff --git a/tools/pika_exporter/exporter/proxy.go b/tools/pika_exporter/exporter/proxy.go new file mode 100644 index 0000000000..fe8d2140a0 --- /dev/null +++ b/tools/pika_exporter/exporter/proxy.go @@ -0,0 +1,219 @@ +package exporter + +import ( + "encoding/json" + "github.com/OpenAtomFoundation/pika/tools/pika_exporter/discovery" + "github.com/OpenAtomFoundation/pika/tools/pika_exporter/exporter/metrics" + "github.com/prometheus/client_golang/prometheus" + log "github.com/sirupsen/logrus" + "net/http" + "strconv" + "sync" + "time" +) + +const ( + NamespaceProxy = "proxy" + InstanceType +) + +type exporterProxy struct { + dis discovery.Discovery + namespace string + collectDuration prometheus.Histogram + collectCount prometheus.Counter + 
scrapeDuration *prometheus.HistogramVec + scrapeErrors *prometheus.CounterVec + scrapeLastError *prometheus.GaugeVec + scrapeCount *prometheus.CounterVec + up *prometheus.GaugeVec + mutex *sync.Mutex +} + +func (p *exporterProxy) registerMetrics() { + metrics.RegisterForProxy() +} + +func (p *exporterProxy) initMetrics() { + p.collectDuration = prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: p.namespace, + Name: "exporter_collect_duration_seconds", + Help: "the duration of proxy-exporter collect in seconds", + Buckets: []float64{ // 1ms ~ 10s + 0.001, 0.005, 0.01, + 0.015, 0.02, 0.025, 0.03, 0.035, 0.04, 0.045, 0.05, 0.055, 0.06, 0.065, 0.07, 0.075, 0.08, 0.085, 0.09, 0.095, 0.1, + 0.11, 0.12, 0.13, 0.14, 0.15, 0.16, 0.17, 0.18, 0.19, 0.20, + 0.25, 0.5, 0.75, + 1, 2, 5, 10, + }}) // done + p.collectCount = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: p.namespace, + Name: "exporter_collect_count", + Help: "the count of proxy-exporter collect"}) // done + p.scrapeDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: p.namespace, + Name: "exporter_scrape_duration_seconds", + Help: "the each of proxy scrape duration in seconds", + Buckets: []float64{ // 1ms ~ 10s + 0.001, 0.005, 0.01, + 0.015, 0.02, 0.025, 0.03, 0.035, 0.04, 0.045, 0.05, 0.055, 0.06, 0.065, 0.07, 0.075, 0.08, 0.085, 0.09, 0.095, 0.1, + 0.11, 0.12, 0.13, 0.14, 0.15, 0.16, 0.17, 0.18, 0.19, 0.20, + 0.25, 0.5, 0.75, + 1, 2, 5, 10, + }, + }, []string{metrics.LabelNameAddr, metrics.LabelInstance, metrics.LabelID, metrics.LabelProductName}) + p.scrapeErrors = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: p.namespace, + Name: "exporter_scrape_errors", + Help: "the each of proxy scrape error count", + }, []string{metrics.LabelNameAddr, metrics.LabelInstance, metrics.LabelID, metrics.LabelProductName}) + p.scrapeLastError = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: p.namespace, + Name: "exporter_last_scrape_error", + Help: "the each of proxy scrape last error", + }, []string{metrics.LabelNameAddr, metrics.LabelInstance, metrics.LabelID, metrics.LabelProductName, "error"}) + p.scrapeCount = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: p.namespace, + Name: "exporter_scrape_count", + Help: "the each of proxy scrape count", + }, []string{metrics.LabelNameAddr, metrics.LabelInstance, metrics.LabelID, metrics.LabelProductName}) + p.up = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: p.namespace, + Name: "up", + Help: "the each of proxy connection status", + }, []string{metrics.LabelNameAddr, metrics.LabelInstance, metrics.LabelID, metrics.LabelProductName}) +} + +func NewProxyExporter(dis discovery.Discovery, namespace string) (*exporterProxy, error) { + e := &exporterProxy{ + dis: dis, + namespace: namespace, + mutex: new(sync.Mutex), + } + e.registerMetrics() + e.initMetrics() + return e, nil +} + +func (p *exporterProxy) Close() error { + return nil +} + +func (p *exporterProxy) Describe(ch chan<- *prometheus.Desc) { + describer := metrics.DescribeFunc(func(m metrics.MetaData) { + ch <- prometheus.NewDesc(prometheus.BuildFQName(p.namespace, "", m.Name), m.Help, m.Labels, nil) + }) + + for _, metric := range metrics.MetricConfigsProxy { + metric.Desc(describer) + } + + ch <- p.collectDuration.Desc() + ch <- p.collectCount.Desc() + + p.scrapeDuration.Describe(ch) + p.scrapeErrors.Describe(ch) + p.scrapeLastError.Describe(ch) + p.scrapeCount.Describe(ch) + + p.up.Describe(ch) +} + +func (p *exporterProxy) Collect(ch chan<- 
prometheus.Metric) { + p.mutex.Lock() + defer p.mutex.Unlock() + + startTime := time.Now() + defer func() { + p.collectCount.Inc() + p.collectDuration.Observe(time.Since(startTime).Seconds()) + ch <- p.collectCount + ch <- p.collectDuration + }() + + p.scrape(ch) + + p.scrapeDuration.Collect(ch) + p.scrapeErrors.Collect(ch) + p.scrapeLastError.Collect(ch) + p.scrapeCount.Collect(ch) + + p.up.Collect(ch) +} + +func (p *exporterProxy) scrape(ch chan<- prometheus.Metric) { + startTime := time.Now() + + fut := newFutureForProxy() + + for _, instance := range p.dis.GetInstancesProxy() { + fut.Add() + go func(addr, instanceType, id, productName string) { + p.scrapeCount.WithLabelValues(addr, instanceType, id, productName).Inc() + + defer func() { + p.scrapeDuration.WithLabelValues(addr, instanceType, id, productName).Observe(time.Since(startTime).Seconds()) + }() + + fut.Done(futureKeyForProxy{addr: addr, instance: instanceType, ID: id, productName: productName}, p.collectProxyStats(addr, instanceType, id, productName, ch)) + }(instance.Addr, InstanceType, strconv.Itoa(instance.ID), instance.ProductName) + } + for k, v := range fut.Wait() { + if v != nil { + p.scrapeErrors.WithLabelValues(k.addr, k.instance, k.ID, k.productName).Inc() + p.scrapeLastError.WithLabelValues(k.addr, k.instance, k.ID, k.productName, v.Error()).Set(0) + + log.Errorf("exporter::scrape collect pika failed. pika server:%#v err:%s", k, v.Error()) + } + } + +} + +func (p *exporterProxy) collectProxyStats(addr, instanceType, id, productName string, ch chan<- prometheus.Metric) error { + resp, err := http.Get("http://" + addr + "/proxy/stats") + if err != nil { + p.up.WithLabelValues(addr, instanceType, id, productName).Set(0) + log.Errorf("exporter::scrape collect proxy failed. proxy server:%#v err:%s", addr, err.Error()) + return err + } + p.up.WithLabelValues(addr, instanceType, id, productName).Set(1) + + var resultProxy discovery.ProxyStats + if err = json.NewDecoder(resp.Body).Decode(&resultProxy); err != nil { + log.Errorf("exporter::scrape decode json failed. proxy server:%#v err:%s", addr, err.Error()) + p.scrapeErrors.WithLabelValues(addr, instanceType, id, productName).Inc() + p.scrapeLastError.WithLabelValues(addr, instanceType, id, productName, err.Error()).Set(0) + return err + } + + result, err := metrics.StructToMap(resultProxy) + for k, v := range result { + log.Printf("%s = %s\n", k, v) + } + result[metrics.LabelNameAddr] = addr + result[metrics.LabelInstance] = instanceType + result[metrics.LabelID] = id + result[metrics.LabelProductName] = productName + + collector := metrics.CollectFunc(func(m metrics.Metric) error { + p, err := prometheus.NewConstMetric( + prometheus.NewDesc(prometheus.BuildFQName(p.namespace, "", m.Name), m.Help, m.Labels, nil), + m.MetricsType(), m.Value, m.LabelValues...) 
+ if err != nil { + return err + } + ch <- p + return nil + }) + + parseOpt := metrics.ParseOption{ + Version: nil, + Extracts: result, + Info: "", + } + + for _, m := range metrics.MetricConfigsProxy { + m.Parse(m, collector, parseOpt) + } + return nil +} diff --git a/tools/pika_exporter/main.go b/tools/pika_exporter/main.go index 9f1c600495..57645173cc 100644 --- a/tools/pika_exporter/main.go +++ b/tools/pika_exporter/main.go @@ -9,6 +9,7 @@ import ( "github.com/OpenAtomFoundation/pika/tools/pika_exporter/discovery" "github.com/OpenAtomFoundation/pika/tools/pika_exporter/exporter" + "github.com/OpenAtomFoundation/pika/tools/pika_exporter/exporter/metrics" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -108,6 +109,12 @@ func main() { } defer e.Close() + ep, err := exporter.NewProxyExporter(dis, exporter.NamespaceProxy) + if err != nil { + log.Fatalln("exporter init failed. err:", err) + } + defer ep.Close() + buildInfo := prometheus.NewGaugeVec(prometheus.GaugeOpts{ Name: "pika_exporter_build_info", Help: "pika exporter build_info", @@ -118,7 +125,7 @@ func main() { updatechan := make(chan int) defer close(updatechan) - var exptr_regis ExporterInterface = NewExporterRegistry(dis, e, buildInfo, registry, updatechan) + var exptr_regis ExporterInterface = NewExporterRegistry(dis, e, ep, buildInfo, registry, updatechan) exptr_regis.Start() defer exptr_regis.Stop() @@ -137,6 +144,9 @@ func main() { for _, instance := range dis.GetInstances() { log.Println("Connecting to Pika:", instance.Addr, "Alias:", instance.Alias) } + for _, instance := range dis.GetInstancesProxy() { + log.Println("connecting to Proxy:", instance.Addr, " ID:", instance.ID, " ProductName:", instance.ProductName) + } if *codisaddr != "" { go func() { exptr_regis.LoopCheckUpdate(updatechan, *codisaddr) @@ -170,15 +180,17 @@ type ExporterInterface interface { type ExporterRegistry struct { dis discovery.Discovery expt prometheus.Collector + exptProxy prometheus.Collector regis *prometheus.Registry bif prometheus.Collector updatechan chan int } -func NewExporterRegistry(dis discovery.Discovery, e, buildInfo prometheus.Collector, registry *prometheus.Registry, updatechan chan int) *ExporterRegistry { +func NewExporterRegistry(dis discovery.Discovery, e, ep, buildInfo prometheus.Collector, registry *prometheus.Registry, updatechan chan int) *ExporterRegistry { return &ExporterRegistry{ dis: dis, expt: e, + exptProxy: ep, regis: registry, bif: buildInfo, updatechan: updatechan, @@ -191,11 +203,13 @@ func (exptr_regis *ExporterRegistry) Getregis() *prometheus.Registry { func (exptr_regis *ExporterRegistry) Start() { exptr_regis.regis.MustRegister(exptr_regis.expt) + exptr_regis.regis.MustRegister(exptr_regis.exptProxy) exptr_regis.regis.MustRegister(exptr_regis.bif) } func (exptr_regis *ExporterRegistry) Stop() { exptr_regis.regis.Unregister(exptr_regis.expt) + exptr_regis.regis.Unregister(exptr_regis.exptProxy) exptr_regis.regis.Unregister(exptr_regis.bif) } @@ -206,9 +220,10 @@ func (exptr_regis *ExporterRegistry) Update() { log.Fatalln("exporter get NewCodisDiscovery failed. err:", err) } - exptr_regis.regis.Unregister(exptr_regis.expt) exptr_regis.Stop() + metrics.MetricConfigs = make(map[string]metrics.MetricConfig) + metrics.MetricConfigsProxy = make(map[string]metrics.MetricConfig) new_e, err := exporter.NewPikaExporter(newdis, *namespace, *checkKeyPatterns, *checkKeys, *checkScanCount, *keySpaceStatsClock) if err != nil { log.Fatalln("exporter init failed. 
err:", err) @@ -216,10 +231,22 @@ func (exptr_regis *ExporterRegistry) Update() { exptr_regis.dis = newdis exptr_regis.expt = new_e } + + new_ep, err := exporter.NewProxyExporter(newdis, exporter.NamespaceProxy) + if err != nil { + log.Fatalln("exporter init failed. err:", err) + } else { + exptr_regis.exptProxy = new_ep + } + exptr_regis.Start() for _, instance := range exptr_regis.dis.GetInstances() { log.Println("Reconnecting to Pika:", instance.Addr, "Alias:", instance.Alias) } + + for _, instance := range exptr_regis.dis.GetInstancesProxy() { + log.Println("Reconnecting to Proxy:", instance.Addr, " ID:", instance.ID, " ProductName:", instance.ProductName) + } } } From 9e75379d2382f3dfdfff2fb9b8b3fb54b449d9f7 Mon Sep 17 00:00:00 2001 From: dingxiaoshuai123 <2486016589@qq.com> Date: Tue, 12 Dec 2023 19:43:21 +0800 Subject: [PATCH 2/4] Exporter collects Proxy information. --- codis/pkg/proxy/proxy.go | 4 +- codis/pkg/proxy/session.go | 1 + codis/pkg/proxy/stats.go | 10 + .../discovery/codis_dashboard.go | 9 + tools/pika_exporter/discovery/discovery.go | 1 + .../pika_exporter/discovery/discovery_test.go | 14 ++ .../discovery/mockCodisTopom.json | 2 + tools/pika_exporter/exporter/future.go | 2 +- .../pika_exporter/exporter/metrics/metrics.go | 4 +- .../pika_exporter/exporter/metrics/parser.go | 99 +++++++-- tools/pika_exporter/exporter/metrics/proxy.go | 88 +++++++- .../exporter/mockCodisStats.json | 194 ++++++++++++++++++ tools/pika_exporter/exporter/parser_test.go | 37 ++++ tools/pika_exporter/exporter/proxy.go | 53 +++-- 14 files changed, 464 insertions(+), 54 deletions(-) create mode 100644 tools/pika_exporter/exporter/mockCodisStats.json diff --git a/codis/pkg/proxy/proxy.go b/codis/pkg/proxy/proxy.go index 05cf9de8ef..dac4a9a374 100644 --- a/codis/pkg/proxy/proxy.go +++ b/codis/pkg/proxy/proxy.go @@ -558,7 +558,8 @@ type Stats struct { PrimaryOnly bool `json:"primary_only"` } `json:"backend"` - Runtime *RuntimeStats `json:"runtime,omitempty"` + Runtime *RuntimeStats `json:"runtime,omitempty"` + TimeoutCmdNumber int64 `json:"timeout_cmd_number"` } type RuntimeStats struct { @@ -667,5 +668,6 @@ func (p *Proxy) Stats(flags StatsFlags) *Stats { stats.Runtime.NumCgoCall = runtime.NumCgoCall() stats.Runtime.MemOffheap = unsafe2.OffheapBytes() } + stats.TimeoutCmdNumber = TimeoutCmdNumberInSecond.Int64() return stats } diff --git a/codis/pkg/proxy/session.go b/codis/pkg/proxy/session.go index b4fb8f7e57..b8e0c791df 100644 --- a/codis/pkg/proxy/session.go +++ b/codis/pkg/proxy/session.go @@ -242,6 +242,7 @@ func (s *Session) loopWriter(tasks *RequestChan) (err error) { nowTime := time.Now().UnixNano() duration := int64((nowTime - r.ReceiveTime) / 1e3) if duration >= s.config.SlowlogLogSlowerThan { + TimeoutCmdNumber.Incr() //client -> proxy -> server -> porxy -> client //Record the waiting time from receiving the request from the client to sending it to the backend server //the waiting time from sending the request to the backend server to receiving the response from the server diff --git a/codis/pkg/proxy/stats.go b/codis/pkg/proxy/stats.go index 0af40fb434..0aaa9299cb 100644 --- a/codis/pkg/proxy/stats.go +++ b/codis/pkg/proxy/stats.go @@ -14,6 +14,11 @@ import ( "pika/codis/v2/pkg/utils/sync2/atomic2" ) +var ( + TimeoutCmdNumber atomic2.Int64 + TimeoutCmdNumberInSecond atomic2.Int64 +) + type opStats struct { opstr string calls atomic2.Int64 @@ -62,6 +67,8 @@ var cmdstats struct { func init() { cmdstats.opmap = make(map[string]*opStats, 128) + TimeoutCmdNumber.Set(0) + 
TimeoutCmdNumberInSecond.Set(0)
 	go func() {
 		for {
 			start := time.Now()
@@ -70,6 +77,9 @@ func init() {
 			delta := cmdstats.total.Int64() - total
 			normalized := math.Max(0, float64(delta)) * float64(time.Second) / float64(time.Since(start))
 			cmdstats.qps.Set(int64(normalized + 0.5))
+
+			TimeoutCmdNumberInSecond.Swap(TimeoutCmdNumber.Int64())
+			TimeoutCmdNumber.Set(0)
 		}
 	}()
 }
diff --git a/tools/pika_exporter/discovery/codis_dashboard.go b/tools/pika_exporter/discovery/codis_dashboard.go
index 6f9992f7ef..ff0cd7897a 100644
--- a/tools/pika_exporter/discovery/codis_dashboard.go
+++ b/tools/pika_exporter/discovery/codis_dashboard.go
@@ -44,11 +44,19 @@ type RedisInfo struct {
 	Errors int `json:"errors"`
 }
 
+type CmdInfo struct {
+	Opstr         string `json:"opstr"`
+	Calls         int64  `json:"calls"`
+	Usecs_percall int64  `json:"usecs_percall"`
+	Fails         int64  `json:"fails"`
+}
+
 type ProxyOpsInfo struct {
 	Total int       `json:"total"`
 	Fails int       `json:"fails"`
 	Redis RedisInfo `json:"redis"`
 	Qps   int       `json:"qps"`
+	Cmd   []CmdInfo `json:"cmd"`
 }
 
 type RowInfo struct {
@@ -89,6 +97,7 @@ type RunTimeInfo struct {
 }
 
 type ProxyStats struct {
+	Online           bool         `json:"online"`
 	Ops              ProxyOpsInfo `json:"ops"`
 	Rusage           RusageInfo   `json:"rusage"`
 	RunTime          RunTimeInfo  `json:"runtime"`
diff --git a/tools/pika_exporter/discovery/discovery.go b/tools/pika_exporter/discovery/discovery.go
index d6f8011b18..be4b155f96 100644
--- a/tools/pika_exporter/discovery/discovery.go
+++ b/tools/pika_exporter/discovery/discovery.go
@@ -189,6 +189,7 @@ func NewCodisDiscovery(url, password, alias string) (*codisDiscovery, error) {
 func (d *codisDiscovery) GetInstances() []Instance {
 	return d.instances
 }
+
 func (d *codisDiscovery) GetInstancesProxy() []InstanceProxy {
 	return d.instanceProxy
 }
diff --git a/tools/pika_exporter/discovery/discovery_test.go b/tools/pika_exporter/discovery/discovery_test.go
index 4b504655e1..9dda9a2611 100644
--- a/tools/pika_exporter/discovery/discovery_test.go
+++ b/tools/pika_exporter/discovery/discovery_test.go
@@ -44,11 +44,19 @@ func TestNewCodisDiscovery(t *testing.T) {
 		"password1",
 		"password2",
 	}
+	expectedAddrsForProxy := []string{
+		"1.2.3.4:1234",
+		"1.2.3.4:4321",
+	}
 
 	if len(discovery.instances) != len(expectedAddrs) {
 		t.Errorf("expected %d instances but got %d", len(expectedAddrs), len(discovery.instances))
 	}
 
+	if len(discovery.instanceProxy) != len(expectedAddrsForProxy) {
+		t.Errorf("expected %d proxy instances but got %d", len(expectedAddrsForProxy), len(discovery.instanceProxy))
+	}
+
 	for i := range expectedAddrs {
 		if discovery.instances[i].Addr != expectedAddrs[i] {
 			t.Errorf("instance %d address: expected %s but got %s", i, expectedAddrs[i], discovery.instances[i].Addr)
@@ -57,4 +65,10 @@ func TestNewCodisDiscovery(t *testing.T) {
 			t.Errorf("instance %d password: expected %s but got %s", i, expectedPasswords[i], discovery.instances[i].Password)
 		}
 	}
+
+	for i := range expectedAddrsForProxy {
+		if expectedAddrsForProxy[i] != discovery.instanceProxy[i].Addr {
+			t.Errorf("proxy %d address: expected %s but got %s", i, expectedAddrsForProxy[i], discovery.instanceProxy[i].Addr)
+		}
+	}
 }
diff --git a/tools/pika_exporter/discovery/mockCodisTopom.json b/tools/pika_exporter/discovery/mockCodisTopom.json
index 137987af0c..4a364385f5 100644
--- a/tools/pika_exporter/discovery/mockCodisTopom.json
+++ b/tools/pika_exporter/discovery/mockCodisTopom.json
@@ -89,12 +89,14 @@
     {
       "id": 2,
       "token": "4337153123141c4df6e363d281",
+      "admin_addr": "1.2.3.4:1234",
       "start_time": "2021-07-06 10:30:33.588330243 +0800 CST",
       "datacenter": ""
     },
     {
       "id": 3,
       "token": 
"ef7e1ad5422ab8e28241410e856", + "admin_addr": "1.2.3.4:4321", "start_time": "2021-07-06 10:34:21.305110128 +0800 CST", "datacenter": "" } diff --git a/tools/pika_exporter/exporter/future.go b/tools/pika_exporter/exporter/future.go index e75ff999f0..6bb39fe5e7 100644 --- a/tools/pika_exporter/exporter/future.go +++ b/tools/pika_exporter/exporter/future.go @@ -38,7 +38,7 @@ func (f *future) Wait() map[futureKey]error { } type futureKeyForProxy struct { - addr, instance, ID, productName string + addr, ID, productName string } type futureForProxy struct { diff --git a/tools/pika_exporter/exporter/metrics/metrics.go b/tools/pika_exporter/exporter/metrics/metrics.go index c0cd55ea9e..d54f78d570 100644 --- a/tools/pika_exporter/exporter/metrics/metrics.go +++ b/tools/pika_exporter/exporter/metrics/metrics.go @@ -16,9 +16,9 @@ const ( LabelNameAlias = "alias" LabelInstanceMode = "instance-mode" LabelConsensusLevel = "consensus-level" - LabelInstance = "instance" - LabelID = "id" + LabelID = "proxy_id" LabelProductName = "product_name" + LabelOpstr = "opstr" ) type Describer interface { diff --git a/tools/pika_exporter/exporter/metrics/parser.go b/tools/pika_exporter/exporter/metrics/parser.go index c34129f6d5..55466beb37 100644 --- a/tools/pika_exporter/exporter/metrics/parser.go +++ b/tools/pika_exporter/exporter/metrics/parser.go @@ -17,9 +17,10 @@ const ( ) type ParseOption struct { - Version *semver.Version - Extracts map[string]string - Info string + Version *semver.Version + Extracts map[string]string + ExtractsProxy map[string][]int64 + Info string } type Parser interface { @@ -242,9 +243,9 @@ func convertToFloat64(s string) float64 { s = strings.ToLower(s) switch s { - case "yes", "up", "online": + case "yes", "up", "online", "true": return 1 - case "no", "down", "offline", "null": + case "no", "down", "offline", "null", "false": return 0 } @@ -274,25 +275,93 @@ func convertTimeToUnix(ts string) (int64, error) { return t.Unix(), nil } -func StructToMap(obj interface{}) (map[string]string, error) { +type proxyParser struct{} + +func (p *proxyParser) Parse(m MetricMeta, c Collector, opt ParseOption) { + m.Lookup(func(m MetaData) { + for opstr, v := range opt.ExtractsProxy { + metric := Metric{ + MetaData: m, + LabelValues: make([]string, len(m.Labels)), + Value: defaultValue, + } + + for i := 0; i < len(m.Labels)-1; i++ { + labelValue, ok := findInMap(m.Labels[i], opt.Extracts) + if !ok { + log.Debugf("normalParser::Parse not found label value. metricName:%s labelName:%s", + m.Name, m.Labels[i]) + } + + metric.LabelValues[i] = labelValue + } + metric.LabelValues[len(m.Labels)-1] = opstr + + switch m.ValueName { + case "calls": + metric.Value = convertToFloat64(strconv.FormatInt(v[0], 10)) + case "usecs_percall": + metric.Value = convertToFloat64(strconv.FormatInt(v[1], 10)) + case "fails": + metric.Value = convertToFloat64(strconv.FormatInt(v[2], 10)) + } + + if err := c.Collect(metric); err != nil { + log.Errorf("proxyParser::Parse metric collect failed. 
metric:%#v err:%s",
+					m, err)
+			}
+		}
+	})
+
+}
+
+func StructToMap(obj interface{}) (map[string]string, map[string][]int64, error) {
+	result := make(map[string]string)
+	cmdResult := make(map[string][]int64)
 	objValue := reflect.ValueOf(obj)
 	objType := objValue.Type()
 
-	data := make(map[string]string)
-
 	for i := 0; i < objValue.NumField(); i++ {
 		field := objValue.Field(i)
-		fieldName := objType.Field(i).Name
+		fieldType := objType.Field(i)
+		jsonName := fieldType.Tag.Get("json")
+		if jsonName == "" {
+			jsonName = fieldType.Name
+		}
+		value := field.Interface()
 
 		if field.Kind() == reflect.Struct {
-			innerData, _ := StructToMap(field.Interface())
-			for k, v := range innerData {
-				data[k] = v
+			subMap, subCmdMap, _ := StructToMap(value)
+			for k, v := range subMap {
+				result[strings.ToLower(jsonName+"_"+k)] = v
+			}
+			for k, v := range subCmdMap {
+				if v != nil {
+					for index := range v {
+						cmdResult[k] = append(cmdResult[k], v[index])
+					}
+				}
+			}
+		} else if field.Kind() == reflect.Slice && field.Len() > 0 {
+			for j := 0; j < field.Len(); j++ {
+				elemType := field.Index(j).Type()
+				elemValue := field.Index(j)
+				if elemType.Kind() == reflect.Struct {
+					var key string
+					for p := 0; p < elemValue.NumField(); p++ {
+						if p == 0 {
+							key = elemValue.Field(p).String()
+						} else {
+							cmdResult[key] = append(cmdResult[key], elemValue.Field(p).Int())
+						}
+					}
+				} else {
+					result[strings.ToLower(jsonName)] = fmt.Sprintf("%v", value)
+				}
 			}
 		} else {
-			data[fieldName] = fmt.Sprintf("%v", field.Interface())
+			result[strings.ToLower(jsonName)] = fmt.Sprintf("%v", value)
 		}
 	}
-
-	return data, nil
+	return result, cmdResult, nil
 }
diff --git a/tools/pika_exporter/exporter/metrics/proxy.go b/tools/pika_exporter/exporter/metrics/proxy.go
index 1566475c74..655f22401b 100644
--- a/tools/pika_exporter/exporter/metrics/proxy.go
+++ b/tools/pika_exporter/exporter/metrics/proxy.go
@@ -2,6 +2,7 @@ package metrics
 
 func RegisterForProxy() {
 	RegisterProxy(collectProxyMetrics)
+	RegisterProxy(collectPorxyCmdMetrics)
 }
 
 var collectProxyMetrics map[string]MetricConfig = map[string]MetricConfig{
@@ -11,8 +12,8 @@ var collectProxyMetrics map[string]MetricConfig = map[string]MetricConfig{
 			Name:      "ops_total",
 			Help:      "proxy total ops",
 			Type:      metricTypeCounter,
-			Labels:    []string{LabelNameAddr, LabelInstance, LabelID, LabelProductName},
-			ValueName: "Total",
+			Labels:    []string{LabelNameAddr, LabelID, LabelProductName},
+			ValueName: "ops_total",
 		},
 	},
 	"ops_fails": {
@@ -21,18 +22,91 @@ var collectProxyMetrics map[string]MetricConfig = map[string]MetricConfig{
 			Name:      "ops_fails",
 			Help:      "proxy fails counter",
 			Type:      metricTypeCounter,
-			Labels:    []string{LabelNameAddr, LabelInstance, LabelID, LabelProductName},
-			ValueName: "Fails",
+			Labels:    []string{LabelNameAddr, LabelID, LabelProductName},
+			ValueName: "ops_fails",
 		},
 	},
 	"qps": {
 		Parser: &normalParser{},
 		MetricMeta: &MetaData{
 			Name:      "qps",
-			Help:      "proxy qps",
+			Help:      "The Proxy qps",
 			Type:      metricTypeGauge,
-			Labels:    []string{LabelNameAddr, LabelInstance, LabelID, LabelProductName},
-			ValueName: "Qps",
+			Labels:    []string{LabelNameAddr, LabelID, LabelProductName},
+			ValueName: "ops_qps",
+		},
+	},
+	"rusage_cpu": {
+		Parser: &normalParser{},
+		MetricMeta: &MetaData{
+			Name:      "rusage_cpu",
+			Help:      "The CPU usage rate of the proxy",
+			Type:      metricTypeGauge,
+			Labels:    []string{LabelNameAddr, LabelID, LabelProductName},
+			ValueName: "rusage_cpu",
+		},
+	},
+	"rusage_mem": {
+		Parser: &normalParser{},
+		MetricMeta: &MetaData{
+			Name:      "rusage_mem",
+			Help:      "The mem usage of the proxy",
+			
Type: metricTypeGauge, + Labels: []string{LabelNameAddr, LabelID, LabelProductName}, + ValueName: "rusage_mem", + }, + }, + "online": { + Parser: &normalParser{}, + MetricMeta: &MetaData{ + Name: "online", + Help: "Is the Proxy online", + Type: metricTypeGauge, + Labels: []string{LabelNameAddr, LabelID, LabelProductName}, + ValueName: "online", + }, + }, + "timeout_cmd_number": { + Parser: &normalParser{}, + MetricMeta: &MetaData{ + Name: "timeout_cmd_number", + Help: "The number of commands recorded in the slow log within the last second", + Type: metricTypeGauge, + Labels: []string{LabelNameAddr, LabelID, LabelProductName}, + ValueName: "timeout_cmd_number", + }, + }, +} + +var collectPorxyCmdMetrics map[string]MetricConfig = map[string]MetricConfig{ + "calls": { + Parser: &proxyParser{}, + MetricMeta: &MetaData{ + Name: "calls", + Help: "the number of cmd calls", + Type: metricTypeCounter, + Labels: []string{LabelNameAddr, LabelID, LabelProductName, LabelOpstr}, + ValueName: "calls", + }, + }, + "usecs_percall": { + Parser: &proxyParser{}, + MetricMeta: &MetaData{ + Name: "usecs_percall", + Help: "Average duration per call", + Type: metricTypeGauge, + Labels: []string{LabelNameAddr, LabelID, LabelProductName, LabelOpstr}, + ValueName: "usecs_percall", + }, + }, + "fails": { + Parser: &proxyParser{}, + MetricMeta: &MetaData{ + Name: "fails", + Help: "the number of cmd fail", + Type: metricTypeCounter, + Labels: []string{LabelNameAddr, LabelID, LabelProductName, LabelOpstr}, + ValueName: "fails", }, }, } diff --git a/tools/pika_exporter/exporter/mockCodisStats.json b/tools/pika_exporter/exporter/mockCodisStats.json new file mode 100644 index 0000000000..2dd3fbd646 --- /dev/null +++ b/tools/pika_exporter/exporter/mockCodisStats.json @@ -0,0 +1,194 @@ +{ + "version": "2018-11-04 16:22:35 +0800 @de1ad026e329561c22e2a3035fbfe89dc7fef764 @3.2.2-12-gde1ad026", + "compile": "2023-02-23 11:25:09 +0800 by go version go1.19.6 linux/amd64", + "config": { + "coordinator_name": "zookeeper", + "coordinator_addr": "1.1.1.1:2181,1.1.1.2:2181,1.1.1.3:2181", + "coordinator_auth": "", + "admin_addr": "0.0.0.0:1234", + "product_name": "test", + "migration_method": "semi-async", + "migration_parallel_slots": 100, + "migration_async_maxbulks": 200, + "migration_async_maxbytes": "32mb", + "migration_async_numkeys": 500, + "migration_timeout": "30s", + "sentinel_client_timeout": "10s", + "sentinel_quorum": 2, + "sentinel_parallel_syncs": 1, + "sentinel_down_after": "30s", + "sentinel_failover_timeout": "5m", + "sentinel_notification_script": "", + "sentinel_client_reconfig_script": "" + }, + "model": { + "token": "sdf123sdf112xx4", + "start_time": "2021-06-28 11:32:49.497871229 +0800 CST", + "admin_addr": "1.1.1.4:12345", + "product_name": "test", + "pid": 1740, + "pwd": "/data/codis", + "sys": "Linux localhost.localdomain 3.10.0-693.el7.x86_64 #1 SMP Tue Aug 22 21:09:27 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux" + }, + "stats": { + "closed": false, + "slots": [ + { + "id": 0, + "group_id": 1, + "action": {} + }, + { + "id": 1, + "group_id": 1, + "action": {} + } + ], + "group": { + "models": [ + { + "id": 1, + "servers": [ + { + "server": "127.0.0.1:9221", + "datacenter": "", + "action": {}, + "role": "master", + "reply_offset": 0, + "state": 2, + "recall_times": 33, + "replica_group": false + } + ], + "promoting": {}, + "out_of_sync": false + } + ], + "stats": { + "1.1.1.6:1100": { + "stats": { + "active_defrag_hits": "0" + }, + "unixtime": 1693281655 + }, + "1.1.1.7:1100": { + "stats": { + "active_defrag_hits": 
"0" + }, + "unixtime": 1693281655 + }, + "unixtime": 1693281655 + } + }, + "proxy": { + "models": [ + { + "id": 1, + "token": "722fc570089c135c34f668996c12aa40", + "start_time": "2023-12-12 15:10:18.645485 +0800 CST m=+0.036954501", + "admin_addr": "1.2.3.4:1234", + "proto_type": "tcp4", + "proxy_addr": "1.2.3.4:4321", + "product_name": "codis-demo", + "pid": 60611, + "pwd": "/Users/admin", + "sys": "Darwin MacBook-Pro.local 22.6.0 Darwin Kernel Version 22.6.0; root:xnu-8796.141.3~6/RELEASE_ARM64_T8112 arm64", + "max_slot_num": 1024, + "hostname": "MacBook-Pro.local", + "datacenter": "" + } + ], + "stats": { + "4337153c64e1babd3b5c4df6e363d281": { + "stats": { + "online": true, + "closed": false, + "sentinels": {}, + "ops": { + "total": 791976, + "fails": 6, + "redis": { + "errors": 1 + }, + "qps": 11 + }, + "sessions": { + "total": 528784, + "alive": 34 + }, + "rusage": { + "now": "2023-08-29 12:00:53.605229492 +0800 CST", + "cpu": 0.009997645624438964, + "mem": 162881512312336, + "raw": { + "utime": 4474490510000000, + "stime": 4125310130000000, + "cutime": 0, + "cstime": 0, + "num_threads": 358, + "vm_size": 29351325696, + "vm_rss": 162881536 + } + }, + "backend": { + "primary_only": false + } + }, + "unixtime": 1693281654 + }, + "ef87e1ad5422ab8e2816b6694750e856": { + "stats": { + "online": true, + "closed": false, + "sentinels": {}, + "ops": { + "total": 27570566, + "fails": 31, + "redis": { + "errors": 2 + }, + "qps": 8 + }, + "sessions": { + "total": 6349, + "alive": 4 + }, + "rusage": { + "now": "2023-08-29 12:00:54.516379647 +0800 CST", + "cpu": 0, + "mem": 19081124412160, + "raw": { + "utime": 1666690710000000, + "stime": 2573457670000000, + "cutime": 0, + "cstime": 0, + "num_threads": 51, + "vm_size": 8284008448, + "vm_rss": 190812160 + } + }, + "backend": { + "primary_only": false + } + }, + "unixtime": 1693281654 + } + } + }, + "slot_action": { + "interval": 0, + "disabled": false, + "progress": { + "status": "" + }, + "executor": 0 + }, + "sentinels": { + "model": { + "out_of_sync": false + }, + "stats": {}, + "masters": {} + } + } +} \ No newline at end of file diff --git a/tools/pika_exporter/exporter/parser_test.go b/tools/pika_exporter/exporter/parser_test.go index ded4f598b4..c7e425ae2b 100644 --- a/tools/pika_exporter/exporter/parser_test.go +++ b/tools/pika_exporter/exporter/parser_test.go @@ -1,12 +1,15 @@ package exporter import ( + "encoding/json" "fmt" + "io/ioutil" "testing" "github.com/Masterminds/semver" "github.com/stretchr/testify/assert" + "github.com/OpenAtomFoundation/pika/tools/pika_exporter/discovery" "github.com/OpenAtomFoundation/pika/tools/pika_exporter/exporter/metrics" "github.com/OpenAtomFoundation/pika/tools/pika_exporter/exporter/test" ) @@ -92,3 +95,37 @@ func Benchmark_Parse(b *testing.B) { } }) } + +func Test_Parse_Proxy_Stats(t *testing.T) { + jsonFile := "mockCodisStats.json" + jsonData, err := ioutil.ReadFile(jsonFile) + if err != nil { + t.Fatalf("failed to read test data: %v", err) + } + var resultProxy discovery.ProxyStats + err = json.Unmarshal(jsonData, &resultProxy) + + result, resultCmd, err := metrics.StructToMap(resultProxy) + + result[metrics.LabelNameAddr] = "addr" + result[metrics.LabelID] = "id" + result[metrics.LabelProductName] = "productName" + + collector := metrics.CollectFunc(func(m metrics.Metric) error { + t.Logf("metric:%#v", m) + return nil + }) + + parseOpt := metrics.ParseOption{ + Version: nil, + Extracts: result, + ExtractsProxy: resultCmd, + Info: "", + } + + t.Logf("########## begin parse###########") + for _, m 
:= range metrics.MetricConfigsProxy { + m.Parse(m, collector, parseOpt) + fmt.Println(m) + } +} diff --git a/tools/pika_exporter/exporter/proxy.go b/tools/pika_exporter/exporter/proxy.go index fe8d2140a0..13c43fb9c4 100644 --- a/tools/pika_exporter/exporter/proxy.go +++ b/tools/pika_exporter/exporter/proxy.go @@ -14,7 +14,6 @@ import ( const ( NamespaceProxy = "proxy" - InstanceType ) type exporterProxy struct { @@ -45,11 +44,11 @@ func (p *exporterProxy) initMetrics() { 0.11, 0.12, 0.13, 0.14, 0.15, 0.16, 0.17, 0.18, 0.19, 0.20, 0.25, 0.5, 0.75, 1, 2, 5, 10, - }}) // done + }}) p.collectCount = prometheus.NewCounter(prometheus.CounterOpts{ Namespace: p.namespace, Name: "exporter_collect_count", - Help: "the count of proxy-exporter collect"}) // done + Help: "the count of proxy-exporter collect"}) p.scrapeDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: p.namespace, Name: "exporter_scrape_duration_seconds", @@ -61,27 +60,27 @@ func (p *exporterProxy) initMetrics() { 0.25, 0.5, 0.75, 1, 2, 5, 10, }, - }, []string{metrics.LabelNameAddr, metrics.LabelInstance, metrics.LabelID, metrics.LabelProductName}) + }, []string{metrics.LabelNameAddr, metrics.LabelID, metrics.LabelProductName}) p.scrapeErrors = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: p.namespace, Name: "exporter_scrape_errors", Help: "the each of proxy scrape error count", - }, []string{metrics.LabelNameAddr, metrics.LabelInstance, metrics.LabelID, metrics.LabelProductName}) + }, []string{metrics.LabelNameAddr, metrics.LabelID, metrics.LabelProductName}) p.scrapeLastError = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: p.namespace, Name: "exporter_last_scrape_error", Help: "the each of proxy scrape last error", - }, []string{metrics.LabelNameAddr, metrics.LabelInstance, metrics.LabelID, metrics.LabelProductName, "error"}) + }, []string{metrics.LabelNameAddr, metrics.LabelID, metrics.LabelProductName, "error"}) p.scrapeCount = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: p.namespace, Name: "exporter_scrape_count", Help: "the each of proxy scrape count", - }, []string{metrics.LabelNameAddr, metrics.LabelInstance, metrics.LabelID, metrics.LabelProductName}) + }, []string{metrics.LabelNameAddr, metrics.LabelID, metrics.LabelProductName}) p.up = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: p.namespace, Name: "up", Help: "the each of proxy connection status", - }, []string{metrics.LabelNameAddr, metrics.LabelInstance, metrics.LabelID, metrics.LabelProductName}) + }, []string{metrics.LabelNameAddr, metrics.LabelID, metrics.LabelProductName}) } func NewProxyExporter(dis discovery.Discovery, namespace string) (*exporterProxy, error) { @@ -148,20 +147,20 @@ func (p *exporterProxy) scrape(ch chan<- prometheus.Metric) { for _, instance := range p.dis.GetInstancesProxy() { fut.Add() - go func(addr, instanceType, id, productName string) { - p.scrapeCount.WithLabelValues(addr, instanceType, id, productName).Inc() + go func(addr, id, productName string) { + p.scrapeCount.WithLabelValues(addr, id, productName).Inc() defer func() { - p.scrapeDuration.WithLabelValues(addr, instanceType, id, productName).Observe(time.Since(startTime).Seconds()) + p.scrapeDuration.WithLabelValues(addr, id, productName).Observe(time.Since(startTime).Seconds()) }() - fut.Done(futureKeyForProxy{addr: addr, instance: instanceType, ID: id, productName: productName}, p.collectProxyStats(addr, instanceType, id, productName, ch)) - }(instance.Addr, InstanceType, strconv.Itoa(instance.ID), 
instance.ProductName) + fut.Done(futureKeyForProxy{addr: addr, ID: id, productName: productName}, p.collectProxyStats(addr, id, productName, ch)) + }(instance.Addr, strconv.Itoa(instance.ID), instance.ProductName) } for k, v := range fut.Wait() { if v != nil { - p.scrapeErrors.WithLabelValues(k.addr, k.instance, k.ID, k.productName).Inc() - p.scrapeLastError.WithLabelValues(k.addr, k.instance, k.ID, k.productName, v.Error()).Set(0) + p.scrapeErrors.WithLabelValues(k.addr, k.ID, k.productName).Inc() + p.scrapeLastError.WithLabelValues(k.addr, k.ID, k.productName, v.Error()).Set(0) log.Errorf("exporter::scrape collect pika failed. pika server:%#v err:%s", k, v.Error()) } @@ -169,29 +168,26 @@ func (p *exporterProxy) scrape(ch chan<- prometheus.Metric) { } -func (p *exporterProxy) collectProxyStats(addr, instanceType, id, productName string, ch chan<- prometheus.Metric) error { +func (p *exporterProxy) collectProxyStats(addr, id, productName string, ch chan<- prometheus.Metric) error { resp, err := http.Get("http://" + addr + "/proxy/stats") if err != nil { - p.up.WithLabelValues(addr, instanceType, id, productName).Set(0) + p.up.WithLabelValues(addr, id, productName).Set(0) log.Errorf("exporter::scrape collect proxy failed. proxy server:%#v err:%s", addr, err.Error()) return err } - p.up.WithLabelValues(addr, instanceType, id, productName).Set(1) + p.up.WithLabelValues(addr, id, productName).Set(1) var resultProxy discovery.ProxyStats if err = json.NewDecoder(resp.Body).Decode(&resultProxy); err != nil { log.Errorf("exporter::scrape decode json failed. proxy server:%#v err:%s", addr, err.Error()) - p.scrapeErrors.WithLabelValues(addr, instanceType, id, productName).Inc() - p.scrapeLastError.WithLabelValues(addr, instanceType, id, productName, err.Error()).Set(0) + p.scrapeErrors.WithLabelValues(addr, id, productName).Inc() + p.scrapeLastError.WithLabelValues(addr, id, productName, err.Error()).Set(0) return err } - result, err := metrics.StructToMap(resultProxy) - for k, v := range result { - log.Printf("%s = %s\n", k, v) - } + result, resultCmd, err := metrics.StructToMap(resultProxy) + result[metrics.LabelNameAddr] = addr - result[metrics.LabelInstance] = instanceType result[metrics.LabelID] = id result[metrics.LabelProductName] = productName @@ -207,9 +203,10 @@ func (p *exporterProxy) collectProxyStats(addr, instanceType, id, productName st }) parseOpt := metrics.ParseOption{ - Version: nil, - Extracts: result, - Info: "", + Version: nil, + Extracts: result, + ExtractsProxy: resultCmd, + Info: "", } for _, m := range metrics.MetricConfigsProxy { From e13eff8936de48946438f265e58994a8facabfd3 Mon Sep 17 00:00:00 2001 From: dingxiaoshuai123 <2486016589@qq.com> Date: Fri, 15 Dec 2023 11:38:16 +0800 Subject: [PATCH 3/4] Improve metrics --- codis/pkg/proxy/proxy.go | 6 +-- codis/pkg/proxy/session.go | 14 ++++-- codis/pkg/proxy/stats.go | 46 ++++++++++++++----- include/pika_server.h | 2 + src/pika_admin.cc | 2 +- src/pika_server.cc | 6 +++ .../discovery/codis_dashboard.go | 11 +++-- .../pika_exporter/exporter/metrics/parser.go | 2 + tools/pika_exporter/exporter/metrics/proxy.go | 36 +++++++++------ tools/pika_exporter/exporter/metrics/stats.go | 10 ++++ 10 files changed, 98 insertions(+), 37 deletions(-) diff --git a/codis/pkg/proxy/proxy.go b/codis/pkg/proxy/proxy.go index dac4a9a374..379e883ee4 100644 --- a/codis/pkg/proxy/proxy.go +++ b/codis/pkg/proxy/proxy.go @@ -558,8 +558,8 @@ type Stats struct { PrimaryOnly bool `json:"primary_only"` } `json:"backend"` - Runtime *RuntimeStats 
`json:"runtime,omitempty"` - TimeoutCmdNumber int64 `json:"timeout_cmd_number"` + Runtime *RuntimeStats `json:"runtime,omitempty"` + SlowCmdCount int64 `json:"slow_cmd_count"` // Cumulative count of slow log } type RuntimeStats struct { @@ -668,6 +668,6 @@ func (p *Proxy) Stats(flags StatsFlags) *Stats { stats.Runtime.NumCgoCall = runtime.NumCgoCall() stats.Runtime.MemOffheap = unsafe2.OffheapBytes() } - stats.TimeoutCmdNumber = TimeoutCmdNumberInSecond.Int64() + stats.SlowCmdCount = SlowCmdCount.Int64() return stats } diff --git a/codis/pkg/proxy/session.go b/codis/pkg/proxy/session.go index b8e0c791df..26137d2182 100644 --- a/codis/pkg/proxy/session.go +++ b/codis/pkg/proxy/session.go @@ -236,13 +236,14 @@ func (s *Session) loopWriter(tasks *RequestChan) (err error) { } else { s.incrOpStats(r, resp.Type) } + nowTime := time.Now().UnixNano() + duration := int64((nowTime - r.ReceiveTime) / 1e3) + s.updateMaxDelay(duration, r) if fflush { s.flushOpStats(false) } - nowTime := time.Now().UnixNano() - duration := int64((nowTime - r.ReceiveTime) / 1e3) if duration >= s.config.SlowlogLogSlowerThan { - TimeoutCmdNumber.Incr() + SlowCmdCount.Incr() // Atomic global variable, increment by 1 when slow log occurs. //client -> proxy -> server -> porxy -> client //Record the waiting time from receiving the request from the client to sending it to the backend server //the waiting time from sending the request to the backend server to receiving the response from the server @@ -759,3 +760,10 @@ func (s *Session) handlePConfig(r *Request) error { } return nil } + +func (s *Session) updateMaxDelay(duration int64, r *Request) { + e := s.getOpStats(r.OpStr) // There is no race condition in the session + if duration > e.maxDelay.Int64() { + e.maxDelay.Set(duration) + } +} diff --git a/codis/pkg/proxy/stats.go b/codis/pkg/proxy/stats.go index 0aaa9299cb..eb46539550 100644 --- a/codis/pkg/proxy/stats.go +++ b/codis/pkg/proxy/stats.go @@ -14,10 +14,7 @@ import ( "pika/codis/v2/pkg/utils/sync2/atomic2" ) -var ( - TimeoutCmdNumber atomic2.Int64 - TimeoutCmdNumberInSecond atomic2.Int64 -) +var SlowCmdCount atomic2.Int64 // Cumulative count of slow log type opStats struct { opstr string @@ -27,14 +24,16 @@ type opStats struct { redis struct { errors atomic2.Int64 } + maxDelay atomic2.Int64 } func (s *opStats) OpStats() *OpStats { o := &OpStats{ - OpStr: s.opstr, - Calls: s.calls.Int64(), - Usecs: s.nsecs.Int64() / 1e3, - Fails: s.fails.Int64(), + OpStr: s.opstr, + Calls: s.calls.Int64(), + Usecs: s.nsecs.Int64() / 1e3, + Fails: s.fails.Int64(), + MaxDelay: s.maxDelay.Int64(), } if o.Calls != 0 { o.UsecsPercall = o.Usecs / o.Calls @@ -50,6 +49,7 @@ type OpStats struct { UsecsPercall int64 `json:"usecs_percall"` Fails int64 `json:"fails"` RedisErrType int64 `json:"redis_errtype"` + MaxDelay int64 `json:"max_delay"` } var cmdstats struct { @@ -67,8 +67,7 @@ var cmdstats struct { func init() { cmdstats.opmap = make(map[string]*opStats, 128) - TimeoutCmdNumber.Set(0) - TimeoutCmdNumberInSecond.Set(0) + SlowCmdCount.Set(0) go func() { for { start := time.Now() @@ -77,9 +76,16 @@ func init() { delta := cmdstats.total.Int64() - total normalized := math.Max(0, float64(delta)) * float64(time.Second) / float64(time.Since(start)) cmdstats.qps.Set(int64(normalized + 0.5)) + } + }() - TimeoutCmdNumberInSecond.Swap(TimeoutCmdNumber.Int64()) - TimeoutCmdNumber.Set(0) + // Clear the accumulated maximum delay to 0 every 15 seconds. 
+ go func() { + for { + time.Sleep(15 * time.Second) + for _, s := range cmdstats.opmap { + s.maxDelay.Set(0) + } } }() } @@ -175,6 +181,22 @@ func incrOpStats(e *opStats) { s.redis.errors.Add(n) cmdstats.redis.errors.Add(n) } + + /** + Each session refreshes its own saved metrics, and there is a race condition at this time. + Use the CAS method to update. + */ + for { + oldValue := s.maxDelay + if e.maxDelay > oldValue { + if s.maxDelay.CompareAndSwap(oldValue.Int64(), e.maxDelay.Int64()) { + e.maxDelay.Set(0) + break + } + } else { + break + } + } } var sessions struct { diff --git a/include/pika_server.h b/include/pika_server.h index 98594cbf8f..29f3e54589 100644 --- a/include/pika_server.h +++ b/include/pika_server.h @@ -327,6 +327,7 @@ class PikaServer : public pstd::noncopyable { uint32_t SlowlogLen(); void SlowlogObtain(int64_t number, std::vector* slowlogs); void SlowlogPushEntry(const PikaCmdArgsType& argv, int64_t time, int64_t duration); + uint64_t SlowlogCount(); /* * Statistic used @@ -680,6 +681,7 @@ class PikaServer : public pstd::noncopyable { * Slowlog used */ uint64_t slowlog_entry_id_ = 0; + uint64_t slowlog_counter_ = 0; std::shared_mutex slowlog_protector_; std::list slowlog_list_; diff --git a/src/pika_admin.cc b/src/pika_admin.cc index d2f02610f6..bd939e3e6c 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -1056,7 +1056,7 @@ void InfoCmd::InfoStats(std::string& info) { tmp_stream << "is_slots_migrating:" << (is_migrating ? "Yes, " : "No, ") << start_migration_time_str << ", " << (is_migrating ? (current_time_s - start_migration_time) : (end_migration_time - start_migration_time)) << "\r\n"; - + tmp_stream << "slow_logs_count:" << g_pika_server->SlowlogCount() << "\r\n"; info.append(tmp_stream.str()); } diff --git a/src/pika_server.cc b/src/pika_server.cc index 3d09f47625..7462203cb9 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -1198,11 +1198,17 @@ void PikaServer::SlowlogPushEntry(const PikaCmdArgsType& argv, int64_t time, int entry.start_time = time; entry.duration = duration; slowlog_list_.push_front(entry); + slowlog_counter_++; } SlowlogTrim(); } +uint64_t PikaServer::SlowlogCount() { + std::shared_lock l(slowlog_protector_); + return slowlog_counter_; +} + void PikaServer::ResetStat() { statistic_.server_stat.accumulative_connections.store(0); statistic_.server_stat.qps.querynum.store(0); diff --git a/tools/pika_exporter/discovery/codis_dashboard.go b/tools/pika_exporter/discovery/codis_dashboard.go index ff0cd7897a..4526dd6f9b 100644 --- a/tools/pika_exporter/discovery/codis_dashboard.go +++ b/tools/pika_exporter/discovery/codis_dashboard.go @@ -49,6 +49,7 @@ type CmdInfo struct { Calls int64 `json:"calls"` Usecs_percall int64 `json:"usecs_percall"` Fails int64 `json:"fails"` + MaxDelay int64 `json:"max_delay"` } type ProxyOpsInfo struct { @@ -97,9 +98,9 @@ type RunTimeInfo struct { } type ProxyStats struct { - Online bool `json:"online"` - Ops ProxyOpsInfo `json:"ops"` - Rusage RusageInfo `json:"rusage"` - RunTime RunTimeInfo `json:"runtime"` - TimeoutCmdNumber int64 `json:"timeout_cmd_number"` + Online bool `json:"online"` + Ops ProxyOpsInfo `json:"ops"` + Rusage RusageInfo `json:"rusage"` + RunTime RunTimeInfo `json:"runtime"` + SlowCmdCount int64 `json:"slow_cmd_count"` } diff --git a/tools/pika_exporter/exporter/metrics/parser.go b/tools/pika_exporter/exporter/metrics/parser.go index 55466beb37..6488dbd336 100644 --- a/tools/pika_exporter/exporter/metrics/parser.go +++ b/tools/pika_exporter/exporter/metrics/parser.go @@ -304,6 
+304,8 @@ func (p *proxyParser) Parse(m MetricMeta, c Collector, opt ParseOption) { metric.Value = convertToFloat64(strconv.FormatInt(v[1], 10)) case "fails": metric.Value = convertToFloat64(strconv.FormatInt(v[2], 10)) + case "max_delay": + metric.Value = convertToFloat64(strconv.FormatInt(v[3], 10)) } if err := c.Collect(metric); err != nil { diff --git a/tools/pika_exporter/exporter/metrics/proxy.go b/tools/pika_exporter/exporter/metrics/proxy.go index 655f22401b..87a623d0a9 100644 --- a/tools/pika_exporter/exporter/metrics/proxy.go +++ b/tools/pika_exporter/exporter/metrics/proxy.go @@ -6,20 +6,20 @@ func RegisterForProxy() { } var collectProxyMetrics map[string]MetricConfig = map[string]MetricConfig{ - "ops_total": { + "total_ops": { Parser: &normalParser{}, MetricMeta: &MetaData{ - Name: "ops_total", + Name: "total_ops", Help: "proxy total ops", Type: metricTypeCounter, Labels: []string{LabelNameAddr, LabelID, LabelProductName}, ValueName: "ops_total", }, }, - "ops_fails": { + "total_ops_fails": { Parser: &normalParser{}, MetricMeta: &MetaData{ - Name: "ops_fails", + Name: "total_ops_fails", Help: "proxy fails counter", Type: metricTypeCounter, Labels: []string{LabelNameAddr, LabelID, LabelProductName}, @@ -66,23 +66,23 @@ var collectProxyMetrics map[string]MetricConfig = map[string]MetricConfig{ ValueName: "online", }, }, - "timeout_cmd_number": { + "total_slow_cmd": { Parser: &normalParser{}, MetricMeta: &MetaData{ - Name: "timeout_cmd_number", - Help: "The number of commands recorded in the slow log within the last second", - Type: metricTypeGauge, + Name: "total_slow_cmd", + Help: "The number of commands recorded in the slow log", + Type: metricTypeCounter, Labels: []string{LabelNameAddr, LabelID, LabelProductName}, - ValueName: "timeout_cmd_number", + ValueName: "slow_cmd_count", }, }, } var collectPorxyCmdMetrics map[string]MetricConfig = map[string]MetricConfig{ - "calls": { + "total_calls": { Parser: &proxyParser{}, MetricMeta: &MetaData{ - Name: "calls", + Name: "total_calls", Help: "the number of cmd calls", Type: metricTypeCounter, Labels: []string{LabelNameAddr, LabelID, LabelProductName, LabelOpstr}, @@ -99,14 +99,24 @@ var collectPorxyCmdMetrics map[string]MetricConfig = map[string]MetricConfig{ ValueName: "usecs_percall", }, }, - "fails": { + "total_fails": { Parser: &proxyParser{}, MetricMeta: &MetaData{ - Name: "fails", + Name: "total_fails", Help: "the number of cmd fail", Type: metricTypeCounter, Labels: []string{LabelNameAddr, LabelID, LabelProductName, LabelOpstr}, ValueName: "fails", }, }, + "max_delay": { + Parser: &proxyParser{}, + MetricMeta: &MetaData{ + Name: "max_delay", + Help: "The maximum time consumed by this command since the last collection.", + Type: metricTypeGauge, + Labels: []string{LabelNameAddr, LabelID, LabelProductName, LabelOpstr}, + ValueName: "max_delay", + }, + }, } diff --git a/tools/pika_exporter/exporter/metrics/stats.go b/tools/pika_exporter/exporter/metrics/stats.go index e2b2672f2e..6fb0d59dc6 100644 --- a/tools/pika_exporter/exporter/metrics/stats.go +++ b/tools/pika_exporter/exporter/metrics/stats.go @@ -158,4 +158,14 @@ var collectStatsMetrics = map[string]MetricConfig{ Labels: []string{LabelNameAddr, LabelNameAlias, "is_compact", "compact_cron", "compact_interval"}, }, }, + "total_slow_log": { + Parser: &normalParser{}, + MetricMeta: &MetaData{ + Name: "total_slow_log", + Help: "pika serve instance total count of slow log", + Type: metricTypeCounter, + Labels: []string{LabelNameAddr, LabelNameAlias}, + ValueName: 
"slow_logs_count", + }, + }, } From f7c73b890d154624a6648f3852ed7d4a622b787d Mon Sep 17 00:00:00 2001 From: dingxiaoshuai123 <2486016589@qq.com> Date: Tue, 19 Dec 2023 14:00:48 +0800 Subject: [PATCH 4/4] Dynamically adjust refresh time. --- codis/cmd/proxy/main.go | 2 + codis/config/proxy.toml | 2 + codis/pkg/proxy/config.go | 9 + codis/pkg/proxy/proxy.go | 18 ++ codis/pkg/proxy/stats.go | 9 +- tools/pika_exporter/discovery/discovery.go | 7 +- tools/pika_exporter/exporter/future.go | 13 +- .../pika_exporter/exporter/metrics/parser.go | 12 +- .../exporter/mockCodisStats.json | 194 ------------------ .../exporter/mockProxyStats.json | 88 ++++++++ tools/pika_exporter/exporter/parser_test.go | 2 +- tools/pika_exporter/exporter/proxy.go | 13 +- 12 files changed, 151 insertions(+), 218 deletions(-) delete mode 100644 tools/pika_exporter/exporter/mockCodisStats.json create mode 100644 tools/pika_exporter/exporter/mockProxyStats.json diff --git a/codis/cmd/proxy/main.go b/codis/cmd/proxy/main.go index 8d7da2381f..26cfb37197 100644 --- a/codis/cmd/proxy/main.go +++ b/codis/cmd/proxy/main.go @@ -193,6 +193,8 @@ Options: } defer s.Close() + proxy.RefreshPeriod.Set(config.MaxDelayRefreshTimeInterval.Int64()) + log.Warnf("create proxy with config\n%s", config) if s, ok := utils.Argument(d, "--pidfile"); ok { diff --git a/codis/config/proxy.toml b/codis/config/proxy.toml index 07f5a3126a..1cc55ea27a 100644 --- a/codis/config/proxy.toml +++ b/codis/config/proxy.toml @@ -118,3 +118,5 @@ metrics_report_statsd_server = "" metrics_report_statsd_period = "1s" metrics_report_statsd_prefix = "" +# Maximum delay statistical time interval.(This value must be greater than 0.) +max_delay_refresh_time_interval = "15s" diff --git a/codis/pkg/proxy/config.go b/codis/pkg/proxy/config.go index c531d4ab53..62051c9017 100644 --- a/codis/pkg/proxy/config.go +++ b/codis/pkg/proxy/config.go @@ -133,6 +133,9 @@ metrics_report_influxdb_database = "" metrics_report_statsd_server = "" metrics_report_statsd_period = "1s" metrics_report_statsd_prefix = "" + +# Maximum delay statistical time interval.(This value must be greater than 0.) 
+max_delay_refresh_time_interval = "15s"
 `
 
 type Config struct {
@@ -192,6 +195,8 @@ type Config struct {
 	MetricsReportStatsdPeriod   timesize.Duration `toml:"metrics_report_statsd_period" json:"metrics_report_statsd_period"`
 	MetricsReportStatsdPrefix   string            `toml:"metrics_report_statsd_prefix" json:"metrics_report_statsd_prefix"`
 
+	MaxDelayRefreshTimeInterval timesize.Duration `toml:"max_delay_refresh_time_interval" json:"max_delay_refresh_time_interval"`
+
 	ConfigFileName string `toml:"-" json:"config_file_name"`
 }
 
@@ -323,5 +328,9 @@ func (c *Config) Validate() error {
 		return errors.New("invalid metrics_report_statsd_period")
 	}
 
+	if c.MaxDelayRefreshTimeInterval <= 0 {
+		return errors.New("max_delay_refresh_time_interval must be greater than 0")
+	}
+
 	return nil
 }
diff --git a/codis/pkg/proxy/proxy.go b/codis/pkg/proxy/proxy.go
index 379e883ee4..ba6115cd4a 100644
--- a/codis/pkg/proxy/proxy.go
+++ b/codis/pkg/proxy/proxy.go
@@ -308,6 +308,12 @@ func (p *Proxy) ConfigGet(key string) *redis.Resp {
 			redis.NewBulkBytes([]byte("metrics_report_statsd_prefix")),
 			redis.NewBulkBytes([]byte(p.config.MetricsReportStatsdPrefix)),
 		})
+	case "max_delay_refresh_time_interval":
+		if text, err := p.config.MaxDelayRefreshTimeInterval.MarshalText(); err != nil {
+			return redis.NewErrorf("can't get max_delay_refresh_time_interval value.")
+		} else {
+			return redis.NewBulkBytes(text)
+		}
 	default:
 		return redis.NewErrorf("unsupported key: %s", key)
 	}
@@ -342,6 +348,18 @@ func (p *Proxy) ConfigSet(key, value string) *redis.Resp {
 		}
 		p.config.SlowlogLogSlowerThan = n
 		return redis.NewString([]byte("OK"))
+	case "max_delay_refresh_time_interval":
+		s := &(p.config.MaxDelayRefreshTimeInterval)
+		err := s.UnmarshalText([]byte(value))
+		if err != nil {
+			return redis.NewErrorf("err:%s.", err)
+		}
+		if d := p.config.MaxDelayRefreshTimeInterval.Duration(); d <= 0 {
+			return redis.NewErrorf("max_delay_refresh_time_interval must be greater than 0")
+		} else {
+			RefreshPeriod.Set(int64(d))
+			return redis.NewString([]byte("OK"))
+		}
 	default:
 		return redis.NewErrorf("unsupported key: %s", key)
 	}
diff --git a/codis/pkg/proxy/stats.go b/codis/pkg/proxy/stats.go
index eb46539550..06a2b67aa2 100644
--- a/codis/pkg/proxy/stats.go
+++ b/codis/pkg/proxy/stats.go
@@ -14,7 +14,10 @@ import (
 	"pika/codis/v2/pkg/utils/sync2/atomic2"
 )
 
-var SlowCmdCount atomic2.Int64 // Cumulative count of slow log
+var (
+	SlowCmdCount  atomic2.Int64 // Cumulative count of slow log
+	RefreshPeriod atomic2.Int64
+)
 
 type opStats struct {
 	opstr string
@@ -79,10 +82,10 @@ func init() {
 		}
 	}()
 
-	// Clear the accumulated maximum delay to 0 every 15 seconds.
+ // Clear the accumulated maximum delay to 0 go func() { for { - time.Sleep(15 * time.Second) + time.Sleep(time.Duration(RefreshPeriod.Int64())) for _, s := range cmdstats.opmap { s.maxDelay.Set(0) } diff --git a/tools/pika_exporter/discovery/discovery.go b/tools/pika_exporter/discovery/discovery.go index be4b155f96..fd16a0979d 100644 --- a/tools/pika_exporter/discovery/discovery.go +++ b/tools/pika_exporter/discovery/discovery.go @@ -206,8 +206,11 @@ func (d *codisDiscovery) CheckUpdate(updatechan chan int, codisaddr string) { } func (d *codisDiscovery) comparedis(new_instance *codisDiscovery) bool { - var addrs, addrsProxy []string - var diff bool = false + var ( + addrs []string + addrsProxy []string + diff bool + ) for _, instance := range new_instance.instances { addrs = append(addrs, instance.Addr) } diff --git a/tools/pika_exporter/exporter/future.go b/tools/pika_exporter/exporter/future.go index 6bb39fe5e7..3ca3afbc4a 100644 --- a/tools/pika_exporter/exporter/future.go +++ b/tools/pika_exporter/exporter/future.go @@ -3,7 +3,8 @@ package exporter import "sync" type futureKey struct { - addr, alias string + addr string + alias string } type future struct { @@ -38,7 +39,9 @@ func (f *future) Wait() map[futureKey]error { } type futureKeyForProxy struct { - addr, ID, productName string + addr string + ID string + productName string } type futureForProxy struct { @@ -60,9 +63,11 @@ func (f *futureForProxy) Add() { func (f *futureForProxy) Done(key futureKeyForProxy, val error) { f.Lock() - defer f.Unlock() + defer func() { + f.Unlock() + f.wait.Done() + }() f.m[key] = val - f.wait.Done() } func (f *futureForProxy) Wait() map[futureKeyForProxy]error { diff --git a/tools/pika_exporter/exporter/metrics/parser.go b/tools/pika_exporter/exporter/metrics/parser.go index 6488dbd336..15d7691732 100644 --- a/tools/pika_exporter/exporter/metrics/parser.go +++ b/tools/pika_exporter/exporter/metrics/parser.go @@ -348,14 +348,10 @@ func StructToMap(obj interface{}) (map[string]string, map[string][]int64, error) for j := 0; j < field.Len(); j++ { elemType := field.Index(j).Type() elemValue := field.Index(j) - if elemType.Kind() == reflect.Struct { - var key string - for p := 0; p < elemValue.NumField(); p++ { - if p == 0 { - key = elemValue.Field(p).String() - } else { - cmdResult[key] = append(cmdResult[key], elemValue.Field(p).Int()) - } + if elemType.Kind() == reflect.Struct && elemValue.NumField() > 0 { + var key string = elemValue.Field(0).String() + for p := 1; p < elemValue.NumField(); p++ { + cmdResult[key] = append(cmdResult[key], elemValue.Field(p).Int()) } } else { result[strings.ToLower(jsonName)] = fmt.Sprintf("%v", value) diff --git a/tools/pika_exporter/exporter/mockCodisStats.json b/tools/pika_exporter/exporter/mockCodisStats.json deleted file mode 100644 index 2dd3fbd646..0000000000 --- a/tools/pika_exporter/exporter/mockCodisStats.json +++ /dev/null @@ -1,194 +0,0 @@ -{ - "version": "2018-11-04 16:22:35 +0800 @de1ad026e329561c22e2a3035fbfe89dc7fef764 @3.2.2-12-gde1ad026", - "compile": "2023-02-23 11:25:09 +0800 by go version go1.19.6 linux/amd64", - "config": { - "coordinator_name": "zookeeper", - "coordinator_addr": "1.1.1.1:2181,1.1.1.2:2181,1.1.1.3:2181", - "coordinator_auth": "", - "admin_addr": "0.0.0.0:1234", - "product_name": "test", - "migration_method": "semi-async", - "migration_parallel_slots": 100, - "migration_async_maxbulks": 200, - "migration_async_maxbytes": "32mb", - "migration_async_numkeys": 500, - "migration_timeout": "30s", - "sentinel_client_timeout": "10s", - 
"sentinel_quorum": 2, - "sentinel_parallel_syncs": 1, - "sentinel_down_after": "30s", - "sentinel_failover_timeout": "5m", - "sentinel_notification_script": "", - "sentinel_client_reconfig_script": "" - }, - "model": { - "token": "sdf123sdf112xx4", - "start_time": "2021-06-28 11:32:49.497871229 +0800 CST", - "admin_addr": "1.1.1.4:12345", - "product_name": "test", - "pid": 1740, - "pwd": "/data/codis", - "sys": "Linux localhost.localdomain 3.10.0-693.el7.x86_64 #1 SMP Tue Aug 22 21:09:27 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux" - }, - "stats": { - "closed": false, - "slots": [ - { - "id": 0, - "group_id": 1, - "action": {} - }, - { - "id": 1, - "group_id": 1, - "action": {} - } - ], - "group": { - "models": [ - { - "id": 1, - "servers": [ - { - "server": "127.0.0.1:9221", - "datacenter": "", - "action": {}, - "role": "master", - "reply_offset": 0, - "state": 2, - "recall_times": 33, - "replica_group": false - } - ], - "promoting": {}, - "out_of_sync": false - } - ], - "stats": { - "1.1.1.6:1100": { - "stats": { - "active_defrag_hits": "0" - }, - "unixtime": 1693281655 - }, - "1.1.1.7:1100": { - "stats": { - "active_defrag_hits": "0" - }, - "unixtime": 1693281655 - }, - "unixtime": 1693281655 - } - }, - "proxy": { - "models": [ - { - "id": 1, - "token": "722fc570089c135c34f668996c12aa40", - "start_time": "2023-12-12 15:10:18.645485 +0800 CST m=+0.036954501", - "admin_addr": "1.2.3.4:1234", - "proto_type": "tcp4", - "proxy_addr": "1.2.3.4:4321", - "product_name": "codis-demo", - "pid": 60611, - "pwd": "/Users/admin", - "sys": "Darwin MacBook-Pro.local 22.6.0 Darwin Kernel Version 22.6.0; root:xnu-8796.141.3~6/RELEASE_ARM64_T8112 arm64", - "max_slot_num": 1024, - "hostname": "MacBook-Pro.local", - "datacenter": "" - } - ], - "stats": { - "4337153c64e1babd3b5c4df6e363d281": { - "stats": { - "online": true, - "closed": false, - "sentinels": {}, - "ops": { - "total": 791976, - "fails": 6, - "redis": { - "errors": 1 - }, - "qps": 11 - }, - "sessions": { - "total": 528784, - "alive": 34 - }, - "rusage": { - "now": "2023-08-29 12:00:53.605229492 +0800 CST", - "cpu": 0.009997645624438964, - "mem": 162881512312336, - "raw": { - "utime": 4474490510000000, - "stime": 4125310130000000, - "cutime": 0, - "cstime": 0, - "num_threads": 358, - "vm_size": 29351325696, - "vm_rss": 162881536 - } - }, - "backend": { - "primary_only": false - } - }, - "unixtime": 1693281654 - }, - "ef87e1ad5422ab8e2816b6694750e856": { - "stats": { - "online": true, - "closed": false, - "sentinels": {}, - "ops": { - "total": 27570566, - "fails": 31, - "redis": { - "errors": 2 - }, - "qps": 8 - }, - "sessions": { - "total": 6349, - "alive": 4 - }, - "rusage": { - "now": "2023-08-29 12:00:54.516379647 +0800 CST", - "cpu": 0, - "mem": 19081124412160, - "raw": { - "utime": 1666690710000000, - "stime": 2573457670000000, - "cutime": 0, - "cstime": 0, - "num_threads": 51, - "vm_size": 8284008448, - "vm_rss": 190812160 - } - }, - "backend": { - "primary_only": false - } - }, - "unixtime": 1693281654 - } - } - }, - "slot_action": { - "interval": 0, - "disabled": false, - "progress": { - "status": "" - }, - "executor": 0 - }, - "sentinels": { - "model": { - "out_of_sync": false - }, - "stats": {}, - "masters": {} - } - } -} \ No newline at end of file diff --git a/tools/pika_exporter/exporter/mockProxyStats.json b/tools/pika_exporter/exporter/mockProxyStats.json new file mode 100644 index 0000000000..79cd43d1a5 --- /dev/null +++ b/tools/pika_exporter/exporter/mockProxyStats.json @@ -0,0 +1,88 @@ +{ + "online": true, + "closed": false, + 
"sentinels": {}, + "ops": { + "total": 6, + "fails": 0, + "redis": { + "errors": 0 + }, + "qps": 0, + "cmd": [ + { + "opstr": "GET", + "calls": 1, + "usecs": 1540, + "usecs_percall": 1540, + "fails": 0, + "redis_errtype": 0, + "max_delay": 0 + }, + { + "opstr": "PCONFIG", + "calls": 4, + "usecs": 435, + "usecs_percall": 108, + "fails": 0, + "redis_errtype": 0, + "max_delay": 0 + }, + { + "opstr": "SET", + "calls": 1, + "usecs": 4706, + "usecs_percall": 4706, + "fails": 0, + "redis_errtype": 0, + "max_delay": 0 + } + ] + }, + "sessions": { + "total": 1, + "alive": 1 + }, + "rusage": { + "now": "2023-12-19 13:34:12.039062 +0800 CST m=+743.815706668", + "cpu": 0.001923812544336962, + "mem": 0, + "raw": { + "utime": 936053000, + "stime": 559258000, + "max_rss": 126533632, + "ix_rss": 0, + "id_rss": 0, + "is_rss": 0 + } + }, + "backend": { + "primary_only": false + }, + "runtime": { + "general": { + "alloc": 288695704, + "sys": 389611864, + "lookups": 0, + "mallocs": 180635, + "frees": 174876 + }, + "heap": { + "alloc": 288695704, + "sys": 376864768, + "idle": 86532096, + "inuse": 290332672, + "objects": 5759 + }, + "gc": { + "num": 7, + "cpu_fraction": 0.000007142915997134459, + "total_pausems": 2 + }, + "num_procs": 4, + "num_goroutines": 21, + "num_cgo_call": 2979, + "mem_offheap": 458752 + }, + "slow_cmd_count": 0 +} \ No newline at end of file diff --git a/tools/pika_exporter/exporter/parser_test.go b/tools/pika_exporter/exporter/parser_test.go index c7e425ae2b..77a4e016b5 100644 --- a/tools/pika_exporter/exporter/parser_test.go +++ b/tools/pika_exporter/exporter/parser_test.go @@ -97,7 +97,7 @@ func Benchmark_Parse(b *testing.B) { } func Test_Parse_Proxy_Stats(t *testing.T) { - jsonFile := "mockCodisStats.json" + jsonFile := "mockProxyStats.json" jsonData, err := ioutil.ReadFile(jsonFile) if err != nil { t.Fatalf("failed to read test data: %v", err) diff --git a/tools/pika_exporter/exporter/proxy.go b/tools/pika_exporter/exporter/proxy.go index 13c43fb9c4..56355e54e9 100644 --- a/tools/pika_exporter/exporter/proxy.go +++ b/tools/pika_exporter/exporter/proxy.go @@ -2,14 +2,16 @@ package exporter import ( "encoding/json" - "github.com/OpenAtomFoundation/pika/tools/pika_exporter/discovery" - "github.com/OpenAtomFoundation/pika/tools/pika_exporter/exporter/metrics" - "github.com/prometheus/client_golang/prometheus" - log "github.com/sirupsen/logrus" "net/http" "strconv" "sync" "time" + + log "github.com/sirupsen/logrus" + + "github.com/OpenAtomFoundation/pika/tools/pika_exporter/discovery" + "github.com/OpenAtomFoundation/pika/tools/pika_exporter/exporter/metrics" + "github.com/prometheus/client_golang/prometheus" ) const ( @@ -120,14 +122,13 @@ func (p *exporterProxy) Describe(ch chan<- *prometheus.Desc) { func (p *exporterProxy) Collect(ch chan<- prometheus.Metric) { p.mutex.Lock() - defer p.mutex.Unlock() - startTime := time.Now() defer func() { p.collectCount.Inc() p.collectDuration.Observe(time.Since(startTime).Seconds()) ch <- p.collectCount ch <- p.collectDuration + p.mutex.Unlock() }() p.scrape(ch)