diff --git a/dm/syncer/checkpoint.go b/dm/syncer/checkpoint.go
index 4c8ba231d8b..6563e211018 100644
--- a/dm/syncer/checkpoint.go
+++ b/dm/syncer/checkpoint.go
@@ -280,12 +280,9 @@ type CheckPoint interface {
 	// corresponding to to Meta.Pos and gtid
 	FlushedGlobalPoint() binlog.Location
 
-	// CheckGlobalPoint checks whether we should save global checkpoint
-	// corresponding to Meta.Check
-	CheckGlobalPoint() bool
-
-	// CheckLastSnapshotCreationTime checks whether we should async flush checkpoint since last time async flush
-	CheckLastSnapshotCreationTime() bool
+	// LastFlushOutdated checks the start time of the last flush (recorded when Snapshot is called) and the finish
+	// time of the last flush; only if both are outdated does LastFlushOutdated return true.
+	LastFlushOutdated() bool
 
 	// Rollback rolls global checkpoint and all table checkpoints back to flushed checkpoints
 	Rollback(schemaTracker *schema.Tracker)
@@ -878,18 +875,18 @@ func (cp *RemoteCheckPoint) String() string {
 	return cp.globalPoint.String()
 }
 
-// CheckGlobalPoint implements CheckPoint.CheckGlobalPoint.
-func (cp *RemoteCheckPoint) CheckGlobalPoint() bool {
+// LastFlushOutdated implements CheckPoint.LastFlushOutdated.
+func (cp *RemoteCheckPoint) LastFlushOutdated() bool {
 	cp.RLock()
 	defer cp.RUnlock()
-	return time.Since(cp.globalPointSaveTime) >= time.Duration(cp.cfg.CheckpointFlushInterval)*time.Second
-}
 
-// CheckLastSnapshotCreationTime implements CheckPoint.CheckLastSnapshotCreationTime.
-func (cp *RemoteCheckPoint) CheckLastSnapshotCreationTime() bool {
-	cp.RLock()
-	defer cp.RUnlock()
-	return time.Since(cp.lastSnapshotCreationTime) >= time.Duration(cp.cfg.CheckpointFlushInterval)*time.Second
+	if time.Since(cp.globalPointSaveTime) < time.Duration(cp.cfg.CheckpointFlushInterval)*time.Second {
+		return false
+	}
+	if time.Since(cp.lastSnapshotCreationTime) < time.Duration(cp.cfg.CheckpointFlushInterval)*time.Second {
+		return false
+	}
+	return true
 }
 
 // Rollback implements CheckPoint.Rollback.
diff --git a/dm/syncer/checkpoint_test.go b/dm/syncer/checkpoint_test.go
index f86161027b0..844cb4d4090 100644
--- a/dm/syncer/checkpoint_test.go
+++ b/dm/syncer/checkpoint_test.go
@@ -21,6 +21,7 @@ import (
 	"path/filepath"
 	"strings"
 	"testing"
+	"time"
 
 	tidbddl "github.com/pingcap/tidb/ddl"
 	"github.com/pingcap/tidb/parser/ast"
@@ -549,3 +550,18 @@ func TestRemoteCheckPointLoadIntoSchemaTracker(t *testing.T) {
 	_, err = schemaTracker.GetTableInfo(tbl2)
 	require.Error(t, err)
 }
+
+func TestLastFlushOutdated(t *testing.T) {
+	cfg := genDefaultSubTaskConfig4Test()
+	cfg.WorkerCount = 0
+	cfg.CheckpointFlushInterval = 1
+
+	cp := NewRemoteCheckPoint(tcontext.Background(), cfg, "1")
+	checkpoint := cp.(*RemoteCheckPoint)
+	checkpoint.globalPointSaveTime = time.Now().Add(-2 * time.Second)
+
+	require.True(t, checkpoint.LastFlushOutdated())
+	require.Nil(t, checkpoint.Snapshot(true))
+	// though the snapshot is nil, the checkpoint is not outdated: Snapshot refreshed lastSnapshotCreationTime
+	require.False(t, checkpoint.LastFlushOutdated())
+}
diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go
index 68c4c961579..11c1075fd74 100644
--- a/dm/syncer/syncer.go
+++ b/dm/syncer/syncer.go
@@ -983,9 +983,9 @@ func (s *Syncer) addJob(job *job) {
 	}
 }
 
-// checkShouldFlush checks whether syncer should flush now because last flushing is outdated.
-func (s *Syncer) checkShouldFlush() error {
-	if !s.checkpoint.CheckGlobalPoint() || !s.checkpoint.CheckLastSnapshotCreationTime() {
+// flushIfOutdated checks whether the syncer should flush now because the last flush is outdated.
+func (s *Syncer) flushIfOutdated() error {
+	if !s.checkpoint.LastFlushOutdated() {
 		return nil
 	}
 
@@ -1007,7 +1007,7 @@ func (s *Syncer) handleJob(job *job) (added2Queue bool, err error) {
 	skipCheckFlush := false
 	defer func() {
 		if !skipCheckFlush && err == nil {
-			err = s.checkShouldFlush()
+			err = s.flushIfOutdated()
 		}
 	}()
 
@@ -1136,7 +1136,7 @@ func (s *Syncer) resetShardingGroup(table *filter.Table) {
 // and except rejecting to flush the checkpoint, we also need to rollback the checkpoint saved before
 // this should be handled when `s.Run` returned
 //
-// we may need to refactor the concurrency model to make the work-flow more clearer later.
+// we may need to refactor the concurrency model to make the workflow clearer later.
 func (s *Syncer) flushCheckPoints() error {
 	err := s.execError.Load()
 	// TODO: for now, if any error occurred (including user canceled), checkpoint won't be updated. But if we have put
@@ -2144,7 +2144,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
 		case *replication.GenericEvent:
 			if e.Header.EventType == replication.HEARTBEAT_EVENT {
 				// flush checkpoint even if there are no real binlog events
-				if s.checkpoint.CheckGlobalPoint() {
+				if s.checkpoint.LastFlushOutdated() {
 					s.tctx.L().Info("meet heartbeat event and then flush jobs")
 					err2 = s.flushJobs()
 				}
@@ -2461,7 +2461,7 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err
 		}
 	})
 
-	return s.checkShouldFlush()
+	return s.flushIfOutdated()
 }
 
 type queryEventContext struct {
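Note for reviewers: below is a minimal, self-contained Go sketch (not part of the patch) of the consolidated staleness check that LastFlushOutdated introduces. The names flushTracker and interval are illustrative assumptions; only the two-timestamp rule mirrors the patch, namely that the last flush counts as outdated only when both its start time (taken when Snapshot is called) and its finish time (when the global point was saved) are older than the flush interval.

	package main

	import (
		"fmt"
		"sync"
		"time"
	)

	// flushTracker is a hypothetical stand-in for RemoteCheckPoint, keeping only
	// the two timestamps that LastFlushOutdated compares against the interval.
	type flushTracker struct {
		mu                       sync.RWMutex
		globalPointSaveTime      time.Time // when the last flush finished
		lastSnapshotCreationTime time.Time // when the last flush started (Snapshot taken)
		interval                 time.Duration
	}

	// lastFlushOutdated returns true only when BOTH timestamps are older than the
	// interval, matching the early-return structure of the patched method.
	func (f *flushTracker) lastFlushOutdated() bool {
		f.mu.RLock()
		defer f.mu.RUnlock()

		if time.Since(f.globalPointSaveTime) < f.interval {
			return false
		}
		if time.Since(f.lastSnapshotCreationTime) < f.interval {
			return false
		}
		return true
	}

	func main() {
		f := &flushTracker{
			globalPointSaveTime:      time.Now().Add(-2 * time.Second),
			lastSnapshotCreationTime: time.Now().Add(-2 * time.Second),
			interval:                 time.Second,
		}
		fmt.Println(f.lastFlushOutdated()) // true: both timestamps exceed the interval

		// Starting a new flush refreshes the start time, so the check reports
		// false even though the finish time is still old; TestLastFlushOutdated
		// asserts the same behavior via Snapshot(true).
		f.lastSnapshotCreationTime = time.Now()
		fmt.Println(f.lastFlushOutdated()) // false
	}

Folding the two predicates (CheckGlobalPoint and CheckLastSnapshotCreationTime) into one keeps the lock/compare pattern in a single place and lets call sites such as flushIfOutdated test a single condition.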