
ddl: replace local ingest impl with backfill operators #54149

Merged (32 commits) on Jun 26, 2024
Changes from 18 commits

Commits (32):
9e3363a  ddl: replace local ingest impl with backfill operators (tangenta, Jun 21, 2024)
10ad6c8  fix linter (tangenta, Jun 21, 2024)
b266168  fix build (tangenta, Jun 21, 2024)
b63c560  wip (tangenta, Jun 21, 2024)
2b9cd57  Merge remote-tracking branch 'upstream/master' into add-index-replace-op (tangenta, Jun 21, 2024)
0a381d6  fix linter (tangenta, Jun 21, 2024)
c616974  wip (tangenta, Jun 21, 2024)
04f7ebf  debug (tangenta, Jun 24, 2024)
949a0e4  fix panic issue (tangenta, Jun 24, 2024)
1bff0a4  clarify flushed & imported semantic of RowCntListener (tangenta, Jun 24, 2024)
aa74f54  fix row count incorrect issue (tangenta, Jun 24, 2024)
1e8c0bd  fix linter (tangenta, Jun 24, 2024)
bd6db72  remove old ingest worker impl (tangenta, Jun 24, 2024)
83d0105  fix TestAddIndexIngestPanic (tangenta, Jun 24, 2024)
e972365  unify collect remote and finish import (tangenta, Jun 24, 2024)
4a27784  Merge remote-tracking branch 'upstream/master' into HEAD (tangenta, Jun 24, 2024)
8ab823b  bring back TestAddIndexMockFlushError (tangenta, Jun 25, 2024)
86f3a6d  refine naming (tangenta, Jun 25, 2024)
3518897  Merge remote-tracking branch 'upstream/master' into HEAD (tangenta, Jun 25, 2024)
545c9e9  address comment (tangenta, Jun 25, 2024)
f0c2885  fix build (tangenta, Jun 25, 2024)
26f16c6  fix linter (tangenta, Jun 25, 2024)
b81b2c3  use logutil.DDLIngestLogger() instead (tangenta, Jun 25, 2024)
3dfe9bf  pass reorgCtx into localRowCntListener (tangenta, Jun 25, 2024)
b8cbbec  remove debug log (tangenta, Jun 25, 2024)
3c0fb73  fix typo (tangenta, Jun 25, 2024)
2c1f4ab  make job row count update in time (tangenta, Jun 25, 2024)
2b765cd  fix TestCheckpointManager (tangenta, Jun 25, 2024)
68692e1  skip TestAddIndexIngestRecoverPartition (tangenta, Jun 25, 2024)
559aab2  address comment (tangenta, Jun 26, 2024)
e9dbe39  address comment (tangenta, Jun 26, 2024)
462896f  enable disttask by default (tangenta, Jun 26, 2024)
152 changes: 146 additions & 6 deletions pkg/ddl/backfilling.go
@@ -25,6 +25,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/ddl/ingest"
sess "github.com/pingcap/tidb/pkg/ddl/internal/session"
"github.com/pingcap/tidb/pkg/ddl/logutil"
ddlutil "github.com/pingcap/tidb/pkg/ddl/util"
@@ -43,6 +44,7 @@ import (
"github.com/pingcap/tidb/pkg/util"
contextutil "github.com/pingcap/tidb/pkg/util/context"
"github.com/pingcap/tidb/pkg/util/dbterror"
tidblogutil "github.com/pingcap/tidb/pkg/util/logutil"
decoder "github.com/pingcap/tidb/pkg/util/rowDecoder"
"github.com/pingcap/tidb/pkg/util/topsql"
"github.com/prometheus/client_golang/prometheus"
@@ -619,6 +621,146 @@ func SetBackfillTaskChanSizeForTest(n int) {
backfillTaskChanSize = n
}

func (dc *ddlCtx) runAddIndexInLocalIngestMode(
ctx context.Context,
sessPool *sess.Pool,
t table.PhysicalTable,
reorgInfo *reorgInfo,
) error {
// TODO(tangenta): support adjusting the worker count dynamically.
startKey, endKey := reorgInfo.StartKey, reorgInfo.EndKey
if err := dc.isReorgRunnable(reorgInfo.Job.ID, false); err != nil {
return errors.Trace(err)
}
job := reorgInfo.Job.Clone()
ctx = tidblogutil.WithCategory(ctx, "ddl-ingest")
opCtx := NewLocalOperatorCtx(ctx, job.ID)
bcCtx, err := getBackendCtx(ctx, dc.store, dc.etcdCli, job)
if err != nil {
return errors.Trace(err)
}
idxCnt := len(reorgInfo.elements)
indexIDs := make([]int64, 0, idxCnt)
indexInfos := make([]*model.IndexInfo, 0, idxCnt)
uniques := make([]bool, 0, idxCnt)
for _, e := range reorgInfo.elements {
indexIDs = append(indexIDs, e.ID)
indexInfo := model.FindIndexInfoByID(t.Meta().Indices, e.ID)
if indexInfo == nil {
logutil.DDLIngestLogger().Warn("index info not found",
zap.Int64("table ID", t.Meta().ID),
zap.Int64("index ID", e.ID))
return errors.Errorf("index info not found: %d", e.ID)
}
indexInfos = append(indexInfos, indexInfo)
uniques = append(uniques, indexInfo.Unique)
}

engines, err := bcCtx.Register(indexIDs, uniques, job.TableName)
if err != nil {
tidblogutil.Logger(opCtx).Error("cannot register new engine",
zap.Error(err),
zap.Int64s("index IDs", indexIDs))
return errors.Trace(err)
}
defer bcCtx.UnregisterEngines()
sctx, err := sessPool.Get()
if err != nil {
return errors.Trace(err)
}
defer sessPool.Put(sctx)

cpMgr, err := ingest.NewCheckpointManager(
ctx,
bcCtx,
sessPool,
job.ID,
indexIDs,
bcCtx.GetLocalBackend().LocalStoreDir,
dc.store.(kv.StorageWithPD).GetPDClient(),
)
if err != nil {
logutil.DDLIngestLogger().Warn("create checkpoint manager failed", zap.Error(err))
} else {
defer cpMgr.Close()
cpMgr.Reset(t.GetPhysicalID(), startKey, endKey)
bcCtx.AttachCheckpointManager(cpMgr)
}

reorgCtx := dc.getReorgCtx(reorgInfo.Job.ID)
previousTotal := reorgCtx.getRowCount()
var rowCnt int64
rowCntListener := &localRowCntListener{
flushed: func(cnt int) {
rowCnt += int64(cnt)
reorgCtx.setRowCount(previousTotal + rowCnt)
},
setTotal: func(total int) {
reorgCtx.setRowCount(previousTotal + int64(total))
},
counter: metrics.BackfillTotalCounter.WithLabelValues(
metrics.GenerateReorgLabel("add_idx_rate", job.SchemaName, job.TableName)),
}

avgRowSize := estimateTableRowSize(ctx, dc.store, sctx.GetRestrictedSQLExecutor(), t)
concurrency := int(variable.GetDDLReorgWorkerCounter())

pipe, err := NewAddIndexIngestPipeline(
opCtx,
dc.store,
sessPool,
bcCtx,
engines,
job.ID,
t,
indexInfos,
startKey,
endKey,
job.ReorgMeta,
avgRowSize,
concurrency,
cpMgr,
rowCntListener,
)
if err != nil {
return err
}
err = pipe.Execute()
if err != nil {
return err
}
err = pipe.Close()
if opCtx.OperatorErr() != nil {
return opCtx.OperatorErr()
}
if err != nil {
return err
}
for _, indexID := range indexIDs {
err := bcCtx.CollectRemoteDuplicateRows(indexID, t)
if err != nil {
return err
}
}
return nil
}

type localRowCntListener struct {
EmptyRowCntListener
flushed func(int)
setTotal func(int)
counter prometheus.Counter
}

func (s *localRowCntListener) Flushed(rowCnt int) {
s.flushed(rowCnt)
s.counter.Add(float64(rowCnt))
}

func (s *localRowCntListener) SetTotal(total int) {
s.setTotal(total)
}

// writePhysicalTableRecord handles the "add index" or "modify/change column" reorganization state for a non-partitioned table or a partition.
// For a partitioned table, it should be handled partition by partition.
//
@@ -653,21 +795,19 @@ func (dc *ddlCtx) writePhysicalTableRecord(
failpoint.Return(errors.New("job.ErrCount:" + strconv.Itoa(int(reorgInfo.Job.ErrorCount)) + ", mock unknown type: ast.whenClause."))
}
})
if bfWorkerType == typeAddIndexWorker && reorgInfo.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
return dc.runAddIndexInLocalIngestMode(ctx, sessPool, t, reorgInfo)
}

jc := reorgInfo.NewJobContext()

eg, egCtx := util.NewErrorGroupWithRecoverWithCtx(ctx)

scheduler, err := newBackfillScheduler(egCtx, reorgInfo, sessPool, bfWorkerType, t, jc)
scheduler, err := newTxnBackfillScheduler(egCtx, reorgInfo, sessPool, bfWorkerType, t, jc)
if err != nil {
return errors.Trace(err)
}
defer scheduler.close(true)
if lit, ok := scheduler.(*ingestBackfillScheduler); ok {
if lit.importStarted() {
return nil
}
}

err = scheduler.setupWorkers()
if err != nil {
…
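
Note: the localRowCntListener added above reports progress out of the pipeline through plain callbacks layered over a no-op base. Below is a minimal, self-contained sketch of that pattern. The interface shape and the progressTracker type are stand-ins for illustration, not TiDB's actual RowCntListener definition.

package main

import (
	"fmt"
	"sync/atomic"
)

// RowCntListener receives row-count events from the backfill pipeline.
// (Stand-in for the interface in TiDB's ingest package.)
type RowCntListener interface {
	// Flushed reports a batch of rows written out by the pipeline.
	Flushed(rowCnt int)
	// SetTotal overrides the running count with an authoritative total.
	SetTotal(total int)
}

// EmptyRowCntListener is a no-op default; embedding it lets a concrete
// listener override only the events it cares about.
type EmptyRowCntListener struct{}

func (EmptyRowCntListener) Flushed(int)  {}
func (EmptyRowCntListener) SetTotal(int) {}

// progressTracker is a stand-in for reorgCtx: it holds the job's row count.
type progressTracker struct{ rowCount atomic.Int64 }

func (p *progressTracker) setRowCount(n int64) { p.rowCount.Store(n) }

// localListener mirrors localRowCntListener: deltas from Flushed are
// accumulated on top of the count carried over from earlier progress.
type localListener struct {
	EmptyRowCntListener
	tracker       *progressTracker
	previousTotal int64
	rowCnt        int64
}

func (l *localListener) Flushed(cnt int) {
	l.rowCnt += int64(cnt)
	l.tracker.setRowCount(l.previousTotal + l.rowCnt)
}

func (l *localListener) SetTotal(total int) {
	l.tracker.setRowCount(l.previousTotal + int64(total))
}

func main() {
	tracker := &progressTracker{}
	tracker.setRowCount(100) // rows already counted before this run

	var listener RowCntListener = &localListener{tracker: tracker, previousTotal: 100}
	listener.Flushed(40)
	listener.Flushed(60)
	fmt.Println(tracker.rowCount.Load()) // prints 200
}

Embedding the no-op base means a concrete listener overrides only the events it needs, which keeps the pipeline's reporting surface decoupled from any one consumer.
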
18 changes: 13 additions & 5 deletions pkg/ddl/backfilling_dist_executor.go
@@ -24,13 +24,15 @@ import (
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor"
"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/lightning/backend/external"
"github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/util/dbterror"
"github.com/tikv/client-go/v2/tikv"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)

@@ -139,15 +141,21 @@ func (s *backfillDistExecutor) newBackfillSubtaskExecutor(
}

func (s *backfillDistExecutor) getBackendCtx() (ingest.BackendCtx, error) {
job := &s.taskMeta.Job
return getBackendCtx(
s.BaseTaskExecutor.Ctx(),
s.d.store,
s.d.etcdCli,
&s.taskMeta.Job,
)
}

func getBackendCtx(ctx context.Context, store kv.Storage, etcdCli *clientv3.Client, job *model.Job) (ingest.BackendCtx, error) {
hasUnique, err := hasUniqueIndex(job)
if err != nil {
return nil, err
}
ddlObj := s.d
discovery := ddlObj.store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery()

return ingest.LitBackCtxMgr.Register(s.BaseTaskExecutor.Ctx(), job.ID, hasUnique, ddlObj.etcdCli, discovery, job.ReorgMeta.ResourceGroupName)
discovery := store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery()
return ingest.LitBackCtxMgr.Register(ctx, job.ID, hasUnique, etcdCli, discovery, job.ReorgMeta.ResourceGroupName)
}

func hasUniqueIndex(job *model.Job) (bool, error) {
…
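
Note: the change above extracts getBackendCtx from a method on backfillDistExecutor into a free function taking explicit dependencies (store, etcd client, job), so the new local-ingest path in backfilling.go can reuse it. A minimal sketch of that refactor shape follows; every type here is a simplified stand-in, not the real kv.Storage, clientv3.Client, or model.Job.

package main

import "fmt"

// Stand-in types for the real dependencies.
type Store struct{ name string }
type EtcdClient struct{}
type Job struct{ ID int64 }

type BackendCtx struct{ jobID int64 }

// Before: a method that reached into its receiver for every dependency.
// After: a free function over explicit parameters, so any caller that can
// supply a store, an etcd client, and a job can register a backend context.
func getBackendCtx(store *Store, etcdCli *EtcdClient, job *Job) (*BackendCtx, error) {
	if store == nil || job == nil {
		return nil, fmt.Errorf("missing store or job")
	}
	return &BackendCtx{jobID: job.ID}, nil
}

// Call site 1: the dist-task executor just forwards its own fields.
type backfillDistExecutor struct {
	store   *Store
	etcdCli *EtcdClient
	job     Job
}

func (s *backfillDistExecutor) getBackendCtx() (*BackendCtx, error) {
	return getBackendCtx(s.store, s.etcdCli, &s.job)
}

// Call site 2: the local ingest path has no executor, so it calls the
// free function directly with what it already holds.
func runLocalIngest(store *Store, etcdCli *EtcdClient, job *Job) error {
	bcCtx, err := getBackendCtx(store, etcdCli, job)
	if err != nil {
		return err
	}
	fmt.Printf("registered backend ctx for job %d\n", bcCtx.jobID)
	return nil
}

func main() {
	store := &Store{name: "tikv"}
	exec := &backfillDistExecutor{store: store, etcdCli: &EtcdClient{}, job: Job{ID: 1}}
	if _, err := exec.getBackendCtx(); err != nil {
		panic(err)
	}
	_ = runLocalIngest(store, &EtcdClient{}, &Job{ID: 2})
}
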