Skip to content

Commit

Permalink
Count only one tiflash replica in disaggregated mode. (#805)
Browse files Browse the repository at this point in the history
  • Loading branch information
JinheLin committed May 22, 2023
1 parent d5baf4c commit a2f93a3
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 7 deletions.
34 changes: 27 additions & 7 deletions internal/locate/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -1313,6 +1313,32 @@ func fetchRespInfo(resp *tikvrpc.Response) string {
return extraInfo
}

func (s *RegionRequestSender) countReplicaNumber(peers []*metapb.Peer) int {
isTiFlashWriteNode := func(storeId uint64) bool {
store := s.regionCache.getStoreByStoreID(storeId)
engine, _ := store.GetLabelValue("engine")
engineRole, _ := store.GetLabelValue("engine_role")
return engine == "tiflash" && engineRole == "write"
}
// In disaggregated-mode(tiflash write-node), only count 1 replica for tiflash, no matter how many tiflash write-nodes there are.
replicaNumber := 0
hasTiFlashWriteNode := false
for _, peer := range peers {
role := peer.GetRole()
if role == metapb.PeerRole_Voter {
replicaNumber++
} else if role == metapb.PeerRole_Learner {
if !isTiFlashWriteNode(peer.StoreId) {
replicaNumber++
} else if !hasTiFlashWriteNode {
hasTiFlashWriteNode = true
replicaNumber++
}
}
}
return replicaNumber
}

func (s *RegionRequestSender) sendReqToRegion(
bo *retry.Backoffer, rpcCtx *RPCContext, req *tikvrpc.Request, timeout time.Duration,
) (resp *tikvrpc.Response, retry bool, err error) {
Expand Down Expand Up @@ -1348,13 +1374,7 @@ func (s *RegionRequestSender) sendReqToRegion(
// Count the replica number as the RU cost factor.
req.ReplicaNumber = 1
if rpcCtx.Meta != nil && len(rpcCtx.Meta.GetPeers()) > 0 {
req.ReplicaNumber = 0
for _, peer := range rpcCtx.Meta.GetPeers() {
role := peer.GetRole()
if role == metapb.PeerRole_Voter || role == metapb.PeerRole_Learner {
req.ReplicaNumber++
}
}
req.ReplicaNumber = int64(s.countReplicaNumber(rpcCtx.Meta.GetPeers()))
}

var sessionID uint64
Expand Down
40 changes: 40 additions & 0 deletions internal/locate/region_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,3 +657,43 @@ func (s *testRegionRequestToSingleStoreSuite) TestCloseConnectionOnStoreNotMatch
s.NotNil(regionErr)
s.Equal(target, client.closedAddr)
}

func (s *testRegionRequestToSingleStoreSuite) TestCountReplicaNumber() {
fmt.Println("TestCountReplicaNumber")
fmt.Println(s.cache.storeMu.stores)
tikvLabels := []*metapb.StoreLabel{{Key: "engine", Value: "tikv"}}
tiflashLabels := []*metapb.StoreLabel{{Key: "engine", Value: "tiflash"}}
tiflashWNLabels := []*metapb.StoreLabel{{Key: "engine", Value: "tiflash"}, {Key: "engine_role", Value: "write"}}

s.cache.SetRegionCacheStore(1, "", "", tikvrpc.TiKV, 0, tikvLabels)
s.cache.SetRegionCacheStore(2, "", "", tikvrpc.TiKV, 0, tikvLabels)
s.cache.SetRegionCacheStore(3, "", "", tikvrpc.TiKV, 0, tikvLabels)
s.cache.SetRegionCacheStore(4, "", "", tikvrpc.TiFlash, 0, tiflashLabels)
s.cache.SetRegionCacheStore(5, "", "", tikvrpc.TiFlash, 0, tiflashLabels)
{
peers := []*metapb.Peer{{StoreId: 1, Role: metapb.PeerRole_Voter}}
s.Equal(1, s.regionRequestSender.countReplicaNumber(peers))
peers = append(peers, &metapb.Peer{StoreId: 2, Role: metapb.PeerRole_Voter})
s.Equal(2, s.regionRequestSender.countReplicaNumber(peers))
peers = append(peers, &metapb.Peer{StoreId: 3, Role: metapb.PeerRole_Voter})
s.Equal(3, s.regionRequestSender.countReplicaNumber(peers))
peers = append(peers, &metapb.Peer{StoreId: 4, Role: metapb.PeerRole_Learner})
s.Equal(4, s.regionRequestSender.countReplicaNumber(peers))
peers = append(peers, &metapb.Peer{StoreId: 5, Role: metapb.PeerRole_Learner})
s.Equal(5, s.regionRequestSender.countReplicaNumber(peers))
}
s.cache.SetRegionCacheStore(4, "", "", tikvrpc.TiFlash, 0, tiflashWNLabels)
s.cache.SetRegionCacheStore(5, "", "", tikvrpc.TiFlash, 0, tiflashWNLabels)
{
peers := []*metapb.Peer{{StoreId: 1, Role: metapb.PeerRole_Voter}}
s.Equal(1, s.regionRequestSender.countReplicaNumber(peers))
peers = append(peers, &metapb.Peer{StoreId: 2, Role: metapb.PeerRole_Voter})
s.Equal(2, s.regionRequestSender.countReplicaNumber(peers))
peers = append(peers, &metapb.Peer{StoreId: 3, Role: metapb.PeerRole_Voter})
s.Equal(3, s.regionRequestSender.countReplicaNumber(peers))
peers = append(peers, &metapb.Peer{StoreId: 4, Role: metapb.PeerRole_Learner})
s.Equal(4, s.regionRequestSender.countReplicaNumber(peers))
peers = append(peers, &metapb.Peer{StoreId: 5, Role: metapb.PeerRole_Learner})
s.Equal(4, s.regionRequestSender.countReplicaNumber(peers)) // Only count 1 tiflash replica for tiflash write-nodes.
}
}

0 comments on commit a2f93a3

Please sign in to comment.