diff --git a/dm/syncer/checkpoint.go b/dm/syncer/checkpoint.go index 6563e211018..d3f0f04cf86 100644 --- a/dm/syncer/checkpoint.go +++ b/dm/syncer/checkpoint.go @@ -128,6 +128,8 @@ func (b *binlogPoint) rollback(schemaTracker *schema.Tracker, schema string) (is b.flushedPoint.location.ResetSuffix() b.savedPoint.location = b.flushedPoint.location if b.savedPoint.ti == nil { + // TODO: if we forget to save table info for table checkpoint, this is also nil! + // And table checkpoint rollback to flushed point may also be nil! return // for global checkpoint, no need to rollback the schema. } diff --git a/dm/syncer/filter.go b/dm/syncer/filter.go index 1ec7c7b3a4e..7c734ec2936 100644 --- a/dm/syncer/filter.go +++ b/dm/syncer/filter.go @@ -62,6 +62,7 @@ func (s *Syncer) skipQueryEvent(qec *queryEventContext, ddlInfo *ddlInfo) (bool, if err != nil { s.tctx.L().Warn("track ddl failed", zap.Stringer("ddl info", ddlInfo)) } + s.saveTablePoint(table, *qec.lastLocation) s.tctx.L().Warn("track skipped ddl and return empty string", zap.String("origin sql", qec.originSQL), zap.Stringer("ddl info", ddlInfo)) ddlInfo.originDDL = "" return true, nil diff --git a/dm/syncer/filter_test.go b/dm/syncer/filter_test.go index ffaad815945..ed0e6519476 100644 --- a/dm/syncer/filter_test.go +++ b/dm/syncer/filter_test.go @@ -18,11 +18,13 @@ import ( "database/sql" "github.com/DATA-DOG/go-sqlmock" + "github.com/go-mysql-org/go-mysql/mysql" "github.com/go-mysql-org/go-mysql/replication" . "github.com/pingcap/check" bf "github.com/pingcap/tidb-tools/pkg/binlog-filter" "github.com/pingcap/tidb/parser" "github.com/pingcap/tidb/util/filter" + "github.com/pingcap/tiflow/dm/pkg/binlog" "github.com/pingcap/tiflow/dm/dm/config" "github.com/pingcap/tiflow/dm/pkg/conn" @@ -69,6 +71,7 @@ func (s *testFilterSuite) TestSkipQueryEvent(c *C) { syncer.ddlDBConn = dbconn.NewDBConn(syncer.cfg, s.baseConn) syncer.schemaTracker, err = schema.NewTracker(context.Background(), syncer.cfg.Name, defaultTestSessionCfg, syncer.ddlDBConn) c.Assert(err, IsNil) + defer syncer.schemaTracker.Close() syncer.exprFilterGroup = NewExprFilterGroup(utils.NewSessionCtx(nil), nil) // test binlog filter @@ -126,11 +129,16 @@ func (s *testFilterSuite) TestSkipQueryEvent(c *C) { } p := parser.New() + loc := binlog.NewLocation(mysql.MySQLFlavor) + for _, ca := range cases { qec := &queryEventContext{ - eventContext: &eventContext{tctx: tcontext.Background()}, - p: p, - ddlSchema: ca.schema, + eventContext: &eventContext{ + tctx: tcontext.Background(), + lastLocation: &loc, + }, + p: p, + ddlSchema: ca.schema, } ddlInfo, err := syncer.genDDLInfo(qec, ca.sql) c.Assert(err, IsNil) diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go index 38dd6fc08bc..5b41189f3f9 100644 --- a/dm/syncer/syncer.go +++ b/dm/syncer/syncer.go @@ -3314,6 +3314,9 @@ func (s *Syncer) loadTableStructureFromDump(ctx context.Context) error { zap.Error(err)) setFirstErr(err) } + // TODO: we should save table checkpoint here, but considering when + // the first time of flushing checkpoint, user may encounter https://github.com/pingcap/tiflow/issues/5010 + // we should fix that problem first. } } return firstErr diff --git a/dm/tests/tracker_ignored_ddl/run.sh b/dm/tests/tracker_ignored_ddl/run.sh index 670eadd4337..3fd1d611e7a 100644 --- a/dm/tests/tracker_ignored_ddl/run.sh +++ b/dm/tests/tracker_ignored_ddl/run.sh @@ -27,11 +27,18 @@ function run() { check_not_contains "ignore_1" echo "increment1 check success" + # a not ignored DDL to trigger a checkpoint flush + run_sql_source1 "create table tracker_ignored_ddl.test (c int primary key);" + run_sql_file $cur/data/db.increment2.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status test" \ "Error 1054: Unknown column" 1 + # force a resume, the error is still there, but we want to check https://github.com/pingcap/tiflow/issues/5272#issuecomment-1109283279 + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "resume-task test" + # need operate tidb run_sql_tidb "alter table $TEST_NAME.t1 add column ignore_1 int;" @@ -39,7 +46,7 @@ function run() { run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "resume-task test" \ "\"result\": true" 2 - sleep 1 + sleep 3 run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status test" \ "\"stage\": \"Running\"" 2