Skip to content

Commit

Permalink
executor: change pessimistic lock wait start for one statement (pingc…
Browse files Browse the repository at this point in the history
  • Loading branch information
cfzjywxk authored and eggeek committed Dec 6, 2019
1 parent cda9be7 commit 26860aa
Show file tree
Hide file tree
Showing 13 changed files with 83 additions and 24 deletions.
2 changes: 1 addition & 1 deletion ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -980,7 +980,7 @@ func (w *addIndexWorker) backfillIndexInTxn(handleRange reorgIndexTask) (taskCtx

// Lock the row key to notify us that someone delete or update the row,
// then we should not backfill the index of it, otherwise the adding index is redundant.
err := txn.LockKeys(context.Background(), nil, 0, kv.LockAlwaysWait, idxRecord.key)
err := txn.LockKeys(context.Background(), nil, 0, kv.LockAlwaysWait, time.Now(), idxRecord.key)
if err != nil {
return errors.Trace(err)
}
Expand Down
3 changes: 2 additions & 1 deletion executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,8 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error {
return nil
}
forUpdateTS := txnCtx.GetForUpdateTS()
err = txn.LockKeys(ctx, &sctx.GetSessionVars().Killed, forUpdateTS, sctx.GetSessionVars().LockWaitTimeout, keys...)
err = txn.LockKeys(ctx, &sctx.GetSessionVars().Killed, forUpdateTS, sctx.GetSessionVars().LockWaitTimeout,
sctx.GetSessionVars().StmtCtx.GetLockWaitStartTime(), keys...)
if err == nil {
return nil
}
Expand Down
3 changes: 2 additions & 1 deletion executor/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package executor
import (
"context"
"math"
"time"

"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
Expand Down Expand Up @@ -432,7 +433,7 @@ func (e *RecoverIndexExec) backfillIndexInTxn(ctx context.Context, txn kv.Transa
}

recordKey := e.table.RecordKey(row.handle)
err := txn.LockKeys(ctx, nil, 0, kv.LockAlwaysWait, recordKey)
err := txn.LockKeys(ctx, nil, 0, kv.LockAlwaysWait, time.Now(), recordKey)
if err != nil {
return result, err
}
Expand Down
3 changes: 2 additions & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -835,7 +835,8 @@ func doLockKeys(ctx context.Context, se sessionctx.Context, waitTime int64, keys
return err
}
forUpdateTS := se.GetSessionVars().TxnCtx.GetForUpdateTS()
return txn.LockKeys(ctx, &se.GetSessionVars().Killed, forUpdateTS, waitTime, keys...)
return txn.LockKeys(ctx, &se.GetSessionVars().Killed, forUpdateTS, waitTime,
se.GetSessionVars().StmtCtx.GetLockWaitStartTime(), keys...)
}

// LimitExec represents limit executor
Expand Down
3 changes: 2 additions & 1 deletion kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,8 @@ type Transaction interface {
// String implements fmt.Stringer interface.
String() string
// LockKeys tries to lock the entries with the keys in KV store.
LockKeys(ctx context.Context, killed *uint32, forUpdateTS uint64, lockWaitTime int64, keys ...Key) error
LockKeys(ctx context.Context, killed *uint32, forUpdateTS uint64,
lockWaitTime int64, waitStartTime time.Time, keys ...Key) error
// SetOption sets an option with a value, when val is nil, uses the default
// value of this option.
SetOption(opt Option, val interface{})
Expand Down
3 changes: 2 additions & 1 deletion kv/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package kv

import (
"context"
"time"

"github.com/pingcap/tidb/store/tikv/oracle"
)
Expand All @@ -39,7 +40,7 @@ func (t *mockTxn) String() string {
return ""
}

func (t *mockTxn) LockKeys(_ context.Context, _ *uint32, _ uint64, _ int64, _ ...Key) error {
func (t *mockTxn) LockKeys(_ context.Context, _ *uint32, _ uint64, _ int64, _ time.Time, _ ...Key) error {
return nil
}

Expand Down
3 changes: 2 additions & 1 deletion kv/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package kv

import (
"context"
"time"

. "github.com/pingcap/check"
)
Expand All @@ -38,7 +39,7 @@ func (s testMockSuite) TestInterface(c *C) {

transaction, err := storage.Begin()
c.Check(err, IsNil)
err = transaction.LockKeys(context.Background(), nil, 0, LockAlwaysWait, Key("lock"))
err = transaction.LockKeys(context.Background(), nil, 0, LockAlwaysWait, time.Now(), Key("lock"))
c.Check(err, IsNil)
transaction.SetOption(Option(23), struct{}{})
if mock, ok := transaction.(*mockTxn); ok {
Expand Down
37 changes: 37 additions & 0 deletions session/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -718,3 +718,40 @@ func (s *testPessimisticSuite) TestInnodbLockWaitTimeout(c *C) {

wg.Wait()
}

func (s *testPessimisticSuite) TestInnodbLockWaitTimeoutWaitStart(c *C) {
// prepare work
tk := testkit.NewTestKitWithInit(c, s.store)
defer tk.MustExec("drop table if exists tk")
tk.MustExec("drop table if exists tk")
tk.MustExec("create table tk (c1 int primary key, c2 int)")
tk.MustExec("insert into tk values(1,1),(2,2),(3,3),(4,4),(5,5)")
tk.MustExec("set global innodb_lock_wait_timeout = 1")

// raise pessimistic transaction in tk2 and trigger failpoint returning ErrWriteConflict
tk2 := testkit.NewTestKitWithInit(c, s.store)
tk3 := testkit.NewTestKitWithInit(c, s.store)
tk2.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 1"))

// tk3 gets the pessimistic lock
tk3.MustExec("begin pessimistic")
tk3.MustQuery("select * from tk where c1 = 1 for update")

tk2.MustExec("begin pessimistic")
done := make(chan error)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/PessimisticLockErrWriteConflict", "return"), IsNil)
start := time.Now()
go func() {
var err error
defer func() {
done <- err
}()
_, err = tk2.Exec("select * from tk where c1 = 1 for update")
}()
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/PessimisticLockErrWriteConflict"), IsNil)
waitErr := <-done
c.Assert(waitErr, NotNil)
c.Check(waitErr.Error(), Equals, tikv.ErrLockWaitTimeout.Error())
c.Check(time.Since(start), GreaterEqual, time.Duration(1000*time.Millisecond))
c.Check(time.Since(start), LessEqual, time.Duration(1100*time.Millisecond))
}
14 changes: 12 additions & 2 deletions sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,9 @@ type StatementContext struct {
normalized string
digest string
}
Tables []TableEntry
PointExec bool // for point update cached execution, Constant expression need to set "paramMarker"
Tables []TableEntry
PointExec bool // for point update cached execution, Constant expression need to set "paramMarker"
lockWaitStartTime *time.Time // LockWaitStartTime stores the pessimistic lock wait start time
}

// StmtHints are SessionVars related sql hints.
Expand Down Expand Up @@ -584,6 +585,15 @@ func (sc *StatementContext) SetFlagsFromPBFlag(flags uint64) {
sc.DividedByZeroAsWarning = (flags & model.FlagDividedByZeroAsWarning) > 0
}

// GetLockWaitStartTime returns the statement pessimistic lock wait start time
func (sc *StatementContext) GetLockWaitStartTime() time.Time {
if sc.lockWaitStartTime == nil {
curTime := time.Now()
sc.lockWaitStartTime = &curTime
}
return *sc.lockWaitStartTime
}

//CopTasksDetails collects some useful information of cop-tasks during execution.
type CopTasksDetails struct {
NumCopTasks int
Expand Down
17 changes: 11 additions & 6 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ type actionPrewrite struct{}
type actionCommit struct{}
type actionCleanup struct{}
type actionPessimisticLock struct {
killed *uint32
lockWaitTime int64
killed *uint32
lockWaitTime int64
waitStartTime time.Time
}
type actionPessimisticRollback struct{}

Expand Down Expand Up @@ -687,7 +688,7 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
IsFirstLock: c.isFirstLock,
WaitTimeout: action.lockWaitTime,
}, pb.Context{Priority: c.priority, SyncLog: c.syncLog})
lockWaitStartTime := time.Now()
lockWaitStartTime := action.waitStartTime
for {
// if lockWaitTime set, refine the request `WaitTimeout` field based on timeout limit
if action.lockWaitTime > 0 {
Expand All @@ -698,6 +699,10 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
req.PessimisticLock().WaitTimeout = timeLeft
}
}
failpoint.Inject("PessimisticLockErrWriteConflict", func() error {
time.Sleep(300 * time.Millisecond)
return kv.ErrWriteConflict
})
resp, err := c.store.SendReq(bo, req, batch.region, readTimeoutShort)
if err != nil {
return errors.Trace(err)
Expand All @@ -711,7 +716,7 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
if err != nil {
return errors.Trace(err)
}
err = c.pessimisticLockKeys(bo, action.killed, action.lockWaitTime, batch.keys)
err = c.pessimisticLockKeys(bo, action.killed, action.lockWaitTime, lockWaitStartTime, batch.keys)
return errors.Trace(err)
}
if resp.Resp == nil {
Expand Down Expand Up @@ -1006,8 +1011,8 @@ func (c *twoPhaseCommitter) cleanupKeys(bo *Backoffer, keys [][]byte) error {
}

func (c *twoPhaseCommitter) pessimisticLockKeys(bo *Backoffer, killed *uint32, lockWaitTime int64,
keys [][]byte) error {
return c.doActionOnKeys(bo, actionPessimisticLock{killed, lockWaitTime}, keys)
waitStartTime time.Time, keys [][]byte) error {
return c.doActionOnKeys(bo, actionPessimisticLock{killed, lockWaitTime, waitStartTime}, keys)
}

func (c *twoPhaseCommitter) pessimisticRollbackKeys(bo *Backoffer, keys [][]byte) error {
Expand Down
12 changes: 6 additions & 6 deletions store/tikv/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ func (s *testCommitterSuite) TestUnsetPrimaryKey(c *C) {
c.Assert(txn.Set(key, key), IsNil)
txn.DelOption(kv.PresumeKeyNotExistsError)
txn.DelOption(kv.PresumeKeyNotExists)
err := txn.LockKeys(context.Background(), nil, txn.startTS, kv.LockAlwaysWait, key)
err := txn.LockKeys(context.Background(), nil, txn.startTS, kv.LockAlwaysWait, time.Now(), key)
c.Assert(err, NotNil)
c.Assert(txn.Delete(key), IsNil)
key2 := kv.Key("key2")
Expand All @@ -573,9 +573,9 @@ func (s *testCommitterSuite) TestUnsetPrimaryKey(c *C) {
func (s *testCommitterSuite) TestPessimisticLockedKeysDedup(c *C) {
txn := s.begin(c)
txn.SetOption(kv.Pessimistic, true)
err := txn.LockKeys(context.Background(), nil, 100, kv.LockAlwaysWait, kv.Key("abc"), kv.Key("def"))
err := txn.LockKeys(context.Background(), nil, 100, kv.LockAlwaysWait, time.Now(), kv.Key("abc"), kv.Key("def"))
c.Assert(err, IsNil)
err = txn.LockKeys(context.Background(), nil, 100, kv.LockAlwaysWait, kv.Key("abc"), kv.Key("def"))
err = txn.LockKeys(context.Background(), nil, 100, kv.LockAlwaysWait, time.Now(), kv.Key("abc"), kv.Key("def"))
c.Assert(err, IsNil)
c.Assert(txn.lockKeys, HasLen, 2)
}
Expand All @@ -585,11 +585,11 @@ func (s *testCommitterSuite) TestPessimisticTTL(c *C) {
txn := s.begin(c)
txn.SetOption(kv.Pessimistic, true)
time.Sleep(time.Millisecond * 100)
err := txn.LockKeys(context.Background(), nil, txn.startTS, kv.LockAlwaysWait, key)
err := txn.LockKeys(context.Background(), nil, txn.startTS, kv.LockAlwaysWait, time.Now(), key)
c.Assert(err, IsNil)
time.Sleep(time.Millisecond * 100)
key2 := kv.Key("key2")
err = txn.LockKeys(context.Background(), nil, txn.startTS, kv.LockAlwaysWait, key2)
err = txn.LockKeys(context.Background(), nil, txn.startTS, kv.LockAlwaysWait, time.Now(), key2)
c.Assert(err, IsNil)
lockInfo := s.getLockInfo(c, key)
msBeforeLockExpired := s.store.GetOracle().UntilExpired(txn.StartTS(), lockInfo.LockTtl)
Expand Down Expand Up @@ -627,7 +627,7 @@ func (s *testCommitterSuite) TestElapsedTTL(c *C) {
txn.SetOption(kv.Pessimistic, true)
time.Sleep(time.Millisecond * 100)
forUpdateTS := oracle.ComposeTS(oracle.ExtractPhysical(txn.startTS)+100, 1)
err := txn.LockKeys(context.Background(), nil, forUpdateTS, kv.LockAlwaysWait, key)
err := txn.LockKeys(context.Background(), nil, forUpdateTS, kv.LockAlwaysWait, time.Now(), key)
c.Assert(err, IsNil)
lockInfo := s.getLockInfo(c, key)
c.Assert(lockInfo.LockTtl-ManagedLockTTL, GreaterEqual, uint64(100))
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/ticlient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (s *testTiclientSuite) TestSingleKey(c *C) {
txn := s.beginTxn(c)
err := txn.Set(encodeKey(s.prefix, "key"), []byte("value"))
c.Assert(err, IsNil)
err = txn.LockKeys(context.Background(), nil, 0, kv.LockAlwaysWait, encodeKey(s.prefix, "key"))
err = txn.LockKeys(context.Background(), nil, 0, kv.LockAlwaysWait, time.Now(), encodeKey(s.prefix, "key"))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
Expand Down
5 changes: 3 additions & 2 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,8 @@ func (txn *tikvTxn) rollbackPessimisticLocks() error {
}

// lockWaitTime in ms, except that kv.LockAlwaysWait(0) means always wait lock, kv.LockNowait(-1) means nowait lock
func (txn *tikvTxn) LockKeys(ctx context.Context, killed *uint32, forUpdateTS uint64, lockWaitTime int64, keysInput ...kv.Key) error {
func (txn *tikvTxn) LockKeys(ctx context.Context, killed *uint32, forUpdateTS uint64,
lockWaitTime int64, waitStartTime time.Time, keysInput ...kv.Key) error {
// Exclude keys that are already locked.
keys := make([][]byte, 0, len(keysInput))
txn.mu.Lock()
Expand Down Expand Up @@ -406,7 +407,7 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, killed *uint32, forUpdateTS ui
// If the number of keys greater than 1, it can be on different region,
// concurrently execute on multiple regions may lead to deadlock.
txn.committer.isFirstLock = len(txn.lockKeys) == 0 && len(keys) == 1
err := txn.committer.pessimisticLockKeys(bo, killed, lockWaitTime, keys)
err := txn.committer.pessimisticLockKeys(bo, killed, lockWaitTime, waitStartTime, keys)
if killed != nil {
// If the kill signal is received during waiting for pessimisticLock,
// pessimisticLockKeys would handle the error but it doesn't reset the flag.
Expand Down

0 comments on commit 26860aa

Please sign in to comment.