diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 29c87e464..6c480b1ec 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -703,6 +703,19 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector return rpcCtx, nil } +func (state *accessFollower) IsLeaderExhausted(leader *replica) bool { + // Allow another extra retry for the following case: + // 1. The stale read is enabled and leader peer is selected as the target peer at first. + // 2. Data is not ready is returned from the leader peer. + // 3. Stale read flag is removed and processing falls back to snapshot read on the leader peer. + // 4. The leader peer should be retried again using snapshot read. + if state.isStaleRead && state.option.leaderOnly { + return leader.isExhausted(2) + } else { + return leader.isExhausted(1) + } +} + func (state *accessFollower) onSendFailure(bo *retry.Backoffer, selector *replicaSelector, cause error) { if selector.checkLiveness(bo, selector.targetReplica()) != reachable { selector.invalidateReplicaStore(selector.targetReplica(), cause) diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go index fc6cdb4cb..d7c56cb5c 100644 --- a/internal/locate/region_request3_test.go +++ b/internal/locate/region_request3_test.go @@ -37,6 +37,7 @@ package locate import ( "context" "fmt" + "strconv" "sync/atomic" "testing" "time" @@ -1190,3 +1191,59 @@ func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback2Follower() { } } } + +func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback() { + leaderStore, _ := s.loadAndGetLeaderStore() + leaderLabel := []*metapb.StoreLabel{ + { + Key: "id", + Value: strconv.FormatUint(leaderStore.StoreID(), 10), + }, + } + regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID) + s.Nil(err) + s.NotNil(regionLoc) + value := []byte("value") + isFirstReq := true + + s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { + select { + case <-ctx.Done(): + return nil, errors.New("timeout") + default: + } + // Return `DataIsNotReady` for the first time on leader. + if isFirstReq { + isFirstReq = false + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{ + DataIsNotReady: &errorpb.DataIsNotReady{}, + }}}, nil + } + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: value}}, nil + }} + + region := s.cache.getRegionByIDFromCache(regionLoc.Region.GetID()) + s.True(region.isValid()) + + req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}, kv.ReplicaReadLeader, nil) + req.ReadReplicaScope = oracle.GlobalTxnScope + req.TxnScope = oracle.GlobalTxnScope + req.EnableStaleRead() + req.ReplicaReadType = kv.ReplicaReadMixed + var ops []StoreSelectorOption + ops = append(ops, WithMatchLabels(leaderLabel)) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + bo := retry.NewBackoffer(ctx, -1) + s.Nil(err) + resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, regionLoc.Region, time.Second, tikvrpc.TiKV, ops...) + s.Nil(err) + + regionErr, err := resp.GetRegionError() + s.Nil(err) + s.Nil(regionErr) + getResp, ok := resp.Resp.(*kvrpcpb.GetResponse) + s.True(ok) + s.Equal(getResp.Value, value) +}