Skip to content

Commit

Permalink
*: refine non-global stale-read request retry logic
Browse files Browse the repository at this point in the history
Signed-off-by: crazycs520 <[email protected]>
  • Loading branch information
crazycs520 committed Jun 28, 2023
1 parent 5fde309 commit f83ae46
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 15 deletions.
28 changes: 13 additions & 15 deletions internal/locate/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,11 +516,11 @@ func (state *tryNewProxy) onNoLeader(selector *replicaSelector) {
type accessFollower struct {
stateBase
// If tryLeader is true, the request can also be sent to the leader.
tryLeader bool
isGlobalStaleRead bool
option storeSelectorOp
leaderIdx AccessIndex
lastIdx AccessIndex
tryLeader bool
isStaleRead bool
option storeSelectorOp
leaderIdx AccessIndex
lastIdx AccessIndex
}

func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
Expand All @@ -540,12 +540,10 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
}
}
} else {
// Stale Read request will retry the leader or next peer on error,
// if txnScope is global, we will only retry the leader by using the WithLeaderOnly option,
// if txnScope is local, we will retry both other peers and the leader by the strategy of replicaSelector.
if state.isGlobalStaleRead {
// Stale Read request will retry the leader only by using the WithLeaderOnly option,
if state.isStaleRead {
WithLeaderOnly()(&state.option)
// retry on the leader should not use stale read flag to avoid possible DataIsNotReady error as it always can serve any read
// retry on the leader should not use stale read flag to avoid possible DataIsNotReady error as it always can serve any read.
resetStaleRead = true
}
state.lastIdx++
Expand Down Expand Up @@ -648,11 +646,11 @@ func newReplicaSelector(regionCache *RegionCache, regionID RegionVerID, req *tik
op(&option)
}
state = &accessFollower{
tryLeader: req.ReplicaReadType == kv.ReplicaReadMixed,
isGlobalStaleRead: req.IsGlobalStaleRead(),
option: option,
leaderIdx: regionStore.workTiKVIdx,
lastIdx: -1,
tryLeader: req.ReplicaReadType == kv.ReplicaReadMixed,
isStaleRead: req.StaleRead,
option: option,
leaderIdx: regionStore.workTiKVIdx,
lastIdx: -1,
}
}

Expand Down
36 changes: 36 additions & 0 deletions internal/locate/region_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,3 +623,39 @@ func (s *testRegionRequestToSingleStoreSuite) TestCloseConnectionOnStoreNotMatch
s.NotNil(regionErr)
s.Equal(target, client.closedAddr)
}

func (s *testRegionRequestToSingleStoreSuite) TestStaleReadRetry() {
req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{
Key: []byte("key"),
})
req.EnableStaleRead()
req.ReadReplicaScope = "z1" // not global stale read.
region, err := s.cache.LocateRegionByID(s.bo, s.region)
s.Nil(err)
s.NotNil(region)

oc := s.regionRequestSender.client
defer func() {
s.regionRequestSender.client = oc
}()

client := &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
if req.StaleRead {
response = &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{
RegionError: &errorpb.Error{DataIsNotReady: &errorpb.DataIsNotReady{}},
}}
} else {
response = &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte("value")}}
}
return response, nil
}}

s.regionRequestSender.client = client
bo := retry.NewBackofferWithVars(context.Background(), 5, nil)
resp, err := s.regionRequestSender.SendReq(bo, req, region.Region, time.Second)
s.Nil(err)
s.NotNil(resp)
regionErr, _ := resp.GetRegionError()
s.Nil(regionErr)
s.Equal([]byte("value"), resp.Resp.(*kvrpcpb.GetResponse).Value)
}

0 comments on commit f83ae46

Please sign in to comment.