Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: add job version 2 and use typed args for truncate table #55854

Merged
merged 12 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package ddl
import (
"context"
"fmt"
"math/rand"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -62,15 +63,14 @@ import (
"github.com/pingcap/tidb/pkg/util/dbterror"
"github.com/pingcap/tidb/pkg/util/gcutil"
"github.com/pingcap/tidb/pkg/util/generic"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/tikv/client-go/v2/tikvrpc"
clientv3 "go.etcd.io/etcd/client/v3"
atomicutil "go.uber.org/atomic"
"go.uber.org/zap"
)

const (
// currentVersion is for all new DDL jobs.
currentVersion = 1
// DDLOwnerKey is the ddl owner path that is saved to etcd, and it's exported for testing.
DDLOwnerKey = "/tidb/ddl/fg/owner"
ddlSchemaVersionKeyLock = "/tidb/ddl/schema_version_lock"
Expand Down Expand Up @@ -713,7 +713,21 @@ func (d *ddl) newDeleteRangeManager(mock bool) delRangeManager {

// Start implements DDL.Start interface.
func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
logutil.DDLLogger().Info("start DDL", zap.String("ID", d.uuid), zap.Bool("runWorker", config.GetGlobalConfig().Instance.TiDBEnableDDL.Load()))
// if we are running in test, random choose a job version to run with.
// TODO add a separate CI flow to run with different job version, so we can cover
// more cases in a single run.
if intest.InTest || config.GetGlobalConfig().Store == "unistore" {
jobVer := model.JobVersion1
// 50% percent to use JobVersion2 in test.
rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
if rnd.Intn(2) == 0 {
jobVer = model.JobVersion2
}
model.SetJobVerInUse(jobVer)
}
logutil.DDLLogger().Info("start DDL", zap.String("ID", d.uuid),
zap.Bool("runWorker", config.GetGlobalConfig().Instance.TiDBEnableDDL.Load()),
zap.Stringer("jobVersion", model.GetJobVerInUse()))

d.sessPool = sess.NewSessionPool(ctxPool)
d.executor.sessPool, d.jobSubmitter.sessPool = d.sessPool, d.sessPool
Expand Down
17 changes: 16 additions & 1 deletion pkg/ddl/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, wrapper DelRangeExecWrap
return errors.Trace(err)
}
}
case model.ActionDropTable, model.ActionTruncateTable:
case model.ActionDropTable:
tableID := job.TableID
// The startKey here is for compatibility with previous versions, old version did not endKey so don't have to deal with.
var startKey kv.Key
Expand All @@ -313,6 +313,21 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, wrapper DelRangeExecWrap
return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, []int64{tableID}, ea, "drop table: table ID"))
}
return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, []int64{tableID}, ea, "drop table: table ID"))
case model.ActionTruncateTable:
tableID := job.TableID
args, err := model.GetTruncateTableArgsAfterRun(job)
if err != nil {
return errors.Trace(err)
}
oldPartitionIDs := args.OldPartitionIDs
if len(oldPartitionIDs) > 0 {
if err := doBatchDeleteTablesRange(ctx, wrapper, job.ID, oldPartitionIDs, ea, "truncate table: partition table IDs"); err != nil {
return errors.Trace(err)
}
}
// always delete the table range, even when it's a partitioned table where
// it may contain global index regions.
return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, []int64{tableID}, ea, "truncate table: table ID"))
case model.ActionDropTablePartition, model.ActionTruncateTablePartition,
model.ActionReorganizePartition, model.ActionRemovePartitioning,
model.ActionAlterTablePartitioning:
Expand Down
45 changes: 29 additions & 16 deletions pkg/ddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4186,25 +4186,28 @@ func (e *executor) TruncateTable(ctx sessionctx.Context, ti ast.Ident) error {
return errors.Trace(dbterror.ErrTruncateIllegalForeignKey.GenWithStackByArgs(msg))
}

var partCount int
var oldPartitionIDs []int64
if tblInfo.Partition != nil {
partCount = len(tblInfo.Partition.Definitions)
oldPartitionIDs = make([]int64, 0, len(tblInfo.Partition.Definitions))
for _, def := range tblInfo.Partition.Definitions {
oldPartitionIDs = append(oldPartitionIDs, def.ID)
}
}
job := &model.Job{
SchemaID: schema.ID,
TableID: tblInfo.ID,
SchemaName: schema.Name.L,
TableName: tblInfo.Name.L,
Type: model.ActionTruncateTable,
BinlogInfo: &model.HistoryInfo{},
// Args[0] is the new table ID, args[2] is the ids for table partitions, we
// add a placeholder here, they will be filled by job submitter.
// the last param is not required for execution, we need it to calculate
// number of new IDs to generate.
Args: []any{int64(0), fkCheck, []int64{}, partCount},
Version: model.GetJobVerInUse(),
SchemaID: schema.ID,
TableID: tblInfo.ID,
SchemaName: schema.Name.L,
TableName: tblInfo.Name.L,
Type: model.ActionTruncateTable,
BinlogInfo: &model.HistoryInfo{},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
}
job.FillArgs(&model.TruncateTableArgs{
FKCheck: fkCheck,
OldPartitionIDs: oldPartitionIDs,
})
err = e.DoDDLJob(ctx, job)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -6421,13 +6424,23 @@ func (e *executor) DoDDLJobWrapper(ctx sessionctx.Context, jobW *JobWrapper) (re
}
}

func getTruncateTableNewTableID(job *model.Job) int64 {
if job.Version == model.JobVersion1 {
return job.Args[0].(int64)
}
return job.ArgsV2.(*model.TruncateTableArgs).NewTableID
}

// HandleLockTablesOnSuccessSubmit handles the table lock for the job which is submitted
// successfully. exported for testing purpose.
func HandleLockTablesOnSuccessSubmit(ctx sessionctx.Context, jobW *JobWrapper) {
if jobW.Type == model.ActionTruncateTable {
if ok, lockTp := ctx.CheckTableLocked(jobW.TableID); ok {
newTableID := jobW.Args[0].(int64)
ctx.AddTableLock([]model.TableLockTpInfo{{SchemaID: jobW.SchemaID, TableID: newTableID, Tp: lockTp}})
ctx.AddTableLock([]model.TableLockTpInfo{{
SchemaID: jobW.SchemaID,
TableID: getTruncateTableNewTableID(jobW.Job),
Tp: lockTp,
}})
}
}
}
Expand All @@ -6437,7 +6450,7 @@ func HandleLockTablesOnSuccessSubmit(ctx sessionctx.Context, jobW *JobWrapper) {
func HandleLockTablesOnFinish(ctx sessionctx.Context, jobW *JobWrapper, ddlErr error) {
if jobW.Type == model.ActionTruncateTable {
if ddlErr != nil {
newTableID := jobW.Args[0].(int64)
newTableID := getTruncateTableNewTableID(jobW.Job)
ctx.ReleaseTableLockByTableIDs([]int64{newTableID})
return
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/ddl/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,13 @@ func TestHandleLockTable(t *testing.T) {
require.True(t, locked)
require.Equal(t, tp, lockType)
}
jobW := ddl.NewJobWrapper(&model.Job{Type: model.ActionTruncateTable, TableID: 1, Args: []any{int64(2)}}, false)
job := &model.Job{
Version: model.GetJobVerInUse(),
Type: model.ActionTruncateTable,
TableID: 1,
}
job.FillArgs(&model.TruncateTableArgs{NewTableID: 2})
jobW := ddl.NewJobWrapper(job, false)

t.Run("target table not locked", func(t *testing.T) {
se.ReleaseAllTableLocks()
Expand Down
3 changes: 2 additions & 1 deletion pkg/ddl/job_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,7 +723,8 @@ func job2UniqueIDs(job *model.Job, schema bool) string {
if schema {
return strconv.FormatInt(job.SchemaID, 10)
}
return strconv.FormatInt(job.TableID, 10) + "," + strconv.FormatInt(job.Args[0].(int64), 10)
newTableID := getTruncateTableNewTableID(job)
return strconv.FormatInt(job.TableID, 10) + "," + strconv.FormatInt(newTableID, 10)
}
if schema {
return strconv.FormatInt(job.SchemaID, 10)
Expand Down
45 changes: 35 additions & 10 deletions pkg/ddl/job_submitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,11 @@ func (s *JobSubmitter) addBatchDDLJobs2Table(jobWs []*JobWrapper) error {

for _, jobW := range jobWs {
job := jobW.Job
job.Version = currentVersion
if job.Version == 0 {
// if not set, fix it to version 1
// TODO replace this with assert after we add code v2 for all jobs.
job.Version = model.JobVersion1
}
job.StartTS = startTS
job.BDRRole = bdrRole

Expand Down Expand Up @@ -359,6 +363,14 @@ func (s *JobSubmitter) addBatchDDLJobs2Queue(jobWs []*JobWrapper) error {
return kv.RunInNewTxn(ctx, s.store, true, func(_ context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)

for _, jobW := range jobWs {
if jobW.Version == 0 {
// if not set, fix it to version 1
// TODO replace this with assert after we add code v2 for all jobs.
jobW.Version = model.JobVersion1
}
}

count := getRequiredGIDCount(jobWs)
ids, err := t.GenGlobalIDs(count)
if err != nil {
Expand All @@ -372,7 +384,6 @@ func (s *JobSubmitter) addBatchDDLJobs2Queue(jobWs []*JobWrapper) error {

for _, jobW := range jobWs {
job := jobW.Job
job.Version = currentVersion
job.StartTS = txn.StartTS()
setJobStateToQueueing(job)
if err = buildJobDependence(t, job); err != nil {
Expand Down Expand Up @@ -508,8 +519,12 @@ func getRequiredGIDCount(jobWs []*JobWrapper) int {
pInfo := jobW.Args[1].(*model.PartitionInfo)
count += len(pInfo.Definitions)
case model.ActionTruncateTable:
partCount := jobW.Args[3].(int)
count += 1 + partCount
if jobW.Version == model.JobVersion1 {
partCount := jobW.Args[3].(int)
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
count += 1 + partCount
} else {
count += 1 + len(jobW.ArgsV2.(*model.TruncateTableArgs).OldPartitionIDs)
}
}
}
return count
Expand Down Expand Up @@ -579,13 +594,23 @@ func assignGIDsForJobs(jobWs []*JobWrapper, ids []int64) {
pInfo.NewTableID = pInfo.Definitions[0].ID
case model.ActionTruncateTable:
if !jobW.IDAllocated {
jobW.Args[0] = alloc.next()
partCount := jobW.Args[3].(int)
partIDs := make([]int64, partCount)
for i := range partIDs {
partIDs[i] = alloc.next()
if jobW.Version == model.JobVersion1 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for these getter logic, I prefer we always use a function to convert all version representations to TruncateTableArgs, then caller use TruncateTableArgs.

like

if !jobW.IDAllocated {
  arg := getTruncateTableArgs(job)
  arg.NewTableID = alloc.next()
  ...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for new jobs, i plan put JobArgs into JobWrapper, and fill to job on insert, so we can use it directly here.

jobW.Args[0] = alloc.next()
partCount := jobW.Args[3].(int)
partIDs := make([]int64, partCount)
for i := range partIDs {
partIDs[i] = alloc.next()
}
jobW.Args[2] = partIDs
} else {
args := jobW.ArgsV2.(*model.TruncateTableArgs)
args.NewTableID = alloc.next()
partIDs := make([]int64, len(args.OldPartitionIDs))
for i := range partIDs {
partIDs[i] = alloc.next()
}
args.NewPartitionIDs = partIDs
}
jobW.Args[2] = partIDs
}
}
jobW.ID = alloc.next()
Expand Down
60 changes: 33 additions & 27 deletions pkg/ddl/job_submitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,9 @@ func TestCombinedIDAllocation(t *testing.T) {

genCreateTblJob := func(tp model.ActionType, partitionCnt int) *model.Job {
return &model.Job{
Type: tp,
Args: []any{genTblInfo(partitionCnt)},
Version: model.JobVersion1,
Type: tp,
Args: []any{genTblInfo(partitionCnt)},
}
}

Expand All @@ -147,24 +148,27 @@ func TestCombinedIDAllocation(t *testing.T) {
infos = append(infos, genTblInfo(c))
}
return &model.Job{
Type: model.ActionCreateTables,
Args: []any{infos},
Version: model.JobVersion1,
Type: model.ActionCreateTables,
Args: []any{infos},
}
}

genCreateDBJob := func() *model.Job {
info := &model.DBInfo{}
return &model.Job{
Type: model.ActionCreateSchema,
Args: []any{info},
Version: model.JobVersion1,
Type: model.ActionCreateSchema,
Args: []any{info},
}
}

genRGroupJob := func() *model.Job {
info := &model.ResourceGroupInfo{}
return &model.Job{
Type: model.ActionCreateResourceGroup,
Args: []any{info},
Version: model.JobVersion1,
Type: model.ActionCreateResourceGroup,
Args: []any{info},
}
}

Expand All @@ -173,16 +177,18 @@ func TestCombinedIDAllocation(t *testing.T) {
Definitions: make([]model.PartitionDefinition, partCnt),
}
return &model.Job{
Type: model.ActionAlterTablePartitioning,
Args: []any{[]string{}, info},
Version: model.JobVersion1,
Type: model.ActionAlterTablePartitioning,
Args: []any{[]string{}, info},
}
}

genTruncPartitionJob := func(partCnt int) *model.Job {
oldIDs := make([]int64, partCnt)
return &model.Job{
Type: model.ActionTruncateTablePartition,
Args: []any{oldIDs, []int64{}},
Version: model.JobVersion1,
Type: model.ActionTruncateTablePartition,
Args: []any{oldIDs, []int64{}},
}
}

Expand All @@ -191,8 +197,9 @@ func TestCombinedIDAllocation(t *testing.T) {
Definitions: make([]model.PartitionDefinition, partCnt),
}
return &model.Job{
Type: model.ActionAddTablePartition,
Args: []any{info},
Version: model.JobVersion1,
Type: model.ActionAddTablePartition,
Args: []any{info},
}
}

Expand All @@ -206,16 +213,19 @@ func TestCombinedIDAllocation(t *testing.T) {
require.Equal(t, 1, partCnt)
}
return &model.Job{
Type: tp,
Args: []any{[]string{}, info},
Version: model.JobVersion1,
Type: tp,
Args: []any{[]string{}, info},
}
}

genTruncTblJob := func(partCnt int) *model.Job {
return &model.Job{
Type: model.ActionTruncateTable,
Args: []any{int64(0), false, []int64{}, partCnt},
j := &model.Job{
Version: model.GetJobVerInUse(),
Type: model.ActionTruncateTable,
}
j.FillArgs(&model.TruncateTableArgs{OldPartitionIDs: make([]int64, partCnt)})
return j
}

cases := []idAllocationCase{
Expand Down Expand Up @@ -439,14 +449,10 @@ func TestCombinedIDAllocation(t *testing.T) {
checkPartitionInfo(info)
checkID(info.NewTableID)
case model.ActionTruncateTable:
var (
newTblID int64
fkCheck bool
partIDs []int64
)
require.NoError(t, j.DecodeArgs(&newTblID, &fkCheck, &partIDs))
checkID(newTblID)
for _, id := range partIDs {
args, err := model.GetTruncateTableArgsBeforeRun(j)
require.NoError(t, err)
checkID(args.NewTableID)
for _, id := range args.NewPartitionIDs {
checkID(id)
}
}
Expand Down
Loading