Skip to content

Commit

Permalink
add resource tag for txn commit rpc request
Browse files Browse the repository at this point in the history
Signed-off-by: crazycs <[email protected]>
  • Loading branch information
crazycs520 committed May 24, 2021
1 parent addd0ef commit 3f0c2a3
Show file tree
Hide file tree
Showing 16 changed files with 44 additions and 20 deletions.
12 changes: 9 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ type Config struct {
DelayCleanTableLock uint64 `toml:"delay-clean-table-lock" json:"delay-clean-table-lock"`
SplitRegionMaxNum uint64 `toml:"split-region-max-num" json:"split-region-max-num"`
StmtSummary StmtSummary `toml:"stmt-summary" json:"stmt-summary"`
TopStmt TopStmt `toml:"top-stmt" json:"top-stmt"`
TopStmt TopSQL `toml:"top-sql" json:"top-sql"`
// RepairMode indicates that the TiDB is in the repair mode for table meta.
RepairMode bool `toml:"repair-mode" json:"repair-mode"`
RepairTableList []string `toml:"repair-table-list" json:"repair-table-list"`
Expand Down Expand Up @@ -528,7 +528,8 @@ type StmtSummary struct {
HistorySize int `toml:"history-size" json:"history-size"`
}

type TopStmt struct {
// TopSQL is the config for top sql.
type TopSQL struct {
// Enable statement summary or not.
Enable bool `toml:"enable" json:"enable"`
// The refresh interval of statement summary.
Expand Down Expand Up @@ -666,7 +667,7 @@ var defaultConf = Config{
RefreshInterval: 1800,
HistorySize: 24,
},
TopStmt: TopStmt{
TopStmt: TopSQL{
Enable: true,
RefreshInterval: 1,
MaxStmtCount: 5000,
Expand Down Expand Up @@ -958,6 +959,11 @@ func TableLockEnabled() bool {
return GetGlobalConfig().EnableTableLock
}

// TopSQLEnabled uses to check whether enabled the top SQL feature.
func TopSQLEnabled() bool {
return GetGlobalConfig().TopStmt.Enable
}

// TableLockDelayClean uses to get the time of delay clean table lock.
var TableLockDelayClean = func() uint64 {
return GetGlobalConfig().DelayCleanTableLock
Expand Down
2 changes: 1 addition & 1 deletion distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func (builder *RequestBuilder) SetFromInfoSchema(is infoschema.InfoSchema) *Requ

// SetResourceGroupTag sets the request resource group tag.
func (builder *RequestBuilder) SetResourceGroupTag(sc *stmtctx.StatementContext) *RequestBuilder {
if config.GetGlobalConfig().TopStmt.Enable {
if config.TopSQLEnabled() {
builder.Request.ResourceGroupTag = sc.GetResourceGroupTag()
}
return builder
Expand Down
5 changes: 2 additions & 3 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/infoschema"
Expand Down Expand Up @@ -1326,7 +1325,7 @@ func (e *AnalyzeFastExec) handleScanTasks(bo *tikv.Backoffer) (keysSize int, err
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
setResourceGroupTagForSnapshot(e.ctx.GetSessionVars().StmtCtx, snapshot)
setResourceGroupTagForTxn(e.ctx.GetSessionVars().StmtCtx, snapshot)
for _, t := range e.scanTasks {
iter, err := snapshot.Iter(kv.Key(t.StartKey), kv.Key(t.EndKey))
if err != nil {
Expand All @@ -1347,7 +1346,7 @@ func (e *AnalyzeFastExec) handleSampTasks(workID int, step uint32, err *error) {
snapshot.SetOption(kv.NotFillCache, true)
snapshot.SetOption(kv.IsolationLevel, kv.RC)
snapshot.SetOption(kv.Priority, kv.PriorityLow)
setResourceGroupTagForSnapshot(e.ctx.GetSessionVars().StmtCtx, snapshot)
setResourceGroupTagForTxn(e.ctx.GetSessionVars().StmtCtx, snapshot)
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
Expand Down
2 changes: 1 addition & 1 deletion executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (e *BatchPointGetExec) Open(context.Context) error {
},
})
}
setResourceGroupTagForSnapshot(stmtCtx, snapshot)
setResourceGroupTagForTxn(stmtCtx, snapshot)
var batchGetter kv.BatchGetter = snapshot
if txn.Valid() {
lock := e.tblInfo.Lock
Expand Down
6 changes: 3 additions & 3 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -977,7 +977,7 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
func newLockCtx(seVars *variable.SessionVars, lockWaitTime int64) *tikvstore.LockCtx {
var planDigest *parser.Digest
_, sqlDigest := seVars.StmtCtx.SQLDigest()
if config.GetGlobalConfig().TopStmt.Enable {
if config.TopSQLEnabled() {
_, planDigest = seVars.StmtCtx.GetPlanDigest()
}
return &tikvstore.LockCtx{
Expand Down Expand Up @@ -1800,8 +1800,8 @@ func FillVirtualColumnValue(virtualRetTypes []*types.FieldType, virtualColumnInd
return nil
}

func setResourceGroupTagForSnapshot(sc *stmtctx.StatementContext, snapshot kv.Snapshot) {
if snapshot != nil && config.GetGlobalConfig().TopStmt.Enable {
func setResourceGroupTagForTxn(sc *stmtctx.StatementContext, snapshot kv.Snapshot) {
if snapshot != nil && config.TopSQLEnabled() {
snapshot.SetOption(kv.ResourceGroupTag, sc.GetResourceGroupTag())
}
}
3 changes: 1 addition & 2 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"github.com/opentracing/opentracing-go"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/table"
Expand Down Expand Up @@ -64,7 +63,7 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
if err != nil {
return err
}
setResourceGroupTagForSnapshot(sessVars.StmtCtx, txn.GetSnapshot())
setResourceGroupTagForTxn(sessVars.StmtCtx, txn)
txnSize := txn.Size()
sessVars.StmtCtx.AddRecordRows(uint64(len(rows)))
// If you use the IGNORE keyword, duplicate-key error that occurs while executing the INSERT statement are ignored.
Expand Down
3 changes: 1 addition & 2 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/distsql"
Expand Down Expand Up @@ -161,7 +160,7 @@ func (e *PointGetExecutor) Open(context.Context) error {
},
})
}
setResourceGroupTagForSnapshot(e.ctx.GetSessionVars().StmtCtx, e.snapshot)
setResourceGroupTagForTxn(e.ctx.GetSessionVars().StmtCtx, e.snapshot)
return nil
}

Expand Down
1 change: 1 addition & 0 deletions executor/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ func (e *ReplaceExec) exec(ctx context.Context, newRows [][]types.Datum) error {
defer snapshot.DelOption(kv.CollectRuntimeStats)
}
}
setResourceGroupTagForTxn(e.ctx.GetSessionVars().StmtCtx, txn)
prefetchStart := time.Now()
// Use BatchGet to fill cache.
// It's an optimization and could be removed without affecting correctness.
Expand Down
7 changes: 7 additions & 0 deletions executor/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
plannercore "github.com/pingcap/tidb/planner/core"
Expand Down Expand Up @@ -263,6 +264,12 @@ func (e *UpdateExec) updateRows(ctx context.Context) (int, error) {
txn.GetSnapshot().SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats)
}
}
if config.TopSQLEnabled() {
txn, err := e.ctx.Txn(false)
if err == nil {
txn.SetOption(kv.ResourceGroupTag, e.ctx.GetSessionVars().StmtCtx.GetResourceGroupTag())
}
}
for rowIdx := 0; rowIdx < chk.NumRows(); rowIdx++ {
chunkRow := chk.GetRow(rowIdx)
datumRow := chunkRow.GetDatumRow(fields)
Expand Down
2 changes: 1 addition & 1 deletion store/driver/txn/txn_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (txn *tikvTxn) SetOption(opt int, val interface{}) {
case kv.MatchStoreLabels:
txn.KVTxn.GetSnapshot().SetMatchStoreLabels(val.([]*metapb.StoreLabel))
case kv.ResourceGroupTag:
txn.KVTxn.GetSnapshot().SetResourceGroupTag(val.([]byte))
txn.KVTxn.SetResourceGroupTag(val.([]byte))
}
}

Expand Down
3 changes: 3 additions & 0 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ type twoPhaseCommitter struct {
doingAmend bool

binlog BinlogExecutor

resourceGroupTag []byte
}

type memBufferMutations struct {
Expand Down Expand Up @@ -428,6 +430,7 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error {
c.lockTTL = txnLockTTL(txn.startTime, size)
c.priority = txn.priority.ToPB()
c.syncLog = txn.syncLog
c.resourceGroupTag = txn.resourceGroupTag
c.setDetail(commitDetail)
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batc
req := tikvrpc.NewRequest(tikvrpc.CmdBatchRollback, &pb.BatchRollbackRequest{
Keys: batch.mutations.GetKeys(),
StartVersion: c.startTS,
}, pb.Context{Priority: c.priority, SyncLog: c.syncLog})
}, pb.Context{Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: c.resourceGroupTag})
resp, err := c.store.SendReq(bo, req, batch.region, ReadTimeoutShort)
if err != nil {
return errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch
StartVersion: c.startTS,
Keys: keys,
CommitVersion: c.commitTS,
}, pb.Context{Priority: c.priority, SyncLog: c.syncLog})
}, pb.Context{Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: c.resourceGroupTag})

sender := NewRegionRequestSender(c.store.regionCache, c.store.GetTiKVClient())
resp, err := sender.SendReq(bo, req, batch.region, ReadTimeoutShort)
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/prewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchMutations, txnSize u
req.TryOnePc = true
}

return tikvrpc.NewRequest(tikvrpc.CmdPrewrite, req, pb.Context{Priority: c.priority, SyncLog: c.syncLog})
return tikvrpc.NewRequest(tikvrpc.CmdPrewrite, req, pb.Context{Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: c.resourceGroupTag})
}

func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchMutations) error {
Expand Down
7 changes: 7 additions & 0 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ type KVTxn struct {
causalConsistency bool
scope string
kvFilter KVFilter
resourceGroupTag []byte
}

// ExtractStartTs use `option` to get the proper startTS for a transaction
Expand Down Expand Up @@ -293,6 +294,12 @@ func (txn *KVTxn) SetPriority(pri Priority) {
txn.GetSnapshot().SetPriority(pri)
}

// SetResourceGroupTag sets the resource tag for both write and read.
func (txn *KVTxn) SetResourceGroupTag(tag []byte) {
txn.resourceGroupTag = tag
txn.GetSnapshot().SetResourceGroupTag(tag)
}

// SetSchemaAmender sets an amender to update mutations after schema change.
func (txn *KVTxn) SetSchemaAmender(sa SchemaAmender) {
txn.schemaAmender = sa
Expand Down
5 changes: 4 additions & 1 deletion util/resourcegrouptag/resource_group_tag.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ func EncodeResourceGroupTag(sqlDigest, planDigest *parser.Digest) []byte {
if planDigest != nil {
tag.PlanDigest = planDigest.Bytes()
}
b, _ := tag.Marshal()
b, err := tag.Marshal()
if err != nil {
return nil
}
return b
}

Expand Down

0 comments on commit 3f0c2a3

Please sign in to comment.