Skip to content

Commit

Permalink
Dynamically adjust refresh time.
Browse files Browse the repository at this point in the history
  • Loading branch information
dingxiaoshuai123 committed Dec 21, 2023
1 parent e13eff8 commit f7c73b8
Show file tree
Hide file tree
Showing 12 changed files with 151 additions and 218 deletions.
2 changes: 2 additions & 0 deletions codis/cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ Options:
}
defer s.Close()

proxy.RefreshPeriod.Set(config.MaxDelayRefreshTimeInterval.Int64())

log.Warnf("create proxy with config\n%s", config)

if s, ok := utils.Argument(d, "--pidfile"); ok {
Expand Down
2 changes: 2 additions & 0 deletions codis/config/proxy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,5 @@ metrics_report_statsd_server = ""
metrics_report_statsd_period = "1s"
metrics_report_statsd_prefix = ""

# Maximum delay statistical time interval.(This value must be greater than 0.)
max_delay_refresh_time_interval = "15s"
9 changes: 9 additions & 0 deletions codis/pkg/proxy/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ metrics_report_influxdb_database = ""
metrics_report_statsd_server = ""
metrics_report_statsd_period = "1s"
metrics_report_statsd_prefix = ""
# Maximum delay statistical time interval.(This value must be greater than 0.)
max_delay_refresh_time_interval = "15s"
`

type Config struct {
Expand Down Expand Up @@ -192,6 +195,8 @@ type Config struct {
MetricsReportStatsdPeriod timesize.Duration `toml:"metrics_report_statsd_period" json:"metrics_report_statsd_period"`
MetricsReportStatsdPrefix string `toml:"metrics_report_statsd_prefix" json:"metrics_report_statsd_prefix"`

MaxDelayRefreshTimeInterval timesize.Duration `toml:"max_delay_refresh_time_interval" json:"max_delay_refresh_time_interval"`

ConfigFileName string `toml:"-" json:"config_file_name"`
}

Expand Down Expand Up @@ -323,5 +328,9 @@ func (c *Config) Validate() error {
return errors.New("invalid metrics_report_statsd_period")
}

if c.MaxDelayRefreshTimeInterval <= 0 {
return errors.New("max_delay_refresh_time_interval must be greater than 0")
}

return nil
}
18 changes: 18 additions & 0 deletions codis/pkg/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,12 @@ func (p *Proxy) ConfigGet(key string) *redis.Resp {
redis.NewBulkBytes([]byte("metrics_report_statsd_prefix")),
redis.NewBulkBytes([]byte(p.config.MetricsReportStatsdPrefix)),
})
case "max_delay_refresh_time_interval":
if text, err := p.config.MaxDelayRefreshTimeInterval.MarshalText(); err != nil {
return redis.NewErrorf("cant get max_delay_refresh_time_interval value.")
} else {
return redis.NewBulkBytes(text)
}
default:
return redis.NewErrorf("unsupported key: %s", key)
}
Expand Down Expand Up @@ -342,6 +348,18 @@ func (p *Proxy) ConfigSet(key, value string) *redis.Resp {
}
p.config.SlowlogLogSlowerThan = n
return redis.NewString([]byte("OK"))
case "max_delay_refresh_time_interval":
s := &(p.config.MaxDelayRefreshTimeInterval)
err := s.UnmarshalText([]byte(value))
if err != nil {
return redis.NewErrorf("err:%s.", err)
}
if d := p.config.MaxDelayRefreshTimeInterval.Duration(); d <= 0 {
return redis.NewErrorf("max_delay_refresh_time_interval must be greater than 0")
} else {
RefreshPeriod.Set(int64(d))
return redis.NewString([]byte("OK"))
}
default:
return redis.NewErrorf("unsupported key: %s", key)
}
Expand Down
9 changes: 6 additions & 3 deletions codis/pkg/proxy/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ import (
"pika/codis/v2/pkg/utils/sync2/atomic2"
)

var SlowCmdCount atomic2.Int64 // Cumulative count of slow log
var (
SlowCmdCount atomic2.Int64 // Cumulative count of slow log
RefreshPeriod atomic2.Int64
)

type opStats struct {
opstr string
Expand Down Expand Up @@ -79,10 +82,10 @@ func init() {
}
}()

// Clear the accumulated maximum delay to 0 every 15 seconds.
// Clear the accumulated maximum delay to 0
go func() {
for {
time.Sleep(15 * time.Second)
time.Sleep(time.Duration(RefreshPeriod.Int64()))
for _, s := range cmdstats.opmap {
s.maxDelay.Set(0)
}
Expand Down
7 changes: 5 additions & 2 deletions tools/pika_exporter/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,11 @@ func (d *codisDiscovery) CheckUpdate(updatechan chan int, codisaddr string) {
}

func (d *codisDiscovery) comparedis(new_instance *codisDiscovery) bool {
var addrs, addrsProxy []string
var diff bool = false
var (
addrs []string
addrsProxy []string
diff bool
)
for _, instance := range new_instance.instances {
addrs = append(addrs, instance.Addr)
}
Expand Down
13 changes: 9 additions & 4 deletions tools/pika_exporter/exporter/future.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ package exporter
import "sync"

type futureKey struct {
addr, alias string
addr string
alias string
}

type future struct {
Expand Down Expand Up @@ -38,7 +39,9 @@ func (f *future) Wait() map[futureKey]error {
}

type futureKeyForProxy struct {
addr, ID, productName string
addr string
ID string
productName string
}

type futureForProxy struct {
Expand All @@ -60,9 +63,11 @@ func (f *futureForProxy) Add() {

func (f *futureForProxy) Done(key futureKeyForProxy, val error) {
f.Lock()
defer f.Unlock()
defer func() {
f.Unlock()
f.wait.Done()
}()
f.m[key] = val
f.wait.Done()
}

func (f *futureForProxy) Wait() map[futureKeyForProxy]error {
Expand Down
12 changes: 4 additions & 8 deletions tools/pika_exporter/exporter/metrics/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,14 +348,10 @@ func StructToMap(obj interface{}) (map[string]string, map[string][]int64, error)
for j := 0; j < field.Len(); j++ {
elemType := field.Index(j).Type()
elemValue := field.Index(j)
if elemType.Kind() == reflect.Struct {
var key string
for p := 0; p < elemValue.NumField(); p++ {
if p == 0 {
key = elemValue.Field(p).String()
} else {
cmdResult[key] = append(cmdResult[key], elemValue.Field(p).Int())
}
if elemType.Kind() == reflect.Struct && elemValue.NumField() > 0 {
var key string = elemValue.Field(0).String()
for p := 1; p < elemValue.NumField(); p++ {
cmdResult[key] = append(cmdResult[key], elemValue.Field(p).Int())
}
} else {
result[strings.ToLower(jsonName)] = fmt.Sprintf("%v", value)
Expand Down
194 changes: 0 additions & 194 deletions tools/pika_exporter/exporter/mockCodisStats.json

This file was deleted.

Loading

0 comments on commit f7c73b8

Please sign in to comment.