Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM/Openapi: support start task by some conditions #5349

Merged
merged 22 commits into from
May 16, 2022
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
ade37a4
commit-message: init
WizardXiao May 6, 2022
fbdda1f
Merge branch 'master' into add-start-condition
WizardXiao May 6, 2022
903d115
commit-message: return nil when cli args is empty
WizardXiao May 7, 2022
20b7ab7
Merge branch 'master' of https://github.com/WizardXiao/tiflow into ad…
WizardXiao May 7, 2022
2f3fbfc
Merge branch 'add-start-condition' of https://github.com/WizardXiao/t…
WizardXiao May 7, 2022
dd630b9
commit-message: fix back duration ts caculate
WizardXiao May 9, 2022
9fb6480
commit-message: merge master
WizardXiao May 12, 2022
bf5ffec
Merge branch 'master' of https://github.com/WizardXiao/tiflow into ad…
WizardXiao May 12, 2022
503a294
commit-message: fix conflicts with master
WizardXiao May 13, 2022
7ba25be
Merge branch 'master' of https://github.com/WizardXiao/tiflow into ad…
WizardXiao May 13, 2022
1feed83
commit-message: add integration test about wait stop
WizardXiao May 14, 2022
406bb15
Merge branch 'master' of https://github.com/WizardXiao/tiflow into ad…
WizardXiao May 14, 2022
8ed9422
Merge branch 'master' into add-start-condition
WizardXiao May 14, 2022
b656160
Merge branch 'master' into add-start-condition
WizardXiao May 16, 2022
9ec9cd8
commit-message: fix wait stop time to stop task
WizardXiao May 16, 2022
a038691
Merge branch 'master' of https://github.com/WizardXiao/tiflow into ad…
WizardXiao May 16, 2022
9c535bd
Merge branch 'add-start-condition' of https://github.com/WizardXiao/t…
WizardXiao May 16, 2022
7a1f4d7
commit-message: address some comments
WizardXiao May 16, 2022
3d20d7e
Merge branch 'master' of https://github.com/WizardXiao/tiflow into ad…
WizardXiao May 16, 2022
202e192
commit-message: add negative case
WizardXiao May 16, 2022
fd8c8f9
Merge branch 'master' of https://github.com/WizardXiao/tiflow into ad…
WizardXiao May 16, 2022
dd14d08
commit-message: fix function name
WizardXiao May 16, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dm/_utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ ErrConfigReadCfgFromFile,[code=20018:class=config:scope=internal:level=medium],
ErrConfigNeedUniqueTaskName,[code=20019:class=config:scope=internal:level=medium], "Message: must specify a unique task name, Workaround: Please check the `name` config in task configuration file."
ErrConfigInvalidTaskMode,[code=20020:class=config:scope=internal:level=medium], "Message: please specify right task-mode, support `full`, `incremental`, `all`, Workaround: Please check the `task-mode` config in task configuration file."
ErrConfigNeedTargetDB,[code=20021:class=config:scope=internal:level=medium], "Message: must specify target-database, Workaround: Please check the `target-database` config in task configuration file."
ErrConfigMetadataNotSet,[code=20022:class=config:scope=internal:level=medium], "Message: mysql-instance(%d) must set meta for task-mode %s, Workaround: Please check the `meta` config in task configuration file."
ErrConfigMetadataNotSet,[code=20022:class=config:scope=internal:level=medium], "Message: mysql-instance(%s) must set meta for task-mode %s, Workaround: Please check the `meta` config in task configuration file."
ErrConfigRouteRuleNotFound,[code=20023:class=config:scope=internal:level=medium], "Message: mysql-instance(%d)'s route-rules %s not exist in routes, Workaround: Please check the `route-rules` config in task configuration file."
ErrConfigFilterRuleNotFound,[code=20024:class=config:scope=internal:level=medium], "Message: mysql-instance(%d)'s filter-rules %s not exist in filters, Workaround: Please check the `filter-rules` config in task configuration file."
ErrConfigColumnMappingNotFound,[code=20025:class=config:scope=internal:level=medium], "Message: mysql-instance(%d)'s column-mapping-rules %s not exist in column-mapping, Workaround: Please check the `column-mapping-rules` config in task configuration file."
Expand Down
2 changes: 1 addition & 1 deletion dm/dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ func (c *TaskConfig) adjust() error {
}
case ModeIncrement:
if inst.Meta == nil {
return terror.ErrConfigMetadataNotSet.Generate(i, c.TaskMode)
return terror.ErrConfigMetadataNotSet.Generate(inst.SourceID, c.TaskMode)
}
err := inst.Meta.Verify()
if err != nil {
Expand Down
37 changes: 35 additions & 2 deletions dm/dm/config/task_converters.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,12 @@ func OpenAPITaskToSubTaskConfigs(task *openapi.Task, toDBCfg *DBConfig, sourceCf
}
subTaskCfg.Meta = meta
}
// check must set meta when mode is ModeIncrement

// if there is no meta for incremental task, we print a warning log
if subTaskCfg.Meta == nil && subTaskCfg.Mode == ModeIncrement {
return nil, terror.ErrConfigMetadataNotSet.Generate(i, ModeIncrement)
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
log.L().Warn("mysql-instance doesn't set meta for incremental mode, user should specify start_time to start task.", zap.String("sourceID", sourceCfg.SourceName))
}

// set shard config
if task.ShardMode != nil {
subTaskCfg.IsSharding = true
Expand Down Expand Up @@ -691,3 +693,34 @@ func genFilterRuleName(sourceName string, idx int) string {
// NOTE that we don't have user input filter rule name in sub task config, so we make one by ourself
return fmt.Sprintf("%s-filter-rule-%d", sourceName, idx)
}

func OpenAPIStartTaskReqToTaskCliArgs(req openapi.StartTaskRequest) (*TaskCliArgs, error) {
if req.StartTime == nil && req.SafeModeTimeDuration == nil {
return nil, nil
}
cliArgs := &TaskCliArgs{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about return nil when all cli agrs are empty?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let me have a try.

if req.StartTime != nil {
cliArgs.StartTime = *req.StartTime
}
if req.SafeModeTimeDuration != nil {
cliArgs.SafeModeDuration = *req.SafeModeTimeDuration
}

if err := cliArgs.Verify(); err != nil {
return nil, err
}
return cliArgs, nil
}

func OpenAPIStopTasReqToTaskCliArgs(req openapi.StopTaskRequest) (*TaskCliArgs, error) {
if req.TimeoutDuration == nil {
return nil, nil
}
cliArgs := &TaskCliArgs{
WaitTimeOnStop: *req.TimeoutDuration,
}
if err := cliArgs.Verify(); err != nil {
return nil, err
}
return cliArgs, nil
}
58 changes: 54 additions & 4 deletions dm/dm/master/openapi_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,14 @@ import (
"fmt"
"strings"

clientv3 "go.etcd.io/etcd/client/v3"

"github.com/pingcap/log"
"go.uber.org/zap"

"github.com/pingcap/tiflow/dm/dm/master/scheduler"
"github.com/pingcap/tiflow/dm/pkg/ha"

"github.com/pingcap/tiflow/dm/checker"
dmcommon "github.com/pingcap/tiflow/dm/dm/common"
"github.com/pingcap/tiflow/dm/dm/config"
Expand Down Expand Up @@ -377,7 +382,7 @@ func (s *Server) checkOpenAPITaskBeforeOperate(ctx context.Context, task *openap
if sourceCfg := s.scheduler.GetSourceCfgByID(cfg.SourceName); sourceCfg != nil {
sourceCfgMap[cfg.SourceName] = sourceCfg
} else {
return nil, "", terror.ErrSchedulerSourceCfgNotExist.Generate(sourceCfg.SourceID)
return nil, "", terror.ErrSchedulerSourceCfgNotExist.Generate(cfg.SourceName)
}
}
// generate sub task configs
Expand Down Expand Up @@ -666,6 +671,10 @@ func (s *Server) startTask(ctx context.Context, taskName string, req openapi.Sta
if !ok {
return terror.ErrSchedulerSourceCfgNotExist.Generate(sourceName)
}
// start task check. incremental task need to specify meta or start time
if subTaskCfg.Meta == nil && subTaskCfg.Mode == config.ModeIncrement && req.StartTime == nil {
return terror.ErrConfigMetadataNotSet.Generate(sourceName, config.ModeIncrement)
}
cfg := s.scheduler.GetSourceCfgByID(sourceName)
if cfg == nil {
return terror.ErrSchedulerSourceCfgNotExist.Generate(sourceName)
Expand All @@ -679,10 +688,14 @@ func (s *Server) startTask(ctx context.Context, taskName string, req openapi.Sta
return nil
}

// TODO(ehco) support other start args after https://github.com/pingcap/tiflow/pull/4601 merged
var (
release scheduler.ReleaseFunc
err error
)
// removeMeta
if req.RemoveMeta != nil && *req.RemoveMeta {
// use same latch for remove-meta and start-task
release, err := s.scheduler.AcquireSubtaskLatch(taskName)
release, err = s.scheduler.AcquireSubtaskLatch(taskName)
if err != nil {
return terror.ErrSchedulerLatchInUse.Generate("RemoveMeta", taskName)
}
Expand All @@ -693,8 +706,21 @@ func (s *Server) startTask(ctx context.Context, taskName string, req openapi.Sta
if err != nil {
return terror.Annotate(err, "while removing metadata")
}
}

// handle task cli args
cliArgs, err := config.OpenAPIStartTaskReqToTaskCliArgs(req)
if err != nil {
return terror.Annotate(err, "while converting task command line arguments")
}

if err = handleCliArgs(s.etcdClient, taskName, *req.SourceNameList, cliArgs); err != nil {
return err
}
if release != nil {
release()
}

return s.scheduler.UpdateExpectSubTaskStage(pb.Stage_Running, taskName, *req.SourceNameList...)
}

Expand All @@ -705,10 +731,34 @@ func (s *Server) stopTask(ctx context.Context, taskName string, req openapi.Stop
sourceNameList := openapi.SourceNameList(s.getTaskSourceNameList(taskName))
req.SourceNameList = &sourceNameList
}
// TODO(ehco): support stop req after https://github.com/pingcap/tiflow/pull/4601 merged
// handle task cli args
cliArgs, err := config.OpenAPIStopTasReqToTaskCliArgs(req)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
cliArgs, err := config.OpenAPIStopTasReqToTaskCliArgs(req)
cliArgs, err := config.OpenAPIStopTaskReqToTaskCliArgs(req)

if err != nil {
return terror.Annotate(err, "while converting task command line arguments")
}
if err = handleCliArgs(s.etcdClient, taskName, *req.SourceNameList, cliArgs); err != nil {
return err
}
return s.scheduler.UpdateExpectSubTaskStage(pb.Stage_Stopped, taskName, *req.SourceNameList...)
}

// handleCliArgs handles cli args.
// it will try to delete args if cli args is nil.
func handleCliArgs(cli *clientv3.Client, taskName string, sources []string, cliArgs *config.TaskCliArgs) error {
if cliArgs == nil {
err := ha.DeleteTaskCliArgs(cli, taskName, sources)
if err != nil {
return terror.Annotate(err, "while removing task command line arguments")
}
} else {
err := ha.PutTaskCliArgs(cli, taskName, sources, *cliArgs)
if err != nil {
return terror.Annotate(err, "while putting task command line arguments")
}
}
return nil
}

// nolint:unparam
func (s *Server) convertTaskConfig(ctx context.Context, req openapi.ConverterTaskRequest) (*openapi.Task, *config.TaskConfig, error) {
if req.TaskConfigFile != nil {
Expand Down
20 changes: 20 additions & 0 deletions dm/dm/master/openapi_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"context"
"testing"

"github.com/pingcap/tiflow/dm/pkg/ha"

"github.com/pingcap/failpoint"
"github.com/pingcap/tiflow/dm/checker"
"github.com/pingcap/tiflow/dm/dm/config"
Expand Down Expand Up @@ -372,6 +374,24 @@ func (s *OpenAPIControllerSuite) TestTaskController() {
// stop success
s.Nil(server.stopTask(ctx, s.testTask.Name, openapi.StopTaskRequest{}))
s.Equal(server.scheduler.GetExpectSubTaskStage(s.testTask.Name, s.testSource.SourceName).Expect, pb.Stage_Stopped)

// start with cli args
startTime := "2022-05-05 12:12:12"
safeModeTimeDuration := "10s"
req = openapi.StartTaskRequest{
StartTime: &startTime,
SafeModeTimeDuration: &safeModeTimeDuration,
}
s.Nil(server.startTask(ctx, s.testTask.Name, req))
taskCliConf, err := ha.GetTaskCliArgs(server.etcdClient, s.testTask.Name, s.testSource.SourceName)
s.Nil(err)
s.NotNil(taskCliConf)
s.Equal(startTime, taskCliConf.StartTime)
s.Equal(safeModeTimeDuration, taskCliConf.SafeModeDuration)

// stop success
s.Nil(server.stopTask(ctx, s.testTask.Name, openapi.StopTaskRequest{}))
s.Equal(server.scheduler.GetExpectSubTaskStage(s.testTask.Name, s.testSource.SourceName).Expect, pb.Stage_Stopped)
}

// delete
Expand Down
2 changes: 1 addition & 1 deletion dm/errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -929,7 +929,7 @@ workaround = "Please check the `target-database` config in task configuration fi
tags = ["internal", "medium"]

[error.DM-config-20022]
message = "mysql-instance(%d) must set meta for task-mode %s"
message = "mysql-instance(%s) must set meta for task-mode %s"
description = ""
workaround = "Please check the `meta` config in task configuration file."
tags = ["internal", "medium"]
Expand Down
Loading