From 8c25a0074a62e6ebffe50ac765b2699b043f1f78 Mon Sep 17 00:00:00 2001 From: Shirly Date: Tue, 25 May 2021 15:23:33 +0800 Subject: [PATCH 1/2] */backoff: make backoff type as string instead of interface fmt.Stringer (#24810) --- store/tikv/retry/backoff.go | 12 ++++-- store/tikv/util/execdetails.go | 9 ++--- util/execdetails/execdetails.go | 5 +-- util/execdetails/execdetails_test.go | 44 ++++++---------------- util/stmtsummary/statement_summary.go | 8 ++-- util/stmtsummary/statement_summary_test.go | 25 ++++++------ 6 files changed, 41 insertions(+), 62 deletions(-) diff --git a/store/tikv/retry/backoff.go b/store/tikv/retry/backoff.go index d07b9c4fdccae..a005f85bf698b 100644 --- a/store/tikv/retry/backoff.go +++ b/store/tikv/retry/backoff.go @@ -41,7 +41,7 @@ type Backoffer struct { maxSleep int totalSleep int errors []error - configs []fmt.Stringer + configs []*Config vars *kv.Variables noop bool @@ -172,7 +172,7 @@ func (b *Backoffer) BackoffWithCfgAndMaxSleep(cfg *Config, maxSleepMs int, err e } logutil.BgLogger().Warn(errMsg) // Use the first backoff type to generate a MySQL error. - return b.configs[0].(*Config).err + return b.configs[0].err } // Lazy initialize. @@ -265,8 +265,12 @@ func (b *Backoffer) GetTotalSleep() int { } // GetTypes returns type list. -func (b *Backoffer) GetTypes() []fmt.Stringer { - return b.configs +func (b *Backoffer) GetTypes() []string { + typs := make([]string, 0, len(b.configs)) + for _, cfg := range b.configs { + typs = append(typs, cfg.String()) + } + return typs } // GetCtx returns the binded context. diff --git a/store/tikv/util/execdetails.go b/store/tikv/util/execdetails.go index eeaaf92da6b27..97c88a3687319 100644 --- a/store/tikv/util/execdetails.go +++ b/store/tikv/util/execdetails.go @@ -15,7 +15,6 @@ package util import ( "bytes" - "fmt" "math" "strconv" "sync" @@ -50,7 +49,7 @@ type CommitDetails struct { CommitBackoffTime int64 Mu struct { sync.Mutex - BackoffTypes []fmt.Stringer + BackoffTypes []string } ResolveLockTime int64 WriteKeys int @@ -90,7 +89,7 @@ func (cd *CommitDetails) Clone() *CommitDetails { PrewriteRegionNum: cd.PrewriteRegionNum, TxnRetry: cd.TxnRetry, } - commit.Mu.BackoffTypes = append([]fmt.Stringer{}, cd.Mu.BackoffTypes...) + commit.Mu.BackoffTypes = append([]string{}, cd.Mu.BackoffTypes...) return commit } @@ -103,7 +102,7 @@ type LockKeysDetails struct { BackoffTime int64 Mu struct { sync.Mutex - BackoffTypes []fmt.Stringer + BackoffTypes []string } LockRPCTime int64 LockRPCCount int64 @@ -135,7 +134,7 @@ func (ld *LockKeysDetails) Clone() *LockKeysDetails { LockRPCCount: ld.LockRPCCount, RetryCount: ld.RetryCount, } - lock.Mu.BackoffTypes = append([]fmt.Stringer{}, ld.Mu.BackoffTypes...) + lock.Mu.BackoffTypes = append([]string{}, ld.Mu.BackoffTypes...) 
return lock } diff --git a/util/execdetails/execdetails.go b/util/execdetails/execdetails.go index 676f0a241d489..16a17b656c1cc 100644 --- a/util/execdetails/execdetails.go +++ b/util/execdetails/execdetails.go @@ -892,14 +892,13 @@ func (e *RuntimeStatsWithCommit) String() string { return buf.String() } -func (e *RuntimeStatsWithCommit) formatBackoff(backoffTypes []fmt.Stringer) string { +func (e *RuntimeStatsWithCommit) formatBackoff(backoffTypes []string) string { if len(backoffTypes) == 0 { return "" } tpMap := make(map[string]struct{}) tpArray := []string{} - for _, tp := range backoffTypes { - tpStr := tp.String() + for _, tpStr := range backoffTypes { _, ok := tpMap[tpStr] if ok { continue diff --git a/util/execdetails/execdetails_test.go b/util/execdetails/execdetails_test.go index 371d06006051f..827410cb04350 100644 --- a/util/execdetails/execdetails_test.go +++ b/util/execdetails/execdetails_test.go @@ -14,7 +14,6 @@ package execdetails import ( - "fmt" "strconv" "sync" "testing" @@ -22,7 +21,6 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/tidb/store/tikv/util" - "github.com/pingcap/tidb/util/stringutil" "github.com/pingcap/tipb/go-tipb" ) @@ -43,14 +41,10 @@ func TestString(t *testing.T) { CommitBackoffTime: int64(time.Second), Mu: struct { sync.Mutex - BackoffTypes []fmt.Stringer - }{BackoffTypes: []fmt.Stringer{ - stringutil.MemoizeStr(func() string { - return "backoff1" - }), - stringutil.MemoizeStr(func() string { - return "backoff2" - }), + BackoffTypes []string + }{BackoffTypes: []string{ + "backoff1", + "backoff2", }}, ResolveLockTime: 1000000000, // 10^9 ns = 1s WriteKeys: 1, @@ -212,18 +206,8 @@ func TestRuntimeStatsWithCommit(t *testing.T) { CommitBackoffTime: int64(time.Second), Mu: struct { sync.Mutex - BackoffTypes []fmt.Stringer - }{BackoffTypes: []fmt.Stringer{ - stringutil.MemoizeStr(func() string { - return "backoff1" - }), - stringutil.MemoizeStr(func() string { - return "backoff2" - }), - stringutil.MemoizeStr(func() string { - return "backoff1" - }), - }}, + BackoffTypes []string + }{BackoffTypes: []string{"backoff1", "backoff2", "backoff1"}}, ResolveLockTime: int64(time.Second), WriteKeys: 3, WriteSize: 66, @@ -245,17 +229,11 @@ func TestRuntimeStatsWithCommit(t *testing.T) { BackoffTime: int64(time.Second * 3), Mu: struct { sync.Mutex - BackoffTypes []fmt.Stringer - }{BackoffTypes: []fmt.Stringer{ - stringutil.MemoizeStr(func() string { - return "backoff4" - }), - stringutil.MemoizeStr(func() string { - return "backoff5" - }), - stringutil.MemoizeStr(func() string { - return "backoff5" - }), + BackoffTypes []string + }{BackoffTypes: []string{ + "backoff4", + "backoff5", + "backoff5", }}, LockRPCTime: int64(time.Second * 5), LockRPCCount: 50, diff --git a/util/stmtsummary/statement_summary.go b/util/stmtsummary/statement_summary.go index f42089c9e7a01..3dac542bed4b8 100644 --- a/util/stmtsummary/statement_summary.go +++ b/util/stmtsummary/statement_summary.go @@ -172,7 +172,7 @@ type stmtSummaryByDigestElement struct { sumTxnRetry int64 maxTxnRetry int sumBackoffTimes int64 - backoffTypes map[fmt.Stringer]int + backoffTypes map[string]int authUsers map[string]struct{} // other sumMem int64 @@ -635,7 +635,7 @@ func newStmtSummaryByDigestElement(sei *StmtExecInfo, beginTime int64, intervalS minLatency: sei.TotalLatency, firstSeen: sei.StartTime, lastSeen: sei.StartTime, - backoffTypes: make(map[fmt.Stringer]int), + backoffTypes: make(map[string]int), authUsers: make(map[string]struct{}), planInCache: false, planCacheHits: 0, @@ -971,9 +971,9 @@ 
func formatSQL(sql string) string { } // Format the backoffType map to a string or nil. -func formatBackoffTypes(backoffMap map[fmt.Stringer]int) interface{} { +func formatBackoffTypes(backoffMap map[string]int) interface{} { type backoffStat struct { - backoffType fmt.Stringer + backoffType string count int } diff --git a/util/stmtsummary/statement_summary_test.go b/util/stmtsummary/statement_summary_test.go index fb4593e26e9b6..751a8e501f715 100644 --- a/util/stmtsummary/statement_summary_test.go +++ b/util/stmtsummary/statement_summary_test.go @@ -30,7 +30,6 @@ import ( "github.com/pingcap/tidb/store/tikv/util" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/execdetails" - "github.com/pingcap/tidb/util/stringutil" ) var _ = Suite(&testStmtSummarySuite{}) @@ -63,7 +62,7 @@ func TestT(t *testing.T) { } const ( - boTxnLockName = stringutil.StringerStr("txnlock") + boTxnLockName = "txnlock" ) // Test stmtSummaryByDigest.AddStatement. @@ -77,7 +76,7 @@ func (s *testStmtSummarySuite) TestAddStatement(c *C) { // first statement stmtExecInfo1 := generateAnyExecInfo() - stmtExecInfo1.ExecDetail.CommitDetail.Mu.BackoffTypes = make([]fmt.Stringer, 0) + stmtExecInfo1.ExecDetail.CommitDetail.Mu.BackoffTypes = make([]string, 0) key := &stmtSummaryByDigestKey{ schemaName: stmtExecInfo1.SchemaName, digest: stmtExecInfo1.Digest, @@ -133,7 +132,7 @@ func (s *testStmtSummarySuite) TestAddStatement(c *C) { maxPrewriteRegionNum: stmtExecInfo1.ExecDetail.CommitDetail.PrewriteRegionNum, sumTxnRetry: int64(stmtExecInfo1.ExecDetail.CommitDetail.TxnRetry), maxTxnRetry: stmtExecInfo1.ExecDetail.CommitDetail.TxnRetry, - backoffTypes: make(map[fmt.Stringer]int), + backoffTypes: make(map[string]int), sumMem: stmtExecInfo1.MemMax, maxMem: stmtExecInfo1.MemMax, sumDisk: stmtExecInfo1.DiskMax, @@ -194,9 +193,9 @@ func (s *testStmtSummarySuite) TestAddStatement(c *C) { CommitBackoffTime: 1000, Mu: struct { sync.Mutex - BackoffTypes []fmt.Stringer + BackoffTypes []string }{ - BackoffTypes: []fmt.Stringer{boTxnLockName}, + BackoffTypes: []string{boTxnLockName}, }, ResolveLockTime: 10000, WriteKeys: 100000, @@ -321,9 +320,9 @@ func (s *testStmtSummarySuite) TestAddStatement(c *C) { CommitBackoffTime: 100, Mu: struct { sync.Mutex - BackoffTypes []fmt.Stringer + BackoffTypes []string }{ - BackoffTypes: []fmt.Stringer{boTxnLockName}, + BackoffTypes: []string{boTxnLockName}, }, ResolveLockTime: 1000, WriteKeys: 10000, @@ -577,9 +576,9 @@ func generateAnyExecInfo() *StmtExecInfo { CommitBackoffTime: 200, Mu: struct { sync.Mutex - BackoffTypes []fmt.Stringer + BackoffTypes []string }{ - BackoffTypes: []fmt.Stringer{boTxnLockName}, + BackoffTypes: []string{boTxnLockName}, }, ResolveLockTime: 2000, WriteKeys: 20000, @@ -961,12 +960,12 @@ func (s *testStmtSummarySuite) TestGetMoreThanOnceBindableStmt(c *C) { // Test `formatBackoffTypes`. 
func (s *testStmtSummarySuite) TestFormatBackoffTypes(c *C) { - backoffMap := make(map[fmt.Stringer]int) + backoffMap := make(map[string]int) c.Assert(formatBackoffTypes(backoffMap), IsNil) - bo1 := stringutil.StringerStr("pdrpc") + bo1 := "pdrpc" backoffMap[bo1] = 1 c.Assert(formatBackoffTypes(backoffMap), Equals, "pdrpc:1") - bo2 := stringutil.StringerStr("txnlock") + bo2 := "txnlock" backoffMap[bo2] = 2 c.Assert(formatBackoffTypes(backoffMap), Equals, "txnlock:2,pdrpc:1") From 2580240dde9cf3c19db090d055aa7b0c610aa138 Mon Sep 17 00:00:00 2001 From: crazycs Date: Tue, 25 May 2021 15:47:33 +0800 Subject: [PATCH 2/2] *: pass sql, plan digest down to KV request (#24854) --- bindinfo/bind_test.go | 9 +- bindinfo/handle.go | 12 +- bindinfo/session_handle.go | 6 +- config/config.go | 21 +++ distsql/request_builder.go | 10 ++ executor/adapter.go | 33 ++--- executor/analyze.go | 4 + executor/batch_point_get.go | 9 +- executor/checksum.go | 2 + executor/executor.go | 13 +- executor/executor_test.go | 121 ++++++++++++++++++ executor/insert.go | 1 + executor/partition_table_test.go | 8 +- executor/point_get.go | 1 + executor/replace.go | 1 + executor/update.go | 9 +- go.mod | 4 +- go.sum | 8 +- infoschema/tables_test.go | 4 +- kv/kv.go | 2 + kv/option.go | 2 + planner/core/cache.go | 5 +- planner/core/encode.go | 8 +- planner/core/plan_test.go | 10 +- planner/optimize.go | 6 +- session/session.go | 3 +- session/session_test.go | 2 +- sessionctx/stmtctx/stmtctx.go | 34 ++++- sessionctx/variable/session_test.go | 2 +- store/copr/batch_coprocessor.go | 13 +- store/copr/coprocessor.go | 13 +- store/driver/txn/snapshot.go | 2 + store/driver/txn/txn_driver.go | 2 + store/mockstore/unistore/rpc.go | 9 ++ store/tikv/2pc.go | 3 + store/tikv/cleanup.go | 2 +- store/tikv/commit.go | 2 +- store/tikv/prewrite.go | 2 +- store/tikv/scan.go | 14 +- store/tikv/snapshot.go | 21 ++- store/tikv/txn.go | 7 + tools/check/go.mod | 1 - util/deadlockhistory/deadlock_history.go | 2 +- util/deadlockhistory/deadlock_history_test.go | 16 ++- util/resourcegrouptag/resource_group_tag.go | 89 ++++--------- .../resource_group_tag_test.go | 111 ++++++++-------- 46 files changed, 432 insertions(+), 227 deletions(-) diff --git a/bindinfo/bind_test.go b/bindinfo/bind_test.go index 22b60187f0a77..4175ddce77eb9 100644 --- a/bindinfo/bind_test.go +++ b/bindinfo/bind_test.go @@ -156,7 +156,8 @@ func normalizeWithDefaultDB(c *C, sql, db string) (string, string) { testParser := parser.New() stmt, err := testParser.ParseOneStmt(sql, "", "") c.Assert(err, IsNil) - return parser.NormalizeDigest(utilparser.RestoreWithDefaultDB(stmt, "test", "")) + normalized, digest := parser.NormalizeDigest(utilparser.RestoreWithDefaultDB(stmt, "test", "")) + return normalized, digest.String() } func (s *testSuite) TestBindParse(c *C) { @@ -182,7 +183,7 @@ func (s *testSuite) TestBindParse(c *C) { c.Check(bindHandle.Size(), Equals, 1) sql, hash := parser.NormalizeDigest("select * from test . t") - bindData := bindHandle.GetBindRecord(hash, sql, "test") + bindData := bindHandle.GetBindRecord(hash.String(), sql, "test") c.Check(bindData, NotNil) c.Check(bindData.OriginalSQL, Equals, "select * from `test` . `t`") bind := bindData.Bindings[0] @@ -656,7 +657,7 @@ func (s *testSuite) TestBindingSymbolList(c *C) { // Normalize sql, hash := parser.NormalizeDigest("select a, b from test . 
t where a = 1 limit 0, 1") - bindData := s.domain.BindHandle().GetBindRecord(hash, sql, "test") + bindData := s.domain.BindHandle().GetBindRecord(hash.String(), sql, "test") c.Assert(bindData, NotNil) c.Check(bindData.OriginalSQL, Equals, "select `a` , `b` from `test` . `t` where `a` = ? limit ...") bind := bindData.Bindings[0] @@ -776,7 +777,7 @@ func (s *testSuite) TestErrorBind(c *C) { c.Assert(err, IsNil, Commentf("err %v", err)) sql, hash := parser.NormalizeDigest("select * from test . t where i > ?") - bindData := s.domain.BindHandle().GetBindRecord(hash, sql, "test") + bindData := s.domain.BindHandle().GetBindRecord(hash.String(), sql, "test") c.Check(bindData, NotNil) c.Check(bindData.OriginalSQL, Equals, "select * from `test` . `t` where `i` > ?") bind := bindData.Bindings[0] diff --git a/bindinfo/handle.go b/bindinfo/handle.go index 6111910395d55..2281af3c88bd3 100644 --- a/bindinfo/handle.go +++ b/bindinfo/handle.go @@ -210,7 +210,7 @@ func (h *BindHandle) CreateBindRecord(sctx sessionctx.Context, record *BindRecor } sqlDigest := parser.DigestNormalized(record.OriginalSQL) - h.setBindRecord(sqlDigest, record) + h.setBindRecord(sqlDigest.String(), record) }() // Lock mysql.bind_info to synchronize with CreateBindRecord / AddBindRecord / DropBindRecord on other tidb instances. @@ -256,7 +256,7 @@ func (h *BindHandle) AddBindRecord(sctx sessionctx.Context, record *BindRecord) } record.Db = strings.ToLower(record.Db) - oldRecord := h.GetBindRecord(parser.DigestNormalized(record.OriginalSQL), record.OriginalSQL, record.Db) + oldRecord := h.GetBindRecord(parser.DigestNormalized(record.OriginalSQL).String(), record.OriginalSQL, record.Db) var duplicateBinding *Binding if oldRecord != nil { binding := oldRecord.FindBinding(record.Bindings[0].ID) @@ -294,7 +294,7 @@ func (h *BindHandle) AddBindRecord(sctx sessionctx.Context, record *BindRecord) return } - h.appendBindRecord(parser.DigestNormalized(record.OriginalSQL), record) + h.appendBindRecord(parser.DigestNormalized(record.OriginalSQL).String(), record) }() // Lock mysql.bind_info to synchronize with CreateBindRecord / AddBindRecord / DropBindRecord on other tidb instances. @@ -367,7 +367,7 @@ func (h *BindHandle) DropBindRecord(originalSQL, db string, binding *Binding) (e if binding != nil { record.Bindings = append(record.Bindings, *binding) } - h.removeBindRecord(parser.DigestNormalized(originalSQL), record) + h.removeBindRecord(parser.DigestNormalized(originalSQL).String(), record) }() // Lock mysql.bind_info to synchronize with CreateBindRecord / AddBindRecord / DropBindRecord on other tidb instances. 
@@ -515,7 +515,7 @@ func (h *BindHandle) newBindRecord(row chunk.Row) (string, *BindRecord, error) {
 	defer h.sctx.Unlock()
 	h.sctx.GetSessionVars().CurrentDB = bindRecord.Db
 	err := bindRecord.prepareHints(h.sctx.Context)
-	return hash, bindRecord, err
+	return hash.String(), bindRecord, err
 }
 
 // setBindRecord sets the BindRecord to the cache, if there already exists a BindRecord,
@@ -624,7 +624,7 @@ func (h *BindHandle) CaptureBaselines() {
 		}
 		dbName := utilparser.GetDefaultDB(stmt, bindableStmt.Schema)
 		normalizedSQL, digest := parser.NormalizeDigest(utilparser.RestoreWithDefaultDB(stmt, dbName, bindableStmt.Query))
-		if r := h.GetBindRecord(digest, normalizedSQL, dbName); r != nil && r.HasUsingBinding() {
+		if r := h.GetBindRecord(digest.String(), normalizedSQL, dbName); r != nil && r.HasUsingBinding() {
 			continue
 		}
 		bindSQL := GenerateBindSQL(context.TODO(), stmt, bindableStmt.PlanHint, true, dbName)
diff --git a/bindinfo/session_handle.go b/bindinfo/session_handle.go
index 2604d5b563f52..6b54aa9118f77 100644
--- a/bindinfo/session_handle.go
+++ b/bindinfo/session_handle.go
@@ -60,7 +60,7 @@ func (h *SessionHandle) CreateBindRecord(sctx sessionctx.Context, record *BindRe
 	}
 
 	// update the BindMeta to the cache.
-	h.appendBindRecord(parser.DigestNormalized(record.OriginalSQL), record)
+	h.appendBindRecord(parser.DigestNormalized(record.OriginalSQL).String(), record)
 	return nil
 }
 
@@ -78,14 +78,14 @@ func (h *SessionHandle) DropBindRecord(originalSQL, db string, binding *Binding)
 	} else {
 		newRecord = record
 	}
-	h.ch.setBindRecord(parser.DigestNormalized(record.OriginalSQL), newRecord)
+	h.ch.setBindRecord(parser.DigestNormalized(record.OriginalSQL).String(), newRecord)
 	updateMetrics(metrics.ScopeSession, oldRecord, newRecord, false)
 	return nil
 }
 
 // GetBindRecord return the BindMeta of the (normdOrigSQL,db) if BindMeta exist.
 func (h *SessionHandle) GetBindRecord(normdOrigSQL, db string) *BindRecord {
-	hash := parser.DigestNormalized(normdOrigSQL)
+	hash := parser.DigestNormalized(normdOrigSQL).String()
 	bindRecords := h.ch[hash]
 	for _, bindRecord := range bindRecords {
 		if bindRecord.OriginalSQL == normdOrigSQL {
diff --git a/config/config.go b/config/config.go
index 664cb5bceaa7d..83490c345dae8 100644
--- a/config/config.go
+++ b/config/config.go
@@ -137,6 +137,7 @@ type Config struct {
 	DelayCleanTableLock uint64 `toml:"delay-clean-table-lock" json:"delay-clean-table-lock"`
 	SplitRegionMaxNum uint64 `toml:"split-region-max-num" json:"split-region-max-num"`
 	StmtSummary StmtSummary `toml:"stmt-summary" json:"stmt-summary"`
+	TopSQL TopSQL `toml:"top-sql" json:"top-sql"`
 	// RepairMode indicates that the TiDB is in the repair mode for table meta.
 	RepairMode bool `toml:"repair-mode" json:"repair-mode"`
 	RepairTableList []string `toml:"repair-table-list" json:"repair-table-list"`
@@ -527,6 +528,16 @@ type StmtSummary struct {
 	HistorySize int `toml:"history-size" json:"history-size"`
 }
 
+// TopSQL is the config for the top SQL feature.
+type TopSQL struct {
+	// Enable the top SQL feature or not.
+	Enable bool `toml:"enable" json:"enable"`
+	// The refresh interval of top SQL statistics.
+	RefreshInterval int `toml:"refresh-interval" json:"refresh-interval"`
+	// The maximum number of statements kept in memory.
+	MaxStmtCount uint `toml:"max-stmt-count" json:"max-stmt-count"`
+}
+
 // IsolationRead is the config for isolation read.
 type IsolationRead struct {
 	// Engines filters tidb-server access paths by engine type.
@@ -656,6 +667,11 @@ var defaultConf = Config{
 		RefreshInterval: 1800,
 		HistorySize: 24,
 	},
+	TopSQL: TopSQL{
+		Enable: true,
+		RefreshInterval: 1,
+		MaxStmtCount: 5000,
+	},
 	IsolationRead: IsolationRead{
 		Engines: []string{"tikv", "tiflash", "tidb"},
 	},
@@ -943,6 +959,11 @@ func TableLockEnabled() bool {
 	return GetGlobalConfig().EnableTableLock
 }
 
+// TopSQLEnabled is used to check whether the top SQL feature is enabled.
+func TopSQLEnabled() bool {
+	return GetGlobalConfig().TopSQL.Enable
+}
+
 // TableLockDelayClean uses to get the time of delay clean table lock.
 var TableLockDelayClean = func() uint64 {
 	return GetGlobalConfig().DelayCleanTableLock
diff --git a/distsql/request_builder.go b/distsql/request_builder.go
index 42a098cd05440..d34ecc9ab0c47 100644
--- a/distsql/request_builder.go
+++ b/distsql/request_builder.go
@@ -21,6 +21,7 @@ import (
 	"github.com/pingcap/errors"
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/parser/mysql"
+	"github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/ddl/placement"
 	"github.com/pingcap/tidb/infoschema"
 	"github.com/pingcap/tidb/kv"
@@ -239,6 +240,7 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req
 			},
 		}
 	}
+	builder.SetResourceGroupTag(sv.StmtCtx)
 	return builder
 }
 
@@ -274,6 +276,14 @@ func (builder *RequestBuilder) SetFromInfoSchema(pis interface{}) *RequestBuilde
 	return builder
 }
 
+// SetResourceGroupTag sets the request resource group tag.
+func (builder *RequestBuilder) SetResourceGroupTag(sc *stmtctx.StatementContext) *RequestBuilder {
+	if config.TopSQLEnabled() {
+		builder.Request.ResourceGroupTag = sc.GetResourceGroupTag()
+	}
+	return builder
+}
+
 func (builder *RequestBuilder) verifyTxnScope() error {
 	if builder.txnScope == "" {
 		builder.txnScope = kv.GlobalTxnScope
diff --git a/executor/adapter.go b/executor/adapter.go
index 15a823dd0f724..c5d9b0406602c 100644
--- a/executor/adapter.go
+++ b/executor/adapter.go
@@ -333,6 +333,8 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
 		return nil, err
 	}
 
+	getPlanDigest(a.Ctx, a.Plan)
+
 	if err = e.Open(ctx); err != nil {
 		terror.Call(e.Close)
 		return nil, err
@@ -919,11 +921,11 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) {
 	statsInfos := plannercore.GetStatsInfo(a.Plan)
 	memMax := sessVars.StmtCtx.MemTracker.MaxConsumed()
 	diskMax := sessVars.StmtCtx.DiskTracker.MaxConsumed()
-	_, planDigest := getPlanDigest(a.Ctx, a.Plan)
+	planDigest := getPlanDigest(a.Ctx, a.Plan)
 	slowItems := &variable.SlowQueryLogItems{
 		TxnTS: txnTS,
 		SQL: sql.String(),
-		Digest: digest,
+		Digest: digest.String(),
 		TimeTotal: costTime,
 		TimeParse: sessVars.DurationParse,
 		TimeCompile: sessVars.DurationCompile,
@@ -981,7 +983,7 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) {
 	}
 	domain.GetDomain(a.Ctx).LogSlowQuery(&domain.SlowQueryInfo{
 		SQL: sql.String(),
-		Digest: digest,
+		Digest: digest.String(),
 		Start: sessVars.StartTime,
 		Duration: costTime,
 		Detail: sessVars.StmtCtx.GetExecDetails(),
@@ -1011,14 +1013,15 @@ func getPlanTree(sctx sessionctx.Context, p plannercore.Plan) string {
 }
 
 // getPlanDigest will try to get the select plan tree if the plan is select or the select plan of delete/update/insert statement.
-func getPlanDigest(sctx sessionctx.Context, p plannercore.Plan) (normalized, planDigest string) { - normalized, planDigest = sctx.GetSessionVars().StmtCtx.GetPlanDigest() - if len(normalized) > 0 { - return - } - normalized, planDigest = plannercore.NormalizePlan(p) - sctx.GetSessionVars().StmtCtx.SetPlanDigest(normalized, planDigest) - return +func getPlanDigest(sctx sessionctx.Context, p plannercore.Plan) string { + sc := sctx.GetSessionVars().StmtCtx + _, planDigest := sc.GetPlanDigest() + if planDigest != nil { + return planDigest.String() + } + normalized, planDigest := plannercore.NormalizePlan(p) + sc.SetPlanDigest(normalized, planDigest) + return planDigest.String() } // getEncodedPlan gets the encoded plan, and generates the hint string if indicated. @@ -1079,7 +1082,7 @@ func (a *ExecStmt) SummaryStmt(succ bool) { } prevSQL = sessVars.PrevStmt.String() } - sessVars.SetPrevStmtDigest(digest) + sessVars.SetPrevStmtDigest(digest.String()) // No need to encode every time, so encode lazily. planGenerator := func() (string, string) { @@ -1092,11 +1095,11 @@ func (a *ExecStmt) SummaryStmt(succ bool) { var planDigestGen func() string if a.Plan.TP() == plancodec.TypePointGet { planDigestGen = func() string { - _, planDigest := getPlanDigest(a.Ctx, a.Plan) + planDigest := getPlanDigest(a.Ctx, a.Plan) return planDigest } } else { - _, planDigest = getPlanDigest(a.Ctx, a.Plan) + planDigest = getPlanDigest(a.Ctx, a.Plan) } execDetail := stmtCtx.GetExecDetails() @@ -1120,7 +1123,7 @@ func (a *ExecStmt) SummaryStmt(succ bool) { Charset: charset, Collation: collation, NormalizedSQL: normalizedSQL, - Digest: digest, + Digest: digest.String(), PrevSQL: prevSQL, PrevSQLDigest: prevSQLDigest, PlanGenerator: planGenerator, diff --git a/executor/analyze.go b/executor/analyze.go index ed7463b2ac46f..a9aecb7547588 100644 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -355,6 +355,7 @@ func (e *AnalyzeIndexExec) fetchAnalyzeResult(ranges []*ranger.Range, isNullRang } else { kvReqBuilder = builder.SetIndexRangesForTables(e.ctx.GetSessionVars().StmtCtx, []int64{e.tableID.GetStatisticsID()}, e.idxInfo.ID, ranges) } + kvReqBuilder.SetResourceGroupTag(e.ctx.GetSessionVars().StmtCtx) kvReq, err := kvReqBuilder. SetAnalyzeRequest(e.analyzePB). SetStartTS(math.MaxUint64). @@ -653,6 +654,7 @@ func (e *AnalyzeColumnsExec) open(ranges []*ranger.Range) error { func (e *AnalyzeColumnsExec) buildResp(ranges []*ranger.Range) (distsql.SelectResult, error) { var builder distsql.RequestBuilder reqBuilder := builder.SetHandleRangesForTables(e.ctx.GetSessionVars().StmtCtx, []int64{e.tableID.GetStatisticsID()}, e.handleCols != nil && !e.handleCols.IsInt(), ranges, nil) + builder.SetResourceGroupTag(e.ctx.GetSessionVars().StmtCtx) // Always set KeepOrder of the request to be true, in order to compute // correct `correlation` of columns. kvReq, err := reqBuilder. 
@@ -1323,6 +1325,7 @@ func (e *AnalyzeFastExec) handleScanTasks(bo *tikv.Backoffer) (keysSize int, err if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() { snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) } + setResourceGroupTagForTxn(e.ctx.GetSessionVars().StmtCtx, snapshot) for _, t := range e.scanTasks { iter, err := snapshot.Iter(kv.Key(t.StartKey), kv.Key(t.EndKey)) if err != nil { @@ -1343,6 +1346,7 @@ func (e *AnalyzeFastExec) handleSampTasks(workID int, step uint32, err *error) { snapshot.SetOption(kv.NotFillCache, true) snapshot.SetOption(kv.IsolationLevel, kv.RC) snapshot.SetOption(kv.Priority, kv.PriorityLow) + setResourceGroupTagForTxn(e.ctx.GetSessionVars().StmtCtx, snapshot) if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() { snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) } diff --git a/executor/batch_point_get.go b/executor/batch_point_get.go index 1e1fcd581cf61..c34eafd54c408 100644 --- a/executor/batch_point_get.go +++ b/executor/batch_point_get.go @@ -89,7 +89,9 @@ func (e *BatchPointGetExec) buildVirtualColumnInfo() { // Open implements the Executor interface. func (e *BatchPointGetExec) Open(context.Context) error { e.snapshotTS = e.startTS - txnCtx := e.ctx.GetSessionVars().TxnCtx + sessVars := e.ctx.GetSessionVars() + txnCtx := sessVars.TxnCtx + stmtCtx := sessVars.StmtCtx if e.lock { e.snapshotTS = txnCtx.GetForUpdateTS() } @@ -112,12 +114,12 @@ func (e *BatchPointGetExec) Open(context.Context) error { SnapshotRuntimeStats: snapshotStats, } snapshot.SetOption(kv.CollectRuntimeStats, snapshotStats) - e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) + stmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) } if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() { snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) } - snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID) + snapshot.SetOption(kv.TaskID, stmtCtx.TaskID) isStaleness := e.ctx.GetSessionVars().TxnCtx.IsStaleness snapshot.SetOption(kv.IsStalenessReadOnly, isStaleness) if isStaleness && e.ctx.GetSessionVars().TxnCtx.TxnScope != kv.GlobalTxnScope { @@ -128,6 +130,7 @@ func (e *BatchPointGetExec) Open(context.Context) error { }, }) } + setResourceGroupTagForTxn(stmtCtx, snapshot) var batchGetter kv.BatchGetter = snapshot if txn.Valid() { lock := e.tblInfo.Lock diff --git a/executor/checksum.go b/executor/checksum.go index 63f622d2f8140..62543068820e9 100644 --- a/executor/checksum.go +++ b/executor/checksum.go @@ -240,6 +240,7 @@ func (c *checksumContext) buildTableRequest(ctx sessionctx.Context, tableID int6 } var builder distsql.RequestBuilder + builder.SetResourceGroupTag(ctx.GetSessionVars().StmtCtx) return builder.SetHandleRanges(ctx.GetSessionVars().StmtCtx, tableID, c.TableInfo.IsCommonHandle, ranges, nil). SetChecksumRequest(checksum). SetStartTS(c.StartTs). @@ -256,6 +257,7 @@ func (c *checksumContext) buildIndexRequest(ctx sessionctx.Context, tableID int6 ranges := ranger.FullRange() var builder distsql.RequestBuilder + builder.SetResourceGroupTag(ctx.GetSessionVars().StmtCtx) return builder.SetIndexRanges(ctx.GetSessionVars().StmtCtx, tableID, indexInfo.ID, ranges). SetChecksumRequest(checksum). SetStartTS(c.StartTs). 
diff --git a/executor/executor.go b/executor/executor.go index 2b9b8f0f52954..1d136bac8a2f9 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -29,6 +29,7 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/failpoint" + "github.com/pingcap/parser" "github.com/pingcap/parser/ast" "github.com/pingcap/parser/auth" "github.com/pingcap/parser/model" @@ -974,7 +975,11 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error { } func newLockCtx(seVars *variable.SessionVars, lockWaitTime int64) *tikvstore.LockCtx { + var planDigest *parser.Digest _, sqlDigest := seVars.StmtCtx.SQLDigest() + if config.TopSQLEnabled() { + _, planDigest = seVars.StmtCtx.GetPlanDigest() + } return &tikvstore.LockCtx{ Killed: &seVars.Killed, ForUpdateTS: seVars.TxnCtx.GetForUpdateTS(), @@ -984,7 +989,7 @@ func newLockCtx(seVars *variable.SessionVars, lockWaitTime int64) *tikvstore.Loc LockKeysDuration: &seVars.StmtCtx.LockKeysDuration, LockKeysCount: &seVars.StmtCtx.LockKeysCount, LockExpired: &seVars.TxnCtx.LockExpire, - ResourceGroupTag: resourcegrouptag.EncodeResourceGroupTag(sqlDigest), + ResourceGroupTag: resourcegrouptag.EncodeResourceGroupTag(sqlDigest, planDigest), OnDeadlock: func(deadlock *tikverr.ErrDeadlock) { // TODO: Support collecting retryable deadlocks according to the config. if !deadlock.IsRetryable { @@ -1794,3 +1799,9 @@ func FillVirtualColumnValue(virtualRetTypes []*types.FieldType, virtualColumnInd } return nil } + +func setResourceGroupTagForTxn(sc *stmtctx.StatementContext, snapshot kv.Snapshot) { + if snapshot != nil && config.TopSQLEnabled() { + snapshot.SetOption(kv.ResourceGroupTag, sc.GetResourceGroupTag()) + } +} diff --git a/executor/executor_test.go b/executor/executor_test.go index 4ba3ea2e89e85..c50459cf2850b 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -31,6 +31,7 @@ import ( . 
"github.com/pingcap/check" "github.com/pingcap/errors" "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/kvrpcpb" pb "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/parser" "github.com/pingcap/parser/auth" @@ -58,6 +59,7 @@ import ( "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/store/copr" "github.com/pingcap/tidb/store/mockstore" + "github.com/pingcap/tidb/store/mockstore/unistore" "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/mockstore/cluster" "github.com/pingcap/tidb/store/tikv/oracle" @@ -146,6 +148,7 @@ var _ = SerialSuites(&testSerialSuite{&baseTestSuite{}}) var _ = SerialSuites(&testStaleTxnSerialSuite{&baseTestSuite{}}) var _ = SerialSuites(&testCoprCache{}) var _ = SerialSuites(&testPrepareSuite{}) +var _ = SerialSuites(&testResourceTagSuite{&baseTestSuite{}}) type testSuite struct{ *baseTestSuite } type testSuiteP1 struct{ *baseTestSuite } @@ -166,6 +169,7 @@ type testCoprCache struct { cls cluster.Cluster } type testPrepareSuite struct{ testData testutil.TestData } +type testResourceTagSuite struct{ *baseTestSuite } type baseTestSuite struct { cluster cluster.Cluster @@ -8311,3 +8315,120 @@ func (s testSerialSuite) TestExprBlackListForEnum(c *C) { rows = tk.MustQuery("desc format='brief' select * from t where b = 1 and a > 'a'").Rows() c.Assert(checkFuncPushDown(rows, "index:idx(b, a)"), IsTrue) } + +func (s *testResourceTagSuite) TestResourceGroupTag(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t(a int, b int, unique index idx(a));") + tbInfo := testGetTableByName(c, tk.Se, "test", "t") + + // Enable Top SQL + cfg := config.GetGlobalConfig() + newCfg := *cfg + newCfg.TopSQL.Enable = true + config.StoreGlobalConfig(&newCfg) + + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/unistoreRPCClientSendHook", `return(true)`), IsNil) + defer failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/unistoreRPCClientSendHook") + + var sqlDigest, planDigest *parser.Digest + checkFn := func() {} + unistore.UnistoreRPCClientSendHook = func(req *tikvrpc.Request) { + var startKey []byte + var ctx *kvrpcpb.Context + switch req.Type { + case tikvrpc.CmdGet: + request := req.Get() + startKey = request.Key + ctx = request.Context + case tikvrpc.CmdBatchGet: + request := req.BatchGet() + startKey = request.Keys[0] + ctx = request.Context + case tikvrpc.CmdPrewrite: + request := req.Prewrite() + startKey = request.Mutations[0].Key + ctx = request.Context + case tikvrpc.CmdCommit: + request := req.Commit() + startKey = request.Keys[0] + ctx = request.Context + case tikvrpc.CmdCop: + request := req.Cop() + startKey = request.Ranges[0].Start + ctx = request.Context + case tikvrpc.CmdPessimisticLock: + request := req.PessimisticLock() + startKey = request.PrimaryLock + ctx = request.Context + } + tid := tablecodec.DecodeTableID(startKey) + if tid != tbInfo.Meta().ID { + return + } + if ctx == nil { + return + } + tag := &tipb.ResourceGroupTag{} + err := tag.Unmarshal(ctx.ResourceGroupTag) + c.Assert(err, IsNil) + sqlDigest = parser.NewDigest(tag.SqlDigest) + planDigest = parser.NewDigest(tag.PlanDigest) + checkFn() + } + + resetVars := func() { + sqlDigest = parser.NewDigest(nil) + planDigest = parser.NewDigest(nil) + } + + cases := []struct { + sql string + ignore bool + }{ + {sql: "insert into t values(1,1),(2,2),(3,3)"}, + {sql: "select * from t use index (idx) where a=1"}, + {sql: "select * from 
t use index (idx) where a in (1,2,3)"}, + {sql: "select * from t use index (idx) where a>1"}, + {sql: "select * from t where b>1"}, + {sql: "begin pessimistic", ignore: true}, + {sql: "insert into t values(4,4)"}, + {sql: "commit", ignore: true}, + {sql: "update t set a=5,b=5 where a=5"}, + {sql: "replace into t values(6,6)"}, + } + for _, ca := range cases { + resetVars() + commentf := Commentf("%v", ca.sql) + + _, expectSQLDigest := parser.NormalizeDigest(ca.sql) + var expectPlanDigest *parser.Digest + checkCnt := 0 + checkFn = func() { + if ca.ignore { + return + } + if expectPlanDigest == nil { + info := tk.Se.ShowProcess() + c.Assert(info, NotNil) + p, ok := info.Plan.(plannercore.Plan) + c.Assert(ok, IsTrue) + _, expectPlanDigest = plannercore.NormalizePlan(p) + } + c.Assert(sqlDigest.String(), Equals, expectSQLDigest.String(), commentf) + c.Assert(planDigest.String(), Equals, expectPlanDigest.String()) + checkCnt++ + } + + if strings.HasPrefix(ca.sql, "select") { + tk.MustQuery(ca.sql) + } else { + tk.MustExec(ca.sql) + } + if ca.ignore { + continue + } + c.Assert(checkCnt > 0, IsTrue, commentf) + } +} diff --git a/executor/insert.go b/executor/insert.go index 178aefed5fb8b..351f04c2ca5eb 100644 --- a/executor/insert.go +++ b/executor/insert.go @@ -63,6 +63,7 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error { if err != nil { return err } + setResourceGroupTagForTxn(sessVars.StmtCtx, txn) txnSize := txn.Size() sessVars.StmtCtx.AddRecordRows(uint64(len(rows))) // If you use the IGNORE keyword, duplicate-key error that occurs while executing the INSERT statement are ignored. diff --git a/executor/partition_table_test.go b/executor/partition_table_test.go index b9823a32a647e..82ec887ce2692 100644 --- a/executor/partition_table_test.go +++ b/executor/partition_table_test.go @@ -122,7 +122,7 @@ func (s *partitionTableSuite) TestPointGetwithRangeAndListPartitionTable(c *C) { tk.MustExec("set @@session.tidb_enable_list_partition = ON") // list partition table - tk.MustExec(`create table tlist(a int, b int, unique index idx_a(a), index idx_b(b)) partition by list(a)( + tk.MustExec(`create table tlist(a int, b int, unique index idx_a(a), index idx_b(b)) partition by list(a)( partition p0 values in (NULL, 1, 2, 3, 4), partition p1 values in (5, 6, 7, 8), partition p2 values in (9, 10, 11, 12));`) @@ -172,15 +172,15 @@ func (s *partitionTableSuite) TestPointGetwithRangeAndListPartitionTable(c *C) { } // test table dual - queryRange1 := fmt.Sprintf("select a from trange1 where a=200") + queryRange1 := "select a from trange1 where a=200" c.Assert(tk.HasPlan(queryRange1, "TableDual"), IsTrue) // check if TableDual is used tk.MustQuery(queryRange1).Check(testkit.Rows()) - queryRange2 := fmt.Sprintf("select a from trange2 where a=200") + queryRange2 := "select a from trange2 where a=200" c.Assert(tk.HasPlan(queryRange2, "TableDual"), IsTrue) // check if TableDual is used tk.MustQuery(queryRange2).Check(testkit.Rows()) - queryList := fmt.Sprintf("select a from tlist where a=200") + queryList := "select a from tlist where a=200" c.Assert(tk.HasPlan(queryList, "TableDual"), IsTrue) // check if TableDual is used tk.MustQuery(queryList).Check(testkit.Rows()) } diff --git a/executor/point_get.go b/executor/point_get.go index bc476794888fe..685d378f83d87 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -159,6 +159,7 @@ func (e *PointGetExecutor) Open(context.Context) error { }, }) } + setResourceGroupTagForTxn(e.ctx.GetSessionVars().StmtCtx, e.snapshot) return 
nil } diff --git a/executor/replace.go b/executor/replace.go index 8f35be4d05dbd..03dc4bfad0543 100644 --- a/executor/replace.go +++ b/executor/replace.go @@ -224,6 +224,7 @@ func (e *ReplaceExec) exec(ctx context.Context, newRows [][]types.Datum) error { defer snapshot.DelOption(kv.CollectRuntimeStats) } } + setResourceGroupTagForTxn(e.ctx.GetSessionVars().StmtCtx, txn) prefetchStart := time.Now() // Use BatchGet to fill cache. // It's an optimization and could be removed without affecting correctness. diff --git a/executor/update.go b/executor/update.go index 7c4b07ab8e6f6..954aa43c8067c 100644 --- a/executor/update.go +++ b/executor/update.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/kv" plannercore "github.com/pingcap/tidb/planner/core" @@ -258,11 +259,17 @@ func (e *UpdateExec) updateRows(ctx context.Context) (int, error) { memUsageOfChk = chk.MemoryUsage() e.memTracker.Consume(memUsageOfChk) if e.collectRuntimeStatsEnabled() { - txn, err := e.ctx.Txn(false) + txn, err := e.ctx.Txn(true) if err == nil && txn.GetSnapshot() != nil { txn.GetSnapshot().SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats) } } + if config.TopSQLEnabled() { + txn, err := e.ctx.Txn(true) + if err == nil { + txn.SetOption(kv.ResourceGroupTag, e.ctx.GetSessionVars().StmtCtx.GetResourceGroupTag()) + } + } for rowIdx := 0; rowIdx < chk.NumRows(); rowIdx++ { chunkRow := chk.GetRow(rowIdx) datumRow := chunkRow.GetDatumRow(fields) diff --git a/go.mod b/go.mod index 7b06ca2a8682d..0879d5f157bf4 100644 --- a/go.mod +++ b/go.mod @@ -45,10 +45,10 @@ require ( github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 github.com/pingcap/kvproto v0.0.0-20210507054410-a8152f8a876c github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4 - github.com/pingcap/parser v0.0.0-20210518053259-92fa6fe07eb6 + github.com/pingcap/parser v0.0.0-20210525032559-c37778aff307 github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3 github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible - github.com/pingcap/tipb v0.0.0-20210422074242-57dd881b81b1 + github.com/pingcap/tipb v0.0.0-20210525032549-b80be13ddf6c github.com/prometheus/client_golang v1.5.1 github.com/prometheus/client_model v0.2.0 github.com/prometheus/common v0.9.1 diff --git a/go.sum b/go.sum index ac5cfe59f6b17..664a52bdca20d 100644 --- a/go.sum +++ b/go.sum @@ -440,16 +440,16 @@ github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIf github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4 h1:ERrF0fTuIOnwfGbt71Ji3DKbOEaP189tjym50u8gpC8= github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= -github.com/pingcap/parser v0.0.0-20210518053259-92fa6fe07eb6 h1:wsH3psMH5ksDowsN9VUE9ZqSrX6oF4AYQQfOunkvSfU= -github.com/pingcap/parser v0.0.0-20210518053259-92fa6fe07eb6/go.mod h1:xZC8I7bug4GJ5KtHhgAikjTfU4kBv1Sbo3Pf1MZ6lVw= +github.com/pingcap/parser v0.0.0-20210525032559-c37778aff307 h1:v7SipssMu4X1tVQOe3PIVE73keJNHCFXe4Cza5uNDZ8= +github.com/pingcap/parser v0.0.0-20210525032559-c37778aff307/go.mod h1:xZC8I7bug4GJ5KtHhgAikjTfU4kBv1Sbo3Pf1MZ6lVw= github.com/pingcap/sysutil v0.0.0-20200206130906-2bfa6dc40bcd/go.mod h1:EB/852NMQ+aRKioCpToQ94Wl7fktV+FNnxf3CX/TTXI= github.com/pingcap/sysutil 
v0.0.0-20210315073920-cc0985d983a3 h1:A9KL9R+lWSVPH8IqUuH1QSTRJ5FGoY1bT2IcfPKsWD8= github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3/go.mod h1:tckvA041UWP+NqYzrJ3fMgC/Hw9wnmQ/tUkp/JaHly8= github.com/pingcap/tidb-dashboard v0.0.0-20210312062513-eef5d6404638/go.mod h1:OzFN8H0EDMMqeulPhPMw2i2JaiZWOKFQ7zdRPhENNgo= github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible h1:ceznmu/lLseGHP/jKyOa/3u/5H3wtLLLqkH2V3ssSjg= github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= -github.com/pingcap/tipb v0.0.0-20210422074242-57dd881b81b1 h1:Kcp3jIcQrqG+pT1JQ0oWyRncVKQtDgnMFzRt3zJBaBo= -github.com/pingcap/tipb v0.0.0-20210422074242-57dd881b81b1/go.mod h1:nsEhnMokcn7MRqd2J60yxpn/ac3ZH8A6GOJ9NslabUo= +github.com/pingcap/tipb v0.0.0-20210525032549-b80be13ddf6c h1:El3pMBpJHuSkItkHsnBqsaaHzJwFBNDt3Aul98AhREY= +github.com/pingcap/tipb v0.0.0-20210525032549-b80be13ddf6c/go.mod h1:nsEhnMokcn7MRqd2J60yxpn/ac3ZH8A6GOJ9NslabUo= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= diff --git a/infoschema/tables_test.go b/infoschema/tables_test.go index 2d6506b56d5f4..761bb75fb76ed 100644 --- a/infoschema/tables_test.go +++ b/infoschema/tables_test.go @@ -1517,7 +1517,7 @@ func (s *testTableSuite) TestTrx(c *C) { sm := &mockSessionManager{nil, make([]*txninfo.TxnInfo, 1)} sm.txnInfo[0] = &txninfo.TxnInfo{ StartTS: 424768545227014155, - CurrentSQLDigest: digest, + CurrentSQLDigest: digest.String(), State: txninfo.TxnRunningNormal, BlockStartTime: nil, EntriesCount: 1, @@ -1528,7 +1528,7 @@ func (s *testTableSuite) TestTrx(c *C) { } tk.Se.SetSessionManager(sm) tk.MustQuery("select * from information_schema.TIDB_TRX;").Check( - testkit.Rows("424768545227014155 2021-05-07 12:56:48 " + digest + " Normal 1 19 2 root test"), + testkit.Rows("424768545227014155 2021-05-07 12:56:48 " + digest.String() + " Normal 1 19 2 root test"), ) } diff --git a/kv/kv.go b/kv/kv.go index 0889106ba9fbf..be612512317db 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -288,6 +288,8 @@ type Request struct { IsStaleness bool // MatchStoreLabels indicates the labels the store should be matched MatchStoreLabels []*metapb.StoreLabel + // ResourceGroupTag indicates the kv request task group. + ResourceGroupTag []byte } // ResultSubset represents a result subset from a single storage unit. diff --git a/kv/option.go b/kv/option.go index dc0d700666d5a..de5a1d8834c40 100644 --- a/kv/option.go +++ b/kv/option.go @@ -59,6 +59,8 @@ const ( IsStalenessReadOnly // MatchStoreLabels indicates the labels the store should be matched MatchStoreLabels + // ResourceGroupTag indicates the resource group of the kv request. 
+ ResourceGroupTag ) // ReplicaReadType is the type of replica to read data from diff --git a/planner/core/cache.go b/planner/core/cache.go index f97c207d189de..0e5a624b3d635 100644 --- a/planner/core/cache.go +++ b/planner/core/cache.go @@ -18,6 +18,7 @@ import ( "sync/atomic" "time" + "github.com/pingcap/parser" "github.com/pingcap/parser/ast" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" @@ -199,7 +200,7 @@ type CachedPrepareStmt struct { Executor interface{} NormalizedSQL string NormalizedPlan string - SQLDigest string - PlanDigest string + SQLDigest *parser.Digest + PlanDigest *parser.Digest ForUpdateRead bool } diff --git a/planner/core/encode.go b/planner/core/encode.go index d1cad479d52f8..8dc6ddeca9473 100644 --- a/planner/core/encode.go +++ b/planner/core/encode.go @@ -16,11 +16,11 @@ package core import ( "bytes" "crypto/sha256" - "fmt" "hash" "sync" "github.com/pingcap/failpoint" + "github.com/pingcap/parser" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/util/plancodec" ) @@ -120,10 +120,10 @@ type planDigester struct { } // NormalizePlan is used to normalize the plan and generate plan digest. -func NormalizePlan(p Plan) (normalized, digest string) { +func NormalizePlan(p Plan) (normalized string, digest *parser.Digest) { selectPlan := getSelectPlan(p) if selectPlan == nil { - return "", "" + return "", parser.NewDigest(nil) } d := digesterPool.Get().(*planDigester) defer digesterPool.Put(d) @@ -134,7 +134,7 @@ func NormalizePlan(p Plan) (normalized, digest string) { panic(err) } d.buf.Reset() - digest = fmt.Sprintf("%x", d.hasher.Sum(nil)) + digest = parser.NewDigest(d.hasher.Sum(nil)) d.hasher.Reset() return } diff --git a/planner/core/plan_test.go b/planner/core/plan_test.go index 53f63f25fbc18..6c29eef90f5ae 100644 --- a/planner/core/plan_test.go +++ b/planner/core/plan_test.go @@ -171,12 +171,12 @@ func (s *testPlanNormalize) TestNormalizedPlanForDiffStore(c *C) { normalizedPlanRows := getPlanRows(normalizedPlan) c.Assert(err, IsNil) s.testData.OnRecord(func() { - output[i].Digest = digest + output[i].Digest = digest.String() output[i].Plan = normalizedPlanRows }) compareStringSlice(c, normalizedPlanRows, output[i].Plan) - c.Assert(digest != lastDigest, IsTrue) - lastDigest = digest + c.Assert(digest.String() != lastDigest, IsTrue) + lastDigest = digest.String() } } @@ -404,10 +404,10 @@ func testNormalizeDigest(tk *testkit.TestKit, c *C, sql1, sql2 string, isSame bo comment := Commentf("sql1: %v, sql2: %v\n%v !=\n%v\n", sql1, sql2, normalized1, normalized2) if isSame { c.Assert(normalized1, Equals, normalized2, comment) - c.Assert(digest1, Equals, digest2, comment) + c.Assert(digest1.String(), Equals, digest2.String(), comment) } else { c.Assert(normalized1 != normalized2, IsTrue, comment) - c.Assert(digest1 != digest2, IsTrue, comment) + c.Assert(digest1.String() != digest2.String(), IsTrue, comment) } } diff --git a/planner/optimize.go b/planner/optimize.go index ec9bfef67d0a7..c667d6b124ef9 100644 --- a/planner/optimize.go +++ b/planner/optimize.go @@ -306,7 +306,7 @@ func extractSelectAndNormalizeDigest(stmtNode ast.StmtNode, specifiledDB string) normalizeSQL := parser.Normalize(utilparser.RestoreWithDefaultDB(x.Stmt, specifiledDB, x.Text())) normalizeSQL = plannercore.EraseLastSemicolonInSQL(normalizeSQL) hash := parser.DigestNormalized(normalizeSQL) - return x.Stmt, normalizeSQL, hash, nil + return x.Stmt, normalizeSQL, hash.String(), nil case *ast.SetOprStmt: plannercore.EraseLastSemicolon(x) var normalizeExplainSQL string @@ -322,7 
+322,7 @@ func extractSelectAndNormalizeDigest(stmtNode ast.StmtNode, specifiledDB string) } normalizeSQL := normalizeExplainSQL[idx:] hash := parser.DigestNormalized(normalizeSQL) - return x.Stmt, normalizeSQL, hash, nil + return x.Stmt, normalizeSQL, hash.String(), nil } case *ast.SelectStmt, *ast.SetOprStmt, *ast.DeleteStmt, *ast.UpdateStmt, *ast.InsertStmt: plannercore.EraseLastSemicolon(x) @@ -335,7 +335,7 @@ func extractSelectAndNormalizeDigest(stmtNode ast.StmtNode, specifiledDB string) return x, "", "", nil } normalizedSQL, hash := parser.NormalizeDigest(utilparser.RestoreWithDefaultDB(x, specifiledDB, x.Text())) - return x, normalizedSQL, hash, nil + return x, normalizedSQL, hash.String(), nil } return nil, "", "", nil } diff --git a/session/session.go b/session/session.go index a479ba956cf63..797d0326c4a48 100644 --- a/session/session.go +++ b/session/session.go @@ -1226,7 +1226,8 @@ func (s *session) SetProcessInfo(sql string, t time.Time, command byte, maxExecu if oldPi != nil && oldPi.Info == pi.Info { pi.Time = oldPi.Time } - _, pi.Digest = s.sessionVars.StmtCtx.SQLDigest() + _, digest := s.sessionVars.StmtCtx.SQLDigest() + pi.Digest = digest.String() // DO NOT reset the currentPlan to nil until this query finishes execution, otherwise reentrant calls // of SetProcessInfo would override Plan and PlanExplainRows to nil. if command == mysql.ComSleep { diff --git a/session/session_test.go b/session/session_test.go index 9d2d63cb02804..b8d8538193c7d 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -4390,7 +4390,7 @@ func (s *testTxnStateSuite) TestBasic(c *C) { tk.MustExec("select * from t for update;") info = tk.Se.TxnInfo() _, expectedDigest := parser.NormalizeDigest("select * from t for update;") - c.Assert(info.CurrentSQLDigest, Equals, expectedDigest) + c.Assert(info.CurrentSQLDigest, Equals, expectedDigest.String()) c.Assert(info.State, Equals, txninfo.TxnRunningNormal) c.Assert(info.BlockStartTime, IsNil) // len and size will be covered in TestLenAndSize diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index d8a75aec48610..ea8bd70b8c0f2 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/tidb/util/disk" "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/memory" + "github.com/pingcap/tidb/util/resourcegrouptag" atomic2 "go.uber.org/atomic" "go.uber.org/zap" ) @@ -145,11 +146,11 @@ type StatementContext struct { digestMemo struct { sync.Once normalized string - digest string + digest *parser.Digest } // planNormalized use for cache the normalized plan, avoid duplicate builds. planNormalized string - planDigest string + planDigest *parser.Digest encodedPlan string planHint string planHintSet bool @@ -165,6 +166,8 @@ type StatementContext struct { // stmtCache is used to store some statement-related values. stmtCache map[StmtCacheKey]interface{} + // resourceGroupTag cache for the current statement resource group tag. + resourceGroupTag atomic.Value } // StmtHints are SessionVars related sql hints. @@ -229,7 +232,7 @@ func (sc *StatementContext) ResetStmtCache() { // SQLDigest gets normalized and digest for provided sql. // it will cache result after first calling. 
-func (sc *StatementContext) SQLDigest() (normalized, sqlDigest string) { +func (sc *StatementContext) SQLDigest() (normalized string, sqlDigest *parser.Digest) { sc.digestMemo.Do(func() { sc.digestMemo.normalized, sc.digestMemo.digest = parser.NormalizeDigest(sc.OriginalSQL) }) @@ -237,20 +240,37 @@ func (sc *StatementContext) SQLDigest() (normalized, sqlDigest string) { } // InitSQLDigest sets the normalized and digest for sql. -func (sc *StatementContext) InitSQLDigest(normalized, digest string) { +func (sc *StatementContext) InitSQLDigest(normalized string, digest *parser.Digest) { sc.digestMemo.Do(func() { sc.digestMemo.normalized, sc.digestMemo.digest = normalized, digest }) } // GetPlanDigest gets the normalized plan and plan digest. -func (sc *StatementContext) GetPlanDigest() (normalized, planDigest string) { +func (sc *StatementContext) GetPlanDigest() (normalized string, planDigest *parser.Digest) { return sc.planNormalized, sc.planDigest } +// GetResourceGroupTag gets the resource group of the statement. +func (sc *StatementContext) GetResourceGroupTag() []byte { + tag, _ := sc.resourceGroupTag.Load().([]byte) + if len(tag) > 0 { + return tag + } + normalized, sqlDigest := sc.SQLDigest() + if len(normalized) == 0 { + return nil + } + tag = resourcegrouptag.EncodeResourceGroupTag(sqlDigest, sc.planDigest) + sc.resourceGroupTag.Store(tag) + return tag +} + // SetPlanDigest sets the normalized plan and plan digest. -func (sc *StatementContext) SetPlanDigest(normalized, planDigest string) { - sc.planNormalized, sc.planDigest = normalized, planDigest +func (sc *StatementContext) SetPlanDigest(normalized string, planDigest *parser.Digest) { + if planDigest != nil { + sc.planNormalized, sc.planDigest = normalized, planDigest + } } // GetEncodedPlan gets the encoded plan, it is used to avoid repeated encode. 
diff --git a/sessionctx/variable/session_test.go b/sessionctx/variable/session_test.go index ee9030056bf05..00b728557d188 100644 --- a/sessionctx/variable/session_test.go +++ b/sessionctx/variable/session_test.go @@ -231,7 +231,7 @@ func (*testSessionSuite) TestSlowLogFormat(c *C) { logItems := &variable.SlowQueryLogItems{ TxnTS: txnTS, SQL: sql, - Digest: digest, + Digest: digest.String(), TimeTotal: costTime, TimeParse: time.Duration(10), TimeCompile: time.Duration(10), diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index 8e45b546f10a9..7f954bb254051 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -514,12 +514,13 @@ func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *Backoffer, ta } req := tikvrpc.NewRequest(task.cmdType, &copReq, kvrpcpb.Context{ - IsolationLevel: isolationLevelToPB(b.req.IsolationLevel), - Priority: priorityToPB(b.req.Priority), - NotFillCache: b.req.NotFillCache, - RecordTimeStat: true, - RecordScanStat: true, - TaskId: b.req.TaskID, + IsolationLevel: isolationLevelToPB(b.req.IsolationLevel), + Priority: priorityToPB(b.req.Priority), + NotFillCache: b.req.NotFillCache, + RecordTimeStat: true, + RecordScanStat: true, + TaskId: b.req.TaskID, + ResourceGroupTag: b.req.ResourceGroupTag, }) req.StoreTp = tikvrpc.TiFlash diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index c66c3cda9af35..8834824432bfd 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -699,12 +699,13 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch } req := tikvrpc.NewReplicaReadRequest(task.cmdType, &copReq, options.GetTiKVReplicaReadType(worker.req.ReplicaRead), &worker.replicaReadSeed, kvrpcpb.Context{ - IsolationLevel: isolationLevelToPB(worker.req.IsolationLevel), - Priority: priorityToPB(worker.req.Priority), - NotFillCache: worker.req.NotFillCache, - RecordTimeStat: true, - RecordScanStat: true, - TaskId: worker.req.TaskID, + IsolationLevel: isolationLevelToPB(worker.req.IsolationLevel), + Priority: priorityToPB(worker.req.Priority), + NotFillCache: worker.req.NotFillCache, + RecordTimeStat: true, + RecordScanStat: true, + TaskId: worker.req.TaskID, + ResourceGroupTag: worker.req.ResourceGroupTag, }) req.StoreTp = getEndPointType(task.storeType) startTime := time.Now() diff --git a/store/driver/txn/snapshot.go b/store/driver/txn/snapshot.go index 6692f45a749a3..892a85e9ccebb 100644 --- a/store/driver/txn/snapshot.go +++ b/store/driver/txn/snapshot.go @@ -88,6 +88,8 @@ func (s *tikvSnapshot) SetOption(opt int, val interface{}) { s.KVSnapshot.SetIsStatenessReadOnly(val.(bool)) case kv.MatchStoreLabels: s.KVSnapshot.SetMatchStoreLabels(val.([]*metapb.StoreLabel)) + case kv.ResourceGroupTag: + s.KVSnapshot.SetResourceGroupTag(val.([]byte)) } } diff --git a/store/driver/txn/txn_driver.go b/store/driver/txn/txn_driver.go index 7f05f80139c12..a722557f8fc4e 100644 --- a/store/driver/txn/txn_driver.go +++ b/store/driver/txn/txn_driver.go @@ -170,6 +170,8 @@ func (txn *tikvTxn) SetOption(opt int, val interface{}) { txn.KVTxn.GetSnapshot().SetIsStatenessReadOnly(val.(bool)) case kv.MatchStoreLabels: txn.KVTxn.GetSnapshot().SetMatchStoreLabels(val.([]*metapb.StoreLabel)) + case kv.ResourceGroupTag: + txn.KVTxn.SetResourceGroupTag(val.([]byte)) } } diff --git a/store/mockstore/unistore/rpc.go b/store/mockstore/unistore/rpc.go index 378b6b23b56a4..1cde24dd5ed94 100644 --- a/store/mockstore/unistore/rpc.go +++ b/store/mockstore/unistore/rpc.go @@ -59,6 
+59,9 @@ type RPCClient struct { rpcCli Client } +// UnistoreRPCClientSendHook exports for test. +var UnistoreRPCClientSendHook func(*tikvrpc.Request) + // SendRequest sends a request to mock cluster. func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { failpoint.Inject("rpcServerBusy", func(val failpoint.Value) { @@ -67,6 +70,12 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R } }) + failpoint.Inject("unistoreRPCClientSendHook", func(val failpoint.Value) { + if val.(bool) && UnistoreRPCClientSendHook != nil { + UnistoreRPCClientSendHook(req) + } + }) + if req.StoreTp == tikvrpc.TiDB { return c.redirectRequestToRPCServer(ctx, addr, req, timeout) } diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index c01d97981dd09..a299ba357d3e8 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -107,6 +107,8 @@ type twoPhaseCommitter struct { doingAmend bool binlog BinlogExecutor + + resourceGroupTag []byte } type memBufferMutations struct { @@ -428,6 +430,7 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error { c.lockTTL = txnLockTTL(txn.startTime, size) c.priority = txn.priority.ToPB() c.syncLog = txn.syncLog + c.resourceGroupTag = txn.resourceGroupTag c.setDetail(commitDetail) return nil } diff --git a/store/tikv/cleanup.go b/store/tikv/cleanup.go index 0260d770cdd44..e21c1211af9bf 100644 --- a/store/tikv/cleanup.go +++ b/store/tikv/cleanup.go @@ -40,7 +40,7 @@ func (actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batc req := tikvrpc.NewRequest(tikvrpc.CmdBatchRollback, &pb.BatchRollbackRequest{ Keys: batch.mutations.GetKeys(), StartVersion: c.startTS, - }, pb.Context{Priority: c.priority, SyncLog: c.syncLog}) + }, pb.Context{Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: c.resourceGroupTag}) resp, err := c.store.SendReq(bo, req, batch.region, ReadTimeoutShort) if err != nil { return errors.Trace(err) diff --git a/store/tikv/commit.go b/store/tikv/commit.go index 10c60d9f6d4bd..8e876a6f11468 100644 --- a/store/tikv/commit.go +++ b/store/tikv/commit.go @@ -46,7 +46,7 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch StartVersion: c.startTS, Keys: keys, CommitVersion: c.commitTS, - }, pb.Context{Priority: c.priority, SyncLog: c.syncLog}) + }, pb.Context{Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: c.resourceGroupTag}) sender := NewRegionRequestSender(c.store.regionCache, c.store.GetTiKVClient()) resp, err := sender.SendReq(bo, req, batch.region, ReadTimeoutShort) diff --git a/store/tikv/prewrite.go b/store/tikv/prewrite.go index 49ddc1525b748..305806c931149 100644 --- a/store/tikv/prewrite.go +++ b/store/tikv/prewrite.go @@ -116,7 +116,7 @@ func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchMutations, txnSize u req.TryOnePc = true } - return tikvrpc.NewRequest(tikvrpc.CmdPrewrite, req, pb.Context{Priority: c.priority, SyncLog: c.syncLog}) + return tikvrpc.NewRequest(tikvrpc.CmdPrewrite, req, pb.Context{Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: c.resourceGroupTag}) } func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchMutations) error { diff --git a/store/tikv/scan.go b/store/tikv/scan.go index 94ece80ff067f..64b9a4728f551 100644 --- a/store/tikv/scan.go +++ b/store/tikv/scan.go @@ -192,9 +192,10 @@ func (s *Scanner) getData(bo *Backoffer) error { } sreq := &pb.ScanRequest{ Context: &pb.Context{ - Priority: 
diff --git a/store/tikv/scan.go b/store/tikv/scan.go
index 94ece80ff067f..64b9a4728f551 100644
--- a/store/tikv/scan.go
+++ b/store/tikv/scan.go
@@ -192,9 +192,10 @@ func (s *Scanner) getData(bo *Backoffer) error {
 	}
 	sreq := &pb.ScanRequest{
 		Context: &pb.Context{
-			Priority:       s.snapshot.priority.ToPB(),
-			NotFillCache:   s.snapshot.notFillCache,
-			IsolationLevel: s.snapshot.isolationLevel.ToPB(),
+			Priority:         s.snapshot.priority.ToPB(),
+			NotFillCache:     s.snapshot.notFillCache,
+			IsolationLevel:   s.snapshot.isolationLevel.ToPB(),
+			ResourceGroupTag: s.snapshot.resourceGroupTag,
 		},
 		StartKey: s.nextStartKey,
 		EndKey:   reqEndKey,
@@ -210,9 +211,10 @@ func (s *Scanner) getData(bo *Backoffer) error {
 	}
 	s.snapshot.mu.RLock()
 	req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdScan, sreq, s.snapshot.mu.replicaRead, &s.snapshot.replicaReadSeed, pb.Context{
-		Priority:     s.snapshot.priority.ToPB(),
-		NotFillCache: s.snapshot.notFillCache,
-		TaskId:       s.snapshot.mu.taskID,
+		Priority:         s.snapshot.priority.ToPB(),
+		NotFillCache:     s.snapshot.notFillCache,
+		TaskId:           s.snapshot.mu.taskID,
+		ResourceGroupTag: s.snapshot.resourceGroupTag,
 	})
 	s.snapshot.mu.RUnlock()
 	resp, err := sender.SendReq(bo, req, loc.Region, ReadTimeoutMedium)
diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go
index 180ac59369aca..9828537b7cb79 100644
--- a/store/tikv/snapshot.go
+++ b/store/tikv/snapshot.go
@@ -108,6 +108,8 @@ type KVSnapshot struct {
 		matchStoreLabels []*metapb.StoreLabel
 	}
 	sampleStep uint32
+	// resourceGroupTag is used to set the kv request's resource group tag.
+	resourceGroupTag []byte
 }
 
 // newTiKVSnapshot creates a snapshot of an TiKV store.
@@ -310,9 +312,10 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, collec
 			Keys:    pending,
 			Version: s.version,
 		}, s.mu.replicaRead, &s.replicaReadSeed, pb.Context{
-			Priority:     s.priority.ToPB(),
-			NotFillCache: s.notFillCache,
-			TaskId:       s.mu.taskID,
+			Priority:         s.priority.ToPB(),
+			NotFillCache:     s.notFillCache,
+			TaskId:           s.mu.taskID,
+			ResourceGroupTag: s.resourceGroupTag,
 		})
 		isStaleness = s.mu.isStaleness
 		matchStoreLabels = s.mu.matchStoreLabels
@@ -462,9 +465,10 @@ func (s *KVSnapshot) get(ctx context.Context, bo *Backoffer, k []byte) ([]byte,
 			Key:     k,
 			Version: s.version,
 		}, s.mu.replicaRead, &s.replicaReadSeed, pb.Context{
-			Priority:     s.priority.ToPB(),
-			NotFillCache: s.notFillCache,
-			TaskId:       s.mu.taskID,
+			Priority:         s.priority.ToPB(),
+			NotFillCache:     s.notFillCache,
+			TaskId:           s.mu.taskID,
+			ResourceGroupTag: s.resourceGroupTag,
 		})
 		isStaleness = s.mu.isStaleness
 		matchStoreLabels = s.mu.matchStoreLabels
@@ -629,6 +633,11 @@ func (s *KVSnapshot) SetMatchStoreLabels(labels []*metapb.StoreLabel) {
 	s.mu.matchStoreLabels = labels
 }
 
+// SetResourceGroupTag sets the resource group tag of the kv request.
+func (s *KVSnapshot) SetResourceGroupTag(tag []byte) {
+	s.resourceGroupTag = tag
+}
+
 // SnapCacheHitCount gets the snapshot cache hit count. Only for test.
 func (s *KVSnapshot) SnapCacheHitCount() int {
 	return int(atomic.LoadInt64(&s.mu.hitCnt))
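Note (not part of the diff): on the read side, a tag set on a KVSnapshot is copied into the pb.Context of every Get, BatchGet and Scan request, as the hunks above show. An illustrative sketch, assuming KVSnapshot.Get(ctx, key) keeps its usual signature in this revision; the helper name is not from the patch:

package example

import (
	"context"

	"github.com/pingcap/tidb/store/tikv"
)

// readWithTag tags a snapshot before reading, so the point-get request sent
// to TiKV carries the resource group tag in its Context.
func readWithTag(snap *tikv.KVSnapshot, tag, key []byte) ([]byte, error) {
	snap.SetResourceGroupTag(tag)
	return snap.Get(context.Background(), key)
}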
diff --git a/store/tikv/txn.go b/store/tikv/txn.go
index 78fd2d5277a82..f2c5fc24449c3 100644
--- a/store/tikv/txn.go
+++ b/store/tikv/txn.go
@@ -110,6 +110,7 @@ type KVTxn struct {
 	causalConsistency bool
 	scope             string
 	kvFilter          KVFilter
+	resourceGroupTag  []byte
 }
 
 // ExtractStartTS use `option` to get the proper startTS for a transaction.
@@ -231,6 +232,12 @@ func (txn *KVTxn) SetPriority(pri Priority) {
 	txn.GetSnapshot().SetPriority(pri)
 }
 
+// SetResourceGroupTag sets the resource tag for both write and read.
+func (txn *KVTxn) SetResourceGroupTag(tag []byte) {
+	txn.resourceGroupTag = tag
+	txn.GetSnapshot().SetResourceGroupTag(tag)
+}
+
 // SetSchemaAmender sets an amender to update mutations after schema change.
 func (txn *KVTxn) SetSchemaAmender(sa SchemaAmender) {
 	txn.schemaAmender = sa
diff --git a/tools/check/go.mod b/tools/check/go.mod
index f3827700f1589..90b34e7a296c4 100644
--- a/tools/check/go.mod
+++ b/tools/check/go.mod
@@ -18,7 +18,6 @@ require (
 	gopkg.in/alecthomas/gometalinter.v3 v3.0.0 // indirect
 	gopkg.in/alecthomas/kingpin.v2 v2.2.6 // indirect
 	gopkg.in/alecthomas/kingpin.v3-unstable v3.0.0-20170321130658-9670b87a702e // indirect
-	gopkg.in/yaml.v2 v2.2.2 // indirect
 	honnef.co/go/tools v0.0.0-20180920025451-e3ad64cb4ed3
 )
diff --git a/util/deadlockhistory/deadlock_history.go b/util/deadlockhistory/deadlock_history.go
index ddb78067ffe7c..c219442cf5bf1 100644
--- a/util/deadlockhistory/deadlock_history.go
+++ b/util/deadlockhistory/deadlock_history.go
@@ -183,7 +183,7 @@ func ErrDeadlockToDeadlockRecord(dl *tikverr.ErrDeadlock) *DeadlockRecord {
 		}
 		waitChain = append(waitChain, WaitChainItem{
 			TryLockTxn:     rawItem.Txn,
-			SQLDigest:      sqlDigest,
+			SQLDigest:      hex.EncodeToString(sqlDigest),
 			Key:            rawItem.Key,
 			AllSQLs:        nil,
 			TxnHoldingLock: rawItem.WaitForTxn,
diff --git a/util/deadlockhistory/deadlock_history_test.go b/util/deadlockhistory/deadlock_history_test.go
index 35cbb6c8513cd..dd9428a9f550a 100644
--- a/util/deadlockhistory/deadlock_history_test.go
+++ b/util/deadlockhistory/deadlock_history_test.go
@@ -20,9 +20,10 @@ import (
 	. "github.com/pingcap/check"
 	"github.com/pingcap/kvproto/pkg/deadlock"
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
+	"github.com/pingcap/parser"
 	tikverr "github.com/pingcap/tidb/store/tikv/error"
 	"github.com/pingcap/tidb/types"
-	"github.com/pingcap/tidb/util/resourcegrouptag"
+	"github.com/pingcap/tipb/go-tipb"
 )
 
 type testDeadlockHistorySuite struct{}
@@ -228,6 +229,11 @@ func (s *testDeadlockHistorySuite) TestGetDatum(c *C) {
 }
 
 func (s *testDeadlockHistorySuite) TestErrDeadlockToDeadlockRecord(c *C) {
+	digest1, digest2 := parser.NewDigest([]byte("aabbccdd")), parser.NewDigest([]byte("ddccbbaa"))
+	tag1 := tipb.ResourceGroupTag{SqlDigest: digest1.Bytes()}
+	tag2 := tipb.ResourceGroupTag{SqlDigest: digest2.Bytes()}
+	tag1Data, _ := tag1.Marshal()
+	tag2Data, _ := tag2.Marshal()
 	err := &tikverr.ErrDeadlock{
 		Deadlock: &kvrpcpb.Deadlock{
 			LockTs: 101,
@@ -238,13 +244,13 @@ func (s *testDeadlockHistorySuite) TestErrDeadlockToDeadlockRecord(c *C) {
 					Txn:              100,
 					WaitForTxn:       101,
 					Key:              []byte("k2"),
-					ResourceGroupTag: resourcegrouptag.EncodeResourceGroupTag("aabbccdd"),
+					ResourceGroupTag: tag1Data,
 				},
 				{
 					Txn:              101,
 					WaitForTxn:       100,
 					Key:              []byte("k1"),
-					ResourceGroupTag: resourcegrouptag.EncodeResourceGroupTag("ddccbbaa"),
+					ResourceGroupTag: tag2Data,
 				},
 			},
 		},
@@ -256,13 +262,13 @@ func (s *testDeadlockHistorySuite) TestErrDeadlockToDeadlockRecord(c *C) {
 		WaitChain: []WaitChainItem{
 			{
 				TryLockTxn:     100,
-				SQLDigest:      "aabbccdd",
+				SQLDigest:      digest1.String(),
 				Key:            []byte("k2"),
 				TxnHoldingLock: 101,
 			},
 			{
 				TryLockTxn:     101,
-				SQLDigest:      "ddccbbaa",
+				SQLDigest:      digest2.String(),
 				Key:            []byte("k1"),
 				TxnHoldingLock: 100,
 			},
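Note (not part of the diff): KVTxn.SetResourceGroupTag above stores the tag on the transaction and on its snapshot, and initKeysAndMutations copies it into the committer, so prewrite, commit and batch-rollback requests all carry it. A write-path sketch, assuming the usual KVStore.Begin, KVTxn.Set and KVTxn.Commit signatures of this revision; the helper name is illustrative:

package example

import (
	"context"

	"github.com/pingcap/tidb/store/tikv"
)

// writeWithTag tags a transaction at the store/tikv layer before mutating and
// committing, so the whole 2PC for this transaction is attributed to the tag.
func writeWithTag(store *tikv.KVStore, tag []byte) error {
	txn, err := store.Begin()
	if err != nil {
		return err
	}
	txn.SetResourceGroupTag(tag)
	if err := txn.Set([]byte("k"), []byte("v")); err != nil {
		return err
	}
	return txn.Commit(context.Background())
}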
diff --git a/util/resourcegrouptag/resource_group_tag.go b/util/resourcegrouptag/resource_group_tag.go
index cacbf574b91fb..03150a0393ea4 100644
--- a/util/resourcegrouptag/resource_group_tag.go
+++ b/util/resourcegrouptag/resource_group_tag.go
@@ -1,85 +1,40 @@
 package resourcegrouptag
 
 import (
-	"encoding/hex"
-
 	"github.com/pingcap/errors"
-	"github.com/pingcap/tidb/util/logutil"
-	"go.uber.org/zap"
-)
-
-const (
-	resourceGroupTagPrefixSQLDigest = byte(1)
+	"github.com/pingcap/parser"
+	"github.com/pingcap/tipb/go-tipb"
 )
 
-// EncodeResourceGroupTag encodes sqlDigest into resource group tag.
-// A resource group tag can be carried in the Context field of TiKV requests, which is a byte array, and sent to TiKV as
-// diagnostic information. Currently it contains only the SQL Digest, and the codec method is naive but extendable.
-// This function doesn't return error. When there's some error, which can only be caused by unexpected format of the
-// arguments, it simply returns an empty result.
-// The format:
-// +-----------+-----------------------+----------------------------+---------------+----------------+----
-// | version=1 | field1 prefix (1byte) | field1 content (var bytes) | field2 prefix | field2 content | ...
-// +-----------+-----------------------+----------------------------+---------------+----------------+----
-// The `version` section marks the codec version, which makes it easier for changing the format in the future.
-// Each field starts with a byte to mark what field it is, and the length of the content depends on the field's
-// definition.
-// Currently there's only one field (SQL Digest), and its content starts with a byte `B` describing it's length, and
-// then follows by exactly `B` bytes.
-func EncodeResourceGroupTag(sqlDigest string) []byte {
-	if len(sqlDigest) == 0 {
-		return nil
-	}
-	if len(sqlDigest) >= 512 {
-		logutil.BgLogger().Warn("failed to encode sql digest to resource group tag: length too long", zap.String("sqlDigest", sqlDigest))
+// EncodeResourceGroupTag encodes the SQL digest and plan digest into a resource group tag.
+func EncodeResourceGroupTag(sqlDigest, planDigest *parser.Digest) []byte {
+	if sqlDigest == nil && planDigest == nil {
 		return nil
 	}
-	res := make([]byte, 3+len(sqlDigest)/2)
-
-	const encodingVersion = 1
-	res[0] = encodingVersion
-
-	res[1] = resourceGroupTagPrefixSQLDigest
-	// The SQL Digest is expected to be a hex string. Convert it back to bytes to save half of the memory.
-	res[2] = byte(len(sqlDigest) / 2)
-	_, err := hex.Decode(res[3:], []byte(sqlDigest))
+	tag := &tipb.ResourceGroupTag{}
+	if sqlDigest != nil {
+		tag.SqlDigest = sqlDigest.Bytes()
+	}
+	if planDigest != nil {
+		tag.PlanDigest = planDigest.Bytes()
+	}
+	b, err := tag.Marshal()
 	if err != nil {
-		logutil.BgLogger().Warn("failed to encode sql digest to resource group tag: invalid hex string", zap.String("sqlDigest", sqlDigest))
 		return nil
 	}
-
-	return res
+	return b
 }
 
-// DecodeResourceGroupTag decodes a resource group tag into various information contained in it. Currently it contains
-// only the SQL Digest.
-func DecodeResourceGroupTag(data []byte) (sqlDigest string, err error) {
+// DecodeResourceGroupTag decodes a resource group tag and returns the SQL digest.
+func DecodeResourceGroupTag(data []byte) (sqlDigest []byte, err error) {
 	if len(data) == 0 {
-		return "", nil
-	}
-
-	encodingVersion := data[0]
-	if encodingVersion != 1 {
-		return "", errors.Errorf("unsupported resource group tag version %v", data[0])
+		return nil, nil
 	}
-	rem := data[1:]
-
-	for len(rem) > 0 {
-		switch rem[0] {
-		case resourceGroupTagPrefixSQLDigest:
-			// There must be one more byte at rem[1] to represent the content's length, and the remaining bytes should
-			// not be shorter than the length specified by rem[1].
-			if len(rem) < 2 || len(rem)-2 < int(rem[1]) {
-				return "", errors.Errorf("cannot parse resource group tag: field length mismatch, tag: %v", hex.EncodeToString(data))
-			}
-			fieldLen := int(rem[1])
-			sqlDigest = hex.EncodeToString(rem[2 : 2+fieldLen])
-			rem = rem[2+fieldLen:]
-		default:
-			return "", errors.Errorf("resource group tag field not recognized, prefix: %v, tag: %v", rem[0], hex.EncodeToString(data))
-		}
+	tag := &tipb.ResourceGroupTag{}
+	err = tag.Unmarshal(data)
+	if err != nil {
+		return nil, errors.Errorf("invalid resource group tag data %x", data)
 	}
-
-	return
+	return tag.SqlDigest, nil
 }
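Note (not part of the diff): the codec is now a thin wrapper around tipb.ResourceGroupTag, and DecodeResourceGroupTag returns the raw digest bytes instead of a hex string. A round-trip sketch, assuming parser.NormalizeDigest from the parser version this patch builds against (it returns the normalized SQL and a *parser.Digest); the function is illustrative only:

package example

import (
	"fmt"

	"github.com/pingcap/parser"
	"github.com/pingcap/tidb/util/resourcegrouptag"
)

// roundTrip encodes a statement's SQL digest into a tag and decodes it back.
func roundTrip() error {
	_, sqlDigest := parser.NormalizeDigest("select * from t where a = 1")
	tag := resourcegrouptag.EncodeResourceGroupTag(sqlDigest, nil)
	decoded, err := resourcegrouptag.DecodeResourceGroupTag(tag)
	if err != nil {
		return err
	}
	// decoded holds the same bytes as sqlDigest.Bytes().
	fmt.Printf("sql digest: %x\n", decoded)
	return nil
}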
diff --git a/util/resourcegrouptag/resource_group_tag_test.go b/util/resourcegrouptag/resource_group_tag_test.go
index a979b92fce315..f5334aacbd17f 100644
--- a/util/resourcegrouptag/resource_group_tag_test.go
+++ b/util/resourcegrouptag/resource_group_tag_test.go
@@ -14,10 +14,14 @@
 package resourcegrouptag
 
 import (
+	"crypto/sha256"
 	"math/rand"
 	"testing"
 
 	. "github.com/pingcap/check"
+	"github.com/pingcap/parser"
+	"github.com/pingcap/tidb/util/hack"
+	"github.com/pingcap/tipb/go-tipb"
 )
 
 type testUtilsSuite struct{}
@@ -29,83 +33,76 @@ func TestT(t *testing.T) {
 }
 
 func (s *testUtilsSuite) TestResourceGroupTagEncoding(c *C) {
-	sqlDigest := ""
-	tag := EncodeResourceGroupTag(sqlDigest)
+	sqlDigest := parser.NewDigest(nil)
+	tag := EncodeResourceGroupTag(sqlDigest, nil)
 	c.Assert(len(tag), Equals, 0)
 	decodedSQLDigest, err := DecodeResourceGroupTag(tag)
 	c.Assert(err, IsNil)
 	c.Assert(len(decodedSQLDigest), Equals, 0)
 
-	sqlDigest = "aa"
-	tag = EncodeResourceGroupTag(sqlDigest)
+	sqlDigest = parser.NewDigest([]byte{'a', 'a'})
+	tag = EncodeResourceGroupTag(sqlDigest, nil)
 	// version(1) + prefix(1) + length(1) + content(2hex -> 1byte)
 	c.Assert(len(tag), Equals, 4)
 	decodedSQLDigest, err = DecodeResourceGroupTag(tag)
 	c.Assert(err, IsNil)
-	c.Assert(decodedSQLDigest, Equals, sqlDigest)
+	c.Assert(decodedSQLDigest, DeepEquals, sqlDigest.Bytes())
 
-	sqlDigest = genRandHex(64)
-	tag = EncodeResourceGroupTag(sqlDigest)
+	sqlDigest = parser.NewDigest(genRandHex(64))
+	tag = EncodeResourceGroupTag(sqlDigest, nil)
 	decodedSQLDigest, err = DecodeResourceGroupTag(tag)
 	c.Assert(err, IsNil)
-	c.Assert(decodedSQLDigest, Equals, sqlDigest)
+	c.Assert(decodedSQLDigest, DeepEquals, sqlDigest.Bytes())
 
-	sqlDigest = genRandHex(510)
-	tag = EncodeResourceGroupTag(sqlDigest)
+	sqlDigest = parser.NewDigest(genRandHex(510))
+	tag = EncodeResourceGroupTag(sqlDigest, nil)
 	decodedSQLDigest, err = DecodeResourceGroupTag(tag)
 	c.Assert(err, IsNil)
-	c.Assert(decodedSQLDigest, Equals, sqlDigest)
-
-	// The max supported length is 255 bytes (510 hex digits).
-	sqlDigest = genRandHex(512)
-	tag = EncodeResourceGroupTag(sqlDigest)
-	c.Assert(len(tag), Equals, 0)
-
-	// A hex string can't have odd length.
-	sqlDigest = genRandHex(15)
-	tag = EncodeResourceGroupTag(sqlDigest)
-	c.Assert(len(tag), Equals, 0)
-
-	// Non-hexadecimal character is invalid
-	sqlDigest = "aabbccddgg"
-	tag = EncodeResourceGroupTag(sqlDigest)
-	c.Assert(len(tag), Equals, 0)
-
-	// A tag should start with a supported version
-	tag = []byte("\x00")
-	_, err = DecodeResourceGroupTag(tag)
-	c.Assert(err, NotNil)
-
-	// The fields should have format like `[prefix, length, content...]`, otherwise decoding it should returns error.
-	tag = []byte("\x01\x01")
-	_, err = DecodeResourceGroupTag(tag)
-	c.Assert(err, NotNil)
-
-	tag = []byte("\x01\x01\x02")
-	_, err = DecodeResourceGroupTag(tag)
-	c.Assert(err, NotNil)
-
-	tag = []byte("\x01\x01\x02AB")
-	decodedSQLDigest, err = DecodeResourceGroupTag(tag)
-	c.Assert(err, IsNil)
-	c.Assert(decodedSQLDigest, Equals, "4142")
-
-	tag = []byte("\x01\x01\x00")
-	decodedSQLDigest, err = DecodeResourceGroupTag(tag)
-	c.Assert(err, IsNil)
-	c.Assert(len(decodedSQLDigest), Equals, 0)
-
-	// Unsupported field
-	tag = []byte("\x01\x99")
-	_, err = DecodeResourceGroupTag(tag)
-	c.Assert(err, NotNil)
+	c.Assert(decodedSQLDigest, DeepEquals, sqlDigest.Bytes())
 }
 
-func genRandHex(length int) string {
+func genRandHex(length int) []byte {
 	const chars = "0123456789abcdef"
 	res := make([]byte, length)
 	for i := 0; i < length; i++ {
 		res[i] = chars[rand.Intn(len(chars))]
 	}
-	return string(res)
+	return res
+}
+
+func genDigest(str string) []byte {
+	hasher := sha256.New()
+	hasher.Write(hack.Slice(str))
+	return hasher.Sum(nil)
+}
+
+func (s *testUtilsSuite) TestResourceGroupTagEncodingPB(c *C) {
+	digest1 := genDigest("abc")
+	digest2 := genDigest("abcdefg")
+	// Test for protobuf
+	resourceTag := &tipb.ResourceGroupTag{
+		SqlDigest:  digest1,
+		PlanDigest: digest2,
+	}
+	buf, err := resourceTag.Marshal()
+	c.Assert(err, IsNil)
+	c.Assert(len(buf), Equals, 68)
+	tag := &tipb.ResourceGroupTag{}
+	err = tag.Unmarshal(buf)
+	c.Assert(err, IsNil)
+	c.Assert(tag.SqlDigest, DeepEquals, digest1)
+	c.Assert(tag.PlanDigest, DeepEquals, digest2)
+
+	// Test for protobuf sql_digest only
+	resourceTag = &tipb.ResourceGroupTag{
+		SqlDigest: digest1,
+	}
+	buf, err = resourceTag.Marshal()
+	c.Assert(err, IsNil)
+	c.Assert(len(buf), Equals, 34)
+	tag = &tipb.ResourceGroupTag{}
+	err = tag.Unmarshal(buf)
+	c.Assert(err, IsNil)
+	c.Assert(tag.SqlDigest, DeepEquals, digest1)
+	c.Assert(tag.PlanDigest, IsNil)
 }
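Note (not part of the diff): on the consuming side the tag travels as opaque bytes, in kvrpcpb.Context and in deadlock wait-chain entries, so a reader unmarshals the protobuf itself; the deadlock history change above then renders the SQL digest as hex. An illustrative sketch of that consumer-side step:

package example

import (
	"encoding/hex"

	"github.com/pingcap/tipb/go-tipb"
)

// digestFromTag unmarshals raw resource group tag bytes and renders the SQL
// digest the way the deadlock history table does, as a hex string.
func digestFromTag(data []byte) (string, error) {
	var tag tipb.ResourceGroupTag
	if err := tag.Unmarshal(data); err != nil {
		return "", err
	}
	return hex.EncodeToString(tag.SqlDigest), nil
}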