Skip to content

Commit

Permalink
ddl: check DDL history job in test (#33079)
Browse files Browse the repository at this point in the history
ref #33078, fix #33134
  • Loading branch information
wjhuang2016 authored Mar 17, 2022
1 parent 95a346c commit f950b21
Show file tree
Hide file tree
Showing 16 changed files with 103 additions and 1 deletion.
4 changes: 4 additions & 0 deletions ddl/column_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func buildCreateColumnJob(dbInfo *model.DBInfo, tblInfo *model.TableInfo, colNam
func testCreateColumn(t *testing.T, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo,
colName string, pos *ast.ColumnPosition, defaultValue interface{}) *model.Job {
job := buildCreateColumnJob(dbInfo, tblInfo, colName, pos, defaultValue)
ctx.SetValue(sessionctx.QueryString, "skip")
err := d.doDDLJob(ctx, job)
require.NoError(t, err)
v := getSchemaVer(t, ctx)
Expand Down Expand Up @@ -132,6 +133,7 @@ func buildCreateColumnsJob(dbInfo *model.DBInfo, tblInfo *model.TableInfo, colNa
func testCreateColumns(t *testing.T, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo,
colNames []string, positions []*ast.ColumnPosition, defaultValue interface{}) *model.Job {
job := buildCreateColumnsJob(dbInfo, tblInfo, colNames, positions, defaultValue)
ctx.SetValue(sessionctx.QueryString, "skip")
err := d.doDDLJob(ctx, job)
require.NoError(t, err)
v := getSchemaVer(t, ctx)
Expand All @@ -152,6 +154,7 @@ func buildDropColumnJob(dbInfo *model.DBInfo, tblInfo *model.TableInfo, colName

func testDropColumn(t *testing.T, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, colName string, isError bool) *model.Job {
job := buildDropColumnJob(dbInfo, tblInfo, colName)
ctx.SetValue(sessionctx.QueryString, "skip")
err := d.doDDLJob(ctx, job)
if isError {
require.Error(t, err)
Expand Down Expand Up @@ -181,6 +184,7 @@ func buildDropColumnsJob(dbInfo *model.DBInfo, tblInfo *model.TableInfo, colName

func testDropColumns(t *testing.T, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, colNames []string, isError bool) *model.Job {
job := buildDropColumnsJob(dbInfo, tblInfo, colNames)
ctx.SetValue(sessionctx.QueryString, "skip")
err := d.doDDLJob(ctx, job)
if isError {
require.Error(t, err)
Expand Down
2 changes: 2 additions & 0 deletions ddl/db_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/parser/auth"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/testkit"
Expand Down Expand Up @@ -596,6 +597,7 @@ func TestBatchCreateTable(t *testing.T) {
newinfo.View = &model.ViewInfo{Cols: viewCols, Security: model.SecurityDefiner, Algorithm: model.AlgorithmMerge, SelectStmt: stmtBuffer.String(), CheckOption: model.CheckOptionCascaded, Definer: &auth.UserIdentity{CurrentUser: true}}
}

tk.Session().SetValue(sessionctx.QueryString, "skip")
err = d.BatchCreateTableWithInfo(tk.Session(), model.NewCIStr("test"), []*model.TableInfo{newinfo}, ddl.OnExistError)
require.NoError(t, err)
}
Expand Down
54 changes: 53 additions & 1 deletion ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/owner"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
Expand Down Expand Up @@ -610,13 +611,62 @@ func recordLastDDLInfo(ctx sessionctx.Context, job *model.Job) {
ctx.GetSessionVars().LastDDLInfo.SeqNum = job.SeqNum
}

func checkHistoryJobInTest(ctx sessionctx.Context, historyJob *model.Job) {
if !(flag.Lookup("test.v") != nil || flag.Lookup("check.v") != nil) {
return
}

// Check binlog.
if historyJob.BinlogInfo.FinishedTS == 0 {
panic(fmt.Sprintf("job ID %d, BinlogInfo.FinishedTS is 0", historyJob.ID))
}

// Check DDL query.
switch historyJob.Type {
case model.ActionUpdateTiFlashReplicaStatus, model.ActionUnlockTable:
if historyJob.Query != "" {
panic(fmt.Sprintf("job ID %d, type %s, query %s", historyJob.ID, historyJob.Type.String(), historyJob.Query))
}
return
default:
if historyJob.Query == "skip" {
// Skip the check if the test explicitly set the query.
return
}
}
p := parser.New()
p.SetSQLMode(ctx.GetSessionVars().SQLMode)
p.SetParserConfig(ctx.GetSessionVars().BuildParserConfig())
stmt, _, err := p.ParseSQL(historyJob.Query)
if err != nil {
panic(fmt.Sprintf("job ID %d, parse ddl job failed, query %s, err %s", historyJob.ID, historyJob.Query, err.Error()))
}
if len(stmt) != 1 && historyJob.Type != model.ActionCreateTables {
panic(fmt.Sprintf("job ID %d, parse ddl job failed, query %s", historyJob.ID, historyJob.Query))
}
for _, st := range stmt {
if _, ok := st.(ast.DDLNode); !ok {
panic(fmt.Sprintf("job ID %d, parse ddl job failed, query %s", historyJob.ID, historyJob.Query))
}
}
}

func setDDLJobQuery(ctx sessionctx.Context, job *model.Job) {
switch job.Type {
case model.ActionUpdateTiFlashReplicaStatus, model.ActionUnlockTable:
job.Query = ""
default:
job.Query, _ = ctx.Value(sessionctx.QueryString).(string)
}
}

// doDDLJob will return
// - nil: found in history DDL job and no job error
// - context.Cancel: job has been sent to worker, but not found in history DDL job before cancel
// - other: found in history DDL job and return that job error
func (d *ddl) doDDLJob(ctx sessionctx.Context, job *model.Job) error {
// Get a global job ID and put the DDL job in the queue.
job.Query, _ = ctx.Value(sessionctx.QueryString).(string)
setDDLJobQuery(ctx, job)
task := &limitJobTask{job, make(chan error)}
d.limitJobCh <- task
// worker should restart to continue handling tasks in limitJobCh, and send back through task.err
Expand Down Expand Up @@ -673,6 +723,8 @@ func (d *ddl) doDDLJob(ctx sessionctx.Context, job *model.Job) error {
continue
}

checkHistoryJobInTest(ctx, historyJob)

// If a job is a history job, the state must be JobStateSynced or JobStateRollbackDone or JobStateCancelled.
if historyJob.IsSynced() {
// Judge whether there are some warnings when executing DDL under the certain SQL mode.
Expand Down
2 changes: 2 additions & 0 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -6917,6 +6917,7 @@ func (d *ddl) AlterTableCache(ctx sessionctx.Context, ti ast.Ident) (err error)
return dbterror.ErrOptOnCacheTable.GenWithStackByArgs("table too large")
}

ddlQuery, _ := ctx.Value(sessionctx.QueryString).(string)
// Initialize the cached table meta lock info in `mysql.table_cache_meta`.
// The operation shouldn't fail in most cases, and if it does, return the error directly.
// This DML and the following DDL is not atomic, that's not a problem.
Expand All @@ -6925,6 +6926,7 @@ func (d *ddl) AlterTableCache(ctx sessionctx.Context, ti ast.Ident) (err error)
if err != nil {
return errors.Trace(err)
}
ctx.SetValue(sessionctx.QueryString, ddlQuery)

job := &model.Job{
SchemaID: schema.ID,
Expand Down
5 changes: 5 additions & 0 deletions ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ func buildCreateIdxJob(dbInfo *model.DBInfo, tblInfo *model.TableInfo, unique bo
func testCreatePrimaryKey(t *testing.T, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, colName string) *model.Job {
job := buildCreateIdxJob(dbInfo, tblInfo, true, "primary", colName)
job.Type = model.ActionAddPrimaryKey
ctx.SetValue(sessionctx.QueryString, "skip")
err := d.doDDLJob(ctx, job)
require.NoError(t, err)
v := getSchemaVer(t, ctx)
Expand All @@ -157,6 +158,7 @@ func testCreatePrimaryKey(t *testing.T, ctx sessionctx.Context, d *ddl, dbInfo *

func testCreateIndex(t *testing.T, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, unique bool, indexName string, colName string) *model.Job {
job := buildCreateIdxJob(dbInfo, tblInfo, unique, indexName, colName)
ctx.SetValue(sessionctx.QueryString, "skip")
err := d.doDDLJob(ctx, job)
require.NoError(t, err)
v := getSchemaVer(t, ctx)
Expand All @@ -172,6 +174,7 @@ func testAddColumn(t *testing.T, ctx sessionctx.Context, d *ddl, dbInfo *model.D
Args: args,
BinlogInfo: &model.HistoryInfo{},
}
ctx.SetValue(sessionctx.QueryString, "skip")
err := d.doDDLJob(ctx, job)
require.NoError(t, err)
v := getSchemaVer(t, ctx)
Expand All @@ -187,6 +190,7 @@ func testAddColumns(t *testing.T, ctx sessionctx.Context, d *ddl, dbInfo *model.
Args: args,
BinlogInfo: &model.HistoryInfo{},
}
ctx.SetValue(sessionctx.QueryString, "skip")
err := d.doDDLJob(ctx, job)
require.NoError(t, err)
v := getSchemaVer(t, ctx)
Expand All @@ -210,6 +214,7 @@ func buildDropIdxJob(dbInfo *model.DBInfo, tblInfo *model.TableInfo, indexName s

func testDropIndex(t *testing.T, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, indexName string) *model.Job {
job := buildDropIdxJob(dbInfo, tblInfo, indexName)
ctx.SetValue(sessionctx.QueryString, "skip")
err := d.doDDLJob(ctx, job)
require.NoError(t, err)
v := getSchemaVer(t, ctx)
Expand Down
4 changes: 4 additions & 0 deletions ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ func TestInvalidDDLJob(t *testing.T) {
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{},
}
ctx.SetValue(sessionctx.QueryString, "skip")
err = d.doDDLJob(ctx, job)
require.Equal(t, err.Error(), "[ddl:8204]invalid ddl job type: none")
}
Expand Down Expand Up @@ -490,6 +491,7 @@ func (s *testDDLSerialSuiteToVerify) TestAddBatchJobError() {
require.Nil(s.T(), failpoint.Enable("github.com/pingcap/tidb/ddl/mockAddBatchDDLJobsErr", `return(true)`))
// Test the job runner should not hang forever.
job := &model.Job{SchemaID: 1, TableID: 1}
ctx.SetValue(sessionctx.QueryString, "skip")
err = d.doDDLJob(ctx, job)
require.Error(s.T(), err)
require.Equal(s.T(), err.Error(), "mockAddBatchDDLJobsErr")
Expand Down Expand Up @@ -539,6 +541,7 @@ func doDDLJobErrWithSchemaState(ctx sessionctx.Context, d *ddl, t *testing.T, sc
BinlogInfo: &model.HistoryInfo{},
}
// TODO: check error detail
ctx.SetValue(sessionctx.QueryString, "skip")
require.Error(t, d.doDDLJob(ctx, job))
testCheckJobCancelled(t, d, job, state)

Expand All @@ -554,6 +557,7 @@ func doDDLJobSuccess(ctx sessionctx.Context, d *ddl, t *testing.T, schemaID, tab
Args: args,
BinlogInfo: &model.HistoryInfo{},
}
ctx.SetValue(sessionctx.QueryString, "skip")
err := d.doDDLJob(ctx, job)
require.NoError(t, err)
}
Expand Down
2 changes: 2 additions & 0 deletions ddl/foreign_key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func testCreateForeignKey(t *testing.T, d *ddl, ctx sessionctx.Context, dbInfo *
}
err := ctx.NewTxn(context.Background())
require.NoError(t, err)
ctx.SetValue(sessionctx.QueryString, "skip")
err = d.doDDLJob(ctx, job)
require.NoError(t, err)
return job
Expand All @@ -73,6 +74,7 @@ func testDropForeignKey(t *testing.T, ctx sessionctx.Context, d *ddl, dbInfo *mo
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{model.NewCIStr(foreignKeyName)},
}
ctx.SetValue(sessionctx.QueryString, "skip")
err := d.doDDLJob(ctx, job)
require.NoError(t, err)
v := getSchemaVer(t, ctx)
Expand Down
2 changes: 2 additions & 0 deletions ddl/partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func buildDropPartitionJob(dbInfo *model.DBInfo, tblInfo *model.TableInfo, partN

func testDropPartition(t *testing.T, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, partNames []string) *model.Job {
job := buildDropPartitionJob(dbInfo, tblInfo, partNames)
ctx.SetValue(sessionctx.QueryString, "skip")
err := d.doDDLJob(ctx, job)
require.NoError(t, err)
v := getSchemaVer(t, ctx)
Expand All @@ -139,6 +140,7 @@ func buildTruncatePartitionJob(dbInfo *model.DBInfo, tblInfo *model.TableInfo, p

func testTruncatePartition(t *testing.T, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, pids []int64) *model.Job {
job := buildTruncatePartitionJob(dbInfo, tblInfo, pids)
ctx.SetValue(sessionctx.QueryString, "skip")
err := d.doDDLJob(ctx, job)
require.NoError(t, err)
v := getSchemaVer(t, ctx)
Expand Down
1 change: 1 addition & 0 deletions ddl/placement_policy_ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func testCreatePlacementPolicy(t *testing.T, ctx sessionctx.Context, d *ddl, pol
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{policyInfo},
}
ctx.SetValue(sessionctx.QueryString, "skip")
err := d.doDDLJob(ctx, job)
require.NoError(t, err)

Expand Down
2 changes: 2 additions & 0 deletions ddl/restart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/mock"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -70,6 +71,7 @@ func runInterruptedJob(d *ddl, job *model.Job, doneCh chan error) {
err error
)

ctx.SetValue(sessionctx.QueryString, "skip")
err = d.doDDLJob(ctx, job)
if errors.Is(err, context.Canceled) {
endlessLoopTime := time.Now().Add(time.Minute)
Expand Down
3 changes: 3 additions & 0 deletions ddl/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func testCreateSchema(t *testing.T, ctx sessionctx.Context, d *ddl, dbInfo *mode
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{dbInfo},
}
ctx.SetValue(sessionctx.QueryString, "skip")
require.NoError(t, d.doDDLJob(ctx, job))

v := getSchemaVer(t, ctx)
Expand All @@ -68,6 +69,7 @@ func buildDropSchemaJob(dbInfo *model.DBInfo) *model.Job {

func testDropSchema(t *testing.T, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo) (*model.Job, int64) {
job := buildDropSchemaJob(dbInfo)
ctx.SetValue(sessionctx.QueryString, "skip")
err := d.doDDLJob(ctx, job)
require.NoError(t, err)
ver := getSchemaVer(t, ctx)
Expand Down Expand Up @@ -176,6 +178,7 @@ func ExportTestSchema(t *testing.T) {
Type: model.ActionDropSchema,
BinlogInfo: &model.HistoryInfo{},
}
ctx.SetValue(sessionctx.QueryString, "skip")
err = d.doDDLJob(ctx, job)
require.True(t, terror.ErrorEqual(err, infoschema.ErrDatabaseDropExists), "err %v", err)

Expand Down
2 changes: 2 additions & 0 deletions ddl/stat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"testing"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/types"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -69,6 +70,7 @@ func TestDDLStatsInfo(t *testing.T) {

done := make(chan error, 1)
go func() {
ctx.SetValue(sessionctx.QueryString, "skip")
done <- d.doDDLJob(ctx, job)
}()

Expand Down
7 changes: 7 additions & 0 deletions ddl/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func testRenameTable(
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{oldSchemaID, tblInfo.Name, oldSchemaName},
}
ctx.SetValue(sessionctx.QueryString, "skip")
require.NoError(t, d.doDDLJob(ctx, job))

v := getSchemaVer(t, ctx)
Expand All @@ -65,6 +66,7 @@ func testRenameTables(
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{oldSchemaIDs, newSchemaIDs, newTableNames, oldTableIDs, oldSchemaNames},
}
ctx.SetValue(sessionctx.QueryString, "skip")
require.NoError(t, d.doDDLJob(ctx, job))

v := getSchemaVer(t, ctx)
Expand All @@ -87,6 +89,7 @@ func testLockTable(t *testing.T, ctx sessionctx.Context, d *ddl, newSchemaID int
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{arg},
}
ctx.SetValue(sessionctx.QueryString, "skip")
err := d.doDDLJob(ctx, job)
require.NoError(t, err)

Expand Down Expand Up @@ -125,6 +128,7 @@ func testTruncateTable(t *testing.T, ctx sessionctx.Context, d *ddl, dbInfo *mod
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{newTableID},
}
ctx.SetValue(sessionctx.QueryString, "skip")
err = d.doDDLJob(ctx, job)
require.NoError(t, err)

Expand Down Expand Up @@ -269,6 +273,7 @@ func testAlterCacheTable(t *testing.T, ctx sessionctx.Context, d *ddl, newSchema
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{},
}
ctx.SetValue(sessionctx.QueryString, "skip")
err := d.doDDLJob(ctx, job)
require.NoError(t, err)

Expand All @@ -285,6 +290,7 @@ func testAlterNoCacheTable(t *testing.T, ctx sessionctx.Context, d *ddl, newSche
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{},
}
ctx.SetValue(sessionctx.QueryString, "skip")
require.NoError(t, d.doDDLJob(ctx, job))

v := getSchemaVer(t, ctx)
Expand Down Expand Up @@ -391,6 +397,7 @@ func TestCreateTables(t *testing.T) {
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{infos},
}
ctx.SetValue(sessionctx.QueryString, "skip")
err = d.doDDLJob(ctx, job)
require.NoError(t, err)

Expand Down
3 changes: 3 additions & 0 deletions ddl/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ func testCreateTable(t *testing.T, ctx sessionctx.Context, d *ddl, dbInfo *model
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{tblInfo},
}
ctx.SetValue(sessionctx.QueryString, "skip")
err := d.doDDLJob(ctx, job)
require.NoError(t, err)

Expand All @@ -189,6 +190,7 @@ func testCreateView(t *testing.T, ctx sessionctx.Context, d *ddl, dbInfo *model.
}

require.True(t, tblInfo.IsView())
ctx.SetValue(sessionctx.QueryString, "skip")
require.NoError(t, d.doDDLJob(ctx, job))

v := getSchemaVer(t, ctx)
Expand All @@ -205,6 +207,7 @@ func testDropTable(t *testing.T, ctx sessionctx.Context, d *ddl, dbInfo *model.D
Type: model.ActionDropTable,
BinlogInfo: &model.HistoryInfo{},
}
ctx.SetValue(sessionctx.QueryString, "skip")
require.NoError(t, d.doDDLJob(ctx, job))

v := getSchemaVer(t, ctx)
Expand Down
Loading

0 comments on commit f950b21

Please sign in to comment.