Skip to content

Commit

Permalink
Merge branch 'unstable' into fix_bitmap_cache
Browse files Browse the repository at this point in the history
  • Loading branch information
chejinge authored and brother-jin committed Jan 18, 2024
2 parents cb4ceb4 + c1917c7 commit 9c1731c
Show file tree
Hide file tree
Showing 20 changed files with 378 additions and 30 deletions.
12 changes: 10 additions & 2 deletions codis/config/proxy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,11 @@ backend_max_pipeline = 20480
backend_primary_only = false

# Set backend parallel connections per server
backend_primary_parallel = 1
backend_replica_parallel = 1
backend_primary_parallel = 2
backend_replica_parallel = 2
# Set quick backend parallel connections per server
backend_primary_quick = 1
backend_replica_quick = 1

# Set slot num
max_slot_num = 1024
Expand Down Expand Up @@ -102,6 +105,11 @@ session_break_on_failure = false
# Slowlog-log-slower-than(us), from receive command to send response, 0 is allways print slow log
slowlog_log_slower_than = 100000

# quick command list e.g. get, set
quick_cmd_list = ""
# slow command list e.g. hgetall, mset
slow_cmd_list = ""

# Set metrics server (such as http://localhost:28000), proxy will report json formatted metrics to specified server in a predefined period.
metrics_report_server = ""
metrics_report_period = "1s"
Expand Down
2 changes: 1 addition & 1 deletion codis/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module pika/codis/v2

go 1.18
go 1.19

replace github.com/coreos/bbolt => go.etcd.io/bbolt v1.3.4

Expand Down
42 changes: 34 additions & 8 deletions codis/pkg/proxy/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ func (s *sharedBackendConn) KeepAlive() {
}
}

func (s *sharedBackendConn) BackendConn(database int32, seed uint, must bool) *BackendConn {
func (s *sharedBackendConn) BackendConn(database int32, seed uint, must bool, isQuick bool) *BackendConn {
if s == nil {
return nil
}
Expand All @@ -474,14 +474,35 @@ func (s *sharedBackendConn) BackendConn(database int32, seed uint, must bool) *B
}

var parallel = s.conns[database]

var i = seed
for range parallel {
i = (i + 1) % uint(len(parallel))
if bc := parallel[i]; bc.IsConnected() {
return bc

/**
The seed is the result after hashing using a key, so in order to ensure
the execution order of the same key in a pipeline, do not select another
connection when the first connection is invalid.
*/
if quick := s.owner.quick; quick > 0 {
if isQuick {
i = seed % uint(quick)
if bc := parallel[i]; bc.IsConnected() {
return bc
}
} else {
i = uint(quick) + seed%uint(len(parallel)-quick)
if bc := parallel[i]; bc.IsConnected() {
return bc
}
}
} else {
for range parallel {
i = (i + 1) % uint(len(parallel))
if bc := parallel[i]; bc.IsConnected() {
//log.Debugf("BackendConn: find all bc[%d]", i)
return bc
}
}
}

if !must {
return nil
}
Expand All @@ -491,18 +512,23 @@ func (s *sharedBackendConn) BackendConn(database int32, seed uint, must bool) *B
type sharedBackendConnPool struct {
config *Config
parallel int
quick int // The number of quick backend connection

pool map[string]*sharedBackendConn
}

func newSharedBackendConnPool(config *Config, parallel int) *sharedBackendConnPool {
func newSharedBackendConnPool(config *Config, parallel, quick int) *sharedBackendConnPool {
p := &sharedBackendConnPool{
config: config, parallel: math2.MaxInt(1, parallel),
config: config, parallel: math2.MaxInt(1, parallel), quick: math2.MaxInt(math2.MinInt(quick, parallel-1), 0),
}
p.pool = make(map[string]*sharedBackendConn)
return p
}

func (p *sharedBackendConnPool) SetQuickConn(quick int) {
p.quick = math2.MaxInt(math2.MinInt(quick, p.parallel-1), 0)
}

func (p *sharedBackendConnPool) KeepAlive() {
for _, bc := range p.pool {
bc.KeepAlive()
Expand Down
23 changes: 21 additions & 2 deletions codis/pkg/proxy/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,11 @@ backend_max_pipeline = 20480
backend_primary_only = false
# Set backend parallel connections per server
backend_primary_parallel = 1
backend_replica_parallel = 1
backend_primary_parallel = 2
backend_replica_parallel = 2
# Set quick backend parallel connections per server
backend_primary_quick = 1
backend_replica_quick = 1
# Set slot num
max_slot_num = 1024
Expand Down Expand Up @@ -118,6 +121,11 @@ session_break_on_failure = false
# Slowlog-log-slower-than(us), from receive command to send response, 0 is allways print slow log
slowlog_log_slower_than = 100000
# quick command list
quick_cmd_list = "get,set"
# slow command list
slow_cmd_list = "mget, mset"
# Set metrics server (such as http://localhost:28000), proxy will report json formatted metrics to specified server in a predefined period.
metrics_report_server = ""
metrics_report_period = "1s"
Expand Down Expand Up @@ -169,8 +177,10 @@ type Config struct {
BackendMaxPipeline int `toml:"backend_max_pipeline" json:"backend_max_pipeline"`
BackendPrimaryOnly bool `toml:"backend_primary_only" json:"backend_primary_only"`
BackendPrimaryParallel int `toml:"backend_primary_parallel" json:"backend_primary_parallel"`
BackendPrimaryQuick int `toml:"backend_primary_quick" json:"backend_primary_quick"`
MaxSlotNum int `toml:"max_slot_num" json:"max_slot_num"`
BackendReplicaParallel int `toml:"backend_replica_parallel" json:"backend_replica_parallel"`
BackendReplicaQuick int `toml:"backend_replica_quick" json:"backend_replica_quick"`
BackendKeepAlivePeriod timesize.Duration `toml:"backend_keepalive_period" json:"backend_keepalive_period"`
BackendNumberDatabases int32 `toml:"backend_number_databases" json:"backend_number_databases"`

Expand All @@ -184,6 +194,9 @@ type Config struct {

SlowlogLogSlowerThan int64 `toml:"slowlog_log_slower_than" json:"slowlog_log_slower_than"`

QuickCmdList string `toml:"quick_cmd_list" json:"quick_cmd_list"`
SlowCmdList string `toml:"slow_cmd_list" json:"slow_cmd_list"`

MetricsReportServer string `toml:"metrics_report_server" json:"metrics_report_server"`
MetricsReportPeriod timesize.Duration `toml:"metrics_report_period" json:"metrics_report_period"`
MetricsReportInfluxdbServer string `toml:"metrics_report_influxdb_server" json:"metrics_report_influxdb_server"`
Expand Down Expand Up @@ -285,9 +298,15 @@ func (c *Config) Validate() error {
if c.BackendPrimaryParallel < 0 {
return errors.New("invalid backend_primary_parallel")
}
if c.BackendPrimaryQuick < 0 || c.BackendPrimaryQuick >= c.BackendPrimaryParallel {
return errors.New("invalid backend_primary_quick")
}
if c.BackendReplicaParallel < 0 {
return errors.New("invalid backend_replica_parallel")
}
if c.BackendReplicaQuick < 0 || c.BackendReplicaQuick >= c.BackendReplicaParallel {
return errors.New("invalid backend_replica_quick")
}
if c.BackendKeepAlivePeriod < 0 {
return errors.New("invalid backend_keepalive_period")
}
Expand Down
12 changes: 7 additions & 5 deletions codis/pkg/proxy/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (d *forwardHelper) slotsmgrt(s *Slot, hkey []byte, database int32, seed uin
}
m.Batch = &sync.WaitGroup{}

s.migrate.bc.BackendConn(database, seed, true).PushBack(m)
s.migrate.bc.BackendConn(database, seed, true, m.OpFlag.IsQuick()).PushBack(m)

m.Batch.Wait()

Expand Down Expand Up @@ -176,7 +176,7 @@ func (d *forwardHelper) slotsmgrtExecWrapper(s *Slot, hkey []byte, database int3
m.Multi = append(m.Multi, multi...)
m.Batch = &sync.WaitGroup{}

s.migrate.bc.BackendConn(database, seed, true).PushBack(m)
s.migrate.bc.BackendConn(database, seed, true, m.OpFlag.IsQuick()).PushBack(m)

m.Batch.Wait()

Expand Down Expand Up @@ -214,17 +214,19 @@ func (d *forwardHelper) slotsmgrtExecWrapper(s *Slot, hkey []byte, database int3
}

func (d *forwardHelper) forward2(s *Slot, r *Request) *BackendConn {
var database, seed = r.Database, r.Seed16()
var database = r.Database
if s.migrate.bc == nil && !r.IsMasterOnly() && len(s.replicaGroups) != 0 {
var seed = r.Seed16()
for _, group := range s.replicaGroups {
var i = seed
for range group {
i = (i + 1) % uint(len(group))
if bc := group[i].BackendConn(database, seed, false); bc != nil {
if bc := group[i].BackendConn(database, seed, false, r.OpFlag.IsQuick()); bc != nil {
return bc
}
}
}
}
return s.backend.bc.BackendConn(database, seed, true)
// fix:https://github.com/OpenAtomFoundation/pika/issues/2174
return s.backend.bc.BackendConn(database, uint(s.id), true, r.OpFlag.IsQuick())
}
83 changes: 81 additions & 2 deletions codis/pkg/proxy/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import (
"hash/crc32"
"strconv"
"strings"
"sync"

"pika/codis/v2/pkg/proxy/redis"
"pika/codis/v2/pkg/utils/errors"
"pika/codis/v2/pkg/utils/log"
)

var charmap [256]byte
Expand Down Expand Up @@ -45,19 +47,28 @@ func (f OpFlag) IsMasterOnly() bool {
return (f & mask) != 0
}

func (f OpFlag) IsQuick() bool {
return (f & FlagQuick) != 0
}

type OpInfo struct {
Name string
Flag OpFlag
}

const (
FlagWrite = 1 << iota
FlagWrite OpFlag = 1 << iota
FlagMasterOnly
FlagMayWrite
FlagNotAllow
FlagQuick
FlagSlow
)

var opTable = make(map[string]OpInfo, 256)
var (
opTableLock sync.RWMutex
opTable = make(map[string]OpInfo, 256)
)

func init() {
for _, i := range []OpInfo{
Expand Down Expand Up @@ -290,6 +301,10 @@ func getOpInfo(multi []*redis.Resp) (string, OpFlag, error) {
}
}
op = upper[:len(op)]

opTableLock.RLock()
defer opTableLock.RUnlock()

if r, ok := opTable[string(op)]; ok {
return r.Name, r.Flag, nil
}
Expand Down Expand Up @@ -347,3 +362,67 @@ func getWholeCmd(multi []*redis.Resp, cmd []byte) int {
}
return index
}

func setCmdListFlag(cmdlist string, flag OpFlag) error {
reverseFlag := FlagSlow
flagString := "FlagQuick"
if flag&FlagSlow != 0 {
reverseFlag = FlagQuick
flagString = "FlagSlow"
}

opTableLock.Lock()
defer opTableLock.Unlock()

for _, r := range opTable {
r.Flag = r.Flag &^ flag
opTable[r.Name] = r
}
if len(cmdlist) == 0 {
return nil
}
cmdlist = strings.ToUpper(cmdlist)
cmds := strings.Split(cmdlist, ",")
for i := 0; i < len(cmds); i++ {
if r, ok := opTable[strings.TrimSpace(cmds[i])]; ok {
log.Infof("before setCmdListFlag: r.Name[%s], r.Flag[%d]", r.Name, r.Flag)
if r.Flag&reverseFlag == 0 {
r.Flag = r.Flag | flag
opTable[strings.TrimSpace(cmds[i])] = r
log.Infof("after setCmdListFlag: r.Name[%s], r.Flag[%d]", r.Name, r.Flag)
} else {
log.Warnf("cmd[%s] is %s command.", cmds[i], flagString)
return errors.Errorf("cmd[%s] is %s command.", cmds[i], flagString)
}
} else {
log.Warnf("can not find [%s] command.", cmds[i])
return errors.Errorf("can not find [%s] command.", cmds[i])
}
}
return nil
}

func getCmdFlag() *redis.Resp {
var array = make([]*redis.Resp, 0, 32)
const mask = FlagQuick | FlagSlow

opTableLock.RLock()
defer opTableLock.RUnlock()

for _, r := range opTable {
if r.Flag&mask != 0 {
retStr := r.Name + " : Flag[" + strconv.Itoa(int(r.Flag)) + "]"

if r.Flag&FlagQuick != 0 {
retStr += ", FlagQuick"
}

if r.Flag&FlagSlow != 0 {
retStr += ", FlagSlow"
}

array = append(array, redis.NewBulkBytes([]byte(retStr)))
}
}
return redis.NewArray(array)
}
Loading

0 comments on commit 9c1731c

Please sign in to comment.