Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

meta: add more logs when there is a lot of transaction restarts #2172

Merged
merged 3 commits into from
Jun 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions pkg/meta/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,9 +710,6 @@ type timeoutError interface {
}

func (m *redisMeta) shouldRetry(err error, retryOnFailure bool) bool {
if _, ok := err.(syscall.Errno); ok {
return false
}
switch err {
case redis.TxFailedErr:
return true
Expand Down Expand Up @@ -767,7 +764,6 @@ func (m *redisMeta) txn(ctx Context, txf func(tx *redis.Tx) error, keys ...strin
panic(fmt.Sprintf("Invalid key %s not starts with prefix %s", k, m.prefix))
}
}
var err error
var khash = fnv.New32()
_, _ = khash.Write([]byte(keys[0]))
h := int(khash.Sum32())
Expand All @@ -777,21 +773,25 @@ func (m *redisMeta) txn(ctx Context, txf func(tx *redis.Tx) error, keys ...strin
defer m.txUnlock(h)
// TODO: enable retry for some of idempodent transactions
var retryOnFailture = false
var lastErr error
for i := 0; i < 50; i++ {
err = m.rdb.Watch(ctx, txf, keys...)
if m.shouldRetry(err, retryOnFailture) {
err := m.rdb.Watch(ctx, txf, keys...)
if eno, ok := err.(syscall.Errno); ok && eno == 0 {
err = nil
}
if err != nil && m.shouldRetry(err, retryOnFailture) {
txRestart.Add(1)
logger.Debugf("Transaction failed, restart it (tried %d): %s", i+1, err)
lastErr = err
time.Sleep(time.Millisecond * time.Duration(rand.Int()%((i+1)*(i+1))))
continue
}
if eno, ok := err.(syscall.Errno); ok && eno == 0 {
err = nil
} else if err == nil && i > 1 {
logger.Warnf("Transaction succeeded after %d tries (%s), keys: %v, last error: %s", i+1, time.Since(start), keys, lastErr)
}
return err
}
logger.Warnf("Already tried 50 times, returning: %s", err)
return err
logger.Warnf("Already tried 50 times, returning: %s", lastErr)
return lastErr
}

func (m *redisMeta) Truncate(ctx Context, inode Ino, flags uint8, length uint64, attr *Attr) syscall.Errno {
Expand Down
36 changes: 18 additions & 18 deletions pkg/meta/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,13 +588,6 @@ func mustInsert(s *xorm.Session, beans ...interface{}) error {
var errBusy error

func (m *dbMeta) shouldRetry(err error) bool {
if err == nil {
return false
}
if _, ok := err.(syscall.Errno); ok {
return false
}
// TODO: add other retryable errors here
msg := strings.ToLower(err.Error())
if strings.Contains(msg, "too many connections") || strings.Contains(msg, "too many clients") {
logger.Warnf("transaction failed: %s, will retry it. please increase the max number of connections in your database, or use a connection pool.", msg)
Expand Down Expand Up @@ -630,57 +623,64 @@ func (m *dbMeta) txn(f func(s *xorm.Session) error, inodes ...Ino) error {
m.txLock(int(inodes[0]))
defer m.txUnlock(int(inodes[0]))
}
var err error
var lastErr error
for i := 0; i < 50; i++ {
_, err = m.db.Transaction(func(s *xorm.Session) (interface{}, error) {
_, err := m.db.Transaction(func(s *xorm.Session) (interface{}, error) {
return nil, f(s)
})
if eno, ok := err.(syscall.Errno); ok && eno == 0 {
err = nil
}
if m.shouldRetry(err) {
if err != nil && m.shouldRetry(err) {
txRestart.Add(1)
logger.Debugf("Transaction failed, restart it (tried %d): %s", i+1, err)
lastErr = err
time.Sleep(time.Millisecond * time.Duration(i*i))
continue
} else if err == nil && i > 1 {
logger.Warnf("Transaction succeeded after %d tries (%s), inodes: %v, last error: %s", i+1, time.Since(start), inodes, lastErr)
}
return err
}
logger.Warnf("Already tried 50 times, returning: %s", err)
return err
logger.Warnf("Already tried 50 times, returning: %s", lastErr)
return lastErr
}

func (m *dbMeta) roTxn(f func(s *xorm.Session) error) error {
start := time.Now()
defer func() { txDist.Observe(time.Since(start).Seconds()) }()
var err error
s := m.db.NewSession()
defer s.Close()
var opt = sql.TxOptions{
Isolation: sql.LevelRepeatableRead,
ReadOnly: true,
}
var lastErr error
for i := 0; i < 50; i++ {
if err := s.BeginTx(&opt); err != nil {
logger.Debugf("Start transaction failed, try again (tried %d): %s", i+1, err)
lastErr = err
time.Sleep(time.Millisecond * time.Duration(i*i))
continue
}
err = f(s)
err := f(s)
if eno, ok := err.(syscall.Errno); ok && eno == 0 {
err = nil
}
_ = s.Rollback()
if m.shouldRetry(err) {
if err != nil && m.shouldRetry(err) {
txRestart.Add(1)
logger.Debugf("Transaction failed, restart it (tried %d): %s", i+1, err)
logger.Debugf("Read transaction failed, restart it (tried %d): %s", i+1, err)
lastErr = err
time.Sleep(time.Millisecond * time.Duration(i*i))
continue
} else if err == nil && i > 1 {
logger.Warnf("Read transaction succeeded after %d tries (%s), last error: %s", i+1, time.Since(start), lastErr)
}
return err
}
logger.Warnf("Already tried 50 times, returning: %s", err)
return err
logger.Warnf("Already tried 50 times, returning: %s", lastErr)
return lastErr
}

func (m *dbMeta) parseAttr(n *node, attr *Attr) {
Expand Down
24 changes: 11 additions & 13 deletions pkg/meta/tkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,12 +622,6 @@ func (m *kvMeta) ListSessions() ([]*Session, error) {
}

func (m *kvMeta) shouldRetry(err error) bool {
if err == nil {
return false
}
if _, ok := err.(syscall.Errno); ok {
return false
}
return m.client.shouldRetry(err)
}

Expand All @@ -641,21 +635,25 @@ func (m *kvMeta) txn(f func(tx kvTxn) error, inodes ...Ino) error {
m.txLock(int(inodes[0]))
defer m.txUnlock(int(inodes[0]))
}
var err error
var lastErr error
for i := 0; i < 50; i++ {
if err = m.client.txn(f); m.shouldRetry(err) {
err := m.client.txn(f)
if eno, ok := err.(syscall.Errno); ok && eno == 0 {
err = nil
}
if err != nil && m.shouldRetry(err) {
txRestart.Add(1)
logger.Debugf("Transaction failed, restart it (tried %d): %s", i+1, err)
lastErr = err
time.Sleep(time.Millisecond * time.Duration(rand.Int()%((i+1)*(i+1))))
continue
}
if eno, ok := err.(syscall.Errno); ok && eno == 0 {
err = nil
} else if err == nil && i > 1 {
logger.Warnf("Transaction succeeded after %d tries (%s), inodes: %v, error: %s", i+1, time.Since(start), inodes, lastErr)
}
return err
}
logger.Warnf("Already tried 50 times, returning: %s", err)
return err
logger.Warnf("Already tried 50 times, returning: %s", lastErr)
return lastErr
}

func (m *kvMeta) setValue(key, value []byte) error {
Expand Down