Skip to content

Commit

Permalink
ddl: allocate the element ID correctly for GC delete range (pingcap#68)
Browse files Browse the repository at this point in the history
  • Loading branch information
tangenta authored Apr 13, 2022
1 parent f0a9b8b commit e3b5d22
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 121 deletions.
25 changes: 5 additions & 20 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,9 +457,11 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerFinishDDLJob, job.Type.String(), metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
}()

err = deleteRangeForDropSchemaObjectJob(w, job)
if err != nil {
return errors.Trace(err)
if jobNeedGC(job) {
err = w.deleteRange(w.ddlJobCtx, job)
if err != nil {
return errors.Trace(err)
}
}

switch job.Type {
Expand Down Expand Up @@ -495,23 +497,6 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
return errors.Trace(err)
}

func deleteRangeForDropSchemaObjectJob(w *worker, job *model.Job) error {
if jobNeedGC(job) {
if job.Type == model.ActionMultiSchemaChange {
for _, sub := range job.MultiSchemaInfo.SubJobs {
proxyJob := cloneFromSubJob(job, sub)
err := deleteRangeForDropSchemaObjectJob(w, proxyJob)
if err != nil {
return errors.Trace(err)
}
}
return nil
}
return w.deleteRange(w.ddlJobCtx, job)
}
return nil
}

func (w *worker) writeDDLSeqNum(job *model.Job) {
w.ddlSeqNumMu.Lock()
w.ddlSeqNumMu.seqNum++
Expand Down
69 changes: 43 additions & 26 deletions ddl/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,11 @@ func (dr *delRange) addDelRangeJob(ctx context.Context, job *model.Job) error {
}
defer dr.sessPool.put(sctx)

err = insertJobIntoDeleteRangeTable(ctx, sctx, job)
if job.MultiSchemaInfo != nil {
err = insertJobIntoDeleteRangeTableMultiSchema(ctx, sctx, job)
} else {
err = insertJobIntoDeleteRangeTable(ctx, sctx, job, &elementIDAlloc{})
}
if err != nil {
logutil.BgLogger().Error("[ddl] add job into delete-range table failed", zap.Int64("jobID", job.ID), zap.String("jobType", job.Type.String()), zap.Error(err))
return errors.Trace(err)
Expand All @@ -106,6 +110,20 @@ func (dr *delRange) addDelRangeJob(ctx context.Context, job *model.Job) error {
return nil
}

func insertJobIntoDeleteRangeTableMultiSchema(ctx context.Context, sctx sessionctx.Context, job *model.Job) error {
var ea elementIDAlloc
for _, sub := range job.MultiSchemaInfo.SubJobs {
proxyJob := cloneFromSubJob(job, sub)
if jobNeedGC(proxyJob) {
err := insertJobIntoDeleteRangeTable(ctx, sctx, proxyJob, &ea)
if err != nil {
return errors.Trace(err)
}
}
}
return nil
}

// removeFromGCDeleteRange implements delRangeManager interface.
func (dr *delRange) removeFromGCDeleteRange(ctx context.Context, jobID int64, tableIDs []int64) error {
sctx, err := dr.sessPool.get()
Expand Down Expand Up @@ -237,24 +255,14 @@ func (dr *delRange) doTask(ctx sessionctx.Context, r util.DelRangeTask) error {
return nil
}

type elementIDAlloc struct {
id int64
}

func (ea *elementIDAlloc) alloc() int64 {
ea.id++
return ea.id
}

// insertJobIntoDeleteRangeTable parses the job into delete-range arguments,
// and inserts a new record into gc_delete_range table. The primary key is
// (job ID, element ID), so we ignore key conflict error.
func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, job *model.Job) error {
func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, job *model.Job, ea *elementIDAlloc) error {
now, err := getNowTSO(sctx)
if err != nil {
return errors.Trace(err)
}
var ea elementIDAlloc

s := sctx.(sqlexec.SQLExecutor)
switch job.Type {
Expand All @@ -268,7 +276,7 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context,
if batchEnd > i+batchInsertDeleteRangeSize {
batchEnd = i + batchInsertDeleteRangeSize
}
if err := doBatchInsert(ctx, s, job.ID, tableIDs[i:batchEnd], now, &ea); err != nil {
if err := doBatchInsert(ctx, s, job.ID, tableIDs[i:batchEnd], now, ea); err != nil {
return errors.Trace(err)
}
}
Expand All @@ -285,15 +293,17 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context,
for _, pid := range physicalTableIDs {
startKey = tablecodec.EncodeTablePrefix(pid)
endKey := tablecodec.EncodeTablePrefix(pid + 1)
if err := doInsert(ctx, s, job.ID, ea.alloc(), startKey, endKey, now, fmt.Sprintf("partition ID is %d", pid)); err != nil {
elemID := ea.allocForPartitionID(pid)
if err := doInsert(ctx, s, job.ID, elemID, startKey, endKey, now, fmt.Sprintf("partition ID is %d", pid)); err != nil {
return errors.Trace(err)
}
}
return nil
}
startKey = tablecodec.EncodeTablePrefix(tableID)
endKey := tablecodec.EncodeTablePrefix(tableID + 1)
return doInsert(ctx, s, job.ID, ea.alloc(), startKey, endKey, now, fmt.Sprintf("table ID is %d", tableID))
elemID := ea.allocForTableID(tableID)
return doInsert(ctx, s, job.ID, elemID, startKey, endKey, now, fmt.Sprintf("table ID is %d", tableID))
case model.ActionDropTablePartition, model.ActionTruncateTablePartition:
var physicalTableIDs []int64
if err := job.DecodeArgs(&physicalTableIDs); err != nil {
Expand All @@ -302,7 +312,8 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context,
for _, physicalTableID := range physicalTableIDs {
startKey := tablecodec.EncodeTablePrefix(physicalTableID)
endKey := tablecodec.EncodeTablePrefix(physicalTableID + 1)
if err := doInsert(ctx, s, job.ID, ea.alloc(), startKey, endKey, now, fmt.Sprintf("partition table ID is %d", physicalTableID)); err != nil {
elemID := ea.allocForPartitionID(physicalTableID)
if err := doInsert(ctx, s, job.ID, elemID, startKey, endKey, now, fmt.Sprintf("partition table ID is %d", physicalTableID)); err != nil {
return errors.Trace(err)
}
}
Expand All @@ -319,14 +330,16 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context,
for _, pid := range partitionIDs {
startKey := tablecodec.EncodeTableIndexPrefix(pid, indexID)
endKey := tablecodec.EncodeTableIndexPrefix(pid, indexID+1)
if err := doInsert(ctx, s, job.ID, ea.alloc(), startKey, endKey, now, fmt.Sprintf("partition table ID is %d", pid)); err != nil {
elemID := ea.allocForIndexID(indexID)
if err := doInsert(ctx, s, job.ID, elemID, startKey, endKey, now, fmt.Sprintf("partition table ID is %d", pid)); err != nil {
return errors.Trace(err)
}
}
} else {
startKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID)
endKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID+1)
return doInsert(ctx, s, job.ID, ea.alloc(), startKey, endKey, now, fmt.Sprintf("table ID is %d", tableID))
elemID := ea.allocForIndexID(indexID)
return doInsert(ctx, s, job.ID, elemID, startKey, endKey, now, fmt.Sprintf("table ID is %d", tableID))
}
case model.ActionDropIndex, model.ActionDropPrimaryKey:
tableID := job.TableID
Expand All @@ -341,14 +354,16 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context,
for _, pid := range partitionIDs {
startKey := tablecodec.EncodeTableIndexPrefix(pid, indexID)
endKey := tablecodec.EncodeTableIndexPrefix(pid, indexID+1)
if err := doInsert(ctx, s, job.ID, ea.alloc(), startKey, endKey, now, fmt.Sprintf("partition table ID is %d", pid)); err != nil {
elemID := ea.allocForIndexID(indexID)
if err := doInsert(ctx, s, job.ID, elemID, startKey, endKey, now, fmt.Sprintf("partition table ID is %d", pid)); err != nil {
return errors.Trace(err)
}
}
} else {
startKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID)
endKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID+1)
return doInsert(ctx, s, job.ID, ea.alloc(), startKey, endKey, now, fmt.Sprintf("index ID is %d", indexID))
elemID := ea.allocForIndexID(indexID)
return doInsert(ctx, s, job.ID, elemID, startKey, endKey, now, fmt.Sprintf("index ID is %d", indexID))
}
case model.ActionDropColumn:
var colName model.CIStr
Expand All @@ -361,12 +376,12 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context,
if len(indexIDs) > 0 {
if len(partitionIDs) > 0 {
for _, pid := range partitionIDs {
if err := doBatchDeleteIndiceRange(ctx, s, job.ID, pid, indexIDs, now, &ea); err != nil {
if err := doBatchDeleteIndiceRange(ctx, s, job.ID, pid, indexIDs, now, ea); err != nil {
return errors.Trace(err)
}
}
} else {
return doBatchDeleteIndiceRange(ctx, s, job.ID, job.TableID, indexIDs, now, &ea)
return doBatchDeleteIndiceRange(ctx, s, job.ID, job.TableID, indexIDs, now, ea)
}
}
case model.ActionModifyColumn:
Expand All @@ -379,10 +394,10 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context,
return nil
}
if len(partitionIDs) == 0 {
return doBatchDeleteIndiceRange(ctx, s, job.ID, job.TableID, indexIDs, now, &ea)
return doBatchDeleteIndiceRange(ctx, s, job.ID, job.TableID, indexIDs, now, ea)
}
for _, pid := range partitionIDs {
if err := doBatchDeleteIndiceRange(ctx, s, job.ID, pid, indexIDs, now, &ea); err != nil {
if err := doBatchDeleteIndiceRange(ctx, s, job.ID, pid, indexIDs, now, ea); err != nil {
return errors.Trace(err)
}
}
Expand All @@ -404,7 +419,8 @@ func doBatchDeleteIndiceRange(ctx context.Context, s sqlexec.SQLExecutor, jobID,
if i != len(indexIDs)-1 {
buf.WriteString(",")
}
paramsList = append(paramsList, jobID, ea.alloc(), startKeyEncoded, endKeyEncoded, ts)
elemID := ea.allocForIndexID(indexID)
paramsList = append(paramsList, jobID, elemID, startKeyEncoded, endKeyEncoded, ts)
}
_, err := s.ExecuteInternal(ctx, buf.String(), paramsList...)
return errors.Trace(err)
Expand Down Expand Up @@ -437,7 +453,8 @@ func doBatchInsert(ctx context.Context, s sqlexec.SQLExecutor, jobID int64, tabl
if i != len(tableIDs)-1 {
buf.WriteString(",")
}
paramsList = append(paramsList, jobID, ea.alloc(), startKeyEncoded, endKeyEncoded, ts)
elemID := ea.allocForTableID(tableID)
paramsList = append(paramsList, jobID, elemID, startKeyEncoded, endKeyEncoded, ts)
}
// set session disk full opt
s.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
Expand Down
55 changes: 55 additions & 0 deletions ddl/delete_range_util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ddl

const (
elemTable byte = 't'
elemPartition byte = 'p'
elemIndex byte = 'i'
)

type elementObjID struct {
tp byte
id int64
}

type elementIDAlloc struct {
objIDs map[elementObjID]int64
}

func (e *elementIDAlloc) allocForIndexID(indexID int64) int64 {
return e.alloc(elemIndex, indexID)
}

func (e *elementIDAlloc) allocForTableID(tableID int64) int64 {
return e.alloc(elemTable, tableID)
}

func (e *elementIDAlloc) allocForPartitionID(partitionID int64) int64 {
return e.alloc(elemPartition, partitionID)
}

func (e *elementIDAlloc) alloc(tp byte, schemaObjID int64) int64 {
if e.objIDs == nil {
e.objIDs = make(map[elementObjID]int64)
}
objID := elementObjID{tp: tp, id: schemaObjID}
if elemID, found := e.objIDs[objID]; found {
return elemID
}
newElemID := int64(len(e.objIDs) + 1)
e.objIDs[objID] = newElemID
return newElemID
}
20 changes: 1 addition & 19 deletions ddl/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/util/testbridge"
"github.com/tikv/client-go/v2/tikv"
Expand Down Expand Up @@ -77,30 +76,13 @@ func wrapJobIDExtCallback(oldCallback ddl.Callback) *testDDLJobIDCallback {
}
}

func setupJobIDExtCallback(ctx sessionctx.Context) (jobExt *testDDLJobIDCallback, tearDown func()) {
dom := domain.GetDomain(ctx)
originHook := dom.DDL().GetHook()
jobIDExt := wrapJobIDExtCallback(originHook)
dom.DDL().SetHook(jobIDExt)
return jobIDExt, func() {
dom.DDL().SetHook(originHook)
}
}

func checkDelRangeCnt(tk *testkit.TestKit, jobID int64, cnt int) {
query := `select sum(cnt) from
(select count(1) cnt from mysql.gc_delete_range where job_id = ? union
(select count(1) cnt from mysql.gc_delete_range where job_id = ? union all
select count(1) cnt from mysql.gc_delete_range_done where job_id = ?) as gdr;`
tk.MustQuery(query, jobID, jobID).Check(testkit.Rows(strconv.Itoa(cnt)))
}

func checkDelRangeAdded(tk *testkit.TestKit, jobID int64, elemID int64) {
query := `select sum(cnt) from
(select count(1) cnt from mysql.gc_delete_range where job_id = ? and element_id = ? union
select count(1) cnt from mysql.gc_delete_range_done where job_id = ? and element_id = ?) as gdr;`
tk.MustQuery(query, jobID, elemID, jobID, elemID).Check(testkit.Rows("1"))
}

type testDDLJobIDCallback struct {
ddl.Callback
jobID int64
Expand Down
Loading

0 comments on commit e3b5d22

Please sign in to comment.