Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Exporter for Proxy #2199

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion codis/pkg/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,8 @@ type Stats struct {
PrimaryOnly bool `json:"primary_only"`
} `json:"backend"`

Runtime *RuntimeStats `json:"runtime,omitempty"`
Runtime *RuntimeStats `json:"runtime,omitempty"`
SlowCmdCount int64 `json:"slow_cmd_count"` // Cumulative count of slow log
}

type RuntimeStats struct {
Expand Down Expand Up @@ -667,5 +668,6 @@ func (p *Proxy) Stats(flags StatsFlags) *Stats {
stats.Runtime.NumCgoCall = runtime.NumCgoCall()
stats.Runtime.MemOffheap = unsafe2.OffheapBytes()
}
stats.SlowCmdCount = SlowCmdCount.Int64()
return stats
}
13 changes: 11 additions & 2 deletions codis/pkg/proxy/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,12 +236,14 @@ func (s *Session) loopWriter(tasks *RequestChan) (err error) {
} else {
s.incrOpStats(r, resp.Type)
}
nowTime := time.Now().UnixNano()
duration := int64((nowTime - r.ReceiveTime) / 1e3)
s.updateMaxDelay(duration, r)
if fflush {
s.flushOpStats(false)
}
nowTime := time.Now().UnixNano()
duration := int64((nowTime - r.ReceiveTime) / 1e3)
if duration >= s.config.SlowlogLogSlowerThan {
SlowCmdCount.Incr() // Atomic global variable, increment by 1 when slow log occurs.
//client -> proxy -> server -> porxy -> client
//Record the waiting time from receiving the request from the client to sending it to the backend server
//the waiting time from sending the request to the backend server to receiving the response from the server
Expand Down Expand Up @@ -758,3 +760,10 @@ func (s *Session) handlePConfig(r *Request) error {
}
return nil
}

func (s *Session) updateMaxDelay(duration int64, r *Request) {
e := s.getOpStats(r.OpStr) // There is no race condition in the session
if duration > e.maxDelay.Int64() {
e.maxDelay.Set(duration)
}
}
40 changes: 36 additions & 4 deletions codis/pkg/proxy/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"pika/codis/v2/pkg/utils/sync2/atomic2"
)

var SlowCmdCount atomic2.Int64 // Cumulative count of slow log

type opStats struct {
opstr string
calls atomic2.Int64
Expand All @@ -22,14 +24,16 @@ type opStats struct {
redis struct {
errors atomic2.Int64
}
maxDelay atomic2.Int64
}

func (s *opStats) OpStats() *OpStats {
o := &OpStats{
OpStr: s.opstr,
Calls: s.calls.Int64(),
Usecs: s.nsecs.Int64() / 1e3,
Fails: s.fails.Int64(),
OpStr: s.opstr,
Calls: s.calls.Int64(),
Usecs: s.nsecs.Int64() / 1e3,
Fails: s.fails.Int64(),
MaxDelay: s.maxDelay.Int64(),
}
if o.Calls != 0 {
o.UsecsPercall = o.Usecs / o.Calls
Expand All @@ -45,6 +49,7 @@ type OpStats struct {
UsecsPercall int64 `json:"usecs_percall"`
Fails int64 `json:"fails"`
RedisErrType int64 `json:"redis_errtype"`
MaxDelay int64 `json:"max_delay"`
}

var cmdstats struct {
Expand All @@ -62,6 +67,7 @@ var cmdstats struct {

func init() {
cmdstats.opmap = make(map[string]*opStats, 128)
SlowCmdCount.Set(0)
go func() {
for {
start := time.Now()
Expand All @@ -72,6 +78,16 @@ func init() {
cmdstats.qps.Set(int64(normalized + 0.5))
}
}()

// Clear the accumulated maximum delay to 0 every 15 seconds.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这边辛苦小帅调整下,改成动态可配置重置时间,会更好些。避免信息缺失

go func() {
for {
time.Sleep(15 * time.Second)
for _, s := range cmdstats.opmap {
s.maxDelay.Set(0)
}
}
}()
}

func OpTotal() int64 {
Expand Down Expand Up @@ -165,6 +181,22 @@ func incrOpStats(e *opStats) {
s.redis.errors.Add(n)
cmdstats.redis.errors.Add(n)
}

/**
Each session refreshes its own saved metrics, and there is a race condition at this time.
Use the CAS method to update.
*/
for {
oldValue := s.maxDelay
if e.maxDelay > oldValue {
if s.maxDelay.CompareAndSwap(oldValue.Int64(), e.maxDelay.Int64()) {
e.maxDelay.Set(0)
break
}
} else {
break
}
}
}

var sessions struct {
Expand Down
2 changes: 2 additions & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ class PikaServer : public pstd::noncopyable {
uint32_t SlowlogLen();
void SlowlogObtain(int64_t number, std::vector<SlowlogEntry>* slowlogs);
void SlowlogPushEntry(const PikaCmdArgsType& argv, int64_t time, int64_t duration);
uint64_t SlowlogCount();

/*
* Statistic used
Expand Down Expand Up @@ -680,6 +681,7 @@ class PikaServer : public pstd::noncopyable {
* Slowlog used
*/
uint64_t slowlog_entry_id_ = 0;
uint64_t slowlog_counter_ = 0;
std::shared_mutex slowlog_protector_;
std::list<SlowlogEntry> slowlog_list_;

Expand Down
2 changes: 1 addition & 1 deletion src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1056,7 +1056,7 @@ void InfoCmd::InfoStats(std::string& info) {
tmp_stream << "is_slots_migrating:" << (is_migrating ? "Yes, " : "No, ") << start_migration_time_str << ", "
<< (is_migrating ? (current_time_s - start_migration_time) : (end_migration_time - start_migration_time))
<< "\r\n";

tmp_stream << "slow_logs_count:" << g_pika_server->SlowlogCount() << "\r\n";
info.append(tmp_stream.str());
}

Expand Down
6 changes: 6 additions & 0 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1198,11 +1198,17 @@ void PikaServer::SlowlogPushEntry(const PikaCmdArgsType& argv, int64_t time, int
entry.start_time = time;
entry.duration = duration;
slowlog_list_.push_front(entry);
slowlog_counter_++;
}

SlowlogTrim();
}

uint64_t PikaServer::SlowlogCount() {
std::shared_lock l(slowlog_protector_);
return slowlog_counter_;
}

void PikaServer::ResetStat() {
statistic_.server_stat.accumulative_connections.store(0);
statistic_.server_stat.qps.querynum.store(0);
Expand Down
18 changes: 14 additions & 4 deletions tools/pika_exporter/discovery/codis_dashboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,20 @@ type RedisInfo struct {
Errors int `json:"errors"`
}

type CmdInfo struct {
Opstr string `json:"opstr"`
Calls int64 `json:"calls"`
Usecs_percall int64 `json:"usecs_percall"`
Fails int64 `json:"fails"`
MaxDelay int64 `json:"max_delay"`
}

type ProxyOpsInfo struct {
Total int `json:"total"`
Fails int `json:"fails"`
Redis RedisInfo `json:"redis"`
Qps int `json:"qps"`
Cmd []CmdInfo `json:"cmd"`
}

type RowInfo struct {
Expand Down Expand Up @@ -89,8 +98,9 @@ type RunTimeInfo struct {
}

type ProxyStats struct {
Ops ProxyOpsInfo `json:"ops"`
Rusage RusageInfo `json:"rusage"`
RunTime RunTimeInfo `json:"runtime"`
TimeoutCmdNumber int64 `json:"timeout_cmd_number"`
Online bool `json:"online"`
Ops ProxyOpsInfo `json:"ops"`
Rusage RusageInfo `json:"rusage"`
RunTime RunTimeInfo `json:"runtime"`
SlowCmdCount int64 `json:"slow_cmd_count"`
}
1 change: 1 addition & 0 deletions tools/pika_exporter/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ func NewCodisDiscovery(url, password, alias string) (*codisDiscovery, error) {
func (d *codisDiscovery) GetInstances() []Instance {
return d.instances
}

func (d *codisDiscovery) GetInstancesProxy() []InstanceProxy {
dingxiaoshuai123 marked this conversation as resolved.
Show resolved Hide resolved
return d.instanceProxy
}
Expand Down
14 changes: 14 additions & 0 deletions tools/pika_exporter/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,19 @@ func TestNewCodisDiscovery(t *testing.T) {
"password1",
"password2",
}
expectedAddrsForProxy := []string{
"1.2.3.4:1234",
"1.2.3.4:4321",
}

if len(discovery.instances) != len(expectedAddrs) {
t.Errorf("expected %d instances but got %d", len(expectedAddrs), len(discovery.instances))
}

if len(discovery.instanceProxy) != len(expectedAddrsForProxy) {
t.Errorf("expected %d instances but got %d", len(expectedAddrs), len(discovery.instances))
}

for i := range expectedAddrs {
if discovery.instances[i].Addr != expectedAddrs[i] {
t.Errorf("instance %d address: expected %s but got %s", i, expectedAddrs[i], discovery.instances[i].Addr)
Expand All @@ -57,4 +65,10 @@ func TestNewCodisDiscovery(t *testing.T) {
t.Errorf("instance %d password: expected %s but got %s", i, expectedPasswords[i], discovery.instances[i].Password)
}
}

for i := range expectedAddrsForProxy {
if expectedAddrsForProxy[i] != discovery.instanceProxy[i].Addr {
t.Errorf("instance %d address: expected %s but got %s", i, expectedAddrs[i], discovery.instances[i].Addr)
}
}
}
2 changes: 2 additions & 0 deletions tools/pika_exporter/discovery/mockCodisTopom.json
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,14 @@
{
"id": 2,
"token": "4337153123141c4df6e363d281",
"admin_addr": "1.2.3.4:1234",
"start_time": "2021-07-06 10:30:33.588330243 +0800 CST",
"datacenter": ""
},
{
"id": 3,
"token": "ef7e1ad5422ab8e28241410e856",
"admin_addr": "1.2.3.4:4321",
"start_time": "2021-07-06 10:34:21.305110128 +0800 CST",
"datacenter": ""
}
Expand Down
2 changes: 1 addition & 1 deletion tools/pika_exporter/exporter/future.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (f *future) Wait() map[futureKey]error {
}

type futureKeyForProxy struct {
addr, instance, ID, productName string
addr, ID, productName string
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

一行只写一个变量

}

type futureForProxy struct {
Expand Down
4 changes: 2 additions & 2 deletions tools/pika_exporter/exporter/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ const (
LabelNameAlias = "alias"
LabelInstanceMode = "instance-mode"
LabelConsensusLevel = "consensus-level"
LabelInstance = "instance"
LabelID = "id"
LabelID = "proxy_id"
LabelProductName = "product_name"
LabelOpstr = "opstr"
)

type Describer interface {
Expand Down
Loading
Loading