Skip to content

Commit

Permalink
ddl: args v2 for drop table/view/sequence (pingcap#56219)
Browse files Browse the repository at this point in the history
  • Loading branch information
D3Hunter authored and winoros committed Sep 23, 2024
1 parent 79c6958 commit f3efa8a
Show file tree
Hide file tree
Showing 10 changed files with 187 additions and 45 deletions.
8 changes: 6 additions & 2 deletions br/pkg/stream/rewrite_meta_rawkv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,8 +670,8 @@ var (

var (
dropSchemaJob *model.Job
dropTable0Job = &model.Job{Version: model.JobVersion1, Type: model.ActionDropTable, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`["",[72,73,74],[""]]`)}
dropTable1Job = &model.Job{Version: model.JobVersion1, Type: model.ActionDropTable, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`["",[],[""]]`)}
dropTable0Job *model.Job
dropTable1Job *model.Job
dropTable0Partition1Job *model.Job
reorganizeTable0Partition1Job *model.Job
removeTable0Partition1Job *model.Job
Expand Down Expand Up @@ -743,6 +743,10 @@ func init() {
SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID}, &model.TablePartitionArgs{OldPhysicalTblIDs: []int64{73}})
dropTable0Partition1Job = genFinishedJob(&model.Job{Version: model.GetJobVerInUse(), Type: model.ActionDropTablePartition,
SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID}, &model.TablePartitionArgs{OldPhysicalTblIDs: []int64{73}})
dropTable0Job = genFinishedJob(&model.Job{Version: model.GetJobVerInUse(), Type: model.ActionDropTable,
SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID}, &model.DropTableArgs{OldPartitionIDs: []int64{72, 73, 74}})
dropTable1Job = genFinishedJob(&model.Job{Version: model.GetJobVerInUse(), Type: model.ActionDropTable,
SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID}, &model.DropTableArgs{})
}

type mockInsertDeleteRange struct {
Expand Down
7 changes: 3 additions & 4 deletions pkg/ddl/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,12 +299,11 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, wrapper DelRangeExecWrap
case model.ActionDropTable:
tableID := job.TableID
// The startKey here is for compatibility with previous versions, old version did not endKey so don't have to deal with.
var startKey kv.Key
var physicalTableIDs []int64
var ruleIDs []string
if err := job.DecodeArgs(&startKey, &physicalTableIDs, &ruleIDs); err != nil {
args, err := model.GetFinishedDropTableArgs(job)
if err != nil {
return errors.Trace(err)
}
physicalTableIDs := args.OldPartitionIDs
if len(physicalTableIDs) > 0 {
if err := doBatchDeleteTablesRange(ctx, wrapper, job.ID, physicalTableIDs, ea, "drop table: partition table IDs"); err != nil {
return errors.Trace(err)
Expand Down
18 changes: 12 additions & 6 deletions pkg/ddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4064,14 +4064,16 @@ func (e *executor) dropTableObject(
jobType model.ActionType
)

var jobArgs []any
var (
objectIdents []ast.Ident
fkCheck bool
)
switch tableObjectType {
case tableObject:
dropExistErr = infoschema.ErrTableDropExists
jobType = model.ActionDropTable
objectIdents := make([]ast.Ident, len(objects))
fkCheck := ctx.GetSessionVars().ForeignKeyChecks
jobArgs = []any{objectIdents, fkCheck}
objectIdents = make([]ast.Ident, len(objects))
fkCheck = ctx.GetSessionVars().ForeignKeyChecks
for i, tn := range objects {
objectIdents[i] = ast.Ident{Schema: tn.Schema, Name: tn.Name}
}
Expand Down Expand Up @@ -4151,19 +4153,23 @@ func (e *executor) dropTableObject(
}

job := &model.Job{
Version: model.GetJobVerInUse(),
SchemaID: schema.ID,
TableID: tableInfo.Meta().ID,
SchemaName: schema.Name.L,
SchemaState: schema.State,
TableName: tableInfo.Meta().Name.L,
Type: jobType,
BinlogInfo: &model.HistoryInfo{},
Args: jobArgs,
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
}
args := &model.DropTableArgs{
Identifiers: objectIdents,
FKCheck: fkCheck,
}

err = e.DoDDLJob(ctx, job)
err = e.doDDLJob2(ctx, job, args)
if infoschema.ErrDatabaseNotExists.Equal(err) || infoschema.ErrTableNotExists.Equal(err) {
notExistTables = append(notExistTables, fullti.String())
continue
Expand Down
5 changes: 2 additions & 3 deletions pkg/ddl/foreign_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,13 +405,12 @@ func checkDropTableHasForeignKeyReferredInOwner(infoCache *infoschema.InfoCache,
if !variable.EnableForeignKey.Load() {
return nil
}
var objectIdents []ast.Ident
var fkCheck bool
err := job.DecodeArgs(&objectIdents, &fkCheck)
args, err := model.GetDropTableArgs(job)
if err != nil {
job.State = model.JobStateCancelled
return errors.Trace(err)
}
objectIdents, fkCheck := args.Identifiers, args.FKCheck
referredFK, err := checkTableHasForeignKeyReferredInOwner(infoCache, job.SchemaName, job.TableName, objectIdents, fkCheck)
if err != nil {
return err
Expand Down
8 changes: 3 additions & 5 deletions pkg/ddl/sanity_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,11 @@ func expectedDeleteRangeCnt(ctx delRangeCntCtx, job *model.Job) (int, error) {
}
return len(args.AllDroppedTableIDs), nil
case model.ActionDropTable:
var startKey kv.Key
var physicalTableIDs []int64
var ruleIDs []string
if err := job.DecodeArgs(&startKey, &physicalTableIDs, &ruleIDs); err != nil {
args, err := model.GetFinishedDropTableArgs(job)
if err != nil {
return 0, errors.Trace(err)
}
return len(physicalTableIDs) + 1, nil
return len(args.OldPartitionIDs) + 1, nil
case model.ActionTruncateTable:
args, err := model.GetFinishedTruncateTableArgs(job)
if err != nil {
Expand Down
6 changes: 5 additions & 1 deletion pkg/ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,11 @@ func onDropTableOrView(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver in
// Finish this job.
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
startKey := tablecodec.EncodeTablePrefix(job.TableID)
job.Args = append(job.Args, startKey, oldIDs, ruleIDs)
job.FillFinishedArgs(&model.DropTableArgs{
StartKey: startKey,
OldPartitionIDs: oldIDs,
OldRuleIDs: ruleIDs,
})
if !tblInfo.IsSequence() && !tblInfo.IsView() {
dropTableEvent := notifier.NewDropTableEvent(tblInfo)
asyncNotifyEvent(jobCtx, dropTableEvent, job)
Expand Down
4 changes: 3 additions & 1 deletion pkg/meta/model/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ go_library(
importpath = "github.com/pingcap/tidb/pkg/meta/model",
visibility = ["//visibility:public"],
deps = [
"//pkg/parser/ast",
"//pkg/parser/auth",
"//pkg/parser/charset",
"//pkg/parser/duration",
Expand All @@ -44,8 +45,9 @@ go_test(
],
embed = [":model"],
flaky = True,
shard_count = 39,
shard_count = 41,
deps = [
"//pkg/parser/ast",
"//pkg/parser/charset",
"//pkg/parser/model",
"//pkg/parser/mysql",
Expand Down
71 changes: 71 additions & 0 deletions pkg/meta/model/job_args.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"encoding/json"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/parser/ast"
pmodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/util/intest"
)
Expand Down Expand Up @@ -279,6 +280,76 @@ func GetBatchCreateTableArgs(job *Job) (*BatchCreateTableArgs, error) {
return getOrDecodeArgsV2[*BatchCreateTableArgs](job)
}

// DropTableArgs is the arguments for drop table/view/sequence job.
// when dropping multiple objects, each object will have a separate job
type DropTableArgs struct {
// below fields are only for drop table.
// when dropping multiple tables, the Identifiers is the same.
Identifiers []ast.Ident `json:"identifiers,omitempty"`
FKCheck bool `json:"fk_check,omitempty"`

// below fields are finished job args
StartKey []byte `json:"start_key,omitempty"`
OldPartitionIDs []int64 `json:"old_partition_ids,omitempty"`
OldRuleIDs []string `json:"old_rule_ids,omitempty"`
}

func (a *DropTableArgs) fillJob(job *Job) {
if job.Version == JobVersion1 {
// only drop table job has in args.
if job.Type == ActionDropTable {
job.Args = []any{a.Identifiers, a.FKCheck}
}
return
}
job.Args = []any{a}
}

func (a *DropTableArgs) fillFinishedJob(job *Job) {
if job.Version == JobVersion1 {
job.Args = []any{a.StartKey, a.OldPartitionIDs, a.OldRuleIDs}
return
}
job.Args = []any{a}
}

func (a *DropTableArgs) decodeV1(job *Job) error {
intest.Assert(job.Type == ActionDropTable, "only drop table job can call GetDropTableArgs")
return job.DecodeArgs(&a.Identifiers, &a.FKCheck)
}

// GetDropTableArgs gets the drop-table args.
func GetDropTableArgs(job *Job) (*DropTableArgs, error) {
if job.Version == JobVersion1 {
args := &DropTableArgs{}
if err := args.decodeV1(job); err != nil {
return nil, errors.Trace(err)
}
return args, nil
}
return getOrDecodeArgsV2[*DropTableArgs](job)
}

// GetFinishedDropTableArgs gets the drop-table args after the job is finished.
func GetFinishedDropTableArgs(job *Job) (*DropTableArgs, error) {
if job.Version == JobVersion1 {
var (
startKey []byte
oldPartitionIDs []int64
oldRuleIDs []string
)
if err := job.DecodeArgs(&startKey, &oldPartitionIDs, &oldRuleIDs); err != nil {
return nil, errors.Trace(err)
}
return &DropTableArgs{
StartKey: startKey,
OldPartitionIDs: oldPartitionIDs,
OldRuleIDs: oldRuleIDs,
}, nil
}
return getOrDecodeArgsV2[*DropTableArgs](job)
}

// TruncateTableArgs is the arguments for truncate table job.
type TruncateTableArgs struct {
FKCheck bool `json:"fk_check,omitempty"`
Expand Down
47 changes: 47 additions & 0 deletions pkg/meta/model/job_args_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
package model

import (
"encoding/json"
"testing"

"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -200,6 +202,51 @@ func TestBatchCreateTableArgs(t *testing.T) {
require.EqualValues(t, inArgs.Tables, args.Tables)
}

func TestDropTableArgs(t *testing.T) {
inArgs := &DropTableArgs{
Identifiers: []ast.Ident{
{Schema: model.NewCIStr("db"), Name: model.NewCIStr("tbl")},
{Schema: model.NewCIStr("db2"), Name: model.NewCIStr("tbl2")},
},
FKCheck: true,
}
for _, v := range []JobVersion{JobVersion1, JobVersion2} {
j2 := &Job{}
require.NoError(t, j2.Decode(getJobBytes(t, inArgs, v, ActionDropTable)))
args, err := GetDropTableArgs(j2)
require.NoError(t, err)
require.EqualValues(t, inArgs, args)
}
for _, tp := range []ActionType{ActionDropView, ActionDropSequence} {
for _, v := range []JobVersion{JobVersion1, JobVersion2} {
j2 := &Job{}
require.NoError(t, j2.Decode(getJobBytes(t, inArgs, v, tp)))
if v == JobVersion1 {
require.Equal(t, json.RawMessage("null"), j2.RawArgs)
} else {
args, err := GetDropTableArgs(j2)
require.NoError(t, err)
require.EqualValues(t, inArgs, args)
}
}
}
}

func TestFinishedDropTableArgs(t *testing.T) {
inArgs := &DropTableArgs{
StartKey: []byte("xxx"),
OldPartitionIDs: []int64{1, 2},
OldRuleIDs: []string{"schema/test/a/par1", "schema/test/a/par2"},
}
for _, v := range []JobVersion{JobVersion1, JobVersion2} {
j2 := &Job{}
require.NoError(t, j2.Decode(getFinishedJobBytes(t, inArgs, v, ActionDropTable)))
args, err := GetFinishedDropTableArgs(j2)
require.NoError(t, err)
require.EqualValues(t, inArgs, args)
}
}

func TestTruncateTableArgs(t *testing.T) {
inArgs := &TruncateTableArgs{
NewTableID: 1,
Expand Down
Loading

0 comments on commit f3efa8a

Please sign in to comment.