*: pass sql, plan digest down to KV request #24854

Merged: 22 commits, May 25, 2021
Changes from 11 commits
12 changes: 6 additions & 6 deletions bindinfo/handle.go
@@ -210,7 +210,7 @@ func (h *BindHandle) CreateBindRecord(sctx sessionctx.Context, record *BindRecor
}

sqlDigest := parser.DigestNormalized(record.OriginalSQL)
h.setBindRecord(sqlDigest, record)
h.setBindRecord(sqlDigest.String(), record)
}()

// Lock mysql.bind_info to synchronize with CreateBindRecord / AddBindRecord / DropBindRecord on other tidb instances.
@@ -256,7 +256,7 @@ func (h *BindHandle) AddBindRecord(sctx sessionctx.Context, record *BindRecord)
}

record.Db = strings.ToLower(record.Db)
oldRecord := h.GetBindRecord(parser.DigestNormalized(record.OriginalSQL), record.OriginalSQL, record.Db)
oldRecord := h.GetBindRecord(parser.DigestNormalized(record.OriginalSQL).String(), record.OriginalSQL, record.Db)
var duplicateBinding *Binding
if oldRecord != nil {
binding := oldRecord.FindBinding(record.Bindings[0].ID)
@@ -294,7 +294,7 @@ func (h *BindHandle) AddBindRecord(sctx sessionctx.Context, record *BindRecord)
return
}

h.appendBindRecord(parser.DigestNormalized(record.OriginalSQL), record)
h.appendBindRecord(parser.DigestNormalized(record.OriginalSQL).String(), record)
}()

// Lock mysql.bind_info to synchronize with CreateBindRecord / AddBindRecord / DropBindRecord on other tidb instances.
@@ -367,7 +367,7 @@ func (h *BindHandle) DropBindRecord(originalSQL, db string, binding *Binding) (e
if binding != nil {
record.Bindings = append(record.Bindings, *binding)
}
h.removeBindRecord(parser.DigestNormalized(originalSQL), record)
h.removeBindRecord(parser.DigestNormalized(originalSQL).String(), record)
}()

// Lock mysql.bind_info to synchronize with CreateBindRecord / AddBindRecord / DropBindRecord on other tidb instances.
@@ -515,7 +515,7 @@ func (h *BindHandle) newBindRecord(row chunk.Row) (string, *BindRecord, error) {
defer h.sctx.Unlock()
h.sctx.GetSessionVars().CurrentDB = bindRecord.Db
err := bindRecord.prepareHints(h.sctx.Context)
return hash, bindRecord, err
return hash.String(), bindRecord, err
}

// setBindRecord sets the BindRecord to the cache, if there already exists a BindRecord,
@@ -624,7 +624,7 @@ func (h *BindHandle) CaptureBaselines() {
}
dbName := utilparser.GetDefaultDB(stmt, bindableStmt.Schema)
normalizedSQL, digest := parser.NormalizeDigest(utilparser.RestoreWithDefaultDB(stmt, dbName, bindableStmt.Query))
if r := h.GetBindRecord(digest, normalizedSQL, dbName); r != nil && r.HasUsingBinding() {
if r := h.GetBindRecord(digest.String(), normalizedSQL, dbName); r != nil && r.HasUsingBinding() {
continue
}
bindSQL := GenerateBindSQL(context.TODO(), stmt, bindableStmt.PlanHint, true, dbName)
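The repeated .String() conversions above reflect that parser.DigestNormalized and parser.NormalizeDigest now return a digest object rather than a raw string, so call sites convert explicitly where they need the hex key. A minimal sketch of the assumed usage (not code from this PR):

package main

import (
	"fmt"

	"github.com/pingcap/parser"
)

func main() {
	// Assumption: DigestNormalized returns a digest object whose String()
	// yields the hex key the bind-record cache previously received directly.
	normalizedSQL := "select * from `t` where `a` = ?"
	digest := parser.DigestNormalized(normalizedSQL)
	fmt.Println(digest.String()) // hex cache key, as passed to setBindRecord above
}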
6 changes: 3 additions & 3 deletions bindinfo/session_handle.go
@@ -60,7 +60,7 @@ func (h *SessionHandle) CreateBindRecord(sctx sessionctx.Context, record *BindRe
}

// update the BindMeta to the cache.
h.appendBindRecord(parser.DigestNormalized(record.OriginalSQL), record)
h.appendBindRecord(parser.DigestNormalized(record.OriginalSQL).String(), record)
return nil
}

@@ -78,14 +78,14 @@ func (h *SessionHandle) DropBindRecord(originalSQL, db string, binding *Binding)
} else {
newRecord = record
}
h.ch.setBindRecord(parser.DigestNormalized(record.OriginalSQL), newRecord)
h.ch.setBindRecord(parser.DigestNormalized(record.OriginalSQL).String(), newRecord)
updateMetrics(metrics.ScopeSession, oldRecord, newRecord, false)
return nil
}

// GetBindRecord return the BindMeta of the (normdOrigSQL,db) if BindMeta exist.
func (h *SessionHandle) GetBindRecord(normdOrigSQL, db string) *BindRecord {
hash := parser.DigestNormalized(normdOrigSQL)
hash := parser.DigestNormalized(normdOrigSQL).String()
bindRecords := h.ch[hash]
for _, bindRecord := range bindRecords {
if bindRecord.OriginalSQL == normdOrigSQL {
21 changes: 21 additions & 0 deletions config/config.go
@@ -137,6 +137,7 @@ type Config struct {
DelayCleanTableLock uint64 `toml:"delay-clean-table-lock" json:"delay-clean-table-lock"`
SplitRegionMaxNum uint64 `toml:"split-region-max-num" json:"split-region-max-num"`
StmtSummary StmtSummary `toml:"stmt-summary" json:"stmt-summary"`
TopSQL TopSQL `toml:"top-sql" json:"top-sql"`
// RepairMode indicates that the TiDB is in the repair mode for table meta.
RepairMode bool `toml:"repair-mode" json:"repair-mode"`
RepairTableList []string `toml:"repair-table-list" json:"repair-table-list"`
@@ -527,6 +528,16 @@ type StmtSummary struct {
HistorySize int `toml:"history-size" json:"history-size"`
}

// TopSQL is the config for top sql.
type TopSQL struct {
// Enable the Top SQL feature or not.
Enable bool `toml:"enable" json:"enable"`
// The refresh interval of Top SQL.
RefreshInterval int `toml:"refresh-interval" json:"refresh-interval"`
// The maximum number of statements kept in memory.
MaxStmtCount uint `toml:"max-stmt-count" json:"max-stmt-count"`
}

// IsolationRead is the config for isolation read.
type IsolationRead struct {
// Engines filters tidb-server access paths by engine type.
@@ -656,6 +667,11 @@ var defaultConf = Config{
RefreshInterval: 1800,
HistorySize: 24,
},
TopSQL: TopSQL{
Enable: true,
RefreshInterval: 1,
MaxStmtCount: 5000,
},
IsolationRead: IsolationRead{
Engines: []string{"tikv", "tiflash", "tidb"},
},
@@ -943,6 +959,11 @@ func TableLockEnabled() bool {
return GetGlobalConfig().EnableTableLock
}

// TopSQLEnabled checks whether the Top SQL feature is enabled.
func TopSQLEnabled() bool {
return GetGlobalConfig().TopSQL.Enable
}

// TableLockDelayClean uses to get the time of delay clean table lock.
var TableLockDelayClean = func() uint64 {
return GetGlobalConfig().DelayCleanTableLock
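With the defaults above, Top SQL ships enabled with refresh-interval 1 and max-stmt-count 5000. A hedged sketch of toggling it in process through the global config (config.UpdateGlobal is assumed to be available, as elsewhere in the config package):

// Sketch only: flip the new switch and check it the way the KV request paths do.
config.UpdateGlobal(func(conf *config.Config) {
	conf.TopSQL.Enable = false // disable tagging of KV requests
})
if config.TopSQLEnabled() {
	// resource group tags will be attached to KV requests (see distsql below)
}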
10 changes: 10 additions & 0 deletions distsql/request_builder.go
@@ -21,6 +21,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
@@ -243,6 +244,7 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req
},
}
}
builder.SetResourceGroupTag(sv.StmtCtx)
return builder
}

@@ -276,6 +278,14 @@ func (builder *RequestBuilder) SetFromInfoSchema(is infoschema.InfoSchema) *Requ
return builder
}

// SetResourceGroupTag sets the request resource group tag.
func (builder *RequestBuilder) SetResourceGroupTag(sc *stmtctx.StatementContext) *RequestBuilder {
if config.TopSQLEnabled() {
builder.Request.ResourceGroupTag = sc.GetResourceGroupTag()
}
return builder
}

func (builder *RequestBuilder) verifyTxnScope() error {
if builder.txnScope == "" {
builder.txnScope = kv.GlobalTxnScope
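The call sites added later in this PR (executor/analyze.go, executor/checksum.go) chain the new method onto an existing RequestBuilder. A condensed sketch of that pattern, where ctx and startTS are placeholders taken from the surrounding executor code:

// Sketch: attach the statement's resource group tag before building the request.
// SetResourceGroupTag is a no-op unless config.TopSQLEnabled() returns true.
var builder distsql.RequestBuilder
builder.SetResourceGroupTag(ctx.GetSessionVars().StmtCtx)
kvReq, err := builder.
	SetStartTS(startTS).
	SetKeepOrder(true).
	Build()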
29 changes: 16 additions & 13 deletions executor/adapter.go
@@ -333,6 +333,8 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
return nil, err
}

getPlanDigest(a.Ctx, a.Plan)

if err = e.Open(ctx); err != nil {
terror.Call(e.Close)
return nil, err
@@ -919,11 +921,11 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) {
statsInfos := plannercore.GetStatsInfo(a.Plan)
memMax := sessVars.StmtCtx.MemTracker.MaxConsumed()
diskMax := sessVars.StmtCtx.DiskTracker.MaxConsumed()
_, planDigest := getPlanDigest(a.Ctx, a.Plan)
planDigest := getPlanDigest(a.Ctx, a.Plan)

Review thread on this line:
Contributor: Duplicated with line 336?
Contributor: +1, maybe make getPlanDigest memoized: calculate the plan digest once at line 336 and directly reuse the result here.
Contributor (PR author): Line 336 is used to initialize the plan digest, and the result is already cached in the StatementContext.

slowItems := &variable.SlowQueryLogItems{
TxnTS: txnTS,
SQL: sql.String(),
Digest: digest,
Digest: digest.String(),
TimeTotal: costTime,
TimeParse: sessVars.DurationParse,
TimeCompile: sessVars.DurationCompile,
@@ -981,7 +983,7 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) {
}
domain.GetDomain(a.Ctx).LogSlowQuery(&domain.SlowQueryInfo{
SQL: sql.String(),
Digest: digest,
Digest: digest.String(),
Start: sessVars.StartTime,
Duration: costTime,
Detail: sessVars.StmtCtx.GetExecDetails(),
@@ -1011,14 +1013,15 @@ func getPlanTree(sctx sessionctx.Context, p plannercore.Plan) string {
}

// getPlanDigest returns the digest of the statement's plan, computing and caching it in the StatementContext on first use.
func getPlanDigest(sctx sessionctx.Context, p plannercore.Plan) (normalized, planDigest string) {
normalized, planDigest = sctx.GetSessionVars().StmtCtx.GetPlanDigest()
if len(normalized) > 0 {
return
func getPlanDigest(sctx sessionctx.Context, p plannercore.Plan) string {
sc := sctx.GetSessionVars().StmtCtx
normalized, planDigest := sc.GetPlanDigest()
if planDigest != nil {
return planDigest.String()
}
normalized, planDigest = plannercore.NormalizePlan(p)
sctx.GetSessionVars().StmtCtx.SetPlanDigest(normalized, planDigest)
return
sc.SetPlanDigest(normalized, planDigest)
return planDigest.String()
}

// getEncodedPlan gets the encoded plan, and generates the hint string if indicated.
@@ -1079,7 +1082,7 @@ func (a *ExecStmt) SummaryStmt(succ bool) {
}
prevSQL = sessVars.PrevStmt.String()
}
sessVars.SetPrevStmtDigest(digest)
sessVars.SetPrevStmtDigest(digest.String())

// No need to encode every time, so encode lazily.
planGenerator := func() (string, string) {
@@ -1092,11 +1095,11 @@ func (a *ExecStmt) SummaryStmt(succ bool) {
var planDigestGen func() string
if a.Plan.TP() == plancodec.TypePointGet {
planDigestGen = func() string {
_, planDigest := getPlanDigest(a.Ctx, a.Plan)
planDigest := getPlanDigest(a.Ctx, a.Plan)
return planDigest
}
} else {
_, planDigest = getPlanDigest(a.Ctx, a.Plan)
planDigest = getPlanDigest(a.Ctx, a.Plan)
}

execDetail := stmtCtx.GetExecDetails()
@@ -1120,7 +1123,7 @@ func (a *ExecStmt) SummaryStmt(succ bool) {
Charset: charset,
Collation: collation,
NormalizedSQL: normalizedSQL,
Digest: digest,
Digest: digest.String(),
PrevSQL: prevSQL,
PrevSQLDigest: prevSQLDigest,
PlanGenerator: planGenerator,
4 changes: 4 additions & 0 deletions executor/analyze.go
@@ -355,6 +355,7 @@ func (e *AnalyzeIndexExec) fetchAnalyzeResult(ranges []*ranger.Range, isNullRang
} else {
kvReqBuilder = builder.SetIndexRangesForTables(e.ctx.GetSessionVars().StmtCtx, []int64{e.tableID.GetStatisticsID()}, e.idxInfo.ID, ranges)
}
kvReqBuilder.SetResourceGroupTag(e.ctx.GetSessionVars().StmtCtx)
kvReq, err := kvReqBuilder.
SetAnalyzeRequest(e.analyzePB).
SetStartTS(math.MaxUint64).
@@ -653,6 +654,7 @@ func (e *AnalyzeColumnsExec) open(ranges []*ranger.Range) error {
func (e *AnalyzeColumnsExec) buildResp(ranges []*ranger.Range) (distsql.SelectResult, error) {
var builder distsql.RequestBuilder
reqBuilder := builder.SetHandleRangesForTables(e.ctx.GetSessionVars().StmtCtx, []int64{e.tableID.GetStatisticsID()}, e.handleCols != nil && !e.handleCols.IsInt(), ranges, nil)
builder.SetResourceGroupTag(e.ctx.GetSessionVars().StmtCtx)
// Always set KeepOrder of the request to be true, in order to compute
// correct `correlation` of columns.
kvReq, err := reqBuilder.
@@ -1323,6 +1325,7 @@ func (e *AnalyzeFastExec) handleScanTasks(bo *tikv.Backoffer) (keysSize int, err
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
setResourceGroupTagForTxn(e.ctx.GetSessionVars().StmtCtx, snapshot)
for _, t := range e.scanTasks {
iter, err := snapshot.Iter(kv.Key(t.StartKey), kv.Key(t.EndKey))
if err != nil {
@@ -1343,6 +1346,7 @@ func (e *AnalyzeFastExec) handleSampTasks(workID int, step uint32, err *error) {
snapshot.SetOption(kv.NotFillCache, true)
snapshot.SetOption(kv.IsolationLevel, kv.RC)
snapshot.SetOption(kv.Priority, kv.PriorityLow)
setResourceGroupTagForTxn(e.ctx.GetSessionVars().StmtCtx, snapshot)
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
9 changes: 6 additions & 3 deletions executor/batch_point_get.go
@@ -90,7 +90,9 @@ func (e *BatchPointGetExec) buildVirtualColumnInfo() {
// Open implements the Executor interface.
func (e *BatchPointGetExec) Open(context.Context) error {
e.snapshotTS = e.startTS
txnCtx := e.ctx.GetSessionVars().TxnCtx
sessVars := e.ctx.GetSessionVars()
txnCtx := sessVars.TxnCtx
stmtCtx := sessVars.StmtCtx
if e.lock {
e.snapshotTS = txnCtx.GetForUpdateTS()
}
@@ -113,12 +115,12 @@ func (e *BatchPointGetExec) Open(context.Context) error {
SnapshotRuntimeStats: snapshotStats,
}
snapshot.SetOption(kv.CollectRuntimeStats, snapshotStats)
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
stmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID)
snapshot.SetOption(kv.TaskID, stmtCtx.TaskID)
isStaleness := e.ctx.GetSessionVars().TxnCtx.IsStaleness
snapshot.SetOption(kv.IsStalenessReadOnly, isStaleness)
if isStaleness && e.ctx.GetSessionVars().TxnCtx.TxnScope != kv.GlobalTxnScope {
@@ -129,6 +131,7 @@ func (e *BatchPointGetExec) Open(context.Context) error {
},
})
}
setResourceGroupTagForTxn(stmtCtx, snapshot)
var batchGetter kv.BatchGetter = snapshot
if txn.Valid() {
lock := e.tblInfo.Lock
2 changes: 2 additions & 0 deletions executor/checksum.go
@@ -240,6 +240,7 @@ func (c *checksumContext) buildTableRequest(ctx sessionctx.Context, tableID int6
}

var builder distsql.RequestBuilder
builder.SetResourceGroupTag(ctx.GetSessionVars().StmtCtx)
return builder.SetHandleRanges(ctx.GetSessionVars().StmtCtx, tableID, c.TableInfo.IsCommonHandle, ranges, nil).
SetChecksumRequest(checksum).
SetStartTS(c.StartTs).
@@ -256,6 +257,7 @@ func (c *checksumContext) buildIndexRequest(ctx sessionctx.Context, tableID int6
ranges := ranger.FullRange()

var builder distsql.RequestBuilder
builder.SetResourceGroupTag(ctx.GetSessionVars().StmtCtx)
return builder.SetIndexRanges(ctx.GetSessionVars().StmtCtx, tableID, indexInfo.ID, ranges).
SetChecksumRequest(checksum).
SetStartTS(c.StartTs).
13 changes: 12 additions & 1 deletion executor/executor.go
@@ -29,6 +29,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/auth"
"github.com/pingcap/parser/model"
@@ -974,7 +975,11 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
}

func newLockCtx(seVars *variable.SessionVars, lockWaitTime int64) *tikvstore.LockCtx {
var planDigest *parser.Digest
_, sqlDigest := seVars.StmtCtx.SQLDigest()
if config.TopSQLEnabled() {
_, planDigest = seVars.StmtCtx.GetPlanDigest()
}
return &tikvstore.LockCtx{
Killed: &seVars.Killed,
ForUpdateTS: seVars.TxnCtx.GetForUpdateTS(),
@@ -984,7 +989,7 @@ func newLockCtx(seVars *variable.SessionVars, lockWaitTime int64) *tikvstore.Loc
LockKeysDuration: &seVars.StmtCtx.LockKeysDuration,
LockKeysCount: &seVars.StmtCtx.LockKeysCount,
LockExpired: &seVars.TxnCtx.LockExpire,
ResourceGroupTag: resourcegrouptag.EncodeResourceGroupTag(sqlDigest),
ResourceGroupTag: resourcegrouptag.EncodeResourceGroupTag(sqlDigest, planDigest),
OnDeadlock: func(deadlock *tikverr.ErrDeadlock) {
// TODO: Support collecting retryable deadlocks according to the config.
if !deadlock.IsRetryable {
@@ -1794,3 +1799,9 @@ func FillVirtualColumnValue(virtualRetTypes []*types.FieldType, virtualColumnInd
}
return nil
}

func setResourceGroupTagForTxn(sc *stmtctx.StatementContext, snapshot kv.Snapshot) {
if snapshot != nil && config.TopSQLEnabled() {
snapshot.SetOption(kv.ResourceGroupTag, sc.GetResourceGroupTag())
}
}
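
Taken together, the statement context caches the SQL and plan digests, and the executor layer encodes them into a resource group tag on coprocessor requests, snapshots, and pessimistic-lock contexts whenever Top SQL is enabled. A minimal sketch of that flow, assuming EncodeResourceGroupTag simply serializes the two digests (its wire format is not part of this diff); stmtCtx and snapshot stand in for the current statement's StatementContext and kv.Snapshot:

// Sketch of the end-to-end tagging flow under the assumptions above.
_, sqlDigest := stmtCtx.SQLDigest()      // digest of the normalized SQL text
_, planDigest := stmtCtx.GetPlanDigest() // plan digest cached by getPlanDigest, may be nil
tag := resourcegrouptag.EncodeResourceGroupTag(sqlDigest, planDigest)
snapshot.SetOption(kv.ResourceGroupTag, tag) // point get / batch get / analyze paths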