Skip to content

Commit

Permalink
precheck(dm): refine error message & instruction (#7696)
Browse files Browse the repository at this point in the history
close #7621
  • Loading branch information
buchuitoudegou committed Nov 29, 2022
1 parent e1aa525 commit 92523b2
Show file tree
Hide file tree
Showing 10 changed files with 328 additions and 36 deletions.
8 changes: 4 additions & 4 deletions dm/checker/check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func TestVersionChecking(t *testing.T) {
AddRow("version", "10.1.29-MariaDB"))
msg, err := CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt)
require.NoError(t, err)
require.Contains(t, msg, "Migrating from MariaDB is experimentally supported")
require.Contains(t, msg, "Migrating from MariaDB is still experimental")

mock = initMockDB(t)
mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).
Expand All @@ -234,7 +234,7 @@ func TestVersionChecking(t *testing.T) {
require.NoError(t, err)
require.True(t, result.Summary.Passed)
require.Equal(t, int64(1), result.Summary.Warning)
require.Contains(t, result.Results[0].Errors[0].ShortErr, "Migrating from MariaDB is experimentally supported")
require.Contains(t, result.Results[0].Errors[0].ShortErr, "Migrating from MariaDB is still experimental.")

// too low MySQL version

Expand Down Expand Up @@ -269,15 +269,15 @@ func TestServerIDChecking(t *testing.T) {
AddRow("server_id", "0"))
msg, err := CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt)
require.NoError(t, err)
require.Contains(t, msg, "please set server_id greater than 0")
require.Contains(t, msg, "Set server_id greater than 0")

mock = initMockDB(t)
mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'server_id'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("server_id", "0"))
result, err := RunCheckOnConfigs(context.Background(), cfgs, false)
require.NoError(t, err)
require.True(t, result.Summary.Passed)
require.Contains(t, result.Results[0].Instruction, "please set server_id greater than 0")
require.Contains(t, result.Results[0].Instruction, "Set server_id greater than 0")

// happy path

Expand Down
12 changes: 7 additions & 5 deletions dm/pkg/checker/binlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (pc *MySQLBinlogEnableChecker) Check(ctx context.Context) *Result {
}
if strings.ToUpper(value) != "ON" {
result.Errors = append(result.Errors, NewError("log_bin is %s, and should be ON", value))
result.Instruction = "ref document: https://dev.mysql.com/doc/refman/5.7/en/replication-howto-masterbaseconfig.html"
result.Instruction = "MySQL as source: please refer to the document to enable the binlog https://dev.mysql.com/doc/refman/5.7/en/replication-howto-masterbaseconfig.html"
return result
}
result.State = StateSuccess
Expand Down Expand Up @@ -94,7 +94,7 @@ func (pc *MySQLBinlogFormatChecker) Check(ctx context.Context) *Result {
}
if strings.ToUpper(value) != "ROW" {
result.Errors = append(result.Errors, NewError("binlog_format is %s, and should be ROW", value))
result.Instruction = "please execute 'set global binlog_format=ROW;'"
result.Instruction = "MySQL as source: please execute 'set global binlog_format=ROW;'; AWS Aurora (MySQL)/RDS MySQL as source: please refer to the document to create a new DB parameter group and set the binlog_format=row: https://docs.aws.amazon.com/zh_cn/AmazonRDS/latest/AuroraUserGuide/USER_WorkingWithDBInstanceParamGroups.html. Then modify the instance to use the new DB parameter group and restart the instance to take effect."
return result
}
result.State = StateSuccess
Expand Down Expand Up @@ -172,7 +172,7 @@ func (pc *MySQLBinlogRowImageChecker) Check(ctx context.Context) *Result {
}
if strings.ToUpper(value) != "FULL" {
result.Errors = append(result.Errors, NewError("binlog_row_image is %s, and should be FULL", value))
result.Instruction = "please execute 'set global binlog_row_image = FULL;'"
result.Instruction = "MySQL as source: please execute 'set global binlog_row_image = FULL;'; AWS Aurora (MySQL)/RDS MySQL as source: please refer to the document to create a new DB parameter group and set the binlog_row_image = FULL: https://docs.aws.amazon.com/zh_cn/AmazonRDS/latest/AuroraUserGuide/USER_WorkingWithDBInstanceParamGroups.html Then modify the instance to use the new DB parameter group and restart the instance to take effect."
return result
}
result.State = StateSuccess
Expand Down Expand Up @@ -237,7 +237,8 @@ func (c *BinlogDBChecker) Check(ctx context.Context) *Result {
}
if len(c.schemas) > 0 {
dbs := utils.SetToSlice(c.schemas)
result.Extra = fmt.Sprintf("these dbs [%s] are not in binlog_do_db[%s]", strings.Join(dbs, ","), binlogDoDB)
result.Errors = append(result.Errors, NewWarn("these dbs [%s] are not in binlog_do_db[%s]", strings.Join(dbs, ","), binlogDoDB))
result.Instruction = "Ensure that the do_dbs contains the dbs you want to migrate"
return result
}
} else {
Expand All @@ -248,7 +249,8 @@ func (c *BinlogDBChecker) Check(ctx context.Context) *Result {
}
}
if len(ignoreDBs) > 0 {
result.Extra = fmt.Sprintf("db [%s] is in binlog_ignore_db[%s]", strings.Join(ignoreDBs, ","), binlogIgnoreDB)
result.Errors = append(result.Errors, NewWarn("these dbs [%s] are in binlog_ignore_db[%s]", strings.Join(ignoreDBs, ","), binlogIgnoreDB))
result.Instruction = "Ensure that the ignore_dbs does not contain the dbs you want to migrate"
return result
}
}
Expand Down
4 changes: 4 additions & 0 deletions dm/pkg/checker/binlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ func TestBinlogDB(t *testing.T) {
r := binlogDBChecker.Check(ctx)
require.Nil(t, mock.ExpectationsWereMet())
require.Equal(t, cs.state, r.State)
// the error message is moved to Errors
if r.State == StateFailure {
require.Equal(t, 1, len(r.Errors))
}
}
}

Expand Down
7 changes: 4 additions & 3 deletions dm/pkg/checker/conn_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,13 @@ func (c *connNumberChecker) check(ctx context.Context, checkerName string, neede
neededConn,
),
)
result.Instruction = "set larger max_connections or adjust the configuration of dm"
result.Instruction = "You need to set a larger max_connection, or adjust the configuration of DM such as reducing the worker count of sycner and reducing the pool size of the dumper and loader."
result.State = StateFailure
} else if maxConn-usedConn < neededConn {
// if we don't have enough privilege to check the user's connection number,
// usedConn is 0
result.State = StateWarning
result.Instruction = "set larger max_connections or adjust the configuration of dm"
result.Instruction = "You need to set a larger max_connection, or adjust the configuration of DM such as reducing the worker count of sycner and reducing the pool size of the dumper and loader."
result.Errors = append(
result.Errors,
NewError(
Expand Down Expand Up @@ -182,9 +182,10 @@ func (l *LoaderConnNumberChecker) Check(ctx context.Context) *Result {
// because lightning doesn't need to keep connections while restoring.
result.Errors = append(
result.Errors,
NewWarn("task precheck cannot accurately check the number of connection needed for Lightning, please set a sufficiently large connections for TiDB"),
NewWarn("task precheck cannot accurately check the number of connection needed for Lightning."),
)
result.State = StateWarning
result.Instruction = "You need to set a larger connection for TiDB."
break
}
}
Expand Down
22 changes: 16 additions & 6 deletions dm/pkg/checker/mysql_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ func (pc *MySQLVersionChecker) Check(ctx context.Context) *Result {

err2 := pc.checkVersion(value, result)
if err2 != nil {
result.Instruction = err2.Instruction
err2.Instruction = ""
result.Errors = append(result.Errors, err2)
}
return result
Expand All @@ -72,10 +74,14 @@ func (pc *MySQLVersionChecker) Check(ctx context.Context) *Result {
func (pc *MySQLVersionChecker) checkVersion(value string, result *Result) *Error {
needVersion := SupportedVersion["mysql"]
if utils.IsMariaDB(value) {
return NewWarn("Migrating from MariaDB is experimentally supported. If you must use DM to migrate data from MariaDB, we suggest make your MariaDB >= 10.1.2")
err := NewWarn("Migrating from MariaDB is still experimental.")
err.Instruction = "It is recommended that you upgrade MariaDB to 10.1.2 or a later version."
return err
}
if IsTiDBFromVersion(value) {
return NewWarn("Not support migrate from TiDB")
err := NewWarn("migration from TiDB not supported")
err.Instruction = "TiDB is not supported as an upstream database."
return err
}

version, err := toMySQLVersion(value)
Expand All @@ -85,11 +91,15 @@ func (pc *MySQLVersionChecker) checkVersion(value string, result *Result) *Error
}

if !version.Ge(needVersion.Min) {
return NewWarn("version suggested at least %v but got %v", needVersion.Min, version)
err := NewWarn("version suggested at least %v but got %v", needVersion.Min, version)
err.Instruction = "It is recommended that you upgrade the database to the required version before performing data migration. Otherwise data inconsistency or task exceptions might occur."
return err
}

if !version.Lt(needVersion.Max) {
return NewWarn("version suggested less than %v but got %v", needVersion.Max, version)
err := NewWarn("version suggested earlier than %v but got %v", needVersion.Max, version)
err.Instruction = "It is recommended that you select a database version that meets the requirements before performing data migration. Otherwise data inconsistency or task exceptions might occur."
return err
}

result.State = StateSuccess
Expand Down Expand Up @@ -130,13 +140,13 @@ func (pc *MySQLServerIDChecker) Check(ctx context.Context) *Result {
return result
}
result.Errors = append(result.Errors, NewError("server_id not set"))
result.Instruction = "please set server_id in your database, or error may happen in master/slave switchover"
result.Instruction = "Set server_id in your database, or errors might happen in master/slave switchover"
return result
}

if serverID == 0 {
result.Errors = append(result.Errors, NewError("server_id is 0"))
result.Instruction = "please set server_id greater than 0, or error may happen in master/slave switchover"
result.Instruction = "Set server_id greater than 0, or errors might happen in master/slave switchover"
return result
}
result.State = StateSuccess
Expand Down
24 changes: 21 additions & 3 deletions dm/pkg/checker/mysql_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,15 @@
package checker

import (
tc "github.com/pingcap/check"
"context"
"testing"

"github.com/DATA-DOG/go-sqlmock"
"github.com/pingcap/tidb/util/dbutil"
"github.com/stretchr/testify/require"
)

func (t *testCheckSuite) TestMysqlVersion(c *tc.C) {
func TestMysqlVersion(t *testing.T) {
versionChecker := &MySQLVersionChecker{}

cases := []struct {
Expand All @@ -41,6 +46,19 @@ func (t *testCheckSuite) TestMysqlVersion(c *tc.C) {
State: StateWarning,
}
versionChecker.checkVersion(cs.rawVersion, result)
c.Assert(result.State == StateSuccess, tc.Equals, cs.pass)
require.Equal(t, result.State == StateSuccess, cs.pass)
}
}

func TestVersionInstruction(t *testing.T) {
db, mock, err := sqlmock.New()
require.NoError(t, err)
versionChecker := &MySQLVersionChecker{
db: db,
dbinfo: &dbutil.DBConfig{},
}
mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version';").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("version", "8.0.20"))
result := versionChecker.Check(context.Background())
require.Equal(t, result.State, StateWarning)
require.Equal(t, result.Instruction, "It is recommended that you select a database version that meets the requirements before performing data migration. Otherwise data inconsistency or task exceptions might occur.")
}
2 changes: 2 additions & 0 deletions dm/pkg/checker/privilege.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func (pc *SourceDumpPrivilegeChecker) Check(ctx context.Context) *Result {
err2 := verifyPrivilegesWithResult(result, grants, dumpRequiredPrivs)
if err2 != nil {
result.Errors = append(result.Errors, err2)
result.Instruction = "Please grant the required privileges to the account."
} else {
result.State = StateSuccess
}
Expand Down Expand Up @@ -156,6 +157,7 @@ func (pc *SourceReplicatePrivilegeChecker) Check(ctx context.Context) *Result {
if err2 != nil {
result.Errors = append(result.Errors, err2)
result.State = StateFailure
result.Instruction = "Grant the required privileges to the account."
}
return result
}
Expand Down
45 changes: 32 additions & 13 deletions dm/pkg/checker/table_structure.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,15 @@ func (c *TablesChecker) Name() string {
}

func (c *TablesChecker) handleOpts(ctx context.Context, r *Result) {
// extract same instruction from Errors to Result.Instruction
resultInstructions := map[string]interface{}{}
defer func() {
c.reMu.Lock()
for k := range resultInstructions {
r.Instruction += k + "; "
}
c.reMu.Unlock()
}()
for {
select {
case <-ctx.Done():
Expand All @@ -182,12 +191,16 @@ func (c *TablesChecker) handleOpts(ctx context.Context, r *Result) {
}
e := NewError(tableMsg + opt.errMessage)
e.Severity = StateWarning
e.Instruction = opt.instruction
if _, ok := resultInstructions[opt.instruction]; !ok && opt.instruction != "" {
resultInstructions[opt.instruction] = ""
}
r.Errors = append(r.Errors, e)
case StateFailure:
r.State = StateFailure
e := NewError(tableMsg + opt.errMessage)
e.Instruction = opt.instruction
if _, ok := resultInstructions[opt.instruction]; !ok && opt.instruction != "" {
resultInstructions[opt.instruction] = ""
}
r.Errors = append(r.Errors, e)
}
c.reMu.Unlock()
Expand Down Expand Up @@ -323,7 +336,7 @@ func (c *TablesChecker) checkAST(
if !hasUnique {
options = append(options, &incompatibilityOption{
state: StateWarning,
instruction: "please set primary/unique key for the table, or replication efficiency may get very slow and exactly-once replication can't be promised",
instruction: "You need to set primary/unique keys for the table. Otherwise replication efficiency might become very low and exactly-once replication cannot be guaranteed.",
errMessage: "primary/unique key does not exist",
})
}
Expand All @@ -332,7 +345,7 @@ func (c *TablesChecker) checkAST(
if len(extendedCols) > 0 {
options = append(options, &incompatibilityOption{
state: StateFailure,
instruction: "extended column feature needs the table to be created with extended columns before replication",
instruction: "You need to create a table with extended columns before replication.",
errMessage: fmt.Sprintf("upstream table %s who has extended columns %v does not exist in downstream table", upstreamStmt.Table.Name, extendedCols),
})
}
Expand All @@ -351,7 +364,7 @@ func (c *TablesChecker) checkConstraint(cst *ast.Constraint) *incompatibilityOpt
if cst.Tp == ast.ConstraintForeignKey {
return &incompatibilityOption{
state: StateWarning,
instruction: "please ref document: https://docs.pingcap.com/tidb/stable/mysql-compatibility#unsupported-features",
instruction: "TiDB does not support foreign key constraints. See the document: https://docs.pingcap.com/tidb/stable/mysql-compatibility#unsupported-features",
errMessage: fmt.Sprintf("Foreign Key %s is parsed but ignored by TiDB.", cst.Name),
}
}
Expand Down Expand Up @@ -382,7 +395,8 @@ func (c *TablesChecker) checkTableStructurePair(
!strings.EqualFold(upstreamCharset, downstreamCharset) &&
!strings.EqualFold(downstreamCharset, mysql.UTF8MB4Charset) {
options = append(options, &incompatibilityOption{
state: StateWarning,
state: StateWarning,
instruction: "Ensure that you use the same charsets for both upstream and downstream databases. Different charsets might cause data inconsistency.",
errMessage: fmt.Sprintf("charset is not same, upstream: (%s %s), downstream: (%s %s)",
upstream.Table.Name.O, upstreamCharset,
downstream.Table.Name.O, downstreamCharset),
Expand All @@ -395,7 +409,8 @@ func (c *TablesChecker) checkTableStructurePair(
if upstreamCollation != "" && downstreamCollation != "" &&
!strings.EqualFold(upstreamCollation, downstreamCollation) {
options = append(options, &incompatibilityOption{
state: StateWarning,
state: StateWarning,
instruction: "Ensure that you use the same collations for both upstream and downstream databases. Otherwise the query results from the two databases might be inconsistent.",
errMessage: fmt.Sprintf("collation is not same, upstream: (%s %s), downstream: (%s %s)",
upstream.Table.Name.O, upstreamCollation,
downstream.Table.Name.O, downstreamCollation),
Expand All @@ -417,14 +432,16 @@ func (c *TablesChecker) checkTableStructurePair(
}
for idxName, cols := range upstreamPKUK {
options = append(options, &incompatibilityOption{
state: StateWarning,
state: StateWarning,
instruction: "Ensure that you use the same index columns for both upstream and downstream databases. Otherwise the migration job might fail or data inconsistency might occur.",
errMessage: fmt.Sprintf("upstream has more PK or NOT NULL UK than downstream, index name: %s, columns: %v",
idxName, utils.SetToSlice(cols)),
})
}
for idxName, cols := range downstreamPKUK {
options = append(options, &incompatibilityOption{
state: StateWarning,
state: StateWarning,
instruction: "Ensure that you use the same index columns for both upstream and downstream databases. Otherwise the migration job might fail or data inconsistency might occur.",
errMessage: fmt.Sprintf("downstream has more PK or NOT NULL UK than upstream, table name: %s, index name: %s, columns: %v",
downstream.Table.Name.O, idxName, utils.SetToSlice(cols)),
})
Expand Down Expand Up @@ -454,14 +471,14 @@ func (c *TablesChecker) checkTableStructurePair(
if len(upstreamDupCols) > 0 {
options = append(options, &incompatibilityOption{
state: StateFailure,
instruction: "values of extended columns will be automatically filled by DM, please remove these columns or change configuration",
instruction: "DM automatically fills the values of extended columns. You need to remove these columns or change configuration.",
errMessage: fmt.Sprintf("upstream table must not contain extended column %v", upstreamDupCols),
})
}
if len(downstreamMissingCols) > 0 {
options = append(options, &incompatibilityOption{
state: StateFailure,
instruction: "please manually add extended column to downstream table",
instruction: "You need to manually add extended columns to the downstream table.",
errMessage: fmt.Sprintf("downstream table must contain extended columns %v", downstreamMissingCols),
})
}
Expand All @@ -471,7 +488,8 @@ func (c *TablesChecker) checkTableStructurePair(

if len(upstreamCols) > 0 {
options = append(options, &incompatibilityOption{
state: StateWarning,
state: StateWarning,
instruction: "Ensure that the column numbers are the same between upstream and downstream databases. Otherwise the migration job may fail.",
errMessage: fmt.Sprintf("upstream has more columns than downstream, columns: %v",
maps.Keys(upstreamCols)),
})
Expand All @@ -483,7 +501,8 @@ func (c *TablesChecker) checkTableStructurePair(
}
if len(downstreamCols) > 0 {
options = append(options, &incompatibilityOption{
state: StateWarning,
state: StateWarning,
instruction: "Ensure that the column numbers are the same between upstream and downstream databases. Otherwise the migration job may fail.",
errMessage: fmt.Sprintf("downstream has more columns than upstream that require values to insert records, table name: %s, columns: %v",
downstream.Table.Name.O, maps.Keys(downstreamCols)),
})
Expand Down
Loading

0 comments on commit 92523b2

Please sign in to comment.