sink/codec(ticdc): remove useless MixedBuild and Reset funcs #4653

Merged

Changes from 1 commit
10 changes: 0 additions & 10 deletions cdc/sink/codec/avro.go
@@ -146,16 +146,6 @@ func (a *AvroEventBatchEncoder) Build() (mqMessages []*MQMessage) {
return old
}

// MixedBuild implements the EventBatchEncoder interface
func (a *AvroEventBatchEncoder) MixedBuild(withVersion bool) []byte {
panic("Mixed Build only use for JsonEncoder")
}

// Reset implements the EventBatchEncoder interface
func (a *AvroEventBatchEncoder) Reset() {
panic("Reset only used for JsonEncoder")
}

// Size is the current size of resultBuf
func (a *AvroEventBatchEncoder) Size() int {
if a.resultBuf == nil {
10 changes: 0 additions & 10 deletions cdc/sink/codec/canal.go
@@ -473,11 +473,6 @@ func (d *CanalEventBatchEncoder) Build() []*MQMessage {
return []*MQMessage{ret}
}

// MixedBuild implements the EventBatchEncoder interface
func (d *CanalEventBatchEncoder) MixedBuild(withVersion bool) []byte {
panic("Mixed Build only use for JsonEncoder")
}

// Size implements the EventBatchEncoder interface
func (d *CanalEventBatchEncoder) Size() int {
// TODO: avoid marshaling the messages every time for calculating the size of the packet
@@ -488,11 +483,6 @@ func (d *CanalEventBatchEncoder) Size() int {
return proto.Size(d.packet)
}

// Reset implements the EventBatchEncoder interface
func (d *CanalEventBatchEncoder) Reset() {
panic("Reset only used for JsonEncoder")
}

// SetParams is no-op for now
func (d *CanalEventBatchEncoder) SetParams(params map[string]string) error {
// no op
10 changes: 0 additions & 10 deletions cdc/sink/codec/canal_flat.go
@@ -354,21 +354,11 @@ func (c *CanalFlatEventBatchEncoder) Build() []*MQMessage {
return ret
}

// MixedBuild is not used here
func (c *CanalFlatEventBatchEncoder) MixedBuild(_ bool) []byte {
panic("MixedBuild not supported by CanalFlatEventBatchEncoder")
}

// Size implements the EventBatchEncoder interface
func (c *CanalFlatEventBatchEncoder) Size() int {
return -1
}

// Reset is only supported by JSONEventBatchEncoder
func (c *CanalFlatEventBatchEncoder) Reset() {
panic("not supported")
}

// SetParams sets the encoding parameters for the canal flat protocol.
func (c *CanalFlatEventBatchEncoder) SetParams(params map[string]string) error {
if s, ok := params["enable-tidb-extension"]; ok {
10 changes: 0 additions & 10 deletions cdc/sink/codec/craft.go
@@ -78,21 +78,11 @@ func (e *CraftEventBatchEncoder) Build() []*MQMessage {
return ret
}

// MixedBuild implements the EventBatchEncoder interface
func (e *CraftEventBatchEncoder) MixedBuild(withVersion bool) []byte {
panic("Only JsonEncoder supports mixed build")
}

// Size implements the EventBatchEncoder interface
func (e *CraftEventBatchEncoder) Size() int {
return e.rowChangedBuffer.Size()
}

// Reset implements the EventBatchEncoder interface
func (e *CraftEventBatchEncoder) Reset() {
e.rowChangedBuffer.Reset()
}

// SetParams reads relevant parameters for craft protocol
func (e *CraftEventBatchEncoder) SetParams(params map[string]string) error {
var err error
8 changes: 0 additions & 8 deletions cdc/sink/codec/interface.go
@@ -37,16 +37,8 @@ type EventBatchEncoder interface {
EncodeDDLEvent(e *model.DDLEvent) (*MQMessage, error)
// Build builds the batch and returns the bytes of key and value.
Build() []*MQMessage
// MixedBuild builds the batch and returns the bytes of mixed keys and values.
// This is used for cdc log, to merge key and value into one byte slice
// when first create file, we should set withVersion to true, to tell us that
// the first 8 byte represents the encoder version
// TODO decouple it out
MixedBuild(withVersion bool) []byte
// Size returns the size of the batch(bytes)
Size() int
// Reset reset the kv buffer
Reset()
// SetParams provides the encoder with more info on the sink
SetParams(params map[string]string) error
}
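With MixedBuild and Reset removed, the interface is reduced to encoding, Build, Size, and SetParams. As a rough illustration, here is a hypothetical helper (not part of this PR) showing how a sink-side caller drives the remaining method set; it assumes it sits next to the codec package so that EventBatchEncoder, MQMessage, and model.RowChangedEvent resolve, and the send callback is invented for the example.

// flushRows is a hypothetical helper illustrating the lifecycle left on
// EventBatchEncoder: append rows, then drain the batch with Build. There is
// no Reset step any more; the JSON encoder's Build (see json.go below)
// clears its internal message buffer on its own.
func flushRows(encoder EventBatchEncoder, rows []*model.RowChangedEvent, send func(*MQMessage) error) error {
	for _, row := range rows {
		// The encoder batches internally; JSONEventBatchEncoder, for example,
		// opens a new MQMessage when max-batch-size or max-message-bytes
		// would otherwise be exceeded.
		if err := encoder.AppendRowChangedEvent(row); err != nil {
			return err
		}
	}
	for _, msg := range encoder.Build() {
		if err := send(msg); err != nil {
			return err
		}
	}
	return nil
}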
159 changes: 32 additions & 127 deletions cdc/sink/codec/json.go
@@ -373,11 +373,6 @@ func mqMessageToDDLEvent(key *mqMessageKey, value *mqMessageDDL) *model.DDLEvent

// JSONEventBatchEncoder encodes the events into the byte of a batch into.
type JSONEventBatchEncoder struct {
// TODO remove deprecated fields
keyBuf *bytes.Buffer // Deprecated: only used for MixedBuild for now
valueBuf *bytes.Buffer // Deprecated: only used for MixedBuild for now
supportMixedBuild bool // TODO decouple this out

messageBuf []*MQMessage
curBatchSize int
// configs
@@ -395,11 +390,6 @@ func (d *JSONEventBatchEncoder) GetMaxBatchSize() int {
return d.maxBatchSize
}

// SetMixedBuildSupport is used by CDC Log
func (d *JSONEventBatchEncoder) SetMixedBuildSupport(enabled bool) {
d.supportMixedBuild = enabled
}

// EncodeCheckpointEvent implements the EventBatchEncoder interface
func (d *JSONEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, error) {
keyMsg := newResolvedMessage(ts)
@@ -413,13 +403,6 @@ func (d *JSONEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, error) {
var valueLenByte [8]byte
binary.BigEndian.PutUint64(valueLenByte[:], 0)

if d.supportMixedBuild {
d.keyBuf.Write(keyLenByte[:])
d.keyBuf.Write(key)
d.valueBuf.Write(valueLenByte[:])
return nil, nil
}

keyBuf := new(bytes.Buffer)
var versionByte [8]byte
binary.BigEndian.PutUint64(versionByte[:], BatchVersion1)
@@ -451,50 +434,42 @@ func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)
var valueLenByte [8]byte
binary.BigEndian.PutUint64(valueLenByte[:], uint64(len(value)))

if d.supportMixedBuild {
d.keyBuf.Write(keyLenByte[:])
d.keyBuf.Write(key)
// for single message that longer than max-message-size, do not send it.
// 16 is the length of `keyLenByte` and `valueLenByte`, 8 is the length of `versionHead`
length := len(key) + len(value) + maximumRecordOverhead + 16 + 8
if length > d.maxMessageBytes {
log.Warn("Single message too large",
zap.Int("max-message-size", d.maxMessageBytes), zap.Int("length", length), zap.Any("table", e.Table))
return cerror.ErrJSONCodecRowTooLarge.GenWithStackByArgs()
}

d.valueBuf.Write(valueLenByte[:])
d.valueBuf.Write(value)
} else {
// for single message that longer than max-message-size, do not send it.
// 16 is the length of `keyLenByte` and `valueLenByte`, 8 is the length of `versionHead`
length := len(key) + len(value) + maximumRecordOverhead + 16 + 8
if length > d.maxMessageBytes {
log.Warn("Single message too large",
zap.Int("max-message-size", d.maxMessageBytes), zap.Int("length", length), zap.Any("table", e.Table))
return cerror.ErrJSONCodecRowTooLarge.GenWithStackByArgs()
}
if len(d.messageBuf) == 0 ||
d.curBatchSize >= d.maxBatchSize ||
d.messageBuf[len(d.messageBuf)-1].Length()+len(key)+len(value)+16 > d.maxMessageBytes {

if len(d.messageBuf) == 0 ||
d.curBatchSize >= d.maxBatchSize ||
d.messageBuf[len(d.messageBuf)-1].Length()+len(key)+len(value)+16 > d.maxMessageBytes {
versionHead := make([]byte, 8)
binary.BigEndian.PutUint64(versionHead, BatchVersion1)

versionHead := make([]byte, 8)
binary.BigEndian.PutUint64(versionHead, BatchVersion1)
d.messageBuf = append(d.messageBuf, NewMQMessage(config.ProtocolOpen, versionHead, nil, 0, model.MqMessageTypeRow, nil, nil))
d.curBatchSize = 0
}

d.messageBuf = append(d.messageBuf, NewMQMessage(config.ProtocolOpen, versionHead, nil, 0, model.MqMessageTypeRow, nil, nil))
d.curBatchSize = 0
}
message := d.messageBuf[len(d.messageBuf)-1]
message.Key = append(message.Key, keyLenByte[:]...)
message.Key = append(message.Key, key...)
message.Value = append(message.Value, valueLenByte[:]...)
message.Value = append(message.Value, value...)
message.Ts = e.CommitTs
message.Schema = &e.Table.Schema
message.Table = &e.Table.Table
message.IncRowsCount()

message := d.messageBuf[len(d.messageBuf)-1]
message.Key = append(message.Key, keyLenByte[:]...)
message.Key = append(message.Key, key...)
message.Value = append(message.Value, valueLenByte[:]...)
message.Value = append(message.Value, value...)
message.Ts = e.CommitTs
message.Schema = &e.Table.Schema
message.Table = &e.Table.Table
message.IncRowsCount()

if message.Length() > d.maxMessageBytes {
// `len(d.messageBuf) == 1` is implied
log.Debug("Event does not fit into max-message-bytes. Adjust relevant configurations to avoid service interruptions.",
zap.Int("eventLen", message.Length()), zap.Int("max-message-bytes", d.maxMessageBytes))
}
d.curBatchSize++
if message.Length() > d.maxMessageBytes {
// `len(d.messageBuf) == 1` is implied
log.Debug("Event does not fit into max-message-bytes. Adjust relevant configurations to avoid service interruptions.",
zap.Int("eventLen", message.Length()), zap.Int("max-message-bytes", d.maxMessageBytes))
}
d.curBatchSize++
return nil
}
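To make the batching condition above concrete, the sketch below is a self-contained restatement of the split rule: a new message is opened when the buffer is empty, the per-message row budget is used up, or appending key and value plus their two 8-byte length prefixes (the +16) would exceed max-message-bytes. The function name and sample sizes are invented; only the condition mirrors the diff.

package main

import "fmt"

// shouldStartNewMessage mirrors the three-way test in AppendRowChangedEvent:
// empty buffer, max-batch-size reached, or the current message would grow
// past max-message-bytes once key, value and their length prefixes are added.
func shouldStartNewMessage(curLen, keyLen, valueLen, curBatchSize, maxBatchSize, maxMessageBytes int, empty bool) bool {
	return empty ||
		curBatchSize >= maxBatchSize ||
		curLen+keyLen+valueLen+16 > maxMessageBytes
}

func main() {
	// With the test settings used below (max-message-bytes=8192, max-batch-size=64),
	// a 100-byte key and a 3000-byte value still fit into a 5000-byte message ...
	fmt.Println(shouldStartNewMessage(5000, 100, 3000, 10, 64, 8192, false)) // false
	// ... but not into a 6000-byte one, so a fresh MQMessage is started.
	fmt.Println(shouldStartNewMessage(6000, 100, 3000, 10, 64, 8192, false)) // true
}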

@@ -515,14 +490,6 @@ func (d *JSONEventBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessage, error) {
var valueLenByte [8]byte
binary.BigEndian.PutUint64(valueLenByte[:], uint64(len(value)))

if d.supportMixedBuild {
d.keyBuf.Write(keyLenByte[:])
d.keyBuf.Write(key)
d.valueBuf.Write(valueLenByte[:])
d.valueBuf.Write(value)
return nil, nil
}

keyBuf := new(bytes.Buffer)
var versionByte [8]byte
binary.BigEndian.PutUint64(versionByte[:], BatchVersion1)
@@ -540,70 +507,14 @@ func (d *JSONEventBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessage, error) {

// Build implements the EventBatchEncoder interface
func (d *JSONEventBatchEncoder) Build() (mqMessages []*MQMessage) {
if d.supportMixedBuild {
if d.valueBuf.Len() == 0 {
return nil
}
/* there could be multiple types of event encoded within a single message which means the type is not sure */
ret := NewMQMessage(config.ProtocolOpen, d.keyBuf.Bytes(), d.valueBuf.Bytes(), 0, model.MqMessageTypeUnknown, nil, nil)
return []*MQMessage{ret}
}

ret := d.messageBuf
d.messageBuf = make([]*MQMessage, 0)
return ret
}

// MixedBuild implements the EventBatchEncoder interface
func (d *JSONEventBatchEncoder) MixedBuild(withVersion bool) []byte {
if !d.supportMixedBuild {
log.Panic("mixedBuildSupport not enabled!")
return nil
}
keyBytes := d.keyBuf.Bytes()
valueBytes := d.valueBuf.Bytes()
mixedBytes := make([]byte, len(keyBytes)+len(valueBytes))

index := uint64(0)
keyIndex := uint64(0)
valueIndex := uint64(0)

if withVersion {
// the first 8 bytes is the version, we should copy directly
// then skip 8 bytes for next round key value parse
copy(mixedBytes[:8], keyBytes[:8])
index = uint64(8) // skip version
keyIndex = uint64(8) // skip version
}

for {
if keyIndex >= uint64(len(keyBytes)) {
break
}
keyLen := binary.BigEndian.Uint64(keyBytes[keyIndex : keyIndex+8])
offset := keyLen + 8
copy(mixedBytes[index:index+offset], keyBytes[keyIndex:keyIndex+offset])
keyIndex += offset
index += offset

valueLen := binary.BigEndian.Uint64(valueBytes[valueIndex : valueIndex+8])
offset = valueLen + 8
copy(mixedBytes[index:index+offset], valueBytes[valueIndex:valueIndex+offset])
valueIndex += offset
index += offset
}
return mixedBytes
}

// Size implements the EventBatchEncoder interface
func (d *JSONEventBatchEncoder) Size() int {
return d.keyBuf.Len() + d.valueBuf.Len()
}

// Reset implements the EventBatchEncoder interface
func (d *JSONEventBatchEncoder) Reset() {
d.keyBuf.Reset()
d.valueBuf.Reset()
return -1
}

// SetParams reads relevant parameters for Open Protocol
@@ -657,13 +568,7 @@ func newJSONEventBatchEncoderBuilder(opts map[string]string) EncoderBuilder {

// NewJSONEventBatchEncoder creates a new JSONEventBatchEncoder.
func NewJSONEventBatchEncoder() EventBatchEncoder {
batch := &JSONEventBatchEncoder{
keyBuf: &bytes.Buffer{},
valueBuf: &bytes.Buffer{},
}
var versionByte [8]byte
binary.BigEndian.PutUint64(versionByte[:], BatchVersion1)
batch.keyBuf.Write(versionByte[:])
batch := &JSONEventBatchEncoder{}
return batch
}

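With the deprecated buffers gone, NewJSONEventBatchEncoder is a plain constructor and the encoder is used through SetParams, AppendRowChangedEvent, and Build, which is what json_test.go below exercises. The following is a hedged usage sketch: the import paths, the TableName layout, and the bare-bones event are assumptions, and per the diff above Size now reports -1 for this encoder.

package main

import (
	"fmt"

	"github.com/pingcap/tiflow/cdc/model"      // assumed import path
	"github.com/pingcap/tiflow/cdc/sink/codec" // assumed import path
)

func main() {
	encoder := codec.NewJSONEventBatchEncoder()

	// Same knobs the tests use; values are illustrative.
	if err := encoder.SetParams(map[string]string{
		"max-message-bytes": "8192",
		"max-batch-size":    "64",
	}); err != nil {
		panic(err)
	}

	// Minimal row event; real events also carry Columns/PreColumns.
	// The TableName field layout here is an assumption.
	row := &model.RowChangedEvent{
		CommitTs: 1,
		Table:    &model.TableName{Schema: "test", Table: "t"},
	}
	if err := encoder.AppendRowChangedEvent(row); err != nil {
		panic(err)
	}

	// Build is now the only way to drain the batch; MixedBuild and Reset are gone.
	for _, msg := range encoder.Build() {
		fmt.Println("key bytes:", len(msg.Key), "value bytes:", len(msg.Value))
	}
}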
36 changes: 0 additions & 36 deletions cdc/sink/codec/json_test.go
@@ -117,22 +117,6 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEncoder
err := encoder.SetParams(map[string]string{"max-message-bytes": "8192", "max-batch-size": "64"})
c.Assert(err, check.IsNil)

mixedEncoder := newEncoder()
mixedEncoder.(*JSONEventBatchEncoder).SetMixedBuildSupport(true)
for _, row := range cs {
err := encoder.AppendRowChangedEvent(row)
c.Assert(err, check.IsNil)

err = mixedEncoder.AppendRowChangedEvent(row)
c.Assert(err, check.IsNil)
}
// test mixed decode
mixed := mixedEncoder.MixedBuild(true)
c.Assert(len(mixed), check.Equals, mixedEncoder.Size())
mixedDecoder, err := newDecoder(mixed, nil)
c.Assert(err, check.IsNil)
checkRowDecoder(mixedDecoder, cs)
// test normal decode
if len(cs) > 0 {
res := encoder.Build()
c.Assert(res, check.HasLen, 1)
@@ -145,11 +129,9 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEncoder

for _, cs := range s.ddlCases {
encoder := newEncoder()
mixedEncoder := newEncoder()
err := encoder.SetParams(map[string]string{"max-message-bytes": "8192", "max-batch-size": "64"})
c.Assert(err, check.IsNil)

mixedEncoder.(*JSONEventBatchEncoder).SetMixedBuildSupport(true)
for i, ddl := range cs {
msg, err := encoder.EncodeDDLEvent(ddl)
c.Assert(err, check.IsNil)
@@ -158,17 +140,7 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEncoder
c.Assert(err, check.IsNil)
checkDDLDecoder(decoder, cs[i:i+1])

msg, err = mixedEncoder.EncodeDDLEvent(ddl)
c.Assert(msg, check.IsNil)
c.Assert(err, check.IsNil)
}

// test mixed encode
mixed := mixedEncoder.MixedBuild(true)
c.Assert(len(mixed), check.Equals, mixedEncoder.Size())
mixedDecoder, err := newDecoder(mixed, nil)
c.Assert(err, check.IsNil)
checkDDLDecoder(mixedDecoder, cs)
}

for _, cs := range s.resolvedTsCases {
@@ -177,7 +149,6 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEncoder
err := encoder.SetParams(map[string]string{"max-message-bytes": "8192", "max-batch-size": "64"})
c.Assert(err, check.IsNil)

mixedEncoder.(*JSONEventBatchEncoder).SetMixedBuildSupport(true)
for i, ts := range cs {
msg, err := encoder.EncodeCheckpointEvent(ts)
c.Assert(err, check.IsNil)
@@ -190,13 +161,6 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEncoder
c.Assert(msg, check.IsNil)
c.Assert(err, check.IsNil)
}

// test mixed encode
mixed := mixedEncoder.MixedBuild(true)
c.Assert(len(mixed), check.Equals, mixedEncoder.Size())
mixedDecoder, err := newDecoder(mixed, nil)
c.Assert(err, check.IsNil)
checkTSDecoder(mixedDecoder, cs)
}
}
