Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

relay(dm): send one heartbeat for successive skipped GTID (#5070) #5092

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 34 additions & 20 deletions dm/pkg/streamer/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,8 @@ func (r *BinlogReader) parseDirAsPossible(ctx context.Context, s *LocalStreamer,
func (r *BinlogReader) parseFileAsPossible(ctx context.Context, s *LocalStreamer, relayLogFile string, offset int64, relayLogDir string, firstParse bool, currentUUID string, possibleLast bool) (needSwitch bool, latestPos int64, nextUUID string, nextBinlogName string, err error) {
var needReParse bool
latestPos = offset
replaceWithHeartbeat := false
skipGTID := false
var lastSkipGTIDHeader *replication.EventHeader
r.tctx.L().Debug("start to parse relay log file", zap.String("file", relayLogFile), zap.Int64("position", latestPos), zap.String("directory", relayLogDir))

for {
Expand All @@ -409,7 +410,7 @@ func (r *BinlogReader) parseFileAsPossible(ctx context.Context, s *LocalStreamer
return false, 0, "", "", ctx.Err()
default:
}
needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err = r.parseFile(ctx, s, relayLogFile, latestPos, relayLogDir, firstParse, currentUUID, possibleLast, replaceWithHeartbeat)
needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, skipGTID, lastSkipGTIDHeader, err = r.parseFile(ctx, s, relayLogFile, latestPos, relayLogDir, firstParse, currentUUID, possibleLast, skipGTID, lastSkipGTIDHeader)
if err != nil {
return false, 0, "", "", terror.Annotatef(err, "parse relay log file %s from offset %d in dir %s", relayLogFile, latestPos, relayLogDir)
}
Expand All @@ -433,11 +434,12 @@ func (r *BinlogReader) parseFile(
firstParse bool,
currentUUID string,
possibleLast bool,
replaceWithHeartbeat bool,
) (needSwitch, needReParse bool, latestPos int64, nextUUID, nextBinlogName string, currentReplaceFlag bool, err error) {
skipGTID bool,
lastSkipGTIDHeader *replication.EventHeader,
) (needSwitch, needReParse bool, latestPos int64, nextUUID, nextBinlogName string, skipGTIDFlag bool, header *replication.EventHeader, err error) {
_, suffixInt, err := utils.ParseSuffixForUUID(currentUUID)
if err != nil {
return false, false, 0, "", "", false, err
return false, false, 0, "", "", false, nil, err
}

uuidSuffix := utils.SuffixIntToStr(suffixInt) // current UUID's suffix, which will be added to binlog name
Expand All @@ -447,6 +449,8 @@ func (r *BinlogReader) parseFile(
r.tctx.L().Debug("read event", zap.Reflect("header", e.Header))
r.latestServerID = e.Header.ServerID // record server_id

lastSkipGTID := skipGTID

switch ev := e.Event.(type) {
case *replication.FormatDescriptionEvent:
// go-mysql will send a duplicate FormatDescriptionEvent event when offset > 4, ignore it
Expand Down Expand Up @@ -483,7 +487,7 @@ func (r *BinlogReader) parseFile(
break
}
u, _ := uuid.FromBytes(ev.SID)
replaceWithHeartbeat, err = r.advanceCurrentGtidSet(fmt.Sprintf("%s:%d", u.String(), ev.GNO))
skipGTID, err = r.advanceCurrentGtidSet(fmt.Sprintf("%s:%d", u.String(), ev.GNO))
if err != nil {
return errors.Trace(err)
}
Expand All @@ -494,7 +498,7 @@ func (r *BinlogReader) parseFile(
break
}
GTID := ev.GTID
replaceWithHeartbeat, err = r.advanceCurrentGtidSet(fmt.Sprintf("%d-%d-%d", GTID.DomainID, GTID.ServerID, GTID.SequenceNumber))
skipGTID, err = r.advanceCurrentGtidSet(fmt.Sprintf("%d-%d-%d", GTID.DomainID, GTID.ServerID, GTID.SequenceNumber))
if err != nil {
return errors.Trace(err)
}
Expand All @@ -511,19 +515,29 @@ func (r *BinlogReader) parseFile(
}

// align with MySQL
// ref https://github.com/pingcap/tiflow/issues/5063#issuecomment-1082678211
// heartbeat period is implemented in LocalStreamer.GetEvent
// if an event's gtid has been contained by given gset
// replace it with HEARTBEAT event
// for Mariadb, it will bee replaced with MARIADB_GTID_LIST_EVENT
// In DM, we replace both of them with HEARTBEAT event
if replaceWithHeartbeat {
if skipGTID {
switch e.Event.(type) {
// Only replace transaction event
// Other events such as FormatDescriptionEvent, RotateEvent, etc. should be the same as before
case *replication.RowsEvent, *replication.QueryEvent, *replication.GTIDEvent, *replication.XIDEvent, *replication.TableMapEvent:
case *replication.RowsEvent, *replication.QueryEvent, *replication.GTIDEvent,
*replication.MariadbGTIDEvent, *replication.XIDEvent, *replication.TableMapEvent:
// replace with heartbeat event
e = event.GenHeartbeatEvent(e.Header)
lastSkipGTIDHeader = e.Header
default:
}
return nil
} else if lastSkipGTID && lastSkipGTIDHeader != nil {
// skipGTID is turned off after this event
select {
case s.ch <- event.GenHeartbeatEvent(lastSkipGTIDHeader):
case <-ctx.Done():
}
}

select {
Expand All @@ -540,11 +554,11 @@ func (r *BinlogReader) parseFile(
// ref: https://github.com/mysql/mysql-server/blob/4f1d7cf5fcb11a3f84cff27e37100d7295e7d5ca/sql/rpl_binlog_sender.cc#L248
e, err2 := utils.GenFakeRotateEvent(relayLogFile, uint64(offset), r.latestServerID)
if err2 != nil {
return false, false, 0, "", "", false, terror.Annotatef(err2, "generate fake RotateEvent for (%s: %d)", relayLogFile, offset)
return false, false, 0, "", "", false, nil, terror.Annotatef(err2, "generate fake RotateEvent for (%s: %d)", relayLogFile, offset)
}
err2 = onEventFunc(e)
if err2 != nil {
return false, false, 0, "", "", false, terror.Annotatef(err2, "send event %+v", e.Header)
return false, false, 0, "", "", false, nil, terror.Annotatef(err2, "send event %+v", e.Header)
}
r.tctx.L().Info("start parse relay log file", zap.String("file", fullPath), zap.Int64("offset", offset))
} else {
Expand All @@ -558,15 +572,15 @@ func (r *BinlogReader) parseFile(
r.tctx.L().Warn("fail to parse relay log file, meet some ignorable error", zap.String("file", fullPath), zap.Int64("offset", offset), zap.Error(err))
} else {
r.tctx.L().Error("parse relay log file", zap.String("file", fullPath), zap.Int64("offset", offset), zap.Error(err))
return false, false, 0, "", "", false, terror.ErrParserParseRelayLog.Delegate(err, fullPath)
return false, false, 0, "", "", false, nil, terror.ErrParserParseRelayLog.Delegate(err, fullPath)
}
}
r.tctx.L().Debug("parse relay log file", zap.String("file", fullPath), zap.Int64("offset", latestPos))

if !possibleLast {
// there are more relay log files in current sub directory, continue to re-collect them
r.tctx.L().Info("more relay log files need to parse", zap.String("directory", relayLogDir))
return false, false, latestPos, "", "", false, nil
return false, false, latestPos, "", "", false, nil, nil
}

switchCh := make(chan SwitchPath, 1)
Expand Down Expand Up @@ -598,22 +612,22 @@ func (r *BinlogReader) parseFile(

select {
case <-ctx.Done():
return false, false, 0, "", "", false, nil
return false, false, 0, "", "", false, nil, nil
case switchResp := <-switchCh:
// update new uuid
if err = r.updateUUIDs(); err != nil {
return false, false, 0, "", "", false, nil
return false, false, 0, "", "", false, nil, nil
}
return true, false, 0, switchResp.nextUUID, switchResp.nextBinlogName, false, nil
return true, false, 0, switchResp.nextUUID, switchResp.nextBinlogName, false, nil, nil
case updatePath := <-updatePathCh:
if strings.HasSuffix(updatePath, relayLogFile) {
// current relay log file updated, need to re-parse it
return false, true, latestPos, "", "", replaceWithHeartbeat, nil
return false, true, latestPos, "", "", skipGTID, lastSkipGTIDHeader, nil
}
// need parse next relay log file or re-collect files
return false, false, latestPos, "", "", false, nil
return false, false, latestPos, "", "", false, nil, nil
case err := <-updateErrCh:
return false, false, 0, "", "", false, err
return false, false, 0, "", "", false, nil, err
}
}

Expand Down
Loading