Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM/Openapi: support start task by some conditions #5349

Merged
merged 22 commits into from
May 16, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
ade37a4
commit-message: init
WizardXiao May 6, 2022
fbdda1f
Merge branch 'master' into add-start-condition
WizardXiao May 6, 2022
903d115
commit-message: return nil when cli args is empty
WizardXiao May 7, 2022
20b7ab7
Merge branch 'master' of https://github.com/WizardXiao/tiflow into ad…
WizardXiao May 7, 2022
2f3fbfc
Merge branch 'add-start-condition' of https://github.com/WizardXiao/t…
WizardXiao May 7, 2022
dd630b9
commit-message: fix back duration ts caculate
WizardXiao May 9, 2022
9fb6480
commit-message: merge master
WizardXiao May 12, 2022
bf5ffec
Merge branch 'master' of https://github.com/WizardXiao/tiflow into ad…
WizardXiao May 12, 2022
503a294
commit-message: fix conflicts with master
WizardXiao May 13, 2022
7ba25be
Merge branch 'master' of https://github.com/WizardXiao/tiflow into ad…
WizardXiao May 13, 2022
1feed83
commit-message: add integration test about wait stop
WizardXiao May 14, 2022
406bb15
Merge branch 'master' of https://github.com/WizardXiao/tiflow into ad…
WizardXiao May 14, 2022
8ed9422
Merge branch 'master' into add-start-condition
WizardXiao May 14, 2022
b656160
Merge branch 'master' into add-start-condition
WizardXiao May 16, 2022
9ec9cd8
commit-message: fix wait stop time to stop task
WizardXiao May 16, 2022
a038691
Merge branch 'master' of https://github.com/WizardXiao/tiflow into ad…
WizardXiao May 16, 2022
9c535bd
Merge branch 'add-start-condition' of https://github.com/WizardXiao/t…
WizardXiao May 16, 2022
7a1f4d7
commit-message: address some comments
WizardXiao May 16, 2022
3d20d7e
Merge branch 'master' of https://github.com/WizardXiao/tiflow into ad…
WizardXiao May 16, 2022
202e192
commit-message: add negative case
WizardXiao May 16, 2022
fd8c8f9
Merge branch 'master' of https://github.com/WizardXiao/tiflow into ad…
WizardXiao May 16, 2022
dd14d08
commit-message: fix function name
WizardXiao May 16, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions dm/dm/config/task_converters.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,7 @@ func OpenAPITaskToSubTaskConfigs(task *openapi.Task, toDBCfg *DBConfig, sourceCf
}
subTaskCfg.Meta = meta
}
// check must set meta when mode is ModeIncrement
if subTaskCfg.Meta == nil && subTaskCfg.Mode == ModeIncrement {
return nil, terror.ErrConfigMetadataNotSet.Generate(i, ModeIncrement)
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
}

// set shard config
if task.ShardMode != nil {
subTaskCfg.IsSharding = true
Expand Down Expand Up @@ -684,3 +681,18 @@ func genFilterRuleName(sourceName string, idx int) string {
// NOTE that we don't have user input filter rule name in sub task config, so we make one by ourself
return fmt.Sprintf("%s-filter-rule-%d", sourceName, idx)
}

func OpenAPIStartTaskReqToTaskCliArgs(req openapi.StartTaskRequest) (*TaskCliArgs, error) {
cliArgs := &TaskCliArgs{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about return nil when all cli agrs are empty?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let me have a try.

if req.StartTime != nil {
cliArgs.StartTime = *req.StartTime
}
if req.SafeModeTimeDuration != nil {
cliArgs.SafeModeDuration = *req.SafeModeTimeDuration
}

if err := cliArgs.Verify(); err != nil {
return nil, err
}
return cliArgs, nil
}
31 changes: 29 additions & 2 deletions dm/dm/master/openapi_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import (
"encoding/json"
"fmt"

"github.com/pingcap/tiflow/dm/dm/master/scheduler"
"github.com/pingcap/tiflow/dm/pkg/ha"

"github.com/pingcap/tiflow/dm/checker"
dmcommon "github.com/pingcap/tiflow/dm/dm/common"
"github.com/pingcap/tiflow/dm/dm/config"
Expand Down Expand Up @@ -652,10 +655,14 @@ func (s *Server) startTask(ctx context.Context, taskName string, req openapi.Sta
return nil
}

// TODO(ehco) support other start args after https://github.com/pingcap/tiflow/pull/4601 merged
var (
release scheduler.ReleaseFunc
err error
)
// removeMeta
if req.RemoveMeta != nil && *req.RemoveMeta {
// use same latch for remove-meta and start-task
release, err := s.scheduler.AcquireSubtaskLatch(taskName)
release, err = s.scheduler.AcquireSubtaskLatch(taskName)
if err != nil {
return terror.ErrSchedulerLatchInUse.Generate("RemoveMeta", taskName)
}
Expand All @@ -666,8 +673,28 @@ func (s *Server) startTask(ctx context.Context, taskName string, req openapi.Sta
if err != nil {
return terror.Annotate(err, "while removing metadata")
}
}

cliArgs, err := config.OpenAPIStartTaskReqToTaskCliArgs(req)
if err != nil {
return terror.Annotate(err, "while converting task command line arguments")
}

if cliArgs.StartTime == "" {
err = ha.DeleteAllTaskCliArgs(s.etcdClient, taskName)
if err != nil {
return terror.Annotate(err, "while removing task command line arguments")
}
} else {
err = ha.PutTaskCliArgs(s.etcdClient, taskName, *req.SourceNameList, *cliArgs)
if err != nil {
return terror.Annotate(err, "while putting task command line arguments")
}
}
if release != nil {
release()
}

return s.scheduler.UpdateExpectSubTaskStage(pb.Stage_Running, taskName, *req.SourceNameList...)
}

Expand Down
20 changes: 20 additions & 0 deletions dm/dm/master/openapi_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"context"
"testing"

"github.com/pingcap/tiflow/dm/pkg/ha"

"github.com/pingcap/failpoint"
"github.com/pingcap/tiflow/dm/checker"
"github.com/pingcap/tiflow/dm/dm/config"
Expand Down Expand Up @@ -338,6 +340,24 @@ func (s *OpenAPIControllerSuite) TestTaskController() {
// stop success
s.Nil(server.stopTask(ctx, s.testTask.Name, openapi.StopTaskRequest{}))
s.Equal(server.scheduler.GetExpectSubTaskStage(s.testTask.Name, s.testSource.SourceName).Expect, pb.Stage_Stopped)

// start with cli args
startTime := "2022-05-05 12:12:12"
safeModeTimeDuration := "10s"
req = openapi.StartTaskRequest{
StartTime: &startTime,
SafeModeTimeDuration: &safeModeTimeDuration,
}
s.Nil(server.startTask(ctx, s.testTask.Name, req))
taskCliConf, err := ha.GetTaskCliArgs(server.etcdClient, s.testTask.Name, s.testSource.SourceName)
s.Nil(err)
s.NotNil(taskCliConf)
s.Equal(startTime, taskCliConf.StartTime)
s.Equal(safeModeTimeDuration, taskCliConf.SafeModeDuration)

// stop success
s.Nil(server.stopTask(ctx, s.testTask.Name, openapi.StopTaskRequest{}))
s.Equal(server.scheduler.GetExpectSubTaskStage(s.testTask.Name, s.testSource.SourceName).Expect, pb.Stage_Stopped)
}

// delete
Expand Down
202 changes: 102 additions & 100 deletions dm/openapi/gen.server.go

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions dm/openapi/gen.types.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 8 additions & 7 deletions dm/openapi/spec/dm.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2168,13 +2168,14 @@ components:
description: whether to remove meta database in downstream database
source_name_list:
$ref: "#/components/schemas/SourceNameList"
# start_time:
# type: string
# example: "2006-01-02 15:04:05"
# description: task start time
# safe_mode_time_duration:
# example: "1s"
# description: time duration of safe mode
start_time:
type: string
example: "2006-01-02 15:04:05"
description: task start time
safe_mode_time_duration:
type: string
example: "10s"
description: time duration of safe mode
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we also support WaitTimeOnStop in cliArgs now

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WaitTimeOnStop is not in the requirement, maybe i will add later pr and add some integration test.

StopTaskRequest:
type: object
properties:
Expand Down
4 changes: 2 additions & 2 deletions dm/syncer/safe_mode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@ func TestEnableSafeModeInitializationPhase(t *testing.T) {
require.True(t, s.safeMode.Enable())
s.Lock()
require.Nil(t, s.exitSafeModeTS) // not meet the first binlog
firstBinlogTS := int64(1)
firstBinlogTS := int64(1000)
require.NoError(t, s.initSafeModeExitTS(firstBinlogTS))
require.NotNil(t, s.exitSafeModeTS) // not meet the first binlog
require.Equal(t, int64(3), *s.exitSafeModeTS)
require.Equal(t, int64(3000), *s.exitSafeModeTS)
require.Equal(t, firstBinlogTS, *s.firstMeetBinlogTS)
s.Unlock()
require.NoError(t, s.checkAndExitSafeModeByBinlogTS(s.tctx, *s.exitSafeModeTS)) // not exit when binlog TS == exit TS
Expand Down
4 changes: 2 additions & 2 deletions dm/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2076,7 +2076,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
}

// set exitSafeModeTS when meet first binlog
if s.firstMeetBinlogTS == nil && s.cliArgs != nil && s.cliArgs.SafeModeDuration != "" {
if s.firstMeetBinlogTS == nil && s.cliArgs != nil && s.cliArgs.SafeModeDuration != "" && int64(e.Header.Timestamp) != 0 {
D3Hunter marked this conversation as resolved.
Show resolved Hide resolved
if checkErr := s.initSafeModeExitTS(int64(e.Header.Timestamp)); checkErr != nil {
return checkErr
}
Expand Down Expand Up @@ -2185,7 +2185,7 @@ func (s *Syncer) initSafeModeExitTS(firstBinlogTS int64) error {
return err
}
s.firstMeetBinlogTS = &firstBinlogTS
exitTS := firstBinlogTS + int64(duration.Seconds())
exitTS := firstBinlogTS + int64(duration.Seconds())*1000
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why need *1000?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, firstBinlogTS which comes from Timestamp is the value of millisecond,but duration is second. i find it when i run integration test.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can use duration.Milliseconds() then

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://dev.mysql.com/doc/internals/en/binlog-event-header.html this should be the unit of second, not millisecond 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, I have tried again, the value is uint32, i will fix back. I may modify this because I encounter another problem,the ts may be 0 when the event type is ROTATE_EVENT and then i modify here and avoid 0, but here is wrong.

s.exitSafeModeTS = &exitTS
s.tctx.L().Info("safe-mode will disable by task cli args", zap.Int64("ts", exitTS))
return nil
Expand Down
77 changes: 77 additions & 0 deletions dm/tests/openapi/client/openapi_task_check
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,66 @@ def create_noshard_task_success(task_name, tartget_table_name=""):
print("create_noshard_task_success resp=", resp.json())
assert resp.status_code == 201

def create_incremental_task_with_gitd_success(task_name,binlog_name1,binlog_pos1,binlog_gtid1,binlog_name2,binlog_pos2,binlog_gtid2):
task = {
"name": task_name,
"task_mode": "incremental",
"meta_schema": "dm_meta",
"enhance_online_schema_change": True,
"on_duplicate": "error",
"target_config": {
"host": "127.0.0.1",
"port": 4000,
"user": "root",
"password": "",
},
"table_migrate_rule": [
{
"source": {
"source_name": SOURCE1_NAME,
"schema": "openapi",
"table": "*",
},
"target": {"schema": "openapi", "table": ""},
},
{
"source": {
"source_name": SOURCE2_NAME,
"schema": "openapi",
"table": "*",
},
"target": {"schema": "openapi", "table": ""},
},
],
"source_config": {
"source_conf": [
{"source_name": SOURCE1_NAME},
{"source_name": SOURCE2_NAME},
],
},
}

if binlog_pos1 != "":
task["source_config"] = {
"source_conf": [
{
"source_name": SOURCE1_NAME,
"binlog_name": binlog_name1,
"binlog_pos": int(binlog_pos1),
"binlog_gtid": binlog_gtid1,
},
{
"source_name": SOURCE2_NAME,
"binlog_name": binlog_name2,
"binlog_pos": int(binlog_pos2),
"binlog_gtid": binlog_gtid2,
},
],
}

resp = requests.post(url=API_ENDPOINT, json={"task": task})
print("create_incremental_task_with_gitd_success resp=", resp.json())
assert resp.status_code == 201

def create_shard_task_success():
task = {
Expand Down Expand Up @@ -170,6 +230,21 @@ def start_task_success(task_name, source_name):
print("start_task_failed resp=", resp.json())
assert resp.status_code == 200

def start_task_with_condition(task_name, start_time, duration, is_success, check_result):
url = API_ENDPOINT + "/" + task_name + "/start"
req = {"start_time": start_time, "safe_mode_time_duration": duration}
resp = requests.post(url=url, json=req)
if is_success == "success":
if resp.status_code != 200:
print("start_task_with_condition_failed resp=", resp.json())
assert resp.status_code == 200
print("start_task_with_condition success")
else:
if resp.status_code == 200:
print("start_task_with_condition_failed resp should not be 200")
assert resp.status_code == 400
print("start_task_with_condition resp=", resp.json())
assert check_result in resp.json()["error_msg"]

def stop_task_success(task_name, source_name):
url = API_ENDPOINT + "/" + task_name + "/stop"
Expand Down Expand Up @@ -568,10 +643,12 @@ if __name__ == "__main__":
"create_task_failed": create_task_failed,
"create_noshard_task_success": create_noshard_task_success,
"create_shard_task_success": create_shard_task_success,
"create_incremental_task_with_gitd_success": create_incremental_task_with_gitd_success,
"delete_task_failed": delete_task_failed,
"delete_task_success": delete_task_success,
"delete_task_with_force_success": delete_task_with_force_success,
"start_task_success": start_task_success,
"start_task_with_condition": start_task_with_condition,
"stop_task_success": stop_task_success,
"get_task_list": get_task_list,
"get_task_list_with_status": get_task_list_with_status,
Expand Down
Loading