From 9eb1e9706898d0d46e10838d4a9ecafaf1c7e35d Mon Sep 17 00:00:00 2001
From: lance6716
Date: Mon, 18 Apr 2022 19:28:03 +0800
Subject: [PATCH] This is an automated cherry-pick of #5160

Signed-off-by: ti-chi-bot
---
 dm/syncer/checkpoint.go      | 27 ++++++-------
 dm/syncer/checkpoint_test.go | 76 ++++++++++++++++++++++++++++++++++++
 dm/syncer/syncer.go          | 70 ++++++++++++++++++++++++++++++++-
 3 files changed, 157 insertions(+), 16 deletions(-)

diff --git a/dm/syncer/checkpoint.go b/dm/syncer/checkpoint.go
index b102105bbdf..bb66b39ad5e 100644
--- a/dm/syncer/checkpoint.go
+++ b/dm/syncer/checkpoint.go
@@ -272,12 +272,9 @@ type CheckPoint interface {
 	// corresponding to to Meta.Pos and gtid
 	FlushedGlobalPoint() binlog.Location
 
-	// CheckGlobalPoint checks whether we should save global checkpoint
-	// corresponding to Meta.Check
-	CheckGlobalPoint() bool
-
-	// CheckLastSnapshotCreationTime checks whether we should async flush checkpoint since last time async flush
-	CheckLastSnapshotCreationTime() bool
+	// LastFlushOutdated checks the start time of the last flush (recorded when Snapshot is called) and the finish
+	// time of the last flush; if both times are outdated, LastFlushOutdated returns true.
+	LastFlushOutdated() bool
 
 	// GetFlushedTableInfo gets flushed table info
 	// use for lazy create table in schemaTracker
@@ -825,18 +822,18 @@ func (cp *RemoteCheckPoint) String() string {
 	return cp.globalPoint.String()
 }
 
-// CheckGlobalPoint implements CheckPoint.CheckGlobalPoint.
-func (cp *RemoteCheckPoint) CheckGlobalPoint() bool {
+// LastFlushOutdated implements CheckPoint.LastFlushOutdated.
+func (cp *RemoteCheckPoint) LastFlushOutdated() bool {
 	cp.RLock()
 	defer cp.RUnlock()
-	return time.Since(cp.globalPointSaveTime) >= time.Duration(cp.cfg.CheckpointFlushInterval)*time.Second
-}
 
-// CheckLastSnapshotCreationTime implements CheckPoint.CheckLastSnapshotCreationTime.
-func (cp *RemoteCheckPoint) CheckLastSnapshotCreationTime() bool {
-	cp.RLock()
-	defer cp.RUnlock()
-	return time.Since(cp.lastSnapshotCreationTime) >= time.Duration(cp.cfg.CheckpointFlushInterval)*time.Second
+	if time.Since(cp.globalPointSaveTime) < time.Duration(cp.cfg.CheckpointFlushInterval)*time.Second {
+		return false
+	}
+	if time.Since(cp.lastSnapshotCreationTime) < time.Duration(cp.cfg.CheckpointFlushInterval)*time.Second {
+		return false
+	}
+	return true
 }
 
 // Rollback implements CheckPoint.Rollback.
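The hunk above replaces two separate freshness checks with a single LastFlushOutdated that only reports true when both the last flush finish time (globalPointSaveTime) and the last flush start time (lastSnapshotCreationTime) exceed the flush interval. The following is a minimal, self-contained Go sketch of that rule; checkpointLike, its fields, and main are illustrative stand-ins, not the real RemoteCheckPoint.

package main

import (
	"fmt"
	"time"
)

// checkpointLike is a hypothetical stand-in for RemoteCheckPoint: it keeps the
// finish time of the last flush and the start time of the last snapshot/flush.
type checkpointLike struct {
	globalPointSaveTime      time.Time // set when a flush finishes
	lastSnapshotCreationTime time.Time // set when Snapshot is called, i.e. a flush starts
	flushIntervalSeconds     int
}

// lastFlushOutdated returns true only when BOTH timestamps are older than the
// configured flush interval, mirroring the logic added in the diff above.
func (c *checkpointLike) lastFlushOutdated() bool {
	interval := time.Duration(c.flushIntervalSeconds) * time.Second
	if time.Since(c.globalPointSaveTime) < interval {
		return false
	}
	if time.Since(c.lastSnapshotCreationTime) < interval {
		return false
	}
	return true
}

func main() {
	c := &checkpointLike{
		globalPointSaveTime:      time.Now().Add(-2 * time.Second),
		lastSnapshotCreationTime: time.Now(), // a flush has just started
		flushIntervalSeconds:     1,
	}
	// The last finished flush is old, but a flush is in flight, so the
	// checkpoint is not reported as outdated.
	fmt.Println(c.lastFlushOutdated()) // false
}

Folding the two checks into one method means a caller cannot consult only one of the timestamps, which appears to be the point of this cherry-pick: the outdated check now respects the snapshot creation time.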
diff --git a/dm/syncer/checkpoint_test.go b/dm/syncer/checkpoint_test.go
index 480b2b0b989..14f4c3d5507 100644
--- a/dm/syncer/checkpoint_test.go
+++ b/dm/syncer/checkpoint_test.go
@@ -20,6 +20,15 @@ import (
 	"os"
 	"path/filepath"
 	"strings"
+<<<<<<< HEAD
+=======
+	"testing"
+	"time"
+
+	tidbddl "github.com/pingcap/tidb/ddl"
+	"github.com/pingcap/tidb/parser/ast"
+	"github.com/stretchr/testify/require"
+>>>>>>> 4d6f44dfe (checkpoint(dm): check outdated should respect snapshot create time (#5160))
 
 	"github.com/pingcap/tiflow/dm/dm/config"
 	"github.com/pingcap/tiflow/dm/pkg/binlog"
@@ -495,3 +504,70 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) {
 	c.Assert(rcp.points[schemaName][tableName].flushedPoint.ti, NotNil)
 	c.Assert(*rcp.safeModeExitPoint, DeepEquals, binlog.InitLocation(pos2, gs))
 }
+<<<<<<< HEAD
+=======
+
+func TestRemoteCheckPointLoadIntoSchemaTracker(t *testing.T) {
+	cfg := genDefaultSubTaskConfig4Test()
+	cfg.WorkerCount = 0
+	ctx := context.Background()
+
+	db, _, err := sqlmock.New()
+	require.NoError(t, err)
+	dbConn, err := db.Conn(ctx)
+	require.NoError(t, err)
+	downstreamTrackConn := dbconn.NewDBConn(cfg, conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{}))
+	schemaTracker, err := schema.NewTracker(ctx, cfg.Name, defaultTestSessionCfg, downstreamTrackConn)
+	require.NoError(t, err)
+	defer schemaTracker.Close() //nolint
+
+	tbl1 := &filter.Table{Schema: "test", Name: "tbl1"}
+	tbl2 := &filter.Table{Schema: "test", Name: "tbl2"}
+
+	// before load
+	_, err = schemaTracker.GetTableInfo(tbl1)
+	require.Error(t, err)
+	_, err = schemaTracker.GetTableInfo(tbl2)
+	require.Error(t, err)
+
+	cp := NewRemoteCheckPoint(tcontext.Background(), cfg, "1")
+	checkpoint := cp.(*RemoteCheckPoint)
+
+	parser, err := utils.GetParserFromSQLModeStr("")
+	require.NoError(t, err)
+	createNode, err := parser.ParseOneStmt("create table tbl1(id int)", "", "")
+	require.NoError(t, err)
+	ti, err := tidbddl.BuildTableInfoFromAST(createNode.(*ast.CreateTableStmt))
+	require.NoError(t, err)
+
+	tp1 := tablePoint{ti: ti}
+	tp2 := tablePoint{}
+	checkpoint.points[tbl1.Schema] = make(map[string]*binlogPoint)
+	checkpoint.points[tbl1.Schema][tbl1.Name] = &binlogPoint{flushedPoint: tp1}
+	checkpoint.points[tbl2.Schema][tbl2.Name] = &binlogPoint{flushedPoint: tp2}
+
+	// after load
+	err = checkpoint.LoadIntoSchemaTracker(ctx, schemaTracker)
+	require.NoError(t, err)
+	tableInfo, err := schemaTracker.GetTableInfo(tbl1)
+	require.NoError(t, err)
+	require.Len(t, tableInfo.Columns, 1)
+	_, err = schemaTracker.GetTableInfo(tbl2)
+	require.Error(t, err)
+}
+
+func TestLastFlushOutdated(t *testing.T) {
+	cfg := genDefaultSubTaskConfig4Test()
+	cfg.WorkerCount = 0
+	cfg.CheckpointFlushInterval = 1
+
+	cp := NewRemoteCheckPoint(tcontext.Background(), cfg, "1")
+	checkpoint := cp.(*RemoteCheckPoint)
+	checkpoint.globalPointSaveTime = time.Now().Add(-2 * time.Second)
+
+	require.True(t, checkpoint.LastFlushOutdated())
+	require.Nil(t, checkpoint.Snapshot(true))
+	// though snapshot is nil, checkpoint is not outdated
+	require.False(t, checkpoint.LastFlushOutdated())
+}
+>>>>>>> 4d6f44dfe (checkpoint(dm): check outdated should respect snapshot create time (#5160))
diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go
index c42a3411b0e..0a45a614e8c 100644
--- a/dm/syncer/syncer.go
+++ b/dm/syncer/syncer.go
@@ -978,6 +978,43 @@ func (s *Syncer) addJob(job *job) error {
 		time.Sleep(100 * time.Millisecond)
 	})
 }
+<<<<<<< HEAD
+=======
+}
+
+// flushIfOutdated checks whether the syncer should flush now because the last flush is outdated.
+func (s *Syncer) flushIfOutdated() error {
+	if !s.checkpoint.LastFlushOutdated() {
+		return nil
+	}
+
+	if s.cfg.Experimental.AsyncCheckpointFlush {
+		jobSeq := s.getFlushSeq()
+		s.tctx.L().Info("Start to async flush current checkpoint to downstream based on flush interval", zap.Int64("job sequence", jobSeq))
+		j := newAsyncFlushJob(s.cfg.WorkerCount, jobSeq)
+		s.addJob(j)
+		s.flushCheckPointsAsync(j)
+		return nil
+	}
+	s.jobWg.Wait()
+	return s.flushCheckPoints()
+}
+
+// TODO: move to syncer/job.go
+// handleJob will do many actions based on job type.
+func (s *Syncer) handleJob(job *job) (added2Queue bool, err error) {
+	skipCheckFlush := false
+	defer func() {
+		if !skipCheckFlush && err == nil {
+			err = s.flushIfOutdated()
+		}
+	}()
+
+	// 1. handle jobs that do not need to be sent to the queue
+
+	s.waitTransactionLock.Lock()
+	defer s.waitTransactionLock.Unlock()
+>>>>>>> 4d6f44dfe (checkpoint(dm): check outdated should respect snapshot create time (#5160))
 
 	failpoint.Inject("flushFirstJob", func() {
 		if waitJobsDone {
@@ -1094,7 +1131,7 @@ func (s *Syncer) resetShardingGroup(table *filter.Table) {
 // and except rejecting to flush the checkpoint, we also need to rollback the checkpoint saved before
 // this should be handled when `s.Run` returned
 //
-// we may need to refactor the concurrency model to make the work-flow more clearer later.
+// we may need to refactor the concurrency model to make the work-flow more clear later.
 func (s *Syncer) flushCheckPoints() error {
 	err := s.execError.Load()
 	// TODO: for now, if any error occurred (including user canceled), checkpoint won't be updated. But if we have put
@@ -2016,8 +2053,13 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
 		case *replication.GenericEvent:
 			if e.Header.EventType == replication.HEARTBEAT_EVENT {
 				// flush checkpoint even if there are no real binlog events
+<<<<<<< HEAD
 				if s.checkpoint.CheckGlobalPoint() {
 					tctx.L().Info("meet heartbeat event and then flush jobs")
+=======
+				if s.checkpoint.LastFlushOutdated() {
+					s.tctx.L().Info("meet heartbeat event and then flush jobs")
+>>>>>>> 4d6f44dfe (checkpoint(dm): check outdated should respect snapshot create time (#5160))
 					err2 = s.flushJobs()
 				}
 			}
@@ -2289,8 +2331,34 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err
 			return err
 		}
 	}
+<<<<<<< HEAD
 	metrics.DispatchBinlogDurationHistogram.WithLabelValues(jobType.String(), s.cfg.Name, s.cfg.SourceID).Observe(time.Since(startTime).Seconds())
 	return nil
+=======
+	metrics.DispatchBinlogDurationHistogram.WithLabelValues(metricTp, s.cfg.Name, s.cfg.SourceID).Observe(time.Since(startTime).Seconds())
+
+	if len(sourceTable.Schema) != 0 {
+		// in position-based replication, events before the table checkpoint have already been sent to the queue at
+		// this point. But in GTID-based replication events may share one GTID, so an event whose end position equals
+		// the table checkpoint may not have been sent to the queue. We may need an event index within the GTID to resolve this.
+		s.saveTablePoint(sourceTable, *ec.currentLocation)
+	}
+
+	failpoint.Inject("flushFirstJob", func() {
+		if waitJobsDoneForTest {
+			s.tctx.L().Info("trigger flushFirstJob")
+			waitJobsDoneForTest = false
+
+			err2 := s.flushJobs()
+			if err2 != nil {
+				s.tctx.L().DPanic("flush checkpoint failed", zap.Error(err2))
+			}
+			failpoint.Return(nil)
+		}
+	})
+
+	return s.flushIfOutdated()
+>>>>>>> 4d6f44dfe (checkpoint(dm): check outdated should respect snapshot create time (#5160))
 }
 
 type queryEventContext struct {
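The syncer side of the change funnels both the deferred check in handleJob and the heartbeat path through an interval-based flush decision, which runs asynchronously or synchronously depending on configuration. The following is a rough, self-contained Go sketch of that decision flow; flusher and its fields are hypothetical stand-ins for the relevant Syncer state, not the real implementation.

package main

import (
	"fmt"
	"time"
)

// flusher is a hypothetical stand-in for the parts of Syncer that flushIfOutdated touches.
type flusher struct {
	lastFlush     time.Time
	flushInterval time.Duration
	asyncFlush    bool // plays the role of cfg.Experimental.AsyncCheckpointFlush
}

func (f *flusher) lastFlushOutdated() bool {
	return time.Since(f.lastFlush) >= f.flushInterval
}

// flushIfOutdated mirrors the shape of the helper added above: do nothing while the
// last flush is recent, otherwise flush asynchronously or synchronously.
func (f *flusher) flushIfOutdated() error {
	if !f.lastFlushOutdated() {
		return nil
	}
	if f.asyncFlush {
		fmt.Println("enqueue an async flush job and return immediately")
	} else {
		fmt.Println("wait for in-flight jobs, then flush synchronously")
	}
	f.lastFlush = time.Now()
	return nil
}

func main() {
	f := &flusher{lastFlush: time.Now().Add(-time.Minute), flushInterval: 30 * time.Second}
	_ = f.flushIfOutdated() // outdated: triggers a flush in this sketch
	_ = f.flushIfOutdated() // recent: no-op
}

Because handleJob runs flushIfOutdated in a deferred function, every handled job gets a chance to flush the checkpoint once the interval has elapsed, while the heartbeat branch keeps checkpoints advancing even when no real binlog events arrive.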