diff --git a/Makefile b/Makefile index 4ee16ab839de4..96c92efe3295e 100644 --- a/Makefile +++ b/Makefile @@ -272,7 +272,7 @@ tools/bin/gotestsum: # mockgen@v0.2.0 is imcompatible with v0.3.0, so install it always. mockgen: - GOBIN=$(shell pwd)/tools/bin $(GO) install go.uber.org/mock/mockgen@v0.3.0 + GOBIN=$(shell pwd)/tools/bin $(GO) install github.com/lance6716/mock/mockgen@v0.4.0-patch # Usage: # @@ -392,8 +392,8 @@ mock_lightning: mockgen gen_mock: mockgen tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor TaskTable,Pool,TaskExecutor,Extension > pkg/disttask/framework/mock/task_executor_mock.go tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/framework/scheduler Scheduler,CleanUpRoutine,TaskManager > pkg/disttask/framework/mock/scheduler_mock.go - tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/framework/scheduler Extension > pkg/disttask/framework/scheduler/mock/scheduler_mock.go - tools/bin/mockgen -package execute github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute StepExecutor > pkg/disttask/framework/mock/execute/execute_mock.go + tools/bin/mockgen -destination pkg/disttask/framework/scheduler/mock/scheduler_mock.go -package mock github.com/pingcap/tidb/pkg/disttask/framework/scheduler Extension + tools/bin/mockgen -embed -package mockexecute github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute StepExecutor > pkg/disttask/framework/mock/execute/execute_mock.go tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/importinto MiniTaskExecutor > pkg/disttask/importinto/mock/import_mock.go tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/framework/planner LogicalPlan,PipelineSpec > pkg/disttask/framework/mock/plan_mock.go tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/util/sqlexec RestrictedSQLExecutor > pkg/util/sqlexec/mock/restricted_sql_executor_mock.go diff --git a/br/pkg/mock/storage/storage.go b/br/pkg/mock/storage/storage.go index 6ca014bb92b93..5e547f4c18436 100644 --- a/br/pkg/mock/storage/storage.go +++ b/br/pkg/mock/storage/storage.go @@ -5,6 +5,7 @@ // // mockgen -package mockstorage github.com/pingcap/tidb/br/pkg/storage ExternalStorage // + // Package mockstorage is a generated GoMock package. package mockstorage @@ -39,6 +40,11 @@ func (m *MockExternalStorage) EXPECT() *MockExternalStorageMockRecorder { return m.recorder } +// ISGOMOCK indicates that this struct is a gomock mock. +func (m *MockExternalStorage) ISGOMOCK() struct{} { + return struct{}{} +} + // Close mocks base method. func (m *MockExternalStorage) Close() { m.ctrl.T.Helper() diff --git a/pkg/ddl/backfilling_dist_executor.go b/pkg/ddl/backfilling_dist_executor.go index 4d8739d764e57..a90cc74580d21 100644 --- a/pkg/ddl/backfilling_dist_executor.go +++ b/pkg/ddl/backfilling_dist_executor.go @@ -191,7 +191,7 @@ func (s *backfillDistExecutor) Init(ctx context.Context) error { return nil } -func (s *backfillDistExecutor) GetStepExecutor(task *proto.Task, _ *proto.StepResource) (execute.StepExecutor, error) { +func (s *backfillDistExecutor) GetStepExecutor(task *proto.Task) (execute.StepExecutor, error) { switch task.Step { case proto.BackfillStepReadIndex, proto.BackfillStepMergeSort, proto.BackfillStepWriteAndIngest: return s.newBackfillSubtaskExecutor(task.Step) diff --git a/pkg/ddl/backfilling_read_index.go b/pkg/ddl/backfilling_read_index.go index 98239f5c70daa..5b30f5512a347 100644 --- a/pkg/ddl/backfilling_read_index.go +++ b/pkg/ddl/backfilling_read_index.go @@ -38,6 +38,7 @@ import ( ) type readIndexExecutor struct { + execute.StepExecFrameworkInfo d *ddl job *model.Job indexes []*model.IndexInfo diff --git a/pkg/disttask/framework/mock/execute/execute_mock.go b/pkg/disttask/framework/mock/execute/execute_mock.go index cae959b911188..edd5af519dad1 100644 --- a/pkg/disttask/framework/mock/execute/execute_mock.go +++ b/pkg/disttask/framework/mock/execute/execute_mock.go @@ -3,10 +3,11 @@ // // Generated by this command: // -// mockgen -package execute github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute StepExecutor +// mockgen -embed -package mockexecute github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute StepExecutor // -// Package execute is a generated GoMock package. -package execute + +// Package mockexecute is a generated GoMock package. +package mockexecute import ( context "context" @@ -19,6 +20,7 @@ import ( // MockStepExecutor is a mock of StepExecutor interface. type MockStepExecutor struct { + execute.StepExecutor ctrl *gomock.Controller recorder *MockStepExecutorMockRecorder } @@ -40,6 +42,11 @@ func (m *MockStepExecutor) EXPECT() *MockStepExecutorMockRecorder { return m.recorder } +// ISGOMOCK indicates that this struct is a gomock mock. +func (m *MockStepExecutor) ISGOMOCK() struct{} { + return struct{}{} +} + // Cleanup mocks base method. func (m *MockStepExecutor) Cleanup(arg0 context.Context) error { m.ctrl.T.Helper() @@ -54,6 +61,20 @@ func (mr *MockStepExecutorMockRecorder) Cleanup(arg0 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Cleanup", reflect.TypeOf((*MockStepExecutor)(nil).Cleanup), arg0) } +// GetResource mocks base method. +func (m *MockStepExecutor) GetResource() *proto.StepResource { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetResource") + ret0, _ := ret[0].(*proto.StepResource) + return ret0 +} + +// GetResource indicates an expected call of GetResource. +func (mr *MockStepExecutorMockRecorder) GetResource() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetResource", reflect.TypeOf((*MockStepExecutor)(nil).GetResource)) +} + // Init mocks base method. func (m *MockStepExecutor) Init(arg0 context.Context) error { m.ctrl.T.Helper() @@ -109,3 +130,15 @@ func (mr *MockStepExecutorMockRecorder) RunSubtask(arg0, arg1 any) *gomock.Call mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RunSubtask", reflect.TypeOf((*MockStepExecutor)(nil).RunSubtask), arg0, arg1) } + +// restricted mocks base method. +func (m *MockStepExecutor) restricted() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "restricted") +} + +// restricted indicates an expected call of restricted. +func (mr *MockStepExecutorMockRecorder) restricted() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "restricted", reflect.TypeOf((*MockStepExecutor)(nil).restricted)) +} diff --git a/pkg/disttask/framework/mock/plan_mock.go b/pkg/disttask/framework/mock/plan_mock.go index 1192073675f11..b0db8b51dc48d 100644 --- a/pkg/disttask/framework/mock/plan_mock.go +++ b/pkg/disttask/framework/mock/plan_mock.go @@ -5,6 +5,7 @@ // // mockgen -package mock github.com/pingcap/tidb/pkg/disttask/framework/planner LogicalPlan,PipelineSpec // + // Package mock is a generated GoMock package. package mock @@ -38,6 +39,11 @@ func (m *MockLogicalPlan) EXPECT() *MockLogicalPlanMockRecorder { return m.recorder } +// ISGOMOCK indicates that this struct is a gomock mock. +func (m *MockLogicalPlan) ISGOMOCK() struct{} { + return struct{}{} +} + // FromTaskMeta mocks base method. func (m *MockLogicalPlan) FromTaskMeta(arg0 []byte) error { m.ctrl.T.Helper() @@ -105,6 +111,11 @@ func (m *MockPipelineSpec) EXPECT() *MockPipelineSpecMockRecorder { return m.recorder } +// ISGOMOCK indicates that this struct is a gomock mock. +func (m *MockPipelineSpec) ISGOMOCK() struct{} { + return struct{}{} +} + // ToSubtaskMeta mocks base method. func (m *MockPipelineSpec) ToSubtaskMeta(arg0 planner.PlanCtx) ([]byte, error) { m.ctrl.T.Helper() diff --git a/pkg/disttask/framework/mock/scheduler_mock.go b/pkg/disttask/framework/mock/scheduler_mock.go index 5493ab1d2f556..a37a16b894be7 100644 --- a/pkg/disttask/framework/mock/scheduler_mock.go +++ b/pkg/disttask/framework/mock/scheduler_mock.go @@ -5,6 +5,7 @@ // // mockgen -package mock github.com/pingcap/tidb/pkg/disttask/framework/scheduler Scheduler,CleanUpRoutine,TaskManager // + // Package mock is a generated GoMock package. package mock @@ -41,6 +42,11 @@ func (m *MockScheduler) EXPECT() *MockSchedulerMockRecorder { return m.recorder } +// ISGOMOCK indicates that this struct is a gomock mock. +func (m *MockScheduler) ISGOMOCK() struct{} { + return struct{}{} +} + // Close mocks base method. func (m *MockScheduler) Close() { m.ctrl.T.Helper() @@ -200,6 +206,11 @@ func (m *MockCleanUpRoutine) EXPECT() *MockCleanUpRoutineMockRecorder { return m.recorder } +// ISGOMOCK indicates that this struct is a gomock mock. +func (m *MockCleanUpRoutine) ISGOMOCK() struct{} { + return struct{}{} +} + // CleanUp mocks base method. func (m *MockCleanUpRoutine) CleanUp(arg0 context.Context, arg1 *proto.Task) error { m.ctrl.T.Helper() @@ -237,6 +248,11 @@ func (m *MockTaskManager) EXPECT() *MockTaskManagerMockRecorder { return m.recorder } +// ISGOMOCK indicates that this struct is a gomock mock. +func (m *MockTaskManager) ISGOMOCK() struct{} { + return struct{}{} +} + // CancelTask mocks base method. func (m *MockTaskManager) CancelTask(arg0 context.Context, arg1 int64) error { m.ctrl.T.Helper() diff --git a/pkg/disttask/framework/mock/task_executor_mock.go b/pkg/disttask/framework/mock/task_executor_mock.go index 3935737ec96dd..2ecd10ab0472a 100644 --- a/pkg/disttask/framework/mock/task_executor_mock.go +++ b/pkg/disttask/framework/mock/task_executor_mock.go @@ -5,6 +5,7 @@ // // mockgen -package mock github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor TaskTable,Pool,TaskExecutor,Extension // + // Package mock is a generated GoMock package. package mock @@ -41,6 +42,11 @@ func (m *MockTaskTable) EXPECT() *MockTaskTableMockRecorder { return m.recorder } +// ISGOMOCK indicates that this struct is a gomock mock. +func (m *MockTaskTable) ISGOMOCK() struct{} { + return struct{}{} +} + // CancelSubtask mocks base method. func (m *MockTaskTable) CancelSubtask(arg0 context.Context, arg1 string, arg2 int64) error { m.ctrl.T.Helper() @@ -315,6 +321,11 @@ func (m *MockPool) EXPECT() *MockPoolMockRecorder { return m.recorder } +// ISGOMOCK indicates that this struct is a gomock mock. +func (m *MockPool) ISGOMOCK() struct{} { + return struct{}{} +} + // ReleaseAndWait mocks base method. func (m *MockPool) ReleaseAndWait() { m.ctrl.T.Helper() @@ -378,6 +389,11 @@ func (m *MockTaskExecutor) EXPECT() *MockTaskExecutorMockRecorder { return m.recorder } +// ISGOMOCK indicates that this struct is a gomock mock. +func (m *MockTaskExecutor) ISGOMOCK() struct{} { + return struct{}{} +} + // Cancel mocks base method. func (m *MockTaskExecutor) Cancel() { m.ctrl.T.Helper() @@ -491,19 +507,24 @@ func (m *MockExtension) EXPECT() *MockExtensionMockRecorder { return m.recorder } +// ISGOMOCK indicates that this struct is a gomock mock. +func (m *MockExtension) ISGOMOCK() struct{} { + return struct{}{} +} + // GetStepExecutor mocks base method. -func (m *MockExtension) GetStepExecutor(arg0 *proto.Task, arg1 *proto.StepResource) (execute.StepExecutor, error) { +func (m *MockExtension) GetStepExecutor(arg0 *proto.Task) (execute.StepExecutor, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetStepExecutor", arg0, arg1) + ret := m.ctrl.Call(m, "GetStepExecutor", arg0) ret0, _ := ret[0].(execute.StepExecutor) ret1, _ := ret[1].(error) return ret0, ret1 } // GetStepExecutor indicates an expected call of GetStepExecutor. -func (mr *MockExtensionMockRecorder) GetStepExecutor(arg0, arg1 any) *gomock.Call { +func (mr *MockExtensionMockRecorder) GetStepExecutor(arg0 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetStepExecutor", reflect.TypeOf((*MockExtension)(nil).GetStepExecutor), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetStepExecutor", reflect.TypeOf((*MockExtension)(nil).GetStepExecutor), arg0) } // IsIdempotent mocks base method. diff --git a/pkg/disttask/framework/scheduler/mock/scheduler_mock.go b/pkg/disttask/framework/scheduler/mock/scheduler_mock.go index 54e18806e8a62..e820c22016f16 100644 --- a/pkg/disttask/framework/scheduler/mock/scheduler_mock.go +++ b/pkg/disttask/framework/scheduler/mock/scheduler_mock.go @@ -3,8 +3,9 @@ // // Generated by this command: // -// mockgen -package mock github.com/pingcap/tidb/pkg/disttask/framework/scheduler Extension +// mockgen -destination pkg/disttask/framework/scheduler/mock/scheduler_mock.go -package mock github.com/pingcap/tidb/pkg/disttask/framework/scheduler Extension // + // Package mock is a generated GoMock package. package mock @@ -40,6 +41,11 @@ func (m *MockExtension) EXPECT() *MockExtensionMockRecorder { return m.recorder } +// ISGOMOCK indicates that this struct is a gomock mock. +func (m *MockExtension) ISGOMOCK() struct{} { + return struct{}{} +} + // GetEligibleInstances mocks base method. func (m *MockExtension) GetEligibleInstances(arg0 context.Context, arg1 *proto.Task) ([]string, error) { m.ctrl.T.Helper() diff --git a/pkg/disttask/framework/taskexecutor/BUILD.bazel b/pkg/disttask/framework/taskexecutor/BUILD.bazel index 47a76a2fca0f9..e872c39cbef13 100644 --- a/pkg/disttask/framework/taskexecutor/BUILD.bazel +++ b/pkg/disttask/framework/taskexecutor/BUILD.bazel @@ -51,13 +51,14 @@ go_test( ], embed = [":taskexecutor"], flaky = True, - shard_count = 15, + shard_count = 16, deps = [ "//pkg/disttask/framework/mock", "//pkg/disttask/framework/mock/execute", "//pkg/disttask/framework/proto", "//pkg/disttask/framework/scheduler", "//pkg/disttask/framework/storage", + "//pkg/disttask/framework/taskexecutor/execute", "//pkg/disttask/framework/testutil", "//pkg/kv", "//pkg/testkit", diff --git a/pkg/disttask/framework/taskexecutor/execute/interface.go b/pkg/disttask/framework/taskexecutor/execute/interface.go index 61d464441a888..9fc0f80eeca71 100644 --- a/pkg/disttask/framework/taskexecutor/execute/interface.go +++ b/pkg/disttask/framework/taskexecutor/execute/interface.go @@ -16,6 +16,7 @@ package execute import ( "context" + "reflect" "github.com/pingcap/tidb/pkg/disttask/framework/proto" ) @@ -29,6 +30,8 @@ import ( // else OnFinished // Cleanup type StepExecutor interface { + StepExecFrameworkInfo + // Init is used to initialize the environment. // if failed, task executor will retry later. Init(context.Context) error @@ -49,3 +52,49 @@ type StepExecutor interface { type SubtaskSummary struct { RowCount int64 } + +// StepExecFrameworkInfo is an interface that should be embedded into the +// implementation of StepExecutor. It's set by the framework automatically and +// the implementation can use it to access necessary information. The framework +// will init it before `StepExecutor.Init`, before that you cannot call methods +// in this interface. +type StepExecFrameworkInfo interface { + // restricted is a private method to prevent other package mistakenly implements + // StepExecFrameworkInfo. So when StepExecFrameworkInfo is composed with other + // interfaces, the implementation of other interface must embed + // StepExecFrameworkInfo. + restricted() + // GetResource returns the expected resource of this step executor. + GetResource() *proto.StepResource +} + +var stepExecFrameworkInfoName = reflect.TypeOf((*StepExecFrameworkInfo)(nil)).Elem().Name() + +type frameworkInfo struct { + resource *proto.StepResource +} + +func (*frameworkInfo) restricted() {} + +func (f *frameworkInfo) GetResource() *proto.StepResource { + return f.resource +} + +// SetFrameworkInfo sets the framework info for the StepExecutor. +func SetFrameworkInfo(exec StepExecutor, resource *proto.StepResource) { + if exec == nil { + return + } + toInject := &frameworkInfo{resource: resource} + // use reflection to set the framework info + e := reflect.ValueOf(exec) + if e.Kind() == reflect.Ptr || e.Kind() == reflect.Interface { + e = e.Elem() + } + info := e.FieldByName(stepExecFrameworkInfoName) + // if `exec` embeds StepExecutor rather than StepExecFrameworkInfo, the field + // will not be found. This is happened in mock generated code. + if info.IsValid() && info.CanSet() { + info.Set(reflect.ValueOf(toInject)) + } +} diff --git a/pkg/disttask/framework/taskexecutor/interface.go b/pkg/disttask/framework/taskexecutor/interface.go index c64074307b8e1..2ae95087960b5 100644 --- a/pkg/disttask/framework/taskexecutor/interface.go +++ b/pkg/disttask/framework/taskexecutor/interface.go @@ -118,7 +118,7 @@ type Extension interface { // Note: // 1. summary is the summary manager of all subtask of the same type now. // 2. should not retry the error from it. - GetStepExecutor(task *proto.Task, resource *proto.StepResource) (execute.StepExecutor, error) + GetStepExecutor(task *proto.Task) (execute.StepExecutor, error) // IsRetryableError returns whether the error is transient. // When error is transient, the framework won't mark subtasks as failed, // then the TaskExecutor can load the subtask again and redo it. @@ -128,6 +128,7 @@ type Extension interface { // EmptyStepExecutor is an empty Executor. // it can be used for the task that does not need to split into subtasks. type EmptyStepExecutor struct { + execute.StepExecFrameworkInfo } var _ execute.StepExecutor = &EmptyStepExecutor{} diff --git a/pkg/disttask/framework/taskexecutor/task_executor.go b/pkg/disttask/framework/taskexecutor/task_executor.go index 87981cd7a289b..b25f9781cf37c 100644 --- a/pkg/disttask/framework/taskexecutor/task_executor.go +++ b/pkg/disttask/framework/taskexecutor/task_executor.go @@ -309,11 +309,12 @@ func (e *BaseTaskExecutor) runStep(resource *proto.StepResource) (resErr error) stepLogger.End(zap.InfoLevel, resErr) }() - stepExecutor, err := e.GetStepExecutor(task, resource) + stepExecutor, err := e.GetStepExecutor(task) if err != nil { e.onError(err) return e.getError() } + execute.SetFrameworkInfo(stepExecutor, resource) failpoint.Inject("mockExecSubtaskInitEnvErr", func() { failpoint.Return(errors.New("mockExecSubtaskInitEnvErr")) diff --git a/pkg/disttask/framework/taskexecutor/task_executor_test.go b/pkg/disttask/framework/taskexecutor/task_executor_test.go index 9b834e62c2cfe..7db71b127048d 100644 --- a/pkg/disttask/framework/taskexecutor/task_executor_test.go +++ b/pkg/disttask/framework/taskexecutor/task_executor_test.go @@ -21,9 +21,10 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/disttask/framework/mock" - mockexecute "github.com/pingcap/tidb/pkg/disttask/framework/mock/execute" + "github.com/pingcap/tidb/pkg/disttask/framework/mock/execute" "github.com/pingcap/tidb/pkg/disttask/framework/proto" "github.com/pingcap/tidb/pkg/disttask/framework/storage" + "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" "google.golang.org/grpc/codes" @@ -57,7 +58,7 @@ func TestTaskExecutorRun(t *testing.T) { // 1. no taskExecutor constructor taskExecutorRegisterErr := errors.Errorf("constructor of taskExecutor for key not found") - mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any()).Return(nil, taskExecutorRegisterErr).Times(2) + mockExtension.EXPECT().GetStepExecutor(gomock.Any()).Return(nil, taskExecutorRegisterErr).Times(2) taskExecutor := NewBaseTaskExecutor(ctx, "id", task1, mockSubtaskTable) taskExecutor.Extension = mockExtension err := taskExecutor.runStep(nil) @@ -68,7 +69,7 @@ func TestTaskExecutorRun(t *testing.T) { require.True(t, ctrl.Satisfied()) // 2. init subtask exec env failed - mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any()).Return(mockStepExecutor, nil).AnyTimes() + mockExtension.EXPECT().GetStepExecutor(gomock.Any()).Return(mockStepExecutor, nil).AnyTimes() initErr := errors.New("init error") mockStepExecutor.EXPECT().Init(gomock.Any()).Return(initErr) @@ -297,7 +298,7 @@ func TestTaskExecutor(t *testing.T) { mockStepExecutor := mockexecute.NewMockStepExecutor(ctrl) mockExtension := mock.NewMockExtension(ctrl) mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(gomock.Any(), "id", taskID, proto.SubtaskStateFailed, gomock.Any()).Return(nil) - mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any()).Return(mockStepExecutor, nil).AnyTimes() + mockExtension.EXPECT().GetStepExecutor(gomock.Any()).Return(mockStepExecutor, nil).AnyTimes() mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(false).AnyTimes() // mock for checkBalanceSubtask mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "id", @@ -359,7 +360,7 @@ func TestRunStepCurrentSubtaskScheduledAway(t *testing.T) { mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "tidb1", task.ID, proto.StepOne, proto.SubtaskStateRunning).Return([]*proto.Subtask{}, nil) // mock for runStep - mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any()).Return(mockStepExecutor, nil) + mockExtension.EXPECT().GetStepExecutor(gomock.Any()).Return(mockStepExecutor, nil) mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(false) mockSubtaskTable.EXPECT().GetTaskByID(gomock.Any(), task.ID).Return(task, nil) mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "tidb1", task.ID, proto.StepOne, @@ -451,14 +452,14 @@ func TestExecutorErrHandling(t *testing.T) { // GetStepExecutor meet retryable error. getSubtaskExecutorErr := errors.New("get executor err") mockSubtaskTable.EXPECT().GetTaskByID(gomock.Any(), task.ID).Return(task, nil) - mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any()).Return(nil, getSubtaskExecutorErr) + mockExtension.EXPECT().GetStepExecutor(gomock.Any()).Return(nil, getSubtaskExecutorErr) mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(true) require.NoError(t, taskExecutor.RunStep(nil)) require.True(t, ctrl.Satisfied()) // GetStepExecutor meet non retryable error. mockSubtaskTable.EXPECT().GetTaskByID(gomock.Any(), task.ID).Return(task, nil) - mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any()).Return(nil, getSubtaskExecutorErr) + mockExtension.EXPECT().GetStepExecutor(gomock.Any()).Return(nil, getSubtaskExecutorErr) mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(false) mockSubtaskTable.EXPECT().FailSubtask(taskExecutor.ctx, taskExecutor.id, gomock.Any(), getSubtaskExecutorErr) require.NoError(t, taskExecutor.RunStep(nil)) @@ -467,7 +468,7 @@ func TestExecutorErrHandling(t *testing.T) { // Init meet retryable error. initErr := errors.New("executor init err") mockSubtaskTable.EXPECT().GetTaskByID(gomock.Any(), task.ID).Return(task, nil) - mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any()).Return(mockSubtaskExecutor, nil) + mockExtension.EXPECT().GetStepExecutor(gomock.Any()).Return(mockSubtaskExecutor, nil) mockSubtaskExecutor.EXPECT().Init(gomock.Any()).Return(initErr) mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(true) require.NoError(t, taskExecutor.RunStep(nil)) @@ -475,7 +476,7 @@ func TestExecutorErrHandling(t *testing.T) { // Init meet non retryable error. mockSubtaskTable.EXPECT().GetTaskByID(gomock.Any(), task.ID).Return(task, nil) - mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any()).Return(mockSubtaskExecutor, nil) + mockExtension.EXPECT().GetStepExecutor(gomock.Any()).Return(mockSubtaskExecutor, nil) mockSubtaskExecutor.EXPECT().Init(gomock.Any()).Return(initErr) mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(false) mockSubtaskTable.EXPECT().FailSubtask(taskExecutor.ctx, taskExecutor.id, gomock.Any(), initErr) @@ -485,7 +486,7 @@ func TestExecutorErrHandling(t *testing.T) { // Cleanup meet retryable error. cleanupErr := errors.New("cleanup err") mockSubtaskTable.EXPECT().GetTaskByID(gomock.Any(), task.ID).Return(task, nil) - mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any()).Return(mockSubtaskExecutor, nil) + mockExtension.EXPECT().GetStepExecutor(gomock.Any()).Return(mockSubtaskExecutor, nil) mockSubtaskExecutor.EXPECT().Init(gomock.Any()).Return(nil) mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task.ID, proto.StepOne, unfinishedNormalSubtaskStates...).Return(&proto.Subtask{SubtaskBase: proto.SubtaskBase{ @@ -503,7 +504,7 @@ func TestExecutorErrHandling(t *testing.T) { // Cleanup meet non retryable error. mockSubtaskTable.EXPECT().GetTaskByID(gomock.Any(), task.ID).Return(task, nil) - mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any()).Return(mockSubtaskExecutor, nil) + mockExtension.EXPECT().GetStepExecutor(gomock.Any()).Return(mockSubtaskExecutor, nil) mockSubtaskExecutor.EXPECT().Init(gomock.Any()).Return(nil) mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task.ID, proto.StepOne, unfinishedNormalSubtaskStates...).Return(&proto.Subtask{SubtaskBase: proto.SubtaskBase{ @@ -522,7 +523,7 @@ func TestExecutorErrHandling(t *testing.T) { // subtask succeed. mockSubtaskTable.EXPECT().GetTaskByID(gomock.Any(), task.ID).Return(task, nil) - mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any()).Return(mockSubtaskExecutor, nil) + mockExtension.EXPECT().GetStepExecutor(gomock.Any()).Return(mockSubtaskExecutor, nil) mockSubtaskExecutor.EXPECT().Init(gomock.Any()).Return(nil) mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task.ID, proto.StepOne, unfinishedNormalSubtaskStates...).Return(&proto.Subtask{SubtaskBase: proto.SubtaskBase{ @@ -537,3 +538,11 @@ func TestExecutorErrHandling(t *testing.T) { require.NoError(t, taskExecutor.RunStep(nil)) require.True(t, ctrl.Satisfied()) } + +func TestInject(t *testing.T) { + e := &EmptyStepExecutor{} + r := &proto.StepResource{CPU: proto.NewAllocatable(1)} + execute.SetFrameworkInfo(e, r) + got := e.GetResource() + require.Equal(t, r, got) +} diff --git a/pkg/disttask/framework/testutil/disttest_util.go b/pkg/disttask/framework/testutil/disttest_util.go index b2128f4c1f62a..fbf288dcad01f 100644 --- a/pkg/disttask/framework/testutil/disttest_util.go +++ b/pkg/disttask/framework/testutil/disttest_util.go @@ -56,7 +56,7 @@ func RegisterTaskMeta(t testing.TB, ctrl *gomock.Controller, schedulerExt schedu } mockStepExecutor.EXPECT().RealtimeSummary().Return(nil).AnyTimes() executorExt.EXPECT().IsIdempotent(gomock.Any()).Return(true).AnyTimes() - executorExt.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any()).Return(mockStepExecutor, nil).AnyTimes() + executorExt.EXPECT().GetStepExecutor(gomock.Any()).Return(mockStepExecutor, nil).AnyTimes() executorExt.EXPECT().IsRetryableError(gomock.Any()).Return(false).AnyTimes() registerTaskMetaInner(t, proto.TaskTypeExample, schedulerExt, executorExt, mockCleanupRountine) } @@ -104,7 +104,7 @@ func RegisterRollbackTaskMeta(t testing.TB, ctrl *gomock.Controller, schedulerEx stepExecutor.EXPECT().RealtimeSummary().Return(nil).AnyTimes() stepExecutor.EXPECT().OnFinished(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() executorExt.EXPECT().IsIdempotent(gomock.Any()).Return(true).AnyTimes() - executorExt.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any()).Return(stepExecutor, nil).AnyTimes() + executorExt.EXPECT().GetStepExecutor(gomock.Any()).Return(stepExecutor, nil).AnyTimes() executorExt.EXPECT().IsRetryableError(gomock.Any()).Return(false).AnyTimes() registerTaskMetaInner(t, proto.TaskTypeExample, schedulerExt, executorExt, mockCleanupRountine) diff --git a/pkg/disttask/framework/testutil/executor_util.go b/pkg/disttask/framework/testutil/executor_util.go index dd25c288f3995..98a041977a32f 100644 --- a/pkg/disttask/framework/testutil/executor_util.go +++ b/pkg/disttask/framework/testutil/executor_util.go @@ -37,7 +37,7 @@ func GetMockStepExecutor(ctrl *gomock.Controller) *mockexecute.MockStepExecutor func GetMockTaskExecutorExtension(ctrl *gomock.Controller, mockStepExecutor *mockexecute.MockStepExecutor) *mock.MockExtension { mockExtension := mock.NewMockExtension(ctrl) mockExtension.EXPECT(). - GetStepExecutor(gomock.Any(), gomock.Any()). + GetStepExecutor(gomock.Any()). Return(mockStepExecutor, nil).AnyTimes() mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(false).AnyTimes() return mockExtension diff --git a/pkg/disttask/importinto/mock/import_mock.go b/pkg/disttask/importinto/mock/import_mock.go index 2aa27848b5bb5..13135320fbd38 100644 --- a/pkg/disttask/importinto/mock/import_mock.go +++ b/pkg/disttask/importinto/mock/import_mock.go @@ -5,6 +5,7 @@ // // mockgen -package mock github.com/pingcap/tidb/pkg/disttask/importinto MiniTaskExecutor // + // Package mock is a generated GoMock package. package mock @@ -39,6 +40,11 @@ func (m *MockMiniTaskExecutor) EXPECT() *MockMiniTaskExecutorMockRecorder { return m.recorder } +// ISGOMOCK indicates that this struct is a gomock mock. +func (m *MockMiniTaskExecutor) ISGOMOCK() struct{} { + return struct{}{} +} + // Run mocks base method. func (m *MockMiniTaskExecutor) Run(arg0 context.Context, arg1, arg2 backend.EngineWriter) error { m.ctrl.T.Helper() diff --git a/pkg/disttask/importinto/task_executor.go b/pkg/disttask/importinto/task_executor.go index 0f80c5c254fd8..f4db5e3377d8c 100644 --- a/pkg/disttask/importinto/task_executor.go +++ b/pkg/disttask/importinto/task_executor.go @@ -49,6 +49,8 @@ import ( // importStepExecutor is a executor for import step. // StepExecutor is equivalent to a Lightning instance. type importStepExecutor struct { + execute.StepExecFrameworkInfo + taskID int64 taskMeta *TaskMeta tableImporter *importer.TableImporter @@ -62,7 +64,6 @@ type importStepExecutor struct { importCtx context.Context importCancel context.CancelFunc wg sync.WaitGroup - resource *proto.StepResource } func getTableImporter( @@ -110,7 +111,7 @@ func (s *importStepExecutor) Init(ctx context.Context) error { s.tableImporter.CheckDiskQuota(s.importCtx) }() } - s.dataKVMemSizePerCon, s.perIndexKVMemSizePerCon = getWriterMemorySizeLimit(s.resource, s.tableImporter.Plan) + s.dataKVMemSizePerCon, s.perIndexKVMemSizePerCon = getWriterMemorySizeLimit(s.GetResource(), s.tableImporter.Plan) s.logger.Info("KV writer memory size limit per concurrency", zap.String("data", units.BytesSize(float64(s.dataKVMemSizePerCon))), zap.String("per-index", units.BytesSize(float64(s.perIndexKVMemSizePerCon)))) @@ -280,7 +281,6 @@ type mergeSortStepExecutor struct { // max(max-merged-files * max-file-size / max-part-num(10000), min-part-size) dataKVPartSize int64 indexKVPartSize int64 - resource *proto.StepResource } var _ execute.StepExecutor = &mergeSortStepExecutor{} @@ -294,7 +294,7 @@ func (m *mergeSortStepExecutor) Init(ctx context.Context) error { return err } m.controller = controller - dataKVMemSizePerCon, perIndexKVMemSizePerCon := getWriterMemorySizeLimit(m.resource, &m.taskMeta.Plan) + dataKVMemSizePerCon, perIndexKVMemSizePerCon := getWriterMemorySizeLimit(m.GetResource(), &m.taskMeta.Plan) m.dataKVPartSize = max(external.MinUploadPartSize, int64(dataKVMemSizePerCon*uint64(external.MaxMergingFilesPerThread)/10000)) m.indexKVPartSize = max(external.MinUploadPartSize, int64(perIndexKVMemSizePerCon*uint64(external.MaxMergingFilesPerThread)/10000)) @@ -367,11 +367,12 @@ func (m *mergeSortStepExecutor) OnFinished(_ context.Context, subtask *proto.Sub } type writeAndIngestStepExecutor struct { + execute.StepExecFrameworkInfo + taskID int64 taskMeta *TaskMeta logger *zap.Logger tableImporter *importer.TableImporter - resource *proto.StepResource store tidbkv.Storage } @@ -528,7 +529,7 @@ func (*importExecutor) IsRetryableError(err error) bool { return common.IsRetryableError(err) } -func (e *importExecutor) GetStepExecutor(task *proto.Task, stepResource *proto.StepResource) (execute.StepExecutor, error) { +func (e *importExecutor) GetStepExecutor(task *proto.Task) (execute.StepExecutor, error) { taskMeta := TaskMeta{} if err := json.Unmarshal(task.Meta, &taskMeta); err != nil { return nil, errors.Trace(err) @@ -545,7 +546,6 @@ func (e *importExecutor) GetStepExecutor(task *proto.Task, stepResource *proto.S taskID: task.ID, taskMeta: &taskMeta, logger: logger, - resource: stepResource, store: e.store, }, nil case proto.ImportStepMergeSort: @@ -553,14 +553,12 @@ func (e *importExecutor) GetStepExecutor(task *proto.Task, stepResource *proto.S taskID: task.ID, taskMeta: &taskMeta, logger: logger, - resource: stepResource, }, nil case proto.ImportStepWriteAndIngest: return &writeAndIngestStepExecutor{ taskID: task.ID, taskMeta: &taskMeta, logger: logger, - resource: stepResource, store: e.store, }, nil case proto.ImportStepPostProcess: diff --git a/pkg/disttask/importinto/task_executor_test.go b/pkg/disttask/importinto/task_executor_test.go index e1cd063ee8bf1..304d8a8c863a9 100644 --- a/pkg/disttask/importinto/task_executor_test.go +++ b/pkg/disttask/importinto/task_executor_test.go @@ -44,12 +44,12 @@ func TestImportTaskExecutor(t *testing.T) { proto.ImportStepWriteAndIngest, proto.ImportStepPostProcess, } { - exe, err := executor.GetStepExecutor(&proto.Task{TaskBase: proto.TaskBase{Step: step}, Meta: []byte("{}")}, nil) + exe, err := executor.GetStepExecutor(&proto.Task{TaskBase: proto.TaskBase{Step: step}, Meta: []byte("{}")}) require.NoError(t, err) require.NotNil(t, exe) } - _, err := executor.GetStepExecutor(&proto.Task{TaskBase: proto.TaskBase{Step: proto.StepInit}, Meta: []byte("{}")}, nil) + _, err := executor.GetStepExecutor(&proto.Task{TaskBase: proto.TaskBase{Step: proto.StepInit}, Meta: []byte("{}")}) require.Error(t, err) - _, err = executor.GetStepExecutor(&proto.Task{TaskBase: proto.TaskBase{Step: proto.ImportStepImport}, Meta: []byte("")}, nil) + _, err = executor.GetStepExecutor(&proto.Task{TaskBase: proto.TaskBase{Step: proto.ImportStepImport}, Meta: []byte("")}) require.Error(t, err) } diff --git a/pkg/util/sqlexec/mock/restricted_sql_executor_mock.go b/pkg/util/sqlexec/mock/restricted_sql_executor_mock.go index b0da3b6039828..7db258957e7ab 100644 --- a/pkg/util/sqlexec/mock/restricted_sql_executor_mock.go +++ b/pkg/util/sqlexec/mock/restricted_sql_executor_mock.go @@ -5,6 +5,7 @@ // // mockgen -package mock github.com/pingcap/tidb/pkg/util/sqlexec RestrictedSQLExecutor // + // Package mock is a generated GoMock package. package mock @@ -41,6 +42,11 @@ func (m *MockRestrictedSQLExecutor) EXPECT() *MockRestrictedSQLExecutorMockRecor return m.recorder } +// ISGOMOCK indicates that this struct is a gomock mock. +func (m *MockRestrictedSQLExecutor) ISGOMOCK() struct{} { + return struct{}{} +} + // ExecRestrictedSQL mocks base method. func (m *MockRestrictedSQLExecutor) ExecRestrictedSQL(arg0 context.Context, arg1 []func(*sqlexec.ExecOption), arg2 string, arg3 ...any) ([]chunk.Row, []*ast.ResultField, error) { m.ctrl.T.Helper()