diff --git a/engine/executor/dm/api_test.go b/engine/executor/dm/api_test.go index 5e0710dc274..9a6c20224a8 100644 --- a/engine/executor/dm/api_test.go +++ b/engine/executor/dm/api_test.go @@ -20,10 +20,11 @@ import ( "testing" "github.com/gogo/protobuf/jsonpb" - "github.com/pingcap/tiflow/dm/config" + dmconfig "github.com/pingcap/tiflow/dm/config" "github.com/pingcap/tiflow/dm/pb" "github.com/pingcap/tiflow/engine/framework" frameModel "github.com/pingcap/tiflow/engine/framework/model" + "github.com/pingcap/tiflow/engine/jobmaster/dm/config" "github.com/pingcap/tiflow/engine/jobmaster/dm/metadata" dcontext "github.com/pingcap/tiflow/engine/pkg/context" "github.com/pingcap/tiflow/engine/pkg/deps" @@ -102,6 +103,22 @@ func TestQueryStatusAPI(t *testing.T) { Result: &dmpkg.ProcessResult{Errors: []*dmpkg.ProcessError{processError}}, Status: []byte(syncStatusBytes), } + taskCfg = &config.TaskCfg{ + JobCfg: config.JobCfg{ + TargetDB: &dmconfig.DBConfig{}, + Upstreams: []*config.UpstreamCfg{ + { + MySQLInstance: dmconfig.MySQLInstance{ + Mydumper: &dmconfig.MydumperConfig{}, + Loader: &dmconfig.LoaderConfig{}, + Syncer: &dmconfig.SyncerConfig{}, + SourceID: "task-id", + }, + DBCfg: &dmconfig.DBConfig{}, + }, + }, + }, + } ) dctx := dcontext.Background() @@ -111,7 +128,7 @@ func TestQueryStatusAPI(t *testing.T) { })) dctx = dctx.WithDeps(dp) - dmWorker := newDMWorker(dctx, "", frameModel.WorkerDMDump, &config.SubTaskConfig{SourceID: "task-id"}, 0) + dmWorker := newDMWorker(dctx, "", frameModel.WorkerDMDump, taskCfg) unitHolder := &mockUnitHolder{} dmWorker.unitHolder = unitHolder @@ -148,7 +165,23 @@ func TestStopWorker(t *testing.T) { })) dctx = dctx.WithDeps(dp) - dmWorker := newDMWorker(dctx, "master-id", frameModel.WorkerDMDump, &config.SubTaskConfig{SourceID: "task-id"}, 0) + taskCfg := &config.TaskCfg{ + JobCfg: config.JobCfg{ + TargetDB: &dmconfig.DBConfig{}, + Upstreams: []*config.UpstreamCfg{ + { + MySQLInstance: dmconfig.MySQLInstance{ + Mydumper: &dmconfig.MydumperConfig{}, + Loader: &dmconfig.LoaderConfig{}, + Syncer: &dmconfig.SyncerConfig{}, + SourceID: "task-id", + }, + DBCfg: &dmconfig.DBConfig{}, + }, + }, + }, + } + dmWorker := newDMWorker(dctx, "master-id", frameModel.WorkerDMDump, taskCfg) dmWorker.BaseWorker = framework.MockBaseWorker("worker-id", "master-id", dmWorker) dmWorker.BaseWorker.Init(context.Background()) dmWorker.unitHolder = &mockUnitHolder{} @@ -169,7 +202,23 @@ func TestOperateTask(t *testing.T) { })) dctx = dctx.WithDeps(dp) - dmWorker := newDMWorker(dctx, "master-id", frameModel.WorkerDMDump, &config.SubTaskConfig{SourceID: "task-id"}, 0) + taskCfg := &config.TaskCfg{ + JobCfg: config.JobCfg{ + TargetDB: &dmconfig.DBConfig{}, + Upstreams: []*config.UpstreamCfg{ + { + MySQLInstance: dmconfig.MySQLInstance{ + Mydumper: &dmconfig.MydumperConfig{}, + Loader: &dmconfig.LoaderConfig{}, + Syncer: &dmconfig.SyncerConfig{}, + SourceID: "task-id", + }, + DBCfg: &dmconfig.DBConfig{}, + }, + }, + }, + } + dmWorker := newDMWorker(dctx, "master-id", frameModel.WorkerDMDump, taskCfg) dmWorker.BaseWorker = framework.MockBaseWorker("worker-id", "master-id", dmWorker) dmWorker.BaseWorker.Init(context.Background()) mockUnitHolder := &mockUnitHolder{} diff --git a/engine/executor/dm/worker.go b/engine/executor/dm/worker.go index e3f7013599c..7ad2d5b0901 100644 --- a/engine/executor/dm/worker.go +++ b/engine/executor/dm/worker.go @@ -73,9 +73,8 @@ func (f workerFactory) DeserializeConfig(configBytes []byte) (registry.WorkerCon // NewWorkerImpl implements WorkerFactory.NewWorkerImpl func (f workerFactory) NewWorkerImpl(ctx *dcontext.Context, workerID frameModel.WorkerID, masterID frameModel.MasterID, conf framework.WorkerConfig) (framework.WorkerImpl, error) { cfg := conf.(*config.TaskCfg) - log.Info("new dm worker", zap.String(logutil.ConstFieldJobKey, masterID), zap.String(logutil.ConstFieldWorkerKey, workerID), zap.Uint64("config_modify_revision", cfg.ModRevision)) - dmSubtaskCfg := cfg.ToDMSubTaskCfg(masterID) - return newDMWorker(ctx, masterID, f.workerType, dmSubtaskCfg, cfg.ModRevision), nil + log.Info("new dm worker", zap.String(logutil.ConstFieldJobKey, masterID), zap.Stringer("worker_type", f.workerType), zap.String(logutil.ConstFieldWorkerKey, workerID), zap.Any("task_config", cfg)) + return newDMWorker(ctx, masterID, f.workerType, cfg), nil } // IsRetryableError implements WorkerFactory.IsRetryableError @@ -101,22 +100,25 @@ type dmWorker struct { messageHandlerManager p2p.MessageHandlerManager cfgModRevision uint64 + needExtStorage bool } -func newDMWorker(ctx *dcontext.Context, masterID frameModel.MasterID, workerType framework.WorkerType, cfg *dmconfig.SubTaskConfig, cfgModRevision uint64) *dmWorker { +func newDMWorker(ctx *dcontext.Context, masterID frameModel.MasterID, workerType framework.WorkerType, cfg *config.TaskCfg) *dmWorker { // TODO: support config later // nolint:errcheck bf, _ := backoff.NewBackoff(dmconfig.DefaultBackoffFactor, dmconfig.DefaultBackoffJitter, dmconfig.DefaultBackoffMin, dmconfig.DefaultBackoffMax) autoResume := &worker.AutoResumeInfo{Backoff: bf, LatestPausedTime: time.Now(), LatestResumeTime: time.Now()} + dmSubtaskCfg := cfg.ToDMSubTaskCfg(masterID) w := &dmWorker{ - cfg: cfg, + cfg: dmSubtaskCfg, stage: metadata.StageInit, workerType: workerType, - taskID: cfg.SourceID, + taskID: dmSubtaskCfg.SourceID, masterID: masterID, - unitHolder: newUnitHolderImpl(workerType, cfg), + unitHolder: newUnitHolderImpl(workerType, dmSubtaskCfg), autoResume: autoResume, - cfgModRevision: cfgModRevision, + cfgModRevision: cfg.ModRevision, + needExtStorage: cfg.NeedExtStorage, } // nolint:errcheck @@ -135,7 +137,7 @@ func (w *dmWorker) InitImpl(ctx context.Context) error { if err := w.messageAgent.UpdateClient(w.masterID, w); err != nil { return err } - if w.cfg.Mode != dmconfig.ModeIncrement { + if w.cfg.Mode != dmconfig.ModeIncrement && w.needExtStorage { if err := w.setupStorage(ctx); err != nil { return err } diff --git a/engine/executor/dm/worker_test.go b/engine/executor/dm/worker_test.go index 38c88b860d3..825ac72b6d7 100644 --- a/engine/executor/dm/worker_test.go +++ b/engine/executor/dm/worker_test.go @@ -110,7 +110,24 @@ func TestWorker(t *testing.T) { require.NoError(t, dp.Provide(func() p2p.MessageHandlerManager { return p2p.NewMockMessageHandlerManager() })) - dmWorker := newDMWorker(dctx, "master-id", frameModel.WorkerDMDump, &dmconfig.SubTaskConfig{}, 0) + taskCfg := &config.TaskCfg{ + JobCfg: config.JobCfg{ + TargetDB: &dmconfig.DBConfig{}, + Upstreams: []*config.UpstreamCfg{ + { + MySQLInstance: dmconfig.MySQLInstance{ + Mydumper: &dmconfig.MydumperConfig{}, + Loader: &dmconfig.LoaderConfig{}, + Syncer: &dmconfig.SyncerConfig{}, + SourceID: "task-id", + }, + DBCfg: &dmconfig.DBConfig{}, + }, + }, + }, + NeedExtStorage: true, + } + dmWorker := newDMWorker(dctx, "master-id", frameModel.WorkerDMDump, taskCfg) unitHolder := &mockUnitHolder{} dmWorker.unitHolder = unitHolder dmWorker.BaseWorker = framework.MockBaseWorker("worker-id", "master-id", dmWorker) diff --git a/engine/jobmaster/dm/api_test.go b/engine/jobmaster/dm/api_test.go index 64999964723..8fbacdcecf2 100644 --- a/engine/jobmaster/dm/api_test.go +++ b/engine/jobmaster/dm/api_test.go @@ -47,15 +47,17 @@ func TestQueryStatusAPI(t *testing.T) { BaseJobMaster: mockBaseJobmaster, metadata: metadata.NewMetaData(metaKVClient, log.L()), } - job = &metadata.Job{ + jobCfg = &config.JobCfg{ModRevision: 4} + taskCfg = jobCfg.ToTaskCfg() + job = &metadata.Job{ Tasks: map[string]*metadata.Task{ - "task1": {Stage: metadata.StagePaused, Cfg: &config.TaskCfg{ModRevision: 4}}, - "task2": {Stage: metadata.StageFinished, Cfg: &config.TaskCfg{ModRevision: 4}}, - "task3": {Stage: metadata.StageFinished, Cfg: &config.TaskCfg{ModRevision: 4}}, - "task4": {Stage: metadata.StageRunning, Cfg: &config.TaskCfg{ModRevision: 4}}, - "task5": {Stage: metadata.StageRunning, Cfg: &config.TaskCfg{ModRevision: 4}}, - "task6": {Stage: metadata.StageRunning, Cfg: &config.TaskCfg{ModRevision: 4}}, - "task7": {Stage: metadata.StageFinished, Cfg: &config.TaskCfg{ModRevision: 4}}, + "task1": {Stage: metadata.StagePaused, Cfg: taskCfg}, + "task2": {Stage: metadata.StageFinished, Cfg: taskCfg}, + "task3": {Stage: metadata.StageFinished, Cfg: taskCfg}, + "task4": {Stage: metadata.StageRunning, Cfg: taskCfg}, + "task5": {Stage: metadata.StageRunning, Cfg: taskCfg}, + "task6": {Stage: metadata.StageRunning, Cfg: taskCfg}, + "task7": {Stage: metadata.StageFinished, Cfg: taskCfg}, }, } dumpStatus = &pb.DumpStatus{ diff --git a/engine/jobmaster/dm/checkpoint/agent.go b/engine/jobmaster/dm/checkpoint/agent.go index 144f8ce9596..5fa66250acf 100644 --- a/engine/jobmaster/dm/checkpoint/agent.go +++ b/engine/jobmaster/dm/checkpoint/agent.go @@ -211,7 +211,7 @@ func onlineDDLName(jobID string, cfg *config.JobCfg) string { func isLoadFresh(ctx context.Context, jobID string, taskCfg *config.TaskCfg, db *conn.BaseDB) (bool, error) { // nolint:gosec - query := fmt.Sprintf("SELECT status FROM %s WHERE `task_name` = ? AND `source_name` = ?", loadTableName(jobID, (*config.JobCfg)(taskCfg))) + query := fmt.Sprintf("SELECT status FROM %s WHERE `task_name` = ? AND `source_name` = ?", loadTableName(jobID, taskCfg.ToJobCfg())) var status string err := db.DB.QueryRowContext(ctx, query, jobID, taskCfg.Upstreams[0].SourceID).Scan(&status) switch { @@ -226,7 +226,7 @@ func isLoadFresh(ctx context.Context, jobID string, taskCfg *config.TaskCfg, db func isSyncFresh(ctx context.Context, jobID string, taskCfg *config.TaskCfg, db *conn.BaseDB) (bool, error) { // nolint:gosec - query := fmt.Sprintf("SELECT 1 FROM %s WHERE `id` = ? AND `is_global` = true", syncTableName(jobID, (*config.JobCfg)(taskCfg))) + query := fmt.Sprintf("SELECT 1 FROM %s WHERE `id` = ? AND `is_global` = true", syncTableName(jobID, taskCfg.ToJobCfg())) var status string err := db.DB.QueryRowContext(ctx, query, taskCfg.Upstreams[0].SourceID).Scan(&status) switch { diff --git a/engine/jobmaster/dm/config/config.go b/engine/jobmaster/dm/config/config.go index a8a46dca1ec..6df83ea8fc3 100644 --- a/engine/jobmaster/dm/config/config.go +++ b/engine/jobmaster/dm/config/config.go @@ -161,9 +161,7 @@ func (c *JobCfg) Clone() (*JobCfg, error) { func (c *JobCfg) ToTaskCfgs() map[string]*TaskCfg { taskCfgs := make(map[string]*TaskCfg, len(c.Upstreams)) for _, mysqlInstance := range c.Upstreams { - // nolint:errcheck - jobCfg, _ := c.Clone() - taskCfg := (*TaskCfg)(jobCfg) + taskCfg := c.ToTaskCfg() taskCfg.Upstreams = []*UpstreamCfg{mysqlInstance} taskCfgs[mysqlInstance.SourceID] = taskCfg } @@ -176,7 +174,7 @@ func FromTaskCfgs(taskCfgs []*TaskCfg) *JobCfg { return nil } - jobCfg := (*JobCfg)(taskCfgs[0]) + jobCfg := taskCfgs[0].ToJobCfg() // nolint:errcheck jobCfg, _ = jobCfg.Clone() for i := 1; i < len(taskCfgs); i++ { @@ -248,9 +246,31 @@ func (c *JobCfg) verifySourceID() error { return nil } +// ToTaskCfg converts JobCfg to TaskCfg. +func (c *JobCfg) ToTaskCfg() *TaskCfg { + // nolint:errcheck + clone, _ := c.Clone() + return &TaskCfg{ + JobCfg: *clone, + } +} + // TaskCfg shares same struct as JobCfg, but it only serves one upstream. // TaskCfg can be converted to an equivalent DM subtask by ToDMSubTaskCfg. -type TaskCfg JobCfg +// TaskCfg add some internal config for jobmaster/worker. +type TaskCfg struct { + JobCfg + + // FIXME: remove this item after fix https://github.com/pingcap/tiflow/issues/7304 + NeedExtStorage bool +} + +// ToJobCfg converts TaskCfg to JobCfg. +func (c *TaskCfg) ToJobCfg() *JobCfg { + // nolint:errcheck + clone, _ := c.JobCfg.Clone() + return clone +} // ToDMSubTaskCfg adapts a TaskCfg to a SubTaskCfg for worker now. // TODO: fully support all fields diff --git a/engine/jobmaster/dm/dm_jobmaster.go b/engine/jobmaster/dm/dm_jobmaster.go index 004327433da..ae7cafbef4e 100644 --- a/engine/jobmaster/dm/dm_jobmaster.go +++ b/engine/jobmaster/dm/dm_jobmaster.go @@ -411,7 +411,7 @@ func (jm *JobMaster) removeCheckpoint(ctx context.Context) error { } job := state.(*metadata.Job) for _, task := range job.Tasks { - cfg := (*config.JobCfg)(task.Cfg) + cfg := task.Cfg.ToJobCfg() return jm.checkpointAgent.Remove(ctx, cfg) } return errors.New("no task found in job") diff --git a/engine/jobmaster/dm/worker_manager.go b/engine/jobmaster/dm/worker_manager.go index f1c8a9acc21..c8b350b9c4c 100644 --- a/engine/jobmaster/dm/worker_manager.go +++ b/engine/jobmaster/dm/worker_manager.go @@ -249,6 +249,7 @@ func (wm *WorkerManager) checkAndScheduleWorkers(ctx context.Context, job *metad } var resources []resModel.ResourceID + taskCfg := persistentTask.Cfg // first worker don't need local resource. // unfresh sync unit don't need local resource.(if we need to save table checkpoint for loadTableStructureFromDump in future, we can save it before saving global checkpoint.) // TODO: storage should be created/discarded in jobmaster instead of worker. @@ -256,8 +257,13 @@ func (wm *WorkerManager) checkAndScheduleWorkers(ctx context.Context, job *metad resources = append(resources, NewDMResourceID(wm.jobID, persistentTask.Cfg.Upstreams[0].SourceID)) } + // FIXME: remove this after fix https://github.com/pingcap/tiflow/issues/7304 + if nextUnit != frameModel.WorkerDMSync || isFresh { + taskCfg.NeedExtStorage = true + } + // createWorker should be an asynchronous operation - if err := wm.createWorker(ctx, taskID, nextUnit, persistentTask.Cfg, resources...); err != nil { + if err := wm.createWorker(ctx, taskID, nextUnit, taskCfg, resources...); err != nil { recordError = err continue } diff --git a/engine/jobmaster/dm/worker_manager_test.go b/engine/jobmaster/dm/worker_manager_test.go index ab8f598b9ed..45fc59cd1de 100644 --- a/engine/jobmaster/dm/worker_manager_test.go +++ b/engine/jobmaster/dm/worker_manager_test.go @@ -157,10 +157,12 @@ func (t *testDMJobmasterSuite) TestClearWorkerStatus() { job.Tasks[source2] = metadata.NewTask(&config.TaskCfg{}) require.NoError(t.T(), workerManager.stopOutdatedWorkers(context.Background(), job)) messageAgent.On("SendMessage").Return(destroyError).Once() - job.Tasks[source2] = metadata.NewTask(&config.TaskCfg{ModRevision: 1}) + jobCfg := &config.JobCfg{ModRevision: 1} + taskCfg := jobCfg.ToTaskCfg() + job.Tasks[source2] = metadata.NewTask(taskCfg) require.EqualError(t.T(), workerManager.stopOutdatedWorkers(context.Background(), job), destroyError.Error()) messageAgent.On("SendMessage").Return(nil).Once() - job.Tasks[source2] = metadata.NewTask(&config.TaskCfg{ModRevision: 1}) + job.Tasks[source2] = metadata.NewTask(taskCfg) require.NoError(t.T(), workerManager.stopOutdatedWorkers(context.Background(), job)) job = metadata.NewJob(&config.JobCfg{})