Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: make meta mutator part of jobContext #56399

Merged
merged 4 commits into from
Sep 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions pkg/ddl/add_column.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/expression/exprctx"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/meta/autoid"
"github.com/pingcap/tidb/pkg/meta/metabuild"
"github.com/pingcap/tidb/pkg/meta/model"
Expand All @@ -52,10 +51,10 @@ import (
"go.uber.org/zap"
)

func onAddColumn(jobCtx *jobContext, t *meta.Mutator, job *model.Job) (ver int64, err error) {
func onAddColumn(jobCtx *jobContext, job *model.Job) (ver int64, err error) {
// Handle the rolling back job.
if job.IsRollingback() {
ver, err = onDropColumn(jobCtx, t, job)
ver, err = onDropColumn(jobCtx, job)
if err != nil {
return ver, errors.Trace(err)
}
Expand All @@ -69,7 +68,7 @@ func onAddColumn(jobCtx *jobContext, t *meta.Mutator, job *model.Job) (ver int64
}
})

tblInfo, columnInfo, colFromArgs, pos, ifNotExists, err := checkAddColumn(t, job)
tblInfo, columnInfo, colFromArgs, pos, ifNotExists, err := checkAddColumn(jobCtx.metaMut, job)
if err != nil {
if ifNotExists && infoschema.ErrColumnExists.Equal(err) {
job.Warning = toTError(err)
Expand All @@ -92,15 +91,15 @@ func onAddColumn(jobCtx *jobContext, t *meta.Mutator, job *model.Job) (ver int64
case model.StateNone:
// none -> delete only
columnInfo.State = model.StateDeleteOnly
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, t, job, tblInfo, originalState != columnInfo.State)
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, originalState != columnInfo.State)
if err != nil {
return ver, errors.Trace(err)
}
job.SchemaState = model.StateDeleteOnly
case model.StateDeleteOnly:
// delete only -> write only
columnInfo.State = model.StateWriteOnly
ver, err = updateVersionAndTableInfo(jobCtx, t, job, tblInfo, originalState != columnInfo.State)
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, originalState != columnInfo.State)
if err != nil {
return ver, errors.Trace(err)
}
Expand All @@ -109,7 +108,7 @@ func onAddColumn(jobCtx *jobContext, t *meta.Mutator, job *model.Job) (ver int64
case model.StateWriteOnly:
// write only -> reorganization
columnInfo.State = model.StateWriteReorganization
ver, err = updateVersionAndTableInfo(jobCtx, t, job, tblInfo, originalState != columnInfo.State)
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, originalState != columnInfo.State)
if err != nil {
return ver, errors.Trace(err)
}
Expand All @@ -126,7 +125,7 @@ func onAddColumn(jobCtx *jobContext, t *meta.Mutator, job *model.Job) (ver int64
}
tblInfo.MoveColumnInfo(columnInfo.Offset, offset)
columnInfo.State = model.StatePublic
ver, err = updateVersionAndTableInfo(jobCtx, t, job, tblInfo, originalState != columnInfo.State)
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, originalState != columnInfo.State)
if err != nil {
return ver, errors.Trace(err)
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/ddl/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,7 @@ func splitRegionsByKeyRanges(ctx context.Context, store kv.Storage, keyRanges []
// 2. before flashback start, check timestamp, disable GC and close PD schedule, get flashback key ranges.
// 3. phase 1, lock flashback key ranges.
// 4. phase 2, send flashback RPC, do flashback jobs.
func (w *worker) onFlashbackCluster(jobCtx *jobContext, t *meta.Mutator, job *model.Job) (ver int64, err error) {
func (w *worker) onFlashbackCluster(jobCtx *jobContext, job *model.Job) (ver int64, err error) {
inFlashbackTest := false
failpoint.Inject("mockFlashbackTest", func(val failpoint.Value) {
if val.(bool) {
Expand Down Expand Up @@ -768,7 +768,7 @@ func (w *worker) onFlashbackCluster(jobCtx *jobContext, t *meta.Mutator, job *mo
return ver, nil
// Stage 2, check flashbackTS, close GC and PD schedule, get flashback key ranges.
case model.StateDeleteOnly:
if err = checkAndSetFlashbackClusterInfo(w.ctx, sess, jobCtx.store, t, job, flashbackTS); err != nil {
if err = checkAndSetFlashbackClusterInfo(w.ctx, sess, jobCtx.store, jobCtx.metaMut, job, flashbackTS); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
Expand All @@ -785,13 +785,13 @@ func (w *worker) onFlashbackCluster(jobCtx *jobContext, t *meta.Mutator, job *mo
}
job.Args[keyRangesOffset] = keyRanges
job.SchemaState = model.StateWriteOnly
return updateSchemaVersion(jobCtx, t, job)
return updateSchemaVersion(jobCtx, job)
// Stage 3, lock related key ranges.
case model.StateWriteOnly:
// TODO: Support flashback in unistore.
if inFlashbackTest {
job.SchemaState = model.StateWriteReorganization
return updateSchemaVersion(jobCtx, t, job)
return updateSchemaVersion(jobCtx, job)
}
// Split region by keyRanges, make sure no unrelated key ranges be locked.
splitRegionsByKeyRanges(w.ctx, jobCtx.store, keyRanges)
Expand Down Expand Up @@ -847,7 +847,7 @@ func (w *worker) onFlashbackCluster(jobCtx *jobContext, t *meta.Mutator, job *mo
asyncNotifyEvent(jobCtx, notifier.NewFlashbackClusterEvent(), job)
job.State = model.JobStateDone
job.SchemaState = model.StatePublic
return updateSchemaVersion(jobCtx, t, job)
return updateSchemaVersion(jobCtx, job)
}
return ver, nil
}
Expand Down
41 changes: 21 additions & 20 deletions pkg/ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ func checkDropColumnForStatePublic(colInfo *model.ColumnInfo) (err error) {
return nil
}

func onDropColumn(jobCtx *jobContext, t *meta.Mutator, job *model.Job) (ver int64, _ error) {
tblInfo, colInfo, idxInfos, ifExists, err := checkDropColumn(jobCtx, t, job)
func onDropColumn(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
tblInfo, colInfo, idxInfos, ifExists, err := checkDropColumn(jobCtx, job)
if err != nil {
if ifExists && dbterror.ErrCantDropFieldOrKey.Equal(err) {
// Convert the "not exists" error to a warning.
Expand All @@ -153,7 +153,7 @@ func onDropColumn(jobCtx *jobContext, t *meta.Mutator, job *model.Job) (ver int6
job.MarkNonRevertible()
job.SchemaState = colInfo.State
// Store the mark and enter the next DDL handling loop.
return updateVersionAndTableInfoWithCheck(jobCtx, t, job, tblInfo, false)
return updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, false)
}

originalState := colInfo.State
Expand All @@ -167,7 +167,7 @@ func onDropColumn(jobCtx *jobContext, t *meta.Mutator, job *model.Job) (ver int6
if err != nil {
return ver, errors.Trace(err)
}
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, t, job, tblInfo, originalState != colInfo.State)
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, originalState != colInfo.State)
if err != nil {
return ver, errors.Trace(err)
}
Expand All @@ -185,7 +185,7 @@ func onDropColumn(jobCtx *jobContext, t *meta.Mutator, job *model.Job) (ver int6
}
tblInfo.Indices = newIndices
}
ver, err = updateVersionAndTableInfo(jobCtx, t, job, tblInfo, originalState != colInfo.State)
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, originalState != colInfo.State)
if err != nil {
return ver, errors.Trace(err)
}
Expand All @@ -199,7 +199,7 @@ func onDropColumn(jobCtx *jobContext, t *meta.Mutator, job *model.Job) (ver int6
// delete only -> reorganization
colInfo.State = model.StateDeleteReorganization
tblInfo.MoveColumnInfo(colInfo.Offset, len(tblInfo.Columns)-1)
ver, err = updateVersionAndTableInfo(jobCtx, t, job, tblInfo, originalState != colInfo.State)
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, originalState != colInfo.State)
if err != nil {
return ver, errors.Trace(err)
}
Expand All @@ -209,7 +209,7 @@ func onDropColumn(jobCtx *jobContext, t *meta.Mutator, job *model.Job) (ver int6
tblInfo.MoveColumnInfo(colInfo.Offset, len(tblInfo.Columns)-1)
tblInfo.Columns = tblInfo.Columns[:len(tblInfo.Columns)-1]
colInfo.State = model.StateNone
ver, err = updateVersionAndTableInfo(jobCtx, t, job, tblInfo, originalState != colInfo.State)
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, originalState != colInfo.State)
if err != nil {
return ver, errors.Trace(err)
}
Expand All @@ -234,9 +234,9 @@ func onDropColumn(jobCtx *jobContext, t *meta.Mutator, job *model.Job) (ver int6
return ver, errors.Trace(err)
}

func checkDropColumn(jobCtx *jobContext, t *meta.Mutator, job *model.Job) (*model.TableInfo, *model.ColumnInfo, []*model.IndexInfo, bool /* ifExists */, error) {
func checkDropColumn(jobCtx *jobContext, job *model.Job) (*model.TableInfo, *model.ColumnInfo, []*model.IndexInfo, bool /* ifExists */, error) {
schemaID := job.SchemaID
tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, schemaID)
tblInfo, err := GetTableInfoAndCancelFaultJob(jobCtx.metaMut, job, schemaID)
if err != nil {
return nil, nil, nil, false, errors.Trace(err)
}
Expand Down Expand Up @@ -291,15 +291,15 @@ func isDroppableColumn(tblInfo *model.TableInfo, colName pmodel.CIStr) error {
return nil
}

func onSetDefaultValue(jobCtx *jobContext, t *meta.Mutator, job *model.Job) (ver int64, _ error) {
func onSetDefaultValue(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
newCol := &model.ColumnInfo{}
err := job.DecodeArgs(newCol)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

return updateColumnDefaultValue(jobCtx, t, job, newCol, &newCol.Name)
return updateColumnDefaultValue(jobCtx, job, newCol, &newCol.Name)
}

func setIdxIDName(idxInfo *model.IndexInfo, newID int64, newName pmodel.CIStr) {
Expand Down Expand Up @@ -896,17 +896,18 @@ func updateChangingObjState(changingCol *model.ColumnInfo, changingIdxs []*model
}
}

func checkAndApplyAutoRandomBits(jobCtx *jobContext, m *meta.Mutator, dbInfo *model.DBInfo, tblInfo *model.TableInfo,
func checkAndApplyAutoRandomBits(jobCtx *jobContext, dbInfo *model.DBInfo, tblInfo *model.TableInfo,
oldCol *model.ColumnInfo, newCol *model.ColumnInfo, newAutoRandBits uint64) error {
if newAutoRandBits == 0 {
return nil
}
idAcc := m.GetAutoIDAccessors(dbInfo.ID, tblInfo.ID)
metaMut := jobCtx.metaMut
idAcc := metaMut.GetAutoIDAccessors(dbInfo.ID, tblInfo.ID)
err := checkNewAutoRandomBits(idAcc, oldCol, newCol, newAutoRandBits, tblInfo.AutoRandomRangeBits, tblInfo.SepAutoInc())
if err != nil {
return err
}
return applyNewAutoRandomBits(jobCtx, m, dbInfo, tblInfo, oldCol, newAutoRandBits)
return applyNewAutoRandomBits(jobCtx, dbInfo, tblInfo, oldCol, newAutoRandBits)
}

// checkNewAutoRandomBits checks whether the new auto_random bits number can cause overflow.
Expand Down Expand Up @@ -967,7 +968,7 @@ func (r *asAutoIDRequirement) AutoIDClient() *autoid.ClientDiscover {

// applyNewAutoRandomBits set auto_random bits to TableInfo and
// migrate auto_increment ID to auto_random ID if possible.
func applyNewAutoRandomBits(jobCtx *jobContext, m *meta.Mutator, dbInfo *model.DBInfo,
func applyNewAutoRandomBits(jobCtx *jobContext, dbInfo *model.DBInfo,
tblInfo *model.TableInfo, oldCol *model.ColumnInfo, newAutoRandBits uint64) error {
tblInfo.AutoRandomBits = newAutoRandBits
needMigrateFromAutoIncToAutoRand := mysql.HasAutoIncrementFlag(oldCol.GetFlag())
Expand All @@ -979,7 +980,7 @@ func applyNewAutoRandomBits(jobCtx *jobContext, m *meta.Mutator, dbInfo *model.D
errMsg := fmt.Sprintf(autoid.AutoRandomAllocatorNotFound, dbInfo.Name.O, tblInfo.Name.O)
return dbterror.ErrInvalidAutoRandom.GenWithStackByArgs(errMsg)
}
idAcc := m.GetAutoIDAccessors(dbInfo.ID, tblInfo.ID).RowID()
idAcc := jobCtx.metaMut.GetAutoIDAccessors(dbInfo.ID, tblInfo.ID).RowID()
nextAutoIncID, err := idAcc.Get()
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -1037,16 +1038,16 @@ func checkForNullValue(ctx context.Context, sctx sessionctx.Context, isDataTrunc
return nil
}

func updateColumnDefaultValue(jobCtx *jobContext, t *meta.Mutator, job *model.Job, newCol *model.ColumnInfo, oldColName *pmodel.CIStr) (ver int64, _ error) {
tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, job.SchemaID)
func updateColumnDefaultValue(jobCtx *jobContext, job *model.Job, newCol *model.ColumnInfo, oldColName *pmodel.CIStr) (ver int64, _ error) {
tblInfo, err := GetTableInfoAndCancelFaultJob(jobCtx.metaMut, job, job.SchemaID)
if err != nil {
return ver, errors.Trace(err)
}

if job.MultiSchemaInfo != nil && job.MultiSchemaInfo.Revertible {
job.MarkNonRevertible()
// Store the mark and enter the next DDL handling loop.
return updateVersionAndTableInfoWithCheck(jobCtx, t, job, tblInfo, false)
return updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, false)
}

oldCol := model.FindColumnInfo(tblInfo.Columns, oldColName.L)
Expand Down Expand Up @@ -1078,7 +1079,7 @@ func updateColumnDefaultValue(jobCtx *jobContext, t *meta.Mutator, job *model.Jo
}
}

ver, err = updateVersionAndTableInfo(jobCtx, t, job, tblInfo, true)
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
Expand Down
36 changes: 18 additions & 18 deletions pkg/ddl/constraint.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ import (
"github.com/pingcap/tidb/pkg/util/dbterror"
)

func (w *worker) onAddCheckConstraint(jobCtx *jobContext, t *meta.Mutator, job *model.Job) (ver int64, err error) {
func (w *worker) onAddCheckConstraint(jobCtx *jobContext, job *model.Job) (ver int64, err error) {
// Handle the rolling back job.
if job.IsRollingback() {
return rollingBackAddConstraint(jobCtx, t, job)
return rollingBackAddConstraint(jobCtx, job)
}

failpoint.Inject("errorBeforeDecodeArgs", func(val failpoint.Value) {
Expand All @@ -44,7 +44,7 @@ func (w *worker) onAddCheckConstraint(jobCtx *jobContext, t *meta.Mutator, job *
}
})

dbInfo, tblInfo, constraintInfoInMeta, constraintInfoInJob, err := checkAddCheckConstraint(t, job)
dbInfo, tblInfo, constraintInfoInMeta, constraintInfoInJob, err := checkAddCheckConstraint(jobCtx.metaMut, job)
if err != nil {
return ver, errors.Trace(err)
}
Expand Down Expand Up @@ -81,7 +81,7 @@ func (w *worker) onAddCheckConstraint(jobCtx *jobContext, t *meta.Mutator, job *
// If not enforced, add it directly.
if !constraintInfoInMeta.Enforced {
constraintInfoInMeta.State = model.StatePublic
ver, err = updateVersionAndTableInfo(jobCtx, t, job, tblInfo, true)
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
}
Expand All @@ -94,11 +94,11 @@ func (w *worker) onAddCheckConstraint(jobCtx *jobContext, t *meta.Mutator, job *
case model.StateNone:
job.SchemaState = model.StateWriteOnly
constraintInfoInMeta.State = model.StateWriteOnly
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, t, job, tblInfo, true)
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true)
case model.StateWriteOnly:
job.SchemaState = model.StateWriteReorganization
constraintInfoInMeta.State = model.StateWriteReorganization
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, t, job, tblInfo, true)
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true)
case model.StateWriteReorganization:
err = w.verifyRemainRecordsForCheckConstraint(dbInfo, tblInfo, constraintInfoInMeta)
if err != nil {
Expand All @@ -108,7 +108,7 @@ func (w *worker) onAddCheckConstraint(jobCtx *jobContext, t *meta.Mutator, job *
return ver, errors.Trace(err)
}
constraintInfoInMeta.State = model.StatePublic
ver, err = updateVersionAndTableInfo(jobCtx, t, job, tblInfo, true)
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
}
Expand Down Expand Up @@ -162,8 +162,8 @@ func checkAddCheckConstraint(t *meta.Mutator, job *model.Job) (*model.DBInfo, *m
// onDropCheckConstraint can be called from two case:
// 1: rollback in add constraint.(in rollback function the job.args will be changed)
// 2: user drop constraint ddl.
func onDropCheckConstraint(jobCtx *jobContext, t *meta.Mutator, job *model.Job) (ver int64, _ error) {
tblInfo, constraintInfo, err := checkDropCheckConstraint(t, job)
func onDropCheckConstraint(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
tblInfo, constraintInfo, err := checkDropCheckConstraint(jobCtx.metaMut, job)
if err != nil {
return ver, errors.Trace(err)
}
Expand All @@ -172,7 +172,7 @@ func onDropCheckConstraint(jobCtx *jobContext, t *meta.Mutator, job *model.Job)
case model.StatePublic:
job.SchemaState = model.StateWriteOnly
constraintInfo.State = model.StateWriteOnly
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, t, job, tblInfo, true)
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true)
case model.StateWriteOnly:
// write only state constraint will still take effect to check the newly inserted data.
// So the dependent column shouldn't be dropped even in this intermediate state.
Expand All @@ -183,7 +183,7 @@ func onDropCheckConstraint(jobCtx *jobContext, t *meta.Mutator, job *model.Job)
tblInfo.Constraints = append(tblInfo.Constraints[0:i], tblInfo.Constraints[i+1:]...)
}
}
ver, err = updateVersionAndTableInfo(jobCtx, t, job, tblInfo, true)
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
}
Expand Down Expand Up @@ -216,14 +216,14 @@ func checkDropCheckConstraint(t *meta.Mutator, job *model.Job) (*model.TableInfo
return tblInfo, constraintInfo, nil
}

func (w *worker) onAlterCheckConstraint(jobCtx *jobContext, t *meta.Mutator, job *model.Job) (ver int64, err error) {
dbInfo, tblInfo, constraintInfo, enforced, err := checkAlterCheckConstraint(t, job)
func (w *worker) onAlterCheckConstraint(jobCtx *jobContext, job *model.Job) (ver int64, err error) {
dbInfo, tblInfo, constraintInfo, enforced, err := checkAlterCheckConstraint(jobCtx.metaMut, job)
if err != nil {
return ver, errors.Trace(err)
}

if job.IsRollingback() {
return rollingBackAlterConstraint(jobCtx, t, job)
return rollingBackAlterConstraint(jobCtx, job)
}

// Current State is desired.
Expand All @@ -239,11 +239,11 @@ func (w *worker) onAlterCheckConstraint(jobCtx *jobContext, t *meta.Mutator, job
job.SchemaState = model.StateWriteReorganization
constraintInfo.State = model.StateWriteReorganization
constraintInfo.Enforced = enforced
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, t, job, tblInfo, true)
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true)
case model.StateWriteReorganization:
job.SchemaState = model.StateWriteOnly
constraintInfo.State = model.StateWriteOnly
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, t, job, tblInfo, true)
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true)
case model.StateWriteOnly:
err = w.verifyRemainRecordsForCheckConstraint(dbInfo, tblInfo, constraintInfo)
if err != nil {
Expand All @@ -253,15 +253,15 @@ func (w *worker) onAlterCheckConstraint(jobCtx *jobContext, t *meta.Mutator, job
return ver, errors.Trace(err)
}
constraintInfo.State = model.StatePublic
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, t, job, tblInfo, true)
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
}
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
}
} else {
constraintInfo.Enforced = enforced
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, t, job, tblInfo, true)
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true)
if err != nil {
// update version and tableInfo error will cause retry.
return ver, errors.Trace(err)
Expand Down
Loading