Skip to content

Commit

Permalink
ddl: implement the core for multi-schema change (#35429)
Browse files Browse the repository at this point in the history
ref #14766
  • Loading branch information
tangenta committed Jun 24, 2022
1 parent e752e1c commit 2c4d1df
Show file tree
Hide file tree
Showing 15 changed files with 526 additions and 70 deletions.
10 changes: 5 additions & 5 deletions ddl/cancel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,11 @@ var allTestCase = []testCancelJob{
{"alter table t_partition truncate partition p3", true, model.StateNone, true, false, nil},
{"alter table t_partition truncate partition p3", false, model.StatePublic, false, true, nil},
// Add columns.
{"alter table t add column c41 bigint, add column c42 bigint", true, model.StateNone, true, false, nil},
{"alter table t add column c41 bigint, add column c42 bigint", true, model.StateDeleteOnly, true, true, nil},
{"alter table t add column c41 bigint, add column c42 bigint", true, model.StateWriteOnly, true, true, nil},
{"alter table t add column c41 bigint, add column c42 bigint", true, model.StateWriteReorganization, true, true, nil},
{"alter table t add column c41 bigint, add column c42 bigint", false, model.StatePublic, false, true, nil},
{"alter table t add column c41 bigint, add column c42 bigint", true, subStates{model.StateNone, model.StateNone}, true, false, nil},
{"alter table t add column c41 bigint, add column c42 bigint", true, subStates{model.StateDeleteOnly, model.StateNone}, true, true, nil},
{"alter table t add column c41 bigint, add column c42 bigint", true, subStates{model.StateWriteOnly, model.StateNone}, true, true, nil},
{"alter table t add column c41 bigint, add column c42 bigint", true, subStates{model.StateWriteReorganization, model.StateNone}, true, true, nil},
{"alter table t add column c41 bigint, add column c42 bigint", false, subStates{model.StatePublic, model.StatePublic}, false, true, nil},
// Drop columns.
// TODO: fix schema state.
{"alter table t drop column c41, drop column c42", true, model.StateNone, true, false, nil},
Expand Down
76 changes: 56 additions & 20 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func adjustColumnInfoInDropColumn(tblInfo *model.TableInfo, offset int) {
tblInfo.Columns = newCols
}

func createColumnInfo(tblInfo *model.TableInfo, colInfo *model.ColumnInfo, pos *ast.ColumnPosition) (*model.ColumnInfo, *ast.ColumnPosition, int, error) {
func createColumnInfoWithPosCheck(tblInfo *model.TableInfo, colInfo *model.ColumnInfo, pos *ast.ColumnPosition) (*model.ColumnInfo, *ast.ColumnPosition, int, error) {
// Check column name duplicate.
cols := tblInfo.Columns
offset := len(cols)
Expand Down Expand Up @@ -115,30 +115,52 @@ func createColumnInfo(tblInfo *model.TableInfo, colInfo *model.ColumnInfo, pos *
return colInfo, pos, offset, nil
}

func checkAddColumn(t *meta.Meta, job *model.Job) (*model.TableInfo, *model.ColumnInfo, *model.ColumnInfo, *ast.ColumnPosition, int, error) {
// initAndAddColumnToTable prepares colInfo as a brand-new column of tblInfo:
// it assigns a freshly allocated column ID, resets the state to StateNone and
// appends the column to the end of tblInfo.Columns.
// The same colInfo pointer is returned for convenience.
func initAndAddColumnToTable(tblInfo *model.TableInfo, colInfo *model.ColumnInfo) *model.ColumnInfo {
	existing := tblInfo.Columns
	colInfo.ID = allocateColumnID(tblInfo)
	colInfo.State = model.StateNone
	// Adding a column is asynchronous, so the new column always takes the
	// last offset; the original offsets of the existing columns can then
	// still be used to read values from a row.
	lastOffset := len(existing)
	colInfo.Offset = lastOffset
	// The column stays at the tail of tblInfo.Columns for now. It is moved
	// to its requested position once its state changes to public.
	tblInfo.Columns = append(existing, colInfo)
	return colInfo
}

func checkAddColumn(t *meta.Meta, job *model.Job) (*model.TableInfo, *model.ColumnInfo, *model.ColumnInfo,
*ast.ColumnPosition, bool /* ifNotExists */, error) {
schemaID := job.SchemaID
tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, schemaID)
if err != nil {
return nil, nil, nil, nil, 0, errors.Trace(err)
return nil, nil, nil, nil, false, errors.Trace(err)
}
col := &model.ColumnInfo{}
pos := &ast.ColumnPosition{}
offset := 0
err = job.DecodeArgs(col, pos, &offset)
ifNotExists := false
err = job.DecodeArgs(col, pos, &offset, &ifNotExists)
if err != nil {
job.State = model.JobStateCancelled
return nil, nil, nil, nil, 0, errors.Trace(err)
return nil, nil, nil, nil, false, errors.Trace(err)
}

columnInfo := model.FindColumnInfo(tblInfo.Columns, col.Name.L)
if columnInfo != nil {
if columnInfo.State == model.StatePublic {
// We already have a column with the same column name.
job.State = model.JobStateCancelled
return nil, nil, nil, nil, 0, infoschema.ErrColumnExists.GenWithStackByArgs(col.Name)
return nil, nil, nil, nil, ifNotExists, infoschema.ErrColumnExists.GenWithStackByArgs(col.Name)
}
}
return tblInfo, columnInfo, col, pos, offset, nil

err = checkAfterPositionExists(tblInfo, pos)
if err != nil {
job.State = model.JobStateCancelled
return nil, nil, nil, nil, false, infoschema.ErrColumnExists.GenWithStackByArgs(col.Name)
}

return tblInfo, columnInfo, col, pos, false, nil
}

func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
Expand All @@ -157,21 +179,18 @@ func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error)
}
})

tblInfo, columnInfo, col, pos, offset, err := checkAddColumn(t, job)
tblInfo, columnInfo, colFromArgs, pos, ifNotExists, err := checkAddColumn(t, job)
if err != nil {
if ifNotExists && infoschema.ErrColumnExists.Equal(err) {
job.Warning = toTError(err)
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
return ver, nil
}
return ver, errors.Trace(err)
}
if columnInfo == nil {
columnInfo, _, offset, err = createColumnInfo(tblInfo, col, pos)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
logutil.BgLogger().Info("[ddl] run add column job", zap.String("job", job.String()), zap.Reflect("columnInfo", *columnInfo), zap.Int("offset", offset))
// Set offset arg to job.
if offset != 0 {
job.Args = []interface{}{columnInfo, pos, offset}
}
columnInfo = initAndAddColumnToTable(tblInfo, colFromArgs)
logutil.BgLogger().Info("[ddl] run add column job", zap.String("job", job.String()), zap.Reflect("columnInfo", *columnInfo))
if err = checkAddColumnTooManyColumns(len(tblInfo.Columns)); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
Expand Down Expand Up @@ -206,9 +225,14 @@ func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error)
}
// Update the job state when all affairs done.
job.SchemaState = model.StateWriteReorganization
job.MarkNonRevertible()
case model.StateWriteReorganization:
// reorganization -> public
// Adjust table column offset.
offset, err := locateOffsetToMove(columnInfo.Offset, pos, tblInfo)
if err != nil {
return ver, errors.Trace(err)
}
tblInfo.MoveColumnInfo(columnInfo.Offset, offset)
columnInfo.State = model.StatePublic
ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != columnInfo.State)
Expand Down Expand Up @@ -276,6 +300,18 @@ func setColumnsState(columnInfos []*model.ColumnInfo, state model.SchemaState) {
}
}

// checkAfterPositionExists verifies that the column named in an AFTER clause
// exists in tblInfo. For example, in
//
//	ALTER TABLE t ADD COLUMN c3 INT AFTER c1
//
// column c1 must be present in t.
// A nil position, or any position other than AFTER, is accepted as-is.
// It returns infoschema.ErrColumnNotExists when the relative column is missing.
func checkAfterPositionExists(tblInfo *model.TableInfo, pos *ast.ColumnPosition) error {
	// Nothing to verify unless the position is "AFTER <column>".
	if pos == nil || pos.Tp != ast.ColumnPositionAfter {
		return nil
	}
	if model.FindColumnInfo(tblInfo.Columns, pos.RelativeColumn.Name.L) != nil {
		return nil
	}
	return infoschema.ErrColumnNotExists.GenWithStackByArgs(pos.RelativeColumn, tblInfo.Name)
}

func setIndicesState(indexInfos []*model.IndexInfo, state model.SchemaState) {
for _, indexInfo := range indexInfos {
indexInfo.State = state
Expand Down Expand Up @@ -308,7 +344,7 @@ func onAddColumns(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error
return ver, nil
}
for i := range columns {
columnInfo, pos, offset, err := createColumnInfo(tblInfo, columns[i], positions[i])
columnInfo, pos, offset, err := createColumnInfoWithPosCheck(tblInfo, columns[i], positions[i])
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
Expand Down Expand Up @@ -856,7 +892,7 @@ func (w *worker) onModifyColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in
return ver, errors.Trace(err)
}

_, _, _, err = createColumnInfo(tblInfo, modifyInfo.changingCol, changingColPos)
_, _, _, err = createColumnInfoWithPosCheck(tblInfo, modifyInfo.changingCol, changingColPos)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
Expand Down
8 changes: 7 additions & 1 deletion ddl/column_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,13 @@ func testCheckJobDone(t *testing.T, store kv.Storage, jobID int64, isAdd bool) {
require.NoError(t, err)
require.Equal(t, historyJob.State, model.JobStateSynced)
if isAdd {
require.Equal(t, historyJob.SchemaState, model.StatePublic)
if historyJob.Type == model.ActionMultiSchemaChange {
for _, sub := range historyJob.MultiSchemaInfo.SubJobs {
require.Equal(t, sub.SchemaState, model.StatePublic)
}
} else {
require.Equal(t, historyJob.SchemaState, model.StatePublic)
}
} else {
require.Equal(t, historyJob.SchemaState, model.StateNone)
}
Expand Down
14 changes: 11 additions & 3 deletions ddl/db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -837,11 +837,11 @@ func runTestInSchemaState(
_, err = se.Execute(context.Background(), "use test_db_state")
require.NoError(t, err)
cbFunc := func(job *model.Job) {
if job.SchemaState == prevState || checkErr != nil {
if jobStateOrLastSubJobState(job) == prevState || checkErr != nil {
return
}
prevState = job.SchemaState
if job.SchemaState != state {
prevState = jobStateOrLastSubJobState(job)
if prevState != state {
return
}
for _, sqlWithErr := range sqlWithErrs {
Expand Down Expand Up @@ -877,6 +877,14 @@ func runTestInSchemaState(
}
}

// jobStateOrLastSubJobState returns the schema state that represents the
// progress of job. For a multi-schema-change job that carries sub-jobs, this
// is the schema state of the last sub-job; for every other job it is the
// job's own SchemaState.
func jobStateOrLastSubJobState(job *model.Job) model.SchemaState {
	if job.Type == model.ActionMultiSchemaChange && job.MultiSchemaInfo != nil {
		// Indexing subs[len(subs)-1] on an empty slice panics, so only use
		// the sub-job state when at least one sub-job is present; otherwise
		// fall through to the job-level state.
		if subs := job.MultiSchemaInfo.SubJobs; len(subs) > 0 {
			return subs[len(subs)-1].SchemaState
		}
	}
	return job.SchemaState
}

func TestShowIndex(t *testing.T) {
store, dom, clean := testkit.CreateMockStoreAndDomainWithSchemaLease(t, 200*time.Millisecond)
defer clean()
Expand Down
3 changes: 1 addition & 2 deletions ddl/db_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ func TestIssue5092(t *testing.T) {
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
// The following two statements are consistent with MariaDB.
tk.MustGetErrCode("alter table t_issue_5092 add column if not exists d int, add column d int", errno.ErrDupFieldName)
tk.MustExec("alter table t_issue_5092 add column dd int, add column if not exists dd int")
tk.MustGetErrCode("alter table t_issue_5092 add column dd int, add column if not exists dd int", errno.ErrUnsupportedDDLOperation)
tk.MustExec("alter table t_issue_5092 add column if not exists (d int, e int), add column ff text")
tk.MustExec("alter table t_issue_5092 add column b2 int after b1, add column c2 int first")
tk.MustQuery("show create table t_issue_5092").Check(testkit.Rows("t_issue_5092 CREATE TABLE `t_issue_5092` (\n" +
Expand All @@ -417,7 +417,6 @@ func TestIssue5092(t *testing.T) {
" `c1` int(11) DEFAULT NULL,\n" +
" `f` int(11) DEFAULT NULL,\n" +
" `g` int(11) DEFAULT NULL,\n" +
" `dd` int(11) DEFAULT NULL,\n" +
" `ff` text DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
tk.MustExec("drop table t_issue_5092")
Expand Down
13 changes: 7 additions & 6 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,12 @@ func setDDLJobQuery(ctx sessionctx.Context, job *model.Job) {
// - context.Cancel: job has been sent to worker, but not found in history DDL job before cancel
// - other: found in history DDL job and return that job error
func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error {
if mci := ctx.GetSessionVars().StmtCtx.MultiSchemaInfo; mci != nil {
// In multiple schema change, we don't run the job.
// Instead, we merge all the jobs into one pending job.
return appendToSubJobs(mci, job)
}

// Get a global job ID and put the DDL job in the queue.
setDDLJobQuery(ctx, job)
task := &limitJobTask{job, make(chan error)}
Expand Down Expand Up @@ -786,12 +792,7 @@ func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error {
}
}
}

if historyJob.MultiSchemaInfo != nil && len(historyJob.MultiSchemaInfo.Warnings) != 0 {
for _, warning := range historyJob.MultiSchemaInfo.Warnings {
ctx.GetSessionVars().StmtCtx.AppendWarning(warning)
}
}
appendMultiChangeWarningsToOwnerCtx(ctx, historyJob)

logutil.BgLogger().Info("[ddl] DDL job is finished", zap.Int64("jobID", jobID))
return nil
Expand Down
45 changes: 30 additions & 15 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2903,8 +2903,21 @@ func needToOverwriteColCharset(options []*ast.TableOption) bool {
return false
}

// resolveAlterTableAddColumns expands a single "add columns" spec into one
// spec per column. For example,
//
//	ALTER TABLE ADD COLUMN (c1 INT, c2 INT)
//
// becomes
//
//	ALTER TABLE ADD COLUMN c1 INT, ADD COLUMN c2 INT
//
// Each returned spec is a shallow copy of spec whose NewColumns holds exactly
// one column definition; the input spec itself is not modified.
func resolveAlterTableAddColumns(spec *ast.AlterTableSpec) []*ast.AlterTableSpec {
	split := make([]*ast.AlterTableSpec, 0, len(spec.NewColumns))
	for _, colDef := range spec.NewColumns {
		// Copy the spec by value so every other field is carried over.
		single := *spec
		single.NewColumns = []*ast.ColumnDef{colDef}
		split = append(split, &single)
	}
	return split
}

// resolveAlterTableSpec resolves alter table algorithm and removes ignore table spec in specs.
// returns valied specs, and the occurred error.
// returns valid specs, and the occurred error.
func resolveAlterTableSpec(ctx sessionctx.Context, specs []*ast.AlterTableSpec) ([]*ast.AlterTableSpec, error) {
validSpecs := make([]*ast.AlterTableSpec, 0, len(specs))
algorithm := ast.AlgorithmTypeDefault
Expand All @@ -2916,7 +2929,11 @@ func resolveAlterTableSpec(ctx sessionctx.Context, specs []*ast.AlterTableSpec)
if isIgnorableSpec(spec.Tp) {
continue
}
validSpecs = append(validSpecs, spec)
if spec.Tp == ast.AlterTableAddColumns && len(spec.NewColumns) > 1 {
validSpecs = append(validSpecs, resolveAlterTableAddColumns(spec)...)
} else {
validSpecs = append(validSpecs, spec)
}
}

// Verify whether the algorithm is supported.
Expand Down Expand Up @@ -2997,9 +3014,10 @@ func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, ident ast
}

if len(validSpecs) > 1 {
useMultiSchemaChange := false
switch validSpecs[0].Tp {
case ast.AlterTableAddColumns:
err = d.AddColumns(sctx, ident, validSpecs)
useMultiSchemaChange = true
case ast.AlterTableDropColumn:
err = d.DropColumns(sctx, ident, validSpecs)
case ast.AlterTableDropPrimaryKey, ast.AlterTableDropIndex:
Expand All @@ -3010,7 +3028,9 @@ func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, ident ast
if err != nil {
return errors.Trace(err)
}
return nil
if !useMultiSchemaChange {
return nil
}
}

if len(validSpecs) > 1 {
Expand All @@ -3021,11 +3041,7 @@ func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, ident ast
var handledCharsetOrCollate bool
switch spec.Tp {
case ast.AlterTableAddColumns:
if len(spec.NewColumns) != 1 {
err = d.AddColumns(sctx, ident, []*ast.AlterTableSpec{spec})
} else {
err = d.AddColumn(sctx, ident, spec)
}
err = d.AddColumn(sctx, ident, spec)
case ast.AlterTableAddPartitions:
err = d.AddTablePartitions(sctx, ident, spec)
case ast.AlterTableCoalescePartitions:
Expand Down Expand Up @@ -3476,6 +3492,10 @@ func (d *ddl) AddColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.AlterTab
if col == nil {
return nil
}
err = checkAfterPositionExists(t.Meta(), spec.Position)
if err != nil {
return errors.Trace(err)
}

job := &model.Job{
SchemaID: schema.ID,
Expand All @@ -3484,15 +3504,10 @@ func (d *ddl) AddColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.AlterTab
TableName: t.Meta().Name.L,
Type: model.ActionAddColumn,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{col, spec.Position, 0},
Args: []interface{}{col, spec.Position, 0, spec.IfNotExists},
}

err = d.DoDDLJob(ctx, job)
// column exists, but if_not_exists flags is true, so we ignore this error.
if infoschema.ErrColumnExists.Equal(err) && spec.IfNotExists {
ctx.GetSessionVars().StmtCtx.AppendNote(err)
return nil
}
err = d.callHookOnChanged(job, err)
return errors.Trace(err)
}
Expand Down
Loading

0 comments on commit 2c4d1df

Please sign in to comment.