Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Exporter for Proxy #2199

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions codis/cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ Options:
}
defer s.Close()

proxy.RefreshPeriod.Set(config.MaxDelayRefreshTimeInterval.Int64())

log.Warnf("create proxy with config\n%s", config)

if s, ok := utils.Argument(d, "--pidfile"); ok {
Expand Down
2 changes: 2 additions & 0 deletions codis/config/proxy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,5 @@ metrics_report_statsd_server = ""
metrics_report_statsd_period = "1s"
metrics_report_statsd_prefix = ""

# Interval at which the per-command maximum-delay statistic is reset to 0. This value must be greater than 0.
max_delay_refresh_time_interval = "15s"
9 changes: 9 additions & 0 deletions codis/pkg/proxy/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ metrics_report_influxdb_database = ""
metrics_report_statsd_server = ""
metrics_report_statsd_period = "1s"
metrics_report_statsd_prefix = ""
# Maximum delay statistical time interval.(This value must be greater than 0.)
max_delay_refresh_time_interval = "15s"
`

type Config struct {
Expand Down Expand Up @@ -192,6 +195,8 @@ type Config struct {
MetricsReportStatsdPeriod timesize.Duration `toml:"metrics_report_statsd_period" json:"metrics_report_statsd_period"`
MetricsReportStatsdPrefix string `toml:"metrics_report_statsd_prefix" json:"metrics_report_statsd_prefix"`

MaxDelayRefreshTimeInterval timesize.Duration `toml:"max_delay_refresh_time_interval" json:"max_delay_refresh_time_interval"`

ConfigFileName string `toml:"-" json:"config_file_name"`
}

Expand Down Expand Up @@ -323,5 +328,9 @@ func (c *Config) Validate() error {
return errors.New("invalid metrics_report_statsd_period")
}

if c.MaxDelayRefreshTimeInterval <= 0 {
return errors.New("max_delay_refresh_time_interval must be greater than 0")
}

return nil
}
22 changes: 21 additions & 1 deletion codis/pkg/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,12 @@ func (p *Proxy) ConfigGet(key string) *redis.Resp {
redis.NewBulkBytes([]byte("metrics_report_statsd_prefix")),
redis.NewBulkBytes([]byte(p.config.MetricsReportStatsdPrefix)),
})
case "max_delay_refresh_time_interval":
if text, err := p.config.MaxDelayRefreshTimeInterval.MarshalText(); err != nil {
return redis.NewErrorf("cant get max_delay_refresh_time_interval value.")
} else {
return redis.NewBulkBytes(text)
}
default:
return redis.NewErrorf("unsupported key: %s", key)
}
Expand Down Expand Up @@ -342,6 +348,18 @@ func (p *Proxy) ConfigSet(key, value string) *redis.Resp {
}
p.config.SlowlogLogSlowerThan = n
return redis.NewString([]byte("OK"))
case "max_delay_refresh_time_interval":
s := &(p.config.MaxDelayRefreshTimeInterval)
err := s.UnmarshalText([]byte(value))
if err != nil {
return redis.NewErrorf("err:%s.", err)
}
if d := p.config.MaxDelayRefreshTimeInterval.Duration(); d <= 0 {
return redis.NewErrorf("max_delay_refresh_time_interval must be greater than 0")
} else {
RefreshPeriod.Set(int64(d))
return redis.NewString([]byte("OK"))
}
default:
return redis.NewErrorf("unsupported key: %s", key)
}
Expand Down Expand Up @@ -558,7 +576,8 @@ type Stats struct {
PrimaryOnly bool `json:"primary_only"`
} `json:"backend"`

Runtime *RuntimeStats `json:"runtime,omitempty"`
Runtime *RuntimeStats `json:"runtime,omitempty"`
SlowCmdCount int64 `json:"slow_cmd_count"` // Cumulative count of slow log
}

type RuntimeStats struct {
Expand Down Expand Up @@ -667,5 +686,6 @@ func (p *Proxy) Stats(flags StatsFlags) *Stats {
stats.Runtime.NumCgoCall = runtime.NumCgoCall()
stats.Runtime.MemOffheap = unsafe2.OffheapBytes()
}
stats.SlowCmdCount = SlowCmdCount.Int64()
return stats
}
13 changes: 11 additions & 2 deletions codis/pkg/proxy/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,12 +236,14 @@ func (s *Session) loopWriter(tasks *RequestChan) (err error) {
} else {
s.incrOpStats(r, resp.Type)
}
nowTime := time.Now().UnixNano()
duration := int64((nowTime - r.ReceiveTime) / 1e3)
s.updateMaxDelay(duration, r)
if fflush {
s.flushOpStats(false)
}
nowTime := time.Now().UnixNano()
duration := int64((nowTime - r.ReceiveTime) / 1e3)
if duration >= s.config.SlowlogLogSlowerThan {
SlowCmdCount.Incr() // Atomic global variable, increment by 1 when slow log occurs.
//client -> proxy -> server -> proxy -> client
//Record the waiting time from receiving the request from the client to sending it to the backend server
//the waiting time from sending the request to the backend server to receiving the response from the server
Expand Down Expand Up @@ -758,3 +760,10 @@ func (s *Session) handlePConfig(r *Request) error {
}
return nil
}

// updateMaxDelay raises the recorded maximum delay for the request's command
// when duration exceeds the value currently stored in this session's
// per-command stats entry (looked up by r.OpStr via s.getOpStats).
//
// The load-then-store below is not an atomic read-modify-write; the original
// author notes that the session-local stats are not subject to a race here.
func (s *Session) updateMaxDelay(duration int64, r *Request) {
	stats := s.getOpStats(r.OpStr)
	if current := stats.maxDelay.Int64(); duration <= current {
		// Nothing to do: the stored maximum is already at least this large.
		return
	}
	stats.maxDelay.Set(duration)
}
43 changes: 39 additions & 4 deletions codis/pkg/proxy/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ import (
"pika/codis/v2/pkg/utils/sync2/atomic2"
)

// Package-level counters shared by the whole proxy process.
var (
// SlowCmdCount is an atomic counter of slow-log events.
SlowCmdCount atomic2.Int64 // Cumulative count of slow log
// RefreshPeriod is the interval between resets of the per-command maximum
// delay. Its value is converted with time.Duration(...), i.e. it is a
// nanosecond count.
// NOTE(review): the zero value means time.Sleep returns immediately, so this
// should be set to a positive value before the reset loop matters — confirm
// the startup ordering.
RefreshPeriod atomic2.Int64
)

type opStats struct {
opstr string
calls atomic2.Int64
Expand All @@ -22,14 +27,16 @@ type opStats struct {
redis struct {
errors atomic2.Int64
}
maxDelay atomic2.Int64
}

func (s *opStats) OpStats() *OpStats {
o := &OpStats{
OpStr: s.opstr,
Calls: s.calls.Int64(),
Usecs: s.nsecs.Int64() / 1e3,
Fails: s.fails.Int64(),
OpStr: s.opstr,
Calls: s.calls.Int64(),
Usecs: s.nsecs.Int64() / 1e3,
Fails: s.fails.Int64(),
MaxDelay: s.maxDelay.Int64(),
}
if o.Calls != 0 {
o.UsecsPercall = o.Usecs / o.Calls
Expand All @@ -45,6 +52,7 @@ type OpStats struct {
UsecsPercall int64 `json:"usecs_percall"`
Fails int64 `json:"fails"`
RedisErrType int64 `json:"redis_errtype"`
MaxDelay int64 `json:"max_delay"`
}

var cmdstats struct {
Expand All @@ -62,6 +70,7 @@ var cmdstats struct {

func init() {
cmdstats.opmap = make(map[string]*opStats, 128)
SlowCmdCount.Set(0)
go func() {
for {
start := time.Now()
Expand All @@ -72,6 +81,16 @@ func init() {
cmdstats.qps.Set(int64(normalized + 0.5))
}
}()

// Clear the accumulated maximum delay to 0
go func() {
for {
time.Sleep(time.Duration(RefreshPeriod.Int64()))
for _, s := range cmdstats.opmap {
s.maxDelay.Set(0)
}
}
}()
}

func OpTotal() int64 {
Expand Down Expand Up @@ -165,6 +184,22 @@ func incrOpStats(e *opStats) {
s.redis.errors.Add(n)
cmdstats.redis.errors.Add(n)
}

/**
Each session refreshes its own saved metrics, and there is a race condition at this time.
Use the CAS method to update.
*/
for {
oldValue := s.maxDelay
if e.maxDelay > oldValue {
if s.maxDelay.CompareAndSwap(oldValue.Int64(), e.maxDelay.Int64()) {
e.maxDelay.Set(0)
break
}
} else {
break
}
}
}

var sessions struct {
Expand Down
2 changes: 2 additions & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ class PikaServer : public pstd::noncopyable {
uint32_t SlowlogLen();
void SlowlogObtain(int64_t number, std::vector<SlowlogEntry>* slowlogs);
void SlowlogPushEntry(const PikaCmdArgsType& argv, int64_t time, int64_t duration);
uint64_t SlowlogCount();

/*
* Statistic used
Expand Down Expand Up @@ -680,6 +681,7 @@ class PikaServer : public pstd::noncopyable {
* Slowlog used
*/
uint64_t slowlog_entry_id_ = 0;
uint64_t slowlog_counter_ = 0;
std::shared_mutex slowlog_protector_;
std::list<SlowlogEntry> slowlog_list_;

Expand Down
2 changes: 1 addition & 1 deletion src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1056,7 +1056,7 @@ void InfoCmd::InfoStats(std::string& info) {
tmp_stream << "is_slots_migrating:" << (is_migrating ? "Yes, " : "No, ") << start_migration_time_str << ", "
<< (is_migrating ? (current_time_s - start_migration_time) : (end_migration_time - start_migration_time))
<< "\r\n";

tmp_stream << "slow_logs_count:" << g_pika_server->SlowlogCount() << "\r\n";
info.append(tmp_stream.str());
}

Expand Down
6 changes: 6 additions & 0 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1198,11 +1198,17 @@ void PikaServer::SlowlogPushEntry(const PikaCmdArgsType& argv, int64_t time, int
entry.start_time = time;
entry.duration = duration;
slowlog_list_.push_front(entry);
slowlog_counter_++;
}

SlowlogTrim();
}

// Returns the current value of slowlog_counter_, the counter that
// SlowlogPushEntry increments for every entry it records.
// The read is performed while holding a shared lock on slowlog_protector_.
uint64_t PikaServer::SlowlogCount() {
  std::shared_lock<std::shared_mutex> read_guard(slowlog_protector_);
  const uint64_t total = slowlog_counter_;
  return total;
}

void PikaServer::ResetStat() {
statistic_.server_stat.accumulative_connections.store(0);
statistic_.server_stat.qps.querynum.store(0);
Expand Down
77 changes: 77 additions & 0 deletions tools/pika_exporter/discovery/codis_dashboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,91 @@ type CodisModelInfo struct {
Servers []CodisServerInfo `json:"servers"`
}

// CodisProxyModelInfo is the JSON shape of a single proxy entry: its numeric
// id, admin address, product name and data center.
type CodisProxyModelInfo struct {
Id int `json:"id"`
AdminAddr string `json:"admin_addr"`
ProductName string `json:"product_name"`
DataCenter string `json:"data_center"`
}

// CodisGroupInfo wraps the list of group models decoded from the "models"
// JSON key.
type CodisGroupInfo struct {
Models []CodisModelInfo `json:"models"`
}

// CodisProxyInfo wraps the list of proxy models decoded from the "models"
// JSON key.
type CodisProxyInfo struct {
Models []CodisProxyModelInfo `json:"models"`
}

// CodisStatsInfo groups the "group" and "proxy" sections of a stats document.
type CodisStatsInfo struct {
Group CodisGroupInfo `json:"group"`
Proxy CodisProxyInfo `json:"proxy"`
}

// CodisTopomInfo holds the "stats" section of a JSON document.
// NOTE(review): presumably the top-level response of the Codis dashboard —
// confirm against the code that decodes it.
type CodisTopomInfo struct {
Stats CodisStatsInfo `json:"stats"`
}

// RedisInfo carries the "errors" counter nested under the "redis" key of
// ProxyOpsInfo.
type RedisInfo struct {
Errors int `json:"errors"`
}

// CmdInfo is the JSON shape of one per-command statistics entry: the command
// string, call count, microseconds per call, failure count and maximum delay.
// NOTE(review): Usecs_percall does not follow Go's MixedCaps convention; it is
// left as-is because it is exported and may be referenced elsewhere.
type CmdInfo struct {
Opstr string `json:"opstr"`
Calls int64 `json:"calls"`
Usecs_percall int64 `json:"usecs_percall"`
Fails int64 `json:"fails"`
MaxDelay int64 `json:"max_delay"`
}

// ProxyOpsInfo is the JSON shape of the "ops" section of ProxyStats: overall
// totals, failures, redis error info, QPS and the per-command entries.
type ProxyOpsInfo struct {
Total int `json:"total"`
Fails int `json:"fails"`
Redis RedisInfo `json:"redis"`
Qps int `json:"qps"`
Cmd []CmdInfo `json:"cmd"`
}

// RowInfo is the JSON shape of the "raw" section of RusageInfo.
// NOTE(review): the field names resemble getrusage(2) counters and the type
// name looks like it was meant to be "RawInfo" — confirm before relying on
// units or renaming.
type RowInfo struct {
Utime int64 `json:"utime"`
Stime int64 `json:"stime"`
MaxRss int64 `json:"max_rss"`
IxRss int64 `json:"ix_rss"`
IdRss int64 `json:"id_rss"`
IsRss int64 `json:"is_rss"`
}

// RusageInfo is the JSON shape of the "rusage" section of ProxyStats: a
// timestamp string, cpu and mem figures, and the nested raw counters.
type RusageInfo struct {
Now string `json:"now"`
Cpu float64 `json:"cpu"`
Mem float64 `json:"mem"`
Raw RowInfo `json:"raw"`
}

// GeneralInfo is the JSON shape of the "general" section of RunTimeInfo.
// NOTE(review): the fields look like Go runtime memory statistics — confirm
// against the producer of this JSON.
type GeneralInfo struct {
Alloc int64 `json:"alloc"`
Sys int64 `json:"sys"`
Lookups int64 `json:"lookups"`
Mallocs int64 `json:"mallocs"`
Frees int64 `json:"frees"`
}

// HeapInfo is the JSON shape of the "heap" section of RunTimeInfo.
// NOTE(review): the fields look like Go runtime heap statistics — confirm
// against the producer of this JSON.
type HeapInfo struct {
Alloc int64 `json:"alloc"`
Sys int64 `json:"sys"`
Idle int64 `json:"idle"`
Inuse int64 `json:"inuse"`
Objects int64 `json:"objects"`
}

// RunTimeInfo is the JSON shape of the "runtime" section of ProxyStats,
// split into "general" and "heap" sub-sections.
type RunTimeInfo struct {
General GeneralInfo `json:"general"`
Heap HeapInfo `json:"heap"`
}

// ProxyStats is the JSON shape of a proxy statistics document: online flag,
// operation counters, resource usage, runtime memory figures and the
// "slow_cmd_count" counter.
type ProxyStats struct {
Online bool `json:"online"`
Ops ProxyOpsInfo `json:"ops"`
Rusage RusageInfo `json:"rusage"`
RunTime RunTimeInfo `json:"runtime"`
SlowCmdCount int64 `json:"slow_cmd_count"`
}
Loading
Loading