Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

disttask: add StepExecFrameworkInfo as an example of Impl access Base #52192

Merged
merged 7 commits into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ tools/bin/gotestsum:

# [email protected] is imcompatible with v0.3.0, so install it always.
mockgen:
GOBIN=$(shell pwd)/tools/bin $(GO) install go.uber.org/mock/mockgen@v0.3.0
GOBIN=$(shell pwd)/tools/bin $(GO) install github.com/lance6716/mock/mockgen@v0.4.0-patch

# Usage:
#
Expand Down Expand Up @@ -392,8 +392,8 @@ mock_lightning: mockgen
gen_mock: mockgen
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor TaskTable,Pool,TaskExecutor,Extension > pkg/disttask/framework/mock/task_executor_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/framework/scheduler Scheduler,CleanUpRoutine,TaskManager > pkg/disttask/framework/mock/scheduler_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/framework/scheduler Extension > pkg/disttask/framework/scheduler/mock/scheduler_mock.go
tools/bin/mockgen -package execute github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute StepExecutor > pkg/disttask/framework/mock/execute/execute_mock.go
tools/bin/mockgen -destination pkg/disttask/framework/scheduler/mock/scheduler_mock.go -package mock github.com/pingcap/tidb/pkg/disttask/framework/scheduler Extension
tools/bin/mockgen -embed -package mockexecute github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute StepExecutor > pkg/disttask/framework/mock/execute/execute_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/importinto MiniTaskExecutor > pkg/disttask/importinto/mock/import_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/framework/planner LogicalPlan,PipelineSpec > pkg/disttask/framework/mock/plan_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/util/sqlexec RestrictedSQLExecutor > pkg/util/sqlexec/mock/restricted_sql_executor_mock.go
Expand Down
6 changes: 6 additions & 0 deletions br/pkg/mock/storage/storage.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/ddl/backfilling_dist_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (s *backfillDistExecutor) Init(ctx context.Context) error {
return nil
}

func (s *backfillDistExecutor) GetStepExecutor(task *proto.Task, _ *proto.StepResource) (execute.StepExecutor, error) {
func (s *backfillDistExecutor) GetStepExecutor(task *proto.Task) (execute.StepExecutor, error) {
switch task.Step {
case proto.BackfillStepReadIndex, proto.BackfillStepMergeSort, proto.BackfillStepWriteAndIngest:
return s.newBackfillSubtaskExecutor(task.Step)
Expand Down
1 change: 1 addition & 0 deletions pkg/ddl/backfilling_read_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
)

type readIndexExecutor struct {
execute.StepExecFrameworkInfo
d *ddl
job *model.Job
indexes []*model.IndexInfo
Expand Down
39 changes: 36 additions & 3 deletions pkg/disttask/framework/mock/execute/execute_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions pkg/disttask/framework/mock/plan_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions pkg/disttask/framework/mock/scheduler_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 25 additions & 4 deletions pkg/disttask/framework/mock/task_executor_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion pkg/disttask/framework/scheduler/mock/scheduler_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pkg/disttask/framework/taskexecutor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,14 @@ go_test(
],
embed = [":taskexecutor"],
flaky = True,
shard_count = 15,
shard_count = 16,
deps = [
"//pkg/disttask/framework/mock",
"//pkg/disttask/framework/mock/execute",
"//pkg/disttask/framework/proto",
"//pkg/disttask/framework/scheduler",
"//pkg/disttask/framework/storage",
"//pkg/disttask/framework/taskexecutor/execute",
"//pkg/disttask/framework/testutil",
"//pkg/kv",
"//pkg/testkit",
Expand Down
49 changes: 49 additions & 0 deletions pkg/disttask/framework/taskexecutor/execute/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package execute

import (
"context"
"reflect"

"github.com/pingcap/tidb/pkg/disttask/framework/proto"
)
Expand All @@ -29,6 +30,8 @@ import (
// else OnFinished
// Cleanup
type StepExecutor interface {
StepExecFrameworkInfo

// Init is used to initialize the environment.
// if failed, task executor will retry later.
Init(context.Context) error
Expand All @@ -49,3 +52,49 @@ type StepExecutor interface {
type SubtaskSummary struct {
RowCount int64
}

// StepExecFrameworkInfo is an interface that should be embedded into the
// implementation of StepExecutor. It's set by the framework automatically and
// the implementation can use it to access necessary information. The framework
// will init it before `StepExecutor.Init`, before that you cannot call methods
// in this interface.
type StepExecFrameworkInfo interface {
// restricted is a private method to prevent other package mistakenly implements
// StepExecFrameworkInfo. So when StepExecFrameworkInfo is composed with other
// interfaces, the implementation of other interface must embed
// StepExecFrameworkInfo.
restricted()
// GetResource returns the expected resource of this step executor.
GetResource() *proto.StepResource
}

var stepExecFrameworkInfoName = reflect.TypeOf((*StepExecFrameworkInfo)(nil)).Elem().Name()

type frameworkInfo struct {
resource *proto.StepResource
}

func (*frameworkInfo) restricted() {}

func (f *frameworkInfo) GetResource() *proto.StepResource {
return f.resource
}

// SetFrameworkInfo sets the framework info for the StepExecutor.
func SetFrameworkInfo(exec StepExecutor, resource *proto.StepResource) {
if exec == nil {
return
}
toInject := &frameworkInfo{resource: resource}
// use reflection to set the framework info
e := reflect.ValueOf(exec)
if e.Kind() == reflect.Ptr || e.Kind() == reflect.Interface {
e = e.Elem()
}
info := e.FieldByName(stepExecFrameworkInfoName)
// if `exec` embeds StepExecutor rather than StepExecFrameworkInfo, the field
// will not be found. This is happened in mock generated code.
if info.IsValid() && info.CanSet() {
info.Set(reflect.ValueOf(toInject))
}
}
Loading
Loading