From c9ecc6ed3b85863d07b1eef0592c1a9fecd90a67 Mon Sep 17 00:00:00 2001 From: zyguan Date: Wed, 24 Jan 2024 11:05:38 +0800 Subject: [PATCH] fix the issue that prefer-leader doesn't try followers (#1105) Signed-off-by: zyguan Co-authored-by: cfzjywxk --- internal/locate/region_request.go | 3 +- internal/locate/region_request3_test.go | 65 +++++++++++++++++++++++++ 2 files changed, 67 insertions(+), 1 deletion(-) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index b5638eefc..ebb5c7dfb 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -706,9 +706,10 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector idx = state.lastIdx } else { // randomly select next replica, but skip state.lastIdx - if (i+offset)%replicaSize == 0 { + if (i+offset)%replicaSize == int(state.leaderIdx) { offset++ } + idx = AccessIndex((i + offset) % replicaSize) } } else { idx = AccessIndex((int(state.lastIdx) + i) % replicaSize) diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go index 9c45e47b5..c12d87c8f 100644 --- a/internal/locate/region_request3_test.go +++ b/internal/locate/region_request3_test.go @@ -1674,6 +1674,71 @@ func (s *testRegionRequestToThreeStoresSuite) TestDoNotTryUnreachableLeader() { s.Equal(follower.addr, string(resp.Resp.(*kvrpcpb.GetResponse).Value)) } +func (s *testRegionRequestToThreeStoresSuite) TestPreferLeader() { + key := []byte("key") + bo := retry.NewBackoffer(context.Background(), -1) + + // load region into cache + loc, err := s.cache.LocateKey(bo, key) + s.Require().NoError(err) + + region := s.cache.GetCachedRegionWithRLock(loc.Region) + leader, _, _, _ := region.WorkStorePeer(region.getStore()) + + // make request + req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: key}, kv.ReplicaReadPreferLeader, nil) + req.ReadReplicaScope = oracle.GlobalTxnScope + req.TxnScope = oracle.GlobalTxnScope + + // setup mock client + s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { + val := "follower" + if addr == leader.addr { + val = "leader" + } + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte(val)}}, nil + }} + + // access leader when all peers are reachable + resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV) + s.NoError(err) + regionErr, err := resp.GetRegionError() + s.NoError(err) + s.Nil(regionErr) + s.Equal("leader", string(resp.Resp.(*kvrpcpb.GetResponse).Value)) + + // access follower when leader is unreachable + atomic.StoreUint32(&leader.livenessState, uint32(unreachable)) + + resp, _, _, err = s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV) + s.NoError(err) + regionErr, err = resp.GetRegionError() + s.NoError(err) + s.Nil(regionErr) + s.Equal("follower", string(resp.Resp.(*kvrpcpb.GetResponse).Value)) + + // access the rest follower when leader and one follower are unreachable + follower, _, _, _ := region.FollowerStorePeer(region.getStore(), 0, &storeSelectorOp{}) + atomic.StoreUint32(&follower.livenessState, uint32(unreachable)) + + resp, _, _, err = s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV) + s.NoError(err) + regionErr, err = resp.GetRegionError() + s.NoError(err) + s.Nil(regionErr) + s.Equal("follower", string(resp.Resp.(*kvrpcpb.GetResponse).Value)) + + // return fake error when all peers are unreachable + follower, _, _, _ = region.FollowerStorePeer(region.getStore(), 1, &storeSelectorOp{}) + atomic.StoreUint32(&follower.livenessState, uint32(unreachable)) + + resp, _, _, err = s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV) + s.NoError(err) + regionErr, err = resp.GetRegionError() + s.NoError(err) + s.True(IsFakeRegionError(regionErr)) +} + func (s *testRegionRequestToThreeStoresSuite) TestLeaderStuck() { key := []byte("key") value := []byte("value1")