From dafcd02fa163432f5d6b98489bb850434ded8cd5 Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 30 Jun 2022 21:08:32 +0800 Subject: [PATCH 1/2] ddl/sanity_check: refactor checkDeleteRangeCnt --- ddl/sanity_check.go | 101 ++++++++++++++++++++------------------------ 1 file changed, 46 insertions(+), 55 deletions(-) diff --git a/ddl/sanity_check.go b/ddl/sanity_check.go index 9566c36201d99..b52c90cb7b87f 100644 --- a/ddl/sanity_check.go +++ b/ddl/sanity_check.go @@ -20,53 +20,50 @@ import ( "fmt" "strings" + "github.com/pingcap/errors" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/mathutil" "github.com/pingcap/tidb/util/sqlexec" + "go.uber.org/zap" ) -func checkRangeCntByTableIDs(physicalTableIDs []int64, cnt int64) { - if len(physicalTableIDs) > 0 { - if len(physicalTableIDs) != int(cnt) { - panic("should not happened" + fmt.Sprintf("expect count: %d, real count: %d", len(physicalTableIDs), cnt)) +func (d *ddl) checkDeleteRangeCnt(job *model.Job) { + actualCnt, err := queryDeleteRangeCnt(d.sessPool, job.ID) + if err != nil { + if strings.Contains(err.Error(), "Not Supported") { + return // For mock session, we don't support executing SQLs. } - } else if cnt != 1 { - panic("should not happened" + fmt.Sprintf("expect count: %d, real count: %d", 1, cnt)) - } -} - -func checkRangeCntByTableIDsAndIndexIDs(partitionTableIDs []int64, indexIDs []int64, cnt int64) { - if len(indexIDs) == 0 { - return + logutil.BgLogger().Error("query delete range count failed", zap.Error(err)) + panic(err) } - expectedCnt := len(indexIDs) - if len(partitionTableIDs) > 0 { - expectedCnt *= len(partitionTableIDs) + expectedCnt, err := expectedDeleteRangeCnt(job) + if err != nil { + logutil.BgLogger().Error("decode job's delete range count failed", zap.Error(err)) + panic(err) } - if expectedCnt != int(cnt) { - panic("should not happened" + fmt.Sprintf("expect count: %d, real count: %d", expectedCnt, cnt)) + if actualCnt != expectedCnt { + panic(fmt.Sprintf("expect delete range count %d, actual count %d", expectedCnt, actualCnt)) } } -func (d *ddl) checkDeleteRangeCnt(job *model.Job) { - sctx, _ := d.sessPool.get() +func queryDeleteRangeCnt(sessPool *sessionPool, jobID int64) (int, error) { + sctx, _ := sessPool.get() s, _ := sctx.(sqlexec.SQLExecutor) defer func() { - d.sessPool.put(sctx) + sessPool.put(sctx) }() query := `select sum(cnt) from (select count(1) cnt from mysql.gc_delete_range where job_id = %? union all select count(1) cnt from mysql.gc_delete_range_done where job_id = %?) as gdr;` - rs, err := s.ExecuteInternal(context.TODO(), query, job.ID, job.ID) + rs, err := s.ExecuteInternal(context.TODO(), query, jobID, jobID) if err != nil { - if strings.Contains(err.Error(), "Not Supported") { - return - } - panic(err) + return 0, errors.Trace(err) } defer func() { _ = rs.Close() @@ -74,83 +71,77 @@ func (d *ddl) checkDeleteRangeCnt(job *model.Job) { req := rs.NewChunk(nil) err = rs.Next(context.TODO(), req) if err != nil { - panic("should not happened, err:" + err.Error()) + return 0, errors.Trace(err) } cnt, _ := req.GetRow(0).GetMyDecimal(0).ToInt() + return int(cnt), nil +} +func expectedDeleteRangeCnt(job *model.Job) (int, error) { switch job.Type { case model.ActionDropSchema: var tableIDs []int64 if err := job.DecodeArgs(&tableIDs); err != nil { - panic("should not happened") - } - if len(tableIDs) != int(cnt) { - panic("should not happened" + fmt.Sprintf("expect count: %d, real count: %d", len(tableIDs), cnt)) + return 0, errors.Trace(err) } + return len(tableIDs), nil case model.ActionDropTable, model.ActionTruncateTable: var startKey kv.Key var physicalTableIDs []int64 var ruleIDs []string if err := job.DecodeArgs(&startKey, &physicalTableIDs, &ruleIDs); err != nil { - panic("Error in drop/truncate table, please report a bug with this stack trace and how it happened") + return 0, errors.Trace(err) } - checkRangeCntByTableIDs(physicalTableIDs, cnt) + return mathutil.Max(len(physicalTableIDs), 1), nil case model.ActionDropTablePartition, model.ActionTruncateTablePartition: var physicalTableIDs []int64 if err := job.DecodeArgs(&physicalTableIDs); err != nil { - panic("should not happened") - } - if len(physicalTableIDs) != int(cnt) { - panic("should not happened" + fmt.Sprintf("expect count: %d, real count: %d", len(physicalTableIDs), cnt)) + return 0, errors.Trace(err) } + return len(physicalTableIDs), nil case model.ActionAddIndex, model.ActionAddPrimaryKey: var indexID int64 var partitionIDs []int64 if err := job.DecodeArgs(&indexID, &partitionIDs); err != nil { - panic("should not happened") + return 0, errors.Trace(err) } - checkRangeCntByTableIDs(partitionIDs, cnt) + return mathutil.Max(len(partitionIDs), 1), nil case model.ActionDropIndex, model.ActionDropPrimaryKey: var indexName interface{} var indexID int64 var partitionIDs []int64 if err := job.DecodeArgs(&indexName, &indexID, &partitionIDs); err != nil { - panic("should not happened") + return 0, errors.Trace(err) } - checkRangeCntByTableIDsAndIndexIDs(partitionIDs, []int64{indexID}, cnt) + return mathutil.Max(len(partitionIDs), 1), nil case model.ActionDropIndexes: var indexIDs []int64 var partitionIDs []int64 if err := job.DecodeArgs(&[]model.CIStr{}, &[]bool{}, &indexIDs, &partitionIDs); err != nil { - panic("should not happened") + return 0, errors.Trace(err) } - checkRangeCntByTableIDsAndIndexIDs(partitionIDs, indexIDs, cnt) + physicalCnt := mathutil.Max(len(partitionIDs), 1) + return physicalCnt * len(indexIDs), nil case model.ActionDropColumn: var colName model.CIStr var ifExists bool var indexIDs []int64 var partitionIDs []int64 if err := job.DecodeArgs(&colName, &ifExists, &indexIDs, &partitionIDs); err != nil { - panic("should not happened") - } - checkRangeCntByTableIDsAndIndexIDs(partitionIDs, indexIDs, cnt) - case model.ActionDropColumns: - var colNames []model.CIStr - var ifExists []bool - var indexIDs []int64 - var partitionIDs []int64 - if err := job.DecodeArgs(&colNames, &ifExists, &indexIDs, &partitionIDs); err != nil { - panic("should not happened") + return 0, errors.Trace(err) } - checkRangeCntByTableIDsAndIndexIDs(partitionIDs, indexIDs, cnt) + physicalCnt := mathutil.Max(len(partitionIDs), 1) + return physicalCnt * len(indexIDs), nil case model.ActionModifyColumn: var indexIDs []int64 var partitionIDs []int64 if err := job.DecodeArgs(&indexIDs, &partitionIDs); err != nil { - panic("should not happened") + return 0, errors.Trace(err) } - checkRangeCntByTableIDsAndIndexIDs(partitionIDs, indexIDs, cnt) + physicalCnt := mathutil.Max(len(partitionIDs), 1) + return physicalCnt * len(indexIDs), nil } + return 0, nil } // checkHistoryJobInTest does some sanity check to make sure something is correct after DDL complete. From f32ce61f256f66f2a8cb3a9a65b26af46b17151e Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 30 Jun 2022 23:09:53 +0800 Subject: [PATCH 2/2] support multi-schema change --- ddl/sanity_check.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/ddl/sanity_check.go b/ddl/sanity_check.go index b52c90cb7b87f..090bc03f6d0a9 100644 --- a/ddl/sanity_check.go +++ b/ddl/sanity_check.go @@ -140,6 +140,17 @@ func expectedDeleteRangeCnt(job *model.Job) (int, error) { } physicalCnt := mathutil.Max(len(partitionIDs), 1) return physicalCnt * len(indexIDs), nil + case model.ActionMultiSchemaChange: + totalExpectedCnt := 0 + for _, sub := range job.MultiSchemaInfo.SubJobs { + p := sub.ToProxyJob(job) + cnt, err := expectedDeleteRangeCnt(p) + if err != nil { + return 0, err + } + totalExpectedCnt += cnt + } + return totalExpectedCnt, nil } return 0, nil }