Skip to content

Commit

Permalink
checkpoint(dm): check outdated should respect snapshot create time (#…
Browse files Browse the repository at this point in the history
…5160)

close #5063
  • Loading branch information
lance6716 authored Apr 18, 2022
1 parent 8c4f693 commit 4d6f44d
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 22 deletions.
27 changes: 12 additions & 15 deletions dm/syncer/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,12 +280,9 @@ type CheckPoint interface {
// corresponding to to Meta.Pos and gtid
FlushedGlobalPoint() binlog.Location

// CheckGlobalPoint checks whether we should save global checkpoint
// corresponding to Meta.Check
CheckGlobalPoint() bool

// CheckLastSnapshotCreationTime checks whether we should async flush checkpoint since last time async flush
CheckLastSnapshotCreationTime() bool
// LastFlushOutdated checks the start time of a flush (when call Snapshot) and finish time of a flush, if both of
// the two times are outdated, LastFlushOutdated returns true.
LastFlushOutdated() bool

// Rollback rolls global checkpoint and all table checkpoints back to flushed checkpoints
Rollback(schemaTracker *schema.Tracker)
Expand Down Expand Up @@ -878,18 +875,18 @@ func (cp *RemoteCheckPoint) String() string {
return cp.globalPoint.String()
}

// CheckGlobalPoint implements CheckPoint.CheckGlobalPoint.
func (cp *RemoteCheckPoint) CheckGlobalPoint() bool {
// LastFlushOutdated implements CheckPoint.LastFlushOutdated.
func (cp *RemoteCheckPoint) LastFlushOutdated() bool {
cp.RLock()
defer cp.RUnlock()
return time.Since(cp.globalPointSaveTime) >= time.Duration(cp.cfg.CheckpointFlushInterval)*time.Second
}

// CheckLastSnapshotCreationTime implements CheckPoint.CheckLastSnapshotCreationTime.
func (cp *RemoteCheckPoint) CheckLastSnapshotCreationTime() bool {
cp.RLock()
defer cp.RUnlock()
return time.Since(cp.lastSnapshotCreationTime) >= time.Duration(cp.cfg.CheckpointFlushInterval)*time.Second
if time.Since(cp.globalPointSaveTime) < time.Duration(cp.cfg.CheckpointFlushInterval)*time.Second {
return false
}
if time.Since(cp.lastSnapshotCreationTime) < time.Duration(cp.cfg.CheckpointFlushInterval)*time.Second {
return false
}
return true
}

// Rollback implements CheckPoint.Rollback.
Expand Down
16 changes: 16 additions & 0 deletions dm/syncer/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"path/filepath"
"strings"
"testing"
"time"

tidbddl "github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/parser/ast"
Expand Down Expand Up @@ -549,3 +550,18 @@ func TestRemoteCheckPointLoadIntoSchemaTracker(t *testing.T) {
_, err = schemaTracker.GetTableInfo(tbl2)
require.Error(t, err)
}

func TestLastFlushOutdated(t *testing.T) {
cfg := genDefaultSubTaskConfig4Test()
cfg.WorkerCount = 0
cfg.CheckpointFlushInterval = 1

cp := NewRemoteCheckPoint(tcontext.Background(), cfg, "1")
checkpoint := cp.(*RemoteCheckPoint)
checkpoint.globalPointSaveTime = time.Now().Add(-2 * time.Second)

require.True(t, checkpoint.LastFlushOutdated())
require.Nil(t, checkpoint.Snapshot(true))
// though snapshot is nil, checkpoint is not outdated
require.False(t, checkpoint.LastFlushOutdated())
}
14 changes: 7 additions & 7 deletions dm/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -983,9 +983,9 @@ func (s *Syncer) addJob(job *job) {
}
}

// checkShouldFlush checks whether syncer should flush now because last flushing is outdated.
func (s *Syncer) checkShouldFlush() error {
if !s.checkpoint.CheckGlobalPoint() || !s.checkpoint.CheckLastSnapshotCreationTime() {
// flushIfOutdated checks whether syncer should flush now because last flushing is outdated.
func (s *Syncer) flushIfOutdated() error {
if !s.checkpoint.LastFlushOutdated() {
return nil
}

Expand All @@ -1007,7 +1007,7 @@ func (s *Syncer) handleJob(job *job) (added2Queue bool, err error) {
skipCheckFlush := false
defer func() {
if !skipCheckFlush && err == nil {
err = s.checkShouldFlush()
err = s.flushIfOutdated()
}
}()

Expand Down Expand Up @@ -1136,7 +1136,7 @@ func (s *Syncer) resetShardingGroup(table *filter.Table) {
// and except rejecting to flush the checkpoint, we also need to rollback the checkpoint saved before
// this should be handled when `s.Run` returned
//
// we may need to refactor the concurrency model to make the work-flow more clearer later.
// we may need to refactor the concurrency model to make the work-flow more clear later.
func (s *Syncer) flushCheckPoints() error {
err := s.execError.Load()
// TODO: for now, if any error occurred (including user canceled), checkpoint won't be updated. But if we have put
Expand Down Expand Up @@ -2144,7 +2144,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
case *replication.GenericEvent:
if e.Header.EventType == replication.HEARTBEAT_EVENT {
// flush checkpoint even if there are no real binlog events
if s.checkpoint.CheckGlobalPoint() {
if s.checkpoint.LastFlushOutdated() {
s.tctx.L().Info("meet heartbeat event and then flush jobs")
err2 = s.flushJobs()
}
Expand Down Expand Up @@ -2461,7 +2461,7 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err
}
})

return s.checkShouldFlush()
return s.flushIfOutdated()
}

type queryEventContext struct {
Expand Down

0 comments on commit 4d6f44d

Please sign in to comment.