diff --git a/bindinfo/bind_test.go b/bindinfo/bind_test.go index 1eb7e1478b2f9..22b60187f0a77 100644 --- a/bindinfo/bind_test.go +++ b/bindinfo/bind_test.go @@ -35,6 +35,7 @@ import ( "github.com/pingcap/tidb/metrics" plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/session" + "github.com/pingcap/tidb/session/txninfo" "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/store/tikv/mockstore/cluster" "github.com/pingcap/tidb/util" @@ -70,6 +71,10 @@ type mockSessionManager struct { PS []*util.ProcessInfo } +func (msm *mockSessionManager) ShowTxnList() []*txninfo.TxnInfo { + panic("unimplemented!") +} + func (msm *mockSessionManager) ShowProcessList() map[uint64]*util.ProcessInfo { ret := make(map[uint64]*util.ProcessInfo) for _, item := range msm.PS { diff --git a/domain/domain_test.go b/domain/domain_test.go index 7c9d9ff633bc5..a4432b0fb1fe6 100644 --- a/domain/domain_test.go +++ b/domain/domain_test.go @@ -35,6 +35,7 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/metrics" + "github.com/pingcap/tidb/session/txninfo" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/store/tikv" @@ -241,6 +242,10 @@ type mockSessionManager struct { PS []*util.ProcessInfo } +func (msm *mockSessionManager) ShowTxnList() []*txninfo.TxnInfo { + panic("unimplemented!") +} + func (msm *mockSessionManager) ShowProcessList() map[uint64]*util.ProcessInfo { ret := make(map[uint64]*util.ProcessInfo) for _, item := range msm.PS { diff --git a/executor/adapter.go b/executor/adapter.go index 5f5229195c3f9..44d00cd1efa1e 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -55,6 +55,7 @@ import ( "github.com/pingcap/tidb/util/sqlexec" "github.com/pingcap/tidb/util/stmtsummary" "github.com/pingcap/tidb/util/stringutil" + "go.uber.org/zap" "go.uber.org/zap/zapcore" ) @@ -377,6 +378,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) { if txn.Valid() { txnStartTS = txn.StartTS() } + return &recordSet{ executor: e, stmt: a, @@ -590,6 +592,7 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error { } e, err = a.handlePessimisticLockError(ctx, err) if err != nil { + // todo: Report deadlock if ErrDeadlock.Equal(err) { metrics.StatementDeadlockDetectDuration.Observe(time.Since(startLocking).Seconds()) } diff --git a/executor/builder.go b/executor/builder.go index 40282d1030b2c..3324e52f894ff 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1531,7 +1531,9 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo strings.ToLower(infoschema.TablePlacementPolicy), strings.ToLower(infoschema.TableClientErrorsSummaryGlobal), strings.ToLower(infoschema.TableClientErrorsSummaryByUser), - strings.ToLower(infoschema.TableClientErrorsSummaryByHost): + strings.ToLower(infoschema.TableClientErrorsSummaryByHost), + strings.ToLower(infoschema.TableTiDBTrx), + strings.ToLower(infoschema.ClusterTableTiDBTrx): return &MemTableReaderExec{ baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()), table: v.Table, diff --git a/executor/executor_pkg_test.go b/executor/executor_pkg_test.go index 7cc5a8a69d66e..5591dcefde54d 100644 --- a/executor/executor_pkg_test.go +++ b/executor/executor_pkg_test.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tidb/executor/aggfuncs" "github.com/pingcap/tidb/expression" plannerutil "github.com/pingcap/tidb/planner/util" + txninfo "github.com/pingcap/tidb/session/txninfo" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util" @@ -60,6 +61,10 @@ type mockSessionManager struct { serverID uint64 } +func (msm *mockSessionManager) ShowTxnList() []*txninfo.TxnInfo { + panic("unimplemented!") +} + // ShowProcessList implements the SessionManager.ShowProcessList interface. func (msm *mockSessionManager) ShowProcessList() map[uint64]*util.ProcessInfo { ret := make(map[uint64]*util.ProcessInfo) diff --git a/executor/explainfor_test.go b/executor/explainfor_test.go index a113200a925d8..e29a7a3e24cee 100644 --- a/executor/explainfor_test.go +++ b/executor/explainfor_test.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/parser/auth" "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/session" + txninfo "github.com/pingcap/tidb/session/txninfo" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/israce" @@ -38,6 +39,10 @@ type mockSessionManager1 struct { PS []*util.ProcessInfo } +func (msm *mockSessionManager1) ShowTxnList() []*txninfo.TxnInfo { + return nil +} + // ShowProcessList implements the SessionManager.ShowProcessList interface. func (msm *mockSessionManager1) ShowProcessList() map[uint64]*util.ProcessInfo { ret := make(map[uint64]*util.ProcessInfo) diff --git a/executor/infoschema_reader.go b/executor/infoschema_reader.go index 0ec0c48885ecf..ae338bdd644d2 100644 --- a/executor/infoschema_reader.go +++ b/executor/infoschema_reader.go @@ -149,6 +149,10 @@ func (e *memtableRetriever) retrieve(ctx context.Context, sctx sessionctx.Contex infoschema.TableClientErrorsSummaryByUser, infoschema.TableClientErrorsSummaryByHost: err = e.setDataForClientErrorsSummary(sctx, e.table.Name.O) + case infoschema.TableTiDBTrx: + e.setDataForTiDBTrx(sctx) + case infoschema.ClusterTableTiDBTrx: + err = e.setDataForClusterTiDBTrx(sctx) } if err != nil { return nil, err @@ -2011,6 +2015,40 @@ func (e *memtableRetriever) setDataForClientErrorsSummary(ctx sessionctx.Context return nil } +func (e *memtableRetriever) setDataForTiDBTrx(ctx sessionctx.Context) { + sm := ctx.GetSessionManager() + if sm == nil { + return + } + + loginUser := ctx.GetSessionVars().User + var hasProcessPriv bool + if pm := privilege.GetPrivilegeManager(ctx); pm != nil { + if pm.RequestVerification(ctx.GetSessionVars().ActiveRoles, "", "", "", mysql.ProcessPriv) { + hasProcessPriv = true + } + } + infoList := sm.ShowTxnList() + for _, info := range infoList { + // If you have the PROCESS privilege, you can see all running transactions. + // Otherwise, you can see only your own transactions. + if !hasProcessPriv && loginUser != nil && info.Username != loginUser.Username { + continue + } + e.rows = append(e.rows, info.ToDatum()) + } +} + +func (e *memtableRetriever) setDataForClusterTiDBTrx(ctx sessionctx.Context) error { + e.setDataForTiDBTrx(ctx) + rows, err := infoschema.AppendHostInfoToRows(ctx, e.rows) + if err != nil { + return err + } + e.rows = rows + return nil +} + type hugeMemTableRetriever struct { dummyCloser table *model.TableInfo diff --git a/executor/infoschema_reader_test.go b/executor/infoschema_reader_test.go index c3e125824873d..e19eb9d9b3064 100644 --- a/executor/infoschema_reader_test.go +++ b/executor/infoschema_reader_test.go @@ -36,6 +36,7 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/server" "github.com/pingcap/tidb/session" + txninfo "github.com/pingcap/tidb/session/txninfo" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/statistics/handle" @@ -728,6 +729,10 @@ type mockSessionManager struct { serverID uint64 } +func (sm *mockSessionManager) ShowTxnList() []*txninfo.TxnInfo { + panic("unimplemented!") +} + func (sm *mockSessionManager) ShowProcessList() map[uint64]*util.ProcessInfo { return sm.processInfoMap } diff --git a/executor/prepared_test.go b/executor/prepared_test.go index 1f8edf79d942e..e0e2c19ee0f22 100644 --- a/executor/prepared_test.go +++ b/executor/prepared_test.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tidb/domain" plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/session" + txninfo "github.com/pingcap/tidb/session/txninfo" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/israce" @@ -135,6 +136,10 @@ type mockSessionManager2 struct { killed bool } +func (sm *mockSessionManager2) ShowTxnList() []*txninfo.TxnInfo { + panic("unimplemented!") +} + func (sm *mockSessionManager2) ShowProcessList() map[uint64]*util.ProcessInfo { pl := make(map[uint64]*util.ProcessInfo) if pi, ok := sm.GetProcessInfo(0); ok { diff --git a/executor/seqtest/prepared_test.go b/executor/seqtest/prepared_test.go index 916f218db1f9d..bb8f05e5eff54 100644 --- a/executor/seqtest/prepared_test.go +++ b/executor/seqtest/prepared_test.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/tidb/metrics" plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/session" + "github.com/pingcap/tidb/session/txninfo" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/kvcache" @@ -796,6 +797,10 @@ type mockSessionManager1 struct { Se session.Session } +func (msm *mockSessionManager1) ShowTxnList() []*txninfo.TxnInfo { + panic("unimplemented!") +} + // ShowProcessList implements the SessionManager.ShowProcessList interface. func (msm *mockSessionManager1) ShowProcessList() map[uint64]*util.ProcessInfo { ret := make(map[uint64]*util.ProcessInfo) diff --git a/infoschema/cluster.go b/infoschema/cluster.go index f113e90a0f587..2d196fe5b0023 100644 --- a/infoschema/cluster.go +++ b/infoschema/cluster.go @@ -37,6 +37,8 @@ const ( ClusterTableStatementsSummary = "CLUSTER_STATEMENTS_SUMMARY" // ClusterTableStatementsSummaryHistory is the string constant of cluster statement summary history table. ClusterTableStatementsSummaryHistory = "CLUSTER_STATEMENTS_SUMMARY_HISTORY" + // ClusterTableTiDBTrx is the string constant of cluster transaction running table. + ClusterTableTiDBTrx = "CLUSTER_TIDB_TRX" ) // memTableToClusterTables means add memory table to cluster table. @@ -45,6 +47,7 @@ var memTableToClusterTables = map[string]string{ TableProcesslist: ClusterTableProcesslist, TableStatementsSummary: ClusterTableStatementsSummary, TableStatementsSummaryHistory: ClusterTableStatementsSummaryHistory, + TableTiDBTrx: ClusterTableTiDBTrx, } func init() { diff --git a/infoschema/infoschema_test.go b/infoschema/infoschema_test.go index c3892e6527962..6aa0c5526f467 100644 --- a/infoschema/infoschema_test.go +++ b/infoschema/infoschema_test.go @@ -332,6 +332,7 @@ func (*testSuite) TestInfoTables(c *C) { "TABLESPACES", "COLLATION_CHARACTER_SET_APPLICABILITY", "PROCESSLIST", + "TIDB_TRX", } for _, t := range infoTables { tb, err1 := is.TableByName(util.InformationSchemaName, model.NewCIStr(t)) diff --git a/infoschema/tables.go b/infoschema/tables.go index bfca649e89fdd..2d5112ada05c0 100644 --- a/infoschema/tables.go +++ b/infoschema/tables.go @@ -31,11 +31,13 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" + "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl/placement" "github.com/pingcap/tidb/domain/infosync" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta/autoid" + "github.com/pingcap/tidb/session/txninfo" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/store/tikv" @@ -161,6 +163,8 @@ const ( TableClientErrorsSummaryByUser = "CLIENT_ERRORS_SUMMARY_BY_USER" // TableClientErrorsSummaryByHost is the string constant of client errors table. TableClientErrorsSummaryByHost = "CLIENT_ERRORS_SUMMARY_BY_HOST" + // TableTiDBTrx is current running transaction status table. + TableTiDBTrx = "TIDB_TRX" ) var tableIDMap = map[string]int64{ @@ -233,22 +237,25 @@ var tableIDMap = map[string]int64{ TableClientErrorsSummaryGlobal: autoid.InformationSchemaDBID + 67, TableClientErrorsSummaryByUser: autoid.InformationSchemaDBID + 68, TableClientErrorsSummaryByHost: autoid.InformationSchemaDBID + 69, + TableTiDBTrx: autoid.InformationSchemaDBID + 70, + ClusterTableTiDBTrx: autoid.InformationSchemaDBID + 71, } type columnInfo struct { - name string - tp byte - size int - decimal int - flag uint - deflt interface{} - comment string + name string + tp byte + size int + decimal int + flag uint + deflt interface{} + comment string + enumElems []string } func buildColumnInfo(col columnInfo) *model.ColumnInfo { mCharset := charset.CharsetBin mCollation := charset.CharsetBin - if col.tp == mysql.TypeVarchar || col.tp == mysql.TypeBlob || col.tp == mysql.TypeLongBlob { + if col.tp == mysql.TypeVarchar || col.tp == mysql.TypeBlob || col.tp == mysql.TypeLongBlob || col.tp == mysql.TypeEnum { mCharset = charset.CharsetUTF8MB4 mCollation = charset.CollationUTF8MB4 } @@ -259,6 +266,7 @@ func buildColumnInfo(col columnInfo) *model.ColumnInfo { Flen: col.size, Decimal: col.decimal, Flag: col.flag, + Elems: col.enumElems, } return &model.ColumnInfo{ Name: model.NewCIStr(col.name), @@ -1332,6 +1340,19 @@ var tableClientErrorsSummaryByHostCols = []columnInfo{ {name: "LAST_SEEN", tp: mysql.TypeTimestamp, size: 26}, } +var tableTiDBTrxCols = []columnInfo{ + {name: "ID", tp: mysql.TypeLonglong, size: 21, flag: mysql.PriKeyFlag | mysql.NotNullFlag | mysql.UnsignedFlag}, + {name: "START_TIME", tp: mysql.TypeTimestamp, size: 26, comment: "Start time of the transaction"}, + {name: "DIGEST", tp: mysql.TypeVarchar, size: 64, comment: "Digest of the sql the transaction are currently running"}, + {name: "STATE", tp: mysql.TypeEnum, enumElems: txninfo.TxnRunningStateStrs, comment: "Current running state of the transaction"}, + {name: "WAITING_START_TIME", tp: mysql.TypeTimestamp, size: 26, comment: "Current lock waiting's start time"}, + {name: "LEN", tp: mysql.TypeLonglong, size: 64, comment: "How many entries are in MemDB"}, + {name: "SIZE", tp: mysql.TypeLonglong, size: 64, comment: "MemDB used memory"}, + {name: "SESSION_ID", tp: mysql.TypeLonglong, size: 21, flag: mysql.UnsignedFlag, comment: "Which session this transaction belongs to"}, + {name: "USER", tp: mysql.TypeVarchar, size: 16, comment: "The user who open this session"}, + {name: "DB", tp: mysql.TypeVarchar, size: 64, comment: "The schema this transaction works on"}, +} + // GetShardingInfo returns a nil or description string for the sharding information of given TableInfo. // The returned description string may be: // - "NOT_SHARDED": for tables that SHARD_ROW_ID_BITS is not specified. @@ -1701,6 +1722,7 @@ var tableNameToColumns = map[string][]columnInfo{ TableClientErrorsSummaryGlobal: tableClientErrorsSummaryGlobalCols, TableClientErrorsSummaryByUser: tableClientErrorsSummaryByUserCols, TableClientErrorsSummaryByHost: tableClientErrorsSummaryByHostCols, + TableTiDBTrx: tableTiDBTrxCols, } func createInfoSchemaTable(_ autoid.Allocators, meta *model.TableInfo) (table.Table, error) { diff --git a/infoschema/tables_test.go b/infoschema/tables_test.go index f30f25ba6abfa..6cc24300c1be4 100644 --- a/infoschema/tables_test.go +++ b/infoschema/tables_test.go @@ -28,6 +28,7 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/failpoint" "github.com/pingcap/fn" + "github.com/pingcap/parser" "github.com/pingcap/parser/auth" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" @@ -42,6 +43,7 @@ import ( plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/server" "github.com/pingcap/tidb/session" + "github.com/pingcap/tidb/session/txninfo" "github.com/pingcap/tidb/store/helper" "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/util" @@ -121,7 +123,7 @@ func (s *testClusterTableSuite) setUpRPCService(c *C, addr string) (*grpc.Server lis, err := net.Listen("tcp", addr) c.Assert(err, IsNil) // Fix issue 9836 - sm := &mockSessionManager{make(map[uint64]*util.ProcessInfo, 1)} + sm := &mockSessionManager{make(map[uint64]*util.ProcessInfo, 1), nil} sm.processInfoMap[1] = &util.ProcessInfo{ ID: 1, User: "root", @@ -276,7 +278,7 @@ func (s *testTableSuite) TestInfoschemaFieldValue(c *C) { tk1.MustQuery("select distinct(table_schema) from information_schema.tables").Check(testkit.Rows("INFORMATION_SCHEMA")) // Fix issue 9836 - sm := &mockSessionManager{make(map[uint64]*util.ProcessInfo, 1)} + sm := &mockSessionManager{make(map[uint64]*util.ProcessInfo, 1), nil} sm.processInfoMap[1] = &util.ProcessInfo{ ID: 1, User: "root", @@ -433,6 +435,11 @@ func (s *testTableSuite) TestCurrentTimestampAsDefault(c *C) { type mockSessionManager struct { processInfoMap map[uint64]*util.ProcessInfo + txnInfo []*txninfo.TxnInfo +} + +func (sm *mockSessionManager) ShowTxnList() []*txninfo.TxnInfo { + return sm.txnInfo } func (sm *mockSessionManager) ShowProcessList() map[uint64]*util.ProcessInfo { @@ -459,7 +466,7 @@ func (s *testTableSuite) TestSomeTables(c *C) { c.Assert(err, IsNil) tk := testkit.NewTestKit(c, s.store) tk.Se = se - sm := &mockSessionManager{make(map[uint64]*util.ProcessInfo, 2)} + sm := &mockSessionManager{make(map[uint64]*util.ProcessInfo, 2), nil} sm.processInfoMap[1] = &util.ProcessInfo{ ID: 1, User: "user-1", @@ -516,7 +523,7 @@ func (s *testTableSuite) TestSomeTables(c *C) { fmt.Sprintf("3 user-3 127.0.0.1:12345 test Init DB 9223372036 %s %s", "in transaction", "check port"), )) - sm = &mockSessionManager{make(map[uint64]*util.ProcessInfo, 2)} + sm = &mockSessionManager{make(map[uint64]*util.ProcessInfo, 2), nil} sm.processInfoMap[1] = &util.ProcessInfo{ ID: 1, User: "user-1", @@ -1509,3 +1516,24 @@ func (s *testTableSuite) TestInfoschemaClientErrors(c *C) { err = tk.ExecToErr("FLUSH CLIENT_ERRORS_SUMMARY") c.Assert(err.Error(), Equals, "[planner:1227]Access denied; you need (at least one of) the RELOAD privilege(s) for this operation") } + +func (s *testTableSuite) TestTrx(c *C) { + tk := s.newTestKitWithRoot(c) + _, digest := parser.NormalizeDigest("select * from trx for update;") + sm := &mockSessionManager{nil, make([]*txninfo.TxnInfo, 1)} + sm.txnInfo[0] = &txninfo.TxnInfo{ + StartTS: 424768545227014155, + CurrentSQLDigest: digest, + State: txninfo.TxnRunningNormal, + BlockStartTime: nil, + EntriesCount: 1, + EntriesSize: 19, + ConnectionID: 2, + Username: "root", + CurrentDB: "test", + } + tk.Se.SetSessionManager(sm) + tk.MustQuery("select * from information_schema.TIDB_TRX;").Check( + testkit.Rows("424768545227014155 2021-05-07 12:56:48 " + digest + " Normal 1 19 2 root test"), + ) +} diff --git a/kv/kv.go b/kv/kv.go index a6a23a88df01d..1fad79d641009 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -154,6 +154,7 @@ type Transaction interface { // String implements fmt.Stringer interface. String() string // LockKeys tries to lock the entries with the keys in KV store. + // Will block until all keys are locked successfully or an error occurs. LockKeys(ctx context.Context, lockCtx *LockCtx, keys ...Key) error // SetOption sets an option with a value, when val is nil, uses the default // value of this option. diff --git a/server/server.go b/server/server.go index f7a6021a11221..29f5307895cc2 100644 --- a/server/server.go +++ b/server/server.go @@ -37,7 +37,6 @@ import ( "math/rand" "net" "net/http" - "unsafe" // For pprof _ "net/http/pprof" @@ -46,6 +45,7 @@ import ( "sync" "sync/atomic" "time" + "unsafe" "github.com/blacktear23/go-proxyprotocol" "github.com/pingcap/errors" @@ -56,6 +56,7 @@ import ( "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/plugin" + "github.com/pingcap/tidb/session/txninfo" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/util" @@ -557,6 +558,22 @@ func (s *Server) ShowProcessList() map[uint64]*util.ProcessInfo { return rs } +// ShowTxnList shows all txn info for displaying in `TIDB_TRX` +func (s *Server) ShowTxnList() []*txninfo.TxnInfo { + s.rwlock.RLock() + defer s.rwlock.RUnlock() + rs := make([]*txninfo.TxnInfo, 0, len(s.clients)) + for _, client := range s.clients { + if client.ctx.Session != nil { + info := client.ctx.Session.TxnInfo() + if info != nil { + rs = append(rs, info) + } + } + } + return rs +} + // GetProcessInfo implements the SessionManager interface. func (s *Server) GetProcessInfo(id uint64) (*util.ProcessInfo, bool) { s.rwlock.RLock() diff --git a/session/session.go b/session/session.go index 2f842f92e183a..0b4cb309f434b 100644 --- a/session/session.go +++ b/session/session.go @@ -41,6 +41,9 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" + "github.com/pingcap/tipb/go-binlog" + "go.uber.org/zap" + "github.com/pingcap/tidb/bindinfo" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl" @@ -58,6 +61,7 @@ import ( "github.com/pingcap/tidb/plugin" "github.com/pingcap/tidb/privilege" "github.com/pingcap/tidb/privilege/privileges" + txninfo "github.com/pingcap/tidb/session/txninfo" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/binloginfo" "github.com/pingcap/tidb/sessionctx/stmtctx" @@ -81,8 +85,6 @@ import ( "github.com/pingcap/tidb/util/sli" "github.com/pingcap/tidb/util/sqlexec" "github.com/pingcap/tidb/util/timeutil" - "github.com/pingcap/tipb/go-binlog" - "go.uber.org/zap" ) var ( @@ -145,6 +147,8 @@ type Session interface { Auth(user *auth.UserIdentity, auth []byte, salt []byte) bool AuthWithoutVerification(user *auth.UserIdentity) bool ShowProcess() *util.ProcessInfo + // Return the information of the txn current running + TxnInfo() *txninfo.TxnInfo // PrepareTxnCtx is exported for test. PrepareTxnCtx(context.Context) // FieldList returns fields list of a table. @@ -183,7 +187,7 @@ func (h *StmtHistory) Count() int { type session struct { // processInfo is used by ShowProcess(), and should be modified atomically. processInfo atomic.Value - txn TxnState + txn LazyTxn mu struct { sync.RWMutex @@ -442,6 +446,19 @@ func (s *session) FieldList(tableName string) ([]*ast.ResultField, error) { return fields, nil } +func (s *session) TxnInfo() *txninfo.TxnInfo { + txnInfo := s.txn.Info() + if txnInfo == nil { + return nil + } + processInfo := s.ShowProcess() + txnInfo.CurrentSQLDigest = processInfo.Digest + txnInfo.ConnectionID = processInfo.ID + txnInfo.Username = processInfo.User + txnInfo.CurrentDB = processInfo.DB + return txnInfo +} + func (s *session) doCommit(ctx context.Context) error { if !s.txn.Valid() { return nil @@ -524,6 +541,7 @@ func (s *session) doCommit(ctx context.Context) error { if err = memBuffer.Delete(iter.Key()); err != nil { return errors.Trace(err) } + s.txn.UpdateEntriesCountAndSize() if err = iter.Next(); err != nil { return errors.Trace(err) } diff --git a/session/session_test.go b/session/session_test.go index 5fa7779fc65c3..3baee4f0ef6f1 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -42,6 +42,7 @@ import ( plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/privilege/privileges" "github.com/pingcap/tidb/session" + txninfo "github.com/pingcap/tidb/session/txninfo" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/binloginfo" "github.com/pingcap/tidb/sessionctx/variable" @@ -83,6 +84,7 @@ var _ = SerialSuites(&testSessionSerialSuite{}) var _ = SerialSuites(&testBackupRestoreSuite{}) var _ = Suite(&testClusteredSuite{}) var _ = SerialSuites(&testClusteredSerialSuite{}) +var _ = SerialSuites(&testTxnStateSuite{}) type testSessionSuiteBase struct { cluster cluster.Cluster @@ -4303,3 +4305,103 @@ func (s *testSessionSuite3) TestGlobalTemporaryTable(c *C) { // The global temporary table data is discard after the transaction commit. tk.MustQuery("select * from g_tmp").Check(testkit.Rows()) } + +type testTxnStateSuite struct { + testSessionSuiteBase +} + +func (s *testTxnStateSuite) TestBasic(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("create table t(a int);") + tk.MustExec("insert into t(a) values (1);") + info := tk.Se.TxnInfo() + c.Assert(info, IsNil) + tk.MustExec("begin pessimistic;") + tk.MustExec("select * from t for update;") + info = tk.Se.TxnInfo() + _, expectedDigest := parser.NormalizeDigest("select * from t for update;") + c.Assert(info.CurrentSQLDigest, Equals, expectedDigest) + c.Assert(info.State, Equals, txninfo.TxnRunningNormal) + c.Assert(info.BlockStartTime, IsNil) + // len and size will be covered in TestLenAndSize + c.Assert(info.ConnectionID, Equals, tk.Se.GetSessionVars().ConnectionID) + c.Assert(info.Username, Equals, "") + c.Assert(info.CurrentDB, Equals, "test") + tk.MustExec("commit;") + info = tk.Se.TxnInfo() + c.Assert(info, IsNil) +} + +func (s *testTxnStateSuite) TestEntriesCountAndSize(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("create table t(a int);") + tk.MustExec("begin pessimistic;") + tk.MustExec("insert into t(a) values (1);") + info := tk.Se.TxnInfo() + c.Assert(info.EntriesCount, Equals, uint64(1)) + c.Assert(info.EntriesSize, Equals, uint64(29)) + tk.MustExec("insert into t(a) values (2);") + info = tk.Se.TxnInfo() + c.Assert(info.EntriesCount, Equals, uint64(2)) + c.Assert(info.EntriesSize, Equals, uint64(58)) + tk.MustExec("commit;") +} + +func (s *testTxnStateSuite) TestBlocked(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk2 := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("create table t(a int);") + tk.MustExec("insert into t(a) values (1);") + tk.MustExec("begin pessimistic;") + tk.MustExec("select * from t where a = 1 for update;") + go func() { + tk2.MustExec("begin pessimistic") + tk2.MustExec("select * from t where a = 1 for update;") + tk2.MustExec("commit;") + }() + time.Sleep(100 * time.Millisecond) + c.Assert(tk2.Se.TxnInfo().State, Equals, txninfo.TxnLockWaiting) + c.Assert(tk2.Se.TxnInfo().BlockStartTime, NotNil) + tk.MustExec("commit;") +} + +func (s *testTxnStateSuite) TestCommitting(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk2 := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("create table t(a int);") + tk.MustExec("insert into t(a) values (1), (2);") + tk.MustExec("begin pessimistic;") + tk.MustExec("select * from t where a = 1 for update;") + ch := make(chan struct{}) + go func() { + tk2.MustExec("begin pessimistic") + c.Assert(tk2.Se.TxnInfo(), NotNil) + tk2.MustExec("select * from t where a = 2 for update;") + failpoint.Enable("github.com/pingcap/tidb/session/mockSlowCommit", "sleep(200)") + defer failpoint.Disable("github.com/pingcap/tidb/session/mockSlowCommit") + tk2.MustExec("commit;") + ch <- struct{}{} + }() + time.Sleep(100 * time.Millisecond) + c.Assert(tk2.Se.TxnInfo().State, Equals, txninfo.TxnCommitting) + tk.MustExec("commit;") + <-ch +} + +func (s *testTxnStateSuite) TestRollbacking(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("create table t(a int);") + tk.MustExec("insert into t(a) values (1), (2);") + ch := make(chan struct{}) + go func() { + tk.MustExec("begin pessimistic") + tk.MustExec("insert into t(a) values (3);") + failpoint.Enable("github.com/pingcap/tidb/session/mockSlowRollback", "sleep(200)") + defer failpoint.Disable("github.com/pingcap/tidb/session/mockSlowRollback") + tk.MustExec("rollback;") + ch <- struct{}{} + }() + time.Sleep(100 * time.Millisecond) + c.Assert(tk.Se.TxnInfo().State, Equals, txninfo.TxnRollingBack) + <-ch +} diff --git a/session/txn.go b/session/txn.go index aebed7ed920b2..133cafb976aae 100644 --- a/session/txn.go +++ b/session/txn.go @@ -20,6 +20,8 @@ import ( "runtime/trace" "strings" "sync/atomic" + "time" + "unsafe" "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" @@ -28,6 +30,7 @@ import ( "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/session/txninfo" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/binloginfo" tikvstore "github.com/pingcap/tidb/store/tikv/kv" @@ -39,12 +42,12 @@ import ( "go.uber.org/zap" ) -// TxnState wraps kv.Transaction to provide a new kv.Transaction. +// LazyTxn wraps kv.Transaction to provide a new kv.Transaction. // 1. It holds all statement related modification in the buffer before flush to the txn, // so if execute statement meets error, the txn won't be made dirty. // 2. It's a lazy transaction, that means it's a txnFuture before StartTS() is really need. -type TxnState struct { - // States of a TxnState should be one of the followings: +type LazyTxn struct { + // States of a LazyTxn should be one of the followings: // Invalid: kv.Transaction == nil && txnFuture == nil // Pending: kv.Transaction == nil && txnFuture != nil // Valid: kv.Transaction != nil && txnFuture == nil @@ -55,23 +58,40 @@ type TxnState struct { stagingHandle kv.StagingHandle mutations map[int64]*binlog.TableMutation writeSLI sli.TxnWriteThroughputSLI + + // following atomic fields are used for filling TxnInfo + // we need these fields because kv.Transaction provides no thread safety promise + // but we hope getting TxnInfo is a thread safe op + + infoStartTS uint64 + // current executing state + State txninfo.TxnRunningState + // last trying to block start time + blockStartTime unsafe.Pointer // *time.Time, cannot use atomic.Value here because it is possible to be nil + // how many entries are there in the memBuffer, should be equal to self.(kv.Transaction).Len() + EntriesCount uint64 + // how many memory space do the entries in the memBuffer take, should be equal to self.(kv.Transaction).Size() + EntriesSize uint64 } // GetTableInfo returns the cached index name. -func (txn *TxnState) GetTableInfo(id int64) *model.TableInfo { +func (txn *LazyTxn) GetTableInfo(id int64) *model.TableInfo { return txn.Transaction.GetTableInfo(id) } // CacheTableInfo caches the index name. -func (txn *TxnState) CacheTableInfo(id int64, info *model.TableInfo) { +func (txn *LazyTxn) CacheTableInfo(id int64, info *model.TableInfo) { txn.Transaction.CacheTableInfo(id, info) } -func (txn *TxnState) init() { +func (txn *LazyTxn) init() { txn.mutations = make(map[int64]*binlog.TableMutation) + atomic.StoreInt32(&txn.State, txninfo.TxnRunningNormal) + atomic.StoreUint64(&txn.EntriesCount, 0) + atomic.StoreUint64(&txn.EntriesSize, 0) } -func (txn *TxnState) initStmtBuf() { +func (txn *LazyTxn) initStmtBuf() { if txn.Transaction == nil { return } @@ -81,14 +101,14 @@ func (txn *TxnState) initStmtBuf() { } // countHint is estimated count of mutations. -func (txn *TxnState) countHint() int { +func (txn *LazyTxn) countHint() int { if txn.stagingHandle == kv.InvalidStagingHandle { return 0 } return txn.Transaction.GetMemBuffer().Len() - txn.initCnt } -func (txn *TxnState) flushStmtBuf() { +func (txn *LazyTxn) flushStmtBuf() { if txn.stagingHandle == kv.InvalidStagingHandle { return } @@ -97,17 +117,19 @@ func (txn *TxnState) flushStmtBuf() { txn.initCnt = buf.Len() } -func (txn *TxnState) cleanupStmtBuf() { +func (txn *LazyTxn) cleanupStmtBuf() { if txn.stagingHandle == kv.InvalidStagingHandle { return } buf := txn.Transaction.GetMemBuffer() buf.Cleanup(txn.stagingHandle) txn.initCnt = buf.Len() + atomic.StoreUint64(&txn.EntriesCount, uint64(txn.Transaction.Len())) + atomic.StoreUint64(&txn.EntriesSize, uint64(txn.Transaction.Size())) } // Size implements the MemBuffer interface. -func (txn *TxnState) Size() int { +func (txn *LazyTxn) Size() int { if txn.Transaction == nil { return 0 } @@ -115,19 +137,19 @@ func (txn *TxnState) Size() int { } // Valid implements the kv.Transaction interface. -func (txn *TxnState) Valid() bool { +func (txn *LazyTxn) Valid() bool { return txn.Transaction != nil && txn.Transaction.Valid() } -func (txn *TxnState) pending() bool { +func (txn *LazyTxn) pending() bool { return txn.Transaction == nil && txn.txnFuture != nil } -func (txn *TxnState) validOrPending() bool { +func (txn *LazyTxn) validOrPending() bool { return txn.txnFuture != nil || txn.Valid() } -func (txn *TxnState) String() string { +func (txn *LazyTxn) String() string { if txn.Transaction != nil { return txn.Transaction.String() } @@ -138,7 +160,7 @@ func (txn *TxnState) String() string { } // GoString implements the "%#v" format for fmt.Printf. -func (txn *TxnState) GoString() string { +func (txn *LazyTxn) GoString() string { var s strings.Builder s.WriteString("Txn{") if txn.pending() { @@ -157,18 +179,25 @@ func (txn *TxnState) GoString() string { return s.String() } -func (txn *TxnState) changeInvalidToValid(kvTxn kv.Transaction) { +func (txn *LazyTxn) changeInvalidToValid(kvTxn kv.Transaction) { txn.Transaction = kvTxn + atomic.StoreInt32(&txn.State, txninfo.TxnRunningNormal) + atomic.StoreUint64(&txn.infoStartTS, kvTxn.StartTS()) txn.initStmtBuf() + atomic.StoreUint64(&txn.EntriesCount, uint64(txn.Transaction.Len())) + atomic.StoreUint64(&txn.EntriesSize, uint64(txn.Transaction.Size())) txn.txnFuture = nil } -func (txn *TxnState) changeInvalidToPending(future *txnFuture) { +func (txn *LazyTxn) changeInvalidToPending(future *txnFuture) { txn.Transaction = nil txn.txnFuture = future + atomic.StoreUint64(&txn.infoStartTS, 0) + atomic.StoreUint64(&txn.EntriesCount, uint64(0)) + atomic.StoreUint64(&txn.EntriesSize, uint64(0)) } -func (txn *TxnState) changePendingToValid(ctx context.Context) error { +func (txn *LazyTxn) changePendingToValid(ctx context.Context) error { if txn.txnFuture == nil { return errors.New("transaction future is not set") } @@ -183,17 +212,24 @@ func (txn *TxnState) changePendingToValid(ctx context.Context) error { return err } txn.Transaction = t + atomic.StoreInt32(&txn.State, txninfo.TxnRunningNormal) + atomic.StoreUint64(&txn.infoStartTS, t.StartTS()) txn.initStmtBuf() + atomic.StoreUint64(&txn.EntriesCount, uint64(txn.Transaction.Len())) + atomic.StoreUint64(&txn.EntriesSize, uint64(txn.Transaction.Size())) return nil } -func (txn *TxnState) changeToInvalid() { +func (txn *LazyTxn) changeToInvalid() { if txn.stagingHandle != kv.InvalidStagingHandle { txn.Transaction.GetMemBuffer().Cleanup(txn.stagingHandle) } txn.stagingHandle = kv.InvalidStagingHandle txn.Transaction = nil txn.txnFuture = nil + atomic.StoreUint64(&txn.infoStartTS, 0) + atomic.StoreUint64(&txn.EntriesCount, 0) + atomic.StoreUint64(&txn.EntriesSize, 0) } var hasMockAutoIncIDRetry = int64(0) @@ -223,7 +259,7 @@ func ResetMockAutoRandIDRetryCount(failTimes int64) { } // Commit overrides the Transaction interface. -func (txn *TxnState) Commit(ctx context.Context) error { +func (txn *LazyTxn) Commit(ctx context.Context) error { defer txn.reset() if len(txn.mutations) != 0 || txn.countHint() != 0 { logutil.BgLogger().Error("the code should never run here", @@ -233,6 +269,10 @@ func (txn *TxnState) Commit(ctx context.Context) error { return errors.Trace(kv.ErrInvalidTxn) } + atomic.StoreInt32(&txn.State, txninfo.TxnCommitting) + + failpoint.Inject("mockSlowCommit", func(_ failpoint.Value) {}) + // mockCommitError8942 is used for PR #8942. failpoint.Inject("mockCommitError8942", func(val failpoint.Value) { if val.(bool) { @@ -259,17 +299,34 @@ func (txn *TxnState) Commit(ctx context.Context) error { } // Rollback overrides the Transaction interface. -func (txn *TxnState) Rollback() error { +func (txn *LazyTxn) Rollback() error { defer txn.reset() + atomic.StoreInt32(&txn.State, txninfo.TxnRollingBack) + // mockSlowRollback is used to mock a rollback which takes a long time + failpoint.Inject("mockSlowRollback", func(_ failpoint.Value) {}) return txn.Transaction.Rollback() } -func (txn *TxnState) reset() { +// LockKeys Wrap the inner transaction's `LockKeys` to record the status +func (txn *LazyTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keys ...kv.Key) error { + originState := atomic.LoadInt32(&txn.State) + atomic.StoreInt32(&txn.State, txninfo.TxnLockWaiting) + t := time.Now() + atomic.StorePointer(&txn.blockStartTime, unsafe.Pointer(&t)) + err := txn.Transaction.LockKeys(ctx, lockCtx, keys...) + atomic.StorePointer(&txn.blockStartTime, unsafe.Pointer(nil)) + atomic.StoreInt32(&txn.State, originState) + atomic.StoreUint64(&txn.EntriesCount, uint64(txn.Transaction.Len())) + atomic.StoreUint64(&txn.EntriesSize, uint64(txn.Transaction.Size())) + return err +} + +func (txn *LazyTxn) reset() { txn.cleanup() txn.changeToInvalid() } -func (txn *TxnState) cleanup() { +func (txn *LazyTxn) cleanup() { txn.cleanupStmtBuf() txn.initStmtBuf() for key := range txn.mutations { @@ -278,7 +335,7 @@ func (txn *TxnState) cleanup() { } // KeysNeedToLock returns the keys need to be locked. -func (txn *TxnState) KeysNeedToLock() ([]kv.Key, error) { +func (txn *LazyTxn) KeysNeedToLock() ([]kv.Key, error) { if txn.stagingHandle == kv.InvalidStagingHandle { return nil, nil } @@ -316,6 +373,32 @@ func keyNeedToLock(k, v []byte, flags tikvstore.KeyFlags) bool { return !isNonUniqueIndex } +// Info dump the TxnState to Datum for displaying in `TIDB_TRX` +// This function is supposed to be thread safe +func (txn *LazyTxn) Info() *txninfo.TxnInfo { + startTs := atomic.LoadUint64(&txn.infoStartTS) + if startTs == 0 { + return nil + } + return &txninfo.TxnInfo{ + StartTS: startTs, + State: atomic.LoadInt32(&txn.State), + BlockStartTime: (*time.Time)(atomic.LoadPointer(&txn.blockStartTime)), + EntriesCount: atomic.LoadUint64(&txn.EntriesCount), + EntriesSize: atomic.LoadUint64(&txn.EntriesSize), + } +} + +// UpdateEntriesCountAndSize updates the EntriesCount and EntriesSize +// Note this function is not thread safe, because +// txn.Transaction can be changed during this function's execution if running parallel. +func (txn *LazyTxn) UpdateEntriesCountAndSize() { + if txn.Valid() { + atomic.StoreUint64(&txn.EntriesCount, uint64(txn.Transaction.Len())) + atomic.StoreUint64(&txn.EntriesSize, uint64(txn.Transaction.Size())) + } +} + func getBinlogMutation(ctx sessionctx.Context, tableID int64) *binlog.TableMutation { bin := binloginfo.GetPrewriteValue(ctx, true) for i := range bin.Mutations { diff --git a/session/txninfo/txn_info.go b/session/txninfo/txn_info.go new file mode 100644 index 0000000000000..77a2d8c90cd05 --- /dev/null +++ b/session/txninfo/txn_info.go @@ -0,0 +1,96 @@ +// Copyright 2021 PingCAP, Inc. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package txninfo + +import ( + "time" + + "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb/store/tikv/oracle" + "github.com/pingcap/tidb/types" +) + +// TxnRunningState is the current state of a transaction +type TxnRunningState = int32 + +const ( + // TxnRunningNormal means the transaction is running normally + TxnRunningNormal TxnRunningState = iota + // TxnLockWaiting means the transaction is blocked on a lock + TxnLockWaiting + // TxnCommitting means the transaction is (at least trying to) committing + TxnCommitting + // TxnRollingBack means the transaction is rolling back + TxnRollingBack +) + +// TxnRunningStateStrs is the names of the TxnRunningStates +var TxnRunningStateStrs = []string{ + "Normal", "LockWaiting", "Committing", "RollingBack", +} + +// TxnInfo is information about a running transaction +// This is supposed to be the datasource of `TIDB_TRX` in infoschema +type TxnInfo struct { + StartTS uint64 + // digest of SQL current running + CurrentSQLDigest string + // current executing State + State TxnRunningState + // last trying to block start time + BlockStartTime *time.Time + // How many entries are in MemDB + EntriesCount uint64 + // MemDB used memory + EntriesSize uint64 + + // the following fields will be filled in `session` instead of `LazyTxn` + + // Which session this transaction belongs to + ConnectionID uint64 + // The user who open this session + Username string + // The schema this transaction works on + CurrentDB string +} + +// ToDatum Converts the `TxnInfo` to `Datum` to show in the `TIDB_TRX` table +func (info *TxnInfo) ToDatum() []types.Datum { + humanReadableStartTime := time.Unix(0, oracle.ExtractPhysical(info.StartTS)*1e6) + var blockStartTime interface{} + if info.BlockStartTime == nil { + blockStartTime = nil + } else { + blockStartTime = types.NewTime(types.FromGoTime(*info.BlockStartTime), mysql.TypeTimestamp, 0) + } + e, err := types.ParseEnumValue(TxnRunningStateStrs, uint64(info.State+1)) + if err != nil { + panic("this should never happen") + } + state := types.NewMysqlEnumDatum(e) + datums := types.MakeDatums( + info.StartTS, + types.NewTime(types.FromGoTime(humanReadableStartTime), mysql.TypeTimestamp, 0), + info.CurrentSQLDigest, + ) + datums = append(datums, state) + datums = append(datums, types.MakeDatums( + blockStartTime, + info.EntriesCount, + info.EntriesSize, + info.ConnectionID, + info.Username, + info.CurrentDB)...) + return datums +} diff --git a/util/processinfo.go b/util/processinfo.go index 29716d914c3de..ebbf17094b80d 100644 --- a/util/processinfo.go +++ b/util/processinfo.go @@ -22,6 +22,7 @@ import ( "time" "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb/session/txninfo" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/util/execdetails" @@ -161,6 +162,7 @@ func serverStatus2Str(state uint16) string { // kill statement rely on this interface. type SessionManager interface { ShowProcessList() map[uint64]*ProcessInfo + ShowTxnList() []*txninfo.TxnInfo GetProcessInfo(id uint64) (*ProcessInfo, bool) Kill(connectionID uint64, query bool) KillAllConnections()