Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: add job version 2 and use typed args for truncate table #55854

Merged
merged 12 commits into from
Sep 9, 2024
Merged
2 changes: 0 additions & 2 deletions pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ import (
)

const (
// currentVersion is for all new DDL jobs.
currentVersion = 1
// DDLOwnerKey is the ddl owner path that is saved to etcd, and it's exported for testing.
DDLOwnerKey = "/tidb/ddl/fg/owner"
ddlSchemaVersionKeyLock = "/tidb/ddl/schema_version_lock"
Expand Down
26 changes: 25 additions & 1 deletion pkg/ddl/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, wrapper DelRangeExecWrap
return errors.Trace(err)
}
}
case model.ActionDropTable, model.ActionTruncateTable:
case model.ActionDropTable:
tableID := job.TableID
// The startKey here is for compatibility with previous versions, old version did not endKey so don't have to deal with.
var startKey kv.Key
Expand All @@ -313,6 +313,30 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, wrapper DelRangeExecWrap
return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, []int64{tableID}, ea, "drop table: table ID"))
}
return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, []int64{tableID}, ea, "drop table: table ID"))
case model.ActionTruncateTable:
tableID := job.TableID
var oldPartitionIDs []int64
if job.Version == model.JobVersion1 {
// The startKey here is for compatibility with previous versions, old version did not endKey so don't have to deal with.
var startKey kv.Key
if err := job.DecodeArgs(&startKey, &oldPartitionIDs); err != nil {
return errors.Trace(err)
}
} else {
argsV2, err := model.GetOrDecodeArgsV2[model.TruncateTableArgs](job)
if err != nil {
return errors.Trace(err)
}
oldPartitionIDs = argsV2.OldPartitionIDs
}
if len(oldPartitionIDs) > 0 {
if err := doBatchDeleteTablesRange(ctx, wrapper, job.ID, oldPartitionIDs, ea, "truncate table: partition table IDs"); err != nil {
return errors.Trace(err)
}
}
// always delete the table range, even when it's a partitioned table where
// it may contain global index regions.
return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, []int64{tableID}, ea, "truncate table: table ID"))
case model.ActionDropTablePartition, model.ActionTruncateTablePartition,
model.ActionReorganizePartition, model.ActionRemovePartitioning,
model.ActionAlterTablePartitioning:
Expand Down
47 changes: 34 additions & 13 deletions pkg/ddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4186,24 +4186,35 @@ func (e *executor) TruncateTable(ctx sessionctx.Context, ti ast.Ident) error {
return errors.Trace(dbterror.ErrTruncateIllegalForeignKey.GenWithStackByArgs(msg))
}

var partCount int
var oldPartitionIDs []int64
if tblInfo.Partition != nil {
partCount = len(tblInfo.Partition.Definitions)
oldPartitionIDs = make([]int64, 0, len(tblInfo.Partition.Definitions))
for _, def := range tblInfo.Partition.Definitions {
oldPartitionIDs = append(oldPartitionIDs, def.ID)
}
}
job := &model.Job{
SchemaID: schema.ID,
TableID: tblInfo.ID,
SchemaName: schema.Name.L,
TableName: tblInfo.Name.L,
Type: model.ActionTruncateTable,
BinlogInfo: &model.HistoryInfo{},
Version: model.GetJobVerInUse(),
SchemaID: schema.ID,
TableID: tblInfo.ID,
SchemaName: schema.Name.L,
TableName: tblInfo.Name.L,
Type: model.ActionTruncateTable,
BinlogInfo: &model.HistoryInfo{},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
}
if job.Version == model.JobVersion1 {
// Args[0] is the new table ID, args[2] is the ids for table partitions, we
// add a placeholder here, they will be filled by job submitter.
// the last param is not required for execution, we need it to calculate
// number of new IDs to generate.
Args: []any{int64(0), fkCheck, []int64{}, partCount},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
job.Args = []any{int64(0), fkCheck, []int64{}, len(oldPartitionIDs)}
} else if job.Version == model.JobVersion2 {
job.ArgsV2 = &model.TruncateTableArgs{
FKCheck: fkCheck,
OldPartitionIDs: oldPartitionIDs,
}
}
err = e.DoDDLJob(ctx, job)
if err != nil {
Expand Down Expand Up @@ -6426,7 +6437,12 @@ func (e *executor) DoDDLJobWrapper(ctx sessionctx.Context, jobW *JobWrapper) (re
func HandleLockTablesOnSuccessSubmit(ctx sessionctx.Context, jobW *JobWrapper) {
if jobW.Type == model.ActionTruncateTable {
if ok, lockTp := ctx.CheckTableLocked(jobW.TableID); ok {
newTableID := jobW.Args[0].(int64)
var newTableID int64
if jobW.Version == model.JobVersion1 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

like GetTruncateTableArgsBeforeRun, this part can also be encapsulated?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

newTableID = jobW.Args[0].(int64)
} else {
newTableID = jobW.ArgsV2.(*model.TruncateTableArgs).NewTableID
}
ctx.AddTableLock([]model.TableLockTpInfo{{SchemaID: jobW.SchemaID, TableID: newTableID, Tp: lockTp}})
}
}
Expand All @@ -6437,7 +6453,12 @@ func HandleLockTablesOnSuccessSubmit(ctx sessionctx.Context, jobW *JobWrapper) {
func HandleLockTablesOnFinish(ctx sessionctx.Context, jobW *JobWrapper, ddlErr error) {
if jobW.Type == model.ActionTruncateTable {
if ddlErr != nil {
newTableID := jobW.Args[0].(int64)
var newTableID int64
if jobW.Version == model.JobVersion1 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

newTableID = jobW.Args[0].(int64)
} else {
newTableID = jobW.ArgsV2.(*model.TruncateTableArgs).NewTableID
}
ctx.ReleaseTableLockByTableIDs([]int64{newTableID})
return
}
Expand Down
14 changes: 13 additions & 1 deletion pkg/ddl/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,19 @@ func TestHandleLockTable(t *testing.T) {
require.True(t, locked)
require.Equal(t, tp, lockType)
}
jobW := ddl.NewJobWrapper(&model.Job{Type: model.ActionTruncateTable, TableID: 1, Args: []any{int64(2)}}, false)
job := &model.Job{
Version: model.GetJobVerInUse(),
Type: model.ActionTruncateTable,
TableID: 1,
}
if job.Version == model.JobVersion1 {
job.Args = []any{int64(2)}
} else {
job.ArgsV2 = &model.TruncateTableArgs{
NewTableID: 2,
}
}
jobW := ddl.NewJobWrapper(job, false)

t.Run("target table not locked", func(t *testing.T) {
se.ReleaseAllTableLocks()
Expand Down
8 changes: 7 additions & 1 deletion pkg/ddl/job_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,7 +723,13 @@ func job2UniqueIDs(job *model.Job, schema bool) string {
if schema {
return strconv.FormatInt(job.SchemaID, 10)
}
return strconv.FormatInt(job.TableID, 10) + "," + strconv.FormatInt(job.Args[0].(int64), 10)
var newTableID int64
if job.Version == model.JobVersion1 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

newTableID = job.Args[0].(int64)
} else {
newTableID = job.ArgsV2.(*model.TruncateTableArgs).NewTableID
}
return strconv.FormatInt(job.TableID, 10) + "," + strconv.FormatInt(newTableID, 10)
}
if schema {
return strconv.FormatInt(job.SchemaID, 10)
Expand Down
45 changes: 35 additions & 10 deletions pkg/ddl/job_submitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,11 @@ func (s *JobSubmitter) addBatchDDLJobs2Table(jobWs []*JobWrapper) error {

for _, jobW := range jobWs {
job := jobW.Job
job.Version = currentVersion
if job.Version == 0 {
// if not set, fix it to version 1
// TODO remove this after we add code v2 for all jobs.
job.Version = model.JobVersion1
}
job.StartTS = startTS
job.BDRRole = bdrRole

Expand Down Expand Up @@ -359,6 +363,14 @@ func (s *JobSubmitter) addBatchDDLJobs2Queue(jobWs []*JobWrapper) error {
return kv.RunInNewTxn(ctx, s.store, true, func(_ context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)

for _, jobW := range jobWs {
if jobW.Version == 0 {
// if not set, fix it to version 1
// TODO remove this after we add code v2 for all jobs.
jobW.Version = model.JobVersion1
}
}

count := getRequiredGIDCount(jobWs)
ids, err := t.GenGlobalIDs(count)
if err != nil {
Expand All @@ -372,7 +384,6 @@ func (s *JobSubmitter) addBatchDDLJobs2Queue(jobWs []*JobWrapper) error {

for _, jobW := range jobWs {
job := jobW.Job
job.Version = currentVersion
job.StartTS = txn.StartTS()
setJobStateToQueueing(job)
if err = buildJobDependence(t, job); err != nil {
Expand Down Expand Up @@ -508,8 +519,12 @@ func getRequiredGIDCount(jobWs []*JobWrapper) int {
pInfo := jobW.Args[1].(*model.PartitionInfo)
count += len(pInfo.Definitions)
case model.ActionTruncateTable:
partCount := jobW.Args[3].(int)
count += 1 + partCount
if jobW.Version == model.JobVersion1 {
partCount := jobW.Args[3].(int)
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
count += 1 + partCount
} else if jobW.Version == model.JobVersion2 {
count += 1 + len(jobW.ArgsV2.(*model.TruncateTableArgs).OldPartitionIDs)
}
}
}
return count
Expand Down Expand Up @@ -579,13 +594,23 @@ func assignGIDsForJobs(jobWs []*JobWrapper, ids []int64) {
pInfo.NewTableID = pInfo.Definitions[0].ID
case model.ActionTruncateTable:
if !jobW.IDAllocated {
jobW.Args[0] = alloc.next()
partCount := jobW.Args[3].(int)
partIDs := make([]int64, partCount)
for i := range partIDs {
partIDs[i] = alloc.next()
if jobW.Version == model.JobVersion1 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for these getter logic, I prefer we always use a function to convert all version representations to TruncateTableArgs, then caller use TruncateTableArgs.

like

if !jobW.IDAllocated {
  arg := getTruncateTableArgs(job)
  arg.NewTableID = alloc.next()
  ...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for new jobs, i plan put JobArgs into JobWrapper, and fill to job on insert, so we can use it directly here.

jobW.Args[0] = alloc.next()
partCount := jobW.Args[3].(int)
partIDs := make([]int64, partCount)
for i := range partIDs {
partIDs[i] = alloc.next()
}
jobW.Args[2] = partIDs
} else if jobW.Version == model.JobVersion2 {
args := jobW.ArgsV2.(*model.TruncateTableArgs)
args.NewTableID = alloc.next()
partIDs := make([]int64, len(args.OldPartitionIDs))
for i := range partIDs {
partIDs[i] = alloc.next()
}
args.NewPartitionIDs = partIDs
}
jobW.Args[2] = partIDs
}
}
jobW.ID = alloc.next()
Expand Down
22 changes: 18 additions & 4 deletions pkg/ddl/job_submitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,18 @@ func TestCombinedIDAllocation(t *testing.T) {
}

genTruncTblJob := func(partCnt int) *model.Job {
return &model.Job{
Type: model.ActionTruncateTable,
Args: []any{int64(0), false, []int64{}, partCnt},
j := &model.Job{
Version: model.GetJobVerInUse(),
Type: model.ActionTruncateTable,
}
if j.Version == model.JobVersion1 {
j.Args = []any{int64(0), false, []int64{}, partCnt}
} else {
j.ArgsV2 = &model.TruncateTableArgs{
OldPartitionIDs: make([]int64, partCnt),
}
}
return j
}

cases := []idAllocationCase{
Expand Down Expand Up @@ -444,7 +452,13 @@ func TestCombinedIDAllocation(t *testing.T) {
fkCheck bool
partIDs []int64
)
require.NoError(t, j.DecodeArgs(&newTblID, &fkCheck, &partIDs))
if j.Version == model.JobVersion1 {
require.NoError(t, j.DecodeArgs(&newTblID, &fkCheck, &partIDs))
} else {
argsV2, err := model.GetOrDecodeArgsV2[model.TruncateTableArgs](j)
require.NoError(t, err)
newTblID, fkCheck, partIDs = argsV2.NewTableID, argsV2.FKCheck, argsV2.NewPartitionIDs
}
checkID(newTblID)
for _, id := range partIDs {
checkID(id)
Expand Down
7 changes: 4 additions & 3 deletions pkg/ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -4488,13 +4488,14 @@ func isPartExprUnsigned(ectx expression.EvalContext, tbInfo *model.TableInfo) bo
}

// truncateTableByReassignPartitionIDs reassigns new partition ids.
func truncateTableByReassignPartitionIDs(t *meta.Meta, tblInfo *model.TableInfo, pids []int64) (err error) {
// it also returns the new partition IDs for cases described below.
func truncateTableByReassignPartitionIDs(t *meta.Meta, tblInfo *model.TableInfo, pids []int64) ([]int64, error) {
if len(pids) < len(tblInfo.Partition.Definitions) {
// To make it compatible with older versions when pids was not given
// and if there has been any add/reorganize partition increasing the number of partitions
morePids, err := t.GenGlobalIDs(len(tblInfo.Partition.Definitions) - len(pids))
if err != nil {
return errors.Trace(err)
return nil, errors.Trace(err)
}
pids = append(pids, morePids...)
}
Expand All @@ -4505,7 +4506,7 @@ func truncateTableByReassignPartitionIDs(t *meta.Meta, tblInfo *model.TableInfo,
newDefs = append(newDefs, newDef)
}
tblInfo.Partition.Definitions = newDefs
return nil
return pids, nil
}

type partitionExprProcessor func(expression.BuildContext, *model.TableInfo, ast.ExprNode) error
Expand Down
17 changes: 16 additions & 1 deletion pkg/ddl/sanity_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,29 @@ func expectedDeleteRangeCnt(ctx delRangeCntCtx, job *model.Job) (int, error) {
return 0, errors.Trace(err)
}
return len(tableIDs), nil
case model.ActionDropTable, model.ActionTruncateTable:
case model.ActionDropTable:
var startKey kv.Key
var physicalTableIDs []int64
var ruleIDs []string
if err := job.DecodeArgs(&startKey, &physicalTableIDs, &ruleIDs); err != nil {
return 0, errors.Trace(err)
}
return len(physicalTableIDs) + 1, nil
case model.ActionTruncateTable:
var startKey kv.Key
var oldPartitionIDs []int64
if job.Version == model.JobVersion1 {
if err := job.DecodeArgs(&startKey, &oldPartitionIDs); err != nil {
return 0, errors.Trace(err)
}
} else {
argsV2, err := model.GetOrDecodeArgsV2[model.TruncateTableArgs](job)
if err != nil {
return 0, errors.Trace(err)
}
oldPartitionIDs = argsV2.OldPartitionIDs
}
return len(oldPartitionIDs) + 1, nil
case model.ActionDropTablePartition, model.ActionTruncateTablePartition,
model.ActionReorganizePartition, model.ActionRemovePartitioning,
model.ActionAlterTablePartitioning:
Expand Down
34 changes: 29 additions & 5 deletions pkg/ddl/schema_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,41 @@ func SetSchemaDiffForCreateTables(diff *model.SchemaDiff, job *model.Job) error
// SetSchemaDiffForTruncateTable set SchemaDiff for ActionTruncateTable.
func SetSchemaDiffForTruncateTable(diff *model.SchemaDiff, job *model.Job) error {
// Truncate table has two table ID, should be handled differently.
err := job.DecodeArgs(&diff.TableID)
var (
newTableID int64
err error
argsV2 *model.TruncateTableArgs
)
if job.Version == model.JobVersion1 {
err = job.DecodeArgs(&newTableID)
if err != nil {
return errors.Trace(err)
}
} else {
argsV2, err = model.GetOrDecodeArgsV2[model.TruncateTableArgs](job)
if err != nil {
return errors.Trace(err)
}
newTableID = argsV2.NewTableID
}
err = job.DecodeArgs(&diff.TableID)
if err != nil {
return errors.Trace(err)
}
diff.TableID = newTableID
diff.OldTableID = job.TableID

// affects are used to update placement rule cache
if len(job.CtxVars) > 0 {
oldIDs := job.CtxVars[0].([]int64)
newIDs := job.CtxVars[1].([]int64)
diff.AffectedOpts = buildPlacementAffects(oldIDs, newIDs)
if job.Version == model.JobVersion1 {
if len(job.CtxVars) > 0 {
oldIDs := job.CtxVars[0].([]int64)
newIDs := job.CtxVars[1].([]int64)
diff.AffectedOpts = buildPlacementAffects(oldIDs, newIDs)
}
} else {
if len(argsV2.OldPartIDsWithPolicy) > 0 {
diff.AffectedOpts = buildPlacementAffects(argsV2.OldPartIDsWithPolicy, argsV2.NewPartIDsWithPolicy)
}
}
return nil
}
Expand Down
Loading