diff --git a/pkg/meta/redis.go b/pkg/meta/redis.go index 9bace2fd7438..381168a19b2c 100644 --- a/pkg/meta/redis.go +++ b/pkg/meta/redis.go @@ -710,9 +710,6 @@ type timeoutError interface { } func (m *redisMeta) shouldRetry(err error, retryOnFailure bool) bool { - if _, ok := err.(syscall.Errno); ok { - return false - } switch err { case redis.TxFailedErr: return true @@ -767,7 +764,6 @@ func (m *redisMeta) txn(ctx Context, txf func(tx *redis.Tx) error, keys ...strin panic(fmt.Sprintf("Invalid key %s not starts with prefix %s", k, m.prefix)) } } - var err error var khash = fnv.New32() _, _ = khash.Write([]byte(keys[0])) h := int(khash.Sum32()) @@ -777,21 +773,25 @@ func (m *redisMeta) txn(ctx Context, txf func(tx *redis.Tx) error, keys ...strin defer m.txUnlock(h) // TODO: enable retry for some of idempodent transactions var retryOnFailture = false + var lastErr error for i := 0; i < 50; i++ { - err = m.rdb.Watch(ctx, txf, keys...) - if m.shouldRetry(err, retryOnFailture) { + err := m.rdb.Watch(ctx, txf, keys...) + if eno, ok := err.(syscall.Errno); ok && eno == 0 { + err = nil + } + if err != nil && m.shouldRetry(err, retryOnFailture) { txRestart.Add(1) logger.Debugf("Transaction failed, restart it (tried %d): %s", i+1, err) + lastErr = err time.Sleep(time.Millisecond * time.Duration(rand.Int()%((i+1)*(i+1)))) continue - } - if eno, ok := err.(syscall.Errno); ok && eno == 0 { - err = nil + } else if err == nil && i > 1 { + logger.Warnf("Transaction succeeded after %d tries (%s), keys: %v, last error: %s", i+1, time.Since(start), keys, lastErr) } return err } - logger.Warnf("Already tried 50 times, returning: %s", err) - return err + logger.Warnf("Already tried 50 times, returning: %s", lastErr) + return lastErr } func (m *redisMeta) Truncate(ctx Context, inode Ino, flags uint8, length uint64, attr *Attr) syscall.Errno { diff --git a/pkg/meta/sql.go b/pkg/meta/sql.go index 7466e878b20f..ff0992e202c9 100644 --- a/pkg/meta/sql.go +++ b/pkg/meta/sql.go @@ -588,13 +588,6 @@ func mustInsert(s *xorm.Session, beans ...interface{}) error { var errBusy error func (m *dbMeta) shouldRetry(err error) bool { - if err == nil { - return false - } - if _, ok := err.(syscall.Errno); ok { - return false - } - // TODO: add other retryable errors here msg := strings.ToLower(err.Error()) if strings.Contains(msg, "too many connections") || strings.Contains(msg, "too many clients") { logger.Warnf("transaction failed: %s, will retry it. please increase the max number of connections in your database, or use a connection pool.", msg) @@ -630,57 +623,64 @@ func (m *dbMeta) txn(f func(s *xorm.Session) error, inodes ...Ino) error { m.txLock(int(inodes[0])) defer m.txUnlock(int(inodes[0])) } - var err error + var lastErr error for i := 0; i < 50; i++ { - _, err = m.db.Transaction(func(s *xorm.Session) (interface{}, error) { + _, err := m.db.Transaction(func(s *xorm.Session) (interface{}, error) { return nil, f(s) }) if eno, ok := err.(syscall.Errno); ok && eno == 0 { err = nil } - if m.shouldRetry(err) { + if err != nil && m.shouldRetry(err) { txRestart.Add(1) logger.Debugf("Transaction failed, restart it (tried %d): %s", i+1, err) + lastErr = err time.Sleep(time.Millisecond * time.Duration(i*i)) continue + } else if err == nil && i > 1 { + logger.Warnf("Transaction succeeded after %d tries (%s), inodes: %v, last error: %s", i+1, time.Since(start), inodes, lastErr) } return err } - logger.Warnf("Already tried 50 times, returning: %s", err) - return err + logger.Warnf("Already tried 50 times, returning: %s", lastErr) + return lastErr } func (m *dbMeta) roTxn(f func(s *xorm.Session) error) error { start := time.Now() defer func() { txDist.Observe(time.Since(start).Seconds()) }() - var err error s := m.db.NewSession() defer s.Close() var opt = sql.TxOptions{ Isolation: sql.LevelRepeatableRead, ReadOnly: true, } + var lastErr error for i := 0; i < 50; i++ { if err := s.BeginTx(&opt); err != nil { logger.Debugf("Start transaction failed, try again (tried %d): %s", i+1, err) + lastErr = err time.Sleep(time.Millisecond * time.Duration(i*i)) continue } - err = f(s) + err := f(s) if eno, ok := err.(syscall.Errno); ok && eno == 0 { err = nil } _ = s.Rollback() - if m.shouldRetry(err) { + if err != nil && m.shouldRetry(err) { txRestart.Add(1) - logger.Debugf("Transaction failed, restart it (tried %d): %s", i+1, err) + logger.Debugf("Read transaction failed, restart it (tried %d): %s", i+1, err) + lastErr = err time.Sleep(time.Millisecond * time.Duration(i*i)) continue + } else if err == nil && i > 1 { + logger.Warnf("Read transaction succeeded after %d tries (%s), last error: %s", i+1, time.Since(start), lastErr) } return err } - logger.Warnf("Already tried 50 times, returning: %s", err) - return err + logger.Warnf("Already tried 50 times, returning: %s", lastErr) + return lastErr } func (m *dbMeta) parseAttr(n *node, attr *Attr) { diff --git a/pkg/meta/tkv.go b/pkg/meta/tkv.go index 4676161b3040..2d7076a8376b 100644 --- a/pkg/meta/tkv.go +++ b/pkg/meta/tkv.go @@ -622,12 +622,6 @@ func (m *kvMeta) ListSessions() ([]*Session, error) { } func (m *kvMeta) shouldRetry(err error) bool { - if err == nil { - return false - } - if _, ok := err.(syscall.Errno); ok { - return false - } return m.client.shouldRetry(err) } @@ -641,21 +635,25 @@ func (m *kvMeta) txn(f func(tx kvTxn) error, inodes ...Ino) error { m.txLock(int(inodes[0])) defer m.txUnlock(int(inodes[0])) } - var err error + var lastErr error for i := 0; i < 50; i++ { - if err = m.client.txn(f); m.shouldRetry(err) { + err := m.client.txn(f) + if eno, ok := err.(syscall.Errno); ok && eno == 0 { + err = nil + } + if err != nil && m.shouldRetry(err) { txRestart.Add(1) logger.Debugf("Transaction failed, restart it (tried %d): %s", i+1, err) + lastErr = err time.Sleep(time.Millisecond * time.Duration(rand.Int()%((i+1)*(i+1)))) continue - } - if eno, ok := err.(syscall.Errno); ok && eno == 0 { - err = nil + } else if err == nil && i > 1 { + logger.Warnf("Transaction succeeded after %d tries (%s), inodes: %v, error: %s", i+1, time.Since(start), inodes, lastErr) } return err } - logger.Warnf("Already tried 50 times, returning: %s", err) - return err + logger.Warnf("Already tried 50 times, returning: %s", lastErr) + return lastErr } func (m *kvMeta) setValue(key, value []byte) error {