dm: revert dm#2047 #3491

Merged: 6 commits merged on Nov 17, 2021
Changes from 3 commits
13 changes: 0 additions & 13 deletions dm/pkg/streamer/reader.go
@@ -39,10 +39,6 @@ import (
"github.com/pingcap/ticdc/dm/pkg/utils"
)

// ErrorMaybeDuplicateEvent indicates that there may be duplicate event in next binlog file
// this is mainly happened when upstream master changed when relay log not finish reading a transaction.
var ErrorMaybeDuplicateEvent = errors.New("truncate binlog file found, event may be duplicated")

// Meta represents binlog meta information in relay.meta.
type Meta struct {
BinLogName string `toml:"binlog-name" json:"binlog-name"`
@@ -560,15 +556,6 @@ func (r *BinlogReader) parseFile(
if err != nil {
if possibleLast && isIgnorableParseError(err) {
r.tctx.L().Warn("fail to parse relay log file, meet some ignorable error", zap.String("file", fullPath), zap.Int64("offset", offset), zap.Error(err))
// the file is truncated, we send a mock event with `IGNORABLE_EVENT` to notify the the consumer
// TODO: should add a integration test for this
e := &replication.BinlogEvent{
RawData: []byte(ErrorMaybeDuplicateEvent.Error()),
Header: &replication.EventHeader{
EventType: replication.IGNORABLE_EVENT,
},
}
s.ch <- e
} else {
r.tctx.L().Error("parse relay log file", zap.String("file", fullPath), zap.Int64("offset", offset), zap.Error(err))
return false, false, 0, "", "", false, terror.ErrParserParseRelayLog.Delegate(err, fullPath)
7 changes: 0 additions & 7 deletions dm/pkg/streamer/streamer.go
@@ -14,7 +14,6 @@
package streamer

import (
"bytes"
"context"
"time"

@@ -67,12 +66,6 @@ func (s *LocalStreamer) GetEvent(ctx context.Context) (*replication.BinlogEvent,
heartbeatHeader := &replication.EventHeader{}
return event.GenHeartbeatEvent(heartbeatHeader), nil
case c := <-s.ch:
// special check for maybe truncated relay log
if c.Header.EventType == replication.IGNORABLE_EVENT {
if bytes.Equal(c.RawData, []byte(ErrorMaybeDuplicateEvent.Error())) {
return nil, ErrorMaybeDuplicateEvent
}
}
return c, nil
case s.err = <-s.ech:
return nil, s.err
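
For context, the two hunks above delete both halves of a sentinel handshake: the reader pushed a fake IGNORABLE_EVENT whose payload was the sentinel error text, and the streamer converted that event back into ErrorMaybeDuplicateEvent for its caller. Below is a minimal, self-contained sketch of that idea; it is not the DM code itself, and the go-mysql import path and standalone program layout are assumptions for illustration.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"

	"github.com/go-mysql-org/go-mysql/replication"
)

// Sentinel error carried as the payload of the mock event.
var ErrorMaybeDuplicateEvent = errors.New("truncate binlog file found, event may be duplicated")

// Producer side: on an ignorable parse error at the end of a truncated relay
// log, send a mock IGNORABLE_EVENT instead of a real binlog event.
func notifyMaybeTruncated(ch chan *replication.BinlogEvent) {
	ch <- &replication.BinlogEvent{
		RawData: []byte(ErrorMaybeDuplicateEvent.Error()),
		Header:  &replication.EventHeader{EventType: replication.IGNORABLE_EVENT},
	}
}

// Consumer side: translate the mock event back into the sentinel error.
func decode(e *replication.BinlogEvent) (*replication.BinlogEvent, error) {
	if e.Header.EventType == replication.IGNORABLE_EVENT &&
		bytes.Equal(e.RawData, []byte(ErrorMaybeDuplicateEvent.Error())) {
		return nil, ErrorMaybeDuplicateEvent
	}
	return e, nil
}

func main() {
	ch := make(chan *replication.BinlogEvent, 1)
	notifyMaybeTruncated(ch)
	if _, err := decode(<-ch); err != nil {
		fmt.Println("caller sees:", err)
	}
}
```

After this revert, GetEvent simply forwards events from the channel without the special IGNORABLE_EVENT check.
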
68 changes: 12 additions & 56 deletions dm/relay/relay.go
@@ -307,8 +307,7 @@ func (r *Relay) process(ctx context.Context) error {
// handles binlog events with retry mechanism.
// it only do the retry for some binlog reader error now.
for {
eventIdx, err := r.handleEvents(ctx, reader2, transformer2, writer2)
checkError:
err := r.handleEvents(ctx, reader2, transformer2, writer2)
if err == nil {
return nil
} else if !readerRetry.Check(ctx, err) {
@@ -325,33 +324,6 @@
return err
}
r.logger.Info("retrying to read binlog")
if r.cfg.EnableGTID && eventIdx > 0 {
// check if server has switched
isNew, err2 := isNewServer(ctx, r.meta.UUID(), r.db.DB, r.cfg.Flavor)
// should start from the transaction beginning when switch to a new server
if err2 != nil {
r.logger.Warn("check new server failed, continue outer loop", log.ShortError(err2))
err = err2
goto checkError
}
if !isNew {
for i := 0; i < eventIdx; {
res, err2 := reader2.GetEvent(ctx)
if err2 != nil {
err = err2
goto checkError
}
tResult := transformer2.Transform(res.Event)
// do not count skip event
if !tResult.Ignore {
i++
}
}
if eventIdx > 0 {
r.logger.Info("discard duplicate event", zap.Int("count", eventIdx))
}
}
}
}
}

@@ -465,16 +437,15 @@ func (r *Relay) handleEvents(
reader2 reader.Reader,
transformer2 transformer.Transformer,
writer2 writer.Writer,
) (int, error) {
) error {
var (
_, lastPos = r.meta.Pos()
_, lastGTID = r.meta.GTID()
err error
eventIndex int
)
if lastGTID == nil {
if lastGTID, err = gtid.ParserGTID(r.cfg.Flavor, ""); err != nil {
return 0, err
return err
}
}

@@ -483,20 +454,10 @@
// 1. read events from upstream server
readTimer := time.Now()
rResult, err := reader2.GetEvent(ctx)
failpoint.Inject("RelayGetEventFailed", func(v failpoint.Value) {
if intVal, ok := v.(int); ok && intVal == eventIndex {
err = errors.New("fail point triggered")
_, gtid := r.meta.GTID()
r.logger.Warn("failed to get event", zap.Int("event_index", eventIndex),
zap.Any("gtid", gtid), log.ShortError(err))
// wait backoff retry interval
time.Sleep(1 * time.Second)
}
})
if err != nil {
switch errors.Cause(err) {
case context.Canceled:
return 0, nil
return nil
case replication.ErrChecksumMismatch:
relayLogDataCorruptionCounter.Inc()
case replication.ErrSyncClosed, replication.ErrNeedSyncAgain:
@@ -515,7 +476,7 @@
}
binlogReadErrorCounter.Inc()
}
return eventIndex, err
return err
}

binlogReadDurationHistogram.Observe(time.Since(readTimer).Seconds())
@@ -547,15 +508,13 @@

if _, ok := e.Event.(*replication.RotateEvent); ok && utils.IsFakeRotateEvent(e.Header) {
isNew, err2 := isNewServer(ctx, r.meta.UUID(), r.db.DB, r.cfg.Flavor)
// should start from the transaction beginning when switch to a new server
if err2 != nil {
return 0, err2
return err2
}
// upstream database switch
// report an error, let outer logic handle it
// should start from the transaction beginning when switch to a new server
if isNew {
return 0, terror.ErrRotateEventWithDifferentServerID.Generate()
return terror.ErrRotateEventWithDifferentServerID.Generate()
}
}

@@ -566,7 +525,7 @@
// and meta file is not created when relay resumed.
firstEvent = false
if err2 := r.saveAndFlushMeta(lastPos, lastGTID); err2 != nil {
return 0, err2
return err2
}
}

Expand All @@ -576,7 +535,7 @@ func (r *Relay) handleEvents(
wResult, err := writer2.WriteEvent(e)
if err != nil {
relayLogWriteErrorCounter.Inc()
return eventIndex, err
return err
} else if wResult.Ignore {
r.logger.Info("ignore event by writer",
zap.Reflect("header", e.Header),
@@ -595,7 +554,7 @@
lastPos.Pos = tResult.LogPos
err = lastGTID.Set(tResult.GTIDSet)
if err != nil {
return 0, terror.ErrRelayUpdateGTID.Delegate(err, lastGTID, tResult.GTIDSet)
return terror.ErrRelayUpdateGTID.Delegate(err, lastGTID, tResult.GTIDSet)
}
if !r.cfg.EnableGTID {
// if go-mysql set RawModeEnabled to true
@@ -620,17 +579,14 @@
if needSavePos {
err = r.SaveMeta(lastPos, lastGTID)
if err != nil {
return 0, terror.Annotatef(err, "save position %s, GTID sets %v into meta", lastPos, lastGTID)
return terror.Annotatef(err, "save position %s, GTID sets %v into meta", lastPos, lastGTID)
}
eventIndex = 0
} else {
eventIndex++
}
if tResult.NextLogName != "" && !utils.IsFakeRotateEvent(e.Header) {
// if the binlog is rotated, we need to save and flush the next binlog filename to meta
lastPos.Name = tResult.NextLogName
if err := r.saveAndFlushMeta(lastPos, lastGTID); err != nil {
return 0, err
return err
}
}
}
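
The relay.go hunks above also drop the skip-ahead loop that, after a reader retry in GTID mode against the same upstream server, re-read and discarded the events already handled before the error so a partially read transaction was not written twice. A simplified, hypothetical sketch of that loop follows; eventGetter and binlogEvent are stand-ins for illustration, not DM's real reader.Reader and transformer types.

```go
package relaysketch

import "context"

// binlogEvent is a stand-in for a parsed binlog event; ignorable marks events
// the transformer would skip (the original loop did not count those).
type binlogEvent struct{ ignorable bool }

// eventGetter is a stand-in for DM's reader.Reader.
type eventGetter interface {
	GetEvent(ctx context.Context) (binlogEvent, error)
}

// skipConsumed re-reads and drops eventIdx already-handled events after a
// reconnect, counting only events that would actually have been processed.
func skipConsumed(ctx context.Context, r eventGetter, eventIdx int) error {
	for i := 0; i < eventIdx; {
		e, err := r.GetEvent(ctx)
		if err != nil {
			return err
		}
		if !e.ignorable {
			i++
		}
	}
	return nil
}
```
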
16 changes: 8 additions & 8 deletions dm/relay/relay_test.go
@@ -448,7 +448,7 @@ func (t *testRelaySuite) TestHandleEvent(c *C) {
replication.ErrSyncClosed,
replication.ErrNeedSyncAgain,
} {
_, handleErr := r.handleEvents(context.Background(), reader2, transformer2, writer2)
handleErr := r.handleEvents(context.Background(), reader2, transformer2, writer2)
c.Assert(errors.Cause(handleErr), Equals, reader2.err)
}

@@ -458,7 +458,7 @@ func (t *testRelaySuite) TestHandleEvent(c *C) {
// writer return error to force handleEvents return
writer2.err = errors.New("writer error for testing")
// return with the annotated writer error
_, err = r.handleEvents(context.Background(), reader2, transformer2, writer2)
err = r.handleEvents(context.Background(), reader2, transformer2, writer2)
c.Assert(errors.Cause(err), Equals, writer2.err)
// after handle rotate event, we save and flush the meta immediately
c.Assert(r.meta.Dirty(), Equals, false)
@@ -477,7 +477,7 @@ func (t *testRelaySuite) TestHandleEvent(c *C) {
lm := r.meta.(*LocalMeta)
backupUUID := lm.currentUUID
lm.currentUUID = "not exist"
_, err = r.handleEvents(context.Background(), reader2, transformer2, writer2)
err = r.handleEvents(context.Background(), reader2, transformer2, writer2)
c.Assert(os.IsNotExist(errors.Cause(err)), Equals, true)
lm.currentUUID = backupUUID
}
@@ -489,15 +489,15 @@ func (t *testRelaySuite) TestHandleEvent(c *C) {
// writer return error
writer2.err = errors.New("writer error for testing")
// return with the annotated writer error
_, err = r.handleEvents(context.Background(), reader2, transformer2, writer2)
err = r.handleEvents(context.Background(), reader2, transformer2, writer2)
c.Assert(errors.Cause(err), Equals, writer2.err)
c.Assert(r.meta.Dirty(), Equals, false)

// writer without error
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()
writer2.err = nil
_, err = r.handleEvents(ctx, reader2, transformer2, writer2) // returned when ctx timeout
err = r.handleEvents(ctx, reader2, transformer2, writer2) // returned when ctx timeout
c.Assert(errors.Cause(err), Equals, ctx.Err())
// check written event
c.Assert(writer2.latestEvent, Equals, reader2.result.Event)
@@ -512,7 +512,7 @@

// write a QueryEvent with GTID sets
reader2.result.Event = queryEv
_, err = r.handleEvents(ctx2, reader2, transformer2, writer2)
err = r.handleEvents(ctx2, reader2, transformer2, writer2)
c.Assert(errors.Cause(err), Equals, ctx.Err())
// check written event
c.Assert(writer2.latestEvent, Equals, reader2.result.Event)
@@ -531,7 +531,7 @@
}
ctx4, cancel4 := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel4()
_, err = r.handleEvents(ctx4, reader2, transformer2, writer2)
err = r.handleEvents(ctx4, reader2, transformer2, writer2)
c.Assert(errors.Cause(err), Equals, ctx.Err())
select {
case <-ctx4.Done():
@@ -544,7 +544,7 @@
writer2.result.Ignore = true
ctx5, cancel5 := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel5()
_, err = r.handleEvents(ctx5, reader2, transformer2, writer2)
err = r.handleEvents(ctx5, reader2, transformer2, writer2)
c.Assert(errors.Cause(err), Equals, ctx.Err())
select {
case <-ctx5.Done():
5 changes: 0 additions & 5 deletions dm/relay/retry/reader.go
@@ -17,8 +17,6 @@ import (
"context"
"time"

"github.com/pingcap/failpoint"

"github.com/pingcap/ticdc/dm/pkg/backoff"
"github.com/pingcap/ticdc/dm/pkg/retry"
"github.com/pingcap/ticdc/dm/pkg/terror"
@@ -58,9 +56,6 @@ func NewReaderRetry(cfg ReaderRetryConfig) (*ReaderRetry, error) {

// Check checks whether should retry for the error.
func (rr *ReaderRetry) Check(ctx context.Context, err error) bool {
failpoint.Inject("RelayAllowRetry", func() {
failpoint.Return(true)
})
if !retry.IsConnectionError(err) {
return false
}
47 changes: 2 additions & 45 deletions dm/syncer/syncer.go
@@ -1554,28 +1554,6 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
return nil
}

maybeSkipNRowsEvent := func(n int) error {
if s.cfg.EnableGTID && n > 0 {
for i := 0; i < n; {
e, err1 := s.getEvent(tctx, currentLocation)
if err1 != nil {
return err
}
if _, ok := e.Event.(*replication.RowsEvent); ok {
i++
}
}
log.L().Info("discard event already consumed", zap.Int("count", n),
zap.Any("cur_loc", currentLocation))
}
return nil
}

// eventIndex is the rows event index in this transaction, it's used to avoiding read duplicate event in gtid mode
eventIndex := 0
// the relay log file may be truncated(not end with an RotateEvent), in this situation, we may read some rows events
// and then read from the gtid again, so we force enter safe-mode for one more transaction to avoid failure due to
// conflict
for {
if s.execError.Load() != nil {
return nil
@@ -1617,14 +1595,6 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
err = errors.New("connect: connection refused")
}
})
failpoint.Inject("GetEventErrorInTxn", func(val failpoint.Value) {
if intVal, ok := val.(int); ok && intVal == eventIndex {
err = errors.New("failpoint triggered")
s.tctx.L().Warn("failed to get event", zap.Int("event_index", eventIndex),
zap.Any("cur_pos", currentLocation), zap.Any("las_pos", lastLocation),
zap.Any("pos", e.Header.LogPos), log.ShortError(err))
}
})
switch {
case err == context.Canceled:
tctx.L().Info("binlog replication main routine quit(context canceled)!", zap.Stringer("last location", lastLocation))
@@ -1640,13 +1610,6 @@
return err1
}
continue
case err == streamer.ErrorMaybeDuplicateEvent:
tctx.L().Warn("read binlog met a truncated file, need to open safe-mode until the next transaction")
err = maybeSkipNRowsEvent(eventIndex)
if err == nil {
continue
}
log.L().Warn("skip duplicate rows event failed", zap.Error(err))
}

if err != nil {
@@ -1657,15 +1620,12 @@
}

if s.streamerController.CanRetry(err) {
// GlobalPoint is the last finished GTID
err = s.streamerController.ResetReplicationSyncer(tctx, s.checkpoint.GlobalPoint())
// lastLocation is the last finished GTID
err = s.streamerController.ResetReplicationSyncer(tctx, lastLocation)
if err != nil {
return err
}
log.L().Info("reset replication binlog puller", zap.Any("pos", s.checkpoint.GlobalPoint()))
if err = maybeSkipNRowsEvent(eventIndex); err != nil {
return err
}
continue
}

@@ -1817,15 +1777,12 @@
case *replication.RotateEvent:
err2 = s.handleRotateEvent(ev, ec)
case *replication.RowsEvent:
eventIndex++
metrics.BinlogEventRowHistogram.WithLabelValues(s.cfg.WorkerName, s.cfg.Name, s.cfg.SourceID).Observe(float64(len(ev.Rows)))
err2 = s.handleRowsEvent(ev, ec)
case *replication.QueryEvent:
originSQL = strings.TrimSpace(string(ev.Query))
err2 = s.handleQueryEvent(ev, ec, originSQL)
case *replication.XIDEvent:
// reset eventIndex and force safeMode flag here.
eventIndex = 0
if shardingReSync != nil {
shardingReSync.currLocation.Position.Pos = e.Header.LogPos
shardingReSync.currLocation.Suffix = currentLocation.Suffix
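
Similarly, the syncer.go hunks above remove the maybeSkipNRowsEvent closure and the eventIndex bookkeeping that, in GTID mode, re-read and dropped the RowsEvents already applied inside an interrupted transaction. A simplified, hypothetical sketch of that closure as a standalone function is below; the getEvent callback stands in for Syncer.getEvent, and the go-mysql import path is the same assumption as in the earlier sketch.

```go
package syncersketch

import (
	"context"

	"github.com/go-mysql-org/go-mysql/replication"
)

// maybeSkipNRowsEvent reads and discards events until n RowsEvents have been
// dropped; other event types (GTID, TableMap, ...) are read but not counted.
func maybeSkipNRowsEvent(ctx context.Context,
	getEvent func(context.Context) (*replication.BinlogEvent, error), n int) error {
	for i := 0; i < n; {
		e, err := getEvent(ctx)
		if err != nil {
			return err
		}
		if _, ok := e.Event.(*replication.RowsEvent); ok {
			i++
		}
	}
	return nil
}
```
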
1 change: 0 additions & 1 deletion dm/tests/others_integration_2.txt
@@ -7,5 +7,4 @@ case_sensitive
sql_mode
http_proxies
openapi
duplicate_event
tracker_ignored_ddl