ddl: support distributed adding index for normal table (#43289) (#44065)
close #43290
ti-chi-bot authored May 23, 2023
1 parent 533ee67 commit fd23141
Showing 12 changed files with 255 additions and 101 deletions.
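
The heart of this commit is in ddl/disttask_flow.go: instead of rejecting non-partitioned tables, the dispatcher now drives them through a small per-step state machine — first splitting the table's key range into region batches for backfill, then emitting one subtask per scheduler node for the final phase. A minimal sketch of that control flow, with hypothetical names and payloads standing in for the real proto.Task/BackfillSubTaskMeta types:

```go
package main

import "fmt"

// Hypothetical step constants mirroring proto's StepInit/StepOne/StepTwo.
const (
	stepInit = iota // fresh task: split the table key range into region batches
	stepOne         // backfill subtasks dispatched: emit one subtask per scheduler node
	stepTwo         // nothing left to dispatch
)

// nextSubtasks sketches ProcessNormalFlow for a non-partitioned table:
// it returns the next round of subtask payloads and advances the step.
func nextSubtasks(step int) (subtasks []string, next int) {
	switch step {
	case stepOne:
		// One dummy subtask per live scheduler node (merge/ingest phase).
		return []string{"node-1", "node-2"}, stepTwo
	case stepTwo:
		return nil, stepTwo // task is finished
	default:
		// Initial dispatch: key-range batches become backfill subtasks.
		return []string{"range[a,m)", "range[m,z)"}, stepOne
	}
}

func main() {
	step := stepInit
	for {
		tasks, next := nextSubtasks(step)
		fmt.Println(step, "->", next, tasks)
		if next == step && tasks == nil {
			break
		}
		step = next
	}
}
```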
6 changes: 3 additions & 3 deletions ddl/backfilling_test.go
@@ -72,19 +72,19 @@ func TestPickBackfillType(t *testing.T) {
 		},
 	}
 	variable.EnableFastReorg.Store(true)
-	tp, err := pickBackfillType(mockCtx, mockJob, uk)
+	tp, err := pickBackfillType(mockCtx, mockJob, uk, nil)
 	require.NoError(t, err)
 	require.Equal(t, tp, model.ReorgTypeTxn)

 	mockJob.ReorgMeta.ReorgTp = model.ReorgTypeNone
 	ingest.LitInitialized = false
-	tp, err = pickBackfillType(mockCtx, mockJob, uk)
+	tp, err = pickBackfillType(mockCtx, mockJob, uk, nil)
 	require.NoError(t, err)
 	require.Equal(t, tp, model.ReorgTypeTxnMerge)

 	mockJob.ReorgMeta.ReorgTp = model.ReorgTypeNone
 	ingest.LitInitialized = true
-	tp, err = pickBackfillType(mockCtx, mockJob, uk)
+	tp, err = pickBackfillType(mockCtx, mockJob, uk, nil)
 	require.NoError(t, err)
 	require.Equal(t, tp, model.ReorgTypeLitMerge)
 }
4 changes: 3 additions & 1 deletion ddl/ddl.go
@@ -551,8 +551,10 @@ func (dc *ddlCtx) removeReorgCtx(jobID int64) {
 func (dc *ddlCtx) notifyReorgWorkerJobStateChange(job *model.Job) {
 	rc := dc.getReorgCtx(job.ID)
 	if rc == nil {
+		logutil.BgLogger().Error("cannot find reorgCtx", zap.Int64("jobID", job.ID))
 		return
 	}
+	logutil.BgLogger().Info("[ddl] notify reorg worker during canceling ddl job", zap.Int64("jobID", job.ID))
 	rc.notifyJobState(job.State)
 }

@@ -678,7 +680,7 @@ func newDDL(ctx context.Context, options ...Option) *ddl {

 	scheduler.RegisterSchedulerConstructor("backfill",
 		func(taskMeta []byte, step int64) (scheduler.Scheduler, error) {
-			return NewBackfillSchedulerHandle(taskMeta, d)
+			return NewBackfillSchedulerHandle(taskMeta, d, step == proto.StepTwo)
 		})

 	dispatcher.RegisterTaskFlowHandle(BackfillTaskType, NewLitBackfillFlowHandle(d))
12 changes: 0 additions & 12 deletions ddl/dist_owner.go
@@ -16,10 +16,6 @@ package ddl

 import (
 	"time"
-
-	"github.com/pingcap/tidb/metrics"
-	"github.com/pingcap/tidb/parser/model"
-	"github.com/pingcap/tidb/sessionctx/variable"
 )

 // CheckBackfillJobFinishInterval is export for test.
@@ -28,11 +24,3 @@ var CheckBackfillJobFinishInterval = 300 * time.Millisecond
 const (
 	distPhysicalTableConcurrency = 16
 )
-
-func initDistReorg(reorgMeta *model.DDLReorgMeta) {
-	isDistReorg := variable.EnableDistTask.Load()
-	reorgMeta.IsDistReorg = isDistReorg
-	if isDistReorg {
-		metrics.TelemetryDistReorgCnt.Inc()
-	}
-}
117 changes: 95 additions & 22 deletions ddl/disttask_flow.go
@@ -15,15 +15,20 @@
 package ddl

 import (
+	"bytes"
 	"context"
 	"encoding/json"
+	"sort"

 	"github.com/pingcap/errors"
 	"github.com/pingcap/tidb/disttask/framework/dispatcher"
 	"github.com/pingcap/tidb/disttask/framework/proto"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/meta"
 	"github.com/pingcap/tidb/parser/model"
+	"github.com/pingcap/tidb/store/helper"
+	"github.com/pingcap/tidb/table"
+	"github.com/tikv/client-go/v2/tikv"
 )

 type litBackfillFlowHandle struct {
@@ -39,11 +44,6 @@ func NewLitBackfillFlowHandle(d DDL) dispatcher.TaskFlowHandle {

 // ProcessNormalFlow processes the normal flow.
 func (h *litBackfillFlowHandle) ProcessNormalFlow(_ context.Context, _ dispatcher.TaskHandle, gTask *proto.Task) (metas [][]byte, err error) {
-	if gTask.State != proto.TaskStatePending {
-		// This flow has only one step, finish task when it is not pending
-		return nil, nil
-	}
-
 	var globalTaskMeta BackfillGlobalMeta
 	if err = json.Unmarshal(gTask.Meta, &globalTaskMeta); err != nil {
 		return nil, err
@@ -56,33 +56,106 @@

 	job := &globalTaskMeta.Job
 	var tblInfo *model.TableInfo
-	err = kv.RunInNewTxn(d.ctx, d.store, false, func(ctx context.Context, txn kv.Transaction) error {
+	err = kv.RunInNewTxn(d.ctx, d.store, true, func(ctx context.Context, txn kv.Transaction) error {
 		tblInfo, err = meta.NewMeta(txn).GetTable(job.SchemaID, job.TableID)
 		return err
 	})

-	if tblInfo.Partition == nil {
-		return nil, errors.New("Non-partition table not supported yet")
-	}
-
-	defs := tblInfo.Partition.Definitions
-	physicalIDs := make([]int64, len(defs))
-	for i := range defs {
-		physicalIDs[i] = defs[i].ID
-	}
-
-	subTaskMetas := make([][]byte, 0, len(physicalIDs))
-	for _, physicalID := range physicalIDs {
-		subTaskMeta := &BackfillSubTaskMeta{
-			PhysicalTableID: physicalID,
-		}
-
-		metaBytes, err := json.Marshal(subTaskMeta)
-		if err != nil {
-			return nil, err
-		}
-
-		subTaskMetas = append(subTaskMetas, metaBytes)
-	}
+	var subTaskMetas [][]byte
+	if tblInfo.Partition == nil {
+		switch gTask.Step {
+		case proto.StepOne:
+			serverNodes, err := dispatcher.GenerateSchedulerNodes(d.ctx)
+			if err != nil {
+				return nil, err
+			}
+			subTaskMetas = make([][]byte, 0, len(serverNodes))
+			dummyMeta := &BackfillSubTaskMeta{}
+			metaBytes, err := json.Marshal(dummyMeta)
+			if err != nil {
+				return nil, err
+			}
+			for range serverNodes {
+				subTaskMetas = append(subTaskMetas, metaBytes)
+			}
+			gTask.Step = proto.StepTwo
+			return subTaskMetas, nil
+		case proto.StepTwo:
+			return nil, nil
+		default:
+		}
+		tbl, err := getTable(d.store, job.SchemaID, tblInfo)
+		if err != nil {
+			return nil, err
+		}
+		ver, err := getValidCurrentVersion(d.store)
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+		startKey, endKey, err := getTableRange(d.jobContext(job.ID), d.ddlCtx, tbl.(table.PhysicalTable), ver.Ver, job.Priority)
+		if startKey == nil && endKey == nil {
+			// Empty table.
+			gTask.Step = proto.StepOne
+			return nil, nil
+		}
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+		regionCache := d.store.(helper.Storage).GetRegionCache()
+		recordRegionMetas, err := regionCache.LoadRegionsInKeyRange(tikv.NewBackofferWithVars(context.Background(), 20000, nil), startKey, endKey)
+		if err != nil {
+			return nil, err
+		}
+
+		subTaskMetas = make([][]byte, 0, 100)
+		regionBatch := 20
+		sort.Slice(recordRegionMetas, func(i, j int) bool {
+			return bytes.Compare(recordRegionMetas[i].StartKey(), recordRegionMetas[j].StartKey()) < 0
+		})
+		for i := 0; i < len(recordRegionMetas); i += regionBatch {
+			end := i + regionBatch
+			if end > len(recordRegionMetas) {
+				end = len(recordRegionMetas)
+			}
+			batch := recordRegionMetas[i:end]
+			subTaskMeta := &BackfillSubTaskMeta{StartKey: batch[0].StartKey(), EndKey: batch[len(batch)-1].EndKey()}
+			if i == 0 {
+				subTaskMeta.StartKey = startKey
+			}
+			if end == len(recordRegionMetas) {
+				subTaskMeta.EndKey = endKey
+			}
+			metaBytes, err := json.Marshal(subTaskMeta)
+			if err != nil {
+				return nil, err
+			}
+			subTaskMetas = append(subTaskMetas, metaBytes)
+		}
+	} else {
+		if gTask.State != proto.TaskStatePending {
+			// This flow for partition table has only one step, finish task when it is not pending
+			return nil, nil
+		}
+
+		defs := tblInfo.Partition.Definitions
+		physicalIDs := make([]int64, len(defs))
+		for i := range defs {
+			physicalIDs[i] = defs[i].ID
+		}
+
+		subTaskMetas = make([][]byte, 0, len(physicalIDs))
+		for _, physicalID := range physicalIDs {
+			subTaskMeta := &BackfillSubTaskMeta{
+				PhysicalTableID: physicalID,
+			}
+
+			metaBytes, err := json.Marshal(subTaskMeta)
+			if err != nil {
+				return nil, err
+			}
+
+			subTaskMetas = append(subTaskMetas, metaBytes)
+		}
+	}

 	gTask.Step = proto.StepOne
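
The region-batching loop above maps cleanly onto a standalone helper: sort the regions, take them in batches of at most regionBatch, and clamp the first and last subtask to the table's own key range. Below is a minimal sketch of the same technique under simplified assumptions — keyRange is a hypothetical stand-in for the region metadata and kv.Key values in the real code, which also carries the boundaries inside BackfillSubTaskMeta:

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
)

// keyRange is a hypothetical stand-in for a region's key span.
type keyRange struct {
	StartKey, EndKey []byte
}

// splitIntoBatches groups sorted region ranges into batches of at most
// batchSize regions, clamping the first and last batch to the table's
// own [startKey, endKey) range, mirroring the loop in ProcessNormalFlow.
func splitIntoBatches(regions []keyRange, startKey, endKey []byte, batchSize int) []keyRange {
	sort.Slice(regions, func(i, j int) bool {
		return bytes.Compare(regions[i].StartKey, regions[j].StartKey) < 0
	})
	var out []keyRange
	for i := 0; i < len(regions); i += batchSize {
		end := i + batchSize
		if end > len(regions) {
			end = len(regions)
		}
		batch := keyRange{StartKey: regions[i].StartKey, EndKey: regions[end-1].EndKey}
		if i == 0 {
			batch.StartKey = startKey // first subtask starts at the table start
		}
		if end == len(regions) {
			batch.EndKey = endKey // last subtask ends at the table end
		}
		out = append(out, batch)
	}
	return out
}

func main() {
	regions := []keyRange{
		{[]byte("b"), []byte("c")},
		{[]byte("a"), []byte("b")},
		{[]byte("c"), []byte("d")},
	}
	for _, r := range splitIntoBatches(regions, []byte("a0"), []byte("cz"), 2) {
		fmt.Printf("[%s, %s)\n", r.StartKey, r.EndKey)
	}
}
```

Clamping matters because region boundaries usually extend past the table's record range; without it the first and last subtasks would scan keys outside the table.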
3 changes: 1 addition & 2 deletions ddl/disttask_flow_test.go
@@ -72,11 +72,10 @@ func TestBackfillFlowHandle(t *testing.T) {
 	require.NoError(t, err)
 	require.Nil(t, errMeta)

-	// test normal table not supported yet
 	tk.MustExec("create table t1(id int primary key, v int)")
 	gTask = createAddIndexGlobalTask(t, dom, "test", "t1", ddl.BackfillTaskType)
 	_, err = handler.ProcessNormalFlow(context.Background(), nil, gTask)
-	require.EqualError(t, err, "Non-partition table not supported yet")
+	require.NoError(t, err)
 }

 func createAddIndexGlobalTask(t *testing.T, dom *domain.Domain, dbName, tblName string, taskType string) *proto.Task {
23 changes: 13 additions & 10 deletions ddl/index.go
@@ -624,7 +624,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK bool)
 	case model.StateNone:
 		// none -> delete only
 		var reorgTp model.ReorgType
-		reorgTp, err = pickBackfillType(w.ctx, job, indexInfo.Unique)
+		reorgTp, err = pickBackfillType(w.ctx, job, indexInfo.Unique, d)
 		if err != nil {
 			break
 		}
@@ -666,10 +666,6 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK bool)
 		// Initialize SnapshotVer to 0 for later reorganization check.
 		job.SnapshotVer = 0
 		job.SchemaState = model.StateWriteReorganization
-
-		if job.MultiSchemaInfo == nil && tblInfo.GetPartitionInfo() != nil {
-			initDistReorg(job.ReorgMeta)
-		}
 	case model.StateWriteReorganization:
 		// reorganization -> public
 		tbl, err := getTable(d.store, schemaID, tblInfo)
@@ -716,7 +712,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK bool)
 }

 // pickBackfillType determines which backfill process will be used.
-func pickBackfillType(ctx context.Context, job *model.Job, unique bool) (model.ReorgType, error) {
+func pickBackfillType(ctx context.Context, job *model.Job, unique bool, d *ddlCtx) (model.ReorgType, error) {
 	if job.ReorgMeta.ReorgTp != model.ReorgTypeNone {
 		// The backfill task has been started.
 		// Don't change the backfill type.
@@ -736,11 +732,18 @@ func pickBackfillType(ctx context.Context, job *model.Job, unique bool) (model.ReorgType, error) {
 		if err != nil {
 			return model.ReorgTypeNone, err
 		}
-		_, err = ingest.LitBackCtxMgr.Register(ctx, unique, job.ID)
+		if variable.EnableDistTask.Load() {
+			_, err = ingest.LitBackCtxMgr.Register(ctx, unique, job.ID, d.etcdCli)
+		} else {
+			_, err = ingest.LitBackCtxMgr.Register(ctx, unique, job.ID, nil)
+		}
 		if err != nil {
 			return model.ReorgTypeNone, err
 		}
 		job.ReorgMeta.ReorgTp = model.ReorgTypeLitMerge
+		if variable.EnableDistTask.Load() {
+			job.ReorgMeta.IsDistReorg = true
+		}
 		return model.ReorgTypeLitMerge, nil
 	}
 }
@@ -834,7 +837,7 @@ func doReorgWorkForCreateIndexMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
 func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
 	tbl table.Table, indexInfo *model.IndexInfo) (done bool, ver int64, err error) {
 	var reorgTp model.ReorgType
-	reorgTp, err = pickBackfillType(w.ctx, job, indexInfo.Unique)
+	reorgTp, err = pickBackfillType(w.ctx, job, indexInfo.Unique, d)
 	if err != nil {
 		return false, ver, err
 	}
@@ -905,7 +908,7 @@ func runIngestReorgJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
 	if ok && bc.Done() {
 		return true, 0, nil
 	}
-	bc, err = ingest.LitBackCtxMgr.Register(w.ctx, indexInfo.Unique, job.ID)
+	bc, err = ingest.LitBackCtxMgr.Register(w.ctx, indexInfo.Unique, job.ID, nil)
 	if err != nil {
 		ver, err = convertAddIdxJob2RollbackJob(d, t, job, tbl.Meta(), indexInfo, err)
 		return false, ver, errors.Trace(err)
@@ -1802,7 +1805,7 @@ func (w *worker) addTableIndex(t table.Table, reorgInfo *reorgInfo) error {
 		return errors.New("unexpected error, can't find index info")
 	}
 	if indexInfo.Unique {
		bc, err := ingest.LitBackCtxMgr.Register(w.ctx, indexInfo.Unique, reorgInfo.ID)
-		bc, err := ingest.LitBackCtxMgr.Register(w.ctx, indexInfo.Unique, reorgInfo.ID)
+		bc, err := ingest.LitBackCtxMgr.Register(w.ctx, indexInfo.Unique, reorgInfo.ID, nil)
 		if err != nil {
 			return err
 		}
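
For orientation, the decision tree that pickBackfillType implements after this change can be condensed as follows. This is a hedged sketch: plain booleans stand in for variable.EnableFastReorg, ingest.LitInitialized, and variable.EnableDistTask, and the real function additionally checks ingest resource availability and registers a backend context (with an etcd client when the dist task is enabled):

```go
package main

import "fmt"

// ReorgType mirrors the model.ReorgType values exercised in
// TestPickBackfillType; the string values here are illustrative.
type ReorgType string

const (
	reorgTxn      ReorgType = "txn"       // classic transactional backfill
	reorgTxnMerge ReorgType = "txn-merge" // temp index + merge, no ingest
	reorgLitMerge ReorgType = "lit-merge" // lightning-style ingest + merge
)

// pickBackfillTypeSketch condenses the selection: fast reorg off falls
// back to txn; with fast reorg on, an uninitialized ingest environment
// degrades to txn-merge; otherwise ingest is used, and distributed
// reorg is flagged when the dist-task switch is on.
func pickBackfillTypeSketch(fastReorg, litInitialized, distTask bool) (ReorgType, bool) {
	if !fastReorg {
		return reorgTxn, false
	}
	if !litInitialized {
		return reorgTxnMerge, false
	}
	return reorgLitMerge, distTask // second value ~ ReorgMeta.IsDistReorg
}

func main() {
	tp, dist := pickBackfillTypeSketch(true, true, true)
	fmt.Println(tp, dist) // lit-merge true
}
```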
2 changes: 2 additions & 0 deletions ddl/ingest/BUILD.bazel
@@ -42,6 +42,8 @@ go_library(
         "//util/size",
         "@com_github_google_uuid//:uuid",
         "@com_github_pingcap_errors//:errors",
+        "@io_etcd_go_etcd_client_v3//:client",
+        "@io_etcd_go_etcd_client_v3//concurrency",
         "@org_uber_go_atomic//:atomic",
         "@org_uber_go_zap//:zap",
     ],