diff --git a/_utils/terror_gen/errors_release.txt b/_utils/terror_gen/errors_release.txt index 1650d0a98e..22561a4cb3 100644 --- a/_utils/terror_gen/errors_release.txt +++ b/_utils/terror_gen/errors_release.txt @@ -127,6 +127,7 @@ ErrNoUUIDDirMatchGTID,[code=11120:class=functional:scope=internal:level=high], " ErrNoRelayPosMatchGTID,[code=11121:class=functional:scope=internal:level=high], "Message: no relay pos match gtid %s" ErrReaderReachEndOfFile,[code=11122:class=functional:scope=internal:level=low] ErrMetadataNoBinlogLoc,[code=11123:class=functional:scope=upstream:level=low], "Message: didn't found binlog location in dumped metadata file %s, Workaround: Please check log of dump unit, there maybe errors when read upstream binlog status" +ErrPreviousGTIDNotExist,[code=11124:class=functional:scope=internal:level=high], "Message: no previous gtid event from binlog %s" ErrConfigCheckItemNotSupport,[code=20001:class=config:scope=internal:level=medium], "Message: checking item %s is not supported\n%s, Workaround: Please check `ignore-checking-items` config in task configuration file, which can be set including `all`/`dump_privilege`/`replication_privilege`/`version`/`binlog_enable`/`binlog_format`/`binlog_row_image`/`table_schema`/`schema_of_shard_tables`/`auto_increment_ID`." ErrConfigTomlTransform,[code=20002:class=config:scope=internal:level=medium], "Message: %s, Workaround: Please check the configuration file has correct TOML format." ErrConfigYamlTransform,[code=20003:class=config:scope=internal:level=medium], "Message: %s, Workaround: Please check the configuration file has correct YAML format." diff --git a/chaos/cases/conf/source3.yaml b/chaos/cases/conf/source3.yaml index d944aebcc0..2d4bff1ec9 100644 --- a/chaos/cases/conf/source3.yaml +++ b/chaos/cases/conf/source3.yaml @@ -1,5 +1,5 @@ source-id: "replica-03" -enable-gtid: false +enable-gtid: true enable-relay: true from: diff --git a/errors.toml b/errors.toml index 1e635f7f5c..a1ef08ecb3 100644 --- a/errors.toml +++ b/errors.toml @@ -772,6 +772,12 @@ description = "" workaround = "Please check log of dump unit, there maybe errors when read upstream binlog status" tags = ["upstream", "low"] +[error.DM-functional-11124] +message = "no previous gtid event from binlog %s" +description = "" +workaround = "" +tags = ["internal", "high"] + [error.DM-config-20001] message = "checking item %s is not supported\n%s" description = "" diff --git a/pkg/binlog/event/event.go b/pkg/binlog/event/event.go index eb3995358d..2c6c20324e 100644 --- a/pkg/binlog/event/event.go +++ b/pkg/binlog/event/event.go @@ -851,3 +851,19 @@ func GenDummyEvent(header *replication.EventHeader, latestPos uint32, eventSize ev, err := GenQueryEvent(&headerClone, latestPos, 0, 0, 0, nil, nil, queryBytes) return ev, err } + +// GenHeartbeatEvent generates a heartbeat event. +// ref: https://dev.mysql.com/doc/internals/en/heartbeat-event.html +func GenHeartbeatEvent(header *replication.EventHeader) *replication.BinlogEvent { + // modify header + headerClone := *header // do a copy + headerClone.Flags = 0 + headerClone.EventSize = 39 + headerClone.Timestamp = 0 + headerClone.EventType = replication.HEARTBEAT_EVENT + + eventBytes := make([]byte, 39) + ev := &replication.BinlogEvent{Header: &headerClone, Event: &replication.GenericEvent{Data: eventBytes}} + + return ev +} diff --git a/pkg/binlog/reader/util.go b/pkg/binlog/reader/util.go index 2a45978931..01aaee33db 100644 --- a/pkg/binlog/reader/util.go +++ b/pkg/binlog/reader/util.go @@ -140,3 +140,39 @@ func GetGTIDsForPos(ctx context.Context, r Reader, endPos gmysql.Position) (gtid } } } + +// GetPreviousGTIDFromGTIDSet tries to get previous GTID sets from Previous_GTID_EVENT GTID for the specified GITD Set. +// events should be [fake_rotate_event,format_description_event,previous_gtids_event/mariadb_gtid_list_event] +func GetPreviousGTIDFromGTIDSet(ctx context.Context, r Reader, gset gtid.Set) (gtid.Set, error) { + err := r.StartSyncByGTID(gset) + if err != nil { + return nil, err + } + defer r.Close() + + for { + var e *replication.BinlogEvent + e, err = r.GetEvent(ctx) + if err != nil { + return nil, err + } + + switch e.Header.EventType { + case replication.ROTATE_EVENT: + if e.Header.Timestamp == 0 || e.Header.LogPos == 0 { // fake rotate event + continue + } + return nil, terror.ErrPreviousGTIDNotExist.Generate(gset.String()) + case replication.FORMAT_DESCRIPTION_EVENT: + continue + case replication.PREVIOUS_GTIDS_EVENT: + previousGset, err := event.GTIDsFromPreviousGTIDsEvent(e) + return previousGset, err + case replication.MARIADB_GTID_LIST_EVENT: + previousGset, err := event.GTIDsFromMariaDBGTIDListEvent(e) + return previousGset, err + default: + return nil, terror.ErrPreviousGTIDNotExist.Generate(gset.String()) + } + } +} diff --git a/pkg/binlog/reader/util_test.go b/pkg/binlog/reader/util_test.go index dd0558f021..c4c3d5f50d 100644 --- a/pkg/binlog/reader/util_test.go +++ b/pkg/binlog/reader/util_test.go @@ -62,3 +62,32 @@ func (t *testTCPReaderSuite) TestGetGTIDsForPos(c *C) { c.Assert(err, ErrorMatches, ".*invalid position .* or GTID not enabled in upstream.*") c.Assert(gs, IsNil) } + +// added to testTCPReaderSuite to re-use DB connection. +func (t *testTCPReaderSuite) TestGetPreviousGTIDFromGTIDSet(c *C) { + var ( + cfg = replication.BinlogSyncerConfig{ + ServerID: serverIDs[1], + Flavor: flavor, + Host: t.host, + Port: uint16(t.port), + User: t.user, + Password: t.password, + UseDecimal: true, + VerifyChecksum: true, + } + ctx, cancel = context.WithTimeout(context.Background(), utils.DefaultDBTimeout) + ) + defer cancel() + + _, endGS, err := utils.GetMasterStatus(ctx, t.db, flavor) + c.Assert(err, IsNil) + + r1 := NewTCPReader(cfg) + c.Assert(r1, NotNil) + defer r1.Close() + + gs, err := GetPreviousGTIDFromGTIDSet(ctx, r1, endGS) + c.Assert(err, IsNil) + c.Assert(endGS.Contain(gs), IsTrue) +} diff --git a/pkg/streamer/reader.go b/pkg/streamer/reader.go index fe99e1c1c7..bb40760f0f 100644 --- a/pkg/streamer/reader.go +++ b/pkg/streamer/reader.go @@ -23,7 +23,6 @@ import ( "sync" "time" - "github.com/BurntSushi/toml" "github.com/google/uuid" "github.com/pingcap/errors" "github.com/siddontang/go-mysql/mysql" @@ -31,8 +30,10 @@ import ( "go.uber.org/zap" "github.com/pingcap/dm/pkg/binlog" + "github.com/pingcap/dm/pkg/binlog/event" "github.com/pingcap/dm/pkg/binlog/reader" tcontext "github.com/pingcap/dm/pkg/context" + "github.com/pingcap/dm/pkg/gtid" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/terror" "github.com/pingcap/dm/pkg/utils" @@ -118,59 +119,21 @@ func (r *BinlogReader) checkRelayPos(pos mysql.Position) error { return nil } -// getUUIDByGTID gets uuid subdir which contain the gtid set -func (r *BinlogReader) getUUIDByGTID(gset mysql.GTIDSet) (string, error) { - // get flush logs from oldest to newest - for _, uuid := range r.uuids { - filename := path.Join(r.cfg.RelayDir, uuid, utils.MetaFilename) - var meta Meta - _, err := toml.DecodeFile(filename, &meta) - if err != nil { - return "", terror.ErrRelayLoadMetaData.Delegate(err) - } - - gs, err := mysql.ParseGTIDSet(r.cfg.Flavor, meta.BinlogGTID) - if err != nil { - return "", terror.ErrRelayLoadMetaData.Delegate(err) - } - if gs.Contain(gset) { - r.tctx.L().Info("get uuid subdir by gtid", zap.Stringer("GTID Set", gset), zap.String("uuid", uuid), zap.Stringer("latest GTID Set in subdir", gs)) - return uuid, nil - } - } - - // TODO: use a better mechanism to call relay.meta.Flush - // get the meta save in memory - relayMetaHub := GetRelayMetaHub() - relayMeta := relayMetaHub.GetMeta() - - if len(relayMeta.UUID) > 0 { - gs, err := mysql.ParseGTIDSet(r.cfg.Flavor, relayMeta.BinlogGTID) - if err != nil { - return "", terror.ErrRelayLoadMetaData.Delegate(err) - } - if gs.Contain(gset) { - r.tctx.L().Info("get uuid subdir by gtid", zap.Stringer("GTID Set", gset), zap.String("uuid", relayMeta.UUID)) - return relayMeta.UUID, nil - } - } - return "", terror.ErrNoUUIDDirMatchGTID.Generate(gset.String()) -} - -// GetFilePosByGTID tries to get Pos by GTID for file -func (r *BinlogReader) GetFilePosByGTID(ctx context.Context, filePath string, gset mysql.GTIDSet) (uint32, error) { +// IsGTIDCoverPreviousFiles check whether gset contains file's previous_gset +func (r *BinlogReader) IsGTIDCoverPreviousFiles(ctx context.Context, filePath string, gset mysql.GTIDSet) (bool, error) { fileReader := reader.NewFileReader(&reader.FileReaderConfig{Timezone: r.cfg.Timezone}) defer fileReader.Close() err := fileReader.StartSyncByPos(mysql.Position{Name: filePath, Pos: 4}) if err != nil { - return 0, err + return false, err } - lastPos := uint32(0) + var gs gtid.Set + for { select { case <-ctx.Done(): - return 0, nil + return false, nil default: } @@ -179,95 +142,66 @@ func (r *BinlogReader) GetFilePosByGTID(ctx context.Context, filePath string, gs cancel() if err != nil { // reach end of file + // Maybe we can only Parse the first three fakeRotate, Format_desc and Previous_gtids events. if terror.ErrReaderReachEndOfFile.Equal(err) { - return lastPos, nil + return false, terror.ErrPreviousGTIDNotExist.Generate(filePath) } - return 0, err + return false, err } - switch ev := e.Event.(type) { - case *replication.PreviousGTIDsEvent: - // nil previous gtid event, continue to parse file - if len(ev.GTIDSets) == 0 { - break - } - gs, err := mysql.ParseGTIDSet(r.cfg.Flavor, ev.GTIDSets) - if err != nil { - return 0, err - } - // if PreviousGITDsEvent contain but not equal the gset, go to previous file - if gs.Contain(gset) { - // continue to parse file if gset equals gs - if gset.Contain(gs) { - break - } - return 0, nil - } - case *replication.RotateEvent: - // should not happen - if e.Header.Timestamp != 0 && e.Header.LogPos != 0 { - return lastPos, nil - } + if e.Header.EventType == replication.PREVIOUS_GTIDS_EVENT { + gs, err = event.GTIDsFromPreviousGTIDsEvent(e) + } else if e.Header.EventType == replication.MARIADB_GTID_LIST_EVENT { + gs, err = event.GTIDsFromMariaDBGTIDListEvent(e) + } else { continue - case *replication.GTIDEvent: - u, _ := uuid.FromBytes(ev.SID) - gs, err := mysql.ParseGTIDSet(r.cfg.Flavor, fmt.Sprintf("%s:%d", u.String(), ev.GNO)) - if err != nil { - return 0, err - } - // meet first gtid event greater than gset - if !gset.Contain(gs) { - return lastPos, nil - } - case *replication.MariadbGTIDEvent: - GTID := ev.GTID - gs, err := mysql.ParseGTIDSet(r.cfg.Flavor, fmt.Sprintf("%d-%d-%d", GTID.DomainID, GTID.ServerID, GTID.SequenceNumber)) - if err != nil { - return 0, err - } - // meet first gtid event greater than gset - if !gset.Contain(gs) { - return lastPos, nil - } } - lastPos = e.Header.LogPos + + if err != nil { + return false, err + } + return gset.Contain(gs.Origin()), nil } } -// getPosByGTID gets position by gtid +// getPosByGTID gets file position by gtid, result should be (filename, 4) func (r *BinlogReader) getPosByGTID(gset mysql.GTIDSet) (*mysql.Position, error) { - uuid, err := r.getUUIDByGTID(gset) - if err != nil { - return nil, err - } - _, suffix, err := utils.ParseSuffixForUUID(uuid) - if err != nil { - return nil, err - } - - uuidDir := path.Join(r.cfg.RelayDir, uuid) - allFiles, err := CollectAllBinlogFiles(uuidDir) - if err != nil { - return nil, err - } + // start from newest uuid dir + for i := len(r.uuids) - 1; i >= 0; i-- { + uuid := r.uuids[i] + _, suffix, err := utils.ParseSuffixForUUID(uuid) + if err != nil { + return nil, err + } - // iterate files from the newest one - for i := len(allFiles) - 1; i >= 0; i-- { - file := allFiles[i] - filePath := path.Join(uuidDir, file) - pos, err := r.GetFilePosByGTID(r.tctx.Ctx, filePath, gset) + uuidDir := path.Join(r.cfg.RelayDir, uuid) + allFiles, err := CollectAllBinlogFiles(uuidDir) if err != nil { return nil, err } - if pos != 0 { - fileName, err := binlog.ParseFilename(file) + + // iterate files from the newest one + for i := len(allFiles) - 1; i >= 0; i-- { + file := allFiles[i] + filePath := path.Join(uuidDir, file) + // if input `gset` not contain previous_gtids_event's gset (complementary set of `gset` overlap with + // previous_gtids_event), that means there're some needed events in previous files. + // so we go to previous one + contain, err := r.IsGTIDCoverPreviousFiles(r.tctx.Ctx, filePath, gset) if err != nil { return nil, err } - return &mysql.Position{ - Name: binlog.ConstructFilenameWithUUIDSuffix(fileName, utils.SuffixIntToStr(suffix)), - Pos: uint32(pos), - }, nil + if contain { + fileName, err := binlog.ParseFilename(file) + if err != nil { + return nil, err + } + // Start at the beginning of the file + return &mysql.Position{ + Name: binlog.ConstructFilenameWithUUIDSuffix(fileName, utils.SuffixIntToStr(suffix)), + Pos: 4, + }, nil + } } } return nil, terror.ErrNoRelayPosMatchGTID.Generate(gset.String()) @@ -499,6 +433,7 @@ func (r *BinlogReader) parseFile( uuidSuffix := utils.SuffixIntToStr(suffixInt) // current UUID's suffix, which will be added to binlog name latestPos = offset // set to argument passed in + replaceWithHeartbeat := false onEventFunc := func(e *replication.BinlogEvent) error { r.tctx.L().Debug("read event", zap.Reflect("header", e.Header)) @@ -535,9 +470,9 @@ func (r *BinlogReader) parseFile( break } u, _ := uuid.FromBytes(ev.SID) - err2 := r.advanceCurrentGtidSet(fmt.Sprintf("%s:%d", u.String(), ev.GNO)) - if err2 != nil { - return errors.Trace(err2) + replaceWithHeartbeat, err = r.advanceCurrentGtidSet(fmt.Sprintf("%s:%d", u.String(), ev.GNO)) + if err != nil { + return errors.Trace(err) } latestPos = int64(e.Header.LogPos) case *replication.MariadbGTIDEvent: @@ -546,9 +481,9 @@ func (r *BinlogReader) parseFile( break } GTID := ev.GTID - err2 := r.advanceCurrentGtidSet(fmt.Sprintf("%d-%d-%d", GTID.DomainID, GTID.ServerID, GTID.SequenceNumber)) - if err2 != nil { - return errors.Trace(err2) + replaceWithHeartbeat, err = r.advanceCurrentGtidSet(fmt.Sprintf("%d-%d-%d", GTID.DomainID, GTID.ServerID, GTID.SequenceNumber)) + if err != nil { + return errors.Trace(err) } latestPos = int64(e.Header.LogPos) case *replication.XIDEvent: @@ -562,6 +497,22 @@ func (r *BinlogReader) parseFile( latestPos = int64(e.Header.LogPos) } + // align with MySQL + // if an event's gtid has been contained by given gset + // replace it with HEARTBEAT event + // for Mariadb, it will bee replaced with MARIADB_GTID_LIST_EVENT + // In DM, we replace both of them with HEARTBEAT event + if replaceWithHeartbeat { + switch e.Event.(type) { + // Only replace transaction event + // Other events such as FormatDescriptionEvent, RotateEvent, etc. should be the same as before + case *replication.RowsEvent, *replication.QueryEvent, *replication.GTIDEvent, *replication.XIDEvent, *replication.TableMapEvent: + // replace with heartbeat event + e = event.GenHeartbeatEvent(e.Header) + default: + } + } + select { case s.ch <- e: case <-ctx.Done(): @@ -666,16 +617,31 @@ func (r *BinlogReader) getCurrentGtidSet() mysql.GTIDSet { return r.currGset.Clone() } -func (r *BinlogReader) advanceCurrentGtidSet(gtid string) error { +// advanceCurrentGtidSet advance gtid set and return whether currGset not updated +func (r *BinlogReader) advanceCurrentGtidSet(gtid string) (bool, error) { if r.currGset == nil { r.currGset = r.prevGset.Clone() } + // Special treatment for Maridb + // MaridbGTIDSet.Update(gtid) will replace gset with given gtid + // ref https://github.com/siddontang/go-mysql/blob/0c5789dd0bd378b4b84f99b320a2d35a80d8858f/mysql/mariadb_gtid.go#L96 + if r.cfg.Flavor == mysql.MariaDBFlavor { + gset, err := mysql.ParseMariadbGTIDSet(gtid) + if err != nil { + return false, err + } + if r.currGset.Contain(gset) { + return true, nil + } + } prev := r.currGset.Clone() err := r.currGset.Update(gtid) if err == nil { if !r.currGset.Equal(prev) { r.prevGset = prev + return false, nil } + return true, nil } - return err + return false, err } diff --git a/pkg/streamer/reader_test.go b/pkg/streamer/reader_test.go index 040916cc28..f9d9e7d44f 100644 --- a/pkg/streamer/reader_test.go +++ b/pkg/streamer/reader_test.go @@ -628,52 +628,58 @@ func (t *testReaderSuite) TestStartSyncByGTID(c *C) { lastGTID gtid.Set previousGset, _ = gtid.ParserGTID(mysql.MySQLFlavor, "") ) - type FileEventType struct { - filename string - eventTypes []replication.EventType + + type EventResult struct { + eventType replication.EventType + result string // filename result of getPosByGTID + } + + type FileEventResult struct { + filename string + eventResults []EventResult } testCase := []struct { - serverUUID string - uuid string - gtidStr string - fileEventTypes []FileEventType + serverUUID string + uuid string + gtidStr string + fileEventResult []FileEventResult }{ { "ba8f633f-1f15-11eb-b1c7-0242ac110002", "ba8f633f-1f15-11eb-b1c7-0242ac110002.000001", "ba8f633f-1f15-11eb-b1c7-0242ac110002:0", - []FileEventType{ + []FileEventResult{ { "mysql.000001", - []replication.EventType{ - replication.PREVIOUS_GTIDS_EVENT, - replication.QUERY_EVENT, - replication.XID_EVENT, - replication.QUERY_EVENT, - replication.XID_EVENT, - replication.ROTATE_EVENT, + []EventResult{ + {replication.PREVIOUS_GTIDS_EVENT, ""}, + {replication.QUERY_EVENT, "mysql|000001.000001"}, + {replication.XID_EVENT, "mysql|000001.000001"}, + {replication.QUERY_EVENT, "mysql|000001.000001"}, + {replication.XID_EVENT, "mysql|000001.000002"}, // next binlog filename + {replication.ROTATE_EVENT, ""}, }, }, { "mysql.000002", - []replication.EventType{ - replication.PREVIOUS_GTIDS_EVENT, - replication.QUERY_EVENT, - replication.XID_EVENT, - replication.QUERY_EVENT, - replication.XID_EVENT, - replication.ROTATE_EVENT, + []EventResult{ + {replication.PREVIOUS_GTIDS_EVENT, ""}, + {replication.QUERY_EVENT, "mysql|000001.000002"}, + {replication.XID_EVENT, "mysql|000001.000002"}, + {replication.QUERY_EVENT, "mysql|000001.000002"}, + {replication.XID_EVENT, "mysql|000001.000003"}, // next binlog filename + {replication.ROTATE_EVENT, ""}, }, }, { "mysql.000003", - []replication.EventType{ - replication.PREVIOUS_GTIDS_EVENT, - replication.QUERY_EVENT, - replication.XID_EVENT, - replication.QUERY_EVENT, - replication.XID_EVENT, + []EventResult{ + {replication.PREVIOUS_GTIDS_EVENT, ""}, + {replication.QUERY_EVENT, "mysql|000001.000003"}, + {replication.XID_EVENT, "mysql|000001.000003"}, + {replication.QUERY_EVENT, "mysql|000001.000003"}, + {replication.XID_EVENT, "mysql|000002.000001"}, // next subdir }, }, }, @@ -682,31 +688,31 @@ func (t *testReaderSuite) TestStartSyncByGTID(c *C) { "bf6227a7-1f15-11eb-9afb-0242ac110004", "bf6227a7-1f15-11eb-9afb-0242ac110004.000002", "bf6227a7-1f15-11eb-9afb-0242ac110004:20", - []FileEventType{ + []FileEventResult{ { "mysql.000001", - []replication.EventType{ - replication.PREVIOUS_GTIDS_EVENT, - replication.QUERY_EVENT, - replication.XID_EVENT, - replication.QUERY_EVENT, - replication.XID_EVENT, - replication.ROTATE_EVENT, + []EventResult{ + {replication.PREVIOUS_GTIDS_EVENT, ""}, + {replication.QUERY_EVENT, "mysql|000002.000001"}, + {replication.XID_EVENT, "mysql|000002.000001"}, + {replication.QUERY_EVENT, "mysql|000002.000001"}, + {replication.XID_EVENT, "mysql|000002.000002"}, + {replication.ROTATE_EVENT, ""}, }, }, { "mysql.000002", - []replication.EventType{ - replication.PREVIOUS_GTIDS_EVENT, - replication.QUERY_EVENT, - replication.XID_EVENT, - replication.QUERY_EVENT, - replication.XID_EVENT, - replication.ROTATE_EVENT, + []EventResult{ + {replication.PREVIOUS_GTIDS_EVENT, ""}, + {replication.QUERY_EVENT, "mysql|000002.000002"}, + {replication.XID_EVENT, "mysql|000002.000002"}, + {replication.QUERY_EVENT, "mysql|000002.000002"}, + {replication.XID_EVENT, "mysql|000003.000001"}, + {replication.ROTATE_EVENT, ""}, }, }, { "mysql.000003", - []replication.EventType{ - replication.PREVIOUS_GTIDS_EVENT, + []EventResult{ + {replication.PREVIOUS_GTIDS_EVENT, ""}, }, }, }, @@ -714,15 +720,15 @@ func (t *testReaderSuite) TestStartSyncByGTID(c *C) { "bcbf9d42-1f15-11eb-a41c-0242ac110003", "bcbf9d42-1f15-11eb-a41c-0242ac110003.000003", "bcbf9d42-1f15-11eb-a41c-0242ac110003:30", - []FileEventType{ + []FileEventResult{ { "mysql.000001", - []replication.EventType{ - replication.PREVIOUS_GTIDS_EVENT, - replication.QUERY_EVENT, - replication.XID_EVENT, - replication.QUERY_EVENT, - replication.XID_EVENT, + []EventResult{ + {replication.PREVIOUS_GTIDS_EVENT, ""}, + {replication.QUERY_EVENT, "mysql|000003.000001"}, + {replication.XID_EVENT, "mysql|000003.000001"}, + {replication.QUERY_EVENT, "mysql|000003.000001"}, + {replication.XID_EVENT, "mysql|000003.000001"}, }, }, }, @@ -739,12 +745,10 @@ func (t *testReaderSuite) TestStartSyncByGTID(c *C) { c.Assert(err, IsNil) var allEvents []*replication.BinlogEvent + var allResults []string - // prePosMap record previous uuid's last file's size - // when master switch, we get the previous uuid's last pos now - prePosMap := make(map[string]uint32) + // generate binlog file for _, subDir := range testCase { - prePosMap[subDir.serverUUID] = lastPos lastPos = 4 lastGTID, err = gtid.ParserGTID(mysql.MySQLFlavor, subDir.gtidStr) c.Assert(err, IsNil) @@ -752,12 +756,21 @@ func (t *testReaderSuite) TestStartSyncByGTID(c *C) { err = os.MkdirAll(uuidDir, 0700) c.Assert(err, IsNil) - for _, fileEventType := range subDir.fileEventTypes { - events, lastPos, lastGTID, previousGset = t.genEvents(c, fileEventType.eventTypes, lastPos, lastGTID, previousGset) + for _, fileEventResult := range subDir.fileEventResult { + eventTypes := []replication.EventType{} + for _, eventResult := range fileEventResult.eventResults { + eventTypes = append(eventTypes, eventResult.eventType) + if len(eventResult.result) != 0 { + allResults = append(allResults, eventResult.result) + } + } + + // generate events + events, lastPos, lastGTID, previousGset = t.genEvents(c, eventTypes, lastPos, lastGTID, previousGset) allEvents = append(allEvents, events...) // write binlog file - f, err2 := os.OpenFile(path.Join(uuidDir, fileEventType.filename), os.O_CREATE|os.O_WRONLY, 0600) + f, err2 := os.OpenFile(path.Join(uuidDir, fileEventResult.filename), os.O_CREATE|os.O_WRONLY, 0600) c.Assert(err2, IsNil) _, err = f.Write(replication.BinLogFileHeader) c.Assert(err, IsNil) @@ -768,7 +781,7 @@ func (t *testReaderSuite) TestStartSyncByGTID(c *C) { f.Close() // write meta file - meta := Meta{BinLogName: fileEventType.filename, BinLogPos: lastPos, BinlogGTID: previousGset.String()} + meta := Meta{BinLogName: fileEventResult.filename, BinLogPos: lastPos, BinlogGTID: previousGset.String()} metaFile, err2 := os.Create(path.Join(uuidDir, utils.MetaFilename)) c.Assert(err2, IsNil) err = toml.NewEncoder(metaFile).Encode(meta) @@ -788,45 +801,117 @@ func (t *testReaderSuite) TestStartSyncByGTID(c *C) { for { ev, err2 := s.GetEvent(ctx) c.Assert(err2, IsNil) - if ev.Header.Timestamp == 0 || ev.Header.LogPos == 0 { + if ev.Header.Timestamp == 0 && ev.Header.LogPos == 0 { continue // ignore fake event } obtainBaseEvents = append(obtainBaseEvents, ev) - // start after the first nil previous event - if len(obtainBaseEvents) == len(allEvents)-1 { + // start from the first format description event + if len(obtainBaseEvents) == len(allEvents) { break } } - preGset, err := gtid.ParserGTID(mysql.MySQLFlavor, "") + preGset, err := mysql.ParseGTIDSet(mysql.MySQLFlavor, "") c.Assert(err, IsNil) - // allEvents: [FORMAT_DESCRIPTION_EVENT, PREVIOUS_GTIDS_EVENT, GTID_EVENT, QUERY_EVENT...] - // obtainBaseEvents: [FORMAT_DESCRIPTION_EVENT(generated), GTID_EVENT, QUERY_EVENT...] + + gtidEventCount := 0 for i, event := range obtainBaseEvents { - if i == 0 { - c.Assert(event.Header.EventType, Equals, replication.FORMAT_DESCRIPTION_EVENT) - continue - } - c.Assert(event.Header, DeepEquals, allEvents[i+1].Header) - switch ev := event.Event.(type) { - case *replication.GTIDEvent: - pos, err2 := r.getPosByGTID(preGset.Origin().Clone()) + c.Assert(event.Header, DeepEquals, allEvents[i].Header) + if ev, ok := event.Event.(*replication.GTIDEvent); ok { u, _ := uuid.FromBytes(ev.SID) + c.Assert(preGset.Update(fmt.Sprintf("%s:%d", u.String(), ev.GNO)), IsNil) + + // get pos by preGset + pos, err2 := r.getPosByGTID(preGset.Clone()) c.Assert(err2, IsNil) - // new uuid dir - if len(preGset.String()) != 0 && !strings.Contains(preGset.String(), u.String()) { - c.Assert(pos.Pos, Equals, prePosMap[u.String()], Commentf("a %d", i)) - } else { - c.Assert(pos.Pos, Equals, event.Header.LogPos-event.Header.EventSize, Commentf("b %d", i)) - } - case *replication.QueryEvent: - err2 := preGset.Set(ev.GSet) - c.Assert(err2, IsNil) - case *replication.XIDEvent: - err2 := preGset.Set(ev.GSet) - c.Assert(err2, IsNil) + // check result + c.Assert(pos.Name, Equals, allResults[gtidEventCount]) + c.Assert(pos.Pos, Equals, uint32(4)) + gtidEventCount++ + } + } + + r.Close() + r = NewBinlogReader(log.L(), cfg) + + excludeStrs := []string{} + // exclude first uuid + excludeServerUUID := testCase[0].serverUUID + excludeUUID := testCase[0].uuid + for _, s := range strings.Split(preGset.String(), ",") { + if !strings.Contains(s, excludeServerUUID) { + excludeStrs = append(excludeStrs, s) + } + } + excludeStr := strings.Join(excludeStrs, ",") + excludeGset, err := mysql.ParseGTIDSet(mysql.MySQLFlavor, excludeStr) + c.Assert(err, IsNil) + + // StartSyncByGtid exclude first uuid + s, err = r.StartSyncByGTID(excludeGset) + c.Assert(err, IsNil) + obtainBaseEvents = []*replication.BinlogEvent{} + + for { + ev, err2 := s.GetEvent(ctx) + c.Assert(err2, IsNil) + if ev.Header.Timestamp == 0 && ev.Header.LogPos == 0 { + continue // ignore fake event + } + obtainBaseEvents = append(obtainBaseEvents, ev) + // start from the first format description event + if len(obtainBaseEvents) == len(allEvents) { + break + } + } + + gset := excludeGset.Clone() + // every gtid event not from first uuid should become heartbeat event + for i, event := range obtainBaseEvents { + switch event.Header.EventType { + case replication.HEARTBEAT_EVENT: + c.Assert(event.Header.LogPos, Equals, allEvents[i].Header.LogPos) + case replication.GTID_EVENT: + // check gtid event comes from first uuid subdir + ev, _ := event.Event.(*replication.GTIDEvent) + u, _ := uuid.FromBytes(ev.SID) + c.Assert(u.String(), Equals, excludeServerUUID) + c.Assert(event.Header, DeepEquals, allEvents[i].Header) + c.Assert(gset.Update(fmt.Sprintf("%s:%d", u.String(), ev.GNO)), IsNil) + default: + c.Assert(event.Header, DeepEquals, allEvents[i].Header) } } + // gset same as preGset now + c.Assert(gset.Equal(preGset), IsTrue) + + // purge first uuid subdir's first binlog file + c.Assert(os.Remove(path.Join(baseDir, excludeUUID, "mysql.000001")), IsNil) + + r.Close() + r = NewBinlogReader(log.L(), cfg) + _, err = r.StartSyncByGTID(preGset) + c.Assert(err, IsNil) + + r.Close() + r = NewBinlogReader(log.L(), cfg) + _, err = r.StartSyncByGTID(excludeGset) + // error because file has been purge + c.Assert(terror.ErrNoRelayPosMatchGTID.Equal(err), IsTrue) + + // purge first uuid subdir + c.Assert(os.RemoveAll(path.Join(baseDir, excludeUUID)), IsNil) + + r.Close() + r = NewBinlogReader(log.L(), cfg) + _, err = r.StartSyncByGTID(preGset) + c.Assert(err, IsNil) + + r.Close() + r = NewBinlogReader(log.L(), cfg) + _, err = r.StartSyncByGTID(excludeGset) + // error because subdir has been purge + c.Assert(err, ErrorMatches, ".*no such file or directory.*") } func (t *testReaderSuite) TestStartSyncError(c *C) { @@ -854,7 +939,7 @@ func (t *testReaderSuite) TestStartSyncError(c *C) { c.Assert(s, IsNil) s, err = r.StartSyncByGTID(t.lastGTID.Origin().Clone()) - c.Assert(err, ErrorMatches, ".*no relay subdir match gtid.*") + c.Assert(err, ErrorMatches, ".*no relay pos match gtid.*") c.Assert(s, IsNil) // write UUIDs into index file @@ -869,7 +954,7 @@ func (t *testReaderSuite) TestStartSyncError(c *C) { c.Assert(s, IsNil) s, err = r.StartSyncByGTID(t.lastGTID.Origin().Clone()) - c.Assert(err, ErrorMatches, ".*load meta data.*no such file or directory.*") + c.Assert(err, ErrorMatches, ".*no such file or directory.*") c.Assert(s, IsNil) // can not re-start the reader @@ -899,6 +984,40 @@ func (t *testReaderSuite) TestStartSyncError(c *C) { c.Assert(s, IsNil) } +func (t *testReaderSuite) TestAdvanceCurrentGTIDSet(c *C) { + var ( + baseDir = c.MkDir() + cfg = &BinlogReaderConfig{RelayDir: baseDir, Flavor: mysql.MySQLFlavor} + r = NewBinlogReader(log.L(), cfg) + mysqlGset, _ = mysql.ParseMysqlGTIDSet("b60868af-5a6f-11e9-9ea3-0242ac160006:1-6") + mariadbGset, _ = mysql.ParseMariadbGTIDSet("0-1-5") + ) + r.prevGset = mysqlGset.Clone() + r.currGset = nil + notUpdated, err := r.advanceCurrentGtidSet("b60868af-5a6f-11e9-9ea3-0242ac160006:6") + c.Assert(err, IsNil) + c.Assert(notUpdated, IsTrue) + c.Assert(mysqlGset.Equal(r.currGset), IsTrue) + notUpdated, err = r.advanceCurrentGtidSet("b60868af-5a6f-11e9-9ea3-0242ac160006:7") + c.Assert(err, IsNil) + c.Assert(notUpdated, IsFalse) + c.Assert(mysqlGset.Equal(r.prevGset), IsTrue) + c.Assert(r.currGset.String(), Equals, "b60868af-5a6f-11e9-9ea3-0242ac160006:1-7") + + r.cfg.Flavor = mysql.MariaDBFlavor + r.prevGset = mariadbGset.Clone() + r.currGset = nil + notUpdated, err = r.advanceCurrentGtidSet("0-1-3") + c.Assert(err, IsNil) + c.Assert(notUpdated, IsTrue) + c.Assert(mariadbGset.Equal(r.currGset), IsTrue) + notUpdated, err = r.advanceCurrentGtidSet("0-1-6") + c.Assert(err, IsNil) + c.Assert(notUpdated, IsFalse) + c.Assert(mariadbGset.Equal(r.prevGset), IsTrue) + c.Assert(r.currGset.String(), Equals, "0-1-6") +} + func (t *testReaderSuite) genBinlogEvents(c *C, latestPos uint32, latestGTID gtid.Set) ([]*replication.BinlogEvent, uint32, gtid.Set) { var ( header = &replication.EventHeader{ diff --git a/pkg/terror/error_list.go b/pkg/terror/error_list.go index 7612e28b41..c43b18e96a 100644 --- a/pkg/terror/error_list.go +++ b/pkg/terror/error_list.go @@ -182,6 +182,9 @@ const ( // pkg/dumpling codeMetadataNoBinlogLoc + + // pkg/streamer + codePreviousGTIDNotExist ) // Config related error code list @@ -780,6 +783,8 @@ var ( // pkg/dumplling ErrMetadataNoBinlogLoc = New(codeMetadataNoBinlogLoc, ClassFunctional, ScopeUpstream, LevelLow, "didn't found binlog location in dumped metadata file %s", "Please check log of dump unit, there maybe errors when read upstream binlog status") + ErrPreviousGTIDNotExist = New(codePreviousGTIDNotExist, ClassFunctional, ScopeInternal, LevelHigh, "no previous gtid event from binlog %s", "") + // Config related error ErrConfigCheckItemNotSupport = New(codeConfigCheckItemNotSupport, ClassConfig, ScopeInternal, LevelMedium, "checking item %s is not supported\n%s", "Please check `ignore-checking-items` config in task configuration file, which can be set including `all`/`dump_privilege`/`replication_privilege`/`version`/`binlog_enable`/`binlog_format`/`binlog_row_image`/`table_schema`/`schema_of_shard_tables`/`auto_increment_ID`.") ErrConfigTomlTransform = New(codeConfigTomlTransform, ClassConfig, ScopeInternal, LevelMedium, "%s", "Please check the configuration file has correct TOML format.") diff --git a/relay/relay.go b/relay/relay.go index a45ed0acdb..12b1fd19cf 100755 --- a/relay/relay.go +++ b/relay/relay.go @@ -37,6 +37,7 @@ import ( "github.com/pingcap/dm/dm/unit" "github.com/pingcap/dm/pkg/binlog" "github.com/pingcap/dm/pkg/binlog/common" + binlogReader "github.com/pingcap/dm/pkg/binlog/reader" "github.com/pingcap/dm/pkg/conn" fr "github.com/pingcap/dm/pkg/func-rollback" "github.com/pingcap/dm/pkg/gtid" @@ -565,17 +566,27 @@ func (r *Relay) reSetupMeta(ctx context.Context) error { } // try adjust meta with start pos from config - adjusted, err := r.meta.AdjustWithStartPos(r.cfg.BinLogName, r.cfg.BinlogGTID, r.cfg.EnableGTID, latestPosName, latestGTIDStr) + _, err = r.meta.AdjustWithStartPos(r.cfg.BinLogName, r.cfg.BinlogGTID, r.cfg.EnableGTID, latestPosName, latestGTIDStr) if err != nil { return err } - if adjusted { - _, pos := r.meta.Pos() - _, gtid := r.meta.GTID() - r.logger.Info("adjusted meta to start pos", zap.Reflect("start pos", pos), zap.Stringer("start pos's binlog gtid", gtid)) + _, pos := r.meta.Pos() + _, gtid := r.meta.GTID() + if r.cfg.EnableGTID { + // Adjust given gtid + // This means we always pull the binlog from the beginning of file. + gtid, err = r.adjustGTID(ctx, gtid) + if err != nil { + return terror.Annotate(err, "fail to adjust gtid for relay") + } + err = r.SaveMeta(pos, gtid) + if err != nil { + return err + } } + r.logger.Info("adjusted meta to start pos", zap.Reflect("start pos", pos), zap.Stringer("start pos's binlog gtid", gtid)) r.updateMetricsRelaySubDirIndex() return nil @@ -909,3 +920,17 @@ func (r *Relay) setSyncConfig() error { r.syncerCfg = syncerCfg return nil } + +// AdjustGTID implements Relay.AdjustGTID +func (r *Relay) adjustGTID(ctx context.Context, gset gtid.Set) (gtid.Set, error) { + // setup a TCP binlog reader (because no relay can be used when upgrading). + syncCfg := r.syncerCfg + randomServerID, err := utils.ReuseServerID(ctx, r.cfg.ServerID, r.db) + if err != nil { + return nil, terror.Annotate(err, "fail to get random server id when relay adjust gtid") + } + syncCfg.ServerID = randomServerID + + tcpReader := binlogReader.NewTCPReader(syncCfg) + return binlogReader.GetPreviousGTIDFromGTIDSet(ctx, tcpReader, gset) +} diff --git a/relay/relay_test.go b/relay/relay_test.go index 4aa5517b98..3816e66f0d 100644 --- a/relay/relay_test.go +++ b/relay/relay_test.go @@ -511,25 +511,31 @@ func (t *testRelaySuite) TestReSetupMeta(c *C) { r.cfg.EnableGTID = true r.cfg.BinlogGTID = "24ecd093-8cec-11e9-aa0d-0242ac170002:1-23" r.cfg.BinLogName = "mysql-bin.000005" + + c.Assert(r.setSyncConfig(), IsNil) + // all adjusted gset should be empty since we didn't flush logs + emptyGTID, err := gtid.ParserGTID(r.cfg.Flavor, "") + c.Assert(err, IsNil) + c.Assert(r.reSetupMeta(ctx), IsNil) uuid001 := fmt.Sprintf("%s.000001", uuid) - t.verifyMetadata(c, r, uuid001, gmysql.Position{Name: r.cfg.BinLogName, Pos: 4}, r.cfg.BinlogGTID, []string{uuid001}) + t.verifyMetadata(c, r, uuid001, gmysql.Position{Name: r.cfg.BinLogName, Pos: 4}, emptyGTID.String(), []string{uuid001}) // re-setup meta again, often happen when connecting a server behind a VIP. c.Assert(r.reSetupMeta(ctx), IsNil) uuid002 := fmt.Sprintf("%s.000002", uuid) - t.verifyMetadata(c, r, uuid002, minCheckpoint, r.cfg.BinlogGTID, []string{uuid001, uuid002}) + t.verifyMetadata(c, r, uuid002, minCheckpoint, emptyGTID.String(), []string{uuid001, uuid002}) r.cfg.BinLogName = "mysql-bin.000002" r.cfg.BinlogGTID = "24ecd093-8cec-11e9-aa0d-0242ac170002:1-50,24ecd093-8cec-11e9-aa0d-0242ac170003:1-50" r.cfg.UUIDSuffix = 2 c.Assert(r.reSetupMeta(ctx), IsNil) - t.verifyMetadata(c, r, uuid002, gmysql.Position{Name: r.cfg.BinLogName, Pos: 4}, r.cfg.BinlogGTID, []string{uuid002}) + t.verifyMetadata(c, r, uuid002, gmysql.Position{Name: r.cfg.BinLogName, Pos: 4}, emptyGTID.String(), []string{uuid002}) // re-setup meta again, often happen when connecting a server behind a VIP. c.Assert(r.reSetupMeta(ctx), IsNil) uuid003 := fmt.Sprintf("%s.000003", uuid) - t.verifyMetadata(c, r, uuid003, minCheckpoint, r.cfg.BinlogGTID, []string{uuid002, uuid003}) + t.verifyMetadata(c, r, uuid003, minCheckpoint, emptyGTID.String(), []string{uuid002, uuid003}) } func (t *testRelaySuite) verifyMetadata(c *C, r *Relay, uuidExpected string, diff --git a/relay/transformer/transformer.go b/relay/transformer/transformer.go index 6682883212..f164ce823f 100644 --- a/relay/transformer/transformer.go +++ b/relay/transformer/transformer.go @@ -69,6 +69,8 @@ func (t *transformer) Transform(e *replication.BinlogEvent) Result { switch ev := e.Event.(type) { case *replication.PreviousGTIDsEvent: result.CanSaveGTID = true + case *replication.MariadbGTIDListEvent: + result.CanSaveGTID = true case *replication.RotateEvent: result.LogPos = uint32(ev.Position) // next event's position result.NextLogName = string(ev.NextLogName) // for RotateEvent, update binlog name