diff --git a/br/pkg/lightning/restore/meta_manager.go b/br/pkg/lightning/restore/meta_manager.go index 6c009279e9e0f..a4cc5ecb2f154 100644 --- a/br/pkg/lightning/restore/meta_manager.go +++ b/br/pkg/lightning/restore/meta_manager.go @@ -1051,7 +1051,9 @@ func (m noopTableMetaMgr) FinishTable(ctx context.Context) error { return nil } -type singleMgrBuilder struct{} +type singleMgrBuilder struct { + taskID int64 +} func (b singleMgrBuilder) Init(context.Context) error { return nil @@ -1059,7 +1061,8 @@ func (b singleMgrBuilder) Init(context.Context) error { func (b singleMgrBuilder) TaskMetaMgr(pd *pdutil.PdController) taskMetaMgr { return &singleTaskMetaMgr{ - pd: pd, + pd: pd, + taskID: b.taskID, } } @@ -1068,15 +1071,34 @@ func (b singleMgrBuilder) TableMetaMgr(tr *TableRestore) tableMetaMgr { } type singleTaskMetaMgr struct { - pd *pdutil.PdController + pd *pdutil.PdController + taskID int64 + initialized bool + sourceBytes uint64 + clusterAvail uint64 } func (m *singleTaskMetaMgr) InitTask(ctx context.Context, source int64) error { + m.sourceBytes = uint64(source) + m.initialized = true return nil } func (m *singleTaskMetaMgr) CheckTasksExclusively(ctx context.Context, action func(tasks []taskMeta) ([]taskMeta, error)) error { - _, err := action(nil) + newTasks, err := action([]taskMeta{ + { + taskID: m.taskID, + status: taskMetaStatusInitial, + sourceBytes: m.sourceBytes, + clusterAvail: m.clusterAvail, + }, + }) + for _, t := range newTasks { + if m.taskID == t.taskID { + m.sourceBytes = t.sourceBytes + m.clusterAvail = t.clusterAvail + } + } return err } @@ -1085,7 +1107,7 @@ func (m *singleTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdut } func (m *singleTaskMetaMgr) CheckTaskExist(ctx context.Context) (bool, error) { - return true, nil + return m.initialized, nil } func (m *singleTaskMetaMgr) CheckAndFinishRestore(context.Context, bool) (shouldSwitchBack bool, shouldCleanupMeta bool, err error) { diff --git a/br/pkg/lightning/restore/meta_manager_test.go b/br/pkg/lightning/restore/meta_manager_test.go index 571a9d6cd5844..8480bf077d6de 100644 --- a/br/pkg/lightning/restore/meta_manager_test.go +++ b/br/pkg/lightning/restore/meta_manager_test.go @@ -7,6 +7,7 @@ import ( "database/sql/driver" "sort" "testing" + "time" "github.com/DATA-DOG/go-sqlmock" "github.com/pingcap/tidb/br/pkg/lightning/checkpoints" @@ -358,3 +359,28 @@ func (t *testChecksumMgr) Checksum(ctx context.Context, tableInfo *checkpoints.T t.callCnt++ return &t.checksum, nil } + +func TestSingleTaskMetaMgr(t *testing.T) { + metaBuilder := singleMgrBuilder{ + taskID: time.Now().UnixNano(), + } + metaMgr := metaBuilder.TaskMetaMgr(nil) + + ok, err := metaMgr.CheckTaskExist(context.Background()) + require.NoError(t, err) + require.False(t, ok) + + err = metaMgr.InitTask(context.Background(), 1<<30) + require.NoError(t, err) + + ok, err = metaMgr.CheckTaskExist(context.Background()) + require.NoError(t, err) + require.True(t, ok) + + err = metaMgr.CheckTasksExclusively(context.Background(), func(tasks []taskMeta) ([]taskMeta, error) { + require.Len(t, tasks, 1) + require.Equal(t, uint64(1<<30), tasks[0].sourceBytes) + return nil, nil + }) + require.NoError(t, err) +} diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index 624a03a3c2c61..5ac2d791131f8 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -362,7 +362,9 @@ func NewRestoreControllerWithPauser( needChecksum: cfg.PostRestore.Checksum != config.OpLevelOff, } case isSSTImport: - metaBuilder = singleMgrBuilder{} + metaBuilder = singleMgrBuilder{ + taskID: cfg.TaskID, + } default: metaBuilder = noopMetaMgrBuilder{} } @@ -1928,7 +1930,19 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error { if err = rc.taskMgr.InitTask(ctx, source); err != nil { return common.ErrMetaMgrUnknown.Wrap(err).GenWithStackByArgs() } - if rc.cfg.App.CheckRequirements { + } + if rc.cfg.App.CheckRequirements { + needCheck := true + if rc.cfg.Checkpoint.Enable { + taskCheckpoints, err := rc.checkpointsDB.TaskCheckpoint(ctx) + if err != nil { + return common.ErrReadCheckpoint.Wrap(err).GenWithStack("get task checkpoint failed") + } + // If task checkpoint is initialized, it means check has been performed before. + // We don't need and shouldn't check again, because lightning may have already imported some data. + needCheck = taskCheckpoints == nil + } + if needCheck { err = rc.localResource(source) if err != nil { return common.ErrCheckLocalResource.Wrap(err).GenWithStackByArgs()