Skip to content

Commit

Permalink
*: pass sql, plan digest down to KV request (#24854)
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 committed May 25, 2021
1 parent 8c25a00 commit 2580240
Show file tree
Hide file tree
Showing 46 changed files with 432 additions and 227 deletions.
9 changes: 5 additions & 4 deletions bindinfo/bind_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ func normalizeWithDefaultDB(c *C, sql, db string) (string, string) {
testParser := parser.New()
stmt, err := testParser.ParseOneStmt(sql, "", "")
c.Assert(err, IsNil)
return parser.NormalizeDigest(utilparser.RestoreWithDefaultDB(stmt, "test", ""))
normalized, digest := parser.NormalizeDigest(utilparser.RestoreWithDefaultDB(stmt, "test", ""))
return normalized, digest.String()
}

func (s *testSuite) TestBindParse(c *C) {
Expand All @@ -182,7 +183,7 @@ func (s *testSuite) TestBindParse(c *C) {
c.Check(bindHandle.Size(), Equals, 1)

sql, hash := parser.NormalizeDigest("select * from test . t")
bindData := bindHandle.GetBindRecord(hash, sql, "test")
bindData := bindHandle.GetBindRecord(hash.String(), sql, "test")
c.Check(bindData, NotNil)
c.Check(bindData.OriginalSQL, Equals, "select * from `test` . `t`")
bind := bindData.Bindings[0]
Expand Down Expand Up @@ -656,7 +657,7 @@ func (s *testSuite) TestBindingSymbolList(c *C) {
// Normalize
sql, hash := parser.NormalizeDigest("select a, b from test . t where a = 1 limit 0, 1")

bindData := s.domain.BindHandle().GetBindRecord(hash, sql, "test")
bindData := s.domain.BindHandle().GetBindRecord(hash.String(), sql, "test")
c.Assert(bindData, NotNil)
c.Check(bindData.OriginalSQL, Equals, "select `a` , `b` from `test` . `t` where `a` = ? limit ...")
bind := bindData.Bindings[0]
Expand Down Expand Up @@ -776,7 +777,7 @@ func (s *testSuite) TestErrorBind(c *C) {
c.Assert(err, IsNil, Commentf("err %v", err))

sql, hash := parser.NormalizeDigest("select * from test . t where i > ?")
bindData := s.domain.BindHandle().GetBindRecord(hash, sql, "test")
bindData := s.domain.BindHandle().GetBindRecord(hash.String(), sql, "test")
c.Check(bindData, NotNil)
c.Check(bindData.OriginalSQL, Equals, "select * from `test` . `t` where `i` > ?")
bind := bindData.Bindings[0]
Expand Down
12 changes: 6 additions & 6 deletions bindinfo/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (h *BindHandle) CreateBindRecord(sctx sessionctx.Context, record *BindRecor
}

sqlDigest := parser.DigestNormalized(record.OriginalSQL)
h.setBindRecord(sqlDigest, record)
h.setBindRecord(sqlDigest.String(), record)
}()

// Lock mysql.bind_info to synchronize with CreateBindRecord / AddBindRecord / DropBindRecord on other tidb instances.
Expand Down Expand Up @@ -256,7 +256,7 @@ func (h *BindHandle) AddBindRecord(sctx sessionctx.Context, record *BindRecord)
}

record.Db = strings.ToLower(record.Db)
oldRecord := h.GetBindRecord(parser.DigestNormalized(record.OriginalSQL), record.OriginalSQL, record.Db)
oldRecord := h.GetBindRecord(parser.DigestNormalized(record.OriginalSQL).String(), record.OriginalSQL, record.Db)
var duplicateBinding *Binding
if oldRecord != nil {
binding := oldRecord.FindBinding(record.Bindings[0].ID)
Expand Down Expand Up @@ -294,7 +294,7 @@ func (h *BindHandle) AddBindRecord(sctx sessionctx.Context, record *BindRecord)
return
}

h.appendBindRecord(parser.DigestNormalized(record.OriginalSQL), record)
h.appendBindRecord(parser.DigestNormalized(record.OriginalSQL).String(), record)
}()

// Lock mysql.bind_info to synchronize with CreateBindRecord / AddBindRecord / DropBindRecord on other tidb instances.
Expand Down Expand Up @@ -367,7 +367,7 @@ func (h *BindHandle) DropBindRecord(originalSQL, db string, binding *Binding) (e
if binding != nil {
record.Bindings = append(record.Bindings, *binding)
}
h.removeBindRecord(parser.DigestNormalized(originalSQL), record)
h.removeBindRecord(parser.DigestNormalized(originalSQL).String(), record)
}()

// Lock mysql.bind_info to synchronize with CreateBindRecord / AddBindRecord / DropBindRecord on other tidb instances.
Expand Down Expand Up @@ -515,7 +515,7 @@ func (h *BindHandle) newBindRecord(row chunk.Row) (string, *BindRecord, error) {
defer h.sctx.Unlock()
h.sctx.GetSessionVars().CurrentDB = bindRecord.Db
err := bindRecord.prepareHints(h.sctx.Context)
return hash, bindRecord, err
return hash.String(), bindRecord, err
}

// setBindRecord sets the BindRecord to the cache, if there already exists a BindRecord,
Expand Down Expand Up @@ -624,7 +624,7 @@ func (h *BindHandle) CaptureBaselines() {
}
dbName := utilparser.GetDefaultDB(stmt, bindableStmt.Schema)
normalizedSQL, digest := parser.NormalizeDigest(utilparser.RestoreWithDefaultDB(stmt, dbName, bindableStmt.Query))
if r := h.GetBindRecord(digest, normalizedSQL, dbName); r != nil && r.HasUsingBinding() {
if r := h.GetBindRecord(digest.String(), normalizedSQL, dbName); r != nil && r.HasUsingBinding() {
continue
}
bindSQL := GenerateBindSQL(context.TODO(), stmt, bindableStmt.PlanHint, true, dbName)
Expand Down
6 changes: 3 additions & 3 deletions bindinfo/session_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (h *SessionHandle) CreateBindRecord(sctx sessionctx.Context, record *BindRe
}

// update the BindMeta to the cache.
h.appendBindRecord(parser.DigestNormalized(record.OriginalSQL), record)
h.appendBindRecord(parser.DigestNormalized(record.OriginalSQL).String(), record)
return nil
}

Expand All @@ -78,14 +78,14 @@ func (h *SessionHandle) DropBindRecord(originalSQL, db string, binding *Binding)
} else {
newRecord = record
}
h.ch.setBindRecord(parser.DigestNormalized(record.OriginalSQL), newRecord)
h.ch.setBindRecord(parser.DigestNormalized(record.OriginalSQL).String(), newRecord)
updateMetrics(metrics.ScopeSession, oldRecord, newRecord, false)
return nil
}

// GetBindRecord return the BindMeta of the (normdOrigSQL,db) if BindMeta exist.
func (h *SessionHandle) GetBindRecord(normdOrigSQL, db string) *BindRecord {
hash := parser.DigestNormalized(normdOrigSQL)
hash := parser.DigestNormalized(normdOrigSQL).String()
bindRecords := h.ch[hash]
for _, bindRecord := range bindRecords {
if bindRecord.OriginalSQL == normdOrigSQL {
Expand Down
21 changes: 21 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ type Config struct {
DelayCleanTableLock uint64 `toml:"delay-clean-table-lock" json:"delay-clean-table-lock"`
SplitRegionMaxNum uint64 `toml:"split-region-max-num" json:"split-region-max-num"`
StmtSummary StmtSummary `toml:"stmt-summary" json:"stmt-summary"`
TopSQL TopSQL `toml:"top-sql" json:"top-sql"`
// RepairMode indicates that the TiDB is in the repair mode for table meta.
RepairMode bool `toml:"repair-mode" json:"repair-mode"`
RepairTableList []string `toml:"repair-table-list" json:"repair-table-list"`
Expand Down Expand Up @@ -527,6 +528,16 @@ type StmtSummary struct {
HistorySize int `toml:"history-size" json:"history-size"`
}

// TopSQL is the config for top sql.
type TopSQL struct {
// Enable statement summary or not.
Enable bool `toml:"enable" json:"enable"`
// The refresh interval of statement summary.
RefreshInterval int `toml:"refresh-interval" json:"refresh-interval"`
// The maximum number of statements kept in memory.
MaxStmtCount uint `toml:"max-stmt-count" json:"max-stmt-count"`
}

// IsolationRead is the config for isolation read.
type IsolationRead struct {
// Engines filters tidb-server access paths by engine type.
Expand Down Expand Up @@ -656,6 +667,11 @@ var defaultConf = Config{
RefreshInterval: 1800,
HistorySize: 24,
},
TopSQL: TopSQL{
Enable: true,
RefreshInterval: 1,
MaxStmtCount: 5000,
},
IsolationRead: IsolationRead{
Engines: []string{"tikv", "tiflash", "tidb"},
},
Expand Down Expand Up @@ -943,6 +959,11 @@ func TableLockEnabled() bool {
return GetGlobalConfig().EnableTableLock
}

// TopSQLEnabled uses to check whether enabled the top SQL feature.
func TopSQLEnabled() bool {
return GetGlobalConfig().TopSQL.Enable
}

// TableLockDelayClean uses to get the time of delay clean table lock.
var TableLockDelayClean = func() uint64 {
return GetGlobalConfig().DelayCleanTableLock
Expand Down
10 changes: 10 additions & 0 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -239,6 +240,7 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req
},
}
}
builder.SetResourceGroupTag(sv.StmtCtx)
return builder
}

Expand Down Expand Up @@ -274,6 +276,14 @@ func (builder *RequestBuilder) SetFromInfoSchema(pis interface{}) *RequestBuilde
return builder
}

// SetResourceGroupTag sets the request resource group tag.
func (builder *RequestBuilder) SetResourceGroupTag(sc *stmtctx.StatementContext) *RequestBuilder {
if config.TopSQLEnabled() {
builder.Request.ResourceGroupTag = sc.GetResourceGroupTag()
}
return builder
}

func (builder *RequestBuilder) verifyTxnScope() error {
if builder.txnScope == "" {
builder.txnScope = kv.GlobalTxnScope
Expand Down
33 changes: 18 additions & 15 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,8 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
return nil, err
}

getPlanDigest(a.Ctx, a.Plan)

if err = e.Open(ctx); err != nil {
terror.Call(e.Close)
return nil, err
Expand Down Expand Up @@ -919,11 +921,11 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) {
statsInfos := plannercore.GetStatsInfo(a.Plan)
memMax := sessVars.StmtCtx.MemTracker.MaxConsumed()
diskMax := sessVars.StmtCtx.DiskTracker.MaxConsumed()
_, planDigest := getPlanDigest(a.Ctx, a.Plan)
planDigest := getPlanDigest(a.Ctx, a.Plan)
slowItems := &variable.SlowQueryLogItems{
TxnTS: txnTS,
SQL: sql.String(),
Digest: digest,
Digest: digest.String(),
TimeTotal: costTime,
TimeParse: sessVars.DurationParse,
TimeCompile: sessVars.DurationCompile,
Expand Down Expand Up @@ -981,7 +983,7 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) {
}
domain.GetDomain(a.Ctx).LogSlowQuery(&domain.SlowQueryInfo{
SQL: sql.String(),
Digest: digest,
Digest: digest.String(),
Start: sessVars.StartTime,
Duration: costTime,
Detail: sessVars.StmtCtx.GetExecDetails(),
Expand Down Expand Up @@ -1011,14 +1013,15 @@ func getPlanTree(sctx sessionctx.Context, p plannercore.Plan) string {
}

// getPlanDigest will try to get the select plan tree if the plan is select or the select plan of delete/update/insert statement.
func getPlanDigest(sctx sessionctx.Context, p plannercore.Plan) (normalized, planDigest string) {
normalized, planDigest = sctx.GetSessionVars().StmtCtx.GetPlanDigest()
if len(normalized) > 0 {
return
}
normalized, planDigest = plannercore.NormalizePlan(p)
sctx.GetSessionVars().StmtCtx.SetPlanDigest(normalized, planDigest)
return
func getPlanDigest(sctx sessionctx.Context, p plannercore.Plan) string {
sc := sctx.GetSessionVars().StmtCtx
_, planDigest := sc.GetPlanDigest()
if planDigest != nil {
return planDigest.String()
}
normalized, planDigest := plannercore.NormalizePlan(p)
sc.SetPlanDigest(normalized, planDigest)
return planDigest.String()
}

// getEncodedPlan gets the encoded plan, and generates the hint string if indicated.
Expand Down Expand Up @@ -1079,7 +1082,7 @@ func (a *ExecStmt) SummaryStmt(succ bool) {
}
prevSQL = sessVars.PrevStmt.String()
}
sessVars.SetPrevStmtDigest(digest)
sessVars.SetPrevStmtDigest(digest.String())

// No need to encode every time, so encode lazily.
planGenerator := func() (string, string) {
Expand All @@ -1092,11 +1095,11 @@ func (a *ExecStmt) SummaryStmt(succ bool) {
var planDigestGen func() string
if a.Plan.TP() == plancodec.TypePointGet {
planDigestGen = func() string {
_, planDigest := getPlanDigest(a.Ctx, a.Plan)
planDigest := getPlanDigest(a.Ctx, a.Plan)
return planDigest
}
} else {
_, planDigest = getPlanDigest(a.Ctx, a.Plan)
planDigest = getPlanDigest(a.Ctx, a.Plan)
}

execDetail := stmtCtx.GetExecDetails()
Expand All @@ -1120,7 +1123,7 @@ func (a *ExecStmt) SummaryStmt(succ bool) {
Charset: charset,
Collation: collation,
NormalizedSQL: normalizedSQL,
Digest: digest,
Digest: digest.String(),
PrevSQL: prevSQL,
PrevSQLDigest: prevSQLDigest,
PlanGenerator: planGenerator,
Expand Down
4 changes: 4 additions & 0 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ func (e *AnalyzeIndexExec) fetchAnalyzeResult(ranges []*ranger.Range, isNullRang
} else {
kvReqBuilder = builder.SetIndexRangesForTables(e.ctx.GetSessionVars().StmtCtx, []int64{e.tableID.GetStatisticsID()}, e.idxInfo.ID, ranges)
}
kvReqBuilder.SetResourceGroupTag(e.ctx.GetSessionVars().StmtCtx)
kvReq, err := kvReqBuilder.
SetAnalyzeRequest(e.analyzePB).
SetStartTS(math.MaxUint64).
Expand Down Expand Up @@ -653,6 +654,7 @@ func (e *AnalyzeColumnsExec) open(ranges []*ranger.Range) error {
func (e *AnalyzeColumnsExec) buildResp(ranges []*ranger.Range) (distsql.SelectResult, error) {
var builder distsql.RequestBuilder
reqBuilder := builder.SetHandleRangesForTables(e.ctx.GetSessionVars().StmtCtx, []int64{e.tableID.GetStatisticsID()}, e.handleCols != nil && !e.handleCols.IsInt(), ranges, nil)
builder.SetResourceGroupTag(e.ctx.GetSessionVars().StmtCtx)
// Always set KeepOrder of the request to be true, in order to compute
// correct `correlation` of columns.
kvReq, err := reqBuilder.
Expand Down Expand Up @@ -1323,6 +1325,7 @@ func (e *AnalyzeFastExec) handleScanTasks(bo *tikv.Backoffer) (keysSize int, err
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
setResourceGroupTagForTxn(e.ctx.GetSessionVars().StmtCtx, snapshot)
for _, t := range e.scanTasks {
iter, err := snapshot.Iter(kv.Key(t.StartKey), kv.Key(t.EndKey))
if err != nil {
Expand All @@ -1343,6 +1346,7 @@ func (e *AnalyzeFastExec) handleSampTasks(workID int, step uint32, err *error) {
snapshot.SetOption(kv.NotFillCache, true)
snapshot.SetOption(kv.IsolationLevel, kv.RC)
snapshot.SetOption(kv.Priority, kv.PriorityLow)
setResourceGroupTagForTxn(e.ctx.GetSessionVars().StmtCtx, snapshot)
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
Expand Down
9 changes: 6 additions & 3 deletions executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@ func (e *BatchPointGetExec) buildVirtualColumnInfo() {
// Open implements the Executor interface.
func (e *BatchPointGetExec) Open(context.Context) error {
e.snapshotTS = e.startTS
txnCtx := e.ctx.GetSessionVars().TxnCtx
sessVars := e.ctx.GetSessionVars()
txnCtx := sessVars.TxnCtx
stmtCtx := sessVars.StmtCtx
if e.lock {
e.snapshotTS = txnCtx.GetForUpdateTS()
}
Expand All @@ -112,12 +114,12 @@ func (e *BatchPointGetExec) Open(context.Context) error {
SnapshotRuntimeStats: snapshotStats,
}
snapshot.SetOption(kv.CollectRuntimeStats, snapshotStats)
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
stmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID)
snapshot.SetOption(kv.TaskID, stmtCtx.TaskID)
isStaleness := e.ctx.GetSessionVars().TxnCtx.IsStaleness
snapshot.SetOption(kv.IsStalenessReadOnly, isStaleness)
if isStaleness && e.ctx.GetSessionVars().TxnCtx.TxnScope != kv.GlobalTxnScope {
Expand All @@ -128,6 +130,7 @@ func (e *BatchPointGetExec) Open(context.Context) error {
},
})
}
setResourceGroupTagForTxn(stmtCtx, snapshot)
var batchGetter kv.BatchGetter = snapshot
if txn.Valid() {
lock := e.tblInfo.Lock
Expand Down
2 changes: 2 additions & 0 deletions executor/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ func (c *checksumContext) buildTableRequest(ctx sessionctx.Context, tableID int6
}

var builder distsql.RequestBuilder
builder.SetResourceGroupTag(ctx.GetSessionVars().StmtCtx)
return builder.SetHandleRanges(ctx.GetSessionVars().StmtCtx, tableID, c.TableInfo.IsCommonHandle, ranges, nil).
SetChecksumRequest(checksum).
SetStartTS(c.StartTs).
Expand All @@ -256,6 +257,7 @@ func (c *checksumContext) buildIndexRequest(ctx sessionctx.Context, tableID int6
ranges := ranger.FullRange()

var builder distsql.RequestBuilder
builder.SetResourceGroupTag(ctx.GetSessionVars().StmtCtx)
return builder.SetIndexRanges(ctx.GetSessionVars().StmtCtx, tableID, indexInfo.ID, ranges).
SetChecksumRequest(checksum).
SetStartTS(c.StartTs).
Expand Down
Loading

0 comments on commit 2580240

Please sign in to comment.