Skip to content

Commit

Permalink
topsql: fix issue of topsql failed catch the running SQL when topsql …
Browse files Browse the repository at this point in the history
…is enabled in execution (#33861)

close #33859
  • Loading branch information
crazycs520 authored Apr 26, 2022
1 parent 59566fa commit fb342ff
Show file tree
Hide file tree
Showing 21 changed files with 301 additions and 82 deletions.
4 changes: 2 additions & 2 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,9 +541,9 @@ func (w *JobContext) setDDLLabelForTopSQL(job *model.Job) {
if job.Query != w.cacheSQL || w.cacheDigest == nil {
w.cacheNormalizedSQL, w.cacheDigest = parser.NormalizeDigest(job.Query)
w.cacheSQL = job.Query
w.ddlJobCtx = topsql.AttachSQLInfo(context.Background(), w.cacheNormalizedSQL, w.cacheDigest, "", nil, false)
w.ddlJobCtx = topsql.AttachAndRegisterSQLInfo(context.Background(), w.cacheNormalizedSQL, w.cacheDigest, false)
} else {
topsql.AttachSQLInfo(w.ddlJobCtx, w.cacheNormalizedSQL, w.cacheDigest, "", nil, false)
topsql.AttachAndRegisterSQLInfo(w.ddlJobCtx, w.cacheNormalizedSQL, w.cacheDigest, false)
}
}

Expand Down
3 changes: 1 addition & 2 deletions distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/logutil"
topsqlstate "github.com/pingcap/tidb/util/topsql/state"
"github.com/pingcap/tidb/util/trxevents"
"github.com/pingcap/tipb/go-tipb"
"github.com/tikv/client-go/v2/tikvrpc/interceptor"
Expand Down Expand Up @@ -257,7 +256,7 @@ func init() {
// WithSQLKvExecCounterInterceptor binds an interceptor for client-go to count the
// number of SQL executions of each TiKV (if any).
func WithSQLKvExecCounterInterceptor(ctx context.Context, stmtCtx *stmtctx.StatementContext) context.Context {
if topsqlstate.TopSQLEnabled() && stmtCtx.KvExecCounter != nil {
if stmtCtx.KvExecCounter != nil {
// Unlike calling Transaction or Snapshot interface, in distsql package we directly
// face tikv Request. So we need to manually bind RPCInterceptor to ctx. Instead of
// calling SetRPCInterceptor on Transaction or Snapshot.
Expand Down
10 changes: 4 additions & 6 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ import (
"github.com/pingcap/tidb/util/collate"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/ranger"
topsqlstate "github.com/pingcap/tidb/util/topsql/state"
"github.com/pingcap/tipb/go-tipb"
"github.com/tikv/client-go/v2/tikvrpc"
)

// RequestBuilder is used to build a "kv.Request".
Expand Down Expand Up @@ -263,7 +263,7 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req
builder.Request.TaskID = sv.StmtCtx.TaskID
builder.Request.Priority = builder.getKVPriority(sv)
builder.Request.ReplicaRead = replicaReadType
builder.SetResourceGroupTagger(sv.StmtCtx)
builder.SetResourceGroupTagger(sv.StmtCtx.GetResourceGroupTagger())
return builder
}

Expand Down Expand Up @@ -306,10 +306,8 @@ func (builder *RequestBuilder) SetFromInfoSchema(pis interface{}) *RequestBuilde
}

// SetResourceGroupTagger sets the request resource group tagger.
func (builder *RequestBuilder) SetResourceGroupTagger(sc *stmtctx.StatementContext) *RequestBuilder {
if topsqlstate.TopSQLEnabled() {
builder.Request.ResourceGroupTagger = sc.GetResourceGroupTagger()
}
func (builder *RequestBuilder) SetResourceGroupTagger(tagger tikvrpc.ResourceGroupTagger) *RequestBuilder {
builder.Request.ResourceGroupTagger = tagger
return builder
}

Expand Down
6 changes: 6 additions & 0 deletions distsql/request_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ func TestRequestBuilder1(t *testing.T) {
ReplicaRead: kv.ReplicaReadLeader,
ReadReplicaScope: kv.GlobalReplicaScope,
}
actual.ResourceGroupTagger = nil
require.Equal(t, expect, actual)
}

Expand Down Expand Up @@ -355,6 +356,7 @@ func TestRequestBuilder2(t *testing.T) {
ReplicaRead: kv.ReplicaReadLeader,
ReadReplicaScope: kv.GlobalReplicaScope,
}
actual.ResourceGroupTagger = nil
require.Equal(t, expect, actual)
}

Expand Down Expand Up @@ -402,6 +404,7 @@ func TestRequestBuilder3(t *testing.T) {
ReplicaRead: kv.ReplicaReadLeader,
ReadReplicaScope: kv.GlobalReplicaScope,
}
actual.ResourceGroupTagger = nil
require.Equal(t, expect, actual)
}

Expand Down Expand Up @@ -449,6 +452,7 @@ func TestRequestBuilder4(t *testing.T) {
ReplicaRead: kv.ReplicaReadLeader,
ReadReplicaScope: kv.GlobalReplicaScope,
}
actual.ResourceGroupTagger = nil
require.Equal(t, expect, actual)
}

Expand Down Expand Up @@ -559,6 +563,7 @@ func TestRequestBuilder7(t *testing.T) {
ReplicaRead: replicaRead.replicaReadType,
ReadReplicaScope: kv.GlobalReplicaScope,
}
actual.ResourceGroupTagger = nil
require.Equal(t, expect, actual)
})
}
Expand All @@ -581,6 +586,7 @@ func TestRequestBuilder8(t *testing.T) {
SchemaVar: 0,
ReadReplicaScope: kv.GlobalReplicaScope,
}
actual.ResourceGroupTagger = nil
require.Equal(t, expect, actual)
}

Expand Down
91 changes: 65 additions & 26 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/plugin"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/sessiontxn/staleread"
Expand Down Expand Up @@ -230,16 +231,14 @@ func (a *ExecStmt) PointGet(ctx context.Context, is infoschema.InfoSchema) (*rec
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}

failpoint.Inject("assertTxnManagerInShortPointGetPlan", func() {
sessiontxn.RecordAssert(a.Ctx, "assertTxnManagerInShortPointGetPlan", true)
// stale read should not reach here
staleread.AssertStmtStaleness(a.Ctx, false)
sessiontxn.AssertTxnManagerInfoSchema(a.Ctx, is)
})

ctx = a.setPlanLabelForTopSQL(ctx)
a.observeStmtBeginForTopSQL()
ctx = a.observeStmtBeginForTopSQL(ctx)
startTs := uint64(math.MaxUint64)
err := a.Ctx.InitTxnWithStartTS(startTs)
if err != nil {
Expand Down Expand Up @@ -330,17 +329,26 @@ func (a *ExecStmt) RebuildPlan(ctx context.Context) (int64, error) {
return a.InfoSchema.SchemaMetaVersion(), nil
}

func (a *ExecStmt) setPlanLabelForTopSQL(ctx context.Context) context.Context {
if !topsqlstate.TopSQLEnabled() {
return ctx
// IsFastPlan exports for testing.
func IsFastPlan(p plannercore.Plan) bool {
if proj, ok := p.(*plannercore.PhysicalProjection); ok {
p = proj.Children()[0]
}
vars := a.Ctx.GetSessionVars()
normalizedSQL, sqlDigest := vars.StmtCtx.SQLDigest()
normalizedPlan, planDigest := getPlanDigest(a.Ctx, a.Plan)
if len(normalizedPlan) == 0 {
return ctx
switch p.(type) {
case *plannercore.PointGetPlan:
return true
case *plannercore.PhysicalTableDual:
// Plan of following SQL is PhysicalTableDual:
// select 1;
// select @@autocommit;
return true
case *plannercore.Set:
// Plan of following SQL is Set:
// set @a=1;
// set @@autocommit=1;
return true
}
return topsql.AttachSQLInfo(ctx, normalizedSQL, sqlDigest, normalizedPlan, planDigest, vars.InRestrictedSQL)
return false
}

// Exec builds an Executor from a plan. If the Executor doesn't return result,
Expand Down Expand Up @@ -407,8 +415,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
return nil, err
}
// ExecuteExec will rewrite `a.Plan`, so set plan label should be executed after `a.buildExecutor`.
ctx = a.setPlanLabelForTopSQL(ctx)
a.observeStmtBeginForTopSQL()
ctx = a.observeStmtBeginForTopSQL(ctx)

if err = e.Open(ctx); err != nil {
terror.Call(e.Close)
Expand Down Expand Up @@ -1058,7 +1065,7 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) {
statsInfos := plannercore.GetStatsInfo(a.Plan)
memMax := sessVars.StmtCtx.MemTracker.MaxConsumed()
diskMax := sessVars.StmtCtx.DiskTracker.MaxConsumed()
_, planDigest := getPlanDigest(a.Ctx, a.Plan)
_, planDigest := getPlanDigest(sessVars.StmtCtx, a.Plan)
slowItems := &variable.SlowQueryLogItems{
TxnTS: txnTS,
SQL: sql.String(),
Expand Down Expand Up @@ -1166,8 +1173,7 @@ func getPlanTree(sctx sessionctx.Context, p plannercore.Plan) string {
}

// getPlanDigest will try to get the select plan tree if the plan is select or the select plan of delete/update/insert statement.
func getPlanDigest(sctx sessionctx.Context, p plannercore.Plan) (string, *parser.Digest) {
sc := sctx.GetSessionVars().StmtCtx
func getPlanDigest(sc *stmtctx.StatementContext, p plannercore.Plan) (string, *parser.Digest) {
normalized, planDigest := sc.GetPlanDigest()
if len(normalized) > 0 && planDigest != nil {
return normalized, planDigest
Expand Down Expand Up @@ -1256,11 +1262,11 @@ func (a *ExecStmt) SummaryStmt(succ bool) {
var planDigestGen func() string
if a.Plan.TP() == plancodec.TypePointGet {
planDigestGen = func() string {
_, planDigest := getPlanDigest(a.Ctx, a.Plan)
_, planDigest := getPlanDigest(stmtCtx, a.Plan)
return planDigest.String()
}
} else {
_, tmp := getPlanDigest(a.Ctx, a.Plan)
_, tmp := getPlanDigest(stmtCtx, a.Plan)
planDigest = tmp.String()
}

Expand Down Expand Up @@ -1340,17 +1346,50 @@ func (a *ExecStmt) GetTextToLog() string {
return sql
}

func (a *ExecStmt) observeStmtBeginForTopSQL() {
func (a *ExecStmt) observeStmtBeginForTopSQL(ctx context.Context) context.Context {
vars := a.Ctx.GetSessionVars()
if vars == nil {
return
sc := vars.StmtCtx
normalizedSQL, sqlDigest := sc.SQLDigest()
normalizedPlan, planDigest := getPlanDigest(sc, a.Plan)
var sqlDigestByte, planDigestByte []byte
if sqlDigest != nil {
sqlDigestByte = sqlDigest.Bytes()
}
if stats := a.Ctx.GetStmtStats(); stats != nil && topsqlstate.TopSQLEnabled() {
sqlDigest, planDigest := a.getSQLPlanDigest()
stats.OnExecutionBegin(sqlDigest, planDigest)
if planDigest != nil {
planDigestByte = planDigest.Bytes()
}
stats := a.Ctx.GetStmtStats()
if !topsqlstate.TopSQLEnabled() {
// To reduce the performance impact on fast plan.
// Drop them does not cause notable accuracy issue in TopSQL.
if IsFastPlan(a.Plan) {
return ctx
}
// Always attach the SQL and plan info uses to catch the running SQL when Top SQL is enabled in execution.
if stats != nil {
stats.OnExecutionBegin(sqlDigestByte, planDigestByte)
// This is a special logic prepared for TiKV's SQLExecCount.
sc.KvExecCounter = stats.CreateKvExecCounter(sqlDigestByte, planDigestByte)
}
return topsql.AttachSQLAndPlanInfo(ctx, sqlDigest, planDigest)
}

if stats != nil {
stats.OnExecutionBegin(sqlDigestByte, planDigestByte)
// This is a special logic prepared for TiKV's SQLExecCount.
vars.StmtCtx.KvExecCounter = stats.CreateKvExecCounter(sqlDigest, planDigest)
sc.KvExecCounter = stats.CreateKvExecCounter(sqlDigestByte, planDigestByte)
}

isSQLRegistered := sc.IsSQLRegistered.Load()
if !isSQLRegistered {
topsql.RegisterSQL(normalizedSQL, sqlDigest, vars.InRestrictedSQL)
}
sc.IsSQLAndPlanRegistered.Store(true)
if len(normalizedPlan) == 0 {
return ctx
}
topsql.RegisterPlan(normalizedPlan, planDigest)
return topsql.AttachSQLAndPlanInfo(ctx, sqlDigest, planDigest)
}

func (a *ExecStmt) observeStmtFinishedForTopSQL() {
Expand Down
4 changes: 2 additions & 2 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ func (e *AnalyzeIndexExec) fetchAnalyzeResult(ranges []*ranger.Range, isNullRang
} else {
kvReqBuilder = builder.SetIndexRangesForTables(e.ctx.GetSessionVars().StmtCtx, []int64{e.tableID.GetStatisticsID()}, e.idxInfo.ID, ranges)
}
kvReqBuilder.SetResourceGroupTagger(e.ctx.GetSessionVars().StmtCtx)
kvReqBuilder.SetResourceGroupTagger(e.ctx.GetSessionVars().StmtCtx.GetResourceGroupTagger())
kvReq, err := kvReqBuilder.
SetAnalyzeRequest(e.analyzePB).
SetStartTS(e.snapshot).
Expand Down Expand Up @@ -857,7 +857,7 @@ func (e *AnalyzeColumnsExec) open(ranges []*ranger.Range) error {
func (e *AnalyzeColumnsExec) buildResp(ranges []*ranger.Range) (distsql.SelectResult, error) {
var builder distsql.RequestBuilder
reqBuilder := builder.SetHandleRangesForTables(e.ctx.GetSessionVars().StmtCtx, []int64{e.TableID.GetStatisticsID()}, e.handleCols != nil && !e.handleCols.IsInt(), ranges, nil)
builder.SetResourceGroupTagger(e.ctx.GetSessionVars().StmtCtx)
builder.SetResourceGroupTagger(e.ctx.GetSessionVars().StmtCtx.GetResourceGroupTagger())
// Always set KeepOrder of the request to be true, in order to compute
// correct `correlation` of columns.
kvReq, err := reqBuilder.
Expand Down
4 changes: 2 additions & 2 deletions executor/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func (c *checksumContext) buildTableRequest(ctx sessionctx.Context, tableID int6
}

var builder distsql.RequestBuilder
builder.SetResourceGroupTagger(ctx.GetSessionVars().StmtCtx)
builder.SetResourceGroupTagger(ctx.GetSessionVars().StmtCtx.GetResourceGroupTagger())
return builder.SetHandleRanges(ctx.GetSessionVars().StmtCtx, tableID, c.TableInfo.IsCommonHandle, ranges, nil).
SetChecksumRequest(checksum).
SetStartTS(c.StartTs).
Expand All @@ -258,7 +258,7 @@ func (c *checksumContext) buildIndexRequest(ctx sessionctx.Context, tableID int6
ranges := ranger.FullRange()

var builder distsql.RequestBuilder
builder.SetResourceGroupTagger(ctx.GetSessionVars().StmtCtx)
builder.SetResourceGroupTagger(ctx.GetSessionVars().StmtCtx.GetResourceGroupTagger())
return builder.SetIndexRanges(ctx.GetSessionVars().StmtCtx, tableID, indexInfo.ID, ranges).
SetChecksumRequest(checksum).
SetStartTS(c.StartTs).
Expand Down
20 changes: 18 additions & 2 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,9 @@ func Next(ctx context.Context, e Executor, req *chunk.Chunk) error {
if trace.IsEnabled() {
defer trace.StartRegion(ctx, fmt.Sprintf("%T.Next", e)).End()
}
if topsqlstate.TopSQLEnabled() && sessVars.StmtCtx.IsSQLAndPlanRegistered.CAS(false, true) {
registerSQLAndPlanInExecForTopSQL(sessVars)
}
err := e.Next(ctx, req)

if err != nil {
Expand Down Expand Up @@ -1799,7 +1802,8 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
pprof.SetGoroutineLabels(goCtx)
}
if topsqlstate.TopSQLEnabled() && prepareStmt.SQLDigest != nil {
topsql.AttachSQLInfo(goCtx, prepareStmt.NormalizedSQL, prepareStmt.SQLDigest, "", nil, vars.InRestrictedSQL)
sc.IsSQLRegistered.Store(true)
topsql.AttachAndRegisterSQLInfo(goCtx, prepareStmt.NormalizedSQL, prepareStmt.SQLDigest, vars.InRestrictedSQL)
}
if s, ok := prepareStmt.PreparedAst.Stmt.(*ast.SelectStmt); ok {
if s.LockInfo == nil {
Expand Down Expand Up @@ -1951,6 +1955,18 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
return
}

// registerSQLAndPlanInExecForTopSQL register the sql and plan information if it doesn't register before execution.
// This uses to catch the running SQL when Top SQL is enabled in execution.
func registerSQLAndPlanInExecForTopSQL(sessVars *variable.SessionVars) {
stmtCtx := sessVars.StmtCtx
normalizedSQL, sqlDigest := stmtCtx.SQLDigest()
topsql.RegisterSQL(normalizedSQL, sqlDigest, sessVars.InRestrictedSQL)
normalizedPlan, planDigest := stmtCtx.GetPlanDigest()
if len(normalizedPlan) > 0 {
topsql.RegisterPlan(normalizedPlan, planDigest)
}
}

// ResetUpdateStmtCtx resets statement context for UpdateStmt.
func ResetUpdateStmtCtx(sc *stmtctx.StatementContext, stmt *ast.UpdateStmt, vars *variable.SessionVars) {
sc.InUpdateStmt = true
Expand Down Expand Up @@ -1994,7 +2010,7 @@ func FillVirtualColumnValue(virtualRetTypes []*types.FieldType, virtualColumnInd
}

func setOptionForTopSQL(sc *stmtctx.StatementContext, snapshot kv.Snapshot) {
if snapshot == nil || !topsqlstate.TopSQLEnabled() {
if snapshot == nil {
return
}
snapshot.SetOption(kv.ResourceGroupTagger, sc.GetResourceGroupTagger())
Expand Down
36 changes: 36 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5939,3 +5939,39 @@ func TestSummaryFailedUpdate(t *testing.T) {
tk.MustExec("set @@tidb_mem_quota_query=1000000000")
tk.MustQuery("select stmt_type from information_schema.statements_summary where digest_text = 'update `t` set `t` . `a` = `t` . `a` - ? where `t` . `a` in ( select `a` from `t` where `a` < ? )'").Check(testkit.Rows("Update"))
}

func TestIsFastPlan(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t(id int primary key, a int)")

cases := []struct {
sql string
isFastPlan bool
}{
{"select a from t where id=1", true},
{"select a+id from t where id=1", true},
{"select 1", true},
{"select @@autocommit", true},
{"set @@autocommit=1", true},
{"set @a=1", true},
{"select * from t where a=1", false},
{"select * from t", false},
}

for _, ca := range cases {
if strings.HasPrefix(ca.sql, "select") {
tk.MustQuery(ca.sql)
} else {
tk.MustExec(ca.sql)
}
info := tk.Session().ShowProcess()
require.NotNil(t, info)
p, ok := info.Plan.(plannercore.Plan)
require.True(t, ok)
ok = executor.IsFastPlan(p)
require.Equal(t, ca.isFastPlan, ok)
}
}
Loading

0 comments on commit fb342ff

Please sign in to comment.