From cfa732d1291a93d1573ed03d8469dcc5046bd8b2 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Thu, 31 Mar 2022 10:42:24 +0800 Subject: [PATCH 1/7] topsql: add more test for check set resource tag Signed-off-by: crazycs520 --- ddl/ddl.go | 2 +- ddl/delete_range.go | 4 ++ ddl/reorg.go | 12 ++-- ddl/testutil/testutil.go | 71 +++++++++++++++++++++- ddl/util/util.go | 15 +++++ server/tidb_test.go | 120 ++++++++++++++++++++++++++++++++++++++ store/copr/coprocessor.go | 4 ++ 7 files changed, 220 insertions(+), 8 deletions(-) diff --git a/ddl/ddl.go b/ddl/ddl.go index 273e0007b98bb..7a792a0d8cb7b 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -174,7 +174,7 @@ type DDL interface { // GetID gets the ddl ID. GetID() string // GetTableMaxHandle gets the max row ID of a normal table or a partition. - GetTableMaxHandle(startTS uint64, tbl table.PhysicalTable) (kv.Handle, bool, error) + GetTableMaxHandle(ctx *jobContext, startTS uint64, tbl table.PhysicalTable) (kv.Handle, bool, error) // SetBinlogClient sets the binlog client for DDL worker. It's exported for testing. SetBinlogClient(*pumpcli.PumpsClient) // GetHook gets the hook. It's exported for testing. diff --git a/ddl/delete_range.go b/ddl/delete_range.go index 2182c2919ec30..31897de005963 100644 --- a/ddl/delete_range.go +++ b/ddl/delete_range.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/sqlexec" + topsqlstate "github.com/pingcap/tidb/util/topsql/state" "go.uber.org/zap" ) @@ -179,6 +180,9 @@ func (dr *delRange) doTask(ctx sessionctx.Context, r util.DelRangeTask) error { finish := true dr.keys = dr.keys[:0] err := kv.RunInNewTxn(context.Background(), dr.store, false, func(ctx context.Context, txn kv.Transaction) error { + if topsqlstate.TopSQLEnabled() { + txn.SetOption(kv.ResourceGroupTagger, util.GetInternalResourceGroupTaggerForTopSQL()) + } iter, err := txn.Iter(oldStartKey, r.EndKey) if err != nil { return errors.Trace(err) diff --git a/ddl/reorg.go b/ddl/reorg.go index 315f5eac93c9f..6c4031194dcb2 100644 --- a/ddl/reorg.go +++ b/ddl/reorg.go @@ -438,7 +438,7 @@ func getColumnsTypes(columns []*model.ColumnInfo) []*types.FieldType { } // buildDescTableScan builds a desc table scan upon tblInfo. -func (dc *ddlCtx) buildDescTableScan(ctx context.Context, startTS uint64, tbl table.PhysicalTable, +func (dc *ddlCtx) buildDescTableScan(ctx *jobContext, startTS uint64, tbl table.PhysicalTable, handleCols []*model.ColumnInfo, limit uint64) (distsql.SelectResult, error) { sctx := newContext(dc.store) dagPB, err := buildDescTableScanDAG(sctx, tbl, handleCols, limit) @@ -459,6 +459,7 @@ func (dc *ddlCtx) buildDescTableScan(ctx context.Context, startTS uint64, tbl ta SetKeepOrder(true). SetConcurrency(1).SetDesc(true) + builder.Request.ResourceGroupTagger = ctx.getResourceGroupTaggerForTopSQL() builder.Request.NotFillCache = true builder.Request.Priority = kv.PriorityLow @@ -467,7 +468,7 @@ func (dc *ddlCtx) buildDescTableScan(ctx context.Context, startTS uint64, tbl ta return nil, errors.Trace(err) } - result, err := distsql.Select(ctx, sctx, kvReq, getColumnsTypes(handleCols), statistics.NewQueryFeedback(0, nil, 0, false)) + result, err := distsql.Select(ctx.ddlJobCtx, sctx, kvReq, getColumnsTypes(handleCols), statistics.NewQueryFeedback(0, nil, 0, false)) if err != nil { return nil, errors.Trace(err) } @@ -475,7 +476,7 @@ func (dc *ddlCtx) buildDescTableScan(ctx context.Context, startTS uint64, tbl ta } // GetTableMaxHandle gets the max handle of a PhysicalTable. 
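// It does so by issuing a descending, limit-1 table scan, so with this change the
// request carries the DDL job's resource tag through the job context argument.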
-func (dc *ddlCtx) GetTableMaxHandle(startTS uint64, tbl table.PhysicalTable) (maxHandle kv.Handle, emptyTable bool, err error) { +func (dc *ddlCtx) GetTableMaxHandle(ctx *jobContext, startTS uint64, tbl table.PhysicalTable) (maxHandle kv.Handle, emptyTable bool, err error) { var handleCols []*model.ColumnInfo var pkIdx *model.IndexInfo tblInfo := tbl.Meta() @@ -497,7 +498,6 @@ func (dc *ddlCtx) GetTableMaxHandle(startTS uint64, tbl table.PhysicalTable) (ma handleCols = []*model.ColumnInfo{model.NewExtraHandleColInfo()} } - ctx := context.Background() // build a desc scan of tblInfo, which limit is 1, we can use it to retrieve the last handle of the table. result, err := dc.buildDescTableScan(ctx, startTS, tbl, handleCols, 1) if err != nil { @@ -506,7 +506,7 @@ func (dc *ddlCtx) GetTableMaxHandle(startTS uint64, tbl table.PhysicalTable) (ma defer terror.Call(result.Close) chk := chunk.New(getColumnsTypes(handleCols), 1, 1) - err = result.Next(ctx, chk) + err = result.Next(ctx.ddlJobCtx, chk) if err != nil { return nil, false, errors.Trace(err) } @@ -552,7 +552,7 @@ func getTableRange(ctx *jobContext, d *ddlCtx, tbl table.PhysicalTable, snapshot if err != nil { return startHandleKey, endHandleKey, errors.Trace(err) } - maxHandle, isEmptyTable, err := d.GetTableMaxHandle(snapshotVer, tbl) + maxHandle, isEmptyTable, err := d.GetTableMaxHandle(ctx, snapshotVer, tbl) if err != nil { return startHandleKey, nil, errors.Trace(err) } diff --git a/ddl/testutil/testutil.go b/ddl/testutil/testutil.go index a18d0e045fb3c..b351c27f37908 100644 --- a/ddl/testutil/testutil.go +++ b/ddl/testutil/testutil.go @@ -16,7 +16,6 @@ package testutil import ( "context" - "github.com/pingcap/errors" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/kv" @@ -25,6 +24,7 @@ import ( "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/types" + "github.com/tikv/client-go/v2/tikvrpc" ) // SessionExecInGoroutine export for testing. 
@@ -81,3 +81,72 @@ func ExtractAllTableHandles(se session.Session, dbName, tbName string) ([]int64, }) return allHandles, err } + +func GetReqStartKeyAndTxnTs(req *tikvrpc.Request) ([]byte, uint64, error) { + var startKey []byte + var ts uint64 + switch req.Type { + case tikvrpc.CmdGet: + request := req.Get() + startKey = request.Key + ts = request.Version + case tikvrpc.CmdScan: + request := req.Scan() + startKey = request.StartKey + ts = request.Version + case tikvrpc.CmdPrewrite: + request := req.Prewrite() + startKey = request.Mutations[0].Key + ts = request.StartVersion + case tikvrpc.CmdCommit: + request := req.Commit() + startKey = request.Keys[0] + ts = request.StartVersion + case tikvrpc.CmdCleanup: + request := req.Cleanup() + startKey = request.Key + ts = request.StartVersion + case tikvrpc.CmdBatchGet: + request := req.BatchGet() + startKey = request.Keys[0] + ts = request.Version + case tikvrpc.CmdBatchRollback: + request := req.BatchRollback() + startKey = request.Keys[0] + ts = request.StartVersion + case tikvrpc.CmdScanLock: + request := req.ScanLock() + startKey = request.StartKey + ts = request.MaxVersion + case tikvrpc.CmdPessimisticLock: + request := req.PessimisticLock() + startKey = request.PrimaryLock + ts = request.StartVersion + case tikvrpc.CmdPessimisticRollback: + request := req.PessimisticRollback() + startKey = request.Keys[0] + ts = request.StartVersion + case tikvrpc.CmdCheckSecondaryLocks: + request := req.CheckSecondaryLocks() + startKey = request.Keys[0] + ts = request.StartVersion + case tikvrpc.CmdCop, tikvrpc.CmdCopStream: + request := req.Cop() + startKey = request.Ranges[0].Start + ts = request.StartTs + case tikvrpc.CmdGC, tikvrpc.CmdDeleteRange, tikvrpc.CmdTxnHeartBeat, tikvrpc.CmdRawGet, + tikvrpc.CmdRawBatchGet, tikvrpc.CmdRawPut, tikvrpc.CmdRawBatchPut, tikvrpc.CmdRawDelete, tikvrpc.CmdRawBatchDelete, tikvrpc.CmdRawDeleteRange, + tikvrpc.CmdRawScan, tikvrpc.CmdGetKeyTTL, tikvrpc.CmdRawCompareAndSwap, tikvrpc.CmdUnsafeDestroyRange, tikvrpc.CmdRegisterLockObserver, + tikvrpc.CmdCheckLockObserver, tikvrpc.CmdRemoveLockObserver, tikvrpc.CmdPhysicalScanLock, tikvrpc.CmdStoreSafeTS, + tikvrpc.CmdLockWaitInfo, tikvrpc.CmdMvccGetByKey, tikvrpc.CmdMvccGetByStartTs, tikvrpc.CmdSplitRegion, + tikvrpc.CmdDebugGetRegionProperties, tikvrpc.CmdEmpty: + // Ignore those requests since now, since it is no business with TopSQL. + case tikvrpc.CmdBatchCop, tikvrpc.CmdMPPTask, tikvrpc.CmdMPPConn, tikvrpc.CmdMPPCancel, tikvrpc.CmdMPPAlive: + // Ignore mpp requests. + case tikvrpc.CmdResolveLock, tikvrpc.CmdCheckTxnStatus: + // TODO: add resource tag for those request. 
+ default: + return nil, 0, errors.New("unknown request, check the new type RPC request here") + } + return startKey, ts, nil +} diff --git a/ddl/util/util.go b/ddl/util/util.go index d45e198ef8db5..9cd04066e5daf 100644 --- a/ddl/util/util.go +++ b/ddl/util/util.go @@ -15,6 +15,7 @@ package util import ( + "bytes" "context" "encoding/hex" "strings" @@ -27,6 +28,7 @@ import ( "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/sqlexec" + "github.com/tikv/client-go/v2/tikvrpc" atomicutil "go.uber.org/atomic" ) @@ -225,3 +227,16 @@ func EmulatorGCDisable() { func IsEmulatorGCEnable() bool { return emulatorGCEnable.Load() == 1 } + +var intervalResourceGroupTag = []byte{0} + +func GetInternalResourceGroupTaggerForTopSQL() tikvrpc.ResourceGroupTagger { + tagger := func(req *tikvrpc.Request) { + req.ResourceGroupTag = intervalResourceGroupTag + } + return tagger +} + +func IsInternalResourceGroupTaggerForTopSQL(tag []byte) bool { + return bytes.Equal(tag, intervalResourceGroupTag) +} diff --git a/server/tidb_test.go b/server/tidb_test.go index 4c7081478aacb..1fa8fbbd5d722 100644 --- a/server/tidb_test.go +++ b/server/tidb_test.go @@ -26,6 +26,12 @@ import ( "database/sql" "encoding/pem" "fmt" + "github.com/pingcap/tidb/ddl/testutil" + ddlutil "github.com/pingcap/tidb/ddl/util" + "github.com/pingcap/tidb/tablecodec" + "github.com/pingcap/tidb/util/logutil" + "github.com/tikv/client-go/v2/oracle" + "go.uber.org/zap" "math/big" "net/http" "os" @@ -1911,6 +1917,9 @@ func (c *resourceTagChecker) checkExist(t *testing.T, digest stmtstats.BinaryDig } func (c *resourceTagChecker) checkReqExist(t *testing.T, digest stmtstats.BinaryDigest, sqlStr string, reqs ...tikvrpc.CmdType) { + if len(reqs) == 0 { + return + } c.Lock() defer c.Unlock() reqMap, ok := c.sqlDigest2Reqs[digest] @@ -1966,12 +1975,37 @@ func setupForTestTopSQLStatementStats(t *testing.T) (*tidbTestSuite, stmtstats.S conf.TopSQL.ReceiverAddress = "mock-agent" }) + startTimestamp, err := ts.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) + require.NoError(t, err) + tagChecker := &resourceTagChecker{ sqlDigest2Reqs: make(map[stmtstats.BinaryDigest]map[tikvrpc.CmdType]struct{}), } unistore.UnistoreRPCClientSendHook = func(req *tikvrpc.Request) { tag := req.GetResourceGroupTag() if len(tag) == 0 { + startKey, txnTs, err := testutil.GetReqStartKeyAndTxnTs(req) + if err != nil { + logutil.BgLogger().Error("FAIL-- get request start key meet error", zap.String("err", err.Error()), zap.Stack("stack")) + } + require.NoError(t, err) + var tid int64 + if tablecodec.IsRecordKey(startKey) { + tid, _, err = tablecodec.DecodeRecordKey(startKey) + } + if tablecodec.IsIndexKey(startKey) { + tid, _, _, err = tablecodec.DecodeIndexKey(startKey) + } + // since the error maybe "invalid record key", should just ignore check resource tag for this request. 
+ if err == nil && tid != 0 && txnTs > startTimestamp { + tbl, ok := ts.domain.InfoSchema().TableByID(tid) + if ok { + logutil.BgLogger().Error("FAIL-- rpc request does not set the resource tag", zap.String("req", req.Type.String()), zap.String("table", tbl.Meta().Name.O), zap.Stack("stack")) + require.Fail(t, "") + } + } + return + } else if ddlutil.IsInternalResourceGroupTaggerForTopSQL(tag) { return } sqlDigest, err := resourcegrouptag.DecodeResourceGroupTag(tag) @@ -2265,6 +2299,92 @@ func TestTopSQLStatementStats4(t *testing.T) { } } +func TestTopSQLResourceTag(t *testing.T) { + ts, _, tagChecker, _, cleanFn := setupForTestTopSQLStatementStats(t) + defer func() { + topsqlstate.DisableTopSQL() + cleanFn() + }() + + // Test case for other statements + cases1 := []struct { + sql string + isQuery bool + reqs []tikvrpc.CmdType + }{ + // Test for curd. + {"insert into t values (1,1), (3,3)", false, []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit}}, + {"insert into t values (1,2) on duplicate key update a = 2", false, []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit, tikvrpc.CmdBatchGet}}, + {"update t set b=b+1 where a=3", false, []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit, tikvrpc.CmdGet}}, + {"update t set b=b+1 where a>1", false, []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit, tikvrpc.CmdCop}}, + {"delete from t where a=3", false, []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit, tikvrpc.CmdGet}}, + {"delete from t where a>1", false, []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit, tikvrpc.CmdCop}}, + {"insert ignore into t values (2,2), (3,3)", false, []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit, tikvrpc.CmdBatchGet}}, + {"select * from t where a in (1,2,3,4)", true, []tikvrpc.CmdType{tikvrpc.CmdBatchGet}}, + {"select * from t where a = 1", true, []tikvrpc.CmdType{tikvrpc.CmdGet}}, + {"select * from t where b > 0", true, []tikvrpc.CmdType{tikvrpc.CmdCop}}, + {"replace into t values (2,2), (4,4)", false, []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit, tikvrpc.CmdBatchGet}}, + + // Test for DDL + {"create database test_db0", false, []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit}}, + {"create table test_db0.test_t0 (a int, b int, index idx(a))", false, []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit}}, + {"create table test_db0.test_t1 (a int, b int, index idx(a))", false, []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit}}, + {"alter table test_db0.test_t0 add column c int", false, []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit}}, + {"drop table test_db0.test_t0", false, []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit}}, + {"drop database test_db0", false, []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit}}, + {"alter table t modify column b double", false, []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit, tikvrpc.CmdScan}}, + {"alter table t add index idx2 (b,a)", false, []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit, tikvrpc.CmdScan}}, + {"alter table t drop index idx2", false, []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit}}, + + // Test for transaction + {"begin", false, nil}, + {"insert into t2 values (10,10), (11,11)", false, nil}, + {"insert ignore into t2 values (20,20), (21,21)", false, []tikvrpc.CmdType{tikvrpc.CmdBatchGet}}, + {"commit", false, []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit}}, + + // Test for other statements. 
+ {"set @@global.tidb_enable_1pc = 1", false, nil}, + } + + internalCases := []struct { + sql string + reqs []tikvrpc.CmdType + }{ + {"replace into mysql.global_variables (variable_name,variable_value) values ('tidb_enable_1pc', '1')", []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit, tikvrpc.CmdBatchGet}}, + } + executeCaseFn := func(execFn func(db *sql.DB)) { + db, err := sql.Open("mysql", ts.getDSN()) + require.NoError(t, err) + dbt := testkit.NewDBTestKit(t, db) + dbt.MustExec("use stmtstats;") + require.NoError(t, err) + + execFn(db) + err = db.Close() + require.NoError(t, err) + } + execFn := func(db *sql.DB) { + dbt := testkit.NewDBTestKit(t, db) + for _, ca := range cases1 { + if ca.isQuery { + mustQuery(t, dbt, ca.sql) + } else { + dbt.MustExec(ca.sql) + } + } + } + executeCaseFn(execFn) + + for _, ca := range cases1 { + _, digest := parser.NormalizeDigest(ca.sql) + tagChecker.checkReqExist(t, stmtstats.BinaryDigest(digest.Bytes()), ca.sql, ca.reqs...) + } + for _, ca := range internalCases { + _, digest := parser.NormalizeDigest(ca.sql) + tagChecker.checkReqExist(t, stmtstats.BinaryDigest(digest.Bytes()), ca.sql, ca.reqs...) + } +} + func (ts *tidbTestTopSQLSuite) loopExec(ctx context.Context, t *testing.T, fn func(db *sql.DB)) { db, err := sql.Open("mysql", ts.getDSN()) require.NoError(t, err, "Error connecting") diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index dba00aa935c60..e4b44e2f017e9 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -17,6 +17,7 @@ package copr import ( "context" "fmt" + topsqlstate "github.com/pingcap/tidb/util/topsql/state" "io" "strconv" "strings" @@ -92,6 +93,9 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interfa if err != nil { return copErrorResponse{err} } + if topsqlstate.TopSQLEnabled() && req.ResourceGroupTagger == nil { + logutil.BgLogger().Info("cop request does not set tag", zap.Stack("stack")) + } it := &copIterator{ store: c.store, req: req, From 0da9e7e2172661011b9161adfef4706bb597c2e1 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Thu, 31 Mar 2022 11:11:52 +0800 Subject: [PATCH 2/7] tiny refine Signed-off-by: crazycs520 --- ddl/delete_range.go | 1 + ddl/testutil/testutil.go | 46 +++++++++++++++------------------------ ddl/util/util.go | 2 ++ server/tidb_test.go | 30 ++++++++++++++----------- store/copr/coprocessor.go | 4 ---- 5 files changed, 38 insertions(+), 45 deletions(-) diff --git a/ddl/delete_range.go b/ddl/delete_range.go index 31897de005963..bacde289d46ed 100644 --- a/ddl/delete_range.go +++ b/ddl/delete_range.go @@ -181,6 +181,7 @@ func (dr *delRange) doTask(ctx sessionctx.Context, r util.DelRangeTask) error { dr.keys = dr.keys[:0] err := kv.RunInNewTxn(context.Background(), dr.store, false, func(ctx context.Context, txn kv.Transaction) error { if topsqlstate.TopSQLEnabled() { + // Only test logic will run into here, so just set a mock internal resource tagger. 
txn.SetOption(kv.ResourceGroupTagger, util.GetInternalResourceGroupTaggerForTopSQL()) } iter, err := txn.Iter(oldStartKey, r.EndKey) diff --git a/ddl/testutil/testutil.go b/ddl/testutil/testutil.go index b351c27f37908..d34915bbda464 100644 --- a/ddl/testutil/testutil.go +++ b/ddl/testutil/testutil.go @@ -16,6 +16,7 @@ package testutil import ( "context" + "github.com/pingcap/errors" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/kv" @@ -82,58 +83,45 @@ func ExtractAllTableHandles(se session.Session, dbName, tbName string) ([]int64, return allHandles, err } +// GetReqStartKeyAndTxnTs gets start key and transaction ts of the request. func GetReqStartKeyAndTxnTs(req *tikvrpc.Request) ([]byte, uint64, error) { - var startKey []byte - var ts uint64 switch req.Type { case tikvrpc.CmdGet: request := req.Get() - startKey = request.Key - ts = request.Version + return request.Key, request.Version, nil case tikvrpc.CmdScan: request := req.Scan() - startKey = request.StartKey - ts = request.Version + return request.StartKey, request.Version, nil case tikvrpc.CmdPrewrite: request := req.Prewrite() - startKey = request.Mutations[0].Key - ts = request.StartVersion + return request.Mutations[0].Key, request.StartVersion, nil case tikvrpc.CmdCommit: request := req.Commit() - startKey = request.Keys[0] - ts = request.StartVersion + return request.Keys[0], request.StartVersion, nil case tikvrpc.CmdCleanup: request := req.Cleanup() - startKey = request.Key - ts = request.StartVersion + return request.Key, request.StartVersion, nil case tikvrpc.CmdBatchGet: request := req.BatchGet() - startKey = request.Keys[0] - ts = request.Version + return request.Keys[0], request.Version, nil case tikvrpc.CmdBatchRollback: request := req.BatchRollback() - startKey = request.Keys[0] - ts = request.StartVersion + return request.Keys[0], request.StartVersion, nil case tikvrpc.CmdScanLock: request := req.ScanLock() - startKey = request.StartKey - ts = request.MaxVersion + return request.StartKey, request.MaxVersion, nil case tikvrpc.CmdPessimisticLock: request := req.PessimisticLock() - startKey = request.PrimaryLock - ts = request.StartVersion + return request.PrimaryLock, request.StartVersion, nil case tikvrpc.CmdPessimisticRollback: request := req.PessimisticRollback() - startKey = request.Keys[0] - ts = request.StartVersion + return request.Keys[0], request.StartVersion, nil case tikvrpc.CmdCheckSecondaryLocks: request := req.CheckSecondaryLocks() - startKey = request.Keys[0] - ts = request.StartVersion + return request.Keys[0], request.StartVersion, nil case tikvrpc.CmdCop, tikvrpc.CmdCopStream: request := req.Cop() - startKey = request.Ranges[0].Start - ts = request.StartTs + return request.Ranges[0].Start, request.StartTs, nil case tikvrpc.CmdGC, tikvrpc.CmdDeleteRange, tikvrpc.CmdTxnHeartBeat, tikvrpc.CmdRawGet, tikvrpc.CmdRawBatchGet, tikvrpc.CmdRawPut, tikvrpc.CmdRawBatchPut, tikvrpc.CmdRawDelete, tikvrpc.CmdRawBatchDelete, tikvrpc.CmdRawDeleteRange, tikvrpc.CmdRawScan, tikvrpc.CmdGetKeyTTL, tikvrpc.CmdRawCompareAndSwap, tikvrpc.CmdUnsafeDestroyRange, tikvrpc.CmdRegisterLockObserver, @@ -141,12 +129,14 @@ func GetReqStartKeyAndTxnTs(req *tikvrpc.Request) ([]byte, uint64, error) { tikvrpc.CmdLockWaitInfo, tikvrpc.CmdMvccGetByKey, tikvrpc.CmdMvccGetByStartTs, tikvrpc.CmdSplitRegion, tikvrpc.CmdDebugGetRegionProperties, tikvrpc.CmdEmpty: // Ignore those requests since now, since it is no business with TopSQL. 
+ return nil, 0, nil case tikvrpc.CmdBatchCop, tikvrpc.CmdMPPTask, tikvrpc.CmdMPPConn, tikvrpc.CmdMPPCancel, tikvrpc.CmdMPPAlive: // Ignore mpp requests. + return nil, 0, nil case tikvrpc.CmdResolveLock, tikvrpc.CmdCheckTxnStatus: - // TODO: add resource tag for those request. + // TODO: add resource tag for those request. https://github.com/pingcap/tidb/issues/33621 + return nil, 0, nil default: return nil, 0, errors.New("unknown request, check the new type RPC request here") } - return startKey, ts, nil } diff --git a/ddl/util/util.go b/ddl/util/util.go index 9cd04066e5daf..73f62ec9f1a01 100644 --- a/ddl/util/util.go +++ b/ddl/util/util.go @@ -230,6 +230,7 @@ func IsEmulatorGCEnable() bool { var intervalResourceGroupTag = []byte{0} +// GetInternalResourceGroupTaggerForTopSQL only use for testing. func GetInternalResourceGroupTaggerForTopSQL() tikvrpc.ResourceGroupTagger { tagger := func(req *tikvrpc.Request) { req.ResourceGroupTag = intervalResourceGroupTag @@ -237,6 +238,7 @@ func GetInternalResourceGroupTaggerForTopSQL() tikvrpc.ResourceGroupTagger { return tagger } +// IsInternalResourceGroupTaggerForTopSQL use for testing. func IsInternalResourceGroupTaggerForTopSQL(tag []byte) bool { return bytes.Equal(tag, intervalResourceGroupTag) } diff --git a/server/tidb_test.go b/server/tidb_test.go index 1fa8fbbd5d722..891cc0583bf89 100644 --- a/server/tidb_test.go +++ b/server/tidb_test.go @@ -26,12 +26,6 @@ import ( "database/sql" "encoding/pem" "fmt" - "github.com/pingcap/tidb/ddl/testutil" - ddlutil "github.com/pingcap/tidb/ddl/util" - "github.com/pingcap/tidb/tablecodec" - "github.com/pingcap/tidb/util/logutil" - "github.com/tikv/client-go/v2/oracle" - "go.uber.org/zap" "math/big" "net/http" "os" @@ -45,6 +39,8 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/ddl/testutil" + ddlutil "github.com/pingcap/tidb/ddl/util" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser" @@ -52,9 +48,11 @@ import ( "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/store/mockstore/unistore" + "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/cpuprofile" + "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/plancodec" "github.com/pingcap/tidb/util/resourcegrouptag" "github.com/pingcap/tidb/util/topsql" @@ -63,7 +61,9 @@ import ( topsqlstate "github.com/pingcap/tidb/util/topsql/state" "github.com/pingcap/tidb/util/topsql/stmtstats" "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikvrpc" + "go.uber.org/zap" ) type tidbTestSuite struct { @@ -1996,8 +1996,11 @@ func setupForTestTopSQLStatementStats(t *testing.T) (*tidbTestSuite, stmtstats.S if tablecodec.IsIndexKey(startKey) { tid, _, _, err = tablecodec.DecodeIndexKey(startKey) } - // since the error maybe "invalid record key", should just ignore check resource tag for this request. - if err == nil && tid != 0 && txnTs > startTimestamp { + if err != nil { + // since the error maybe "invalid record key", should just ignore check resource tag for this request. 
+ return + } + if tid != 0 && txnTs > startTimestamp { tbl, ok := ts.domain.InfoSchema().TableByID(tid) if ok { logutil.BgLogger().Error("FAIL-- rpc request does not set the resource tag", zap.String("req", req.Type.String()), zap.String("table", tbl.Meta().Name.O), zap.Stack("stack")) @@ -2006,6 +2009,7 @@ func setupForTestTopSQLStatementStats(t *testing.T) (*tidbTestSuite, stmtstats.S } return } else if ddlutil.IsInternalResourceGroupTaggerForTopSQL(tag) { + // Ignore for internal background request. return } sqlDigest, err := resourcegrouptag.DecodeResourceGroupTag(tag) @@ -2307,7 +2311,7 @@ func TestTopSQLResourceTag(t *testing.T) { }() // Test case for other statements - cases1 := []struct { + cases := []struct { sql string isQuery bool reqs []tikvrpc.CmdType @@ -2332,8 +2336,8 @@ func TestTopSQLResourceTag(t *testing.T) { {"alter table test_db0.test_t0 add column c int", false, []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit}}, {"drop table test_db0.test_t0", false, []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit}}, {"drop database test_db0", false, []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit}}, - {"alter table t modify column b double", false, []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit, tikvrpc.CmdScan}}, - {"alter table t add index idx2 (b,a)", false, []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit, tikvrpc.CmdScan}}, + {"alter table t modify column b double", false, []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit, tikvrpc.CmdScan, tikvrpc.CmdCop}}, + {"alter table t add index idx2 (b,a)", false, []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit, tikvrpc.CmdScan, tikvrpc.CmdCop}}, {"alter table t drop index idx2", false, []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit}}, // Test for transaction @@ -2365,7 +2369,7 @@ func TestTopSQLResourceTag(t *testing.T) { } execFn := func(db *sql.DB) { dbt := testkit.NewDBTestKit(t, db) - for _, ca := range cases1 { + for _, ca := range cases { if ca.isQuery { mustQuery(t, dbt, ca.sql) } else { @@ -2375,7 +2379,7 @@ func TestTopSQLResourceTag(t *testing.T) { } executeCaseFn(execFn) - for _, ca := range cases1 { + for _, ca := range cases { _, digest := parser.NormalizeDigest(ca.sql) tagChecker.checkReqExist(t, stmtstats.BinaryDigest(digest.Bytes()), ca.sql, ca.reqs...) 
} diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index e4b44e2f017e9..dba00aa935c60 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -17,7 +17,6 @@ package copr import ( "context" "fmt" - topsqlstate "github.com/pingcap/tidb/util/topsql/state" "io" "strconv" "strings" @@ -93,9 +92,6 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interfa if err != nil { return copErrorResponse{err} } - if topsqlstate.TopSQLEnabled() && req.ResourceGroupTagger == nil { - logutil.BgLogger().Info("cop request does not set tag", zap.Stack("stack")) - } it := &copIterator{ store: c.store, req: req, From e44c4def70582c49018061e854070e2b1490988a Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Fri, 1 Apr 2022 15:23:09 +0800 Subject: [PATCH 3/7] fix load data doesn't set resource tag Signed-off-by: crazycs520 --- ddl/testutil/testutil.go | 15 ++++++--- executor/insert_common.go | 3 ++ server/main_test.go | 4 +++ server/tidb_test.go | 57 +++++++++++++-------------------- store/mockstore/unistore/rpc.go | 43 +++++++++++++++++++++++++ 5 files changed, 83 insertions(+), 39 deletions(-) diff --git a/ddl/testutil/testutil.go b/ddl/testutil/testutil.go index d34915bbda464..d808ad49f5006 100644 --- a/ddl/testutil/testutil.go +++ b/ddl/testutil/testutil.go @@ -16,6 +16,7 @@ package testutil import ( "context" + "runtime" "github.com/pingcap/errors" "github.com/pingcap/tidb/domain" @@ -113,9 +114,6 @@ func GetReqStartKeyAndTxnTs(req *tikvrpc.Request) ([]byte, uint64, error) { case tikvrpc.CmdPessimisticLock: request := req.PessimisticLock() return request.PrimaryLock, request.StartVersion, nil - case tikvrpc.CmdPessimisticRollback: - request := req.PessimisticRollback() - return request.Keys[0], request.StartVersion, nil case tikvrpc.CmdCheckSecondaryLocks: request := req.CheckSecondaryLocks() return request.Keys[0], request.StartVersion, nil @@ -133,10 +131,19 @@ func GetReqStartKeyAndTxnTs(req *tikvrpc.Request) ([]byte, uint64, error) { case tikvrpc.CmdBatchCop, tikvrpc.CmdMPPTask, tikvrpc.CmdMPPConn, tikvrpc.CmdMPPCancel, tikvrpc.CmdMPPAlive: // Ignore mpp requests. return nil, 0, nil - case tikvrpc.CmdResolveLock, tikvrpc.CmdCheckTxnStatus: + case tikvrpc.CmdResolveLock, tikvrpc.CmdCheckTxnStatus, tikvrpc.CmdPessimisticRollback: // TODO: add resource tag for those request. https://github.com/pingcap/tidb/issues/33621 return nil, 0, nil default: return nil, 0, errors.New("unknown request, check the new type RPC request here") } } + +// GetStack gets the stacktrace. 
+func GetStack() []byte { + const size = 1024 * 64 + buf := make([]byte, size) + stackSize := runtime.Stack(buf, false) + buf = buf[:stackSize] + return buf +} diff --git a/executor/insert_common.go b/executor/insert_common.go index 21a35c9cbd0f1..8d053645a4355 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -1064,6 +1064,9 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D if err != nil { return err } + sessVars := e.ctx.GetSessionVars() + setResourceGroupTaggerForTxn(sessVars.StmtCtx, txn) + setRPCInterceptorOfExecCounterForTxn(sessVars, txn) if e.collectRuntimeStatsEnabled() { if snapshot := txn.GetSnapshot(); snapshot != nil { snapshot.SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats) diff --git a/server/main_test.go b/server/main_test.go index 315ab4b859d21..8baa366fd3a7d 100644 --- a/server/main_test.go +++ b/server/main_test.go @@ -16,6 +16,8 @@ package server import ( "fmt" + "github.com/pingcap/tidb/store/mockstore/unistore" + topsqlstate "github.com/pingcap/tidb/util/topsql/state" "os" "reflect" "testing" @@ -32,6 +34,8 @@ func TestMain(m *testing.M) { testbridge.SetupForCommonTest() RunInGoTest = true // flag for NewServer to known it is running in test environment + unistore.CheckResourceTagForTopSQLInGoTest = true + topsqlstate.EnableTopSQL() // AsyncCommit will make DDL wait 2.5s before changing to the next state. // Set schema lease to avoid it from making CI slow. diff --git a/server/tidb_test.go b/server/tidb_test.go index 891cc0583bf89..86adf62e18f22 100644 --- a/server/tidb_test.go +++ b/server/tidb_test.go @@ -39,7 +39,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/config" - "github.com/pingcap/tidb/ddl/testutil" ddlutil "github.com/pingcap/tidb/ddl/util" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/kv" @@ -48,11 +47,9 @@ import ( "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/store/mockstore/unistore" - "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/cpuprofile" - "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/plancodec" "github.com/pingcap/tidb/util/resourcegrouptag" "github.com/pingcap/tidb/util/topsql" @@ -61,9 +58,7 @@ import ( topsqlstate "github.com/pingcap/tidb/util/topsql/state" "github.com/pingcap/tidb/util/topsql/stmtstats" "github.com/stretchr/testify/require" - "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikvrpc" - "go.uber.org/zap" ) type tidbTestSuite struct { @@ -1975,40 +1970,12 @@ func setupForTestTopSQLStatementStats(t *testing.T) (*tidbTestSuite, stmtstats.S conf.TopSQL.ReceiverAddress = "mock-agent" }) - startTimestamp, err := ts.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) - require.NoError(t, err) - tagChecker := &resourceTagChecker{ sqlDigest2Reqs: make(map[stmtstats.BinaryDigest]map[tikvrpc.CmdType]struct{}), } unistore.UnistoreRPCClientSendHook = func(req *tikvrpc.Request) { tag := req.GetResourceGroupTag() - if len(tag) == 0 { - startKey, txnTs, err := testutil.GetReqStartKeyAndTxnTs(req) - if err != nil { - logutil.BgLogger().Error("FAIL-- get request start key meet error", zap.String("err", err.Error()), zap.Stack("stack")) - } - require.NoError(t, err) - var tid int64 - if tablecodec.IsRecordKey(startKey) { - tid, _, err = tablecodec.DecodeRecordKey(startKey) - } - if tablecodec.IsIndexKey(startKey) 
{ - tid, _, _, err = tablecodec.DecodeIndexKey(startKey) - } - if err != nil { - // since the error maybe "invalid record key", should just ignore check resource tag for this request. - return - } - if tid != 0 && txnTs > startTimestamp { - tbl, ok := ts.domain.InfoSchema().TableByID(tid) - if ok { - logutil.BgLogger().Error("FAIL-- rpc request does not set the resource tag", zap.String("req", req.Type.String()), zap.String("table", tbl.Meta().Name.O), zap.Stack("stack")) - require.Fail(t, "") - } - } - return - } else if ddlutil.IsInternalResourceGroupTaggerForTopSQL(tag) { + if len(tag) == 0 || ddlutil.IsInternalResourceGroupTaggerForTopSQL(tag) { // Ignore for internal background request. return } @@ -2310,6 +2277,21 @@ func TestTopSQLResourceTag(t *testing.T) { cleanFn() }() + loadDataFile, err := os.CreateTemp("", "load_data_test0.csv") + require.NoError(t, err) + defer func() { + path := loadDataFile.Name() + err = loadDataFile.Close() + require.NoError(t, err) + err = os.Remove(path) + require.NoError(t, err) + }() + _, err = loadDataFile.WriteString( + "31 31\n" + + "32 32\n" + + "33 33\n") + require.NoError(t, err) + // Test case for other statements cases := []struct { sql string @@ -2348,6 +2330,7 @@ func TestTopSQLResourceTag(t *testing.T) { // Test for other statements. {"set @@global.tidb_enable_1pc = 1", false, nil}, + {fmt.Sprintf("load data local infile %q into table t2", loadDataFile.Name()), false, []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit, tikvrpc.CmdBatchGet}}, } internalCases := []struct { @@ -2357,7 +2340,11 @@ func TestTopSQLResourceTag(t *testing.T) { {"replace into mysql.global_variables (variable_name,variable_value) values ('tidb_enable_1pc', '1')", []tikvrpc.CmdType{tikvrpc.CmdPrewrite, tikvrpc.CmdCommit, tikvrpc.CmdBatchGet}}, } executeCaseFn := func(execFn func(db *sql.DB)) { - db, err := sql.Open("mysql", ts.getDSN()) + dsn := ts.getDSN(func(config *mysql.Config) { + config.AllowAllFiles = true + config.Params["sql_mode"] = "''" + }) + db, err := sql.Open("mysql", dsn) require.NoError(t, err) dbt := testkit.NewDBTestKit(t, db) dbt.MustExec("use stmtstats;") diff --git a/store/mockstore/unistore/rpc.go b/store/mockstore/unistore/rpc.go index 87d46206f8283..e736905047f84 100644 --- a/store/mockstore/unistore/rpc.go +++ b/store/mockstore/unistore/rpc.go @@ -15,6 +15,7 @@ package unistore import ( + "fmt" "io" "math" "os" @@ -31,9 +32,12 @@ import ( "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/mpp" + "github.com/pingcap/tidb/ddl/testutil" "github.com/pingcap/tidb/parser/terror" us "github.com/pingcap/tidb/store/mockstore/unistore/tikv" + "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/codec" + topsqlstate "github.com/pingcap/tidb/util/topsql/state" "github.com/tikv/client-go/v2/tikvrpc" "golang.org/x/net/context" "google.golang.org/grpc/metadata" @@ -53,6 +57,9 @@ type RPCClient struct { closed int32 } +// CheckResourceTagForTopSQLInGoTest is used to identify whether check resource tag for TopSQL. +var CheckResourceTagForTopSQLInGoTest bool + // UnistoreRPCClientSendHook exports for test. 
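// It fires for every request the mock TiKV client sends, letting tests inspect
// the resource tag before the request is served.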
var UnistoreRPCClientSendHook func(*tikvrpc.Request) @@ -96,6 +103,13 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R return nil, err } + if CheckResourceTagForTopSQLInGoTest { + err = checkResourceTagForTopSQL(req) + if err != nil { + return nil, err + } + } + resp := &tikvrpc.Response{} switch req.Type { case tikvrpc.CmdGet: @@ -410,6 +424,35 @@ func (c *RPCClient) CloseAddr(addr string) error { return nil } +func checkResourceTagForTopSQL(req *tikvrpc.Request) error { + if !topsqlstate.TopSQLEnabled() { + return nil + } + tag := req.GetResourceGroupTag() + if len(tag) > 0 { + return nil + } + + startKey, _, err := testutil.GetReqStartKeyAndTxnTs(req) + if err != nil { + return err + } + var tid int64 + if tablecodec.IsRecordKey(startKey) { + tid, _, _ = tablecodec.DecodeRecordKey(startKey) + } + if tablecodec.IsIndexKey(startKey) { + tid, _, _, _ = tablecodec.DecodeIndexKey(startKey) + } + // since the error maybe "invalid record key", should just ignore check resource tag for this request. + if tid > 0 { + stack := testutil.GetStack() + return fmt.Errorf("%v req does not set the resource tag, tid: %v, stack: %v", + req.Type.String(), tid, string(stack)) + } + return nil +} + type mockClientStream struct{} // Header implements grpc.ClientStream interface From e75b8d6399f7db326567c87d3d7d95e8973d2b5c Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Fri, 1 Apr 2022 15:34:42 +0800 Subject: [PATCH 4/7] refine Signed-off-by: crazycs520 --- ddl/testutil/testutil.go | 34 ++++++++++++++++----------------- server/main_test.go | 8 +++++--- store/mockstore/unistore/rpc.go | 2 +- 3 files changed, 23 insertions(+), 21 deletions(-) diff --git a/ddl/testutil/testutil.go b/ddl/testutil/testutil.go index d808ad49f5006..7cc8802a007b7 100644 --- a/ddl/testutil/testutil.go +++ b/ddl/testutil/testutil.go @@ -84,42 +84,42 @@ func ExtractAllTableHandles(se session.Session, dbName, tbName string) ([]int64, return allHandles, err } -// GetReqStartKeyAndTxnTs gets start key and transaction ts of the request. -func GetReqStartKeyAndTxnTs(req *tikvrpc.Request) ([]byte, uint64, error) { +// GetReqStartKey gets start key of the request. 
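+// Compared with GetReqStartKeyAndTxnTs, it drops the transaction-ts return
+// value, which the remaining caller (checkResourceTagForTopSQL) no longer needs.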
+func GetReqStartKey(req *tikvrpc.Request) ([]byte, error) { switch req.Type { case tikvrpc.CmdGet: request := req.Get() - return request.Key, request.Version, nil + return request.Key, nil case tikvrpc.CmdScan: request := req.Scan() - return request.StartKey, request.Version, nil + return request.StartKey, nil case tikvrpc.CmdPrewrite: request := req.Prewrite() - return request.Mutations[0].Key, request.StartVersion, nil + return request.Mutations[0].Key, nil case tikvrpc.CmdCommit: request := req.Commit() - return request.Keys[0], request.StartVersion, nil + return request.Keys[0], nil case tikvrpc.CmdCleanup: request := req.Cleanup() - return request.Key, request.StartVersion, nil + return request.Key, nil case tikvrpc.CmdBatchGet: request := req.BatchGet() - return request.Keys[0], request.Version, nil + return request.Keys[0], nil case tikvrpc.CmdBatchRollback: request := req.BatchRollback() - return request.Keys[0], request.StartVersion, nil + return request.Keys[0], nil case tikvrpc.CmdScanLock: request := req.ScanLock() - return request.StartKey, request.MaxVersion, nil + return request.StartKey, nil case tikvrpc.CmdPessimisticLock: request := req.PessimisticLock() - return request.PrimaryLock, request.StartVersion, nil + return request.PrimaryLock, nil case tikvrpc.CmdCheckSecondaryLocks: request := req.CheckSecondaryLocks() - return request.Keys[0], request.StartVersion, nil + return request.Keys[0], nil case tikvrpc.CmdCop, tikvrpc.CmdCopStream: request := req.Cop() - return request.Ranges[0].Start, request.StartTs, nil + return request.Ranges[0].Start, nil case tikvrpc.CmdGC, tikvrpc.CmdDeleteRange, tikvrpc.CmdTxnHeartBeat, tikvrpc.CmdRawGet, tikvrpc.CmdRawBatchGet, tikvrpc.CmdRawPut, tikvrpc.CmdRawBatchPut, tikvrpc.CmdRawDelete, tikvrpc.CmdRawBatchDelete, tikvrpc.CmdRawDeleteRange, tikvrpc.CmdRawScan, tikvrpc.CmdGetKeyTTL, tikvrpc.CmdRawCompareAndSwap, tikvrpc.CmdUnsafeDestroyRange, tikvrpc.CmdRegisterLockObserver, @@ -127,15 +127,15 @@ func GetReqStartKeyAndTxnTs(req *tikvrpc.Request) ([]byte, uint64, error) { tikvrpc.CmdLockWaitInfo, tikvrpc.CmdMvccGetByKey, tikvrpc.CmdMvccGetByStartTs, tikvrpc.CmdSplitRegion, tikvrpc.CmdDebugGetRegionProperties, tikvrpc.CmdEmpty: // Ignore those requests since now, since it is no business with TopSQL. - return nil, 0, nil + return nil, nil case tikvrpc.CmdBatchCop, tikvrpc.CmdMPPTask, tikvrpc.CmdMPPConn, tikvrpc.CmdMPPCancel, tikvrpc.CmdMPPAlive: // Ignore mpp requests. - return nil, 0, nil + return nil, nil case tikvrpc.CmdResolveLock, tikvrpc.CmdCheckTxnStatus, tikvrpc.CmdPessimisticRollback: // TODO: add resource tag for those request. 
https://github.com/pingcap/tidb/issues/33621 - return nil, 0, nil + return nil, nil default: - return nil, 0, errors.New("unknown request, check the new type RPC request here") + return nil, errors.New("unknown request, check the new type RPC request here") } } diff --git a/server/main_test.go b/server/main_test.go index 8baa366fd3a7d..da794fb085cb2 100644 --- a/server/main_test.go +++ b/server/main_test.go @@ -16,15 +16,15 @@ package server import ( "fmt" - "github.com/pingcap/tidb/store/mockstore/unistore" - topsqlstate "github.com/pingcap/tidb/util/topsql/state" "os" "reflect" "testing" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/metrics" + topsqlstate "github.com/pingcap/tidb/util/topsql/state" "github.com/pingcap/tidb/session" + "github.com/pingcap/tidb/store/mockstore/unistore" "github.com/pingcap/tidb/util/testbridge" "github.com/tikv/client-go/v2/tikv" "go.uber.org/goleak" @@ -34,8 +34,10 @@ func TestMain(m *testing.M) { testbridge.SetupForCommonTest() RunInGoTest = true // flag for NewServer to known it is running in test environment - unistore.CheckResourceTagForTopSQLInGoTest = true + // Enable TopSQL for all test, and check the resource tag for each RPC request. + // This is used to detect which codes are not tracked by TopSQL. topsqlstate.EnableTopSQL() + unistore.CheckResourceTagForTopSQLInGoTest = true // AsyncCommit will make DDL wait 2.5s before changing to the next state. // Set schema lease to avoid it from making CI slow. diff --git a/store/mockstore/unistore/rpc.go b/store/mockstore/unistore/rpc.go index e736905047f84..3a6a6b87d863d 100644 --- a/store/mockstore/unistore/rpc.go +++ b/store/mockstore/unistore/rpc.go @@ -433,7 +433,7 @@ func checkResourceTagForTopSQL(req *tikvrpc.Request) error { return nil } - startKey, _, err := testutil.GetReqStartKeyAndTxnTs(req) + startKey, err := testutil.GetReqStartKey(req) if err != nil { return err } From fd170dae41b80957820ff1dc1f09c39e28744b8b Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Fri, 1 Apr 2022 15:58:45 +0800 Subject: [PATCH 5/7] fix test Signed-off-by: crazycs520 --- ddl/backfilling.go | 2 +- ddl/column.go | 6 +- ddl/ddl.go | 2 +- ddl/ddl_worker.go | 16 ++-- ddl/index.go | 8 +- ddl/partition.go | 2 +- ddl/primary_key_handle_test.go | 2 +- ddl/reorg.go | 10 +-- ddl/reorg_test.go | 6 +- ddl/testutil/testutil.go | 66 --------------- server/main_test.go | 2 +- store/mockstore/unistore/rpc.go | 33 -------- store/mockstore/unistore/testutil.go | 116 +++++++++++++++++++++++++++ 13 files changed, 144 insertions(+), 127 deletions(-) create mode 100644 store/mockstore/unistore/testutil.go diff --git a/ddl/backfilling.go b/ddl/backfilling.go index 666aff138b7a5..10e9249ee4cf5 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -709,7 +709,7 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba // recordIterFunc is used for low-level record iteration. 
type recordIterFunc func(h kv.Handle, rowKey kv.Key, rawRecord []byte) (more bool, err error) -func iterateSnapshotRows(ctx *jobContext, store kv.Storage, priority int, t table.Table, version uint64, +func iterateSnapshotRows(ctx *JobContext, store kv.Storage, priority int, t table.Table, version uint64, startKey kv.Key, endKey kv.Key, fn recordIterFunc) error { var firstKey kv.Key if startKey == nil { diff --git a/ddl/column.go b/ddl/column.go index 462e892cdf731..3458f0d95add0 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -1009,7 +1009,7 @@ func (w *worker) doModifyColumnTypeWithData( return ver, errors.Trace(err) } - reorgInfo, err := getReorgInfo(w.jobContext, d, t, job, tbl, BuildElements(changingCol, changingIdxs)) + reorgInfo, err := getReorgInfo(w.JobContext, d, t, job, tbl, BuildElements(changingCol, changingIdxs)) if err != nil || reorgInfo.first { // If we run reorg firstly, we should update the job snapshot version // and then run the reorg next time. @@ -1148,7 +1148,7 @@ func (w *worker) updateColumnAndIndexes(t table.Table, oldCol, col *model.Column if err != nil { return errors.Trace(err) } - originalStartHandle, originalEndHandle, err := getTableRange(w.jobContext, reorgInfo.d, t.(table.PhysicalTable), currentVer.Ver, reorgInfo.Job.Priority) + originalStartHandle, originalEndHandle, err := getTableRange(w.JobContext, reorgInfo.d, t.(table.PhysicalTable), currentVer.Ver, reorgInfo.Job.Priority) if err != nil { return errors.Trace(err) } @@ -1255,7 +1255,7 @@ func (w *updateColumnWorker) fetchRowColVals(txn kv.Transaction, taskRange reorg taskDone := false var lastAccessedHandle kv.Key oprStartTime := startTime - err := iterateSnapshotRows(w.ddlWorker.jobContext, w.sessCtx.GetStore(), w.priority, w.table, txn.StartTS(), taskRange.startKey, taskRange.endKey, + err := iterateSnapshotRows(w.ddlWorker.JobContext, w.sessCtx.GetStore(), w.priority, w.table, txn.StartTS(), taskRange.startKey, taskRange.endKey, func(handle kv.Handle, recordKey kv.Key, rawRow []byte) (bool, error) { oprEndTime := time.Now() logSlowOperations(oprEndTime.Sub(oprStartTime), "iterateSnapshotRows in updateColumnWorker fetchRowColVals", 0) diff --git a/ddl/ddl.go b/ddl/ddl.go index 7a792a0d8cb7b..eaa8210a61dc8 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -174,7 +174,7 @@ type DDL interface { // GetID gets the ddl ID. GetID() string // GetTableMaxHandle gets the max row ID of a normal table or a partition. - GetTableMaxHandle(ctx *jobContext, startTS uint64, tbl table.PhysicalTable) (kv.Handle, bool, error) + GetTableMaxHandle(ctx *JobContext, startTS uint64, tbl table.PhysicalTable) (kv.Handle, bool, error) // SetBinlogClient sets the binlog client for DDL worker. It's exported for testing. SetBinlogClient(*pumpcli.PumpsClient) // GetHook gets the hook. It's exported for testing. diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index ba9b8cc137492..973806963e67d 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -97,11 +97,11 @@ type worker struct { lockSeqNum bool *ddlCtx - *jobContext + *JobContext } -// jobContext is the ddl job execution context. -type jobContext struct { +// JobContext is the ddl job execution context. 
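+// It is exported (renamed from jobContext) so that tests such as
+// primary_key_handle_test.go can pass a zero-value &ddl.JobContext{} to exported
+// methods like GetTableMaxHandle.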
+type JobContext struct { // below fields are cache for top sql ddlJobCtx context.Context cacheSQL string @@ -115,7 +115,7 @@ func newWorker(ctx context.Context, tp workerType, sessPool *sessionPool, delRan tp: tp, ddlJobCh: make(chan struct{}, 1), ctx: ctx, - jobContext: &jobContext{ + JobContext: &JobContext{ ddlJobCtx: context.Background(), cacheSQL: "", cacheNormalizedSQL: "", @@ -466,7 +466,7 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) { updateRawArgs = false } w.writeDDLSeqNum(job) - w.jobContext.resetWhenJobFinish() + w.JobContext.resetWhenJobFinish() err = t.AddHistoryDDLJob(job, updateRawArgs) return errors.Trace(err) } @@ -519,7 +519,7 @@ func newMetaWithQueueTp(txn kv.Transaction, tp workerType) *meta.Meta { return meta.NewMeta(txn) } -func (w *jobContext) setDDLLabelForTopSQL(job *model.Job) { +func (w *JobContext) setDDLLabelForTopSQL(job *model.Job) { if !topsqlstate.TopSQLEnabled() || job == nil { return } @@ -533,7 +533,7 @@ func (w *jobContext) setDDLLabelForTopSQL(job *model.Job) { } } -func (w *jobContext) getResourceGroupTaggerForTopSQL() tikvrpc.ResourceGroupTagger { +func (w *JobContext) getResourceGroupTaggerForTopSQL() tikvrpc.ResourceGroupTagger { if !topsqlstate.TopSQLEnabled() || w.cacheDigest == nil { return nil } @@ -546,7 +546,7 @@ func (w *jobContext) getResourceGroupTaggerForTopSQL() tikvrpc.ResourceGroupTagg return tagger } -func (w *jobContext) resetWhenJobFinish() { +func (w *JobContext) resetWhenJobFinish() { w.ddlJobCtx = context.Background() w.cacheSQL = "" w.cacheDigest = nil diff --git a/ddl/index.go b/ddl/index.go index 109418cb17926..8de8e27351316 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -558,7 +558,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo } elements := []*meta.Element{{ID: indexInfo.ID, TypeKey: meta.IndexElementKey}} - reorgInfo, err := getReorgInfo(w.jobContext, d, t, job, tbl, elements) + reorgInfo, err := getReorgInfo(w.JobContext, d, t, job, tbl, elements) if err != nil || reorgInfo.first { // If we run reorg firstly, we should update the job snapshot version // and then run the reorg next time. @@ -1141,7 +1141,7 @@ func (w *baseIndexWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgBac // taskDone means that the reorged handle is out of taskRange.endHandle. 
taskDone := false oprStartTime := startTime - err := iterateSnapshotRows(w.ddlWorker.jobContext, w.sessCtx.GetStore(), w.priority, w.table, txn.StartTS(), taskRange.startKey, taskRange.endKey, + err := iterateSnapshotRows(w.ddlWorker.JobContext, w.sessCtx.GetStore(), w.priority, w.table, txn.StartTS(), taskRange.startKey, taskRange.endKey, func(handle kv.Handle, recordKey kv.Key, rawRow []byte) (bool, error) { oprEndTime := time.Now() logSlowOperations(oprEndTime.Sub(oprStartTime), "iterateSnapshotRows in baseIndexWorker fetchRowColVals", 0) @@ -1410,7 +1410,7 @@ func (w *worker) updateReorgInfo(t table.PartitionedTable, reorg *reorgInfo) (bo if err != nil { return false, errors.Trace(err) } - start, end, err := getTableRange(w.jobContext, reorg.d, t.GetPartition(pid), currentVer.Ver, reorg.Job.Priority) + start, end, err := getTableRange(w.JobContext, reorg.d, t.GetPartition(pid), currentVer.Ver, reorg.Job.Priority) if err != nil { return false, errors.Trace(err) } @@ -1594,7 +1594,7 @@ func (w *worker) updateReorgInfoForPartitions(t table.PartitionedTable, reorg *r if err != nil { return false, errors.Trace(err) } - start, end, err := getTableRange(w.jobContext, reorg.d, t.GetPartition(pid), currentVer.Ver, reorg.Job.Priority) + start, end, err := getTableRange(w.JobContext, reorg.d, t.GetPartition(pid), currentVer.Ver, reorg.Job.Priority) if err != nil { return false, errors.Trace(err) } diff --git a/ddl/partition.go b/ddl/partition.go index b222fab807f78..3a24fa414cbab 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -1124,7 +1124,7 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) ( elements = append(elements, &meta.Element{ID: idxInfo.ID, TypeKey: meta.IndexElementKey}) } } - reorgInfo, err := getReorgInfoFromPartitions(w.jobContext, d, t, job, tbl, physicalTableIDs, elements) + reorgInfo, err := getReorgInfoFromPartitions(w.JobContext, d, t, job, tbl, physicalTableIDs, elements) if err != nil || reorgInfo.first { // If we run reorg firstly, we should update the job snapshot version diff --git a/ddl/primary_key_handle_test.go b/ddl/primary_key_handle_test.go index f1f24be108974..906f79380cf38 100644 --- a/ddl/primary_key_handle_test.go +++ b/ddl/primary_key_handle_test.go @@ -40,7 +40,7 @@ import ( func getTableMaxHandle(t *testing.T, d ddl.DDL, tbl table.Table, store kv.Storage) (kv.Handle, bool) { ver, err := store.CurrentVersion(kv.GlobalTxnScope) require.NoError(t, err) - maxHandle, emptyTable, err := d.GetTableMaxHandle(ver.Ver, tbl.(table.PhysicalTable)) + maxHandle, emptyTable, err := d.GetTableMaxHandle(&ddl.JobContext{}, ver.Ver, tbl.(table.PhysicalTable)) require.NoError(t, err) return maxHandle, emptyTable } diff --git a/ddl/reorg.go b/ddl/reorg.go index 6c4031194dcb2..7c3445f375d37 100644 --- a/ddl/reorg.go +++ b/ddl/reorg.go @@ -438,7 +438,7 @@ func getColumnsTypes(columns []*model.ColumnInfo) []*types.FieldType { } // buildDescTableScan builds a desc table scan upon tblInfo. -func (dc *ddlCtx) buildDescTableScan(ctx *jobContext, startTS uint64, tbl table.PhysicalTable, +func (dc *ddlCtx) buildDescTableScan(ctx *JobContext, startTS uint64, tbl table.PhysicalTable, handleCols []*model.ColumnInfo, limit uint64) (distsql.SelectResult, error) { sctx := newContext(dc.store) dagPB, err := buildDescTableScanDAG(sctx, tbl, handleCols, limit) @@ -476,7 +476,7 @@ func (dc *ddlCtx) buildDescTableScan(ctx *jobContext, startTS uint64, tbl table. } // GetTableMaxHandle gets the max handle of a PhysicalTable. 
-func (dc *ddlCtx) GetTableMaxHandle(ctx *jobContext, startTS uint64, tbl table.PhysicalTable) (maxHandle kv.Handle, emptyTable bool, err error) {
+func (dc *ddlCtx) GetTableMaxHandle(ctx *JobContext, startTS uint64, tbl table.PhysicalTable) (maxHandle kv.Handle, emptyTable bool, err error) {
 	var handleCols []*model.ColumnInfo
 	var pkIdx *model.IndexInfo
 	tblInfo := tbl.Meta()
@@ -542,7 +542,7 @@ func buildCommonHandleFromChunkRow(sctx *stmtctx.StatementContext, tblInfo *mode
 }
 
 // getTableRange gets the start and end handle of a table (or partition).
-func getTableRange(ctx *jobContext, d *ddlCtx, tbl table.PhysicalTable, snapshotVer uint64, priority int) (startHandleKey, endHandleKey kv.Key, err error) {
+func getTableRange(ctx *JobContext, d *ddlCtx, tbl table.PhysicalTable, snapshotVer uint64, priority int) (startHandleKey, endHandleKey kv.Key, err error) {
 	// Get the start handle of this partition.
 	err = iterateSnapshotRows(ctx, d.store, priority, tbl, snapshotVer, nil, nil,
 		func(h kv.Handle, rowKey kv.Key, rawRecord []byte) (bool, error) {
@@ -579,7 +579,7 @@ func getValidCurrentVersion(store kv.Storage) (ver kv.Version, err error) {
 	return ver, nil
 }
 
-func getReorgInfo(ctx *jobContext, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, elements []*meta.Element) (*reorgInfo, error) {
+func getReorgInfo(ctx *JobContext, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, elements []*meta.Element) (*reorgInfo, error) {
 	var (
 		element *meta.Element
 		start   kv.Key
@@ -671,7 +671,7 @@ func getReorgInfo(ctx *jobContext, d *ddlCtx, t *meta.Meta, job *model.Job, tbl
 	return &info, nil
 }
 
-func getReorgInfoFromPartitions(ctx *jobContext, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, partitionIDs []int64, elements []*meta.Element) (*reorgInfo, error) {
+func getReorgInfoFromPartitions(ctx *JobContext, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, partitionIDs []int64, elements []*meta.Element) (*reorgInfo, error) {
 	var (
 		element *meta.Element
 		start   kv.Key
diff --git a/ddl/reorg_test.go b/ddl/reorg_test.go
index 41f2d2772b5a3..64c256b98e125 100644
--- a/ddl/reorg_test.go
+++ b/ddl/reorg_test.go
@@ -143,7 +143,7 @@ func TestReorg(t *testing.T) {
 	require.NoError(t, err)
 
 	m = meta.NewMeta(txn)
-	info, err1 := getReorgInfo(&jobContext{}, d.ddlCtx, m, job, mockTbl, nil)
+	info, err1 := getReorgInfo(&JobContext{}, d.ddlCtx, m, job, mockTbl, nil)
 	require.NoError(t, err1)
 	require.Equal(t, info.StartKey, kv.Key(handle.Encoded()))
 	require.Equal(t, info.currElement, e)
@@ -174,7 +174,7 @@ func TestReorg(t *testing.T) {
 	err = kv.RunInNewTxn(context.Background(), d.store, false, func(ctx context.Context, txn kv.Transaction) error {
 		m := meta.NewMeta(txn)
 		var err1 error
-		_, err1 = getReorgInfo(&jobContext{}, d.ddlCtx, m, job, mockTbl, []*meta.Element{element})
+		_, err1 = getReorgInfo(&JobContext{}, d.ddlCtx, m, job, mockTbl, []*meta.Element{element})
 		require.True(t, meta.ErrDDLReorgElementNotExist.Equal(err1))
 		require.Equal(t, job.SnapshotVer, uint64(0))
 		return nil
@@ -185,7 +185,7 @@ func TestReorg(t *testing.T) {
 	require.NoError(t, err)
 	err = kv.RunInNewTxn(context.Background(), d.store, false, func(ctx context.Context, txn kv.Transaction) error {
 		m := meta.NewMeta(txn)
-		info1, err1 := getReorgInfo(&jobContext{}, d.ddlCtx, m, job, mockTbl, []*meta.Element{element})
+		info1, err1 := getReorgInfo(&JobContext{}, d.ddlCtx, m, job, mockTbl, []*meta.Element{element})
 		require.NoError(t, err1)
 		require.Equal(t, info1.currElement, info.currElement)
 		require.Equal(t, info1.StartKey, info.StartKey)
diff --git a/ddl/testutil/testutil.go b/ddl/testutil/testutil.go
index 7cc8802a007b7..a18d0e045fb3c 100644
--- a/ddl/testutil/testutil.go
+++ b/ddl/testutil/testutil.go
@@ -16,7 +16,6 @@ package testutil
 
 import (
 	"context"
-	"runtime"
 
 	"github.com/pingcap/errors"
 	"github.com/pingcap/tidb/domain"
@@ -26,7 +25,6 @@ import (
 	"github.com/pingcap/tidb/table"
 	"github.com/pingcap/tidb/table/tables"
 	"github.com/pingcap/tidb/types"
-	"github.com/tikv/client-go/v2/tikvrpc"
 )
 
 // SessionExecInGoroutine export for testing.
@@ -83,67 +81,3 @@ func ExtractAllTableHandles(se session.Session, dbName, tbName string) ([]int64,
 	})
 	return allHandles, err
 }
-
-// GetReqStartKey gets start key of the request.
-func GetReqStartKey(req *tikvrpc.Request) ([]byte, error) {
-	switch req.Type {
-	case tikvrpc.CmdGet:
-		request := req.Get()
-		return request.Key, nil
-	case tikvrpc.CmdScan:
-		request := req.Scan()
-		return request.StartKey, nil
-	case tikvrpc.CmdPrewrite:
-		request := req.Prewrite()
-		return request.Mutations[0].Key, nil
-	case tikvrpc.CmdCommit:
-		request := req.Commit()
-		return request.Keys[0], nil
-	case tikvrpc.CmdCleanup:
-		request := req.Cleanup()
-		return request.Key, nil
-	case tikvrpc.CmdBatchGet:
-		request := req.BatchGet()
-		return request.Keys[0], nil
-	case tikvrpc.CmdBatchRollback:
-		request := req.BatchRollback()
-		return request.Keys[0], nil
-	case tikvrpc.CmdScanLock:
-		request := req.ScanLock()
-		return request.StartKey, nil
-	case tikvrpc.CmdPessimisticLock:
-		request := req.PessimisticLock()
-		return request.PrimaryLock, nil
-	case tikvrpc.CmdCheckSecondaryLocks:
-		request := req.CheckSecondaryLocks()
-		return request.Keys[0], nil
-	case tikvrpc.CmdCop, tikvrpc.CmdCopStream:
-		request := req.Cop()
-		return request.Ranges[0].Start, nil
-	case tikvrpc.CmdGC, tikvrpc.CmdDeleteRange, tikvrpc.CmdTxnHeartBeat, tikvrpc.CmdRawGet,
-		tikvrpc.CmdRawBatchGet, tikvrpc.CmdRawPut, tikvrpc.CmdRawBatchPut, tikvrpc.CmdRawDelete, tikvrpc.CmdRawBatchDelete, tikvrpc.CmdRawDeleteRange,
-		tikvrpc.CmdRawScan, tikvrpc.CmdGetKeyTTL, tikvrpc.CmdRawCompareAndSwap, tikvrpc.CmdUnsafeDestroyRange, tikvrpc.CmdRegisterLockObserver,
-		tikvrpc.CmdCheckLockObserver, tikvrpc.CmdRemoveLockObserver, tikvrpc.CmdPhysicalScanLock, tikvrpc.CmdStoreSafeTS,
-		tikvrpc.CmdLockWaitInfo, tikvrpc.CmdMvccGetByKey, tikvrpc.CmdMvccGetByStartTs, tikvrpc.CmdSplitRegion,
-		tikvrpc.CmdDebugGetRegionProperties, tikvrpc.CmdEmpty:
-		// Ignore those requests since now, since it is no business with TopSQL.
-		return nil, nil
-	case tikvrpc.CmdBatchCop, tikvrpc.CmdMPPTask, tikvrpc.CmdMPPConn, tikvrpc.CmdMPPCancel, tikvrpc.CmdMPPAlive:
-		// Ignore mpp requests.
-		return nil, nil
-	case tikvrpc.CmdResolveLock, tikvrpc.CmdCheckTxnStatus, tikvrpc.CmdPessimisticRollback:
-		// TODO: add resource tag for those request. https://github.com/pingcap/tidb/issues/33621
-		return nil, nil
-	default:
-		return nil, errors.New("unknown request, check the new type RPC request here")
-	}
-}
-
-// GetStack gets the stacktrace.
-func GetStack() []byte {
-	const size = 1024 * 64
-	buf := make([]byte, size)
-	stackSize := runtime.Stack(buf, false)
-	buf = buf[:stackSize]
-	return buf
-}
diff --git a/server/main_test.go b/server/main_test.go
index da794fb085cb2..1d79b3caf2070 100644
--- a/server/main_test.go
+++ b/server/main_test.go
@@ -22,10 +22,10 @@ import (
 
 	"github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/metrics"
-	topsqlstate "github.com/pingcap/tidb/util/topsql/state"
 	"github.com/pingcap/tidb/session"
 	"github.com/pingcap/tidb/store/mockstore/unistore"
 	"github.com/pingcap/tidb/util/testbridge"
+	topsqlstate "github.com/pingcap/tidb/util/topsql/state"
 	"github.com/tikv/client-go/v2/tikv"
 	"go.uber.org/goleak"
 )
diff --git a/store/mockstore/unistore/rpc.go b/store/mockstore/unistore/rpc.go
index 3a6a6b87d863d..3da40691310a5 100644
--- a/store/mockstore/unistore/rpc.go
+++ b/store/mockstore/unistore/rpc.go
@@ -15,7 +15,6 @@
 package unistore
 
 import (
-	"fmt"
 	"io"
 	"math"
 	"os"
@@ -32,12 +31,9 @@ import (
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/kvproto/pkg/mpp"
-	"github.com/pingcap/tidb/ddl/testutil"
 	"github.com/pingcap/tidb/parser/terror"
 	us "github.com/pingcap/tidb/store/mockstore/unistore/tikv"
-	"github.com/pingcap/tidb/tablecodec"
 	"github.com/pingcap/tidb/util/codec"
-	topsqlstate "github.com/pingcap/tidb/util/topsql/state"
 	"github.com/tikv/client-go/v2/tikvrpc"
 	"golang.org/x/net/context"
 	"google.golang.org/grpc/metadata"
@@ -424,35 +420,6 @@ func (c *RPCClient) CloseAddr(addr string) error {
 	return nil
 }
 
-func checkResourceTagForTopSQL(req *tikvrpc.Request) error {
-	if !topsqlstate.TopSQLEnabled() {
-		return nil
-	}
-	tag := req.GetResourceGroupTag()
-	if len(tag) > 0 {
-		return nil
-	}
-
-	startKey, err := testutil.GetReqStartKey(req)
-	if err != nil {
-		return err
-	}
-	var tid int64
-	if tablecodec.IsRecordKey(startKey) {
-		tid, _, _ = tablecodec.DecodeRecordKey(startKey)
-	}
-	if tablecodec.IsIndexKey(startKey) {
-		tid, _, _, _ = tablecodec.DecodeIndexKey(startKey)
-	}
-	// since the error maybe "invalid record key", should just ignore check resource tag for this request.
-	if tid > 0 {
-		stack := testutil.GetStack()
-		return fmt.Errorf("%v req does not set the resource tag, tid: %v, stack: %v",
-			req.Type.String(), tid, string(stack))
-	}
-	return nil
-}
-
 type mockClientStream struct{}
 
 // Header implements grpc.ClientStream interface
diff --git a/store/mockstore/unistore/testutil.go b/store/mockstore/unistore/testutil.go
new file mode 100644
index 0000000000000..d085a24ba5788
--- /dev/null
+++ b/store/mockstore/unistore/testutil.go
@@ -0,0 +1,116 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package unistore
+
+import (
+	"errors"
+	"fmt"
+	"runtime"
+
+	"github.com/pingcap/tidb/tablecodec"
+	topsqlstate "github.com/pingcap/tidb/util/topsql/state"
+	"github.com/tikv/client-go/v2/tikvrpc"
+)
+
+func checkResourceTagForTopSQL(req *tikvrpc.Request) error {
+	if !topsqlstate.TopSQLEnabled() {
+		return nil
+	}
+	tag := req.GetResourceGroupTag()
+	if len(tag) > 0 {
+		return nil
+	}
+
+	startKey, err := getReqStartKey(req)
+	if err != nil {
+		return err
+	}
+	var tid int64
+	if tablecodec.IsRecordKey(startKey) {
+		tid, _, _ = tablecodec.DecodeRecordKey(startKey)
+	}
+	if tablecodec.IsIndexKey(startKey) {
+		tid, _, _, _ = tablecodec.DecodeIndexKey(startKey)
+	}
+	// Since the decode error may just be "invalid record key", skip the resource tag check for such requests.
+	if tid > 0 {
+		stack := getStack()
+		return fmt.Errorf("%v req does not set the resource tag, tid: %v, stack: %v",
+			req.Type.String(), tid, string(stack))
+	}
+	return nil
+}
+
+func getReqStartKey(req *tikvrpc.Request) ([]byte, error) {
+	switch req.Type {
+	case tikvrpc.CmdGet:
+		request := req.Get()
+		return request.Key, nil
+	case tikvrpc.CmdScan:
+		request := req.Scan()
+		return request.StartKey, nil
+	case tikvrpc.CmdPrewrite:
+		request := req.Prewrite()
+		return request.Mutations[0].Key, nil
+	case tikvrpc.CmdCommit:
+		request := req.Commit()
+		return request.Keys[0], nil
+	case tikvrpc.CmdCleanup:
+		request := req.Cleanup()
+		return request.Key, nil
+	case tikvrpc.CmdBatchGet:
+		request := req.BatchGet()
+		return request.Keys[0], nil
+	case tikvrpc.CmdBatchRollback:
+		request := req.BatchRollback()
+		return request.Keys[0], nil
+	case tikvrpc.CmdScanLock:
+		request := req.ScanLock()
+		return request.StartKey, nil
+	case tikvrpc.CmdPessimisticLock:
+		request := req.PessimisticLock()
+		return request.PrimaryLock, nil
+	case tikvrpc.CmdCheckSecondaryLocks:
+		request := req.CheckSecondaryLocks()
+		return request.Keys[0], nil
+	case tikvrpc.CmdCop, tikvrpc.CmdCopStream:
+		request := req.Cop()
+		return request.Ranges[0].Start, nil
+	case tikvrpc.CmdGC, tikvrpc.CmdDeleteRange, tikvrpc.CmdTxnHeartBeat, tikvrpc.CmdRawGet,
+		tikvrpc.CmdRawBatchGet, tikvrpc.CmdRawPut, tikvrpc.CmdRawBatchPut, tikvrpc.CmdRawDelete, tikvrpc.CmdRawBatchDelete, tikvrpc.CmdRawDeleteRange,
+		tikvrpc.CmdRawScan, tikvrpc.CmdGetKeyTTL, tikvrpc.CmdRawCompareAndSwap, tikvrpc.CmdUnsafeDestroyRange, tikvrpc.CmdRegisterLockObserver,
+		tikvrpc.CmdCheckLockObserver, tikvrpc.CmdRemoveLockObserver, tikvrpc.CmdPhysicalScanLock, tikvrpc.CmdStoreSafeTS,
+		tikvrpc.CmdLockWaitInfo, tikvrpc.CmdMvccGetByKey, tikvrpc.CmdMvccGetByStartTs, tikvrpc.CmdSplitRegion,
+		tikvrpc.CmdDebugGetRegionProperties, tikvrpc.CmdEmpty:
+		// Ignore these requests for now, since they are not related to TopSQL.
+		return nil, nil
+	case tikvrpc.CmdBatchCop, tikvrpc.CmdMPPTask, tikvrpc.CmdMPPConn, tikvrpc.CmdMPPCancel, tikvrpc.CmdMPPAlive:
+		// Ignore MPP requests.
+		return nil, nil
+	case tikvrpc.CmdResolveLock, tikvrpc.CmdCheckTxnStatus, tikvrpc.CmdPessimisticRollback:
+		// TODO: add resource tags for these requests. https://github.com/pingcap/tidb/issues/33621
+		return nil, nil
+	default:
+		return nil, errors.New("unknown request, check the new type RPC request here")
+	}
+}
+
+func getStack() []byte {
+	const size = 1024 * 64
+	buf := make([]byte, size)
+	stackSize := runtime.Stack(buf, false)
+	buf = buf[:stackSize]
+	return buf
+}

From 5b08a3c72edfe9f1affef6e8c03fa457373bc66d Mon Sep 17 00:00:00 2001
From: crazycs520
Date: Sat, 2 Apr 2022 14:42:35 +0800
Subject: [PATCH 6/7] fix test

Signed-off-by: crazycs520
---
 ddl/ddl_worker.go              | 16 ++++++++++------
 ddl/primary_key_handle_test.go |  2 +-
 ddl/reorg_test.go              |  6 +++---
 3 files changed, 14 insertions(+), 10 deletions(-)

diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go
index 42f2680d2651d..7c1e21d03d3fd 100644
--- a/ddl/ddl_worker.go
+++ b/ddl/ddl_worker.go
@@ -109,18 +109,22 @@ type JobContext struct {
 	cacheDigest        *parser.Digest
 }
 
+func NewJobContext() *JobContext {
+	return &JobContext{
+		ddlJobCtx:          context.Background(),
+		cacheSQL:           "",
+		cacheNormalizedSQL: "",
+		cacheDigest:        nil,
+	}
+}
+
 func newWorker(ctx context.Context, tp workerType, sessPool *sessionPool, delRangeMgr delRangeManager, dCtx *ddlCtx) *worker {
 	worker := &worker{
 		id:       atomic.AddInt32(&ddlWorkerID, 1),
 		tp:       tp,
 		ddlJobCh: make(chan struct{}, 1),
 		ctx:      ctx,
-		JobContext: &JobContext{
-			ddlJobCtx:          context.Background(),
-			cacheSQL:           "",
-			cacheNormalizedSQL: "",
-			cacheDigest:        nil,
-		},
+		JobContext: NewJobContext(),
 		ddlCtx:   dCtx,
 		reorgCtx: &reorgCtx{notifyCancelReorgJob: 0},
 		sessPool: sessPool,
diff --git a/ddl/primary_key_handle_test.go b/ddl/primary_key_handle_test.go
index 906f79380cf38..ee9c318d6a2a2 100644
--- a/ddl/primary_key_handle_test.go
+++ b/ddl/primary_key_handle_test.go
@@ -40,7 +40,7 @@ import (
 func getTableMaxHandle(t *testing.T, d ddl.DDL, tbl table.Table, store kv.Storage) (kv.Handle, bool) {
 	ver, err := store.CurrentVersion(kv.GlobalTxnScope)
 	require.NoError(t, err)
-	maxHandle, emptyTable, err := d.GetTableMaxHandle(&ddl.JobContext{}, ver.Ver, tbl.(table.PhysicalTable))
+	maxHandle, emptyTable, err := d.GetTableMaxHandle(ddl.NewJobContext(), ver.Ver, tbl.(table.PhysicalTable))
 	require.NoError(t, err)
 	return maxHandle, emptyTable
 }
diff --git a/ddl/reorg_test.go b/ddl/reorg_test.go
index 64c256b98e125..192d5de89d8c2 100644
--- a/ddl/reorg_test.go
+++ b/ddl/reorg_test.go
@@ -143,7 +143,7 @@ func TestReorg(t *testing.T) {
 	require.NoError(t, err)
 
 	m = meta.NewMeta(txn)
-	info, err1 := getReorgInfo(&JobContext{}, d.ddlCtx, m, job, mockTbl, nil)
+	info, err1 := getReorgInfo(NewJobContext(), d.ddlCtx, m, job, mockTbl, nil)
 	require.NoError(t, err1)
 	require.Equal(t, info.StartKey, kv.Key(handle.Encoded()))
 	require.Equal(t, info.currElement, e)
@@ -174,7 +174,7 @@ func TestReorg(t *testing.T) {
 	err = kv.RunInNewTxn(context.Background(), d.store, false, func(ctx context.Context, txn kv.Transaction) error {
 		m := meta.NewMeta(txn)
 		var err1 error
-		_, err1 = getReorgInfo(&JobContext{}, d.ddlCtx, m, job, mockTbl, []*meta.Element{element})
+		_, err1 = getReorgInfo(NewJobContext(), d.ddlCtx, m, job, mockTbl, []*meta.Element{element})
 		require.True(t, meta.ErrDDLReorgElementNotExist.Equal(err1))
 		require.Equal(t, job.SnapshotVer, uint64(0))
 		return nil
@@ -185,7 +185,7 @@ func TestReorg(t *testing.T) {
 	require.NoError(t, err)
 	err = kv.RunInNewTxn(context.Background(), d.store, false, func(ctx context.Context, txn kv.Transaction) error {
 		m := meta.NewMeta(txn)
-		info1, err1 := getReorgInfo(&JobContext{}, d.ddlCtx, m, job, mockTbl, []*meta.Element{element})
+		info1, err1 := getReorgInfo(NewJobContext(), d.ddlCtx, m, job, mockTbl, []*meta.Element{element})
 		require.NoError(t, err1)
 		require.Equal(t, info1.currElement, info.currElement)
 		require.Equal(t, info1.StartKey, info.StartKey)

From fcc4a797ca81dccba0f792c2a5368a7c60511d8c Mon Sep 17 00:00:00 2001
From: crazycs520
Date: Wed, 6 Apr 2022 10:38:28 +0800
Subject: [PATCH 7/7] address comment

Signed-off-by: crazycs520
---
 ddl/ddl_worker.go   | 1 +
 ddl/delete_range.go | 2 +-
 ddl/util/util.go    | 6 +++---
 3 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go
index 7c1e21d03d3fd..87e1067bc25e4 100644
--- a/ddl/ddl_worker.go
+++ b/ddl/ddl_worker.go
@@ -109,6 +109,7 @@ type JobContext struct {
 	cacheDigest        *parser.Digest
 }
 
+// NewJobContext returns a new DDL job context.
 func NewJobContext() *JobContext {
 	return &JobContext{
 		ddlJobCtx:          context.Background(),
diff --git a/ddl/delete_range.go b/ddl/delete_range.go
index bacde289d46ed..3d7d5895bdb79 100644
--- a/ddl/delete_range.go
+++ b/ddl/delete_range.go
@@ -181,7 +181,7 @@ func (dr *delRange) doTask(ctx sessionctx.Context, r util.DelRangeTask) error {
 	dr.keys = dr.keys[:0]
 	err := kv.RunInNewTxn(context.Background(), dr.store, false, func(ctx context.Context, txn kv.Transaction) error {
 		if topsqlstate.TopSQLEnabled() {
-			// Only test logic will run into here, so just set a mock internal resource tagger.
+			// Only when TiDB runs without PD (using unistore as the storage for tests) does it reach here, so just set a mock internal resource tagger.
 			txn.SetOption(kv.ResourceGroupTagger, util.GetInternalResourceGroupTaggerForTopSQL())
 		}
 		iter, err := txn.Iter(oldStartKey, r.EndKey)
diff --git a/ddl/util/util.go b/ddl/util/util.go
index 73f62ec9f1a01..5caae9ea3ac8e 100644
--- a/ddl/util/util.go
+++ b/ddl/util/util.go
@@ -228,17 +228,17 @@ func IsEmulatorGCEnable() bool {
 	return emulatorGCEnable.Load() == 1
 }
 
-var intervalResourceGroupTag = []byte{0}
+var internalResourceGroupTag = []byte{0}
 
 // GetInternalResourceGroupTaggerForTopSQL only use for testing.
 func GetInternalResourceGroupTaggerForTopSQL() tikvrpc.ResourceGroupTagger {
 	tagger := func(req *tikvrpc.Request) {
-		req.ResourceGroupTag = intervalResourceGroupTag
+		req.ResourceGroupTag = internalResourceGroupTag
 	}
 	return tagger
 }
 
 // IsInternalResourceGroupTaggerForTopSQL use for testing.
 func IsInternalResourceGroupTaggerForTopSQL(tag []byte) bool {
-	return bytes.Equal(tag, intervalResourceGroupTag)
+	return bytes.Equal(tag, internalResourceGroupTag)
 }
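
Reviewer note (not part of the patches above): a minimal sketch of how the internal resource group tagger and its companion predicate from ddl/util could be exercised together. It relies only on the exported helpers changed in this series and on tikvrpc.Request (whose embedded kvrpcpb.Context provides ResourceGroupTag, as used in the tagger above); the package placement and test name are illustrative.

package util_test

import (
	"testing"

	"github.com/pingcap/tidb/ddl/util"
	"github.com/tikv/client-go/v2/tikvrpc"
)

// The tagger stamps a request with the mock internal resource group tag,
// and IsInternalResourceGroupTaggerForTopSQL recognizes exactly that tag,
// which is how internally tagged requests can be told apart in tests.
func TestInternalResourceGroupTagger(t *testing.T) {
	req := &tikvrpc.Request{}
	tagger := util.GetInternalResourceGroupTaggerForTopSQL()
	tagger(req)
	if !util.IsInternalResourceGroupTaggerForTopSQL(req.GetResourceGroupTag()) {
		t.Fatal("expected the internal resource group tag to be set on the request")
	}
}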