diff --git a/ddl/backfilling.go b/ddl/backfilling.go index 3384da9ecf046..3b48ab30f4e5f 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -42,6 +42,7 @@ import ( "github.com/pingcap/tidb/util/logutil" decoder "github.com/pingcap/tidb/util/rowDecoder" "github.com/pingcap/tidb/util/timeutil" + "github.com/pingcap/tidb/util/topsql" "github.com/tikv/client-go/v2/tikv" "go.uber.org/zap" ) @@ -312,6 +313,11 @@ func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) { } }) + failpoint.Inject("mockHighLoadForAddIndex", func() { + sqlPrefixes := []string{"alter"} + topsql.MockHighCPULoad(job.Query, sqlPrefixes, 5) + }) + // Dynamic change batch size. w.batchCnt = int(variable.GetDDLReorgBatchSize()) result := w.handleBackfillTask(d, task, bf) diff --git a/executor/table_reader.go b/executor/table_reader.go index 018271e84a313..51199c5f0c648 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -17,8 +17,10 @@ package executor import ( "context" "sort" + "time" "github.com/opentracing/opentracing-go" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/distsql" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/kv" @@ -129,6 +131,10 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error { defer span1.Finish() ctx = opentracing.ContextWithSpan(ctx, span1) } + failpoint.Inject("mockSleepInTableReaderNext", func(v failpoint.Value) { + ms := v.(int) + time.Sleep(time.Millisecond * time.Duration(ms)) + }) e.memTracker = memory.NewTracker(e.id, -1) e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) diff --git a/planner/optimize.go b/planner/optimize.go index ff702c3d8458b..0941e89545fb3 100644 --- a/planner/optimize.go +++ b/planner/optimize.go @@ -46,6 +46,7 @@ import ( "github.com/pingcap/tidb/util/hint" "github.com/pingcap/tidb/util/logutil" utilparser "github.com/pingcap/tidb/util/parser" + "github.com/pingcap/tidb/util/topsql" "go.uber.org/zap" ) @@ -321,6 +322,11 @@ func optimize(ctx context.Context, sctx sessionctx.Context, node ast.Node, is in failpoint.Return(nil, nil, 0, errors.New("gofail wrong optimizerCnt error")) } }) + failpoint.Inject("mockHighLoadForOptimize", func() { + sqlPrefixes := []string{"select"} + topsql.MockHighCPULoad(sctx.GetSessionVars().StmtCtx.OriginalSQL, sqlPrefixes, 10) + }) + // build logical plan sctx.GetSessionVars().PlanID = 0 sctx.GetSessionVars().PlanColumnID = 0 diff --git a/server/tidb_test.go b/server/tidb_test.go index 77f7f8317afc5..1f25a8fe5dbad 100644 --- a/server/tidb_test.go +++ b/server/tidb_test.go @@ -17,7 +17,6 @@ package server import ( - "bytes" "context" "crypto/rand" "crypto/rsa" @@ -131,8 +130,7 @@ func createTidbTestTopSQLSuite(t *testing.T) (*tidbTestTopSQLSuite, func()) { topsqlstate.GlobalState.ReportIntervalSeconds.Store(2) dbt.MustExec("set @@global.tidb_top_sql_max_time_series_count=5;") - err = cpuprofile.StartCPUProfiler() - require.NoError(t, err) + require.NoError(t, cpuprofile.StartCPUProfiler()) cleanFn := func() { cleanup() cpuprofile.StopCPUProfiler() @@ -1260,20 +1258,19 @@ func TestTopSQLCPUProfile(t *testing.T) { db, err := sql.Open("mysql", ts.getDSN()) require.NoError(t, err) defer func() { - err := db.Close() - require.NoError(t, err) + require.NoError(t, db.Close()) }() - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/skipLoadSysVarCacheLoop", `return(true)`)) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/topsql/mockHighLoadForEachSQL", `return(true)`)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/skipLoadSysVarCacheLoop", `return(true)`)) defer func() { - err = failpoint.Disable("github.com/pingcap/tidb/domain/skipLoadSysVarCacheLoop") - require.NoError(t, err) - err = failpoint.Disable("github.com/pingcap/tidb/util/topsql/mockHighLoadForEachSQL") - require.NoError(t, err) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/topsql/mockHighLoadForEachSQL")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/domain/skipLoadSysVarCacheLoop")) }() topsqlstate.EnableTopSQL() + defer topsqlstate.DisableTopSQL() + mc := mockTopSQLTraceCPU.NewTopSQLCollector() topsql.SetupTopSQLForTest(mc) sqlCPUCollector := collector.NewSQLCPUCollector(mc) @@ -1287,53 +1284,15 @@ func TestTopSQLCPUProfile(t *testing.T) { dbt.MustExec("create table t (a int auto_increment, b int, unique index idx(a));") dbt.MustExec("create table t1 (a int auto_increment, b int, unique index idx(a));") dbt.MustExec("create table t2 (a int auto_increment, b int, unique index idx(a));") - config.UpdateGlobal(func(conf *config.Config) { - conf.TopSQL.ReceiverAddress = "127.0.0.1:4001" - }) - topsqlstate.GlobalState.PrecisionSeconds.Store(1) dbt.MustExec("set @@global.tidb_txn_mode = 'pessimistic'") - // Test case 1: DML query: insert/update/replace/delete/select - cases1 := []struct { - sql string - planRegexp string - cancel func() - }{ - {sql: "insert into t () values (),(),(),(),(),(),();", planRegexp: ""}, - {sql: "insert into t (b) values (1),(1),(1),(1),(1),(1),(1),(1);", planRegexp: ""}, - {sql: "update t set b=a where b is null limit 1;", planRegexp: ".*Limit.*TableReader.*"}, - {sql: "delete from t where b = a limit 2;", planRegexp: ".*Limit.*TableReader.*"}, - {sql: "replace into t (b) values (1),(1),(1),(1),(1),(1),(1),(1);", planRegexp: ""}, - {sql: "select * from t use index(idx) where a<10;", planRegexp: ".*IndexLookUp.*"}, - {sql: "select * from t ignore index(idx) where a>1000000000;", planRegexp: ".*TableReader.*"}, - {sql: "select /*+ HASH_JOIN(t1, t2) */ * from t t1 join t t2 on t1.a=t2.a where t1.b is not null;", planRegexp: ".*HashJoin.*"}, - {sql: "select /*+ INL_HASH_JOIN(t1, t2) */ * from t t1 join t t2 on t2.a=t1.a where t1.b is not null;", planRegexp: ".*IndexHashJoin.*"}, - {sql: "select * from t where a=1;", planRegexp: ".*Point_Get.*"}, - {sql: "select * from t where a in (1,2,3,4)", planRegexp: ".*Batch_Point_Get.*"}, - } - for i, ca := range cases1 { - ctx, cancel := context.WithCancel(context.Background()) - cases1[i].cancel = cancel - sqlStr := ca.sql - go ts.loopExec(ctx, t, func(db *sql.DB) { - dbt := testkit.NewDBTestKit(t, db) - if strings.HasPrefix(sqlStr, "select") { - rows := dbt.MustQuery(sqlStr) - require.NoError(t, rows.Close()) - } else { - // Ignore error here since the error may be write conflict. - db.Exec(sqlStr) - } - }) - } - timeoutCtx, cancel := context.WithTimeout(context.Background(), time.Second*20) defer cancel() checkFn := func(sql, planRegexp string) { require.NoError(t, timeoutCtx.Err()) stats := mc.GetSQLStatsBySQLWithRetry(sql, len(planRegexp) > 0) // since 1 sql may has many plan, check `len(stats) > 0` instead of `len(stats) == 1`. - require.Greaterf(t, len(stats), 0, "sql: %v", sql) + require.Greaterf(t, len(stats), 0, "sql: "+sql) for _, s := range stats { sqlStr := mc.GetSQL(s.SQLDigest) @@ -1350,20 +1309,47 @@ func TestTopSQLCPUProfile(t *testing.T) { require.Regexpf(t, planRegexp, normalizedPlan, "sql: %v", sql) } } - // Wait the top sql collector to collect profile data. - mc.WaitCollectCnt(1) - // Check result of test case 1. - for _, ca := range cases1 { - checkFn(ca.sql, ca.planRegexp) - ca.cancel() + + // Test case 1: DML query: insert/update/replace/delete/select + cases1 := []struct { + sql string + planRegexp string + }{ + {sql: "insert into t () values (),(),(),(),(),(),();", planRegexp: ""}, + {sql: "insert into t (b) values (1),(1),(1),(1),(1),(1),(1),(1);", planRegexp: ""}, + {sql: "update t set b=a where b is null limit 1;", planRegexp: ".*Limit.*TableReader.*"}, + {sql: "delete from t where b = a limit 2;", planRegexp: ".*Limit.*TableReader.*"}, + {sql: "replace into t (b) values (1),(1),(1),(1),(1),(1),(1),(1);", planRegexp: ""}, + {sql: "select * from t use index(idx) where a<10;", planRegexp: ".*IndexLookUp.*"}, + {sql: "select * from t ignore index(idx) where a>1000000000;", planRegexp: ".*TableReader.*"}, + {sql: "select /*+ HASH_JOIN(t1, t2) */ * from t t1 join t t2 on t1.a=t2.a where t1.b is not null;", planRegexp: ".*HashJoin.*"}, + {sql: "select /*+ INL_HASH_JOIN(t1, t2) */ * from t t1 join t t2 on t2.a=t1.a where t1.b is not null;", planRegexp: ".*IndexHashJoin.*"}, + {sql: "select * from t where a=1;", planRegexp: ".*Point_Get.*"}, + {sql: "select * from t where a in (1,2,3,4)", planRegexp: ".*Batch_Point_Get.*"}, + } + execFn := func(db *sql.DB) { + dbt := testkit.NewDBTestKit(t, db) + for _, ca := range cases1 { + sqlStr := ca.sql + if strings.HasPrefix(sqlStr, "select") { + mustQuery(t, dbt, sqlStr) + } else { + dbt.MustExec(sqlStr) + } + } + } + check := func() { + for _, ca := range cases1 { + checkFn(ca.sql, ca.planRegexp) + } } + ts.testCase(t, mc, execFn, check) // Test case 2: prepare/execute sql cases2 := []struct { prepare string args []interface{} planRegexp string - cancel func() }{ {prepare: "insert into t1 (b) values (?);", args: []interface{}{1}, planRegexp: ""}, {prepare: "replace into t1 (b) values (?);", args: []interface{}{1}, planRegexp: ""}, @@ -1377,41 +1363,35 @@ func TestTopSQLCPUProfile(t *testing.T) { {prepare: "select * from t1 where a=?;", args: []interface{}{1}, planRegexp: ".*Point_Get.*"}, {prepare: "select * from t1 where a in (?,?,?,?)", args: []interface{}{1, 2, 3, 4}, planRegexp: ".*Batch_Point_Get.*"}, } - for i, ca := range cases2 { - ctx, cancel := context.WithCancel(context.Background()) - cases2[i].cancel = cancel - prepare, args := ca.prepare, ca.args - var stmt *sql.Stmt - go ts.loopExec(ctx, t, func(db *sql.DB) { - if stmt == nil { - stmt, err = db.Prepare(prepare) - require.NoError(t, err) - } + execFn = func(db *sql.DB) { + dbt := testkit.NewDBTestKit(t, db) + for _, ca := range cases2 { + prepare, args := ca.prepare, ca.args + stmt := dbt.MustPrepare(prepare) if strings.HasPrefix(prepare, "select") { rows, err := stmt.Query(args...) require.NoError(t, err) + for rows.Next() { + } require.NoError(t, rows.Close()) } else { - // Ignore error here since the error may be write conflict. _, err = stmt.Exec(args...) require.NoError(t, err) } - }) + } } - // Wait the top sql collector to collect profile data. - mc.WaitCollectCnt(1) - // Check result of test case 2. - for _, ca := range cases2 { - checkFn(ca.prepare, ca.planRegexp) - ca.cancel() + check = func() { + for _, ca := range cases2 { + checkFn(ca.prepare, ca.planRegexp) + } } + ts.testCase(t, mc, execFn, check) // Test case 3: prepare, execute stmt using @val... cases3 := []struct { prepare string args []interface{} planRegexp string - cancel func() }{ {prepare: "insert into t2 (b) values (?);", args: []interface{}{1}, planRegexp: ""}, {prepare: "update t2 set b=a where b is null limit ?;", args: []interface{}{1}, planRegexp: ".*Limit.*TableReader.*"}, @@ -1424,60 +1404,218 @@ func TestTopSQLCPUProfile(t *testing.T) { {prepare: "select * from t2 where a=?;", args: []interface{}{1}, planRegexp: ".*Point_Get.*"}, {prepare: "select * from t2 where a in (?,?,?,?)", args: []interface{}{1, 2, 3, 4}, planRegexp: ".*Batch_Point_Get.*"}, } - for i, ca := range cases3 { - ctx, cancel := context.WithCancel(context.Background()) - cases3[i].cancel = cancel - prepare, args := ca.prepare, ca.args - doPrepare := true - go ts.loopExec(ctx, t, func(db *sql.DB) { - if doPrepare { - doPrepare = false - _, err := db.Exec(fmt.Sprintf("prepare stmt from '%v'", prepare)) - require.NoError(t, err) - } - sqlBuf := bytes.NewBuffer(nil) - sqlBuf.WriteString("execute stmt ") + execFn = func(db *sql.DB) { + dbt := testkit.NewDBTestKit(t, db) + for _, ca := range cases3 { + prepare, args := ca.prepare, ca.args + dbt.MustExec(fmt.Sprintf("prepare stmt from '%v'", prepare)) + + var params []string for i := range args { - _, err = db.Exec(fmt.Sprintf("set @%c=%v", 'a'+i, args[i])) - require.NoError(t, err) - if i == 0 { - sqlBuf.WriteString("using ") - } else { - sqlBuf.WriteByte(',') - } - sqlBuf.WriteByte('@') - sqlBuf.WriteByte('a' + byte(i)) + param := 'a' + i + dbt.MustExec(fmt.Sprintf("set @%c=%v", param, args[i])) + params = append(params, fmt.Sprintf("@%c", param)) + } + + sqlStr := "execute stmt" + if len(params) > 0 { + sqlStr += " using " + sqlStr += strings.Join(params, ",") } if strings.HasPrefix(prepare, "select") { - rows, err := db.Query(sqlBuf.String()) - require.NoErrorf(t, err, "%v", sqlBuf.String()) - require.NoError(t, rows.Close()) + mustQuery(t, dbt, sqlStr) } else { - // Ignore error here since the error may be write conflict. - _, err = db.Exec(sqlBuf.String()) - require.NoError(t, err) + dbt.MustExec(sqlStr) } - }) + } } + check = func() { + for _, ca := range cases3 { + checkFn(ca.prepare, ca.planRegexp) + } + } + ts.testCase(t, mc, execFn, check) - // Wait the top sql collector to collect profile data. - mc.WaitCollectCnt(1) - // Check result of test case 3. - for _, ca := range cases3 { - checkFn(ca.prepare, ca.planRegexp) - ca.cancel() + // Test case for other statements + cases4 := []struct { + sql string + plan string + isQuery bool + }{ + {"begin", "", false}, + {"insert into t () values (),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),()", "", false}, + {"commit", "", false}, + {"analyze table t", "", false}, + {"explain analyze select sum(a+b) from t", ".*TableReader.*", true}, + {"trace select sum(b*a), sum(a+b) from t", "", true}, + {"set global tidb_stmt_summary_history_size=5;", "", false}, + } + execFn = func(db *sql.DB) { + dbt := testkit.NewDBTestKit(t, db) + for _, ca := range cases4 { + if ca.isQuery { + mustQuery(t, dbt, ca.sql) + } else { + dbt.MustExec(ca.sql) + } + } } + check = func() { + for _, ca := range cases4 { + checkFn(ca.sql, ca.plan) + } + // check for internal SQL. + checkFn("replace into mysql.global_variables (variable_name,variable_value) values ('tidb_stmt_summary_history_size', '5')", "") + } + ts.testCase(t, mc, execFn, check) - // Test case 4: transaction commit - ctx4, cancel4 := context.WithCancel(context.Background()) - defer cancel4() - go ts.loopExec(ctx4, t, func(db *sql.DB) { - db.Exec("begin") - db.Exec("insert into t () values (),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),()") - db.Exec("commit") - }) - // Check result of test case 4. - checkFn("commit", "") + // Test case for multi-statement. + cases5 := []string{ + "delete from t limit 1;", + "update t set b=1 where b is null limit 1;", + "select sum(a+b*2) from t;", + } + multiStatement5 := strings.Join(cases5, "") + execFn = func(db *sql.DB) { + dbt := testkit.NewDBTestKit(t, db) + dbt.MustExec("SET tidb_multi_statement_mode='ON'") + dbt.MustExec(multiStatement5) + } + check = func() { + for _, sqlStr := range cases5 { + checkFn(sqlStr, ".*TableReader.*") + } + } + ts.testCase(t, mc, execFn, check) + + // Test case for multi-statement, but first statements execute failed + cases6 := []string{ + "delete from t_not_exist;", + "update t set a=1 where a is null limit 1;", + } + multiStatement6 := strings.Join(cases6, "") + execFn = func(db *sql.DB) { + dbt := testkit.NewDBTestKit(t, db) + dbt.MustExec("SET tidb_multi_statement_mode='ON'") + _, err := db.Exec(multiStatement6) + require.NotNil(t, err) + require.Equal(t, "Error 1146: Table 'topsql.t_not_exist' doesn't exist", err.Error()) + } + check = func() { + for i := 1; i < len(cases6); i++ { + sqlStr := cases6[i] + stats := mc.GetSQLStatsBySQL(sqlStr, false) + require.Equal(t, 0, len(stats), sqlStr) + } + } + ts.testCase(t, mc, execFn, check) + + // Test case for multi-statement, the first statements execute success but the second statement execute failed. + cases7 := []string{ + "update t set a=1 where a <0 limit 1;", + "delete from t_not_exist;", + } + multiStatement7 := strings.Join(cases7, "") + execFn = func(db *sql.DB) { + dbt := testkit.NewDBTestKit(t, db) + dbt.MustExec("SET tidb_multi_statement_mode='ON'") + _, err = db.Exec(multiStatement7) + require.NotNil(t, err) + require.Equal(t, "Error 1146: Table 'topsql.t_not_exist' doesn't exist", err.Error()) + } + check = func() { + checkFn(cases7[0], "") // the first statement execute success, should have topsql data. + } + ts.testCase(t, mc, execFn, check) + + // Test case for statement with wrong syntax. + wrongSyntaxSQL := "select * froms t" + execFn = func(db *sql.DB) { + _, err = db.Exec(wrongSyntaxSQL) + require.NotNil(t, err) + require.Regexp(t, "Error 1064: You have an error in your SQL syntax...", err.Error()) + } + check = func() { + stats := mc.GetSQLStatsBySQL(wrongSyntaxSQL, false) + require.Equal(t, 0, len(stats), wrongSyntaxSQL) + } + ts.testCase(t, mc, execFn, check) + + // Test case for high cost of plan optimize. + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/planner/mockHighLoadForOptimize", "return")) + selectSQL := "select sum(a+b), count(distinct b) from t where a+b >0" + updateSQL := "update t set a=a+100 where a > 10000000" + selectInPlanSQL := "select * from t where exists (select 1 from t1 where t1.a = 1);" + execFn = func(db *sql.DB) { + dbt := testkit.NewDBTestKit(t, db) + mustQuery(t, dbt, selectSQL) + dbt.MustExec(updateSQL) + mustQuery(t, dbt, selectInPlanSQL) + } + check = func() { + checkFn(selectSQL, "") + checkFn(updateSQL, "") + selectCPUTime := mc.GetSQLCPUTimeBySQL(selectSQL) + updateCPUTime := mc.GetSQLCPUTimeBySQL(updateSQL) + require.Less(t, updateCPUTime, selectCPUTime) + checkFn(selectInPlanSQL, "") + } + ts.testCase(t, mc, execFn, check) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/planner/mockHighLoadForOptimize")) + + // Test case for DDL execute failed but should still have CPU data. + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockHighLoadForAddIndex", "return")) + dbt.MustExec(fmt.Sprintf("insert into t values (%v,%v), (%v, %v);", 2000, 1, 2001, 1)) + addIndexStr := "alter table t add unique index idx_b (b)" + execFn = func(db *sql.DB) { + dbt := testkit.NewDBTestKit(t, db) + dbt.MustExec("alter table t drop index if exists idx_b") + _, err := db.Exec(addIndexStr) + require.NotNil(t, err) + require.Equal(t, "Error 1062: Duplicate entry '1' for key 'idx_b'", err.Error()) + } + check = func() { + checkFn(addIndexStr, "") + } + ts.testCase(t, mc, execFn, check) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockHighLoadForAddIndex")) + + // Test case for execute failed cause by storage error. + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/copr/handleTaskOnceError", `return(true)`)) + execFailedQuery := "select * from t where a*b < 1000" + execFn = func(db *sql.DB) { + _, err = db.Query(execFailedQuery) + require.NotNil(t, err) + require.Equal(t, "Error 1105: mock handleTaskOnce error", err.Error()) + } + check = func() { + checkFn(execFailedQuery, "") + } + ts.testCase(t, mc, execFn, check) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/copr/handleTaskOnceError")) +} + +func (ts *tidbTestTopSQLSuite) testCase(t *testing.T, mc *mockTopSQLTraceCPU.TopSQLCollector, execFn func(db *sql.DB), checkFn func()) { + var wg sync.WaitGroup + ctx, cancel := context.WithCancel(context.Background()) + wg.Add(1) + go func() { + defer wg.Done() + ts.loopExec(ctx, t, execFn) + }() + + checkFn() + cancel() + wg.Wait() + mc.Reset() +} + +func mustQuery(t *testing.T, dbt *testkit.DBTestKit, query string) { + rows := dbt.MustQuery(query) + for rows.Next() { + } + err := rows.Close() + require.NoError(t, err) } type mockCollector struct { @@ -1492,52 +1630,18 @@ func (c *mockCollector) CollectStmtStatsMap(data stmtstats.StatementStatsMap) { c.f(data) } -func TestTopSQLStatementStats(t *testing.T) { - // Prepare stmt stats. - stmtstats.SetupAggregator() - defer stmtstats.CloseAggregator() - - // Register stmt stats collector. - var mu sync.Mutex - total := stmtstats.StatementStatsMap{} - stmtstats.RegisterCollector(newMockCollector(func(data stmtstats.StatementStatsMap) { - mu.Lock() - defer mu.Unlock() - total.Merge(data) - })) - - ts, cleanup := createTidbTestSuite(t) - defer cleanup() - - db, err := sql.Open("mysql", ts.getDSN()) - require.NoError(t, err) - defer func() { - err := db.Close() - require.NoError(t, err) - }() - - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/skipLoadSysVarCacheLoop", `return(true)`)) - defer func() { - err = failpoint.Disable("github.com/pingcap/tidb/domain/skipLoadSysVarCacheLoop") - require.NoError(t, err) - }() - - dbt := testkit.NewDBTestKit(t, db) - dbt.MustExec("drop database if exists stmtstats") - dbt.MustExec("create database stmtstats") - dbt.MustExec("use stmtstats;") - dbt.MustExec("create table t (a int, b int, unique index idx(a));") - dbt.MustExec("create table t2 (a int, b int, unique index idx(a));") - dbt.MustExec("create table t3 (a int, b int, unique index idx(a));") - - // Enable TopSQL - topsqlstate.EnableTopSQL() - config.UpdateGlobal(func(conf *config.Config) { - conf.TopSQL.ReceiverAddress = "mock-agent" - }) +func waitCollected(ch chan struct{}) { + select { + case <-ch: + case <-time.After(time.Second * 3): + } +} - const ExecCountPerSQL = 3 +func TestTopSQLStatementStats(t *testing.T) { + ts, total, collectedNotifyCh, cleanFn := setupForTestTopSQLStatementStats(t) + defer cleanFn() + const ExecCountPerSQL = 2 // Test for CRUD. cases1 := []string{ "insert into t values (%d, sleep(0.1))", @@ -1547,28 +1651,33 @@ func TestTopSQLStatementStats(t *testing.T) { "delete from t where a = %d and sleep(0.1);", "insert into t values (%d, sleep(0.1)) on duplicate key update b = b+1", } + var wg sync.WaitGroup sqlDigests := map[stmtstats.BinaryDigest]string{} for i, ca := range cases1 { sqlStr := fmt.Sprintf(ca, i) _, digest := parser.NormalizeDigest(sqlStr) sqlDigests[stmtstats.BinaryDigest(digest.Bytes())] = sqlStr - db, err := sql.Open("mysql", ts.getDSN()) - require.NoError(t, err) - dbt := testkit.NewDBTestKit(t, db) - dbt.MustExec("use stmtstats;") - for n := 0; n < ExecCountPerSQL; n++ { - sqlStr := fmt.Sprintf(ca, n) - if strings.HasPrefix(strings.ToLower(sqlStr), "select") { - row := dbt.MustQuery(sqlStr) - err := row.Close() - require.NoError(t, err) - } else { - dbt.MustExec(sqlStr) + } + wg.Add(1) + go func() { + defer wg.Done() + for _, ca := range cases1 { + db, err := sql.Open("mysql", ts.getDSN()) + require.NoError(t, err) + dbt := testkit.NewDBTestKit(t, db) + dbt.MustExec("use stmtstats;") + for n := 0; n < ExecCountPerSQL; n++ { + sqlStr := fmt.Sprintf(ca, n) + if strings.HasPrefix(strings.ToLower(sqlStr), "select") { + mustQuery(t, dbt, sqlStr) + } else { + dbt.MustExec(sqlStr) + } } + err = db.Close() + require.NoError(t, err) } - err = db.Close() - require.NoError(t, err) - } + }() // Test for prepare stmt/execute stmt cases2 := []struct { @@ -1638,28 +1747,32 @@ func TestTopSQLStatementStats(t *testing.T) { for _, ca := range cases2 { _, digest := parser.NormalizeDigest(ca.execStmt) sqlDigests[stmtstats.BinaryDigest(digest.Bytes())] = ca.execStmt - db, err := sql.Open("mysql", ts.getDSN()) - require.NoError(t, err) - dbt := testkit.NewDBTestKit(t, db) - dbt.MustExec("use stmtstats;") - // prepare stmt - dbt.MustExec(ca.prepare) - for n := 0; n < ExecCountPerSQL; n++ { - setSQLs := ca.setSQLsGen(n) - for _, setSQL := range setSQLs { - dbt.MustExec(setSQL) - } - if strings.HasPrefix(strings.ToLower(ca.execStmt), "select") { - row := dbt.MustQuery(ca.execSQL) - err := row.Close() - require.NoError(t, err) - } else { - dbt.MustExec(ca.execSQL) + } + wg.Add(1) + go func() { + defer wg.Done() + for _, ca := range cases2 { + db, err := sql.Open("mysql", ts.getDSN()) + require.NoError(t, err) + dbt := testkit.NewDBTestKit(t, db) + dbt.MustExec("use stmtstats;") + // prepare stmt + dbt.MustExec(ca.prepare) + for n := 0; n < ExecCountPerSQL; n++ { + setSQLs := ca.setSQLsGen(n) + for _, setSQL := range setSQLs { + dbt.MustExec(setSQL) + } + if strings.HasPrefix(strings.ToLower(ca.execStmt), "select") { + mustQuery(t, dbt, ca.execSQL) + } else { + dbt.MustExec(ca.execSQL) + } } + err = db.Close() + require.NoError(t, err) } - err = db.Close() - require.NoError(t, err) - } + }() // Test for prepare by db client prepare/exec interface. cases3 := []struct { @@ -1714,31 +1827,38 @@ func TestTopSQLStatementStats(t *testing.T) { for _, ca := range cases3 { _, digest := parser.NormalizeDigest(ca.prepare) sqlDigests[stmtstats.BinaryDigest(digest.Bytes())] = ca.prepare - db, err := sql.Open("mysql", ts.getDSN()) - require.NoError(t, err) - dbt := testkit.NewDBTestKit(t, db) - dbt.MustExec("use stmtstats;") - // prepare stmt - stmt, err := db.Prepare(ca.prepare) - require.NoError(t, err) - for n := 0; n < ExecCountPerSQL; n++ { - args := ca.argsGen(n) - if strings.HasPrefix(strings.ToLower(ca.prepare), "select") { - row, err := stmt.Query(args...) - require.NoError(t, err) - err = row.Close() - require.NoError(t, err) - } else { - _, err := stmt.Exec(args...) - require.NoError(t, err) + } + wg.Add(1) + go func() { + defer wg.Done() + for _, ca := range cases3 { + db, err := sql.Open("mysql", ts.getDSN()) + require.NoError(t, err) + dbt := testkit.NewDBTestKit(t, db) + dbt.MustExec("use stmtstats;") + // prepare stmt + stmt, err := db.Prepare(ca.prepare) + require.NoError(t, err) + for n := 0; n < ExecCountPerSQL; n++ { + args := ca.argsGen(n) + if strings.HasPrefix(strings.ToLower(ca.prepare), "select") { + row, err := stmt.Query(args...) + require.NoError(t, err) + err = row.Close() + require.NoError(t, err) + } else { + _, err := stmt.Exec(args...) + require.NoError(t, err) + } } + err = db.Close() + require.NoError(t, err) } - err = db.Close() - require.NoError(t, err) - } + }() + wg.Wait() // Wait for collect. - time.Sleep(2 * time.Second) + waitCollected(collectedNotifyCh) found := 0 for digest, item := range total { @@ -1747,7 +1867,7 @@ func TestTopSQLStatementStats(t *testing.T) { require.Equal(t, uint64(ExecCountPerSQL), item.ExecCount, sqlStr) require.Equal(t, uint64(ExecCountPerSQL), item.DurationCount, sqlStr) require.True(t, item.SumDurationNs > uint64(time.Millisecond*100*ExecCountPerSQL), sqlStr) - require.True(t, item.SumDurationNs < uint64(time.Millisecond*150*ExecCountPerSQL), sqlStr) + require.True(t, item.SumDurationNs < uint64(time.Millisecond*300*ExecCountPerSQL), sqlStr) if strings.HasPrefix(sqlStr, "set global") { // set global statement use internal SQL to change global variable, so itself doesn't have KV request. continue @@ -1763,6 +1883,319 @@ func TestTopSQLStatementStats(t *testing.T) { require.Equal(t, 20, found) } +func setupForTestTopSQLStatementStats(t *testing.T) (*tidbTestSuite, stmtstats.StatementStatsMap, chan struct{}, func()) { + // Prepare stmt stats. + stmtstats.SetupAggregator() + + // Register stmt stats collector. + var mu sync.Mutex + collectedNotifyCh := make(chan struct{}) + total := stmtstats.StatementStatsMap{} + mockCollector := newMockCollector(func(data stmtstats.StatementStatsMap) { + mu.Lock() + defer mu.Unlock() + total.Merge(data) + select { + case collectedNotifyCh <- struct{}{}: + default: + } + }) + stmtstats.RegisterCollector(mockCollector) + + ts, cleanup := createTidbTestSuite(t) + + db, err := sql.Open("mysql", ts.getDSN()) + require.NoError(t, err) + defer func() { + err := db.Close() + require.NoError(t, err) + }() + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/skipLoadSysVarCacheLoop", `return(true)`)) + + dbt := testkit.NewDBTestKit(t, db) + dbt.MustExec("drop database if exists stmtstats") + dbt.MustExec("create database stmtstats") + dbt.MustExec("use stmtstats;") + dbt.MustExec("create table t (a int, b int, unique index idx(a));") + dbt.MustExec("create table t2 (a int, b int, unique index idx(a));") + dbt.MustExec("create table t3 (a int, b int, unique index idx(a));") + + // Enable TopSQL + topsqlstate.EnableTopSQL() + config.UpdateGlobal(func(conf *config.Config) { + conf.TopSQL.ReceiverAddress = "mock-agent" + }) + + cleanFn := func() { + stmtstats.UnregisterCollector(mockCollector) + cleanup() + err = failpoint.Disable("github.com/pingcap/tidb/domain/skipLoadSysVarCacheLoop") + require.NoError(t, err) + stmtstats.CloseAggregator() + + } + return ts, total, collectedNotifyCh, cleanFn +} + +func TestTopSQLStatementStats2(t *testing.T) { + ts, total, collectedNotifyCh, cleanFn := setupForTestTopSQLStatementStats(t) + defer cleanFn() + + const ExecCountPerSQL = 3 + sqlDigests := map[stmtstats.BinaryDigest]string{} + + // Test case for other statements + cases4 := []struct { + sql string + plan string + isQuery bool + }{ + {"insert into t () values (),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),()", "", false}, + {"analyze table t", "", false}, + {"explain analyze select sum(a+b) from t", ".*TableReader.*", true}, + {"trace select sum(b*a), sum(a+b) from t", "", true}, + {"set global tidb_stmt_summary_history_size=5;", "", false}, + {"select * from stmtstats.t where exists (select 1 from stmtstats.t2 where t2.a = 1);", ".*TableReader.*", true}, + } + executeCaseFn := func(execFn func(db *sql.DB)) { + db, err := sql.Open("mysql", ts.getDSN()) + require.NoError(t, err) + dbt := testkit.NewDBTestKit(t, db) + dbt.MustExec("use stmtstats;") + require.NoError(t, err) + + for n := 0; n < ExecCountPerSQL; n++ { + execFn(db) + } + err = db.Close() + require.NoError(t, err) + } + execFn := func(db *sql.DB) { + dbt := testkit.NewDBTestKit(t, db) + for _, ca := range cases4 { + if ca.isQuery { + mustQuery(t, dbt, ca.sql) + } else { + dbt.MustExec(ca.sql) + } + } + } + for _, ca := range cases4 { + _, digest := parser.NormalizeDigest(ca.sql) + sqlDigests[stmtstats.BinaryDigest(digest.Bytes())] = ca.sql + } + executeCaseFn(execFn) + + // Test case for multi-statement. + cases5 := []string{ + "delete from t limit 1;", + "update t set b=1 where b is null limit 1;", + "select sum(a+b*2) from t;", + } + multiStatement5 := strings.Join(cases5, "") + // Test case for multi-statement, but first statements execute failed + cases6 := []string{ + "delete from t6_not_exist;", + "update t set a=1 where a is null limit 1;", + } + multiStatement6 := strings.Join(cases6, "") + // Test case for multi-statement, the first statements execute success but the second statement execute failed. + cases7 := []string{ + "update t set a=1 where a <0 limit 1;", + "delete from t7_not_exist;", + } + // Test case for DDL. + cases8 := []string{ + "create table if not exists t10 (a int, b int)", + "alter table t drop index if exists idx_b", + "alter table t add index idx_b (b)", + } + multiStatement7 := strings.Join(cases7, "") + execFn = func(db *sql.DB) { + dbt := testkit.NewDBTestKit(t, db) + dbt.MustExec("SET tidb_multi_statement_mode='ON'") + dbt.MustExec(multiStatement5) + + _, err := db.Exec(multiStatement6) + require.NotNil(t, err) + require.Equal(t, "Error 1146: Table 'stmtstats.t6_not_exist' doesn't exist", err.Error()) + + _, err = db.Exec(multiStatement7) + require.NotNil(t, err) + require.Equal(t, "Error 1146: Table 'stmtstats.t7_not_exist' doesn't exist", err.Error()) + + for _, ca := range cases8 { + dbt.MustExec(ca) + } + } + executeCaseFn(execFn) + sqlStrs := append([]string{}, cases5...) + sqlStrs = append(sqlStrs, cases7[0]) + sqlStrs = append(sqlStrs, cases8...) + for _, sqlStr := range sqlStrs { + _, digest := parser.NormalizeDigest(sqlStr) + sqlDigests[stmtstats.BinaryDigest(digest.Bytes())] = sqlStr + } + + // Wait for collect. + waitCollected(collectedNotifyCh) + + foundMap := map[stmtstats.BinaryDigest]string{} + for digest, item := range total { + if sqlStr, ok := sqlDigests[digest.SQLDigest]; ok { + require.Equal(t, uint64(ExecCountPerSQL), item.ExecCount, sqlStr) + require.True(t, item.SumDurationNs > 1, sqlStr) + foundMap[digest.SQLDigest] = sqlStr + } + } + require.Equal(t, len(sqlDigests), len(foundMap), fmt.Sprintf("%v !=\n %v", sqlDigests, foundMap)) +} + +func TestTopSQLStatementStats3(t *testing.T) { + ts, total, collectedNotifyCh, cleanFn := setupForTestTopSQLStatementStats(t) + defer cleanFn() + + err := failpoint.Enable("github.com/pingcap/tidb/executor/mockSleepInTableReaderNext", "return(2000)") + require.NoError(t, err) + defer func() { + _ = failpoint.Disable("github.com/pingcap/tidb/executor/mockSleepInTableReaderNext") + }() + + cases := []string{ + "select count(a+b) from stmtstats.t", + "select * from stmtstats.t where b is null", + "update stmtstats.t set b = 1 limit 10", + "delete from stmtstats.t limit 1", + } + var wg sync.WaitGroup + sqlDigests := map[stmtstats.BinaryDigest]string{} + for _, ca := range cases { + wg.Add(1) + go func(sqlStr string) { + defer wg.Done() + db, err := sql.Open("mysql", ts.getDSN()) + require.NoError(t, err) + dbt := testkit.NewDBTestKit(t, db) + require.NoError(t, err) + if strings.HasPrefix(sqlStr, "select") { + mustQuery(t, dbt, sqlStr) + } else { + dbt.MustExec(sqlStr) + } + err = db.Close() + require.NoError(t, err) + }(ca) + _, digest := parser.NormalizeDigest(ca) + sqlDigests[stmtstats.BinaryDigest(digest.Bytes())] = ca + } + // Wait for collect. + waitCollected(collectedNotifyCh) + + foundMap := map[stmtstats.BinaryDigest]string{} + for digest, item := range total { + if sqlStr, ok := sqlDigests[digest.SQLDigest]; ok { + // since the SQL doesn't execute finish, the ExecCount should be recorded, + // but the DurationCount and SumDurationNs should be 0. + require.Equal(t, uint64(1), item.ExecCount, sqlStr) + require.Equal(t, uint64(0), item.DurationCount, sqlStr) + require.Equal(t, uint64(0), item.SumDurationNs, sqlStr) + foundMap[digest.SQLDigest] = sqlStr + } + } + + // wait sql execute finish. + wg.Wait() + // Wait for collect. + waitCollected(collectedNotifyCh) + + for digest, item := range total { + if sqlStr, ok := sqlDigests[digest.SQLDigest]; ok { + require.Equal(t, uint64(1), item.ExecCount, sqlStr) + require.Equal(t, uint64(1), item.DurationCount, sqlStr) + require.Less(t, uint64(0), item.SumDurationNs, sqlStr) + foundMap[digest.SQLDigest] = sqlStr + } + } +} + +func TestTopSQLStatementStats4(t *testing.T) { + ts, total, collectedNotifyCh, cleanFn := setupForTestTopSQLStatementStats(t) + defer cleanFn() + + err := failpoint.Enable("github.com/pingcap/tidb/executor/mockSleepInTableReaderNext", "return(2000)") + require.NoError(t, err) + defer func() { + _ = failpoint.Disable("github.com/pingcap/tidb/executor/mockSleepInTableReaderNext") + }() + + cases := []struct { + prepare string + sql string + args []interface{} + }{ + {prepare: "select count(a+b) from stmtstats.t", sql: "select count(a+b) from stmtstats.t"}, + {prepare: "select * from stmtstats.t where b is null", sql: "select * from stmtstats.t where b is null"}, + {prepare: "update stmtstats.t set b = ? limit ?", sql: "update stmtstats.t set b = 1 limit 10", args: []interface{}{1, 10}}, + {prepare: "delete from stmtstats.t limit ?", sql: "delete from stmtstats.t limit 1", args: []interface{}{1}}, + } + var wg sync.WaitGroup + sqlDigests := map[stmtstats.BinaryDigest]string{} + for _, ca := range cases { + wg.Add(1) + go func(prepare string, args []interface{}) { + defer wg.Done() + db, err := sql.Open("mysql", ts.getDSN()) + require.NoError(t, err) + stmt, err := db.Prepare(prepare) + require.NoError(t, err) + if strings.HasPrefix(prepare, "select") { + rows, err := stmt.Query(args...) + require.NoError(t, err) + for rows.Next() { + } + err = rows.Close() + require.NoError(t, err) + } else { + _, err := stmt.Exec(args...) + require.NoError(t, err) + } + err = db.Close() + require.NoError(t, err) + }(ca.prepare, ca.args) + _, digest := parser.NormalizeDigest(ca.sql) + sqlDigests[stmtstats.BinaryDigest(digest.Bytes())] = ca.sql + } + // Wait for collect. + waitCollected(collectedNotifyCh) + + foundMap := map[stmtstats.BinaryDigest]string{} + for digest, item := range total { + if sqlStr, ok := sqlDigests[digest.SQLDigest]; ok { + // since the SQL doesn't execute finish, the ExecCount should be recorded, + // but the DurationCount and SumDurationNs should be 0. + require.Equal(t, uint64(1), item.ExecCount, sqlStr) + require.Equal(t, uint64(0), item.DurationCount, sqlStr) + require.Equal(t, uint64(0), item.SumDurationNs, sqlStr) + foundMap[digest.SQLDigest] = sqlStr + } + } + + // wait sql execute finish. + wg.Wait() + // Wait for collect. + waitCollected(collectedNotifyCh) + + for digest, item := range total { + if sqlStr, ok := sqlDigests[digest.SQLDigest]; ok { + require.Equal(t, uint64(1), item.ExecCount, sqlStr) + require.Equal(t, uint64(1), item.DurationCount, sqlStr) + require.Less(t, uint64(0), item.SumDurationNs, sqlStr) + foundMap[digest.SQLDigest] = sqlStr + } + } +} + func (ts *tidbTestTopSQLSuite) loopExec(ctx context.Context, t *testing.T, fn func(db *sql.DB)) { db, err := sql.Open("mysql", ts.getDSN()) require.NoError(t, err, "Error connecting") diff --git a/util/topsql/collector/mock/mock.go b/util/topsql/collector/mock/mock.go index ca6fca7c2d961..50d44bdca0b86 100644 --- a/util/topsql/collector/mock/mock.go +++ b/util/topsql/collector/mock/mock.go @@ -118,6 +118,20 @@ func (c *TopSQLCollector) GetSQLStatsBySQL(sql string, planIsNotNull bool) []*co return stats } +// GetSQLCPUTimeBySQL uses for testing. +func (c *TopSQLCollector) GetSQLCPUTimeBySQL(sql string) uint32 { + sqlDigest := GenSQLDigest(sql) + cpuTime := uint32(0) + c.Lock() + for _, stmt := range c.sqlStatsMap { + if bytes.Equal(stmt.SQLDigest, sqlDigest.Bytes()) { + cpuTime += stmt.CPUTimeMs + } + } + c.Unlock() + return cpuTime +} + // GetSQL uses for testing. func (c *TopSQLCollector) GetSQL(sqlDigest []byte) string { c.Lock() @@ -175,6 +189,16 @@ func (c *TopSQLCollector) WaitCollectCnt(count int64) { } } +// Reset cleans all collected data. +func (c *TopSQLCollector) Reset() { + c.Lock() + defer c.Unlock() + c.sqlMap = make(map[string]string) + c.planMap = make(map[string]string) + c.sqlStatsMap = make(map[string]*collector.SQLCPUTimeRecord) + c.collectCnt.Store(0) +} + // CollectCnt uses for testing. func (c *TopSQLCollector) CollectCnt() int64 { return c.collectCnt.Load() diff --git a/util/topsql/stmtstats/aggregator.go b/util/topsql/stmtstats/aggregator.go index 97daaf0f5964b..5c3c20500f267 100644 --- a/util/topsql/stmtstats/aggregator.go +++ b/util/topsql/stmtstats/aggregator.go @@ -38,6 +38,7 @@ type aggregator struct { statsSet sync.Map // map[*StatementStats]struct{} collectors sync.Map // map[Collector]struct{} running *atomic.Bool + wg sync.WaitGroup } // newAggregator creates an empty aggregator. @@ -45,15 +46,23 @@ func newAggregator() *aggregator { return &aggregator{running: atomic.NewBool(false)} } -// run will block the current goroutine and execute the main loop of aggregator. -func (m *aggregator) run() { +func (m *aggregator) start() { + if m.running.Load() { + return + } m.ctx, m.cancel = context.WithCancel(context.Background()) m.running.Store(true) + m.wg.Add(1) + go m.run() +} + +// run will block the current goroutine and execute the main loop of aggregator. +func (m *aggregator) run() { + tick := time.NewTicker(time.Second) defer func() { - m.running.Store(false) + tick.Stop() + m.wg.Done() }() - tick := time.NewTicker(time.Second) - defer tick.Stop() for { select { case <-m.ctx.Done(): @@ -119,7 +128,14 @@ func (m *aggregator) unregisterCollector(collector Collector) { // close ends the execution of the current aggregator. func (m *aggregator) close() { - m.cancel() + if !m.running.Load() { + return + } + if m.cancel != nil { + m.cancel() + } + m.running.Store(false) + m.wg.Wait() } // closed returns whether the aggregator has been closed. @@ -130,17 +146,13 @@ func (m *aggregator) closed() bool { // SetupAggregator is used to initialize the background aggregator goroutine of the stmtstats module. // SetupAggregator is **not** thread-safe. func SetupAggregator() { - if globalAggregator.closed() { - go globalAggregator.run() - } + globalAggregator.start() } // CloseAggregator is used to stop the background aggregator goroutine of the stmtstats module. // SetupAggregator is **not** thread-safe. func CloseAggregator() { - if !globalAggregator.closed() { - globalAggregator.close() - } + globalAggregator.close() } // RegisterCollector binds a Collector to globalAggregator. diff --git a/util/topsql/stmtstats/aggregator_test.go b/util/topsql/stmtstats/aggregator_test.go index 1de1f5970ac13..fcd3027791c01 100644 --- a/util/topsql/stmtstats/aggregator_test.go +++ b/util/topsql/stmtstats/aggregator_test.go @@ -15,6 +15,7 @@ package stmtstats import ( + "math/rand" "sync" "testing" "time" @@ -69,19 +70,23 @@ func Test_aggregator_register_collect(t *testing.T) { } func Test_aggregator_run_close(t *testing.T) { - wg := sync.WaitGroup{} a := newAggregator() assert.True(t, a.closed()) - wg.Add(1) - go func() { - a.run() - wg.Done() - }() + a.start() time.Sleep(100 * time.Millisecond) assert.False(t, a.closed()) a.close() - wg.Wait() assert.True(t, a.closed()) + + // randomly start and close + for i := 0; i < 100; i++ { + if rand.Intn(2) == 0 { + a.start() + } else { + a.close() + } + } + a.close() } func TestAggregatorDisableAggregate(t *testing.T) { @@ -94,12 +99,7 @@ func TestAggregatorDisableAggregate(t *testing.T) { mu.Unlock() })) - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - a.run() - wg.Done() - }() + a.start() stats := &StatementStats{ data: StatementStatsMap{ @@ -121,7 +121,6 @@ func TestAggregatorDisableAggregate(t *testing.T) { state.DisableTopSQL() a.close() - wg.Wait() } type mockCollector struct { diff --git a/util/topsql/topsql.go b/util/topsql/topsql.go index 1e859d51d0737..36b26ee05cd11 100644 --- a/util/topsql/topsql.go +++ b/util/topsql/topsql.go @@ -104,34 +104,43 @@ func AttachSQLInfo(ctx context.Context, normalizedSQL string, sqlDigest *parser. // Attention: Top SQL pprof profile unable to sample data of those SQL which run very fast, this behavior is expected. // The integration test was just want to make sure each type of SQL will be set goroutine labels and and can be collected. if val.(bool) { - lowerSQL := strings.ToLower(normalizedSQL) - if strings.Contains(lowerSQL, "mysql") { - failpoint.Return(ctx) - } - isDML := false - for _, prefix := range []string{"insert", "update", "delete", "load", "replace", "select", "commit"} { - if strings.HasPrefix(lowerSQL, prefix) { - isDML = true - break - } - } - if !isDML { - failpoint.Return(ctx) - } - start := time.Now() - logutil.BgLogger().Info("attach SQL info", zap.String("sql", normalizedSQL), zap.Bool("has-plan", len(normalizedPlan) > 0)) - for { - if time.Since(start) > 11*time.Millisecond { - break - } - for i := 0; i < 10e5; i++ { - } + sqlPrefixes := []string{"insert", "update", "delete", "load", "replace", "select", "begin", + "commit", "analyze", "explain", "trace", "create", "set global"} + if MockHighCPULoad(normalizedSQL, sqlPrefixes, 1) { + logutil.BgLogger().Info("attach SQL info", zap.String("sql", normalizedSQL), zap.Bool("has-plan", len(normalizedPlan) > 0)) } } }) return ctx } +// MockHighCPULoad mocks high cpu load, only use in failpoint test. +func MockHighCPULoad(sql string, sqlPrefixs []string, load int64) bool { + lowerSQL := strings.ToLower(sql) + if strings.Contains(lowerSQL, "mysql") && !strings.Contains(lowerSQL, "global_variables") { + return false + } + match := false + for _, prefix := range sqlPrefixs { + if strings.HasPrefix(lowerSQL, prefix) { + match = true + break + } + } + if !match { + return false + } + start := time.Now() + for { + if time.Since(start) > 12*time.Millisecond*time.Duration(load) { + break + } + for i := 0; i < 10e5; i++ { + } + } + return true +} + func linkSQLTextWithDigest(sqlDigest []byte, normalizedSQL string, isInternal bool) { if len(normalizedSQL) > MaxSQLTextSize { normalizedSQL = normalizedSQL[:MaxSQLTextSize]