Skip to content

Commit

Permalink
Improve metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
dingxiaoshuai123 committed Dec 18, 2023
1 parent 9e75379 commit e13eff8
Show file tree
Hide file tree
Showing 10 changed files with 98 additions and 37 deletions.
6 changes: 3 additions & 3 deletions codis/pkg/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,8 +558,8 @@ type Stats struct {
PrimaryOnly bool `json:"primary_only"`
} `json:"backend"`

Runtime *RuntimeStats `json:"runtime,omitempty"`
TimeoutCmdNumber int64 `json:"timeout_cmd_number"`
Runtime *RuntimeStats `json:"runtime,omitempty"`
SlowCmdCount int64 `json:"slow_cmd_count"` // Cumulative count of slow log
}

type RuntimeStats struct {
Expand Down Expand Up @@ -668,6 +668,6 @@ func (p *Proxy) Stats(flags StatsFlags) *Stats {
stats.Runtime.NumCgoCall = runtime.NumCgoCall()
stats.Runtime.MemOffheap = unsafe2.OffheapBytes()
}
stats.TimeoutCmdNumber = TimeoutCmdNumberInSecond.Int64()
stats.SlowCmdCount = SlowCmdCount.Int64()
return stats
}
14 changes: 11 additions & 3 deletions codis/pkg/proxy/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,13 +236,14 @@ func (s *Session) loopWriter(tasks *RequestChan) (err error) {
} else {
s.incrOpStats(r, resp.Type)
}
nowTime := time.Now().UnixNano()
duration := int64((nowTime - r.ReceiveTime) / 1e3)
s.updateMaxDelay(duration, r)
if fflush {
s.flushOpStats(false)
}
nowTime := time.Now().UnixNano()
duration := int64((nowTime - r.ReceiveTime) / 1e3)
if duration >= s.config.SlowlogLogSlowerThan {
TimeoutCmdNumber.Incr()
SlowCmdCount.Incr() // Atomic global variable, increment by 1 when slow log occurs.
//client -> proxy -> server -> proxy -> client
//Record the waiting time from receiving the request from the client to sending it to the backend server
//the waiting time from sending the request to the backend server to receiving the response from the server
Expand Down Expand Up @@ -759,3 +760,10 @@ func (s *Session) handlePConfig(r *Request) error {
}
return nil
}

// updateMaxDelay records duration (in microseconds) as the session-local
// maximum delay for r's command, if it exceeds the value stored so far.
// The opStats entry returned by getOpStats is owned by this session, so
// the read-then-write below needs no synchronization.
func (s *Session) updateMaxDelay(duration int64, r *Request) {
	stats := s.getOpStats(r.OpStr) // session-local entry: no race within the session
	if stats.maxDelay.Int64() < duration {
		stats.maxDelay.Set(duration)
	}
}
46 changes: 34 additions & 12 deletions codis/pkg/proxy/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@ import (
"pika/codis/v2/pkg/utils/sync2/atomic2"
)

var (
TimeoutCmdNumber atomic2.Int64
TimeoutCmdNumberInSecond atomic2.Int64
)
var SlowCmdCount atomic2.Int64 // Cumulative count of slow log

type opStats struct {
opstr string
Expand All @@ -27,14 +24,16 @@ type opStats struct {
redis struct {
errors atomic2.Int64
}
maxDelay atomic2.Int64
}

func (s *opStats) OpStats() *OpStats {
o := &OpStats{
OpStr: s.opstr,
Calls: s.calls.Int64(),
Usecs: s.nsecs.Int64() / 1e3,
Fails: s.fails.Int64(),
OpStr: s.opstr,
Calls: s.calls.Int64(),
Usecs: s.nsecs.Int64() / 1e3,
Fails: s.fails.Int64(),
MaxDelay: s.maxDelay.Int64(),
}
if o.Calls != 0 {
o.UsecsPercall = o.Usecs / o.Calls
Expand All @@ -50,6 +49,7 @@ type OpStats struct {
UsecsPercall int64 `json:"usecs_percall"`
Fails int64 `json:"fails"`
RedisErrType int64 `json:"redis_errtype"`
MaxDelay int64 `json:"max_delay"`
}

var cmdstats struct {
Expand All @@ -67,8 +67,7 @@ var cmdstats struct {

func init() {
cmdstats.opmap = make(map[string]*opStats, 128)
TimeoutCmdNumber.Set(0)
TimeoutCmdNumberInSecond.Set(0)
SlowCmdCount.Set(0)
go func() {
for {
start := time.Now()
Expand All @@ -77,9 +76,16 @@ func init() {
delta := cmdstats.total.Int64() - total
normalized := math.Max(0, float64(delta)) * float64(time.Second) / float64(time.Since(start))
cmdstats.qps.Set(int64(normalized + 0.5))
}
}()

TimeoutCmdNumberInSecond.Swap(TimeoutCmdNumber.Int64())
TimeoutCmdNumber.Set(0)
// Clear the accumulated maximum delay to 0 every 15 seconds.
go func() {
for {
time.Sleep(15 * time.Second)
for _, s := range cmdstats.opmap {
s.maxDelay.Set(0)
}
}
}()
}
Expand Down Expand Up @@ -175,6 +181,22 @@ func incrOpStats(e *opStats) {
s.redis.errors.Add(n)
cmdstats.redis.errors.Add(n)
}

/**
Each session flushes its own locally accumulated metrics into the shared stats,
so concurrent sessions can race here. Use CAS to update the shared maximum.
*/
for {
oldValue := s.maxDelay
if e.maxDelay > oldValue {
if s.maxDelay.CompareAndSwap(oldValue.Int64(), e.maxDelay.Int64()) {
e.maxDelay.Set(0)
break
}
} else {
break
}
}
}

var sessions struct {
Expand Down
2 changes: 2 additions & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ class PikaServer : public pstd::noncopyable {
uint32_t SlowlogLen();
void SlowlogObtain(int64_t number, std::vector<SlowlogEntry>* slowlogs);
void SlowlogPushEntry(const PikaCmdArgsType& argv, int64_t time, int64_t duration);
uint64_t SlowlogCount();

/*
* Statistic used
Expand Down Expand Up @@ -680,6 +681,7 @@ class PikaServer : public pstd::noncopyable {
* Slowlog used
*/
uint64_t slowlog_entry_id_ = 0;
uint64_t slowlog_counter_ = 0;
std::shared_mutex slowlog_protector_;
std::list<SlowlogEntry> slowlog_list_;

Expand Down
2 changes: 1 addition & 1 deletion src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1056,7 +1056,7 @@ void InfoCmd::InfoStats(std::string& info) {
tmp_stream << "is_slots_migrating:" << (is_migrating ? "Yes, " : "No, ") << start_migration_time_str << ", "
<< (is_migrating ? (current_time_s - start_migration_time) : (end_migration_time - start_migration_time))
<< "\r\n";

tmp_stream << "slow_logs_count:" << g_pika_server->SlowlogCount() << "\r\n";
info.append(tmp_stream.str());
}

Expand Down
6 changes: 6 additions & 0 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1198,11 +1198,17 @@ void PikaServer::SlowlogPushEntry(const PikaCmdArgsType& argv, int64_t time, int
entry.start_time = time;
entry.duration = duration;
slowlog_list_.push_front(entry);
slowlog_counter_++;
}

SlowlogTrim();
}

// SlowlogCount returns the cumulative number of entries ever pushed onto
// the slow log (slowlog_counter_ is incremented under the same mutex in
// SlowlogPushEntry). A shared (reader) lock suffices for this read-only access.
uint64_t PikaServer::SlowlogCount() {
  std::shared_lock<std::shared_mutex> guard(slowlog_protector_);
  return slowlog_counter_;
}

void PikaServer::ResetStat() {
statistic_.server_stat.accumulative_connections.store(0);
statistic_.server_stat.qps.querynum.store(0);
Expand Down
11 changes: 6 additions & 5 deletions tools/pika_exporter/discovery/codis_dashboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type CmdInfo struct {
Calls int64 `json:"calls"`
Usecs_percall int64 `json:"usecs_percall"`
Fails int64 `json:"fails"`
MaxDelay int64 `json:"max_delay"`
}

type ProxyOpsInfo struct {
Expand Down Expand Up @@ -97,9 +98,9 @@ type RunTimeInfo struct {
}

type ProxyStats struct {
Online bool `json:"online"`
Ops ProxyOpsInfo `json:"ops"`
Rusage RusageInfo `json:"rusage"`
RunTime RunTimeInfo `json:"runtime"`
TimeoutCmdNumber int64 `json:"timeout_cmd_number"`
Online bool `json:"online"`
Ops ProxyOpsInfo `json:"ops"`
Rusage RusageInfo `json:"rusage"`
RunTime RunTimeInfo `json:"runtime"`
SlowCmdCount int64 `json:"slow_cmd_count"`
}
2 changes: 2 additions & 0 deletions tools/pika_exporter/exporter/metrics/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,8 @@ func (p *proxyParser) Parse(m MetricMeta, c Collector, opt ParseOption) {
metric.Value = convertToFloat64(strconv.FormatInt(v[1], 10))
case "fails":
metric.Value = convertToFloat64(strconv.FormatInt(v[2], 10))
case "max_delay":
metric.Value = convertToFloat64(strconv.FormatInt(v[3], 10))
}

if err := c.Collect(metric); err != nil {
Expand Down
36 changes: 23 additions & 13 deletions tools/pika_exporter/exporter/metrics/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,20 @@ func RegisterForProxy() {
}

var collectProxyMetrics map[string]MetricConfig = map[string]MetricConfig{
"ops_total": {
"total_ops": {
Parser: &normalParser{},
MetricMeta: &MetaData{
Name: "ops_total",
Name: "total_ops",
Help: "proxy total ops",
Type: metricTypeCounter,
Labels: []string{LabelNameAddr, LabelID, LabelProductName},
ValueName: "ops_total",
},
},
"ops_fails": {
"total_ops_fails": {
Parser: &normalParser{},
MetricMeta: &MetaData{
Name: "ops_fails",
Name: "total_ops_fails",
Help: "proxy fails counter",
Type: metricTypeCounter,
Labels: []string{LabelNameAddr, LabelID, LabelProductName},
Expand Down Expand Up @@ -66,23 +66,23 @@ var collectProxyMetrics map[string]MetricConfig = map[string]MetricConfig{
ValueName: "online",
},
},
"timeout_cmd_number": {
"total_slow_cmd": {
Parser: &normalParser{},
MetricMeta: &MetaData{
Name: "timeout_cmd_number",
Help: "The number of commands recorded in the slow log within the last second",
Type: metricTypeGauge,
Name: "total_slow_cmd",
Help: "The number of commands recorded in the slow log",
Type: metricTypeCounter,
Labels: []string{LabelNameAddr, LabelID, LabelProductName},
ValueName: "timeout_cmd_number",
ValueName: "slow_cmd_count",
},
},
}

var collectPorxyCmdMetrics map[string]MetricConfig = map[string]MetricConfig{
"calls": {
"total_calls": {
Parser: &proxyParser{},
MetricMeta: &MetaData{
Name: "calls",
Name: "total_calls",
Help: "the number of cmd calls",
Type: metricTypeCounter,
Labels: []string{LabelNameAddr, LabelID, LabelProductName, LabelOpstr},
Expand All @@ -99,14 +99,24 @@ var collectPorxyCmdMetrics map[string]MetricConfig = map[string]MetricConfig{
ValueName: "usecs_percall",
},
},
"fails": {
"total_fails": {
Parser: &proxyParser{},
MetricMeta: &MetaData{
Name: "fails",
Name: "total_fails",
Help: "the number of cmd fail",
Type: metricTypeCounter,
Labels: []string{LabelNameAddr, LabelID, LabelProductName, LabelOpstr},
ValueName: "fails",
},
},
"max_delay": {
Parser: &proxyParser{},
MetricMeta: &MetaData{
Name: "max_delay",
Help: "The maximum time consumed by this command since the last collection.",
Type: metricTypeGauge,
Labels: []string{LabelNameAddr, LabelID, LabelProductName, LabelOpstr},
ValueName: "max_delay",
},
},
}
10 changes: 10 additions & 0 deletions tools/pika_exporter/exporter/metrics/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,4 +158,14 @@ var collectStatsMetrics = map[string]MetricConfig{
Labels: []string{LabelNameAddr, LabelNameAlias, "is_compact", "compact_cron", "compact_interval"},
},
},
"total_slow_log": {
Parser: &normalParser{},
MetricMeta: &MetaData{
Name: "total_slow_log",
Help: "pika serve instance total count of slow log",
Type: metricTypeCounter,
Labels: []string{LabelNameAddr, LabelNameAlias},
ValueName: "slow_logs_count",
},
},
}

0 comments on commit e13eff8

Please sign in to comment.