Skip to content

Commit

Permalink
ddl/ingest: move backend ctx out of checkpoint manager (#54292)
Browse files Browse the repository at this point in the history
ref #42164
  • Loading branch information
tangenta authored Jul 12, 2024
1 parent 4fe31ed commit ceb20b7
Show file tree
Hide file tree
Showing 14 changed files with 211 additions and 151 deletions.
1 change: 0 additions & 1 deletion pkg/ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,6 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(

cpMgr, err := ingest.NewCheckpointManager(
ctx,
bcCtx,
sessPool,
job.ID,
indexIDs,
Expand Down
17 changes: 11 additions & 6 deletions pkg/ddl/backfilling_operators.go
Original file line number Diff line number Diff line change
Expand Up @@ -750,15 +750,17 @@ func (w *indexIngestLocalWorker) HandleTask(ck IndexRecordChunk, send func(Index
return
}
w.rowCntListener.Written(rs.Added)
flushed, imported, err := w.backendCtx.Flush(ingest.FlushModeAuto)
if err != nil {
w.ctx.onError(err)
return
}
if w.cpMgr != nil {
totalCnt, nextKey := w.cpMgr.Status()
rs.Total = totalCnt
rs.Next = nextKey
err := w.cpMgr.UpdateWrittenKeys(ck.ID, rs.Added)
if err != nil {
w.ctx.onError(err)
return
}
w.cpMgr.UpdateWrittenKeys(ck.ID, rs.Added)
w.cpMgr.AdvanceWatermark(flushed, imported)
}
send(rs)
}
Expand Down Expand Up @@ -928,12 +930,15 @@ func (s *indexWriteResultSink) flush() error {
failpoint.Inject("mockFlushError", func(_ failpoint.Value) {
failpoint.Return(errors.New("mock flush error"))
})
_, _, err := s.backendCtx.Flush(ingest.FlushModeForceFlushAndImport)
flushed, imported, err := s.backendCtx.Flush(ingest.FlushModeForceFlushAndImport)
if err != nil {
logutil.Logger(s.ctx).Error("flush error",
zap.String("category", "ddl"), zap.Error(err))
return err
}
if s.cpMgr != nil {
s.cpMgr.AdvanceWatermark(flushed, imported)
}
return nil
}

Expand Down
1 change: 1 addition & 0 deletions pkg/ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,7 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
}
w.writeDDLSeqNum(job)
w.removeJobCtx(job)
failpoint.InjectCall("afterFinishDDLJob", job)
err = AddHistoryDDLJob(w.ctx, w.sess, t, job, updateRawArgs)
return errors.Trace(err)
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1911,12 +1911,12 @@ func (w *worker) addTableIndex(t table.Table, reorgInfo *reorgInfo) error {
w.ddlCtx.mu.RLock()
w.ddlCtx.mu.hook.OnUpdateReorgInfo(reorgInfo.Job, reorgInfo.PhysicalTableID)
w.ddlCtx.mu.RUnlock()
failpoint.InjectCall("beforeUpdateReorgInfo-addTableIndex", reorgInfo.Job)

finish, err = updateReorgInfo(w.sessPool, tbl, reorgInfo)
if err != nil {
return errors.Trace(err)
}
failpoint.InjectCall("afterUpdatePartitionReorgInfo", reorgInfo.Job)
// Every time we finish a partition, we update the progress of the job.
if rc := w.getReorgCtx(reorgInfo.Job.ID); rc != nil {
reorgInfo.Job.SetRowCount(rc.getRowCount())
Expand Down Expand Up @@ -2140,7 +2140,7 @@ func estimateTableRowSize(
func estimateRowSizeFromRegion(ctx context.Context, store kv.Storage, tbl table.Table) (int, error) {
hStore, ok := store.(helper.Storage)
if !ok {
return 0, errors.New("not a helper.Storage")
return 0, fmt.Errorf("not a helper.Storage")
}
h := &helper.Helper{
Store: hStore,
Expand Down Expand Up @@ -2175,11 +2175,11 @@ func estimateRowSizeFromRegion(ctx context.Context, store kv.Storage, tbl table.
return 0, err
}
if len(regionInfos.Regions) != regionLimit {
return 0, errors.New("less than 3 regions")
return 0, fmt.Errorf("less than 3 regions")
}
sample := regionInfos.Regions[1]
if sample.ApproximateKeys == 0 || sample.ApproximateSize == 0 {
return 0, errors.New("zero approximate size")
return 0, fmt.Errorf("zero approximate size")
}
return int(uint64(sample.ApproximateSize)*size.MB) / int(sample.ApproximateKeys), nil
}
Expand Down
3 changes: 0 additions & 3 deletions pkg/ddl/ingest/backend_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,6 @@ func (m *litBackendCtxMgr) Unregister(jobID int64) {
}
_ = bc.FinishAndUnregisterEngines()
bc.backend.Close()
if bc.checkpointMgr != nil {
bc.checkpointMgr.Close()
}
m.memRoot.Release(structSizeBackendCtx)
m.memRoot.ReleaseWithTag(encodeBackendTag(jobID))
logutil.Logger(bc.ctx).Info(LitInfoCloseBackend, zap.Int64("job ID", jobID),
Expand Down
42 changes: 26 additions & 16 deletions pkg/ddl/ingest/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ import (
type CheckpointManager struct {
ctx context.Context
cancel context.CancelFunc
flushCtrl FlushController
sessPool *sess.Pool
jobID int64
indexIDs []int64
Expand Down Expand Up @@ -103,7 +102,6 @@ type FlushController interface {
// NewCheckpointManager creates a new checkpoint manager.
func NewCheckpointManager(
ctx context.Context,
flushCtrl FlushController,
sessPool *sess.Pool,
jobID int64,
indexIDs []int64,
Expand All @@ -118,7 +116,6 @@ func NewCheckpointManager(
cm := &CheckpointManager{
ctx: ctx2,
cancel: cancel,
flushCtrl: flushCtrl,
sessPool: sessPool,
jobID: jobID,
indexIDs: indexIDs,
Expand Down Expand Up @@ -215,15 +212,17 @@ func (s *CheckpointManager) UpdateTotalKeys(taskID int, delta int, last bool) {

// UpdateWrittenKeys updates the written keys of the task.
// This is called by the writer after writing the local engine to update the current number of rows written.
func (s *CheckpointManager) UpdateWrittenKeys(taskID int, delta int) error {
func (s *CheckpointManager) UpdateWrittenKeys(taskID int, delta int) {
s.mu.Lock()
cp := s.checkpoints[taskID]
cp.writtenKeys += delta
s.mu.Unlock()
}

flushed, imported, err := s.flushCtrl.Flush(FlushModeAuto)
if !flushed || err != nil {
return err
// AdvanceWatermark advances the watermark according to flushed or imported status.
func (s *CheckpointManager) AdvanceWatermark(flushed, imported bool) {
if !flushed {
return
}

failpoint.Inject("resignAfterFlush", func() {
Expand All @@ -238,17 +237,10 @@ func (s *CheckpointManager) UpdateWrittenKeys(taskID int, delta int) error {
s.mu.Lock()
defer s.mu.Unlock()
s.afterFlush()
if imported && s.importedKeyLowWatermark.Cmp(s.flushedKeyLowWatermark) != 0 {
// TODO(lance6716): add warning log if cmp > 0
s.importedKeyLowWatermark = s.flushedKeyLowWatermark
s.importedKeyCnt = s.flushedKeyCnt
s.dirty = true

s.pidImported = s.pidFlushed
s.startKeyImported = s.startKeyFlushed
s.endKeyImported = s.endKeyFlushed
if imported {
s.afterImport()
}
return nil
}

// afterFlush should be called after all engines are flushed.
Expand All @@ -266,10 +258,28 @@ func (s *CheckpointManager) afterFlush() {
}
}

// afterImport promotes the flushed progress (low watermark, key count,
// physical ID and start/end keys) to the imported progress and marks the
// checkpoint dirty.
//
// If the imported low watermark is already greater than the flushed one, the
// state is left untouched and a warning is logged instead.
//
// The callers visible in this file (AdvanceWatermark and Close) hold s.mu
// while calling it.
func (s *CheckpointManager) afterImport() {
	flushedKey := s.flushedKeyLowWatermark
	importedKey := s.importedKeyLowWatermark

	if importedKey.Cmp(flushedKey) <= 0 {
		// Imported progress is not ahead of flushed progress: catch up.
		s.importedKeyLowWatermark = flushedKey
		s.importedKeyCnt = s.flushedKeyCnt
		s.dirty = true

		s.startKeyImported, s.pidImported, s.endKeyImported =
			s.startKeyFlushed, s.pidFlushed, s.endKeyFlushed
		return
	}

	// Imported watermark is ahead of the flushed one; report it and keep the
	// current imported state.
	s.logger.Warn("lower watermark of flushed key is less than imported key",
		zap.String("flushed", hex.EncodeToString(flushedKey)),
		zap.String("imported", hex.EncodeToString(importedKey)),
	)
}

// Close closes the checkpoint manager.
func (s *CheckpointManager) Close() {
s.mu.Lock()
s.afterFlush()
s.afterImport()
s.mu.Unlock()

err := s.updateCheckpoint()
Expand Down
44 changes: 20 additions & 24 deletions pkg/ddl/ingest/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,34 +61,37 @@ func TestCheckpointManager(t *testing.T) {
}, 8, 8, 0)
ctx := context.Background()
sessPool := session.NewSessionPool(rs)
flushCtrl := &dummyFlushCtrl{imported: false}
tmpFolder := t.TempDir()
createDummyFile(t, tmpFolder)
mgr, err := ingest.NewCheckpointManager(ctx, flushCtrl, sessPool, 1, []int64{1}, tmpFolder, mockGetTSClient{})
mgr, err := ingest.NewCheckpointManager(ctx, sessPool, 1, []int64{1}, tmpFolder, mockGetTSClient{})
require.NoError(t, err)
defer mgr.Close()

mgr.Register(1, []byte{'1', '9'})
mgr.Register(2, []byte{'2', '9'})
mgr.UpdateTotalKeys(1, 100, false)
require.False(t, mgr.IsKeyProcessed([]byte{'1', '9'}))
require.NoError(t, mgr.UpdateWrittenKeys(1, 100))
mgr.UpdateWrittenKeys(1, 100)
mgr.AdvanceWatermark(true, false)
require.False(t, mgr.IsKeyProcessed([]byte{'1', '9'}))
mgr.UpdateTotalKeys(1, 100, true)
require.NoError(t, mgr.UpdateWrittenKeys(1, 100))
mgr.UpdateWrittenKeys(1, 100)
mgr.AdvanceWatermark(true, false)
// The data is not imported to the storage yet.
require.False(t, mgr.IsKeyProcessed([]byte{'1', '9'}))
flushCtrl.imported = true // Mock the data is imported to the storage.
require.NoError(t, mgr.UpdateWrittenKeys(2, 0))
mgr.UpdateWrittenKeys(2, 0)
mgr.AdvanceWatermark(true, true) // Mock the data is imported to the storage.
require.True(t, mgr.IsKeyProcessed([]byte{'1', '9'}))

// Only when the last batch is completed, the job can be completed.
mgr.UpdateTotalKeys(2, 50, false)
mgr.UpdateTotalKeys(2, 50, true)
require.NoError(t, mgr.UpdateWrittenKeys(2, 50))
mgr.UpdateWrittenKeys(2, 50)
mgr.AdvanceWatermark(true, true)
require.True(t, mgr.IsKeyProcessed([]byte{'1', '9'}))
require.False(t, mgr.IsKeyProcessed([]byte{'2', '9'}))
require.NoError(t, mgr.UpdateWrittenKeys(2, 50))
mgr.UpdateWrittenKeys(2, 50)
mgr.AdvanceWatermark(true, true)
require.True(t, mgr.IsKeyProcessed([]byte{'1', '9'}))
require.True(t, mgr.IsKeyProcessed([]byte{'2', '9'}))

Expand All @@ -99,8 +102,10 @@ func TestCheckpointManager(t *testing.T) {
mgr.UpdateTotalKeys(3, 100, true)
mgr.UpdateTotalKeys(4, 100, true)
mgr.UpdateTotalKeys(5, 100, true)
require.NoError(t, mgr.UpdateWrittenKeys(5, 100))
require.NoError(t, mgr.UpdateWrittenKeys(4, 100))
mgr.UpdateWrittenKeys(5, 100)
mgr.AdvanceWatermark(true, true)
mgr.UpdateWrittenKeys(4, 100)
mgr.AdvanceWatermark(true, true)
require.False(t, mgr.IsKeyProcessed([]byte{'3', '9'}))
require.False(t, mgr.IsKeyProcessed([]byte{'4', '9'}))
}
Expand All @@ -116,15 +121,15 @@ func TestCheckpointManagerUpdateReorg(t *testing.T) {
}, 8, 8, 0)
ctx := context.Background()
sessPool := session.NewSessionPool(rs)
flushCtrl := &dummyFlushCtrl{imported: true}
tmpFolder := t.TempDir()
createDummyFile(t, tmpFolder)
mgr, err := ingest.NewCheckpointManager(ctx, flushCtrl, sessPool, 1, []int64{1}, tmpFolder, mockGetTSClient{})
mgr, err := ingest.NewCheckpointManager(ctx, sessPool, 1, []int64{1}, tmpFolder, mockGetTSClient{})
require.NoError(t, err)

mgr.Register(1, []byte{'1', '9'})
mgr.UpdateTotalKeys(1, 100, true)
require.NoError(t, mgr.UpdateWrittenKeys(1, 100))
mgr.UpdateWrittenKeys(1, 100)
mgr.AdvanceWatermark(true, true)
mgr.Close() // Wait the global checkpoint to be updated to the reorg table.
r, err := tk.Exec("select reorg_meta from mysql.tidb_ddl_reorg where job_id = 1 and ele_id = 1;")
require.NoError(t, err)
Expand Down Expand Up @@ -168,10 +173,9 @@ func TestCheckpointManagerResumeReorg(t *testing.T) {
}, 8, 8, 0)
ctx := context.Background()
sessPool := session.NewSessionPool(rs)
flushCtrl := &dummyFlushCtrl{imported: false}
tmpFolder := t.TempDir()
// checkpoint manager should not use local checkpoint if the folder is empty
mgr, err := ingest.NewCheckpointManager(ctx, flushCtrl, sessPool, 1, []int64{1}, tmpFolder, nil)
mgr, err := ingest.NewCheckpointManager(ctx, sessPool, 1, []int64{1}, tmpFolder, nil)
require.NoError(t, err)
defer mgr.Close()
require.True(t, mgr.IsKeyProcessed([]byte{'1', '9'}))
Expand All @@ -182,7 +186,7 @@ func TestCheckpointManagerResumeReorg(t *testing.T) {
require.EqualValues(t, 123456, mgr.GetTS())

createDummyFile(t, tmpFolder)
mgr2, err := ingest.NewCheckpointManager(ctx, flushCtrl, sessPool, 1, []int64{1}, tmpFolder, nil)
mgr2, err := ingest.NewCheckpointManager(ctx, sessPool, 1, []int64{1}, tmpFolder, nil)
require.NoError(t, err)
defer mgr2.Close()
require.True(t, mgr2.IsKeyProcessed([]byte{'1', '9'}))
Expand All @@ -192,11 +196,3 @@ func TestCheckpointManagerResumeReorg(t *testing.T) {
require.EqualValues(t, []byte{'1', '9'}, globalNextKey)
require.EqualValues(t, 123456, mgr.GetTS())
}

// dummyFlushCtrl is a test stub whose Flush always reports a successful flush
// and reports the import status configured through the imported field.
type dummyFlushCtrl struct {
	// imported is returned by Flush as its second result.
	imported bool
}

// Flush ignores mode and returns (true, d.imported, nil).
func (d *dummyFlushCtrl) Flush(mode ingest.FlushMode) (bool, bool, error) {
	const flushed = true
	return flushed, d.imported, nil
}
17 changes: 0 additions & 17 deletions pkg/ddl/ingest/tests/BUILD.bazel

This file was deleted.

61 changes: 0 additions & 61 deletions pkg/ddl/ingest/tests/partition_table_test.go

This file was deleted.

Loading

0 comments on commit ceb20b7

Please sign in to comment.