Skip to content

Commit

Permalink
checker(dm): make start-task/resume-task/check-task return pre-chec…
Browse files Browse the repository at this point in the history
…k result only warning (#4118)

ref #3608
  • Loading branch information
okJiang authored Jan 12, 2022
1 parent be45fd5 commit 750816f
Show file tree
Hide file tree
Showing 15 changed files with 265 additions and 125 deletions.
3 changes: 3 additions & 0 deletions dm/.golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ linters:
# - maligned

linters-settings:
dupl:
# tokens count to trigger issue, 150 by default
threshold: 200
govet:
# report about shadowed variables
check-shadowing: true
Expand Down
98 changes: 73 additions & 25 deletions dm/checker/check_test.go

Large diffs are not rendered by default.

97 changes: 50 additions & 47 deletions dm/checker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,59 +283,59 @@ func (c *Checker) Process(ctx context.Context, pr chan pb.ProcessResult) {
errs = append(errs, unit.NewProcessError(err))
} else if !result.Summary.Passed {
errs = append(errs, unit.NewProcessError(errors.New("check was failed, please see detail")))
warnLeft, errLeft := c.warnCnt, c.errCnt
}
warnLeft, errLeft := c.warnCnt, c.errCnt

// remove success result if not pass
results := result.Results[:0]
for _, r := range result.Results {
if r.State == checker.StateSuccess {
continue
}
// remove success result if not pass
results := result.Results[:0]
for _, r := range result.Results {
if r.State == checker.StateSuccess {
continue
}

// handle results without r.Errors
if len(r.Errors) == 0 {
switch r.State {
case checker.StateWarning:
if warnLeft == 0 {
continue
}
warnLeft--
results = append(results, r)
case checker.StateFailure:
if errLeft == 0 {
continue
}
errLeft--
results = append(results, r)
// handle results without r.Errors
if len(r.Errors) == 0 {
switch r.State {
case checker.StateWarning:
if warnLeft == 0 {
continue
}
continue
warnLeft--
results = append(results, r)
case checker.StateFailure:
if errLeft == 0 {
continue
}
errLeft--
results = append(results, r)
}
continue
}

subErrors := make([]*checker.Error, 0, len(r.Errors))
for _, e := range r.Errors {
switch e.Severity {
case checker.StateWarning:
if warnLeft == 0 {
continue
}
warnLeft--
subErrors = append(subErrors, e)
case checker.StateFailure:
if errLeft == 0 {
continue
}
errLeft--
subErrors = append(subErrors, e)
subErrors := make([]*checker.Error, 0, len(r.Errors))
for _, e := range r.Errors {
switch e.Severity {
case checker.StateWarning:
if warnLeft == 0 {
continue
}
warnLeft--
subErrors = append(subErrors, e)
case checker.StateFailure:
if errLeft == 0 {
continue
}
errLeft--
subErrors = append(subErrors, e)
}
// skip display an empty Result
if len(subErrors) > 0 {
r.Errors = subErrors
results = append(results, r)
}
}
result.Results = results
// skip display an empty Result
if len(subErrors) > 0 {
r.Errors = subErrors
results = append(results, r)
}
}
result.Results = results

c.updateInstruction(result)

Expand All @@ -345,9 +345,12 @@ func (c *Checker) Process(ctx context.Context, pr chan pb.ProcessResult) {
default:
}

rawResult, err := json.MarshalIndent(result, "\t", "\t")
if err != nil {
rawResult = []byte(fmt.Sprintf("marshal error %v", err))
var rawResult []byte
if result.Summary.Successful != result.Summary.Total {
rawResult, err = json.MarshalIndent(result, "\t", "\t")
if err != nil {
rawResult = []byte(fmt.Sprintf("marshal error %v", err))
}
}

c.result.Lock()
Expand Down
27 changes: 17 additions & 10 deletions dm/checker/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,31 @@ package checker

import (
"context"
"fmt"

"github.com/pingcap/tiflow/dm/dm/config"
"github.com/pingcap/tiflow/dm/dm/pb"
"github.com/pingcap/tiflow/dm/pkg/terror"
)

var (
// ErrorMsgHeader used as the header of the error message when checking config failed.
ErrorMsgHeader = "fail to check synchronization configuration with type"
// CheckTaskMsgHeader used as the header of the error/warning message when checking config failed.
CheckTaskMsgHeader = "fail to check synchronization configuration with type"

CheckTaskSuccess = "check pass!!!"

// CheckSyncConfigFunc holds the CheckSyncConfig function.
CheckSyncConfigFunc func(ctx context.Context, cfgs []*config.SubTaskConfig, errCnt, warnCnt int64) error
CheckSyncConfigFunc func(ctx context.Context, cfgs []*config.SubTaskConfig, errCnt, warnCnt int64) (string, error)
)

func init() {
CheckSyncConfigFunc = CheckSyncConfig
}

// CheckSyncConfig checks synchronization configuration.
func CheckSyncConfig(ctx context.Context, cfgs []*config.SubTaskConfig, errCnt, warnCnt int64) error {
func CheckSyncConfig(ctx context.Context, cfgs []*config.SubTaskConfig, errCnt, warnCnt int64) (string, error) {
if len(cfgs) == 0 {
return nil
return "", nil
}

// all `IgnoreCheckingItems` and `Mode` of sub-task are same, so we take first one
Expand All @@ -53,25 +56,29 @@ func CheckSyncConfig(ctx context.Context, cfgs []*config.SubTaskConfig, errCnt,
}
checkingItems := config.FilterCheckingItems(ignoreCheckingItems)
if len(checkingItems) == 0 {
return nil
return "", nil
}

c := NewChecker(cfgs, checkingItems, errCnt, warnCnt)

if err := c.Init(ctx); err != nil {
return terror.Annotate(err, "fail to initial checker")
return "", terror.Annotate(err, "fail to initial checker")
}
defer c.Close()

pr := make(chan pb.ProcessResult, 1)
c.Process(ctx, pr)
for len(pr) > 0 {
if len(pr) > 0 {
r := <-pr
// we only want first error
if len(r.Errors) > 0 {
return terror.ErrTaskCheckSyncConfigError.Generate(ErrorMsgHeader, r.Errors[0].Message, string(r.Detail))
return "", terror.ErrTaskCheckSyncConfigError.Generate(CheckTaskMsgHeader, r.Errors[0].Message, string(r.Detail))
}
if len(r.Detail) == 0 {
return CheckTaskSuccess, nil
}
return fmt.Sprintf("%s: no errors but some warnings\n detail: %s", CheckTaskMsgHeader, string(r.Detail)), nil
}

return nil
return "", nil
}
2 changes: 1 addition & 1 deletion dm/dm/ctl/master/check_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func checkTaskFunc(cmd *cobra.Command, _ []string) error {
return err
}

if !common.PrettyPrintResponseWithCheckTask(resp, checker.ErrorMsgHeader) {
if !common.PrettyPrintResponseWithCheckTask(resp, checker.CheckTaskMsgHeader) {
common.PrettyPrintResponse(resp)
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion dm/dm/ctl/master/start_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func startTaskFunc(cmd *cobra.Command, _ []string) error {
return err
}

if !common.PrettyPrintResponseWithCheckTask(resp, checker.ErrorMsgHeader) {
if !common.PrettyPrintResponseWithCheckTask(resp, checker.CheckTaskMsgHeader) {
common.PrettyPrintResponse(resp)
}
return nil
Expand Down
9 changes: 7 additions & 2 deletions dm/dm/master/openapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,11 +445,16 @@ func (s *Server) DMAPIStartTask(c *gin.Context) {
for i := range subTaskConfigList {
subTaskConfigPList[i] = &subTaskConfigList[i]
}
if err = checker.CheckSyncConfigFunc(newCtx, subTaskConfigPList,
common.DefaultErrorCnt, common.DefaultWarnCnt); err != nil {
msg, err := checker.CheckSyncConfigFunc(newCtx, subTaskConfigPList,
common.DefaultErrorCnt, common.DefaultWarnCnt)
if err != nil {
_ = c.Error(terror.WithClass(err, terror.ClassDMMaster))
return
}
if len(msg) != 0 {
// TODO: return warning msg with http.StatusCreated and task together
log.L().Warn("openapi pre-check warning before start task", zap.String("warning", msg))
}
// specify only start task on partial sources
var needStartSubTaskList []config.SubTaskConfig
if req.SourceNameList != nil {
Expand Down
4 changes: 2 additions & 2 deletions dm/dm/master/openapi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -880,6 +880,6 @@ func mockTaskQueryStatus(
).Return(queryResp, nil).MaxTimes(maxRetryNum)
}

func mockCheckSyncConfig(ctx context.Context, cfgs []*config.SubTaskConfig, errCnt, warnCnt int64) error {
return nil
func mockCheckSyncConfig(ctx context.Context, cfgs []*config.SubTaskConfig, errCnt, warnCnt int64) (string, error) {
return "", nil
}
Loading

0 comments on commit 750816f

Please sign in to comment.