*: Drop partition DDL handling for overlapping partitions during State Changes (#56082)

close #55888
mjonss authored Oct 9, 2024
1 parent 1eb0c8c commit 1e24d39
Showing 17 changed files with 821 additions and 225 deletions.
5 changes: 2 additions & 3 deletions pkg/ddl/executor.go
@@ -2282,9 +2282,8 @@ func (e *executor) AddTablePartitions(ctx sessionctx.Context, ident ast.Ident, s
 	}
 	if pi.Type == pmodel.PartitionTypeList {
 		// TODO: make sure that checks in ddl_api and ddl_worker is the same.
-		err = checkAddListPartitions(meta)
-		if err != nil {
-			return errors.Trace(err)
+		if meta.Partition.GetDefaultListPartition() != -1 {
+			return dbterror.ErrGeneralUnsupportedDDL.GenWithStackByArgs("ADD List partition, already contains DEFAULT partition. Please use REORGANIZE PARTITION instead")
 		}
 	}

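For context: the check now relies on meta.Partition.GetDefaultListPartition(). A minimal sketch of what that lookup is assumed to do, mirroring the checkAddListPartitions helper that pkg/ddl/partition.go removes below (the sketch is an assumption, not part of this diff):

// Sketch only: GetDefaultListPartition is assumed to return the index of the
// LIST partition definition holding the normalized DEFAULT value, or -1 when
// no DEFAULT partition exists.
package sketch

import "github.com/pingcap/tidb/pkg/meta/model"

func getDefaultListPartition(pi *model.PartitionInfo) int {
	for i := range pi.Definitions {
		for _, valueList := range pi.Definitions[i].InValues {
			for _, val := range valueList {
				if val == "DEFAULT" { // values are normalized before they are stored
					return i
				}
			}
		}
	}
	return -1
}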
199 changes: 124 additions & 75 deletions pkg/ddl/partition.go
@@ -97,7 +97,7 @@ func checkAddPartition(t *meta.Mutator, job *model.Job) (*model.TableInfo, *mode
 func (w *worker) onAddTablePartition(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
 	// Handle the rolling back job
 	if job.IsRollingback() {
-		ver, err := w.onDropTablePartition(jobCtx, job)
+		ver, err := w.rollbackLikeDropPartition(jobCtx, job)
 		if err != nil {
 			return ver, errors.Trace(err)
 		}
@@ -344,20 +344,6 @@ func rollbackAddingPartitionInfo(tblInfo *model.TableInfo) ([]int64, []string, [
 	return physicalTableIDs, partNames, rollbackBundles
 }

-// Check if current table already contains DEFAULT list partition
-func checkAddListPartitions(tblInfo *model.TableInfo) error {
-	for i := range tblInfo.Partition.Definitions {
-		for j := range tblInfo.Partition.Definitions[i].InValues {
-			for _, val := range tblInfo.Partition.Definitions[i].InValues[j] {
-				if val == "DEFAULT" { // should already be normalized
-					return dbterror.ErrGeneralUnsupportedDDL.GenWithStackByArgs("ADD List partition, already contains DEFAULT partition. Please use REORGANIZE PARTITION instead")
-				}
-			}
-		}
-	}
-	return nil
-}
-
 // checkAddPartitionValue check add Partition Values,
 // For Range: values less than value must be strictly increasing for each partition.
 // For List: if a Default partition exists,
@@ -398,9 +384,8 @@ func checkAddPartitionValue(meta *model.TableInfo, part *model.PartitionInfo) er
 			}
 		}
 	case pmodel.PartitionTypeList:
-		err := checkAddListPartitions(meta)
-		if err != nil {
-			return err
+		if meta.Partition.GetDefaultListPartition() != -1 {
+			return dbterror.ErrGeneralUnsupportedDDL.GenWithStackByArgs("ADD List partition, already contains DEFAULT partition. Please use REORGANIZE PARTITION instead")
 		}
 	}
 	return nil
@@ -2144,75 +2129,118 @@ func dropLabelRules(ctx context.Context, schemaName, tableName string, partNames
 	return infosync.UpdateLabelRules(ctx, patch)
 }

-// onDropTablePartition deletes old partition meta.
-func (w *worker) onDropTablePartition(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
+// rollbackLikeDropPartition does rollback for Reorganize partition and Add partition.
+// It will drop newly created partitions that have not yet been used, including
+// cleaning up label rules and bundles as well as changed indexes due to global flag.
+func (w *worker) rollbackLikeDropPartition(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
 	args, err := model.GetTablePartitionArgs(job)
 	if err != nil {
 		job.State = model.JobStateCancelled
 		return ver, errors.Trace(err)
 	}
-	partNames, partInfo := args.PartNames, args.PartInfo
+	partInfo := args.PartInfo
 	metaMut := jobCtx.metaMut
 	tblInfo, err := GetTableInfoAndCancelFaultJob(metaMut, job, job.SchemaID)
 	if err != nil {
 		return ver, errors.Trace(err)
 	}
-	if job.Type != model.ActionDropTablePartition {
-		// If rollback from reorganize partition, remove DroppingDefinitions from tableInfo
-		tblInfo.Partition.DroppingDefinitions = nil
-		// If rollback from adding table partition, remove addingDefinitions from tableInfo.
-		physicalTableIDs, pNames, rollbackBundles := rollbackAddingPartitionInfo(tblInfo)
-		err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), rollbackBundles)
-		if err != nil {
-			job.State = model.JobStateCancelled
-			return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
-		}
-		// TODO: Will this drop LabelRules for existing partitions, if the new partitions have the same name?
-		err = dropLabelRules(w.ctx, job.SchemaName, tblInfo.Name.L, pNames)
-		if err != nil {
-			job.State = model.JobStateCancelled
-			return ver, errors.Wrapf(err, "failed to notify PD the label rules")
-		}
+	tblInfo.Partition.DroppingDefinitions = nil
+	physicalTableIDs, pNames, rollbackBundles := rollbackAddingPartitionInfo(tblInfo)
+	err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), rollbackBundles)
+	if err != nil {
+		job.State = model.JobStateCancelled
+		return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
+	}
+	// TODO: Will this drop LabelRules for existing partitions, if the new partitions have the same name?
+	err = dropLabelRules(w.ctx, job.SchemaName, tblInfo.Name.L, pNames)
+	if err != nil {
+		job.State = model.JobStateCancelled
+		return ver, errors.Wrapf(err, "failed to notify PD the label rules")
+	}

-		if _, err := alterTableLabelRule(job.SchemaName, tblInfo, getIDs([]*model.TableInfo{tblInfo})); err != nil {
-			job.State = model.JobStateCancelled
-			return ver, err
-		}
-		if job.Type == model.ActionAlterTablePartitioning {
-			// ALTER TABLE t PARTITION BY ... creates an additional
-			// Table ID
-			// Note, for REMOVE PARTITIONING, it is the same
-			// as for the single partition, to be changed to table.
-			physicalTableIDs = append(physicalTableIDs, partInfo.NewTableID)
-		}
+	if _, err := alterTableLabelRule(job.SchemaName, tblInfo, getIDs([]*model.TableInfo{tblInfo})); err != nil {
+		job.State = model.JobStateCancelled
+		return ver, err
+	}
+	if partInfo.Type != pmodel.PartitionTypeNone {
+		// ALTER TABLE ... PARTITION BY
+		// Also remove anything with the new table id
+		physicalTableIDs = append(physicalTableIDs, partInfo.NewTableID)
+		// Reset if it was normal table before
+		if tblInfo.Partition.Type == pmodel.PartitionTypeNone ||
+			tblInfo.Partition.DDLType == pmodel.PartitionTypeNone {
+			tblInfo.Partition = nil
+		}
+	}

-		var dropIndices []*model.IndexInfo
-		for _, indexInfo := range tblInfo.Indices {
-			if indexInfo.Unique &&
-				indexInfo.State == model.StateDeleteReorganization &&
-				tblInfo.Partition.DDLState == model.StateDeleteReorganization {
-				dropIndices = append(dropIndices, indexInfo)
-			}
-		}
-		for _, indexInfo := range dropIndices {
-			DropIndexColumnFlag(tblInfo, indexInfo)
-			RemoveDependentHiddenColumns(tblInfo, indexInfo)
-			removeIndexInfo(tblInfo, indexInfo)
-		}
+	var dropIndices []*model.IndexInfo
+	for _, indexInfo := range tblInfo.Indices {
+		if indexInfo.Unique &&
+			indexInfo.State == model.StateDeleteReorganization &&
+			tblInfo.Partition.DDLState == model.StateDeleteReorganization {
+			dropIndices = append(dropIndices, indexInfo)
+		}
+	}
+	for _, indexInfo := range dropIndices {
+		DropIndexColumnFlag(tblInfo, indexInfo)
+		RemoveDependentHiddenColumns(tblInfo, indexInfo)
+		removeIndexInfo(tblInfo, indexInfo)
+	}
+	if tblInfo.Partition != nil {
+		tblInfo.Partition.ClearReorgIntermediateInfo()
+	}

-		if tblInfo.Partition.Type == pmodel.PartitionTypeNone {
-			tblInfo.Partition = nil
-		} else {
-			tblInfo.Partition.ClearReorgIntermediateInfo()
-		}
-		ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
-		if err != nil {
-			return ver, errors.Trace(err)
-		}
-		job.FinishTableJob(model.JobStateRollbackDone, model.StateNone, ver, tblInfo)
-		args.OldPhysicalTblIDs = physicalTableIDs
-		job.FillFinishedArgs(args)
-		return ver, nil
-	}
+	ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
+	if err != nil {
+		return ver, errors.Trace(err)
+	}
+	job.FinishTableJob(model.JobStateRollbackDone, model.StateNone, ver, tblInfo)
+	args.OldPhysicalTblIDs = physicalTableIDs
+	job.FillFinishedArgs(args)
+	return ver, nil
+}

+// onDropTablePartition deletes old partition meta.
+// States:
+// StateNone
+//
+//	Old partitions are queued to be deleted (delete_range), global index up-to-date
+//
+// StateDeleteReorganization
+//
+//	Old partitions are not accessible/used by any sessions.
+//	Inserts/updates of the global index which still have entries pointing to old
+//	partitions will overwrite those entries.
+//	In the background we are reading all old partitions and deleting their entries
+//	from the global indexes.
+//
+// StateDeleteOnly
+//
+//	Old partitions are no longer visible, but inserts/updates to the global indexes
+//	will give duplicate key errors, even when the entries come from dropped partitions.
+//	Note that the range of a dropped partition (e.g. 'less than (N)') is now covered by the next partition.
+//
+// StateWriteOnly
+//
+//	Old partitions are blocked for read and write. But for reads we allow the
+//	"overlapping" partition to be read instead, which means writes can only happen
+//	in the overlapping partition's original range, not in the extended range opened
+//	up by the dropped partitions.
+//
+// StatePublic
+//
+//	Original state, unaware of DDL
+func (w *worker) onDropTablePartition(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
+	args, err := model.GetTablePartitionArgs(job)
+	if err != nil {
+		job.State = model.JobStateCancelled
+		return ver, errors.Trace(err)
+	}
+	partNames := args.PartNames
+	metaMut := jobCtx.metaMut
+	tblInfo, err := GetTableInfoAndCancelFaultJob(metaMut, job, job.SchemaID)
+	if err != nil {
+		return ver, errors.Trace(err)
+	}
+
 	var physicalTableIDs []int64
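To make the States comment concrete, here is a minimal testkit-style sketch of the overlap it describes (not taken from this commit's test files; the table and values are invented for illustration). Once p0 is dropped, the neighbouring partition p1 covers the whole range below its upper bound:

package partition_test

import (
	"testing"

	"github.com/pingcap/tidb/pkg/testkit"
)

func TestDropPartitionOverlapSketch(t *testing.T) {
	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	tk.MustExec(`create table t (a int) partition by range (a) (
		partition p0 values less than (100),
		partition p1 values less than (200))`)
	tk.MustExec("insert into t values (50), (150)")
	// While the DDL transitions through the states documented above, sessions
	// on older schema versions may still read the "overlapping" partition p1
	// in place of p0; only when the job finishes does the range below 100
	// become writable again, now belonging to p1.
	tk.MustExec("alter table t drop partition p0")
	tk.MustExec("insert into t values (60)")
	tk.MustQuery("select a from t order by a").Check(testkit.Rows("60", "150"))
}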
@@ -2221,15 +2249,30 @@ func (w *worker) onDropTablePartition(jobCtx *jobContext, job *model.Job) (ver i
 	originalState := job.SchemaState
 	switch job.SchemaState {
 	case model.StatePublic:
-		// If an error occurs, it returns that it cannot delete all partitions or that the partition doesn't exist.
+		// Here we mark the partitions to be dropped, so they are not read or written
 		err = CheckDropTablePartition(tblInfo, partNames)
 		if err != nil {
 			job.State = model.JobStateCancelled
 			return ver, errors.Trace(err)
 		}
+		// Reason, see https://github.com/pingcap/tidb/issues/55888
+		// Only mark the partitions as to be dropped, so they are not used, but not yet removed.
+		originalDefs := tblInfo.Partition.Definitions
 		physicalTableIDs = updateDroppingPartitionInfo(tblInfo, partNames)
+		tblInfo.Partition.Definitions = originalDefs
+		tblInfo.Partition.DDLState = model.StateWriteOnly
+		tblInfo.Partition.DDLAction = model.ActionDropTablePartition
+
+		job.SchemaState = model.StateWriteOnly
+		ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, originalState != job.SchemaState)
+	case model.StateWriteOnly:
+		// Since the previous state does not use the dropping partitions,
+		// we can now actually remove them, allowing writes into the overlapping range
+		// of the higher range partition or LIST default partition.
+		physicalTableIDs = updateDroppingPartitionInfo(tblInfo, partNames)
 		err = dropLabelRules(w.ctx, job.SchemaName, tblInfo.Name.L, partNames)
 		if err != nil {
+			// TODO: Add failpoint error/cancel injection and test failure/rollback and cancellation!
 			job.State = model.JobStateCancelled
 			return ver, errors.Wrapf(err, "failed to notify PD the label rules")
 		}
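The StatePublic branch above saves and restores Definitions around updateDroppingPartitionInfo, so in that state the partitions are only marked as dropping while remaining visible in the definition list. A sketch of what that helper is assumed to do (its body is not part of this diff):

package sketch

import (
	"strings"

	"github.com/pingcap/tidb/pkg/meta/model"
)

// Assumed behavior of updateDroppingPartitionInfo: move the named definitions
// into DroppingDefinitions and return their physical table IDs.
func updateDroppingPartitionInfoSketch(tblInfo *model.TableInfo, partNames []string) []int64 {
	dropping := make(map[string]struct{}, len(partNames))
	for _, name := range partNames {
		dropping[strings.ToLower(name)] = struct{}{}
	}
	pi := tblInfo.Partition
	ids := make([]int64, 0, len(partNames))
	kept := make([]model.PartitionDefinition, 0, len(pi.Definitions))
	for _, def := range pi.Definitions {
		if _, ok := dropping[def.Name.L]; ok {
			ids = append(ids, def.ID)
			pi.DroppingDefinitions = append(pi.DroppingDefinitions, def)
			continue
		}
		kept = append(kept, def)
	}
	pi.Definitions = kept
	return ids
}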
@@ -2265,12 +2308,14 @@ func (w *worker) onDropTablePartition(jobCtx *jobContext, job *model.Job) (ver i
 			return ver, err
 		}

+		tblInfo.Partition.DDLState = model.StateDeleteOnly
 		job.SchemaState = model.StateDeleteOnly
 		ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, originalState != job.SchemaState)
 	case model.StateDeleteOnly:
-		// This state is not a real 'DeleteOnly' state, because tidb does not maintaining the state check in partitionDefinition.
+		// This state is not a real 'DeleteOnly' state, because tidb does not maintain the state check in partitionDefinition.
 		// Insert this state to confirm all servers can not see the old partitions when reorg is running,
 		// so that no new data will be inserted into old partitions when reorganizing.
+		tblInfo.Partition.DDLState = model.StateDeleteReorganization
 		job.SchemaState = model.StateDeleteReorganization
 		ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, originalState != job.SchemaState)
 	case model.StateDeleteReorganization:
@@ -2330,6 +2375,8 @@ func (w *worker) onDropTablePartition(jobCtx *jobContext, job *model.Job) (ver i
 		}
 		droppedDefs := tblInfo.Partition.DroppingDefinitions
 		tblInfo.Partition.DroppingDefinitions = nil
+		tblInfo.Partition.DDLState = model.StateNone
+		tblInfo.Partition.DDLAction = model.ActionNone
 		// used by ApplyDiff in updateSchemaVersion
 		job.CtxVars = []any{physicalTableIDs} // TODO remove it.
 		ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
@@ -2464,6 +2511,7 @@ func (w *worker) onTruncateTablePartition(jobCtx *jobContext, job *model.Job) (i
 		pi.DroppingDefinitions = truncatingDefinitions
 		pi.NewPartitionIDs = newIDs[:]

+		tblInfo.Partition.DDLAction = model.ActionTruncateTablePartition
 		job.SchemaState = model.StateDeleteOnly
 		ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
 	case model.StateDeleteOnly:
@@ -3072,7 +3120,7 @@ func getReorgPartitionInfo(t *meta.Mutator, job *model.Job) (*model.TableInfo, [
 func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
 	// Handle the rolling back job
 	if job.IsRollingback() {
-		ver, err := w.onDropTablePartition(jobCtx, job)
+		ver, err := w.rollbackLikeDropPartition(jobCtx, job)
 		if err != nil {
 			return ver, errors.Trace(err)
 		}
@@ -3277,6 +3325,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
 		metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, job.SchemaName, tblInfo.Name.String()).Set(0.1 / float64(math.MaxUint64))
 		job.SchemaState = model.StateDeleteOnly
 		tblInfo.Partition.DDLState = model.StateDeleteOnly
+		tblInfo.Partition.DDLAction = job.Type
 		ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true)
 		if err != nil {
 			return ver, errors.Trace(err)
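A recurring theme in this commit is recording both DDLState and DDLAction on the PartitionInfo, so any reader of the table meta can tell both where a partition DDL is and what kind it is. A hypothetical consumer (the function name is invented for illustration; only the two fields and the ActionNone/StateNone values come from this commit):

package sketch

import (
	"fmt"

	"github.com/pingcap/tidb/pkg/meta/model"
)

// describePartitionDDL is a hypothetical helper, not part of this commit.
func describePartitionDDL(pi *model.PartitionInfo) string {
	if pi == nil || pi.DDLAction == model.ActionNone {
		return "no partition DDL in flight"
	}
	return fmt.Sprintf("%s in state %s", pi.DDLAction, pi.DDLState)
}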
18 changes: 3 additions & 15 deletions pkg/ddl/rollingback.go
@@ -360,17 +360,12 @@ func convertAddTablePartitionJob2RollbackJob(jobCtx *jobContext, job *model.Job,
 	}
 	args.PartNames = partNames
 	model.FillRollbackArgsForAddPartition(job, args)
-	/*
-		_, err = job.Encode(true)
-		if err != nil {
-			return ver, errors.Trace(err)
-		}
-	*/
-
 	ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
 	if err != nil {
 		return ver, errors.Trace(err)
 	}
+	tblInfo.Partition.DDLState = model.StateNone
+	tblInfo.Partition.DDLAction = model.ActionNone
 	job.State = model.JobStateRollingback
 	return ver, errors.Trace(otherwiseErr)
 }
@@ -427,7 +422,7 @@ func convertReorgPartitionJob2RollbackJob(jobCtx *jobContext, job *model.Job, ot
 			}
 			// We cannot drop the index here, we need to wait until
 			// the next schema version
-			// i.e. rollback in onDropTablePartition
+			// i.e. rollback in rollbackLikeDropPartition
 			// New index that became public in this state,
 			// mark it to be dropped in next schema version
 			if indexInfo.Global {
@@ -508,13 +503,6 @@ func convertReorgPartitionJob2RollbackJob(jobCtx *jobContext, job *model.Job, ot
 	}
 	args.PartNames = partNames
 	job.FillArgs(args)
-	/*
-		_, err = job.Encode(true)
-		if err != nil {
-			return ver, errors.Trace(err)
-		}
-	*/
-
 	ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
 	if err != nil {
 		return ver, errors.Trace(err)
2 changes: 2 additions & 0 deletions pkg/ddl/tests/partition/BUILD.bazel
@@ -6,6 +6,7 @@ go_test(
     srcs = [
         "db_partition_test.go",
         "main_test.go",
+        "multi_domain_test.go",
        "placement_test.go",
         "reorg_partition_test.go",
     ],
@@ -39,6 +40,7 @@ go_test(
         "//pkg/types",
         "//pkg/util/codec",
         "//pkg/util/dbterror",
+        "//pkg/util/logutil",
         "//pkg/util/mathutil",
         "@com_github_pingcap_errors//:errors",
         "@com_github_pingcap_failpoint//:failpoint",