From 0e0c8081e079f2eb1041026b5a761573501eff6d Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Wed, 6 Apr 2022 17:06:31 +0800 Subject: [PATCH] DM(syncer): fix lost lost dml under special sharding ddls (#5006) (#5026) close pingcap/tiflow#5002 --- dm/syncer/sharding_group.go | 6 +- dm/syncer/sharding_group_test.go | 4 +- dm/tests/handle_error/run.sh | 197 +++++++++++++++++++++++++++++++ 3 files changed, 204 insertions(+), 3 deletions(-) diff --git a/dm/syncer/sharding_group.go b/dm/syncer/sharding_group.go index 2ff48dba4ff..02fa49f39ec 100644 --- a/dm/syncer/sharding_group.go +++ b/dm/syncer/sharding_group.go @@ -239,7 +239,11 @@ func (sg *ShardingGroup) CheckSyncing(source string, location binlog.Location) ( if activeDDLItem == nil { return true } - return binlog.CompareLocation(activeDDLItem.FirstLocation, location, sg.enableGTID) > 0 + // this function only affects dml + // activeDDLItem.FirstLocation is ddl's startLocation + // location is dml's currentLocation + // dml should be synced when the comparation is equal + return binlog.CompareLocation(activeDDLItem.FirstLocation, location, sg.enableGTID) >= 0 } // UnresolvedGroupInfo returns pb.ShardingGroup if is unresolved, else returns nil. diff --git a/dm/syncer/sharding_group_test.go b/dm/syncer/sharding_group_test.go index 10e54e96ae4..010e930e6e9 100644 --- a/dm/syncer/sharding_group_test.go +++ b/dm/syncer/sharding_group_test.go @@ -190,7 +190,7 @@ func (t *testShardingGroupSuite) TestSync(c *C) { // active DDL is at pos21 beforeActiveDDL = g1.CheckSyncing(source2, pos21) - c.Assert(beforeActiveDDL, IsFalse) + c.Assert(beforeActiveDDL, IsTrue) info = g1.UnresolvedGroupInfo() sort.Strings(info.Synced) @@ -313,7 +313,7 @@ func (t *testShardingGroupSuite) TestKeeper(c *C) { c.Assert(k.InSyncing(sourceTbl1, targetTbl, endPos11), IsFalse) // position at/after active DDL, in syncing c.Assert(binlog.CompareLocation(pos12, loc, false), Equals, 0) - c.Assert(k.InSyncing(sourceTbl1, targetTbl, pos12), IsTrue) + c.Assert(k.InSyncing(sourceTbl1, targetTbl, pos12), IsFalse) c.Assert(binlog.CompareLocation(endPos12, loc, false), Equals, 1) c.Assert(k.InSyncing(sourceTbl1, targetTbl, endPos12), IsTrue) diff --git a/dm/tests/handle_error/run.sh b/dm/tests/handle_error/run.sh index 8fac3cf1918..3081c467ae3 100644 --- a/dm/tests/handle_error/run.sh +++ b/dm/tests/handle_error/run.sh @@ -300,6 +300,201 @@ function DM_REPLACE_ERROR_SHARDING() { "clean_table" "optimistic" } +# two source, 4 tables +# source1: tb1 first ddl -> tb1 second ddl -> tb2 first ddl -> tb2 second ddl +# source2: tb1 first ddl -> tb1 second ddl -> tb2 first ddl -> tb2 second ddl +function DM_CROSS_DDL_SHARDING_CASE() { + # 11/21 first ddl + run_sql_source1 "alter table ${db}.${tb1} add column c int;" + run_sql_source2 "alter table ${db}.${tb1} add column c int;" + run_sql_source1 "insert into ${db}.${tb1} values(1,1,1);" + run_sql_source1 "insert into ${db}.${tb1} values(11,11,11);" + run_sql_source2 "insert into ${db}.${tb1} values(2,2,2);" + run_sql_source2 "insert into ${db}.${tb1} values(22,22,22);" + + # 11/21 second ddl + run_sql_source1 "alter table ${db}.${tb1} add column d int;" + run_sql_source1 "insert into ${db}.${tb1} values(3,3,3,3);" + run_sql_source2 "alter table ${db}.${tb1} add column d int;" + run_sql_source2 "insert into ${db}.${tb1} values(6,6,6,6);" + + # 12/22 first ddl + run_sql_source1 "alter table ${db}.${tb2} add column c int;" + run_sql_source2 "alter table ${db}.${tb2} add column c int;" + run_sql_source1 "insert into ${db}.${tb2} values(4,4,4);" + run_sql_source2 "insert into ${db}.${tb2} values(5,5,5);" + + # 12/22 second ddl + run_sql_source1 "alter table ${db}.${tb2} add column d int;" + run_sql_source2 "alter table ${db}.${tb2} add column d int;" + run_sql_source1 "insert into ${db}.${tb2} values(7,7,7,7);" + run_sql_source2 "insert into ${db}.${tb2} values(8,8,8,8);" + + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "\"stage\": \"Running\"" 2 + + run_sql_tidb_with_retry "select count(1) from ${db}.${tb}" "count(1): 10" +} + +function DM_CROSS_DDL_SHARDING() { + run_case CROSS_DDL_SHARDING "double-source-pessimistic" \ + "run_sql_source1 \"create table ${db}.${tb1} (a int primary key, b int);\"; \ + run_sql_source1 \"create table ${db}.${tb2} (a int primary key, b int);\"; \ + run_sql_source2 \"create table ${db}.${tb1} (a int primary key, b int);\"; \ + run_sql_source2 \"create table ${db}.${tb2} (a int primary key, b int);\"" \ + "clean_table" "pessimistic" + + run_case CROSS_DDL_SHARDING "double-source-optimistic" \ + "run_sql_source1 \"create table ${db}.${tb1} (a int primary key, b int);\"; \ + run_sql_source1 \"create table ${db}.${tb2} (a int primary key, b int);\"; \ + run_sql_source2 \"create table ${db}.${tb1} (a int primary key, b int);\"; \ + run_sql_source2 \"create table ${db}.${tb2} (a int primary key, b int);\"" \ + "clean_table" "optimistic" +} + +# replace add column unique twice +# two source, 4 tables +# source1: tb1 first ddl -> tb1 second ddl -> tb2 first ddl -> tb2 second ddl +# source2: tb1 first ddl -> tb1 second ddl -> tb2 first ddl -> tb2 second ddl +function DM_CROSS_DDL_SHARDING_WITH_REPLACE_ERROR_CASE() { + # 11/21 first ddl + run_sql_source1 "alter table ${db}.${tb1} add column c int unique;" + run_sql_source2 "alter table ${db}.${tb1} add column c int unique;" + run_sql_source1 "insert into ${db}.${tb1} values(1,1,1);" + run_sql_source1 "insert into ${db}.${tb1} values(11,11,11);" + run_sql_source2 "insert into ${db}.${tb1} values(2,2,2);" + run_sql_source2 "insert into ${db}.${tb1} values(22,22,22);" + + # 11/21 second ddl + run_sql_source1 "alter table ${db}.${tb1} add column d int unique;" + run_sql_source1 "insert into ${db}.${tb1} values(3,3,3,3);" + run_sql_source2 "alter table ${db}.${tb1} add column d int unique;" + run_sql_source2 "insert into ${db}.${tb1} values(6,6,6,6);" + + # 12/22 first ddl + run_sql_source1 "alter table ${db}.${tb2} add column c int unique;" + run_sql_source2 "alter table ${db}.${tb2} add column c int unique;" + run_sql_source1 "insert into ${db}.${tb2} values(4,4,4);" + run_sql_source2 "insert into ${db}.${tb2} values(5,5,5);" + + # 12/22 second ddl + run_sql_source1 "alter table ${db}.${tb2} add column d int unique;" + run_sql_source2 "alter table ${db}.${tb2} add column d int unique;" + run_sql_source1 "insert into ${db}.${tb2} values(7,7,7,7);" + run_sql_source2 "insert into ${db}.${tb2} values(8,8,8,8);" + + # 11/21 first ddl: unsupport error + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "unsupported add column .* constraint UNIQUE KEY" 2 + + # begin to handle error + # split 11/21 first ddl into two ddls + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "binlog replace test alter table ${db}.${tb1} add column c int;alter table ${db}.${tb1} add unique(c)" \ + "\"result\": true" 3 + + if [[ "$1" = "pessimistic" ]]; then + # 11 second ddl bypass, 12 first ddl detect conflict + # 22 first ddl: detect conflict + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "detect inconsistent DDL sequence from source" 2 + + # split 12,22 first ddl into two ddls + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "binlog replace test -s mysql-replica-01,mysql-replica-02 alter table ${db}.${tb2} add column c int;alter table ${db}.${tb2} add unique(c);" \ + "\"result\": true" 3 + + # 11/21 second ddl: unsupport error + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "unsupported add column .* constraint UNIQUE KEY" 2 + + # split 11/21 second ddl into two ddls + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "binlog replace test alter table ${db}.${tb1} add column d int;alter table ${db}.${tb1} add unique(d);" \ + "\"result\": true" 3 + + # 12/22 second ddl: detect conflict + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "detect inconsistent DDL sequence from source" 2 + + # split 11/21 second ddl into two ddls one by one + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "binlog replace test -s mysql-replica-01 alter table ${db}.${tb2} add column d int;alter table ${db}.${tb2} add unique(d);" \ + "\"result\": true" 2 + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "binlog replace test -s mysql-replica-02 alter table ${db}.${tb2} add column d int;alter table ${db}.${tb2} add unique(d);" \ + "\"result\": true" 2 + else + # 11 second ddl, 22 first ddl: unsupport error + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "unsupported add column .* constraint UNIQUE KEY" 2 + + # replace 11 second ddl + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "binlog replace test -s mysql-replica-01 alter table ${db}.${tb1} add column d int;alter table ${db}.${tb1} add unique(d);" \ + "\"result\": true" 2 + + # replace 21 second ddl + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "binlog replace test -s mysql-replica-02 alter table ${db}.${tb1} add column d int;alter table ${db}.${tb1} add unique(d);" \ + "\"result\": true" 2 + + # 12 first ddl, 21 second ddl: unsupport error + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "unsupported add column .* constraint UNIQUE KEY" 2 + + # replace 12 first ddl + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "binlog replace test -s mysql-replica-01 alter table ${db}.${tb2} add column c int;alter table ${db}.${tb2} add unique(c);" \ + "\"result\": true" 2 + + # replace 22 first ddl + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "binlog replace test -s mysql-replica-02 alter table ${db}.${tb2} add column c int;alter table ${db}.${tb2} add unique(c);" \ + "\"result\": true" 2 + + # 12 first ddl, 22 second ddl: unspport error + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "unsupported add column .* constraint UNIQUE KEY" 2 + + # replace 12/22 second ddl + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "binlog replace test alter table ${db}.${tb2} add column d int;alter table ${db}.${tb1} add unique(d);" \ + "\"result\": true" 3 + + fi + + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "\"stage\": \"Running\"" 2 + + run_sql_tidb_with_retry "select count(1) from ${db}.${tb}" "count(1): 10" +} + +function DM_CROSS_DDL_SHARDING_WITH_REPLACE_ERROR() { + run_case CROSS_DDL_SHARDING_WITH_REPLACE_ERROR "double-source-pessimistic" \ + "run_sql_source1 \"create table ${db}.${tb1} (a int primary key, b int);\"; \ + run_sql_source1 \"create table ${db}.${tb2} (a int primary key, b int);\"; \ + run_sql_source2 \"create table ${db}.${tb1} (a int primary key, b int);\"; \ + run_sql_source2 \"create table ${db}.${tb2} (a int primary key, b int);\"" \ + "clean_table" "pessimistic" + + run_case CROSS_DDL_SHARDING_WITH_REPLACE_ERROR "double-source-optimistic" \ + "run_sql_source1 \"create table ${db}.${tb1} (a int primary key, b int);\"; \ + run_sql_source1 \"create table ${db}.${tb2} (a int primary key, b int);\"; \ + run_sql_source2 \"create table ${db}.${tb1} (a int primary key, b int);\"; \ + run_sql_source2 \"create table ${db}.${tb2} (a int primary key, b int);\"" \ + "clean_table" "optimistic" +} + # test handle_error fail on second replace ddl # two sources, two tables function DM_REPLACE_ERROR_MULTIPLE_CASE() { @@ -823,6 +1018,8 @@ function run() { DM_SKIP_ERROR DM_SKIP_ERROR_SHARDING DM_REPLACE_ERROR + DM_CROSS_DDL_SHARDING + DM_CROSS_DDL_SHARDING_WITH_REPLACE_ERROR DM_REPLACE_ERROR_SHARDING DM_REPLACE_ERROR_MULTIPLE DM_EXEC_ERROR_SKIP