Skip to content

Commit

Permalink
review
Browse files Browse the repository at this point in the history
  • Loading branch information
davies committed Jun 7, 2022
1 parent 01ef1bc commit 9fd78c1
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 81 deletions.
33 changes: 12 additions & 21 deletions pkg/meta/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,9 +710,6 @@ type timeoutError interface {
}

func (m *redisMeta) shouldRetry(err error, retryOnFailure bool) bool {
if _, ok := err.(syscall.Errno); ok {
return false
}
switch err {
case redis.TxFailedErr:
return true
Expand Down Expand Up @@ -767,7 +764,6 @@ func (m *redisMeta) txn(ctx Context, txf func(tx *redis.Tx) error, keys ...strin
panic(fmt.Sprintf("Invalid key %s not starts with prefix %s", k, m.prefix))
}
}
var err error
var khash = fnv.New32()
_, _ = khash.Write([]byte(keys[0]))
h := int(khash.Sum32())
Expand All @@ -777,30 +773,25 @@ func (m *redisMeta) txn(ctx Context, txf func(tx *redis.Tx) error, keys ...strin
defer m.txUnlock(h)
// TODO: enable retry for some of idempodent transactions
var retryOnFailture = false
var lastErr error
for i := 0; i < 50; i++ {
err = m.rdb.Watch(ctx, txf, keys...)
if m.shouldRetry(err, retryOnFailture) {
err := m.rdb.Watch(ctx, txf, keys...)
if eno, ok := err.(syscall.Errno); ok && eno == 0 {
err = nil
}
if err != nil && m.shouldRetry(err, retryOnFailture) {
txRestart.Add(1)
msg := fmt.Sprintf("Transaction failed, restart it (tried %d): %s", i+1, err)
if i+1 == 10 {
logger.Infof(msg)
} else if (i+1)%10 == 0 {
logger.Warnf(msg)
} else {
logger.Debugf(msg)
}
logger.Debugf("Transaction failed, restart it (tried %d): %s", i+1, err)
lastErr = err
time.Sleep(time.Millisecond * time.Duration(rand.Int()%((i+1)*(i+1))))
continue
} else if err == nil && i > 0 {
logger.Warnf("Transaction succeeded after %d tries, keys: %v", i+1, keys)
}
if eno, ok := err.(syscall.Errno); ok && eno == 0 {
err = nil
} else if err == nil && i > 1 {
logger.Warnf("Transaction succeeded after %d tries (%s), keys: %v, last error: %s", i+1, time.Since(start), keys, lastErr)
}
return err
}
logger.Warnf("Already tried 50 times, returning: %s", err)
return err
logger.Warnf("Already tried 50 times, returning: %s", lastErr)
return lastErr
}

func (m *redisMeta) Truncate(ctx Context, inode Ino, flags uint8, length uint64, attr *Attr) syscall.Errno {
Expand Down
56 changes: 19 additions & 37 deletions pkg/meta/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,13 +588,6 @@ func mustInsert(s *xorm.Session, beans ...interface{}) error {
var errBusy error

func (m *dbMeta) shouldRetry(err error) bool {
if err == nil {
return false
}
if _, ok := err.(syscall.Errno); ok {
return false
}
// TODO: add other retryable errors here
msg := strings.ToLower(err.Error())
if strings.Contains(msg, "too many connections") || strings.Contains(msg, "too many clients") {
logger.Warnf("transaction failed: %s, will retry it. please increase the max number of connections in your database, or use a connection pool.", msg)
Expand Down Expand Up @@ -630,75 +623,64 @@ func (m *dbMeta) txn(f func(s *xorm.Session) error, inodes ...Ino) error {
m.txLock(int(inodes[0]))
defer m.txUnlock(int(inodes[0]))
}
var err error
var lastErr error
for i := 0; i < 50; i++ {
_, err = m.db.Transaction(func(s *xorm.Session) (interface{}, error) {
_, err := m.db.Transaction(func(s *xorm.Session) (interface{}, error) {
return nil, f(s)
})
if eno, ok := err.(syscall.Errno); ok && eno == 0 {
err = nil
}
if m.shouldRetry(err) {
if err != nil && m.shouldRetry(err) {
txRestart.Add(1)
msg := fmt.Sprintf("Transaction failed, restart it (tried %d): %s", i+1, err)
if i+1 == 10 {
logger.Infof(msg)
} else if (i+1)%10 == 0 {
logger.Warnf(msg)
} else {
logger.Debugf(msg)
}
logger.Debugf("Transaction failed, restart it (tried %d): %s", i+1, err)
lastErr = err
time.Sleep(time.Millisecond * time.Duration(i*i))
continue
} else if err == nil && i > 0 {
logger.Warnf("Transaction succeeded after %d tries, inodes: %v", i+1, inodes)
} else if err == nil && i > 1 {
logger.Warnf("Transaction succeeded after %d tries (%s), inodes: %v, last error: %s", i+1, time.Since(start), inodes, lastErr)
}
return err
}
logger.Warnf("Already tried 50 times, returning: %s", err)
return err
logger.Warnf("Already tried 50 times, returning: %s", lastErr)
return lastErr
}

func (m *dbMeta) roTxn(f func(s *xorm.Session) error) error {
start := time.Now()
defer func() { txDist.Observe(time.Since(start).Seconds()) }()
var err error
s := m.db.NewSession()
defer s.Close()
var opt = sql.TxOptions{
Isolation: sql.LevelRepeatableRead,
ReadOnly: true,
}
var lastErr error
for i := 0; i < 50; i++ {
if err := s.BeginTx(&opt); err != nil {
logger.Debugf("Start transaction failed, try again (tried %d): %s", i+1, err)
lastErr = err
time.Sleep(time.Millisecond * time.Duration(i*i))
continue
}
err = f(s)
err := f(s)
if eno, ok := err.(syscall.Errno); ok && eno == 0 {
err = nil
}
_ = s.Rollback()
if m.shouldRetry(err) {
if err != nil && m.shouldRetry(err) {
txRestart.Add(1)
msg := fmt.Sprintf("RoTransaction failed, restart it (tried %d): %s", i+1, err)
if i+1 == 10 {
logger.Infof(msg)
} else if (i+1)%10 == 0 {
logger.Warnf(msg)
} else {
logger.Debugf(msg)
}
logger.Debugf("Read transaction failed, restart it (tried %d): %s", i+1, err)
lastErr = err
time.Sleep(time.Millisecond * time.Duration(i*i))
continue
} else if err == nil && i > 0 {
logger.Warnf("RoTransaction succeeded after %d tries", i+1)
} else if err == nil && i > 1 {
logger.Warnf("Read transaction succeeded after %d tries (%s), last error: %s", i+1, time.Since(start), lastErr)
}
return err
}
logger.Warnf("Already tried 50 times, returning: %s", err)
return err
logger.Warnf("Already tried 50 times, returning: %s", lastErr)
return lastErr
}

func (m *dbMeta) parseAttr(n *node, attr *Attr) {
Expand Down
35 changes: 12 additions & 23 deletions pkg/meta/tkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,12 +622,6 @@ func (m *kvMeta) ListSessions() ([]*Session, error) {
}

func (m *kvMeta) shouldRetry(err error) bool {
if err == nil {
return false
}
if _, ok := err.(syscall.Errno); ok {
return false
}
return m.client.shouldRetry(err)
}

Expand All @@ -641,30 +635,25 @@ func (m *kvMeta) txn(f func(tx kvTxn) error, inodes ...Ino) error {
m.txLock(int(inodes[0]))
defer m.txUnlock(int(inodes[0]))
}
var err error
var lastErr error
for i := 0; i < 50; i++ {
if err = m.client.txn(f); m.shouldRetry(err) {
err := m.client.txn(f)
if eno, ok := err.(syscall.Errno); ok && eno == 0 {
err = nil
}
if err != nil && m.shouldRetry(err) {
txRestart.Add(1)
msg := fmt.Sprintf("Transaction failed, restart it (tried %d): %s", i+1, err)
if i+1 == 10 {
logger.Infof(msg)
} else if (i+1)%10 == 0 {
logger.Warnf(msg)
} else {
logger.Debugf(msg)
}
logger.Debugf("Transaction failed, restart it (tried %d): %s", i+1, err)
lastErr = err
time.Sleep(time.Millisecond * time.Duration(rand.Int()%((i+1)*(i+1))))
continue
} else if err == nil && i > 0 {
logger.Warnf("Transaction succeeded after %d tries, inodes: %v", i+1, inodes)
}
if eno, ok := err.(syscall.Errno); ok && eno == 0 {
err = nil
} else if err == nil && i > 1 {
logger.Warnf("Transaction succeeded after %d tries (%s), inodes: %v, error: %s", i+1, time.Since(start), inodes, lastErr)
}
return err
}
logger.Warnf("Already tried 50 times, returning: %s", err)
return err
logger.Warnf("Already tried 50 times, returning: %s", lastErr)
return lastErr
}

func (m *kvMeta) setValue(key, value []byte) error {
Expand Down

0 comments on commit 9fd78c1

Please sign in to comment.