
topsql: add more test to check resource tag for each RPC request #33623

Merged · 14 commits · Apr 7, 2022
2 changes: 1 addition & 1 deletion ddl/backfilling.go
@@ -709,7 +709,7 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba
// recordIterFunc is used for low-level record iteration.
type recordIterFunc func(h kv.Handle, rowKey kv.Key, rawRecord []byte) (more bool, err error)

-func iterateSnapshotRows(ctx *jobContext, store kv.Storage, priority int, t table.Table, version uint64,
+func iterateSnapshotRows(ctx *JobContext, store kv.Storage, priority int, t table.Table, version uint64,
startKey kv.Key, endKey kv.Key, fn recordIterFunc) error {
var firstKey kv.Key
if startKey == nil {
6 changes: 3 additions & 3 deletions ddl/column.go
@@ -1009,7 +1009,7 @@ func (w *worker) doModifyColumnTypeWithData(
return ver, errors.Trace(err)
}

-reorgInfo, err := getReorgInfo(w.jobContext, d, t, job, tbl, BuildElements(changingCol, changingIdxs))
+reorgInfo, err := getReorgInfo(w.JobContext, d, t, job, tbl, BuildElements(changingCol, changingIdxs))
if err != nil || reorgInfo.first {
// If we run reorg firstly, we should update the job snapshot version
// and then run the reorg next time.
@@ -1148,7 +1148,7 @@ func (w *worker) updateColumnAndIndexes(t table.Table, oldCol, col *model.Column
if err != nil {
return errors.Trace(err)
}
-originalStartHandle, originalEndHandle, err := getTableRange(w.jobContext, reorgInfo.d, t.(table.PhysicalTable), currentVer.Ver, reorgInfo.Job.Priority)
+originalStartHandle, originalEndHandle, err := getTableRange(w.JobContext, reorgInfo.d, t.(table.PhysicalTable), currentVer.Ver, reorgInfo.Job.Priority)
if err != nil {
return errors.Trace(err)
}
@@ -1255,7 +1255,7 @@ func (w *updateColumnWorker) fetchRowColVals(txn kv.Transaction, taskRange reorg
taskDone := false
var lastAccessedHandle kv.Key
oprStartTime := startTime
-err := iterateSnapshotRows(w.ddlWorker.jobContext, w.sessCtx.GetStore(), w.priority, w.table, txn.StartTS(), taskRange.startKey, taskRange.endKey,
+err := iterateSnapshotRows(w.ddlWorker.JobContext, w.sessCtx.GetStore(), w.priority, w.table, txn.StartTS(), taskRange.startKey, taskRange.endKey,
func(handle kv.Handle, recordKey kv.Key, rawRow []byte) (bool, error) {
oprEndTime := time.Now()
logSlowOperations(oprEndTime.Sub(oprStartTime), "iterateSnapshotRows in updateColumnWorker fetchRowColVals", 0)
2 changes: 1 addition & 1 deletion ddl/ddl.go
@@ -174,7 +174,7 @@ type DDL interface {
// GetID gets the ddl ID.
GetID() string
// GetTableMaxHandle gets the max row ID of a normal table or a partition.
-GetTableMaxHandle(startTS uint64, tbl table.PhysicalTable) (kv.Handle, bool, error)
+GetTableMaxHandle(ctx *JobContext, startTS uint64, tbl table.PhysicalTable) (kv.Handle, bool, error)
// SetBinlogClient sets the binlog client for DDL worker. It's exported for testing.
SetBinlogClient(*pumpcli.PumpsClient)
// GetHook gets the hook. It's exported for testing.
16 changes: 8 additions & 8 deletions ddl/ddl_worker.go
@@ -97,11 +97,11 @@ type worker struct {
lockSeqNum bool

*ddlCtx
-*jobContext
+*JobContext
}

-// jobContext is the ddl job execution context.
-type jobContext struct {
+// JobContext is the ddl job execution context.
+type JobContext struct {
// below fields are cache for top sql
ddlJobCtx context.Context
cacheSQL string
@@ -115,7 +115,7 @@ func newWorker(ctx context.Context, tp workerType, sessPool *sessionPool, delRan
tp: tp,
ddlJobCh: make(chan struct{}, 1),
ctx: ctx,
-jobContext: &jobContext{
+JobContext: &JobContext{
ddlJobCtx: context.Background(),
cacheSQL: "",
cacheNormalizedSQL: "",
@@ -466,7 +466,7 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
updateRawArgs = false
}
w.writeDDLSeqNum(job)
-w.jobContext.resetWhenJobFinish()
+w.JobContext.resetWhenJobFinish()
err = t.AddHistoryDDLJob(job, updateRawArgs)
return errors.Trace(err)
}
@@ -519,7 +519,7 @@ func newMetaWithQueueTp(txn kv.Transaction, tp workerType) *meta.Meta {
return meta.NewMeta(txn)
}

-func (w *jobContext) setDDLLabelForTopSQL(job *model.Job) {
+func (w *JobContext) setDDLLabelForTopSQL(job *model.Job) {
if !topsqlstate.TopSQLEnabled() || job == nil {
return
}
@@ -533,7 +533,7 @@ func (w *jobContext) setDDLLabelForTopSQL(job *model.Job) {
}
}

-func (w *jobContext) getResourceGroupTaggerForTopSQL() tikvrpc.ResourceGroupTagger {
+func (w *JobContext) getResourceGroupTaggerForTopSQL() tikvrpc.ResourceGroupTagger {
if !topsqlstate.TopSQLEnabled() || w.cacheDigest == nil {
return nil
}
@@ -546,7 +546,7 @@ func (w *jobContext) getResourceGroupTaggerForTopSQL
return tagger
}

-func (w *jobContext) resetWhenJobFinish() {
+func (w *JobContext) resetWhenJobFinish() {
w.ddlJobCtx = context.Background()
w.cacheSQL = ""
w.cacheDigest = nil
5 changes: 5 additions & 0 deletions ddl/delete_range.go
@@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sqlexec"
topsqlstate "github.com/pingcap/tidb/util/topsql/state"
"go.uber.org/zap"
)

@@ -179,6 +180,10 @@ func (dr *delRange) doTask(ctx sessionctx.Context, r util.DelRangeTask) error {
finish := true
dr.keys = dr.keys[:0]
err := kv.RunInNewTxn(context.Background(), dr.store, false, func(ctx context.Context, txn kv.Transaction) error {
+if topsqlstate.TopSQLEnabled() {
+// Only test logic will run into here, so just set a mock internal resource tagger.
+txn.SetOption(kv.ResourceGroupTagger, util.GetInternalResourceGroupTaggerForTopSQL())
+}
iter, err := txn.Iter(oldStartKey, r.EndKey)
if err != nil {
return errors.Trace(err)
8 changes: 4 additions & 4 deletions ddl/index.go
@@ -558,7 +558,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
}

elements := []*meta.Element{{ID: indexInfo.ID, TypeKey: meta.IndexElementKey}}
-reorgInfo, err := getReorgInfo(w.jobContext, d, t, job, tbl, elements)
+reorgInfo, err := getReorgInfo(w.JobContext, d, t, job, tbl, elements)
if err != nil || reorgInfo.first {
// If we run reorg firstly, we should update the job snapshot version
// and then run the reorg next time.
@@ -1141,7 +1141,7 @@ func (w *baseIndexWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgBac
// taskDone means that the reorged handle is out of taskRange.endHandle.
taskDone := false
oprStartTime := startTime
-err := iterateSnapshotRows(w.ddlWorker.jobContext, w.sessCtx.GetStore(), w.priority, w.table, txn.StartTS(), taskRange.startKey, taskRange.endKey,
+err := iterateSnapshotRows(w.ddlWorker.JobContext, w.sessCtx.GetStore(), w.priority, w.table, txn.StartTS(), taskRange.startKey, taskRange.endKey,
func(handle kv.Handle, recordKey kv.Key, rawRow []byte) (bool, error) {
oprEndTime := time.Now()
logSlowOperations(oprEndTime.Sub(oprStartTime), "iterateSnapshotRows in baseIndexWorker fetchRowColVals", 0)
@@ -1410,7 +1410,7 @@ func (w *worker) updateReorgInfo(t table.PartitionedTable, reorg *reorgInfo) (bo
if err != nil {
return false, errors.Trace(err)
}
-start, end, err := getTableRange(w.jobContext, reorg.d, t.GetPartition(pid), currentVer.Ver, reorg.Job.Priority)
+start, end, err := getTableRange(w.JobContext, reorg.d, t.GetPartition(pid), currentVer.Ver, reorg.Job.Priority)
if err != nil {
return false, errors.Trace(err)
}
@@ -1594,7 +1594,7 @@ func (w *worker) updateReorgInfoForPartitions(t table.PartitionedTable, reorg *r
if err != nil {
return false, errors.Trace(err)
}
-start, end, err := getTableRange(w.jobContext, reorg.d, t.GetPartition(pid), currentVer.Ver, reorg.Job.Priority)
+start, end, err := getTableRange(w.JobContext, reorg.d, t.GetPartition(pid), currentVer.Ver, reorg.Job.Priority)
if err != nil {
return false, errors.Trace(err)
}
2 changes: 1 addition & 1 deletion ddl/partition.go
@@ -1124,7 +1124,7 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
elements = append(elements, &meta.Element{ID: idxInfo.ID, TypeKey: meta.IndexElementKey})
}
}
-reorgInfo, err := getReorgInfoFromPartitions(w.jobContext, d, t, job, tbl, physicalTableIDs, elements)
+reorgInfo, err := getReorgInfoFromPartitions(w.JobContext, d, t, job, tbl, physicalTableIDs, elements)

if err != nil || reorgInfo.first {
// If we run reorg firstly, we should update the job snapshot version
2 changes: 1 addition & 1 deletion ddl/primary_key_handle_test.go
@@ -40,7 +40,7 @@ import (
func getTableMaxHandle(t *testing.T, d ddl.DDL, tbl table.Table, store kv.Storage) (kv.Handle, bool) {
ver, err := store.CurrentVersion(kv.GlobalTxnScope)
require.NoError(t, err)
-maxHandle, emptyTable, err := d.GetTableMaxHandle(ver.Ver, tbl.(table.PhysicalTable))
+maxHandle, emptyTable, err := d.GetTableMaxHandle(&ddl.JobContext{}, ver.Ver, tbl.(table.PhysicalTable))
require.NoError(t, err)
return maxHandle, emptyTable
}
18 changes: 9 additions & 9 deletions ddl/reorg.go
@@ -438,7 +438,7 @@ func getColumnsTypes(columns []*model.ColumnInfo) []*types.FieldType {
}

// buildDescTableScan builds a desc table scan upon tblInfo.
-func (dc *ddlCtx) buildDescTableScan(ctx context.Context, startTS uint64, tbl table.PhysicalTable,
+func (dc *ddlCtx) buildDescTableScan(ctx *JobContext, startTS uint64, tbl table.PhysicalTable,
handleCols []*model.ColumnInfo, limit uint64) (distsql.SelectResult, error) {
sctx := newContext(dc.store)
dagPB, err := buildDescTableScanDAG(sctx, tbl, handleCols, limit)
@@ -459,6 +459,7 @@ func (dc *ddlCtx) buildDescTableScan(ctx context.Context, startTS uint64, tbl ta
SetKeepOrder(true).
SetConcurrency(1).SetDesc(true)

+builder.Request.ResourceGroupTagger = ctx.getResourceGroupTaggerForTopSQL()
builder.Request.NotFillCache = true
builder.Request.Priority = kv.PriorityLow

@@ -467,15 +468,15 @@ func (dc *ddlCtx) buildDescTableScan(ctx context.Context, startTS uint64, tbl ta
return nil, errors.Trace(err)
}

-result, err := distsql.Select(ctx, sctx, kvReq, getColumnsTypes(handleCols), statistics.NewQueryFeedback(0, nil, 0, false))
+result, err := distsql.Select(ctx.ddlJobCtx, sctx, kvReq, getColumnsTypes(handleCols), statistics.NewQueryFeedback(0, nil, 0, false))
if err != nil {
return nil, errors.Trace(err)
}
return result, nil
}

// GetTableMaxHandle gets the max handle of a PhysicalTable.
-func (dc *ddlCtx) GetTableMaxHandle(startTS uint64, tbl table.PhysicalTable) (maxHandle kv.Handle, emptyTable bool, err error) {
+func (dc *ddlCtx) GetTableMaxHandle(ctx *JobContext, startTS uint64, tbl table.PhysicalTable) (maxHandle kv.Handle, emptyTable bool, err error) {
var handleCols []*model.ColumnInfo
var pkIdx *model.IndexInfo
tblInfo := tbl.Meta()
@@ -497,7 +498,6 @@ func (dc *ddlCtx) GetTableMaxHandle(startTS uint64, tbl table.PhysicalTable) (ma
handleCols = []*model.ColumnInfo{model.NewExtraHandleColInfo()}
}

-ctx := context.Background()
// build a desc scan of tblInfo, which limit is 1, we can use it to retrieve the last handle of the table.
result, err := dc.buildDescTableScan(ctx, startTS, tbl, handleCols, 1)
if err != nil {
@@ -506,7 +506,7 @@ func (dc *ddlCtx) GetTableMaxHandle(startTS uint64, tbl table.PhysicalTable) (ma
defer terror.Call(result.Close)

chk := chunk.New(getColumnsTypes(handleCols), 1, 1)
-err = result.Next(ctx, chk)
+err = result.Next(ctx.ddlJobCtx, chk)
if err != nil {
return nil, false, errors.Trace(err)
}
@@ -542,7 +542,7 @@ func buildCommonHandleFromChunkRow(sctx *stmtctx.StatementContext, tblInfo *mode
}

// getTableRange gets the start and end handle of a table (or partition).
-func getTableRange(ctx *jobContext, d *ddlCtx, tbl table.PhysicalTable, snapshotVer uint64, priority int) (startHandleKey, endHandleKey kv.Key, err error) {
+func getTableRange(ctx *JobContext, d *ddlCtx, tbl table.PhysicalTable, snapshotVer uint64, priority int) (startHandleKey, endHandleKey kv.Key, err error) {
// Get the start handle of this partition.
err = iterateSnapshotRows(ctx, d.store, priority, tbl, snapshotVer, nil, nil,
func(h kv.Handle, rowKey kv.Key, rawRecord []byte) (bool, error) {
@@ -552,7 +552,7 @@ func getTableRange(ctx *jobContext, d *ddlCtx, tbl table.PhysicalTable, snapshot
if err != nil {
return startHandleKey, endHandleKey, errors.Trace(err)
}
-maxHandle, isEmptyTable, err := d.GetTableMaxHandle(snapshotVer, tbl)
+maxHandle, isEmptyTable, err := d.GetTableMaxHandle(ctx, snapshotVer, tbl)
if err != nil {
return startHandleKey, nil, errors.Trace(err)
}
@@ -579,7 +579,7 @@ func getValidCurrentVersion(store kv.Storage) (ver kv.Version, err error) {
return ver, nil
}

-func getReorgInfo(ctx *jobContext, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, elements []*meta.Element) (*reorgInfo, error) {
+func getReorgInfo(ctx *JobContext, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, elements []*meta.Element) (*reorgInfo, error) {
var (
element *meta.Element
start kv.Key
@@ -671,7 +671,7 @@ func getReorgInfo(ctx *jobContext, d *ddlCtx, t *meta.Meta, job *model.Job, tbl
return &info, nil
}

-func getReorgInfoFromPartitions(ctx *jobContext, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, partitionIDs []int64, elements []*meta.Element) (*reorgInfo, error) {
+func getReorgInfoFromPartitions(ctx *JobContext, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, partitionIDs []int64, elements []*meta.Element) (*reorgInfo, error) {
var (
element *meta.Element
start kv.Key
6 changes: 3 additions & 3 deletions ddl/reorg_test.go
@@ -143,7 +143,7 @@ func TestReorg(t *testing.T) {
require.NoError(t, err)

m = meta.NewMeta(txn)
-info, err1 := getReorgInfo(&jobContext{}, d.ddlCtx, m, job, mockTbl, nil)
+info, err1 := getReorgInfo(&JobContext{}, d.ddlCtx, m, job, mockTbl, nil)
require.NoError(t, err1)
require.Equal(t, info.StartKey, kv.Key(handle.Encoded()))
require.Equal(t, info.currElement, e)
@@ -174,7 +174,7 @@ func TestReorg(t *testing.T) {
err = kv.RunInNewTxn(context.Background(), d.store, false, func(ctx context.Context, txn kv.Transaction) error {
m := meta.NewMeta(txn)
var err1 error
-_, err1 = getReorgInfo(&jobContext{}, d.ddlCtx, m, job, mockTbl, []*meta.Element{element})
+_, err1 = getReorgInfo(&JobContext{}, d.ddlCtx, m, job, mockTbl, []*meta.Element{element})
require.True(t, meta.ErrDDLReorgElementNotExist.Equal(err1))
require.Equal(t, job.SnapshotVer, uint64(0))
return nil
@@ -185,7 +185,7 @@
require.NoError(t, err)
err = kv.RunInNewTxn(context.Background(), d.store, false, func(ctx context.Context, txn kv.Transaction) error {
m := meta.NewMeta(txn)
-info1, err1 := getReorgInfo(&jobContext{}, d.ddlCtx, m, job, mockTbl, []*meta.Element{element})
+info1, err1 := getReorgInfo(&JobContext{}, d.ddlCtx, m, job, mockTbl, []*meta.Element{element})
require.NoError(t, err1)
require.Equal(t, info1.currElement, info.currElement)
require.Equal(t, info1.StartKey, info.StartKey)
17 changes: 17 additions & 0 deletions ddl/util/util.go
@@ -15,6 +15,7 @@
package util

import (
"bytes"
"context"
"encoding/hex"
"strings"
@@ -27,6 +28,7 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/tikv/client-go/v2/tikvrpc"
atomicutil "go.uber.org/atomic"
)

@@ -225,3 +227,18 @@ func EmulatorGCDisable() {
func IsEmulatorGCEnable() bool {
return emulatorGCEnable.Load() == 1
}

+var intervalResourceGroupTag = []byte{0}

+// GetInternalResourceGroupTaggerForTopSQL only use for testing.
+func GetInternalResourceGroupTaggerForTopSQL() tikvrpc.ResourceGroupTagger {
+tagger := func(req *tikvrpc.Request) {
+req.ResourceGroupTag = intervalResourceGroupTag
+}
+return tagger
+}

+// IsInternalResourceGroupTaggerForTopSQL use for testing.
+func IsInternalResourceGroupTaggerForTopSQL(tag []byte) bool {
+return bytes.Equal(tag, intervalResourceGroupTag)
+}
3 changes: 3 additions & 0 deletions executor/insert_common.go
@@ -1064,6 +1064,9 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
if err != nil {
return err
}
+sessVars := e.ctx.GetSessionVars()
+setResourceGroupTaggerForTxn(sessVars.StmtCtx, txn)
+setRPCInterceptorOfExecCounterForTxn(sessVars, txn)
if e.collectRuntimeStatsEnabled() {
if snapshot := txn.GetSnapshot(); snapshot != nil {
snapshot.SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats)
6 changes: 6 additions & 0 deletions server/main_test.go
@@ -23,7 +23,9 @@ import (
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/mockstore/unistore"
"github.com/pingcap/tidb/util/testbridge"
topsqlstate "github.com/pingcap/tidb/util/topsql/state"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/goleak"
)
@@ -32,6 +34,10 @@ func TestMain(m *testing.M) {
testbridge.SetupForCommonTest()

RunInGoTest = true // flag for NewServer to known it is running in test environment
+// Enable TopSQL for all test, and check the resource tag for each RPC request.
+// This is used to detect which codes are not tracked by TopSQL.
+topsqlstate.EnableTopSQL()
+unistore.CheckResourceTagForTopSQLInGoTest = true
crazycs520 (Contributor, Author):

@breeswish All test cases in the server pkg now check the resource tag. I tried to enable this check in other pkgs as well, but ran into many errors, so for now it is only enabled in the server pkg.

breeswish (Member):

> but will meet many error

Do these errors come from requests that we are not tagging correctly? If so, maybe we need to add tracking issues to track and fix them.

crazycs520 (Contributor, Author):

OK, I will create an issue to track this. Some of the errors are caused by the tests themselves.


// AsyncCommit will make DDL wait 2.5s before changing to the next state.
// Set schema lease to avoid it from making CI slow.
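
As discussed in the review thread above, other packages could opt in to the same check once their untagged requests are fixed. A hedged sketch of what that might look like in another package's TestMain — the package name and the plain goleak.VerifyTestMain call are assumptions; only the two flags come from this diff:

```go
package executor_test // hypothetical package opting in to the check

import (
	"testing"

	"github.com/pingcap/tidb/store/mockstore/unistore"
	"github.com/pingcap/tidb/util/testbridge"
	topsqlstate "github.com/pingcap/tidb/util/topsql/state"
	"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
	testbridge.SetupForCommonTest()

	// Enable TopSQL so internal requests are expected to carry a resource group tag,
	// and make the mock TiKV (unistore) verify the tag of every RPC it receives.
	topsqlstate.EnableTopSQL()
	unistore.CheckResourceTagForTopSQLInGoTest = true

	goleak.VerifyTestMain(m)
}
```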