Skip to content

Commit

Permalink
lightning: maintain task meta in singleTaskMetaMgr (#34214) (#34225)
Browse files Browse the repository at this point in the history
close #34213
  • Loading branch information
ti-srebot authored May 23, 2022
1 parent b8fbab3 commit 1ebb154
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 7 deletions.
32 changes: 27 additions & 5 deletions br/pkg/lightning/restore/meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1034,15 +1034,18 @@ func (m noopTableMetaMgr) FinishTable(ctx context.Context) error {
return nil
}

type singleMgrBuilder struct{}
type singleMgrBuilder struct {
taskID int64
}

func (b singleMgrBuilder) Init(context.Context) error {
return nil
}

func (b singleMgrBuilder) TaskMetaMgr(pd *pdutil.PdController) taskMetaMgr {
return &singleTaskMetaMgr{
pd: pd,
pd: pd,
taskID: b.taskID,
}
}

Expand All @@ -1051,15 +1054,34 @@ func (b singleMgrBuilder) TableMetaMgr(tr *TableRestore) tableMetaMgr {
}

type singleTaskMetaMgr struct {
pd *pdutil.PdController
pd *pdutil.PdController
taskID int64
initialized bool
sourceBytes uint64
clusterAvail uint64
}

func (m *singleTaskMetaMgr) InitTask(ctx context.Context, source int64) error {
m.sourceBytes = uint64(source)
m.initialized = true
return nil
}

func (m *singleTaskMetaMgr) CheckTasksExclusively(ctx context.Context, action func(tasks []taskMeta) ([]taskMeta, error)) error {
_, err := action(nil)
newTasks, err := action([]taskMeta{
{
taskID: m.taskID,
status: taskMetaStatusInitial,
sourceBytes: m.sourceBytes,
clusterAvail: m.clusterAvail,
},
})
for _, t := range newTasks {
if m.taskID == t.taskID {
m.sourceBytes = t.sourceBytes
m.clusterAvail = t.clusterAvail
}
}
return err
}

Expand All @@ -1068,7 +1090,7 @@ func (m *singleTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdut
}

func (m *singleTaskMetaMgr) CheckTaskExist(ctx context.Context) (bool, error) {
return true, nil
return m.initialized, nil
}

func (m *singleTaskMetaMgr) CheckAndFinishRestore(context.Context, bool) (shouldSwitchBack bool, shouldCleanupMeta bool, err error) {
Expand Down
26 changes: 26 additions & 0 deletions br/pkg/lightning/restore/meta_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"database/sql/driver"
"sort"
"time"

"github.com/DATA-DOG/go-sqlmock"
. "github.com/pingcap/check"
Expand Down Expand Up @@ -323,3 +324,28 @@ func (s *taskMetaMgrSuite) TestCheckTasksExclusively(c *C) {
c.Assert(err, IsNil)

}

func (s *taskMetaMgrSuite) TestSingleTaskMetaMgr(c *C) {
metaBuilder := singleMgrBuilder{
taskID: time.Now().UnixNano(),
}
metaMgr := metaBuilder.TaskMetaMgr(nil)

ok, err := metaMgr.CheckTaskExist(context.Background())
c.Assert(err, IsNil)
c.Assert(ok, IsFalse)

err = metaMgr.InitTask(context.Background(), 1<<30)
c.Assert(err, IsNil)

ok, err = metaMgr.CheckTaskExist(context.Background())
c.Assert(err, IsNil)
c.Assert(ok, IsTrue)

err = metaMgr.CheckTasksExclusively(context.Background(), func(tasks []taskMeta) ([]taskMeta, error) {
c.Assert(len(tasks), Equals, 1)
c.Assert(tasks[0].sourceBytes, Equals, uint64(1<<30))
return nil, nil
})
c.Assert(err, IsNil)
}
18 changes: 16 additions & 2 deletions br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,9 @@ func NewRestoreControllerWithPauser(
needChecksum: cfg.PostRestore.Checksum != config.OpLevelOff,
}
case isSSTImport:
metaBuilder = singleMgrBuilder{}
metaBuilder = singleMgrBuilder{
taskID: cfg.TaskID,
}
default:
metaBuilder = noopMetaMgrBuilder{}
}
Expand Down Expand Up @@ -1893,7 +1895,19 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error {
if err = rc.taskMgr.InitTask(ctx, source); err != nil {
return errors.Trace(err)
}
if rc.cfg.App.CheckRequirements {
}
if rc.cfg.App.CheckRequirements {
needCheck := true
if rc.cfg.Checkpoint.Enable {
taskCheckpoints, err := rc.checkpointsDB.TaskCheckpoint(ctx)
if err != nil {
return errors.Trace(err)
}
// If task checkpoint is initialized, it means check has been performed before.
// We don't need and shouldn't check again, because lightning may have already imported some data.
needCheck = taskCheckpoints == nil
}
if needCheck {
err = rc.localResource(source)
if err != nil {
return errors.Trace(err)
Expand Down

0 comments on commit 1ebb154

Please sign in to comment.