From 397bcc6d90fc73ce6368f72bfbbbde58770ead3e Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 27 Apr 2022 11:56:51 +0800 Subject: [PATCH] This is an automated cherry-pick of #5273 Signed-off-by: ti-chi-bot --- dm/syncer/checkpoint.go | 2 ++ dm/syncer/filter.go | 1 + dm/syncer/filter_test.go | 18 +++++++++++++++--- dm/syncer/syncer.go | 3 +++ dm/tests/tracker_ignored_ddl/run.sh | 9 ++++++++- 5 files changed, 29 insertions(+), 4 deletions(-) diff --git a/dm/syncer/checkpoint.go b/dm/syncer/checkpoint.go index bb66b39ad5e..2143b778323 100644 --- a/dm/syncer/checkpoint.go +++ b/dm/syncer/checkpoint.go @@ -126,6 +126,8 @@ func (b *binlogPoint) rollback(schemaTracker *schema.Tracker, schema string) (is b.flushedPoint.location.ResetSuffix() b.savedPoint.location = b.flushedPoint.location if b.savedPoint.ti == nil { + // TODO: if we forget to save table info for table checkpoint, this is also nil! + // And table checkpoint rollback to flushed point may also be nil! return // for global checkpoint, no need to rollback the schema. } diff --git a/dm/syncer/filter.go b/dm/syncer/filter.go index b224a5ee99c..670b4cf54b1 100644 --- a/dm/syncer/filter.go +++ b/dm/syncer/filter.go @@ -62,6 +62,7 @@ func (s *Syncer) skipQueryEvent(qec *queryEventContext, ddlInfo *ddlInfo) (bool, if err != nil { s.tctx.L().Warn("track ddl failed", zap.Stringer("ddl info", ddlInfo)) } + s.saveTablePoint(table, *qec.lastLocation) s.tctx.L().Warn("track skipped ddl and return empty string", zap.String("origin sql", qec.originSQL), zap.Stringer("ddl info", ddlInfo)) ddlInfo.originDDL = "" return true, nil diff --git a/dm/syncer/filter_test.go b/dm/syncer/filter_test.go index 3c83d3aae25..f59d9ab33d0 100644 --- a/dm/syncer/filter_test.go +++ b/dm/syncer/filter_test.go @@ -18,11 +18,17 @@ import ( "database/sql" "github.com/DATA-DOG/go-sqlmock" + "github.com/go-mysql-org/go-mysql/mysql" "github.com/go-mysql-org/go-mysql/replication" . "github.com/pingcap/check" bf "github.com/pingcap/tidb-tools/pkg/binlog-filter" "github.com/pingcap/tidb-tools/pkg/filter" "github.com/pingcap/tidb/parser" +<<<<<<< HEAD +======= + "github.com/pingcap/tidb/util/filter" + "github.com/pingcap/tiflow/dm/pkg/binlog" +>>>>>>> 7744c05a7 (syncer(dm): save table checkpoint after a DDL is filtered (#5273)) "github.com/pingcap/tiflow/dm/dm/config" "github.com/pingcap/tiflow/dm/pkg/conn" @@ -69,6 +75,7 @@ func (s *testFilterSuite) TestSkipQueryEvent(c *C) { syncer.ddlDBConn = &dbconn.DBConn{Cfg: syncer.cfg, BaseConn: s.baseConn} syncer.schemaTracker, err = schema.NewTracker(context.Background(), syncer.cfg.Name, defaultTestSessionCfg, syncer.ddlDBConn) c.Assert(err, IsNil) + defer syncer.schemaTracker.Close() syncer.exprFilterGroup = NewExprFilterGroup(utils.NewSessionCtx(nil), nil) // test binlog filter @@ -126,11 +133,16 @@ func (s *testFilterSuite) TestSkipQueryEvent(c *C) { } p := parser.New() + loc := binlog.NewLocation(mysql.MySQLFlavor) + for _, ca := range cases { qec := &queryEventContext{ - eventContext: &eventContext{tctx: tcontext.Background()}, - p: p, - ddlSchema: ca.schema, + eventContext: &eventContext{ + tctx: tcontext.Background(), + lastLocation: &loc, + }, + p: p, + ddlSchema: ca.schema, } ddlInfo, err := syncer.genDDLInfo(qec, ca.sql) c.Assert(err, IsNil) diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go index 7d262fb20f4..edd77efec03 100644 --- a/dm/syncer/syncer.go +++ b/dm/syncer/syncer.go @@ -3147,6 +3147,9 @@ func (s *Syncer) loadTableStructureFromDump(ctx context.Context) error { zap.Error(err)) setFirstErr(err) } + // TODO: we should save table checkpoint here, but considering when + // the first time of flushing checkpoint, user may encounter https://github.com/pingcap/tiflow/issues/5010 + // we should fix that problem first. } } return firstErr diff --git a/dm/tests/tracker_ignored_ddl/run.sh b/dm/tests/tracker_ignored_ddl/run.sh index 670eadd4337..3fd1d611e7a 100644 --- a/dm/tests/tracker_ignored_ddl/run.sh +++ b/dm/tests/tracker_ignored_ddl/run.sh @@ -27,11 +27,18 @@ function run() { check_not_contains "ignore_1" echo "increment1 check success" + # a not ignored DDL to trigger a checkpoint flush + run_sql_source1 "create table tracker_ignored_ddl.test (c int primary key);" + run_sql_file $cur/data/db.increment2.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status test" \ "Error 1054: Unknown column" 1 + # force a resume, the error is still there, but we want to check https://github.com/pingcap/tiflow/issues/5272#issuecomment-1109283279 + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "resume-task test" + # need operate tidb run_sql_tidb "alter table $TEST_NAME.t1 add column ignore_1 int;" @@ -39,7 +46,7 @@ function run() { run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "resume-task test" \ "\"result\": true" 2 - sleep 1 + sleep 3 run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status test" \ "\"stage\": \"Running\"" 2