Skip to content

Commit

Permalink
session: make TxnInfo() return even if process info is empty (#57044)…
Browse files Browse the repository at this point in the history
… (#57162)

close #57043
  • Loading branch information
ti-chi-bot authored Nov 12, 2024
1 parent fa9805b commit 4ecd2b8
Show file tree
Hide file tree
Showing 13 changed files with 105 additions and 42 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ tools/bin/revive:
GOBIN=$(shell pwd)/tools/bin $(GO) install github.com/mgechev/[email protected]

tools/bin/failpoint-ctl:
GOBIN=$(shell pwd)/tools/bin $(GO) install github.com/pingcap/failpoint/failpoint-ctl@2eaa328
GOBIN=$(shell pwd)/tools/bin $(GO) install github.com/pingcap/failpoint/failpoint-ctl@9b3b6e3

tools/bin/errdoc-gen:
GOBIN=$(shell pwd)/tools/bin $(GO) install github.com/pingcap/errors/errdoc-gen@518f63d
Expand Down
12 changes: 7 additions & 5 deletions ddl/index_cop.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ func scanRecords(p *copReqSenderPool, task *reorgBackfillTask, se *sess.Session)
zap.Int("id", task.id), zap.String("task", task.String()))

return wrapInBeginRollback(se, func(startTS uint64) error {
failpoint.InjectCall("scanRecordExec")
rs, err := p.copCtx.buildTableScan(p.ctx, startTS, task.startKey, task.excludedEndKey())
if err != nil {
return err
Expand Down Expand Up @@ -173,11 +174,12 @@ func wrapInBeginRollback(se *sess.Session, f func(startTS uint64) error) error {
return errors.Trace(err)
}
defer se.Rollback()
var startTS uint64
sessVars := se.GetSessionVars()
sessVars.TxnCtxMu.Lock()
startTS = sessVars.TxnCtx.StartTS
sessVars.TxnCtxMu.Unlock()
txn, err := se.Txn()
if err != nil {
return err
}
startTS := txn.StartTS()
failpoint.InjectCall("wrapInBeginRollbackStartTS", startTS)
return f(startTS)
}

Expand Down
2 changes: 1 addition & 1 deletion ddl/ingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ go_test(
],
embed = [":ingest"],
flaky = True,
shard_count = 15,
shard_count = 16,
deps = [
"//config",
"//ddl/internal/session",
Expand Down
30 changes: 30 additions & 0 deletions ddl/ingest/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,36 @@ func TestAddIndexIngestClientError(t *testing.T) {
tk.MustGetErrCode("create index i1 on t1((cast(f1 as unsigned array)));", errno.ErrInvalidJSONValueForFuncIndex)
}

func TestAddIndexSetInternalSessions(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")
defer injectMockBackendMgr(t, store)()

tk.MustExec("set global tidb_enable_dist_task = 0;")
tk.MustExec("set global tidb_ddl_reorg_worker_cnt = 1;")
tk.MustExec("create table t (a int);")
tk.MustExec("insert into t values (1);")
expectInternalTS := []uint64{}
actualInternalTS := []uint64{}
err := failpoint.EnableCall("github.com/pingcap/tidb/ddl/wrapInBeginRollbackStartTS", func(startTS uint64) {
expectInternalTS = append(expectInternalTS, startTS)
})
require.NoError(t, err)
defer failpoint.Disable("github.com/pingcap/tidb/ddl/wrapInBeginRollbackStartTS")
err = failpoint.EnableCall("github.com/pingcap/tidb/ddl/scanRecordExec", func() {
mgr := tk.Session().GetSessionManager()
actualInternalTS = mgr.GetInternalSessionStartTSList()
})
require.NoError(t, err)
defer failpoint.Disable("github.com/pingcap/tidb/ddl/scanRecordExec")
tk.MustExec("alter table t add index idx(a);")
require.Len(t, expectInternalTS, 1)
for _, ts := range expectInternalTS {
require.Contains(t, actualInternalTS, ts)
}
}

func TestAddIndexCancelOnNoneState(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
Expand Down
2 changes: 2 additions & 0 deletions ddl/internal/session/session_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package session

import (
"context"
"fmt"
"sync"

Expand Down Expand Up @@ -83,6 +84,7 @@ func (sg *Pool) Put(ctx sessionctx.Context) {

// no need to protect sg.resPool, even the sg.resPool is closed, the ctx still need to
// Put into resPool, because when resPool is closing, it will wait all the ctx returns, then resPool finish closing.
ctx.RollbackTxn(context.Background())
sg.resPool.Put(ctx.(pools.Resource))
infosync.DeleteInternalSession(ctx)
}
Expand Down
2 changes: 1 addition & 1 deletion executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2526,7 +2526,7 @@ func (e *tidbTrxTableRetriever) retrieve(ctx context.Context, sctx sessionctx.Co
for _, info := range infoList {
// If you have the PROCESS privilege, you can see all running transactions.
// Otherwise, you can see only your own transactions.
if !hasProcessPriv && loginUser != nil && info.Username != loginUser.Username {
if !hasProcessPriv && loginUser != nil && info.ProcessInfo.Username != loginUser.Username {
continue
}
e.txnInfo = append(e.txnInfo, info)
Expand Down
16 changes: 10 additions & 6 deletions infoschema/tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1571,9 +1571,11 @@ func TestTiDBTrx(t *testing.T) {
CurrentSQLDigest: digest.String(),
State: txninfo.TxnIdle,
EntriesCount: 1,
ConnectionID: 2,
Username: "root",
CurrentDB: "test",
ProcessInfo: &txninfo.ProcessInfo{
ConnectionID: 2,
Username: "root",
CurrentDB: "test",
},
}

blockTime2 := time.Date(2021, 05, 20, 13, 18, 30, 123456000, time.Local)
Expand All @@ -1582,9 +1584,11 @@ func TestTiDBTrx(t *testing.T) {
CurrentSQLDigest: "",
AllSQLDigests: []string{"sql1", "sql2", digest.String()},
State: txninfo.TxnLockAcquiring,
ConnectionID: 10,
Username: "user1",
CurrentDB: "db1",
ProcessInfo: &txninfo.ProcessInfo{
ConnectionID: 10,
Username: "user1",
CurrentDB: "db1",
},
}
sm.TxnInfo[1].BlockStartTime.Valid = true
sm.TxnInfo[1].BlockStartTime.Time = blockTime2
Expand Down
5 changes: 3 additions & 2 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -818,15 +818,16 @@ func (s *Server) getUserProcessList() map[uint64]*util.ProcessInfo {
return rs
}

// ShowTxnList shows all txn info for displaying in `TIDB_TRX`
// ShowTxnList shows all txn info for displaying in `TIDB_TRX`.
// Internal sessions are not taken into consideration.
func (s *Server) ShowTxnList() []*txninfo.TxnInfo {
s.rwlock.RLock()
defer s.rwlock.RUnlock()
rs := make([]*txninfo.TxnInfo, 0, len(s.clients))
for _, client := range s.clients {
if client.ctx.Session != nil {
info := client.ctx.Session.TxnInfo()
if info != nil {
if info != nil && info.ProcessInfo != nil {
rs = append(rs, info)
}
}
Expand Down
21 changes: 12 additions & 9 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,7 @@ func (s *session) FieldList(tableName string) ([]*ast.ResultField, error) {
}

// TxnInfo returns a pointer to a *copy* of the internal TxnInfo, thus is *read only*
// Process field may not initialize if this is a session used internally.
func (s *session) TxnInfo() *txninfo.TxnInfo {
s.txn.mu.RLock()
// Copy on read to get a snapshot, this API shouldn't be frequently called.
Expand All @@ -543,17 +544,18 @@ func (s *session) TxnInfo() *txninfo.TxnInfo {

processInfo := s.ShowProcess()
if processInfo == nil {
return nil
return &txnInfo
}
txnInfo.ProcessInfo = &txninfo.ProcessInfo{
ConnectionID: processInfo.ID,
Username: processInfo.User,
CurrentDB: processInfo.DB,
RelatedTableIDs: make(map[int64]struct{}),
}
txnInfo.ConnectionID = processInfo.ID
txnInfo.Username = processInfo.User
txnInfo.CurrentDB = processInfo.DB
txnInfo.RelatedTableIDs = make(map[int64]struct{})
s.GetSessionVars().GetRelatedTableForMDL().Range(func(key, value interface{}) bool {
txnInfo.RelatedTableIDs[key.(int64)] = struct{}{}
txnInfo.ProcessInfo.RelatedTableIDs[key.(int64)] = struct{}{}
return true
})

return &txnInfo
}

Expand Down Expand Up @@ -3852,9 +3854,10 @@ func GetStartTSFromSession(se interface{}) (uint64, uint64) {
txnInfo := tmp.TxnInfo()
if txnInfo != nil {
startTS = txnInfo.StartTS
processInfoID = txnInfo.ConnectionID
if txnInfo.ProcessInfo != nil {
processInfoID = txnInfo.ProcessInfo.ConnectionID
}
}

logutil.BgLogger().Debug(
"GetStartTSFromSession getting startTS of internal session",
zap.Uint64("startTS", startTS), zap.Time("start time", oracle.GetTimeFromTS(startTS)))
Expand Down
29 changes: 24 additions & 5 deletions session/txninfo/txn_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,12 @@ type TxnInfo struct {
// How many entries are in MemDB
EntriesCount uint64

// The following fields will be filled in `session` instead of `LazyTxn`
// The following field will be filled in `session` instead of `LazyTxn`
ProcessInfo *ProcessInfo
}

// ProcessInfo is part of fields of txnInfo, which will be filled in `session` instead of `LazyTxn`
type ProcessInfo struct {
// Which session this transaction belongs to
ConnectionID uint64
// The user who open this session
Expand Down Expand Up @@ -217,13 +221,25 @@ var columnValueGetterMap = map[string]func(*TxnInfo) types.Datum{
return types.NewDatum(info.EntriesCount)
},
SessionIDStr: func(info *TxnInfo) types.Datum {
return types.NewDatum(info.ConnectionID)
var connectionID uint64
if info.ProcessInfo != nil {
connectionID = info.ProcessInfo.ConnectionID
}
return types.NewDatum(connectionID)
},
UserStr: func(info *TxnInfo) types.Datum {
return types.NewDatum(info.Username)
var userName string
if info.ProcessInfo != nil {
userName = info.ProcessInfo.Username
}
return types.NewDatum(userName)
},
DBStr: func(info *TxnInfo) types.Datum {
return types.NewDatum(info.CurrentDB)
var currentDB string
if info.ProcessInfo != nil {
currentDB = info.ProcessInfo.CurrentDB
}
return types.NewDatum(currentDB)
},
AllSQLDigestsStr: func(info *TxnInfo) types.Datum {
allSQLDigests := info.AllSQLDigests
Expand All @@ -239,7 +255,10 @@ var columnValueGetterMap = map[string]func(*TxnInfo) types.Datum{
return types.NewDatum(string(res))
},
RelatedTableIDsStr: func(info *TxnInfo) types.Datum {
relatedTableIDs := info.RelatedTableIDs
var relatedTableIDs map[int64]struct{}
if info.ProcessInfo != nil {
relatedTableIDs = info.ProcessInfo.RelatedTableIDs
}
str := strings.Builder{}
first := true
for tblID := range relatedTableIDs {
Expand Down
1 change: 0 additions & 1 deletion testkit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ go_library(
"//resourcemanager",
"//session",
"//session/txninfo",
"//sessionctx",
"//sessionctx/variable",
"//store/driver",
"//store/mockstore",
Expand Down
19 changes: 11 additions & 8 deletions testkit/mocksessionmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/session/txninfo"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util"
)

Expand All @@ -50,7 +49,7 @@ func (msm *MockSessionManager) ShowTxnList() []*txninfo.TxnInfo {
rs := make([]*txninfo.TxnInfo, 0, len(msm.Conn))
for _, se := range msm.Conn {
info := se.TxnInfo()
if info != nil {
if info != nil && info.ProcessInfo != nil {
rs = append(rs, info)
}
}
Expand Down Expand Up @@ -143,12 +142,16 @@ func (msm *MockSessionManager) GetInternalSessionStartTSList() []uint64 {
defer msm.mu.Unlock()
ret := make([]uint64, 0, len(msm.internalSessions))
for internalSess := range msm.internalSessions {
se := internalSess.(sessionctx.Context)
sessVars := se.GetSessionVars()
sessVars.TxnCtxMu.Lock()
startTS := sessVars.TxnCtx.StartTS
sessVars.TxnCtxMu.Unlock()
ret = append(ret, startTS)
// Ref the implementation of `GetInternalSessionStartTSList` on the real session manager. The `TxnInfo` is more
// accurate, because if a session is pending, the `StartTS` in `sessVars.TxnCtx` will not be updated. For example,
// if there is not DDL for a long time, the minimal internal session start ts will not have any progress.
if se, ok := internalSess.(interface{ TxnInfo() *txninfo.TxnInfo }); ok {
txn := se.TxnInfo()
if txn != nil {
ret = append(ret, txn.StartTS)
}
continue
}
}
return ret
}
Expand Down
6 changes: 3 additions & 3 deletions tests/realtikvtest/txntest/txn_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ func TestBasicTxnState(t *testing.T) {
require.Equal(t, []string{beginDigest.String(), selectTSDigest.String(), expectedDigest.String()}, info.AllSQLDigests)

// len and size will be covered in TestLenAndSize
require.Equal(t, tk.Session().GetSessionVars().ConnectionID, info.ConnectionID)
require.Equal(t, "", info.Username)
require.Equal(t, "test", info.CurrentDB)
require.Equal(t, tk.Session().GetSessionVars().ConnectionID, info.ProcessInfo.ConnectionID)
require.Equal(t, "", info.ProcessInfo.Username)
require.Equal(t, "test", info.ProcessInfo.CurrentDB)
require.Equal(t, startTS, info.StartTS)

require.NoError(t, failpoint.Enable("tikvclient/beforePrewrite", "pause"))
Expand Down

0 comments on commit 4ecd2b8

Please sign in to comment.