Skip to content

Commit

Permalink
Merge pull request #737 from GMHDBJD/addRowsEventDecodeFunc
Browse files Browse the repository at this point in the history
parser: allow user-defined rows_event decode func
  • Loading branch information
lance6716 authored Nov 16, 2022
2 parents 0cba5f5 + b59a00a commit 49d58c4
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 6 deletions.
3 changes: 3 additions & 0 deletions replication/binlogsyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ type BinlogSyncerConfig struct {

// Set Dialer
Dialer client.Dialer

RowsEventDecodeFunc func(*RowsEvent, []byte) error
}

// BinlogSyncer syncs binlog event from server.
Expand Down Expand Up @@ -176,6 +178,7 @@ func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer {
b.parser.SetTimestampStringLocation(b.cfg.TimestampStringLocation)
b.parser.SetUseDecimal(b.cfg.UseDecimal)
b.parser.SetVerifyChecksum(b.cfg.VerifyChecksum)
b.parser.SetRowsEventDecodeFunc(b.cfg.RowsEventDecodeFunc)
b.running = false
b.ctx, b.cancel = context.WithCancel(context.Background())

Expand Down
14 changes: 13 additions & 1 deletion replication/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type BinlogParser struct {
useDecimal bool
ignoreJSONDecodeErr bool
verifyChecksum bool

rowsEventDecodeFunc func(*RowsEvent, []byte) error
}

func NewBinlogParser() *BinlogParser {
Expand Down Expand Up @@ -212,6 +214,10 @@ func (p *BinlogParser) SetFlavor(flavor string) {
p.flavor = flavor
}

func (p *BinlogParser) SetRowsEventDecodeFunc(rowsEventDecodeFunc func(*RowsEvent, []byte) error) {
p.rowsEventDecodeFunc = rowsEventDecodeFunc
}

func (p *BinlogParser) parseHeader(data []byte) (*EventHeader, error) {
h := new(EventHeader)
err := h.Decode(data)
Expand Down Expand Up @@ -297,7 +303,13 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) (
}
}

if err := e.Decode(data); err != nil {
var err error
if re, ok := e.(*RowsEvent); ok && p.rowsEventDecodeFunc != nil {
err = p.rowsEventDecodeFunc(re, data)
} else {
err = e.Decode(data)
}
if err != nil {
return nil, &EventError{h, err.Error(), data}
}

Expand Down
32 changes: 32 additions & 0 deletions replication/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,35 @@ func (t *testSyncerSuite) TestParseEvent(c *C) {
c.Assert(err2, IsNil)
}
}

func (t *testSyncerSuite) TestRowsEventDecodeFunc(c *C) {
testCases := []struct {
byteData []byte
eventSize uint32
eventType EventType
}{
// FORMAT_DESCRIPTION_EVENT
{[]byte{0x64, 0x61, 0x72, 0x63, 0xf, 0xb, 0x0, 0x0, 0x0, 0x77, 0x0, 0x0, 0x0, 0x7b, 0x0, 0x0, 0x0, 0x1, 0x0, 0x4, 0x0, 0x35, 0x2e, 0x37, 0x2e, 0x32, 0x32, 0x2d, 0x6c, 0x6f, 0x67, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x64, 0x61, 0x72, 0x63, 0x13, 0x38, 0xd, 0x0, 0x8, 0x0, 0x12, 0x0, 0x4, 0x4, 0x4, 0x4, 0x12, 0x0, 0x0, 0x5f, 0x0, 0x4, 0x1a, 0x8, 0x0, 0x0, 0x0, 0x8, 0x8, 0x8, 0x2, 0x0, 0x0, 0x0, 0xa, 0xa, 0xa, 0x2a, 0x2a, 0x0, 0x12, 0x34, 0x0, 0x1, 0xb8, 0x78, 0x9d, 0xfe}, uint32(119), FORMAT_DESCRIPTION_EVENT},
// TABLE MAP EVENT tb(INT)
{[]byte{0x8d, 0x61, 0x72, 0x63, 0x13, 0xb, 0x0, 0x0, 0x0, 0x2c, 0x0, 0x0, 0x0, 0xa7, 0x0, 0x0, 0x0, 0x1, 0x0, 0x6c, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x2, 0x64, 0x62, 0x0, 0x3, 0x74, 0x62, 0x6c, 0x0, 0x1, 0x3, 0x0, 0x0, 0x63, 0x17, 0xe6, 0xf0}, uint32(44), TABLE_MAP_EVENT},
// rows INT(1)
{[]byte{0xb6, 0x61, 0x72, 0x63, 0x1e, 0xb, 0x0, 0x0, 0x0, 0x28, 0x0, 0x0, 0x0, 0xcf, 0x0, 0x0, 0x0, 0x1, 0x0, 0x6c, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x2, 0x0, 0x1, 0xff, 0x0, 0x1, 0x0, 0x0, 0x0, 0xf9, 0xf7, 0x89, 0x2a}, uint32(40), WRITE_ROWS_EVENTv2},
// TABLE MAP EVENT tb(TINY)
{[]byte{0x22, 0x6c, 0x72, 0x63, 0x13, 0xb, 0x0, 0x0, 0x0, 0x2e, 0x0, 0x0, 0x0, 0xfd, 0x0, 0x0, 0x0, 0x1, 0x0, 0x76, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x3, 0x64, 0x62, 0x31, 0x0, 0x4, 0x74, 0x62, 0x6c, 0x31, 0x0, 0x1, 0x1, 0x0, 0x0, 0x32, 0xec, 0x2f, 0x4}, uint32(46), TABLE_MAP_EVENT},
// rows LONG(1)
// panic if not set rows event decode func
{[]byte{0xeb, 0x64, 0x72, 0x63, 0x1e, 0xb, 0x0, 0x0, 0x0, 0x2d, 0x0, 0x0, 0x0, 0x2a, 0x1, 0x0, 0x0, 0x1, 0x0, 0x76, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x2, 0x0, 0x1, 0xff, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x6e, 0xef, 0xb2, 0xb1}, uint32(45), WRITE_ROWS_EVENTv2},
}

parser := NewBinlogParser()
parser.SetRowsEventDecodeFunc(func(re *RowsEvent, bs []byte) error {
_, err := re.DecodeHeader(bs)
return err
})
for _, tc := range testCases {
e, err := parser.Parse(tc.byteData)
c.Assert(err, IsNil)
c.Assert(e.Header.EventType, Equals, tc.eventType)
c.Assert(e.Header.EventSize, Equals, tc.eventSize)
}
}
23 changes: 18 additions & 5 deletions replication/row_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -857,7 +857,7 @@ type RowsEvent struct {
ignoreJSONDecodeErr bool
}

func (e *RowsEvent) Decode(data []byte) (err2 error) {
func (e *RowsEvent) DecodeHeader(data []byte) (int, error) {
pos := 0
e.TableID = FixedLengthInt(data[0:e.tableIDSize])
pos += e.tableIDSize
Expand Down Expand Up @@ -890,14 +890,19 @@ func (e *RowsEvent) Decode(data []byte) (err2 error) {
e.Table, ok = e.tables[e.TableID]
if !ok {
if len(e.tables) > 0 {
return errors.Errorf("invalid table id %d, no corresponding table map event", e.TableID)
return 0, errors.Errorf("invalid table id %d, no corresponding table map event", e.TableID)
} else {
return errors.Annotatef(errMissingTableMapEvent, "table id %d", e.TableID)
return 0, errors.Annotatef(errMissingTableMapEvent, "table id %d", e.TableID)
}
}
return pos, nil
}

var err error

func (e *RowsEvent) DecodeData(pos int, data []byte) (err2 error) {
var (
n int
err error
)
// ... repeat rows until event-end
defer func() {
if r := recover(); r != nil {
Expand Down Expand Up @@ -932,6 +937,14 @@ func (e *RowsEvent) Decode(data []byte) (err2 error) {
return nil
}

func (e *RowsEvent) Decode(data []byte) error {
pos, err := e.DecodeHeader(data)
if err != nil {
return err
}
return e.DecodeData(pos, data)
}

func isBitSet(bitmap []byte, i int) bool {
return bitmap[i>>3]&(1<<(uint(i)&7)) > 0
}
Expand Down

0 comments on commit 49d58c4

Please sign in to comment.