Skip to content

Commit

Permalink
ddl: add job version 2 and use typed args for truncate table (#55854)
Browse files Browse the repository at this point in the history
ref #53930
  • Loading branch information
D3Hunter authored Sep 9, 2024
1 parent 722fbdc commit 72c1580
Show file tree
Hide file tree
Showing 20 changed files with 548 additions and 112 deletions.
20 changes: 17 additions & 3 deletions pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package ddl
import (
"context"
"fmt"
"math/rand"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -62,15 +63,14 @@ import (
"github.com/pingcap/tidb/pkg/util/dbterror"
"github.com/pingcap/tidb/pkg/util/gcutil"
"github.com/pingcap/tidb/pkg/util/generic"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/tikv/client-go/v2/tikvrpc"
clientv3 "go.etcd.io/etcd/client/v3"
atomicutil "go.uber.org/atomic"
"go.uber.org/zap"
)

const (
// currentVersion is for all new DDL jobs.
currentVersion = 1
// DDLOwnerKey is the ddl owner path that is saved to etcd, and it's exported for testing.
DDLOwnerKey = "/tidb/ddl/fg/owner"
ddlSchemaVersionKeyLock = "/tidb/ddl/schema_version_lock"
Expand Down Expand Up @@ -713,7 +713,21 @@ func (d *ddl) newDeleteRangeManager(mock bool) delRangeManager {

// Start implements DDL.Start interface.
func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
logutil.DDLLogger().Info("start DDL", zap.String("ID", d.uuid), zap.Bool("runWorker", config.GetGlobalConfig().Instance.TiDBEnableDDL.Load()))
// if we are running in test, random choose a job version to run with.
// TODO add a separate CI flow to run with different job version, so we can cover
// more cases in a single run.
if intest.InTest || config.GetGlobalConfig().Store == "unistore" {
jobVer := model.JobVersion1
// 50% percent to use JobVersion2 in test.
rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
if rnd.Intn(2) == 0 {
jobVer = model.JobVersion2
}
model.SetJobVerInUse(jobVer)
}
logutil.DDLLogger().Info("start DDL", zap.String("ID", d.uuid),
zap.Bool("runWorker", config.GetGlobalConfig().Instance.TiDBEnableDDL.Load()),
zap.Stringer("jobVersion", model.GetJobVerInUse()))

d.sessPool = sess.NewSessionPool(ctxPool)
d.executor.sessPool, d.jobSubmitter.sessPool = d.sessPool, d.sessPool
Expand Down
17 changes: 16 additions & 1 deletion pkg/ddl/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, wrapper DelRangeExecWrap
return errors.Trace(err)
}
}
case model.ActionDropTable, model.ActionTruncateTable:
case model.ActionDropTable:
tableID := job.TableID
// The startKey here is for compatibility with previous versions, old version did not endKey so don't have to deal with.
var startKey kv.Key
Expand All @@ -313,6 +313,21 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, wrapper DelRangeExecWrap
return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, []int64{tableID}, ea, "drop table: table ID"))
}
return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, []int64{tableID}, ea, "drop table: table ID"))
case model.ActionTruncateTable:
tableID := job.TableID
args, err := model.GetTruncateTableArgsAfterRun(job)
if err != nil {
return errors.Trace(err)
}
oldPartitionIDs := args.OldPartitionIDs
if len(oldPartitionIDs) > 0 {
if err := doBatchDeleteTablesRange(ctx, wrapper, job.ID, oldPartitionIDs, ea, "truncate table: partition table IDs"); err != nil {
return errors.Trace(err)
}
}
// always delete the table range, even when it's a partitioned table where
// it may contain global index regions.
return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, []int64{tableID}, ea, "truncate table: table ID"))
case model.ActionDropTablePartition, model.ActionTruncateTablePartition,
model.ActionReorganizePartition, model.ActionRemovePartitioning,
model.ActionAlterTablePartitioning:
Expand Down
45 changes: 29 additions & 16 deletions pkg/ddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4186,25 +4186,28 @@ func (e *executor) TruncateTable(ctx sessionctx.Context, ti ast.Ident) error {
return errors.Trace(dbterror.ErrTruncateIllegalForeignKey.GenWithStackByArgs(msg))
}

var partCount int
var oldPartitionIDs []int64
if tblInfo.Partition != nil {
partCount = len(tblInfo.Partition.Definitions)
oldPartitionIDs = make([]int64, 0, len(tblInfo.Partition.Definitions))
for _, def := range tblInfo.Partition.Definitions {
oldPartitionIDs = append(oldPartitionIDs, def.ID)
}
}
job := &model.Job{
SchemaID: schema.ID,
TableID: tblInfo.ID,
SchemaName: schema.Name.L,
TableName: tblInfo.Name.L,
Type: model.ActionTruncateTable,
BinlogInfo: &model.HistoryInfo{},
// Args[0] is the new table ID, args[2] is the ids for table partitions, we
// add a placeholder here, they will be filled by job submitter.
// the last param is not required for execution, we need it to calculate
// number of new IDs to generate.
Args: []any{int64(0), fkCheck, []int64{}, partCount},
Version: model.GetJobVerInUse(),
SchemaID: schema.ID,
TableID: tblInfo.ID,
SchemaName: schema.Name.L,
TableName: tblInfo.Name.L,
Type: model.ActionTruncateTable,
BinlogInfo: &model.HistoryInfo{},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
}
job.FillArgs(&model.TruncateTableArgs{
FKCheck: fkCheck,
OldPartitionIDs: oldPartitionIDs,
})
err = e.DoDDLJob(ctx, job)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -6421,13 +6424,23 @@ func (e *executor) DoDDLJobWrapper(ctx sessionctx.Context, jobW *JobWrapper) (re
}
}

func getTruncateTableNewTableID(job *model.Job) int64 {
if job.Version == model.JobVersion1 {
return job.Args[0].(int64)
}
return job.ArgsV2.(*model.TruncateTableArgs).NewTableID
}

// HandleLockTablesOnSuccessSubmit handles the table lock for the job which is submitted
// successfully. exported for testing purpose.
func HandleLockTablesOnSuccessSubmit(ctx sessionctx.Context, jobW *JobWrapper) {
if jobW.Type == model.ActionTruncateTable {
if ok, lockTp := ctx.CheckTableLocked(jobW.TableID); ok {
newTableID := jobW.Args[0].(int64)
ctx.AddTableLock([]model.TableLockTpInfo{{SchemaID: jobW.SchemaID, TableID: newTableID, Tp: lockTp}})
ctx.AddTableLock([]model.TableLockTpInfo{{
SchemaID: jobW.SchemaID,
TableID: getTruncateTableNewTableID(jobW.Job),
Tp: lockTp,
}})
}
}
}
Expand All @@ -6437,7 +6450,7 @@ func HandleLockTablesOnSuccessSubmit(ctx sessionctx.Context, jobW *JobWrapper) {
func HandleLockTablesOnFinish(ctx sessionctx.Context, jobW *JobWrapper, ddlErr error) {
if jobW.Type == model.ActionTruncateTable {
if ddlErr != nil {
newTableID := jobW.Args[0].(int64)
newTableID := getTruncateTableNewTableID(jobW.Job)
ctx.ReleaseTableLockByTableIDs([]int64{newTableID})
return
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/ddl/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,13 @@ func TestHandleLockTable(t *testing.T) {
require.True(t, locked)
require.Equal(t, tp, lockType)
}
jobW := ddl.NewJobWrapper(&model.Job{Type: model.ActionTruncateTable, TableID: 1, Args: []any{int64(2)}}, false)
job := &model.Job{
Version: model.GetJobVerInUse(),
Type: model.ActionTruncateTable,
TableID: 1,
}
job.FillArgs(&model.TruncateTableArgs{NewTableID: 2})
jobW := ddl.NewJobWrapper(job, false)

t.Run("target table not locked", func(t *testing.T) {
se.ReleaseAllTableLocks()
Expand Down
3 changes: 2 additions & 1 deletion pkg/ddl/job_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,7 +723,8 @@ func job2UniqueIDs(job *model.Job, schema bool) string {
if schema {
return strconv.FormatInt(job.SchemaID, 10)
}
return strconv.FormatInt(job.TableID, 10) + "," + strconv.FormatInt(job.Args[0].(int64), 10)
newTableID := getTruncateTableNewTableID(job)
return strconv.FormatInt(job.TableID, 10) + "," + strconv.FormatInt(newTableID, 10)
}
if schema {
return strconv.FormatInt(job.SchemaID, 10)
Expand Down
45 changes: 35 additions & 10 deletions pkg/ddl/job_submitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,11 @@ func (s *JobSubmitter) addBatchDDLJobs2Table(jobWs []*JobWrapper) error {

for _, jobW := range jobWs {
job := jobW.Job
job.Version = currentVersion
if job.Version == 0 {
// if not set, fix it to version 1
// TODO replace this with assert after we add code v2 for all jobs.
job.Version = model.JobVersion1
}
job.StartTS = startTS
job.BDRRole = bdrRole

Expand Down Expand Up @@ -359,6 +363,14 @@ func (s *JobSubmitter) addBatchDDLJobs2Queue(jobWs []*JobWrapper) error {
return kv.RunInNewTxn(ctx, s.store, true, func(_ context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)

for _, jobW := range jobWs {
if jobW.Version == 0 {
// if not set, fix it to version 1
// TODO replace this with assert after we add code v2 for all jobs.
jobW.Version = model.JobVersion1
}
}

count := getRequiredGIDCount(jobWs)
ids, err := t.GenGlobalIDs(count)
if err != nil {
Expand All @@ -372,7 +384,6 @@ func (s *JobSubmitter) addBatchDDLJobs2Queue(jobWs []*JobWrapper) error {

for _, jobW := range jobWs {
job := jobW.Job
job.Version = currentVersion
job.StartTS = txn.StartTS()
setJobStateToQueueing(job)
if err = buildJobDependence(t, job); err != nil {
Expand Down Expand Up @@ -508,8 +519,12 @@ func getRequiredGIDCount(jobWs []*JobWrapper) int {
pInfo := jobW.Args[1].(*model.PartitionInfo)
count += len(pInfo.Definitions)
case model.ActionTruncateTable:
partCount := jobW.Args[3].(int)
count += 1 + partCount
if jobW.Version == model.JobVersion1 {
partCount := jobW.Args[3].(int)
count += 1 + partCount
} else {
count += 1 + len(jobW.ArgsV2.(*model.TruncateTableArgs).OldPartitionIDs)
}
}
}
return count
Expand Down Expand Up @@ -579,13 +594,23 @@ func assignGIDsForJobs(jobWs []*JobWrapper, ids []int64) {
pInfo.NewTableID = pInfo.Definitions[0].ID
case model.ActionTruncateTable:
if !jobW.IDAllocated {
jobW.Args[0] = alloc.next()
partCount := jobW.Args[3].(int)
partIDs := make([]int64, partCount)
for i := range partIDs {
partIDs[i] = alloc.next()
if jobW.Version == model.JobVersion1 {
jobW.Args[0] = alloc.next()
partCount := jobW.Args[3].(int)
partIDs := make([]int64, partCount)
for i := range partIDs {
partIDs[i] = alloc.next()
}
jobW.Args[2] = partIDs
} else {
args := jobW.ArgsV2.(*model.TruncateTableArgs)
args.NewTableID = alloc.next()
partIDs := make([]int64, len(args.OldPartitionIDs))
for i := range partIDs {
partIDs[i] = alloc.next()
}
args.NewPartitionIDs = partIDs
}
jobW.Args[2] = partIDs
}
}
jobW.ID = alloc.next()
Expand Down
60 changes: 33 additions & 27 deletions pkg/ddl/job_submitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,9 @@ func TestCombinedIDAllocation(t *testing.T) {

genCreateTblJob := func(tp model.ActionType, partitionCnt int) *model.Job {
return &model.Job{
Type: tp,
Args: []any{genTblInfo(partitionCnt)},
Version: model.JobVersion1,
Type: tp,
Args: []any{genTblInfo(partitionCnt)},
}
}

Expand All @@ -147,24 +148,27 @@ func TestCombinedIDAllocation(t *testing.T) {
infos = append(infos, genTblInfo(c))
}
return &model.Job{
Type: model.ActionCreateTables,
Args: []any{infos},
Version: model.JobVersion1,
Type: model.ActionCreateTables,
Args: []any{infos},
}
}

genCreateDBJob := func() *model.Job {
info := &model.DBInfo{}
return &model.Job{
Type: model.ActionCreateSchema,
Args: []any{info},
Version: model.JobVersion1,
Type: model.ActionCreateSchema,
Args: []any{info},
}
}

genRGroupJob := func() *model.Job {
info := &model.ResourceGroupInfo{}
return &model.Job{
Type: model.ActionCreateResourceGroup,
Args: []any{info},
Version: model.JobVersion1,
Type: model.ActionCreateResourceGroup,
Args: []any{info},
}
}

Expand All @@ -173,16 +177,18 @@ func TestCombinedIDAllocation(t *testing.T) {
Definitions: make([]model.PartitionDefinition, partCnt),
}
return &model.Job{
Type: model.ActionAlterTablePartitioning,
Args: []any{[]string{}, info},
Version: model.JobVersion1,
Type: model.ActionAlterTablePartitioning,
Args: []any{[]string{}, info},
}
}

genTruncPartitionJob := func(partCnt int) *model.Job {
oldIDs := make([]int64, partCnt)
return &model.Job{
Type: model.ActionTruncateTablePartition,
Args: []any{oldIDs, []int64{}},
Version: model.JobVersion1,
Type: model.ActionTruncateTablePartition,
Args: []any{oldIDs, []int64{}},
}
}

Expand All @@ -191,8 +197,9 @@ func TestCombinedIDAllocation(t *testing.T) {
Definitions: make([]model.PartitionDefinition, partCnt),
}
return &model.Job{
Type: model.ActionAddTablePartition,
Args: []any{info},
Version: model.JobVersion1,
Type: model.ActionAddTablePartition,
Args: []any{info},
}
}

Expand All @@ -206,16 +213,19 @@ func TestCombinedIDAllocation(t *testing.T) {
require.Equal(t, 1, partCnt)
}
return &model.Job{
Type: tp,
Args: []any{[]string{}, info},
Version: model.JobVersion1,
Type: tp,
Args: []any{[]string{}, info},
}
}

genTruncTblJob := func(partCnt int) *model.Job {
return &model.Job{
Type: model.ActionTruncateTable,
Args: []any{int64(0), false, []int64{}, partCnt},
j := &model.Job{
Version: model.GetJobVerInUse(),
Type: model.ActionTruncateTable,
}
j.FillArgs(&model.TruncateTableArgs{OldPartitionIDs: make([]int64, partCnt)})
return j
}

cases := []idAllocationCase{
Expand Down Expand Up @@ -439,14 +449,10 @@ func TestCombinedIDAllocation(t *testing.T) {
checkPartitionInfo(info)
checkID(info.NewTableID)
case model.ActionTruncateTable:
var (
newTblID int64
fkCheck bool
partIDs []int64
)
require.NoError(t, j.DecodeArgs(&newTblID, &fkCheck, &partIDs))
checkID(newTblID)
for _, id := range partIDs {
args, err := model.GetTruncateTableArgsBeforeRun(j)
require.NoError(t, err)
checkID(args.NewTableID)
for _, id := range args.NewPartitionIDs {
checkID(id)
}
}
Expand Down
Loading

0 comments on commit 72c1580

Please sign in to comment.