Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#42210
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
zyguan authored and ti-chi-bot committed Mar 15, 2023
1 parent daf2b17 commit 195da4d
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 17 deletions.
48 changes: 48 additions & 0 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -1129,6 +1129,10 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
}
} else {
e.ctx.GetSessionVars().StmtCtx.AppendWarning(r.handleKey.dupErr)
if txnCtx := e.ctx.GetSessionVars().TxnCtx; txnCtx.IsPessimistic {
// lock duplicated row key on insert-ignore
txnCtx.AddUnchangedRowKey(r.handleKey.newKey)
}
continue
}
} else if !kv.IsErrNotFound(err) {
Expand All @@ -1138,12 +1142,45 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
for _, uk := range r.uniqueKeys {
_, err := txn.Get(ctx, uk.newKey)
if err == nil {
<<<<<<< HEAD
// If duplicate keys were found in BatchGet, mark row = nil.
e.ctx.GetSessionVars().StmtCtx.AppendWarning(uk.dupErr)
skip = true
break
}
if !kv.IsErrNotFound(err) {
=======
if replace {
_, handle, err := tables.FetchDuplicatedHandle(
ctx,
uk.newKey,
true,
txn,
e.Table.Meta().ID,
uk.commonHandle,
)
if err != nil {
return err
}
if handle == nil {
continue
}
_, err = e.removeRow(ctx, txn, handle, r, true)
if err != nil {
return err
}
} else {
// If duplicate keys were found in BatchGet, mark row = nil.
e.ctx.GetSessionVars().StmtCtx.AppendWarning(uk.dupErr)
if txnCtx := e.ctx.GetSessionVars().TxnCtx; txnCtx.IsPessimistic {
// lock duplicated unique key on insert-ignore
txnCtx.AddUnchangedRowKey(uk.newKey)
}
skip = true
break
}
} else if !kv.IsErrNotFound(err) {
>>>>>>> 24cd54c7462 (executor: lock duplicated keys on insert-ignore & replace-nothing (#42210))
return err
}
}
Expand Down Expand Up @@ -1188,7 +1225,18 @@ func (e *InsertValues) removeRow(ctx context.Context, txn kv.Transaction, r toBe
return err
}
if identical {
<<<<<<< HEAD
return nil
=======
if inReplace {
e.ctx.GetSessionVars().StmtCtx.AddAffectedRows(1)
}
_, err := appendUnchangedRowForLock(e.ctx, r.t, handle, oldRow)
if err != nil {
return false, err
}
return true, nil
>>>>>>> 24cd54c7462 (executor: lock duplicated keys on insert-ignore & replace-nothing (#42210))
}

err = r.t.RemoveRecord(e.ctx, handle, oldRow)
Expand Down
66 changes: 66 additions & 0 deletions executor/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2017,3 +2017,69 @@ func TestIssue32213(t *testing.T) {
tk.MustQuery("select cast(test.t1.c1 as decimal(5, 3)) from test.t1").Check(testkit.Rows("99.999"))
tk.MustQuery("select cast(test.t1.c1 as decimal(6, 3)) from test.t1").Check(testkit.Rows("100.000"))
}

func TestInsertLock(t *testing.T) {
store := testkit.CreateMockStore(t)
tk1 := testkit.NewTestKit(t, store)
tk2 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
tk2.MustExec("use test")

for _, tt := range []struct {
name string
ddl string
dml string
}{
{
"replace-pk",
"create table t (c int primary key clustered)",
"replace into t values (1)",
},
{
"replace-uk",
"create table t (c int unique key)",
"replace into t values (1)",
},
{
"insert-ingore-pk",
"create table t (c int primary key clustered)",
"insert ignore into t values (1)",
},
{
"insert-ingore-uk",
"create table t (c int unique key)",
"insert ignore into t values (1)",
},
{
"insert-update-pk",
"create table t (c int primary key clustered)",
"insert into t values (1) on duplicate key update c = values(c)",
},
{
"insert-update-uk",
"create table t (c int unique key)",
"insert into t values (1) on duplicate key update c = values(c)",
},
} {
t.Run(tt.name, func(t *testing.T) {
tk1.MustExec("drop table if exists t")
tk1.MustExec(tt.ddl)
tk1.MustExec("insert into t values (1)")
tk1.MustExec("begin")
tk1.MustExec(tt.dml)
done := make(chan struct{})
go func() {
tk2.MustExec("delete from t")
done <- struct{}{}
}()
select {
case <-done:
require.Failf(t, "txn2 is not blocked by %q", tt.dml)
case <-time.After(100 * time.Millisecond):
}
tk1.MustExec("commit")
<-done
tk1.MustQuery("select * from t").Check([][]interface{}{})
})
}
}
36 changes: 20 additions & 16 deletions executor/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,22 +145,8 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, old
if sctx.GetSessionVars().ClientCapability&mysql.ClientFoundRows > 0 {
sc.AddAffectedRows(1)
}

physicalID := t.Meta().ID
if pt, ok := t.(table.PartitionedTable); ok {
p, err := pt.GetPartitionByRow(sctx, oldData)
if err != nil {
return false, err
}
physicalID = p.GetPhysicalID()
}

unchangedRowKey := tablecodec.EncodeRowKeyWithHandle(physicalID, h)
txnCtx := sctx.GetSessionVars().TxnCtx
if txnCtx.IsPessimistic {
txnCtx.AddUnchangedRowKey(unchangedRowKey)
}
return false, nil
_, err := appendUnchangedRowForLock(sctx, t, h, oldData)
return false, err
}

// Fill values into on-update-now fields, only if they are really changed.
Expand Down Expand Up @@ -226,6 +212,24 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, old
return true, nil
}

func appendUnchangedRowForLock(sctx sessionctx.Context, t table.Table, h kv.Handle, row []types.Datum) (bool, error) {
txnCtx := sctx.GetSessionVars().TxnCtx
if !txnCtx.IsPessimistic {
return false, nil
}
physicalID := t.Meta().ID
if pt, ok := t.(table.PartitionedTable); ok {
p, err := pt.GetPartitionByRow(sctx, row)
if err != nil {
return false, err
}
physicalID = p.GetPhysicalID()
}
unchangedRowKey := tablecodec.EncodeRowKeyWithHandle(physicalID, h)
txnCtx.AddUnchangedRowKey(unchangedRowKey)
return true, nil
}

func rebaseAutoRandomValue(ctx context.Context, sctx sessionctx.Context, t table.Table, newData *types.Datum, col *table.Column) error {
tableInfo := t.Meta()
if !tableInfo.ContainsAutoRandomBits() {
Expand Down
2 changes: 1 addition & 1 deletion tests/realtikvtest/pessimistictest/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ func TestOptimisticConflicts(t *testing.T) {
tk.MustExec("begin pessimistic")
// This SQL use BatchGet and cache data in the txn snapshot.
// It can be changed to other SQLs that use BatchGet.
tk.MustExec("insert ignore into conflict values (1, 2)")
tk.MustExec("select * from conflict where id in (1, 2, 3)")

tk2.MustExec("update conflict set c = c - 1")

Expand Down

0 comments on commit 195da4d

Please sign in to comment.