From 437350dc7d222631aeea90756005ba87812c2199 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Tue, 22 Feb 2022 16:23:21 +0800 Subject: [PATCH 1/3] sink/codec(ticdc): remove useless MixedBuild and Reset funcs --- cdc/sink/codec/avro.go | 10 --- cdc/sink/codec/canal.go | 10 --- cdc/sink/codec/canal_flat.go | 10 --- cdc/sink/codec/craft.go | 10 --- cdc/sink/codec/interface.go | 8 -- cdc/sink/codec/json.go | 159 +++++++---------------------------- cdc/sink/codec/json_test.go | 36 -------- cdc/sink/codec/maxwell.go | 5 -- 8 files changed, 32 insertions(+), 216 deletions(-) diff --git a/cdc/sink/codec/avro.go b/cdc/sink/codec/avro.go index 4f3eb2ec49d..e75f5b86e94 100644 --- a/cdc/sink/codec/avro.go +++ b/cdc/sink/codec/avro.go @@ -146,16 +146,6 @@ func (a *AvroEventBatchEncoder) Build() (mqMessages []*MQMessage) { return old } -// MixedBuild implements the EventBatchEncoder interface -func (a *AvroEventBatchEncoder) MixedBuild(withVersion bool) []byte { - panic("Mixed Build only use for JsonEncoder") -} - -// Reset implements the EventBatchEncoder interface -func (a *AvroEventBatchEncoder) Reset() { - panic("Reset only used for JsonEncoder") -} - // Size is the current size of resultBuf func (a *AvroEventBatchEncoder) Size() int { if a.resultBuf == nil { diff --git a/cdc/sink/codec/canal.go b/cdc/sink/codec/canal.go index a635b3604be..b950e649b98 100644 --- a/cdc/sink/codec/canal.go +++ b/cdc/sink/codec/canal.go @@ -473,11 +473,6 @@ func (d *CanalEventBatchEncoder) Build() []*MQMessage { return []*MQMessage{ret} } -// MixedBuild implements the EventBatchEncoder interface -func (d *CanalEventBatchEncoder) MixedBuild(withVersion bool) []byte { - panic("Mixed Build only use for JsonEncoder") -} - // Size implements the EventBatchEncoder interface func (d *CanalEventBatchEncoder) Size() int { // TODO: avoid marshaling the messages every time for calculating the size of the packet @@ -488,11 +483,6 @@ func (d *CanalEventBatchEncoder) Size() int { return proto.Size(d.packet) } -// Reset implements the EventBatchEncoder interface -func (d *CanalEventBatchEncoder) Reset() { - panic("Reset only used for JsonEncoder") -} - // SetParams is no-op for now func (d *CanalEventBatchEncoder) SetParams(params map[string]string) error { // no op diff --git a/cdc/sink/codec/canal_flat.go b/cdc/sink/codec/canal_flat.go index 11aebed69dd..4e7194748c8 100644 --- a/cdc/sink/codec/canal_flat.go +++ b/cdc/sink/codec/canal_flat.go @@ -354,21 +354,11 @@ func (c *CanalFlatEventBatchEncoder) Build() []*MQMessage { return ret } -// MixedBuild is not used here -func (c *CanalFlatEventBatchEncoder) MixedBuild(_ bool) []byte { - panic("MixedBuild not supported by CanalFlatEventBatchEncoder") -} - // Size implements the EventBatchEncoder interface func (c *CanalFlatEventBatchEncoder) Size() int { return -1 } -// Reset is only supported by JSONEventBatchEncoder -func (c *CanalFlatEventBatchEncoder) Reset() { - panic("not supported") -} - // SetParams sets the encoding parameters for the canal flat protocol. func (c *CanalFlatEventBatchEncoder) SetParams(params map[string]string) error { if s, ok := params["enable-tidb-extension"]; ok { diff --git a/cdc/sink/codec/craft.go b/cdc/sink/codec/craft.go index 8c7c859a31e..16e138bdf4b 100644 --- a/cdc/sink/codec/craft.go +++ b/cdc/sink/codec/craft.go @@ -78,21 +78,11 @@ func (e *CraftEventBatchEncoder) Build() []*MQMessage { return ret } -// MixedBuild implements the EventBatchEncoder interface -func (e *CraftEventBatchEncoder) MixedBuild(withVersion bool) []byte { - panic("Only JsonEncoder supports mixed build") -} - // Size implements the EventBatchEncoder interface func (e *CraftEventBatchEncoder) Size() int { return e.rowChangedBuffer.Size() } -// Reset implements the EventBatchEncoder interface -func (e *CraftEventBatchEncoder) Reset() { - e.rowChangedBuffer.Reset() -} - // SetParams reads relevant parameters for craft protocol func (e *CraftEventBatchEncoder) SetParams(params map[string]string) error { var err error diff --git a/cdc/sink/codec/interface.go b/cdc/sink/codec/interface.go index 3a552226f23..d11eb8050b0 100644 --- a/cdc/sink/codec/interface.go +++ b/cdc/sink/codec/interface.go @@ -37,16 +37,8 @@ type EventBatchEncoder interface { EncodeDDLEvent(e *model.DDLEvent) (*MQMessage, error) // Build builds the batch and returns the bytes of key and value. Build() []*MQMessage - // MixedBuild builds the batch and returns the bytes of mixed keys and values. - // This is used for cdc log, to merge key and value into one byte slice - // when first create file, we should set withVersion to true, to tell us that - // the first 8 byte represents the encoder version - // TODO decouple it out - MixedBuild(withVersion bool) []byte // Size returns the size of the batch(bytes) Size() int - // Reset reset the kv buffer - Reset() // SetParams provides the encoder with more info on the sink SetParams(params map[string]string) error } diff --git a/cdc/sink/codec/json.go b/cdc/sink/codec/json.go index cb947b8464a..6b3d81b169c 100644 --- a/cdc/sink/codec/json.go +++ b/cdc/sink/codec/json.go @@ -373,11 +373,6 @@ func mqMessageToDDLEvent(key *mqMessageKey, value *mqMessageDDL) *model.DDLEvent // JSONEventBatchEncoder encodes the events into the byte of a batch into. type JSONEventBatchEncoder struct { - // TODO remove deprecated fields - keyBuf *bytes.Buffer // Deprecated: only used for MixedBuild for now - valueBuf *bytes.Buffer // Deprecated: only used for MixedBuild for now - supportMixedBuild bool // TODO decouple this out - messageBuf []*MQMessage curBatchSize int // configs @@ -395,11 +390,6 @@ func (d *JSONEventBatchEncoder) GetMaxBatchSize() int { return d.maxBatchSize } -// SetMixedBuildSupport is used by CDC Log -func (d *JSONEventBatchEncoder) SetMixedBuildSupport(enabled bool) { - d.supportMixedBuild = enabled -} - // EncodeCheckpointEvent implements the EventBatchEncoder interface func (d *JSONEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, error) { keyMsg := newResolvedMessage(ts) @@ -413,13 +403,6 @@ func (d *JSONEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, er var valueLenByte [8]byte binary.BigEndian.PutUint64(valueLenByte[:], 0) - if d.supportMixedBuild { - d.keyBuf.Write(keyLenByte[:]) - d.keyBuf.Write(key) - d.valueBuf.Write(valueLenByte[:]) - return nil, nil - } - keyBuf := new(bytes.Buffer) var versionByte [8]byte binary.BigEndian.PutUint64(versionByte[:], BatchVersion1) @@ -451,50 +434,42 @@ func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent) var valueLenByte [8]byte binary.BigEndian.PutUint64(valueLenByte[:], uint64(len(value))) - if d.supportMixedBuild { - d.keyBuf.Write(keyLenByte[:]) - d.keyBuf.Write(key) + // for single message that longer than max-message-size, do not send it. + // 16 is the length of `keyLenByte` and `valueLenByte`, 8 is the length of `versionHead` + length := len(key) + len(value) + maximumRecordOverhead + 16 + 8 + if length > d.maxMessageBytes { + log.Warn("Single message too large", + zap.Int("max-message-size", d.maxMessageBytes), zap.Int("length", length), zap.Any("table", e.Table)) + return cerror.ErrJSONCodecRowTooLarge.GenWithStackByArgs() + } - d.valueBuf.Write(valueLenByte[:]) - d.valueBuf.Write(value) - } else { - // for single message that longer than max-message-size, do not send it. - // 16 is the length of `keyLenByte` and `valueLenByte`, 8 is the length of `versionHead` - length := len(key) + len(value) + maximumRecordOverhead + 16 + 8 - if length > d.maxMessageBytes { - log.Warn("Single message too large", - zap.Int("max-message-size", d.maxMessageBytes), zap.Int("length", length), zap.Any("table", e.Table)) - return cerror.ErrJSONCodecRowTooLarge.GenWithStackByArgs() - } + if len(d.messageBuf) == 0 || + d.curBatchSize >= d.maxBatchSize || + d.messageBuf[len(d.messageBuf)-1].Length()+len(key)+len(value)+16 > d.maxMessageBytes { - if len(d.messageBuf) == 0 || - d.curBatchSize >= d.maxBatchSize || - d.messageBuf[len(d.messageBuf)-1].Length()+len(key)+len(value)+16 > d.maxMessageBytes { + versionHead := make([]byte, 8) + binary.BigEndian.PutUint64(versionHead, BatchVersion1) - versionHead := make([]byte, 8) - binary.BigEndian.PutUint64(versionHead, BatchVersion1) + d.messageBuf = append(d.messageBuf, NewMQMessage(config.ProtocolOpen, versionHead, nil, 0, model.MqMessageTypeRow, nil, nil)) + d.curBatchSize = 0 + } - d.messageBuf = append(d.messageBuf, NewMQMessage(config.ProtocolOpen, versionHead, nil, 0, model.MqMessageTypeRow, nil, nil)) - d.curBatchSize = 0 - } + message := d.messageBuf[len(d.messageBuf)-1] + message.Key = append(message.Key, keyLenByte[:]...) + message.Key = append(message.Key, key...) + message.Value = append(message.Value, valueLenByte[:]...) + message.Value = append(message.Value, value...) + message.Ts = e.CommitTs + message.Schema = &e.Table.Schema + message.Table = &e.Table.Table + message.IncRowsCount() - message := d.messageBuf[len(d.messageBuf)-1] - message.Key = append(message.Key, keyLenByte[:]...) - message.Key = append(message.Key, key...) - message.Value = append(message.Value, valueLenByte[:]...) - message.Value = append(message.Value, value...) - message.Ts = e.CommitTs - message.Schema = &e.Table.Schema - message.Table = &e.Table.Table - message.IncRowsCount() - - if message.Length() > d.maxMessageBytes { - // `len(d.messageBuf) == 1` is implied - log.Debug("Event does not fit into max-message-bytes. Adjust relevant configurations to avoid service interruptions.", - zap.Int("eventLen", message.Length()), zap.Int("max-message-bytes", d.maxMessageBytes)) - } - d.curBatchSize++ + if message.Length() > d.maxMessageBytes { + // `len(d.messageBuf) == 1` is implied + log.Debug("Event does not fit into max-message-bytes. Adjust relevant configurations to avoid service interruptions.", + zap.Int("eventLen", message.Length()), zap.Int("max-message-bytes", d.maxMessageBytes)) } + d.curBatchSize++ return nil } @@ -515,14 +490,6 @@ func (d *JSONEventBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessage, e var valueLenByte [8]byte binary.BigEndian.PutUint64(valueLenByte[:], uint64(len(value))) - if d.supportMixedBuild { - d.keyBuf.Write(keyLenByte[:]) - d.keyBuf.Write(key) - d.valueBuf.Write(valueLenByte[:]) - d.valueBuf.Write(value) - return nil, nil - } - keyBuf := new(bytes.Buffer) var versionByte [8]byte binary.BigEndian.PutUint64(versionByte[:], BatchVersion1) @@ -540,70 +507,14 @@ func (d *JSONEventBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessage, e // Build implements the EventBatchEncoder interface func (d *JSONEventBatchEncoder) Build() (mqMessages []*MQMessage) { - if d.supportMixedBuild { - if d.valueBuf.Len() == 0 { - return nil - } - /* there could be multiple types of event encoded within a single message which means the type is not sure */ - ret := NewMQMessage(config.ProtocolOpen, d.keyBuf.Bytes(), d.valueBuf.Bytes(), 0, model.MqMessageTypeUnknown, nil, nil) - return []*MQMessage{ret} - } - ret := d.messageBuf d.messageBuf = make([]*MQMessage, 0) return ret } -// MixedBuild implements the EventBatchEncoder interface -func (d *JSONEventBatchEncoder) MixedBuild(withVersion bool) []byte { - if !d.supportMixedBuild { - log.Panic("mixedBuildSupport not enabled!") - return nil - } - keyBytes := d.keyBuf.Bytes() - valueBytes := d.valueBuf.Bytes() - mixedBytes := make([]byte, len(keyBytes)+len(valueBytes)) - - index := uint64(0) - keyIndex := uint64(0) - valueIndex := uint64(0) - - if withVersion { - // the first 8 bytes is the version, we should copy directly - // then skip 8 bytes for next round key value parse - copy(mixedBytes[:8], keyBytes[:8]) - index = uint64(8) // skip version - keyIndex = uint64(8) // skip version - } - - for { - if keyIndex >= uint64(len(keyBytes)) { - break - } - keyLen := binary.BigEndian.Uint64(keyBytes[keyIndex : keyIndex+8]) - offset := keyLen + 8 - copy(mixedBytes[index:index+offset], keyBytes[keyIndex:keyIndex+offset]) - keyIndex += offset - index += offset - - valueLen := binary.BigEndian.Uint64(valueBytes[valueIndex : valueIndex+8]) - offset = valueLen + 8 - copy(mixedBytes[index:index+offset], valueBytes[valueIndex:valueIndex+offset]) - valueIndex += offset - index += offset - } - return mixedBytes -} - // Size implements the EventBatchEncoder interface func (d *JSONEventBatchEncoder) Size() int { - return d.keyBuf.Len() + d.valueBuf.Len() -} - -// Reset implements the EventBatchEncoder interface -func (d *JSONEventBatchEncoder) Reset() { - d.keyBuf.Reset() - d.valueBuf.Reset() + return -1 } // SetParams reads relevant parameters for Open Protocol @@ -657,13 +568,7 @@ func newJSONEventBatchEncoderBuilder(opts map[string]string) EncoderBuilder { // NewJSONEventBatchEncoder creates a new JSONEventBatchEncoder. func NewJSONEventBatchEncoder() EventBatchEncoder { - batch := &JSONEventBatchEncoder{ - keyBuf: &bytes.Buffer{}, - valueBuf: &bytes.Buffer{}, - } - var versionByte [8]byte - binary.BigEndian.PutUint64(versionByte[:], BatchVersion1) - batch.keyBuf.Write(versionByte[:]) + batch := &JSONEventBatchEncoder{} return batch } diff --git a/cdc/sink/codec/json_test.go b/cdc/sink/codec/json_test.go index 23cdc042846..a21e4e988d5 100644 --- a/cdc/sink/codec/json_test.go +++ b/cdc/sink/codec/json_test.go @@ -117,22 +117,6 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEnco err := encoder.SetParams(map[string]string{"max-message-bytes": "8192", "max-batch-size": "64"}) c.Assert(err, check.IsNil) - mixedEncoder := newEncoder() - mixedEncoder.(*JSONEventBatchEncoder).SetMixedBuildSupport(true) - for _, row := range cs { - err := encoder.AppendRowChangedEvent(row) - c.Assert(err, check.IsNil) - - err = mixedEncoder.AppendRowChangedEvent(row) - c.Assert(err, check.IsNil) - } - // test mixed decode - mixed := mixedEncoder.MixedBuild(true) - c.Assert(len(mixed), check.Equals, mixedEncoder.Size()) - mixedDecoder, err := newDecoder(mixed, nil) - c.Assert(err, check.IsNil) - checkRowDecoder(mixedDecoder, cs) - // test normal decode if len(cs) > 0 { res := encoder.Build() c.Assert(res, check.HasLen, 1) @@ -145,11 +129,9 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEnco for _, cs := range s.ddlCases { encoder := newEncoder() - mixedEncoder := newEncoder() err := encoder.SetParams(map[string]string{"max-message-bytes": "8192", "max-batch-size": "64"}) c.Assert(err, check.IsNil) - mixedEncoder.(*JSONEventBatchEncoder).SetMixedBuildSupport(true) for i, ddl := range cs { msg, err := encoder.EncodeDDLEvent(ddl) c.Assert(err, check.IsNil) @@ -158,17 +140,7 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEnco c.Assert(err, check.IsNil) checkDDLDecoder(decoder, cs[i:i+1]) - msg, err = mixedEncoder.EncodeDDLEvent(ddl) - c.Assert(msg, check.IsNil) - c.Assert(err, check.IsNil) } - - // test mixed encode - mixed := mixedEncoder.MixedBuild(true) - c.Assert(len(mixed), check.Equals, mixedEncoder.Size()) - mixedDecoder, err := newDecoder(mixed, nil) - c.Assert(err, check.IsNil) - checkDDLDecoder(mixedDecoder, cs) } for _, cs := range s.resolvedTsCases { @@ -177,7 +149,6 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEnco err := encoder.SetParams(map[string]string{"max-message-bytes": "8192", "max-batch-size": "64"}) c.Assert(err, check.IsNil) - mixedEncoder.(*JSONEventBatchEncoder).SetMixedBuildSupport(true) for i, ts := range cs { msg, err := encoder.EncodeCheckpointEvent(ts) c.Assert(err, check.IsNil) @@ -190,13 +161,6 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEnco c.Assert(msg, check.IsNil) c.Assert(err, check.IsNil) } - - // test mixed encode - mixed := mixedEncoder.MixedBuild(true) - c.Assert(len(mixed), check.Equals, mixedEncoder.Size()) - mixedDecoder, err := newDecoder(mixed, nil) - c.Assert(err, check.IsNil) - checkTSDecoder(mixedDecoder, cs) } } diff --git a/cdc/sink/codec/maxwell.go b/cdc/sink/codec/maxwell.go index 0ad2eb8527a..6c47061aca4 100644 --- a/cdc/sink/codec/maxwell.go +++ b/cdc/sink/codec/maxwell.go @@ -298,11 +298,6 @@ func (d *MaxwellEventBatchEncoder) Build() []*MQMessage { return []*MQMessage{ret} } -// MixedBuild implements the EventBatchEncoder interface -func (d *MaxwellEventBatchEncoder) MixedBuild(withVersion bool) []byte { - return nil -} - // Reset implements the EventBatchEncoder interface func (d *MaxwellEventBatchEncoder) Reset() { d.keyBuf.Reset() From dc71104ac6b7d31addc2a00f506056879d2fd0ac Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Tue, 22 Feb 2022 16:41:44 +0800 Subject: [PATCH 2/3] sink/codec(ticdc): remove useless code --- cdc/sink/codec/json_test.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/cdc/sink/codec/json_test.go b/cdc/sink/codec/json_test.go index a21e4e988d5..41762ad6b2e 100644 --- a/cdc/sink/codec/json_test.go +++ b/cdc/sink/codec/json_test.go @@ -145,7 +145,6 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEnco for _, cs := range s.resolvedTsCases { encoder := newEncoder() - mixedEncoder := newEncoder() err := encoder.SetParams(map[string]string{"max-message-bytes": "8192", "max-batch-size": "64"}) c.Assert(err, check.IsNil) @@ -156,10 +155,6 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEnco decoder, err := newDecoder(msg.Key, msg.Value) c.Assert(err, check.IsNil) checkTSDecoder(decoder, cs[i:i+1]) - - msg, err = mixedEncoder.EncodeCheckpointEvent(ts) - c.Assert(msg, check.IsNil) - c.Assert(err, check.IsNil) } } } From 39e84da7d322c21f47344e65b4631a45d842eb95 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Tue, 22 Feb 2022 17:16:13 +0800 Subject: [PATCH 3/3] sink/codec(ticdc): fix test --- cdc/sink/codec/json_test.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/cdc/sink/codec/json_test.go b/cdc/sink/codec/json_test.go index 41762ad6b2e..3a39a05b4ff 100644 --- a/cdc/sink/codec/json_test.go +++ b/cdc/sink/codec/json_test.go @@ -117,6 +117,11 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEnco err := encoder.SetParams(map[string]string{"max-message-bytes": "8192", "max-batch-size": "64"}) c.Assert(err, check.IsNil) + for _, row := range cs { + err := encoder.AppendRowChangedEvent(row) + c.Assert(err, check.IsNil) + } + if len(cs) > 0 { res := encoder.Build() c.Assert(res, check.HasLen, 1)