From 87e8db4f5cded9382d1ba4cd8782cd6e622057da Mon Sep 17 00:00:00 2001 From: xufei Date: Tue, 27 Apr 2021 17:49:13 +0800 Subject: [PATCH 01/21] save work --- store/copr/batch_coprocessor.go | 56 +++++++++++++++++++++++++++--- store/copr/batch_request_sender.go | 3 +- store/tikv/region_cache.go | 37 +++++++++++++++----- store/tikv/region_cache_test.go | 2 +- store/tikv/region_request.go | 2 ++ 5 files changed, 85 insertions(+), 15 deletions(-) diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index cec3e49644363..4b2cec4f0d811 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -95,8 +95,52 @@ func (rs *batchCopResponse) RespTime() time.Duration { } type copTaskAndRPCContext struct { - task *copTask - ctx *tikv.RPCContext + task *copTask + allStoreAddrs []string + ctx *tikv.RPCContext +} + +func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask { + storeTaskMap := make(map[string]*batchCopTask) + for _, task := range originalTasks { + for index, copTask := range task.copTasks { + if index == 0 || len(copTask.allStoreAddrs) <= 1 { + if batchCop, ok := storeTaskMap[task.storeAddr]; ok { + batchCop.copTasks = append(batchCop.copTasks, copTask) + } else { + batchTask := &batchCopTask{ + storeAddr: task.storeAddr, + cmdType: task.cmdType, + copTasks: []copTaskAndRPCContext{copTask}, + } + storeTaskMap[task.storeAddr] = batchTask + } + } + } + } + for _, task := range originalTasks { + for index, copTask := range task.copTasks { + if index != 0 && len(copTask.allStoreAddrs) > 1 { + bestAddr := "" + for _, storeAddr := range copTask.allStoreAddrs { + if _, ok := storeTaskMap[storeAddr]; ok { + if bestAddr == "" { + bestAddr = storeAddr + } else if len(storeTaskMap[bestAddr].copTasks) > len(storeTaskMap[storeAddr].copTasks) { + bestAddr = storeAddr + } + } + } + storeTaskMap[bestAddr].copTasks = append(storeTaskMap[bestAddr].copTasks, copTask) + } + } + } + + var ret []*batchCopTask + for _, task := range storeTaskMap { + ret = append(ret, task) + } + return ret } func buildBatchCopTasks(bo *tikv.Backoffer, cache *tikv.RegionCache, ranges *tikv.KeyRanges, storeType kv.StoreType) ([]*batchCopTask, error) { @@ -124,7 +168,7 @@ func buildBatchCopTasks(bo *tikv.Backoffer, cache *tikv.RegionCache, ranges *tik storeTaskMap := make(map[string]*batchCopTask) needRetry := false for _, task := range tasks { - rpcCtx, err := cache.GetTiFlashRPCContext(bo, task.region, false) + rpcCtx, allStoreAddr, err := cache.GetTiFlashRPCContext(bo, task.region, false, true) if err != nil { return nil, errors.Trace(err) } @@ -138,12 +182,12 @@ func buildBatchCopTasks(bo *tikv.Backoffer, cache *tikv.RegionCache, ranges *tik continue } if batchCop, ok := storeTaskMap[rpcCtx.Addr]; ok { - batchCop.copTasks = append(batchCop.copTasks, copTaskAndRPCContext{task: task, ctx: rpcCtx}) + batchCop.copTasks = append(batchCop.copTasks, copTaskAndRPCContext{task: task, allStoreAddrs: allStoreAddr, ctx: rpcCtx}) } else { batchTask := &batchCopTask{ storeAddr: rpcCtx.Addr, cmdType: cmdType, - copTasks: []copTaskAndRPCContext{{task, rpcCtx}}, + copTasks: []copTaskAndRPCContext{{task, allStoreAddr, rpcCtx}}, } storeTaskMap[rpcCtx.Addr] = batchTask } @@ -156,9 +200,11 @@ func buildBatchCopTasks(bo *tikv.Backoffer, cache *tikv.RegionCache, ranges *tik } continue } + for _, task := range storeTaskMap { batchTasks = append(batchTasks, task) } + batchTasks = balanceBatchCopTask(batchTasks) if elapsed := time.Since(start); elapsed > time.Millisecond*500 { 
logutil.BgLogger().Warn("buildBatchCopTasks takes too much time", diff --git a/store/copr/batch_request_sender.go b/store/copr/batch_request_sender.go index 6a48d27fbb16a..0562e8ae9e4f2 100644 --- a/store/copr/batch_request_sender.go +++ b/store/copr/batch_request_sender.go @@ -75,9 +75,10 @@ func (ss *RegionBatchRequestSender) onSendFail(bo *tikv.Backoffer, ctxs []copTas return tikverr.ErrTiDBShuttingDown } + addr := ctxs[0].ctx.Addr for _, failedCtx := range ctxs { ctx := failedCtx.ctx - if ctx.Meta != nil { + if ctx.Meta != nil && ctx.Addr == addr { ss.GetRegionCache().OnSendFail(bo, ctx, ss.NeedReloadRegion(ctx), err) } } diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go index 9adfa65dd316a..ad295ec8fd241 100644 --- a/store/tikv/region_cache.go +++ b/store/tikv/region_cache.go @@ -526,15 +526,16 @@ func (c *RegionCache) GetTiKVRPCContext(bo *Backoffer, id RegionVerID, replicaRe // GetTiFlashRPCContext returns RPCContext for a region must access flash store. If it returns nil, the region // must be out of date and already dropped from cache or not flash store found. // `loadBalance` is an option. For MPP and batch cop, it is pointless and might cause try the failed store repeatly. -func (c *RegionCache) GetTiFlashRPCContext(bo *Backoffer, id RegionVerID, loadBalance bool) (*RPCContext, error) { +func (c *RegionCache) GetTiFlashRPCContext(bo *Backoffer, id RegionVerID, loadBalance bool, collectAllStoreAddrs bool) (*RPCContext, []string, error) { ts := time.Now().Unix() + allStoreAddrs := make([]string, 0, 1) cachedRegion := c.getCachedRegionWithRLock(id) if cachedRegion == nil { - return nil, nil + return nil, nil, nil } if !cachedRegion.checkRegionCacheTTL(ts) { - return nil, nil + return nil, nil, nil } regionStore := cachedRegion.getStore() @@ -551,11 +552,11 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *Backoffer, id RegionVerID, loadBa storeIdx, store := regionStore.accessStore(TiFlashOnly, accessIdx) addr, err := c.getStoreAddr(bo, cachedRegion, store, storeIdx) if err != nil { - return nil, err + return nil, nil, err } if len(addr) == 0 { cachedRegion.invalidate(StoreNotFound) - return nil, nil + return nil, nil, nil } if store.getResolveState() == needCheck { _, err := store.reResolve(c) @@ -572,7 +573,7 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *Backoffer, id RegionVerID, loadBa // TiFlash will always try to find out a valid peer, avoiding to retry too many times. continue } - return &RPCContext{ + rpcContex := &RPCContext{ Region: id, Meta: cachedRegion.meta, Peer: peer, @@ -581,11 +582,31 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *Backoffer, id RegionVerID, loadBa Addr: addr, AccessMode: TiFlashOnly, TiKVNum: regionStore.accessStoreNum(TiKVOnly), - }, nil + } + if collectAllStoreAddrs { + allStoreAddrs = append(allStoreAddrs, addr) + for startOffset := 1; startOffset < regionStore.accessStoreNum(TiFlashOnly); startOffset++ { + accessIdx = AccessIndex((sIdx + i + startOffset) % regionStore.accessStoreNum(TiFlashOnly)) + storeIdx, store = regionStore.accessStore(TiFlashOnly, accessIdx) + addr, err = c.getStoreAddr(bo, cachedRegion, store, storeIdx) + if err != nil { + continue + } + if len(addr) == 0 { + continue + } + if store.getResolveState() == needCheck { + continue + } + allStoreAddrs = append(allStoreAddrs, addr) + } + return rpcContex, allStoreAddrs, nil + } + return rpcContex, nil, nil } cachedRegion.invalidate(Other) - return nil, nil + return nil, nil, nil } // KeyLocation is the region and range that a key is located. 
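The first version of balanceBatchCopTask introduced in this patch keeps each batch task's first region on its original store and then greedily hands every remaining multi-replica region to the candidate store that currently holds the fewest regions (later patches in the series refine this into a weighted measure with candidate maps). A minimal standalone Go sketch of that greedy rule follows; it is not part of the patch, and the region/store types and names are made up for illustration only:

// Standalone sketch (not from the patch): a simplified version of the
// balancing idea in balanceBatchCopTask. Each region lists the stores that
// hold a replica; every region is assigned greedily to the candidate store
// that currently has the fewest assigned regions.
package main

import "fmt"

type region struct {
	id     string
	stores []uint64 // stores holding a replica of this region
}

func balance(regions []region) map[uint64][]string {
	assigned := make(map[uint64][]string)
	for _, r := range regions {
		if len(r.stores) == 0 {
			continue
		}
		// Pick the candidate store with the smallest current load.
		best := r.stores[0]
		for _, s := range r.stores[1:] {
			if len(assigned[s]) < len(assigned[best]) {
				best = s
			}
		}
		assigned[best] = append(assigned[best], r.id)
	}
	return assigned
}

func main() {
	regions := []region{
		{"r1", []uint64{1, 2}},
		{"r2", []uint64{1, 2}},
		{"r3", []uint64{1, 3}},
		{"r4", []uint64{2, 3}},
	}
	for store, rs := range balance(regions) {
		fmt.Println("store", store, "->", rs)
	}
}

The real code operates on copTaskAndRPCContext (later RegionInfo) entries keyed by store address or store ID rather than plain strings, but the selection rule is the same: among the stores that can serve a region, pick the least-loaded one.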
diff --git a/store/tikv/region_cache_test.go b/store/tikv/region_cache_test.go index 592541eb5b9d2..ebfb7d95a14e4 100644 --- a/store/tikv/region_cache_test.go +++ b/store/tikv/region_cache_test.go @@ -875,7 +875,7 @@ func (s *testRegionCacheSuite) TestRegionEpochOnTiFlash(c *C) { c.Assert(lctx.Peer.Id, Equals, peer3) // epoch-not-match on tiflash - ctxTiFlash, err := s.cache.GetTiFlashRPCContext(s.bo, loc1.Region, true) + ctxTiFlash, _, err := s.cache.GetTiFlashRPCContext(s.bo, loc1.Region, true, false) c.Assert(err, IsNil) c.Assert(ctxTiFlash.Peer.Id, Equals, s.peer1) ctxTiFlash.Peer.Role = metapb.PeerRole_Learner diff --git a/store/tikv/region_request.go b/store/tikv/region_request.go index cad0ed0379e96..550bbacca4bf1 100644 --- a/store/tikv/region_request.go +++ b/store/tikv/region_request.go @@ -203,6 +203,8 @@ func (s *RegionRequestSender) getRPCContext( return s.regionCache.GetTiKVRPCContext(bo, regionID, req.ReplicaReadType, seed, opts...) case tikvrpc.TiFlash: return s.regionCache.GetTiFlashRPCContext(bo, regionID, true) + context, _, err := s.regionCache.GetTiFlashRPCContext(bo, regionID, true, false) + return context, err case tikvrpc.TiDB: return &RPCContext{Addr: s.storeAddr}, nil default: From 6ad74a55a080adf2c0eed23de5ae6de28817f695 Mon Sep 17 00:00:00 2001 From: xufei Date: Wed, 28 Apr 2021 16:40:28 +0800 Subject: [PATCH 02/21] refine --- store/copr/batch_coprocessor.go | 113 ++++++++++++++++++++++++++------ 1 file changed, 93 insertions(+), 20 deletions(-) diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index 4b2cec4f0d811..23b3548e0cd12 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -102,40 +102,113 @@ type copTaskAndRPCContext struct { func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask { storeTaskMap := make(map[string]*batchCopTask) + storeCandidateTaskMap := make(map[string]map[string]copTaskAndRPCContext) + totalCandidateStoreNum := 0 + totalCandidateCopTaskNum := 0 for _, task := range originalTasks { - for index, copTask := range task.copTasks { - if index == 0 || len(copTask.allStoreAddrs) <= 1 { - if batchCop, ok := storeTaskMap[task.storeAddr]; ok { - batchCop.copTasks = append(batchCop.copTasks, copTask) - } else { - batchTask := &batchCopTask{ - storeAddr: task.storeAddr, - cmdType: task.cmdType, - copTasks: []copTaskAndRPCContext{copTask}, - } - storeTaskMap[task.storeAddr] = batchTask - } - } + batchTask := &batchCopTask{ + storeAddr: task.storeAddr, + cmdType: task.cmdType, + copTasks: []copTaskAndRPCContext{task.copTasks[0]}, } + storeTaskMap[task.storeAddr] = batchTask } for _, task := range originalTasks { for index, copTask := range task.copTasks { - if index != 0 && len(copTask.allStoreAddrs) > 1 { - bestAddr := "" + // for each cop task, figure out the valid store num + validStoreNum := 0 + if index == 0 { + continue + } + if len(copTask.allStoreAddrs) <= 1 { + validStoreNum = 1 + } else { for _, storeAddr := range copTask.allStoreAddrs { if _, ok := storeTaskMap[storeAddr]; ok { - if bestAddr == "" { - bestAddr = storeAddr - } else if len(storeTaskMap[bestAddr].copTasks) > len(storeTaskMap[storeAddr].copTasks) { - bestAddr = storeAddr + validStoreNum++ + } + } + } + if validStoreNum == 1 { + // if only one store is valid, just put it to storeTaskMap + storeTaskMap[task.storeAddr].copTasks = append(storeTaskMap[task.storeAddr].copTasks, copTask) + } else { + // if more than one store is valid, put the cop task + // to store candidate map + totalCandidateStoreNum 
+= validStoreNum + totalCandidateCopTaskNum += 1 + /// put this cop task to candidate task map + taskKey := copTask.task.region.String() + for _, storeAddr := range copTask.allStoreAddrs { + if candidateMap, ok := storeCandidateTaskMap[storeAddr]; ok { + if _, ok := candidateMap[taskKey]; ok { + // duplicated region, should not happen, just give up balance + return originalTasks } + candidateMap[taskKey] = copTask + } else { + candidateMap := make(map[string]copTaskAndRPCContext) + candidateMap[taskKey] = copTask + storeCandidateTaskMap[storeAddr] = candidateMap } } - storeTaskMap[bestAddr].copTasks = append(storeTaskMap[bestAddr].copTasks, copTask) } } } + avgStorePerTask := float64(totalCandidateStoreNum) / float64(totalCandidateCopTaskNum) + findNextStore := func() (string, float64) { + store := "" + possibleTaskNum := float64(0) + for storeAddr := range storeTaskMap { + if store == "" && len(storeCandidateTaskMap[storeAddr]) > 0 { + store = storeAddr + possibleTaskNum = float64(len(storeCandidateTaskMap[storeAddr]))/avgStorePerTask + float64(len(storeTaskMap[storeAddr].copTasks)) + } else { + num := float64(len(storeCandidateTaskMap[storeAddr])) / avgStorePerTask + if num == 0 { + continue + } + num += float64(len(storeTaskMap[storeAddr].copTasks)) + if num < possibleTaskNum { + store = storeAddr + possibleTaskNum = num + } + } + } + return store, possibleTaskNum + } + if totalCandidateStoreNum == 0 { + return originalTasks + } + store, possibleTaskNum := findNextStore() + for totalCandidateStoreNum > 0 { + if len(storeCandidateTaskMap[store]) == 0 { + store, possibleTaskNum = findNextStore() + } + for key, copTask := range storeCandidateTaskMap[store] { + storeTaskMap[store].copTasks = append(storeTaskMap[store].copTasks, copTask) + totalCandidateCopTaskNum-- + for _, addr := range copTask.allStoreAddrs { + if _, ok := storeCandidateTaskMap[addr]; ok { + delete(storeCandidateTaskMap[addr], key) + totalCandidateStoreNum-- + } + } + if totalCandidateCopTaskNum > 0 { + possibleTaskNum = float64(len(storeCandidateTaskMap[store]))/avgStorePerTask + float64(len(storeTaskMap[store].copTasks)) + avgStorePerTask = float64(totalCandidateStoreNum) / float64(totalCandidateCopTaskNum) + for _, addr := range copTask.allStoreAddrs { + if addr != store && len(storeCandidateTaskMap[addr]) > 0 && float64(len(storeCandidateTaskMap[addr]))/avgStorePerTask+float64(len(storeTaskMap[addr].copTasks)) <= possibleTaskNum { + store = addr + possibleTaskNum = float64(len(storeCandidateTaskMap[addr]))/avgStorePerTask + float64(len(storeTaskMap[addr].copTasks)) + } + } + } + break + } + } + var ret []*batchCopTask for _, task := range storeTaskMap { ret = append(ret, task) From d33d2a082d2fe8ddf72d03195a234343da01998e Mon Sep 17 00:00:00 2001 From: xufei Date: Thu, 6 May 2021 14:15:37 +0800 Subject: [PATCH 03/21] save work --- store/copr/batch_coprocessor.go | 78 ++++++++++---------- store/copr/mpp.go | 14 ++-- store/{copr => tikv}/batch_request_sender.go | 47 ++++++------ store/tikv/region_cache.go | 72 ++++++++++++++++++ store/tikv/region_request.go | 1 - 5 files changed, 141 insertions(+), 71 deletions(-) rename store/{copr => tikv}/batch_request_sender.go (55%) diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index 23b3548e0cd12..b55c3f230450e 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -41,8 +41,9 @@ import ( type batchCopTask struct { storeAddr string cmdType tikvrpc.CmdType + ctx *tikv.RPCContext - copTasks []copTaskAndRPCContext + 
regionInfos []tikv.RegionInfo } type batchCopResponse struct { @@ -102,28 +103,28 @@ type copTaskAndRPCContext struct { func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask { storeTaskMap := make(map[string]*batchCopTask) - storeCandidateTaskMap := make(map[string]map[string]copTaskAndRPCContext) + storeCandidateTaskMap := make(map[string]map[string]tikv.RegionInfo) totalCandidateStoreNum := 0 totalCandidateCopTaskNum := 0 for _, task := range originalTasks { batchTask := &batchCopTask{ - storeAddr: task.storeAddr, - cmdType: task.cmdType, - copTasks: []copTaskAndRPCContext{task.copTasks[0]}, + storeAddr: task.storeAddr, + cmdType: task.cmdType, + regionInfos: []tikv.RegionInfo{task.regionInfos[0]}, } storeTaskMap[task.storeAddr] = batchTask } for _, task := range originalTasks { - for index, copTask := range task.copTasks { + for index, ri := range task.regionInfos { // for each cop task, figure out the valid store num validStoreNum := 0 if index == 0 { continue } - if len(copTask.allStoreAddrs) <= 1 { + if len(ri.AllStoreAddrs) <= 1 { validStoreNum = 1 } else { - for _, storeAddr := range copTask.allStoreAddrs { + for _, storeAddr := range ri.AllStoreAddrs { if _, ok := storeTaskMap[storeAddr]; ok { validStoreNum++ } @@ -131,24 +132,24 @@ func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask { } if validStoreNum == 1 { // if only one store is valid, just put it to storeTaskMap - storeTaskMap[task.storeAddr].copTasks = append(storeTaskMap[task.storeAddr].copTasks, copTask) + storeTaskMap[task.storeAddr].regionInfos = append(storeTaskMap[task.storeAddr].regionInfos, ri) } else { // if more than one store is valid, put the cop task // to store candidate map totalCandidateStoreNum += validStoreNum totalCandidateCopTaskNum += 1 /// put this cop task to candidate task map - taskKey := copTask.task.region.String() - for _, storeAddr := range copTask.allStoreAddrs { + taskKey := ri.Region.String() + for _, storeAddr := range ri.AllStoreAddrs { if candidateMap, ok := storeCandidateTaskMap[storeAddr]; ok { if _, ok := candidateMap[taskKey]; ok { // duplicated region, should not happen, just give up balance return originalTasks } - candidateMap[taskKey] = copTask + candidateMap[taskKey] = ri } else { - candidateMap := make(map[string]copTaskAndRPCContext) - candidateMap[taskKey] = copTask + candidateMap := make(map[string]tikv.RegionInfo) + candidateMap[taskKey] = ri storeCandidateTaskMap[storeAddr] = candidateMap } } @@ -163,13 +164,13 @@ func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask { for storeAddr := range storeTaskMap { if store == "" && len(storeCandidateTaskMap[storeAddr]) > 0 { store = storeAddr - possibleTaskNum = float64(len(storeCandidateTaskMap[storeAddr]))/avgStorePerTask + float64(len(storeTaskMap[storeAddr].copTasks)) + possibleTaskNum = float64(len(storeCandidateTaskMap[storeAddr]))/avgStorePerTask + float64(len(storeTaskMap[storeAddr].regionInfos)) } else { num := float64(len(storeCandidateTaskMap[storeAddr])) / avgStorePerTask if num == 0 { continue } - num += float64(len(storeTaskMap[storeAddr].copTasks)) + num += float64(len(storeTaskMap[storeAddr].regionInfos)) if num < possibleTaskNum { store = storeAddr possibleTaskNum = num @@ -186,22 +187,22 @@ func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask { if len(storeCandidateTaskMap[store]) == 0 { store, possibleTaskNum = findNextStore() } - for key, copTask := range storeCandidateTaskMap[store] { - storeTaskMap[store].copTasks = 
append(storeTaskMap[store].copTasks, copTask) + for key, ri := range storeCandidateTaskMap[store] { + storeTaskMap[store].regionInfos = append(storeTaskMap[store].regionInfos, ri) totalCandidateCopTaskNum-- - for _, addr := range copTask.allStoreAddrs { + for _, addr := range ri.AllStoreAddrs { if _, ok := storeCandidateTaskMap[addr]; ok { delete(storeCandidateTaskMap[addr], key) totalCandidateStoreNum-- } } if totalCandidateCopTaskNum > 0 { - possibleTaskNum = float64(len(storeCandidateTaskMap[store]))/avgStorePerTask + float64(len(storeTaskMap[store].copTasks)) + possibleTaskNum = float64(len(storeCandidateTaskMap[store]))/avgStorePerTask + float64(len(storeTaskMap[store].regionInfos)) avgStorePerTask = float64(totalCandidateStoreNum) / float64(totalCandidateCopTaskNum) - for _, addr := range copTask.allStoreAddrs { - if addr != store && len(storeCandidateTaskMap[addr]) > 0 && float64(len(storeCandidateTaskMap[addr]))/avgStorePerTask+float64(len(storeTaskMap[addr].copTasks)) <= possibleTaskNum { + for _, addr := range ri.AllStoreAddrs { + if addr != store && len(storeCandidateTaskMap[addr]) > 0 && float64(len(storeCandidateTaskMap[addr]))/avgStorePerTask+float64(len(storeTaskMap[addr].regionInfos)) <= possibleTaskNum { store = addr - possibleTaskNum = float64(len(storeCandidateTaskMap[addr]))/avgStorePerTask + float64(len(storeTaskMap[addr].copTasks)) + possibleTaskNum = float64(len(storeCandidateTaskMap[addr]))/avgStorePerTask + float64(len(storeTaskMap[addr].regionInfos)) } } } @@ -255,12 +256,13 @@ func buildBatchCopTasks(bo *tikv.Backoffer, cache *tikv.RegionCache, ranges *tik continue } if batchCop, ok := storeTaskMap[rpcCtx.Addr]; ok { - batchCop.copTasks = append(batchCop.copTasks, copTaskAndRPCContext{task: task, allStoreAddrs: allStoreAddr, ctx: rpcCtx}) + batchCop.regionInfos = append(batchCop.regionInfos, tikv.RegionInfo{task.region, rpcCtx.Meta, task.ranges, allStoreAddr}) } else { batchTask := &batchCopTask{ - storeAddr: rpcCtx.Addr, - cmdType: cmdType, - copTasks: []copTaskAndRPCContext{{task, allStoreAddr, rpcCtx}}, + storeAddr: rpcCtx.Addr, + cmdType: cmdType, + ctx: rpcCtx, + regionInfos: []tikv.RegionInfo{{task.region, rpcCtx.Meta, task.ranges, allStoreAddr}}, } storeTaskMap[rpcCtx.Addr] = batchTask } @@ -433,8 +435,8 @@ func (b *batchCopIterator) handleTask(ctx context.Context, bo *tikv.Backoffer, t // Merge all ranges and request again. 
func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *tikv.Backoffer, batchTask *batchCopTask) ([]*batchCopTask, error) { var ranges []tikvstore.KeyRange - for _, taskCtx := range batchTask.copTasks { - taskCtx.task.ranges.Do(func(ran *tikvstore.KeyRange) { + for _, ri := range batchTask.regionInfos { + ri.Ranges.Do(func(ran *tikvstore.KeyRange) { ranges = append(ranges, *ran) }) } @@ -442,16 +444,16 @@ func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *tikv.Backo } func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *tikv.Backoffer, task *batchCopTask) ([]*batchCopTask, error) { - sender := NewRegionBatchRequestSender(b.store.GetRegionCache(), b.store.GetTiKVClient()) - var regionInfos = make([]*coprocessor.RegionInfo, 0, len(task.copTasks)) - for _, task := range task.copTasks { + sender := tikv.NewRegionBatchRequestSender(b.store.GetRegionCache(), b.store.GetTiKVClient()) + var regionInfos = make([]*coprocessor.RegionInfo, 0, len(task.regionInfos)) + for _, ri := range task.regionInfos { regionInfos = append(regionInfos, &coprocessor.RegionInfo{ - RegionId: task.task.region.GetID(), + RegionId: ri.Region.GetID(), RegionEpoch: &metapb.RegionEpoch{ - ConfVer: task.task.region.GetConfVer(), - Version: task.task.region.GetVer(), + ConfVer: ri.Region.GetConfVer(), + Version: ri.Region.GetVer(), }, - Ranges: task.task.ranges.ToPBRanges(), + Ranges: ri.Ranges.ToPBRanges(), }) } @@ -473,8 +475,8 @@ func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *tikv.Backoffe }) req.StoreTp = tikvrpc.TiFlash - logutil.BgLogger().Debug("send batch request to ", zap.String("req info", req.String()), zap.Int("cop task len", len(task.copTasks))) - resp, retry, cancel, err := sender.sendStreamReqToAddr(bo, task.copTasks, req, tikv.ReadTimeoutUltraLong) + logutil.BgLogger().Debug("send batch request to ", zap.String("req info", req.String()), zap.Int("cop task len", len(task.regionInfos))) + resp, retry, cancel, err := sender.SendReqToAddr(bo, task.ctx, task.regionInfos, req, tikv.ReadTimeoutUltraLong) // If there are store errors, we should retry for all regions. if retry { return b.retryBatchCopTask(ctx, bo, task) diff --git a/store/copr/mpp.go b/store/copr/mpp.go index 74fe82627a036..2492d483b12a0 100644 --- a/store/copr/mpp.go +++ b/store/copr/mpp.go @@ -183,14 +183,14 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *tikv.Backoffer, var regionInfos []*coprocessor.RegionInfo originalTask, ok := req.Meta.(*batchCopTask) if ok { - for _, task := range originalTask.copTasks { + for _, ri := range originalTask.regionInfos { regionInfos = append(regionInfos, &coprocessor.RegionInfo{ - RegionId: task.task.region.GetID(), + RegionId: ri.Region.GetID(), RegionEpoch: &metapb.RegionEpoch{ - ConfVer: task.task.region.GetConfVer(), - Version: task.task.region.GetVer(), + ConfVer: ri.Region.GetConfVer(), + Version: ri.Region.GetVer(), }, - Ranges: task.task.ranges.ToPBRanges(), + Ranges: ri.Ranges.ToPBRanges(), }) } } @@ -217,8 +217,8 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *tikv.Backoffer, // Or else it's the task without region, which always happens in high layer task without table. 
// In that case if originalTask != nil { - sender := NewRegionBatchRequestSender(m.store.GetRegionCache(), m.store.GetTiKVClient()) - rpcResp, _, _, err = sender.sendStreamReqToAddr(bo, originalTask.copTasks, wrappedReq, tikv.ReadTimeoutMedium) + sender := tikv.NewRegionBatchRequestSender(m.store.GetRegionCache(), m.store.GetTiKVClient()) + rpcResp, _, _, err = sender.SendReqToAddr(bo, originalTask.ctx, originalTask.regionInfos, wrappedReq, tikv.ReadTimeoutMedium) // No matter what the rpc error is, we won't retry the mpp dispatch tasks. // TODO: If we want to retry, we must redo the plan fragment cutting and task scheduling. // That's a hard job but we can try it in the future. diff --git a/store/copr/batch_request_sender.go b/store/tikv/batch_request_sender.go similarity index 55% rename from store/copr/batch_request_sender.go rename to store/tikv/batch_request_sender.go index 0562e8ae9e4f2..2098a6b736cc3 100644 --- a/store/copr/batch_request_sender.go +++ b/store/tikv/batch_request_sender.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package copr +package tikv import ( "context" @@ -19,45 +19,52 @@ import ( "time" "github.com/pingcap/errors" - "github.com/pingcap/tidb/store/tikv" + "github.com/pingcap/kvproto/pkg/metapb" tikverr "github.com/pingcap/tidb/store/tikv/error" "github.com/pingcap/tidb/store/tikv/tikvrpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) +type RegionInfo struct { + Region RegionVerID + Meta *metapb.Region + Ranges *KeyRanges + AllStoreAddrs []string +} + // RegionBatchRequestSender sends BatchCop requests to TiFlash server by stream way. type RegionBatchRequestSender struct { - *tikv.RegionRequestSender + *RegionRequestSender } // NewRegionBatchRequestSender creates a RegionBatchRequestSender object. -func NewRegionBatchRequestSender(cache *tikv.RegionCache, client tikv.Client) *RegionBatchRequestSender { +func NewRegionBatchRequestSender(cache *RegionCache, client Client) *RegionBatchRequestSender { return &RegionBatchRequestSender{ - RegionRequestSender: tikv.NewRegionRequestSender(cache, client), + RegionRequestSender: NewRegionRequestSender(cache, client), } } -func (ss *RegionBatchRequestSender) sendStreamReqToAddr(bo *tikv.Backoffer, ctxs []copTaskAndRPCContext, req *tikvrpc.Request, timout time.Duration) (resp *tikvrpc.Response, retry bool, cancel func(), err error) { +// SendReqToAddr sends a request to tikv/tiflash server. +func (ss *RegionBatchRequestSender) SendReqToAddr(bo *Backoffer, rpcCtx *RPCContext, regionInfos []RegionInfo, req *tikvrpc.Request, timout time.Duration) (resp *tikvrpc.Response, retry bool, cancel func(), err error) { // use the first ctx to send request, because every ctx has same address. 
cancel = func() {} - rpcCtx := ctxs[0].ctx if e := tikvrpc.SetContext(req, rpcCtx.Meta, rpcCtx.Peer); e != nil { return nil, false, cancel, errors.Trace(e) } ctx := bo.GetCtx() - if rawHook := ctx.Value(tikv.RPCCancellerCtxKey{}); rawHook != nil { - ctx, cancel = rawHook.(*tikv.RPCCanceller).WithCancel(ctx) + if rawHook := ctx.Value(RPCCancellerCtxKey{}); rawHook != nil { + ctx, cancel = rawHook.(*RPCCanceller).WithCancel(ctx) } start := time.Now() resp, err = ss.GetClient().SendRequest(ctx, rpcCtx.Addr, req, timout) if ss.Stats != nil { - tikv.RecordRegionRequestRuntimeStats(ss.Stats, req.Type, time.Since(start)) + RecordRegionRequestRuntimeStats(ss.Stats, req.Type, time.Since(start)) } if err != nil { cancel() ss.SetRPCError(err) - e := ss.onSendFail(bo, ctxs, err) + e := ss.onSendFailForBatchRegions(bo, rpcCtx, regionInfos, err) if e != nil { return nil, false, func() {}, errors.Trace(e) } @@ -67,26 +74,16 @@ func (ss *RegionBatchRequestSender) sendStreamReqToAddr(bo *tikv.Backoffer, ctxs return } -func (ss *RegionBatchRequestSender) onSendFail(bo *tikv.Backoffer, ctxs []copTaskAndRPCContext, err error) error { +func (ss *RegionBatchRequestSender) onSendFailForBatchRegions(bo *Backoffer, ctx *RPCContext, regionInfos []RegionInfo, err error) error { // If it failed because the context is cancelled by ourself, don't retry. if errors.Cause(err) == context.Canceled || status.Code(errors.Cause(err)) == codes.Canceled { return errors.Trace(err) - } else if atomic.LoadUint32(&tikv.ShuttingDown) > 0 { + } else if atomic.LoadUint32(&ShuttingDown) > 0 { return tikverr.ErrTiDBShuttingDown } - addr := ctxs[0].ctx.Addr - for _, failedCtx := range ctxs { - ctx := failedCtx.ctx - if ctx.Meta != nil && ctx.Addr == addr { - ss.GetRegionCache().OnSendFail(bo, ctx, ss.NeedReloadRegion(ctx), err) - } - } + ss.GetRegionCache().OnSendFailForBatchRegions(bo, ctx.Store, regionInfos, ss.NeedReloadRegion(ctx), err) - // Retry on send request failure when it's not canceled. - // When a store is not available, the leader of related region should be elected quickly. - // TODO: the number of retry time should be limited:since region may be unavailable - // when some unrecoverable disaster happened. - err = bo.Backoff(tikv.BoTiFlashRPC, errors.Errorf("send tikv request error: %v, ctxs: %v, try next peer later", err, ctxs)) + err = bo.Backoff(BoTiFlashRPC, errors.Errorf("send tikv request error: %v, ctx: %v, regionInfos: %v, try next peer later", err, ctx, regionInfos)) return errors.Trace(err) } diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go index ad295ec8fd241..cbf11a96417ac 100644 --- a/store/tikv/region_cache.go +++ b/store/tikv/region_cache.go @@ -111,6 +111,15 @@ func (r *RegionStore) accessStore(mode AccessMode, idx AccessIndex) (int, *Store return sidx, r.stores[sidx] } +func (r *RegionStore) getAccessIndex(mode AccessMode, store *Store) AccessIndex { + for index, sidx := range r.accessIndex[mode] { + if r.stores[sidx].storeID == store.storeID { + return AccessIndex(index) + } + } + return -1 +} + func (r *RegionStore) accessStoreNum(mode AccessMode) int { return len(r.accessIndex[mode]) } @@ -686,6 +695,69 @@ func (c *RegionCache) findRegionByKey(bo *Backoffer, key []byte, isEndKey bool) return r, nil } +// OnSendFailForBatchRegions handles send request fail logic. 
+func (c *RegionCache) OnSendFailForBatchRegions(bo *Backoffer, store *Store, regionInfos []RegionInfo, scheduleReload bool, err error) { + metrics.RegionCacheCounterWithSendFail.Inc() + if store.storeType != tikvrpc.TiFlash { + logutil.Logger(bo.ctx).Warn("OnSendFailForBatchRegions only support TiFlash") + return + } + for _, ri := range regionInfos { + if ri.Meta == nil { + continue + } + r := c.getCachedRegionWithRLock(ri.Region) + if r != nil { + peersNum := len(r.meta.Peers) + if len(ri.Meta.Peers) != peersNum { + logutil.Logger(bo.ctx).Info("retry and refresh current region after send request fail and up/down stores length changed", + zap.Stringer("current", &ri.Region), + zap.Bool("needReload", scheduleReload), + zap.Reflect("oldPeers", ri.Meta.Peers), + zap.Reflect("newPeers", r.meta.Peers), + zap.Error(err)) + continue + } + + rs := r.getStore() + + accessMode := TiFlashOnly + accessIdx := rs.getAccessIndex(accessMode, store) + if accessIdx == -1 { + logutil.Logger(bo.ctx).Warn("OnSendFailForBatchRegions can not get access index for region " + ri.Region.String()) + continue + } + if err != nil { + storeIdx, s := rs.accessStore(accessMode, accessIdx) + // Mark the store as failure if it's not a redirection request because we + // can't know the status of the proxy store by it. + // send fail but store is reachable, keep retry current peer for replica leader request. + // but we still need switch peer for follower-read or learner-read(i.e. tiflash) + // invalidate regions in store. + epoch := rs.storeEpochs[storeIdx] + if atomic.CompareAndSwapUint32(&s.epoch, epoch, epoch+1) { + logutil.BgLogger().Info("mark store's regions need be refill", zap.String("store", s.addr)) + metrics.RegionCacheCounterWithInvalidateStoreRegionsOK.Inc() + } + // schedule a store addr resolve. + s.markNeedCheck(c.notifyCheckCh) + } + + // try next peer to found new leader. + rs.switchNextFlashPeer(r, accessIdx) + logutil.Logger(bo.ctx).Info("switch region tiflash peer to next due to send request fail", + zap.Stringer("current", &ri.Region), + zap.Bool("needReload", scheduleReload), + zap.Error(err)) + + // force reload region when retry all known peers in region. + if scheduleReload { + r.scheduleReload() + } + } + } +} + // OnSendFail handles send request fail logic. func (c *RegionCache) OnSendFail(bo *Backoffer, ctx *RPCContext, scheduleReload bool, err error) { metrics.RegionCacheCounterWithSendFail.Inc() diff --git a/store/tikv/region_request.go b/store/tikv/region_request.go index 550bbacca4bf1..e98db093e5d4f 100644 --- a/store/tikv/region_request.go +++ b/store/tikv/region_request.go @@ -202,7 +202,6 @@ func (s *RegionRequestSender) getRPCContext( } return s.regionCache.GetTiKVRPCContext(bo, regionID, req.ReplicaReadType, seed, opts...) 
case tikvrpc.TiFlash: - return s.regionCache.GetTiFlashRPCContext(bo, regionID, true) context, _, err := s.regionCache.GetTiFlashRPCContext(bo, regionID, true, false) return context, err case tikvrpc.TiDB: From 60c36211ca56cb1362ff4ef8e022d050eb1f1990 Mon Sep 17 00:00:00 2001 From: xufei Date: Thu, 6 May 2021 15:36:41 +0800 Subject: [PATCH 04/21] save work --- store/copr/batch_coprocessor.go | 61 ++++++++++++++------------- store/tikv/batch_request_sender.go | 8 ++-- store/tikv/region_cache.go | 68 +++++++++++++++++------------- store/tikv/region_cache_test.go | 2 +- store/tikv/region_request.go | 3 +- 5 files changed, 77 insertions(+), 65 deletions(-) diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index b55c3f230450e..ccd3ccba062d1 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -16,6 +16,7 @@ package copr import ( "context" "io" + "math" "sync" "sync/atomic" "time" @@ -102,8 +103,8 @@ type copTaskAndRPCContext struct { } func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask { - storeTaskMap := make(map[string]*batchCopTask) - storeCandidateTaskMap := make(map[string]map[string]tikv.RegionInfo) + storeTaskMap := make(map[uint64]*batchCopTask) + storeCandidateTaskMap := make(map[uint64]map[string]tikv.RegionInfo) totalCandidateStoreNum := 0 totalCandidateCopTaskNum := 0 for _, task := range originalTasks { @@ -112,27 +113,28 @@ func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask { cmdType: task.cmdType, regionInfos: []tikv.RegionInfo{task.regionInfos[0]}, } - storeTaskMap[task.storeAddr] = batchTask + storeTaskMap[task.regionInfos[0].AllStores[0]] = batchTask } for _, task := range originalTasks { + taskStoreID := task.regionInfos[0].AllStores[0] for index, ri := range task.regionInfos { // for each cop task, figure out the valid store num validStoreNum := 0 if index == 0 { continue } - if len(ri.AllStoreAddrs) <= 1 { + if len(ri.AllStores) <= 1 { validStoreNum = 1 } else { - for _, storeAddr := range ri.AllStoreAddrs { - if _, ok := storeTaskMap[storeAddr]; ok { + for _, storeID := range ri.AllStores { + if _, ok := storeTaskMap[storeID]; ok { validStoreNum++ } } } if validStoreNum == 1 { // if only one store is valid, just put it to storeTaskMap - storeTaskMap[task.storeAddr].regionInfos = append(storeTaskMap[task.storeAddr].regionInfos, ri) + storeTaskMap[taskStoreID].regionInfos = append(storeTaskMap[taskStoreID].regionInfos, ri) } else { // if more than one store is valid, put the cop task // to store candidate map @@ -140,8 +142,8 @@ func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask { totalCandidateCopTaskNum += 1 /// put this cop task to candidate task map taskKey := ri.Region.String() - for _, storeAddr := range ri.AllStoreAddrs { - if candidateMap, ok := storeCandidateTaskMap[storeAddr]; ok { + for _, storeID := range ri.AllStores { + if candidateMap, ok := storeCandidateTaskMap[storeID]; ok { if _, ok := candidateMap[taskKey]; ok { // duplicated region, should not happen, just give up balance return originalTasks @@ -150,7 +152,7 @@ func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask { } else { candidateMap := make(map[string]tikv.RegionInfo) candidateMap[taskKey] = ri - storeCandidateTaskMap[storeAddr] = candidateMap + storeCandidateTaskMap[storeID] = candidateMap } } } @@ -158,21 +160,21 @@ func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask { } avgStorePerTask := float64(totalCandidateStoreNum) / 
float64(totalCandidateCopTaskNum) - findNextStore := func() (string, float64) { - store := "" + findNextStore := func() (uint64, float64) { + store := uint64(math.MaxUint64) possibleTaskNum := float64(0) - for storeAddr := range storeTaskMap { - if store == "" && len(storeCandidateTaskMap[storeAddr]) > 0 { - store = storeAddr - possibleTaskNum = float64(len(storeCandidateTaskMap[storeAddr]))/avgStorePerTask + float64(len(storeTaskMap[storeAddr].regionInfos)) + for storeID := range storeTaskMap { + if store == uint64(math.MaxUint64) && len(storeCandidateTaskMap[storeID]) > 0 { + store = storeID + possibleTaskNum = float64(len(storeCandidateTaskMap[storeID]))/avgStorePerTask + float64(len(storeTaskMap[storeID].regionInfos)) } else { - num := float64(len(storeCandidateTaskMap[storeAddr])) / avgStorePerTask + num := float64(len(storeCandidateTaskMap[storeID])) / avgStorePerTask if num == 0 { continue } - num += float64(len(storeTaskMap[storeAddr].regionInfos)) + num += float64(len(storeTaskMap[storeID].regionInfos)) if num < possibleTaskNum { - store = storeAddr + store = storeID possibleTaskNum = num } } @@ -190,19 +192,19 @@ func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask { for key, ri := range storeCandidateTaskMap[store] { storeTaskMap[store].regionInfos = append(storeTaskMap[store].regionInfos, ri) totalCandidateCopTaskNum-- - for _, addr := range ri.AllStoreAddrs { - if _, ok := storeCandidateTaskMap[addr]; ok { - delete(storeCandidateTaskMap[addr], key) + for _, id := range ri.AllStores { + if _, ok := storeCandidateTaskMap[id]; ok { + delete(storeCandidateTaskMap[id], key) totalCandidateStoreNum-- } } if totalCandidateCopTaskNum > 0 { possibleTaskNum = float64(len(storeCandidateTaskMap[store]))/avgStorePerTask + float64(len(storeTaskMap[store].regionInfos)) avgStorePerTask = float64(totalCandidateStoreNum) / float64(totalCandidateCopTaskNum) - for _, addr := range ri.AllStoreAddrs { - if addr != store && len(storeCandidateTaskMap[addr]) > 0 && float64(len(storeCandidateTaskMap[addr]))/avgStorePerTask+float64(len(storeTaskMap[addr].regionInfos)) <= possibleTaskNum { - store = addr - possibleTaskNum = float64(len(storeCandidateTaskMap[addr]))/avgStorePerTask + float64(len(storeTaskMap[addr].regionInfos)) + for _, id := range ri.AllStores { + if id != store && len(storeCandidateTaskMap[id]) > 0 && float64(len(storeCandidateTaskMap[id]))/avgStorePerTask+float64(len(storeTaskMap[id].regionInfos)) <= possibleTaskNum { + store = id + possibleTaskNum = float64(len(storeCandidateTaskMap[id]))/avgStorePerTask + float64(len(storeTaskMap[id].regionInfos)) } } } @@ -242,7 +244,7 @@ func buildBatchCopTasks(bo *tikv.Backoffer, cache *tikv.RegionCache, ranges *tik storeTaskMap := make(map[string]*batchCopTask) needRetry := false for _, task := range tasks { - rpcCtx, allStoreAddr, err := cache.GetTiFlashRPCContext(bo, task.region, false, true) + rpcCtx, err := cache.GetTiFlashRPCContext(bo, task.region, false) if err != nil { return nil, errors.Trace(err) } @@ -255,14 +257,15 @@ func buildBatchCopTasks(bo *tikv.Backoffer, cache *tikv.RegionCache, ranges *tik // Then `splitRegion` will reloads these regions. 
continue } + allStores := cache.GetAllValidTiFlashStores(task.region, rpcCtx.Store) if batchCop, ok := storeTaskMap[rpcCtx.Addr]; ok { - batchCop.regionInfos = append(batchCop.regionInfos, tikv.RegionInfo{task.region, rpcCtx.Meta, task.ranges, allStoreAddr}) + batchCop.regionInfos = append(batchCop.regionInfos, tikv.RegionInfo{task.region, rpcCtx.Meta, task.ranges, allStores}) } else { batchTask := &batchCopTask{ storeAddr: rpcCtx.Addr, cmdType: cmdType, ctx: rpcCtx, - regionInfos: []tikv.RegionInfo{{task.region, rpcCtx.Meta, task.ranges, allStoreAddr}}, + regionInfos: []tikv.RegionInfo{{task.region, rpcCtx.Meta, task.ranges, allStores}}, } storeTaskMap[rpcCtx.Addr] = batchTask } diff --git a/store/tikv/batch_request_sender.go b/store/tikv/batch_request_sender.go index 2098a6b736cc3..96f102b457383 100644 --- a/store/tikv/batch_request_sender.go +++ b/store/tikv/batch_request_sender.go @@ -27,10 +27,10 @@ import ( ) type RegionInfo struct { - Region RegionVerID - Meta *metapb.Region - Ranges *KeyRanges - AllStoreAddrs []string + Region RegionVerID + Meta *metapb.Region + Ranges *KeyRanges + AllStores []uint64 } // RegionBatchRequestSender sends BatchCop requests to TiFlash server by stream way. diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go index cbf11a96417ac..51bc021792e8e 100644 --- a/store/tikv/region_cache.go +++ b/store/tikv/region_cache.go @@ -532,19 +532,49 @@ func (c *RegionCache) GetTiKVRPCContext(bo *Backoffer, id RegionVerID, replicaRe }, nil } +func (c *RegionCache) GetAllValidTiFlashStores(id RegionVerID, currentStore *Store) []uint64 { + allStores := make([]uint64, 0, 1) + allStores = append(allStores, currentStore.storeID) + ts := time.Now().Unix() + cachedRegion := c.getCachedRegionWithRLock(id) + if cachedRegion == nil { + return allStores + } + if !cachedRegion.checkRegionCacheTTL(ts) { + return allStores + } + regionStore := cachedRegion.getStore() + currentIndex := regionStore.getAccessIndex(TiFlashOnly, currentStore) + if currentIndex == -1 { + return allStores + } + for startOffset := 1; startOffset < regionStore.accessStoreNum(TiFlashOnly); startOffset++ { + accessIdx := AccessIndex((int(currentIndex) + startOffset) % regionStore.accessStoreNum(TiFlashOnly)) + storeIdx, store := regionStore.accessStore(TiFlashOnly, accessIdx) + if store.getResolveState() == needCheck { + continue + } + storeFailEpoch := atomic.LoadUint32(&store.epoch) + if storeFailEpoch != regionStore.storeEpochs[storeIdx] { + continue + } + allStores = append(allStores, store.storeID) + } + return allStores +} + // GetTiFlashRPCContext returns RPCContext for a region must access flash store. If it returns nil, the region // must be out of date and already dropped from cache or not flash store found. // `loadBalance` is an option. For MPP and batch cop, it is pointless and might cause try the failed store repeatly. 
-func (c *RegionCache) GetTiFlashRPCContext(bo *Backoffer, id RegionVerID, loadBalance bool, collectAllStoreAddrs bool) (*RPCContext, []string, error) { +func (c *RegionCache) GetTiFlashRPCContext(bo *Backoffer, id RegionVerID, loadBalance bool) (*RPCContext, error) { ts := time.Now().Unix() - allStoreAddrs := make([]string, 0, 1) cachedRegion := c.getCachedRegionWithRLock(id) if cachedRegion == nil { - return nil, nil, nil + return nil, nil } if !cachedRegion.checkRegionCacheTTL(ts) { - return nil, nil, nil + return nil, nil } regionStore := cachedRegion.getStore() @@ -561,11 +591,11 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *Backoffer, id RegionVerID, loadBa storeIdx, store := regionStore.accessStore(TiFlashOnly, accessIdx) addr, err := c.getStoreAddr(bo, cachedRegion, store, storeIdx) if err != nil { - return nil, nil, err + return nil, err } if len(addr) == 0 { cachedRegion.invalidate(StoreNotFound) - return nil, nil, nil + return nil, nil } if store.getResolveState() == needCheck { _, err := store.reResolve(c) @@ -582,7 +612,7 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *Backoffer, id RegionVerID, loadBa // TiFlash will always try to find out a valid peer, avoiding to retry too many times. continue } - rpcContex := &RPCContext{ + return &RPCContext{ Region: id, Meta: cachedRegion.meta, Peer: peer, @@ -591,31 +621,11 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *Backoffer, id RegionVerID, loadBa Addr: addr, AccessMode: TiFlashOnly, TiKVNum: regionStore.accessStoreNum(TiKVOnly), - } - if collectAllStoreAddrs { - allStoreAddrs = append(allStoreAddrs, addr) - for startOffset := 1; startOffset < regionStore.accessStoreNum(TiFlashOnly); startOffset++ { - accessIdx = AccessIndex((sIdx + i + startOffset) % regionStore.accessStoreNum(TiFlashOnly)) - storeIdx, store = regionStore.accessStore(TiFlashOnly, accessIdx) - addr, err = c.getStoreAddr(bo, cachedRegion, store, storeIdx) - if err != nil { - continue - } - if len(addr) == 0 { - continue - } - if store.getResolveState() == needCheck { - continue - } - allStoreAddrs = append(allStoreAddrs, addr) - } - return rpcContex, allStoreAddrs, nil - } - return rpcContex, nil, nil + }, nil } cachedRegion.invalidate(Other) - return nil, nil, nil + return nil, nil } // KeyLocation is the region and range that a key is located. diff --git a/store/tikv/region_cache_test.go b/store/tikv/region_cache_test.go index ebfb7d95a14e4..592541eb5b9d2 100644 --- a/store/tikv/region_cache_test.go +++ b/store/tikv/region_cache_test.go @@ -875,7 +875,7 @@ func (s *testRegionCacheSuite) TestRegionEpochOnTiFlash(c *C) { c.Assert(lctx.Peer.Id, Equals, peer3) // epoch-not-match on tiflash - ctxTiFlash, _, err := s.cache.GetTiFlashRPCContext(s.bo, loc1.Region, true, false) + ctxTiFlash, err := s.cache.GetTiFlashRPCContext(s.bo, loc1.Region, true) c.Assert(err, IsNil) c.Assert(ctxTiFlash.Peer.Id, Equals, s.peer1) ctxTiFlash.Peer.Role = metapb.PeerRole_Learner diff --git a/store/tikv/region_request.go b/store/tikv/region_request.go index e98db093e5d4f..cad0ed0379e96 100644 --- a/store/tikv/region_request.go +++ b/store/tikv/region_request.go @@ -202,8 +202,7 @@ func (s *RegionRequestSender) getRPCContext( } return s.regionCache.GetTiKVRPCContext(bo, regionID, req.ReplicaReadType, seed, opts...) 
case tikvrpc.TiFlash: - context, _, err := s.regionCache.GetTiFlashRPCContext(bo, regionID, true, false) - return context, err + return s.regionCache.GetTiFlashRPCContext(bo, regionID, true) case tikvrpc.TiDB: return &RPCContext{Addr: s.storeAddr}, nil default: From fc86ee701e1f383175f1c507ac7f7e9c61da9ca1 Mon Sep 17 00:00:00 2001 From: xufei Date: Thu, 6 May 2021 15:44:16 +0800 Subject: [PATCH 05/21] save work --- store/copr/batch_coprocessor.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index ccd3ccba062d1..3134f7c34babc 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -96,12 +96,6 @@ func (rs *batchCopResponse) RespTime() time.Duration { return rs.respTime } -type copTaskAndRPCContext struct { - task *copTask - allStoreAddrs []string - ctx *tikv.RPCContext -} - func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask { storeTaskMap := make(map[uint64]*batchCopTask) storeCandidateTaskMap := make(map[uint64]map[string]tikv.RegionInfo) From 0e4e72ccedcc07777c46f60b792c4b1d0172de35 Mon Sep 17 00:00:00 2001 From: xufei Date: Fri, 7 May 2021 12:53:58 +0800 Subject: [PATCH 06/21] save work --- store/tikv/batch_request_sender.go | 8 +++++--- store/tikv/region_cache.go | 15 +++++---------- 2 files changed, 10 insertions(+), 13 deletions(-) diff --git a/store/tikv/batch_request_sender.go b/store/tikv/batch_request_sender.go index 96f102b457383..b9277dba814b7 100644 --- a/store/tikv/batch_request_sender.go +++ b/store/tikv/batch_request_sender.go @@ -47,7 +47,6 @@ func NewRegionBatchRequestSender(cache *RegionCache, client Client) *RegionBatch // SendReqToAddr sends a request to tikv/tiflash server. func (ss *RegionBatchRequestSender) SendReqToAddr(bo *Backoffer, rpcCtx *RPCContext, regionInfos []RegionInfo, req *tikvrpc.Request, timout time.Duration) (resp *tikvrpc.Response, retry bool, cancel func(), err error) { - // use the first ctx to send request, because every ctx has same address. cancel = func() {} if e := tikvrpc.SetContext(req, rpcCtx.Meta, rpcCtx.Peer); e != nil { return nil, false, cancel, errors.Trace(e) @@ -82,8 +81,11 @@ func (ss *RegionBatchRequestSender) onSendFailForBatchRegions(bo *Backoffer, ctx return tikverr.ErrTiDBShuttingDown } - ss.GetRegionCache().OnSendFailForBatchRegions(bo, ctx.Store, regionInfos, ss.NeedReloadRegion(ctx), err) + // always reload region because + // 1. for batch request sender, there is no easy way to tracker failedStoreID for all the regions + // 2. 
io error should be very rare, so the cost of reload region is acceptable + ss.GetRegionCache().OnSendFailForBatchRegions(bo, ctx.Store, regionInfos, true, err) - err = bo.Backoff(BoTiFlashRPC, errors.Errorf("send tikv request error: %v, ctx: %v, regionInfos: %v, try next peer later", err, ctx, regionInfos)) + err = bo.Backoff(BoTiFlashRPC, errors.Errorf("send request error: %v, ctx: %v, regionInfos: %v", err, ctx, regionInfos)) return errors.Trace(err) } diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go index 51bc021792e8e..8fed038bcdd4d 100644 --- a/store/tikv/region_cache.go +++ b/store/tikv/region_cache.go @@ -709,7 +709,7 @@ func (c *RegionCache) findRegionByKey(bo *Backoffer, key []byte, isEndKey bool) func (c *RegionCache) OnSendFailForBatchRegions(bo *Backoffer, store *Store, regionInfos []RegionInfo, scheduleReload bool, err error) { metrics.RegionCacheCounterWithSendFail.Inc() if store.storeType != tikvrpc.TiFlash { - logutil.Logger(bo.ctx).Warn("OnSendFailForBatchRegions only support TiFlash") + logutil.Logger(bo.ctx).Info("Should not reach here, OnSendFailForBatchRegions only support TiFlash") return } for _, ri := range regionInfos { @@ -721,7 +721,7 @@ func (c *RegionCache) OnSendFailForBatchRegions(bo *Backoffer, store *Store, reg peersNum := len(r.meta.Peers) if len(ri.Meta.Peers) != peersNum { logutil.Logger(bo.ctx).Info("retry and refresh current region after send request fail and up/down stores length changed", - zap.Stringer("current", &ri.Region), + zap.Stringer("region", &ri.Region), zap.Bool("needReload", scheduleReload), zap.Reflect("oldPeers", ri.Meta.Peers), zap.Reflect("newPeers", r.meta.Peers), @@ -734,16 +734,11 @@ func (c *RegionCache) OnSendFailForBatchRegions(bo *Backoffer, store *Store, reg accessMode := TiFlashOnly accessIdx := rs.getAccessIndex(accessMode, store) if accessIdx == -1 { - logutil.Logger(bo.ctx).Warn("OnSendFailForBatchRegions can not get access index for region " + ri.Region.String()) + logutil.Logger(bo.ctx).Warn("can not get access index for region " + ri.Region.String()) continue } if err != nil { storeIdx, s := rs.accessStore(accessMode, accessIdx) - // Mark the store as failure if it's not a redirection request because we - // can't know the status of the proxy store by it. - // send fail but store is reachable, keep retry current peer for replica leader request. - // but we still need switch peer for follower-read or learner-read(i.e. tiflash) - // invalidate regions in store. epoch := rs.storeEpochs[storeIdx] if atomic.CompareAndSwapUint32(&s.epoch, epoch, epoch+1) { logutil.BgLogger().Info("mark store's regions need be refill", zap.String("store", s.addr)) @@ -753,10 +748,10 @@ func (c *RegionCache) OnSendFailForBatchRegions(bo *Backoffer, store *Store, reg s.markNeedCheck(c.notifyCheckCh) } - // try next peer to found new leader. 
+ // try next peer rs.switchNextFlashPeer(r, accessIdx) logutil.Logger(bo.ctx).Info("switch region tiflash peer to next due to send request fail", - zap.Stringer("current", &ri.Region), + zap.Stringer("region", &ri.Region), zap.Bool("needReload", scheduleReload), zap.Error(err)) From 0cf711a050b788a4842eb66d838f3d5317811456 Mon Sep 17 00:00:00 2001 From: xufei Date: Fri, 7 May 2021 14:19:51 +0800 Subject: [PATCH 07/21] save work --- store/copr/batch_coprocessor.go | 52 +++++++++++++++++++-------------- 1 file changed, 30 insertions(+), 22 deletions(-) diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index fa2bca472754a..98073b98015ec 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -96,11 +96,16 @@ func (rs *batchCopResponse) RespTime() time.Duration { return rs.respTime } +// balanceBatchCopTask balance the regions between available stores, the basic rule is +// 1. the first region of each original batch cop task belongs to its original store +// 2. for the remaining regions: +// if there is only 1 available store, then put the region to the related store +// otherwise, use a greedy algorithm to put it into the store with highest weight func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask { storeTaskMap := make(map[uint64]*batchCopTask) storeCandidateTaskMap := make(map[uint64]map[string]tikv.RegionInfo) - totalCandidateStoreNum := 0 - totalCandidateCopTaskNum := 0 + totalTaskCandidateNum := 0 + totalRemainingTaskNum := 0 for _, task := range originalTasks { batchTask := &batchCopTask{ storeAddr: task.storeAddr, @@ -132,8 +137,8 @@ func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask { } else { // if more than one store is valid, put the cop task // to store candidate map - totalCandidateStoreNum += validStoreNum - totalCandidateCopTaskNum += 1 + totalTaskCandidateNum += validStoreNum + totalRemainingTaskNum += 1 /// put this cop task to candidate task map taskKey := ri.Region.String() for _, storeID := range ri.AllStores { @@ -153,52 +158,55 @@ func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask { } } - avgStorePerTask := float64(totalCandidateStoreNum) / float64(totalCandidateCopTaskNum) - findNextStore := func() (uint64, float64) { + avgStorePerTask := float64(totalTaskCandidateNum) / float64(totalRemainingTaskNum) + findNextStore := func() uint64 { store := uint64(math.MaxUint64) - possibleTaskNum := float64(0) + weightedTaskNum := float64(0) for storeID := range storeTaskMap { if store == uint64(math.MaxUint64) && len(storeCandidateTaskMap[storeID]) > 0 { store = storeID - possibleTaskNum = float64(len(storeCandidateTaskMap[storeID]))/avgStorePerTask + float64(len(storeTaskMap[storeID].regionInfos)) + weightedTaskNum = float64(len(storeCandidateTaskMap[storeID]))/avgStorePerTask + float64(len(storeTaskMap[storeID].regionInfos)) } else { num := float64(len(storeCandidateTaskMap[storeID])) / avgStorePerTask if num == 0 { continue } num += float64(len(storeTaskMap[storeID].regionInfos)) - if num < possibleTaskNum { + if num < weightedTaskNum { store = storeID - possibleTaskNum = num + weightedTaskNum = num } } } - return store, possibleTaskNum + return store } - if totalCandidateStoreNum == 0 { + if totalTaskCandidateNum == 0 { return originalTasks } - store, possibleTaskNum := findNextStore() - for totalCandidateStoreNum > 0 { + store := findNextStore() + for totalTaskCandidateNum > 0 { if len(storeCandidateTaskMap[store]) == 0 { - store, possibleTaskNum = 
findNextStore() + store = findNextStore() } for key, ri := range storeCandidateTaskMap[store] { storeTaskMap[store].regionInfos = append(storeTaskMap[store].regionInfos, ri) - totalCandidateCopTaskNum-- + totalRemainingTaskNum-- for _, id := range ri.AllStores { if _, ok := storeCandidateTaskMap[id]; ok { delete(storeCandidateTaskMap[id], key) - totalCandidateStoreNum-- + totalTaskCandidateNum-- } } - if totalCandidateCopTaskNum > 0 { - possibleTaskNum = float64(len(storeCandidateTaskMap[store]))/avgStorePerTask + float64(len(storeTaskMap[store].regionInfos)) - avgStorePerTask = float64(totalCandidateStoreNum) / float64(totalCandidateCopTaskNum) + if totalRemainingTaskNum > 0 { + weightedTaskNum := float64(len(storeCandidateTaskMap[store]))/avgStorePerTask + float64(len(storeTaskMap[store].regionInfos)) + avgStorePerTask = float64(totalTaskCandidateNum) / float64(totalRemainingTaskNum) for _, id := range ri.AllStores { - if id != store && len(storeCandidateTaskMap[id]) > 0 && float64(len(storeCandidateTaskMap[id]))/avgStorePerTask+float64(len(storeTaskMap[id].regionInfos)) <= possibleTaskNum { + // it is not optimal because we only check the stores that affected by this region, in fact in order + // to find out the store with the lowest weightedTaskNum, all stores should be checked, but I think + // check only the affected stores is more simple and will get a good enough result + if id != store && len(storeCandidateTaskMap[id]) > 0 && float64(len(storeCandidateTaskMap[id]))/avgStorePerTask+float64(len(storeTaskMap[id].regionInfos)) <= weightedTaskNum { store = id - possibleTaskNum = float64(len(storeCandidateTaskMap[id]))/avgStorePerTask + float64(len(storeTaskMap[id].regionInfos)) + weightedTaskNum = float64(len(storeCandidateTaskMap[id]))/avgStorePerTask + float64(len(storeTaskMap[id].regionInfos)) } } } From 0bf087447f27bdf15ae103907bd16079457ad107 Mon Sep 17 00:00:00 2001 From: xufei Date: Fri, 7 May 2021 14:44:07 +0800 Subject: [PATCH 08/21] save work --- store/copr/batch_coprocessor.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index 98073b98015ec..d78a33c635218 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -110,6 +110,7 @@ func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask { batchTask := &batchCopTask{ storeAddr: task.storeAddr, cmdType: task.cmdType, + ctx: task.ctx, regionInfos: []tikv.RegionInfo{task.regionInfos[0]}, } storeTaskMap[task.regionInfos[0].AllStores[0]] = batchTask @@ -157,6 +158,9 @@ func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask { } } } + if totalRemainingTaskNum == 0 { + return originalTasks + } avgStorePerTask := float64(totalTaskCandidateNum) / float64(totalRemainingTaskNum) findNextStore := func() uint64 { @@ -180,14 +184,14 @@ func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask { } return store } - if totalTaskCandidateNum == 0 { - return originalTasks - } store := findNextStore() - for totalTaskCandidateNum > 0 { + for totalRemainingTaskNum > 0 { if len(storeCandidateTaskMap[store]) == 0 { store = findNextStore() } + if store == uint64(math.MaxUint64) { + break + } for key, ri := range storeCandidateTaskMap[store] { storeTaskMap[store].regionInfos = append(storeTaskMap[store].regionInfos, ri) totalRemainingTaskNum-- From 39c87869b5881967130d1ee2040c32fdff656ecc Mon Sep 17 00:00:00 2001 From: xufei Date: Fri, 7 May 2021 14:45:48 +0800 Subject: [PATCH 09/21] 
save work --- store/copr/batch_coprocessor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index d78a33c635218..8c018aee733f7 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -265,7 +265,7 @@ func buildBatchCopTasks(bo *tikv.Backoffer, cache *tikv.RegionCache, ranges *tik } allStores := cache.GetAllValidTiFlashStores(task.region, rpcCtx.Store) if batchCop, ok := storeTaskMap[rpcCtx.Addr]; ok { - batchCop.regionInfos = append(batchCop.regionInfos, tikv.RegionInfo{task.region, rpcCtx.Meta, task.ranges, allStores}) + batchCop.regionInfos = append(batchCop.regionInfos, tikv.RegionInfo{Region: task.region, Meta: rpcCtx.Meta, Ranges: task.ranges, AllStores: allStores}) } else { batchTask := &batchCopTask{ storeAddr: rpcCtx.Addr, From ccbb3f39d21e19d776085fae674a107a59569459 Mon Sep 17 00:00:00 2001 From: xufei Date: Mon, 10 May 2021 16:15:03 +0800 Subject: [PATCH 10/21] refine --- store/copr/batch_coprocessor.go | 74 +++++++++++++++--------------- store/copr/mpp.go | 2 +- store/tikv/batch_request_sender.go | 2 +- 3 files changed, 39 insertions(+), 39 deletions(-) diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index 9da94788dae67..7c17d0cf2e1ba 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -104,22 +104,27 @@ func (rs *batchCopResponse) RespTime() time.Duration { // otherwise, use a greedy algorithm to put it into the store with highest weight func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask { storeTaskMap := make(map[uint64]*batchCopTask) - storeCandidateTaskMap := make(map[uint64]map[string]tikv.RegionInfo) - totalTaskCandidateNum := 0 - totalRemainingTaskNum := 0 + storeCandidateRegionMap := make(map[uint64]map[string]tikv.RegionInfo) + totalRegionCandidateNum := 0 + totalRemainingRegionNum := 0 + for _, task := range originalTasks { + taskStoreID := task.regionInfos[0].AllStores[0] batchTask := &batchCopTask{ storeAddr: task.storeAddr, cmdType: task.cmdType, ctx: task.ctx, regionInfos: []tikv.RegionInfo{task.regionInfos[0]}, } - storeTaskMap[task.regionInfos[0].AllStores[0]] = batchTask + storeTaskMap[taskStoreID] = batchTask + candidateMap := make(map[string]tikv.RegionInfo) + storeCandidateRegionMap[taskStoreID] = candidateMap } + for _, task := range originalTasks { taskStoreID := task.regionInfos[0].AllStores[0] for index, ri := range task.regionInfos { - // for each cop task, figure out the valid store num + // for each region, figure out the valid store num validStoreNum := 0 if index == 0 { continue @@ -128,7 +133,7 @@ func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask { validStoreNum = 1 } else { for _, storeID := range ri.AllStores { - if _, ok := storeTaskMap[storeID]; ok { + if _, ok := storeCandidateRegionMap[storeID]; ok { validStoreNum++ } } @@ -137,81 +142,76 @@ func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask { // if only one store is valid, just put it to storeTaskMap storeTaskMap[taskStoreID].regionInfos = append(storeTaskMap[taskStoreID].regionInfos, ri) } else { - // if more than one store is valid, put the cop task + // if more than one store is valid, put the region // to store candidate map - totalTaskCandidateNum += validStoreNum - totalRemainingTaskNum += 1 - /// put this cop task to candidate task map + totalRegionCandidateNum += validStoreNum + totalRemainingRegionNum += 1 taskKey := ri.Region.String() for _, 
storeID := range ri.AllStores { - if candidateMap, ok := storeCandidateTaskMap[storeID]; ok { + if candidateMap, ok := storeCandidateRegionMap[storeID]; ok { if _, ok := candidateMap[taskKey]; ok { // duplicated region, should not happen, just give up balance return originalTasks } candidateMap[taskKey] = ri - } else { - candidateMap := make(map[string]tikv.RegionInfo) - candidateMap[taskKey] = ri - storeCandidateTaskMap[storeID] = candidateMap } } } } } - if totalRemainingTaskNum == 0 { + if totalRemainingRegionNum == 0 { return originalTasks } - avgStorePerTask := float64(totalTaskCandidateNum) / float64(totalRemainingTaskNum) + avgStorePerRegion := float64(totalRegionCandidateNum) / float64(totalRemainingRegionNum) findNextStore := func() uint64 { store := uint64(math.MaxUint64) - weightedTaskNum := float64(0) + weightedRegionNum := float64(0) for storeID := range storeTaskMap { - if store == uint64(math.MaxUint64) && len(storeCandidateTaskMap[storeID]) > 0 { + if store == uint64(math.MaxUint64) && len(storeCandidateRegionMap[storeID]) > 0 { store = storeID - weightedTaskNum = float64(len(storeCandidateTaskMap[storeID]))/avgStorePerTask + float64(len(storeTaskMap[storeID].regionInfos)) + weightedRegionNum = float64(len(storeCandidateRegionMap[storeID]))/avgStorePerRegion + float64(len(storeTaskMap[storeID].regionInfos)) } else { - num := float64(len(storeCandidateTaskMap[storeID])) / avgStorePerTask + num := float64(len(storeCandidateRegionMap[storeID])) / avgStorePerRegion if num == 0 { continue } num += float64(len(storeTaskMap[storeID].regionInfos)) - if num < weightedTaskNum { + if num < weightedRegionNum { store = storeID - weightedTaskNum = num + weightedRegionNum = num } } } return store } store := findNextStore() - for totalRemainingTaskNum > 0 { - if len(storeCandidateTaskMap[store]) == 0 { + for totalRemainingRegionNum > 0 { + if len(storeCandidateRegionMap[store]) == 0 { store = findNextStore() } if store == uint64(math.MaxUint64) { break } - for key, ri := range storeCandidateTaskMap[store] { + for key, ri := range storeCandidateRegionMap[store] { storeTaskMap[store].regionInfos = append(storeTaskMap[store].regionInfos, ri) - totalRemainingTaskNum-- + totalRemainingRegionNum-- for _, id := range ri.AllStores { - if _, ok := storeCandidateTaskMap[id]; ok { - delete(storeCandidateTaskMap[id], key) - totalTaskCandidateNum-- + if _, ok := storeCandidateRegionMap[id]; ok { + delete(storeCandidateRegionMap[id], key) + totalRegionCandidateNum-- } } - if totalRemainingTaskNum > 0 { - weightedTaskNum := float64(len(storeCandidateTaskMap[store]))/avgStorePerTask + float64(len(storeTaskMap[store].regionInfos)) - avgStorePerTask = float64(totalTaskCandidateNum) / float64(totalRemainingTaskNum) + if totalRemainingRegionNum > 0 { + weightedTaskNum := float64(len(storeCandidateRegionMap[store]))/avgStorePerRegion + float64(len(storeTaskMap[store].regionInfos)) + avgStorePerRegion = float64(totalRegionCandidateNum) / float64(totalRemainingRegionNum) for _, id := range ri.AllStores { // it is not optimal because we only check the stores that affected by this region, in fact in order // to find out the store with the lowest weightedTaskNum, all stores should be checked, but I think // check only the affected stores is more simple and will get a good enough result - if id != store && len(storeCandidateTaskMap[id]) > 0 && float64(len(storeCandidateTaskMap[id]))/avgStorePerTask+float64(len(storeTaskMap[id].regionInfos)) <= weightedTaskNum { + if id != store && len(storeCandidateRegionMap[id]) > 
0 && float64(len(storeCandidateRegionMap[id]))/avgStorePerRegion+float64(len(storeTaskMap[id].regionInfos)) <= weightedTaskNum { store = id - weightedTaskNum = float64(len(storeCandidateTaskMap[id]))/avgStorePerTask + float64(len(storeTaskMap[id].regionInfos)) + weightedTaskNum = float64(len(storeCandidateRegionMap[id]))/avgStorePerRegion + float64(len(storeTaskMap[id].regionInfos)) } } } @@ -454,7 +454,7 @@ func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *backoffer, } func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *backoffer, task *batchCopTask) ([]*batchCopTask, error) { - sender := NewRegionBatchRequestSender(b.store.GetRegionCache(), b.store.GetTiKVClient()) + sender := tikv.NewRegionBatchRequestSender(b.store.GetRegionCache(), b.store.GetTiKVClient()) var regionInfos = make([]*coprocessor.RegionInfo, 0, len(task.regionInfos)) for _, ri := range task.regionInfos { regionInfos = append(regionInfos, &coprocessor.RegionInfo{ @@ -486,7 +486,7 @@ func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *backoffer, ta req.StoreTp = tikvrpc.TiFlash logutil.BgLogger().Debug("send batch request to ", zap.String("req info", req.String()), zap.Int("cop task len", len(task.regionInfos))) - resp, retry, cancel, err := sender.SendReqToAddr(bo, task.ctx, task.regionInfos, req, tikv.ReadTimeoutUltraLong) + resp, retry, cancel, err := sender.SendReqToAddr(bo.TiKVBackoffer(), task.ctx, task.regionInfos, req, tikv.ReadTimeoutUltraLong) // If there are store errors, we should retry for all regions. if retry { return b.retryBatchCopTask(ctx, bo, task) diff --git a/store/copr/mpp.go b/store/copr/mpp.go index 57d972cca0f0f..416667697dbef 100644 --- a/store/copr/mpp.go +++ b/store/copr/mpp.go @@ -219,7 +219,7 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *backoffer, req // In that case if originalTask != nil { sender := tikv.NewRegionBatchRequestSender(m.store.GetRegionCache(), m.store.GetTiKVClient()) - rpcResp, _, _, err = sender.SendReqToAddr(bo, originalTask.ctx, originalTask.regionInfos, wrappedReq, tikv.ReadTimeoutMedium) + rpcResp, _, _, err = sender.SendReqToAddr(bo.TiKVBackoffer(), originalTask.ctx, originalTask.regionInfos, wrappedReq, tikv.ReadTimeoutMedium) // No matter what the rpc error is, we won't retry the mpp dispatch tasks. // TODO: If we want to retry, we must redo the plan fragment cutting and task scheduling. // That's a hard job but we can try it in the future. diff --git a/store/tikv/batch_request_sender.go b/store/tikv/batch_request_sender.go index 996c31d9c5d9c..d479e7c5d0417 100644 --- a/store/tikv/batch_request_sender.go +++ b/store/tikv/batch_request_sender.go @@ -45,7 +45,7 @@ func NewRegionBatchRequestSender(cache *RegionCache, client Client) *RegionBatch } } -func (ss *RegionBatchRequestSender) sendStreamReqToAddr(bo *backoffer, ctxs []copTaskAndRPCContext, req *tikvrpc.Request, timout time.Duration) (resp *tikvrpc.Response, retry bool, cancel func(), err error) { +func (ss *RegionBatchRequestSender) SendReqToAddr(bo *Backoffer, rpcCtx *RPCContext, regionInfos []RegionInfo, req *tikvrpc.Request, timout time.Duration) (resp *tikvrpc.Response, retry bool, cancel func(), err error) { // use the first ctx to send request, because every ctx has same address. 
cancel = func() {} if e := tikvrpc.SetContext(req, rpcCtx.Meta, rpcCtx.Peer); e != nil { From 9f32a6bccaac1dbace0d978382e9799da72ae285 Mon Sep 17 00:00:00 2001 From: xufei Date: Mon, 10 May 2021 17:14:13 +0800 Subject: [PATCH 11/21] refine --- store/tikv/region_cache.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go index 207201c1d0e12..641b977b7f7c0 100644 --- a/store/tikv/region_cache.go +++ b/store/tikv/region_cache.go @@ -535,7 +535,8 @@ func (c *RegionCache) GetTiKVRPCContext(bo *Backoffer, id RegionVerID, replicaRe } func (c *RegionCache) GetAllValidTiFlashStores(id RegionVerID, currentStore *Store) []uint64 { - allStores := make([]uint64, 0, 1) + allStores := make([]uint64, 0, 4) + // make sure currentStore id is always the first in allStores allStores = append(allStores, currentStore.storeID) ts := time.Now().Unix() cachedRegion := c.getCachedRegionWithRLock(id) From 3a641fc00f46cefd6ee9c9489fed7cc85340a0ed Mon Sep 17 00:00:00 2001 From: xufei Date: Mon, 10 May 2021 17:49:07 +0800 Subject: [PATCH 12/21] add comments --- store/tikv/batch_request_sender.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/store/tikv/batch_request_sender.go b/store/tikv/batch_request_sender.go index d479e7c5d0417..4669fc2ecdde1 100644 --- a/store/tikv/batch_request_sender.go +++ b/store/tikv/batch_request_sender.go @@ -88,6 +88,10 @@ func (ss *RegionBatchRequestSender) onSendFailForBatchRegions(bo *Backoffer, ctx // when meeting io error. ss.GetRegionCache().OnSendFailForBatchRegions(bo, ctx.Store, regionInfos, true, err) + // Retry on send request failure when it's not canceled. + // When a store is not available, the leader of the related region should be elected quickly. + // TODO: the number of retries should be limited, since a region may be unavailable + // when some unrecoverable disaster happens.
err = bo.Backoff(BoTiFlashRPC, errors.Errorf("send request error: %v, ctx: %v, regionInfos: %v", err, ctx, regionInfos)) return errors.Trace(err) } From 416378f6bd66513592d4af8482ecb93c211ac6e7 Mon Sep 17 00:00:00 2001 From: xufei Date: Tue, 11 May 2021 09:03:29 +0800 Subject: [PATCH 13/21] fix ci --- store/copr/batch_coprocessor.go | 2 +- store/tikv/batch_request_sender.go | 2 ++ store/tikv/region_cache.go | 3 ++- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index 7c17d0cf2e1ba..06d8d50f0ea32 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -272,7 +272,7 @@ func buildBatchCopTasks(bo *backoffer, cache *tikv.RegionCache, ranges *tikv.Key storeAddr: rpcCtx.Addr, cmdType: cmdType, ctx: rpcCtx, - regionInfos: []tikv.RegionInfo{{task.region, rpcCtx.Meta, task.ranges, allStores}}, + regionInfos: []tikv.RegionInfo{{Region: task.region, Meta: rpcCtx.Meta, Ranges: task.ranges, AllStores: allStores}}, } storeTaskMap[rpcCtx.Addr] = batchTask } diff --git a/store/tikv/batch_request_sender.go b/store/tikv/batch_request_sender.go index 4669fc2ecdde1..90d3315b69731 100644 --- a/store/tikv/batch_request_sender.go +++ b/store/tikv/batch_request_sender.go @@ -26,6 +26,7 @@ import ( "google.golang.org/grpc/status" ) +// RegionInfo contains region-related information for a batchCopTask type RegionInfo struct { Region RegionVerID Meta *metapb.Region @@ -45,6 +46,7 @@ func NewRegionBatchRequestSender(cache *RegionCache, client Client) *RegionBatch } } +// SendReqToAddr sends a batch cop request func (ss *RegionBatchRequestSender) SendReqToAddr(bo *Backoffer, rpcCtx *RPCContext, regionInfos []RegionInfo, req *tikvrpc.Request, timout time.Duration) (resp *tikvrpc.Response, retry bool, cancel func(), err error) { // use the first ctx to send request, because every ctx has same address.
cancel = func() {} diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go index 641b977b7f7c0..1a51fe296a291 100644 --- a/store/tikv/region_cache.go +++ b/store/tikv/region_cache.go @@ -534,8 +534,9 @@ func (c *RegionCache) GetTiKVRPCContext(bo *Backoffer, id RegionVerID, replicaRe }, nil } +// GetAllValidTiFlashStores returns the store ids of all valid TiFlash stores, the store id of currentStore is always the first one func (c *RegionCache) GetAllValidTiFlashStores(id RegionVerID, currentStore *Store) []uint64 { - allStores := make([]uint64, 0, 4) + allStores := make([]uint64, 0, 2) // make sure currentStore id is always the first in allStores allStores = append(allStores, currentStore.storeID) ts := time.Now().Unix() From 2986c4e26ae6a31c3255e36780bdb118cec4dac0 Mon Sep 17 00:00:00 2001 From: xufei Date: Tue, 11 May 2021 14:55:48 +0800 Subject: [PATCH 14/21] fix ci --- store/copr/batch_coprocessor.go | 1 + 1 file changed, 1 insertion(+) diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index 06d8d50f0ea32..c542bcd9a4c61 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -492,6 +492,7 @@ func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *backoffer, ta return b.retryBatchCopTask(ctx, bo, task) } if err != nil { + err = txndriver.ToTiDBErr(err) return nil, errors.Trace(err) } defer cancel() From 8af078f7c620a4fbeea8b41efb7b264aa1eaf02a Mon Sep 17 00:00:00 2001 From: xufei Date: Tue, 11 May 2021 15:06:24 +0800 Subject: [PATCH 15/21] refine --- store/copr/batch_coprocessor.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index 2a50ac8c02282..65f7bfb281859 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -201,15 +201,15 @@ func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask { } } if totalRemainingRegionNum > 0 { - weightedTaskNum := float64(len(storeCandidateRegionMap[store]))/avgStorePerRegion + float64(len(storeTaskMap[store].regionInfos)) avgStorePerRegion = float64(totalRegionCandidateNum) / float64(totalRemainingRegionNum) + weightedRegionNum := float64(len(storeCandidateRegionMap[store]))/avgStorePerRegion + float64(len(storeTaskMap[store].regionInfos)) for _, id := range ri.AllStores { // it is not optimal because we only check the stores that affected by this region, in fact in order - // to find out the store with the lowest weightedTaskNum, all stores should be checked, but I think + // to find out the store with the lowest weightedRegionNum, all stores should be checked, but I think // check only the affected stores is more simple and will get a good enough result - if id != store && len(storeCandidateRegionMap[id]) > 0 && float64(len(storeCandidateRegionMap[id]))/avgStorePerRegion+float64(len(storeTaskMap[id].regionInfos)) <= weightedTaskNum { + if id != store && len(storeCandidateRegionMap[id]) > 0 && float64(len(storeCandidateRegionMap[id]))/avgStorePerRegion+float64(len(storeTaskMap[id].regionInfos)) <= weightedRegionNum { store = id - weightedTaskNum = float64(len(storeCandidateRegionMap[id]))/avgStorePerRegion + float64(len(storeTaskMap[id].regionInfos)) + weightedRegionNum = float64(len(storeCandidateRegionMap[id]))/avgStorePerRegion + float64(len(storeTaskMap[id].regionInfos)) } } } From a7eb17fb70c333aaa8f364020f90e46784edcdd1 Mon Sep 17 00:00:00 2001 From: xufei Date: Tue, 11 May 2021 15:22:24 +0800 Subject: [PATCH 16/21] address 
comments --- store/tikv/region_cache.go | 1 + 1 file changed, 1 insertion(+) diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go index 1a51fe296a291..4b1228972d920 100644 --- a/store/tikv/region_cache.go +++ b/store/tikv/region_cache.go @@ -536,6 +536,7 @@ func (c *RegionCache) GetTiKVRPCContext(bo *Backoffer, id RegionVerID, replicaRe // GetAllValidTiFlashStores returns the store ids of all valid TiFlash stores, the store id of currentStore is always the first one func (c *RegionCache) GetAllValidTiFlashStores(id RegionVerID, currentStore *Store) []uint64 { + // set the cap to 2 because usually, TiFlash table will have 2 replicas allStores := make([]uint64, 0, 2) // make sure currentStore id is always the first in allStores allStores = append(allStores, currentStore.storeID) From d682af314f8ef55f63540e2600853e5da29ef5d4 Mon Sep 17 00:00:00 2001 From: xufei Date: Wed, 12 May 2021 09:45:01 +0800 Subject: [PATCH 17/21] refine code --- store/copr/batch_coprocessor.go | 90 ++++++++++++++++++--------------- 1 file changed, 49 insertions(+), 41 deletions(-) diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index c51f8ccb80738..d597c35a19bdb 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -114,8 +114,6 @@ func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask { regionInfos: []tikv.RegionInfo{task.regionInfos[0]}, } storeTaskMap[taskStoreID] = batchTask - candidateMap := make(map[string]tikv.RegionInfo) - storeCandidateRegionMap[taskStoreID] = candidateMap } for _, task := range originalTasks { @@ -130,7 +128,7 @@ func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask { validStoreNum = 1 } else { for _, storeID := range ri.AllStores { - if _, ok := storeCandidateRegionMap[storeID]; ok { + if _, ok := storeTaskMap[storeID]; ok { validStoreNum++ } } @@ -145,12 +143,16 @@ func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask { totalRemainingRegionNum += 1 taskKey := ri.Region.String() for _, storeID := range ri.AllStores { - if candidateMap, ok := storeCandidateRegionMap[storeID]; ok { - if _, ok := candidateMap[taskKey]; ok { + if _, validStore := storeTaskMap[storeID]; validStore { + if _, ok := storeCandidateRegionMap[storeID]; !ok { + candidateMap := make(map[string]tikv.RegionInfo) + storeCandidateRegionMap[taskStoreID] = candidateMap + } + if _, duplicateRegion := storeCandidateRegionMap[storeID][taskKey]; duplicateRegion { // duplicated region, should not happen, just give up balance return originalTasks } - candidateMap[taskKey] = ri + storeCandidateRegionMap[storeID][taskKey] = ri } } } @@ -161,58 +163,64 @@ func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask { } avgStorePerRegion := float64(totalRegionCandidateNum) / float64(totalRemainingRegionNum) - findNextStore := func() uint64 { + findNextStore := func(candidateStores []uint64) uint64 { store := uint64(math.MaxUint64) - weightedRegionNum := float64(0) - for storeID := range storeTaskMap { - if store == uint64(math.MaxUint64) && len(storeCandidateRegionMap[storeID]) > 0 { - store = storeID - weightedRegionNum = float64(len(storeCandidateRegionMap[storeID]))/avgStorePerRegion + float64(len(storeTaskMap[storeID].regionInfos)) - } else { - num := float64(len(storeCandidateRegionMap[storeID])) / avgStorePerRegion - if num == 0 { + weightedRegionNum := math.MaxFloat64 + if candidateStores != nil { + for _, storeID := range candidateStores { + if _, validStore := 
storeCandidateRegionMap[storeID]; !validStore { continue } - num += float64(len(storeTaskMap[storeID].regionInfos)) + num := float64(len(storeCandidateRegionMap[storeID]))/avgStorePerRegion + float64(len(storeTaskMap[storeID].regionInfos)) if num < weightedRegionNum { store = storeID weightedRegionNum = num } } } + if store != uint64(math.MaxUint64) { + return store + } + for storeID := range storeTaskMap { + if _, validStore := storeCandidateRegionMap[storeID]; !validStore { + continue + } + num := float64(len(storeCandidateRegionMap[storeID]))/avgStorePerRegion + float64(len(storeTaskMap[storeID].regionInfos)) + if num < weightedRegionNum { + store = storeID + weightedRegionNum = num + } + } return store } - store := findNextStore() + store := findNextStore(nil) for totalRemainingRegionNum > 0 { - if len(storeCandidateRegionMap[store]) == 0 { - store = findNextStore() - } if store == uint64(math.MaxUint64) { break } - for key, ri := range storeCandidateRegionMap[store] { - storeTaskMap[store].regionInfos = append(storeTaskMap[store].regionInfos, ri) - totalRemainingRegionNum-- - for _, id := range ri.AllStores { - if _, ok := storeCandidateRegionMap[id]; ok { - delete(storeCandidateRegionMap[id], key) - totalRegionCandidateNum-- - } - } - if totalRemainingRegionNum > 0 { - avgStorePerRegion = float64(totalRegionCandidateNum) / float64(totalRemainingRegionNum) - weightedRegionNum := float64(len(storeCandidateRegionMap[store]))/avgStorePerRegion + float64(len(storeTaskMap[store].regionInfos)) - for _, id := range ri.AllStores { - // it is not optimal because we only check the stores that affected by this region, in fact in order - // to find out the store with the lowest weightedRegionNum, all stores should be checked, but I think - // check only the affected stores is more simple and will get a good enough result - if id != store && len(storeCandidateRegionMap[id]) > 0 && float64(len(storeCandidateRegionMap[id]))/avgStorePerRegion+float64(len(storeTaskMap[id].regionInfos)) <= weightedRegionNum { - store = id - weightedRegionNum = float64(len(storeCandidateRegionMap[id]))/avgStorePerRegion + float64(len(storeTaskMap[id].regionInfos)) - } + var key string + var ri tikv.RegionInfo + for key, ri = range storeCandidateRegionMap[store] { + // get the first region + break + } + storeTaskMap[store].regionInfos = append(storeTaskMap[store].regionInfos, ri) + totalRemainingRegionNum-- + for _, id := range ri.AllStores { + if _, ok := storeCandidateRegionMap[id]; ok { + delete(storeCandidateRegionMap[id], key) + totalRegionCandidateNum-- + if len(storeCandidateRegionMap[id]) == 0 { + delete(storeCandidateRegionMap, id) } } - break + } + if totalRemainingRegionNum > 0 { + avgStorePerRegion = float64(totalRegionCandidateNum) / float64(totalRemainingRegionNum) + // it is not optimal because we only check the stores that affected by this region, in fact in order + // to find out the store with the lowest weightedRegionNum, all stores should be checked, but I think + // check only the affected stores is more simple and will get a good enough result + store = findNextStore(ri.AllStores) } } From 624abedde8a480fa6dfd58347a822114e938e568 Mon Sep 17 00:00:00 2001 From: xufei Date: Wed, 12 May 2021 10:09:06 +0800 Subject: [PATCH 18/21] refine --- store/copr/batch_coprocessor.go | 34 +++++++++++++++++++-------------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index d597c35a19bdb..e656a01315bee 100644 --- 
a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -95,11 +95,15 @@ func (rs *batchCopResponse) RespTime() time.Duration { } // balanceBatchCopTask balance the regions between available stores, the basic rule is -// 1. the first region of each original batch cop task belongs to its original store +// 1. the first region of each original batch cop task belongs to its original store because some +// meta data(like the rpc context) in batchCopTask is related to it // 2. for the remaining regions: // if there is only 1 available store, then put the region to the related store // otherwise, use a greedy algorithm to put it into the store with highest weight func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask { + if len(originalTasks) <= 1 { + return originalTasks + } storeTaskMap := make(map[uint64]*batchCopTask) storeCandidateRegionMap := make(map[uint64]map[string]tikv.RegionInfo) totalRegionCandidateNum := 0 @@ -143,17 +147,18 @@ func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask { totalRemainingRegionNum += 1 taskKey := ri.Region.String() for _, storeID := range ri.AllStores { - if _, validStore := storeTaskMap[storeID]; validStore { - if _, ok := storeCandidateRegionMap[storeID]; !ok { - candidateMap := make(map[string]tikv.RegionInfo) - storeCandidateRegionMap[taskStoreID] = candidateMap - } - if _, duplicateRegion := storeCandidateRegionMap[storeID][taskKey]; duplicateRegion { - // duplicated region, should not happen, just give up balance - return originalTasks - } - storeCandidateRegionMap[storeID][taskKey] = ri + if _, validStore := storeTaskMap[storeID]; !validStore { + continue + } + if _, ok := storeCandidateRegionMap[storeID]; !ok { + candidateMap := make(map[string]tikv.RegionInfo) + storeCandidateRegionMap[storeID] = candidateMap + } + if _, duplicateRegion := storeCandidateRegionMap[storeID][taskKey]; duplicateRegion { + // duplicated region, should not happen, just give up balance + return originalTasks } + storeCandidateRegionMap[storeID][taskKey] = ri } } } @@ -177,9 +182,9 @@ func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask { weightedRegionNum = num } } - } - if store != uint64(math.MaxUint64) { - return store + if store != uint64(math.MaxUint64) { + return store + } } for storeID := range storeTaskMap { if _, validStore := storeCandidateRegionMap[storeID]; !validStore { @@ -193,6 +198,7 @@ func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask { } return store } + store := findNextStore(nil) for totalRemainingRegionNum > 0 { if store == uint64(math.MaxUint64) { From e1cdae24fe61bfe7bc81d0bfeb5f7dbf29989937 Mon Sep 17 00:00:00 2001 From: xufei Date: Wed, 12 May 2021 17:02:22 +0800 Subject: [PATCH 19/21] address comments --- store/tikv/region_cache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go index 4b1228972d920..a77a8217d1a13 100644 --- a/store/tikv/region_cache.go +++ b/store/tikv/region_cache.go @@ -712,7 +712,7 @@ func (c *RegionCache) findRegionByKey(bo *Backoffer, key []byte, isEndKey bool) // OnSendFailForBatchRegions handles send request fail logic. 
func (c *RegionCache) OnSendFailForBatchRegions(bo *Backoffer, store *Store, regionInfos []RegionInfo, scheduleReload bool, err error) { - metrics.RegionCacheCounterWithSendFail.Inc() + metrics.RegionCacheCounterWithSendFail.Add(float64(len(regionInfos))) if store.storeType != tikvrpc.TiFlash { logutil.Logger(bo.ctx).Info("Should not reach here, OnSendFailForBatchRegions only support TiFlash") return From 849391395b840faee4a28857ac93f01f79b0a3df Mon Sep 17 00:00:00 2001 From: xufei Date: Fri, 14 May 2021 15:23:21 +0800 Subject: [PATCH 20/21] address comments --- store/copr/batch_coprocessor.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index 2acbf2d6bc660..d9b9abb722c22 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -157,6 +157,7 @@ func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask { } if _, duplicateRegion := storeCandidateRegionMap[storeID][taskKey]; duplicateRegion { // duplicated region, should not happen, just give up balance + logutil.BgLogger().Warn("Meet duplicated region info when trying to balance batch cop task, give up balancing") return originalTasks } storeCandidateRegionMap[storeID][taskKey] = ri @@ -230,6 +231,10 @@ func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask { store = findNextStore(ri.AllStores) } } + if totalRemainingRegionNum > 0 { + logutil.BgLogger().Warn("Some regions are not used when trying to balance batch cop task, give up balancing") + return originalTasks + } var ret []*batchCopTask for _, task := range storeTaskMap { From 3a76a3e0aaed190f6c2949506de69d7e0fb7e683 Mon Sep 17 00:00:00 2001 From: xufei Date: Mon, 17 May 2021 20:34:27 +0800 Subject: [PATCH 21/21] add some log --- store/copr/batch_coprocessor.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index d9b9abb722c22..8c73f58fbf892 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -17,6 +17,7 @@ import ( "context" "io" "math" + "strconv" "sync" "sync/atomic" "time" @@ -26,6 +27,7 @@ import ( "github.com/pingcap/kvproto/pkg/coprocessor" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/log" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/driver/backoff" derr "github.com/pingcap/tidb/store/driver/error" @@ -310,7 +312,21 @@ func buildBatchCopTasks(bo *Backoffer, cache *tikv.RegionCache, ranges *tikv.Key for _, task := range storeTaskMap { batchTasks = append(batchTasks, task) } + if log.GetLevel() <= zap.DebugLevel { + msg := "Before region balance:" + for _, task := range batchTasks { + msg += " store " + task.storeAddr + ": " + strconv.Itoa(len(task.regionInfos)) + " regions," + } + logutil.BgLogger().Debug(msg) + } batchTasks = balanceBatchCopTask(batchTasks) + if log.GetLevel() <= zap.DebugLevel { + msg := "After region balance:" + for _, task := range batchTasks { + msg += " store " + task.storeAddr + ": " + strconv.Itoa(len(task.regionInfos)) + " regions," + } + logutil.BgLogger().Debug(msg) + } if elapsed := time.Since(start); elapsed > time.Millisecond*500 { logutil.BgLogger().Warn("buildBatchCopTasks takes too much time",