diff --git a/br/pkg/errors/errors.go b/br/pkg/errors/errors.go
index 07e9fb6317cb9..2b7d76e28d795 100644
--- a/br/pkg/errors/errors.go
+++ b/br/pkg/errors/errors.go
@@ -83,8 +83,9 @@ var (
 	ErrStorageInvalidPermission = errors.Normalize("external storage permission", errors.RFCCodeText("BR:ExternalStorage:ErrStorageInvalidPermission"))
 
 	// Snapshot restore
-	ErrRestoreTotalKVMismatch = errors.Normalize("restore total tikvs mismatch", errors.RFCCodeText("BR:EBS:ErrRestoreTotalKVMismatch"))
-	ErrRestoreInvalidPeer     = errors.Normalize("restore met a invalid peer", errors.RFCCodeText("BR:EBS:ErrRestoreInvalidPeer"))
+	ErrRestoreTotalKVMismatch   = errors.Normalize("restore total tikvs mismatch", errors.RFCCodeText("BR:EBS:ErrRestoreTotalKVMismatch"))
+	ErrRestoreInvalidPeer       = errors.Normalize("restore met a invalid peer", errors.RFCCodeText("BR:EBS:ErrRestoreInvalidPeer"))
+	ErrRestoreRegionWithoutPeer = errors.Normalize("restore met a region without any peer", errors.RFCCodeText("BR:EBS:ErrRestoreRegionWithoutPeer"))
 
 	// Errors reported from TiKV.
 	ErrKVStorage = errors.Normalize("tikv storage occur I/O error", errors.RFCCodeText("BR:KV:ErrKVStorage"))
diff --git a/br/pkg/restore/data.go b/br/pkg/restore/data.go
index f7efce83362f5..7432c3d9af0ce 100644
--- a/br/pkg/restore/data.go
+++ b/br/pkg/restore/data.go
@@ -372,6 +372,7 @@ type RecoverRegion struct {
 // 2. build a leader list for all region during the tikv startup
 // 3. get max allocate id
 func (recovery *Recovery) MakeRecoveryPlan() error {
+	storeBalanceScore := make(map[uint64]int, len(recovery.allStores))
 	// Group region peer info by region id. find the max allocateId
 	// region [id] [peer[0-n]]
 	var regions = make(map[uint64][]*RecoverRegion, 0)
@@ -410,16 +411,20 @@ func (recovery *Recovery) MakeRecoveryPlan() error {
 			}
 		} else {
 			// Generate normal commands.
- log.Debug("detected valid peer", zap.Uint64("region id", regionId)) - for i, peer := range peers { - log.Debug("make plan", zap.Uint64("store id", peer.StoreId), zap.Uint64("region id", peer.RegionId)) - plan := &recovpb.RecoverRegionRequest{RegionId: peer.RegionId, AsLeader: i == 0} - // sorted by log term -> last index -> commit index in a region - if plan.AsLeader { - log.Debug("as leader peer", zap.Uint64("store id", peer.StoreId), zap.Uint64("region id", peer.RegionId)) - recovery.RecoveryPlan[peer.StoreId] = append(recovery.RecoveryPlan[peer.StoreId], plan) - } + log.Debug("detected valid region", zap.Uint64("region id", regionId)) + // calc the leader candidates + leaderCandidates, err := LeaderCandidates(peers) + if err != nil { + log.Warn("region without peer", zap.Uint64("region id", regionId)) + return errors.Trace(err) } + + // select the leader base on tikv storeBalanceScore + leader := SelectRegionLeader(storeBalanceScore, leaderCandidates) + log.Debug("as leader peer", zap.Uint64("store id", leader.StoreId), zap.Uint64("region id", leader.RegionId)) + plan := &recovpb.RecoverRegionRequest{RegionId: leader.RegionId, AsLeader: true} + recovery.RecoveryPlan[leader.StoreId] = append(recovery.RecoveryPlan[leader.StoreId], plan) + storeBalanceScore[leader.StoreId] += 1 } } return nil diff --git a/br/pkg/restore/util.go b/br/pkg/restore/util.go index 259d3fa28d888..73a4411c445c1 100644 --- a/br/pkg/restore/util.go +++ b/br/pkg/restore/util.go @@ -750,3 +750,43 @@ func CheckConsistencyAndValidPeer(regionInfos []*RecoverRegionInfo) (map[uint64] } return validPeers, nil } + +// in cloud, since iops and bandwidth limitation, write operator in raft is slow, so raft state (logterm, lastlog, commitlog...) are the same among the peers +// LeaderCandidates select all peers can be select as a leader during the restore +func LeaderCandidates(peers []*RecoverRegion) ([]*RecoverRegion, error) { + if peers == nil { + return nil, errors.Annotatef(berrors.ErrRestoreRegionWithoutPeer, + "invalid region range") + } + candidates := make([]*RecoverRegion, 0, len(peers)) + // by default, the peers[0] to be assign as a leader, since peers already sorted by leader selection rule + leader := peers[0] + candidates = append(candidates, leader) + for _, peer := range peers[1:] { + // qualificated candidate is leader.logterm = candidate.logterm && leader.lastindex = candidate.lastindex && && leader.commitindex = candidate.commitindex + if peer.LastLogTerm == leader.LastLogTerm && peer.LastIndex == leader.LastIndex && peer.CommitIndex == leader.CommitIndex { + log.Debug("leader candidate", zap.Uint64("store id", peer.StoreId), zap.Uint64("region id", peer.RegionId), zap.Uint64("peer id", peer.PeerId)) + candidates = append(candidates, peer) + } + } + return candidates, nil +} + +// for region A, has candidate leader x, y, z +// peer x on store 1 with storeBalanceScore 3 +// peer y on store 3 with storeBalanceScore 2 +// peer z on store 4 with storeBalanceScore 1 +// result: peer z will be select as leader on store 4 +func SelectRegionLeader(storeBalanceScore map[uint64]int, peers []*RecoverRegion) *RecoverRegion { + // by default, the peers[0] to be assign as a leader + leader := peers[0] + minLeaderStore := storeBalanceScore[leader.StoreId] + for _, peer := range peers[1:] { + log.Debug("leader candidate", zap.Int("score", storeBalanceScore[peer.StoreId]), zap.Int("min-score", minLeaderStore), zap.Uint64("store id", peer.StoreId), zap.Uint64("region id", peer.RegionId), zap.Uint64("peer id", peer.PeerId)) + 
+		if storeBalanceScore[peer.StoreId] < minLeaderStore {
+			minLeaderStore = storeBalanceScore[peer.StoreId]
+			leader = peer
+		}
+	}
+	return leader
+}
diff --git a/br/pkg/restore/util_test.go b/br/pkg/restore/util_test.go
index 44620e9cb4e5c..482818a1ad958 100644
--- a/br/pkg/restore/util_test.go
+++ b/br/pkg/restore/util_test.go
@@ -460,3 +460,52 @@ func TestCheckConsistencyAndValidPeer(t *testing.T) {
 	require.Error(t, err)
 	require.Regexp(t, ".*invalid restore range.*", err.Error())
 }
+
+func TestLeaderCandidates(t *testing.T) {
+	// key space is continuous
+	validPeer1 := newPeerMeta(9, 11, 2, []byte(""), []byte("bb"), 2, 1, 0, 0, false)
+	validPeer2 := newPeerMeta(19, 22, 3, []byte("bb"), []byte("cc"), 2, 1, 0, 1, false)
+	validPeer3 := newPeerMeta(29, 30, 1, []byte("cc"), []byte(""), 2, 1, 0, 2, false)
+
+	peers := []*restore.RecoverRegion{
+		validPeer1,
+		validPeer2,
+		validPeer3,
+	}
+
+	candidates, err := restore.LeaderCandidates(peers)
+	require.NoError(t, err)
+	require.Equal(t, 3, len(candidates))
+}
+
+func TestSelectRegionLeader(t *testing.T) {
+	validPeer1 := newPeerMeta(9, 11, 2, []byte(""), []byte("bb"), 2, 1, 0, 0, false)
+	validPeer2 := newPeerMeta(19, 22, 3, []byte("bb"), []byte("cc"), 2, 1, 0, 1, false)
+	validPeer3 := newPeerMeta(29, 30, 1, []byte("cc"), []byte(""), 2, 1, 0, 2, false)
+
+	peers := []*restore.RecoverRegion{
+		validPeer1,
+		validPeer2,
+		validPeer3,
+	}
+	// init the store balance scores, all are 0
+	storeBalanceScore := make(map[uint64]int, len(peers))
+	leader := restore.SelectRegionLeader(storeBalanceScore, peers)
+	require.Equal(t, validPeer1, leader)
+
+	// change the store balance scores
+	storeBalanceScore[2] = 3
+	storeBalanceScore[3] = 2
+	storeBalanceScore[1] = 1
+	leader = restore.SelectRegionLeader(storeBalanceScore, peers)
+	require.Equal(t, validPeer3, leader)
+
+	// one peer
+	peer := []*restore.RecoverRegion{
+		validPeer3,
+	}
+	// init the store balance scores, all are 0
+	storeScore := make(map[uint64]int, len(peer))
+	leader = restore.SelectRegionLeader(storeScore, peer)
+	require.Equal(t, validPeer3, leader)
+}
diff --git a/errors.toml b/errors.toml
index e34de79661c62..df952cc7af45a 100644
--- a/errors.toml
+++ b/errors.toml
@@ -66,6 +66,11 @@ error = '''
 restore met a invalid peer
 '''
 
+["BR:EBS:ErrRestoreRegionWithoutPeer"]
+error = '''
+restore met a region without any peer
+'''
+
 ["BR:EBS:ErrRestoreTotalKVMismatch"]
 error = '''
 restore total tikvs mismatch
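For reviewers, the balancing behavior this diff introduces can be tried in isolation with a short sketch. This is a minimal standalone model under stated assumptions, not the BR code itself: the `recoverRegion` struct here is trimmed to only the fields the selection logic compares, and logging and error handling are omitted.

```go
package main

import "fmt"

// recoverRegion keeps only the raft-state fields the selection logic compares.
type recoverRegion struct {
	RegionId    uint64
	StoreId     uint64
	LastLogTerm uint64
	LastIndex   uint64
	CommitIndex uint64
}

// leaderCandidates keeps every peer whose raft state matches peers[0],
// assuming peers is pre-sorted by log term -> last index -> commit index.
func leaderCandidates(peers []*recoverRegion) []*recoverRegion {
	candidates := []*recoverRegion{peers[0]}
	for _, p := range peers[1:] {
		if p.LastLogTerm == peers[0].LastLogTerm &&
			p.LastIndex == peers[0].LastIndex &&
			p.CommitIndex == peers[0].CommitIndex {
			candidates = append(candidates, p)
		}
	}
	return candidates
}

// selectRegionLeader greedily picks the candidate whose store currently
// leads the fewest regions.
func selectRegionLeader(score map[uint64]int, candidates []*recoverRegion) *recoverRegion {
	leader := candidates[0]
	for _, p := range candidates[1:] {
		if score[p.StoreId] < score[leader.StoreId] {
			leader = p
		}
	}
	return leader
}

func main() {
	// Three regions, each with equally up-to-date peers on stores 1, 2, and 3.
	score := make(map[uint64]int)
	for regionID := uint64(1); regionID <= 3; regionID++ {
		var peers []*recoverRegion
		for storeID := uint64(1); storeID <= 3; storeID++ {
			peers = append(peers, &recoverRegion{
				RegionId: regionID, StoreId: storeID,
				LastLogTerm: 2, LastIndex: 1, CommitIndex: 0,
			})
		}
		leader := selectRegionLeader(score, leaderCandidates(peers))
		score[leader.StoreId]++ // mirrors storeBalanceScore[leader.StoreId] += 1
		fmt.Printf("region %d -> leader on store %d\n", regionID, leader.StoreId)
	}
	// Prints stores 1, 2, 3: each store ends up leading one region, whereas the
	// old peers[0]-is-always-leader rule would put all three leaders on store 1.
}
```

Because every region increments its chosen store's score, the greedy minimum-score pick spreads leadership across stores whenever candidate sets overlap, instead of piling all the recovery leaders onto a single TiKV.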