ddl: replace local ingest impl with backfill operators (pingcap#54149)
tangenta authored and ti-chi-bot committed Jun 26, 2024
1 parent b3ed886 · commit 11c56fc
Showing 19 changed files with 396 additions and 676 deletions.
159 changes: 153 additions & 6 deletions pkg/ddl/backfilling.go
@@ -25,6 +25,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/ddl/ingest"
sess "github.com/pingcap/tidb/pkg/ddl/internal/session"
"github.com/pingcap/tidb/pkg/ddl/logutil"
ddlutil "github.com/pingcap/tidb/pkg/ddl/util"
@@ -619,6 +620,154 @@ func SetBackfillTaskChanSizeForTest(n int) {
backfillTaskChanSize = n
}

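// runAddIndexInLocalIngestMode drives the whole "add index" backfill for one
// physical table through the backfill operator pipeline: it registers an
// ingest engine per target index, sets up best-effort checkpointing, runs the
// pipeline, and finally re-checks unique indexes for duplicate keys.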
func (dc *ddlCtx) runAddIndexInLocalIngestMode(
ctx context.Context,
sessPool *sess.Pool,
t table.PhysicalTable,
reorgInfo *reorgInfo,
) error {
// TODO(tangenta): support adjusting the worker count dynamically.
if err := dc.isReorgRunnable(reorgInfo.Job.ID, false); err != nil {
return errors.Trace(err)
}
job := reorgInfo.Job
opCtx := NewLocalOperatorCtx(ctx, job.ID)
idxCnt := len(reorgInfo.elements)
indexIDs := make([]int64, 0, idxCnt)
indexInfos := make([]*model.IndexInfo, 0, idxCnt)
uniques := make([]bool, 0, idxCnt)
hasUnique := false
for _, e := range reorgInfo.elements {
indexIDs = append(indexIDs, e.ID)
indexInfo := model.FindIndexInfoByID(t.Meta().Indices, e.ID)
if indexInfo == nil {
logutil.DDLIngestLogger().Warn("index info not found",
zap.Int64("jobID", job.ID),
zap.Int64("tableID", t.Meta().ID),
zap.Int64("indexID", e.ID))
return errors.Errorf("index info not found: %d", e.ID)
}
indexInfos = append(indexInfos, indexInfo)
uniques = append(uniques, indexInfo.Unique)
hasUnique = hasUnique || indexInfo.Unique
}

//nolint: forcetypeassert
discovery := dc.store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery()
bcCtx, err := ingest.LitBackCtxMgr.Register(
ctx, job.ID, hasUnique, dc.etcdCli, discovery, job.ReorgMeta.ResourceGroupName)
if err != nil {
return errors.Trace(err)
}
defer ingest.LitBackCtxMgr.Unregister(job.ID)
engines, err := bcCtx.Register(indexIDs, uniques, t.Meta())
if err != nil {
logutil.DDLIngestLogger().Error("cannot register new engine",
zap.Int64("jobID", job.ID),
zap.Error(err),
zap.Int64s("index IDs", indexIDs))
return errors.Trace(err)
}
defer bcCtx.UnregisterEngines()
sctx, err := sessPool.Get()
if err != nil {
return errors.Trace(err)
}
defer sessPool.Put(sctx)

cpMgr, err := ingest.NewCheckpointManager(
ctx,
bcCtx,
sessPool,
job.ID,
indexIDs,
bcCtx.GetLocalBackend().LocalStoreDir,
dc.store.(kv.StorageWithPD).GetPDClient(),
)
if err != nil {
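// Checkpointing is best-effort: if the manager cannot be created, the
// backfill still proceeds, it just cannot resume from a checkpoint after
// an interruption.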
logutil.DDLIngestLogger().Warn("create checkpoint manager failed",
zap.Int64("jobID", job.ID),
zap.Error(err))
} else {
defer cpMgr.Close()
cpMgr.Reset(t.GetPhysicalID(), reorgInfo.StartKey, reorgInfo.EndKey)
bcCtx.AttachCheckpointManager(cpMgr)
}

reorgCtx := dc.getReorgCtx(reorgInfo.Job.ID)
rowCntListener := &localRowCntListener{
prevPhysicalRowCnt: reorgCtx.getRowCount(),
reorgCtx: reorgCtx,
counter: metrics.BackfillTotalCounter.WithLabelValues(
metrics.GenerateReorgLabel("add_idx_rate", job.SchemaName, job.TableName)),
}

avgRowSize := estimateTableRowSize(ctx, dc.store, sctx.GetRestrictedSQLExecutor(), t)
concurrency := int(variable.GetDDLReorgWorkerCounter())

pipe, err := NewAddIndexIngestPipeline(
opCtx,
dc.store,
sessPool,
bcCtx,
engines,
job.ID,
t,
indexInfos,
reorgInfo.StartKey,
reorgInfo.EndKey,
job.ReorgMeta,
avgRowSize,
concurrency,
cpMgr,
rowCntListener,
)
if err != nil {
return err
}
err = pipe.Execute()
if err != nil {
return err
}
err = pipe.Close()
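// Prefer the operator error over the close error: a failed operator is
// usually the root cause of any subsequent close failure.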
if opCtx.OperatorErr() != nil {
return opCtx.OperatorErr()
}
if err != nil {
return err
}
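// Local ingest writes KV pairs into TiKV directly and bypasses the
// transactional constraint checks, so unique indexes must be scanned for
// duplicate keys once the import finishes.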
for i, isUK := range uniques {
if isUK {
err := bcCtx.CollectRemoteDuplicateRows(indexIDs[i], t)
if err != nil {
return err
}
}
}
return nil
}
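
// The pipeline built by NewAddIndexIngestPipeline above follows the operator
// pattern: stages connected by channels, started together, then drained on
// close. A toy, self-contained sketch of that pattern (hypothetical names,
// not TiDB's actual operator API):

func sketchSource(out chan<- int) {
	for i := 1; i <= 3; i++ {
		out <- i
	}
	close(out)
}

func sketchTransform(in <-chan int, out chan<- int) {
	for v := range in {
		out <- v * v
	}
	close(out)
}

func sketchRun() []int {
	a, b := make(chan int), make(chan int)
	go sketchSource(a)
	go sketchTransform(a, b)
	var results []int
	for v := range b { // sink stage: collect the final results
		results = append(results, v)
	}
	return results // [1 4 9]
}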

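// localRowCntListener feeds row counts written by the pipeline back to the
// reorg context (for job progress) and to the backfill rate metric.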
type localRowCntListener struct {
EmptyRowCntListener
reorgCtx *reorgCtx
counter prometheus.Counter

// prevPhysicalRowCnt records the row count from previous physical tables (partitions).
prevPhysicalRowCnt int64
// curPhysicalRowCnt records the row count of the current physical table.
curPhysicalRowCnt int64
}

func (s *localRowCntListener) Written(rowCnt int) {
s.curPhysicalRowCnt += int64(rowCnt)
s.reorgCtx.setRowCount(s.prevPhysicalRowCnt + s.curPhysicalRowCnt)
s.counter.Add(float64(rowCnt))
}

func (s *localRowCntListener) SetTotal(total int) {
s.reorgCtx.setRowCount(s.prevPhysicalRowCnt + int64(total))
}
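
// Why prevPhysicalRowCnt matters: reported progress must stay monotonic as
// the backfill moves from one partition to the next. A self-contained sketch
// (hypothetical names, mirroring the listener above):

type progressSketch struct{ rowCnt int64 }

func (p *progressSketch) setRowCount(n int64) { p.rowCnt = n }

type listenerSketch struct {
	p    *progressSketch
	prev int64 // rows counted for previously finished partitions
	cur  int64 // rows written for the current partition
}

func (l *listenerSketch) Written(rowCnt int) {
	l.cur += int64(rowCnt)
	l.p.setRowCount(l.prev + l.cur)
}

// Usage: the first partition writes 150 rows; the second starts from that
// count and writes 200 more, so the reported total climbs 150 -> 350
// instead of resetting.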

// writePhysicalTableRecord handles the "add index" or "modify/change column" reorganization state for a non-partitioned table or a partition.
// For a partitioned table, it should be handled partition by partition.
//
@@ -653,21 +802,19 @@ func (dc *ddlCtx) writePhysicalTableRecord(
failpoint.Return(errors.New("job.ErrCount:" + strconv.Itoa(int(reorgInfo.Job.ErrorCount)) + ", mock unknown type: ast.whenClause."))
}
})
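// Route "add index" jobs that use the lightning-style local ingest path
// (ReorgTypeLitMerge) to the operator pipeline above; all other backfill
// work continues through the transactional scheduler below.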
if bfWorkerType == typeAddIndexWorker && reorgInfo.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
return dc.runAddIndexInLocalIngestMode(ctx, sessPool, t, reorgInfo)
}

jc := reorgInfo.NewJobContext()

eg, egCtx := util.NewErrorGroupWithRecoverWithCtx(ctx)

scheduler, err := newBackfillScheduler(egCtx, reorgInfo, sessPool, bfWorkerType, t, jc)
scheduler, err := newTxnBackfillScheduler(egCtx, reorgInfo, sessPool, bfWorkerType, t, jc)
if err != nil {
return errors.Trace(err)
}
defer scheduler.close(true)
if lit, ok := scheduler.(*ingestBackfillScheduler); ok {
if lit.importStarted() {
return nil
}
}

err = scheduler.setupWorkers()
if err != nil {