DM(syncer): fix lost dml under special sharding ddls #5006

Merged · 6 commits · Mar 25, 2022
Changes from 4 commits
3 changes: 2 additions & 1 deletion dm/syncer/sharding_group.go
@@ -239,7 +239,8 @@ func (sg *ShardingGroup) CheckSyncing(source string, location binlog.Location) (
if activeDDLItem == nil {
return true
}
return binlog.CompareLocation(activeDDLItem.FirstLocation, location, sg.enableGTID) > 0
// The DDL position is calculated using last_position's GTID; when the last event is a DML, the comparison is equal, but the DML should still be synced.
return binlog.CompareLocation(activeDDLItem.FirstLocation, location, sg.enableGTID) >= 0
@GMHDBJD (Contributor) commented on Mar 24, 2022:
I don't think this is the right solution. The issue is that last_position is wrong after pingcap/dm#1926. The correct behavior is:

DML
# activeDDLItem.FirstLocation  <-- (end location of the DML, start location of the DDL, lastLocation)
DDL

After pingcap/dm#1926:

DML
DDL
# activeDDLItem.FirstLocation (gtid)  <-- (end location of the DDL, lastLocation)

so now binlog.CompareLocation(activeDDLItem.FirstLocation, location, sg.enableGTID) is greater than 0.

Contributor:

But I forgot why we update the last location when receiving a begin event, cc @lance6716

Contributor:

Maybe due to some problems with the exit safepoint. I'll take a look later.

}

// UnresolvedGroupInfo returns pb.ShardingGroup if is unresolved, else returns nil.
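A minimal, self-contained sketch of the semantics the change from > to >= gives CheckSyncing, following the discussion above. The types and helpers here (location, compareLocation, checkSyncing) are simplified stand-ins, not the real dm/pkg/binlog API: a location is reduced to a single comparable integer representing the GTID-based ordering.

package main

import "fmt"

// location is a stand-in for binlog.Location: one integer instead of a
// (position, GTID set) pair, assuming a total order. Hypothetical type,
// not the real dm/pkg/binlog API.
type location struct{ pos int }

// compareLocation mimics the contract of binlog.CompareLocation:
// -1 if a is before b, 0 if they are equal, 1 if a is after b.
func compareLocation(a, b location) int {
	switch {
	case a.pos < b.pos:
		return -1
	case a.pos > b.pos:
		return 1
	default:
		return 0
	}
}

// checkSyncing mirrors the check in ShardingGroup.CheckSyncing after this PR:
// an event still counts as "before the active DDL" (and must be replicated)
// when its location is smaller than OR EQUAL TO the DDL's FirstLocation.
func checkSyncing(activeDDLFirstLocation, eventLocation location) bool {
	return compareLocation(activeDDLFirstLocation, eventLocation) >= 0 // was "> 0" before this fix
}

func main() {
	ddlFirst := location{pos: 100}

	// A DML whose GTID-based location compares equal to the active DDL's
	// FirstLocation: with ">" it was treated as not-before-DDL and lost;
	// with ">=" it is still synced.
	fmt.Println(checkSyncing(ddlFirst, location{pos: 100})) // true

	// A location strictly after the active DDL is still reported as not
	// before it.
	fmt.Println(checkSyncing(ddlFirst, location{pos: 101})) // false
}

The flipped assertions in sharding_group_test.go below follow directly from this >= behavior: a location equal to the active DDL's FirstLocation is now treated as before the active DDL.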
4 changes: 2 additions & 2 deletions dm/syncer/sharding_group_test.go
@@ -190,7 +190,7 @@ func (t *testShardingGroupSuite) TestSync(c *C) {

// active DDL is at pos21
beforeActiveDDL = g1.CheckSyncing(source2, pos21)
c.Assert(beforeActiveDDL, IsFalse)
c.Assert(beforeActiveDDL, IsTrue)

info = g1.UnresolvedGroupInfo()
sort.Strings(info.Synced)
@@ -313,7 +313,7 @@ func (t *testShardingGroupSuite) TestKeeper(c *C) {
c.Assert(k.InSyncing(sourceTbl1, targetTbl, endPos11), IsFalse)
// position at/after active DDL, in syncing
c.Assert(binlog.CompareLocation(pos12, loc, false), Equals, 0)
c.Assert(k.InSyncing(sourceTbl1, targetTbl, pos12), IsTrue)
c.Assert(k.InSyncing(sourceTbl1, targetTbl, pos12), IsFalse)
c.Assert(binlog.CompareLocation(endPos12, loc, false), Equals, 1)
c.Assert(k.InSyncing(sourceTbl1, targetTbl, endPos12), IsTrue)

197 changes: 197 additions & 0 deletions dm/tests/handle_error/run.sh
@@ -300,6 +300,201 @@ function DM_REPLACE_ERROR_SHARDING() {
"clean_table" "optimistic"
}

# two sources, 4 tables
# source1: tb1 first ddl -> tb1 second ddl -> tb2 first ddl -> tb2 second ddl
# source2: tb1 first ddl -> tb1 second ddl -> tb2 first ddl -> tb2 second ddl
function DM_CROSS_DDL_SHARDING_CASE() {
# 11/21 first ddl
run_sql_source1 "alter table ${db}.${tb1} add column c int;"
run_sql_source2 "alter table ${db}.${tb1} add column c int;"
run_sql_source1 "insert into ${db}.${tb1} values(1,1,1);"
run_sql_source1 "insert into ${db}.${tb1} values(11,11,11);"
run_sql_source2 "insert into ${db}.${tb1} values(2,2,2);"
run_sql_source2 "insert into ${db}.${tb1} values(22,22,22);"

# 11/21 second ddl
run_sql_source1 "alter table ${db}.${tb1} add column d int;"
run_sql_source1 "insert into ${db}.${tb1} values(3,3,3,3);"
run_sql_source2 "alter table ${db}.${tb1} add column d int;"
run_sql_source2 "insert into ${db}.${tb1} values(6,6,6,6);"

# 12/22 first ddl
run_sql_source1 "alter table ${db}.${tb2} add column c int;"
run_sql_source2 "alter table ${db}.${tb2} add column c int;"
run_sql_source1 "insert into ${db}.${tb2} values(4,4,4);"
run_sql_source2 "insert into ${db}.${tb2} values(5,5,5);"

# 12/22 second ddl
run_sql_source1 "alter table ${db}.${tb2} add column d int;"
run_sql_source2 "alter table ${db}.${tb2} add column d int;"
run_sql_source1 "insert into ${db}.${tb2} values(7,7,7,7);"
run_sql_source2 "insert into ${db}.${tb2} values(8,8,8,8);"

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"\"stage\": \"Running\"" 4

run_sql_tidb_with_retry "select count(1) from ${db}.${tb}" "count(1): 10"
}

function DM_CROSS_DDL_SHARDING() {
run_case CROSS_DDL_SHARDING "double-source-pessimistic" \
"run_sql_source1 \"create table ${db}.${tb1} (a int primary key, b int);\"; \
run_sql_source1 \"create table ${db}.${tb2} (a int primary key, b int);\"; \
run_sql_source2 \"create table ${db}.${tb1} (a int primary key, b int);\"; \
run_sql_source2 \"create table ${db}.${tb2} (a int primary key, b int);\"" \
"clean_table" "pessimistic"

run_case CROSS_DDL_SHARDING "double-source-optimistic" \
"run_sql_source1 \"create table ${db}.${tb1} (a int primary key, b int);\"; \
run_sql_source1 \"create table ${db}.${tb2} (a int primary key, b int);\"; \
run_sql_source2 \"create table ${db}.${tb1} (a int primary key, b int);\"; \
run_sql_source2 \"create table ${db}.${tb2} (a int primary key, b int);\"" \
"clean_table" "optimistic"
}

# replace add column unique twice
# two sources, 4 tables
# source1: tb1 first ddl -> tb1 second ddl -> tb2 first ddl -> tb2 second ddl
# source2: tb1 first ddl -> tb1 second ddl -> tb2 first ddl -> tb2 second ddl
function DM_CROSS_DDL_SHARDING_WITH_REPLACE_ERROR_CASE() {
# 11/21 first ddl
run_sql_source1 "alter table ${db}.${tb1} add column c int unique;"
run_sql_source2 "alter table ${db}.${tb1} add column c int unique;"
run_sql_source1 "insert into ${db}.${tb1} values(1,1,1);"
run_sql_source1 "insert into ${db}.${tb1} values(11,11,11);"
run_sql_source2 "insert into ${db}.${tb1} values(2,2,2);"
run_sql_source2 "insert into ${db}.${tb1} values(22,22,22);"

# 11/21 second ddl
run_sql_source1 "alter table ${db}.${tb1} add column d int unique;"
run_sql_source1 "insert into ${db}.${tb1} values(3,3,3,3);"
run_sql_source2 "alter table ${db}.${tb1} add column d int unique;"
run_sql_source2 "insert into ${db}.${tb1} values(6,6,6,6);"

# 12/22 first ddl
run_sql_source1 "alter table ${db}.${tb2} add column c int unique;"
run_sql_source2 "alter table ${db}.${tb2} add column c int unique;"
run_sql_source1 "insert into ${db}.${tb2} values(4,4,4);"
run_sql_source2 "insert into ${db}.${tb2} values(5,5,5);"

# 12/22 second ddl
run_sql_source1 "alter table ${db}.${tb2} add column d int unique;"
run_sql_source2 "alter table ${db}.${tb2} add column d int unique;"
run_sql_source1 "insert into ${db}.${tb2} values(7,7,7,7);"
run_sql_source2 "insert into ${db}.${tb2} values(8,8,8,8);"

# 11/21 first ddl: unsupported error
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"unsupported add column .* constraint UNIQUE KEY" 2

# begin to handle error
# split 11/21 first ddl into two ddls
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"binlog replace test alter table ${db}.${tb1} add column c int;alter table ${db}.${tb1} add unique(c)" \
"\"result\": true" 3

if [[ "$1" = "pessimistic" ]]; then
# 11 second ddl bypass, 12 first ddl detect conflict
# 22 first ddl: detect conflict
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"detect inconsistent DDL sequence from source" 2

# split 12,22 first ddl into two ddls
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"binlog replace test -s mysql-replica-01,mysql-replica-02 alter table ${db}.${tb2} add column c int;alter table ${db}.${tb2} add unique(c);" \
"\"result\": true" 3

# 11/21 second ddl: unsupported error
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"unsupported add column .* constraint UNIQUE KEY" 2

# split 11/21 second ddl into two ddls
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"binlog replace test alter table ${db}.${tb1} add column d int;alter table ${db}.${tb1} add unique(d);" \
"\"result\": true" 3

# 12/22 second ddl: detect conflict
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"detect inconsistent DDL sequence from source" 2

# split 12/22 second ddl into two ddls, one source at a time
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"binlog replace test -s mysql-replica-01 alter table ${db}.${tb2} add column d int;alter table ${db}.${tb2} add unique(d);" \
"\"result\": true" 2
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"binlog replace test -s mysql-replica-02 alter table ${db}.${tb2} add column d int;alter table ${db}.${tb2} add unique(d);" \
"\"result\": true" 2
else
# 11 second ddl, 22 first ddl: unsupported error
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"unsupported add column .* constraint UNIQUE KEY" 2

# replace 11 second ddl
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"binlog replace test -s mysql-replica-01 alter table ${db}.${tb1} add column d int;alter table ${db}.${tb1} add unique(d);" \
"\"result\": true" 2

# replace 21 second ddl
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"binlog replace test -s mysql-replica-02 alter table ${db}.${tb1} add column d int;alter table ${db}.${tb1} add unique(d);" \
"\"result\": true" 2

# 12 first ddl, 21 second ddl: unsupported error
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"unsupported add column .* constraint UNIQUE KEY" 2

# replace 12 first ddl
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"binlog replace test -s mysql-replica-01 alter table ${db}.${tb2} add column c int;alter table ${db}.${tb2} add unique(c);" \
"\"result\": true" 2

# replace 22 first ddl
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"binlog replace test -s mysql-replica-02 alter table ${db}.${tb2} add column c int;alter table ${db}.${tb2} add unique(c);" \
"\"result\": true" 2

# 12 first ddl, 22 second ddl: unsupported error
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"unsupported add column .* constraint UNIQUE KEY" 2

# replace 12/22 second ddl
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"binlog replace test alter table ${db}.${tb2} add column d int;alter table ${db}.${tb1} add unique(d);" \
"\"result\": true" 3

fi

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"\"stage\": \"Running\"" 4

run_sql_tidb_with_retry "select count(1) from ${db}.${tb}" "count(1): 10"
}

function DM_CROSS_DDL_SHARDING_WITH_REPLACE_ERROR() {
run_case CROSS_DDL_SHARDING_WITH_REPLACE_ERROR "double-source-pessimistic" \
"run_sql_source1 \"create table ${db}.${tb1} (a int primary key, b int);\"; \
run_sql_source1 \"create table ${db}.${tb2} (a int primary key, b int);\"; \
run_sql_source2 \"create table ${db}.${tb1} (a int primary key, b int);\"; \
run_sql_source2 \"create table ${db}.${tb2} (a int primary key, b int);\"" \
"clean_table" "pessimistic"

run_case CROSS_DDL_SHARDING_WITH_REPLACE_ERROR "double-source-optimistic" \
"run_sql_source1 \"create table ${db}.${tb1} (a int primary key, b int);\"; \
run_sql_source1 \"create table ${db}.${tb2} (a int primary key, b int);\"; \
run_sql_source2 \"create table ${db}.${tb1} (a int primary key, b int);\"; \
run_sql_source2 \"create table ${db}.${tb2} (a int primary key, b int);\"" \
"clean_table" "optimistic"
}

# replace add column unique
# one source, one table, no sharding
function DM_INJECT_DDL_ERROR_CASE() {
@@ -880,6 +1075,8 @@ function run() {
DM_SKIP_ERROR
DM_SKIP_ERROR_SHARDING
DM_REPLACE_ERROR
DM_CROSS_DDL_SHARDING
DM_CROSS_DDL_SHARDING_WITH_REPLACE_ERROR
DM_REPLACE_ERROR_SHARDING
DM_REPLACE_ERROR_MULTIPLE
DM_EXEC_ERROR_SKIP