This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

syncer, relay: avoid read duplicate event when retry binlog streamer #2047

Merged: 25 commits merged into master from dup-event on Sep 8, 2021.
Changes shown from 23 of 25 commits.

Commits
48b8662
avoid read duplicate event when retry binlog streamer
glorv Aug 25, 2021
5855d5a
add log
glorv Aug 26, 2021
f874b83
fix bug and add integration
glorv Aug 26, 2021
76d2271
Merge branch 'master' into dup-event
glorv Aug 26, 2021
9e2279a
add an error when met truncated relay binlog file
glorv Aug 26, 2021
95e57e1
add to other_integration_2
glorv Aug 27, 2021
b9c85f6
add integration tests
glorv Aug 27, 2021
069a2a5
fix fmt
glorv Aug 27, 2021
f37aa69
Merge branch 'master' of ssh://github.com/pingcap/dm into dup-event
glorv Aug 27, 2021
299b971
Merge branch 'master' into dup-event
lance6716 Aug 30, 2021
1b70db4
add log for metrics
glorv Aug 30, 2021
2049b34
Merge branch 'dup-event' of ssh://github.com/glorv/dm into dup-event
glorv Aug 30, 2021
e27b9cc
add log for metrics
glorv Aug 30, 2021
9e4fd83
replace the error with a mock event to ensure the binlog ord
glorv Sep 6, 2021
db4bb96
Merge branch 'master' of ssh://github.com/pingcap/dm into dup-event
glorv Sep 6, 2021
17ce8e5
resolve comments
glorv Sep 7, 2021
2f3cc90
Merge branch 'master' into dup-event
glorv Sep 7, 2021
54b66e1
resolve comments
glorv Sep 7, 2021
a4da905
Merge branch 'master' into dup-event
glorv Sep 7, 2021
1e5aab7
add test commnet
glorv Sep 7, 2021
569cc6d
Merge branch 'dup-event' of ssh://github.com/glorv/dm into dup-event
glorv Sep 7, 2021
221e151
fix test comment again
glorv Sep 7, 2021
9275734
fix
glorv Sep 7, 2021
c9c6280
fix lint
glorv Sep 8, 2021
4b73e9c
Merge branch 'master' into dup-event
glorv Sep 8, 2021
13 changes: 13 additions & 0 deletions pkg/streamer/reader.go
@@ -39,6 +39,10 @@ import (
"github.com/pingcap/dm/pkg/utils"
)

// ErrorMaybeDuplicateEvent indicates that there may be duplicate events in the next binlog file.
// This mainly happens when the upstream master changed while the relay log had not finished reading a transaction.
var ErrorMaybeDuplicateEvent = errors.New("truncate binlog file found, event may be duplicated")

// Meta represents binlog meta information in relay.meta.
type Meta struct {
BinLogName string `toml:"binlog-name" json:"binlog-name"`
@@ -549,6 +553,15 @@ func (r *BinlogReader) parseFile(
if err != nil {
if possibleLast && isIgnorableParseError(err) {
r.tctx.L().Warn("fail to parse relay log file, meet some ignorable error", zap.String("file", fullPath), zap.Int64("offset", offset), zap.Error(err))
// the file is truncated; we send a mock event with `IGNORABLE_EVENT` to notify the consumer
// TODO: should add an integration test for this
e := &replication.BinlogEvent{
RawData: []byte(ErrorMaybeDuplicateEvent.Error()),
Header: &replication.EventHeader{
EventType: replication.IGNORABLE_EVENT,
},
}
s.ch <- e
} else {
r.tctx.L().Error("parse relay log file", zap.String("file", fullPath), zap.Int64("offset", offset), zap.Error(err))
return false, false, 0, "", "", false, terror.ErrParserParseRelayLog.Delegate(err, fullPath)
7 changes: 7 additions & 0 deletions pkg/streamer/streamer.go
@@ -14,6 +14,7 @@
package streamer

import (
"bytes"
"context"
"time"

@@ -66,6 +67,12 @@ func (s *LocalStreamer) GetEvent(ctx context.Context) (*replication.BinlogEvent,
heartbeatHeader := &replication.EventHeader{}
return event.GenHeartbeatEvent(heartbeatHeader), nil
case c := <-s.ch:
// special check for a possibly truncated relay log
if c.Header.EventType == replication.IGNORABLE_EVENT {
if bytes.Equal(c.RawData, []byte(ErrorMaybeDuplicateEvent.Error())) {
return nil, ErrorMaybeDuplicateEvent
}
}
return c, nil
case s.err = <-s.ech:
return nil, s.err
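The mock-event trick spans the two hunks above: pkg/streamer/reader.go fabricates an IGNORABLE_EVENT whose payload is the error text, and pkg/streamer/streamer.go recognizes that payload and hands the caller a real error. A minimal, self-contained sketch of the round-trip (not part of the diff; it assumes the go-mysql replication types that DM builds on):

package main

import (
	"bytes"
	"errors"
	"fmt"

	"github.com/go-mysql-org/go-mysql/replication"
)

var errMaybeDup = errors.New("truncate binlog file found, event may be duplicated")

func main() {
	ch := make(chan *replication.BinlogEvent, 1)

	// producer side (reader.go): the relay file looks truncated, so push a
	// mock IGNORABLE_EVENT carrying the error text instead of failing hard
	ch <- &replication.BinlogEvent{
		RawData: []byte(errMaybeDup.Error()),
		Header:  &replication.EventHeader{EventType: replication.IGNORABLE_EVENT},
	}

	// consumer side (streamer.go GetEvent): turn the mock event back into an
	// error so the retry logic above the streamer can react to it
	e := <-ch
	if e.Header.EventType == replication.IGNORABLE_EVENT &&
		bytes.Equal(e.RawData, []byte(errMaybeDup.Error())) {
		fmt.Println("surface error:", errMaybeDup)
		return
	}
	fmt.Println("normal event:", e.Header.EventType)
}

Sending the sentinel through the event channel, rather than returning an error from the parser, keeps it ordered with the events already queued; commit 9e4fd83 ("replace the error with a mock event to ensure the binlog ord…") made exactly that switch.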
74 changes: 62 additions & 12 deletions relay/relay.go
@@ -302,7 +302,8 @@ func (r *Relay) process(ctx context.Context) error {
// handles binlog events with retry mechanism.
// it only does the retry for some binlog reader errors now.
for {
err := r.handleEvents(ctx, reader2, transformer2, writer2)
eventIdx, err := r.handleEvents(ctx, reader2, transformer2, writer2)
checkError:
if err == nil {
return nil
} else if !readerRetry.Check(ctx, err) {
Expand All @@ -319,6 +320,33 @@ func (r *Relay) process(ctx context.Context) error {
return err
}
r.logger.Info("retrying to read binlog")
if r.cfg.EnableGTID && eventIdx > 0 {
// check if server has switched
isNew, err2 := isNewServer(ctx, r.meta.UUID(), r.db.DB, r.cfg.Flavor)
// should start from the transaction beginning when switching to a new server
if err2 != nil {
r.logger.Warn("check new server failed, continue outer loop", log.ShortError(err2))
err = err2
goto checkError
}
if !isNew {
for i := 0; i < eventIdx; {
res, err2 := reader2.GetEvent(ctx)
if err2 != nil {
err = err2
goto checkError
}
tResult := transformer2.Transform(res.Event)
// do not count skipped events
if !tResult.Ignore {
i++
}
}
if eventIdx > 0 {
r.logger.Info("discard duplicate event", zap.Int("count", eventIdx))
}
}
}
}
}

@@ -426,26 +454,43 @@ func (r *Relay) tryRecoverLatestFile(ctx context.Context, parser2 *parser.Parser
// 2. transform events
// 3. write events into relay log files
// 4. update metadata if needed
func (r *Relay) handleEvents(ctx context.Context, reader2 reader.Reader, transformer2 transformer.Transformer, writer2 writer.Writer) error {
// the first return value is the index of the last read rows event when the transaction is not finished.
func (r *Relay) handleEvents(
ctx context.Context,
reader2 reader.Reader,
transformer2 transformer.Transformer,
writer2 writer.Writer,
) (int, error) {
var (
_, lastPos = r.meta.Pos()
_, lastGTID = r.meta.GTID()
err error
eventIndex int
)
if lastGTID == nil {
if lastGTID, err = gtid.ParserGTID(r.cfg.Flavor, ""); err != nil {
return err
return 0, err
}
}

for {
// 1. read events from upstream server
readTimer := time.Now()
rResult, err := reader2.GetEvent(ctx)
failpoint.Inject("RelayGetEventFailed", func(v failpoint.Value) {
if intVal, ok := v.(int); ok && intVal == eventIndex {
err = errors.New("fail point triggered")
_, gtid := r.meta.GTID()
r.logger.Warn("failed to get event", zap.Int("event_index", eventIndex),
zap.Any("gtid", gtid), log.ShortError(err))
// wait backoff retry interval
time.Sleep(1 * time.Second)
}
})
if err != nil {
switch errors.Cause(err) {
case context.Canceled:
return nil
return 0, nil
case replication.ErrChecksumMismatch:
relayLogDataCorruptionCounter.Inc()
case replication.ErrSyncClosed, replication.ErrNeedSyncAgain:
@@ -464,7 +509,7 @@ func (r *Relay) handleEvents(ctx context.Context, reader2 reader.Reader, transfo
}
binlogReadErrorCounter.Inc()
}
return err
return eventIndex, err
}

binlogReadDurationHistogram.Observe(time.Since(readTimer).Seconds())
@@ -496,13 +541,15 @@ func (r *Relay) handleEvents(ctx context.Context, reader2 reader.Reader, transfo

if _, ok := e.Event.(*replication.RotateEvent); ok && utils.IsFakeRotateEvent(e.Header) {
isNew, err2 := isNewServer(ctx, r.meta.UUID(), r.db.DB, r.cfg.Flavor)
// should start from the transaction beginning when switching to a new server
if err2 != nil {
return err2
return 0, err2
}
// the upstream database has switched
// report an error and let the outer logic handle it
// should start from the transaction beginning when switching to a new server
if isNew {
return terror.ErrRotateEventWithDifferentServerID.Generate()
return 0, terror.ErrRotateEventWithDifferentServerID.Generate()
}
}

@@ -512,7 +559,7 @@ func (r *Relay) handleEvents(ctx context.Context, reader2 reader.Reader, transfo
wResult, err := writer2.WriteEvent(e)
if err != nil {
relayLogWriteErrorCounter.Inc()
return err
return eventIndex, err
} else if wResult.Ignore {
r.logger.Info("ignore event by writer",
zap.Reflect("header", e.Header),
@@ -528,7 +575,7 @@ func (r *Relay) handleEvents(ctx context.Context, reader2 reader.Reader, transfo
lastPos.Pos = tResult.LogPos
err = lastGTID.Set(tResult.GTIDSet)
if err != nil {
return terror.ErrRelayUpdateGTID.Delegate(err, lastGTID, tResult.GTIDSet)
return 0, terror.ErrRelayUpdateGTID.Delegate(err, lastGTID, tResult.GTIDSet)
}
if !r.cfg.EnableGTID {
// if go-mysql set RawModeEnabled to true
@@ -553,17 +600,20 @@ func (r *Relay) handleEvents(ctx context.Context, reader2 reader.Reader, transfo
if needSavePos {
err = r.SaveMeta(lastPos, lastGTID)
if err != nil {
return terror.Annotatef(err, "save position %s, GTID sets %v into meta", lastPos, lastGTID)
return 0, terror.Annotatef(err, "save position %s, GTID sets %v into meta", lastPos, lastGTID)
}
eventIndex = 0
} else {
eventIndex++
}
if tResult.NextLogName != "" && !utils.IsFakeRotateEvent(e.Header) {
// if the binlog is rotated, we need to save and flush the next binlog filename to meta
lastPos.Name = tResult.NextLogName
if err := r.SaveMeta(lastPos, lastGTID); err != nil {
return terror.Annotatef(err, "save position %s, GTID sets %v into meta", lastPos, lastGTID)
return 0, terror.Annotatef(err, "save position %s, GTID sets %v into meta", lastPos, lastGTID)
}
if err := r.FlushMeta(); err != nil {
return err
return 0, err
}
}
}
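The retry path in process() re-reads and discards the events that were already handled before the reader failed mid-transaction. Distilled into a standalone helper (hypothetical name; same logic as the loop in the diff):

// discardConsumed re-reads eventIdx events after the streamer has been reset
// and drops them, so the relay writer never sees the same event twice.
func discardConsumed(ctx context.Context, reader2 reader.Reader,
	transformer2 transformer.Transformer, eventIdx int) error {
	for i := 0; i < eventIdx; {
		res, err := reader2.GetEvent(ctx)
		if err != nil {
			return err
		}
		// events the transformer ignores were not counted on the first pass
		// either, so only advance the counter for real events
		if !transformer2.Transform(res.Event).Ignore {
			i++
		}
	}
	return nil
}

Note the guard in the diff: the skip only runs when EnableGTID is set and isNewServer reports the same upstream; on a genuine master switch, the relay must restart from the transaction beginning instead of discarding anything.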
12 changes: 6 additions & 6 deletions relay/relay_test.go
@@ -417,7 +417,7 @@ func (t *testRelaySuite) TestHandleEvent(c *C) {
replication.ErrSyncClosed,
replication.ErrNeedSyncAgain,
} {
err := r.handleEvents(ctx, reader2, transformer2, writer2)
_, err := r.handleEvents(ctx, reader2, transformer2, writer2)
c.Assert(errors.Cause(err), Equals, reader2.err)
}

@@ -428,14 +428,14 @@ func (t *testRelaySuite) TestHandleEvent(c *C) {
// writer return error
writer2.err = errors.New("writer error for testing")
// return with the annotated writer error
err := r.handleEvents(ctx, reader2, transformer2, writer2)
_, err := r.handleEvents(ctx, reader2, transformer2, writer2)
c.Assert(errors.Cause(err), Equals, writer2.err)
// after handle rotate event, we save and flush the meta immediately
c.Assert(r.meta.Dirty(), Equals, false)

// writer without error
writer2.err = nil
err = r.handleEvents(ctx, reader2, transformer2, writer2) // returned when ctx timeout
_, err = r.handleEvents(ctx, reader2, transformer2, writer2) // returned when ctx timeout
c.Assert(errors.Cause(err), Equals, ctx.Err())
// check written event
c.Assert(writer2.latestEvent, Equals, reader2.result.Event)
@@ -450,7 +450,7 @@ func (t *testRelaySuite) TestHandleEvent(c *C) {

// write a QueryEvent with GTID sets
reader2.result.Event = queryEv
err = r.handleEvents(ctx2, reader2, transformer2, writer2)
_, err = r.handleEvents(ctx2, reader2, transformer2, writer2)
c.Assert(errors.Cause(err), Equals, ctx.Err())
// check written event
c.Assert(writer2.latestEvent, Equals, reader2.result.Event)
@@ -469,7 +469,7 @@ func (t *testRelaySuite) TestHandleEvent(c *C) {
}
ctx4, cancel4 := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel4()
err = r.handleEvents(ctx4, reader2, transformer2, writer2)
_, err = r.handleEvents(ctx4, reader2, transformer2, writer2)
c.Assert(errors.Cause(err), Equals, ctx.Err())
select {
case <-ctx4.Done():
@@ -482,7 +482,7 @@ func (t *testRelaySuite) TestHandleEvent(c *C) {
writer2.result.Ignore = true
ctx5, cancel5 := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel5()
err = r.handleEvents(ctx5, reader2, transformer2, writer2)
_, err = r.handleEvents(ctx5, reader2, transformer2, writer2)
c.Assert(errors.Cause(err), Equals, ctx.Err())
select {
case <-ctx5.Done():
5 changes: 5 additions & 0 deletions relay/retry/reader.go
@@ -17,6 +17,8 @@ import (
"context"
"time"

"github.com/pingcap/failpoint"

"github.com/pingcap/dm/pkg/backoff"
"github.com/pingcap/dm/pkg/retry"
"github.com/pingcap/dm/pkg/terror"
@@ -56,6 +58,9 @@ func NewReaderRetry(cfg ReaderRetryConfig) (*ReaderRetry, error) {

// Check checks whether should retry for the error.
func (rr *ReaderRetry) Check(ctx context.Context, err error) bool {
failpoint.Inject("RelayAllowRetry", func() {
failpoint.Return(true)
})
if !retry.IsConnectionError(err) {
return false
}
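Both new failpoints are meant to be toggled from tests. A hedged sketch of enabling them via pingcap/failpoint (the failpoint paths are assumptions derived from the package layout; shell-based integration tests would set the equivalent through the GO_FAILPOINTS environment variable):

package relaytest

import "github.com/pingcap/failpoint"

func enableRelayRetryFailpoints() error {
	// make ReaderRetry.Check treat every error as retryable
	if err := failpoint.Enable(
		"github.com/pingcap/dm/relay/retry/RelayAllowRetry", "return(true)"); err != nil {
		return err
	}
	// make (*Relay).handleEvents fail once eventIndex reaches 2,
	// i.e. in the middle of a transaction
	return failpoint.Enable(
		"github.com/pingcap/dm/relay/RelayGetEventFailed", "return(2)")
}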
3 changes: 3 additions & 0 deletions syncer/mode.go
@@ -45,6 +45,9 @@ func (s *Syncer) enableSafeModeInitializationPhase(tctx *tcontext.Context) {
// send error to the fatal chan to interrupt the process
s.runFatalChan <- unit.NewProcessError(err)
}
if !s.safeMode.Enable() {
s.tctx.L().Info("disable safe-mode after task initialization finished")
}
}()

initPhaseSeconds := s.cfg.CheckpointFlushInterval * 2
48 changes: 46 additions & 2 deletions syncer/syncer.go
@@ -1763,6 +1763,28 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
return nil
}

maybeSkipNRowsEvent := func(n int) error {
if s.cfg.EnableGTID && n > 0 {
for i := 0; i < n; {
e, err := s.getEvent(tctx, currentLocation)
if err != nil {
return err
}
if _, ok := e.Event.(*replication.RowsEvent); ok {
i++
}
}
log.L().Info("discard event already consumed", zap.Int("count", n),
zap.Any("cur_loc", currentLocation))
}
return nil
}

// eventIndex is the rows event index in this transaction; it's used to avoid reading duplicate events in GTID mode
eventIndex := 0
// the relay log file may be truncated (not ending with a RotateEvent); in this situation, we may read some rows events
// and then read from the GTID again, so we force safe-mode for one more transaction to avoid failures due to
// conflicts
for {
if s.execError.Load() != nil {
return nil
@@ -1804,6 +1826,14 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
err = errors.New("connect: connection refused")
}
})
failpoint.Inject("GetEventErrorInTxn", func(val failpoint.Value) {
if intVal, ok := val.(int); ok && intVal == eventIndex {
err = errors.New("failpoint triggered")
s.tctx.L().Warn("failed to get event", zap.Int("event_index", eventIndex),
zap.Any("cur_pos", currentLocation), zap.Any("las_pos", lastLocation),
zap.Any("pos", e.Header.LogPos), log.ShortError(err))
}
})
switch {
case err == context.Canceled:
tctx.L().Info("binlog replication main routine quit(context canceled)!", zap.Stringer("last location", lastLocation))
@@ -1819,6 +1849,13 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
return err1
}
continue
case err == streamer.ErrorMaybeDuplicateEvent:
tctx.L().Warn("read binlog met a truncated file, need to open safe-mode until the next transaction")
err = maybeSkipNRowsEvent(eventIndex)
if err == nil {
continue
}
log.L().Warn("skip duplicate rows event failed", zap.Error(err))
}

if err != nil {
@@ -1829,11 +1866,15 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
}

if s.streamerController.CanRetry(err) {
err = s.streamerController.ResetReplicationSyncer(tctx, lastLocation)
// GlobalPoint is the last finished GTID
err = s.streamerController.ResetReplicationSyncer(tctx, s.checkpoint.GlobalPoint())
if err != nil {
return err
}
log.L().Info("reset replication binlog puller")
log.L().Info("reset replication binlog puller", zap.Any("pos", s.checkpoint.GlobalPoint()))
if err = maybeSkipNRowsEvent(eventIndex); err != nil {
return err
}
continue
}

@@ -1985,12 +2026,15 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
case *replication.RotateEvent:
err2 = s.handleRotateEvent(ev, ec)
case *replication.RowsEvent:
eventIndex++
metrics.BinlogEventRowHistogram.WithLabelValues(s.cfg.WorkerName, s.cfg.Name, s.cfg.SourceID).Observe(float64(len(ev.Rows)))
err2 = s.handleRowsEvent(ev, ec)
case *replication.QueryEvent:
originSQL = strings.TrimSpace(string(ev.Query))
err2 = s.handleQueryEvent(ev, ec, originSQL)
case *replication.XIDEvent:
// reset eventIndex and force safeMode flag here.
eventIndex = 0
if shardingReSync != nil {
shardingReSync.currLocation.Position.Pos = e.Header.LogPos
shardingReSync.currLocation.Suffix = currentLocation.Suffix
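The syncer-side failpoint GetEventErrorInTxn can drive maybeSkipNRowsEvent the same way. A sketch of a test harness (the failpoint path is again an assumption, and the test body is elided since it needs live upstream/downstream endpoints):

package syncertest

import (
	"testing"

	"github.com/pingcap/failpoint"
)

func TestSkipDuplicateRowsEvents(t *testing.T) {
	fp := "github.com/pingcap/dm/syncer/GetEventErrorInTxn"
	// fail exactly when eventIndex == 2, i.e. after two rows events of the
	// current transaction have already been handled
	if err := failpoint.Enable(fp, "return(2)"); err != nil {
		t.Fatal(err)
	}
	defer func() {
		_ = failpoint.Disable(fp)
	}()
	// run a multi-row transaction through the syncer, let the retry kick in,
	// and assert every row is applied exactly once (elided)
}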
2 changes: 1 addition & 1 deletion tests/_utils/check_metric
@@ -21,7 +21,7 @@ while [ $counter -lt $retry_count ]; do
exit 0
fi
((counter += 1))
echo "wait for valid metric for $counter-th time"
echo "wait for valid metric for $counter-th time, got value: '$metric'"
sleep 1
done
