Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: implement the core for multi-schema change #35429

Merged
merged 20 commits into from
Jun 24, 2022
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions ddl/cancel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,11 @@ var allTestCase = []testCancelJob{
{"alter table t_partition truncate partition p3", true, model.StateNone, true, false, nil},
{"alter table t_partition truncate partition p3", false, model.StatePublic, false, true, nil},
// Add columns.
{"alter table t add column c41 bigint, add column c42 bigint", true, model.StateNone, true, false, nil},
{"alter table t add column c41 bigint, add column c42 bigint", true, model.StateDeleteOnly, true, true, nil},
{"alter table t add column c41 bigint, add column c42 bigint", true, model.StateWriteOnly, true, true, nil},
{"alter table t add column c41 bigint, add column c42 bigint", true, model.StateWriteReorganization, true, true, nil},
{"alter table t add column c41 bigint, add column c42 bigint", false, model.StatePublic, false, true, nil},
{"alter table t add column c41 bigint, add column c42 bigint", true, subStates{model.StateNone, model.StateNone}, true, false, nil},
{"alter table t add column c41 bigint, add column c42 bigint", true, subStates{model.StateDeleteOnly, model.StateNone}, true, true, nil},
{"alter table t add column c41 bigint, add column c42 bigint", true, subStates{model.StateWriteOnly, model.StateNone}, true, true, nil},
{"alter table t add column c41 bigint, add column c42 bigint", true, subStates{model.StateWriteReorganization, model.StateNone}, true, true, nil},
{"alter table t add column c41 bigint, add column c42 bigint", false, subStates{model.StatePublic, model.StatePublic}, false, true, nil},
// Drop columns.
// TODO: fix schema state.
{"alter table t drop column c41, drop column c42", true, model.StateNone, true, false, nil},
Expand Down
76 changes: 56 additions & 20 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func adjustColumnInfoInDropColumn(tblInfo *model.TableInfo, offset int) {
tblInfo.Columns = newCols
}

func createColumnInfo(tblInfo *model.TableInfo, colInfo *model.ColumnInfo, pos *ast.ColumnPosition) (*model.ColumnInfo, *ast.ColumnPosition, int, error) {
func createColumnInfoWithPosCheck(tblInfo *model.TableInfo, colInfo *model.ColumnInfo, pos *ast.ColumnPosition) (*model.ColumnInfo, *ast.ColumnPosition, int, error) {
// Check column name duplicate.
cols := tblInfo.Columns
offset := len(cols)
Expand Down Expand Up @@ -115,30 +115,52 @@ func createColumnInfo(tblInfo *model.TableInfo, colInfo *model.ColumnInfo, pos *
return colInfo, pos, offset, nil
}

func checkAddColumn(t *meta.Meta, job *model.Job) (*model.TableInfo, *model.ColumnInfo, *model.ColumnInfo, *ast.ColumnPosition, int, error) {
func initAndAddColumnToTable(tblInfo *model.TableInfo, colInfo *model.ColumnInfo) *model.ColumnInfo {
cols := tblInfo.Columns
colInfo.ID = allocateColumnID(tblInfo)
colInfo.State = model.StateNone
// To support add column asynchronous, we should mark its offset as the last column.
// So that we can use origin column offset to get value from row.
colInfo.Offset = len(cols)
// Append the column info to the end of the tblInfo.Columns.
// It will reorder to the right offset in "Columns" when it state change to public.
tblInfo.Columns = append(cols, colInfo)
Comment on lines +120 to +127
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is duplicated with line106-114? Could we use it for line106-114?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. For now, line106-114 is only used by onAddColumns()(the plural version), which will be removed in the following PRs.

return colInfo
}

func checkAddColumn(t *meta.Meta, job *model.Job) (*model.TableInfo, *model.ColumnInfo, *model.ColumnInfo,
*ast.ColumnPosition, bool /* ifNotExists */, error) {
schemaID := job.SchemaID
tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, schemaID)
if err != nil {
return nil, nil, nil, nil, 0, errors.Trace(err)
return nil, nil, nil, nil, false, errors.Trace(err)
}
col := &model.ColumnInfo{}
pos := &ast.ColumnPosition{}
offset := 0
err = job.DecodeArgs(col, pos, &offset)
ifNotExists := false
err = job.DecodeArgs(col, pos, &offset, &ifNotExists)
if err != nil {
job.State = model.JobStateCancelled
return nil, nil, nil, nil, 0, errors.Trace(err)
return nil, nil, nil, nil, false, errors.Trace(err)
}

columnInfo := model.FindColumnInfo(tblInfo.Columns, col.Name.L)
if columnInfo != nil {
if columnInfo.State == model.StatePublic {
// We already have a column with the same column name.
job.State = model.JobStateCancelled
return nil, nil, nil, nil, 0, infoschema.ErrColumnExists.GenWithStackByArgs(col.Name)
return nil, nil, nil, nil, ifNotExists, infoschema.ErrColumnExists.GenWithStackByArgs(col.Name)
}
}
return tblInfo, columnInfo, col, pos, offset, nil

err = checkAfterPositionExists(tblInfo, pos)
if err != nil {
job.State = model.JobStateCancelled
return nil, nil, nil, nil, false, infoschema.ErrColumnExists.GenWithStackByArgs(col.Name)
}

return tblInfo, columnInfo, col, pos, false, nil
}

func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
Expand All @@ -157,21 +179,18 @@ func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error)
}
})

tblInfo, columnInfo, col, pos, offset, err := checkAddColumn(t, job)
tblInfo, columnInfo, colFromArgs, pos, ifNotExists, err := checkAddColumn(t, job)
if err != nil {
if ifNotExists && infoschema.ErrColumnExists.Equal(err) {
job.Warning = toTError(err)
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
xiongjiwei marked this conversation as resolved.
Show resolved Hide resolved
return ver, nil
}
return ver, errors.Trace(err)
}
if columnInfo == nil {
columnInfo, _, offset, err = createColumnInfo(tblInfo, col, pos)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
logutil.BgLogger().Info("[ddl] run add column job", zap.String("job", job.String()), zap.Reflect("columnInfo", *columnInfo), zap.Int("offset", offset))
// Set offset arg to job.
if offset != 0 {
job.Args = []interface{}{columnInfo, pos, offset}
xiongjiwei marked this conversation as resolved.
Show resolved Hide resolved
}
columnInfo = initAndAddColumnToTable(tblInfo, colFromArgs)
logutil.BgLogger().Info("[ddl] run add column job", zap.String("job", job.String()), zap.Reflect("columnInfo", *columnInfo))
if err = checkAddColumnTooManyColumns(len(tblInfo.Columns)); err != nil {
xiongjiwei marked this conversation as resolved.
Show resolved Hide resolved
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
Expand Down Expand Up @@ -206,9 +225,14 @@ func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error)
}
// Update the job state when all affairs done.
job.SchemaState = model.StateWriteReorganization
job.MarkNonRevertible()
case model.StateWriteReorganization:
// reorganization -> public
// Adjust table column offset.
offset, err := locateOffsetToMove(columnInfo.Offset, pos, tblInfo)
if err != nil {
return ver, errors.Trace(err)
}
xiongjiwei marked this conversation as resolved.
Show resolved Hide resolved
tblInfo.MoveColumnInfo(columnInfo.Offset, offset)
columnInfo.State = model.StatePublic
ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != columnInfo.State)
Expand Down Expand Up @@ -276,6 +300,18 @@ func setColumnsState(columnInfos []*model.ColumnInfo, state model.SchemaState) {
}
}

// checkAfterPositionExists makes sure the column specified in AFTER clause is exists.
// For example, ALTER TABLE t ADD COLUMN c3 INT AFTER c1.
func checkAfterPositionExists(tblInfo *model.TableInfo, pos *ast.ColumnPosition) error {
if pos != nil && pos.Tp == ast.ColumnPositionAfter {
zimulala marked this conversation as resolved.
Show resolved Hide resolved
c := model.FindColumnInfo(tblInfo.Columns, pos.RelativeColumn.Name.L)
if c == nil {
return infoschema.ErrColumnNotExists.GenWithStackByArgs(pos.RelativeColumn, tblInfo.Name)
}
}
return nil
}

func setIndicesState(indexInfos []*model.IndexInfo, state model.SchemaState) {
for _, indexInfo := range indexInfos {
indexInfo.State = state
Expand Down Expand Up @@ -308,7 +344,7 @@ func onAddColumns(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error
return ver, nil
}
for i := range columns {
columnInfo, pos, offset, err := createColumnInfo(tblInfo, columns[i], positions[i])
columnInfo, pos, offset, err := createColumnInfoWithPosCheck(tblInfo, columns[i], positions[i])
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
Expand Down Expand Up @@ -856,7 +892,7 @@ func (w *worker) onModifyColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in
return ver, errors.Trace(err)
}

_, _, _, err = createColumnInfo(tblInfo, modifyInfo.changingCol, changingColPos)
_, _, _, err = createColumnInfoWithPosCheck(tblInfo, modifyInfo.changingCol, changingColPos)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
Expand Down
8 changes: 7 additions & 1 deletion ddl/column_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,13 @@ func testCheckJobDone(t *testing.T, store kv.Storage, jobID int64, isAdd bool) {
require.NoError(t, err)
require.Equal(t, historyJob.State, model.JobStateSynced)
if isAdd {
require.Equal(t, historyJob.SchemaState, model.StatePublic)
if historyJob.Type == model.ActionMultiSchemaChange {
for _, sub := range historyJob.MultiSchemaInfo.SubJobs {
require.Equal(t, sub.SchemaState, model.StatePublic)
}
} else {
require.Equal(t, historyJob.SchemaState, model.StatePublic)
}
} else {
require.Equal(t, historyJob.SchemaState, model.StateNone)
}
Expand Down
14 changes: 11 additions & 3 deletions ddl/db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -837,11 +837,11 @@ func runTestInSchemaState(
_, err = se.Execute(context.Background(), "use test_db_state")
require.NoError(t, err)
cbFunc := func(job *model.Job) {
if job.SchemaState == prevState || checkErr != nil {
if jobStateOrLastSubJobState(job) == prevState || checkErr != nil {
return
}
prevState = job.SchemaState
if job.SchemaState != state {
prevState = jobStateOrLastSubJobState(job)
if prevState != state {
return
}
for _, sqlWithErr := range sqlWithErrs {
Expand Down Expand Up @@ -877,6 +877,14 @@ func runTestInSchemaState(
}
}

func jobStateOrLastSubJobState(job *model.Job) model.SchemaState {
if job.Type == model.ActionMultiSchemaChange && job.MultiSchemaInfo != nil {
subs := job.MultiSchemaInfo.SubJobs
return subs[len(subs)-1].SchemaState
}
return job.SchemaState
}

func TestShowIndex(t *testing.T) {
store, dom, clean := testkit.CreateMockStoreAndDomainWithSchemaLease(t, 200*time.Millisecond)
defer clean()
Expand Down
3 changes: 1 addition & 2 deletions ddl/db_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ func TestIssue5092(t *testing.T) {
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
// The following two statements are consistent with MariaDB.
tk.MustGetErrCode("alter table t_issue_5092 add column if not exists d int, add column d int", errno.ErrDupFieldName)
tk.MustExec("alter table t_issue_5092 add column dd int, add column if not exists dd int")
tk.MustGetErrCode("alter table t_issue_5092 add column dd int, add column if not exists dd int", errno.ErrUnsupportedDDLOperation)
xiongjiwei marked this conversation as resolved.
Show resolved Hide resolved
tk.MustExec("alter table t_issue_5092 add column if not exists (d int, e int), add column ff text")
tk.MustExec("alter table t_issue_5092 add column b2 int after b1, add column c2 int first")
tk.MustQuery("show create table t_issue_5092").Check(testkit.Rows("t_issue_5092 CREATE TABLE `t_issue_5092` (\n" +
Expand All @@ -417,7 +417,6 @@ func TestIssue5092(t *testing.T) {
" `c1` int(11) DEFAULT NULL,\n" +
" `f` int(11) DEFAULT NULL,\n" +
" `g` int(11) DEFAULT NULL,\n" +
" `dd` int(11) DEFAULT NULL,\n" +
" `ff` text DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
tk.MustExec("drop table t_issue_5092")
Expand Down
13 changes: 7 additions & 6 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,12 @@ func setDDLJobQuery(ctx sessionctx.Context, job *model.Job) {
// - context.Cancel: job has been sent to worker, but not found in history DDL job before cancel
// - other: found in history DDL job and return that job error
func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error {
if mci := ctx.GetSessionVars().StmtCtx.MultiSchemaInfo; mci != nil {
xiongjiwei marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

@zimulala zimulala Jun 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the ddl_api level, please add a parallel processing test:
when processing add columns (Multi-Schema change) operation, other DDL requests came in.

Copy link
Contributor Author

@tangenta tangenta Jun 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean the test like this? This test puts 2 multi-schema change DDLs to the job queue at the same time.

func TestMultiSchemaChangeAddColumnsParallel(t *testing.T) {
	store, clean := testkit.CreateMockStore(t)
	defer clean()
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	tk.MustExec("create table t (a int default 1);")
	tk.MustExec("insert into t values ();")
	putTheSameDDLJobTwice(t, func() {
		tk.MustExec("alter table t add column if not exists b int default 2, " +
			"add column if not exists c int default 3;")
		tk.MustQuery("show warnings").Check(testkit.Rows(
			"Note 1060 Duplicate column name 'b'",
			"Note 1060 Duplicate column name 'c'",
		))
	})
	tk.MustQuery("select * from t;").Check(testkit.Rows("1 2 3"))
	tk.MustExec("drop table if exists t;")
	tk.MustExec("create table t (a int);")
	putTheSameDDLJobTwice(t, func() {
		tk.MustGetErrCode("alter table t add column b int, add column c int;", errno.ErrDupFieldName)
	})
}

Let me add it in the next PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what "putTheSameDDLJobTwice" does, but it's a similar test

// In multiple schema change, we don't run the job.
// Instead, we merge all the jobs into one pending job.
return appendToSubJobs(mci, job)
}

// Get a global job ID and put the DDL job in the queue.
setDDLJobQuery(ctx, job)
task := &limitJobTask{job, make(chan error)}
Expand Down Expand Up @@ -783,12 +789,7 @@ func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error {
}
}
}

if historyJob.MultiSchemaInfo != nil && len(historyJob.MultiSchemaInfo.Warnings) != 0 {
for _, warning := range historyJob.MultiSchemaInfo.Warnings {
ctx.GetSessionVars().StmtCtx.AppendWarning(warning)
}
}
appendMultiChangeWarningsToOwnerCtx(ctx, historyJob)

logutil.BgLogger().Info("[ddl] DDL job is finished", zap.Int64("jobID", jobID))
return nil
Expand Down
45 changes: 30 additions & 15 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2951,8 +2951,21 @@ func needToOverwriteColCharset(options []*ast.TableOption) bool {
return false
}

// resolveAlterTableAddColumns splits "add columns" to multiple spec. For example,
// `ALTER TABLE ADD COLUMN (c1 INT, c2 INT)` is split into
// `ALTER TABLE ADD COLUMN c1 INT, ADD COLUMN c2 INT`.
func resolveAlterTableAddColumns(spec *ast.AlterTableSpec) []*ast.AlterTableSpec {
specs := make([]*ast.AlterTableSpec, len(spec.NewColumns))
for i, col := range spec.NewColumns {
t := *spec
tangenta marked this conversation as resolved.
Show resolved Hide resolved
t.NewColumns = []*ast.ColumnDef{col}
specs[i] = &t
}
return specs
}

// resolveAlterTableSpec resolves alter table algorithm and removes ignore table spec in specs.
// returns valied specs, and the occurred error.
// returns valid specs, and the occurred error.
func resolveAlterTableSpec(ctx sessionctx.Context, specs []*ast.AlterTableSpec) ([]*ast.AlterTableSpec, error) {
validSpecs := make([]*ast.AlterTableSpec, 0, len(specs))
algorithm := ast.AlgorithmTypeDefault
Expand All @@ -2964,7 +2977,11 @@ func resolveAlterTableSpec(ctx sessionctx.Context, specs []*ast.AlterTableSpec)
if isIgnorableSpec(spec.Tp) {
continue
}
validSpecs = append(validSpecs, spec)
if spec.Tp == ast.AlterTableAddColumns && len(spec.NewColumns) > 1 {
tangenta marked this conversation as resolved.
Show resolved Hide resolved
validSpecs = append(validSpecs, resolveAlterTableAddColumns(spec)...)
} else {
validSpecs = append(validSpecs, spec)
}
}

// Verify whether the algorithm is supported.
Expand Down Expand Up @@ -3045,9 +3062,10 @@ func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, ident ast
}

if len(validSpecs) > 1 {
useMultiSchemaChange := false
switch validSpecs[0].Tp {
case ast.AlterTableAddColumns:
err = d.AddColumns(sctx, ident, validSpecs)
useMultiSchemaChange = true
case ast.AlterTableDropColumn:
err = d.DropColumns(sctx, ident, validSpecs)
case ast.AlterTableDropPrimaryKey, ast.AlterTableDropIndex:
Expand All @@ -3058,7 +3076,9 @@ func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, ident ast
if err != nil {
return errors.Trace(err)
}
return nil
if !useMultiSchemaChange {
return nil
}
}

if len(validSpecs) > 1 {
Expand All @@ -3069,11 +3089,7 @@ func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, ident ast
var handledCharsetOrCollate bool
switch spec.Tp {
case ast.AlterTableAddColumns:
if len(spec.NewColumns) != 1 {
err = d.AddColumns(sctx, ident, []*ast.AlterTableSpec{spec})
} else {
err = d.AddColumn(sctx, ident, spec)
}
err = d.AddColumn(sctx, ident, spec)
case ast.AlterTableAddPartitions:
err = d.AddTablePartitions(sctx, ident, spec)
case ast.AlterTableCoalescePartitions:
Expand Down Expand Up @@ -3524,6 +3540,10 @@ func (d *ddl) AddColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.AlterTab
if col == nil {
return nil
}
err = checkAfterPositionExists(t.Meta(), spec.Position)
if err != nil {
return errors.Trace(err)
}

job := &model.Job{
SchemaID: schema.ID,
Expand All @@ -3532,15 +3552,10 @@ func (d *ddl) AddColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.AlterTab
TableName: t.Meta().Name.L,
Type: model.ActionAddColumn,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{col, spec.Position, 0},
Args: []interface{}{col, spec.Position, 0, spec.IfNotExists},
}

err = d.DoDDLJob(ctx, job)
// column exists, but if_not_exists flags is true, so we ignore this error.
if infoschema.ErrColumnExists.Equal(err) && spec.IfNotExists {
ctx.GetSessionVars().StmtCtx.AppendNote(err)
return nil
}
err = d.callHookOnChanged(job, err)
return errors.Trace(err)
}
Expand Down
Loading