Skip to content

Commit

Permalink
sorter(ticdc): fix dml order (#8598) (#8635)
Browse files Browse the repository at this point in the history
close #8597
  • Loading branch information
ti-chi-bot authored Mar 23, 2023
1 parent 21338df commit afc33c5
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 3 deletions.
4 changes: 4 additions & 0 deletions cdc/model/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ func ComparePolymorphicEvents(i, j *PolymorphicEvent) bool {
if i.RawKV.OpType == OpTypeDelete && j.RawKV.OpType != OpTypeDelete {
return true
}
// update DML
if i.RawKV.OldValue != nil && j.RawKV.OldValue == nil {
return true
}
}
return i.CRTs < j.CRTs
}
Expand Down
30 changes: 30 additions & 0 deletions cdc/model/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,33 @@ func TestResolvedTs(t *testing.T) {
smallerBatchResolvedTs := ResolvedTs{Mode: BatchResolvedMode, Ts: 0, BatchID: batchID}
require.True(t, batchResolvedTs1.EqualOrGreater(smallerBatchResolvedTs))
}

func TestComparePolymorphicEvents(t *testing.T) {
cases := []struct {
a *PolymorphicEvent
b *PolymorphicEvent
}{
{
a: NewPolymorphicEvent(&RawKVEntry{
OpType: OpTypeDelete,
}),
b: NewPolymorphicEvent(&RawKVEntry{
OpType: OpTypePut,
}),
},
{
a: NewPolymorphicEvent(&RawKVEntry{
OpType: OpTypePut,
OldValue: []byte{0},
Value: []byte{0},
}),
b: NewPolymorphicEvent(&RawKVEntry{
OpType: OpTypePut,
Value: []byte{0},
}),
},
}
for _, item := range cases {
require.True(t, ComparePolymorphicEvents(item.a, item.b))
}
}
22 changes: 19 additions & 3 deletions cdc/sorter/encoding/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ import (
"go.uber.org/zap"
)

const (
typeDelete = iota + 1
typeUpdate
typeInsert
)

// DecodeKey decodes a key to uniqueID, tableID, startTs, CRTs.
func DecodeKey(key []byte) (uniqueID uint32, tableID uint64, startTs, CRTs uint64) {
// uniqueID, tableID, CRTs, startTs, Key, Put/Delete
Expand Down Expand Up @@ -54,7 +60,7 @@ func EncodeTsKey(uniqueID uint32, tableID uint64, ts uint64) []byte {
}

// EncodeKey encodes a key according to event.
// Format: uniqueID, tableID, CRTs, startTs, Put/Delete, Key.
// Format: uniqueID, tableID, CRTs, startTs, delete/update/insert, Key.
func EncodeKey(uniqueID uint32, tableID uint64, event *model.PolymorphicEvent) []byte {
if event.RawKV == nil {
log.Panic("rawkv must not be nil", zap.Any("event", event))
Expand All @@ -75,9 +81,19 @@ func EncodeKey(uniqueID uint32, tableID uint64, event *model.PolymorphicEvent) [
// startTs
binary.BigEndian.PutUint64(uint64Buf[:], event.StartTs)
buf = append(buf, uint64Buf[:]...)
// Let Delete < Put
binary.BigEndian.PutUint16(uint64Buf[:], ^uint16(event.RawKV.OpType))
// Let Delete < Update < Insert
binary.BigEndian.PutUint16(uint64Buf[:], getDMLOrder(event.RawKV))
buf = append(buf, uint64Buf[:2]...)
// key
return append(buf, event.RawKV.Key...)
}

// getDMLOrder returns the order of the dml types: delete<update<insert
func getDMLOrder(rowKV *model.RawKVEntry) uint16 {
if rowKV.OpType == model.OpTypeDelete {
return typeDelete
} else if rowKV.OldValue != nil {
return typeUpdate
}
return typeInsert
}
21 changes: 21 additions & 0 deletions cdc/sorter/encoding/key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,27 @@ func TestEncodeKey(t *testing.T) {
CRTs: 3,
}),
)

// update < insert
mustLess(
0, 0,
1, 1,
model.NewPolymorphicEvent(&model.RawKVEntry{
OpType: model.OpTypePut,
Key: []byte{2},
StartTs: 1,
CRTs: 1,
Value: []byte{2},
OldValue: []byte{2},
}),
model.NewPolymorphicEvent(&model.RawKVEntry{
OpType: model.OpTypePut,
Key: []byte{2},
StartTs: 1,
CRTs: 1,
Value: []byte{2},
}),
)
}

func TestEncodeTsKey(t *testing.T) {
Expand Down

0 comments on commit afc33c5

Please sign in to comment.