Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

syncer(dm): fix startGTID is equal to endGTID #4386

Merged
merged 13 commits into from
Feb 13, 2022
10 changes: 8 additions & 2 deletions dm/pkg/binlog/position.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,9 +401,15 @@ func (l *Location) ResetSuffix() {
// SetGTID set new gtid for location
// Use this func instead of GITSet.Set to avoid change other location.
func (l *Location) SetGTID(gset gmysql.GTIDSet) error {
flavor := gmysql.MySQLFlavor
if _, ok := l.gtidSet.(*gtid.MariadbGTIDSet); ok {
var flavor string

switch gset.(type) {
case *gmysql.MysqlGTIDSet:
flavor = gmysql.MySQLFlavor
case *gmysql.MariadbGTIDSet:
flavor = gmysql.MariaDBFlavor
default:
return nil
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
}

newGTID := gtid.MinGTIDSet(flavor)
Expand Down
24 changes: 24 additions & 0 deletions dm/pkg/binlog/position_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,30 @@ func (t *testPositionSuite) TestSetGTID(c *C) {
c.Assert(loc.gtidSet.String(), Equals, GTIDSetStr2)
c.Assert(loc2.gtidSet.String(), Equals, GTIDSetStr)
c.Assert(CompareLocation(loc, loc2, true), Equals, 1)

loc2.gtidSet = nil
err = loc2.SetGTID(mysqlSet)
c.Assert(err, IsNil)
c.Assert(loc2.gtidSet.String(), Equals, GTIDSetStr)
}

func (t *testPositionSuite) TestSetGTIDMariaDB(c *C) {
gSetStr := "1-1-1,2-2-2"
gSet, err := gtid.ParserGTID("mariadb", gSetStr)
c.Assert(err, IsNil)
gSetOrigin := gSet.Origin()

loc := Location{
Position: gmysql.Position{
Name: "mysql-bin.00002",
Pos: 2333,
},
gtidSet: nil,
Suffix: 0,
}
err = loc.SetGTID(gSetOrigin)
c.Assert(err, IsNil)
c.Assert(loc.gtidSet.String(), Equals, gSetStr)
}

func (t *testPositionSuite) TestExtractSuffix(c *C) {
Expand Down
62 changes: 50 additions & 12 deletions dm/syncer/binlog_locations.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ import (
"strings"
"sync"

"github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
"go.uber.org/zap"

"github.com/pingcap/tiflow/dm/pkg/binlog"
"github.com/pingcap/tiflow/dm/pkg/binlog/event"
"github.com/pingcap/tiflow/dm/pkg/gtid"
"github.com/pingcap/tiflow/dm/pkg/log"
)

Expand All @@ -33,7 +34,8 @@ type locationRecorder struct {
// | |
// curStartLocation curEndLocation
// there may be more events between curStartLocation and curEndLocation due to the limitation of binlog or
// implementation of DM, but those events should always belong to one transaction.
// implementation of DM, but in such scenario, those events should always belong to one transaction.
// When curStartLocation is equal to curEndLocation, it means current event is not a data change.
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
//
// curStartLocation is used when
// - display a meaningful location
Expand All @@ -44,7 +46,7 @@ type locationRecorder struct {
curStartLocation binlog.Location
curEndLocation binlog.Location

// txnEndLocation is the end location of last transaction. If current event is the last event of a txn,
// txnEndLocation is the end location of last seen transaction. If current event is the last event of a txn,
// txnEndLocation will be assigned from curEndLocation
// it is used when
// - reset binlog replication for a finer granularity
Expand All @@ -55,6 +57,9 @@ type locationRecorder struct {
// distinguish DML query event.
inDML bool

// we assign startGTID := endGTID when COMMIT, but DDL doesn't have commit event, so we do this in next event.
needUpdateStartGTID bool

mu sync.Mutex // guard curEndLocation because Syncer.printStatus is reading it from another goroutine.
}

Expand Down Expand Up @@ -102,15 +107,39 @@ func shouldUpdatePos(e *replication.BinlogEvent) bool {
return true
}

func (l *locationRecorder) setCurrentGTID(gset mysql.GTIDSet) {
err := l.curEndLocation.SetGTID(gset)
func (l *locationRecorder) updateCurStartGTID() {
gset := l.curEndLocation.GetGTID().Origin()
err := l.curStartLocation.SetGTID(gset)
if err != nil {
log.L().DPanic("failed to set GTID set",
zap.Any("GTID set", gset),
zap.Error(err))
}
}

func (l *locationRecorder) setCurEndGTID(e *replication.BinlogEvent) {
gtidStr, err := event.GetGTIDStr(e)
if err != nil {
log.L().DPanic("failed to get GTID from event",
zap.Any("event", e),
zap.Error(err))
return
}

if l.curEndLocation.GetGTID() == nil {
gset, _ := gtid.ParserGTID("", gtidStr)
_ = l.curEndLocation.SetGTID(gset.Origin())
return
}

err = l.curEndLocation.GetGTID().Update(gtidStr)
if err != nil {
log.L().DPanic("failed to update GTID set",
zap.Any("GTID", gtidStr),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

zap.String?

zap.Error(err))
}
}

// update maintains the member of locationRecorder as their definitions.
// - curStartLocation is assigned to curEndLocation
// - curEndLocation is tried to be updated in-place
Expand All @@ -119,7 +148,13 @@ func (l *locationRecorder) update(e *replication.BinlogEvent) {
l.mu.Lock()
defer l.mu.Unlock()

l.curStartLocation = l.curEndLocation
// GTID part is maintained separately
l.curStartLocation.Position = l.curEndLocation.Position

if l.needUpdateStartGTID {
l.updateCurStartGTID()
l.needUpdateStartGTID = false
}

if !shouldUpdatePos(e) {
return
Expand All @@ -138,9 +173,16 @@ func (l *locationRecorder) update(e *replication.BinlogEvent) {
l.curEndLocation.Position.Pos = e.Header.LogPos

switch ev := e.Event.(type) {
case *replication.GTIDEvent:
l.setCurEndGTID(e)
case *replication.MariadbGTIDEvent:
l.setCurEndGTID(e)
if !ev.IsDDL() {
l.inDML = true
}
case *replication.XIDEvent:
// for transactional engines like InnoDB, COMMIT is xid event
l.setCurrentGTID(ev.GSet)
l.updateCurStartGTID()
l.saveTxnEndLocation()
l.inDML = false
case *replication.QueryEvent:
Expand All @@ -156,16 +198,12 @@ func (l *locationRecorder) update(e *replication.BinlogEvent) {
l.inDML = false
}

l.needUpdateStartGTID = true
if l.inDML {
return
}

l.setCurrentGTID(ev.GSet)
l.saveTxnEndLocation()
case *replication.MariadbGTIDEvent:
if !ev.IsDDL() {
l.inDML = true
}
}
}

Expand Down
31 changes: 21 additions & 10 deletions dm/syncer/binlog_locations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ type testLocationSuite struct {
currGSet gtid.Set
}

func (s *testLocationSuite) SetUpSuite(c *C) {
func (s *testLocationSuite) SetUpTest(c *C) {
s.serverID = 101
s.binlogFile = "mysql-bin.000001"
s.nextBinlogFile = "mysql-bin.000002"
s.binlogPos = 123
s.flavor = mysql.MySQLFlavor
s.prevGSetStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-14"
s.lastGTIDStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:14"
s.currGSetStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-14"
s.currGSetStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-15"

var err error
s.prevGSet, err = gtid.ParserGTID(s.flavor, s.prevGSetStr)
Expand Down Expand Up @@ -179,10 +179,20 @@ func (s *testLocationSuite) checkOneTxnEvents(c *C, events []*replication.Binlog
c.Assert(r.curEndLocation, DeepEquals, expected[0])
c.Assert(r.txnEndLocation, DeepEquals, expected[0])

seenGTID := false
for i, e := range events {
r.update(e)
c.Assert(r.curStartLocation, DeepEquals, expected[i])
c.Assert(r.curEndLocation, DeepEquals, expected[i+1])

if e.Header.EventType == replication.GTID_EVENT || e.Header.EventType == replication.MARIADB_GTID_EVENT {
seenGTID = true
}
if seenGTID {
c.Assert(r.curEndLocation.Position, DeepEquals, expected[i+1].Position)
c.Assert(r.curEndLocation.GetGTID(), DeepEquals, expected[len(expected)-1].GetGTID())
} else {
c.Assert(r.curEndLocation, DeepEquals, expected[i+1])
}

if i == len(events)-1 {
switch e.Header.EventType {
Expand All @@ -197,7 +207,8 @@ func (s *testLocationSuite) checkOneTxnEvents(c *C, events []*replication.Binlog
}
}

func (s *testLocationSuite) generateExpectLocations(
// generateExpectedLocations generates binlog position part of location from given event.
func (s *testLocationSuite) generateExpectedLocations(
initLoc binlog.Location,
events []*replication.BinlogEvent,
) []binlog.Location {
Expand All @@ -224,7 +235,7 @@ func (s *testLocationSuite) TestDMLUpdateLocationsGTID(c *C) {
// we have 8 events
c.Assert(events, HasLen, 8)

expected := s.generateExpectLocations(s.loc, events)
expected := s.generateExpectedLocations(s.loc, events)

c.Assert(expected[8].SetGTID(s.currGSet.Origin()), IsNil)

Expand All @@ -246,7 +257,7 @@ func (s *testLocationSuite) TestDMLUpdateLocationsPos(c *C) {
events[7].Event.(*replication.XIDEvent).GSet = nil
events = append(events[:2], events[4:]...)

expected := s.generateExpectLocations(loc, events)
expected := s.generateExpectedLocations(loc, events)

s.checkOneTxnEvents(c, events, expected)
}
Expand All @@ -257,7 +268,7 @@ func (s *testLocationSuite) TestDDLUpdateLocationsGTID(c *C) {
// we have 5 events
c.Assert(events, HasLen, 5)

expected := s.generateExpectLocations(s.loc, events)
expected := s.generateExpectedLocations(s.loc, events)

c.Assert(expected[5].SetGTID(s.currGSet.Origin()), IsNil)

Expand All @@ -281,7 +292,7 @@ func (s *testLocationSuite) TestDDLUpdateLocationsPos(c *C) {
events = append(events[:2], events[4:]...)

// now we have 3 events, test about their 4 locations
expected := s.generateExpectLocations(loc, events)
expected := s.generateExpectedLocations(loc, events)

s.checkOneTxnEvents(c, events, expected)
}
Expand All @@ -305,7 +316,7 @@ func (s *testLocationSuite) TestDMLQueryUpdateLocationsGTID(c *C) {
// we have 7 events
c.Assert(events, HasLen, 7)

expected := s.generateExpectLocations(s.loc, events)
expected := s.generateExpectedLocations(s.loc, events)

c.Assert(expected[7].SetGTID(s.currGSet.Origin()), IsNil)

Expand Down Expand Up @@ -334,7 +345,7 @@ func (s *testLocationSuite) TestRotateEvent(c *C) {

nextLoc := s.loc
nextLoc.Position.Name = s.nextBinlogFile
expected := s.generateExpectLocations(nextLoc, events)
expected := s.generateExpectedLocations(nextLoc, events)

// reset events of first binlog file
expected[0].Position.Name = s.binlogFile
Expand Down
6 changes: 0 additions & 6 deletions dm/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1770,10 +1770,6 @@ func (s *Syncer) Run(ctx context.Context) (err error) {

startTime := time.Now()
e, err = s.getEvent(tctx, currentLocation)
s.tctx.L().Debug("location refactor",
zap.Stringer("current", currentLocation),
zap.Stringer("start", startLocation),
zap.Stringer("last", lastLocation))

failpoint.Inject("SafeModeExit", func(val failpoint.Value) {
if intVal, ok := val.(int); ok && intVal == 1 {
Expand Down Expand Up @@ -3674,8 +3670,6 @@ func (s *Syncer) getEvent(tctx *tcontext.Context, startLocation binlog.Location)
e, err := s.streamerController.GetEvent(tctx)
if err == nil {
s.locations.update(e)
// TODO: observe below log in integration test
s.tctx.L().Debug("location refactor", zap.Stringer("locations", s.locations))
}
return e, err
}
Expand Down