fix unexpected slow query during GC running after stop 1 tikv-server (#899)

* fix unexpected slow query during GC running after stop 1 tikv-server

Signed-off-by: crazycs520 <[email protected]>

* add test

Signed-off-by: crazycs520 <[email protected]>

* address comment

Signed-off-by: crazycs520 <[email protected]>

* address comment

Signed-off-by: crazycs520 <[email protected]>

* fix test

Signed-off-by: crazycs520 <[email protected]>

---------

Signed-off-by: crazycs520 <[email protected]>
crazycs520 committed Jul 24, 2023
1 parent c7e214f commit 59adec2
Showing 2 changed files with 94 additions and 3 deletions.
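
For context: the slow queries come from follower/mixed reads that keep being routed to the stopped TiKV store, failing, and backing off before falling back. The diff below tightens replica selection so that a store whose liveness is unreachable is no longer a candidate. The following is a minimal, self-contained sketch of that idea only; livenessState, store, replica and isCandidate here are simplified placeholders, not the real client-go definitions.

package main

import "fmt"

// Illustrative liveness states; the real constants live in internal/locate.
type livenessState uint32

const (
	reachable livenessState = iota
	unreachable
)

type store struct{ liveness livenessState }

type replica struct {
	store    *store
	attempts int
}

// isCandidate sketches the tightened check: a follower that has already been
// tried once, or whose store is currently unreachable, is skipped, so the
// selector moves on to a healthy peer instead of spending a backoff on a dead store.
func isCandidate(r *replica) bool {
	return r.attempts < 1 && r.store.liveness != unreachable
}

func main() {
	down := &replica{store: &store{liveness: unreachable}}
	up := &replica{store: &store{liveness: reachable}}
	fmt.Println(isCandidate(down), isCandidate(up)) // false true
}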
6 changes: 4 additions & 2 deletions internal/locate/region_request.go
@@ -385,7 +385,7 @@ func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (
}
targetReplica = selector.replicas[idx]
// Each follower is only tried once
- if !targetReplica.isExhausted(1) {
+ if !targetReplica.isExhausted(1) && targetReplica.store.getLivenessState() != unreachable {
state.lastIdx = idx
selector.targetIdx = idx
break
@@ -604,7 +604,9 @@ func (state *accessFollower) isCandidate(idx AccessIndex, replica *replica) bool
// The request can only be sent to the leader.
((state.option.leaderOnly && idx == state.leaderIdx) ||
// Choose a replica with matched labels.
- (!state.option.leaderOnly && (state.tryLeader || idx != state.leaderIdx) && replica.store.IsLabelsMatch(state.option.labels)))
+ (!state.option.leaderOnly && (state.tryLeader || idx != state.leaderIdx) && replica.store.IsLabelsMatch(state.option.labels))) &&
+ // Make sure the replica is not unreachable.
+ replica.store.getLivenessState() != unreachable
}

type invalidStore struct {
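The test below drives that behavior end to end: with one TiKV stopped, every follower eventually becomes unreachable, and subsequent mixed-read requests must go straight to the leader with zero backoff. As a rough illustration of the expected outcome, here is a toy selection loop with placeholder types; pick and the skipUnreachable flag are hypothetical and only model the idea, not the real replicaSelector.

package main

import "fmt"

type livenessState uint32

const (
	reachable livenessState = iota
	unreachable
)

type replica struct {
	addr     string
	isLeader bool
	liveness livenessState
}

// pick walks the follower list and returns the chosen address plus the number
// of failed attempts (a stand-in for backoff rounds). With skipUnreachable set,
// unreachable followers are filtered up front, so the call falls through to the
// leader without spending any attempts on dead stores.
func pick(replicas []replica, skipUnreachable bool) (string, int) {
	attempts := 0
	for _, r := range replicas {
		if r.isLeader {
			continue
		}
		if skipUnreachable && r.liveness == unreachable {
			continue
		}
		if r.liveness == reachable {
			return r.addr, attempts
		}
		attempts++ // tried a dead follower and had to back off
	}
	for _, r := range replicas {
		if r.isLeader {
			return r.addr, attempts
		}
	}
	return "", attempts
}

func main() {
	replicas := []replica{
		{addr: "store1", isLeader: true, liveness: reachable},
		{addr: "store2", liveness: unreachable},
		{addr: "store3", liveness: unreachable},
	}
	addr, n := pick(replicas, false)
	fmt.Println(addr, n) // store1 2: both dead followers were tried first
	addr, n = pick(replicas, true)
	fmt.Println(addr, n) // store1 0: dead followers are skipped outright
}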
91 changes: 90 additions & 1 deletion internal/locate/region_request3_test.go
@@ -276,6 +276,12 @@ func refreshEpochs(regionStore *regionStore) {
}
}

+ func refreshLivenessStates(regionStore *regionStore) {
+ for _, store := range regionStore.stores {
+ atomic.StoreUint32(&store.livenessState, uint32(reachable))
+ }
+ }
+
func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID)
s.Nil(err)
@@ -511,6 +517,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
// Test accessFollower state with kv.ReplicaReadFollower request type.
req = tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{}, kv.ReplicaReadFollower, nil)
refreshEpochs(regionStore)
+ refreshLivenessStates(regionStore)
replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req)
s.Nil(err)
s.NotNil(replicaSelector)
@@ -625,10 +632,13 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
region, err := s.cache.LocateRegionByID(s.bo, s.regionID)
s.Nil(err)
s.NotNil(region)
+ regionStore := s.cache.GetCachedRegionWithRLock(region.Region).getStore()
+ s.NotNil(regionStore)

reloadRegion := func() {
s.regionRequestSender.replicaSelector.region.invalidate(Other)
region, _ = s.cache.LocateRegionByID(s.bo, s.regionID)
+ regionStore = s.cache.GetCachedRegionWithRLock(region.Region).getStore()
}

hasFakeRegionError := func(resp *tikvrpc.Response) bool {
@@ -660,6 +670,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
s.Equal(sender.replicaSelector.targetIdx, AccessIndex(1))
s.True(bo.GetTotalBackoffTimes() == 1)
s.cluster.StartStore(s.storeIDs[0])
+ atomic.StoreUint32(&regionStore.stores[0].livenessState, uint32(reachable))

// Leader is updated because of send success, so no backoff.
bo = retry.NewBackoffer(context.Background(), -1)
@@ -679,6 +690,7 @@
s.True(hasFakeRegionError(resp))
s.Equal(bo.GetTotalBackoffTimes(), 1)
s.cluster.StartStore(s.storeIDs[1])
+ atomic.StoreUint32(&regionStore.stores[1].livenessState, uint32(reachable))

// Leader is changed. No backoff.
reloadRegion()
@@ -695,7 +707,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
resp, err = sender.SendReq(bo, req, region.Region, time.Second)
s.Nil(err)
s.True(hasFakeRegionError(resp))
- s.Equal(bo.GetTotalBackoffTimes(), 2) // The unreachable leader is skipped
+ s.Equal(bo.GetTotalBackoffTimes(), 3)
s.False(sender.replicaSelector.region.isValid())
s.cluster.ChangeLeader(s.regionID, s.peerIDs[0])

@@ -929,3 +941,80 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaReadFallbackToLeaderReg
// after region error returned, the region should be invalidated.
s.False(region.isValid())
}
+
+ func (s *testRegionRequestToThreeStoresSuite) TestAccessFollowerAfter1TiKVDown() {
+ var leaderAddr string
+ s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
+ // Return an error when a non-leader store is accessed.
+ if leaderAddr != addr {
+ return nil, context.DeadlineExceeded
+ }
+ return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{
+ Value: []byte("value"),
+ }}, nil
+ }}
+
+ req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{
+ Key: []byte("key"),
+ })
+ req.ReplicaReadType = kv.ReplicaReadMixed
+
+ loc, err := s.cache.LocateKey(s.bo, []byte("key"))
+ s.Nil(err)
+ region := s.cache.GetCachedRegionWithRLock(loc.Region)
+ s.NotNil(region)
+ regionStore := region.getStore()
+ leaderAddr = regionStore.stores[regionStore.workTiKVIdx].addr
+ s.NotEqual(leaderAddr, "")
+ for i := 0; i < 10; i++ {
+ bo := retry.NewBackofferWithVars(context.Background(), 100, nil)
+ resp, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
+ s.Nil(err)
+ s.NotNil(resp)
+
+ // Since sending a request to a follower returns an error, every follower should be marked unreachable and its store epoch should become stale.
+ allFollowerStoreEpochStale := true
+ for i, store := range regionStore.stores {
+ if i == int(regionStore.workTiKVIdx) {
+ continue
+ }
+ if store.epoch == regionStore.storeEpochs[i] {
+ allFollowerStoreEpochStale = false
+ break
+ } else {
+ s.Equal(store.getLivenessState(), unreachable)
+ }
+ }
+ if allFollowerStoreEpochStale {
+ break
+ }
+ }
+
+ // Mock the GC leader reloading all regions.
+ bo := retry.NewBackofferWithVars(context.Background(), 10, nil)
+ _, err = s.cache.BatchLoadRegionsWithKeyRange(bo, []byte(""), nil, 1)
+ s.Nil(err)
+
+ loc, err = s.cache.LocateKey(s.bo, []byte("key"))
+ s.Nil(err)
+ region = s.cache.GetCachedRegionWithRLock(loc.Region)
+ s.NotNil(region)
+ regionStore = region.getStore()
+ for i, store := range regionStore.stores {
+ if i == int(regionStore.workTiKVIdx) {
+ continue
+ }
+ // After the region is reloaded, its store epochs are refreshed, but the store liveness state is still unreachable.
+ s.Equal(store.epoch, regionStore.storeEpochs[i])
+ s.Equal(store.getLivenessState(), unreachable)
+ }
+
+ for i := 0; i < 100; i++ {
+ bo := retry.NewBackofferWithVars(context.Background(), 1, nil)
+ resp, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
+ s.Nil(err)
+ s.NotNil(resp)
+ // Since all followers' stores are unreachable, the request is sent to the leader and the backoff count should be 0.
+ s.Equal(0, bo.GetTotalBackoffTimes())
+ }
+ }
