store/copr: balance region for batch cop task #24521

Merged
Changes from 15 commits (35 commits in total)

Commits:
87e8db4  save work (windtalker, Apr 27, 2021)
6ad74a5  refine (windtalker, Apr 28, 2021)
d33d2a0  save work (windtalker, May 6, 2021)
60c3621  save work (windtalker, May 6, 2021)
fc86ee7  save work (windtalker, May 6, 2021)
0e4e72c  save work (windtalker, May 7, 2021)
9d7bb1e  Merge branch 'master' of https://github.com/pingcap/tidb into balance… (windtalker, May 7, 2021)
0cf711a  save work (windtalker, May 7, 2021)
0bf0874  save work (windtalker, May 7, 2021)
39c8786  save work (windtalker, May 7, 2021)
9ffb7a6  merge master (windtalker, May 10, 2021)
ccbb3f3  refine (windtalker, May 10, 2021)
62f2a13  Merge branch 'master' of https://github.com/pingcap/tidb into balance… (windtalker, May 10, 2021)
9f32a6b  refine (windtalker, May 10, 2021)
3a641fc  add comments (windtalker, May 10, 2021)
416378f  fix ci (windtalker, May 11, 2021)
b92fb5d  Merge branch 'master' of https://github.com/pingcap/tidb into balance… (windtalker, May 11, 2021)
2986c4e  fix ci (windtalker, May 11, 2021)
c86d192  Merge branch 'master' of https://github.com/pingcap/tidb into balance… (windtalker, May 11, 2021)
8af078f  refine (windtalker, May 11, 2021)
1859b5b  Merge branch 'master' of https://github.com/pingcap/tidb into balance… (windtalker, May 11, 2021)
a7eb17f  address comments (windtalker, May 11, 2021)
1023411  Merge branch 'master' of https://github.com/pingcap/tidb into balance… (windtalker, May 11, 2021)
09d01f6  Merge branch 'master' of https://github.com/pingcap/tidb into balance… (windtalker, May 11, 2021)
d682af3  refine code (windtalker, May 12, 2021)
624abed  refine (windtalker, May 12, 2021)
56140a3  Merge branch 'master' of https://github.com/pingcap/tidb into balance… (windtalker, May 12, 2021)
e1cdae2  address comments (windtalker, May 12, 2021)
2fbe914  Merge branch 'master' of https://github.com/pingcap/tidb into balance… (windtalker, May 12, 2021)
ddd0700  merge master (windtalker, May 14, 2021)
8493913  address comments (windtalker, May 14, 2021)
5be4895  Merge branch 'master' of https://github.com/pingcap/tidb into balance… (windtalker, May 17, 2021)
f1de8b4  Merge branch 'master' of https://github.com/pingcap/tidb into balance… (windtalker, May 17, 2021)
3a76a3e  add some log (windtalker, May 17, 2021)
ee94b2d  Merge branch 'master' into balance_region_for_batch_cop_task (ti-chi-bot, May 18, 2021)

168 changes: 149 additions & 19 deletions store/copr/batch_coprocessor.go
@@ -16,6 +16,7 @@ package copr
import (
"context"
"io"
"math"
"sync"
"sync/atomic"
"time"
@@ -42,8 +43,9 @@ import (
type batchCopTask struct {
storeAddr string
cmdType tikvrpc.CmdType
ctx *tikv.RPCContext

copTasks []copTaskAndRPCContext
regionInfos []tikv.RegionInfo
}

type batchCopResponse struct {
@@ -95,9 +97,133 @@ func (rs *batchCopResponse) RespTime() time.Duration {
return rs.respTime
}

type copTaskAndRPCContext struct {
task *copTask
ctx *tikv.RPCContext
}
// balanceBatchCopTask balances the regions between the available stores. The basic rules are:
// 1. the first region of each original batch cop task stays with its original store
// 2. for the remaining regions:
// if there is only 1 available store, then put the region into that store
// otherwise, use a greedy algorithm to put it into the store with the lowest weighted region count
func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask {
storeTaskMap := make(map[uint64]*batchCopTask)
storeCandidateRegionMap := make(map[uint64]map[string]tikv.RegionInfo)
totalRegionCandidateNum := 0
totalRemainingRegionNum := 0

for _, task := range originalTasks {
taskStoreID := task.regionInfos[0].AllStores[0]
batchTask := &batchCopTask{
storeAddr: task.storeAddr,
cmdType: task.cmdType,
ctx: task.ctx,
regionInfos: []tikv.RegionInfo{task.regionInfos[0]},
}
storeTaskMap[taskStoreID] = batchTask
candidateMap := make(map[string]tikv.RegionInfo)
storeCandidateRegionMap[taskStoreID] = candidateMap
}

for _, task := range originalTasks {
taskStoreID := task.regionInfos[0].AllStores[0]
for index, ri := range task.regionInfos {
// for each region, figure out the valid store num
validStoreNum := 0
if index == 0 {
continue
}
if len(ri.AllStores) <= 1 {
validStoreNum = 1
} else {
for _, storeID := range ri.AllStores {
if _, ok := storeCandidateRegionMap[storeID]; ok {
validStoreNum++
}
}
}
if validStoreNum == 1 {
// if only one store is valid, just put it to storeTaskMap
storeTaskMap[taskStoreID].regionInfos = append(storeTaskMap[taskStoreID].regionInfos, ri)
} else {
// if more than one store is valid, put the region
// to store candidate map
totalRegionCandidateNum += validStoreNum
totalRemainingRegionNum += 1
taskKey := ri.Region.String()
for _, storeID := range ri.AllStores {
if candidateMap, ok := storeCandidateRegionMap[storeID]; ok {
if _, ok := candidateMap[taskKey]; ok {
// duplicated region, should not happen, just give up balance
return originalTasks
}
candidateMap[taskKey] = ri
}
}
}
}
}
if totalRemainingRegionNum == 0 {
return originalTasks
}

avgStorePerRegion := float64(totalRegionCandidateNum) / float64(totalRemainingRegionNum)
findNextStore := func() uint64 {
store := uint64(math.MaxUint64)
weightedRegionNum := float64(0)
for storeID := range storeTaskMap {
if store == uint64(math.MaxUint64) && len(storeCandidateRegionMap[storeID]) > 0 {
store = storeID
weightedRegionNum = float64(len(storeCandidateRegionMap[storeID]))/avgStorePerRegion + float64(len(storeTaskMap[storeID].regionInfos))
} else {
num := float64(len(storeCandidateRegionMap[storeID])) / avgStorePerRegion
if num == 0 {
continue
}
num += float64(len(storeTaskMap[storeID].regionInfos))
if num < weightedRegionNum {
store = storeID
weightedRegionNum = num
}
}
}
return store
}
store := findNextStore()
for totalRemainingRegionNum > 0 {
if len(storeCandidateRegionMap[store]) == 0 {
store = findNextStore()
}
if store == uint64(math.MaxUint64) {
break
}
for key, ri := range storeCandidateRegionMap[store] {
storeTaskMap[store].regionInfos = append(storeTaskMap[store].regionInfos, ri)
totalRemainingRegionNum--
for _, id := range ri.AllStores {
if _, ok := storeCandidateRegionMap[id]; ok {
delete(storeCandidateRegionMap[id], key)
totalRegionCandidateNum--
}
}
if totalRemainingRegionNum > 0 {
weightedTaskNum := float64(len(storeCandidateRegionMap[store]))/avgStorePerRegion + float64(len(storeTaskMap[store].regionInfos))
avgStorePerRegion = float64(totalRegionCandidateNum) / float64(totalRemainingRegionNum)
for _, id := range ri.AllStores {
// it is not optimal because we only check the stores affected by this region; in fact, in order
// to find the store with the lowest weightedTaskNum, all stores should be checked, but checking
// only the affected stores is simpler and gives a good enough result
if id != store && len(storeCandidateRegionMap[id]) > 0 && float64(len(storeCandidateRegionMap[id]))/avgStorePerRegion+float64(len(storeTaskMap[id].regionInfos)) <= weightedTaskNum {
store = id
weightedTaskNum = float64(len(storeCandidateRegionMap[id]))/avgStorePerRegion + float64(len(storeTaskMap[id].regionInfos))
}
}
}
break
}
}
Contributor:

Refactor this code to make it clearer. It is a typical problem; there may be a better algorithm to solve it.

hanfei1991 (Member) commented on May 12, 2021:

In my opinion, the optimal algorithm is equivalent to a matching problem or a max-flow problem, whose time complexity is expensive (O(V^2) for the matching problem and O(V^3) for max-flow). So the greedy method is OK with me.
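For illustration, here is a minimal, self-contained Go sketch of the greedy rule the reviewers are discussing, with simplified types (store IDs and region-key strings only) and a fixed avgStorePerRegion; the function and variable names are invented for this sketch and are not part of the PR. Each store is weighted as candidateRegions/avgStorePerRegion + assignedRegions, the lightest store receives one of its candidate regions, and that region is then removed from every store's candidate set.

```go
package main

import "fmt"

// greedyBalance assigns each remaining candidate region to exactly one store,
// always picking the store with the lowest weighted load first.
func greedyBalance(assigned map[uint64][]string, candidates map[uint64]map[string]struct{}, avgStorePerRegion float64) {
	for {
		// Pick the store with the smallest weighted load that still has candidates.
		var best uint64
		bestWeight := -1.0
		for storeID, cands := range candidates {
			if len(cands) == 0 {
				continue
			}
			w := float64(len(cands))/avgStorePerRegion + float64(len(assigned[storeID]))
			if bestWeight < 0 || w < bestWeight {
				best, bestWeight = storeID, w
			}
		}
		if bestWeight < 0 {
			return // no candidate regions left anywhere
		}
		// Assign one candidate region to the chosen store.
		var region string
		for r := range candidates[best] {
			region = r
			break
		}
		assigned[best] = append(assigned[best], region)
		// The region now belongs to exactly one store, so drop it from every candidate set.
		for _, cands := range candidates {
			delete(cands, region)
		}
	}
}

func main() {
	// Each store already owns the first region of its original batch cop task.
	assigned := map[uint64][]string{1: {"r0"}, 2: {"r1"}}
	// The remaining regions could go to either store.
	candidates := map[uint64]map[string]struct{}{
		1: {"r2": {}, "r3": {}},
		2: {"r2": {}, "r3": {}},
	}
	greedyBalance(assigned, candidates, 2.0) // 4 candidate slots / 2 remaining regions
	fmt.Println(assigned)
}
```

Whichever store is picked first, each store ends up with two regions, which is the balanced outcome the PR aims for; unlike this sketch, the real implementation also recomputes avgStorePerRegion as regions are consumed.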


var ret []*batchCopTask
for _, task := range storeTaskMap {
ret = append(ret, task)
}
return ret
}

func buildBatchCopTasks(bo *backoffer, cache *tikv.RegionCache, ranges *tikv.KeyRanges, storeType kv.StoreType) ([]*batchCopTask, error) {
@@ -138,13 +264,15 @@ func buildBatchCopTasks(bo *backoffer, cache *tikv.RegionCache, ranges *tikv.Key
// Then `splitRegion` will reloads these regions.
continue
}
allStores := cache.GetAllValidTiFlashStores(task.region, rpcCtx.Store)
if batchCop, ok := storeTaskMap[rpcCtx.Addr]; ok {
batchCop.copTasks = append(batchCop.copTasks, copTaskAndRPCContext{task: task, ctx: rpcCtx})
batchCop.regionInfos = append(batchCop.regionInfos, tikv.RegionInfo{Region: task.region, Meta: rpcCtx.Meta, Ranges: task.ranges, AllStores: allStores})
} else {
batchTask := &batchCopTask{
storeAddr: rpcCtx.Addr,
cmdType: cmdType,
copTasks: []copTaskAndRPCContext{{task, rpcCtx}},
storeAddr: rpcCtx.Addr,
cmdType: cmdType,
ctx: rpcCtx,
regionInfos: []tikv.RegionInfo{{task.region, rpcCtx.Meta, task.ranges, allStores}},
}
storeTaskMap[rpcCtx.Addr] = batchTask
}
@@ -157,9 +285,11 @@ func buildBatchCopTasks(bo *backoffer, cache *tikv.RegionCache, ranges *tikv.Key
}
continue
}

for _, task := range storeTaskMap {
batchTasks = append(batchTasks, task)
}
batchTasks = balanceBatchCopTask(batchTasks)

if elapsed := time.Since(start); elapsed > time.Millisecond*500 {
logutil.BgLogger().Warn("buildBatchCopTasks takes too much time",
@@ -315,25 +445,25 @@ func (b *batchCopIterator) handleTask(ctx context.Context, bo *backoffer, task *
// Merge all ranges and request again.
func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *backoffer, batchTask *batchCopTask) ([]*batchCopTask, error) {
var ranges []tikvstore.KeyRange
for _, taskCtx := range batchTask.copTasks {
taskCtx.task.ranges.Do(func(ran *tikvstore.KeyRange) {
for _, ri := range batchTask.regionInfos {
ri.Ranges.Do(func(ran *tikvstore.KeyRange) {
ranges = append(ranges, *ran)
})
}
return buildBatchCopTasks(bo, b.store.GetRegionCache(), tikv.NewKeyRanges(ranges), b.req.StoreType)
}

func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *backoffer, task *batchCopTask) ([]*batchCopTask, error) {
sender := NewRegionBatchRequestSender(b.store.GetRegionCache(), b.store.GetTiKVClient())
var regionInfos = make([]*coprocessor.RegionInfo, 0, len(task.copTasks))
for _, task := range task.copTasks {
sender := tikv.NewRegionBatchRequestSender(b.store.GetRegionCache(), b.store.GetTiKVClient())
var regionInfos = make([]*coprocessor.RegionInfo, 0, len(task.regionInfos))
for _, ri := range task.regionInfos {
regionInfos = append(regionInfos, &coprocessor.RegionInfo{
RegionId: task.task.region.GetID(),
RegionId: ri.Region.GetID(),
RegionEpoch: &metapb.RegionEpoch{
ConfVer: task.task.region.GetConfVer(),
Version: task.task.region.GetVer(),
ConfVer: ri.Region.GetConfVer(),
Version: ri.Region.GetVer(),
},
Ranges: task.task.ranges.ToPBRanges(),
Ranges: ri.Ranges.ToPBRanges(),
})
}

@@ -355,8 +485,8 @@ func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *backoffer, ta
})
req.StoreTp = tikvrpc.TiFlash

logutil.BgLogger().Debug("send batch request to ", zap.String("req info", req.String()), zap.Int("cop task len", len(task.copTasks)))
resp, retry, cancel, err := sender.sendStreamReqToAddr(bo, task.copTasks, req, tikv.ReadTimeoutUltraLong)
logutil.BgLogger().Debug("send batch request to ", zap.String("req info", req.String()), zap.Int("cop task len", len(task.regionInfos)))
resp, retry, cancel, err := sender.SendReqToAddr(bo.TiKVBackoffer(), task.ctx, task.regionInfos, req, tikv.ReadTimeoutUltraLong)
// If there are store errors, we should retry for all regions.
if retry {
return b.retryBatchCopTask(ctx, bo, task)
14 changes: 7 additions & 7 deletions store/copr/mpp.go
@@ -184,14 +184,14 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *backoffer, req
var regionInfos []*coprocessor.RegionInfo
originalTask, ok := req.Meta.(*batchCopTask)
if ok {
for _, task := range originalTask.copTasks {
for _, ri := range originalTask.regionInfos {
regionInfos = append(regionInfos, &coprocessor.RegionInfo{
RegionId: task.task.region.GetID(),
RegionId: ri.Region.GetID(),
RegionEpoch: &metapb.RegionEpoch{
ConfVer: task.task.region.GetConfVer(),
Version: task.task.region.GetVer(),
ConfVer: ri.Region.GetConfVer(),
Version: ri.Region.GetVer(),
},
Ranges: task.task.ranges.ToPBRanges(),
Ranges: ri.Ranges.ToPBRanges(),
})
}
}
@@ -218,8 +218,8 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *backoffer, req
// Or else it's the task without region, which always happens in high layer task without table.
// In that case
if originalTask != nil {
sender := NewRegionBatchRequestSender(m.store.GetRegionCache(), m.store.GetTiKVClient())
rpcResp, _, _, err = sender.sendStreamReqToAddr(bo, originalTask.copTasks, wrappedReq, tikv.ReadTimeoutMedium)
sender := tikv.NewRegionBatchRequestSender(m.store.GetRegionCache(), m.store.GetTiKVClient())
rpcResp, _, _, err = sender.SendReqToAddr(bo.TiKVBackoffer(), originalTask.ctx, originalTask.regionInfos, wrappedReq, tikv.ReadTimeoutMedium)
// No matter what the rpc error is, we won't retry the mpp dispatch tasks.
// TODO: If we want to retry, we must redo the plan fragment cutting and task scheduling.
// That's a hard job but we can try it in the future.
store/tikv/batch_request_sender.go (moved from store/copr/batch_request_sender.go)
@@ -11,53 +11,59 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package copr
package tikv
Member:

Why do we change the package it belongs to? Batch cop only involves cop requests, so putting it in the copr package is reasonable.

Contributor (Author):

I moved the package for two reasons:

  1. request_sender.go is in the tikv package, and since batch_request_sender is also a sender, it is reasonable to move it to the tikv package (although it actually sends requests to TiFlash).
  2. I added a new method onSendFailForBatchRegions in region_cache.go that takes []RegionInfo as its argument. Since RegionInfo is only used in batchCopTask, its definition lives in batch_request_sender.go, and if batch_request_sender.go stayed in the copr package, this would introduce a circular dependency (see the sketch below).
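To make the dependency argument concrete, here is a compact single-package sketch; the types and the method body are stand-ins invented for illustration, while the real RegionInfo and region cache live in store/tikv. Because the cache-side failure handler takes []RegionInfo as a parameter, the type must be visible from the cache's package without importing copr; since copr already imports tikv, defining RegionInfo in copr would close an import cycle.

```go
package main

import (
	"errors"
	"fmt"
)

// RegionInfo mirrors the struct added in batch_request_sender.go (fields simplified).
type RegionInfo struct {
	RegionID  uint64
	AllStores []uint64
}

// RegionCache stands in for the tikv-side region cache.
type RegionCache struct{}

// OnSendFailForBatchRegions stands in for the new cache method: its signature needs
// []RegionInfo, so RegionInfo has to live in the same package as the cache.
func (c *RegionCache) OnSendFailForBatchRegions(storeID uint64, regions []RegionInfo, reload bool, err error) {
	for _, r := range regions {
		fmt.Printf("marking region %d as failed on store %d (reload=%v): %v\n", r.RegionID, storeID, reload, err)
	}
}

func main() {
	cache := &RegionCache{}
	regions := []RegionInfo{{RegionID: 42, AllStores: []uint64{1, 2, 3}}}
	cache.OnSendFailForBatchRegions(7, regions, true, errors.New("io error"))
}
```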


import (
"context"
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/kvproto/pkg/metapb"
tikverr "github.com/pingcap/tidb/store/tikv/error"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type RegionInfo struct {
Region RegionVerID
Meta *metapb.Region
Ranges *KeyRanges
AllStores []uint64
}

// RegionBatchRequestSender sends BatchCop requests to TiFlash server by stream way.
type RegionBatchRequestSender struct {
*tikv.RegionRequestSender
*RegionRequestSender
}

// NewRegionBatchRequestSender creates a RegionBatchRequestSender object.
func NewRegionBatchRequestSender(cache *tikv.RegionCache, client tikv.Client) *RegionBatchRequestSender {
func NewRegionBatchRequestSender(cache *RegionCache, client Client) *RegionBatchRequestSender {
return &RegionBatchRequestSender{
RegionRequestSender: tikv.NewRegionRequestSender(cache, client),
RegionRequestSender: NewRegionRequestSender(cache, client),
}
}

func (ss *RegionBatchRequestSender) sendStreamReqToAddr(bo *backoffer, ctxs []copTaskAndRPCContext, req *tikvrpc.Request, timout time.Duration) (resp *tikvrpc.Response, retry bool, cancel func(), err error) {
func (ss *RegionBatchRequestSender) SendReqToAddr(bo *Backoffer, rpcCtx *RPCContext, regionInfos []RegionInfo, req *tikvrpc.Request, timout time.Duration) (resp *tikvrpc.Response, retry bool, cancel func(), err error) {
// use the first ctx to send request, because every ctx has same address.
cancel = func() {}
rpcCtx := ctxs[0].ctx
if e := tikvrpc.SetContext(req, rpcCtx.Meta, rpcCtx.Peer); e != nil {
return nil, false, cancel, errors.Trace(e)
}
ctx := bo.GetCtx()
if rawHook := ctx.Value(tikv.RPCCancellerCtxKey{}); rawHook != nil {
ctx, cancel = rawHook.(*tikv.RPCCanceller).WithCancel(ctx)
if rawHook := ctx.Value(RPCCancellerCtxKey{}); rawHook != nil {
ctx, cancel = rawHook.(*RPCCanceller).WithCancel(ctx)
}
start := time.Now()
resp, err = ss.GetClient().SendRequest(ctx, rpcCtx.Addr, req, timout)
if ss.Stats != nil {
tikv.RecordRegionRequestRuntimeStats(ss.Stats, req.Type, time.Since(start))
RecordRegionRequestRuntimeStats(ss.Stats, req.Type, time.Since(start))
}
if err != nil {
cancel()
ss.SetRPCError(err)
e := ss.onSendFail(bo, ctxs, err)
e := ss.onSendFailForBatchRegions(bo, rpcCtx, regionInfos, err)
if e != nil {
return nil, false, func() {}, errors.Trace(e)
}
@@ -67,30 +73,25 @@ func (ss *RegionBatchRequestSender) sendStreamReqToAddr(bo *backoffer, ctxs []co
return
}

func (ss *RegionBatchRequestSender) onSendFail(bo *backoffer, ctxs []copTaskAndRPCContext, err error) error {
func (ss *RegionBatchRequestSender) onSendFailForBatchRegions(bo *Backoffer, ctx *RPCContext, regionInfos []RegionInfo, err error) error {
// If it failed because the context is cancelled by ourself, don't retry.
if errors.Cause(err) == context.Canceled || status.Code(errors.Cause(err)) == codes.Canceled {
return errors.Trace(err)
} else if atomic.LoadUint32(&tikv.ShuttingDown) > 0 {
} else if atomic.LoadUint32(&ShuttingDown) > 0 {
return tikverr.ErrTiDBShuttingDown
}

for _, failedCtx := range ctxs {
ctx := failedCtx.ctx
if ctx.Meta != nil {
// The reload region param is always true. Because that every time we try, we must
// re-build the range then re-create the batch sender. As a result, the len of "failStores"
// will change. If tiflash's replica is more than two, the "reload region" will always be false.
// Now that the batch cop and mpp has a relative low qps, it's reasonable to reload every time
// when meeting io error.
ss.GetRegionCache().OnSendFail(bo.TiKVBackoffer(), ctx, true, err)
}
}
// The reload region param is always true. Because that every time we try, we must
// re-build the range then re-create the batch sender. As a result, the len of "failStores"
// will change. If tiflash's replica is more than two, the "reload region" will always be false.
// Now that the batch cop and mpp has a relative low qps, it's reasonable to reload every time
// when meeting io error.
ss.GetRegionCache().OnSendFailForBatchRegions(bo, ctx.Store, regionInfos, true, err)

// Retry on send request failure when it's not canceled.
// When a store is not available, the leader of related region should be elected quickly.
// TODO: the number of retry time should be limited:since region may be unavailable
// when some unrecoverable disaster happened.
err = bo.Backoff(tikv.BoTiFlashRPC, errors.Errorf("send tikv request error: %v, ctxs: %v, try next peer later", err, ctxs))
err = bo.Backoff(BoTiFlashRPC, errors.Errorf("send request error: %v, ctx: %v, regionInfos: %v", err, ctx, regionInfos))
return errors.Trace(err)
}