diff --git a/br/pkg/kv/session.go b/br/pkg/kv/session.go index 663627d3dd440..9b1f453f0308b 100644 --- a/br/pkg/kv/session.go +++ b/br/pkg/kv/session.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/util/topsql/stmtstats" ) // Pair is a pair of key and value. @@ -251,3 +252,8 @@ func (se *session) Value(key fmt.Stringer) interface{} { // StmtAddDirtyTableOP implements the sessionctx.Context interface. func (se *session) StmtAddDirtyTableOP(op int, physicalID int64, handle kv.Handle) {} + +// GetStmtStats implements the sessionctx.Context interface. +func (se *session) GetStmtStats() *stmtstats.StatementStats { + return nil +} diff --git a/br/pkg/lightning/backend/kv/session.go b/br/pkg/lightning/backend/kv/session.go index bab10d97be600..fb49514bcd879 100644 --- a/br/pkg/lightning/backend/kv/session.go +++ b/br/pkg/lightning/backend/kv/session.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/util/topsql/stmtstats" "go.uber.org/zap" ) @@ -324,6 +325,11 @@ func (se *session) GetBuiltinFunctionUsage() map[string]uint32 { return make(map[string]uint32) } +// GetStmtStats implements the sessionctx.Context interface. +func (se *session) GetStmtStats() *stmtstats.StatementStats { + return nil +} + func (se *session) Close() { memBuf := &se.txn.kvMemBuf if memBuf.buf != nil { diff --git a/executor/adapter.go b/executor/adapter.go index 43573e7b721b9..fa96d82ea5e84 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -1306,19 +1306,27 @@ func (a *ExecStmt) GetTextToLog() string { } func (a *ExecStmt) observeStmtBeginForTopSQL() { - if vars := a.Ctx.GetSessionVars(); topsqlstate.TopSQLEnabled() && vars.StmtStats != nil { + vars := a.Ctx.GetSessionVars() + if vars == nil { + return + } + if stats := a.Ctx.GetStmtStats(); stats != nil && topsqlstate.TopSQLEnabled() { sqlDigest, planDigest := a.getSQLPlanDigest() - vars.StmtStats.OnExecutionBegin(sqlDigest, planDigest) + stats.OnExecutionBegin(sqlDigest, planDigest) // This is a special logic prepared for TiKV's SQLExecCount. - vars.StmtCtx.KvExecCounter = vars.StmtStats.CreateKvExecCounter(sqlDigest, planDigest) + vars.StmtCtx.KvExecCounter = stats.CreateKvExecCounter(sqlDigest, planDigest) } } func (a *ExecStmt) observeStmtFinishedForTopSQL() { - if vars := a.Ctx.GetSessionVars(); topsqlstate.TopSQLEnabled() && vars.StmtStats != nil { + vars := a.Ctx.GetSessionVars() + if vars == nil { + return + } + if stats := a.Ctx.GetStmtStats(); stats != nil && topsqlstate.TopSQLEnabled() { sqlDigest, planDigest := a.getSQLPlanDigest() execDuration := time.Since(vars.StartTime) + vars.DurationParse - vars.StmtStats.OnExecutionFinished(sqlDigest, planDigest, execDuration) + stats.OnExecutionFinished(sqlDigest, planDigest, execDuration) } } diff --git a/server/driver_tidb.go b/server/driver_tidb.go index 0840cceb86c0a..6f3221cd21388 100644 --- a/server/driver_tidb.go +++ b/server/driver_tidb.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/sqlexec" + "github.com/pingcap/tidb/util/topsql/stmtstats" ) // TiDBDriver implements IDriver. @@ -290,6 +291,11 @@ func (tc *TiDBContext) Prepare(sql string) (statement PreparedStatement, columns return } +// GetStmtStats implements the sessionctx.Context interface. +func (tc *TiDBContext) GetStmtStats() *stmtstats.StatementStats { + return tc.Session.GetStmtStats() +} + type tidbResultSet struct { recordSet sqlexec.RecordSet columns []*ColumnInfo diff --git a/session/session.go b/session/session.go index 509ecfefb970c..7089008367636 100644 --- a/session/session.go +++ b/session/session.go @@ -50,6 +50,7 @@ import ( "github.com/pingcap/tidb/table/temptable" "github.com/pingcap/tidb/util/topsql" topsqlstate "github.com/pingcap/tidb/util/topsql/state" + "github.com/pingcap/tidb/util/topsql/stmtstats" "github.com/pingcap/tipb/go-binlog" "go.uber.org/zap" @@ -231,6 +232,13 @@ type session struct { builtinFunctionUsage telemetry.BuiltinFunctionsUsage // allowed when tikv disk full happened. diskFullOpt kvrpcpb.DiskFullOpt + + // StmtStats is used to count various indicators of each SQL in this session + // at each point in time. These data will be periodically taken away by the + // background goroutine. The background goroutine will continue to aggregate + // all the local data in each session, and finally report them to the remote + // regularly. + stmtStats *stmtstats.StatementStats } var parserPool = &sync.Pool{New: func() interface{} { return parser.New() }} @@ -2372,9 +2380,9 @@ func (s *session) Close() { s.RollbackTxn(ctx) if s.sessionVars != nil { s.sessionVars.WithdrawAllPreparedStmt() - if s.sessionVars.StmtStats != nil { - s.sessionVars.StmtStats.SetFinished() - } + } + if s.stmtStats != nil { + s.stmtStats.SetFinished() } s.ClearDiskFullOpt() } @@ -2768,6 +2776,7 @@ func createSessionWithOpt(store kv.Storage, opt *Opt) (*session, error) { client: store.GetClient(), mppClient: store.GetMPPClient(), builtinFunctionUsage: make(telemetry.BuiltinFunctionsUsage), + stmtStats: stmtstats.CreateStatementStats(), } if plannercore.PreparedPlanCacheEnabled() { if opt != nil && opt.PreparedPlanCache != nil { @@ -2801,6 +2810,7 @@ func CreateSessionWithDomain(store kv.Storage, dom *domain.Domain) (*session, er client: store.GetClient(), mppClient: store.GetMPPClient(), builtinFunctionUsage: make(telemetry.BuiltinFunctionsUsage), + stmtStats: stmtstats.CreateStatementStats(), } if plannercore.PreparedPlanCacheEnabled() { s.preparedPlanCache = kvcache.NewSimpleLRUCache(plannercore.PreparedPlanCacheCapacity, @@ -3257,3 +3267,7 @@ func (s *session) GetBuiltinFunctionUsage() map[string]uint32 { func (s *session) getSnapshotInterceptor() kv.SnapshotInterceptor { return temptable.SessionSnapshotInterceptor(s) } + +func (s *session) GetStmtStats() *stmtstats.StatementStats { + return s.stmtStats +} diff --git a/sessionctx/context.go b/sessionctx/context.go index 313bb543c1b83..e8a3bda5146f9 100644 --- a/sessionctx/context.go +++ b/sessionctx/context.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/kvcache" "github.com/pingcap/tidb/util/sli" + "github.com/pingcap/tidb/util/topsql/stmtstats" "github.com/pingcap/tipb/go-binlog" "github.com/tikv/client-go/v2/oracle" ) @@ -140,6 +141,8 @@ type Context interface { // GetBuiltinFunctionUsage returns the BuiltinFunctionUsage of current Context, which is not thread safe. // Use primitive map type to prevent circular import. Should convert it to telemetry.BuiltinFunctionUsage before using. GetBuiltinFunctionUsage() map[string]uint32 + // GetStmtStats returns stmtstats.StatementStats owned by implementation. + GetStmtStats() *stmtstats.StatementStats } type basicCtxType int diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 73c900b60a4d4..ae6d14df23f6b 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -51,7 +51,6 @@ import ( "github.com/pingcap/tidb/util/stringutil" "github.com/pingcap/tidb/util/tableutil" "github.com/pingcap/tidb/util/timeutil" - "github.com/pingcap/tidb/util/topsql/stmtstats" tikvstore "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/oracle" "github.com/twmb/murmur3" @@ -998,13 +997,6 @@ type SessionVars struct { // EnablePaging indicates whether enable paging in coprocessor requests. EnablePaging bool - // StmtStats is used to count various indicators of each SQL in this session - // at each point in time. These data will be periodically taken away by the - // background goroutine. The background goroutine will continue to aggregate - // all the local data in each session, and finally report them to the remote - // regularly. - StmtStats *stmtstats.StatementStats - // ReadConsistency indicates the read consistency requirement. ReadConsistency ReadConsistencyLevel @@ -1244,7 +1236,6 @@ func NewSessionVars() *SessionVars { MPPStoreFailTTL: DefTiDBMPPStoreFailTTL, EnablePlacementChecks: DefEnablePlacementCheck, Rng: utilMath.NewWithTime(), - StmtStats: stmtstats.CreateStatementStats(), StatsLoadSyncWait: StatsLoadSyncWait.Load(), } vars.KVVars = tikvstore.NewVariables(&vars.Killed) diff --git a/util/mock/context.go b/util/mock/context.go index 0e27bbb4a4b32..262191d3bfdc4 100644 --- a/util/mock/context.go +++ b/util/mock/context.go @@ -34,6 +34,7 @@ import ( "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/sli" "github.com/pingcap/tidb/util/sqlexec" + "github.com/pingcap/tidb/util/topsql/stmtstats" "github.com/pingcap/tipb/go-binlog" "github.com/tikv/client-go/v2/tikv" ) @@ -342,6 +343,11 @@ func (c *Context) HasLockedTables() bool { func (c *Context) PrepareTSFuture(ctx context.Context) { } +// GetStmtStats implements the sessionctx.Context interface. +func (c *Context) GetStmtStats() *stmtstats.StatementStats { + return nil +} + // Close implements the sessionctx.Context interface. func (c *Context) Close() { } diff --git a/util/topsql/stmtstats/aggregator.go b/util/topsql/stmtstats/aggregator.go index 5fe752b12c3bd..97daaf0f5964b 100644 --- a/util/topsql/stmtstats/aggregator.go +++ b/util/topsql/stmtstats/aggregator.go @@ -59,23 +59,25 @@ func (m *aggregator) run() { case <-m.ctx.Done(): return case <-tick.C: - if state.TopSQLEnabled() { - m.aggregate() - } + m.aggregate(state.TopSQLEnabled()) } } } // aggregate data from all associated StatementStats. // If StatementStats has been closed, collect will remove it from the map. -func (m *aggregator) aggregate() { +func (m *aggregator) aggregate(take bool) { total := StatementStatsMap{} m.statsSet.Range(func(statsR, _ interface{}) bool { stats := statsR.(*StatementStats) if stats.Finished() { m.unregister(stats) + total.Merge(stats.Take()) + return true + } + if take { + total.Merge(stats.Take()) } - total.Merge(stats.Take()) return true }) if len(total) > 0 { diff --git a/util/topsql/stmtstats/aggregator_test.go b/util/topsql/stmtstats/aggregator_test.go index 3a1c246dc964d..a0149eed85a38 100644 --- a/util/topsql/stmtstats/aggregator_test.go +++ b/util/topsql/stmtstats/aggregator_test.go @@ -61,7 +61,7 @@ func Test_aggregator_register_collect(t *testing.T) { a.registerCollector(newMockCollector(func(data StatementStatsMap) { total.Merge(data) })) - a.aggregate() + a.aggregate(true) assert.NotEmpty(t, total) assert.Equal(t, uint64(1), total[SQLPlanDigest{SQLDigest: "SQL-1"}].ExecCount) assert.Equal(t, uint64(time.Millisecond.Nanoseconds()), total[SQLPlanDigest{SQLDigest: "SQL-1"}].SumDurationNs)