Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

topsql: fix issue of topsql failed catch the running SQL when topsql is enabled in execution #33861

Merged
merged 55 commits into from
Apr 26, 2022
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
a63c65b
init
crazycs520 Apr 6, 2022
f216da0
tag tag when exec
crazycs520 Apr 6, 2022
7405de7
set req tag when exec
crazycs520 Apr 6, 2022
11f8be3
record sql meta
crazycs520 Apr 6, 2022
f4dedc8
remove redundant attach tag for execute prepare stmt
crazycs520 Apr 6, 2022
33e2f29
remove attach sql&plan when plan is nil
crazycs520 Apr 6, 2022
67fbc8f
refine code
crazycs520 Apr 7, 2022
0137849
add comment and remove debug log
crazycs520 Apr 7, 2022
f7bc22b
always attach tag
crazycs520 Apr 7, 2022
1112d3c
tiny refine
crazycs520 Apr 8, 2022
e05043b
fix test
crazycs520 Apr 8, 2022
8a7cdfa
optmize for fast plan
crazycs520 Apr 8, 2022
ed1c56a
Merge branch 'master' of https://github.com/pingcap/tidb into topsql-…
crazycs520 Apr 8, 2022
3fd11c6
remove debug metric and fmt
crazycs520 Apr 8, 2022
c1e41bc
Merge branch 'master' of https://github.com/pingcap/tidb into topsql-…
crazycs520 Apr 8, 2022
f406d0e
tiny refine
crazycs520 Apr 11, 2022
70c2f3f
tiny refine
crazycs520 Apr 11, 2022
24bd016
Merge branch 'master' of https://github.com/pingcap/tidb into topsql-…
crazycs520 Apr 11, 2022
57a6506
fix lint
crazycs520 Apr 11, 2022
3e17956
add test
crazycs520 Apr 11, 2022
e448431
Merge branch 'master' of https://github.com/pingcap/tidb into topsql-…
crazycs520 Apr 12, 2022
6483077
always set tag for req
crazycs520 Apr 13, 2022
ee992f7
Merge branch 'master' into topsql-catch-running1
crazycs520 Apr 13, 2022
37e1a52
add comment
crazycs520 Apr 13, 2022
3b68826
Merge branch 'topsql-catch-running1' of https://github.com/crazycs520…
crazycs520 Apr 13, 2022
b6c307b
add comment
crazycs520 Apr 13, 2022
0676fb9
address comment
crazycs520 Apr 13, 2022
4dfa429
Revert "add comment"
crazycs520 Apr 13, 2022
3c2152a
refine test
crazycs520 Apr 13, 2022
ab7b3d3
address comment
crazycs520 Apr 14, 2022
4981f8c
Merge branch 'master' of https://github.com/pingcap/tidb into topsql-…
crazycs520 Apr 14, 2022
663eb93
Merge branch 'master' into topsql-catch-running1
crazycs520 Apr 14, 2022
772c24f
fix test
crazycs520 Apr 14, 2022
c01ae67
Merge branch 'topsql-catch-running1' of https://github.com/crazycs520…
crazycs520 Apr 14, 2022
24e7c33
Merge branch 'master' into topsql-catch-running1
crazycs520 Apr 15, 2022
1d43b59
add comment
crazycs520 Apr 15, 2022
2002f46
add OnExecutionBegin
crazycs520 Apr 15, 2022
4af1e45
Merge branch 'master' into topsql-catch-running1
crazycs520 Apr 18, 2022
0e64ae3
Merge branch 'master' into topsql-catch-running1
crazycs520 Apr 19, 2022
de71fd4
add test
crazycs520 Apr 20, 2022
a7f41bc
rename api
crazycs520 Apr 20, 2022
334b85e
inline and remove function
crazycs520 Apr 20, 2022
22bcad1
Merge branch 'master' into topsql-catch-running1
crazycs520 Apr 20, 2022
48e980b
fix lint
crazycs520 Apr 20, 2022
98dd8c3
Merge branch 'master' into topsql-catch-running1
crazycs520 Apr 20, 2022
fb64e1d
Merge branch 'master' into topsql-catch-running1
crazycs520 Apr 22, 2022
1b94423
Merge branch 'master' into topsql-catch-running1
ti-chi-bot Apr 24, 2022
eb773bc
Merge branch 'master' into topsql-catch-running1
ti-chi-bot Apr 24, 2022
da2673e
Merge branch 'master' into topsql-catch-running1
ti-chi-bot Apr 24, 2022
b965eb4
Merge branch 'master' into topsql-catch-running1
ti-chi-bot Apr 24, 2022
095f4df
Merge branch 'master' into topsql-catch-running1
ti-chi-bot Apr 25, 2022
43c5da3
Merge branch 'master' into topsql-catch-running1
ti-chi-bot Apr 25, 2022
cc5f652
Merge branch 'master' into topsql-catch-running1
ti-chi-bot Apr 25, 2022
0dd6dab
Merge branch 'master' into topsql-catch-running1
ti-chi-bot Apr 25, 2022
3da532e
Merge branch 'master' of https://github.com/pingcap/tidb into topsql-…
crazycs520 Apr 26, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,9 +541,9 @@ func (w *JobContext) setDDLLabelForTopSQL(job *model.Job) {
if job.Query != w.cacheSQL || w.cacheDigest == nil {
w.cacheNormalizedSQL, w.cacheDigest = parser.NormalizeDigest(job.Query)
w.cacheSQL = job.Query
w.ddlJobCtx = topsql.AttachSQLInfo(context.Background(), w.cacheNormalizedSQL, w.cacheDigest, "", nil, false)
w.ddlJobCtx = topsql.AttachSQLInfo(context.Background(), w.cacheNormalizedSQL, w.cacheDigest, false)
} else {
topsql.AttachSQLInfo(w.ddlJobCtx, w.cacheNormalizedSQL, w.cacheDigest, "", nil, false)
topsql.AttachSQLInfo(w.ddlJobCtx, w.cacheNormalizedSQL, w.cacheDigest, false)
}
}

Expand Down
3 changes: 1 addition & 2 deletions distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/logutil"
topsqlstate "github.com/pingcap/tidb/util/topsql/state"
"github.com/pingcap/tidb/util/trxevents"
"github.com/pingcap/tipb/go-tipb"
"github.com/tikv/client-go/v2/tikvrpc/interceptor"
Expand Down Expand Up @@ -257,7 +256,7 @@ func init() {
// WithSQLKvExecCounterInterceptor binds an interceptor for client-go to count the
// number of SQL executions of each TiKV (if any).
func WithSQLKvExecCounterInterceptor(ctx context.Context, stmtCtx *stmtctx.StatementContext) context.Context {
if topsqlstate.TopSQLEnabled() && stmtCtx.KvExecCounter != nil {
if stmtCtx.KvExecCounter != nil {
// Unlike calling Transaction or Snapshot interface, in distsql package we directly
// face tikv Request. So we need to manually bind RPCInterceptor to ctx. Instead of
// calling SetRPCInterceptor on Transaction or Snapshot.
Expand Down
8 changes: 6 additions & 2 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/pingcap/tidb/util/ranger"
topsqlstate "github.com/pingcap/tidb/util/topsql/state"
"github.com/pingcap/tipb/go-tipb"
"github.com/tikv/client-go/v2/tikvrpc"
)

// RequestBuilder is used to build a "kv.Request".
Expand Down Expand Up @@ -307,8 +308,11 @@ func (builder *RequestBuilder) SetFromInfoSchema(pis interface{}) *RequestBuilde

// SetResourceGroupTagger sets the request resource group tagger.
func (builder *RequestBuilder) SetResourceGroupTagger(sc *stmtctx.StatementContext) *RequestBuilder {
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
if topsqlstate.TopSQLEnabled() {
builder.Request.ResourceGroupTagger = sc.GetResourceGroupTagger()
tagger := sc.GetResourceGroupTagger()
builder.Request.ResourceGroupTagger = func(req *tikvrpc.Request) {
if topsqlstate.TopSQLEnabled() {
tagger(req)
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
}
}
return builder
}
Expand Down
78 changes: 53 additions & 25 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/plugin"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -234,8 +235,7 @@ func (a *ExecStmt) PointGet(ctx context.Context, is infoschema.InfoSchema) (*rec
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
ctx = a.setPlanLabelForTopSQL(ctx)
a.observeStmtBeginForTopSQL()
ctx = a.observeStmtBeginForTopSQL(ctx)
startTs := uint64(math.MaxUint64)
err := a.Ctx.InitTxnWithStartTS(startTs)
if err != nil {
Expand Down Expand Up @@ -332,17 +332,19 @@ func (a *ExecStmt) RebuildPlan(ctx context.Context) (int64, error) {
return a.InfoSchema.SchemaMetaVersion(), nil
}

func (a *ExecStmt) setPlanLabelForTopSQL(ctx context.Context) context.Context {
if !topsqlstate.TopSQLEnabled() {
return ctx
func isFastPlan(p plannercore.Plan) bool {
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
if proj, ok := p.(*plannercore.PhysicalProjection); ok {
p = proj.Children()[0]
}
vars := a.Ctx.GetSessionVars()
normalizedSQL, sqlDigest := vars.StmtCtx.SQLDigest()
normalizedPlan, planDigest := getPlanDigest(a.Ctx, a.Plan)
if len(normalizedPlan) == 0 {
return ctx
switch p.(type) {
case *plannercore.PointGetPlan:
return true
case *plannercore.PhysicalTableDual:
return true
case *plannercore.Set:
return true
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
}
return topsql.AttachSQLInfo(ctx, normalizedSQL, sqlDigest, normalizedPlan, planDigest, vars.InRestrictedSQL)
return false
}

// Exec builds an Executor from a plan. If the Executor doesn't return result,
Expand Down Expand Up @@ -404,8 +406,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
return nil, err
}
// ExecuteExec will rewrite `a.Plan`, so set plan label should be executed after `a.buildExecutor`.
ctx = a.setPlanLabelForTopSQL(ctx)
a.observeStmtBeginForTopSQL()
ctx = a.observeStmtBeginForTopSQL(ctx)

if err = e.Open(ctx); err != nil {
terror.Call(e.Close)
Expand Down Expand Up @@ -1055,7 +1056,7 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) {
statsInfos := plannercore.GetStatsInfo(a.Plan)
memMax := sessVars.StmtCtx.MemTracker.MaxConsumed()
diskMax := sessVars.StmtCtx.DiskTracker.MaxConsumed()
_, planDigest := getPlanDigest(a.Ctx, a.Plan)
_, planDigest := getPlanDigest(sessVars.StmtCtx, a.Plan)
slowItems := &variable.SlowQueryLogItems{
TxnTS: txnTS,
SQL: sql.String(),
Expand Down Expand Up @@ -1163,8 +1164,7 @@ func getPlanTree(sctx sessionctx.Context, p plannercore.Plan) string {
}

// getPlanDigest will try to get the select plan tree if the plan is select or the select plan of delete/update/insert statement.
func getPlanDigest(sctx sessionctx.Context, p plannercore.Plan) (string, *parser.Digest) {
sc := sctx.GetSessionVars().StmtCtx
func getPlanDigest(sc *stmtctx.StatementContext, p plannercore.Plan) (string, *parser.Digest) {
normalized, planDigest := sc.GetPlanDigest()
if len(normalized) > 0 && planDigest != nil {
return normalized, planDigest
Expand Down Expand Up @@ -1253,11 +1253,11 @@ func (a *ExecStmt) SummaryStmt(succ bool) {
var planDigestGen func() string
if a.Plan.TP() == plancodec.TypePointGet {
planDigestGen = func() string {
_, planDigest := getPlanDigest(a.Ctx, a.Plan)
_, planDigest := getPlanDigest(stmtCtx, a.Plan)
return planDigest.String()
}
} else {
_, tmp := getPlanDigest(a.Ctx, a.Plan)
_, tmp := getPlanDigest(stmtCtx, a.Plan)
planDigest = tmp.String()
}

Expand Down Expand Up @@ -1337,17 +1337,45 @@ func (a *ExecStmt) GetTextToLog() string {
return sql
}

func (a *ExecStmt) observeStmtBeginForTopSQL() {
func (a *ExecStmt) observeStmtBeginForTopSQL(ctx context.Context) context.Context {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note (no need to change in this PR): a better API design should be a hook framework for statement begin and statement finish events, instead of something xxxForTopSQL. (The observer pattern)

cc @mornyx @zhongzc

vars := a.Ctx.GetSessionVars()
if vars == nil {
return
sc := vars.StmtCtx
normalizedSQL, sqlDigest := sc.SQLDigest()
normalizedPlan, planDigest := getPlanDigest(sc, a.Plan)
var sqlDigestByte, planDigestByte []byte
if sqlDigest != nil {
sqlDigestByte = sqlDigest.Bytes()
}
if stats := a.Ctx.GetStmtStats(); stats != nil && topsqlstate.TopSQLEnabled() {
sqlDigest, planDigest := a.getSQLPlanDigest()
stats.OnExecutionBegin(sqlDigest, planDigest)
if planDigest != nil {
planDigestByte = planDigest.Bytes()
}
stats := a.Ctx.GetStmtStats()
if !topsqlstate.TopSQLEnabled() {
if isFastPlan(a.Plan) {
return ctx
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
}
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
if stats != nil {
sc.KvExecCounter = stats.CreateKvExecCounter(sqlDigestByte, planDigestByte)
}
return topsql.AttachSQLAndPlanInfo(ctx, sqlDigest, planDigest)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OnExecutionBegin will be skipped executed after this return?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, since it has a performance impact when TopSQL is disabled.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like there are potential performance improvements here @mornyx @zhongzc

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my sysbench point-get test, OnExecutionBegin and OnExecutionFinished have around %3 QPS impact.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is the previous implementation:

func (a *ExecStmt) observeStmtBeginForTopSQL() {
	vars := a.Ctx.GetSessionVars()
	if vars == nil {
		return
	}
	if stats := a.Ctx.GetStmtStats(); stats != nil && topsqlstate.TopSQLEnabled() {
		sqlDigest, planDigest := a.getSQLPlanDigest()
		stats.OnExecutionBegin(sqlDigest, planDigest)
		// This is a special logic prepared for TiKV's SQLExecCount.
		vars.StmtCtx.KvExecCounter = stats.CreateKvExecCounter(sqlDigest, planDigest)
	}
}

Note if stats := a.Ctx.GetStmtStats(); stats != nil && topsqlstate.TopSQLEnabled(). This means that the overhead of OnExecutionBegin() does not exist before when TopSQL is not enabled.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about let's shard the lock into N? It should be very easy to implement and effectively enough to verify the lock cost.

Can do a test, I think it will work. Since the session is serial, the only scenario where lock contention occurs is the collection of background goroutine. So in theory, segmented locks can effectively reduce the possibility of sessions being blocked by background goroutine.

Copy link
Contributor

@zhongzc zhongzc Apr 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not from lock. I tried to remove all locks previously and found the performance impact was not reduced much.

Copy link
Contributor

@mornyx mornyx Apr 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not from lock. I tried to remove all locks previously and found the performance impact was not reduced much.

This seems a bit strange, the logic of OnExecutionBegin is very simple (get an item from the map, then increment it). This shouldn't cause a 3% QPS regression... cc @crazycs520

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

map-assign and map-access have become the main cause. A more complicated lock implementation will introduce extra overhead and worse performance.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why would a single map-assign and access cause this kind of drop? It doesn't make sense.

}

if stats != nil {
stats.OnExecutionBegin(sqlDigestByte, planDigestByte)
// This is a special logic prepared for TiKV's SQLExecCount.
vars.StmtCtx.KvExecCounter = stats.CreateKvExecCounter(sqlDigest, planDigest)
sc.KvExecCounter = stats.CreateKvExecCounter(sqlDigestByte, planDigestByte)
}

isSQLRegistered := sc.IsSQLRegistered.Load()
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
if !isSQLRegistered {
topsql.RegisterSQL(normalizedSQL, sqlDigest, vars.InRestrictedSQL)
}
sc.IsSQLAndPlanRegistered.Store(true)
if len(normalizedPlan) == 0 {
return ctx
}
topsql.RegisterPlan(normalizedPlan, planDigest)
return topsql.AttachSQLAndPlanInfo(ctx, sqlDigest, planDigest)
}

func (a *ExecStmt) observeStmtFinishedForTopSQL() {
Expand Down
48 changes: 43 additions & 5 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/auth"
"github.com/pingcap/tidb/parser/model"
Expand Down Expand Up @@ -67,6 +68,7 @@ import (
topsqlstate "github.com/pingcap/tidb/util/topsql/state"
tikverr "github.com/tikv/client-go/v2/error"
tikvstore "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/tikvrpc"
tikvutil "github.com/tikv/client-go/v2/util"
atomicutil "go.uber.org/atomic"
"go.uber.org/zap"
Expand Down Expand Up @@ -299,6 +301,9 @@ func Next(ctx context.Context, e Executor, req *chunk.Chunk) error {
if trace.IsEnabled() {
defer trace.StartRegion(ctx, fmt.Sprintf("%T.Next", e)).End()
}
if topsqlstate.TopSQLEnabled() && sessVars.StmtCtx.IsSQLAndPlanRegistered.CAS(false, true) {
registerSQLAndPlanInExecForTopSQL(sessVars)
}
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
err := e.Next(ctx, req)

if err != nil {
Expand Down Expand Up @@ -1797,8 +1802,8 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
goCtx = pprof.WithLabels(goCtx, pprof.Labels("sql", util.QueryStrForLog(prepareStmt.NormalizedSQL)))
pprof.SetGoroutineLabels(goCtx)
}
if topsqlstate.TopSQLEnabled() && prepareStmt.SQLDigest != nil {
topsql.AttachSQLInfo(goCtx, prepareStmt.NormalizedSQL, prepareStmt.SQLDigest, "", nil, vars.InRestrictedSQL)
if prepareStmt.SQLDigest != nil {
AttachSQLInfoForTopSQL(goCtx, sc, prepareStmt.NormalizedSQL, prepareStmt.SQLDigest, vars.InRestrictedSQL)
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
}
if s, ok := prepareStmt.PreparedAst.Stmt.(*ast.SelectStmt); ok {
if s.LockInfo == nil {
Expand Down Expand Up @@ -1950,6 +1955,28 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
return
}

// AttachSQLInfoForTopSQL attach the sql information for Top SQL. It should be call as soon as possible after got the
// SQL digest.
func AttachSQLInfoForTopSQL(ctx context.Context, sc *stmtctx.StatementContext, normalizedSQL string, sqlDigest *parser.Digest, isInternal bool) context.Context {
if !topsqlstate.TopSQLEnabled() {
return ctx
}
sc.IsSQLRegistered.Store(true)
return topsql.AttachSQLInfo(ctx, normalizedSQL, sqlDigest, isInternal)
}

// registerSQLAndPlanInExecForTopSQL register the sql and plan information if it doesn't register before execution.
// This uses to catch the running SQL when Top SQL is enabled in execution.
func registerSQLAndPlanInExecForTopSQL(sessVars *variable.SessionVars) {
stmtCtx := sessVars.StmtCtx
normalizedSQL, sqlDigest := stmtCtx.SQLDigest()
topsql.RegisterSQL(normalizedSQL, sqlDigest, sessVars.InRestrictedSQL)
normalizedPlan, planDigest := stmtCtx.GetPlanDigest()
if len(normalizedPlan) > 0 {
topsql.RegisterPlan(normalizedPlan, planDigest)
}
}

// ResetUpdateStmtCtx resets statement context for UpdateStmt.
func ResetUpdateStmtCtx(sc *stmtctx.StatementContext, stmt *ast.UpdateStmt, vars *variable.SessionVars) {
sc.InUpdateStmt = true
Expand Down Expand Up @@ -1993,15 +2020,26 @@ func FillVirtualColumnValue(virtualRetTypes []*types.FieldType, virtualColumnInd
}

func setResourceGroupTaggerForTxn(sc *stmtctx.StatementContext, snapshot kv.Snapshot) {
if snapshot != nil && topsqlstate.TopSQLEnabled() {
snapshot.SetOption(kv.ResourceGroupTagger, sc.GetResourceGroupTagger())
if snapshot != nil {
snapshot.SetOption(kv.ResourceGroupTagger, getResourceGroupTagger(sc))
}
}

func getResourceGroupTagger(sc *stmtctx.StatementContext) tikvrpc.ResourceGroupTagger {
fn := sc.GetResourceGroupTagger()
var tagger tikvrpc.ResourceGroupTagger
tagger = func(req *tikvrpc.Request) {
if topsqlstate.TopSQLEnabled() {
fn(req)
}
}
return tagger
}

// setRPCInterceptorOfExecCounterForTxn binds an interceptor for client-go to count
// the number of SQL executions of each TiKV.
func setRPCInterceptorOfExecCounterForTxn(vars *variable.SessionVars, snapshot kv.Snapshot) {
if snapshot != nil && topsqlstate.TopSQLEnabled() && vars.StmtCtx.KvExecCounter != nil {
if snapshot != nil && vars.StmtCtx.KvExecCounter != nil {
snapshot.SetOption(kv.RPCInterceptor, vars.StmtCtx.KvExecCounter.RPCInterceptor())
}
}
Expand Down
6 changes: 1 addition & 5 deletions executor/prepared.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ import (
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/hint"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/topsql"
topsqlstate "github.com/pingcap/tidb/util/topsql/state"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -196,9 +194,7 @@ func (e *PrepareExec) Next(ctx context.Context, req *chunk.Chunk) error {
SchemaVersion: ret.InfoSchema.SchemaMetaVersion(),
}
normalizedSQL, digest := parser.NormalizeDigest(prepared.Stmt.Text())
if topsqlstate.TopSQLEnabled() {
ctx = topsql.AttachSQLInfo(ctx, normalizedSQL, digest, "", nil, vars.InRestrictedSQL)
}
ctx = AttachSQLInfoForTopSQL(ctx, e.ctx.GetSessionVars().StmtCtx, normalizedSQL, digest, vars.InRestrictedSQL)

var (
normalizedSQL4PC, digest4PC string
Expand Down
16 changes: 7 additions & 9 deletions executor/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/memory"
topsqlstate "github.com/pingcap/tidb/util/topsql/state"
"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
)

Expand Down Expand Up @@ -271,14 +270,13 @@ func (e *UpdateExec) updateRows(ctx context.Context) (int, error) {
txn.GetSnapshot().SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats)
}
}
if topsqlstate.TopSQLEnabled() {
txn, err := e.ctx.Txn(true)
if err == nil {
txn.SetOption(kv.ResourceGroupTagger, e.ctx.GetSessionVars().StmtCtx.GetResourceGroupTagger())
if e.ctx.GetSessionVars().StmtCtx.KvExecCounter != nil {
// Bind an interceptor for client-go to count the number of SQL executions of each TiKV.
txn.SetOption(kv.RPCInterceptor, e.ctx.GetSessionVars().StmtCtx.KvExecCounter.RPCInterceptor())
}
txn, err := e.ctx.Txn(true)
if err == nil {
sc := e.ctx.GetSessionVars().StmtCtx
txn.SetOption(kv.ResourceGroupTagger, getResourceGroupTagger(sc))
if sc.KvExecCounter != nil {
// Bind an interceptor for client-go to count the number of SQL executions of each TiKV.
txn.SetOption(kv.RPCInterceptor, sc.KvExecCounter.RPCInterceptor())
}
}
for rowIdx := 0; rowIdx < chk.NumRows(); rowIdx++ {
Expand Down
2 changes: 1 addition & 1 deletion server/conn_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func (cc *clientConn) handleStmtFetch(ctx context.Context, data []byte) (err err
if topsqlstate.TopSQLEnabled() {
prepareObj, _ := cc.preparedStmtID2CachePreparedStmt(stmtID)
if prepareObj != nil && prepareObj.SQLDigest != nil {
ctx = topsql.AttachSQLInfo(ctx, prepareObj.NormalizedSQL, prepareObj.SQLDigest, "", nil, false)
ctx = topsql.AttachSQLInfo(ctx, prepareObj.NormalizedSQL, prepareObj.SQLDigest, false)
}
}
sql := ""
Expand Down
2 changes: 2 additions & 0 deletions server/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1304,9 +1304,11 @@ func TestTopSQLCPUProfile(t *testing.T) {
}()

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/topsql/mockHighLoadForEachSQL", `return(true)`))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/topsql/mockHighLoadForEachPlan", `return(true)`))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/skipLoadSysVarCacheLoop", `return(true)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/topsql/mockHighLoadForEachSQL"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/topsql/mockHighLoadForEachPlan"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/domain/skipLoadSysVarCacheLoop"))
}()

Expand Down
7 changes: 3 additions & 4 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1590,7 +1590,8 @@ func (s *session) ParseWithParams(ctx context.Context, sql string, args ...inter
if digest != nil {
// Reset the goroutine label when internal sql execute finish.
// Specifically reset in ExecRestrictedStmt function.
topsql.AttachSQLInfo(ctx, normalized, digest, "", nil, s.sessionVars.InRestrictedSQL)
s.sessionVars.StmtCtx.IsSQLRegistered.Store(true)
topsql.AttachSQLInfo(ctx, normalized, digest, s.sessionVars.InRestrictedSQL)
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
}
}
return stmts[0], nil
Expand Down Expand Up @@ -1826,9 +1827,7 @@ func (s *session) ExecuteStmt(ctx context.Context, stmtNode ast.StmtNode) (sqlex
return nil, err
}
normalizedSQL, digest := s.sessionVars.StmtCtx.SQLDigest()
if topsqlstate.TopSQLEnabled() {
ctx = topsql.AttachSQLInfo(ctx, normalizedSQL, digest, "", nil, s.sessionVars.InRestrictedSQL)
}
ctx = executor.AttachSQLInfoForTopSQL(ctx, s.sessionVars.StmtCtx, normalizedSQL, digest, s.sessionVars.InRestrictedSQL)

if err := s.validateStatementReadOnlyInStaleness(stmtNode); err != nil {
return nil, err
Expand Down
5 changes: 5 additions & 0 deletions sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,11 @@ type StatementContext struct {

// RCCheckTS indicates the current read-consistency read select statement will use `RCCheckTS` path.
RCCheckTS bool

// IsSQLRegistered uses to indicate whether the SQL has been registered for TopSQL.
IsSQLRegistered atomic2.Bool
// IsSQLAndPlanRegistered uses to indicate whether the SQL and plan has been registered for TopSQL.
IsSQLAndPlanRegistered atomic2.Bool
Comment on lines +248 to +250
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will it happen that IsSQLAndPlanRegistered == true while IsSQLRegistered == false?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will happen, but it doesn't matter, the code can make sure if IsSQLAndPlanRegistered is true, then the SQL and plan must be registerd.

}

// StmtHints are SessionVars related sql hints.
Expand Down
Loading