Skip to content

Commit

Permalink
dm: revert dm#2048 (#3491)
Browse files Browse the repository at this point in the history
  • Loading branch information
glorv authored Nov 17, 2021
1 parent f847b33 commit c4c78d3
Show file tree
Hide file tree
Showing 30 changed files with 250 additions and 176 deletions.
2 changes: 1 addition & 1 deletion dm/dm/master/shardddl/optimist.go
Original file line number Diff line number Diff line change
Expand Up @@ -698,7 +698,7 @@ func (o *Optimist) handleLock(info optimism.Info, tts []optimism.TargetTable, sk
func (o *Optimist) removeLock(lock *optimism.Lock) (bool, error) {
failpoint.Inject("SleepWhenRemoveLock", func(val failpoint.Value) {
t := val.(int)
log.L().Info("wait new ddl info putted into etcd",
log.L().Info("wait new ddl info putted into etcd in optimistic",
zap.String("failpoint", "SleepWhenRemoveLock"),
zap.Int("max wait second", t))

Expand Down
2 changes: 1 addition & 1 deletion dm/dm/master/shardddl/pessimist.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,7 @@ func (p *Pessimist) removeLock(lock *pessimism.Lock) error {

failpoint.Inject("SleepWhenRemoveLock", func(val failpoint.Value) {
t := val.(int)
log.L().Info("wait new ddl info putted into etcd",
log.L().Info("wait new ddl info putted into etcd in pessimistic",
zap.String("failpoint", "SleepWhenRemoveLock"),
zap.Int("max wait second", t))

Expand Down
13 changes: 0 additions & 13 deletions dm/pkg/streamer/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,6 @@ import (
"github.com/pingcap/ticdc/dm/pkg/utils"
)

// ErrorMaybeDuplicateEvent indicates that there may be duplicate events in the next binlog file.
// This mainly happens when the upstream master changes before the relay log has finished reading a transaction.
var ErrorMaybeDuplicateEvent = errors.New("truncate binlog file found, event may be duplicated")

// Meta represents binlog meta information in relay.meta.
type Meta struct {
BinLogName string `toml:"binlog-name" json:"binlog-name"`
Expand Down Expand Up @@ -560,15 +556,6 @@ func (r *BinlogReader) parseFile(
if err != nil {
if possibleLast && isIgnorableParseError(err) {
r.tctx.L().Warn("fail to parse relay log file, meet some ignorable error", zap.String("file", fullPath), zap.Int64("offset", offset), zap.Error(err))
// the file is truncated, we send a mock event with `IGNORABLE_EVENT` to notify the consumer
// TODO: should add an integration test for this
e := &replication.BinlogEvent{
RawData: []byte(ErrorMaybeDuplicateEvent.Error()),
Header: &replication.EventHeader{
EventType: replication.IGNORABLE_EVENT,
},
}
s.ch <- e
} else {
r.tctx.L().Error("parse relay log file", zap.String("file", fullPath), zap.Int64("offset", offset), zap.Error(err))
return false, false, 0, "", "", false, terror.ErrParserParseRelayLog.Delegate(err, fullPath)
Expand Down
7 changes: 0 additions & 7 deletions dm/pkg/streamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package streamer

import (
"bytes"
"context"
"time"

Expand Down Expand Up @@ -67,12 +66,6 @@ func (s *LocalStreamer) GetEvent(ctx context.Context) (*replication.BinlogEvent,
heartbeatHeader := &replication.EventHeader{}
return event.GenHeartbeatEvent(heartbeatHeader), nil
case c := <-s.ch:
// special check for maybe truncated relay log
if c.Header.EventType == replication.IGNORABLE_EVENT {
if bytes.Equal(c.RawData, []byte(ErrorMaybeDuplicateEvent.Error())) {
return nil, ErrorMaybeDuplicateEvent
}
}
return c, nil
case s.err = <-s.ech:
return nil, s.err
Expand Down
68 changes: 12 additions & 56 deletions dm/relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,7 @@ func (r *Relay) process(ctx context.Context) error {
// handles binlog events with retry mechanism.
// it only does the retry for some binlog reader errors now.
for {
eventIdx, err := r.handleEvents(ctx, reader2, transformer2, writer2)
checkError:
err := r.handleEvents(ctx, reader2, transformer2, writer2)
if err == nil {
return nil
} else if !readerRetry.Check(ctx, err) {
Expand All @@ -325,33 +324,6 @@ func (r *Relay) process(ctx context.Context) error {
return err
}
r.logger.Info("retrying to read binlog")
if r.cfg.EnableGTID && eventIdx > 0 {
// check if server has switched
isNew, err2 := isNewServer(ctx, r.meta.UUID(), r.db.DB, r.cfg.Flavor)
// should start from the transaction beginning when switch to a new server
if err2 != nil {
r.logger.Warn("check new server failed, continue outer loop", log.ShortError(err2))
err = err2
goto checkError
}
if !isNew {
for i := 0; i < eventIdx; {
res, err2 := reader2.GetEvent(ctx)
if err2 != nil {
err = err2
goto checkError
}
tResult := transformer2.Transform(res.Event)
// do not count skip event
if !tResult.Ignore {
i++
}
}
if eventIdx > 0 {
r.logger.Info("discard duplicate event", zap.Int("count", eventIdx))
}
}
}
}
}

Expand Down Expand Up @@ -465,16 +437,15 @@ func (r *Relay) handleEvents(
reader2 reader.Reader,
transformer2 transformer.Transformer,
writer2 writer.Writer,
) (int, error) {
) error {
var (
_, lastPos = r.meta.Pos()
_, lastGTID = r.meta.GTID()
err error
eventIndex int
)
if lastGTID == nil {
if lastGTID, err = gtid.ParserGTID(r.cfg.Flavor, ""); err != nil {
return 0, err
return err
}
}

Expand All @@ -483,20 +454,10 @@ func (r *Relay) handleEvents(
// 1. read events from upstream server
readTimer := time.Now()
rResult, err := reader2.GetEvent(ctx)
failpoint.Inject("RelayGetEventFailed", func(v failpoint.Value) {
if intVal, ok := v.(int); ok && intVal == eventIndex {
err = errors.New("fail point triggered")
_, gtid := r.meta.GTID()
r.logger.Warn("failed to get event", zap.Int("event_index", eventIndex),
zap.Any("gtid", gtid), log.ShortError(err))
// wait backoff retry interval
time.Sleep(1 * time.Second)
}
})
if err != nil {
switch errors.Cause(err) {
case context.Canceled:
return 0, nil
return nil
case replication.ErrChecksumMismatch:
relayLogDataCorruptionCounter.Inc()
case replication.ErrSyncClosed, replication.ErrNeedSyncAgain:
Expand All @@ -515,7 +476,7 @@ func (r *Relay) handleEvents(
}
binlogReadErrorCounter.Inc()
}
return eventIndex, err
return err
}

binlogReadDurationHistogram.Observe(time.Since(readTimer).Seconds())
Expand Down Expand Up @@ -547,15 +508,13 @@ func (r *Relay) handleEvents(

if _, ok := e.Event.(*replication.RotateEvent); ok && utils.IsFakeRotateEvent(e.Header) {
isNew, err2 := isNewServer(ctx, r.meta.UUID(), r.db.DB, r.cfg.Flavor)
// should start from the transaction beginning when switch to a new server
if err2 != nil {
return 0, err2
return err2
}
// upstream database switch
// report an error, let outer logic handle it
// should start from the transaction beginning when switch to a new server
if isNew {
return 0, terror.ErrRotateEventWithDifferentServerID.Generate()
return terror.ErrRotateEventWithDifferentServerID.Generate()
}
}

Expand All @@ -566,7 +525,7 @@ func (r *Relay) handleEvents(
// and meta file is not created when relay resumed.
firstEvent = false
if err2 := r.saveAndFlushMeta(lastPos, lastGTID); err2 != nil {
return 0, err2
return err2
}
}

Expand All @@ -576,7 +535,7 @@ func (r *Relay) handleEvents(
wResult, err := writer2.WriteEvent(e)
if err != nil {
relayLogWriteErrorCounter.Inc()
return eventIndex, err
return err
} else if wResult.Ignore {
r.logger.Info("ignore event by writer",
zap.Reflect("header", e.Header),
Expand All @@ -595,7 +554,7 @@ func (r *Relay) handleEvents(
lastPos.Pos = tResult.LogPos
err = lastGTID.Set(tResult.GTIDSet)
if err != nil {
return 0, terror.ErrRelayUpdateGTID.Delegate(err, lastGTID, tResult.GTIDSet)
return terror.ErrRelayUpdateGTID.Delegate(err, lastGTID, tResult.GTIDSet)
}
if !r.cfg.EnableGTID {
// if go-mysql set RawModeEnabled to true
Expand All @@ -620,17 +579,14 @@ func (r *Relay) handleEvents(
if needSavePos {
err = r.SaveMeta(lastPos, lastGTID)
if err != nil {
return 0, terror.Annotatef(err, "save position %s, GTID sets %v into meta", lastPos, lastGTID)
return terror.Annotatef(err, "save position %s, GTID sets %v into meta", lastPos, lastGTID)
}
eventIndex = 0
} else {
eventIndex++
}
if tResult.NextLogName != "" && !utils.IsFakeRotateEvent(e.Header) {
// if the binlog is rotated, we need to save and flush the next binlog filename to meta
lastPos.Name = tResult.NextLogName
if err := r.saveAndFlushMeta(lastPos, lastGTID); err != nil {
return 0, err
return err
}
}
}
Expand Down
16 changes: 8 additions & 8 deletions dm/relay/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ func (t *testRelaySuite) TestHandleEvent(c *C) {
replication.ErrSyncClosed,
replication.ErrNeedSyncAgain,
} {
_, handleErr := r.handleEvents(context.Background(), reader2, transformer2, writer2)
handleErr := r.handleEvents(context.Background(), reader2, transformer2, writer2)
c.Assert(errors.Cause(handleErr), Equals, reader2.err)
}

Expand All @@ -458,7 +458,7 @@ func (t *testRelaySuite) TestHandleEvent(c *C) {
// writer return error to force handleEvents return
writer2.err = errors.New("writer error for testing")
// return with the annotated writer error
_, err = r.handleEvents(context.Background(), reader2, transformer2, writer2)
err = r.handleEvents(context.Background(), reader2, transformer2, writer2)
c.Assert(errors.Cause(err), Equals, writer2.err)
// after handle rotate event, we save and flush the meta immediately
c.Assert(r.meta.Dirty(), Equals, false)
Expand All @@ -477,7 +477,7 @@ func (t *testRelaySuite) TestHandleEvent(c *C) {
lm := r.meta.(*LocalMeta)
backupUUID := lm.currentUUID
lm.currentUUID = "not exist"
_, err = r.handleEvents(context.Background(), reader2, transformer2, writer2)
err = r.handleEvents(context.Background(), reader2, transformer2, writer2)
c.Assert(os.IsNotExist(errors.Cause(err)), Equals, true)
lm.currentUUID = backupUUID
}
Expand All @@ -489,15 +489,15 @@ func (t *testRelaySuite) TestHandleEvent(c *C) {
// writer return error
writer2.err = errors.New("writer error for testing")
// return with the annotated writer error
_, err = r.handleEvents(context.Background(), reader2, transformer2, writer2)
err = r.handleEvents(context.Background(), reader2, transformer2, writer2)
c.Assert(errors.Cause(err), Equals, writer2.err)
c.Assert(r.meta.Dirty(), Equals, false)

// writer without error
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()
writer2.err = nil
_, err = r.handleEvents(ctx, reader2, transformer2, writer2) // returned when ctx timeout
err = r.handleEvents(ctx, reader2, transformer2, writer2) // returned when ctx timeout
c.Assert(errors.Cause(err), Equals, ctx.Err())
// check written event
c.Assert(writer2.latestEvent, Equals, reader2.result.Event)
Expand All @@ -512,7 +512,7 @@ func (t *testRelaySuite) TestHandleEvent(c *C) {

// write a QueryEvent with GTID sets
reader2.result.Event = queryEv
_, err = r.handleEvents(ctx2, reader2, transformer2, writer2)
err = r.handleEvents(ctx2, reader2, transformer2, writer2)
c.Assert(errors.Cause(err), Equals, ctx.Err())
// check written event
c.Assert(writer2.latestEvent, Equals, reader2.result.Event)
Expand All @@ -531,7 +531,7 @@ func (t *testRelaySuite) TestHandleEvent(c *C) {
}
ctx4, cancel4 := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel4()
_, err = r.handleEvents(ctx4, reader2, transformer2, writer2)
err = r.handleEvents(ctx4, reader2, transformer2, writer2)
c.Assert(errors.Cause(err), Equals, ctx.Err())
select {
case <-ctx4.Done():
Expand All @@ -544,7 +544,7 @@ func (t *testRelaySuite) TestHandleEvent(c *C) {
writer2.result.Ignore = true
ctx5, cancel5 := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel5()
_, err = r.handleEvents(ctx5, reader2, transformer2, writer2)
err = r.handleEvents(ctx5, reader2, transformer2, writer2)
c.Assert(errors.Cause(err), Equals, ctx.Err())
select {
case <-ctx5.Done():
Expand Down
5 changes: 0 additions & 5 deletions dm/relay/retry/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ import (
"context"
"time"

"github.com/pingcap/failpoint"

"github.com/pingcap/ticdc/dm/pkg/backoff"
"github.com/pingcap/ticdc/dm/pkg/retry"
"github.com/pingcap/ticdc/dm/pkg/terror"
Expand Down Expand Up @@ -58,9 +56,6 @@ func NewReaderRetry(cfg ReaderRetryConfig) (*ReaderRetry, error) {

// Check checks whether should retry for the error.
func (rr *ReaderRetry) Check(ctx context.Context, err error) bool {
failpoint.Inject("RelayAllowRetry", func() {
failpoint.Return(true)
})
if !retry.IsConnectionError(err) {
return false
}
Expand Down
Loading

0 comments on commit c4c78d3

Please sign in to comment.