diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go index 94f6fa8777a..c158bcfb4f2 100644 --- a/dm/syncer/syncer.go +++ b/dm/syncer/syncer.go @@ -38,6 +38,13 @@ import ( "github.com/pingcap/tidb/parser/format" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/sessionctx" +<<<<<<< HEAD +======= + "github.com/pingcap/tidb/util/dbutil" + "github.com/pingcap/tidb/util/filter" + regexprrouter "github.com/pingcap/tidb/util/regexpr-router" + router "github.com/pingcap/tidb/util/table-router" +>>>>>>> cd9032152 (syncer(dm): fix failed row skipped due to incorrect checkpoint flush (#5295)) clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/atomic" "go.uber.org/zap" @@ -1011,8 +1018,7 @@ func (s *Syncer) checkShouldFlush() error { s.flushCheckPointsAsync(j) return nil } - s.jobWg.Wait() - return s.flushCheckPoints() + return s.flushJobs() } // TODO: move to syncer/job.go diff --git a/dm/tests/tracker_ignored_ddl/conf/dm-task.yaml b/dm/tests/tracker_ignored_ddl/conf/dm-task.yaml index a1dc02ebc87..4a54be354d5 100644 --- a/dm/tests/tracker_ignored_ddl/conf/dm-task.yaml +++ b/dm/tests/tracker_ignored_ddl/conf/dm-task.yaml @@ -46,3 +46,4 @@ syncers: worker-count: 16 batch: 100 enable-ansi-quotes: false # compatible with deprecated config + checkpoint-flush-interval: 1 diff --git a/dm/tests/tracker_ignored_ddl/conf/source1.yaml b/dm/tests/tracker_ignored_ddl/conf/source1_gtid.yaml similarity index 100% rename from dm/tests/tracker_ignored_ddl/conf/source1.yaml rename to dm/tests/tracker_ignored_ddl/conf/source1_gtid.yaml diff --git a/dm/tests/tracker_ignored_ddl/conf/source1_pos.yaml b/dm/tests/tracker_ignored_ddl/conf/source1_pos.yaml new file mode 100644 index 00000000000..7d67feb8cfd --- /dev/null +++ b/dm/tests/tracker_ignored_ddl/conf/source1_pos.yaml @@ -0,0 +1,11 @@ +source-id: mysql-replica-01 +flavor: '' +enable-gtid: false +enable-relay: true +relay-binlog-name: '' +relay-binlog-gtid: '' +from: + host: 127.0.0.1 + user: root + password: /Q7B9DizNLLTTfiZHv9WoEAKamfpIUs= + port: 3306 diff --git a/dm/tests/tracker_ignored_ddl/run.sh b/dm/tests/tracker_ignored_ddl/run.sh index 670eadd4337..a26ca8e206f 100644 --- a/dm/tests/tracker_ignored_ddl/run.sh +++ b/dm/tests/tracker_ignored_ddl/run.sh @@ -7,6 +7,7 @@ source $cur/../_utils/test_prepare WORK_DIR=$TEST_DIR/$TEST_NAME function run() { + source_cfg=$1 run_sql_file $cur/data/db.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml @@ -14,7 +15,7 @@ function run() { run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker.toml check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT # operate mysql config to worker - cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml + cp $cur/conf/$source_cfg $WORK_DIR/source1.yaml dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 # start DM task only @@ -27,6 +28,14 @@ function run() { check_not_contains "ignore_1" echo "increment1 check success" +<<<<<<< HEAD +======= + # a not ignored DDL to trigger a checkpoint flush + run_sql_source1 "create table tracker_ignored_ddl.test (c int primary key);" + + # sleep 2 second, so the next insert will trigger check point flush since checkpoint-flush-interval=1 + sleep 2 +>>>>>>> cd9032152 (syncer(dm): fix failed row skipped due to incorrect checkpoint flush (#5295)) run_sql_file $cur/data/db.increment2.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status test" \ @@ -55,12 +64,18 @@ function run() { run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status test" \ "\"stage\": \"Running\"" 1 + + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "stop-task test" \ + "\"result\": true" 2 + dmctl_operate_source stop $WORK_DIR/source1.yaml $SOURCE_ID1 } cleanup_data $TEST_NAME # also cleanup dm processes in case of last run failed cleanup_process -run +run source1_gtid.yaml +run source1_pos.yaml cleanup_process echo "[$(date)] <<<<<< test case $TEST_NAME success! >>>>>>"