sink/codec(ticdc): remove useless MixedBuild and Reset funcs (#4653)
ref #4423
Rustin170506 authored Feb 22, 2022
1 parent 8bf77f1 commit d4dbf71
Showing 8 changed files with 33 additions and 217 deletions.
10 changes: 0 additions & 10 deletions cdc/sink/codec/avro.go
@@ -146,16 +146,6 @@ func (a *AvroEventBatchEncoder) Build() (mqMessages []*MQMessage) {
return old
}

// MixedBuild implements the EventBatchEncoder interface
func (a *AvroEventBatchEncoder) MixedBuild(withVersion bool) []byte {
panic("Mixed Build only use for JsonEncoder")
}

// Reset implements the EventBatchEncoder interface
func (a *AvroEventBatchEncoder) Reset() {
panic("Reset only used for JsonEncoder")
}

// Size is the current size of resultBuf
func (a *AvroEventBatchEncoder) Size() int {
if a.resultBuf == nil {
10 changes: 0 additions & 10 deletions cdc/sink/codec/canal.go
@@ -473,11 +473,6 @@ func (d *CanalEventBatchEncoder) Build() []*MQMessage {
return []*MQMessage{ret}
}

// MixedBuild implements the EventBatchEncoder interface
func (d *CanalEventBatchEncoder) MixedBuild(withVersion bool) []byte {
panic("Mixed Build only use for JsonEncoder")
}

// Size implements the EventBatchEncoder interface
func (d *CanalEventBatchEncoder) Size() int {
// TODO: avoid marshaling the messages every time for calculating the size of the packet
@@ -488,11 +483,6 @@ func (d *CanalEventBatchEncoder) Size() int {
return proto.Size(d.packet)
}

// Reset implements the EventBatchEncoder interface
func (d *CanalEventBatchEncoder) Reset() {
panic("Reset only used for JsonEncoder")
}

// SetParams is no-op for now
func (d *CanalEventBatchEncoder) SetParams(params map[string]string) error {
// no op
10 changes: 0 additions & 10 deletions cdc/sink/codec/canal_flat.go
@@ -354,21 +354,11 @@ func (c *CanalFlatEventBatchEncoder) Build() []*MQMessage {
return ret
}

// MixedBuild is not used here
func (c *CanalFlatEventBatchEncoder) MixedBuild(_ bool) []byte {
panic("MixedBuild not supported by CanalFlatEventBatchEncoder")
}

// Size implements the EventBatchEncoder interface
func (c *CanalFlatEventBatchEncoder) Size() int {
return -1
}

// Reset is only supported by JSONEventBatchEncoder
func (c *CanalFlatEventBatchEncoder) Reset() {
panic("not supported")
}

// SetParams sets the encoding parameters for the canal flat protocol.
func (c *CanalFlatEventBatchEncoder) SetParams(params map[string]string) error {
if s, ok := params["enable-tidb-extension"]; ok {
10 changes: 0 additions & 10 deletions cdc/sink/codec/craft.go
@@ -78,21 +78,11 @@ func (e *CraftEventBatchEncoder) Build() []*MQMessage {
return ret
}

// MixedBuild implements the EventBatchEncoder interface
func (e *CraftEventBatchEncoder) MixedBuild(withVersion bool) []byte {
panic("Only JsonEncoder supports mixed build")
}

// Size implements the EventBatchEncoder interface
func (e *CraftEventBatchEncoder) Size() int {
return e.rowChangedBuffer.Size()
}

// Reset implements the EventBatchEncoder interface
func (e *CraftEventBatchEncoder) Reset() {
e.rowChangedBuffer.Reset()
}

// SetParams reads relevant parameters for craft protocol
func (e *CraftEventBatchEncoder) SetParams(params map[string]string) error {
var err error
8 changes: 0 additions & 8 deletions cdc/sink/codec/interface.go
@@ -37,16 +37,8 @@ type EventBatchEncoder interface {
EncodeDDLEvent(e *model.DDLEvent) (*MQMessage, error)
// Build builds the batch and returns the bytes of key and value.
Build() []*MQMessage
// MixedBuild builds the batch and returns the bytes of mixed keys and values.
// This is used for cdc log, to merge key and value into one byte slice
// when first create file, we should set withVersion to true, to tell us that
// the first 8 byte represents the encoder version
// TODO decouple it out
MixedBuild(withVersion bool) []byte
// Size returns the size of the batch(bytes)
Size() int
// Reset reset the kv buffer
Reset()
// SetParams provides the encoder with more info on the sink
SetParams(params map[string]string) error
}
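
For context on what is being removed here: the deleted doc comment above describes the mixed format as every key and value merged into a single byte slice, with the first 8 bytes carrying the encoder version when withVersion is true. The sketch below walks such a slice. It is illustrative only, not part of this commit; the package and function names (codecsketch, walkMixed) are made up, and the layout is inferred from that comment and from the JSONEventBatchEncoder.MixedBuild body removed further down in this diff.

package codecsketch

import "encoding/binary"

// walkMixed splits a "mixed" byte slice into its keys and values, assuming the
// layout described by the removed MixedBuild doc comment: an optional 8-byte
// encoder version, then alternating 8-byte big-endian length prefixes and
// payloads for each key and value. Input is assumed to be well formed.
func walkMixed(mixed []byte, withVersion bool) (keys, values [][]byte) {
	idx := 0
	if withVersion {
		idx = 8 // skip the encoder version header
	}
	for idx < len(mixed) {
		keyLen := int(binary.BigEndian.Uint64(mixed[idx : idx+8]))
		keys = append(keys, mixed[idx+8:idx+8+keyLen])
		idx += 8 + keyLen

		valueLen := int(binary.BigEndian.Uint64(mixed[idx : idx+8]))
		values = append(values, mixed[idx+8:idx+8+valueLen])
		idx += 8 + valueLen
	}
	return keys, values
}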
159 changes: 32 additions & 127 deletions cdc/sink/codec/json.go
@@ -373,11 +373,6 @@ func mqMessageToDDLEvent(key *mqMessageKey, value *mqMessageDDL) *model.DDLEvent

// JSONEventBatchEncoder encodes the events into the byte of a batch into.
type JSONEventBatchEncoder struct {
// TODO remove deprecated fields
keyBuf *bytes.Buffer // Deprecated: only used for MixedBuild for now
valueBuf *bytes.Buffer // Deprecated: only used for MixedBuild for now
supportMixedBuild bool // TODO decouple this out

messageBuf []*MQMessage
curBatchSize int
// configs
@@ -395,11 +390,6 @@ func (d *JSONEventBatchEncoder) GetMaxBatchSize() int {
return d.maxBatchSize
}

// SetMixedBuildSupport is used by CDC Log
func (d *JSONEventBatchEncoder) SetMixedBuildSupport(enabled bool) {
d.supportMixedBuild = enabled
}

// EncodeCheckpointEvent implements the EventBatchEncoder interface
func (d *JSONEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, error) {
keyMsg := newResolvedMessage(ts)
@@ -413,13 +403,6 @@ func (d *JSONEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, er
var valueLenByte [8]byte
binary.BigEndian.PutUint64(valueLenByte[:], 0)

if d.supportMixedBuild {
d.keyBuf.Write(keyLenByte[:])
d.keyBuf.Write(key)
d.valueBuf.Write(valueLenByte[:])
return nil, nil
}

keyBuf := new(bytes.Buffer)
var versionByte [8]byte
binary.BigEndian.PutUint64(versionByte[:], BatchVersion1)
@@ -451,50 +434,42 @@ func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)
var valueLenByte [8]byte
binary.BigEndian.PutUint64(valueLenByte[:], uint64(len(value)))

if d.supportMixedBuild {
d.keyBuf.Write(keyLenByte[:])
d.keyBuf.Write(key)
// for single message that longer than max-message-size, do not send it.
// 16 is the length of `keyLenByte` and `valueLenByte`, 8 is the length of `versionHead`
length := len(key) + len(value) + maximumRecordOverhead + 16 + 8
if length > d.maxMessageBytes {
log.Warn("Single message too large",
zap.Int("max-message-size", d.maxMessageBytes), zap.Int("length", length), zap.Any("table", e.Table))
return cerror.ErrJSONCodecRowTooLarge.GenWithStackByArgs()
}

d.valueBuf.Write(valueLenByte[:])
d.valueBuf.Write(value)
} else {
// for single message that longer than max-message-size, do not send it.
// 16 is the length of `keyLenByte` and `valueLenByte`, 8 is the length of `versionHead`
length := len(key) + len(value) + maximumRecordOverhead + 16 + 8
if length > d.maxMessageBytes {
log.Warn("Single message too large",
zap.Int("max-message-size", d.maxMessageBytes), zap.Int("length", length), zap.Any("table", e.Table))
return cerror.ErrJSONCodecRowTooLarge.GenWithStackByArgs()
}
if len(d.messageBuf) == 0 ||
d.curBatchSize >= d.maxBatchSize ||
d.messageBuf[len(d.messageBuf)-1].Length()+len(key)+len(value)+16 > d.maxMessageBytes {

if len(d.messageBuf) == 0 ||
d.curBatchSize >= d.maxBatchSize ||
d.messageBuf[len(d.messageBuf)-1].Length()+len(key)+len(value)+16 > d.maxMessageBytes {
versionHead := make([]byte, 8)
binary.BigEndian.PutUint64(versionHead, BatchVersion1)

versionHead := make([]byte, 8)
binary.BigEndian.PutUint64(versionHead, BatchVersion1)
d.messageBuf = append(d.messageBuf, NewMQMessage(config.ProtocolOpen, versionHead, nil, 0, model.MqMessageTypeRow, nil, nil))
d.curBatchSize = 0
}

d.messageBuf = append(d.messageBuf, NewMQMessage(config.ProtocolOpen, versionHead, nil, 0, model.MqMessageTypeRow, nil, nil))
d.curBatchSize = 0
}
message := d.messageBuf[len(d.messageBuf)-1]
message.Key = append(message.Key, keyLenByte[:]...)
message.Key = append(message.Key, key...)
message.Value = append(message.Value, valueLenByte[:]...)
message.Value = append(message.Value, value...)
message.Ts = e.CommitTs
message.Schema = &e.Table.Schema
message.Table = &e.Table.Table
message.IncRowsCount()

message := d.messageBuf[len(d.messageBuf)-1]
message.Key = append(message.Key, keyLenByte[:]...)
message.Key = append(message.Key, key...)
message.Value = append(message.Value, valueLenByte[:]...)
message.Value = append(message.Value, value...)
message.Ts = e.CommitTs
message.Schema = &e.Table.Schema
message.Table = &e.Table.Table
message.IncRowsCount()

if message.Length() > d.maxMessageBytes {
// `len(d.messageBuf) == 1` is implied
log.Debug("Event does not fit into max-message-bytes. Adjust relevant configurations to avoid service interruptions.",
zap.Int("eventLen", message.Length()), zap.Int("max-message-bytes", d.maxMessageBytes))
}
d.curBatchSize++
if message.Length() > d.maxMessageBytes {
// `len(d.messageBuf) == 1` is implied
log.Debug("Event does not fit into max-message-bytes. Adjust relevant configurations to avoid service interruptions.",
zap.Int("eventLen", message.Length()), zap.Int("max-message-bytes", d.maxMessageBytes))
}
d.curBatchSize++
return nil
}
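
The condition on len(d.messageBuf), d.curBatchSize, and the current message length above decides when AppendRowChangedEvent starts a new MQMessage. Restated as a standalone predicate, purely for illustration; the package and parameter names are hypothetical and not part of ticdc.

package codecsketch

// needNewMessage restates the batching rule from AppendRowChangedEvent above: start a
// fresh message when nothing is buffered yet, the per-message row budget (max-batch-size)
// is used up, or appending the next row's key and value plus their two 8-byte length
// prefixes (the 16 in the real code) would push the message past max-message-bytes.
func needNewMessage(bufferedMessages, curBatchSize, maxBatchSize, lastMessageLen, keyLen, valueLen, maxMessageBytes int) bool {
	return bufferedMessages == 0 ||
		curBatchSize >= maxBatchSize ||
		lastMessageLen+keyLen+valueLen+16 > maxMessageBytes
}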

@@ -515,14 +490,6 @@ func (d *JSONEventBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessage, e
var valueLenByte [8]byte
binary.BigEndian.PutUint64(valueLenByte[:], uint64(len(value)))

if d.supportMixedBuild {
d.keyBuf.Write(keyLenByte[:])
d.keyBuf.Write(key)
d.valueBuf.Write(valueLenByte[:])
d.valueBuf.Write(value)
return nil, nil
}

keyBuf := new(bytes.Buffer)
var versionByte [8]byte
binary.BigEndian.PutUint64(versionByte[:], BatchVersion1)
@@ -540,70 +507,14 @@ func (d *JSONEventBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessage, e

// Build implements the EventBatchEncoder interface
func (d *JSONEventBatchEncoder) Build() (mqMessages []*MQMessage) {
if d.supportMixedBuild {
if d.valueBuf.Len() == 0 {
return nil
}
/* there could be multiple types of event encoded within a single message which means the type is not sure */
ret := NewMQMessage(config.ProtocolOpen, d.keyBuf.Bytes(), d.valueBuf.Bytes(), 0, model.MqMessageTypeUnknown, nil, nil)
return []*MQMessage{ret}
}

ret := d.messageBuf
d.messageBuf = make([]*MQMessage, 0)
return ret
}

// MixedBuild implements the EventBatchEncoder interface
func (d *JSONEventBatchEncoder) MixedBuild(withVersion bool) []byte {
if !d.supportMixedBuild {
log.Panic("mixedBuildSupport not enabled!")
return nil
}
keyBytes := d.keyBuf.Bytes()
valueBytes := d.valueBuf.Bytes()
mixedBytes := make([]byte, len(keyBytes)+len(valueBytes))

index := uint64(0)
keyIndex := uint64(0)
valueIndex := uint64(0)

if withVersion {
// the first 8 bytes is the version, we should copy directly
// then skip 8 bytes for next round key value parse
copy(mixedBytes[:8], keyBytes[:8])
index = uint64(8) // skip version
keyIndex = uint64(8) // skip version
}

for {
if keyIndex >= uint64(len(keyBytes)) {
break
}
keyLen := binary.BigEndian.Uint64(keyBytes[keyIndex : keyIndex+8])
offset := keyLen + 8
copy(mixedBytes[index:index+offset], keyBytes[keyIndex:keyIndex+offset])
keyIndex += offset
index += offset

valueLen := binary.BigEndian.Uint64(valueBytes[valueIndex : valueIndex+8])
offset = valueLen + 8
copy(mixedBytes[index:index+offset], valueBytes[valueIndex:valueIndex+offset])
valueIndex += offset
index += offset
}
return mixedBytes
}

// Size implements the EventBatchEncoder interface
func (d *JSONEventBatchEncoder) Size() int {
return d.keyBuf.Len() + d.valueBuf.Len()
}

// Reset implements the EventBatchEncoder interface
func (d *JSONEventBatchEncoder) Reset() {
d.keyBuf.Reset()
d.valueBuf.Reset()
return -1
}

// SetParams reads relevant parameters for Open Protocol
@@ -657,13 +568,7 @@ func newJSONEventBatchEncoderBuilder(opts map[string]string) EncoderBuilder {

// NewJSONEventBatchEncoder creates a new JSONEventBatchEncoder.
func NewJSONEventBatchEncoder() EventBatchEncoder {
batch := &JSONEventBatchEncoder{
keyBuf: &bytes.Buffer{},
valueBuf: &bytes.Buffer{},
}
var versionByte [8]byte
binary.BigEndian.PutUint64(versionByte[:], BatchVersion1)
batch.keyBuf.Write(versionByte[:])
batch := &JSONEventBatchEncoder{}
return batch
}
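
With the mixed path gone, Build now only hands back the buffered MQMessages. Based on the encode paths above, each message's Key starts with an 8-byte BatchVersion1 header followed by 8-byte-length-prefixed keys, and its Value carries the matching 8-byte-length-prefixed values. The sketch below is a hypothetical consumer-side decoder written only from that observation; it is not part of this commit, and codecsketch/decodeOpenProtocolBatch are made-up names.

package codecsketch

import "encoding/binary"

// decodeOpenProtocolBatch splits one batch message back into per-event key/value
// pairs, assuming the framing used by JSONEventBatchEncoder above:
//	key   = [8-byte version][8-byte key length][key] ...
//	value =                 [8-byte value length][value] ...
// Input is assumed to be well formed.
func decodeOpenProtocolBatch(key, value []byte) (version uint64, keys, values [][]byte) {
	version = binary.BigEndian.Uint64(key[:8])
	for i := 8; i < len(key); {
		l := int(binary.BigEndian.Uint64(key[i : i+8]))
		keys = append(keys, key[i+8:i+8+l])
		i += 8 + l
	}
	for i := 0; i < len(value); {
		l := int(binary.BigEndian.Uint64(value[i : i+8]))
		values = append(values, value[i+8:i+8+l])
		i += 8 + l
	}
	return version, keys, values
}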
