Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

checkpoint(dm): check outdated should respect snapshot create time #5160

Merged
merged 4 commits into from
Apr 18, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 12 additions & 15 deletions dm/syncer/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,12 +280,9 @@ type CheckPoint interface {
// corresponding to to Meta.Pos and gtid
FlushedGlobalPoint() binlog.Location

// CheckGlobalPoint checks whether we should save global checkpoint
// corresponding to Meta.Check
CheckGlobalPoint() bool

// CheckLastSnapshotCreationTime checks whether we should async flush checkpoint since last time async flush
CheckLastSnapshotCreationTime() bool
// LastFlushOutdated checks the start time of a flush (when call Snapshot) and finish time of a flush, if both of
// the two times are outdated, LastFlushOutdated returns true.
LastFlushOutdated() bool

// Rollback rolls global checkpoint and all table checkpoints back to flushed checkpoints
Rollback(schemaTracker *schema.Tracker)
Expand Down Expand Up @@ -878,18 +875,18 @@ func (cp *RemoteCheckPoint) String() string {
return cp.globalPoint.String()
}

// CheckGlobalPoint implements CheckPoint.CheckGlobalPoint.
func (cp *RemoteCheckPoint) CheckGlobalPoint() bool {
// LastFlushOutdated implements CheckPoint.LastFlushOutdated.
func (cp *RemoteCheckPoint) LastFlushOutdated() bool {
cp.RLock()
defer cp.RUnlock()
return time.Since(cp.globalPointSaveTime) >= time.Duration(cp.cfg.CheckpointFlushInterval)*time.Second
}

// CheckLastSnapshotCreationTime implements CheckPoint.CheckLastSnapshotCreationTime.
func (cp *RemoteCheckPoint) CheckLastSnapshotCreationTime() bool {
cp.RLock()
defer cp.RUnlock()
return time.Since(cp.lastSnapshotCreationTime) >= time.Duration(cp.cfg.CheckpointFlushInterval)*time.Second
if time.Since(cp.globalPointSaveTime) < time.Duration(cp.cfg.CheckpointFlushInterval)*time.Second {
return false
}
if time.Since(cp.lastSnapshotCreationTime) < time.Duration(cp.cfg.CheckpointFlushInterval)*time.Second {
return false
}
return true
}

// Rollback implements CheckPoint.Rollback.
Expand Down
16 changes: 16 additions & 0 deletions dm/syncer/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"path/filepath"
"strings"
"testing"
"time"

tidbddl "github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/parser/ast"
Expand Down Expand Up @@ -549,3 +550,18 @@ func TestRemoteCheckPointLoadIntoSchemaTracker(t *testing.T) {
_, err = schemaTracker.GetTableInfo(tbl2)
require.Error(t, err)
}

func TestLastFlushOutdated(t *testing.T) {
cfg := genDefaultSubTaskConfig4Test()
cfg.WorkerCount = 0
cfg.CheckpointFlushInterval = 1

cp := NewRemoteCheckPoint(tcontext.Background(), cfg, "1")
checkpoint := cp.(*RemoteCheckPoint)
checkpoint.globalPointSaveTime = time.Now().Add(-2 * time.Second)

require.True(t, checkpoint.LastFlushOutdated())
require.Nil(t, checkpoint.Snapshot(true))
// though snapshot is nil, checkpoint is not outdated
require.False(t, checkpoint.LastFlushOutdated())
}
14 changes: 7 additions & 7 deletions dm/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -987,9 +987,9 @@ func (s *Syncer) addJob(job *job) {
}
}

// checkShouldFlush checks whether syncer should flush now because last flushing is outdated.
func (s *Syncer) checkShouldFlush() error {
if !s.checkpoint.CheckGlobalPoint() || !s.checkpoint.CheckLastSnapshotCreationTime() {
// flushIfOutdated checks whether syncer should flush now because last flushing is outdated.
func (s *Syncer) flushIfOutdated() error {
if !s.checkpoint.LastFlushOutdated() {
return nil
}

Expand All @@ -1011,7 +1011,7 @@ func (s *Syncer) handleJob(job *job) (added2Queue bool, err error) {
skipCheckFlush := false
defer func() {
if !skipCheckFlush && err == nil {
err = s.checkShouldFlush()
err = s.flushIfOutdated()
}
}()

Expand Down Expand Up @@ -1140,7 +1140,7 @@ func (s *Syncer) resetShardingGroup(table *filter.Table) {
// and except rejecting to flush the checkpoint, we also need to rollback the checkpoint saved before
// this should be handled when `s.Run` returned
//
// we may need to refactor the concurrency model to make the work-flow more clearer later.
// we may need to refactor the concurrency model to make the work-flow more clear later.
func (s *Syncer) flushCheckPoints() error {
err := s.execError.Load()
// TODO: for now, if any error occurred (including user canceled), checkpoint won't be updated. But if we have put
Expand Down Expand Up @@ -2162,7 +2162,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
case *replication.GenericEvent:
if e.Header.EventType == replication.HEARTBEAT_EVENT {
// flush checkpoint even if there are no real binlog events
if s.checkpoint.CheckGlobalPoint() {
if s.checkpoint.LastFlushOutdated() {
s.tctx.L().Info("meet heartbeat event and then flush jobs")
err2 = s.flushJobs()
}
Expand Down Expand Up @@ -2480,7 +2480,7 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err
}
})

return s.checkShouldFlush()
return s.flushIfOutdated()
}

type queryEventContext struct {
Expand Down