Skip to content

Commit

Permalink
worker(engine/dm): no need extStorage if sync is not fresh (#7319)
Browse files Browse the repository at this point in the history
ref #7304
  • Loading branch information
GMHDBJD committed Oct 12, 2022
1 parent 3752703 commit 84cb040
Show file tree
Hide file tree
Showing 9 changed files with 131 additions and 33 deletions.
57 changes: 53 additions & 4 deletions engine/executor/dm/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ import (
"testing"

"github.com/gogo/protobuf/jsonpb"
"github.com/pingcap/tiflow/dm/config"
dmconfig "github.com/pingcap/tiflow/dm/config"
"github.com/pingcap/tiflow/dm/pb"
"github.com/pingcap/tiflow/engine/framework"
frameModel "github.com/pingcap/tiflow/engine/framework/model"
"github.com/pingcap/tiflow/engine/jobmaster/dm/config"
"github.com/pingcap/tiflow/engine/jobmaster/dm/metadata"
dcontext "github.com/pingcap/tiflow/engine/pkg/context"
"github.com/pingcap/tiflow/engine/pkg/deps"
Expand Down Expand Up @@ -102,6 +103,22 @@ func TestQueryStatusAPI(t *testing.T) {
Result: &dmpkg.ProcessResult{Errors: []*dmpkg.ProcessError{processError}},
Status: []byte(syncStatusBytes),
}
taskCfg = &config.TaskCfg{
JobCfg: config.JobCfg{
TargetDB: &dmconfig.DBConfig{},
Upstreams: []*config.UpstreamCfg{
{
MySQLInstance: dmconfig.MySQLInstance{
Mydumper: &dmconfig.MydumperConfig{},
Loader: &dmconfig.LoaderConfig{},
Syncer: &dmconfig.SyncerConfig{},
SourceID: "task-id",
},
DBCfg: &dmconfig.DBConfig{},
},
},
},
}
)

dctx := dcontext.Background()
Expand All @@ -111,7 +128,7 @@ func TestQueryStatusAPI(t *testing.T) {
}))
dctx = dctx.WithDeps(dp)

dmWorker := newDMWorker(dctx, "", frameModel.WorkerDMDump, &config.SubTaskConfig{SourceID: "task-id"}, 0)
dmWorker := newDMWorker(dctx, "", frameModel.WorkerDMDump, taskCfg)
unitHolder := &mockUnitHolder{}
dmWorker.unitHolder = unitHolder

Expand Down Expand Up @@ -148,7 +165,23 @@ func TestStopWorker(t *testing.T) {
}))
dctx = dctx.WithDeps(dp)

dmWorker := newDMWorker(dctx, "master-id", frameModel.WorkerDMDump, &config.SubTaskConfig{SourceID: "task-id"}, 0)
taskCfg := &config.TaskCfg{
JobCfg: config.JobCfg{
TargetDB: &dmconfig.DBConfig{},
Upstreams: []*config.UpstreamCfg{
{
MySQLInstance: dmconfig.MySQLInstance{
Mydumper: &dmconfig.MydumperConfig{},
Loader: &dmconfig.LoaderConfig{},
Syncer: &dmconfig.SyncerConfig{},
SourceID: "task-id",
},
DBCfg: &dmconfig.DBConfig{},
},
},
},
}
dmWorker := newDMWorker(dctx, "master-id", frameModel.WorkerDMDump, taskCfg)
dmWorker.BaseWorker = framework.MockBaseWorker("worker-id", "master-id", dmWorker)
dmWorker.BaseWorker.Init(context.Background())
dmWorker.unitHolder = &mockUnitHolder{}
Expand All @@ -169,7 +202,23 @@ func TestOperateTask(t *testing.T) {
}))
dctx = dctx.WithDeps(dp)

dmWorker := newDMWorker(dctx, "master-id", frameModel.WorkerDMDump, &config.SubTaskConfig{SourceID: "task-id"}, 0)
taskCfg := &config.TaskCfg{
JobCfg: config.JobCfg{
TargetDB: &dmconfig.DBConfig{},
Upstreams: []*config.UpstreamCfg{
{
MySQLInstance: dmconfig.MySQLInstance{
Mydumper: &dmconfig.MydumperConfig{},
Loader: &dmconfig.LoaderConfig{},
Syncer: &dmconfig.SyncerConfig{},
SourceID: "task-id",
},
DBCfg: &dmconfig.DBConfig{},
},
},
},
}
dmWorker := newDMWorker(dctx, "master-id", frameModel.WorkerDMDump, taskCfg)
dmWorker.BaseWorker = framework.MockBaseWorker("worker-id", "master-id", dmWorker)
dmWorker.BaseWorker.Init(context.Background())
mockUnitHolder := &mockUnitHolder{}
Expand Down
20 changes: 11 additions & 9 deletions engine/executor/dm/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,8 @@ func (f workerFactory) DeserializeConfig(configBytes []byte) (registry.WorkerCon
// NewWorkerImpl implements WorkerFactory.NewWorkerImpl
func (f workerFactory) NewWorkerImpl(ctx *dcontext.Context, workerID frameModel.WorkerID, masterID frameModel.MasterID, conf framework.WorkerConfig) (framework.WorkerImpl, error) {
cfg := conf.(*config.TaskCfg)
log.Info("new dm worker", zap.String(logutil.ConstFieldJobKey, masterID), zap.String(logutil.ConstFieldWorkerKey, workerID), zap.Uint64("config_modify_revision", cfg.ModRevision))
dmSubtaskCfg := cfg.ToDMSubTaskCfg(masterID)
return newDMWorker(ctx, masterID, f.workerType, dmSubtaskCfg, cfg.ModRevision), nil
log.Info("new dm worker", zap.String(logutil.ConstFieldJobKey, masterID), zap.Stringer("worker_type", f.workerType), zap.String(logutil.ConstFieldWorkerKey, workerID), zap.Any("task_config", cfg))
return newDMWorker(ctx, masterID, f.workerType, cfg), nil
}

// IsRetryableError implements WorkerFactory.IsRetryableError
Expand All @@ -101,22 +100,25 @@ type dmWorker struct {
messageHandlerManager p2p.MessageHandlerManager

cfgModRevision uint64
needExtStorage bool
}

func newDMWorker(ctx *dcontext.Context, masterID frameModel.MasterID, workerType framework.WorkerType, cfg *dmconfig.SubTaskConfig, cfgModRevision uint64) *dmWorker {
func newDMWorker(ctx *dcontext.Context, masterID frameModel.MasterID, workerType framework.WorkerType, cfg *config.TaskCfg) *dmWorker {
// TODO: support config later
// nolint:errcheck
bf, _ := backoff.NewBackoff(dmconfig.DefaultBackoffFactor, dmconfig.DefaultBackoffJitter, dmconfig.DefaultBackoffMin, dmconfig.DefaultBackoffMax)
autoResume := &worker.AutoResumeInfo{Backoff: bf, LatestPausedTime: time.Now(), LatestResumeTime: time.Now()}
dmSubtaskCfg := cfg.ToDMSubTaskCfg(masterID)
w := &dmWorker{
cfg: cfg,
cfg: dmSubtaskCfg,
stage: metadata.StageInit,
workerType: workerType,
taskID: cfg.SourceID,
taskID: dmSubtaskCfg.SourceID,
masterID: masterID,
unitHolder: newUnitHolderImpl(workerType, cfg),
unitHolder: newUnitHolderImpl(workerType, dmSubtaskCfg),
autoResume: autoResume,
cfgModRevision: cfgModRevision,
cfgModRevision: cfg.ModRevision,
needExtStorage: cfg.NeedExtStorage,
}

// nolint:errcheck
Expand All @@ -135,7 +137,7 @@ func (w *dmWorker) InitImpl(ctx context.Context) error {
if err := w.messageAgent.UpdateClient(w.masterID, w); err != nil {
return err
}
if w.cfg.Mode != dmconfig.ModeIncrement {
if w.cfg.Mode != dmconfig.ModeIncrement && w.needExtStorage {
if err := w.setupStorage(ctx); err != nil {
return err
}
Expand Down
19 changes: 18 additions & 1 deletion engine/executor/dm/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,24 @@ func TestWorker(t *testing.T) {
require.NoError(t, dp.Provide(func() p2p.MessageHandlerManager {
return p2p.NewMockMessageHandlerManager()
}))
dmWorker := newDMWorker(dctx, "master-id", frameModel.WorkerDMDump, &dmconfig.SubTaskConfig{}, 0)
taskCfg := &config.TaskCfg{
JobCfg: config.JobCfg{
TargetDB: &dmconfig.DBConfig{},
Upstreams: []*config.UpstreamCfg{
{
MySQLInstance: dmconfig.MySQLInstance{
Mydumper: &dmconfig.MydumperConfig{},
Loader: &dmconfig.LoaderConfig{},
Syncer: &dmconfig.SyncerConfig{},
SourceID: "task-id",
},
DBCfg: &dmconfig.DBConfig{},
},
},
},
NeedExtStorage: true,
}
dmWorker := newDMWorker(dctx, "master-id", frameModel.WorkerDMDump, taskCfg)
unitHolder := &mockUnitHolder{}
dmWorker.unitHolder = unitHolder
dmWorker.BaseWorker = framework.MockBaseWorker("worker-id", "master-id", dmWorker)
Expand Down
18 changes: 10 additions & 8 deletions engine/jobmaster/dm/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,17 @@ func TestQueryStatusAPI(t *testing.T) {
BaseJobMaster: mockBaseJobmaster,
metadata: metadata.NewMetaData(metaKVClient, log.L()),
}
job = &metadata.Job{
jobCfg = &config.JobCfg{ModRevision: 4}
taskCfg = jobCfg.ToTaskCfg()
job = &metadata.Job{
Tasks: map[string]*metadata.Task{
"task1": {Stage: metadata.StagePaused, Cfg: &config.TaskCfg{ModRevision: 4}},
"task2": {Stage: metadata.StageFinished, Cfg: &config.TaskCfg{ModRevision: 4}},
"task3": {Stage: metadata.StageFinished, Cfg: &config.TaskCfg{ModRevision: 4}},
"task4": {Stage: metadata.StageRunning, Cfg: &config.TaskCfg{ModRevision: 4}},
"task5": {Stage: metadata.StageRunning, Cfg: &config.TaskCfg{ModRevision: 4}},
"task6": {Stage: metadata.StageRunning, Cfg: &config.TaskCfg{ModRevision: 4}},
"task7": {Stage: metadata.StageFinished, Cfg: &config.TaskCfg{ModRevision: 4}},
"task1": {Stage: metadata.StagePaused, Cfg: taskCfg},
"task2": {Stage: metadata.StageFinished, Cfg: taskCfg},
"task3": {Stage: metadata.StageFinished, Cfg: taskCfg},
"task4": {Stage: metadata.StageRunning, Cfg: taskCfg},
"task5": {Stage: metadata.StageRunning, Cfg: taskCfg},
"task6": {Stage: metadata.StageRunning, Cfg: taskCfg},
"task7": {Stage: metadata.StageFinished, Cfg: taskCfg},
},
}
dumpStatus = &pb.DumpStatus{
Expand Down
4 changes: 2 additions & 2 deletions engine/jobmaster/dm/checkpoint/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func onlineDDLName(jobID string, cfg *config.JobCfg) string {

func isLoadFresh(ctx context.Context, jobID string, taskCfg *config.TaskCfg, db *conn.BaseDB) (bool, error) {
// nolint:gosec
query := fmt.Sprintf("SELECT status FROM %s WHERE `task_name` = ? AND `source_name` = ?", loadTableName(jobID, (*config.JobCfg)(taskCfg)))
query := fmt.Sprintf("SELECT status FROM %s WHERE `task_name` = ? AND `source_name` = ?", loadTableName(jobID, taskCfg.ToJobCfg()))
var status string
err := db.DB.QueryRowContext(ctx, query, jobID, taskCfg.Upstreams[0].SourceID).Scan(&status)
switch {
Expand All @@ -226,7 +226,7 @@ func isLoadFresh(ctx context.Context, jobID string, taskCfg *config.TaskCfg, db

func isSyncFresh(ctx context.Context, jobID string, taskCfg *config.TaskCfg, db *conn.BaseDB) (bool, error) {
// nolint:gosec
query := fmt.Sprintf("SELECT 1 FROM %s WHERE `id` = ? AND `is_global` = true", syncTableName(jobID, (*config.JobCfg)(taskCfg)))
query := fmt.Sprintf("SELECT 1 FROM %s WHERE `id` = ? AND `is_global` = true", syncTableName(jobID, taskCfg.ToJobCfg()))
var status string
err := db.DB.QueryRowContext(ctx, query, taskCfg.Upstreams[0].SourceID).Scan(&status)
switch {
Expand Down
30 changes: 25 additions & 5 deletions engine/jobmaster/dm/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,7 @@ func (c *JobCfg) Clone() (*JobCfg, error) {
func (c *JobCfg) ToTaskCfgs() map[string]*TaskCfg {
taskCfgs := make(map[string]*TaskCfg, len(c.Upstreams))
for _, mysqlInstance := range c.Upstreams {
// nolint:errcheck
jobCfg, _ := c.Clone()
taskCfg := (*TaskCfg)(jobCfg)
taskCfg := c.ToTaskCfg()
taskCfg.Upstreams = []*UpstreamCfg{mysqlInstance}
taskCfgs[mysqlInstance.SourceID] = taskCfg
}
Expand All @@ -176,7 +174,7 @@ func FromTaskCfgs(taskCfgs []*TaskCfg) *JobCfg {
return nil
}

jobCfg := (*JobCfg)(taskCfgs[0])
jobCfg := taskCfgs[0].ToJobCfg()
// nolint:errcheck
jobCfg, _ = jobCfg.Clone()
for i := 1; i < len(taskCfgs); i++ {
Expand Down Expand Up @@ -248,9 +246,31 @@ func (c *JobCfg) verifySourceID() error {
return nil
}

// ToTaskCfg converts JobCfg to TaskCfg.
func (c *JobCfg) ToTaskCfg() *TaskCfg {
	// Deep-copy the receiver so the returned TaskCfg never aliases
	// the caller's mutable JobCfg fields.
	// nolint:errcheck
	cloned, _ := c.Clone()
	return &TaskCfg{JobCfg: *cloned}
}

// TaskCfg shares same struct as JobCfg, but it only serves one upstream.
// TaskCfg can be converted to an equivalent DM subtask by ToDMSubTaskCfg.
type TaskCfg JobCfg
// TaskCfg adds some internal configuration for the jobmaster/worker.
type TaskCfg struct {
JobCfg

// FIXME: remove this item after fixing https://github.com/pingcap/tiflow/issues/7304
NeedExtStorage bool
}

// ToJobCfg converts TaskCfg to JobCfg.
func (c *TaskCfg) ToJobCfg() *JobCfg {
	// Clone is expected to succeed for a well-formed config; the
	// error is deliberately discarded, matching ToTaskCfg.
	// nolint:errcheck
	jobCfg, _ := c.JobCfg.Clone()
	return jobCfg
}

// ToDMSubTaskCfg adapts a TaskCfg to a SubTaskCfg for worker now.
// TODO: fully support all fields
Expand Down
2 changes: 1 addition & 1 deletion engine/jobmaster/dm/dm_jobmaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ func (jm *JobMaster) removeCheckpoint(ctx context.Context) error {
}
job := state.(*metadata.Job)
for _, task := range job.Tasks {
cfg := (*config.JobCfg)(task.Cfg)
cfg := task.Cfg.ToJobCfg()
return jm.checkpointAgent.Remove(ctx, cfg)
}
return errors.New("no task found in job")
Expand Down
8 changes: 7 additions & 1 deletion engine/jobmaster/dm/worker_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,15 +249,21 @@ func (wm *WorkerManager) checkAndScheduleWorkers(ctx context.Context, job *metad
}

var resources []resModel.ResourceID
taskCfg := persistentTask.Cfg
// The first worker doesn't need a local resource.
// An unfresh sync unit doesn't need a local resource. (If we need to save table checkpoints for loadTableStructureFromDump in the future, we can save them before saving the global checkpoint.)
// TODO: storage should be created/discarded in jobmaster instead of worker.
if workerIdxInSeq(persistentTask.Cfg.TaskMode, nextUnit) != 0 && !(nextUnit == frameModel.WorkerDMSync && !isFresh) {
resources = append(resources, NewDMResourceID(wm.jobID, persistentTask.Cfg.Upstreams[0].SourceID))
}

// FIXME: remove this after fixing https://github.com/pingcap/tiflow/issues/7304
if nextUnit != frameModel.WorkerDMSync || isFresh {
taskCfg.NeedExtStorage = true
}

// createWorker should be an asynchronous operation
if err := wm.createWorker(ctx, taskID, nextUnit, persistentTask.Cfg, resources...); err != nil {
if err := wm.createWorker(ctx, taskID, nextUnit, taskCfg, resources...); err != nil {
recordError = err
continue
}
Expand Down
6 changes: 4 additions & 2 deletions engine/jobmaster/dm/worker_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,12 @@ func (t *testDMJobmasterSuite) TestClearWorkerStatus() {
job.Tasks[source2] = metadata.NewTask(&config.TaskCfg{})
require.NoError(t.T(), workerManager.stopOutdatedWorkers(context.Background(), job))
messageAgent.On("SendMessage").Return(destroyError).Once()
job.Tasks[source2] = metadata.NewTask(&config.TaskCfg{ModRevision: 1})
jobCfg := &config.JobCfg{ModRevision: 1}
taskCfg := jobCfg.ToTaskCfg()
job.Tasks[source2] = metadata.NewTask(taskCfg)
require.EqualError(t.T(), workerManager.stopOutdatedWorkers(context.Background(), job), destroyError.Error())
messageAgent.On("SendMessage").Return(nil).Once()
job.Tasks[source2] = metadata.NewTask(&config.TaskCfg{ModRevision: 1})
job.Tasks[source2] = metadata.NewTask(taskCfg)
require.NoError(t.T(), workerManager.stopOutdatedWorkers(context.Background(), job))

job = metadata.NewJob(&config.JobCfg{})
Expand Down

0 comments on commit 84cb040

Please sign in to comment.