Skip to content

Commit

Permalink
fix the issue that prefer-leader doesn't try followers (#1105)
Browse files Browse the repository at this point in the history
Signed-off-by: zyguan <[email protected]>
Co-authored-by: cfzjywxk <[email protected]>
  • Loading branch information
zyguan and cfzjywxk committed Jan 24, 2024
1 parent 3480b5e commit c9ecc6e
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 1 deletion.
3 changes: 2 additions & 1 deletion internal/locate/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,9 +706,10 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
idx = state.lastIdx
} else {
// randomly select next replica, but skip state.lastIdx
if (i+offset)%replicaSize == 0 {
if (i+offset)%replicaSize == int(state.leaderIdx) {
offset++
}
idx = AccessIndex((i + offset) % replicaSize)
}
} else {
idx = AccessIndex((int(state.lastIdx) + i) % replicaSize)
Expand Down
65 changes: 65 additions & 0 deletions internal/locate/region_request3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1674,6 +1674,71 @@ func (s *testRegionRequestToThreeStoresSuite) TestDoNotTryUnreachableLeader() {
s.Equal(follower.addr, string(resp.Resp.(*kvrpcpb.GetResponse).Value))
}

func (s *testRegionRequestToThreeStoresSuite) TestPreferLeader() {
key := []byte("key")
bo := retry.NewBackoffer(context.Background(), -1)

// load region into cache
loc, err := s.cache.LocateKey(bo, key)
s.Require().NoError(err)

region := s.cache.GetCachedRegionWithRLock(loc.Region)
leader, _, _, _ := region.WorkStorePeer(region.getStore())

// make request
req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: key}, kv.ReplicaReadPreferLeader, nil)
req.ReadReplicaScope = oracle.GlobalTxnScope
req.TxnScope = oracle.GlobalTxnScope

// setup mock client
s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
val := "follower"
if addr == leader.addr {
val = "leader"
}
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte(val)}}, nil
}}

// access leader when all peers are reachable
resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
s.NoError(err)
regionErr, err := resp.GetRegionError()
s.NoError(err)
s.Nil(regionErr)
s.Equal("leader", string(resp.Resp.(*kvrpcpb.GetResponse).Value))

// access follower when leader is unreachable
atomic.StoreUint32(&leader.livenessState, uint32(unreachable))

resp, _, _, err = s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
s.NoError(err)
regionErr, err = resp.GetRegionError()
s.NoError(err)
s.Nil(regionErr)
s.Equal("follower", string(resp.Resp.(*kvrpcpb.GetResponse).Value))

// access the rest follower when leader and one follower are unreachable
follower, _, _, _ := region.FollowerStorePeer(region.getStore(), 0, &storeSelectorOp{})
atomic.StoreUint32(&follower.livenessState, uint32(unreachable))

resp, _, _, err = s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
s.NoError(err)
regionErr, err = resp.GetRegionError()
s.NoError(err)
s.Nil(regionErr)
s.Equal("follower", string(resp.Resp.(*kvrpcpb.GetResponse).Value))

// return fake error when all peers are unreachable
follower, _, _, _ = region.FollowerStorePeer(region.getStore(), 1, &storeSelectorOp{})
atomic.StoreUint32(&follower.livenessState, uint32(unreachable))

resp, _, _, err = s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
s.NoError(err)
regionErr, err = resp.GetRegionError()
s.NoError(err)
s.True(IsFakeRegionError(regionErr))
}

func (s *testRegionRequestToThreeStoresSuite) TestLeaderStuck() {
key := []byte("key")
value := []byte("value1")
Expand Down

0 comments on commit c9ecc6e

Please sign in to comment.