Skip to content

Commit

Permalink
ddl: make meta mutator part of jobContext (pingcap#56399)
Browse files Browse the repository at this point in the history
  • Loading branch information
D3Hunter authored and EricZequan committed Sep 29, 2024
1 parent c1117da commit 20326cc
Show file tree
Hide file tree
Showing 22 changed files with 634 additions and 625 deletions.
15 changes: 7 additions & 8 deletions pkg/ddl/add_column.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/expression/exprctx"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/meta/autoid"
"github.com/pingcap/tidb/pkg/meta/metabuild"
"github.com/pingcap/tidb/pkg/meta/model"
Expand All @@ -52,10 +51,10 @@ import (
"go.uber.org/zap"
)

func onAddColumn(jobCtx *jobContext, t *meta.Mutator, job *model.Job) (ver int64, err error) {
func onAddColumn(jobCtx *jobContext, job *model.Job) (ver int64, err error) {
// Handle the rolling back job.
if job.IsRollingback() {
ver, err = onDropColumn(jobCtx, t, job)
ver, err = onDropColumn(jobCtx, job)
if err != nil {
return ver, errors.Trace(err)
}
Expand All @@ -69,7 +68,7 @@ func onAddColumn(jobCtx *jobContext, t *meta.Mutator, job *model.Job) (ver int64
}
})

tblInfo, columnInfo, colFromArgs, pos, ifNotExists, err := checkAddColumn(t, job)
tblInfo, columnInfo, colFromArgs, pos, ifNotExists, err := checkAddColumn(jobCtx.metaMut, job)
if err != nil {
if ifNotExists && infoschema.ErrColumnExists.Equal(err) {
job.Warning = toTError(err)
Expand All @@ -92,15 +91,15 @@ func onAddColumn(jobCtx *jobContext, t *meta.Mutator, job *model.Job) (ver int64
case model.StateNone:
// none -> delete only
columnInfo.State = model.StateDeleteOnly
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, t, job, tblInfo, originalState != columnInfo.State)
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, originalState != columnInfo.State)
if err != nil {
return ver, errors.Trace(err)
}
job.SchemaState = model.StateDeleteOnly
case model.StateDeleteOnly:
// delete only -> write only
columnInfo.State = model.StateWriteOnly
ver, err = updateVersionAndTableInfo(jobCtx, t, job, tblInfo, originalState != columnInfo.State)
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, originalState != columnInfo.State)
if err != nil {
return ver, errors.Trace(err)
}
Expand All @@ -109,7 +108,7 @@ func onAddColumn(jobCtx *jobContext, t *meta.Mutator, job *model.Job) (ver int64
case model.StateWriteOnly:
// write only -> reorganization
columnInfo.State = model.StateWriteReorganization
ver, err = updateVersionAndTableInfo(jobCtx, t, job, tblInfo, originalState != columnInfo.State)
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, originalState != columnInfo.State)
if err != nil {
return ver, errors.Trace(err)
}
Expand All @@ -126,7 +125,7 @@ func onAddColumn(jobCtx *jobContext, t *meta.Mutator, job *model.Job) (ver int64
}
tblInfo.MoveColumnInfo(columnInfo.Offset, offset)
columnInfo.State = model.StatePublic
ver, err = updateVersionAndTableInfo(jobCtx, t, job, tblInfo, originalState != columnInfo.State)
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, originalState != columnInfo.State)
if err != nil {
return ver, errors.Trace(err)
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/ddl/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,7 @@ func splitRegionsByKeyRanges(ctx context.Context, store kv.Storage, keyRanges []
// 2. before flashback start, check timestamp, disable GC and close PD schedule, get flashback key ranges.
// 3. phase 1, lock flashback key ranges.
// 4. phase 2, send flashback RPC, do flashback jobs.
func (w *worker) onFlashbackCluster(jobCtx *jobContext, t *meta.Mutator, job *model.Job) (ver int64, err error) {
func (w *worker) onFlashbackCluster(jobCtx *jobContext, job *model.Job) (ver int64, err error) {
inFlashbackTest := false
failpoint.Inject("mockFlashbackTest", func(val failpoint.Value) {
if val.(bool) {
Expand Down Expand Up @@ -768,7 +768,7 @@ func (w *worker) onFlashbackCluster(jobCtx *jobContext, t *meta.Mutator, job *mo
return ver, nil
// Stage 2, check flashbackTS, close GC and PD schedule, get flashback key ranges.
case model.StateDeleteOnly:
if err = checkAndSetFlashbackClusterInfo(w.ctx, sess, jobCtx.store, t, job, flashbackTS); err != nil {
if err = checkAndSetFlashbackClusterInfo(w.ctx, sess, jobCtx.store, jobCtx.metaMut, job, flashbackTS); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
Expand All @@ -785,13 +785,13 @@ func (w *worker) onFlashbackCluster(jobCtx *jobContext, t *meta.Mutator, job *mo
}
job.Args[keyRangesOffset] = keyRanges
job.SchemaState = model.StateWriteOnly
return updateSchemaVersion(jobCtx, t, job)
return updateSchemaVersion(jobCtx, job)
// Stage 3, lock related key ranges.
case model.StateWriteOnly:
// TODO: Support flashback in unistore.
if inFlashbackTest {
job.SchemaState = model.StateWriteReorganization
return updateSchemaVersion(jobCtx, t, job)
return updateSchemaVersion(jobCtx, job)
}
// Split region by keyRanges, make sure no unrelated key ranges be locked.
splitRegionsByKeyRanges(w.ctx, jobCtx.store, keyRanges)
Expand Down Expand Up @@ -847,7 +847,7 @@ func (w *worker) onFlashbackCluster(jobCtx *jobContext, t *meta.Mutator, job *mo
asyncNotifyEvent(jobCtx, notifier.NewFlashbackClusterEvent(), job)
job.State = model.JobStateDone
job.SchemaState = model.StatePublic
return updateSchemaVersion(jobCtx, t, job)
return updateSchemaVersion(jobCtx, job)
}
return ver, nil
}
Expand Down
41 changes: 21 additions & 20 deletions pkg/ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ func checkDropColumnForStatePublic(colInfo *model.ColumnInfo) (err error) {
return nil
}

func onDropColumn(jobCtx *jobContext, t *meta.Mutator, job *model.Job) (ver int64, _ error) {
tblInfo, colInfo, idxInfos, ifExists, err := checkDropColumn(jobCtx, t, job)
func onDropColumn(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
tblInfo, colInfo, idxInfos, ifExists, err := checkDropColumn(jobCtx, job)
if err != nil {
if ifExists && dbterror.ErrCantDropFieldOrKey.Equal(err) {
// Convert the "not exists" error to a warning.
Expand All @@ -153,7 +153,7 @@ func onDropColumn(jobCtx *jobContext, t *meta.Mutator, job *model.Job) (ver int6
job.MarkNonRevertible()
job.SchemaState = colInfo.State
// Store the mark and enter the next DDL handling loop.
return updateVersionAndTableInfoWithCheck(jobCtx, t, job, tblInfo, false)
return updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, false)
}

originalState := colInfo.State
Expand All @@ -167,7 +167,7 @@ func onDropColumn(jobCtx *jobContext, t *meta.Mutator, job *model.Job) (ver int6
if err != nil {
return ver, errors.Trace(err)
}
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, t, job, tblInfo, originalState != colInfo.State)
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, originalState != colInfo.State)
if err != nil {
return ver, errors.Trace(err)
}
Expand All @@ -185,7 +185,7 @@ func onDropColumn(jobCtx *jobContext, t *meta.Mutator, job *model.Job) (ver int6
}
tblInfo.Indices = newIndices
}
ver, err = updateVersionAndTableInfo(jobCtx, t, job, tblInfo, originalState != colInfo.State)
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, originalState != colInfo.State)
if err != nil {
return ver, errors.Trace(err)
}
Expand All @@ -199,7 +199,7 @@ func onDropColumn(jobCtx *jobContext, t *meta.Mutator, job *model.Job) (ver int6
// delete only -> reorganization
colInfo.State = model.StateDeleteReorganization
tblInfo.MoveColumnInfo(colInfo.Offset, len(tblInfo.Columns)-1)
ver, err = updateVersionAndTableInfo(jobCtx, t, job, tblInfo, originalState != colInfo.State)
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, originalState != colInfo.State)
if err != nil {
return ver, errors.Trace(err)
}
Expand All @@ -209,7 +209,7 @@ func onDropColumn(jobCtx *jobContext, t *meta.Mutator, job *model.Job) (ver int6
tblInfo.MoveColumnInfo(colInfo.Offset, len(tblInfo.Columns)-1)
tblInfo.Columns = tblInfo.Columns[:len(tblInfo.Columns)-1]
colInfo.State = model.StateNone
ver, err = updateVersionAndTableInfo(jobCtx, t, job, tblInfo, originalState != colInfo.State)
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, originalState != colInfo.State)
if err != nil {
return ver, errors.Trace(err)
}
Expand All @@ -234,9 +234,9 @@ func onDropColumn(jobCtx *jobContext, t *meta.Mutator, job *model.Job) (ver int6
return ver, errors.Trace(err)
}

func checkDropColumn(jobCtx *jobContext, t *meta.Mutator, job *model.Job) (*model.TableInfo, *model.ColumnInfo, []*model.IndexInfo, bool /* ifExists */, error) {
func checkDropColumn(jobCtx *jobContext, job *model.Job) (*model.TableInfo, *model.ColumnInfo, []*model.IndexInfo, bool /* ifExists */, error) {
schemaID := job.SchemaID
tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, schemaID)
tblInfo, err := GetTableInfoAndCancelFaultJob(jobCtx.metaMut, job, schemaID)
if err != nil {
return nil, nil, nil, false, errors.Trace(err)
}
Expand Down Expand Up @@ -291,15 +291,15 @@ func isDroppableColumn(tblInfo *model.TableInfo, colName pmodel.CIStr) error {
return nil
}

func onSetDefaultValue(jobCtx *jobContext, t *meta.Mutator, job *model.Job) (ver int64, _ error) {
func onSetDefaultValue(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
newCol := &model.ColumnInfo{}
err := job.DecodeArgs(newCol)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

return updateColumnDefaultValue(jobCtx, t, job, newCol, &newCol.Name)
return updateColumnDefaultValue(jobCtx, job, newCol, &newCol.Name)
}

func setIdxIDName(idxInfo *model.IndexInfo, newID int64, newName pmodel.CIStr) {
Expand Down Expand Up @@ -896,17 +896,18 @@ func updateChangingObjState(changingCol *model.ColumnInfo, changingIdxs []*model
}
}

func checkAndApplyAutoRandomBits(jobCtx *jobContext, m *meta.Mutator, dbInfo *model.DBInfo, tblInfo *model.TableInfo,
func checkAndApplyAutoRandomBits(jobCtx *jobContext, dbInfo *model.DBInfo, tblInfo *model.TableInfo,
oldCol *model.ColumnInfo, newCol *model.ColumnInfo, newAutoRandBits uint64) error {
if newAutoRandBits == 0 {
return nil
}
idAcc := m.GetAutoIDAccessors(dbInfo.ID, tblInfo.ID)
metaMut := jobCtx.metaMut
idAcc := metaMut.GetAutoIDAccessors(dbInfo.ID, tblInfo.ID)
err := checkNewAutoRandomBits(idAcc, oldCol, newCol, newAutoRandBits, tblInfo.AutoRandomRangeBits, tblInfo.SepAutoInc())
if err != nil {
return err
}
return applyNewAutoRandomBits(jobCtx, m, dbInfo, tblInfo, oldCol, newAutoRandBits)
return applyNewAutoRandomBits(jobCtx, dbInfo, tblInfo, oldCol, newAutoRandBits)
}

// checkNewAutoRandomBits checks whether the new auto_random bits number can cause overflow.
Expand Down Expand Up @@ -967,7 +968,7 @@ func (r *asAutoIDRequirement) AutoIDClient() *autoid.ClientDiscover {

// applyNewAutoRandomBits set auto_random bits to TableInfo and
// migrate auto_increment ID to auto_random ID if possible.
func applyNewAutoRandomBits(jobCtx *jobContext, m *meta.Mutator, dbInfo *model.DBInfo,
func applyNewAutoRandomBits(jobCtx *jobContext, dbInfo *model.DBInfo,
tblInfo *model.TableInfo, oldCol *model.ColumnInfo, newAutoRandBits uint64) error {
tblInfo.AutoRandomBits = newAutoRandBits
needMigrateFromAutoIncToAutoRand := mysql.HasAutoIncrementFlag(oldCol.GetFlag())
Expand All @@ -979,7 +980,7 @@ func applyNewAutoRandomBits(jobCtx *jobContext, m *meta.Mutator, dbInfo *model.D
errMsg := fmt.Sprintf(autoid.AutoRandomAllocatorNotFound, dbInfo.Name.O, tblInfo.Name.O)
return dbterror.ErrInvalidAutoRandom.GenWithStackByArgs(errMsg)
}
idAcc := m.GetAutoIDAccessors(dbInfo.ID, tblInfo.ID).RowID()
idAcc := jobCtx.metaMut.GetAutoIDAccessors(dbInfo.ID, tblInfo.ID).RowID()
nextAutoIncID, err := idAcc.Get()
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -1037,16 +1038,16 @@ func checkForNullValue(ctx context.Context, sctx sessionctx.Context, isDataTrunc
return nil
}

func updateColumnDefaultValue(jobCtx *jobContext, t *meta.Mutator, job *model.Job, newCol *model.ColumnInfo, oldColName *pmodel.CIStr) (ver int64, _ error) {
tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, job.SchemaID)
func updateColumnDefaultValue(jobCtx *jobContext, job *model.Job, newCol *model.ColumnInfo, oldColName *pmodel.CIStr) (ver int64, _ error) {
tblInfo, err := GetTableInfoAndCancelFaultJob(jobCtx.metaMut, job, job.SchemaID)
if err != nil {
return ver, errors.Trace(err)
}

if job.MultiSchemaInfo != nil && job.MultiSchemaInfo.Revertible {
job.MarkNonRevertible()
// Store the mark and enter the next DDL handling loop.
return updateVersionAndTableInfoWithCheck(jobCtx, t, job, tblInfo, false)
return updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, false)
}

oldCol := model.FindColumnInfo(tblInfo.Columns, oldColName.L)
Expand Down Expand Up @@ -1078,7 +1079,7 @@ func updateColumnDefaultValue(jobCtx *jobContext, t *meta.Mutator, job *model.Jo
}
}

ver, err = updateVersionAndTableInfo(jobCtx, t, job, tblInfo, true)
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
Expand Down
36 changes: 18 additions & 18 deletions pkg/ddl/constraint.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ import (
"github.com/pingcap/tidb/pkg/util/dbterror"
)

func (w *worker) onAddCheckConstraint(jobCtx *jobContext, t *meta.Mutator, job *model.Job) (ver int64, err error) {
func (w *worker) onAddCheckConstraint(jobCtx *jobContext, job *model.Job) (ver int64, err error) {
// Handle the rolling back job.
if job.IsRollingback() {
return rollingBackAddConstraint(jobCtx, t, job)
return rollingBackAddConstraint(jobCtx, job)
}

failpoint.Inject("errorBeforeDecodeArgs", func(val failpoint.Value) {
Expand All @@ -44,7 +44,7 @@ func (w *worker) onAddCheckConstraint(jobCtx *jobContext, t *meta.Mutator, job *
}
})

dbInfo, tblInfo, constraintInfoInMeta, constraintInfoInJob, err := checkAddCheckConstraint(t, job)
dbInfo, tblInfo, constraintInfoInMeta, constraintInfoInJob, err := checkAddCheckConstraint(jobCtx.metaMut, job)
if err != nil {
return ver, errors.Trace(err)
}
Expand Down Expand Up @@ -81,7 +81,7 @@ func (w *worker) onAddCheckConstraint(jobCtx *jobContext, t *meta.Mutator, job *
// If not enforced, add it directly.
if !constraintInfoInMeta.Enforced {
constraintInfoInMeta.State = model.StatePublic
ver, err = updateVersionAndTableInfo(jobCtx, t, job, tblInfo, true)
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
}
Expand All @@ -94,11 +94,11 @@ func (w *worker) onAddCheckConstraint(jobCtx *jobContext, t *meta.Mutator, job *
case model.StateNone:
job.SchemaState = model.StateWriteOnly
constraintInfoInMeta.State = model.StateWriteOnly
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, t, job, tblInfo, true)
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true)
case model.StateWriteOnly:
job.SchemaState = model.StateWriteReorganization
constraintInfoInMeta.State = model.StateWriteReorganization
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, t, job, tblInfo, true)
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true)
case model.StateWriteReorganization:
err = w.verifyRemainRecordsForCheckConstraint(dbInfo, tblInfo, constraintInfoInMeta)
if err != nil {
Expand All @@ -108,7 +108,7 @@ func (w *worker) onAddCheckConstraint(jobCtx *jobContext, t *meta.Mutator, job *
return ver, errors.Trace(err)
}
constraintInfoInMeta.State = model.StatePublic
ver, err = updateVersionAndTableInfo(jobCtx, t, job, tblInfo, true)
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
}
Expand Down Expand Up @@ -162,8 +162,8 @@ func checkAddCheckConstraint(t *meta.Mutator, job *model.Job) (*model.DBInfo, *m
// onDropCheckConstraint can be called from two case:
// 1: rollback in add constraint.(in rollback function the job.args will be changed)
// 2: user drop constraint ddl.
func onDropCheckConstraint(jobCtx *jobContext, t *meta.Mutator, job *model.Job) (ver int64, _ error) {
tblInfo, constraintInfo, err := checkDropCheckConstraint(t, job)
func onDropCheckConstraint(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
tblInfo, constraintInfo, err := checkDropCheckConstraint(jobCtx.metaMut, job)
if err != nil {
return ver, errors.Trace(err)
}
Expand All @@ -172,7 +172,7 @@ func onDropCheckConstraint(jobCtx *jobContext, t *meta.Mutator, job *model.Job)
case model.StatePublic:
job.SchemaState = model.StateWriteOnly
constraintInfo.State = model.StateWriteOnly
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, t, job, tblInfo, true)
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true)
case model.StateWriteOnly:
// write only state constraint will still take effect to check the newly inserted data.
// So the dependent column shouldn't be dropped even in this intermediate state.
Expand All @@ -183,7 +183,7 @@ func onDropCheckConstraint(jobCtx *jobContext, t *meta.Mutator, job *model.Job)
tblInfo.Constraints = append(tblInfo.Constraints[0:i], tblInfo.Constraints[i+1:]...)
}
}
ver, err = updateVersionAndTableInfo(jobCtx, t, job, tblInfo, true)
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
}
Expand Down Expand Up @@ -216,14 +216,14 @@ func checkDropCheckConstraint(t *meta.Mutator, job *model.Job) (*model.TableInfo
return tblInfo, constraintInfo, nil
}

func (w *worker) onAlterCheckConstraint(jobCtx *jobContext, t *meta.Mutator, job *model.Job) (ver int64, err error) {
dbInfo, tblInfo, constraintInfo, enforced, err := checkAlterCheckConstraint(t, job)
func (w *worker) onAlterCheckConstraint(jobCtx *jobContext, job *model.Job) (ver int64, err error) {
dbInfo, tblInfo, constraintInfo, enforced, err := checkAlterCheckConstraint(jobCtx.metaMut, job)
if err != nil {
return ver, errors.Trace(err)
}

if job.IsRollingback() {
return rollingBackAlterConstraint(jobCtx, t, job)
return rollingBackAlterConstraint(jobCtx, job)
}

// Current State is desired.
Expand All @@ -239,11 +239,11 @@ func (w *worker) onAlterCheckConstraint(jobCtx *jobContext, t *meta.Mutator, job
job.SchemaState = model.StateWriteReorganization
constraintInfo.State = model.StateWriteReorganization
constraintInfo.Enforced = enforced
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, t, job, tblInfo, true)
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true)
case model.StateWriteReorganization:
job.SchemaState = model.StateWriteOnly
constraintInfo.State = model.StateWriteOnly
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, t, job, tblInfo, true)
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true)
case model.StateWriteOnly:
err = w.verifyRemainRecordsForCheckConstraint(dbInfo, tblInfo, constraintInfo)
if err != nil {
Expand All @@ -253,15 +253,15 @@ func (w *worker) onAlterCheckConstraint(jobCtx *jobContext, t *meta.Mutator, job
return ver, errors.Trace(err)
}
constraintInfo.State = model.StatePublic
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, t, job, tblInfo, true)
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
}
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
}
} else {
constraintInfo.Enforced = enforced
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, t, job, tblInfo, true)
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true)
if err != nil {
// update version and tableInfo error will cause retry.
return ver, errors.Trace(err)
Expand Down
Loading

0 comments on commit 20326cc

Please sign in to comment.