From eea350aa91d86b2de9ea3c06e8de7a5f43127bd3 Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Thu, 20 May 2021 18:01:51 +0800 Subject: [PATCH 01/11] Support replacing whole struct --- session/txn.go | 104 +++++++++++++++++++----------------- session/txninfo/txn_info.go | 49 ++++++++++++++--- 2 files changed, 97 insertions(+), 56 deletions(-) diff --git a/session/txn.go b/session/txn.go index 294725f8efaa0..81480e68f1bfc 100644 --- a/session/txn.go +++ b/session/txn.go @@ -64,15 +64,18 @@ type LazyTxn struct { // we need these fields because kv.Transaction provides no thread safety promise // but we hope getting TxnInfo is a thread safe op - infoStartTS uint64 - // current executing state - State txninfo.TxnRunningState - // last trying to block start time - blockStartTime unsafe.Pointer // *time.Time, cannot use atomic.Value here because it is possible to be nil - // how many entries are there in the memBuffer, should be equal to self.(kv.Transaction).Len() - EntriesCount uint64 - // how many memory space do the entries in the memBuffer take, should be equal to self.(kv.Transaction).Size() - EntriesSize uint64 + atomicTxnInfo unsafe.Pointer + txnInfo *txninfo.TxnInfo + + //infoStartTS uint64 + //// current executing state + //State txninfo.TxnRunningState + //// last trying to block start time + //blockStartTime unsafe.Pointer // *time.Time, cannot use atomic.Value here because it is possible to be nil + //// how many entries are there in the memBuffer, should be equal to self.(kv.Transaction).Len() + //EntriesCount uint64 + //// how many memory space do the entries in the memBuffer take, should be equal to self.(kv.Transaction).Size() + //EntriesSize uint64 } // GetTableInfo returns the cached index name. @@ -87,9 +90,10 @@ func (txn *LazyTxn) CacheTableInfo(id int64, info *model.TableInfo) { func (txn *LazyTxn) init() { txn.mutations = make(map[int64]*binlog.TableMutation) - atomic.StoreInt32(&txn.State, txninfo.TxnRunningNormal) - atomic.StoreUint64(&txn.EntriesCount, 0) - atomic.StoreUint64(&txn.EntriesSize, 0) + txn.txnInfo = &txninfo.TxnInfo{ + State: txninfo.TxnRunningNormal, + } + txn.storeTxnInfo(txn.txnInfo) } func (txn *LazyTxn) initStmtBuf() { @@ -125,8 +129,27 @@ func (txn *LazyTxn) cleanupStmtBuf() { buf := txn.Transaction.GetMemBuffer() buf.Cleanup(txn.stagingHandle) txn.initCnt = buf.Len() - atomic.StoreUint64(&txn.EntriesCount, uint64(txn.Transaction.Len())) - atomic.StoreUint64(&txn.EntriesSize, uint64(txn.Transaction.Size())) + atomic.StoreUint64(&txn.txnInfo.EntriesCount, uint64(txn.Transaction.Len())) + atomic.StoreUint64(&txn.txnInfo.EntriesSize, uint64(txn.Transaction.Size())) +} + +func (txn *LazyTxn) storeTxnInfo(info *txninfo.TxnInfo) { + atomic.StorePointer(&txn.atomicTxnInfo, unsafe.Pointer(info)) +} + +func (txn *LazyTxn) recreateTxnInfo(startTS uint64, state txninfo.TxnRunningState, entriesCount, entriesSize uint64) { + info := &txninfo.TxnInfo{ + StartTS: startTS, + State: state, + EntriesCount: entriesCount, + EntriesSize: entriesSize, + } + txn.txnInfo = info + txn.storeTxnInfo(info) +} + +func (txn *LazyTxn) loadTxnInfo() *txninfo.TxnInfo { + return (*txninfo.TxnInfo)(atomic.LoadPointer(&txn.atomicTxnInfo)) } // Size implements the MemBuffer interface. @@ -182,20 +205,14 @@ func (txn *LazyTxn) GoString() string { func (txn *LazyTxn) changeInvalidToValid(kvTxn kv.Transaction) { txn.Transaction = kvTxn - atomic.StoreInt32(&txn.State, txninfo.TxnRunningNormal) - atomic.StoreUint64(&txn.infoStartTS, kvTxn.StartTS()) txn.initStmtBuf() - atomic.StoreUint64(&txn.EntriesCount, uint64(txn.Transaction.Len())) - atomic.StoreUint64(&txn.EntriesSize, uint64(txn.Transaction.Size())) + txn.recreateTxnInfo(kvTxn.StartTS(), txninfo.TxnRunningNormal, uint64(txn.Transaction.Len()), uint64(txn.Transaction.Size())) txn.txnFuture = nil } func (txn *LazyTxn) changeInvalidToPending(future *txnFuture) { txn.Transaction = nil txn.txnFuture = future - atomic.StoreUint64(&txn.infoStartTS, 0) - atomic.StoreUint64(&txn.EntriesCount, uint64(0)) - atomic.StoreUint64(&txn.EntriesSize, uint64(0)) } func (txn *LazyTxn) changePendingToValid(ctx context.Context) error { @@ -213,11 +230,9 @@ func (txn *LazyTxn) changePendingToValid(ctx context.Context) error { return err } txn.Transaction = t - atomic.StoreInt32(&txn.State, txninfo.TxnRunningNormal) - atomic.StoreUint64(&txn.infoStartTS, t.StartTS()) txn.initStmtBuf() - atomic.StoreUint64(&txn.EntriesCount, uint64(txn.Transaction.Len())) - atomic.StoreUint64(&txn.EntriesSize, uint64(txn.Transaction.Size())) + + txn.recreateTxnInfo(t.StartTS(), txninfo.TxnRunningNormal, uint64(txn.Transaction.Len()), uint64(txn.Transaction.Size())) return nil } @@ -228,9 +243,8 @@ func (txn *LazyTxn) changeToInvalid() { txn.stagingHandle = kv.InvalidStagingHandle txn.Transaction = nil txn.txnFuture = nil - atomic.StoreUint64(&txn.infoStartTS, 0) - atomic.StoreUint64(&txn.EntriesCount, 0) - atomic.StoreUint64(&txn.EntriesSize, 0) + + txn.recreateTxnInfo(0, txninfo.TxnRunningNormal, 0, 0) } var hasMockAutoIncIDRetry = int64(0) @@ -270,7 +284,7 @@ func (txn *LazyTxn) Commit(ctx context.Context) error { return errors.Trace(kv.ErrInvalidTxn) } - atomic.StoreInt32(&txn.State, txninfo.TxnCommitting) + atomic.StoreInt32(&txn.txnInfo.State, txninfo.TxnCommitting) failpoint.Inject("mockSlowCommit", func(_ failpoint.Value) {}) @@ -302,7 +316,7 @@ func (txn *LazyTxn) Commit(ctx context.Context) error { // Rollback overrides the Transaction interface. func (txn *LazyTxn) Rollback() error { defer txn.reset() - atomic.StoreInt32(&txn.State, txninfo.TxnRollingBack) + atomic.StoreInt32(&txn.txnInfo.State, txninfo.TxnRollingBack) // mockSlowRollback is used to mock a rollback which takes a long time failpoint.Inject("mockSlowRollback", func(_ failpoint.Value) {}) return txn.Transaction.Rollback() @@ -310,15 +324,14 @@ func (txn *LazyTxn) Rollback() error { // LockKeys Wrap the inner transaction's `LockKeys` to record the status func (txn *LazyTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keys ...kv.Key) error { - originState := atomic.LoadInt32(&txn.State) - atomic.StoreInt32(&txn.State, txninfo.TxnLockWaiting) + originState := atomic.SwapInt32(&txn.txnInfo.State, txninfo.TxnLockWaiting) t := time.Now() - atomic.StorePointer(&txn.blockStartTime, unsafe.Pointer(&t)) + atomic.StorePointer(&txn.txnInfo.BlockStartTime, unsafe.Pointer(&t)) err := txn.Transaction.LockKeys(ctx, lockCtx, keys...) - atomic.StorePointer(&txn.blockStartTime, unsafe.Pointer(nil)) - atomic.StoreInt32(&txn.State, originState) - atomic.StoreUint64(&txn.EntriesCount, uint64(txn.Transaction.Len())) - atomic.StoreUint64(&txn.EntriesSize, uint64(txn.Transaction.Size())) + atomic.StorePointer(&txn.txnInfo.BlockStartTime, unsafe.Pointer(nil)) + atomic.StoreInt32(&txn.txnInfo.State, originState) + atomic.StoreUint64(&txn.txnInfo.EntriesCount, uint64(txn.Transaction.Len())) + atomic.StoreUint64(&txn.txnInfo.EntriesSize, uint64(txn.Transaction.Size())) return err } @@ -377,17 +390,8 @@ func keyNeedToLock(k, v []byte, flags tikvstore.KeyFlags) bool { // Info dump the TxnState to Datum for displaying in `TIDB_TRX` // This function is supposed to be thread safe func (txn *LazyTxn) Info() *txninfo.TxnInfo { - startTs := atomic.LoadUint64(&txn.infoStartTS) - if startTs == 0 { - return nil - } - return &txninfo.TxnInfo{ - StartTS: startTs, - State: atomic.LoadInt32(&txn.State), - BlockStartTime: (*time.Time)(atomic.LoadPointer(&txn.blockStartTime)), - EntriesCount: atomic.LoadUint64(&txn.EntriesCount), - EntriesSize: atomic.LoadUint64(&txn.EntriesSize), - } + info := txn.loadTxnInfo() + return info.Clone() } // UpdateEntriesCountAndSize updates the EntriesCount and EntriesSize @@ -395,8 +399,8 @@ func (txn *LazyTxn) Info() *txninfo.TxnInfo { // txn.Transaction can be changed during this function's execution if running parallel. func (txn *LazyTxn) UpdateEntriesCountAndSize() { if txn.Valid() { - atomic.StoreUint64(&txn.EntriesCount, uint64(txn.Transaction.Len())) - atomic.StoreUint64(&txn.EntriesSize, uint64(txn.Transaction.Size())) + atomic.StoreUint64(&txn.txnInfo.EntriesCount, uint64(txn.Transaction.Len())) + atomic.StoreUint64(&txn.txnInfo.EntriesSize, uint64(txn.Transaction.Size())) } } diff --git a/session/txninfo/txn_info.go b/session/txninfo/txn_info.go index 77a2d8c90cd05..63cb258bc5a09 100644 --- a/session/txninfo/txn_info.go +++ b/session/txninfo/txn_info.go @@ -14,7 +14,9 @@ package txninfo import ( + "sync/atomic" "time" + "unsafe" "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/store/tikv/oracle" @@ -43,13 +45,18 @@ var TxnRunningStateStrs = []string{ // TxnInfo is information about a running transaction // This is supposed to be the datasource of `TIDB_TRX` in infoschema type TxnInfo struct { + // The following fields are immutable and can be safely read across threads. + StartTS uint64 - // digest of SQL current running + // digest of SQL currently running CurrentSQLDigest string // current executing State + + // The following fields are mutable and needs atomic read for goroutines other than the transaction itself. + State TxnRunningState - // last trying to block start time - BlockStartTime *time.Time + // last trying to block start time. Invalid if State is not TxnLockWaiting. It's an unsafe pointer to time.Time or nil. + BlockStartTime unsafe.Pointer // How many entries are in MemDB EntriesCount uint64 // MemDB used memory @@ -65,14 +72,44 @@ type TxnInfo struct { CurrentDB string } +// UnsafeClone clones the TxnInfo while assuming there are no concurrent write on it. +func (info *TxnInfo) UnsafeClone() *TxnInfo { + return &TxnInfo{ + StartTS: info.StartTS, + CurrentSQLDigest: info.CurrentSQLDigest, + State: info.State, + BlockStartTime: info.BlockStartTime, + EntriesCount: info.EntriesCount, + EntriesSize: info.EntriesSize, + ConnectionID: info.ConnectionID, + Username: info.Username, + CurrentDB: info.CurrentDB, + } +} + +// Clone clones the TxnInfo. It's safe to call concurrently with the transaction. +func (info *TxnInfo) Clone() *TxnInfo { + return &TxnInfo{ + StartTS: info.StartTS, + CurrentSQLDigest: info.CurrentSQLDigest, + State: atomic.LoadInt32(&info.State), + BlockStartTime: atomic.LoadPointer(&info.BlockStartTime), + EntriesCount: atomic.LoadUint64(&info.EntriesCount), + EntriesSize: atomic.LoadUint64(&info.EntriesSize), + ConnectionID: info.ConnectionID, + Username: info.Username, + CurrentDB: info.CurrentDB, + } +} + // ToDatum Converts the `TxnInfo` to `Datum` to show in the `TIDB_TRX` table func (info *TxnInfo) ToDatum() []types.Datum { humanReadableStartTime := time.Unix(0, oracle.ExtractPhysical(info.StartTS)*1e6) var blockStartTime interface{} - if info.BlockStartTime == nil { + if t := (*time.Time)(atomic.LoadPointer(&info.BlockStartTime)); t == nil { blockStartTime = nil } else { - blockStartTime = types.NewTime(types.FromGoTime(*info.BlockStartTime), mysql.TypeTimestamp, 0) + blockStartTime = types.NewTime(types.FromGoTime(*t), mysql.TypeTimestamp, types.MaxFsp) } e, err := types.ParseEnumValue(TxnRunningStateStrs, uint64(info.State+1)) if err != nil { @@ -81,7 +118,7 @@ func (info *TxnInfo) ToDatum() []types.Datum { state := types.NewMysqlEnumDatum(e) datums := types.MakeDatums( info.StartTS, - types.NewTime(types.FromGoTime(humanReadableStartTime), mysql.TypeTimestamp, 0), + types.NewTime(types.FromGoTime(humanReadableStartTime), mysql.TypeTimestamp, types.MaxFsp), info.CurrentSQLDigest, ) datums = append(datums, state) From b1b74221e9b67cc5677ec04421fcdfa36180adfb Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Thu, 20 May 2021 21:27:07 +0800 Subject: [PATCH 02/11] fix column def Signed-off-by: MyonKeminta --- infoschema/tables.go | 6 +++--- infoschema/tables_test.go | 20 ++++++++++++++++---- session/session.go | 4 +++- session/txn.go | 31 ++++++++++++++++++++++++++++--- session/txninfo/txn_info.go | 30 +++++++++++++----------------- 5 files changed, 63 insertions(+), 28 deletions(-) diff --git a/infoschema/tables.go b/infoschema/tables.go index 40451046fe8ec..6983ea0c53bbb 100644 --- a/infoschema/tables.go +++ b/infoschema/tables.go @@ -1346,10 +1346,10 @@ var tableClientErrorsSummaryByHostCols = []columnInfo{ var tableTiDBTrxCols = []columnInfo{ {name: "ID", tp: mysql.TypeLonglong, size: 21, flag: mysql.PriKeyFlag | mysql.NotNullFlag | mysql.UnsignedFlag}, - {name: "START_TIME", tp: mysql.TypeTimestamp, size: 26, comment: "Start time of the transaction"}, + {name: "START_TIME", tp: mysql.TypeTimestamp, decimal: 6, size: 26, comment: "Start time of the transaction"}, {name: "DIGEST", tp: mysql.TypeVarchar, size: 64, comment: "Digest of the sql the transaction are currently running"}, {name: "STATE", tp: mysql.TypeEnum, enumElems: txninfo.TxnRunningStateStrs, comment: "Current running state of the transaction"}, - {name: "WAITING_START_TIME", tp: mysql.TypeTimestamp, size: 26, comment: "Current lock waiting's start time"}, + {name: "WAITING_START_TIME", tp: mysql.TypeTimestamp, decimal: 6, size: 26, comment: "Current lock waiting's start time"}, {name: "LEN", tp: mysql.TypeLonglong, size: 64, comment: "How many entries are in MemDB"}, {name: "SIZE", tp: mysql.TypeLonglong, size: 64, comment: "MemDB used memory"}, {name: "SESSION_ID", tp: mysql.TypeLonglong, size: 21, flag: mysql.UnsignedFlag, comment: "Which session this transaction belongs to"}, @@ -1358,7 +1358,7 @@ var tableTiDBTrxCols = []columnInfo{ } var tableDeadlocksCols = []columnInfo{ - {name: "DEADLOCK_ID", tp: mysql.TypeLonglong, size: 21, flag: mysql.NotNullFlag, comment: "The ID to dinstinguish different deadlock events"}, + {name: "DEADLOCK_ID", tp: mysql.TypeLonglong, size: 21, flag: mysql.NotNullFlag, comment: "The ID to distinguish different deadlock events"}, {name: "OCCUR_TIME", tp: mysql.TypeTimestamp, decimal: 6, size: 26, comment: "The physical time when the deadlock occurs"}, {name: "RETRYABLE", tp: mysql.TypeTiny, size: 1, flag: mysql.NotNullFlag, comment: "Whether the deadlock is retryable. Retryable deadlocks are usually not reported to the client"}, {name: "TRY_LOCK_TRX_ID", tp: mysql.TypeLonglong, size: 21, flag: mysql.NotNullFlag, comment: "The transaction ID (start ts) of the transaction that's trying to acquire the lock"}, diff --git a/infoschema/tables_test.go b/infoschema/tables_test.go index 2d6506b56d5f4..bdfbb56eaf1fd 100644 --- a/infoschema/tables_test.go +++ b/infoschema/tables_test.go @@ -23,6 +23,7 @@ import ( "runtime" "strings" "time" + "unsafe" "github.com/gorilla/mux" . "github.com/pingcap/check" @@ -1514,7 +1515,7 @@ func (s *testTableSuite) TestInfoschemaClientErrors(c *C) { func (s *testTableSuite) TestTrx(c *C) { tk := s.newTestKitWithRoot(c) _, digest := parser.NormalizeDigest("select * from trx for update;") - sm := &mockSessionManager{nil, make([]*txninfo.TxnInfo, 1)} + sm := &mockSessionManager{nil, make([]*txninfo.TxnInfo, 2)} sm.txnInfo[0] = &txninfo.TxnInfo{ StartTS: 424768545227014155, CurrentSQLDigest: digest, @@ -1526,10 +1527,21 @@ func (s *testTableSuite) TestTrx(c *C) { Username: "root", CurrentDB: "test", } + blockTime2 := time.Date(2021, 05, 20, 13, 18, 30, 123456000, time.UTC) + sm.txnInfo[1] = &txninfo.TxnInfo{ + StartTS: 425070846483628033, + CurrentSQLDigest: "", + AllSQLDigests: nil, + State: txninfo.TxnLockWaiting, + BlockStartTime: unsafe.Pointer(&blockTime2), + ConnectionID: 10, + Username: "root", + CurrentDB: "test", + } tk.Se.SetSessionManager(sm) - tk.MustQuery("select * from information_schema.TIDB_TRX;").Check( - testkit.Rows("424768545227014155 2021-05-07 12:56:48 " + digest + " Normal 1 19 2 root test"), - ) + tk.MustQuery("select * from information_schema.TIDB_TRX;").Check(testkit.Rows( + "424768545227014155 2021-05-07 12:56:48.001000 " + digest + " Normal 1 19 2 root test", + "425070846483628033 2021-05-20 13:16:35.778000 LockWaiting 2021-05-20 13:18:30.123456 0 0 2 root test")) } func (s *testTableSuite) TestInfoschemaDeadlockPrivilege(c *C) { diff --git a/session/session.go b/session/session.go index f116daf96dd04..b43dd91b3ee8e 100644 --- a/session/session.go +++ b/session/session.go @@ -450,7 +450,6 @@ func (s *session) TxnInfo() *txninfo.TxnInfo { return nil } processInfo := s.ShowProcess() - txnInfo.CurrentSQLDigest = processInfo.Digest txnInfo.ConnectionID = processInfo.ID txnInfo.Username = processInfo.User txnInfo.CurrentDB = processInfo.DB @@ -1499,6 +1498,9 @@ func (s *session) ExecuteStmt(ctx context.Context, stmtNode ast.StmtNode) (sqlex // Uncorrelated subqueries will execute once when building plan, so we reset process info before building plan. cmd32 := atomic.LoadUint32(&s.GetSessionVars().CommandValue) s.SetProcessInfo(stmtNode.Text(), time.Now(), byte(cmd32), 0) + _, digest := s.sessionVars.StmtCtx.SQLDigest() + s.txn.onStmtStart(digest) + defer s.txn.onStmtEnd() // Transform abstract syntax tree to a physical plan(stored in executor.ExecStmt). compiler := executor.Compiler{Ctx: s} diff --git a/session/txn.go b/session/txn.go index 81480e68f1bfc..e6ac9e06949ca 100644 --- a/session/txn.go +++ b/session/txn.go @@ -134,6 +134,7 @@ func (txn *LazyTxn) cleanupStmtBuf() { } func (txn *LazyTxn) storeTxnInfo(info *txninfo.TxnInfo) { + txn.txnInfo = info atomic.StorePointer(&txn.atomicTxnInfo, unsafe.Pointer(info)) } @@ -144,7 +145,6 @@ func (txn *LazyTxn) recreateTxnInfo(startTS uint64, state txninfo.TxnRunningStat EntriesCount: entriesCount, EntriesSize: entriesSize, } - txn.txnInfo = info txn.storeTxnInfo(info) } @@ -247,6 +247,28 @@ func (txn *LazyTxn) changeToInvalid() { txn.recreateTxnInfo(0, txninfo.TxnRunningNormal, 0, 0) } +func (txn *LazyTxn) onStmtStart(currentSQLDigest string) { + if len(currentSQLDigest) == 0 { + return + } + + info := txn.txnInfo.Clone() + info.CurrentSQLDigest = currentSQLDigest + // Keeps at most 50 history sqls to avoid consuming too much memory. + const maxTransactionStmtHistory int = 50 + if len(info.AllSQLDigests) < maxTransactionStmtHistory { + info.AllSQLDigests = append(info.AllSQLDigests, currentSQLDigest) + } + + txn.storeTxnInfo(info) +} + +func (txn *LazyTxn) onStmtEnd() { + info := txn.txnInfo.Clone() + info.CurrentSQLDigest = "" + txn.storeTxnInfo(info) +} + var hasMockAutoIncIDRetry = int64(0) func enableMockAutoIncIDRetry() { @@ -390,8 +412,11 @@ func keyNeedToLock(k, v []byte, flags tikvstore.KeyFlags) bool { // Info dump the TxnState to Datum for displaying in `TIDB_TRX` // This function is supposed to be thread safe func (txn *LazyTxn) Info() *txninfo.TxnInfo { - info := txn.loadTxnInfo() - return info.Clone() + info := txn.loadTxnInfo().Clone() + if info.StartTS == 0 { + return nil + } + return info } // UpdateEntriesCountAndSize updates the EntriesCount and EntriesSize diff --git a/session/txninfo/txn_info.go b/session/txninfo/txn_info.go index 63cb258bc5a09..c0b3f8e352f39 100644 --- a/session/txninfo/txn_info.go +++ b/session/txninfo/txn_info.go @@ -50,10 +50,11 @@ type TxnInfo struct { StartTS uint64 // digest of SQL currently running CurrentSQLDigest string - // current executing State + AllSQLDigests []string // The following fields are mutable and needs atomic read for goroutines other than the transaction itself. + // current executing State State TxnRunningState // last trying to block start time. Invalid if State is not TxnLockWaiting. It's an unsafe pointer to time.Time or nil. BlockStartTime unsafe.Pointer @@ -72,26 +73,12 @@ type TxnInfo struct { CurrentDB string } -// UnsafeClone clones the TxnInfo while assuming there are no concurrent write on it. -func (info *TxnInfo) UnsafeClone() *TxnInfo { - return &TxnInfo{ - StartTS: info.StartTS, - CurrentSQLDigest: info.CurrentSQLDigest, - State: info.State, - BlockStartTime: info.BlockStartTime, - EntriesCount: info.EntriesCount, - EntriesSize: info.EntriesSize, - ConnectionID: info.ConnectionID, - Username: info.Username, - CurrentDB: info.CurrentDB, - } -} - // Clone clones the TxnInfo. It's safe to call concurrently with the transaction. func (info *TxnInfo) Clone() *TxnInfo { return &TxnInfo{ StartTS: info.StartTS, CurrentSQLDigest: info.CurrentSQLDigest, + AllSQLDigests: info.AllSQLDigests, State: atomic.LoadInt32(&info.State), BlockStartTime: atomic.LoadPointer(&info.BlockStartTime), EntriesCount: atomic.LoadUint64(&info.EntriesCount), @@ -105,21 +92,30 @@ func (info *TxnInfo) Clone() *TxnInfo { // ToDatum Converts the `TxnInfo` to `Datum` to show in the `TIDB_TRX` table func (info *TxnInfo) ToDatum() []types.Datum { humanReadableStartTime := time.Unix(0, oracle.ExtractPhysical(info.StartTS)*1e6) + + var currentDigest interface{} + if len(info.CurrentSQLDigest) != 0 { + currentDigest = info.CurrentSQLDigest + } + var blockStartTime interface{} if t := (*time.Time)(atomic.LoadPointer(&info.BlockStartTime)); t == nil { blockStartTime = nil } else { blockStartTime = types.NewTime(types.FromGoTime(*t), mysql.TypeTimestamp, types.MaxFsp) } + e, err := types.ParseEnumValue(TxnRunningStateStrs, uint64(info.State+1)) if err != nil { panic("this should never happen") } + state := types.NewMysqlEnumDatum(e) + datums := types.MakeDatums( info.StartTS, types.NewTime(types.FromGoTime(humanReadableStartTime), mysql.TypeTimestamp, types.MaxFsp), - info.CurrentSQLDigest, + currentDigest, ) datums = append(datums, state) datums = append(datums, types.MakeDatums( From 0126dd90e491e3d18c0f025e30f6c9de3ec76dbf Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Fri, 21 May 2021 17:39:54 +0800 Subject: [PATCH 03/11] Fix tests --- infoschema/tables_test.go | 8 +++--- session/session_test.go | 50 ++++++++++++++++++++++++++++--------- session/txn.go | 44 ++++++++++++++++++++++++++------ session/txninfo/txn_info.go | 3 ++- 4 files changed, 80 insertions(+), 25 deletions(-) diff --git a/infoschema/tables_test.go b/infoschema/tables_test.go index bdfbb56eaf1fd..85ef1043196bb 100644 --- a/infoschema/tables_test.go +++ b/infoschema/tables_test.go @@ -1535,13 +1535,13 @@ func (s *testTableSuite) TestTrx(c *C) { State: txninfo.TxnLockWaiting, BlockStartTime: unsafe.Pointer(&blockTime2), ConnectionID: 10, - Username: "root", - CurrentDB: "test", + Username: "user1", + CurrentDB: "db1", } tk.Se.SetSessionManager(sm) tk.MustQuery("select * from information_schema.TIDB_TRX;").Check(testkit.Rows( - "424768545227014155 2021-05-07 12:56:48.001000 " + digest + " Normal 1 19 2 root test", - "425070846483628033 2021-05-20 13:16:35.778000 LockWaiting 2021-05-20 13:18:30.123456 0 0 2 root test")) + "424768545227014155 2021-05-07 04:56:48.001000 "+digest+" Normal 1 19 2 root test", + "425070846483628033 2021-05-20 13:16:35.778000 LockWaiting 2021-05-20 13:18:30.123456 0 0 10 user1 db1")) } func (s *testTableSuite) TestInfoschemaDeadlockPrivilege(c *C) { diff --git a/session/session_test.go b/session/session_test.go index a6c7908237bca..52f445558e4e0 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -19,6 +19,7 @@ import ( "fmt" "os" "path" + "strconv" "strings" "sync" "sync/atomic" @@ -42,7 +43,7 @@ import ( plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/privilege/privileges" "github.com/pingcap/tidb/session" - txninfo "github.com/pingcap/tidb/session/txninfo" + "github.com/pingcap/tidb/session/txninfo" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/binloginfo" "github.com/pingcap/tidb/sessionctx/variable" @@ -82,7 +83,7 @@ var _ = SerialSuites(&testSessionSerialSuite{}) var _ = SerialSuites(&testBackupRestoreSuite{}) var _ = Suite(&testClusteredSuite{}) var _ = SerialSuites(&testClusteredSerialSuite{}) -var _ = SerialSuites(&testTxnStateSuite{}) +var _ = SerialSuites(&testTxnStateSerialSuite{}) type testSessionSuiteBase struct { cluster cluster.Cluster @@ -4376,33 +4377,56 @@ func (s *testSessionSuite3) TestGlobalTemporaryTable(c *C) { tk.MustQuery("select * from g_tmp").Check(testkit.Rows()) } -type testTxnStateSuite struct { +type testTxnStateSerialSuite struct { testSessionSuiteBase } -func (s *testTxnStateSuite) TestBasic(c *C) { +func (s *testTxnStateSerialSuite) TestBasic(c *C) { tk := testkit.NewTestKitWithInit(c, s.store) tk.MustExec("create table t(a int);") tk.MustExec("insert into t(a) values (1);") info := tk.Se.TxnInfo() c.Assert(info, IsNil) + tk.MustExec("begin pessimistic;") - tk.MustExec("select * from t for update;") + startTSStr := tk.MustQuery("select @@tidb_current_ts").Rows()[0][0].(string) + startTS, err := strconv.ParseUint(startTSStr, 10, 64) + c.Assert(err, IsNil) + + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/beforePessimisticLock", "pause"), IsNil) + ch := make(chan interface{}) + go func() { + tk.MustExec("select * from t for update;") + ch <- nil + }() + time.Sleep(100 * time.Millisecond) info = tk.Se.TxnInfo() _, expectedDigest := parser.NormalizeDigest("select * from t for update;") c.Assert(info.CurrentSQLDigest, Equals, expectedDigest) + c.Assert(info.State, Equals, txninfo.TxnLockWaiting) + c.Assert((*time.Time)(info.BlockStartTime), NotNil) + c.Assert(info.StartTS, Equals, startTS) + + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/beforePessimisticLock"), IsNil) + <-ch + + info = tk.Se.TxnInfo() + c.Assert(info.CurrentSQLDigest, Equals, "") c.Assert(info.State, Equals, txninfo.TxnRunningNormal) - c.Assert(info.BlockStartTime, IsNil) + c.Assert((*time.Time)(info.BlockStartTime), IsNil) + c.Assert(info.StartTS, Equals, startTS) + // len and size will be covered in TestLenAndSize c.Assert(info.ConnectionID, Equals, tk.Se.GetSessionVars().ConnectionID) c.Assert(info.Username, Equals, "") c.Assert(info.CurrentDB, Equals, "test") + c.Assert(info.StartTS, Equals, startTS) tk.MustExec("commit;") info = tk.Se.TxnInfo() c.Assert(info, IsNil) } -func (s *testTxnStateSuite) TestEntriesCountAndSize(c *C) { +func (s *testTxnStateSerialSuite) TestEntriesCountAndSize(c *C) { tk := testkit.NewTestKitWithInit(c, s.store) tk.MustExec("create table t(a int);") tk.MustExec("begin pessimistic;") @@ -4417,7 +4441,7 @@ func (s *testTxnStateSuite) TestEntriesCountAndSize(c *C) { tk.MustExec("commit;") } -func (s *testTxnStateSuite) TestBlocked(c *C) { +func (s *testTxnStateSerialSuite) TestBlocked(c *C) { tk := testkit.NewTestKitWithInit(c, s.store) tk2 := testkit.NewTestKitWithInit(c, s.store) tk.MustExec("create table t(a int);") @@ -4435,7 +4459,7 @@ func (s *testTxnStateSuite) TestBlocked(c *C) { tk.MustExec("commit;") } -func (s *testTxnStateSuite) TestCommitting(c *C) { +func (s *testTxnStateSerialSuite) TestCommitting(c *C) { tk := testkit.NewTestKitWithInit(c, s.store) tk2 := testkit.NewTestKitWithInit(c, s.store) tk.MustExec("create table t(a int);") @@ -4447,8 +4471,10 @@ func (s *testTxnStateSuite) TestCommitting(c *C) { tk2.MustExec("begin pessimistic") c.Assert(tk2.Se.TxnInfo(), NotNil) tk2.MustExec("select * from t where a = 2 for update;") - failpoint.Enable("github.com/pingcap/tidb/session/mockSlowCommit", "sleep(200)") - defer failpoint.Disable("github.com/pingcap/tidb/session/mockSlowCommit") + c.Assert(failpoint.Enable("github.com/pingcap/tidb/session/mockSlowCommit", "sleep(200)"), IsNil) + defer func() { + c.Assert(failpoint.Disable("github.com/pingcap/tidb/session/mockSlowCommit"), IsNil) + }() tk2.MustExec("commit;") ch <- struct{}{} }() @@ -4458,7 +4484,7 @@ func (s *testTxnStateSuite) TestCommitting(c *C) { <-ch } -func (s *testTxnStateSuite) TestRollbacking(c *C) { +func (s *testTxnStateSerialSuite) TestRollbacking(c *C) { tk := testkit.NewTestKitWithInit(c, s.store) tk.MustExec("create table t(a int);") tk.MustExec("insert into t(a) values (1), (2);") diff --git a/session/txn.go b/session/txn.go index e6ac9e06949ca..e4940cd1705c3 100644 --- a/session/txn.go +++ b/session/txn.go @@ -138,12 +138,21 @@ func (txn *LazyTxn) storeTxnInfo(info *txninfo.TxnInfo) { atomic.StorePointer(&txn.atomicTxnInfo, unsafe.Pointer(info)) } -func (txn *LazyTxn) recreateTxnInfo(startTS uint64, state txninfo.TxnRunningState, entriesCount, entriesSize uint64) { +func (txn *LazyTxn) recreateTxnInfo( + startTS uint64, + state txninfo.TxnRunningState, + entriesCount, + entriesSize uint64, + currentSQLDigest string, + allSQLDigests []string, +) { info := &txninfo.TxnInfo{ - StartTS: startTS, - State: state, - EntriesCount: entriesCount, - EntriesSize: entriesSize, + StartTS: startTS, + State: state, + EntriesCount: entriesCount, + EntriesSize: entriesSize, + CurrentSQLDigest: currentSQLDigest, + AllSQLDigests: allSQLDigests, } txn.storeTxnInfo(info) } @@ -206,7 +215,13 @@ func (txn *LazyTxn) GoString() string { func (txn *LazyTxn) changeInvalidToValid(kvTxn kv.Transaction) { txn.Transaction = kvTxn txn.initStmtBuf() - txn.recreateTxnInfo(kvTxn.StartTS(), txninfo.TxnRunningNormal, uint64(txn.Transaction.Len()), uint64(txn.Transaction.Size())) + txn.recreateTxnInfo( + kvTxn.StartTS(), + txninfo.TxnRunningNormal, + uint64(txn.Transaction.Len()), + uint64(txn.Transaction.Size()), + "", + nil) txn.txnFuture = nil } @@ -232,7 +247,14 @@ func (txn *LazyTxn) changePendingToValid(ctx context.Context) error { txn.Transaction = t txn.initStmtBuf() - txn.recreateTxnInfo(t.StartTS(), txninfo.TxnRunningNormal, uint64(txn.Transaction.Len()), uint64(txn.Transaction.Size())) + // The txnInfo may already recorded the first statement (usually "begin") when it's pending, so keep them. + txn.recreateTxnInfo( + t.StartTS(), + txninfo.TxnRunningNormal, + uint64(txn.Transaction.Len()), + uint64(txn.Transaction.Size()), + txn.txnInfo.CurrentSQLDigest, + txn.txnInfo.AllSQLDigests) return nil } @@ -244,7 +266,13 @@ func (txn *LazyTxn) changeToInvalid() { txn.Transaction = nil txn.txnFuture = nil - txn.recreateTxnInfo(0, txninfo.TxnRunningNormal, 0, 0) + txn.recreateTxnInfo( + 0, + txninfo.TxnRunningNormal, + 0, + 0, + "", + nil) } func (txn *LazyTxn) onStmtStart(currentSQLDigest string) { diff --git a/session/txninfo/txn_info.go b/session/txninfo/txn_info.go index c0b3f8e352f39..55eabff2ae429 100644 --- a/session/txninfo/txn_info.go +++ b/session/txninfo/txn_info.go @@ -91,7 +91,8 @@ func (info *TxnInfo) Clone() *TxnInfo { // ToDatum Converts the `TxnInfo` to `Datum` to show in the `TIDB_TRX` table func (info *TxnInfo) ToDatum() []types.Datum { - humanReadableStartTime := time.Unix(0, oracle.ExtractPhysical(info.StartTS)*1e6) + // TODO: The timezone represented to the user is not correct and it will be always UTC time. + humanReadableStartTime := time.Unix(0, oracle.ExtractPhysical(info.StartTS)*1e6).UTC() var currentDigest interface{} if len(info.CurrentSQLDigest) != 0 { From cee475e8ae2cbf91354699ac381f89938e72754c Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Mon, 24 May 2021 14:55:45 +0800 Subject: [PATCH 04/11] Add basic tests for all sqls and autocommit transactions --- session/session_test.go | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/session/session_test.go b/session/session_test.go index 52f445558e4e0..925e3861c7a35 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -4389,7 +4389,7 @@ func (s *testTxnStateSerialSuite) TestBasic(c *C) { c.Assert(info, IsNil) tk.MustExec("begin pessimistic;") - startTSStr := tk.MustQuery("select @@tidb_current_ts").Rows()[0][0].(string) + startTSStr := tk.MustQuery("select @@tidb_current_ts;").Rows()[0][0].(string) startTS, err := strconv.ParseUint(startTSStr, 10, 64) c.Assert(err, IsNil) @@ -4415,6 +4415,9 @@ func (s *testTxnStateSerialSuite) TestBasic(c *C) { c.Assert(info.State, Equals, txninfo.TxnRunningNormal) c.Assert((*time.Time)(info.BlockStartTime), IsNil) c.Assert(info.StartTS, Equals, startTS) + _, beginDigest := parser.NormalizeDigest("begin pessimistic;") + _, selectTSDigest := parser.NormalizeDigest("select @@tidb_current_ts;") + c.Assert(info.AllSQLDigests, DeepEquals, []string{beginDigest, selectTSDigest, expectedDigest}) // len and size will be covered in TestLenAndSize c.Assert(info.ConnectionID, Equals, tk.Se.GetSessionVars().ConnectionID) @@ -4424,6 +4427,27 @@ func (s *testTxnStateSerialSuite) TestBasic(c *C) { tk.MustExec("commit;") info = tk.Se.TxnInfo() c.Assert(info, IsNil) + + // Test autocommit transaction + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/beforePrewrite", "pause"), IsNil) + go func() { + tk.MustExec("insert into t values (2)") + ch <- nil + }() + time.Sleep(100 * time.Millisecond) + info = tk.Se.TxnInfo() + _, expectedDigest = parser.NormalizeDigest("insert into t values (2)") + c.Assert(info.CurrentSQLDigest, Equals, expectedDigest) + c.Assert(info.State, Equals, txninfo.TxnCommitting) + c.Assert((*time.Time)(info.BlockStartTime), IsNil) + c.Assert(info.StartTS, Greater, startTS) + c.Assert(len(info.AllSQLDigests), Equals, 1) + c.Assert(info.AllSQLDigests[0], Equals, expectedDigest) + + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/beforePrewrite"), IsNil) + <-ch + info = tk.Se.TxnInfo() + c.Assert(info, IsNil) } func (s *testTxnStateSerialSuite) TestEntriesCountAndSize(c *C) { From 4183553de2580daf52b6c9bf26988d93b036cda5 Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Mon, 24 May 2021 17:33:34 +0800 Subject: [PATCH 05/11] Add more tests Signed-off-by: MyonKeminta --- session/session_test.go | 71 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 70 insertions(+), 1 deletion(-) diff --git a/session/session_test.go b/session/session_test.go index 925e3861c7a35..e4cafe1726197 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -4424,7 +4424,21 @@ func (s *testTxnStateSerialSuite) TestBasic(c *C) { c.Assert(info.Username, Equals, "") c.Assert(info.CurrentDB, Equals, "test") c.Assert(info.StartTS, Equals, startTS) - tk.MustExec("commit;") + + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/beforePrewrite", "pause"), IsNil) + go func() { + tk.MustExec("commit;") + ch <- nil + }() + time.Sleep(100 * time.Millisecond) + _, commitDigest := parser.NormalizeDigest("commit;") + info = tk.Se.TxnInfo() + c.Assert(info.CurrentSQLDigest, Equals, commitDigest) + c.Assert(info.State, Equals, txninfo.TxnCommitting) + c.Assert(info.AllSQLDigests, DeepEquals, []string{beginDigest, selectTSDigest, expectedDigest, commitDigest}) + + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/beforePrewrite"), IsNil) + <-ch info = tk.Se.TxnInfo() c.Assert(info, IsNil) @@ -4526,6 +4540,61 @@ func (s *testTxnStateSerialSuite) TestRollbacking(c *C) { <-ch } +func (s *testTxnStateSerialSuite) TestTxnInfoWithPreparedStmt(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("create table t(a int)") + tk.MustExec("prepare s1 from 'insert into t values (?)'") + tk.MustExec("set @v = 1") + + tk.MustExec("begin pessimistic") + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/beforePessimisticLock", "pause"), IsNil) + ch := make(chan interface{}) + go func() { + tk.MustExec("execute s1 using @v") + ch <- nil + }() + time.Sleep(100 * time.Millisecond) + info := tk.Se.TxnInfo() + _, expectDigest := parser.NormalizeDigest("insert into t values (?)") + c.Assert(info.CurrentSQLDigest, Equals, expectDigest) + + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/beforePessimisticLock"), IsNil) + <-ch + info = tk.Se.TxnInfo() + c.Assert(info.CurrentSQLDigest, Equals, "") + _, beginDigest := parser.NormalizeDigest("begin pessimistic") + c.Assert(info.AllSQLDigests, DeepEquals, []string{beginDigest, expectDigest}) + + tk.MustExec("rollback") +} + +func (s *testTxnStateSerialSuite) TestTxnInfoWithScalarSubquery(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("create table t (a int, b int)") + tk.MustExec("insert into t values (1, 10), (2, 1)") + + tk.MustExec("begin pessimistic") + _, beginDigest := parser.NormalizeDigest("begin pessimistic") + tk.MustExec("select * from t where a = (select b from t where a = 2)") + _, s1Digest := parser.NormalizeDigest("select * from t where a = (select b from t where a = 2)") + + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/beforePessimisticLock", "pause"), IsNil) + ch := make(chan interface{}) + go func() { + tk.MustExec("update t set b = b + 1 where a = (select b from t where a = 2)") + ch <- nil + }() + _, s2Digest := parser.NormalizeDigest("update t set b = b + 1 where a = (select b from t where a = 1)") + time.Sleep(100 * time.Millisecond) + info := tk.Se.TxnInfo() + c.Assert(info.CurrentSQLDigest, Equals, s2Digest) + c.Assert(info.AllSQLDigests, DeepEquals, []string{beginDigest, s1Digest, s2Digest}) + + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/beforePessimisticLock"), IsNil) + <-ch + tk.MustExec("rollback") +} + func (s *testSessionSuite) TestReadDMLBatchSize(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("set global tidb_dml_batch_size=1000") From b81d4295092195cf034fa6a790c3e9358b40fc25 Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Mon, 24 May 2021 17:53:41 +0800 Subject: [PATCH 06/11] Add ALL_SQLS field to TIDB_TRX --- infoschema/infoschema_test.go | 1 + infoschema/tables.go | 1 + infoschema/tables_test.go | 6 +++--- session/txninfo/txn_info.go | 6 +++++- 4 files changed, 10 insertions(+), 4 deletions(-) diff --git a/infoschema/infoschema_test.go b/infoschema/infoschema_test.go index 87276ef1452b9..61f34032942fe 100644 --- a/infoschema/infoschema_test.go +++ b/infoschema/infoschema_test.go @@ -298,6 +298,7 @@ func (*testSuite) TestInfoTables(c *C) { "COLLATION_CHARACTER_SET_APPLICABILITY", "PROCESSLIST", "TIDB_TRX", + "DEADLOCKS", } for _, t := range infoTables { tb, err1 := is.TableByName(util.InformationSchemaName, model.NewCIStr(t)) diff --git a/infoschema/tables.go b/infoschema/tables.go index 6983ea0c53bbb..3039e2b0d9820 100644 --- a/infoschema/tables.go +++ b/infoschema/tables.go @@ -1355,6 +1355,7 @@ var tableTiDBTrxCols = []columnInfo{ {name: "SESSION_ID", tp: mysql.TypeLonglong, size: 21, flag: mysql.UnsignedFlag, comment: "Which session this transaction belongs to"}, {name: "USER", tp: mysql.TypeVarchar, size: 16, comment: "The user who open this session"}, {name: "DB", tp: mysql.TypeVarchar, size: 64, comment: "The schema this transaction works on"}, + {name: "ALL_SQLS", tp: mysql.TypeBlob, size: types.UnspecifiedLength, comment: "A list of the digests of SQL statements that the transaction has executed"}, } var tableDeadlocksCols = []columnInfo{ diff --git a/infoschema/tables_test.go b/infoschema/tables_test.go index 85ef1043196bb..0fa2a202d8c38 100644 --- a/infoschema/tables_test.go +++ b/infoschema/tables_test.go @@ -1531,7 +1531,7 @@ func (s *testTableSuite) TestTrx(c *C) { sm.txnInfo[1] = &txninfo.TxnInfo{ StartTS: 425070846483628033, CurrentSQLDigest: "", - AllSQLDigests: nil, + AllSQLDigests: []string{"sql1", "sql2"}, State: txninfo.TxnLockWaiting, BlockStartTime: unsafe.Pointer(&blockTime2), ConnectionID: 10, @@ -1540,8 +1540,8 @@ func (s *testTableSuite) TestTrx(c *C) { } tk.Se.SetSessionManager(sm) tk.MustQuery("select * from information_schema.TIDB_TRX;").Check(testkit.Rows( - "424768545227014155 2021-05-07 04:56:48.001000 "+digest+" Normal 1 19 2 root test", - "425070846483628033 2021-05-20 13:16:35.778000 LockWaiting 2021-05-20 13:18:30.123456 0 0 10 user1 db1")) + "424768545227014155 2021-05-07 04:56:48.001000 "+digest+" Normal 1 19 2 root test []", + "425070846483628033 2021-05-20 13:16:35.778000 LockWaiting 2021-05-20 13:18:30.123456 0 0 10 user1 db1 [sql1, sql2]")) } func (s *testTableSuite) TestInfoschemaDeadlockPrivilege(c *C) { diff --git a/session/txninfo/txn_info.go b/session/txninfo/txn_info.go index 55eabff2ae429..5c2ea7af83c73 100644 --- a/session/txninfo/txn_info.go +++ b/session/txninfo/txn_info.go @@ -14,6 +14,7 @@ package txninfo import ( + "strings" "sync/atomic" "time" "unsafe" @@ -111,6 +112,8 @@ func (info *TxnInfo) ToDatum() []types.Datum { panic("this should never happen") } + allSQLs := "[" + strings.Join(info.AllSQLDigests, ", ") + "]" + state := types.NewMysqlEnumDatum(e) datums := types.MakeDatums( @@ -125,6 +128,7 @@ func (info *TxnInfo) ToDatum() []types.Datum { info.EntriesSize, info.ConnectionID, info.Username, - info.CurrentDB)...) + info.CurrentDB, + allSQLs)...) return datums } From 318e1ce752a157c6f870bffbb8b5fbe8f7e6be5b Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Mon, 24 May 2021 19:22:30 +0800 Subject: [PATCH 07/11] Update comments --- session/txn.go | 16 +++++----------- session/txninfo/txn_info.go | 17 ++++++++++------- 2 files changed, 15 insertions(+), 18 deletions(-) diff --git a/session/txn.go b/session/txn.go index e4940cd1705c3..31080908be665 100644 --- a/session/txn.go +++ b/session/txn.go @@ -64,18 +64,12 @@ type LazyTxn struct { // we need these fields because kv.Transaction provides no thread safety promise // but we hope getting TxnInfo is a thread safe op + // atomicTxnInfo provides information about the transaction in a thread-safe way. To atomically replace the struct, + // it's stored as an unsafe.Pointer. atomicTxnInfo unsafe.Pointer - txnInfo *txninfo.TxnInfo - - //infoStartTS uint64 - //// current executing state - //State txninfo.TxnRunningState - //// last trying to block start time - //blockStartTime unsafe.Pointer // *time.Time, cannot use atomic.Value here because it is possible to be nil - //// how many entries are there in the memBuffer, should be equal to self.(kv.Transaction).Len() - //EntriesCount uint64 - //// how many memory space do the entries in the memBuffer take, should be equal to self.(kv.Transaction).Size() - //EntriesSize uint64 + // txnInfo points to the same thing as atomicTxnInfo. It's just used internally by LazyTxn to avoid casting + // atomicTxnInfo from the untyped unsafe.Pointer every time using it. + txnInfo *txninfo.TxnInfo } // GetTableInfo returns the cached index name. diff --git a/session/txninfo/txn_info.go b/session/txninfo/txn_info.go index 5c2ea7af83c73..563b1c8b4e501 100644 --- a/session/txninfo/txn_info.go +++ b/session/txninfo/txn_info.go @@ -49,22 +49,25 @@ type TxnInfo struct { // The following fields are immutable and can be safely read across threads. StartTS uint64 - // digest of SQL currently running + // Digest of SQL currently running CurrentSQLDigest string - AllSQLDigests []string + // Digests of all SQLs executed in the transaction. + AllSQLDigests []string - // The following fields are mutable and needs atomic read for goroutines other than the transaction itself. + // The following fields are mutable and needs to be read or written by atomic operations. But since only the + // transaction's thread can modify its value, it's ok for the transaction's thread to read it without atomic + // operations. - // current executing State + // Current execution state of the transaction. State TxnRunningState - // last trying to block start time. Invalid if State is not TxnLockWaiting. It's an unsafe pointer to time.Time or nil. + // Last trying to block start time. Invalid if State is not TxnLockWaiting. It's an unsafe pointer to time.Time or nil. BlockStartTime unsafe.Pointer // How many entries are in MemDB EntriesCount uint64 // MemDB used memory EntriesSize uint64 - // the following fields will be filled in `session` instead of `LazyTxn` + // The following fields will be filled in `session` instead of `LazyTxn` // Which session this transaction belongs to ConnectionID uint64 @@ -90,7 +93,7 @@ func (info *TxnInfo) Clone() *TxnInfo { } } -// ToDatum Converts the `TxnInfo` to `Datum` to show in the `TIDB_TRX` table +// ToDatum Converts the `TxnInfo` to `Datum` to show in the `TIDB_TRX` table. func (info *TxnInfo) ToDatum() []types.Datum { // TODO: The timezone represented to the user is not correct and it will be always UTC time. humanReadableStartTime := time.Unix(0, oracle.ExtractPhysical(info.StartTS)*1e6).UTC() From 9914ae6a92edb4629bcc1bc3930a62373de380fc Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Wed, 26 May 2021 12:55:31 +0800 Subject: [PATCH 08/11] update txn digest info on PS protocol --- session/session.go | 9 ++++-- session/session_test.go | 64 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+), 2 deletions(-) diff --git a/session/session.go b/session/session.go index 608c0015473eb..31cf5f1f9b577 100644 --- a/session/session.go +++ b/session/session.go @@ -1872,10 +1872,15 @@ func (s *session) ExecutePreparedStmt(ctx context.Context, stmtID uint32, args [ if err != nil { return nil, err } + s.txn.onStmtStart(preparedStmt.SQLDigest) + var rs sqlexec.RecordSet if ok { - return s.cachedPlanExec(ctx, stmtID, preparedStmt, args) + rs, err = s.cachedPlanExec(ctx, stmtID, preparedStmt, args) + } else { + rs, err = s.preparedStmtExec(ctx, stmtID, preparedStmt, args) } - return s.preparedStmtExec(ctx, stmtID, preparedStmt, args) + s.txn.onStmtEnd() + return rs, err } func (s *session) DropPreparedStmt(stmtID uint32) error { diff --git a/session/session_test.go b/session/session_test.go index a18d36e99ba20..6ed4d62d1c519 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -4595,6 +4595,70 @@ func (s *testTxnStateSerialSuite) TestTxnInfoWithScalarSubquery(c *C) { tk.MustExec("rollback") } +func (s *testTxnStateSerialSuite) TestTxnInfoWithPSProtocol(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("create table t (a int primary key)") + + // Test autocommit transaction + + idInsert, _, _, err := tk.Se.PrepareStmt("insert into t values (?)") + c.Assert(err, IsNil) + + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/beforePrewrite", "pause"), IsNil) + ch := make(chan interface{}) + go func() { + _, err := tk.Se.ExecutePreparedStmt(context.Background(), idInsert, types.MakeDatums(1)) + c.Assert(err, IsNil) + ch <- nil + }() + time.Sleep(100 * time.Millisecond) + _, digest := parser.NormalizeDigest("insert into t values (1)") + info := tk.Se.TxnInfo() + c.Assert(info, NotNil) + c.Assert(info.StartTS, Greater, uint64(0)) + c.Assert(info.State, Equals, txninfo.TxnCommitting) + c.Assert(info.CurrentSQLDigest, Equals, digest) + c.Assert(info.AllSQLDigests, DeepEquals, []string{digest}) + + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/beforePrewrite"), IsNil) + <-ch + info = tk.Se.TxnInfo() + c.Assert(info, IsNil) + + // Test non-autocommit transaction + + id1, _, _, err := tk.Se.PrepareStmt("select * from t where a = ?") + c.Assert(err, IsNil) + _, digest1 := parser.NormalizeDigest("select * from t where a = ?") + id2, _, _, err := tk.Se.PrepareStmt("update t set a = a + 1 where a = ?") + c.Assert(err, IsNil) + _, digest2 := parser.NormalizeDigest("update t set a = a + 1 where a = ?") + + tk.MustExec("begin pessimistic") + + _, err = tk.Se.ExecutePreparedStmt(context.Background(), id1, types.MakeDatums(1)) + c.Assert(err, IsNil) + + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/beforePessimisticLock", "pause"), IsNil) + go func() { + _, err := tk.Se.ExecutePreparedStmt(context.Background(), id2, types.MakeDatums(1)) + c.Assert(err, IsNil) + ch <- nil + }() + time.Sleep(100 * time.Millisecond) + info = tk.Se.TxnInfo() + c.Assert(info.StartTS, Greater, uint64(0)) + c.Assert(info.CurrentSQLDigest, Equals, digest2) + c.Assert(info.State, Equals, txninfo.TxnLockWaiting) + c.Assert((*time.Time)(info.BlockStartTime), NotNil) + _, beginDigest := parser.NormalizeDigest("begin pessimistic") + c.Assert(info.AllSQLDigests, DeepEquals, []string{beginDigest, digest1, digest2}) + + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/beforePessimisticLock"), IsNil) + <-ch + tk.MustExec("rollback") +} + func (s *testSessionSuite) TestReadDMLBatchSize(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("set global tidb_dml_batch_size=1000") From bfe9733484f5b9fd232b56d95885411220d2b305 Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Wed, 26 May 2021 16:37:16 +0800 Subject: [PATCH 09/11] Renaming Signed-off-by: MyonKeminta --- executor/executor_test.go | 8 ++++---- infoschema/tables.go | 10 +++++----- util/deadlockhistory/deadlock_history.go | 8 ++++---- util/deadlockhistory/deadlock_history_test.go | 14 +++++++------- 4 files changed, 20 insertions(+), 20 deletions(-) diff --git a/executor/executor_test.go b/executor/executor_test.go index 65c60f392f689..47f070641e8d6 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -8187,14 +8187,14 @@ func (s *testSerialSuite) TestDeadlockTable(c *C) { TryLockTxn: 101, SQLDigest: "aabbccdd", Key: []byte("k1"), - AllSQLs: nil, + AllSQLDigests: nil, TxnHoldingLock: 102, }, { TryLockTxn: 102, SQLDigest: "ddccbbaa", Key: []byte("k2"), - AllSQLs: []string{"sql1"}, + AllSQLDigests: []string{"sql1"}, TxnHoldingLock: 101, }, }, @@ -8208,12 +8208,12 @@ func (s *testSerialSuite) TestDeadlockTable(c *C) { WaitChain: []deadlockhistory.WaitChainItem{ { TryLockTxn: 201, - AllSQLs: []string{}, + AllSQLDigests: []string{}, TxnHoldingLock: 202, }, { TryLockTxn: 202, - AllSQLs: []string{"sql1", "sql2, sql3"}, + AllSQLDigests: []string{"sql1", "sql2, sql3"}, TxnHoldingLock: 203, }, { diff --git a/infoschema/tables.go b/infoschema/tables.go index a391bf0caf90c..dd5d74cd11b76 100644 --- a/infoschema/tables.go +++ b/infoschema/tables.go @@ -1347,7 +1347,7 @@ var tableClientErrorsSummaryByHostCols = []columnInfo{ var tableTiDBTrxCols = []columnInfo{ {name: "ID", tp: mysql.TypeLonglong, size: 21, flag: mysql.PriKeyFlag | mysql.NotNullFlag | mysql.UnsignedFlag}, {name: "START_TIME", tp: mysql.TypeTimestamp, decimal: 6, size: 26, comment: "Start time of the transaction"}, - {name: "DIGEST", tp: mysql.TypeVarchar, size: 64, comment: "Digest of the sql the transaction are currently running"}, + {name: "CURRENT_SQL_DIGEST", tp: mysql.TypeVarchar, size: 64, comment: "Digest of the sql the transaction are currently running"}, {name: "STATE", tp: mysql.TypeEnum, enumElems: txninfo.TxnRunningStateStrs, comment: "Current running state of the transaction"}, {name: "WAITING_START_TIME", tp: mysql.TypeTimestamp, decimal: 6, size: 26, comment: "Current lock waiting's start time"}, {name: "LEN", tp: mysql.TypeLonglong, size: 64, comment: "How many entries are in MemDB"}, @@ -1355,18 +1355,18 @@ var tableTiDBTrxCols = []columnInfo{ {name: "SESSION_ID", tp: mysql.TypeLonglong, size: 21, flag: mysql.UnsignedFlag, comment: "Which session this transaction belongs to"}, {name: "USER", tp: mysql.TypeVarchar, size: 16, comment: "The user who open this session"}, {name: "DB", tp: mysql.TypeVarchar, size: 64, comment: "The schema this transaction works on"}, - {name: "ALL_SQLS", tp: mysql.TypeBlob, size: types.UnspecifiedLength, comment: "A list of the digests of SQL statements that the transaction has executed"}, + {name: "ALL_SQL_DIGESTS", tp: mysql.TypeBlob, size: types.UnspecifiedLength, comment: "A list of the digests of SQL statements that the transaction has executed"}, } var tableDeadlocksCols = []columnInfo{ {name: "DEADLOCK_ID", tp: mysql.TypeLonglong, size: 21, flag: mysql.NotNullFlag, comment: "The ID to distinguish different deadlock events"}, {name: "OCCUR_TIME", tp: mysql.TypeTimestamp, decimal: 6, size: 26, comment: "The physical time when the deadlock occurs"}, {name: "RETRYABLE", tp: mysql.TypeTiny, size: 1, flag: mysql.NotNullFlag, comment: "Whether the deadlock is retryable. Retryable deadlocks are usually not reported to the client"}, - {name: "TRY_LOCK_TRX_ID", tp: mysql.TypeLonglong, size: 21, flag: mysql.NotNullFlag, comment: "The transaction ID (start ts) of the transaction that's trying to acquire the lock"}, + {name: "TRY_LOCK_TRX_ID", tp: mysql.TypeLonglong, size: 21, flag: mysql.NotNullFlag | mysql.UnsignedFlag, comment: "The transaction ID (start ts) of the transaction that's trying to acquire the lock"}, {name: "CURRENT_SQL_DIGEST", tp: mysql.TypeVarchar, size: 64, comment: "The digest of the SQL that's being blocked"}, {name: "KEY", tp: mysql.TypeBlob, size: types.UnspecifiedLength, comment: "The key on which a transaction is waiting for another"}, - {name: "ALL_SQLS", tp: mysql.TypeBlob, size: types.UnspecifiedLength, comment: "A list of the digests of SQL statements that the transaction has executed"}, - {name: "TRX_HOLDING_LOCK", tp: mysql.TypeLonglong, size: 21, flag: mysql.NotNullFlag, comment: "The transaction ID (start ts) of the transaction that's currently holding the lock"}, + {name: "ALL_SQL_DIGESTS", tp: mysql.TypeBlob, size: types.UnspecifiedLength, comment: "A list of the digests of SQL statements that the transaction has executed"}, + {name: "TRX_HOLDING_LOCK", tp: mysql.TypeLonglong, size: 21, flag: mysql.NotNullFlag | mysql.UnsignedFlag, comment: "The transaction ID (start ts) of the transaction that's currently holding the lock"}, } // GetShardingInfo returns a nil or description string for the sharding information of given TableInfo. diff --git a/util/deadlockhistory/deadlock_history.go b/util/deadlockhistory/deadlock_history.go index c219442cf5bf1..ee2fb496a2a58 100644 --- a/util/deadlockhistory/deadlock_history.go +++ b/util/deadlockhistory/deadlock_history.go @@ -32,7 +32,7 @@ type WaitChainItem struct { TryLockTxn uint64 SQLDigest string Key []byte - AllSQLs []string + AllSQLDigests []string TxnHoldingLock uint64 } @@ -149,8 +149,8 @@ func (d *DeadlockHistory) GetAllDatum() [][]types.Datum { } row[6] = nil - if item.AllSQLs != nil { - row[6] = "[" + strings.Join(item.AllSQLs, ", ") + "]" + if item.AllSQLDigests != nil { + row[6] = "[" + strings.Join(item.AllSQLDigests, ", ") + "]" } row[7] = item.TxnHoldingLock @@ -185,7 +185,7 @@ func ErrDeadlockToDeadlockRecord(dl *tikverr.ErrDeadlock) *DeadlockRecord { TryLockTxn: rawItem.Txn, SQLDigest: hex.EncodeToString(sqlDigest), Key: rawItem.Key, - AllSQLs: nil, + AllSQLDigests: nil, TxnHoldingLock: rawItem.WaitForTxn, }) } diff --git a/util/deadlockhistory/deadlock_history_test.go b/util/deadlockhistory/deadlock_history_test.go index dd9428a9f550a..398d2943ab996 100644 --- a/util/deadlockhistory/deadlock_history_test.go +++ b/util/deadlockhistory/deadlock_history_test.go @@ -148,7 +148,7 @@ func (s *testDeadlockHistorySuite) TestGetDatum(c *C) { TryLockTxn: 101, SQLDigest: "sql1", Key: []byte("k1"), - AllSQLs: []string{"sql1", "sql2"}, + AllSQLDigests: []string{"sql1", "sql2"}, TxnHoldingLock: 102, }, // It should work even some information are missing. @@ -164,12 +164,12 @@ func (s *testDeadlockHistorySuite) TestGetDatum(c *C) { WaitChain: []WaitChainItem{ { TryLockTxn: 201, - AllSQLs: []string{}, + AllSQLDigests: []string{}, TxnHoldingLock: 202, }, { TryLockTxn: 202, - AllSQLs: []string{"sql1"}, + AllSQLDigests: []string{"sql1"}, TxnHoldingLock: 201, }, }, @@ -201,7 +201,7 @@ func (s *testDeadlockHistorySuite) TestGetDatum(c *C) { c.Assert(res[0][3].GetValue(), Equals, uint64(101)) // TRY_LOCK_TRX_ID c.Assert(res[0][4].GetValue(), Equals, "sql1") // SQL_DIGEST c.Assert(res[0][5].GetValue(), Equals, "6B31") // KEY - c.Assert(res[0][6].GetValue(), Equals, "[sql1, sql2]") // ALL_SQLS + c.Assert(res[0][6].GetValue(), Equals, "[sql1, sql2]") // ALL_SQL_DIGESTS c.Assert(res[0][7].GetValue(), Equals, uint64(102)) // TRX_HOLDING_LOCK c.Assert(res[1][0].GetValue(), Equals, uint64(1)) // ID @@ -210,21 +210,21 @@ func (s *testDeadlockHistorySuite) TestGetDatum(c *C) { c.Assert(res[1][3].GetValue(), Equals, uint64(102)) // TRY_LOCK_TRX_ID c.Assert(res[1][4].GetValue(), Equals, nil) // SQL_DIGEST c.Assert(res[1][5].GetValue(), Equals, nil) // KEY - c.Assert(res[1][6].GetValue(), Equals, nil) // ALL_SQLS + c.Assert(res[1][6].GetValue(), Equals, nil) // ALL_SQL_DIGESTS c.Assert(res[1][7].GetValue(), Equals, uint64(101)) // TRX_HOLDING_LOCK c.Assert(res[2][0].GetValue(), Equals, uint64(2)) // ID c.Assert(toGoTime(res[2][1]), Equals, time2) // OCCUR_TIME c.Assert(res[2][2].GetValue(), Equals, int64(1)) // RETRYABLE c.Assert(res[2][3].GetValue(), Equals, uint64(201)) // TRY_LOCK_TRX_ID - c.Assert(res[2][6].GetValue(), Equals, "[]") // ALL_SQLS + c.Assert(res[2][6].GetValue(), Equals, "[]") // ALL_SQL_DIGESTS c.Assert(res[2][7].GetValue(), Equals, uint64(202)) // TRX_HOLDING_LOCK c.Assert(res[3][0].GetValue(), Equals, uint64(2)) // ID c.Assert(toGoTime(res[3][1]), Equals, time2) // OCCUR_TIME c.Assert(res[3][2].GetValue(), Equals, int64(1)) // RETRYABLE c.Assert(res[3][3].GetValue(), Equals, uint64(202)) // TRY_LOCK_TRX_ID - c.Assert(res[3][6].GetValue(), Equals, "[sql1]") // ALL_SQLS + c.Assert(res[3][6].GetValue(), Equals, "[sql1]") // ALL_SQL_DIGESTS c.Assert(res[3][7].GetValue(), Equals, uint64(201)) // TRX_HOLDING_LOCK } From e5921e5584b56da7e24c6d25267a7180c28b5658 Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Wed, 26 May 2021 18:21:37 +0800 Subject: [PATCH 10/11] Address comment --- session/txn.go | 6 +++--- session/txninfo/txn_info.go | 6 ++++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/session/txn.go b/session/txn.go index a3539965ec7d0..9615a84857fd4 100644 --- a/session/txn.go +++ b/session/txn.go @@ -273,7 +273,7 @@ func (txn *LazyTxn) onStmtStart(currentSQLDigest string) { return } - info := txn.txnInfo.Clone() + info := txn.txnInfo.GetSnapInfo() info.CurrentSQLDigest = currentSQLDigest // Keeps at most 50 history sqls to avoid consuming too much memory. const maxTransactionStmtHistory int = 50 @@ -285,7 +285,7 @@ func (txn *LazyTxn) onStmtStart(currentSQLDigest string) { } func (txn *LazyTxn) onStmtEnd() { - info := txn.txnInfo.Clone() + info := txn.txnInfo.GetSnapInfo() info.CurrentSQLDigest = "" txn.storeTxnInfo(info) } @@ -433,7 +433,7 @@ func keyNeedToLock(k, v []byte, flags kv.KeyFlags) bool { // Info dump the TxnState to Datum for displaying in `TIDB_TRX` // This function is supposed to be thread safe func (txn *LazyTxn) Info() *txninfo.TxnInfo { - info := txn.loadTxnInfo().Clone() + info := txn.loadTxnInfo().GetSnapInfo() if info.StartTS == 0 { return nil } diff --git a/session/txninfo/txn_info.go b/session/txninfo/txn_info.go index 563b1c8b4e501..e75a9825d4bfb 100644 --- a/session/txninfo/txn_info.go +++ b/session/txninfo/txn_info.go @@ -77,8 +77,10 @@ type TxnInfo struct { CurrentDB string } -// Clone clones the TxnInfo. It's safe to call concurrently with the transaction. -func (info *TxnInfo) Clone() *TxnInfo { +// GetSnapInfo gets a snapshot of the TxnInfo for read. It's safe to call concurrently with the transaction. +// Note that this function doesn't do deep copy and some fields of the result may be unsafe to write. Use it at your own +// risk. +func (info *TxnInfo) GetSnapInfo() *TxnInfo { return &TxnInfo{ StartTS: info.StartTS, CurrentSQLDigest: info.CurrentSQLDigest, From 43cd7b40453d5c7e7458c3b992442ec898a0b61e Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Thu, 27 May 2021 12:04:34 +0800 Subject: [PATCH 11/11] Address comment --- session/txn.go | 58 ++++++++++++++++++------------------- session/txninfo/txn_info.go | 4 +-- 2 files changed, 31 insertions(+), 31 deletions(-) diff --git a/session/txn.go b/session/txn.go index 9615a84857fd4..bb00265044ddf 100644 --- a/session/txn.go +++ b/session/txn.go @@ -63,12 +63,9 @@ type LazyTxn struct { // we need these fields because kv.Transaction provides no thread safety promise // but we hope getting TxnInfo is a thread safe op - // atomicTxnInfo provides information about the transaction in a thread-safe way. To atomically replace the struct, + // txnInfo provides information about the transaction in a thread-safe way. To atomically replace the struct, // it's stored as an unsafe.Pointer. - atomicTxnInfo unsafe.Pointer - // txnInfo points to the same thing as atomicTxnInfo. It's just used internally by LazyTxn to avoid casting - // atomicTxnInfo from the untyped unsafe.Pointer every time using it. - txnInfo *txninfo.TxnInfo + txnInfo unsafe.Pointer } // GetTableInfo returns the cached index name. @@ -83,10 +80,9 @@ func (txn *LazyTxn) CacheTableInfo(id int64, info *model.TableInfo) { func (txn *LazyTxn) init() { txn.mutations = make(map[int64]*binlog.TableMutation) - txn.txnInfo = &txninfo.TxnInfo{ + txn.storeTxnInfo(&txninfo.TxnInfo{ State: txninfo.TxnRunningNormal, - } - txn.storeTxnInfo(txn.txnInfo) + }) } func (txn *LazyTxn) initStmtBuf() { @@ -122,13 +118,14 @@ func (txn *LazyTxn) cleanupStmtBuf() { buf := txn.Transaction.GetMemBuffer() buf.Cleanup(txn.stagingHandle) txn.initCnt = buf.Len() - atomic.StoreUint64(&txn.txnInfo.EntriesCount, uint64(txn.Transaction.Len())) - atomic.StoreUint64(&txn.txnInfo.EntriesSize, uint64(txn.Transaction.Size())) + + txnInfo := txn.getTxnInfo() + atomic.StoreUint64(&txnInfo.EntriesCount, uint64(txn.Transaction.Len())) + atomic.StoreUint64(&txnInfo.EntriesSize, uint64(txn.Transaction.Size())) } func (txn *LazyTxn) storeTxnInfo(info *txninfo.TxnInfo) { - txn.txnInfo = info - atomic.StorePointer(&txn.atomicTxnInfo, unsafe.Pointer(info)) + atomic.StorePointer(&txn.txnInfo, unsafe.Pointer(info)) } func (txn *LazyTxn) recreateTxnInfo( @@ -150,8 +147,8 @@ func (txn *LazyTxn) recreateTxnInfo( txn.storeTxnInfo(info) } -func (txn *LazyTxn) loadTxnInfo() *txninfo.TxnInfo { - return (*txninfo.TxnInfo)(atomic.LoadPointer(&txn.atomicTxnInfo)) +func (txn *LazyTxn) getTxnInfo() *txninfo.TxnInfo { + return (*txninfo.TxnInfo)(atomic.LoadPointer(&txn.txnInfo)) } // Size implements the MemBuffer interface. @@ -241,13 +238,14 @@ func (txn *LazyTxn) changePendingToValid(ctx context.Context) error { txn.initStmtBuf() // The txnInfo may already recorded the first statement (usually "begin") when it's pending, so keep them. + txnInfo := txn.getTxnInfo() txn.recreateTxnInfo( t.StartTS(), txninfo.TxnRunningNormal, uint64(txn.Transaction.Len()), uint64(txn.Transaction.Size()), - txn.txnInfo.CurrentSQLDigest, - txn.txnInfo.AllSQLDigests) + txnInfo.CurrentSQLDigest, + txnInfo.AllSQLDigests) return nil } @@ -273,7 +271,7 @@ func (txn *LazyTxn) onStmtStart(currentSQLDigest string) { return } - info := txn.txnInfo.GetSnapInfo() + info := txn.getTxnInfo().ShallowClone() info.CurrentSQLDigest = currentSQLDigest // Keeps at most 50 history sqls to avoid consuming too much memory. const maxTransactionStmtHistory int = 50 @@ -285,7 +283,7 @@ func (txn *LazyTxn) onStmtStart(currentSQLDigest string) { } func (txn *LazyTxn) onStmtEnd() { - info := txn.txnInfo.GetSnapInfo() + info := txn.getTxnInfo().ShallowClone() info.CurrentSQLDigest = "" txn.storeTxnInfo(info) } @@ -327,7 +325,7 @@ func (txn *LazyTxn) Commit(ctx context.Context) error { return errors.Trace(kv.ErrInvalidTxn) } - atomic.StoreInt32(&txn.txnInfo.State, txninfo.TxnCommitting) + atomic.StoreInt32(&txn.getTxnInfo().State, txninfo.TxnCommitting) failpoint.Inject("mockSlowCommit", func(_ failpoint.Value) {}) @@ -359,7 +357,7 @@ func (txn *LazyTxn) Commit(ctx context.Context) error { // Rollback overrides the Transaction interface. func (txn *LazyTxn) Rollback() error { defer txn.reset() - atomic.StoreInt32(&txn.txnInfo.State, txninfo.TxnRollingBack) + atomic.StoreInt32(&txn.getTxnInfo().State, txninfo.TxnRollingBack) // mockSlowRollback is used to mock a rollback which takes a long time failpoint.Inject("mockSlowRollback", func(_ failpoint.Value) {}) return txn.Transaction.Rollback() @@ -367,14 +365,15 @@ func (txn *LazyTxn) Rollback() error { // LockKeys Wrap the inner transaction's `LockKeys` to record the status func (txn *LazyTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keys ...kv.Key) error { - originState := atomic.SwapInt32(&txn.txnInfo.State, txninfo.TxnLockWaiting) + txnInfo := txn.getTxnInfo() + originState := atomic.SwapInt32(&txnInfo.State, txninfo.TxnLockWaiting) t := time.Now() - atomic.StorePointer(&txn.txnInfo.BlockStartTime, unsafe.Pointer(&t)) + atomic.StorePointer(&txnInfo.BlockStartTime, unsafe.Pointer(&t)) err := txn.Transaction.LockKeys(ctx, lockCtx, keys...) - atomic.StorePointer(&txn.txnInfo.BlockStartTime, unsafe.Pointer(nil)) - atomic.StoreInt32(&txn.txnInfo.State, originState) - atomic.StoreUint64(&txn.txnInfo.EntriesCount, uint64(txn.Transaction.Len())) - atomic.StoreUint64(&txn.txnInfo.EntriesSize, uint64(txn.Transaction.Size())) + atomic.StorePointer(&txnInfo.BlockStartTime, unsafe.Pointer(nil)) + atomic.StoreInt32(&txnInfo.State, originState) + atomic.StoreUint64(&txnInfo.EntriesCount, uint64(txn.Transaction.Len())) + atomic.StoreUint64(&txnInfo.EntriesSize, uint64(txn.Transaction.Size())) return err } @@ -433,7 +432,7 @@ func keyNeedToLock(k, v []byte, flags kv.KeyFlags) bool { // Info dump the TxnState to Datum for displaying in `TIDB_TRX` // This function is supposed to be thread safe func (txn *LazyTxn) Info() *txninfo.TxnInfo { - info := txn.loadTxnInfo().GetSnapInfo() + info := txn.getTxnInfo().ShallowClone() if info.StartTS == 0 { return nil } @@ -445,8 +444,9 @@ func (txn *LazyTxn) Info() *txninfo.TxnInfo { // txn.Transaction can be changed during this function's execution if running parallel. func (txn *LazyTxn) UpdateEntriesCountAndSize() { if txn.Valid() { - atomic.StoreUint64(&txn.txnInfo.EntriesCount, uint64(txn.Transaction.Len())) - atomic.StoreUint64(&txn.txnInfo.EntriesSize, uint64(txn.Transaction.Size())) + txnInfo := txn.getTxnInfo() + atomic.StoreUint64(&txnInfo.EntriesCount, uint64(txn.Transaction.Len())) + atomic.StoreUint64(&txnInfo.EntriesSize, uint64(txn.Transaction.Size())) } } diff --git a/session/txninfo/txn_info.go b/session/txninfo/txn_info.go index e75a9825d4bfb..acc52e985f0f9 100644 --- a/session/txninfo/txn_info.go +++ b/session/txninfo/txn_info.go @@ -77,10 +77,10 @@ type TxnInfo struct { CurrentDB string } -// GetSnapInfo gets a snapshot of the TxnInfo for read. It's safe to call concurrently with the transaction. +// ShallowClone shallow clones the TxnInfo. It's safe to call concurrently with the transaction. // Note that this function doesn't do deep copy and some fields of the result may be unsafe to write. Use it at your own // risk. -func (info *TxnInfo) GetSnapInfo() *TxnInfo { +func (info *TxnInfo) ShallowClone() *TxnInfo { return &TxnInfo{ StartTS: info.StartTS, CurrentSQLDigest: info.CurrentSQLDigest,