Skip to content

Commit

Permalink
Merge branch 'OpenAtomFoundation:unstable' into unstable
Browse files Browse the repository at this point in the history
  • Loading branch information
QlQlqiqi committed Jun 7, 2024
2 parents a107816 + 5240e45 commit 0d1b00f
Show file tree
Hide file tree
Showing 46 changed files with 7,611 additions and 4,786 deletions.
5 changes: 3 additions & 2 deletions codis/pkg/proxy/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,9 @@ type Config struct {

SlowlogLogSlowerThan int64 `toml:"slowlog_log_slower_than" json:"slowlog_log_slower_than"`

QuickCmdList string `toml:"quick_cmd_list" json:"quick_cmd_list"`
SlowCmdList string `toml:"slow_cmd_list" json:"slow_cmd_list"`
QuickCmdList string `toml:"quick_cmd_list" json:"quick_cmd_list"`
SlowCmdList string `toml:"slow_cmd_list" json:"slow_cmd_list"`
AutoSetSlowFlag bool `toml:"auto_set_slow_flag" json:"auto_set_slow_flag"`

MetricsReportServer string `toml:"metrics_report_server" json:"metrics_report_server"`
MetricsReportPeriod timesize.Duration `toml:"metrics_report_period" json:"metrics_report_period"`
Expand Down
32 changes: 31 additions & 1 deletion codis/pkg/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,8 @@ func (p *Proxy) serveProxy() {
log.PanicErrorf(err, "setSlowCmdList [%s] failed", p.config.SlowCmdList)
}

StatsSetLogSlowerThan(p.config.SlowlogLogSlowerThan)

select {
case <-p.exit.C:
log.Warnf("[%p] proxy shutdown", p)
Expand Down Expand Up @@ -603,6 +605,16 @@ type Overview struct {
Slots []*models.Slot `json:"slots,omitempty"`
}

type CmdInfo struct {
Total int64 `json:"total"`
Fails int64 `json:"fails"`
Redis struct {
Errors int64 `json:"errors"`
} `json:"redis"`
QPS int64 `json:"qps"`
Cmd []*OpStats `json:"cmd,omitempty"`
}

type Stats struct {
Online bool `json:"online"`
Closed bool `json:"closed"`
Expand Down Expand Up @@ -709,7 +721,7 @@ func (p *Proxy) Stats(flags StatsFlags) *Stats {
stats.Ops.Fails = OpFails()
stats.Ops.Redis.Errors = OpRedisErrors()
stats.Ops.QPS = OpQPS()

stats.Ops.Cmd = GetOpStatsByInterval(1)
if flags.HasBit(StatsCmds) {
stats.Ops.Cmd = GetOpStatsAll()
}
Expand Down Expand Up @@ -752,3 +764,21 @@ func (p *Proxy) Stats(flags StatsFlags) *Stats {
stats.SlowCmdCount = SlowCmdCount.Int64()
return stats
}

func (s *Proxy) CmdInfo(interval int64) *CmdInfo {
info := &CmdInfo{
Total: OpTotal(),
Fails: OpFails(),
QPS: OpQPS(),
Cmd: GetOpStatsByInterval(interval),
}
info.Redis.Errors = OpRedisErrors()
return info
}

func StatsSetLogSlowerThan(ms int64) {
if ms < 0 {
return
}
cmdstats.logSlowerThan.Set(ms)
}
30 changes: 30 additions & 0 deletions codis/pkg/proxy/proxy_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,14 @@ func newApiServer(p *Proxy) http.Handler {
r.Get("/model", api.Model)
r.Get("/stats", api.StatsNoXAuth)
r.Get("/slots", api.SlotsNoXAuth)
r.Get("/cmdinfo/:interval", api.CmdInfoNoXAuth)
})
r.Group("/api/proxy", func(r martini.Router) {
r.Get("/model", api.Model)
r.Get("/xping/:xauth", api.XPing)
r.Get("/stats/:xauth", api.Stats)
r.Get("/stats/:xauth/:flags", api.Stats)
r.Get("/cmdinfo/:xauth/:interval", api.CmdInfo)
r.Get("/slots/:xauth", api.Slots)
r.Put("/start/:xauth", api.Start)
r.Put("/stats/reset/:xauth", api.ResetStats)
Expand Down Expand Up @@ -115,6 +117,10 @@ func (s *apiServer) SlotsNoXAuth() (int, string) {
return rpc.ApiResponseJson(s.proxy.Slots())
}

func (s *apiServer) CmdInfoNoXAuth() (int, string) {
return rpc.ApiResponseJson(s.proxy.CmdInfo(2))
}

func (s *apiServer) XPing(params martini.Params) (int, string) {
if err := s.verifyXAuth(params); err != nil {
return rpc.ApiResponseError(err)
Expand All @@ -123,6 +129,21 @@ func (s *apiServer) XPing(params martini.Params) (int, string) {
}
}

func (s *apiServer) CmdInfo(params martini.Params) (int, string) {
if err := s.verifyXAuth(params); err != nil {
return rpc.ApiResponseError(err)
}
var interval int64
if i := params["interval"]; i != "" {
n, err := strconv.Atoi(i)
if err != nil {
return rpc.ApiResponseError(err)
}
interval = int64(n)
}
return rpc.ApiResponseJson(s.proxy.CmdInfo(interval))
}

func (s *apiServer) Stats(params martini.Params) (int, string) {
if err := s.verifyXAuth(params); err != nil {
return rpc.ApiResponseError(err)
Expand Down Expand Up @@ -271,6 +292,15 @@ func (c *ApiClient) Stats(flags StatsFlags) (*Stats, error) {
return stats, nil
}

func (c *ApiClient) CmdInfo(interval int64) (*CmdInfo, error) {
url := c.encodeURL("/api/proxy/cmdinfo/%s/%d", c.xauth, interval)
cmdInfo := &CmdInfo{}
if err := rpc.ApiGetJson(url, cmdInfo); err != nil {
return nil, err
}
return cmdInfo, nil
}

func (c *ApiClient) Slots() ([]*models.Slot, error) {
url := c.encodeURL("/api/proxy/slots/%s", c.xauth)
slots := []*models.Slot{}
Expand Down
128 changes: 95 additions & 33 deletions codis/pkg/proxy/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ package proxy
import (
"encoding/json"
"fmt"
"math/rand"
"net"
"pika/codis/v2/pkg/utils"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -47,6 +49,8 @@ type Session struct {
config *Config
proxy *Proxy

rand *rand.Rand

authorized bool
}

Expand Down Expand Up @@ -78,6 +82,7 @@ func NewSession(sock net.Conn, config *Config, proxy *Proxy) *Session {
CreateUnix: time.Now().Unix(),
}
s.stats.opmap = make(map[string]*opStats, 16)
s.rand = rand.New(rand.NewSource(time.Now().UnixNano()))
log.Infof("session [%p] create: %s", s, s)
return s
}
Expand Down Expand Up @@ -236,31 +241,35 @@ func (s *Session) loopWriter(tasks *RequestChan) (err error) {
} else {
s.incrOpStats(r, resp.Type)
}

nowTime := time.Now().UnixNano()
duration := int64((nowTime - r.ReceiveTime) / 1e3)
s.updateMaxDelay(duration, r)
if fflush {
s.flushOpStats(false)
}
if duration >= s.config.SlowlogLogSlowerThan {
SlowCmdCount.Incr() // Atomic global variable, increment by 1 when slow log occurs.
//client -> proxy -> server -> porxy -> client
//Record the waiting time from receiving the request from the client to sending it to the backend server
//the waiting time from sending the request to the backend server to receiving the response from the server
//the waiting time from receiving the server response to sending it to the client
var d0, d1, d2 int64 = -1, -1, -1
if r.SendToServerTime > 0 {
d0 = int64((r.SendToServerTime - r.ReceiveTime) / 1e3)
}
if r.SendToServerTime > 0 && r.ReceiveFromServerTime > 0 {
d1 = int64((r.ReceiveFromServerTime - r.SendToServerTime) / 1e3)
}
if r.ReceiveFromServerTime > 0 {
d2 = int64((nowTime - r.ReceiveFromServerTime) / 1e3)
if s.config.SlowlogLogSlowerThan >= 0 {
if duration >= s.config.SlowlogLogSlowerThan {
SlowCmdCount.Incr()
// Atomic global variable, increment by 1 when slow log occurs.
//client -> proxy -> server -> porxy -> client
//Record the waiting time from receiving the request from the client to sending it to the backend server
//the waiting time from sending the request to the backend server to receiving the response from the server
//the waiting time from receiving the server response to sending it to the client
var d0, d1, d2 int64 = -1, -1, -1
if r.SendToServerTime > 0 {
d0 = int64((r.SendToServerTime - r.ReceiveTime) / 1e3)
}
if r.SendToServerTime > 0 && r.ReceiveFromServerTime > 0 {
d1 = int64((r.ReceiveFromServerTime - r.SendToServerTime) / 1e3)
}
if r.ReceiveFromServerTime > 0 {
d2 = int64((nowTime - r.ReceiveFromServerTime) / 1e3)
}
index := getWholeCmd(r.Multi, cmd)
log.Errorf("%s remote:%s, start_time(us):%d, duration(us): [%d, %d, %d], %d, tasksLen:%d, command:[%s].",
time.Unix(r.ReceiveTime/1e9, 0).Format("2006-01-02 15:04:05"), s.Conn.RemoteAddr(), r.ReceiveTime/1e3, d0, d1, d2, duration, r.TasksLen, string(cmd[:index]))
}
index := getWholeCmd(r.Multi, cmd)
log.Errorf("%s remote:%s, start_time(us):%d, duration(us): [%d, %d, %d], %d, tasksLen:%d, command:[%s].",
time.Unix(r.ReceiveTime/1e9, 0).Format("2006-01-02 15:04:05"), s.Conn.RemoteAddr(), r.ReceiveTime/1e3, d0, d1, d2, duration, r.TasksLen, string(cmd[:index]))
}
return nil
})
Expand Down Expand Up @@ -299,6 +308,8 @@ func (s *Session) handleRequest(r *Request, d *Router) error {
return s.handleQuit(r)
case "AUTH":
return s.handleAuth(r)
case "CODIS.INFO":
return s.handleCodisInfo(r)
}

if !s.authorized {
Expand Down Expand Up @@ -361,6 +372,22 @@ func (s *Session) handleAuth(r *Request) error {
return nil
}

func (s *Session) handleCodisInfo(r *Request) error {
if len(r.Multi) != 0 {
r.Resp = redis.NewErrorf("ERR wrong number of arguments for 'CODIS.INFO' command")
return nil
}

r.Resp = redis.NewArray([]*redis.Resp{
redis.NewString([]byte(utils.Version)),
redis.NewString([]byte(utils.Compile)),
redis.NewString([]byte(fmt.Sprintf("admin addr: %s", s.proxy.model.AdminAddr))),
redis.NewString([]byte(fmt.Sprintf("start time: %s", s.proxy.model.StartTime))),
})

return nil
}

func (s *Session) handleSelect(r *Request) error {
if len(r.Multi) != 2 {
r.Resp = redis.NewErrorf("ERR wrong number of arguments for 'SELECT' command")
Expand Down Expand Up @@ -662,32 +689,67 @@ func (s *Session) handleRequestSlotsMapping(r *Request, d *Router) error {
}
}

func (s *Session) incrOpTotal() {
s.stats.total.Incr()
}
func (s *Session) getOpStats(opstr string, create bool) *opStats {
var (
ok bool
stat *opStats
)

func (s *Session) getOpStats(opstr string) *opStats {
e := s.stats.opmap[opstr]
if e == nil {
e = &opStats{opstr: opstr}
s.stats.opmap[opstr] = e
func() {
cmdstats.opmapLock.RLock()
defer cmdstats.opmapLock.RUnlock()
stat, ok = s.stats.opmap[opstr]
}()
if (ok && stat != nil) || !create {
return stat
}
cmdstats.opmapLock.Lock()
defer cmdstats.opmapLock.Unlock()
stat, ok = cmdstats.opmap[opstr]
if ok && stat != nil {
return stat
}
return e
stat = &opStats{opstr: opstr}
for i := 0; i < IntervalNum; i++ {
stat.delayInfo[i] = &delayInfo{interval: IntervalMark[i]}
}
s.stats.opmap[opstr] = stat

return stat
}

func (s *Session) incrOpStats(r *Request, t redis.RespType) {
e := s.getOpStats(r.OpStr)
e.calls.Incr()
e.nsecs.Add(time.Now().UnixNano() - r.ReceiveTime)
if r == nil {
return
}
responseTime := time.Now().UnixNano() - r.ReceiveTime
var (
ok bool
stat *opStats
)
stat, ok = s.stats.opmap[r.OpStr]
if !ok || stat == nil {
stat = getOpStats(r.OpStr, true)
s.stats.opmap[r.OpStr] = stat
}
stat.incrOpStats(responseTime, redis.RespType(t))
stat, ok = s.stats.opmap["ALL"]
if !ok || stat == nil {
stat = getOpStats("ALL", true)
s.stats.opmap["ALL"] = stat
}
stat.incrOpStats(responseTime, redis.RespType(t))
stat.calls.Incr()
stat.nsecs.Add(time.Now().UnixNano() - r.ReceiveTime)
switch t {
case redis.TypeError:
e.redis.errors.Incr()
incrOpRedisErrors()
}
}

func (s *Session) incrOpFails(r *Request, err error) error {
if r != nil {
e := s.getOpStats(r.OpStr)
e := s.getOpStats(r.OpStr, true)
e.fails.Incr()
} else {
s.stats.fails.Incr()
Expand Down Expand Up @@ -762,7 +824,7 @@ func (s *Session) handlePConfig(r *Request) error {
}

func (s *Session) updateMaxDelay(duration int64, r *Request) {
e := s.getOpStats(r.OpStr) // There is no race condition in the session
e := s.getOpStats(r.OpStr, true) // There is no race condition in the session
if duration > e.maxDelay.Int64() {
e.maxDelay.Set(duration)
}
Expand Down
Loading

0 comments on commit 0d1b00f

Please sign in to comment.