From 71d4e1c065590b651533369a2608092384b05ba8 Mon Sep 17 00:00:00 2001
From: glorv
Date: Wed, 17 Nov 2021 11:33:55 +0800
Subject: [PATCH 1/5] revert dm#2048 for correctness

---
 dm/pkg/streamer/reader.go         | 13 ------
 dm/pkg/streamer/streamer.go       |  7 ----
 dm/relay/relay.go                 | 68 ++++++-------------------
 dm/relay/relay_test.go            | 16 ++++----
 dm/relay/retry/reader.go          |  5 ---
 dm/syncer/syncer.go               | 43 -------------------
 dm/tests/others_integration_2.txt |  1 -
 7 files changed, 20 insertions(+), 133 deletions(-)

diff --git a/dm/pkg/streamer/reader.go b/dm/pkg/streamer/reader.go
index 4d01ea0cd8c..f078f9e0d24 100644
--- a/dm/pkg/streamer/reader.go
+++ b/dm/pkg/streamer/reader.go
@@ -39,10 +39,6 @@ import (
     "github.com/pingcap/ticdc/dm/pkg/utils"
 )
 
-// ErrorMaybeDuplicateEvent indicates that there may be duplicate event in next binlog file
-// this is mainly happened when upstream master changed when relay log not finish reading a transaction.
-var ErrorMaybeDuplicateEvent = errors.New("truncate binlog file found, event may be duplicated")
-
 // Meta represents binlog meta information in relay.meta.
 type Meta struct {
     BinLogName string `toml:"binlog-name" json:"binlog-name"`
@@ -560,15 +556,6 @@ func (r *BinlogReader) parseFile(
     if err != nil {
         if possibleLast && isIgnorableParseError(err) {
             r.tctx.L().Warn("fail to parse relay log file, meet some ignorable error", zap.String("file", fullPath), zap.Int64("offset", offset), zap.Error(err))
-            // the file is truncated, we send a mock event with `IGNORABLE_EVENT` to notify the the consumer
-            // TODO: should add a integration test for this
-            e := &replication.BinlogEvent{
-                RawData: []byte(ErrorMaybeDuplicateEvent.Error()),
-                Header: &replication.EventHeader{
-                    EventType: replication.IGNORABLE_EVENT,
-                },
-            }
-            s.ch <- e
         } else {
             r.tctx.L().Error("parse relay log file", zap.String("file", fullPath), zap.Int64("offset", offset), zap.Error(err))
             return false, false, 0, "", "", false, terror.ErrParserParseRelayLog.Delegate(err, fullPath)
diff --git a/dm/pkg/streamer/streamer.go b/dm/pkg/streamer/streamer.go
index a7e4b67db06..1398508300e 100644
--- a/dm/pkg/streamer/streamer.go
+++ b/dm/pkg/streamer/streamer.go
@@ -14,7 +14,6 @@
 package streamer
 
 import (
-    "bytes"
     "context"
     "time"
 
@@ -67,12 +66,6 @@ func (s *LocalStreamer) GetEvent(ctx context.Context) (*replication.BinlogEvent,
         heartbeatHeader := &replication.EventHeader{}
         return event.GenHeartbeatEvent(heartbeatHeader), nil
     case c := <-s.ch:
-        // special check for maybe truncated relay log
-        if c.Header.EventType == replication.IGNORABLE_EVENT {
-            if bytes.Equal(c.RawData, []byte(ErrorMaybeDuplicateEvent.Error())) {
-                return nil, ErrorMaybeDuplicateEvent
-            }
-        }
         return c, nil
     case s.err = <-s.ech:
         return nil, s.err
diff --git a/dm/relay/relay.go b/dm/relay/relay.go
index 3db05c27de1..e3aee31d70c 100644
--- a/dm/relay/relay.go
+++ b/dm/relay/relay.go
@@ -307,8 +307,7 @@ func (r *Relay) process(ctx context.Context) error {
     // handles binlog events with retry mechanism.
     // it only do the retry for some binlog reader error now.
     for {
-        eventIdx, err := r.handleEvents(ctx, reader2, transformer2, writer2)
-    checkError:
+        err := r.handleEvents(ctx, reader2, transformer2, writer2)
         if err == nil {
             return nil
         } else if !readerRetry.Check(ctx, err) {
@@ -325,33 +324,6 @@ func (r *Relay) process(ctx context.Context) error {
             return err
         }
         r.logger.Info("retrying to read binlog")
-        if r.cfg.EnableGTID && eventIdx > 0 {
-            // check if server has switched
-            isNew, err2 := isNewServer(ctx, r.meta.UUID(), r.db.DB, r.cfg.Flavor)
-            // should start from the transaction beginning when switch to a new server
-            if err2 != nil {
-                r.logger.Warn("check new server failed, continue outer loop", log.ShortError(err2))
-                err = err2
-                goto checkError
-            }
-            if !isNew {
-                for i := 0; i < eventIdx; {
-                    res, err2 := reader2.GetEvent(ctx)
-                    if err2 != nil {
-                        err = err2
-                        goto checkError
-                    }
-                    tResult := transformer2.Transform(res.Event)
-                    // do not count skip event
-                    if !tResult.Ignore {
-                        i++
-                    }
-                }
-                if eventIdx > 0 {
-                    r.logger.Info("discard duplicate event", zap.Int("count", eventIdx))
-                }
-            }
-        }
     }
 }
 
@@ -465,16 +437,15 @@ func (r *Relay) handleEvents(
     reader2 reader.Reader,
     transformer2 transformer.Transformer,
     writer2 writer.Writer,
-) (int, error) {
+) error {
     var (
         _, lastPos  = r.meta.Pos()
         _, lastGTID = r.meta.GTID()
        err         error
-        eventIndex  int
     )
     if lastGTID == nil {
         if lastGTID, err = gtid.ParserGTID(r.cfg.Flavor, ""); err != nil {
-            return 0, err
+            return err
         }
     }
 
@@ -483,20 +454,10 @@ func (r *Relay) handleEvents(
         // 1. read events from upstream server
         readTimer := time.Now()
         rResult, err := reader2.GetEvent(ctx)
-        failpoint.Inject("RelayGetEventFailed", func(v failpoint.Value) {
-            if intVal, ok := v.(int); ok && intVal == eventIndex {
-                err = errors.New("fail point triggered")
-                _, gtid := r.meta.GTID()
-                r.logger.Warn("failed to get event", zap.Int("event_index", eventIndex),
-                    zap.Any("gtid", gtid), log.ShortError(err))
-                // wait backoff retry interval
-                time.Sleep(1 * time.Second)
-            }
-        })
         if err != nil {
             switch errors.Cause(err) {
             case context.Canceled:
-                return 0, nil
+                return nil
             case replication.ErrChecksumMismatch:
                 relayLogDataCorruptionCounter.Inc()
             case replication.ErrSyncClosed, replication.ErrNeedSyncAgain:
@@ -515,7 +476,7 @@ func (r *Relay) handleEvents(
                 }
                 binlogReadErrorCounter.Inc()
             }
-            return eventIndex, err
+            return err
         }
         binlogReadDurationHistogram.Observe(time.Since(readTimer).Seconds())
 
@@ -547,15 +508,13 @@ func (r *Relay) handleEvents(
 
         if _, ok := e.Event.(*replication.RotateEvent); ok && utils.IsFakeRotateEvent(e.Header) {
             isNew, err2 := isNewServer(ctx, r.meta.UUID(), r.db.DB, r.cfg.Flavor)
-            // should start from the transaction beginning when switch to a new server
             if err2 != nil {
-                return 0, err2
+                return err2
             }
             // upstream database switch
             // report an error, let outer logic handle it
-            // should start from the transaction beginning when switch to a new server
             if isNew {
-                return 0, terror.ErrRotateEventWithDifferentServerID.Generate()
+                return terror.ErrRotateEventWithDifferentServerID.Generate()
             }
         }
 
@@ -566,7 +525,7 @@ func (r *Relay) handleEvents(
             // and meta file is not created when relay resumed.
             firstEvent = false
             if err2 := r.saveAndFlushMeta(lastPos, lastGTID); err2 != nil {
-                return 0, err2
+                return err2
             }
         }
 
@@ -576,7 +535,7 @@ func (r *Relay) handleEvents(
         wResult, err := writer2.WriteEvent(e)
         if err != nil {
             relayLogWriteErrorCounter.Inc()
-            return eventIndex, err
+            return err
         } else if wResult.Ignore {
             r.logger.Info("ignore event by writer",
                 zap.Reflect("header", e.Header),
@@ -595,7 +554,7 @@ func (r *Relay) handleEvents(
         lastPos.Pos = tResult.LogPos
         err = lastGTID.Set(tResult.GTIDSet)
         if err != nil {
-            return 0, terror.ErrRelayUpdateGTID.Delegate(err, lastGTID, tResult.GTIDSet)
+            return terror.ErrRelayUpdateGTID.Delegate(err, lastGTID, tResult.GTIDSet)
         }
         if !r.cfg.EnableGTID {
             // if go-mysql set RawModeEnabled to true
@@ -620,17 +579,14 @@ func (r *Relay) handleEvents(
         if needSavePos {
             err = r.SaveMeta(lastPos, lastGTID)
             if err != nil {
-                return 0, terror.Annotatef(err, "save position %s, GTID sets %v into meta", lastPos, lastGTID)
+                return terror.Annotatef(err, "save position %s, GTID sets %v into meta", lastPos, lastGTID)
             }
-            eventIndex = 0
-        } else {
-            eventIndex++
         }
         if tResult.NextLogName != "" && !utils.IsFakeRotateEvent(e.Header) {
             // if the binlog is rotated, we need to save and flush the next binlog filename to meta
             lastPos.Name = tResult.NextLogName
             if err := r.saveAndFlushMeta(lastPos, lastGTID); err != nil {
-                return 0, err
+                return err
             }
         }
     }
diff --git a/dm/relay/relay_test.go b/dm/relay/relay_test.go
index 3cb01ec80be..1eb4c5d23e8 100644
--- a/dm/relay/relay_test.go
+++ b/dm/relay/relay_test.go
@@ -451,7 +451,7 @@ func (t *testRelaySuite) TestHandleEvent(c *C) {
         replication.ErrSyncClosed,
         replication.ErrNeedSyncAgain,
     } {
-        _, handleErr := r.handleEvents(ctx, reader2, transformer2, writer2)
+        handleErr := r.handleEvents(ctx, reader2, transformer2, writer2)
         c.Assert(errors.Cause(handleErr), Equals, reader2.err)
     }
 
@@ -461,7 +461,7 @@ func (t *testRelaySuite) TestHandleEvent(c *C) {
     // writer return error to force handleEvents return
     writer2.err = errors.New("writer error for testing")
     // return with the annotated writer error
-    _, err = r.handleEvents(ctx, reader2, transformer2, writer2)
+    err = r.handleEvents(ctx, reader2, transformer2, writer2)
     c.Assert(errors.Cause(err), Equals, writer2.err)
     // after handle rotate event, we save and flush the meta immediately
     c.Assert(r.meta.Dirty(), Equals, false)
@@ -480,7 +480,7 @@ func (t *testRelaySuite) TestHandleEvent(c *C) {
         lm := r.meta.(*LocalMeta)
         backupUUID := lm.currentUUID
         lm.currentUUID = "not exist"
-        _, err = r.handleEvents(ctx, reader2, transformer2, writer2)
+        err = r.handleEvents(ctx, reader2, transformer2, writer2)
         c.Assert(os.IsNotExist(errors.Cause(err)), Equals, true)
         lm.currentUUID = backupUUID
     }
@@ -492,14 +492,14 @@ func (t *testRelaySuite) TestHandleEvent(c *C) {
     // writer return error
     writer2.err = errors.New("writer error for testing")
     // return with the annotated writer error
-    _, err = r.handleEvents(context.Background(), reader2, transformer2, writer2)
+    err = r.handleEvents(context.Background(), reader2, transformer2, writer2)
     c.Assert(errors.Cause(err), Equals, writer2.err)
     // after handle rotate event, we save and flush the meta immediately
     c.Assert(r.meta.Dirty(), Equals, false)
 
     // writer without error
     writer2.err = nil
-    _, err = r.handleEvents(ctx, reader2, transformer2, writer2) // returned when ctx timeout
+    err = r.handleEvents(ctx, reader2, transformer2, writer2) // returned when ctx timeout
     c.Assert(errors.Cause(err), Equals, ctx.Err())
     // check written event
     c.Assert(writer2.latestEvent, Equals, reader2.result.Event)
@@ -514,7 +514,7 @@ func (t *testRelaySuite) TestHandleEvent(c *C) {
 
     // write a QueryEvent with GTID sets
     reader2.result.Event = queryEv
-    _, err = r.handleEvents(ctx2, reader2, transformer2, writer2)
+    err = r.handleEvents(ctx2, reader2, transformer2, writer2)
     c.Assert(errors.Cause(err), Equals, ctx.Err())
     // check written event
     c.Assert(writer2.latestEvent, Equals, reader2.result.Event)
@@ -533,7 +533,7 @@ func (t *testRelaySuite) TestHandleEvent(c *C) {
     }
     ctx4, cancel4 := context.WithTimeout(context.Background(), 10*time.Millisecond)
     defer cancel4()
-    _, err = r.handleEvents(ctx4, reader2, transformer2, writer2)
+    err = r.handleEvents(ctx4, reader2, transformer2, writer2)
     c.Assert(errors.Cause(err), Equals, ctx.Err())
     select {
     case <-ctx4.Done():
@@ -546,7 +546,7 @@ func (t *testRelaySuite) TestHandleEvent(c *C) {
     writer2.result.Ignore = true
     ctx5, cancel5 := context.WithTimeout(context.Background(), 10*time.Millisecond)
     defer cancel5()
-    _, err = r.handleEvents(ctx5, reader2, transformer2, writer2)
+    err = r.handleEvents(ctx5, reader2, transformer2, writer2)
     c.Assert(errors.Cause(err), Equals, ctx.Err())
     select {
     case <-ctx5.Done():
diff --git a/dm/relay/retry/reader.go b/dm/relay/retry/reader.go
index c9e37e00c95..b155aaf8b14 100644
--- a/dm/relay/retry/reader.go
+++ b/dm/relay/retry/reader.go
@@ -17,8 +17,6 @@ import (
     "context"
     "time"
 
-    "github.com/pingcap/failpoint"
-
     "github.com/pingcap/ticdc/dm/pkg/backoff"
     "github.com/pingcap/ticdc/dm/pkg/retry"
     "github.com/pingcap/ticdc/dm/pkg/terror"
@@ -58,9 +56,6 @@ func NewReaderRetry(cfg ReaderRetryConfig) (*ReaderRetry, error) {
 
 // Check checks whether should retry for the error.
 func (rr *ReaderRetry) Check(ctx context.Context, err error) bool {
-    failpoint.Inject("RelayAllowRetry", func() {
-        failpoint.Return(true)
-    })
     if !retry.IsConnectionError(err) {
         return false
     }
diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go
index 51bd6767c9e..ba9397ee465 100644
--- a/dm/syncer/syncer.go
+++ b/dm/syncer/syncer.go
@@ -1546,28 +1546,6 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
         return nil
     }
 
-    maybeSkipNRowsEvent := func(n int) error {
-        if s.cfg.EnableGTID && n > 0 {
-            for i := 0; i < n; {
-                e, err1 := s.getEvent(tctx, currentLocation)
-                if err1 != nil {
-                    return err
-                }
-                if _, ok := e.Event.(*replication.RowsEvent); ok {
-                    i++
-                }
-            }
-            log.L().Info("discard event already consumed", zap.Int("count", n),
-                zap.Any("cur_loc", currentLocation))
-        }
-        return nil
-    }
-
-    // eventIndex is the rows event index in this transaction, it's used to avoiding read duplicate event in gtid mode
-    eventIndex := 0
-    // the relay log file may be truncated(not end with an RotateEvent), in this situation, we may read some rows events
-    // and then read from the gtid again, so we force enter safe-mode for one more transaction to avoid failure due to
-    // conflict
     for {
         if s.execError.Load() != nil {
             return nil
@@ -1609,14 +1587,6 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
                 err = errors.New("connect: connection refused")
             }
         })
-        failpoint.Inject("GetEventErrorInTxn", func(val failpoint.Value) {
-            if intVal, ok := val.(int); ok && intVal == eventIndex {
-                err = errors.New("failpoint triggered")
-                s.tctx.L().Warn("failed to get event", zap.Int("event_index", eventIndex),
-                    zap.Any("cur_pos", currentLocation), zap.Any("las_pos", lastLocation),
-                    zap.Any("pos", e.Header.LogPos), log.ShortError(err))
-            }
-        })
         switch {
         case err == context.Canceled:
             tctx.L().Info("binlog replication main routine quit(context canceled)!", zap.Stringer("last location", lastLocation))
@@ -1632,13 +1602,6 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
                 return err1
             }
             continue
-        case err == streamer.ErrorMaybeDuplicateEvent:
-            tctx.L().Warn("read binlog met a truncated file, need to open safe-mode until the next transaction")
-            err = maybeSkipNRowsEvent(eventIndex)
-            if err == nil {
-                continue
-            }
-            log.L().Warn("skip duplicate rows event failed", zap.Error(err))
         }
 
         if err != nil {
@@ -1655,9 +1618,6 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
                 return err
             }
             log.L().Info("reset replication binlog puller", zap.Any("pos", s.checkpoint.GlobalPoint()))
-            if err = maybeSkipNRowsEvent(eventIndex); err != nil {
-                return err
-            }
             continue
         }
 
@@ -1809,15 +1769,12 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
         case *replication.RotateEvent:
             err2 = s.handleRotateEvent(ev, ec)
         case *replication.RowsEvent:
-            eventIndex++
             metrics.BinlogEventRowHistogram.WithLabelValues(s.cfg.WorkerName, s.cfg.Name, s.cfg.SourceID).Observe(float64(len(ev.Rows)))
             err2 = s.handleRowsEvent(ev, ec)
         case *replication.QueryEvent:
             originSQL = strings.TrimSpace(string(ev.Query))
             err2 = s.handleQueryEvent(ev, ec, originSQL)
         case *replication.XIDEvent:
-            // reset eventIndex and force safeMode flag here.
-            eventIndex = 0
             if shardingReSync != nil {
                 shardingReSync.currLocation.Position.Pos = e.Header.LogPos
                 shardingReSync.currLocation.Suffix = currentLocation.Suffix
diff --git a/dm/tests/others_integration_2.txt b/dm/tests/others_integration_2.txt
index c8959ae28bb..eb7c5d37db7 100644
--- a/dm/tests/others_integration_2.txt
+++ b/dm/tests/others_integration_2.txt
@@ -7,5 +7,4 @@ case_sensitive
 sql_mode
 http_proxies
 openapi
-duplicate_event
 tracker_ignored_ddl
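Context for the revert above, sketched from the removed lines (a simplified sketch, not tied to any exact DM revision). dm#2048 signaled a possibly-truncated relay file in-band: the relay reader pushed a mock IGNORABLE_EVENT whose payload was the sentinel error text, the streamer decoded it back into ErrorMaybeDuplicateEvent, and the syncer then re-read and skipped the rows events it had already applied, using the eventIdx/eventIndex counters deleted above:

    // reader side: on an ignorable parse error, emit a sentinel event instead of an error
    e := &replication.BinlogEvent{
        RawData: []byte(ErrorMaybeDuplicateEvent.Error()),
        Header:  &replication.EventHeader{EventType: replication.IGNORABLE_EVENT},
    }
    s.ch <- e

    // streamer side: turn the sentinel back into an error for the consumer
    if c.Header.EventType == replication.IGNORABLE_EVENT &&
        bytes.Equal(c.RawData, []byte(ErrorMaybeDuplicateEvent.Error())) {
        return nil, ErrorMaybeDuplicateEvent
    }

Overloading IGNORABLE_EVENT this way cannot be distinguished from a genuine ignorable event carrying the same payload, and the skip-by-count replay is fragile across upstream server switches, which is presumably the correctness concern behind reverting the whole mechanism.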
From db2070fa1c6dda58e72435dd42cb99ce94e7b1cc Mon Sep 17 00:00:00 2001
From: glorv
Date: Wed, 17 Nov 2021 13:56:48 +0800
Subject: [PATCH 2/5] fix reset streamer location

---
 dm/syncer/syncer.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go
index 91acaa912eb..96437d3bf67 100644
--- a/dm/syncer/syncer.go
+++ b/dm/syncer/syncer.go
@@ -1620,8 +1620,8 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
         }
 
         if s.streamerController.CanRetry(err) {
-            // GlobalPoint is the last finished GTID
-            err = s.streamerController.ResetReplicationSyncer(tctx, s.checkpoint.GlobalPoint())
+            // lastLocation is the last finished GTID
+            err = s.streamerController.ResetReplicationSyncer(tctx, lastLocation)
             if err != nil {
                 return err
             }
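A brief illustration of the one-line fix above (a sketch; the surrounding retry loop is elided in the hunk). s.checkpoint.GlobalPoint() is the last flushed checkpoint and can trail lastLocation, the location the syncer most recently finished, so resetting the streamer to the checkpoint could re-pull events this run already handled:

    // sketch: after a retryable streamer error, resume from the freshest known location
    if s.streamerController.CanRetry(err) {
        // lastLocation, not s.checkpoint.GlobalPoint(): the flushed checkpoint may be
        // older and would replay already-applied events after the dm#2048 revert
        if err = s.streamerController.ResetReplicationSyncer(tctx, lastLocation); err != nil {
            return err
        }
        continue
    }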
From d55eb268a16e133d73c9fb0ae89a423a2a073a4b Mon Sep 17 00:00:00 2001
From: lance6716
Date: Fri, 12 Nov 2021 14:17:06 +0800
Subject: [PATCH 3/5] dm/test: swap upstream and downstream of sync_diff in
 some cases (#3413)

---
 dm/tests/_utils/test_prepare                 |  7 +++++
 .../conf/diff_config_revert_1.toml           | 31 +++++++++++++++++++
 .../conf/diff_config_revert_2.toml           | 31 +++++++++++++++++++
 dm/tests/adjust_gtid/run.sh                  | 10 ++++--
 dm/tests/dm_syncer/run.sh                    |  2 +-
 .../full_mode/conf/diff_config_revert_1.toml | 30 ++++++++++++++++++
 .../full_mode/conf/diff_config_revert_2.toml | 30 ++++++++++++++++++
 dm/tests/full_mode/run.sh                    | 10 ++++--
 .../conf/diff_config_revert_1.toml           | 30 ++++++++++++++++++
 .../conf/diff_config_revert_2.toml           | 30 ++++++++++++++++++
 dm/tests/incremental_mode/run.sh             | 10 ++++--
 11 files changed, 211 insertions(+), 10 deletions(-)
 create mode 100644 dm/tests/adjust_gtid/conf/diff_config_revert_1.toml
 create mode 100644 dm/tests/adjust_gtid/conf/diff_config_revert_2.toml
 create mode 100644 dm/tests/full_mode/conf/diff_config_revert_1.toml
 create mode 100644 dm/tests/full_mode/conf/diff_config_revert_2.toml
 create mode 100644 dm/tests/incremental_mode/conf/diff_config_revert_1.toml
 create mode 100644 dm/tests/incremental_mode/conf/diff_config_revert_2.toml

diff --git a/dm/tests/_utils/test_prepare b/dm/tests/_utils/test_prepare
index de838513889..04e3f298fcd 100644
--- a/dm/tests/_utils/test_prepare
+++ b/dm/tests/_utils/test_prepare
@@ -10,6 +10,13 @@ function cleanup_data() {
     run_sql "drop database if exists dm_meta" $TIDB_PORT $TIDB_PASSWORD
 }
 
+function cleanup_data_upstream() {
+    for target_db in "$@"; do
+        run_sql "drop database if exists \`${target_db}\`" $MYSQL_PORT1 $MYSQL_PASSWORD1
+        run_sql "drop database if exists \`${target_db}\`" $MYSQL_PORT2 $MYSQL_PASSWORD2
+    done
+}
+
 function cleanup_process() {
     dm_master_num=$(ps aux >temp && grep "dm-master.test" temp | wc -l && rm temp)
     echo "$dm_master_num dm-master alive"
diff --git a/dm/tests/adjust_gtid/conf/diff_config_revert_1.toml b/dm/tests/adjust_gtid/conf/diff_config_revert_1.toml
new file mode 100644
index 00000000000..455ed32451f
--- /dev/null
+++ b/dm/tests/adjust_gtid/conf/diff_config_revert_1.toml
@@ -0,0 +1,31 @@
+# diff Configuration.
+
+check-thread-count = 4
+
+export-fix-sql = true
+
+check-struct-only = false
+
+
+[task]
+    output-dir = "/tmp/ticdc_dm_test/output"
+
+    source-instances = ["tidb0"]
+
+    target-instance = "mysql1"
+
+    target-check-tables = ["adjust_gtid.t?*"]
+
+
+[data-sources]
+[data-sources.mysql1]
+host = "127.0.0.1"
+port = 3306
+user = "root"
+password = "123456"
+
+[data-sources.tidb0]
+host = "127.0.0.1"
+port = 4000
+user = "test"
+password = "123456"
diff --git a/dm/tests/adjust_gtid/conf/diff_config_revert_2.toml b/dm/tests/adjust_gtid/conf/diff_config_revert_2.toml
new file mode 100644
index 00000000000..6d38ad7fb17
--- /dev/null
+++ b/dm/tests/adjust_gtid/conf/diff_config_revert_2.toml
@@ -0,0 +1,31 @@
+# diff Configuration.
+
+check-thread-count = 4
+
+export-fix-sql = true
+
+check-struct-only = false
+
+
+[task]
+    output-dir = "/tmp/ticdc_dm_test/output"
+
+    source-instances = ["tidb0"]
+
+    target-instance = "mysql2"
+
+    target-check-tables = ["adjust_gtid.t?*"]
+
+
+[data-sources]
+[data-sources.mysql2]
+host = "127.0.0.1"
+port = 3307
+user = "root"
+password = "123456"
+
+[data-sources.tidb0]
+host = "127.0.0.1"
+port = 4000
+user = "test"
+password = "123456"
diff --git a/dm/tests/adjust_gtid/run.sh b/dm/tests/adjust_gtid/run.sh
index 751f2fca8a3..f51a28a6e9d 100755
--- a/dm/tests/adjust_gtid/run.sh
+++ b/dm/tests/adjust_gtid/run.sh
@@ -84,10 +84,13 @@ function run() {
     # avoid cannot unmarshal !!str `binlog-...` into uint32 error
     sed -i "s/binlog-pos-placeholder-1/4/g" $WORK_DIR/dm-task.yaml
     sed -i "s/binlog-pos-placeholder-2/4/g" $WORK_DIR/dm-task.yaml
-    dmctl_start_task "$WORK_DIR/dm-task.yaml" "--remove-meta"
+    # start DM task. don't check error because it will meet injected error soon
+    run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "start-task $WORK_DIR/dm-task.yaml --remove-meta"
 
     # use sync_diff_inspector to check full dump loader
-    check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
+    check_sync_diff $WORK_DIR $cur/conf/diff_config_revert_1.toml
+    check_sync_diff $WORK_DIR $cur/conf/diff_config_revert_2.toml
 
     name1=$(grep "Log: " $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ')
     pos1=$(grep "Pos: " $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ')
@@ -129,7 +132,8 @@ function run() {
     check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
 
     # use sync_diff_inspector to check incremental dump loader
-    check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
+    check_sync_diff $WORK_DIR $cur/conf/diff_config_revert_1.toml
+    check_sync_diff $WORK_DIR $cur/conf/diff_config_revert_2.toml
 
     run_sql_both_source "SET @@GLOBAL.SQL_MODE='ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION'"
     run_sql_both_source "SET @@global.time_zone = 'SYSTEM';"
diff --git a/dm/tests/dm_syncer/run.sh b/dm/tests/dm_syncer/run.sh
index 1ed98da1d8a..11994fdb95c 100755
--- a/dm/tests/dm_syncer/run.sh
+++ b/dm/tests/dm_syncer/run.sh
@@ -71,7 +71,7 @@ function run() {
     run_dm_syncer $WORK_DIR/syncer2 $WORK_DIR/dm-syncer-2.toml $meta_file --syncer-config-format syncer2
 
     # wait for dm_syncer to init and start
-    sleep 5
+    sleep 10
     check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
     check_sync_diff $WORK_DIR $cur/conf/diff_config_blalist.toml
     check_sync_diff $WORK_DIR $cur/conf/diff_config_route_rules.toml
diff --git a/dm/tests/full_mode/conf/diff_config_revert_1.toml b/dm/tests/full_mode/conf/diff_config_revert_1.toml
new file mode 100644
index 00000000000..f690ce6e804
--- /dev/null
+++ b/dm/tests/full_mode/conf/diff_config_revert_1.toml
@@ -0,0 +1,30 @@
+# diff Configuration.
+
+check-thread-count = 4
+
+export-fix-sql = true
+
+check-struct-only = false
+
+[task]
+    output-dir = "/tmp/ticdc_dm_test/output"
+
+    source-instances = ["tidb0"]
+
+    target-instance = "mysql1"
+
+    target-check-tables = ["full_mode.t?*"]
+
+
+[data-sources]
+[data-sources.mysql1]
+host = "127.0.0.1"
+port = 3306
+user = "root"
+password = "123456"
+
+[data-sources.tidb0]
+host = "127.0.0.1"
+port = 4000
+user = "test"
+password = "123456"
diff --git a/dm/tests/full_mode/conf/diff_config_revert_2.toml b/dm/tests/full_mode/conf/diff_config_revert_2.toml
new file mode 100644
index 00000000000..84a3ecd2d43
--- /dev/null
+++ b/dm/tests/full_mode/conf/diff_config_revert_2.toml
@@ -0,0 +1,30 @@
+# diff Configuration.
+
+check-thread-count = 4
+
+export-fix-sql = true
+
+check-struct-only = false
+
+[task]
+    output-dir = "/tmp/ticdc_dm_test/output"
+
+    source-instances = ["tidb0"]
+
+    target-instance = "mysql2"
+
+    target-check-tables = ["full_mode.t?*"]
+
+
+[data-sources]
+[data-sources.mysql2]
+host = "127.0.0.1"
+port = 3307
+user = "root"
+password = "123456"
+
+[data-sources.tidb0]
+host = "127.0.0.1"
+port = 4000
+user = "test"
+password = "123456"
diff --git a/dm/tests/full_mode/run.sh b/dm/tests/full_mode/run.sh
index 714a9e8a610..30a8344eadc 100755
--- a/dm/tests/full_mode/run.sh
+++ b/dm/tests/full_mode/run.sh
@@ -60,6 +60,7 @@ function fail_acquire_global_lock() {
         "you need (at least one of) the RELOAD privilege(s) for this operation" 2
 
     cleanup_data full_mode
+    cleanup_data_upstream full_mode
     cleanup_process $*
 }
 
@@ -104,7 +105,6 @@ function escape_schema() {
     # start DM task only
     dmctl_start_task "$WORK_DIR/dm-task.yaml" "--remove-meta"
 
-    check_sync_diff $WORK_DIR $WORK_DIR/diff_config.toml
     check_log_contain_with_retry 'clean dump files' $WORK_DIR/worker1/log/dm-worker.log
     check_log_contain_with_retry 'clean dump files' $WORK_DIR/worker2/log/dm-worker.log
@@ -118,6 +118,7 @@ function escape_schema() {
     check_metric $WORKER2_PORT 'dumpling_dump_finished_tables' 3 0 3
 
     cleanup_data full/mode
+    cleanup_data_upstream full/mode
     cleanup_process $*
 }
 
@@ -130,7 +131,8 @@ function empty_data() {
     init_cluster
     dmctl_start_task "$cur/conf/dm-task.yaml" "--remove-meta"
 
-    check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
+    check_sync_diff $WORK_DIR $cur/conf/diff_config_revert_1.toml
+    check_sync_diff $WORK_DIR $cur/conf/diff_config_revert_2.toml
 
     run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
         "query-status test" \
@@ -142,6 +144,7 @@ function empty_data() {
     check_log_contains $WORK_DIR/worker2/log/dm-worker.log "progress=\"100.00 %\""
 
     cleanup_data full_mode
+    cleanup_data_upstream full_mode
     cleanup_process $*
 }
 
@@ -190,7 +193,8 @@ function run() {
     dmctl_start_task "$cur/conf/dm-task.yaml" "--remove-meta"
 
     # use sync_diff_inspector to check full dump loader
-    check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
+    check_sync_diff $WORK_DIR $cur/conf/diff_config_revert_1.toml
+    check_sync_diff $WORK_DIR $cur/conf/diff_config_revert_2.toml
 
     echo "check dump files have been cleaned"
     ls $WORK_DIR/worker1/dumped_data.test && exit 1 || echo "worker1 auto removed dump files"
diff --git a/dm/tests/incremental_mode/conf/diff_config_revert_1.toml b/dm/tests/incremental_mode/conf/diff_config_revert_1.toml
new file mode 100644
index 00000000000..45328c12217
--- /dev/null
+++ b/dm/tests/incremental_mode/conf/diff_config_revert_1.toml
@@ -0,0 +1,30 @@
+# diff Configuration.
+
+check-thread-count = 4
+
+export-fix-sql = true
+
+check-struct-only = false
+
+[task]
+    output-dir = "/tmp/ticdc_dm_test/output"
+
+    source-instances = ["tidb0"]
+
+    target-instance = "mysql1"
+
+    target-check-tables = ["incremental_mode.t?*"]
+
+
+[data-sources]
+[data-sources.mysql1]
+host = "127.0.0.1"
+port = 3306
+user = "root"
+password = "123456"
+
+[data-sources.tidb0]
+host = "127.0.0.1"
+port = 4000
+user = "test"
+password = "123456"
diff --git a/dm/tests/incremental_mode/conf/diff_config_revert_2.toml b/dm/tests/incremental_mode/conf/diff_config_revert_2.toml
new file mode 100644
index 00000000000..16ed402a93f
--- /dev/null
+++ b/dm/tests/incremental_mode/conf/diff_config_revert_2.toml
@@ -0,0 +1,30 @@
+# diff Configuration.
+
+check-thread-count = 4
+
+export-fix-sql = true
+
+check-struct-only = false
+
+[task]
+    output-dir = "/tmp/ticdc_dm_test/output"
+
+    source-instances = ["tidb0"]
+
+    target-instance = "mysql2"
+
+    target-check-tables = ["incremental_mode.t?*"]
+
+
+[data-sources]
+[data-sources.mysql2]
+host = "127.0.0.1"
+port = 3307
+user = "root"
+password = "123456"
+
+[data-sources.tidb0]
+host = "127.0.0.1"
+port = 4000
+user = "test"
+password = "123456"
diff --git a/dm/tests/incremental_mode/run.sh b/dm/tests/incremental_mode/run.sh
index 40774b7a469..b8b7dbbf171 100755
--- a/dm/tests/incremental_mode/run.sh
+++ b/dm/tests/incremental_mode/run.sh
@@ -76,7 +76,8 @@ function run() {
     sed -i "s/binlog-pos-placeholder-2/4/g" $WORK_DIR/dm-task.yaml
     dmctl_start_task $WORK_DIR/dm-task.yaml
 
-    check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
+    check_sync_diff $WORK_DIR $cur/conf/diff_config_revert_1.toml
+    check_sync_diff $WORK_DIR $cur/conf/diff_config_revert_2.toml
 
     dmctl_stop_task $TASK_NAME
 
@@ -218,7 +219,9 @@ function run() {
     check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
 
     sleep 3
-    dmctl_start_task $WORK_DIR/dm-task.yaml
+    # start DM task. don't check error because it will meet injected error soon
+    run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "start-task $WORK_DIR/dm-task.yaml"
 
     # the task should paused by `FlushCheckpointStage` failpont before flush old checkpoint.
     # `db2.increment.sql` has no DDL, so we check count of content as `1`.
@@ -263,7 +266,8 @@ function run() {
         "resume-task test" \
         "\"result\": true" 3
 
-    check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
+    check_sync_diff $WORK_DIR $cur/conf/diff_config_revert_1.toml
+    check_sync_diff $WORK_DIR $cur/conf/diff_config_revert_2.toml
 
     # test rotate binlog, after rotate and ddl, master binlog should be equal to sync binlog
     run_sql "flush logs;" $MYSQL_PORT1 $MYSQL_PASSWORD1

From 0d4e976afb73a34dc9c2934835498d508c5db8f9 Mon Sep 17 00:00:00 2001
From: WizardXiao <89761062+WizardXiao@users.noreply.github.com>
Date: Wed, 17 Nov 2021 08:27:46 +0800
Subject: [PATCH 4/5] dm/syncer : fix unstable test in shardddl1 (#3471)

---
 dm/dm/master/shardddl/optimist.go  | 2 +-
 dm/dm/master/shardddl/pessimist.go | 2 +-
 dm/tests/shardddl1/run.sh          | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/dm/dm/master/shardddl/optimist.go b/dm/dm/master/shardddl/optimist.go
index 2c1ca3c568b..eda57ea0332 100644
--- a/dm/dm/master/shardddl/optimist.go
+++ b/dm/dm/master/shardddl/optimist.go
@@ -698,7 +698,7 @@ func (o *Optimist) handleLock(info optimism.Info, tts []optimism.TargetTable, sk
 func (o *Optimist) removeLock(lock *optimism.Lock) (bool, error) {
     failpoint.Inject("SleepWhenRemoveLock", func(val failpoint.Value) {
         t := val.(int)
-        log.L().Info("wait new ddl info putted into etcd",
+        log.L().Info("wait new ddl info putted into etcd in optimistic",
             zap.String("failpoint", "SleepWhenRemoveLock"),
             zap.Int("max wait second", t))
 
diff --git a/dm/dm/master/shardddl/pessimist.go b/dm/dm/master/shardddl/pessimist.go
index e20d228097c..e1f44df1e08 100644
--- a/dm/dm/master/shardddl/pessimist.go
+++ b/dm/dm/master/shardddl/pessimist.go
@@ -645,7 +645,7 @@ func (p *Pessimist) removeLock(lock *pessimism.Lock) error {
     failpoint.Inject("SleepWhenRemoveLock", func(val failpoint.Value) {
         t := val.(int)
-        log.L().Info("wait new ddl info putted into etcd",
+        log.L().Info("wait new ddl info putted into etcd in pessimistic",
             zap.String("failpoint", "SleepWhenRemoveLock"),
             zap.Int("max wait second", t))
 
diff --git a/dm/tests/shardddl1/run.sh b/dm/tests/shardddl1/run.sh
index 82483679b4b..4b63801670d 100644
--- a/dm/tests/shardddl1/run.sh
+++ b/dm/tests/shardddl1/run.sh
@@ -239,7 +239,7 @@ function DM_RemoveLock_CASE() {
     run_sql_source1 "alter table ${shardddl1}.${tb1} add column c double;"
     run_sql_source2 "alter table ${shardddl1}.${tb1} add column c double;"
     run_sql_source2 "alter table ${shardddl1}.${tb2} add column c double;"
-    check_log_contain_with_retry "wait new ddl info putted into etcd" $WORK_DIR/master/log/dm-master.log
+    check_log_contain_with_retry "wait new ddl info putted into etcd in ${1}" $WORK_DIR/master/log/dm-master.log
     check_metric_not_contains $MASTER_PORT "dm_master_shard_ddl_error" 3
     run_sql_source1 "alter table ${shardddl1}.${tb1} drop column b;"
From 285c0e26c48385d5a7a601e47c449b4838348fc3 Mon Sep 17 00:00:00 2001
From: 3pointer
Date: Mon, 15 Nov 2021 14:25:06 +0800
Subject: [PATCH 5/5] CI: update config for sync diff (#3435)

---
 dm/tests/dmctl_basic/conf/diff_config.toml              |  3 +--
 dm/tests/online_ddl/conf/diff_config.toml               | 10 ++--------
 dm/tests/print_status/conf/diff_config.toml             |  3 +--
 dm/tests/safe_mode/conf/diff_config.toml                |  3 +--
 dm/tests/sequence_safe_mode/conf/diff_config.toml       |  3 +--
 dm/tests/sequence_sharding/conf/diff_config.toml        |  5 ++---
 .../sequence_sharding_optimistic/conf/diff_config.toml  |  5 ++---
 .../sequence_sharding_removemeta/conf/diff_config.toml  |  5 ++---
 dm/tests/sharding/conf/diff_config.toml                 |  5 ++---
 9 files changed, 14 insertions(+), 28 deletions(-)

diff --git a/dm/tests/dmctl_basic/conf/diff_config.toml b/dm/tests/dmctl_basic/conf/diff_config.toml
index 55d568baf4b..7f3079908c9 100644
--- a/dm/tests/dmctl_basic/conf/diff_config.toml
+++ b/dm/tests/dmctl_basic/conf/diff_config.toml
@@ -19,8 +19,7 @@ check-struct-only = false
 
 [table-configs]
 [table-configs.config1]
-schema = "dmctl"
-table = "t_target"
+target-tables = ["dmctl.t_target"]
 ignore-columns = ["id"]
 
 [routes.rule1]
diff --git a/dm/tests/online_ddl/conf/diff_config.toml b/dm/tests/online_ddl/conf/diff_config.toml
index f576c586860..bce21d71a01 100644
--- a/dm/tests/online_ddl/conf/diff_config.toml
+++ b/dm/tests/online_ddl/conf/diff_config.toml
@@ -15,7 +15,7 @@ check-struct-only = false
 
     target-check-tables = ["online_ddl.gho_t_target", "online_ddl.pt_t_target"]
 
-    target-configs= ["config1", "config2"]
+    target-configs= ["config1"]
 
 [routes.rule1]
 schema-pattern = "online_ddl"
@@ -31,13 +31,7 @@ target-table = "pt_t_target"
 
 [table-configs]
 [table-configs.config1]
-schema = "online_ddl"
-table = "gho_t_target"
-ignore-columns = ["id"]
-
-[table-configs.config2]
-schema = "online_ddl"
-table = "pt_t_target"
+target-tables = ["online_ddl.*"]
 ignore-columns = ["id"]
 
 [data-sources]
diff --git a/dm/tests/print_status/conf/diff_config.toml b/dm/tests/print_status/conf/diff_config.toml
index 30f2a1dda89..8b147560c6d 100644
--- a/dm/tests/print_status/conf/diff_config.toml
+++ b/dm/tests/print_status/conf/diff_config.toml
@@ -19,8 +19,7 @@ check-struct-only = false
 
 [table-configs]
 [table-configs.config1]
-schema = "print_status"
-table = "t_1"
+target-tables = ["print_status.t_1"]
 # currently ignore check float and timestamp field
 ignore-columns = ["c5", "c9", "c11", "c15"]
 
diff --git a/dm/tests/safe_mode/conf/diff_config.toml b/dm/tests/safe_mode/conf/diff_config.toml
index 1f25e7fdbab..91af4e81d93 100644
--- a/dm/tests/safe_mode/conf/diff_config.toml
+++ b/dm/tests/safe_mode/conf/diff_config.toml
@@ -19,8 +19,7 @@ check-struct-only = false
 
 [table-configs]
 [table-configs.config1]
-schema = "safe_mode_target"
"safe_mode_target" -table = "t_target" +target-tables = ["safe_mode_target.t_target"] ignore-columns = ["id"] [routes.rule1] diff --git a/dm/tests/sequence_safe_mode/conf/diff_config.toml b/dm/tests/sequence_safe_mode/conf/diff_config.toml index 46cd1690c47..5cdbd0c2ef9 100644 --- a/dm/tests/sequence_safe_mode/conf/diff_config.toml +++ b/dm/tests/sequence_safe_mode/conf/diff_config.toml @@ -25,8 +25,7 @@ target-table = "t_target" [table-configs] [table-configs.config1] -schema = "sequence_safe_mode_target" -table = "t_target" +target-tables = ["sequence_safe_mode_target.t_target"] ignore-columns = ["id"] [data-sources] diff --git a/dm/tests/sequence_sharding/conf/diff_config.toml b/dm/tests/sequence_sharding/conf/diff_config.toml index 0fbf2736182..0e8cc34b7b5 100644 --- a/dm/tests/sequence_sharding/conf/diff_config.toml +++ b/dm/tests/sequence_sharding/conf/diff_config.toml @@ -25,10 +25,9 @@ target-table = "t_target" [table-configs] [table-configs.config1] -schema = "sharding_target2" -table = "t_target" +target-tables = ["sharding_target2.t_target"] ignore-columns = ["id"] -index-fields = "uid" +index-fields = ["uid"] # range-placeholder [data-sources] diff --git a/dm/tests/sequence_sharding_optimistic/conf/diff_config.toml b/dm/tests/sequence_sharding_optimistic/conf/diff_config.toml index 937e7694b30..d0c6a4bccbf 100644 --- a/dm/tests/sequence_sharding_optimistic/conf/diff_config.toml +++ b/dm/tests/sequence_sharding_optimistic/conf/diff_config.toml @@ -25,9 +25,8 @@ target-table = "t_target" [table-configs] [table-configs.config1] -schema = "sharding_target_opt" -table = "t_target" -index-fields = "id" +target-tables = ["sharding_target_opt.t_target"] +index-fields = ["id"] # range-placeholder [data-sources] diff --git a/dm/tests/sequence_sharding_removemeta/conf/diff_config.toml b/dm/tests/sequence_sharding_removemeta/conf/diff_config.toml index 7b5453ba06c..10f10147c35 100644 --- a/dm/tests/sequence_sharding_removemeta/conf/diff_config.toml +++ b/dm/tests/sequence_sharding_removemeta/conf/diff_config.toml @@ -26,10 +26,9 @@ target-table = "t_target" [table-configs] [table-configs.config1] -schema = "sharding_target3" -table = "t_target" +target-tables = ["sharding_target3.t_target"] ignore-columns = ["id"] -index-fields = "uid" +index-fields = ["uid"] # range-placeholder [data-sources] diff --git a/dm/tests/sharding/conf/diff_config.toml b/dm/tests/sharding/conf/diff_config.toml index a3066bfabb8..f69fc0306ff 100644 --- a/dm/tests/sharding/conf/diff_config.toml +++ b/dm/tests/sharding/conf/diff_config.toml @@ -32,11 +32,10 @@ target-table = "t_target" [table-configs] [table-configs.config1] -schema = "db_target" -table = "t_target" +target-tables = ["db_target.t_target"] # currently sync_diff does not support json fields well ignore-columns = ["id", "info_json"] -index-fields = "uid" +index-fields = ["uid"] # range-placeholder