diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index aac428b48..36cf476d4 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -532,58 +532,116 @@ func (state *tryNewProxy) onNoLeader(selector *replicaSelector) { type accessFollower struct { stateBase // If tryLeader is true, the request can also be sent to the leader when !leader.isSlow() - tryLeader bool - isStaleRead bool - option storeSelectorOp - leaderIdx AccessIndex - lastIdx AccessIndex - learnerOnly bool + isStaleRead bool + option storeSelectorOp + lastIdx int + leaderIdx AccessIndex + highPriorReplicas []AccessIndex + lowPriorReplicas []AccessIndex + learnerOnly bool + // replicaReadType indicates the strategy of choosing replica. + // kv.ReplicaReadFollower, randomly choose follower. + // kv.ReplicaReadMixed + // if option.labels contains tikv.DCLabelKey, it's the closest replica read, + // randomly choose the one of the closest replicas. + // if not, randomly choose replica from leader and followers. + // kv.ReplicaReadLearner, randomly choose from learner. + // kv.ReplicaReadPreferLeader, choose leader firstly, retry with followers. + replicaReadType kv.ReplicaReadType +} + +// setHighPriorityReplicas set the high priority replicas. +func (state *accessFollower) setHighPriorityReplicas(replicas []*replica, leaderIdx AccessIndex) { + if state.option.leaderOnly { + return + } + switch state.replicaReadType { + case kv.ReplicaReadFollower: + state.highPriorReplicas = make([]AccessIndex, 0, len(replicas)-1) // do not contain leader + for i, r := range replicas { + if AccessIndex(i) != leaderIdx && r.store.IsLabelsMatch(state.option.labels) { + state.highPriorReplicas = append(state.highPriorReplicas, AccessIndex(i)) + } + } + case kv.ReplicaReadMixed: + state.highPriorReplicas = make([]AccessIndex, 0, len(replicas)) + for i, r := range replicas { + if r.store.IsLabelsMatch(state.option.labels) { + state.highPriorReplicas = append(state.highPriorReplicas, AccessIndex(i)) + } + } + case kv.ReplicaReadLearner: + state.highPriorReplicas = make([]AccessIndex, 0, len(replicas)) + for i, r := range replicas { + if r.peer.Role == metapb.PeerRole_Learner { + state.highPriorReplicas = append(state.highPriorReplicas, AccessIndex(i)) + } + } + case kv.ReplicaReadPreferLeader: + state.highPriorReplicas = []AccessIndex{leaderIdx} + case kv.ReplicaReadLeader: + // unreachable + } + if len(state.highPriorReplicas) > 1 { + rand.Shuffle(len(state.highPriorReplicas), func(i, j int) { + state.highPriorReplicas[i], state.highPriorReplicas[j] = state.highPriorReplicas[j], state.highPriorReplicas[i] + }) + } } -func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) { - replicaSize := len(selector.replicas) - resetStaleRead := false - if state.lastIdx < 0 { - if state.tryLeader { - state.lastIdx = AccessIndex(rand.Intn(replicaSize)) - } else { - if replicaSize <= 1 { - state.lastIdx = state.leaderIdx - } else { - // Randomly select a non-leader peer - state.lastIdx = AccessIndex(rand.Intn(replicaSize - 1)) - if state.lastIdx >= state.leaderIdx { - state.lastIdx++ - } +func (state *accessFollower) setLowPriorityReplicas(replicas []*replica, leaderIdx AccessIndex) { + switch state.replicaReadType { + case kv.ReplicaReadLeader, kv.ReplicaReadLearner, kv.ReplicaReadFollower, kv.ReplicaReadMixed: + return + case kv.ReplicaReadPreferLeader: + state.lowPriorReplicas = make([]AccessIndex, 0, len(replicas)-1) + for i := range replicas { + if AccessIndex(i) != leaderIdx { + state.lowPriorReplicas = append(state.lowPriorReplicas, AccessIndex(i)) } } - } else { - // Stale Read request will retry the leader only by using the WithLeaderOnly option. - if state.isStaleRead { - WithLeaderOnly()(&state.option) - // retry on the leader should not use stale read flag to avoid possible DataIsNotReady error as it always can serve any read. - resetStaleRead = true - } - state.lastIdx++ + state.lastIdx = -1 + } + if len(state.lowPriorReplicas) > 1 { + rand.Shuffle(len(state.lowPriorReplicas), func(i, j int) { + state.lowPriorReplicas[i], state.lowPriorReplicas[j] = state.lowPriorReplicas[j], state.lowPriorReplicas[i] + }) } +} - // If selector is under `ReplicaReadPreferLeader` mode, we should choose leader as high priority. - if state.option.preferLeader { - state.lastIdx = state.leaderIdx +func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) { + resetStaleRead := false + + if state.lastIdx >= 0 && state.isStaleRead { + WithLeaderOnly()(&state.option) + resetStaleRead = true } - for i := 0; i < replicaSize && !state.option.leaderOnly; i++ { - idx := AccessIndex((int(state.lastIdx) + i) % replicaSize) - // If the given store is abnormal to be accessed under `ReplicaReadMixed` mode, we should choose other followers or leader - // as candidates to serve the Read request. Meanwhile, we should make the choice of next() meet Uniform Distribution. - for cnt := 0; cnt < replicaSize && !state.isCandidate(idx, selector.replicas[idx]); cnt++ { - idx = AccessIndex((int(idx) + rand.Intn(replicaSize)) % replicaSize) + + if state.lowPriorReplicas == nil && !state.option.leaderOnly { + for state.lastIdx = 0; state.lastIdx < len(state.highPriorReplicas); state.lastIdx++ { + selectedReplica := selector.replicas[state.highPriorReplicas[state.lastIdx]] + if state.isCandidate(selectedReplica) { + selector.targetIdx = state.highPriorReplicas[state.lastIdx] + break + } } - if state.isCandidate(idx, selector.replicas[idx]) { - state.lastIdx = idx - selector.targetIdx = idx - break + if selector.targetIdx < 0 { + // lazy init low priority replicas. + state.setLowPriorityReplicas(selector.replicas, state.leaderIdx) + } + } + if selector.targetIdx < 0 && !state.option.leaderOnly { + if state.lowPriorReplicas != nil { + for state.lastIdx = 0; state.lastIdx < len(state.lowPriorReplicas); state.lastIdx++ { + selectedReplica := selector.replicas[state.lowPriorReplicas[state.lastIdx]] + if state.isCandidate(selectedReplica) { + selector.targetIdx = state.lowPriorReplicas[state.lastIdx] + break + } + } } } + // If there is no candidate, fallback to the leader. if selector.targetIdx < 0 { if len(state.option.labels) > 0 { @@ -598,7 +656,6 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector selector.invalidateRegion() return nil, nil } - state.lastIdx = state.leaderIdx selector.targetIdx = state.leaderIdx } // Monitor the flows destination if selector is under `ReplicaReadPreferLeader` mode. @@ -626,28 +683,17 @@ func (state *accessFollower) onSendFailure(bo *retry.Backoffer, selector *replic } } -func (state *accessFollower) isCandidate(idx AccessIndex, replica *replica) bool { +func (state *accessFollower) isCandidate(replica *replica) bool { // the epoch is staled or retry exhausted. if replica.isEpochStale() || replica.isExhausted(1) { return false } - // The request can only be sent to the leader. - if state.option.leaderOnly && idx == state.leaderIdx { - return true - } - // Choose a replica with matched labels. - followerCandidate := !state.option.leaderOnly && (state.tryLeader || idx != state.leaderIdx) && - replica.store.IsLabelsMatch(state.option.labels) && (!state.learnerOnly || replica.peer.Role == metapb.PeerRole_Learner) - if !followerCandidate { - return false - } // And If the leader store is abnormal to be accessed under `ReplicaReadPreferLeader` mode, we should choose other valid followers // as candidates to serve the Read request. - if state.option.preferLeader && replica.store.isSlow() { + if state.replicaReadType == kv.ReplicaReadPreferLeader && replica.store.isSlow() { return false } - // If the stores are limited, check if the store is in the list. - return replica.store.IsStoreMatch(state.option.stores) + return true } // tryIdleReplica is the state where we find the leader is busy and retry the request using replica read. @@ -762,15 +808,16 @@ func newReplicaSelector( if req.ReplicaReadType == kv.ReplicaReadPreferLeader { WithPerferLeader()(&option) } - tryLeader := req.ReplicaReadType == kv.ReplicaReadMixed || req.ReplicaReadType == kv.ReplicaReadPreferLeader - state = &accessFollower{ - tryLeader: tryLeader, - isStaleRead: req.StaleRead, - option: option, - leaderIdx: regionStore.workTiKVIdx, - lastIdx: -1, - learnerOnly: req.ReplicaReadType == kv.ReplicaReadLearner, + stateFollower := &accessFollower{ + isStaleRead: req.StaleRead, + option: option, + leaderIdx: regionStore.workTiKVIdx, + lastIdx: -1, + learnerOnly: req.ReplicaReadType == kv.ReplicaReadLearner, + replicaReadType: req.ReplicaReadType, } + stateFollower.setHighPriorityReplicas(replicas, regionStore.workTiKVIdx) + state = stateFollower } return &replicaSelector{ diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go index e05c2cac5..9defcb03a 100644 --- a/internal/locate/region_request3_test.go +++ b/internal/locate/region_request3_test.go @@ -333,13 +333,12 @@ func (s *testRegionRequestToThreeStoresSuite) TestLearnerReplicaSelector() { s.NotNil(replicaSelector) s.Nil(err) - accessLearner, _ := replicaSelector.state.(*accessFollower) // Invalidate the region if the leader is not in the region. atomic.StoreInt64(®ion.lastAccess, time.Now().Unix()) rpcCtx, err := replicaSelector.next(s.bo) s.Nil(err) - // Should swith to the next follower. - s.Equal(AccessIndex(tikvLearnerAccessIdx), accessLearner.lastIdx) + // Should switch to the next follower. + s.Equal(AccessIndex(tikvLearnerAccessIdx), replicaSelector.targetIdx) AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[replicaSelector.targetIdx], nil) } @@ -573,25 +572,23 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { state3, ok := replicaSelector.state.(*accessFollower) s.True(ok) s.Equal(regionStore.workTiKVIdx, state3.leaderIdx) - s.Equal(state3.lastIdx, AccessIndex(-1)) + s.Equal(replicaSelector.targetIdx, AccessIndex(-1)) - lastIdx := AccessIndex(-1) + lastIdx := -1 for i := 0; i < regionStore.accessStoreNum(tiKVOnly)-1; i++ { rpcCtx, err := replicaSelector.next(s.bo) s.Nil(err) - // Should swith to the next follower. + // Should switch to the next follower. s.NotEqual(lastIdx, state3.lastIdx) // Shouldn't access the leader if followers aren't exhausted. - s.NotEqual(regionStore.workTiKVIdx, state3.lastIdx) - s.Equal(replicaSelector.targetIdx, state3.lastIdx) + s.NotEqual(regionStore.workTiKVIdx, replicaSelector.targetIdx) AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[replicaSelector.targetIdx], nil) lastIdx = state3.lastIdx } // Fallback to the leader for 1 time rpcCtx, err = replicaSelector.next(s.bo) s.Nil(err) - s.Equal(regionStore.workTiKVIdx, state3.lastIdx) - s.Equal(replicaSelector.targetIdx, state3.lastIdx) + s.Equal(regionStore.workTiKVIdx, replicaSelector.targetIdx) AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil) // All replicas are exhausted. rpcCtx, err = replicaSelector.next(s.bo) @@ -613,8 +610,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { // Should fallback to the leader immediately. rpcCtx, err = replicaSelector.next(s.bo) s.Nil(err) - s.Equal(regionStore.workTiKVIdx, state3.lastIdx) - s.Equal(replicaSelector.targetIdx, state3.lastIdx) + s.Equal(regionStore.workTiKVIdx, replicaSelector.targetIdx) AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil) // Test accessFollower state filtering label-not-match stores.