relay(dm): send one heartbeat for successive skipped GTID (#5070)
ref #5063
lance6716 committed Apr 1, 2022
1 parent 7b7a79b commit 012ee38
Showing 5 changed files with 83 additions and 55 deletions.
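
In short: the relay reader used to replace every event whose GTID was already contained in the requested GTID set with a HEARTBEAT event, producing one heartbeat per skipped event. After this change, binlogFileParseState carries a skipGTID flag plus the header of the last skipped event (lastSkipGTIDHeader); events inside a skipped range are dropped entirely, and a single heartbeat is emitted only when the reader transitions back to events that must be delivered.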
1 change: 0 additions & 1 deletion dm/pkg/conn/utils.go
@@ -30,7 +30,6 @@ func FetchTimeZoneSetting(ctx context.Context, cfg *config.DBConfig) (string, er
}
defer db.Close()
dur, err := dbutil.GetTimeZoneOffset(ctx, db.DB)
- println(dur.String())
if err != nil {
return "", errors.Trace(err)
}
47 changes: 28 additions & 19 deletions dm/relay/local_reader.go
@@ -451,9 +451,10 @@ type binlogFileParseState struct {
f *os.File

// states may change
- replaceWithHeartbeat bool
- formatDescEventRead bool
- latestPos int64
+ skipGTID bool
+ lastSkipGTIDHeader *replication.EventHeader
+ formatDescEventRead bool
+ latestPos int64
}

// parseFileAsPossible parses single relay log file as far as possible.
@@ -468,13 +469,13 @@ func (r *BinlogReader) parseFileAsPossible(ctx context.Context, s *LocalStreamer
defer f.Close()

state := &binlogFileParseState{
- possibleLast: possibleLast,
- fullPath: fullPath,
- relayLogFile: relayLogFile,
- relayLogDir: relayLogDir,
- f: f,
- latestPos: offset,
- replaceWithHeartbeat: false,
+ possibleLast: possibleLast,
+ fullPath: fullPath,
+ relayLogFile: relayLogFile,
+ relayLogDir: relayLogDir,
+ f: f,
+ latestPos: offset,
+ skipGTID: false,
}

for {
@@ -517,6 +518,8 @@ func (r *BinlogReader) parseFile(
}
r.latestServerID = e.Header.ServerID // record server_id

+ lastSkipGTID := state.skipGTID
+
switch ev := e.Event.(type) {
case *replication.FormatDescriptionEvent:
state.formatDescEventRead = true
@@ -554,7 +557,7 @@
if err2 != nil {
return errors.Trace(err2)
}
- state.replaceWithHeartbeat, err = r.advanceCurrentGtidSet(gtidStr)
+ state.skipGTID, err = r.advanceCurrentGtidSet(gtidStr)
if err != nil {
return errors.Trace(err)
}
@@ -568,7 +571,7 @@
if err2 != nil {
return errors.Trace(err2)
}
- state.replaceWithHeartbeat, err = r.advanceCurrentGtidSet(gtidStr)
+ state.skipGTID, err = r.advanceCurrentGtidSet(gtidStr)
if err != nil {
return errors.Trace(err)
}
@@ -585,19 +588,25 @@
}

// align with MySQL
- // if an event's gtid has been contained by given gset
- // replace it with HEARTBEAT event
- // for Mariadb, it will bee replaced with MARIADB_GTID_LIST_EVENT
- // In DM, we replace both of them with HEARTBEAT event
- if state.replaceWithHeartbeat {
+ // ref https://github.com/pingcap/tiflow/issues/5063#issuecomment-1082678211
+ // heartbeat period is implemented in LocalStreamer.GetEvent
+ if state.skipGTID {
switch e.Event.(type) {
// Only replace transaction event
// Other events such as FormatDescriptionEvent, RotateEvent, etc. should be the same as before
- case *replication.RowsEvent, *replication.QueryEvent, *replication.GTIDEvent, *replication.XIDEvent, *replication.TableMapEvent:
+ case *replication.RowsEvent, *replication.QueryEvent, *replication.GTIDEvent,
+ *replication.MariadbGTIDEvent, *replication.XIDEvent, *replication.TableMapEvent:
// replace with heartbeat event
e = event.GenHeartbeatEvent(e.Header)
+ state.lastSkipGTIDHeader = e.Header
default:
}
+ return nil
+ } else if lastSkipGTID && state.lastSkipGTIDHeader != nil {
+ // skipGTID is turned off after this event
+ select {
+ case s.ch <- event.GenHeartbeatEvent(state.lastSkipGTIDHeader):
+ case <-ctx.Done():
+ }
}

select {
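
To make the new control flow in parseFile concrete, here is a minimal, runnable sketch of the coalescing rule under stand-in types (Event, Header, genHeartbeat, and relay are illustrative names, not the DM or go-mysql APIs): while skipping, events are swallowed and only the latest header is remembered; the first delivered event after a skipped run is preceded by exactly one heartbeat. As in the real code, a run that is still being skipped when input ends produces no trailing heartbeat.

package main

import "fmt"

// Stand-in types; the real implementation uses go-mysql's replication
// package and DM's event.GenHeartbeatEvent.
type Header struct{ LogPos uint32 }

type Event struct {
	Type   string
	Header Header
	Skip   bool // stands in for "GTID already contained in the current GTID set"
}

func genHeartbeat(h Header) Event { return Event{Type: "HEARTBEAT", Header: h} }

// relay forwards events to out, swallowing skipped ones; when a skipped run
// ends, it first emits one heartbeat carrying the last skipped header.
func relay(events []Event, out chan<- Event) {
	var lastSkipHeader *Header
	skipping := false
	for _, e := range events {
		if e.Skip {
			h := e.Header
			lastSkipHeader = &h
			skipping = true
			continue // downstream never sees the skipped event itself
		}
		if skipping && lastSkipHeader != nil {
			out <- genHeartbeat(*lastSkipHeader) // one heartbeat per skipped run
		}
		skipping = false
		out <- e
	}
	close(out)
}

func main() {
	in := []Event{
		{Type: "FORMAT_DESC", Header: Header{LogPos: 4}},
		{Type: "QUERY", Header: Header{LogPos: 100}, Skip: true},
		{Type: "XID", Header: Header{LogPos: 150}, Skip: true},
		{Type: "QUERY", Header: Header{LogPos: 200}},
	}
	out := make(chan Event, len(in)+1)
	relay(in, out)
	for e := range out {
		fmt.Printf("%s at %d\n", e.Type, e.Header.LogPos)
	}
	// Prints FORMAT_DESC at 4, HEARTBEAT at 150, QUERY at 200:
	// the two skipped events collapse into one heartbeat.
}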
71 changes: 37 additions & 34 deletions dm/relay/local_reader_test.go
@@ -87,13 +87,13 @@ func (t *testReaderSuite) createBinlogFileParseState(c *C, relayLogDir, relayLog
c.Assert(err, IsNil)

return &binlogFileParseState{
- possibleLast: possibleLast,
- fullPath: fullPath,
- relayLogFile: relayLogFile,
- relayLogDir: relayLogDir,
- f: f,
- latestPos: offset,
- replaceWithHeartbeat: false,
+ possibleLast: possibleLast,
+ fullPath: fullPath,
+ relayLogFile: relayLogFile,
+ relayLogDir: relayLogDir,
+ f: f,
+ latestPos: offset,
+ skipGTID: false,
}
}

Expand Down Expand Up @@ -147,7 +147,7 @@ func (t *testReaderSuite) TestParseFileBase(c *C) {
c.Assert(needReParse, IsFalse)
c.Assert(state.latestPos, Equals, int64(100))
c.Assert(state.formatDescEventRead, IsFalse)
- c.Assert(state.replaceWithHeartbeat, Equals, false)
+ c.Assert(state.skipGTID, Equals, false)
}

// write some events to binlog file
@@ -175,7 +175,7 @@ func (t *testReaderSuite) TestParseFileBase(c *C) {
c.Assert(needReParse, IsFalse)
c.Assert(state.latestPos, Equals, int64(baseEvents[len(baseEvents)-1].Header.LogPos))
c.Assert(state.formatDescEventRead, IsTrue)
- c.Assert(state.replaceWithHeartbeat, IsFalse)
+ c.Assert(state.skipGTID, IsFalse)

// try get events back, firstParse should have fake RotateEvent
var fakeRotateEventCount int
@@ -213,7 +213,7 @@ func (t *testReaderSuite) TestParseFileBase(c *C) {
c.Assert(needReParse, IsFalse)
c.Assert(state.latestPos, Equals, int64(baseEvents[len(baseEvents)-1].Header.LogPos))
c.Assert(state.formatDescEventRead, IsTrue)
- c.Assert(state.replaceWithHeartbeat, Equals, false)
+ c.Assert(state.skipGTID, Equals, false)
fakeRotateEventCount := 0
i := 0
for {
@@ -256,7 +256,7 @@ func (t *testReaderSuite) TestParseFileBase(c *C) {
c.Assert(needReParse, IsFalse)
c.Assert(state.latestPos, Equals, int64(rotateEv.Header.LogPos))
c.Assert(state.formatDescEventRead, IsTrue)
- c.Assert(state.replaceWithHeartbeat, Equals, false)
+ c.Assert(state.skipGTID, Equals, false)
t.purgeStreamer(c, s)
}

@@ -275,7 +275,7 @@ func (t *testReaderSuite) TestParseFileBase(c *C) {
c.Assert(needReParse, IsFalse)
c.Assert(state.latestPos, Equals, int64(rotateEv.Header.LogPos))
c.Assert(state.formatDescEventRead, IsTrue)
- c.Assert(state.replaceWithHeartbeat, Equals, false)
+ c.Assert(state.skipGTID, Equals, false)

// should only get a RotateEvent
i := 0
@@ -347,7 +347,7 @@ func (t *testReaderSuite) TestParseFileRelayNeedSwitchSubDir(c *C) {
c.Assert(needReParse, IsFalse)
c.Assert(state.latestPos, Equals, int64(4))
c.Assert(state.formatDescEventRead, IsTrue)
- c.Assert(state.replaceWithHeartbeat, Equals, false)
+ c.Assert(state.skipGTID, Equals, false)
t.purgeStreamer(c, s)

// NOTE: if we want to test the returned `needReParse` of `needSwitchSubDir`,
@@ -393,7 +393,7 @@ func (t *testReaderSuite) TestParseFileRelayWithIgnorableError(c *C) {
c.Assert(needReParse, IsTrue)
c.Assert(state.latestPos, Equals, int64(4))
c.Assert(state.formatDescEventRead, IsTrue)
- c.Assert(state.replaceWithHeartbeat, Equals, false)
+ c.Assert(state.skipGTID, Equals, false)
}
}

@@ -712,9 +712,10 @@ func (t *testReaderSuite) TestStartSyncByGTID(c *C) {

var allEvents []*replication.BinlogEvent
var allResults []string
+ var eventsNumOfFirstServer int

// generate binlog file
- for _, subDir := range testCase {
+ for i, subDir := range testCase {
lastPos = 4
lastGTID, err = gtid.ParserGTID(gmysql.MySQLFlavor, subDir.gtidStr)
c.Assert(err, IsNil)
@@ -747,6 +748,9 @@
f.Close()
t.createMetaFile(c, uuidDir, fileEventResult.filename, lastPos, previousGset.String())
}
+ if i == 0 {
+ eventsNumOfFirstServer = len(allEvents)
+ }
}

startGTID, err := gtid.ParserGTID(gmysql.MySQLFlavor, "")
@@ -782,11 +786,11 @@
r = newBinlogReaderForTest(log.L(), cfg, true, "")

excludeStrs := []string{}
- // exclude first uuid
- excludeServerUUID := testCase[0].serverUUID
- excludeUUID := testCase[0].uuid
+ // exclude event except for first server
+ includeServerUUID := testCase[0].serverUUID
+ includeUUID := testCase[0].uuid
for _, s := range strings.Split(preGset.String(), ",") {
- if !strings.Contains(s, excludeServerUUID) {
+ if !strings.Contains(s, includeServerUUID) {
excludeStrs = append(excludeStrs, s)
}
}
@@ -797,19 +801,19 @@
// StartSyncByGtid exclude first uuid
s, err = r.StartSyncByGTID(excludeGset)
c.Assert(err, IsNil)
- obtainBaseEvents = readNEvents(ctx, c, s, len(allEvents), true)
+ obtainBaseEvents = readNEvents(ctx, c, s, eventsNumOfFirstServer, true)

gset := excludeGset.Clone()
- // every gtid event not from first uuid should become heartbeat event
+ // should not receive any event not from first server
for i, event := range obtainBaseEvents {
switch event.Header.EventType {
case replication.HEARTBEAT_EVENT:
- c.Assert(event.Header.LogPos, Equals, allEvents[i].Header.LogPos)
+ c.FailNow()
case replication.GTID_EVENT:
// check gtid event comes from first uuid subdir
ev, _ := event.Event.(*replication.GTIDEvent)
u, _ := uuid.FromBytes(ev.SID)
- c.Assert(u.String(), Equals, excludeServerUUID)
+ c.Assert(u.String(), Equals, includeServerUUID)
c.Assert(event.Header, DeepEquals, allEvents[i].Header)
c.Assert(gset.Update(fmt.Sprintf("%s:%d", u.String(), ev.GNO)), IsNil)
default:
@@ -820,7 +824,7 @@
c.Assert(gset.Equal(preGset), IsTrue)

// purge first uuid subdir's first binlog file
- c.Assert(os.Remove(path.Join(baseDir, excludeUUID, "mysql.000001")), IsNil)
+ c.Assert(os.Remove(path.Join(baseDir, includeUUID, "mysql.000001")), IsNil)

r.Close()
r = newBinlogReaderForTest(log.L(), cfg, true, "")
@@ -834,7 +838,7 @@
c.Assert(terror.ErrNoRelayPosMatchGTID.Equal(err), IsTrue)

// purge first uuid subdir
- c.Assert(os.RemoveAll(path.Join(baseDir, excludeUUID)), IsNil)
+ c.Assert(os.RemoveAll(path.Join(baseDir, includeUUID)), IsNil)

r.Close()
r = newBinlogReaderForTest(log.L(), cfg, true, "")
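
Note how the expectations in TestStartSyncByGTID shift: previously, events whose GTIDs were in the exclude set still arrived downstream as HEARTBEAT events with matching log positions, so the test compared positions; now such events are never delivered, so the test reads only the first server's events (eventsNumOfFirstServer) and fails immediately on any HEARTBEAT_EVENT. The excludeServerUUID/excludeUUID variables are renamed to includeServerUUID/includeUUID because the first server is the one whose events are kept.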
@@ -1007,31 +1011,28 @@ func (t *testReaderSuite) TestReParseUsingGTID(c *C) {
var wg sync.WaitGroup
wg.Add(1)

+ ctx, cancel := context.WithCancel(context.Background())
+
go func() {
expected := map[uint32]replication.EventType{}
for _, e := range events {
+ // will not receive event for skipped GTID
switch e.Event.(type) {
// keeps same
case *replication.FormatDescriptionEvent, *replication.PreviousGTIDsEvent:
expected[e.Header.LogPos] = e.Header.EventType
default:
expected[e.Header.LogPos] = replication.HEARTBEAT_EVENT
}
}
// fake rotate
expected[0] = replication.ROTATE_EVENT
- lastLogPos := events[len(events)-1].Header.LogPos

- ctx, cancel := context.WithCancel(context.Background())
for {
ev, err2 := s.GetEvent(ctx)
- c.Assert(err2, IsNil)
- c.Assert(ev.Header.EventType, Equals, expected[ev.Header.LogPos])
- if ev.Header.LogPos == lastLogPos {
+ if err2 == context.Canceled {
break
}
+ c.Assert(err2, IsNil)
+ c.Assert(ev.Header.EventType, Equals, expected[ev.Header.LogPos])
}
- cancel()
wg.Done()
}()

@@ -1046,6 +1047,8 @@ func (t *testReaderSuite) TestReParseUsingGTID(c *C) {
default:
}
}
+ time.Sleep(time.Second)
+ cancel()
wg.Wait()
}

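
TestReParseUsingGTID is reworked for the same reason: with skipped events never arriving, the consumer goroutine cannot stop at a known last LogPos. The context is now created in the test body, the goroutine drains s.GetEvent(ctx) until it returns context.Canceled, and the body cancels after a one-second grace period once all events have been written.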
2 changes: 1 addition & 1 deletion dm/tests/many_tables/conf/source1.yaml
@@ -1,6 +1,6 @@
source-id: mysql-replica-01
flavor: ''
- enable-gtid: false
+ enable-gtid: true
relay-binlog-name: ''
relay-binlog-gtid: ''
enable-relay: true
Expand Down
17 changes: 17 additions & 0 deletions dm/tests/many_tables/run.sh
@@ -52,6 +52,23 @@ function run() {
wait_until_sync $WORK_DIR "127.0.0.1:$MASTER_PORT"
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

+ # check https://github.com/pingcap/tiflow/issues/5063
+ check_time=20
+ sleep 5
+ while [ $check_time -gt 0 ]; do
+ syncer_recv_event_num=$(grep '"receive binlog event"' $WORK_DIR/worker1/log/dm-worker.log | wc -l)
+ if [ $syncer_recv_event_num -eq 3 ]; then
+ break
+ fi
+ echo "syncer_recv_event_num: $syncer_recv_event_num, will retry later"
+ sleep 1
+ ((check_time--))
+ done
+
+ if [ $syncer_recv_event_num -ne 3 ]; then
+ exit 1
+ fi
+
echo "start incremental_data"
incremental_data
echo "finish incremental_data"
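
Finally, the integration test flips enable-gtid to true so that relay recovery actually exercises the GTID-skipping path, then polls worker1's log for "receive binlog event" entries. The hard-coded count of 3 appears to encode the fix itself: a run of skipped GTIDs should now reach the syncer as a single coalesced heartbeat rather than one event per skipped transaction, so only a handful of received events are ever logged.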
