Skip to content

Commit

Permalink
ddl: refactor job args for alter sequence/auto-id-cache/shard-rowid/c…
Browse files Browse the repository at this point in the history
…ache/ttl related ddl job. (#56206)

ref #53930
  • Loading branch information
joccau authored Sep 25, 2024
1 parent a0a741e commit a159c49
Show file tree
Hide file tree
Showing 8 changed files with 249 additions and 44 deletions.
44 changes: 27 additions & 17 deletions pkg/ddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2196,17 +2196,18 @@ func (e *executor) ShardRowID(ctx sessionctx.Context, tableIdent ast.Ident, uVal
return err
}
job := &model.Job{
Version: model.GetJobVerInUse(),
Type: model.ActionShardRowID,
SchemaID: schema.ID,
TableID: tbInfo.ID,
SchemaName: schema.Name.L,
TableName: tbInfo.Name.L,
BinlogInfo: &model.HistoryInfo{},
Args: []any{uVal},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
}
err = e.DoDDLJob(ctx, job)
args := &model.ShardRowIDArgs{ShardRowIDBits: uVal}
err = e.doDDLJob2(ctx, job, args)
return errors.Trace(err)
}

Expand Down Expand Up @@ -3565,18 +3566,20 @@ func (e *executor) AlterTableAutoIDCache(ctx sessionctx.Context, ident ast.Ident
}

job := &model.Job{
Version: model.GetJobVerInUse(),
SchemaID: schema.ID,
TableID: tb.Meta().ID,
SchemaName: schema.Name.L,
TableName: tb.Meta().Name.L,
Type: model.ActionModifyTableAutoIDCache,
BinlogInfo: &model.HistoryInfo{},
Args: []any{newCache},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
}

err = e.DoDDLJob(ctx, job)
args := &model.ModifyTableAutoIDCacheArgs{
NewCache: newCache,
}
err = e.doDDLJob2(ctx, job, args)
return errors.Trace(err)
}

Expand Down Expand Up @@ -3753,18 +3756,22 @@ func (e *executor) AlterTableTTLInfoOrEnable(ctx sessionctx.Context, ident ast.I
}

job = &model.Job{
Version: model.GetJobVerInUse(),
SchemaID: schema.ID,
TableID: tableID,
SchemaName: schema.Name.L,
TableName: tableName,
Type: model.ActionAlterTTLInfo,
BinlogInfo: &model.HistoryInfo{},
Args: []any{ttlInfo, ttlEnable, ttlCronJobSchedule},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
}

err = e.DoDDLJob(ctx, job)
args := &model.AlterTTLInfoArgs{
TTLInfo: ttlInfo,
TTLEnable: ttlEnable,
TTLCronJobSchedule: ttlCronJobSchedule,
}
err = e.doDDLJob2(ctx, job, args)
return errors.Trace(err)
}

Expand All @@ -3787,6 +3794,7 @@ func (e *executor) AlterTableRemoveTTL(ctx sessionctx.Context, ident ast.Ident)

if tblInfo.TTLInfo != nil {
job := &model.Job{
Version: model.GetJobVerInUse(),
SchemaID: schema.ID,
TableID: tableID,
SchemaName: schema.Name.L,
Expand All @@ -3796,7 +3804,7 @@ func (e *executor) AlterTableRemoveTTL(ctx sessionctx.Context, ident ast.Ident)
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
}
err = e.DoDDLJob(ctx, job)
err = e.doDDLJob2(ctx, job, &model.EmptyArgs{})
return errors.Trace(err)
}

Expand Down Expand Up @@ -5607,18 +5615,21 @@ func (e *executor) AlterSequence(ctx sessionctx.Context, stmt *ast.AlterSequence
}

job := &model.Job{
Version: model.GetJobVerInUse(),
SchemaID: db.ID,
TableID: tbl.Meta().ID,
SchemaName: db.Name.L,
TableName: tbl.Meta().Name.L,
Type: model.ActionAlterSequence,
BinlogInfo: &model.HistoryInfo{},
Args: []any{ident, stmt.SeqOptions},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
}

err = e.DoDDLJob(ctx, job)
args := &model.AlterSequenceArgs{
Ident: ident,
SeqOptions: stmt.SeqOptions,
}
err = e.doDDLJob2(ctx, job, args)
return errors.Trace(err)
}

Expand Down Expand Up @@ -6106,18 +6117,18 @@ func (e *executor) AlterTableCache(sctx sessionctx.Context, ti ast.Ident) (err e
sctx.SetValue(sessionctx.QueryString, ddlQuery)

job := &model.Job{
Version: model.GetJobVerInUse(),
SchemaID: schema.ID,
SchemaName: schema.Name.L,
TableName: t.Meta().Name.L,
TableID: t.Meta().ID,
Type: model.ActionAlterCacheTable,
BinlogInfo: &model.HistoryInfo{},
Args: []any{},
CDCWriteSource: sctx.GetSessionVars().CDCWriteSource,
SQLMode: sctx.GetSessionVars().SQLMode,
}

return e.DoDDLJob(sctx, job)
return e.doDDLJob2(sctx, job, &model.EmptyArgs{})
}

func checkCacheTableSize(store kv.Storage, tableID int64) (bool, error) {
Expand Down Expand Up @@ -6166,18 +6177,17 @@ func (e *executor) AlterTableNoCache(ctx sessionctx.Context, ti ast.Ident) (err
}

job := &model.Job{
Version: model.GetJobVerInUse(),
SchemaID: schema.ID,
SchemaName: schema.Name.L,
TableName: t.Meta().Name.L,
TableID: t.Meta().ID,
Type: model.ActionAlterNoCacheTable,
BinlogInfo: &model.HistoryInfo{},
Args: []any{},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
}

return e.DoDDLJob(ctx, job)
return e.doDDLJob2(ctx, job, &model.EmptyArgs{})
}

func (e *executor) CreateCheckConstraint(ctx sessionctx.Context, ti ast.Ident, constrName pmodel.CIStr, constr *ast.Constraint) error {
Expand Down
8 changes: 3 additions & 5 deletions pkg/ddl/sequence.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,15 +234,13 @@ func alterSequenceOptions(sequenceOptions []*ast.SequenceOption, ident ast.Ident

func onAlterSequence(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, _ error) {
schemaID := job.SchemaID
var (
sequenceOpts []*ast.SequenceOption
ident ast.Ident
)
if err := job.DecodeArgs(&ident, &sequenceOpts); err != nil {
args, err := model.GetAlterSequenceArgs(job)
if err != nil {
// Invalid arguments, cancel this job.
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
ident, sequenceOpts := args.Ident, args.SeqOptions

// Get the old tableInfo.
tblInfo, err := checkTableExistAndCancelNonExistJob(t, job, schemaID)
Expand Down
11 changes: 6 additions & 5 deletions pkg/ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,8 +659,8 @@ func onRebaseAutoID(jobCtx *jobContext, t *meta.Meta, job *model.Job, tp autoid.
}

func onModifyTableAutoIDCache(jobCtx *jobContext, t *meta.Meta, job *model.Job) (int64, error) {
var cache int64
if err := job.DecodeArgs(&cache); err != nil {
args, err := model.GetModifyTableAutoIDCacheArgs(job)
if err != nil {
job.State = model.JobStateCancelled
return 0, errors.Trace(err)
}
Expand All @@ -670,7 +670,7 @@ func onModifyTableAutoIDCache(jobCtx *jobContext, t *meta.Meta, job *model.Job)
return 0, errors.Trace(err)
}

tblInfo.AutoIDCache = cache
tblInfo.AutoIDCache = args.NewCache
ver, err := updateVersionAndTableInfo(jobCtx, t, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
Expand All @@ -680,12 +680,13 @@ func onModifyTableAutoIDCache(jobCtx *jobContext, t *meta.Meta, job *model.Job)
}

func (w *worker) onShardRowID(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, _ error) {
var shardRowIDBits uint64
err := job.DecodeArgs(&shardRowIDBits)
args, err := model.GetShardRowIDArgs(job)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

shardRowIDBits := args.ShardRowIDBits
tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, job.SchemaID)
if err != nil {
job.State = model.JobStateCancelled
Expand Down
23 changes: 13 additions & 10 deletions pkg/ddl/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,17 +403,17 @@ func testAlterCacheTable(
tblInfo *model.TableInfo,
) *model.Job {
job := &model.Job{
Version: model.GetJobVerInUse(),
SchemaID: newSchemaID,
TableID: tblInfo.ID,
Type: model.ActionAlterCacheTable,
BinlogInfo: &model.HistoryInfo{},
Args: []any{},
InvolvingSchemaInfo: []model.InvolvingSchemaInfo{
{Database: newSchemaName.L, Table: tblInfo.Name.L},
},
}
ctx.SetValue(sessionctx.QueryString, "skip")
err := d.DoDDLJobWrapper(ctx, ddl.NewJobWrapper(job, true))
err := d.DoDDLJobWrapper(ctx, ddl.NewJobWrapperWithArgs(job, &model.EmptyArgs{}, true))
require.NoError(t, err)

v := getSchemaVer(t, ctx)
Expand All @@ -430,17 +430,17 @@ func testAlterNoCacheTable(
tblInfo *model.TableInfo,
) *model.Job {
job := &model.Job{
Version: model.GetJobVerInUse(),
SchemaID: newSchemaID,
TableID: tblInfo.ID,
Type: model.ActionAlterNoCacheTable,
BinlogInfo: &model.HistoryInfo{},
Args: []any{},
InvolvingSchemaInfo: []model.InvolvingSchemaInfo{
{Database: newSchemaName.L, Table: tblInfo.Name.L},
},
}
ctx.SetValue(sessionctx.QueryString, "skip")
require.NoError(t, d.DoDDLJobWrapper(ctx, ddl.NewJobWrapper(job, true)))
require.NoError(t, d.DoDDLJobWrapper(ctx, ddl.NewJobWrapperWithArgs(job, &model.EmptyArgs{}, true)))

v := getSchemaVer(t, ctx)
checkHistoryJobArgs(t, ctx, job.ID, &historyJobArgs{ver: v})
Expand Down Expand Up @@ -577,20 +577,23 @@ func TestAlterTTL(t *testing.T) {
}

job = &model.Job{
Version: model.GetJobVerInUse(),
SchemaID: dbInfo.ID,
SchemaName: dbInfo.Name.L,
TableID: tblInfo.ID,
TableName: tblInfo.Name.L,
Type: model.ActionAlterTTLInfo,
BinlogInfo: &model.HistoryInfo{},
Args: []any{&model.TTLInfo{
}
ctx.SetValue(sessionctx.QueryString, "skip")
args := &model.AlterTTLInfoArgs{
TTLInfo: &model.TTLInfo{
ColumnName: tblInfo.Columns[1].Name,
IntervalExprStr: "1",
IntervalTimeUnit: int(ast.TimeUnitYear),
}},
},
}
ctx.SetValue(sessionctx.QueryString, "skip")
require.NoError(t, de.DoDDLJobWrapper(ctx, ddl.NewJobWrapper(job, true)))
require.NoError(t, de.DoDDLJobWrapper(ctx, ddl.NewJobWrapperWithArgs(job, args, true)))

v := getSchemaVer(t, ctx)
checkHistoryJobArgs(t, ctx, job.ID, &historyJobArgs{ver: v, tbl: nil})
Expand All @@ -602,16 +605,16 @@ func TestAlterTTL(t *testing.T) {

// submit a ddl job to modify ttlEnabled
job = &model.Job{
Version: model.GetJobVerInUse(),
SchemaID: dbInfo.ID,
SchemaName: dbInfo.Name.L,
TableID: tblInfo.ID,
TableName: tblInfo.Name.L,
Type: model.ActionAlterTTLRemove,
BinlogInfo: &model.HistoryInfo{},
Args: []any{true},
}
ctx.SetValue(sessionctx.QueryString, "skip")
require.NoError(t, de.DoDDLJobWrapper(ctx, ddl.NewJobWrapper(job, true)))
require.NoError(t, de.DoDDLJobWrapper(ctx, ddl.NewJobWrapperWithArgs(job, &model.EmptyArgs{}, true)))

v = getSchemaVer(t, ctx)
checkHistoryJobArgs(t, ctx, job.ID, &historyJobArgs{ver: v, tbl: nil})
Expand Down
8 changes: 3 additions & 5 deletions pkg/ddl/ttl.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,12 @@ func onTTLInfoRemove(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int6

func onTTLInfoChange(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, err error) {
// at least one for them is not nil
var ttlInfo *model.TTLInfo
var ttlInfoEnable *bool
var ttlInfoJobInterval *string

if err := job.DecodeArgs(&ttlInfo, &ttlInfoEnable, &ttlInfoJobInterval); err != nil {
args, err := model.GetAlterTTLInfoArgs(job)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
ttlInfo, ttlInfoEnable, ttlInfoJobInterval := args.TTLInfo, args.TTLEnable, args.TTLCronJobSchedule

tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, job.SchemaID)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/meta/model/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ go_test(
],
embed = [":model"],
flaky = True,
shard_count = 47,
shard_count = 50,
deps = [
"//pkg/parser/ast",
"//pkg/parser/charset",
Expand Down
Loading

0 comments on commit a159c49

Please sign in to comment.