Skip to content

Commit

Permalink
topsql: optimize life cycle of stmtstats (pingcap#31727) (pingcap#31749)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot authored Jan 17, 2022
1 parent ca14998 commit 3a44f6d
Show file tree
Hide file tree
Showing 10 changed files with 65 additions and 23 deletions.
6 changes: 6 additions & 0 deletions br/pkg/kv/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/topsql/stmtstats"
)

// Pair is a pair of key and value.
Expand Down Expand Up @@ -251,3 +252,8 @@ func (se *session) Value(key fmt.Stringer) interface{} {

// StmtAddDirtyTableOP implements the sessionctx.Context interface.
func (se *session) StmtAddDirtyTableOP(op int, physicalID int64, handle kv.Handle) {}

// GetStmtStats implements the sessionctx.Context interface.
func (se *session) GetStmtStats() *stmtstats.StatementStats {
return nil
}
6 changes: 6 additions & 0 deletions br/pkg/lightning/backend/kv/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/topsql/stmtstats"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -324,6 +325,11 @@ func (se *session) GetBuiltinFunctionUsage() map[string]uint32 {
return make(map[string]uint32)
}

// GetStmtStats implements the sessionctx.Context interface.
func (se *session) GetStmtStats() *stmtstats.StatementStats {
return nil
}

func (se *session) Close() {
memBuf := &se.txn.kvMemBuf
if memBuf.buf != nil {
Expand Down
18 changes: 13 additions & 5 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -1306,19 +1306,27 @@ func (a *ExecStmt) GetTextToLog() string {
}

func (a *ExecStmt) observeStmtBeginForTopSQL() {
if vars := a.Ctx.GetSessionVars(); topsqlstate.TopSQLEnabled() && vars.StmtStats != nil {
vars := a.Ctx.GetSessionVars()
if vars == nil {
return
}
if stats := a.Ctx.GetStmtStats(); stats != nil && topsqlstate.TopSQLEnabled() {
sqlDigest, planDigest := a.getSQLPlanDigest()
vars.StmtStats.OnExecutionBegin(sqlDigest, planDigest)
stats.OnExecutionBegin(sqlDigest, planDigest)
// This is a special logic prepared for TiKV's SQLExecCount.
vars.StmtCtx.KvExecCounter = vars.StmtStats.CreateKvExecCounter(sqlDigest, planDigest)
vars.StmtCtx.KvExecCounter = stats.CreateKvExecCounter(sqlDigest, planDigest)
}
}

func (a *ExecStmt) observeStmtFinishedForTopSQL() {
if vars := a.Ctx.GetSessionVars(); topsqlstate.TopSQLEnabled() && vars.StmtStats != nil {
vars := a.Ctx.GetSessionVars()
if vars == nil {
return
}
if stats := a.Ctx.GetStmtStats(); stats != nil && topsqlstate.TopSQLEnabled() {
sqlDigest, planDigest := a.getSQLPlanDigest()
execDuration := time.Since(vars.StartTime) + vars.DurationParse
vars.StmtStats.OnExecutionFinished(sqlDigest, planDigest, execDuration)
stats.OnExecutionFinished(sqlDigest, planDigest, execDuration)
}
}

Expand Down
6 changes: 6 additions & 0 deletions server/driver_tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/topsql/stmtstats"
)

// TiDBDriver implements IDriver.
Expand Down Expand Up @@ -290,6 +291,11 @@ func (tc *TiDBContext) Prepare(sql string) (statement PreparedStatement, columns
return
}

// GetStmtStats implements the sessionctx.Context interface.
func (tc *TiDBContext) GetStmtStats() *stmtstats.StatementStats {
return tc.Session.GetStmtStats()
}

type tidbResultSet struct {
recordSet sqlexec.RecordSet
columns []*ColumnInfo
Expand Down
20 changes: 17 additions & 3 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/pingcap/tidb/table/temptable"
"github.com/pingcap/tidb/util/topsql"
topsqlstate "github.com/pingcap/tidb/util/topsql/state"
"github.com/pingcap/tidb/util/topsql/stmtstats"
"github.com/pingcap/tipb/go-binlog"
"go.uber.org/zap"

Expand Down Expand Up @@ -231,6 +232,13 @@ type session struct {
builtinFunctionUsage telemetry.BuiltinFunctionsUsage
// allowed when tikv disk full happened.
diskFullOpt kvrpcpb.DiskFullOpt

// StmtStats is used to count various indicators of each SQL in this session
// at each point in time. These data will be periodically taken away by the
// background goroutine. The background goroutine will continue to aggregate
// all the local data in each session, and finally report them to the remote
// regularly.
stmtStats *stmtstats.StatementStats
}

var parserPool = &sync.Pool{New: func() interface{} { return parser.New() }}
Expand Down Expand Up @@ -2372,9 +2380,9 @@ func (s *session) Close() {
s.RollbackTxn(ctx)
if s.sessionVars != nil {
s.sessionVars.WithdrawAllPreparedStmt()
if s.sessionVars.StmtStats != nil {
s.sessionVars.StmtStats.SetFinished()
}
}
if s.stmtStats != nil {
s.stmtStats.SetFinished()
}
s.ClearDiskFullOpt()
}
Expand Down Expand Up @@ -2768,6 +2776,7 @@ func createSessionWithOpt(store kv.Storage, opt *Opt) (*session, error) {
client: store.GetClient(),
mppClient: store.GetMPPClient(),
builtinFunctionUsage: make(telemetry.BuiltinFunctionsUsage),
stmtStats: stmtstats.CreateStatementStats(),
}
if plannercore.PreparedPlanCacheEnabled() {
if opt != nil && opt.PreparedPlanCache != nil {
Expand Down Expand Up @@ -2801,6 +2810,7 @@ func CreateSessionWithDomain(store kv.Storage, dom *domain.Domain) (*session, er
client: store.GetClient(),
mppClient: store.GetMPPClient(),
builtinFunctionUsage: make(telemetry.BuiltinFunctionsUsage),
stmtStats: stmtstats.CreateStatementStats(),
}
if plannercore.PreparedPlanCacheEnabled() {
s.preparedPlanCache = kvcache.NewSimpleLRUCache(plannercore.PreparedPlanCacheCapacity,
Expand Down Expand Up @@ -3257,3 +3267,7 @@ func (s *session) GetBuiltinFunctionUsage() map[string]uint32 {
func (s *session) getSnapshotInterceptor() kv.SnapshotInterceptor {
return temptable.SessionSnapshotInterceptor(s)
}

func (s *session) GetStmtStats() *stmtstats.StatementStats {
return s.stmtStats
}
3 changes: 3 additions & 0 deletions sessionctx/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/kvcache"
"github.com/pingcap/tidb/util/sli"
"github.com/pingcap/tidb/util/topsql/stmtstats"
"github.com/pingcap/tipb/go-binlog"
"github.com/tikv/client-go/v2/oracle"
)
Expand Down Expand Up @@ -140,6 +141,8 @@ type Context interface {
// GetBuiltinFunctionUsage returns the BuiltinFunctionUsage of current Context, which is not thread safe.
// Use primitive map type to prevent circular import. Should convert it to telemetry.BuiltinFunctionUsage before using.
GetBuiltinFunctionUsage() map[string]uint32
// GetStmtStats returns stmtstats.StatementStats owned by implementation.
GetStmtStats() *stmtstats.StatementStats
}

type basicCtxType int
Expand Down
9 changes: 0 additions & 9 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ import (
"github.com/pingcap/tidb/util/stringutil"
"github.com/pingcap/tidb/util/tableutil"
"github.com/pingcap/tidb/util/timeutil"
"github.com/pingcap/tidb/util/topsql/stmtstats"
tikvstore "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
"github.com/twmb/murmur3"
Expand Down Expand Up @@ -998,13 +997,6 @@ type SessionVars struct {
// EnablePaging indicates whether enable paging in coprocessor requests.
EnablePaging bool

// StmtStats is used to count various indicators of each SQL in this session
// at each point in time. These data will be periodically taken away by the
// background goroutine. The background goroutine will continue to aggregate
// all the local data in each session, and finally report them to the remote
// regularly.
StmtStats *stmtstats.StatementStats

// ReadConsistency indicates the read consistency requirement.
ReadConsistency ReadConsistencyLevel

Expand Down Expand Up @@ -1244,7 +1236,6 @@ func NewSessionVars() *SessionVars {
MPPStoreFailTTL: DefTiDBMPPStoreFailTTL,
EnablePlacementChecks: DefEnablePlacementCheck,
Rng: utilMath.NewWithTime(),
StmtStats: stmtstats.CreateStatementStats(),
StatsLoadSyncWait: StatsLoadSyncWait.Load(),
}
vars.KVVars = tikvstore.NewVariables(&vars.Killed)
Expand Down
6 changes: 6 additions & 0 deletions util/mock/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/sli"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/topsql/stmtstats"
"github.com/pingcap/tipb/go-binlog"
"github.com/tikv/client-go/v2/tikv"
)
Expand Down Expand Up @@ -342,6 +343,11 @@ func (c *Context) HasLockedTables() bool {
func (c *Context) PrepareTSFuture(ctx context.Context) {
}

// GetStmtStats implements the sessionctx.Context interface.
func (c *Context) GetStmtStats() *stmtstats.StatementStats {
return nil
}

// Close implements the sessionctx.Context interface.
func (c *Context) Close() {
}
Expand Down
12 changes: 7 additions & 5 deletions util/topsql/stmtstats/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,23 +59,25 @@ func (m *aggregator) run() {
case <-m.ctx.Done():
return
case <-tick.C:
if state.TopSQLEnabled() {
m.aggregate()
}
m.aggregate(state.TopSQLEnabled())
}
}
}

// aggregate data from all associated StatementStats.
// If StatementStats has been closed, collect will remove it from the map.
func (m *aggregator) aggregate() {
func (m *aggregator) aggregate(take bool) {
total := StatementStatsMap{}
m.statsSet.Range(func(statsR, _ interface{}) bool {
stats := statsR.(*StatementStats)
if stats.Finished() {
m.unregister(stats)
total.Merge(stats.Take())
return true
}
if take {
total.Merge(stats.Take())
}
total.Merge(stats.Take())
return true
})
if len(total) > 0 {
Expand Down
2 changes: 1 addition & 1 deletion util/topsql/stmtstats/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func Test_aggregator_register_collect(t *testing.T) {
a.registerCollector(newMockCollector(func(data StatementStatsMap) {
total.Merge(data)
}))
a.aggregate()
a.aggregate(true)
assert.NotEmpty(t, total)
assert.Equal(t, uint64(1), total[SQLPlanDigest{SQLDigest: "SQL-1"}].ExecCount)
assert.Equal(t, uint64(time.Millisecond.Nanoseconds()), total[SQLPlanDigest{SQLDigest: "SQL-1"}].SumDurationNs)
Expand Down

0 comments on commit 3a44f6d

Please sign in to comment.