Skip to content

Commit

Permalink
table: Add PessimisticLazyDupKeyCheckMode to determine lazy mode in…
Browse files Browse the repository at this point in the history
… pessimistic txn (#55360)

close #54397
  • Loading branch information
lcwangchao authored Aug 14, 2024
1 parent bab3667 commit 0e47f9a
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 22 deletions.
16 changes: 16 additions & 0 deletions pkg/executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,22 @@ func optimizeDupKeyCheckForNormalInsert(vars *variable.SessionVars, txn kv.Trans
return table.DupKeyCheckInPlace
}

// getPessimisticLazyCheckMode returns the lazy check mode for pessimistic txn.
// The returned `PessimisticLazyDupKeyCheckMode` only takes effect for pessimistic txn with `DupKeyCheckLazy`;
// otherwise, this option will be ignored.
func getPessimisticLazyCheckMode(vars *variable.SessionVars) table.PessimisticLazyDupKeyCheckMode {
if !vars.ConstraintCheckInPlacePessimistic && vars.InTxn() && !vars.InRestrictedSQL && vars.ConnectionID > 0 {
// We can postpone the duplicated key check to the prewrite stage when both of the following conditions are met:
// - `tidb_constraint_check_in_place_pessimistic='OFF'`.
// - The current transaction should be an explicit transaction because an autocommit txn cannot get
// any benefits from checking the duplicated key in the prewrite stage.
// - The current connection is a user connection, and we always check duplicated key in place for
// internal connections.
return table.DupKeyCheckInPrewrite
}
return table.DupKeyCheckInAcquireLock
}

// Next implements the Executor Next interface.
func (e *InsertExec) Next(ctx context.Context, req *chunk.Chunk) error {
req.Reset()
Expand Down
5 changes: 3 additions & 2 deletions pkg/executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -1412,10 +1412,11 @@ func (e *InsertValues) addRecordWithAutoIDHint(
ctx context.Context, row []types.Datum, reserveAutoIDCount int, dupKeyCheck table.DupKeyCheckMode,
) (err error) {
vars := e.Ctx().GetSessionVars()
pessimisticLazyCheck := getPessimisticLazyCheckMode(vars)
if reserveAutoIDCount > 0 {
_, err = e.Table.AddRecord(e.Ctx().GetTableCtx(), row, table.WithCtx(ctx), table.WithReserveAutoIDHint(reserveAutoIDCount), dupKeyCheck)
_, err = e.Table.AddRecord(e.Ctx().GetTableCtx(), row, table.WithCtx(ctx), table.WithReserveAutoIDHint(reserveAutoIDCount), dupKeyCheck, pessimisticLazyCheck)
} else {
_, err = e.Table.AddRecord(e.Ctx().GetTableCtx(), row, table.WithCtx(ctx), dupKeyCheck)
_, err = e.Table.AddRecord(e.Ctx().GetTableCtx(), row, table.WithCtx(ctx), dupKeyCheck, pessimisticLazyCheck)
}
if err != nil {
return err
Expand Down
22 changes: 12 additions & 10 deletions pkg/executor/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ func updateRecord(
r, ctx := tracing.StartRegionEx(ctx, "executor.updateRecord")
defer r.End()

sc := sctx.GetSessionVars().StmtCtx
sessVars := sctx.GetSessionVars()
sc := sessVars.StmtCtx
changed, handleChanged := false, false
// onUpdateSpecified is for "UPDATE SET ts_field = old_value", the
// timestamp field is explicitly set, but not changed in fact.
Expand Down Expand Up @@ -130,11 +131,11 @@ func updateRecord(
// If no changes, nothing to do, return directly.
if !changed {
// See https://dev.mysql.com/doc/refman/5.7/en/mysql-real-connect.html CLIENT_FOUND_ROWS
if sctx.GetSessionVars().ClientCapability&mysql.ClientFoundRows > 0 {
if sessVars.ClientCapability&mysql.ClientFoundRows > 0 {
sc.AddAffectedRows(1)
}
keySet := lockRowKey
if sctx.GetSessionVars().LockUnchangedKeys {
if sessVars.LockUnchangedKeys {
keySet |= lockUniqueKeys
}
_, err := addUnchangedKeysForLockByRow(sctx, t, h, oldData, keySet)
Expand All @@ -161,6 +162,7 @@ func updateRecord(
}
}

pessimisticLazyCheck := getPessimisticLazyCheckMode(sessVars)
// If handle changed, remove the old then add the new record, otherwise update the record.
if handleChanged {
// For `UPDATE IGNORE`/`INSERT IGNORE ON DUPLICATE KEY UPDATE`
Expand All @@ -179,40 +181,40 @@ func updateRecord(
return false, err
}

_, err = t.AddRecord(sctx.GetTableCtx(), newData, table.IsUpdate, table.WithCtx(ctx), dupKeyMode)
_, err = t.AddRecord(sctx.GetTableCtx(), newData, table.IsUpdate, table.WithCtx(ctx), dupKeyMode, pessimisticLazyCheck)
if err != nil {
return false, err
}
memBuffer.Release(sh)
return true, nil
}(); err != nil {
if terr, ok := errors.Cause(err).(*terror.Error); ok && (terr.Code() == errno.ErrNoPartitionForGivenValue || terr.Code() == errno.ErrRowDoesNotMatchGivenPartitionSet) {
ec := sctx.GetSessionVars().StmtCtx.ErrCtx()
ec := sc.ErrCtx()
return false, ec.HandleError(err)
}
return updated, err
}
} else {
var opts []table.UpdateRecordOption
if sctx.GetSessionVars().InTxn() || sc.InHandleForeignKeyTrigger || sc.ForeignKeyTriggerCtx.HasFKCascades {
if sessVars.InTxn() || sc.InHandleForeignKeyTrigger || sc.ForeignKeyTriggerCtx.HasFKCascades {
// If txn is auto commit and index is untouched, no need to write index value.
// If InHandleForeignKeyTrigger or ForeignKeyTriggerCtx.HasFKCascades is true indicate we may have
// foreign key cascade need to handle later, then we still need to write index value,
// otherwise, the later foreign cascade executor may see data-index inconsistency in txn-mem-buffer.
opts = []table.UpdateRecordOption{table.WithCtx(ctx), dupKeyMode}
opts = []table.UpdateRecordOption{table.WithCtx(ctx), dupKeyMode, pessimisticLazyCheck}
} else {
opts = []table.UpdateRecordOption{table.WithCtx(ctx), dupKeyMode, table.SkipWriteUntouchedIndices}
opts = []table.UpdateRecordOption{table.WithCtx(ctx), dupKeyMode, pessimisticLazyCheck, table.SkipWriteUntouchedIndices}
}

// Update record to new value and update index.
if err := t.UpdateRecord(sctx.GetTableCtx(), h, oldData, newData, modified, opts...); err != nil {
if terr, ok := errors.Cause(err).(*terror.Error); ok && (terr.Code() == errno.ErrNoPartitionForGivenValue || terr.Code() == errno.ErrRowDoesNotMatchGivenPartitionSet) {
ec := sctx.GetSessionVars().StmtCtx.ErrCtx()
ec := sc.ErrCtx()
return false, ec.HandleError(err)
}
return false, err
}
if sctx.GetSessionVars().LockUnchangedKeys {
if sessVars.LockUnchangedKeys {
// Lock unique keys when handle unchanged
if _, err := addUnchangedKeysForLockByRow(sctx, t, h, oldData, lockUniqueKeys); err != nil {
return false, err
Expand Down
2 changes: 0 additions & 2 deletions pkg/table/context/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,6 @@ type MutateContext interface {
AllocatorContext
// GetExprCtx returns the context to build or evaluate expressions
GetExprCtx() exprctx.ExprContext
// GetSessionVars returns the session variables.
GetSessionVars() *variable.SessionVars
// Txn returns the current transaction which is created before executing a statement.
// The returned kv.Transaction is not nil, but it maybe pending or invalid.
// If the active parameter is true, call this function will wait for the pending txn
Expand Down
39 changes: 37 additions & 2 deletions pkg/table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,9 @@ type RecordIterFunc func(h kv.Handle, rec []types.Datum, cols []*Column) (more b

// commonMutateOpt is the common options for mutating a table.
type commonMutateOpt struct {
ctx context.Context
dupKeyCheck DupKeyCheckMode
ctx context.Context
dupKeyCheck DupKeyCheckMode
pessimisticLazyDupKeyCheck PessimisticLazyDupKeyCheckMode
}

// Ctx returns the go context in the option
Expand All @@ -135,6 +136,11 @@ func (opt *commonMutateOpt) DupKeyCheck() DupKeyCheckMode {
return opt.dupKeyCheck
}

// PessimisticLazyDupKeyCheck returns the PessimisticLazyDupKeyCheckMode in the option
func (opt *commonMutateOpt) PessimisticLazyDupKeyCheck() PessimisticLazyDupKeyCheckMode {
return opt.pessimisticLazyDupKeyCheck
}

// AddRecordOpt contains the options will be used when adding a record.
type AddRecordOpt struct {
commonMutateOpt
Expand Down Expand Up @@ -296,6 +302,35 @@ func (m DupKeyCheckMode) applyCreateIdxOpt(opt *CreateIdxOpt) {
opt.dupKeyCheck = m
}

// PessimisticLazyDupKeyCheckMode only takes effect for pessimistic transaction
// when `DupKeyCheckMode` is set to `DupKeyCheckLazy`.
// It indicates how to check the duplicated key in store.
type PessimisticLazyDupKeyCheckMode uint8

const (
// DupKeyCheckInAcquireLock indicates to check the duplicated key when acquiring the pessimistic lock.
DupKeyCheckInAcquireLock PessimisticLazyDupKeyCheckMode = iota
// DupKeyCheckInPrewrite indicates to check the duplicated key in the prewrite step when committing.
// Please notice that if it is used, the duplicated key error may not be returned immediately after each statement,
// because the duplicated key is not checked when acquiring the pessimistic lock.
DupKeyCheckInPrewrite
)

// applyAddRecordOpt implements the AddRecordOption interface.
func (m PessimisticLazyDupKeyCheckMode) applyAddRecordOpt(opt *AddRecordOpt) {
opt.pessimisticLazyDupKeyCheck = m
}

// applyUpdateRecordOpt implements the UpdateRecordOption interface.
func (m PessimisticLazyDupKeyCheckMode) applyUpdateRecordOpt(opt *UpdateRecordOpt) {
opt.pessimisticLazyDupKeyCheck = m
}

// applyCreateIdxOpt implements the CreateIdxOption interface.
func (m PessimisticLazyDupKeyCheckMode) applyCreateIdxOpt(opt *CreateIdxOpt) {
opt.pessimisticLazyDupKeyCheck = m
}

type columnAPI interface {
// Cols returns the columns of the table which is used in select, including hidden columns.
Cols() []*Column
Expand Down
4 changes: 1 addition & 3 deletions pkg/table/tables/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ func (c *index) create(sctx table.MutateContext, txn kv.Transaction, indexedValu
} else {
ctx = context.TODO()
}
vars := sctx.GetSessionVars()
writeBufs := sctx.GetMutateBuffers().GetWriteStmtBufs()
skipCheck := opt.DupKeyCheck() == table.DupKeyCheckSkip
evalCtx := sctx.GetExprCtx().GetEvalCtx()
Expand Down Expand Up @@ -323,8 +322,7 @@ func (c *index) create(sctx table.MutateContext, txn kv.Transaction, indexedValu
if needPresumeNotExists {
flags = []kv.FlagsOp{kv.SetPresumeKeyNotExists}
}
if !vars.ConstraintCheckInPlacePessimistic && vars.TxnCtx.IsPessimistic && vars.InTxn() &&
!vars.InRestrictedSQL && vars.ConnectionID > 0 {
if opt.PessimisticLazyDupKeyCheck() == table.DupKeyCheckInPrewrite && txn.IsPessimistic() {
flags = append(flags, kv.SetNeedConstraintCheckInPrewrite)
}
err = txn.GetMemBuffer().SetWithFlags(key, val, flags...)
Expand Down
4 changes: 1 addition & 3 deletions pkg/table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,6 @@ func (t *TableCommon) addRecord(sctx table.MutateContext, r []types.Datum, opt *
sh := memBuffer.Staging()
defer memBuffer.Cleanup(sh)

sessVars := sctx.GetSessionVars()
for _, col := range t.Columns {
var value types.Datum
if col.State == model.StateDeleteOnly || col.State == model.StateDeleteReorganization {
Expand Down Expand Up @@ -918,8 +917,7 @@ func (t *TableCommon) addRecord(sctx table.MutateContext, r []types.Datum, opt *
var flags []kv.FlagsOp
if setPresume {
flags = []kv.FlagsOp{kv.SetPresumeKeyNotExists}
if !sessVars.ConstraintCheckInPlacePessimistic && sessVars.TxnCtx.IsPessimistic && sessVars.InTxn() &&
!sctx.InRestrictedSQL() && sctx.ConnectionID() > 0 {
if opt.PessimisticLazyDupKeyCheck() == table.DupKeyCheckInPrewrite && txn.IsPessimistic() {
flags = append(flags, kv.SetNeedConstraintCheckInPrewrite)
}
}
Expand Down
66 changes: 66 additions & 0 deletions pkg/table/tables/tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1206,4 +1206,70 @@ func TestDupKeyCheckMode(t *testing.T) {
}
})
}

t.Run("PessimisticLazyMode", func(t *testing.T) {
defer tk.MustExec("rollback")
// DupKeyCheckInAcquireLock should not add flagNeedConstraintCheckInPrewrite
memBuffer := prepareTxn("pessimistic").GetMemBuffer()
h := expectAddRecordSucc(types.MakeDatums(1, 2, 3), table.DupKeyCheckLazy, table.DupKeyCheckInAcquireLock)
flags := getHandleFlags(h, memBuffer)
require.True(t, flags.HasPresumeKeyNotExists())
require.False(t, flags.HasNeedConstraintCheckInPrewrite())
flags = getUniqueKeyFlags(h, types.NewIntDatum(2), memBuffer)
require.True(t, flags.HasPresumeKeyNotExists())
require.False(t, flags.HasNeedConstraintCheckInPrewrite())
tk.MustExec("rollback")

// DupKeyCheckInPrewrite should add flagNeedConstraintCheckInPrewrite
memBuffer = prepareTxn("pessimistic").GetMemBuffer()
h = expectAddRecordSucc(types.MakeDatums(11, 12, 13), table.DupKeyCheckLazy, table.DupKeyCheckInPrewrite)
flags = getHandleFlags(h, memBuffer)
require.True(t, flags.HasPresumeKeyNotExists())
require.True(t, flags.HasNeedConstraintCheckInPrewrite())
flags = getUniqueKeyFlags(h, types.NewIntDatum(12), memBuffer)
require.True(t, flags.HasPresumeKeyNotExists())
require.True(t, flags.HasNeedConstraintCheckInPrewrite())
tk.MustExec("rollback")

// DupKeyCheckInPrewrite should not add flagNeedConstraintCheckInPrewrite for deleted rows
memBuffer = prepareTxn("pessimistic").GetMemBuffer()
tk.MustExec("delete from t where a=1")
h = expectAddRecordSucc(types.MakeDatums(1, 2, 3), table.DupKeyCheckLazy, table.DupKeyCheckInPrewrite)
flags = getHandleFlags(h, memBuffer)
require.False(t, flags.HasPresumeKeyNotExists())
require.False(t, flags.HasNeedConstraintCheckInPrewrite())
flags = getUniqueKeyFlags(h, types.NewIntDatum(2), memBuffer)
require.False(t, flags.HasPresumeKeyNotExists())
require.False(t, flags.HasNeedConstraintCheckInPrewrite())
tk.MustExec("rollback")

// PessimisticLazyDupKeyCheckMode can only work with DupKeyCheckLazy
memBuffer = prepareTxn("pessimistic").GetMemBuffer()
h = expectAddRecordSucc(types.MakeDatums(101, 102, 103), table.DupKeyCheckSkip, table.DupKeyCheckInPrewrite)
flags = getHandleFlags(h, memBuffer)
require.False(t, flags.HasPresumeKeyNotExists())
require.False(t, flags.HasNeedConstraintCheckInPrewrite())
flags = getUniqueKeyFlags(h, types.NewIntDatum(102), memBuffer)
require.False(t, flags.HasPresumeKeyNotExists())
require.False(t, flags.HasNeedConstraintCheckInPrewrite())
h = expectAddRecordSucc(types.MakeDatums(201, 202, 203), table.DupKeyCheckInPlace, table.DupKeyCheckInPrewrite)
flags = getHandleFlags(h, memBuffer)
require.False(t, flags.HasPresumeKeyNotExists())
require.False(t, flags.HasNeedConstraintCheckInPrewrite())
flags = getUniqueKeyFlags(h, types.NewIntDatum(202), memBuffer)
require.False(t, flags.HasPresumeKeyNotExists())
require.False(t, flags.HasNeedConstraintCheckInPrewrite())
tk.MustExec("rollback")

// optimistic mode should ignore PessimisticLazyDupKeyCheckMode
memBuffer = prepareTxn("optimistic").GetMemBuffer()
h = expectAddRecordSucc(types.MakeDatums(1, 2, 3), table.DupKeyCheckLazy, table.DupKeyCheckInPrewrite)
flags = getHandleFlags(h, memBuffer)
require.True(t, flags.HasPresumeKeyNotExists())
require.False(t, flags.HasNeedConstraintCheckInPrewrite())
flags = getUniqueKeyFlags(h, types.NewIntDatum(2), memBuffer)
require.True(t, flags.HasPresumeKeyNotExists())
require.False(t, flags.HasNeedConstraintCheckInPrewrite())
tk.MustExec("rollback")
})
}

0 comments on commit 0e47f9a

Please sign in to comment.