diff --git a/cmd/explaintest/r/explain_easy.result b/cmd/explaintest/r/explain_easy.result index ba83ace8edb2b..9ca1062c69484 100644 --- a/cmd/explaintest/r/explain_easy.result +++ b/cmd/explaintest/r/explain_easy.result @@ -434,6 +434,9 @@ id count task operator info Projection_3 10000.00 root or(NULL, gt(test.t.a, 1)) └─TableReader_5 10000.00 root data:TableScan_4 └─TableScan_4 10000.00 cop table:t, range:[-inf,+inf], keep order:false, stats:pseudo +explain select * from t where a = 1 for update; +id count task operator info +Point_Get_1 1.00 root table:t, handle:1 drop table if exists ta, tb; create table ta (a varchar(20)); create table tb (a varchar(20)); diff --git a/cmd/explaintest/t/explain_easy.test b/cmd/explaintest/t/explain_easy.test index a8cd985164413..c5044c96069cd 100644 --- a/cmd/explaintest/t/explain_easy.test +++ b/cmd/explaintest/t/explain_easy.test @@ -79,6 +79,7 @@ drop table if exists t; create table t(a bigint primary key); explain select * from t where a = 1 and a = 2; explain select null or a > 1 from t; +explain select * from t where a = 1 for update; drop table if exists ta, tb; create table ta (a varchar(20)); diff --git a/executor/adapter.go b/executor/adapter.go index fa535d62239ff..6f6af7de5545a 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -401,7 +401,12 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error { for { _, err = a.handleNoDelayExecutor(ctx, e) if err != nil { - return err + // It is possible the DML has point get plan that locks the key. + e, err = a.handlePessimisticLockError(ctx, err) + if err != nil { + return err + } + continue } keys, err1 := txn.(pessimisticTxn).KeysNeedToLock() if err1 != nil { @@ -412,21 +417,18 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error { } forUpdateTS := txnCtx.GetForUpdateTS() err = txn.LockKeys(ctx, forUpdateTS, keys...) + if err == nil { + return nil + } e, err = a.handlePessimisticLockError(ctx, err) if err != nil { return err } - if e == nil { - return nil - } } } // handlePessimisticLockError updates TS and rebuild executor if the err is write conflict. func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, err error) (Executor, error) { - if err == nil { - return nil, nil - } txnCtx := a.Ctx.GetSessionVars().TxnCtx var newForUpdateTS uint64 if deadlock, ok := errors.Cause(err).(*tikv.ErrDeadlock); ok { diff --git a/executor/point_get.go b/executor/point_get.go index 587791fc8741e..80730ca2c48c4 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -43,7 +43,9 @@ func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) Executor { idxVals: p.IndexValues, handle: p.Handle, startTS: startTS, + lock: p.Lock, } + b.isSelectForUpdate = p.IsForUpdate e.base().initCap = 1 e.base().maxChunkSize = 1 return e @@ -60,6 +62,7 @@ type PointGetExecutor struct { startTS uint64 snapshot kv.Snapshot done bool + lock bool } // Open implements the Executor interface. @@ -95,7 +98,7 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error { return err1 } if len(handleVal) == 0 { - return nil + return e.lockKeyIfNeeded(ctx, idxKey) } e.handle, err1 = tables.DecodeHandle(handleVal) if err1 != nil { @@ -122,6 +125,10 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error { if err != nil && !kv.ErrNotExist.Equal(err) { return err } + err = e.lockKeyIfNeeded(ctx, key) + if err != nil { + return err + } if len(val) == 0 { if e.idxInfo != nil { return kv.ErrNotExist.GenWithStack("inconsistent extra index %s, handle %d not found in table", @@ -132,6 +139,17 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error { return e.decodeRowValToChunk(val, req) } +func (e *PointGetExecutor) lockKeyIfNeeded(ctx context.Context, key []byte) error { + if e.lock { + txn, err := e.ctx.Txn(true) + if err != nil { + return err + } + return txn.LockKeys(ctx, e.ctx.GetSessionVars().TxnCtx.GetForUpdateTS(), kv.Key(key)) + } + return nil +} + func (e *PointGetExecutor) encodeIndexKey() (_ []byte, err error) { sc := e.ctx.GetSessionVars().StmtCtx for i := range e.idxVals { diff --git a/planner/core/point_get_plan.go b/planner/core/point_get_plan.go index 6ad194e01a58a..6bf2b6e197066 100644 --- a/planner/core/point_get_plan.go +++ b/planner/core/point_get_plan.go @@ -48,6 +48,8 @@ type PointGetPlan struct { expr expression.Expression ctx sessionctx.Context IsTableDual bool + Lock bool + IsForUpdate bool } type nameValuePair struct { @@ -141,6 +143,10 @@ func TryFastPlan(ctx sessionctx.Context, node ast.Node) Plan { tableDual.SetSchema(fp.Schema()) return tableDual.Init(ctx, &property.StatsInfo{}) } + if x.LockTp == ast.SelectLockForUpdate { + fp.Lock = true + fp.IsForUpdate = true + } return fp } case *ast.UpdateStmt: @@ -159,7 +165,7 @@ func TryFastPlan(ctx sessionctx.Context, node ast.Node) Plan { // 3. All the columns must be public and generated. // 4. The condition is an access path that the range is a unique key. func tryPointGetPlan(ctx sessionctx.Context, selStmt *ast.SelectStmt) *PointGetPlan { - if selStmt.Having != nil || selStmt.LockTp != ast.SelectLockNone { + if selStmt.Having != nil { return nil } else if selStmt.Limit != nil { count, offset, err := extractLimitCountOffset(ctx, selStmt.Limit) @@ -452,6 +458,9 @@ func tryUpdatePointPlan(ctx sessionctx.Context, updateStmt *ast.UpdateStmt) Plan if fastSelect.IsTableDual { return PhysicalTableDual{}.Init(ctx, &property.StatsInfo{}) } + if ctx.GetSessionVars().TxnCtx.IsPessimistic { + fastSelect.Lock = true + } orderedList := buildOrderedList(ctx, fastSelect, updateStmt.List) if orderedList == nil { return nil @@ -512,6 +521,9 @@ func tryDeletePointPlan(ctx sessionctx.Context, delStmt *ast.DeleteStmt) Plan { if fastSelect.IsTableDual { return PhysicalTableDual{}.Init(ctx, &property.StatsInfo{}) } + if ctx.GetSessionVars().TxnCtx.IsPessimistic { + fastSelect.Lock = true + } delPlan := Delete{ SelectPlan: fastSelect, }.Init(ctx) diff --git a/session/pessimistic_test.go b/session/pessimistic_test.go index 30cd2146075e8..1fb6332b1c8b6 100644 --- a/session/pessimistic_test.go +++ b/session/pessimistic_test.go @@ -278,3 +278,47 @@ func (s *testPessimisticSuite) TestInsertOnDup(c *C) { tk.MustExec("commit") tk.MustQuery("select * from dup").Check(testkit.Rows("1 2")) } + +func (s *testPessimisticSuite) TestPointGetKeyLock(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk2 := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("drop table if exists point") + tk.MustExec("create table point (id int primary key, u int unique, c int)") + syncCh := make(chan struct{}) + + tk.MustExec("begin pessimistic") + tk.MustExec("update point set c = c + 1 where id = 1") + tk.MustExec("delete from point where u = 2") + go func() { + tk2.MustExec("begin pessimistic") + _, err1 := tk2.Exec("insert point values (1, 1, 1)") + c.Check(kv.ErrKeyExists.Equal(err1), IsTrue) + _, err1 = tk2.Exec("insert point values (2, 2, 2)") + c.Check(kv.ErrKeyExists.Equal(err1), IsTrue) + tk2.MustExec("rollback") + <-syncCh + }() + time.Sleep(time.Millisecond * 10) + tk.MustExec("insert point values (1, 1, 1)") + tk.MustExec("insert point values (2, 2, 2)") + tk.MustExec("commit") + syncCh <- struct{}{} + + tk.MustExec("begin pessimistic") + tk.MustExec("select * from point where id = 3 for update") + tk.MustExec("select * from point where u = 4 for update") + go func() { + tk2.MustExec("begin pessimistic") + _, err1 := tk2.Exec("insert point values (3, 3, 3)") + c.Check(kv.ErrKeyExists.Equal(err1), IsTrue) + _, err1 = tk2.Exec("insert point values (4, 4, 4)") + c.Check(kv.ErrKeyExists.Equal(err1), IsTrue) + tk2.MustExec("rollback") + <-syncCh + }() + time.Sleep(time.Millisecond * 10) + tk.MustExec("insert point values (3, 3, 3)") + tk.MustExec("insert point values (4, 4, 4)") + tk.MustExec("commit") + syncCh <- struct{}{} +}