Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#5160
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
lance6716 authored and ti-chi-bot committed Apr 18, 2022
1 parent b20132f commit 9eb1e97
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 16 deletions.
27 changes: 12 additions & 15 deletions dm/syncer/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,12 +272,9 @@ type CheckPoint interface {
// corresponding to Meta.Pos and gtid
FlushedGlobalPoint() binlog.Location

// CheckGlobalPoint checks whether we should save global checkpoint
// corresponding to Meta.Check
CheckGlobalPoint() bool

// CheckLastSnapshotCreationTime checks whether we should async flush checkpoint since last time async flush
CheckLastSnapshotCreationTime() bool
// LastFlushOutdated checks the start time of the last flush (when Snapshot was called) and the finish time of the
// last flush. It returns true only if both of the two times are outdated.
LastFlushOutdated() bool

// GetFlushedTableInfo gets flushed table info
// use for lazy create table in schemaTracker
Expand Down Expand Up @@ -825,18 +822,18 @@ func (cp *RemoteCheckPoint) String() string {
return cp.globalPoint.String()
}

// CheckGlobalPoint implements CheckPoint.CheckGlobalPoint.
func (cp *RemoteCheckPoint) CheckGlobalPoint() bool {
// LastFlushOutdated implements CheckPoint.LastFlushOutdated.
func (cp *RemoteCheckPoint) LastFlushOutdated() bool {
cp.RLock()
defer cp.RUnlock()
return time.Since(cp.globalPointSaveTime) >= time.Duration(cp.cfg.CheckpointFlushInterval)*time.Second
}

// CheckLastSnapshotCreationTime implements CheckPoint.CheckLastSnapshotCreationTime.
func (cp *RemoteCheckPoint) CheckLastSnapshotCreationTime() bool {
cp.RLock()
defer cp.RUnlock()
return time.Since(cp.lastSnapshotCreationTime) >= time.Duration(cp.cfg.CheckpointFlushInterval)*time.Second
if time.Since(cp.globalPointSaveTime) < time.Duration(cp.cfg.CheckpointFlushInterval)*time.Second {
return false
}
if time.Since(cp.lastSnapshotCreationTime) < time.Duration(cp.cfg.CheckpointFlushInterval)*time.Second {
return false
}
return true
}

// Rollback implements CheckPoint.Rollback.
Expand Down
76 changes: 76 additions & 0 deletions dm/syncer/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,15 @@ import (
"os"
"path/filepath"
"strings"
<<<<<<< HEAD
=======
"testing"
"time"

tidbddl "github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/parser/ast"
"github.com/stretchr/testify/require"
>>>>>>> 4d6f44dfe (checkpoint(dm): check outdated should respect snapshot create time (#5160))

"github.com/pingcap/tiflow/dm/dm/config"
"github.com/pingcap/tiflow/dm/pkg/binlog"
Expand Down Expand Up @@ -495,3 +504,70 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) {
c.Assert(rcp.points[schemaName][tableName].flushedPoint.ti, NotNil)
c.Assert(*rcp.safeModeExitPoint, DeepEquals, binlog.InitLocation(pos2, gs))
}
<<<<<<< HEAD
=======

// TestRemoteCheckPointLoadIntoSchemaTracker checks that LoadIntoSchemaTracker
// makes a table visible in the schema tracker when its flushed point carries a
// table info, and leaves a table without table info unknown to the tracker.
func TestRemoteCheckPointLoadIntoSchemaTracker(t *testing.T) {
	ctx := context.Background()
	cfg := genDefaultSubTaskConfig4Test()
	cfg.WorkerCount = 0

	// build a schema tracker on top of a mocked downstream connection
	mockDB, _, err := sqlmock.New()
	require.NoError(t, err)
	rawConn, err := mockDB.Conn(ctx)
	require.NoError(t, err)
	baseConn := conn.NewBaseConn(rawConn, &retry.FiniteRetryStrategy{})
	tracker, err := schema.NewTracker(ctx, cfg.Name, defaultTestSessionCfg, dbconn.NewDBConn(cfg, baseConn))
	require.NoError(t, err)
	defer tracker.Close() //nolint

	withInfo := &filter.Table{Schema: "test", Name: "tbl1"}
	withoutInfo := &filter.Table{Schema: "test", Name: "tbl2"}

	// before load: neither table is known to the tracker
	for _, tbl := range []*filter.Table{withInfo, withoutInfo} {
		_, err = tracker.GetTableInfo(tbl)
		require.Error(t, err)
	}

	rcp := NewRemoteCheckPoint(tcontext.Background(), cfg, "1").(*RemoteCheckPoint)

	// derive a table info for tbl1 from a CREATE TABLE statement
	sqlParser, err := utils.GetParserFromSQLModeStr("")
	require.NoError(t, err)
	stmt, err := sqlParser.ParseOneStmt("create table tbl1(id int)", "", "")
	require.NoError(t, err)
	ti, err := tidbddl.BuildTableInfoFromAST(stmt.(*ast.CreateTableStmt))
	require.NoError(t, err)

	// both tables live in the same schema, so one inner map serves both
	rcp.points[withInfo.Schema] = make(map[string]*binlogPoint)
	rcp.points[withInfo.Schema][withInfo.Name] = &binlogPoint{flushedPoint: tablePoint{ti: ti}}
	rcp.points[withoutInfo.Schema][withoutInfo.Name] = &binlogPoint{flushedPoint: tablePoint{}}

	// after load: only the table with a table info is available
	require.NoError(t, rcp.LoadIntoSchemaTracker(ctx, tracker))
	loaded, err := tracker.GetTableInfo(withInfo)
	require.NoError(t, err)
	require.Len(t, loaded.Columns, 1)
	_, err = tracker.GetTableInfo(withoutInfo)
	require.Error(t, err)
}

// TestLastFlushOutdated checks that LastFlushOutdated reports true when the
// global point save time is older than the flush interval, and reports false
// right after Snapshot has been called, even though that call returned nil.
func TestLastFlushOutdated(t *testing.T) {
	cfg := genDefaultSubTaskConfig4Test()
	cfg.CheckpointFlushInterval = 1
	cfg.WorkerCount = 0

	rcp := NewRemoteCheckPoint(tcontext.Background(), cfg, "1").(*RemoteCheckPoint)
	// pretend the last save happened two seconds ago, beyond the 1s interval
	rcp.globalPointSaveTime = time.Now().Add(-2 * time.Second)
	require.True(t, rcp.LastFlushOutdated())

	require.Nil(t, rcp.Snapshot(true))
	// though snapshot is nil, checkpoint is not outdated
	require.False(t, rcp.LastFlushOutdated())
}
>>>>>>> 4d6f44dfe (checkpoint(dm): check outdated should respect snapshot create time (#5160))
70 changes: 69 additions & 1 deletion dm/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -978,6 +978,43 @@ func (s *Syncer) addJob(job *job) error {
time.Sleep(100 * time.Millisecond)
})
}
<<<<<<< HEAD
=======
}

// flushIfOutdated flushes the checkpoint when the checkpoint reports that the
// last flush is outdated; otherwise it does nothing and returns nil.
//
// With Experimental.AsyncCheckpointFlush enabled, it enqueues an async flush
// job and then starts the asynchronous checkpoint flush for that job. If the
// job cannot be added, that error is returned and the async flush is not
// started. Otherwise it waits on s.jobWg and returns the result of a
// synchronous flushCheckPoints call.
func (s *Syncer) flushIfOutdated() error {
	if !s.checkpoint.LastFlushOutdated() {
		return nil
	}

	if s.cfg.Experimental.AsyncCheckpointFlush {
		jobSeq := s.getFlushSeq()
		s.tctx.L().Info("Start to async flush current checkpoint to downstream based on flush interval", zap.Int64("job sequence", jobSeq))
		j := newAsyncFlushJob(s.cfg.WorkerCount, jobSeq)
		// addJob reports failures through its error return; do not start the
		// async flush for a job that was not accepted.
		if err := s.addJob(j); err != nil {
			return err
		}
		s.flushCheckPointsAsync(j)
		return nil
	}
	s.jobWg.Wait()
	return s.flushCheckPoints()
}

// TODO: move to syncer/job.go
// handleJob will do many actions based on job type.
func (s *Syncer) handleJob(job *job) (added2Queue bool, err error) {
skipCheckFlush := false
defer func() {
if !skipCheckFlush && err == nil {
err = s.flushIfOutdated()
}
}()

// 1. handle jobs that do not need to be sent to queue

s.waitTransactionLock.Lock()
defer s.waitTransactionLock.Unlock()
>>>>>>> 4d6f44dfe (checkpoint(dm): check outdated should respect snapshot create time (#5160))

failpoint.Inject("flushFirstJob", func() {
if waitJobsDone {
Expand Down Expand Up @@ -1094,7 +1131,7 @@ func (s *Syncer) resetShardingGroup(table *filter.Table) {
// and except rejecting to flush the checkpoint, we also need to rollback the checkpoint saved before
// this should be handled when `s.Run` returned
//
// we may need to refactor the concurrency model to make the work-flow more clearer later.
// we may need to refactor the concurrency model to make the work-flow more clear later.
func (s *Syncer) flushCheckPoints() error {
err := s.execError.Load()
// TODO: for now, if any error occurred (including user canceled), checkpoint won't be updated. But if we have put
Expand Down Expand Up @@ -2016,8 +2053,13 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
case *replication.GenericEvent:
if e.Header.EventType == replication.HEARTBEAT_EVENT {
// flush checkpoint even if there are no real binlog events
<<<<<<< HEAD
if s.checkpoint.CheckGlobalPoint() {
tctx.L().Info("meet heartbeat event and then flush jobs")
=======
if s.checkpoint.LastFlushOutdated() {
s.tctx.L().Info("meet heartbeat event and then flush jobs")
>>>>>>> 4d6f44dfe (checkpoint(dm): check outdated should respect snapshot create time (#5160))
err2 = s.flushJobs()
}
}
Expand Down Expand Up @@ -2289,8 +2331,34 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err
return err
}
}
<<<<<<< HEAD
metrics.DispatchBinlogDurationHistogram.WithLabelValues(jobType.String(), s.cfg.Name, s.cfg.SourceID).Observe(time.Since(startTime).Seconds())
return nil
=======
metrics.DispatchBinlogDurationHistogram.WithLabelValues(metricTp, s.cfg.Name, s.cfg.SourceID).Observe(time.Since(startTime).Seconds())

if len(sourceTable.Schema) != 0 {
// when in position-based replication, now events before the table checkpoint are sent to queue. But in GTID-based
// replication events may share one GTID, so an event whose end position is equal to the table checkpoint may not be
// sent to queue. We may need an event index within the GTID to resolve it.
s.saveTablePoint(sourceTable, *ec.currentLocation)
}

failpoint.Inject("flushFirstJob", func() {
if waitJobsDoneForTest {
s.tctx.L().Info("trigger flushFirstJob")
waitJobsDoneForTest = false

err2 := s.flushJobs()
if err2 != nil {
s.tctx.L().DPanic("flush checkpoint failed", zap.Error(err2))
}
failpoint.Return(nil)
}
})

return s.flushIfOutdated()
>>>>>>> 4d6f44dfe (checkpoint(dm): check outdated should respect snapshot create time (#5160))
}

type queryEventContext struct {
Expand Down

0 comments on commit 9eb1e97

Please sign in to comment.