diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 57058eb38a205..cae3be063dd60 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -541,9 +541,9 @@ func (w *JobContext) setDDLLabelForTopSQL(job *model.Job) { if job.Query != w.cacheSQL || w.cacheDigest == nil { w.cacheNormalizedSQL, w.cacheDigest = parser.NormalizeDigest(job.Query) w.cacheSQL = job.Query - w.ddlJobCtx = topsql.AttachSQLInfo(context.Background(), w.cacheNormalizedSQL, w.cacheDigest, "", nil, false) + w.ddlJobCtx = topsql.AttachAndRegisterSQLInfo(context.Background(), w.cacheNormalizedSQL, w.cacheDigest, false) } else { - topsql.AttachSQLInfo(w.ddlJobCtx, w.cacheNormalizedSQL, w.cacheDigest, "", nil, false) + topsql.AttachAndRegisterSQLInfo(w.ddlJobCtx, w.cacheNormalizedSQL, w.cacheDigest, false) } } diff --git a/distsql/distsql.go b/distsql/distsql.go index 06104f0e368f9..357eaa02c1b77 100644 --- a/distsql/distsql.go +++ b/distsql/distsql.go @@ -28,7 +28,6 @@ import ( "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/logutil" - topsqlstate "github.com/pingcap/tidb/util/topsql/state" "github.com/pingcap/tidb/util/trxevents" "github.com/pingcap/tipb/go-tipb" "github.com/tikv/client-go/v2/tikvrpc/interceptor" @@ -257,7 +256,7 @@ func init() { // WithSQLKvExecCounterInterceptor binds an interceptor for client-go to count the // number of SQL executions of each TiKV (if any). func WithSQLKvExecCounterInterceptor(ctx context.Context, stmtCtx *stmtctx.StatementContext) context.Context { - if topsqlstate.TopSQLEnabled() && stmtCtx.KvExecCounter != nil { + if stmtCtx.KvExecCounter != nil { // Unlike calling Transaction or Snapshot interface, in distsql package we directly // face tikv Request. So we need to manually bind RPCInterceptor to ctx. Instead of // calling SetRPCInterceptor on Transaction or Snapshot. diff --git a/distsql/request_builder.go b/distsql/request_builder.go index e36a7086761e3..313df4cab23e4 100644 --- a/distsql/request_builder.go +++ b/distsql/request_builder.go @@ -36,8 +36,8 @@ import ( "github.com/pingcap/tidb/util/collate" "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/ranger" - topsqlstate "github.com/pingcap/tidb/util/topsql/state" "github.com/pingcap/tipb/go-tipb" + "github.com/tikv/client-go/v2/tikvrpc" ) // RequestBuilder is used to build a "kv.Request". @@ -263,7 +263,7 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req builder.Request.TaskID = sv.StmtCtx.TaskID builder.Request.Priority = builder.getKVPriority(sv) builder.Request.ReplicaRead = replicaReadType - builder.SetResourceGroupTagger(sv.StmtCtx) + builder.SetResourceGroupTagger(sv.StmtCtx.GetResourceGroupTagger()) return builder } @@ -306,10 +306,8 @@ func (builder *RequestBuilder) SetFromInfoSchema(pis interface{}) *RequestBuilde } // SetResourceGroupTagger sets the request resource group tagger. -func (builder *RequestBuilder) SetResourceGroupTagger(sc *stmtctx.StatementContext) *RequestBuilder { - if topsqlstate.TopSQLEnabled() { - builder.Request.ResourceGroupTagger = sc.GetResourceGroupTagger() - } +func (builder *RequestBuilder) SetResourceGroupTagger(tagger tikvrpc.ResourceGroupTagger) *RequestBuilder { + builder.Request.ResourceGroupTagger = tagger return builder } diff --git a/distsql/request_builder_test.go b/distsql/request_builder_test.go index 8d571656fadd4..3a2c7e60b200f 100644 --- a/distsql/request_builder_test.go +++ b/distsql/request_builder_test.go @@ -274,6 +274,7 @@ func TestRequestBuilder1(t *testing.T) { ReplicaRead: kv.ReplicaReadLeader, ReadReplicaScope: kv.GlobalReplicaScope, } + actual.ResourceGroupTagger = nil require.Equal(t, expect, actual) } @@ -355,6 +356,7 @@ func TestRequestBuilder2(t *testing.T) { ReplicaRead: kv.ReplicaReadLeader, ReadReplicaScope: kv.GlobalReplicaScope, } + actual.ResourceGroupTagger = nil require.Equal(t, expect, actual) } @@ -402,6 +404,7 @@ func TestRequestBuilder3(t *testing.T) { ReplicaRead: kv.ReplicaReadLeader, ReadReplicaScope: kv.GlobalReplicaScope, } + actual.ResourceGroupTagger = nil require.Equal(t, expect, actual) } @@ -449,6 +452,7 @@ func TestRequestBuilder4(t *testing.T) { ReplicaRead: kv.ReplicaReadLeader, ReadReplicaScope: kv.GlobalReplicaScope, } + actual.ResourceGroupTagger = nil require.Equal(t, expect, actual) } @@ -559,6 +563,7 @@ func TestRequestBuilder7(t *testing.T) { ReplicaRead: replicaRead.replicaReadType, ReadReplicaScope: kv.GlobalReplicaScope, } + actual.ResourceGroupTagger = nil require.Equal(t, expect, actual) }) } @@ -581,6 +586,7 @@ func TestRequestBuilder8(t *testing.T) { SchemaVar: 0, ReadReplicaScope: kv.GlobalReplicaScope, } + actual.ResourceGroupTagger = nil require.Equal(t, expect, actual) } diff --git a/executor/adapter.go b/executor/adapter.go index 4bee8dbf507b0..728237c5dd555 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -45,6 +45,7 @@ import ( plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/plugin" "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/sessiontxn" "github.com/pingcap/tidb/sessiontxn/staleread" @@ -230,7 +231,6 @@ func (a *ExecStmt) PointGet(ctx context.Context, is infoschema.InfoSchema) (*rec defer span1.Finish() ctx = opentracing.ContextWithSpan(ctx, span1) } - failpoint.Inject("assertTxnManagerInShortPointGetPlan", func() { sessiontxn.RecordAssert(a.Ctx, "assertTxnManagerInShortPointGetPlan", true) // stale read should not reach here @@ -238,8 +238,7 @@ func (a *ExecStmt) PointGet(ctx context.Context, is infoschema.InfoSchema) (*rec sessiontxn.AssertTxnManagerInfoSchema(a.Ctx, is) }) - ctx = a.setPlanLabelForTopSQL(ctx) - a.observeStmtBeginForTopSQL() + ctx = a.observeStmtBeginForTopSQL(ctx) startTs := uint64(math.MaxUint64) err := a.Ctx.InitTxnWithStartTS(startTs) if err != nil { @@ -330,17 +329,26 @@ func (a *ExecStmt) RebuildPlan(ctx context.Context) (int64, error) { return a.InfoSchema.SchemaMetaVersion(), nil } -func (a *ExecStmt) setPlanLabelForTopSQL(ctx context.Context) context.Context { - if !topsqlstate.TopSQLEnabled() { - return ctx +// IsFastPlan exports for testing. +func IsFastPlan(p plannercore.Plan) bool { + if proj, ok := p.(*plannercore.PhysicalProjection); ok { + p = proj.Children()[0] } - vars := a.Ctx.GetSessionVars() - normalizedSQL, sqlDigest := vars.StmtCtx.SQLDigest() - normalizedPlan, planDigest := getPlanDigest(a.Ctx, a.Plan) - if len(normalizedPlan) == 0 { - return ctx + switch p.(type) { + case *plannercore.PointGetPlan: + return true + case *plannercore.PhysicalTableDual: + // Plan of following SQL is PhysicalTableDual: + // select 1; + // select @@autocommit; + return true + case *plannercore.Set: + // Plan of following SQL is Set: + // set @a=1; + // set @@autocommit=1; + return true } - return topsql.AttachSQLInfo(ctx, normalizedSQL, sqlDigest, normalizedPlan, planDigest, vars.InRestrictedSQL) + return false } // Exec builds an Executor from a plan. If the Executor doesn't return result, @@ -407,8 +415,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) { return nil, err } // ExecuteExec will rewrite `a.Plan`, so set plan label should be executed after `a.buildExecutor`. - ctx = a.setPlanLabelForTopSQL(ctx) - a.observeStmtBeginForTopSQL() + ctx = a.observeStmtBeginForTopSQL(ctx) if err = e.Open(ctx); err != nil { terror.Call(e.Close) @@ -1058,7 +1065,7 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) { statsInfos := plannercore.GetStatsInfo(a.Plan) memMax := sessVars.StmtCtx.MemTracker.MaxConsumed() diskMax := sessVars.StmtCtx.DiskTracker.MaxConsumed() - _, planDigest := getPlanDigest(a.Ctx, a.Plan) + _, planDigest := getPlanDigest(sessVars.StmtCtx, a.Plan) slowItems := &variable.SlowQueryLogItems{ TxnTS: txnTS, SQL: sql.String(), @@ -1166,8 +1173,7 @@ func getPlanTree(sctx sessionctx.Context, p plannercore.Plan) string { } // getPlanDigest will try to get the select plan tree if the plan is select or the select plan of delete/update/insert statement. -func getPlanDigest(sctx sessionctx.Context, p plannercore.Plan) (string, *parser.Digest) { - sc := sctx.GetSessionVars().StmtCtx +func getPlanDigest(sc *stmtctx.StatementContext, p plannercore.Plan) (string, *parser.Digest) { normalized, planDigest := sc.GetPlanDigest() if len(normalized) > 0 && planDigest != nil { return normalized, planDigest @@ -1256,11 +1262,11 @@ func (a *ExecStmt) SummaryStmt(succ bool) { var planDigestGen func() string if a.Plan.TP() == plancodec.TypePointGet { planDigestGen = func() string { - _, planDigest := getPlanDigest(a.Ctx, a.Plan) + _, planDigest := getPlanDigest(stmtCtx, a.Plan) return planDigest.String() } } else { - _, tmp := getPlanDigest(a.Ctx, a.Plan) + _, tmp := getPlanDigest(stmtCtx, a.Plan) planDigest = tmp.String() } @@ -1340,17 +1346,50 @@ func (a *ExecStmt) GetTextToLog() string { return sql } -func (a *ExecStmt) observeStmtBeginForTopSQL() { +func (a *ExecStmt) observeStmtBeginForTopSQL(ctx context.Context) context.Context { vars := a.Ctx.GetSessionVars() - if vars == nil { - return + sc := vars.StmtCtx + normalizedSQL, sqlDigest := sc.SQLDigest() + normalizedPlan, planDigest := getPlanDigest(sc, a.Plan) + var sqlDigestByte, planDigestByte []byte + if sqlDigest != nil { + sqlDigestByte = sqlDigest.Bytes() } - if stats := a.Ctx.GetStmtStats(); stats != nil && topsqlstate.TopSQLEnabled() { - sqlDigest, planDigest := a.getSQLPlanDigest() - stats.OnExecutionBegin(sqlDigest, planDigest) + if planDigest != nil { + planDigestByte = planDigest.Bytes() + } + stats := a.Ctx.GetStmtStats() + if !topsqlstate.TopSQLEnabled() { + // To reduce the performance impact on fast plan. + // Drop them does not cause notable accuracy issue in TopSQL. + if IsFastPlan(a.Plan) { + return ctx + } + // Always attach the SQL and plan info uses to catch the running SQL when Top SQL is enabled in execution. + if stats != nil { + stats.OnExecutionBegin(sqlDigestByte, planDigestByte) + // This is a special logic prepared for TiKV's SQLExecCount. + sc.KvExecCounter = stats.CreateKvExecCounter(sqlDigestByte, planDigestByte) + } + return topsql.AttachSQLAndPlanInfo(ctx, sqlDigest, planDigest) + } + + if stats != nil { + stats.OnExecutionBegin(sqlDigestByte, planDigestByte) // This is a special logic prepared for TiKV's SQLExecCount. - vars.StmtCtx.KvExecCounter = stats.CreateKvExecCounter(sqlDigest, planDigest) + sc.KvExecCounter = stats.CreateKvExecCounter(sqlDigestByte, planDigestByte) + } + + isSQLRegistered := sc.IsSQLRegistered.Load() + if !isSQLRegistered { + topsql.RegisterSQL(normalizedSQL, sqlDigest, vars.InRestrictedSQL) + } + sc.IsSQLAndPlanRegistered.Store(true) + if len(normalizedPlan) == 0 { + return ctx } + topsql.RegisterPlan(normalizedPlan, planDigest) + return topsql.AttachSQLAndPlanInfo(ctx, sqlDigest, planDigest) } func (a *ExecStmt) observeStmtFinishedForTopSQL() { diff --git a/executor/analyze.go b/executor/analyze.go index fbaad341ccf42..f8cf4626f6410 100644 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -484,7 +484,7 @@ func (e *AnalyzeIndexExec) fetchAnalyzeResult(ranges []*ranger.Range, isNullRang } else { kvReqBuilder = builder.SetIndexRangesForTables(e.ctx.GetSessionVars().StmtCtx, []int64{e.tableID.GetStatisticsID()}, e.idxInfo.ID, ranges) } - kvReqBuilder.SetResourceGroupTagger(e.ctx.GetSessionVars().StmtCtx) + kvReqBuilder.SetResourceGroupTagger(e.ctx.GetSessionVars().StmtCtx.GetResourceGroupTagger()) kvReq, err := kvReqBuilder. SetAnalyzeRequest(e.analyzePB). SetStartTS(e.snapshot). @@ -857,7 +857,7 @@ func (e *AnalyzeColumnsExec) open(ranges []*ranger.Range) error { func (e *AnalyzeColumnsExec) buildResp(ranges []*ranger.Range) (distsql.SelectResult, error) { var builder distsql.RequestBuilder reqBuilder := builder.SetHandleRangesForTables(e.ctx.GetSessionVars().StmtCtx, []int64{e.TableID.GetStatisticsID()}, e.handleCols != nil && !e.handleCols.IsInt(), ranges, nil) - builder.SetResourceGroupTagger(e.ctx.GetSessionVars().StmtCtx) + builder.SetResourceGroupTagger(e.ctx.GetSessionVars().StmtCtx.GetResourceGroupTagger()) // Always set KeepOrder of the request to be true, in order to compute // correct `correlation` of columns. kvReq, err := reqBuilder. diff --git a/executor/checksum.go b/executor/checksum.go index 013fd3be2226f..4c62176b9116f 100644 --- a/executor/checksum.go +++ b/executor/checksum.go @@ -241,7 +241,7 @@ func (c *checksumContext) buildTableRequest(ctx sessionctx.Context, tableID int6 } var builder distsql.RequestBuilder - builder.SetResourceGroupTagger(ctx.GetSessionVars().StmtCtx) + builder.SetResourceGroupTagger(ctx.GetSessionVars().StmtCtx.GetResourceGroupTagger()) return builder.SetHandleRanges(ctx.GetSessionVars().StmtCtx, tableID, c.TableInfo.IsCommonHandle, ranges, nil). SetChecksumRequest(checksum). SetStartTS(c.StartTs). @@ -258,7 +258,7 @@ func (c *checksumContext) buildIndexRequest(ctx sessionctx.Context, tableID int6 ranges := ranger.FullRange() var builder distsql.RequestBuilder - builder.SetResourceGroupTagger(ctx.GetSessionVars().StmtCtx) + builder.SetResourceGroupTagger(ctx.GetSessionVars().StmtCtx.GetResourceGroupTagger()) return builder.SetIndexRanges(ctx.GetSessionVars().StmtCtx, tableID, indexInfo.ID, ranges). SetChecksumRequest(checksum). SetStartTS(c.StartTs). diff --git a/executor/executor.go b/executor/executor.go index 6361420b19614..b4009fb6489b1 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -300,6 +300,9 @@ func Next(ctx context.Context, e Executor, req *chunk.Chunk) error { if trace.IsEnabled() { defer trace.StartRegion(ctx, fmt.Sprintf("%T.Next", e)).End() } + if topsqlstate.TopSQLEnabled() && sessVars.StmtCtx.IsSQLAndPlanRegistered.CAS(false, true) { + registerSQLAndPlanInExecForTopSQL(sessVars) + } err := e.Next(ctx, req) if err != nil { @@ -1799,7 +1802,8 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { pprof.SetGoroutineLabels(goCtx) } if topsqlstate.TopSQLEnabled() && prepareStmt.SQLDigest != nil { - topsql.AttachSQLInfo(goCtx, prepareStmt.NormalizedSQL, prepareStmt.SQLDigest, "", nil, vars.InRestrictedSQL) + sc.IsSQLRegistered.Store(true) + topsql.AttachAndRegisterSQLInfo(goCtx, prepareStmt.NormalizedSQL, prepareStmt.SQLDigest, vars.InRestrictedSQL) } if s, ok := prepareStmt.PreparedAst.Stmt.(*ast.SelectStmt); ok { if s.LockInfo == nil { @@ -1951,6 +1955,18 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { return } +// registerSQLAndPlanInExecForTopSQL register the sql and plan information if it doesn't register before execution. +// This uses to catch the running SQL when Top SQL is enabled in execution. +func registerSQLAndPlanInExecForTopSQL(sessVars *variable.SessionVars) { + stmtCtx := sessVars.StmtCtx + normalizedSQL, sqlDigest := stmtCtx.SQLDigest() + topsql.RegisterSQL(normalizedSQL, sqlDigest, sessVars.InRestrictedSQL) + normalizedPlan, planDigest := stmtCtx.GetPlanDigest() + if len(normalizedPlan) > 0 { + topsql.RegisterPlan(normalizedPlan, planDigest) + } +} + // ResetUpdateStmtCtx resets statement context for UpdateStmt. func ResetUpdateStmtCtx(sc *stmtctx.StatementContext, stmt *ast.UpdateStmt, vars *variable.SessionVars) { sc.InUpdateStmt = true @@ -1994,7 +2010,7 @@ func FillVirtualColumnValue(virtualRetTypes []*types.FieldType, virtualColumnInd } func setOptionForTopSQL(sc *stmtctx.StatementContext, snapshot kv.Snapshot) { - if snapshot == nil || !topsqlstate.TopSQLEnabled() { + if snapshot == nil { return } snapshot.SetOption(kv.ResourceGroupTagger, sc.GetResourceGroupTagger()) diff --git a/executor/executor_test.go b/executor/executor_test.go index 57714dd792fbc..8554286da0d35 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -5939,3 +5939,39 @@ func TestSummaryFailedUpdate(t *testing.T) { tk.MustExec("set @@tidb_mem_quota_query=1000000000") tk.MustQuery("select stmt_type from information_schema.statements_summary where digest_text = 'update `t` set `t` . `a` = `t` . `a` - ? where `t` . `a` in ( select `a` from `t` where `a` < ? )'").Check(testkit.Rows("Update")) } + +func TestIsFastPlan(t *testing.T) { + store, clean := testkit.CreateMockStore(t) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(id int primary key, a int)") + + cases := []struct { + sql string + isFastPlan bool + }{ + {"select a from t where id=1", true}, + {"select a+id from t where id=1", true}, + {"select 1", true}, + {"select @@autocommit", true}, + {"set @@autocommit=1", true}, + {"set @a=1", true}, + {"select * from t where a=1", false}, + {"select * from t", false}, + } + + for _, ca := range cases { + if strings.HasPrefix(ca.sql, "select") { + tk.MustQuery(ca.sql) + } else { + tk.MustExec(ca.sql) + } + info := tk.Session().ShowProcess() + require.NotNil(t, info) + p, ok := info.Plan.(plannercore.Plan) + require.True(t, ok) + ok = executor.IsFastPlan(p) + require.Equal(t, ca.isFastPlan, ok) + } +} diff --git a/executor/prepared.go b/executor/prepared.go index b807ed5e5f922..66dd5c38ee073 100644 --- a/executor/prepared.go +++ b/executor/prepared.go @@ -198,7 +198,8 @@ func (e *PrepareExec) Next(ctx context.Context, req *chunk.Chunk) error { } normalizedSQL, digest := parser.NormalizeDigest(prepared.Stmt.Text()) if topsqlstate.TopSQLEnabled() { - ctx = topsql.AttachSQLInfo(ctx, normalizedSQL, digest, "", nil, vars.InRestrictedSQL) + e.ctx.GetSessionVars().StmtCtx.IsSQLRegistered.Store(true) + ctx = topsql.AttachAndRegisterSQLInfo(ctx, normalizedSQL, digest, vars.InRestrictedSQL) } var ( diff --git a/executor/update.go b/executor/update.go index ed5c76f44c278..faf5b1d15e1bf 100644 --- a/executor/update.go +++ b/executor/update.go @@ -31,7 +31,6 @@ import ( "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/memory" - topsqlstate "github.com/pingcap/tidb/util/topsql/state" "github.com/tikv/client-go/v2/txnkv/txnsnapshot" ) @@ -271,14 +270,13 @@ func (e *UpdateExec) updateRows(ctx context.Context) (int, error) { txn.GetSnapshot().SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats) } } - if topsqlstate.TopSQLEnabled() { - txn, err := e.ctx.Txn(true) - if err == nil { - txn.SetOption(kv.ResourceGroupTagger, e.ctx.GetSessionVars().StmtCtx.GetResourceGroupTagger()) - if e.ctx.GetSessionVars().StmtCtx.KvExecCounter != nil { - // Bind an interceptor for client-go to count the number of SQL executions of each TiKV. - txn.SetOption(kv.RPCInterceptor, e.ctx.GetSessionVars().StmtCtx.KvExecCounter.RPCInterceptor()) - } + txn, err := e.ctx.Txn(true) + if err == nil { + sc := e.ctx.GetSessionVars().StmtCtx + txn.SetOption(kv.ResourceGroupTagger, sc.GetResourceGroupTagger()) + if sc.KvExecCounter != nil { + // Bind an interceptor for client-go to count the number of SQL executions of each TiKV. + txn.SetOption(kv.RPCInterceptor, sc.KvExecCounter.RPCInterceptor()) } } for rowIdx := 0; rowIdx < chk.NumRows(); rowIdx++ { diff --git a/server/conn_stmt.go b/server/conn_stmt.go index f1e519eecbe55..4e9191b2af514 100644 --- a/server/conn_stmt.go +++ b/server/conn_stmt.go @@ -281,7 +281,7 @@ func (cc *clientConn) handleStmtFetch(ctx context.Context, data []byte) (err err if topsqlstate.TopSQLEnabled() { prepareObj, _ := cc.preparedStmtID2CachePreparedStmt(stmtID) if prepareObj != nil && prepareObj.SQLDigest != nil { - ctx = topsql.AttachSQLInfo(ctx, prepareObj.NormalizedSQL, prepareObj.SQLDigest, "", nil, false) + ctx = topsql.AttachAndRegisterSQLInfo(ctx, prepareObj.NormalizedSQL, prepareObj.SQLDigest, false) } } sql := "" diff --git a/server/tidb_test.go b/server/tidb_test.go index 880e0f1f5c669..cebec5158628a 100644 --- a/server/tidb_test.go +++ b/server/tidb_test.go @@ -32,6 +32,7 @@ import ( "path/filepath" "strings" "sync" + "sync/atomic" "testing" "time" @@ -1293,6 +1294,77 @@ func TestPessimisticInsertSelectForUpdate(t *testing.T) { require.Nil(t, rs) // should be no delay } +func TestTopSQLCatchRunningSQL(t *testing.T) { + ts, cleanup := createTidbTestTopSQLSuite(t) + defer cleanup() + + db, err := sql.Open("mysql", ts.getDSN()) + require.NoError(t, err) + defer func() { + require.NoError(t, db.Close()) + }() + + dbt := testkit.NewDBTestKit(t, db) + dbt.MustExec("drop database if exists topsql") + dbt.MustExec("create database topsql") + dbt.MustExec("use topsql;") + dbt.MustExec("create table t (a int, b int);") + + for i := 0; i < 5000; i++ { + dbt.MustExec(fmt.Sprintf("insert into t values (%v, %v)", i, i)) + } + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/topsql/mockHighLoadForEachPlan", `return(true)`)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/skipLoadSysVarCacheLoop", `return(true)`)) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/topsql/mockHighLoadForEachPlan")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/domain/skipLoadSysVarCacheLoop")) + }() + + mc := mockTopSQLTraceCPU.NewTopSQLCollector() + topsql.SetupTopSQLForTest(mc) + sqlCPUCollector := collector.NewSQLCPUCollector(mc) + sqlCPUCollector.Start() + defer sqlCPUCollector.Stop() + + query := "select count(*) from t as t0 join t as t1 on t0.a != t1.a;" + needEnableTopSQL := int64(0) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + default: + } + if atomic.LoadInt64(&needEnableTopSQL) == 1 { + time.Sleep(2 * time.Millisecond) + topsqlstate.EnableTopSQL() + atomic.StoreInt64(&needEnableTopSQL, 0) + } + time.Sleep(time.Millisecond) + } + }() + execFn := func(db *sql.DB) { + dbt := testkit.NewDBTestKit(t, db) + atomic.StoreInt64(&needEnableTopSQL, 1) + mustQuery(t, dbt, query) + topsqlstate.DisableTopSQL() + } + check := func() { + require.NoError(t, ctx.Err()) + stats := mc.GetSQLStatsBySQLWithRetry(query, true) + require.Greaterf(t, len(stats), 0, query) + } + ts.testCase(t, mc, execFn, check) + cancel() + wg.Wait() +} + func TestTopSQLCPUProfile(t *testing.T) { ts, cleanup := createTidbTestTopSQLSuite(t) defer cleanup() @@ -1304,9 +1376,11 @@ func TestTopSQLCPUProfile(t *testing.T) { }() require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/topsql/mockHighLoadForEachSQL", `return(true)`)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/topsql/mockHighLoadForEachPlan", `return(true)`)) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/skipLoadSysVarCacheLoop", `return(true)`)) defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/topsql/mockHighLoadForEachSQL")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/topsql/mockHighLoadForEachPlan")) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/domain/skipLoadSysVarCacheLoop")) }() diff --git a/session/session.go b/session/session.go index 49104944e3339..ba76f7bc2c360 100644 --- a/session/session.go +++ b/session/session.go @@ -1584,7 +1584,8 @@ func (s *session) ParseWithParams(ctx context.Context, sql string, args ...inter if digest != nil { // Reset the goroutine label when internal sql execute finish. // Specifically reset in ExecRestrictedStmt function. - topsql.AttachSQLInfo(ctx, normalized, digest, "", nil, s.sessionVars.InRestrictedSQL) + s.sessionVars.StmtCtx.IsSQLRegistered.Store(true) + topsql.AttachAndRegisterSQLInfo(ctx, normalized, digest, s.sessionVars.InRestrictedSQL) } } return stmts[0], nil @@ -1827,7 +1828,8 @@ func (s *session) ExecuteStmt(ctx context.Context, stmtNode ast.StmtNode) (sqlex } normalizedSQL, digest := s.sessionVars.StmtCtx.SQLDigest() if topsqlstate.TopSQLEnabled() { - ctx = topsql.AttachSQLInfo(ctx, normalizedSQL, digest, "", nil, s.sessionVars.InRestrictedSQL) + s.sessionVars.StmtCtx.IsSQLRegistered.Store(true) + ctx = topsql.AttachAndRegisterSQLInfo(ctx, normalizedSQL, digest, s.sessionVars.InRestrictedSQL) } if err := s.validateStatementReadOnlyInStaleness(stmtNode); err != nil { diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index fdfddc8171971..3320ce672e13c 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -243,6 +243,11 @@ type StatementContext struct { // RCCheckTS indicates the current read-consistency read select statement will use `RCCheckTS` path. RCCheckTS bool + + // IsSQLRegistered uses to indicate whether the SQL has been registered for TopSQL. + IsSQLRegistered atomic2.Bool + // IsSQLAndPlanRegistered uses to indicate whether the SQL and plan has been registered for TopSQL. + IsSQLAndPlanRegistered atomic2.Bool } // StmtHints are SessionVars related sql hints. diff --git a/util/topsql/collector/cpu.go b/util/topsql/collector/cpu.go index c7382b0c56a1f..eb7ba6183c3a6 100644 --- a/util/topsql/collector/cpu.go +++ b/util/topsql/collector/cpu.go @@ -252,11 +252,13 @@ func (s *sqlStats) tune() { s.plans[""] += optimize } -// CtxWithDigest wrap the ctx with sql digest, if plan digest is not null, wrap with plan digest too. -func CtxWithDigest(ctx context.Context, sqlDigest, planDigest []byte) context.Context { - if len(planDigest) == 0 { - return pprof.WithLabels(ctx, pprof.Labels(labelSQLDigest, string(hack.String(sqlDigest)))) - } +// CtxWithSQLDigest wrap the ctx with sql digest. +func CtxWithSQLDigest(ctx context.Context, sqlDigest []byte) context.Context { + return pprof.WithLabels(ctx, pprof.Labels(labelSQLDigest, string(hack.String(sqlDigest)))) +} + +// CtxWithSQLAndPlanDigest wrap the ctx with sql digest and plan digest. +func CtxWithSQLAndPlanDigest(ctx context.Context, sqlDigest, planDigest []byte) context.Context { return pprof.WithLabels(ctx, pprof.Labels(labelSQLDigest, string(hack.String(sqlDigest)), labelPlanDigest, string(hack.String(planDigest)))) } diff --git a/util/topsql/stmtstats/kv_exec_count.go b/util/topsql/stmtstats/kv_exec_count.go index 7da4dc8eebdcd..0accbef30852f 100644 --- a/util/topsql/stmtstats/kv_exec_count.go +++ b/util/topsql/stmtstats/kv_exec_count.go @@ -17,6 +17,7 @@ package stmtstats import ( "sync" + topsqlstate "github.com/pingcap/tidb/util/topsql/state" "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/tikvrpc/interceptor" ) @@ -50,7 +51,9 @@ type KvExecCounter struct { func (c *KvExecCounter) RPCInterceptor() interceptor.RPCInterceptor { return func(next interceptor.RPCInterceptorFunc) interceptor.RPCInterceptorFunc { return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) { - c.mark(target) + if topsqlstate.TopSQLEnabled() { + c.mark(target) + } return next(target, req) } } diff --git a/util/topsql/stmtstats/kv_exec_count_test.go b/util/topsql/stmtstats/kv_exec_count_test.go index c55a5300c0891..789be4991ba19 100644 --- a/util/topsql/stmtstats/kv_exec_count_test.go +++ b/util/topsql/stmtstats/kv_exec_count_test.go @@ -17,11 +17,13 @@ package stmtstats import ( "testing" + "github.com/pingcap/tidb/util/topsql/state" "github.com/stretchr/testify/assert" "github.com/tikv/client-go/v2/tikvrpc" ) func TestKvExecCounter(t *testing.T) { + state.EnableTopSQL() stats := CreateStatementStats() counter := stats.CreateKvExecCounter([]byte("SQL-1"), []byte("")) interceptor := counter.RPCInterceptor() diff --git a/util/topsql/stmtstats/stmtstats.go b/util/topsql/stmtstats/stmtstats.go index e2dee3df20459..ceebc667fde2d 100644 --- a/util/topsql/stmtstats/stmtstats.go +++ b/util/topsql/stmtstats/stmtstats.go @@ -33,6 +33,9 @@ type StatementObserver interface { OnExecutionBegin(sqlDigest, planDigest []byte) // OnExecutionFinished should be called after the statement is executed. + // WARNING: Currently Only call StatementObserver API when TopSQL is enabled, + // there is no guarantee that both OnExecutionBegin and OnExecutionFinished will be called for a SQL, + // such as TopSQL is enabled during a SQL execution. OnExecutionFinished(sqlDigest, planDigest []byte, execDuration time.Duration) } diff --git a/util/topsql/topsql.go b/util/topsql/topsql.go index 36b26ee05cd11..f416de28ed7f6 100644 --- a/util/topsql/topsql.go +++ b/util/topsql/topsql.go @@ -79,25 +79,33 @@ func Close() { stmtstats.CloseAggregator() } -// AttachSQLInfo attach the sql information info top sql. -func AttachSQLInfo(ctx context.Context, normalizedSQL string, sqlDigest *parser.Digest, normalizedPlan string, planDigest *parser.Digest, isInternal bool) context.Context { - if len(normalizedSQL) == 0 || sqlDigest == nil || len(sqlDigest.Bytes()) == 0 { - return ctx +// RegisterSQL uses to register SQL information into Top SQL. +func RegisterSQL(normalizedSQL string, sqlDigest *parser.Digest, isInternal bool) { + if sqlDigest != nil { + sqlDigestBytes := sqlDigest.Bytes() + linkSQLTextWithDigest(sqlDigestBytes, normalizedSQL, isInternal) } - var sqlDigestBytes, planDigestBytes []byte - sqlDigestBytes = sqlDigest.Bytes() +} + +// RegisterPlan uses to register plan information into Top SQL. +func RegisterPlan(normalizedPlan string, planDigest *parser.Digest) { if planDigest != nil { - planDigestBytes = planDigest.Bytes() + planDigestBytes := planDigest.Bytes() + linkPlanTextWithDigest(planDigestBytes, normalizedPlan) } - ctx = collector.CtxWithDigest(ctx, sqlDigestBytes, planDigestBytes) - pprof.SetGoroutineLabels(ctx) +} - if len(normalizedPlan) == 0 || len(planDigestBytes) == 0 { - // If plan digest is '', indicate it is the first time to attach the SQL info, since it only know the sql digest. - linkSQLTextWithDigest(sqlDigestBytes, normalizedSQL, isInternal) - } else { - linkPlanTextWithDigest(planDigestBytes, normalizedPlan) +// AttachAndRegisterSQLInfo attach the sql information into Top SQL and register the SQL meta information. +func AttachAndRegisterSQLInfo(ctx context.Context, normalizedSQL string, sqlDigest *parser.Digest, isInternal bool) context.Context { + if sqlDigest == nil || len(sqlDigest.Bytes()) == 0 { + return ctx } + sqlDigestBytes := sqlDigest.Bytes() + ctx = collector.CtxWithSQLDigest(ctx, sqlDigestBytes) + pprof.SetGoroutineLabels(ctx) + + linkSQLTextWithDigest(sqlDigestBytes, normalizedSQL, isInternal) + failpoint.Inject("mockHighLoadForEachSQL", func(val failpoint.Value) { // In integration test, some SQL run very fast that Top SQL pprof profile unable to sample data of those SQL, // So need mock some high cpu load to make sure pprof profile successfully samples the data of those SQL. @@ -107,7 +115,31 @@ func AttachSQLInfo(ctx context.Context, normalizedSQL string, sqlDigest *parser. sqlPrefixes := []string{"insert", "update", "delete", "load", "replace", "select", "begin", "commit", "analyze", "explain", "trace", "create", "set global"} if MockHighCPULoad(normalizedSQL, sqlPrefixes, 1) { - logutil.BgLogger().Info("attach SQL info", zap.String("sql", normalizedSQL), zap.Bool("has-plan", len(normalizedPlan) > 0)) + logutil.BgLogger().Info("attach SQL info", zap.String("sql", normalizedSQL)) + } + } + }) + return ctx +} + +// AttachSQLAndPlanInfo attach the sql and plan information into Top SQL +func AttachSQLAndPlanInfo(ctx context.Context, sqlDigest *parser.Digest, planDigest *parser.Digest) context.Context { + if sqlDigest == nil || len(sqlDigest.Bytes()) == 0 { + return ctx + } + var planDigestBytes []byte + sqlDigestBytes := sqlDigest.Bytes() + if planDigest != nil { + planDigestBytes = planDigest.Bytes() + } + ctx = collector.CtxWithSQLAndPlanDigest(ctx, sqlDigestBytes, planDigestBytes) + pprof.SetGoroutineLabels(ctx) + + failpoint.Inject("mockHighLoadForEachPlan", func(val failpoint.Value) { + // Work like mockHighLoadForEachSQL failpoint. + if val.(bool) { + if MockHighCPULoad("", []string{""}, 1) { + logutil.BgLogger().Info("attach SQL info") } } }) diff --git a/util/topsql/topsql_test.go b/util/topsql/topsql_test.go index 401ae47820ed8..1d9c1ccdda5f9 100644 --- a/util/topsql/topsql_test.go +++ b/util/topsql/topsql_test.go @@ -187,10 +187,11 @@ func TestMaxSQLAndPlanTest(t *testing.T) { // Test for normal sql and plan sql := "select * from t" sqlDigest := mock.GenSQLDigest(sql) - topsql.AttachSQLInfo(ctx, sql, sqlDigest, "", nil, false) + topsql.AttachAndRegisterSQLInfo(ctx, sql, sqlDigest, false) plan := "TableReader table:t" planDigest := genDigest(plan) - topsql.AttachSQLInfo(ctx, sql, sqlDigest, plan, planDigest, false) + topsql.AttachSQLAndPlanInfo(ctx, sqlDigest, planDigest) + topsql.RegisterPlan(plan, planDigest) cSQL := collector.GetSQL(sqlDigest.Bytes()) require.Equal(t, sql, cSQL) @@ -200,10 +201,11 @@ func TestMaxSQLAndPlanTest(t *testing.T) { // Test for huge sql and plan sql = genStr(topsql.MaxSQLTextSize + 10) sqlDigest = mock.GenSQLDigest(sql) - topsql.AttachSQLInfo(ctx, sql, sqlDigest, "", nil, false) + topsql.AttachAndRegisterSQLInfo(ctx, sql, sqlDigest, false) plan = genStr(topsql.MaxBinaryPlanSize + 10) planDigest = genDigest(plan) - topsql.AttachSQLInfo(ctx, sql, sqlDigest, plan, planDigest, false) + topsql.AttachSQLAndPlanInfo(ctx, sqlDigest, planDigest) + topsql.RegisterPlan(plan, planDigest) cSQL = collector.GetSQL(sqlDigest.Bytes()) require.Equal(t, sql[:topsql.MaxSQLTextSize], cSQL) @@ -379,10 +381,11 @@ func TestPubSubWhenReporterIsStopped(t *testing.T) { func mockExecuteSQL(sql, plan string) { ctx := context.Background() sqlDigest := mock.GenSQLDigest(sql) - topsql.AttachSQLInfo(ctx, sql, sqlDigest, "", nil, false) + topsql.AttachAndRegisterSQLInfo(ctx, sql, sqlDigest, false) mockExecute(time.Millisecond * 100) planDigest := genDigest(plan) - topsql.AttachSQLInfo(ctx, sql, sqlDigest, plan, planDigest, false) + topsql.AttachSQLAndPlanInfo(ctx, sqlDigest, planDigest) + topsql.RegisterPlan(plan, planDigest) mockExecute(time.Millisecond * 300) }