store/copr: balance region for batch cop task #24521

Merged
35 commits
87e8db4
save work
windtalker Apr 27, 2021
6ad74a5
refine
windtalker Apr 28, 2021
d33d2a0
save work
windtalker May 6, 2021
60c3621
save work
windtalker May 6, 2021
fc86ee7
save work
windtalker May 6, 2021
0e4e72c
save work
windtalker May 7, 2021
9d7bb1e
Merge branch 'master' of https://github.com/pingcap/tidb into balance…
windtalker May 7, 2021
0cf711a
save work
windtalker May 7, 2021
0bf0874
save work
windtalker May 7, 2021
39c8786
save work
windtalker May 7, 2021
9ffb7a6
merge master
windtalker May 10, 2021
ccbb3f3
refine
windtalker May 10, 2021
62f2a13
Merge branch 'master' of https://github.com/pingcap/tidb into balance…
windtalker May 10, 2021
9f32a6b
refine
windtalker May 10, 2021
3a641fc
add comments
windtalker May 10, 2021
416378f
fix ci
windtalker May 11, 2021
b92fb5d
Merge branch 'master' of https://github.com/pingcap/tidb into balance…
windtalker May 11, 2021
2986c4e
fix ci
windtalker May 11, 2021
c86d192
Merge branch 'master' of https://github.com/pingcap/tidb into balance…
windtalker May 11, 2021
8af078f
refine
windtalker May 11, 2021
1859b5b
Merge branch 'master' of https://github.com/pingcap/tidb into balance…
windtalker May 11, 2021
a7eb17f
address comments
windtalker May 11, 2021
1023411
Merge branch 'master' of https://github.com/pingcap/tidb into balance…
windtalker May 11, 2021
09d01f6
Merge branch 'master' of https://github.com/pingcap/tidb into balance…
windtalker May 11, 2021
d682af3
refine code
windtalker May 12, 2021
624abed
refine
windtalker May 12, 2021
56140a3
Merge branch 'master' of https://github.com/pingcap/tidb into balance…
windtalker May 12, 2021
e1cdae2
address comments
windtalker May 12, 2021
2fbe914
Merge branch 'master' of https://github.com/pingcap/tidb into balance…
windtalker May 12, 2021
ddd0700
merge master
windtalker May 14, 2021
8493913
address comments
windtalker May 14, 2021
5be4895
Merge branch 'master' of https://github.com/pingcap/tidb into balance…
windtalker May 17, 2021
f1de8b4
Merge branch 'master' of https://github.com/pingcap/tidb into balance…
windtalker May 17, 2021
3a76a3e
add some log
windtalker May 17, 2021
ee94b2d
Merge branch 'master' into balance_region_for_batch_cop_task
ti-chi-bot May 18, 2021
204 changes: 185 additions & 19 deletions store/copr/batch_coprocessor.go
@@ -16,6 +16,8 @@ package copr
import (
"context"
"io"
"math"
"strconv"
"sync"
"sync/atomic"
"time"
@@ -25,6 +27,7 @@ import (
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/driver/backoff"
derr "github.com/pingcap/tidb/store/driver/error"
@@ -40,8 +43,9 @@ import (
type batchCopTask struct {
storeAddr string
cmdType tikvrpc.CmdType
ctx *tikv.RPCContext

copTasks []copTaskAndRPCContext
regionInfos []tikv.RegionInfo
}

type batchCopResponse struct {
@@ -93,9 +97,152 @@ func (rs *batchCopResponse) RespTime() time.Duration {
return rs.respTime
}

type copTaskAndRPCContext struct {
task *copTask
ctx *tikv.RPCContext
// balanceBatchCopTask balances the regions among the available stores. The basic rules are:
// 1. the first region of each original batch cop task stays in its original store, because some
// metadata (like the RPC context) in batchCopTask is tied to it
// 2. for the remaining regions:
// if there is only 1 available store, put the region into that store
// otherwise, use a greedy algorithm to assign it to the store with the lowest weighted region count
func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask {
if len(originalTasks) <= 1 {
return originalTasks
}
storeTaskMap := make(map[uint64]*batchCopTask)
storeCandidateRegionMap := make(map[uint64]map[string]tikv.RegionInfo)
totalRegionCandidateNum := 0
totalRemainingRegionNum := 0

for _, task := range originalTasks {
taskStoreID := task.regionInfos[0].AllStores[0]
batchTask := &batchCopTask{
storeAddr: task.storeAddr,
cmdType: task.cmdType,
ctx: task.ctx,
regionInfos: []tikv.RegionInfo{task.regionInfos[0]},
}
storeTaskMap[taskStoreID] = batchTask
}

for _, task := range originalTasks {
taskStoreID := task.regionInfos[0].AllStores[0]
for index, ri := range task.regionInfos {
// for each region, figure out the valid store num
validStoreNum := 0
if index == 0 {
continue
}
if len(ri.AllStores) <= 1 {
validStoreNum = 1
} else {
for _, storeID := range ri.AllStores {
if _, ok := storeTaskMap[storeID]; ok {
validStoreNum++
}
}
}
if validStoreNum == 1 {
// if only one store is valid, just put the region into storeTaskMap
storeTaskMap[taskStoreID].regionInfos = append(storeTaskMap[taskStoreID].regionInfos, ri)
} else {
// if more than one store is valid, put the region
// into the candidate map of every valid store
totalRegionCandidateNum += validStoreNum
totalRemainingRegionNum += 1
taskKey := ri.Region.String()
for _, storeID := range ri.AllStores {
if _, validStore := storeTaskMap[storeID]; !validStore {
continue
}
if _, ok := storeCandidateRegionMap[storeID]; !ok {
candidateMap := make(map[string]tikv.RegionInfo)
storeCandidateRegionMap[storeID] = candidateMap
}
if _, duplicateRegion := storeCandidateRegionMap[storeID][taskKey]; duplicateRegion {
// duplicated region, which should not happen; just give up balancing
logutil.BgLogger().Warn("Meet duplicated region info when trying to balance batch cop task, give up balancing")
return originalTasks
}
storeCandidateRegionMap[storeID][taskKey] = ri
}
}
}
}
if totalRemainingRegionNum == 0 {
return originalTasks
}

avgStorePerRegion := float64(totalRegionCandidateNum) / float64(totalRemainingRegionNum)
findNextStore := func(candidateStores []uint64) uint64 {
store := uint64(math.MaxUint64)
weightedRegionNum := math.MaxFloat64
if candidateStores != nil {
for _, storeID := range candidateStores {
if _, validStore := storeCandidateRegionMap[storeID]; !validStore {
continue
}
num := float64(len(storeCandidateRegionMap[storeID]))/avgStorePerRegion + float64(len(storeTaskMap[storeID].regionInfos))
if num < weightedRegionNum {
store = storeID
weightedRegionNum = num
}
}
if store != uint64(math.MaxUint64) {
return store
}
}
for storeID := range storeTaskMap {
if _, validStore := storeCandidateRegionMap[storeID]; !validStore {
continue
}
num := float64(len(storeCandidateRegionMap[storeID]))/avgStorePerRegion + float64(len(storeTaskMap[storeID].regionInfos))
if num < weightedRegionNum {
store = storeID
weightedRegionNum = num
}
}
return store
}

store := findNextStore(nil)
for totalRemainingRegionNum > 0 {
if store == uint64(math.MaxUint64) {
break
}
var key string
var ri tikv.RegionInfo
for key, ri = range storeCandidateRegionMap[store] {
// get the first region
break
}
storeTaskMap[store].regionInfos = append(storeTaskMap[store].regionInfos, ri)
totalRemainingRegionNum--
for _, id := range ri.AllStores {
if _, ok := storeCandidateRegionMap[id]; ok {
delete(storeCandidateRegionMap[id], key)
totalRegionCandidateNum--
if len(storeCandidateRegionMap[id]) == 0 {
delete(storeCandidateRegionMap, id)
}
}
}
if totalRemainingRegionNum > 0 {
avgStorePerRegion = float64(totalRegionCandidateNum) / float64(totalRemainingRegionNum)
// This is not optimal because we only check the stores affected by this region; to find the store
// with the lowest weightedRegionNum, all stores would have to be checked. However, checking only
// the affected stores is simpler and still gives a good enough result.
store = findNextStore(ri.AllStores)
}
}
if totalRemainingRegionNum > 0 {
logutil.BgLogger().Warn("Some regions are not used when trying to balance batch cop task, give up balancing")
return originalTasks
}

var ret []*batchCopTask
for _, task := range storeTaskMap {
ret = append(ret, task)
}
return ret
}

func buildBatchCopTasks(bo *Backoffer, cache *tikv.RegionCache, ranges *tikv.KeyRanges, storeType kv.StoreType) ([]*batchCopTask, error) {
@@ -138,13 +285,15 @@ func buildBatchCopTasks(bo *Backoffer, cache *tikv.RegionCache, ranges *tikv.Key
// Then `splitRegion` will reload these regions.
continue
}
allStores := cache.GetAllValidTiFlashStores(task.region, rpcCtx.Store)
if batchCop, ok := storeTaskMap[rpcCtx.Addr]; ok {
batchCop.copTasks = append(batchCop.copTasks, copTaskAndRPCContext{task: task, ctx: rpcCtx})
batchCop.regionInfos = append(batchCop.regionInfos, tikv.RegionInfo{Region: task.region, Meta: rpcCtx.Meta, Ranges: task.ranges, AllStores: allStores})
} else {
batchTask := &batchCopTask{
storeAddr: rpcCtx.Addr,
cmdType: cmdType,
copTasks: []copTaskAndRPCContext{{task, rpcCtx}},
storeAddr: rpcCtx.Addr,
cmdType: cmdType,
ctx: rpcCtx,
regionInfos: []tikv.RegionInfo{{Region: task.region, Meta: rpcCtx.Meta, Ranges: task.ranges, AllStores: allStores}},
}
storeTaskMap[rpcCtx.Addr] = batchTask
}
@@ -159,9 +308,25 @@ func buildBatchCopTasks(bo *Backoffer, cache *tikv.RegionCache, ranges *tikv.Key
}
continue
}

for _, task := range storeTaskMap {
batchTasks = append(batchTasks, task)
}
if log.GetLevel() <= zap.DebugLevel {
msg := "Before region balance:"
for _, task := range batchTasks {
msg += " store " + task.storeAddr + ": " + strconv.Itoa(len(task.regionInfos)) + " regions,"
}
logutil.BgLogger().Debug(msg)
}
batchTasks = balanceBatchCopTask(batchTasks)
if log.GetLevel() <= zap.DebugLevel {
msg := "After region balance:"
for _, task := range batchTasks {
msg += " store " + task.storeAddr + ": " + strconv.Itoa(len(task.regionInfos)) + " regions,"
}
logutil.BgLogger().Debug(msg)
}

if elapsed := time.Since(start); elapsed > time.Millisecond*500 {
logutil.BgLogger().Warn("buildBatchCopTasks takes too much time",
@@ -311,25 +476,25 @@ func (b *batchCopIterator) handleTask(ctx context.Context, bo *Backoffer, task *
// Merge all ranges and request again.
func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *Backoffer, batchTask *batchCopTask) ([]*batchCopTask, error) {
var ranges []tikvstore.KeyRange
for _, taskCtx := range batchTask.copTasks {
taskCtx.task.ranges.Do(func(ran *tikvstore.KeyRange) {
for _, ri := range batchTask.regionInfos {
ri.Ranges.Do(func(ran *tikvstore.KeyRange) {
ranges = append(ranges, *ran)
})
}
return buildBatchCopTasks(bo, b.store.GetRegionCache(), tikv.NewKeyRanges(ranges), b.req.StoreType)
}

func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *Backoffer, task *batchCopTask) ([]*batchCopTask, error) {
sender := NewRegionBatchRequestSender(b.store.GetRegionCache(), b.store.GetTiKVClient())
var regionInfos = make([]*coprocessor.RegionInfo, 0, len(task.copTasks))
for _, task := range task.copTasks {
sender := tikv.NewRegionBatchRequestSender(b.store.GetRegionCache(), b.store.GetTiKVClient())
var regionInfos = make([]*coprocessor.RegionInfo, 0, len(task.regionInfos))
for _, ri := range task.regionInfos {
regionInfos = append(regionInfos, &coprocessor.RegionInfo{
RegionId: task.task.region.GetID(),
RegionId: ri.Region.GetID(),
RegionEpoch: &metapb.RegionEpoch{
ConfVer: task.task.region.GetConfVer(),
Version: task.task.region.GetVer(),
ConfVer: ri.Region.GetConfVer(),
Version: ri.Region.GetVer(),
},
Ranges: task.task.ranges.ToPBRanges(),
Ranges: ri.Ranges.ToPBRanges(),
})
}

@@ -351,13 +516,14 @@ func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *Backoffer, ta
})
req.StoreTp = tikvrpc.TiFlash

logutil.BgLogger().Debug("send batch request to ", zap.String("req info", req.String()), zap.Int("cop task len", len(task.copTasks)))
resp, retry, cancel, err := sender.sendStreamReqToAddr(bo, task.copTasks, req, tikv.ReadTimeoutUltraLong)
logutil.BgLogger().Debug("send batch request to ", zap.String("req info", req.String()), zap.Int("cop task len", len(task.regionInfos)))
resp, retry, cancel, err := sender.SendReqToAddr(bo.TiKVBackoffer(), task.ctx, task.regionInfos, req, tikv.ReadTimeoutUltraLong)
// If there are store errors, we should retry for all regions.
if retry {
return b.retryBatchCopTask(ctx, bo, task)
}
if err != nil {
err = derr.ToTiDBErr(err)
return nil, errors.Trace(err)
}
defer cancel()
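
The heart of the change above is the greedy heuristic in balanceBatchCopTask: each remaining region goes to the candidate store with the lowest weight, where weight(store) = candidateRegions(store) / avgStorePerRegion + assignedRegions(store), and avgStorePerRegion is the total number of candidate entries divided by the number of unassigned regions. The standalone Go sketch below illustrates that heuristic under simplified assumptions; the region type, the balance function, and the sample data are hypothetical stand-ins rather than the PR's API, and unlike the real code it rescans every candidate store on each step instead of only the stores touched by the previously placed region, and it does not pin the first region of each task to its original store.

    // region_balance_sketch.go — minimal illustration of the greedy balancing
    // heuristic; all names here are hypothetical, not TiDB's real API.
    package main

    import (
    	"fmt"
    	"math"
    )

    type region struct {
    	id        string
    	allStores []uint64 // stores holding a replica of this region
    }

    // balance assigns every region to one of its replica stores, always picking
    // the store with the lowest weight:
    //   weight(store) = candidates(store)/avgStorePerRegion + assigned(store)
    func balance(regions []region, stores []uint64) map[uint64][]string {
    	assigned := make(map[uint64][]string, len(stores))
    	for _, s := range stores {
    		assigned[s] = []string{}
    	}
    	// candidates maps store -> set of region ids that store could still take.
    	candidates := make(map[uint64]map[string]region)
    	totalCandidates, remaining := 0, 0
    	for _, r := range regions {
    		remaining++
    		for _, s := range r.allStores {
    			if _, ok := assigned[s]; !ok {
    				continue // not an available store
    			}
    			if candidates[s] == nil {
    				candidates[s] = make(map[string]region)
    			}
    			candidates[s][r.id] = r
    			totalCandidates++
    		}
    	}
    	for remaining > 0 && len(candidates) > 0 {
    		avg := float64(totalCandidates) / float64(remaining)
    		// Pick the store with the lowest weight.
    		best, bestWeight := uint64(math.MaxUint64), math.MaxFloat64
    		for s, cands := range candidates {
    			w := float64(len(cands))/avg + float64(len(assigned[s]))
    			if w < bestWeight {
    				best, bestWeight = s, w
    			}
    		}
    		// Take one of that store's candidate regions and assign it there.
    		var picked region
    		for _, picked = range candidates[best] {
    			break
    		}
    		assigned[best] = append(assigned[best], picked.id)
    		remaining--
    		// The region is no longer a candidate for any store.
    		for _, s := range picked.allStores {
    			if cands, ok := candidates[s]; ok {
    				if _, dup := cands[picked.id]; dup {
    					delete(cands, picked.id)
    					totalCandidates--
    				}
    				if len(cands) == 0 {
    					delete(candidates, s)
    				}
    			}
    		}
    	}
    	return assigned
    }

    func main() {
    	regions := []region{
    		{id: "r1", allStores: []uint64{1, 2}},
    		{id: "r2", allStores: []uint64{1, 2, 3}},
    		{id: "r3", allStores: []uint64{2, 3}},
    		{id: "r4", allStores: []uint64{1, 3}},
    	}
    	fmt.Println(balance(regions, []uint64{1, 2, 3}))
    }

The exact assignment depends on Go's map iteration order, but the weight formula keeps the per-store region counts close to even while respecting each region's replica placement.
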
14 changes: 7 additions & 7 deletions store/copr/mpp.go
@@ -180,14 +180,14 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *Backoffer, req
var regionInfos []*coprocessor.RegionInfo
originalTask, ok := req.Meta.(*batchCopTask)
if ok {
for _, task := range originalTask.copTasks {
for _, ri := range originalTask.regionInfos {
regionInfos = append(regionInfos, &coprocessor.RegionInfo{
RegionId: task.task.region.GetID(),
RegionId: ri.Region.GetID(),
RegionEpoch: &metapb.RegionEpoch{
ConfVer: task.task.region.GetConfVer(),
Version: task.task.region.GetVer(),
ConfVer: ri.Region.GetConfVer(),
Version: ri.Region.GetVer(),
},
Ranges: task.task.ranges.ToPBRanges(),
Ranges: ri.Ranges.ToPBRanges(),
})
}
}
@@ -214,8 +214,8 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *Backoffer, req
// Or else it's the task without region, which always happens in high layer task without table.
// In that case
if originalTask != nil {
sender := NewRegionBatchRequestSender(m.store.GetRegionCache(), m.store.GetTiKVClient())
rpcResp, _, _, err = sender.sendStreamReqToAddr(bo, originalTask.copTasks, wrappedReq, tikv.ReadTimeoutMedium)
sender := tikv.NewRegionBatchRequestSender(m.store.GetRegionCache(), m.store.GetTiKVClient())
rpcResp, _, _, err = sender.SendReqToAddr(bo.TiKVBackoffer(), originalTask.ctx, originalTask.regionInfos, wrappedReq, tikv.ReadTimeoutMedium)
// No matter what the rpc error is, we won't retry the mpp dispatch tasks.
// TODO: If we want to retry, we must redo the plan fragment cutting and task scheduling.
// That's a hard job but we can try it in the future.
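
Both handleTaskOnce in batch_coprocessor.go and handleDispatchReq in mpp.go now build the []*coprocessor.RegionInfo request payload from a task's regionInfos with the same loop. The sketch below shows a hypothetical helper (not part of this PR) that captures that shared shape; it assumes it sits in package copr and reuses the imports those files already have (coprocessor, metapb, and the tikv package that defines tikv.RegionInfo).

    // toCopRegionInfos converts a batch cop task's regions into the protobuf form
    // sent to TiFlash. Hypothetical helper sketched from the loops in
    // batch_coprocessor.go and mpp.go above.
    func toCopRegionInfos(regionInfos []tikv.RegionInfo) []*coprocessor.RegionInfo {
    	ret := make([]*coprocessor.RegionInfo, 0, len(regionInfos))
    	for _, ri := range regionInfos {
    		ret = append(ret, &coprocessor.RegionInfo{
    			RegionId: ri.Region.GetID(),
    			RegionEpoch: &metapb.RegionEpoch{
    				ConfVer: ri.Region.GetConfVer(),
    				Version: ri.Region.GetVer(),
    			},
    			Ranges: ri.Ranges.ToPBRanges(),
    		})
    	}
    	return ret
    }

With such a helper, both call sites would shrink to regionInfos := toCopRegionInfos(task.regionInfos) before building the request.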