Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

relay(dm): send one heartbeat for successive skipped GTID (#5070) #5094

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 28 additions & 19 deletions dm/relay/local_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,9 +451,10 @@ type binlogFileParseState struct {
f *os.File

// states may change
replaceWithHeartbeat bool
formatDescEventRead bool
latestPos int64
skipGTID bool
lastSkipGTIDHeader *replication.EventHeader
formatDescEventRead bool
latestPos int64
}

// parseFileAsPossible parses single relay log file as far as possible.
Expand All @@ -468,13 +469,13 @@ func (r *BinlogReader) parseFileAsPossible(ctx context.Context, s *LocalStreamer
defer f.Close()

state := &binlogFileParseState{
possibleLast: possibleLast,
fullPath: fullPath,
relayLogFile: relayLogFile,
relayLogDir: relayLogDir,
f: f,
latestPos: offset,
replaceWithHeartbeat: false,
possibleLast: possibleLast,
fullPath: fullPath,
relayLogFile: relayLogFile,
relayLogDir: relayLogDir,
f: f,
latestPos: offset,
skipGTID: false,
}

for {
Expand Down Expand Up @@ -517,6 +518,8 @@ func (r *BinlogReader) parseFile(
}
r.latestServerID = e.Header.ServerID // record server_id

lastSkipGTID := state.skipGTID

switch ev := e.Event.(type) {
case *replication.FormatDescriptionEvent:
state.formatDescEventRead = true
Expand Down Expand Up @@ -554,7 +557,7 @@ func (r *BinlogReader) parseFile(
if err2 != nil {
return errors.Trace(err2)
}
state.replaceWithHeartbeat, err = r.advanceCurrentGtidSet(gtidStr)
state.skipGTID, err = r.advanceCurrentGtidSet(gtidStr)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -568,7 +571,7 @@ func (r *BinlogReader) parseFile(
if err2 != nil {
return errors.Trace(err2)
}
state.replaceWithHeartbeat, err = r.advanceCurrentGtidSet(gtidStr)
state.skipGTID, err = r.advanceCurrentGtidSet(gtidStr)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -585,19 +588,25 @@ func (r *BinlogReader) parseFile(
}

// align with MySQL
// if an event's gtid has been contained by given gset
// replace it with HEARTBEAT event
// for Mariadb, it will bee replaced with MARIADB_GTID_LIST_EVENT
// In DM, we replace both of them with HEARTBEAT event
if state.replaceWithHeartbeat {
// ref https://github.com/pingcap/tiflow/issues/5063#issuecomment-1082678211
// heartbeat period is implemented in LocalStreamer.GetEvent
if state.skipGTID {
switch e.Event.(type) {
// Only replace transaction event
// Other events such as FormatDescriptionEvent, RotateEvent, etc. should be the same as before
case *replication.RowsEvent, *replication.QueryEvent, *replication.GTIDEvent, *replication.XIDEvent, *replication.TableMapEvent:
case *replication.RowsEvent, *replication.QueryEvent, *replication.GTIDEvent,
*replication.MariadbGTIDEvent, *replication.XIDEvent, *replication.TableMapEvent:
// replace with heartbeat event
e = event.GenHeartbeatEvent(e.Header)
state.lastSkipGTIDHeader = e.Header
default:
}
return nil
} else if lastSkipGTID && state.lastSkipGTIDHeader != nil {
// skipGTID is turned off after this event
select {
case s.ch <- event.GenHeartbeatEvent(state.lastSkipGTIDHeader):
case <-ctx.Done():
}
}

select {
Expand Down
71 changes: 37 additions & 34 deletions dm/relay/local_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,13 @@ func (t *testReaderSuite) createBinlogFileParseState(c *C, relayLogDir, relayLog
c.Assert(err, IsNil)

return &binlogFileParseState{
possibleLast: possibleLast,
fullPath: fullPath,
relayLogFile: relayLogFile,
relayLogDir: relayLogDir,
f: f,
latestPos: offset,
replaceWithHeartbeat: false,
possibleLast: possibleLast,
fullPath: fullPath,
relayLogFile: relayLogFile,
relayLogDir: relayLogDir,
f: f,
latestPos: offset,
skipGTID: false,
}
}

Expand Down Expand Up @@ -147,7 +147,7 @@ func (t *testReaderSuite) TestParseFileBase(c *C) {
c.Assert(needReParse, IsFalse)
c.Assert(state.latestPos, Equals, int64(100))
c.Assert(state.formatDescEventRead, IsFalse)
c.Assert(state.replaceWithHeartbeat, Equals, false)
c.Assert(state.skipGTID, Equals, false)
}

// write some events to binlog file
Expand Down Expand Up @@ -175,7 +175,7 @@ func (t *testReaderSuite) TestParseFileBase(c *C) {
c.Assert(needReParse, IsFalse)
c.Assert(state.latestPos, Equals, int64(baseEvents[len(baseEvents)-1].Header.LogPos))
c.Assert(state.formatDescEventRead, IsTrue)
c.Assert(state.replaceWithHeartbeat, IsFalse)
c.Assert(state.skipGTID, IsFalse)

// try get events back, firstParse should have fake RotateEvent
var fakeRotateEventCount int
Expand Down Expand Up @@ -213,7 +213,7 @@ func (t *testReaderSuite) TestParseFileBase(c *C) {
c.Assert(needReParse, IsFalse)
c.Assert(state.latestPos, Equals, int64(baseEvents[len(baseEvents)-1].Header.LogPos))
c.Assert(state.formatDescEventRead, IsTrue)
c.Assert(state.replaceWithHeartbeat, Equals, false)
c.Assert(state.skipGTID, Equals, false)
fakeRotateEventCount := 0
i := 0
for {
Expand Down Expand Up @@ -256,7 +256,7 @@ func (t *testReaderSuite) TestParseFileBase(c *C) {
c.Assert(needReParse, IsFalse)
c.Assert(state.latestPos, Equals, int64(rotateEv.Header.LogPos))
c.Assert(state.formatDescEventRead, IsTrue)
c.Assert(state.replaceWithHeartbeat, Equals, false)
c.Assert(state.skipGTID, Equals, false)
t.purgeStreamer(c, s)
}

Expand All @@ -275,7 +275,7 @@ func (t *testReaderSuite) TestParseFileBase(c *C) {
c.Assert(needReParse, IsFalse)
c.Assert(state.latestPos, Equals, int64(rotateEv.Header.LogPos))
c.Assert(state.formatDescEventRead, IsTrue)
c.Assert(state.replaceWithHeartbeat, Equals, false)
c.Assert(state.skipGTID, Equals, false)

// should only get a RotateEvent
i := 0
Expand Down Expand Up @@ -347,7 +347,7 @@ func (t *testReaderSuite) TestParseFileRelayNeedSwitchSubDir(c *C) {
c.Assert(needReParse, IsFalse)
c.Assert(state.latestPos, Equals, int64(4))
c.Assert(state.formatDescEventRead, IsTrue)
c.Assert(state.replaceWithHeartbeat, Equals, false)
c.Assert(state.skipGTID, Equals, false)
t.purgeStreamer(c, s)

// NOTE: if we want to test the returned `needReParse` of `needSwitchSubDir`,
Expand Down Expand Up @@ -393,7 +393,7 @@ func (t *testReaderSuite) TestParseFileRelayWithIgnorableError(c *C) {
c.Assert(needReParse, IsTrue)
c.Assert(state.latestPos, Equals, int64(4))
c.Assert(state.formatDescEventRead, IsTrue)
c.Assert(state.replaceWithHeartbeat, Equals, false)
c.Assert(state.skipGTID, Equals, false)
}
}

Expand Down Expand Up @@ -712,9 +712,10 @@ func (t *testReaderSuite) TestStartSyncByGTID(c *C) {

var allEvents []*replication.BinlogEvent
var allResults []string
var eventsNumOfFirstServer int

// generate binlog file
for _, subDir := range testCase {
for i, subDir := range testCase {
lastPos = 4
lastGTID, err = gtid.ParserGTID(gmysql.MySQLFlavor, subDir.gtidStr)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -747,6 +748,9 @@ func (t *testReaderSuite) TestStartSyncByGTID(c *C) {
f.Close()
t.createMetaFile(c, uuidDir, fileEventResult.filename, lastPos, previousGset.String())
}
if i == 0 {
eventsNumOfFirstServer = len(allEvents)
}
}

startGTID, err := gtid.ParserGTID(gmysql.MySQLFlavor, "")
Expand Down Expand Up @@ -782,11 +786,11 @@ func (t *testReaderSuite) TestStartSyncByGTID(c *C) {
r = newBinlogReaderForTest(log.L(), cfg, true, "")

excludeStrs := []string{}
// exclude first uuid
excludeServerUUID := testCase[0].serverUUID
excludeUUID := testCase[0].uuid
// exclude event except for first server
includeServerUUID := testCase[0].serverUUID
includeUUID := testCase[0].uuid
for _, s := range strings.Split(preGset.String(), ",") {
if !strings.Contains(s, excludeServerUUID) {
if !strings.Contains(s, includeServerUUID) {
excludeStrs = append(excludeStrs, s)
}
}
Expand All @@ -797,19 +801,19 @@ func (t *testReaderSuite) TestStartSyncByGTID(c *C) {
// StartSyncByGtid exclude first uuid
s, err = r.StartSyncByGTID(excludeGset)
c.Assert(err, IsNil)
obtainBaseEvents = readNEvents(ctx, c, s, len(allEvents), true)
obtainBaseEvents = readNEvents(ctx, c, s, eventsNumOfFirstServer, true)

gset := excludeGset.Clone()
// every gtid event not from first uuid should become heartbeat event
// should not receive any event not from first server
for i, event := range obtainBaseEvents {
switch event.Header.EventType {
case replication.HEARTBEAT_EVENT:
c.Assert(event.Header.LogPos, Equals, allEvents[i].Header.LogPos)
c.FailNow()
case replication.GTID_EVENT:
// check gtid event comes from first uuid subdir
ev, _ := event.Event.(*replication.GTIDEvent)
u, _ := uuid.FromBytes(ev.SID)
c.Assert(u.String(), Equals, excludeServerUUID)
c.Assert(u.String(), Equals, includeServerUUID)
c.Assert(event.Header, DeepEquals, allEvents[i].Header)
c.Assert(gset.Update(fmt.Sprintf("%s:%d", u.String(), ev.GNO)), IsNil)
default:
Expand All @@ -820,7 +824,7 @@ func (t *testReaderSuite) TestStartSyncByGTID(c *C) {
c.Assert(gset.Equal(preGset), IsTrue)

// purge first uuid subdir's first binlog file
c.Assert(os.Remove(path.Join(baseDir, excludeUUID, "mysql.000001")), IsNil)
c.Assert(os.Remove(path.Join(baseDir, includeUUID, "mysql.000001")), IsNil)

r.Close()
r = newBinlogReaderForTest(log.L(), cfg, true, "")
Expand All @@ -834,7 +838,7 @@ func (t *testReaderSuite) TestStartSyncByGTID(c *C) {
c.Assert(terror.ErrNoRelayPosMatchGTID.Equal(err), IsTrue)

// purge first uuid subdir
c.Assert(os.RemoveAll(path.Join(baseDir, excludeUUID)), IsNil)
c.Assert(os.RemoveAll(path.Join(baseDir, includeUUID)), IsNil)

r.Close()
r = newBinlogReaderForTest(log.L(), cfg, true, "")
Expand Down Expand Up @@ -1007,31 +1011,28 @@ func (t *testReaderSuite) TestReParseUsingGTID(c *C) {
var wg sync.WaitGroup
wg.Add(1)

ctx, cancel := context.WithCancel(context.Background())

go func() {
expected := map[uint32]replication.EventType{}
for _, e := range events {
// will not receive event for skipped GTID
switch e.Event.(type) {
// keeps same
case *replication.FormatDescriptionEvent, *replication.PreviousGTIDsEvent:
expected[e.Header.LogPos] = e.Header.EventType
default:
expected[e.Header.LogPos] = replication.HEARTBEAT_EVENT
}
}
// fake rotate
expected[0] = replication.ROTATE_EVENT
lastLogPos := events[len(events)-1].Header.LogPos

ctx, cancel := context.WithCancel(context.Background())
for {
ev, err2 := s.GetEvent(ctx)
c.Assert(err2, IsNil)
c.Assert(ev.Header.EventType, Equals, expected[ev.Header.LogPos])
if ev.Header.LogPos == lastLogPos {
if err2 == context.Canceled {
break
}
c.Assert(err2, IsNil)
c.Assert(ev.Header.EventType, Equals, expected[ev.Header.LogPos])
}
cancel()
wg.Done()
}()

Expand All @@ -1046,6 +1047,8 @@ func (t *testReaderSuite) TestReParseUsingGTID(c *C) {
default:
}
}
time.Sleep(time.Second)
cancel()
wg.Wait()
}

Expand Down
2 changes: 1 addition & 1 deletion dm/tests/many_tables/conf/source1.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
source-id: mysql-replica-01
flavor: ''
enable-gtid: false
enable-gtid: true
relay-binlog-name: ''
relay-binlog-gtid: ''
enable-relay: true
Expand Down
17 changes: 17 additions & 0 deletions dm/tests/many_tables/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,23 @@ function run() {
wait_until_sync $WORK_DIR "127.0.0.1:$MASTER_PORT"
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

# check https://github.com/pingcap/tiflow/issues/5063
check_time=20
sleep 5
while [ $check_time -gt 0 ]; do
syncer_recv_event_num=$(grep '"receive binlog event"' $WORK_DIR/worker1/log/dm-worker.log | wc -l)
if [ $syncer_recv_event_num -eq 3 ]; then
break
fi
echo "syncer_recv_event_num: $syncer_recv_event_num, will retry later"
sleep 1
((check_time--))
done

if [ $syncer_recv_event_num -ne 3 ]; then
exit 1
fi

echo "start incremental_data"
incremental_data
echo "finish incremental_data"
Expand Down