diff --git a/dm/pkg/binlog/position.go b/dm/pkg/binlog/position.go index 9573f2292fe..ad7cfb1e719 100644 --- a/dm/pkg/binlog/position.go +++ b/dm/pkg/binlog/position.go @@ -401,9 +401,18 @@ func (l *Location) ResetSuffix() { // SetGTID set new gtid for location // Use this func instead of GITSet.Set to avoid change other location. func (l *Location) SetGTID(gset gmysql.GTIDSet) error { - flavor := gmysql.MySQLFlavor - if _, ok := l.gtidSet.(*gtid.MariadbGTIDSet); ok { + var flavor string + + switch gset.(type) { + case *gmysql.MysqlGTIDSet: + flavor = gmysql.MySQLFlavor + case *gmysql.MariadbGTIDSet: flavor = gmysql.MariaDBFlavor + case nil: + l.gtidSet = nil + return nil + default: + return fmt.Errorf("unknown GTIDSet type: %T", gset) } newGTID := gtid.MinGTIDSet(flavor) diff --git a/dm/pkg/binlog/position_test.go b/dm/pkg/binlog/position_test.go index 715451718fb..a61996f94b7 100644 --- a/dm/pkg/binlog/position_test.go +++ b/dm/pkg/binlog/position_test.go @@ -763,6 +763,30 @@ func (t *testPositionSuite) TestSetGTID(c *C) { c.Assert(loc.gtidSet.String(), Equals, GTIDSetStr2) c.Assert(loc2.gtidSet.String(), Equals, GTIDSetStr) c.Assert(CompareLocation(loc, loc2, true), Equals, 1) + + loc2.gtidSet = nil + err = loc2.SetGTID(mysqlSet) + c.Assert(err, IsNil) + c.Assert(loc2.gtidSet.String(), Equals, GTIDSetStr) +} + +func (t *testPositionSuite) TestSetGTIDMariaDB(c *C) { + gSetStr := "1-1-1,2-2-2" + gSet, err := gtid.ParserGTID("mariadb", gSetStr) + c.Assert(err, IsNil) + gSetOrigin := gSet.Origin() + + loc := Location{ + Position: gmysql.Position{ + Name: "mysql-bin.00002", + Pos: 2333, + }, + gtidSet: nil, + Suffix: 0, + } + err = loc.SetGTID(gSetOrigin) + c.Assert(err, IsNil) + c.Assert(loc.gtidSet.String(), Equals, gSetStr) } func (t *testPositionSuite) TestExtractSuffix(c *C) { diff --git a/dm/syncer/binlog_locations.go b/dm/syncer/binlog_locations.go index 2d0a29c1cbe..f73630015fc 100644 --- a/dm/syncer/binlog_locations.go +++ b/dm/syncer/binlog_locations.go @@ -18,11 +18,12 @@ import ( "strings" "sync" - "github.com/go-mysql-org/go-mysql/mysql" "github.com/go-mysql-org/go-mysql/replication" "go.uber.org/zap" "github.com/pingcap/tiflow/dm/pkg/binlog" + "github.com/pingcap/tiflow/dm/pkg/binlog/event" + "github.com/pingcap/tiflow/dm/pkg/gtid" "github.com/pingcap/tiflow/dm/pkg/log" ) @@ -33,7 +34,8 @@ type locationRecorder struct { // | | // curStartLocation curEndLocation // there may be more events between curStartLocation and curEndLocation due to the limitation of binlog or - // implementation of DM, but those events should always belong to one transaction. + // implementation of DM, but in such scenario, those events should always belong to one transaction. + // When curStartLocation is equal to curEndLocation, it means current event is not a data change. // // curStartLocation is used when // - display a meaningful location @@ -44,7 +46,7 @@ type locationRecorder struct { curStartLocation binlog.Location curEndLocation binlog.Location - // txnEndLocation is the end location of last transaction. If current event is the last event of a txn, + // txnEndLocation is the end location of last seen transaction. If current event is the last event of a txn, // txnEndLocation will be assigned from curEndLocation // it is used when // - reset binlog replication for a finer granularity @@ -55,15 +57,20 @@ type locationRecorder struct { // distinguish DML query event. inDML bool + // we assign startGTID := endGTID after COMMIT, so at COMMIT we turn on the flag. + needUpdateStartGTID bool + mu sync.Mutex // guard curEndLocation because Syncer.printStatus is reading it from another goroutine. } func (l *locationRecorder) reset(loc binlog.Location) { l.mu.Lock() defer l.mu.Unlock() - l.curStartLocation = loc - l.curEndLocation = loc - l.txnEndLocation = loc + // need to clone location to avoid the modification leaking outside + clone := loc.Clone() + l.curStartLocation = clone + l.curEndLocation = clone + l.txnEndLocation = clone } //nolint:unused @@ -102,8 +109,13 @@ func shouldUpdatePos(e *replication.BinlogEvent) bool { return true } -func (l *locationRecorder) setCurrentGTID(gset mysql.GTIDSet) { - err := l.curEndLocation.SetGTID(gset) +func (l *locationRecorder) updateCurStartGTID() { + gsetWrapped := l.curEndLocation.GetGTID() + if gsetWrapped == nil { + return + } + gset := gsetWrapped.Origin() + err := l.curStartLocation.SetGTID(gset) if err != nil { log.L().DPanic("failed to set GTID set", zap.Any("GTID set", gset), @@ -111,6 +123,40 @@ func (l *locationRecorder) setCurrentGTID(gset mysql.GTIDSet) { } } +func (l *locationRecorder) setCurEndGTID(e *replication.BinlogEvent) { + gtidStr, err := event.GetGTIDStr(e) + if err != nil { + log.L().DPanic("failed to get GTID from event", + zap.Any("event", e), + zap.Error(err)) + return + } + + gset := l.curEndLocation.GetGTID() + + if gset == nil { + gset, _ = gtid.ParserGTID("", gtidStr) + _ = l.curEndLocation.SetGTID(gset.Origin()) + return + } + + clone := gset.Clone() + err = clone.Update(gtidStr) + if err != nil { + log.L().DPanic("failed to update GTID set", + zap.String("GTID", gtidStr), + zap.Error(err)) + return + } + + err = l.curEndLocation.SetGTID(clone.Origin()) + if err != nil { + log.L().DPanic("failed to set GTID set", + zap.String("GTID", gtidStr), + zap.Error(err)) + } +} + // update maintains the member of locationRecorder as their definitions. // - curStartLocation is assigned to curEndLocation // - curEndLocation is tried to be updated in-place @@ -119,7 +165,13 @@ func (l *locationRecorder) update(e *replication.BinlogEvent) { l.mu.Lock() defer l.mu.Unlock() - l.curStartLocation = l.curEndLocation + // GTID part is maintained separately + l.curStartLocation.Position = l.curEndLocation.Position + + if l.needUpdateStartGTID { + l.updateCurStartGTID() + l.needUpdateStartGTID = false + } if !shouldUpdatePos(e) { return @@ -138,11 +190,18 @@ func (l *locationRecorder) update(e *replication.BinlogEvent) { l.curEndLocation.Position.Pos = e.Header.LogPos switch ev := e.Event.(type) { + case *replication.GTIDEvent: + l.setCurEndGTID(e) + case *replication.MariadbGTIDEvent: + l.setCurEndGTID(e) + if !ev.IsDDL() { + l.inDML = true + } case *replication.XIDEvent: // for transactional engines like InnoDB, COMMIT is xid event - l.setCurrentGTID(ev.GSet) l.saveTxnEndLocation() l.inDML = false + l.needUpdateStartGTID = true case *replication.QueryEvent: query := strings.TrimSpace(string(ev.Query)) switch query { @@ -159,13 +218,9 @@ func (l *locationRecorder) update(e *replication.BinlogEvent) { if l.inDML { return } + l.needUpdateStartGTID = true - l.setCurrentGTID(ev.GSet) l.saveTxnEndLocation() - case *replication.MariadbGTIDEvent: - if !ev.IsDDL() { - l.inDML = true - } } } diff --git a/dm/syncer/binlog_locations_test.go b/dm/syncer/binlog_locations_test.go index 326c3dd7626..4df4260a7c0 100644 --- a/dm/syncer/binlog_locations_test.go +++ b/dm/syncer/binlog_locations_test.go @@ -47,7 +47,7 @@ type testLocationSuite struct { currGSet gtid.Set } -func (s *testLocationSuite) SetUpSuite(c *C) { +func (s *testLocationSuite) SetUpTest(c *C) { s.serverID = 101 s.binlogFile = "mysql-bin.000001" s.nextBinlogFile = "mysql-bin.000002" @@ -55,7 +55,7 @@ func (s *testLocationSuite) SetUpSuite(c *C) { s.flavor = mysql.MySQLFlavor s.prevGSetStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-14" s.lastGTIDStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:14" - s.currGSetStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-14" + s.currGSetStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-15" var err error s.prevGSet, err = gtid.ParserGTID(s.flavor, s.prevGSetStr) @@ -179,10 +179,20 @@ func (s *testLocationSuite) checkOneTxnEvents(c *C, events []*replication.Binlog c.Assert(r.curEndLocation, DeepEquals, expected[0]) c.Assert(r.txnEndLocation, DeepEquals, expected[0]) + seenGTID := false for i, e := range events { r.update(e) c.Assert(r.curStartLocation, DeepEquals, expected[i]) - c.Assert(r.curEndLocation, DeepEquals, expected[i+1]) + + if e.Header.EventType == replication.GTID_EVENT || e.Header.EventType == replication.MARIADB_GTID_EVENT { + seenGTID = true + } + if seenGTID { + c.Assert(r.curEndLocation.Position, DeepEquals, expected[i+1].Position) + c.Assert(r.curEndLocation.GetGTID(), DeepEquals, expected[len(expected)-1].GetGTID()) + } else { + c.Assert(r.curEndLocation, DeepEquals, expected[i+1]) + } if i == len(events)-1 { switch e.Header.EventType { @@ -197,7 +207,8 @@ func (s *testLocationSuite) checkOneTxnEvents(c *C, events []*replication.Binlog } } -func (s *testLocationSuite) generateExpectLocations( +// generateExpectedLocations generates binlog position part of location from given event. +func (s *testLocationSuite) generateExpectedLocations( initLoc binlog.Location, events []*replication.BinlogEvent, ) []binlog.Location { @@ -224,7 +235,7 @@ func (s *testLocationSuite) TestDMLUpdateLocationsGTID(c *C) { // we have 8 events c.Assert(events, HasLen, 8) - expected := s.generateExpectLocations(s.loc, events) + expected := s.generateExpectedLocations(s.loc, events) c.Assert(expected[8].SetGTID(s.currGSet.Origin()), IsNil) @@ -246,7 +257,7 @@ func (s *testLocationSuite) TestDMLUpdateLocationsPos(c *C) { events[7].Event.(*replication.XIDEvent).GSet = nil events = append(events[:2], events[4:]...) - expected := s.generateExpectLocations(loc, events) + expected := s.generateExpectedLocations(loc, events) s.checkOneTxnEvents(c, events, expected) } @@ -257,7 +268,7 @@ func (s *testLocationSuite) TestDDLUpdateLocationsGTID(c *C) { // we have 5 events c.Assert(events, HasLen, 5) - expected := s.generateExpectLocations(s.loc, events) + expected := s.generateExpectedLocations(s.loc, events) c.Assert(expected[5].SetGTID(s.currGSet.Origin()), IsNil) @@ -281,7 +292,7 @@ func (s *testLocationSuite) TestDDLUpdateLocationsPos(c *C) { events = append(events[:2], events[4:]...) // now we have 3 events, test about their 4 locations - expected := s.generateExpectLocations(loc, events) + expected := s.generateExpectedLocations(loc, events) s.checkOneTxnEvents(c, events, expected) } @@ -305,7 +316,7 @@ func (s *testLocationSuite) TestDMLQueryUpdateLocationsGTID(c *C) { // we have 7 events c.Assert(events, HasLen, 7) - expected := s.generateExpectLocations(s.loc, events) + expected := s.generateExpectedLocations(s.loc, events) c.Assert(expected[7].SetGTID(s.currGSet.Origin()), IsNil) @@ -334,7 +345,7 @@ func (s *testLocationSuite) TestRotateEvent(c *C) { nextLoc := s.loc nextLoc.Position.Name = s.nextBinlogFile - expected := s.generateExpectLocations(nextLoc, events) + expected := s.generateExpectedLocations(nextLoc, events) // reset events of first binlog file expected[0].Position.Name = s.binlogFile diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go index 9f005e7e993..6ba6cf5e487 100644 --- a/dm/syncer/syncer.go +++ b/dm/syncer/syncer.go @@ -1794,10 +1794,6 @@ func (s *Syncer) Run(ctx context.Context) (err error) { startTime := time.Now() e, err = s.getEvent(tctx, currentLocation) - s.tctx.L().Debug("location refactor", - zap.Stringer("current", currentLocation), - zap.Stringer("start", startLocation), - zap.Stringer("last", lastLocation)) failpoint.Inject("SafeModeExit", func(val failpoint.Value) { if intVal, ok := val.(int); ok && intVal == 1 { @@ -3684,8 +3680,6 @@ func (s *Syncer) getEvent(tctx *tcontext.Context, startLocation binlog.Location) e, err := s.streamerController.GetEvent(tctx) if err == nil { s.locations.update(e) - // TODO: observe below log in integration test - s.tctx.L().Debug("location refactor", zap.Stringer("locations", s.locations)) } return e, err }