Skip to content

Commit

Permalink
tikvclient: reduce wait backoff time when lock has be expired (#10006)
Browse files Browse the repository at this point in the history
  • Loading branch information
lysu authored May 17, 2019
1 parent a3b3326 commit 0130831
Show file tree
Hide file tree
Showing 12 changed files with 202 additions and 39 deletions.
7 changes: 7 additions & 0 deletions store/mockoracle/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,13 @@ func (o *MockOracle) IsExpired(lockTimestamp uint64, TTL uint64) bool {
return oracle.GetPhysical(time.Now().Add(o.offset)) >= oracle.ExtractPhysical(lockTimestamp)+int64(TTL)
}

// UntilExpired implement oracle.Oracle interface.
func (o *MockOracle) UntilExpired(lockTimeStamp uint64, TTL uint64) int64 {
o.RLock()
defer o.RUnlock()
return oracle.ExtractPhysical(lockTimeStamp) + int64(TTL) - oracle.GetPhysical(time.Now().Add(o.offset))
}

// Close implements oracle.Oracle interface.
func (o *MockOracle) Close() {

Expand Down
12 changes: 6 additions & 6 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,13 +532,13 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys)
locks = append(locks, lock)
}
start := time.Now()
ok, err := c.store.lockResolver.ResolveLocks(bo, locks)
msBeforeExpired, err := c.store.lockResolver.ResolveLocks(bo, locks)
if err != nil {
return errors.Trace(err)
}
atomic.AddInt64(&c.detail.ResolveLockTime, int64(time.Since(start)))
if !ok {
err = bo.Backoff(BoTxnLock, errors.Errorf("2PC prewrite lockedKeys: %d", len(locks)))
if msBeforeExpired > 0 {
err = bo.BackoffWithMaxSleep(BoTxnLock, int(msBeforeExpired), errors.Errorf("2PC prewrite lockedKeys: %d", len(locks)))
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -619,12 +619,12 @@ func (c *twoPhaseCommitter) pessimisticLockSingleBatch(bo *Backoffer, batch batc
}
locks = append(locks, lock)
}
ok, err := c.store.lockResolver.ResolveLocks(bo, locks)
msBeforeExpired, err := c.store.lockResolver.ResolveLocks(bo, locks)
if err != nil {
return errors.Trace(err)
}
if !ok {
err = bo.Backoff(BoTxnLock, errors.Errorf("2PC prewrite lockedKeys: %d", len(locks)))
if msBeforeExpired > 0 {
err = bo.BackoffWithMaxSleep(BoTxnLock, int(msBeforeExpired), errors.Errorf("2PC prewrite lockedKeys: %d", len(locks)))
if err != nil {
return errors.Trace(err)
}
Expand Down
26 changes: 19 additions & 7 deletions store/tikv/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,14 @@ func (t backoffType) Counter() prometheus.Counter {
// NewBackoffFn creates a backoff func which implements exponential backoff with
// optional jitters.
// See http://www.awsarchitectureblog.com/2015/03/backoff.html
func NewBackoffFn(base, cap, jitter int) func(ctx context.Context) int {
func NewBackoffFn(base, cap, jitter int) func(ctx context.Context, maxSleepMs int) int {
if base < 2 {
// Top prevent panic in 'rand.Intn'.
base = 2
}
attempts := 0
lastSleep := base
return func(ctx context.Context) int {
return func(ctx context.Context, maxSleepMs int) int {
var sleep int
switch jitter {
case NoJitter:
Expand All @@ -102,8 +102,14 @@ func NewBackoffFn(base, cap, jitter int) func(ctx context.Context) int {
logutil.Logger(context.Background()).Debug("backoff",
zap.Int("base", base),
zap.Int("sleep", sleep))

realSleep := sleep
// when set maxSleepMs >= 0 in `tikv.BackoffWithMaxSleep` will force sleep maxSleepMs milliseconds.
if maxSleepMs >= 0 && realSleep > maxSleepMs {
realSleep = maxSleepMs
}
select {
case <-time.After(time.Duration(sleep) * time.Millisecond):
case <-time.After(time.Duration(realSleep) * time.Millisecond):
case <-ctx.Done():
}

Expand All @@ -130,7 +136,7 @@ const (
boServerBusy
)

func (t backoffType) createFn(vars *kv.Variables) func(context.Context) int {
func (t backoffType) createFn(vars *kv.Variables) func(context.Context, int) int {
if vars.Hook != nil {
vars.Hook(t.String(), vars)
}
Expand Down Expand Up @@ -217,7 +223,7 @@ var CommitMaxBackoff = 41000
type Backoffer struct {
ctx context.Context

fn map[backoffType]func(context.Context) int
fn map[backoffType]func(context.Context, int) int
maxSleep int
totalSleep int
errors []error
Expand Down Expand Up @@ -253,6 +259,12 @@ func (b *Backoffer) WithVars(vars *kv.Variables) *Backoffer {
// Backoff sleeps a while base on the backoffType and records the error message.
// It returns a retryable error if total sleep time exceeds maxSleep.
func (b *Backoffer) Backoff(typ backoffType, err error) error {
return b.BackoffWithMaxSleep(typ, -1, err)
}

// BackoffWithMaxSleep sleeps a while base on the backoffType and records the error message
// and never sleep more than maxSleepMs for each sleep.
func (b *Backoffer) BackoffWithMaxSleep(typ backoffType, maxSleepMs int, err error) error {
if strings.Contains(err.Error(), mismatchClusterID) {
logutil.Logger(context.Background()).Fatal("critical error", zap.Error(err))
}
Expand All @@ -265,15 +277,15 @@ func (b *Backoffer) Backoff(typ backoffType, err error) error {
typ.Counter().Inc()
// Lazy initialize.
if b.fn == nil {
b.fn = make(map[backoffType]func(context.Context) int)
b.fn = make(map[backoffType]func(context.Context, int) int)
}
f, ok := b.fn[typ]
if !ok {
f = typ.createFn(b.vars)
b.fn[typ] = f
}

b.totalSleep += f(b.ctx)
b.totalSleep += f(b.ctx, maxSleepMs)
b.types = append(b.types, typ)

var startTs interface{}
Expand Down
6 changes: 3 additions & 3 deletions store/tikv/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -778,12 +778,12 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCCon
if lockErr := resp.pbResp.GetLocked(); lockErr != nil {
logutil.Logger(context.Background()).Debug("coprocessor encounters",
zap.Stringer("lock", lockErr))
ok, err1 := worker.store.lockResolver.ResolveLocks(bo, []*Lock{NewLock(lockErr)})
msBeforeExpired, err1 := worker.store.lockResolver.ResolveLocks(bo, []*Lock{NewLock(lockErr)})
if err1 != nil {
return nil, errors.Trace(err1)
}
if !ok {
if err := bo.Backoff(boTxnLockFast, errors.New(lockErr.String())); err != nil {
if msBeforeExpired > 0 {
if err := bo.BackoffWithMaxSleep(boTxnLockFast, int(msBeforeExpired), errors.New(lockErr.String())); err != nil {
return nil, errors.Trace(err)
}
}
Expand Down
33 changes: 23 additions & 10 deletions store/tikv/lock_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ var (
tikvLockResolverCountWithBatchResolve = metrics.TiKVLockResolverCounter.WithLabelValues("batch_resolve")
tikvLockResolverCountWithExpired = metrics.TiKVLockResolverCounter.WithLabelValues("expired")
tikvLockResolverCountWithNotExpired = metrics.TiKVLockResolverCounter.WithLabelValues("not_expired")
tikvLockResolverCountWithWaitExpired = metrics.TiKVLockResolverCounter.WithLabelValues("wait_expired")
tikvLockResolverCountWithResolve = metrics.TiKVLockResolverCounter.WithLabelValues("resolve")
tikvLockResolverCountWithQueryTxnStatus = metrics.TiKVLockResolverCounter.WithLabelValues("query_txn_status")
tikvLockResolverCountWithQueryTxnStatusCommitted = metrics.TiKVLockResolverCounter.WithLabelValues("query_txn_status_committed")
Expand Down Expand Up @@ -265,47 +266,59 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi
// commit status.
// 3) Send `ResolveLock` cmd to the lock's region to resolve all locks belong to
// the same transaction.
func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (ok bool, err error) {
func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (msBeforeTxnExpired int64, err error) {
if len(locks) == 0 {
return true, nil
return
}

tikvLockResolverCountWithResolve.Inc()

var expiredLocks []*Lock
for _, l := range locks {
if lr.store.GetOracle().IsExpired(l.TxnID, l.TTL) {
msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, l.TTL)
if msBeforeLockExpired <= 0 {
tikvLockResolverCountWithExpired.Inc()
expiredLocks = append(expiredLocks, l)
} else {
if msBeforeTxnExpired == 0 || msBeforeLockExpired < msBeforeTxnExpired {
msBeforeTxnExpired = msBeforeLockExpired
}
tikvLockResolverCountWithNotExpired.Inc()
}
}
if len(expiredLocks) == 0 {
return false, nil
if msBeforeTxnExpired > 0 {
tikvLockResolverCountWithWaitExpired.Inc()
}
return
}

// TxnID -> []Region, record resolved Regions.
// TODO: Maybe put it in LockResolver and share by all txns.
cleanTxns := make(map[uint64]map[RegionVerID]struct{})
for _, l := range expiredLocks {
status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary)
var status TxnStatus
status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary)
if err != nil {
return false, errors.Trace(err)
msBeforeTxnExpired = 0
err = errors.Trace(err)
return
}

cleanRegions := cleanTxns[l.TxnID]
if cleanRegions == nil {
cleanRegions, exists := cleanTxns[l.TxnID]
if !exists {
cleanRegions = make(map[RegionVerID]struct{})
cleanTxns[l.TxnID] = cleanRegions
}

err = lr.resolveLock(bo, l, status, cleanRegions)
if err != nil {
return false, errors.Trace(err)
msBeforeTxnExpired = 0
err = errors.Trace(err)
return
}
}
return len(expiredLocks) == len(locks), nil
return
}

// GetTxnStatus queries tikv-server for a txn's status (commit/rollback).
Expand Down
1 change: 1 addition & 0 deletions store/tikv/oracle/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Oracle interface {
GetTimestamp(ctx context.Context) (uint64, error)
GetTimestampAsync(ctx context.Context) Future
IsExpired(lockTimestamp uint64, TTL uint64) bool
UntilExpired(lockTimeStamp uint64, TTL uint64) int64
Close()
}

Expand Down
54 changes: 54 additions & 0 deletions store/tikv/oracle/oracles/export_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package oracles

import (
"time"

"github.com/pingcap/tidb/store/tikv/oracle"
)

// SetOracleHookCurrentTime exports localOracle's time hook to test.
func SetOracleHookCurrentTime(oc oracle.Oracle, t time.Time) {
switch o := oc.(type) {
case *localOracle:
if o.hook == nil {
o.hook = &struct {
currentTime time.Time
}{}
}
o.hook.currentTime = t
}
}

// ClearOracleHook exports localOracle's clear hook method
func ClearOracleHook(oc oracle.Oracle) {
switch o := oc.(type) {
case *localOracle:
o.hook = nil
}
}

// NewEmptyPDOracle exports pdOracle struct to test
func NewEmptyPDOracle() oracle.Oracle {
return &pdOracle{}
}

// SetEmptyPDOracleLastTs exports PD oracle's last ts to test.
func SetEmptyPDOracleLastTs(oc oracle.Oracle, ts uint64) {
switch o := oc.(type) {
case *pdOracle:
o.lastTS = ts
}
}
24 changes: 22 additions & 2 deletions store/tikv/oracle/oracles/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ type localOracle struct {
sync.Mutex
lastTimeStampTS uint64
n uint64
hook *struct {
currentTime time.Time
}
}

// NewLocalOracle creates an Oracle that uses local time as data source.
Expand All @@ -35,13 +38,21 @@ func NewLocalOracle() oracle.Oracle {
}

func (l *localOracle) IsExpired(lockTS uint64, TTL uint64) bool {
return oracle.GetPhysical(time.Now()) >= oracle.ExtractPhysical(lockTS)+int64(TTL)
now := time.Now()
if l.hook != nil {
now = l.hook.currentTime
}
return oracle.GetPhysical(now) >= oracle.ExtractPhysical(lockTS)+int64(TTL)
}

func (l *localOracle) GetTimestamp(context.Context) (uint64, error) {
l.Lock()
defer l.Unlock()
physical := oracle.GetPhysical(time.Now())
now := time.Now()
if l.hook != nil {
now = l.hook.currentTime
}
physical := oracle.GetPhysical(now)
ts := oracle.ComposeTS(physical, 0)
if l.lastTimeStampTS == ts {
l.n++
Expand All @@ -68,5 +79,14 @@ func (f *future) Wait() (uint64, error) {
return f.l.GetTimestamp(f.ctx)
}

// UntilExpired implement oracle.Oracle interface.
func (l *localOracle) UntilExpired(lockTimeStamp uint64, TTL uint64) int64 {
now := time.Now()
if l.hook != nil {
now = l.hook.currentTime
}
return oracle.ExtractPhysical(lockTimeStamp) + int64(TTL) - oracle.GetPhysical(now)
}

func (l *localOracle) Close() {
}
26 changes: 21 additions & 5 deletions store/tikv/oracle/oracles/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package oracles
package oracles_test

import (
"context"
"testing"
"time"

"github.com/pingcap/tidb/store/tikv/oracle/oracles"
)

func TestLocalOracle(t *testing.T) {
l := NewLocalOracle()
l := oracles.NewLocalOracle()
defer l.Close()
m := map[uint64]struct{}{}
for i := 0; i < 100000; i++ {
Expand All @@ -37,11 +39,13 @@ func TestLocalOracle(t *testing.T) {
}

func TestIsExpired(t *testing.T) {
o := NewLocalOracle()
o := oracles.NewLocalOracle()
defer o.Close()
start := time.Now()
oracles.SetOracleHookCurrentTime(o, start)
ts, _ := o.GetTimestamp(context.Background())
time.Sleep(50 * time.Millisecond)
expire := o.IsExpired(uint64(ts), 40)
oracles.SetOracleHookCurrentTime(o, start.Add(10*time.Millisecond))
expire := o.IsExpired(uint64(ts), 5)
if !expire {
t.Error("should expired")
}
Expand All @@ -50,3 +54,15 @@ func TestIsExpired(t *testing.T) {
t.Error("should not expired")
}
}

func TestLocalOracle_UntilExpired(t *testing.T) {
o := oracles.NewLocalOracle()
defer o.Close()
start := time.Now()
oracles.SetOracleHookCurrentTime(o, start)
ts, _ := o.GetTimestamp(context.Background())
oracles.SetOracleHookCurrentTime(o, start.Add(10*time.Millisecond))
if o.UntilExpired(uint64(ts), 5) != -5 || o.UntilExpired(uint64(ts), 15) != 5 {
t.Error("until expired should be +-5")
}
}
Loading

0 comments on commit 0130831

Please sign in to comment.