Skip to content

Commit

Permalink
executor: locks key in point get executor for pessimistic transaction (
Browse files Browse the repository at this point in the history
  • Loading branch information
coocood authored and jackysp committed Jul 1, 2019
1 parent 4a6ae82 commit d8ffce9
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 9 deletions.
3 changes: 3 additions & 0 deletions cmd/explaintest/r/explain_easy.result
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,9 @@ id count task operator info
Projection_3 10000.00 root or(NULL, gt(test.t.a, 1))
└─TableReader_5 10000.00 root data:TableScan_4
└─TableScan_4 10000.00 cop table:t, range:[-inf,+inf], keep order:false, stats:pseudo
explain select * from t where a = 1 for update;
id count task operator info
Point_Get_1 1.00 root table:t, handle:1
drop table if exists ta, tb;
create table ta (a varchar(20));
create table tb (a varchar(20));
Expand Down
1 change: 1 addition & 0 deletions cmd/explaintest/t/explain_easy.test
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ drop table if exists t;
create table t(a bigint primary key);
explain select * from t where a = 1 and a = 2;
explain select null or a > 1 from t;
explain select * from t where a = 1 for update;

drop table if exists ta, tb;
create table ta (a varchar(20));
Expand Down
16 changes: 9 additions & 7 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,12 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error {
for {
_, err = a.handleNoDelayExecutor(ctx, e)
if err != nil {
return err
// It is possible the DML has point get plan that locks the key.
e, err = a.handlePessimisticLockError(ctx, err)
if err != nil {
return err
}
continue
}
keys, err1 := txn.(pessimisticTxn).KeysNeedToLock()
if err1 != nil {
Expand All @@ -412,21 +417,18 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error {
}
forUpdateTS := txnCtx.GetForUpdateTS()
err = txn.LockKeys(ctx, forUpdateTS, keys...)
if err == nil {
return nil
}
e, err = a.handlePessimisticLockError(ctx, err)
if err != nil {
return err
}
if e == nil {
return nil
}
}
}

// handlePessimisticLockError updates TS and rebuild executor if the err is write conflict.
func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, err error) (Executor, error) {
if err == nil {
return nil, nil
}
txnCtx := a.Ctx.GetSessionVars().TxnCtx
var newForUpdateTS uint64
if deadlock, ok := errors.Cause(err).(*tikv.ErrDeadlock); ok {
Expand Down
20 changes: 19 additions & 1 deletion executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) Executor {
idxVals: p.IndexValues,
handle: p.Handle,
startTS: startTS,
lock: p.Lock,
}
b.isSelectForUpdate = p.IsForUpdate
e.base().initCap = 1
e.base().maxChunkSize = 1
return e
Expand All @@ -60,6 +62,7 @@ type PointGetExecutor struct {
startTS uint64
snapshot kv.Snapshot
done bool
lock bool
}

// Open implements the Executor interface.
Expand Down Expand Up @@ -95,7 +98,7 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
return err1
}
if len(handleVal) == 0 {
return nil
return e.lockKeyIfNeeded(ctx, idxKey)
}
e.handle, err1 = tables.DecodeHandle(handleVal)
if err1 != nil {
Expand All @@ -122,6 +125,10 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
if err != nil && !kv.ErrNotExist.Equal(err) {
return err
}
err = e.lockKeyIfNeeded(ctx, key)
if err != nil {
return err
}
if len(val) == 0 {
if e.idxInfo != nil {
return kv.ErrNotExist.GenWithStack("inconsistent extra index %s, handle %d not found in table",
Expand All @@ -132,6 +139,17 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
return e.decodeRowValToChunk(val, req)
}

func (e *PointGetExecutor) lockKeyIfNeeded(ctx context.Context, key []byte) error {
if e.lock {
txn, err := e.ctx.Txn(true)
if err != nil {
return err
}
return txn.LockKeys(ctx, e.ctx.GetSessionVars().TxnCtx.GetForUpdateTS(), kv.Key(key))
}
return nil
}

func (e *PointGetExecutor) encodeIndexKey() (_ []byte, err error) {
sc := e.ctx.GetSessionVars().StmtCtx
for i := range e.idxVals {
Expand Down
14 changes: 13 additions & 1 deletion planner/core/point_get_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type PointGetPlan struct {
expr expression.Expression
ctx sessionctx.Context
IsTableDual bool
Lock bool
IsForUpdate bool
}

type nameValuePair struct {
Expand Down Expand Up @@ -141,6 +143,10 @@ func TryFastPlan(ctx sessionctx.Context, node ast.Node) Plan {
tableDual.SetSchema(fp.Schema())
return tableDual.Init(ctx, &property.StatsInfo{})
}
if x.LockTp == ast.SelectLockForUpdate {
fp.Lock = true
fp.IsForUpdate = true
}
return fp
}
case *ast.UpdateStmt:
Expand All @@ -159,7 +165,7 @@ func TryFastPlan(ctx sessionctx.Context, node ast.Node) Plan {
// 3. All the columns must be public and generated.
// 4. The condition is an access path that the range is a unique key.
func tryPointGetPlan(ctx sessionctx.Context, selStmt *ast.SelectStmt) *PointGetPlan {
if selStmt.Having != nil || selStmt.LockTp != ast.SelectLockNone {
if selStmt.Having != nil {
return nil
} else if selStmt.Limit != nil {
count, offset, err := extractLimitCountOffset(ctx, selStmt.Limit)
Expand Down Expand Up @@ -452,6 +458,9 @@ func tryUpdatePointPlan(ctx sessionctx.Context, updateStmt *ast.UpdateStmt) Plan
if fastSelect.IsTableDual {
return PhysicalTableDual{}.Init(ctx, &property.StatsInfo{})
}
if ctx.GetSessionVars().TxnCtx.IsPessimistic {
fastSelect.Lock = true
}
orderedList := buildOrderedList(ctx, fastSelect, updateStmt.List)
if orderedList == nil {
return nil
Expand Down Expand Up @@ -512,6 +521,9 @@ func tryDeletePointPlan(ctx sessionctx.Context, delStmt *ast.DeleteStmt) Plan {
if fastSelect.IsTableDual {
return PhysicalTableDual{}.Init(ctx, &property.StatsInfo{})
}
if ctx.GetSessionVars().TxnCtx.IsPessimistic {
fastSelect.Lock = true
}
delPlan := Delete{
SelectPlan: fastSelect,
}.Init(ctx)
Expand Down
44 changes: 44 additions & 0 deletions session/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,3 +278,47 @@ func (s *testPessimisticSuite) TestInsertOnDup(c *C) {
tk.MustExec("commit")
tk.MustQuery("select * from dup").Check(testkit.Rows("1 2"))
}

func (s *testPessimisticSuite) TestPointGetKeyLock(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk2 := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists point")
tk.MustExec("create table point (id int primary key, u int unique, c int)")
syncCh := make(chan struct{})

tk.MustExec("begin pessimistic")
tk.MustExec("update point set c = c + 1 where id = 1")
tk.MustExec("delete from point where u = 2")
go func() {
tk2.MustExec("begin pessimistic")
_, err1 := tk2.Exec("insert point values (1, 1, 1)")
c.Check(kv.ErrKeyExists.Equal(err1), IsTrue)
_, err1 = tk2.Exec("insert point values (2, 2, 2)")
c.Check(kv.ErrKeyExists.Equal(err1), IsTrue)
tk2.MustExec("rollback")
<-syncCh
}()
time.Sleep(time.Millisecond * 10)
tk.MustExec("insert point values (1, 1, 1)")
tk.MustExec("insert point values (2, 2, 2)")
tk.MustExec("commit")
syncCh <- struct{}{}

tk.MustExec("begin pessimistic")
tk.MustExec("select * from point where id = 3 for update")
tk.MustExec("select * from point where u = 4 for update")
go func() {
tk2.MustExec("begin pessimistic")
_, err1 := tk2.Exec("insert point values (3, 3, 3)")
c.Check(kv.ErrKeyExists.Equal(err1), IsTrue)
_, err1 = tk2.Exec("insert point values (4, 4, 4)")
c.Check(kv.ErrKeyExists.Equal(err1), IsTrue)
tk2.MustExec("rollback")
<-syncCh
}()
time.Sleep(time.Millisecond * 10)
tk.MustExec("insert point values (3, 3, 3)")
tk.MustExec("insert point values (4, 4, 4)")
tk.MustExec("commit")
syncCh <- struct{}{}
}

0 comments on commit d8ffce9

Please sign in to comment.