Commit

br: leader selection with tikv store balance score during the ebs data restore (#39402)

close #39404
fengou1 committed Nov 29, 2022
1 parent 23543a4 commit d2ace99
Showing 5 changed files with 111 additions and 11 deletions.
5 changes: 3 additions & 2 deletions br/pkg/errors/errors.go
@@ -83,8 +83,9 @@ var (
 	ErrStorageInvalidPermission = errors.Normalize("external storage permission", errors.RFCCodeText("BR:ExternalStorage:ErrStorageInvalidPermission"))

 	// Snapshot restore
-	ErrRestoreTotalKVMismatch = errors.Normalize("restore total tikvs mismatch", errors.RFCCodeText("BR:EBS:ErrRestoreTotalKVMismatch"))
-	ErrRestoreInvalidPeer     = errors.Normalize("restore met a invalid peer", errors.RFCCodeText("BR:EBS:ErrRestoreInvalidPeer"))
+	ErrRestoreTotalKVMismatch   = errors.Normalize("restore total tikvs mismatch", errors.RFCCodeText("BR:EBS:ErrRestoreTotalKVMismatch"))
+	ErrRestoreInvalidPeer       = errors.Normalize("restore met a invalid peer", errors.RFCCodeText("BR:EBS:ErrRestoreInvalidPeer"))
+	ErrRestoreRegionWithoutPeer = errors.Normalize("restore met a region without any peer", errors.RFCCodeText("BR:EBS:ErrRestoreRegionWithoutPeer"))

 	// Errors reported from TiKV.
 	ErrKVStorage = errors.Normalize("tikv storage occur I/O error", errors.RFCCodeText("BR:KV:ErrKVStorage"))
23 changes: 14 additions & 9 deletions br/pkg/restore/data.go
@@ -372,6 +372,7 @@ type RecoverRegion struct {
 // 2. build a leader list for all region during the tikv startup
 // 3. get max allocate id
 func (recovery *Recovery) MakeRecoveryPlan() error {
+	storeBalanceScore := make(map[uint64]int, len(recovery.allStores))
 	// Group region peer info by region id. find the max allocateId
 	// region [id] [peer[0-n]]
 	var regions = make(map[uint64][]*RecoverRegion, 0)
@@ -410,16 +411,20 @@ func (recovery *Recovery) MakeRecoveryPlan() error {
 			}
 		} else {
 			// Generate normal commands.
-			log.Debug("detected valid peer", zap.Uint64("region id", regionId))
-			for i, peer := range peers {
-				log.Debug("make plan", zap.Uint64("store id", peer.StoreId), zap.Uint64("region id", peer.RegionId))
-				plan := &recovpb.RecoverRegionRequest{RegionId: peer.RegionId, AsLeader: i == 0}
-				// sorted by log term -> last index -> commit index in a region
-				if plan.AsLeader {
-					log.Debug("as leader peer", zap.Uint64("store id", peer.StoreId), zap.Uint64("region id", peer.RegionId))
-					recovery.RecoveryPlan[peer.StoreId] = append(recovery.RecoveryPlan[peer.StoreId], plan)
-				}
+			log.Debug("detected valid region", zap.Uint64("region id", regionId))
+			// calculate the leader candidates
+			leaderCandidates, err := LeaderCandidates(peers)
+			if err != nil {
+				log.Warn("region without peer", zap.Uint64("region id", regionId))
+				return errors.Trace(err)
+			}

+			// select the leader based on the tikv storeBalanceScore
+			leader := SelectRegionLeader(storeBalanceScore, leaderCandidates)
+			log.Debug("as leader peer", zap.Uint64("store id", leader.StoreId), zap.Uint64("region id", leader.RegionId))
+			plan := &recovpb.RecoverRegionRequest{RegionId: leader.RegionId, AsLeader: true}
+			recovery.RecoveryPlan[leader.StoreId] = append(recovery.RecoveryPlan[leader.StoreId], plan)
+			storeBalanceScore[leader.StoreId] += 1
 		}
 	}
 	return nil
40 changes: 40 additions & 0 deletions br/pkg/restore/util.go
@@ -750,3 +750,43 @@ func CheckConsistencyAndValidPeer(regionInfos []*RecoverRegionInfo) (map[uint64]
 	}
 	return validPeers, nil
 }

+// In the cloud, raft writes are slow because of IOPS and bandwidth limitations, so the raft
+// state (log term, last index, commit index, ...) is usually identical among a region's peers.
+// LeaderCandidates selects all peers that can be chosen as a leader during the restore.
+func LeaderCandidates(peers []*RecoverRegion) ([]*RecoverRegion, error) {
+	if peers == nil {
+		return nil, errors.Annotatef(berrors.ErrRestoreRegionWithoutPeer,
+			"invalid region range")
+	}
+	candidates := make([]*RecoverRegion, 0, len(peers))
+	// by default, peers[0] is assigned as the leader, since peers are already sorted by the leader selection rule
+	leader := peers[0]
+	candidates = append(candidates, leader)
+	for _, peer := range peers[1:] {
+		// a qualified candidate satisfies leader.LastLogTerm == candidate.LastLogTerm &&
+		// leader.LastIndex == candidate.LastIndex && leader.CommitIndex == candidate.CommitIndex
+		if peer.LastLogTerm == leader.LastLogTerm && peer.LastIndex == leader.LastIndex && peer.CommitIndex == leader.CommitIndex {
+			log.Debug("leader candidate", zap.Uint64("store id", peer.StoreId), zap.Uint64("region id", peer.RegionId), zap.Uint64("peer id", peer.PeerId))
+			candidates = append(candidates, peer)
+		}
+	}
+	return candidates, nil
+}

+// SelectRegionLeader picks the candidate whose store currently has the lowest balance score.
+// For region A with candidate leaders x, y, z:
+//	peer x on store 1 with storeBalanceScore 3
+//	peer y on store 3 with storeBalanceScore 2
+//	peer z on store 4 with storeBalanceScore 1
+// result: peer z on store 4 is selected as the leader.
+func SelectRegionLeader(storeBalanceScore map[uint64]int, peers []*RecoverRegion) *RecoverRegion {
+	// by default, peers[0] is assigned as the leader
+	leader := peers[0]
+	minLeaderStore := storeBalanceScore[leader.StoreId]
+	for _, peer := range peers[1:] {
+		log.Debug("leader candidate", zap.Int("score", storeBalanceScore[peer.StoreId]), zap.Int("min-score", minLeaderStore), zap.Uint64("store id", peer.StoreId), zap.Uint64("region id", peer.RegionId), zap.Uint64("peer id", peer.PeerId))
+		if storeBalanceScore[peer.StoreId] < minLeaderStore {
+			minLeaderStore = storeBalanceScore[peer.StoreId]
+			leader = peer
+		}
+	}
+	return leader
+}
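
Taken together, LeaderCandidates and SelectRegionLeader form a greedy balancing pass: each region picks its leader among the equally up-to-date peers, preferring the store that currently leads the fewest regions. A minimal standalone sketch of that idea (the peer struct and every name below are illustrative, not the package API):

// Greedy leader balancing, sketched outside the br/pkg/restore package:
// among equally up-to-date candidate peers, pick the one whose store
// currently leads the fewest regions, then bump that store's score so
// the next region prefers a different store.
package main

import "fmt"

type peer struct{ regionID, storeID uint64 }

func selectLeader(score map[uint64]int, candidates []peer) peer {
	leader := candidates[0]
	minScore := score[leader.storeID]
	for _, p := range candidates[1:] {
		if score[p.storeID] < minScore {
			minScore, leader = score[p.storeID], p
		}
	}
	return leader
}

func main() {
	// three regions, each with an up-to-date peer on stores 1, 2 and 3
	regions := [][]peer{
		{{1, 1}, {1, 2}, {1, 3}},
		{{2, 1}, {2, 2}, {2, 3}},
		{{3, 1}, {3, 2}, {3, 3}},
	}
	score := map[uint64]int{}
	for _, candidates := range regions {
		l := selectLeader(score, candidates)
		score[l.storeID]++ // the chosen store now owns one more leader
		fmt.Printf("region %d -> leader on store %d\n", l.regionID, l.storeID)
	}
	// prints stores 1, 2, 3: one leader per store, instead of all three
	// leaders landing on the first-listed store as before this change.
}

Because the score map is shared across every region in MakeRecoveryPlan, leaders spread evenly across stores rather than funneling the raft-recovery work through a single TiKV.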
49 changes: 49 additions & 0 deletions br/pkg/restore/util_test.go
@@ -460,3 +460,52 @@ func TestCheckConsistencyAndValidPeer(t *testing.T) {
 	require.Error(t, err)
 	require.Regexp(t, ".*invalid restore range.*", err.Error())
 }

+func TestLeaderCandidates(t *testing.T) {
+	// key space is continuous
+	validPeer1 := newPeerMeta(9, 11, 2, []byte(""), []byte("bb"), 2, 1, 0, 0, false)
+	validPeer2 := newPeerMeta(19, 22, 3, []byte("bb"), []byte("cc"), 2, 1, 0, 1, false)
+	validPeer3 := newPeerMeta(29, 30, 1, []byte("cc"), []byte(""), 2, 1, 0, 2, false)

+	peers := []*restore.RecoverRegion{
+		validPeer1,
+		validPeer2,
+		validPeer3,
+	}

+	candidates, err := restore.LeaderCandidates(peers)
+	require.NoError(t, err)
+	require.Equal(t, 3, len(candidates))
+}

+func TestSelectRegionLeader(t *testing.T) {
+	validPeer1 := newPeerMeta(9, 11, 2, []byte(""), []byte("bb"), 2, 1, 0, 0, false)
+	validPeer2 := newPeerMeta(19, 22, 3, []byte("bb"), []byte("cc"), 2, 1, 0, 1, false)
+	validPeer3 := newPeerMeta(29, 30, 1, []byte("cc"), []byte(""), 2, 1, 0, 2, false)

+	peers := []*restore.RecoverRegion{
+		validPeer1,
+		validPeer2,
+		validPeer3,
+	}
+	// initially, all store balance scores are 0
+	storeBalanceScore := make(map[uint64]int, len(peers))
+	leader := restore.SelectRegionLeader(storeBalanceScore, peers)
+	require.Equal(t, validPeer1, leader)

+	// change the store balance scores
+	storeBalanceScore[2] = 3
+	storeBalanceScore[3] = 2
+	storeBalanceScore[1] = 1
+	leader = restore.SelectRegionLeader(storeBalanceScore, peers)
+	require.Equal(t, validPeer3, leader)

+	// one peer
+	peer := []*restore.RecoverRegion{
+		validPeer3,
+	}
+	// all store balance scores are 0
+	storeScore := make(map[uint64]int, len(peer))
+	leader = restore.SelectRegionLeader(storeScore, peer)
+	require.Equal(t, validPeer3, leader)
+}
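
Assuming the standard TiDB repository layout, these two cases can be run on their own with go test ./br/pkg/restore -run 'TestLeaderCandidates|TestSelectRegionLeader'.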
5 changes: 5 additions & 0 deletions errors.toml
@@ -66,6 +66,11 @@ error = '''
 restore met a invalid peer
 '''

+["BR:EBS:ErrRestoreRegionWithoutPeer"]
+error = '''
+restore met a region without any peer
+'''

 ["BR:EBS:ErrRestoreTotalKVMismatch"]
 error = '''
 restore total tikvs mismatch
