Skip to content

Commit

Permalink
frontier(ticdc): fast check if region is split or merged when forward…
Browse files Browse the repository at this point in the history
…ing resolvedTs (#7280)

close #7281
  • Loading branch information
sdojjy authored Oct 9, 2022
1 parent d3398f0 commit 0f2a253
Show file tree
Hide file tree
Showing 13 changed files with 385 additions and 234 deletions.
11 changes: 8 additions & 3 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,10 +878,15 @@ func (s *eventFeedSession) dispatchRequest(ctx context.Context) error {
// After this resolved ts event is sent, we don't need to send one more
// resolved ts event when the region starts to work.
resolvedEv := model.RegionFeedEvent{
Resolved: []*model.ResolvedSpan{{
Span: sri.span,
Resolved: &model.ResolvedSpans{
Spans: []model.RegionComparableSpan{
{
Span: sri.span,
Region: sri.verID.GetID(),
},
},
ResolvedTs: sri.ts,
}},
},
}
select {
case s.eventCh <- resolvedEv:
Expand Down
205 changes: 121 additions & 84 deletions cdc/kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ func TestConnectOfflineTiKV(t *testing.T) {
}

checkEvent := func(event model.RegionFeedEvent, ts uint64) {
require.Equal(t, ts, event.Resolved[0].ResolvedTs)
require.Equal(t, ts, event.Resolved.ResolvedTs)
}

initialized := mockInitializedEvent(3 /* regionID */, currentRequestID())
Expand Down Expand Up @@ -605,7 +605,7 @@ consumePreResolvedTs:
select {
case event = <-eventCh:
require.NotNil(t, event.Resolved)
require.Equal(t, uint64(100), event.Resolved[0].ResolvedTs)
require.Equal(t, uint64(100), event.Resolved.ResolvedTs)
case <-time.After(time.Second):
break consumePreResolvedTs
}
Expand Down Expand Up @@ -641,7 +641,7 @@ consumePreResolvedTs:
require.FailNow(t, "reconnection not succeed in 3 seconds")
}
require.NotNil(t, event.Resolved)
require.Equal(t, uint64(120), event.Resolved[0].ResolvedTs)
require.Equal(t, uint64(120), event.Resolved.ResolvedTs)

cancel()
}
Expand Down Expand Up @@ -1080,10 +1080,12 @@ func testHandleFeedEvent(t *testing.T) {

expected := []model.RegionFeedEvent{
{
Resolved: []*model.ResolvedSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
ResolvedTs: 100,
}},
Resolved: &model.ResolvedSpans{
Spans: []model.RegionComparableSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
Region: 3,
}}, ResolvedTs: 100,
},
},
{
Val: &model.RawKVEntry{
Expand Down Expand Up @@ -1151,25 +1153,32 @@ func testHandleFeedEvent(t *testing.T) {
RegionID: 3,
},
{
Resolved: []*model.ResolvedSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
ResolvedTs: 135,
}},
Resolved: &model.ResolvedSpans{
Spans: []model.RegionComparableSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
Region: 3,
}}, ResolvedTs: 135,
},
},
{
Resolved: []*model.ResolvedSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
ResolvedTs: 145,
}},
Resolved: &model.ResolvedSpans{
Spans: []model.RegionComparableSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
Region: 3,
}}, ResolvedTs: 145,
},
},
}
multipleExpected := model.RegionFeedEvent{
Resolved: make([]*model.ResolvedSpan, multiSize),
}
for i := range multipleExpected.Resolved {
multipleExpected.Resolved[i] = &model.ResolvedSpan{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
Resolved: &model.ResolvedSpans{
Spans: make([]model.RegionComparableSpan, multiSize),
ResolvedTs: 160,
},
}
for i := range multipleExpected.Resolved.Spans {
multipleExpected.Resolved.Spans[i] = model.RegionComparableSpan{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
Region: 3,
}
}

Expand Down Expand Up @@ -1327,7 +1336,7 @@ func TestStreamSendWithError(t *testing.T) {
select {
case event := <-eventCh:
require.NotNil(t, event.Resolved)
require.Equal(t, 1, len(event.Resolved))
require.Equal(t, 1, len(event.Resolved.Spans))
require.NotNil(t, 0, event.RegionID)
case <-time.After(time.Second):
require.Fail(t, fmt.Sprintf("expected events are not receive, received: %v", initRegions))
Expand Down Expand Up @@ -1431,16 +1440,20 @@ func testStreamRecvWithError(t *testing.T, failpointStr string) {

expected := []model.RegionFeedEvent{
{
Resolved: []*model.ResolvedSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
ResolvedTs: 120,
}},
Resolved: &model.ResolvedSpans{
Spans: []model.RegionComparableSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
Region: regionID,
}}, ResolvedTs: 120,
},
},
{
Resolved: []*model.ResolvedSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
ResolvedTs: 120,
}},
Resolved: &model.ResolvedSpans{
Spans: []model.RegionComparableSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
Region: regionID,
}}, ResolvedTs: 120,
},
},
}

Expand All @@ -1449,7 +1462,7 @@ eventLoop:
for {
select {
case ev := <-eventCh:
if ev.Resolved[0].ResolvedTs != uint64(100) {
if ev.Resolved.ResolvedTs != uint64(100) {
events = append(events, ev)
}
case <-time.After(time.Second):
Expand Down Expand Up @@ -1617,7 +1630,7 @@ ReceiveLoop:
break ReceiveLoop
}
received = append(received, event)
if event.Resolved[0].ResolvedTs == 130 {
if event.Resolved.ResolvedTs == 130 {
break ReceiveLoop
}
case <-time.After(time.Second):
Expand All @@ -1626,7 +1639,7 @@ ReceiveLoop:
}
var lastResolvedTs uint64
for _, e := range received {
if lastResolvedTs > e.Resolved[0].ResolvedTs {
if lastResolvedTs > e.Resolved.ResolvedTs {
require.Fail(t, fmt.Sprintf("the resolvedTs is back off %#v", resolved))
}
}
Expand Down Expand Up @@ -1753,7 +1766,7 @@ func TestIncompatibleTiKV(t *testing.T) {
ch1 <- initialized
select {
case event := <-eventCh:
require.Equal(t, 1, len(event.Resolved))
require.Equal(t, 1, len(event.Resolved.Spans))
case <-time.After(time.Second):
require.Fail(t, "expected events are not receive")
}
Expand Down Expand Up @@ -1823,7 +1836,7 @@ func TestNoPendingRegionError(t *testing.T) {
ch1 <- initialized
ev := <-eventCh
require.NotNil(t, ev.Resolved)
require.Equal(t, uint64(100), ev.Resolved[0].ResolvedTs)
require.Equal(t, uint64(100), ev.Resolved.ResolvedTs)

resolved := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{
{
Expand All @@ -1835,7 +1848,7 @@ func TestNoPendingRegionError(t *testing.T) {
ch1 <- resolved
ev = <-eventCh
require.NotNil(t, ev.Resolved)
require.Equal(t, uint64(200), ev.Resolved[0].ResolvedTs)
require.Equal(t, uint64(200), ev.Resolved.ResolvedTs)

cancel()
}
Expand Down Expand Up @@ -1910,22 +1923,28 @@ func TestDropStaleRequest(t *testing.T) {
}}
expected := []model.RegionFeedEvent{
{
Resolved: []*model.ResolvedSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
ResolvedTs: 100,
}},
Resolved: &model.ResolvedSpans{
Spans: []model.RegionComparableSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
Region: regionID,
}}, ResolvedTs: 100,
},
},
{
Resolved: []*model.ResolvedSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
ResolvedTs: 120,
}},
Resolved: &model.ResolvedSpans{
Spans: []model.RegionComparableSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
Region: regionID,
}}, ResolvedTs: 120,
},
},
{
Resolved: []*model.ResolvedSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
ResolvedTs: 130,
}},
Resolved: &model.ResolvedSpans{
Spans: []model.RegionComparableSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
Region: regionID,
}}, ResolvedTs: 130,
},
},
}

Expand Down Expand Up @@ -2010,16 +2029,20 @@ func TestResolveLock(t *testing.T) {
}}
expected := []model.RegionFeedEvent{
{
Resolved: []*model.ResolvedSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
ResolvedTs: 100,
}},
Resolved: &model.ResolvedSpans{
Spans: []model.RegionComparableSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
Region: regionID,
}}, ResolvedTs: 100,
},
},
{
Resolved: []*model.ResolvedSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
ResolvedTs: tso,
}},
Resolved: &model.ResolvedSpans{
Spans: []model.RegionComparableSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
Region: regionID,
}}, ResolvedTs: tso,
},
},
}
ch1 <- resolved
Expand Down Expand Up @@ -2342,17 +2365,21 @@ func testEventAfterFeedStop(t *testing.T) {

expected := []model.RegionFeedEvent{
{
Resolved: []*model.ResolvedSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
ResolvedTs: 100,
}},
Resolved: &model.ResolvedSpans{
Spans: []model.RegionComparableSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
Region: regionID,
}}, ResolvedTs: 100,
},
RegionID: regionID,
},
{
Resolved: []*model.ResolvedSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
ResolvedTs: 100,
}},
Resolved: &model.ResolvedSpans{
Spans: []model.RegionComparableSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
Region: regionID,
}}, ResolvedTs: 100,
},
RegionID: regionID,
},
{
Expand All @@ -2367,10 +2394,12 @@ func testEventAfterFeedStop(t *testing.T) {
RegionID: 3,
},
{
Resolved: []*model.ResolvedSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
ResolvedTs: 120,
}},
Resolved: &model.ResolvedSpans{
Spans: []model.RegionComparableSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
Region: regionID,
}}, ResolvedTs: 120,
},
RegionID: regionID,
},
}
Expand Down Expand Up @@ -2542,10 +2571,12 @@ func TestOutOfRegionRangeEvent(t *testing.T) {

expected := []model.RegionFeedEvent{
{
Resolved: []*model.ResolvedSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
ResolvedTs: 100,
}},
Resolved: &model.ResolvedSpans{
Spans: []model.RegionComparableSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
Region: 3,
}}, ResolvedTs: 100,
},
},
{
Val: &model.RawKVEntry{
Expand All @@ -2570,10 +2601,12 @@ func TestOutOfRegionRangeEvent(t *testing.T) {
RegionID: 3,
},
{
Resolved: []*model.ResolvedSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
ResolvedTs: 145,
}},
Resolved: &model.ResolvedSpans{
Spans: []model.RegionComparableSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
Region: 3,
}}, ResolvedTs: 145,
},
},
}

Expand Down Expand Up @@ -3025,17 +3058,19 @@ func testKVClientForceReconnect(t *testing.T) {
ch2 <- resolved

expected := model.RegionFeedEvent{
Resolved: []*model.ResolvedSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")},
ResolvedTs: 135,
}},
Resolved: &model.ResolvedSpans{
Spans: []model.RegionComparableSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")},
Region: regionID3,
}}, ResolvedTs: 135,
},
}

eventLoop:
for {
select {
case ev := <-eventCh:
if ev.Resolved != nil && ev.Resolved[0].ResolvedTs == uint64(100) {
if ev.Resolved != nil && ev.Resolved.ResolvedTs == uint64(100) {
continue
}
require.Equal(t, expected, ev)
Expand Down Expand Up @@ -3263,10 +3298,12 @@ func TestEvTimeUpdate(t *testing.T) {

expected := []model.RegionFeedEvent{
{
Resolved: []*model.ResolvedSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
ResolvedTs: 100,
}},
Resolved: &model.ResolvedSpans{
Spans: []model.RegionComparableSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
Region: 3,
}}, ResolvedTs: 100,
},
},
{
Val: &model.RawKVEntry{
Expand Down
Loading

0 comments on commit 0f2a253

Please sign in to comment.