Skip to content

Commit

Permalink
Resume max retry time check for stale read retry with leader option
Browse files Browse the repository at this point in the history
Signed-off-by: cfzjywxk <[email protected]>
  • Loading branch information
cfzjywxk committed Jul 31, 2023
1 parent 719e645 commit cfd5507
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 1 deletion.
15 changes: 14 additions & 1 deletion internal/locate/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
// If there is no candidate, fallback to the leader.
if selector.targetIdx < 0 {
leader := selector.replicas[state.leaderIdx]
leaderInvalid := leader.isEpochStale() || (!state.option.leaderOnly && leader.isExhausted(1))
leaderInvalid := leader.isEpochStale() || state.IsLeaderExhausted(leader)
if len(state.option.labels) > 0 {
logutil.Logger(bo.GetCtx()).Warn("unable to find stores with given labels",
zap.Uint64("region", selector.region.GetID()),
Expand Down Expand Up @@ -644,6 +644,19 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
return rpcCtx, nil
}

func (state *accessFollower) IsLeaderExhausted(leader *replica) bool {
// Allow another extra retry for the following case:
// 1. The stale read is enabled and leader peer is selected as the target peer at first.
// 2. Data is not ready is returned from the leader peer.
// 3. Stale read flag is removed and processing falls back to snapshot read on the leader peer.
// 4. The leader peer should be retried again using snapshot read.
if state.isStaleRead && state.option.leaderOnly {
return leader.isExhausted(2)
} else {
return leader.isExhausted(1)
}
}

func (state *accessFollower) onSendFailure(bo *retry.Backoffer, selector *replicaSelector, cause error) {
if selector.checkLiveness(bo, selector.targetReplica()) != reachable {
selector.invalidateReplicaStore(selector.targetReplica(), cause)
Expand Down
56 changes: 56 additions & 0 deletions internal/locate/region_request3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ package locate
import (
"context"
"fmt"
"strconv"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -1142,3 +1143,58 @@ func (s *testRegionRequestToThreeStoresSuite) TestAccessFollowerAfter1TiKVDown()
s.Equal(0, retryTimes)
}
}

func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback() {
leaderStore, _ := s.loadAndGetLeaderStore()
leaderLabel := []*metapb.StoreLabel{
{
Key: "id",
Value: strconv.FormatUint(leaderStore.StoreID(), 10),
},
}
regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID)
s.Nil(err)
s.NotNil(regionLoc)
value := []byte("value")
isFirstReq := true

s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
select {
case <-ctx.Done():
return nil, errors.New("timeout")
default:
}
// Return `DataIsNotReady` for the first time on leader.
if isFirstReq {
isFirstReq = false
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
DataIsNotReady: &errorpb.DataIsNotReady{},
}}}, nil
}
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: value}}, nil
}}

region := s.cache.getRegionByIDFromCache(regionLoc.Region.GetID())
s.True(region.isValid())

req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}, kv.ReplicaReadLeader, nil)
req.ReadReplicaScope = oracle.GlobalTxnScope
req.TxnScope = oracle.GlobalTxnScope
req.EnableStaleRead()
req.ReplicaReadType = kv.ReplicaReadMixed
var ops []StoreSelectorOption
ops = append(ops, WithMatchLabels(leaderLabel))

ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)

Check failure on line 1188 in internal/locate/region_request3_test.go

View workflow job for this annotation

GitHub Actions / golangci

lostcancel: the cancel function returned by context.WithTimeout should be called, not discarded, to avoid a context leak (govet)
bo := retry.NewBackoffer(ctx, -1)
s.Nil(err)
resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, regionLoc.Region, time.Second, tikvrpc.TiKV, ops...)
s.Nil(err)

regionErr, err := resp.GetRegionError()
s.Nil(err)
s.Nil(regionErr)
getResp, ok := resp.Resp.(*kvrpcpb.GetResponse)
s.True(ok)
s.Equal(getResp.Value, value)
}

0 comments on commit cfd5507

Please sign in to comment.