Skip to content

Commit

Permalink
refactor code
Browse files Browse the repository at this point in the history
Signed-off-by: you06 <[email protected]>
  • Loading branch information
you06 committed Aug 10, 2023
1 parent 300545a commit 5945477
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 97 deletions.
194 changes: 110 additions & 84 deletions internal/locate/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,79 +534,116 @@ func (state *tryNewProxy) onNoLeader(selector *replicaSelector) {
type accessFollower struct {
stateBase
// If tryLeader is true, the request can also be sent to the leader when !leader.isSlow()
tryLeader bool
isStaleRead bool
option storeSelectorOp
leaderIdx AccessIndex
lastIdx AccessIndex
learnerOnly bool
isStaleRead bool
option storeSelectorOp
lastIdx int
leaderIdx AccessIndex
highPriorReplicas []AccessIndex
lowPriorReplicas []AccessIndex
learnerOnly bool
// replicaReadType indicates the strategy of choosing replica.
// kv.ReplicaReadFollower, randomly choose follower.
// kv.ReplicaReadMixed
// if option.labels contains tikv.DCLabelKey, it's the closest replica read,
// randomly choose the one of the closest replicas.
// if not, randomly choose replica from leader and followers.
// kv.ReplicaReadLearner, randomly choose from learner.
// kv.ReplicaReadPreferLeader, choose leader firstly, retry with followers.
replicaReadType kv.ReplicaReadType
}

// setHighPriorityReplicas set the high priority replicas.
func (state *accessFollower) setHighPriorityReplicas(replicas []*replica, leaderIdx AccessIndex) {
if state.option.leaderOnly {
return
}
switch state.replicaReadType {
case kv.ReplicaReadFollower:
state.highPriorReplicas = make([]AccessIndex, 0, len(replicas)-1) // do not contain leader
for i, r := range replicas {
if AccessIndex(i) != leaderIdx && r.store.IsLabelsMatch(state.option.labels) {
state.highPriorReplicas = append(state.highPriorReplicas, AccessIndex(i))
}
}
case kv.ReplicaReadMixed:
state.highPriorReplicas = make([]AccessIndex, 0, len(replicas))
for i, r := range replicas {
if r.store.IsLabelsMatch(state.option.labels) {
state.highPriorReplicas = append(state.highPriorReplicas, AccessIndex(i))
}
}
case kv.ReplicaReadLearner:
state.highPriorReplicas = make([]AccessIndex, 0, len(replicas))
for i, r := range replicas {
if r.peer.Role == metapb.PeerRole_Learner {
state.highPriorReplicas = append(state.highPriorReplicas, AccessIndex(i))
}
}
case kv.ReplicaReadPreferLeader:
state.highPriorReplicas = []AccessIndex{leaderIdx}
case kv.ReplicaReadLeader:
// unreachable
}
if len(state.highPriorReplicas) > 1 {
rand.Shuffle(len(state.highPriorReplicas), func(i, j int) {
state.highPriorReplicas[i], state.highPriorReplicas[j] = state.highPriorReplicas[j], state.highPriorReplicas[i]
})
}
}

func (state *accessFollower) setLowPriorityReplicas(replicas []*replica, leaderIdx AccessIndex) {
switch state.replicaReadType {
case kv.ReplicaReadLeader, kv.ReplicaReadLearner, kv.ReplicaReadFollower, kv.ReplicaReadMixed:
return
case kv.ReplicaReadPreferLeader:
state.lowPriorReplicas = make([]AccessIndex, 0, len(replicas)-1)
for i := range replicas {
if AccessIndex(i) != leaderIdx {
state.lowPriorReplicas = append(state.lowPriorReplicas, AccessIndex(i))
}
}
state.lastIdx = -1
}
if len(state.lowPriorReplicas) > 1 {
rand.Shuffle(len(state.lowPriorReplicas), func(i, j int) {
state.lowPriorReplicas[i], state.lowPriorReplicas[j] = state.lowPriorReplicas[j], state.lowPriorReplicas[i]
})
}
}

func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
replicaSize := len(selector.replicas)
resetStaleRead := false
if state.lastIdx < 0 {
if state.tryLeader {
state.lastIdx = AccessIndex(rand.Intn(replicaSize))
} else {
if replicaSize <= 1 {
state.lastIdx = state.leaderIdx
} else {
// Randomly select a non-leader peer
state.lastIdx = AccessIndex(rand.Intn(replicaSize - 1))
if state.lastIdx >= state.leaderIdx {
state.lastIdx++
}

if state.lastIdx >= 0 && state.isStaleRead {
WithLeaderOnly()(&state.option)
resetStaleRead = true
}

if state.lowPriorReplicas == nil && !state.option.leaderOnly {
for state.lastIdx = 0; state.lastIdx < len(state.highPriorReplicas); state.lastIdx++ {
selectedReplica := selector.replicas[state.highPriorReplicas[state.lastIdx]]
if state.isCandidate(selectedReplica) {
selector.targetIdx = state.highPriorReplicas[state.lastIdx]
break
}
}
} else {
// Stale Read request will retry the leader only by using the WithLeaderOnly option.
if state.isStaleRead {
WithLeaderOnly()(&state.option)
// retry on the leader should not use stale read flag to avoid possible DataIsNotReady error as it always can serve any read.
resetStaleRead = true
if selector.targetIdx < 0 {
// lazy init low priority replicas.
state.setLowPriorityReplicas(selector.replicas, state.leaderIdx)
}
state.lastIdx++
}

// If selector is under `ReplicaReadPreferLeader` mode, we should choose leader as high priority.
if state.option.preferLeader {
state.lastIdx = state.leaderIdx
}
var offset int
if state.lastIdx >= 0 {
offset = rand.Intn(replicaSize)
}
reloadRegion := false
for i := 0; i < replicaSize && !state.option.leaderOnly; i++ {
var idx AccessIndex
if state.option.preferLeader {
if i == 0 {
idx = state.lastIdx
} else {
// randomly select next replica, but skip state.lastIdx
if (i+offset)%replicaSize == 0 {
offset++
if selector.targetIdx < 0 && !state.option.leaderOnly {
if state.lowPriorReplicas != nil {
for state.lastIdx = 0; state.lastIdx < len(state.lowPriorReplicas); state.lastIdx++ {
selectedReplica := selector.replicas[state.lowPriorReplicas[state.lastIdx]]
if state.isCandidate(selectedReplica) {
selector.targetIdx = state.lowPriorReplicas[state.lastIdx]
break
}
}
} else {
idx = AccessIndex((int(state.lastIdx) + i) % replicaSize)
}
selectReplica := selector.replicas[idx]
if state.isCandidate(idx, selectReplica) {
state.lastIdx = idx
selector.targetIdx = idx
break
}
if selectReplica.isEpochStale() &&
selectReplica.store.getResolveState() == resolved &&
selectReplica.store.getLivenessState() == reachable {
reloadRegion = true
}
}
if reloadRegion {
selector.regionCache.scheduleReloadRegion(selector.region)
}

// If there is no candidate, fallback to the leader.
if selector.targetIdx < 0 {
leader := selector.replicas[state.leaderIdx]
Expand All @@ -622,7 +659,6 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
selector.invalidateRegion()
return nil, nil
}
state.lastIdx = state.leaderIdx
selector.targetIdx = state.leaderIdx
}
// Monitor the flows destination if selector is under `ReplicaReadPreferLeader` mode.
Expand Down Expand Up @@ -663,28 +699,17 @@ func (state *accessFollower) onSendFailure(bo *retry.Backoffer, selector *replic
}
}

func (state *accessFollower) isCandidate(idx AccessIndex, replica *replica) bool {
func (state *accessFollower) isCandidate(replica *replica) bool {
// the epoch is staled or retry exhausted, or the store is unreachable.
if replica.isEpochStale() || replica.isExhausted(1) || replica.store.getLivenessState() == unreachable {
return false
}
// The request can only be sent to the leader.
if state.option.leaderOnly && idx == state.leaderIdx {
return true
}
// Choose a replica with matched labels.
followerCandidate := !state.option.leaderOnly && (state.tryLeader || idx != state.leaderIdx) &&
replica.store.IsLabelsMatch(state.option.labels) && (!state.learnerOnly || replica.peer.Role == metapb.PeerRole_Learner)
if !followerCandidate {
return false
}
// And If the leader store is abnormal to be accessed under `ReplicaReadPreferLeader` mode, we should choose other valid followers
// as candidates to serve the Read request.
if state.option.preferLeader && replica.store.isSlow() {
if state.replicaReadType == kv.ReplicaReadPreferLeader && replica.store.isSlow() {
return false
}
// If the stores are limited, check if the store is in the list.
return replica.store.IsStoreMatch(state.option.stores)
return true
}

// tryIdleReplica is the state where we find the leader is busy and retry the request using replica read.
Expand Down Expand Up @@ -799,15 +824,16 @@ func newReplicaSelector(
if req.ReplicaReadType == kv.ReplicaReadPreferLeader {
WithPerferLeader()(&option)
}
tryLeader := req.ReplicaReadType == kv.ReplicaReadMixed || req.ReplicaReadType == kv.ReplicaReadPreferLeader
state = &accessFollower{
tryLeader: tryLeader,
isStaleRead: req.StaleRead,
option: option,
leaderIdx: regionStore.workTiKVIdx,
lastIdx: -1,
learnerOnly: req.ReplicaReadType == kv.ReplicaReadLearner,
stateFollower := &accessFollower{
isStaleRead: req.StaleRead,
option: option,
leaderIdx: regionStore.workTiKVIdx,
lastIdx: -1,
learnerOnly: req.ReplicaReadType == kv.ReplicaReadLearner,
replicaReadType: req.ReplicaReadType,
}
stateFollower.setHighPriorityReplicas(replicas, regionStore.workTiKVIdx)
state = stateFollower
}

return &replicaSelector{
Expand Down
23 changes: 10 additions & 13 deletions internal/locate/region_request3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,13 +340,12 @@ func (s *testRegionRequestToThreeStoresSuite) TestLearnerReplicaSelector() {
s.NotNil(replicaSelector)
s.Nil(err)

accessLearner, _ := replicaSelector.state.(*accessFollower)
// Invalidate the region if the leader is not in the region.
atomic.StoreInt64(&region.lastAccess, time.Now().Unix())
rpcCtx, err := replicaSelector.next(s.bo)
s.Nil(err)
// Should swith to the next follower.
s.Equal(AccessIndex(tikvLearnerAccessIdx), accessLearner.lastIdx)
// Should switch to the next follower.
s.Equal(AccessIndex(tikvLearnerAccessIdx), replicaSelector.targetIdx)
AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[replicaSelector.targetIdx], nil)
}

Expand Down Expand Up @@ -581,25 +580,23 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
state3, ok := replicaSelector.state.(*accessFollower)
s.True(ok)
s.Equal(regionStore.workTiKVIdx, state3.leaderIdx)
s.Equal(state3.lastIdx, AccessIndex(-1))
s.Equal(replicaSelector.targetIdx, AccessIndex(-1))

lastIdx := AccessIndex(-1)
lastIdx := -1
for i := 0; i < regionStore.accessStoreNum(tiKVOnly)-1; i++ {
rpcCtx, err := replicaSelector.next(s.bo)
s.Nil(err)
// Should swith to the next follower.
// Should switch to the next follower.
s.NotEqual(lastIdx, state3.lastIdx)
// Shouldn't access the leader if followers aren't exhausted.
s.NotEqual(regionStore.workTiKVIdx, state3.lastIdx)
s.Equal(replicaSelector.targetIdx, state3.lastIdx)
s.NotEqual(regionStore.workTiKVIdx, replicaSelector.targetIdx)
AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[replicaSelector.targetIdx], nil)
lastIdx = state3.lastIdx
}
// Fallback to the leader for 1 time
rpcCtx, err = replicaSelector.next(s.bo)
s.Nil(err)
s.Equal(regionStore.workTiKVIdx, state3.lastIdx)
s.Equal(replicaSelector.targetIdx, state3.lastIdx)
s.Equal(regionStore.workTiKVIdx, replicaSelector.targetIdx)
AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil)
// All replicas are exhausted.
rpcCtx, err = replicaSelector.next(s.bo)
Expand All @@ -617,12 +614,12 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req)
s.NotNil(replicaSelector)
s.Nil(err)
state3 = replicaSelector.state.(*accessFollower)
_, ok = replicaSelector.state.(*accessFollower)
s.True(ok)
// Should fallback to the leader immediately.
rpcCtx, err = replicaSelector.next(s.bo)
s.Nil(err)
s.Equal(regionStore.workTiKVIdx, state3.lastIdx)
s.Equal(replicaSelector.targetIdx, state3.lastIdx)
s.Equal(regionStore.workTiKVIdx, replicaSelector.targetIdx)
AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil)

// Test accessFollower state filtering label-not-match stores.
Expand Down

0 comments on commit 5945477

Please sign in to comment.