From b936543bb86cc15455e5203c2b4319fde9beb35b Mon Sep 17 00:00:00 2001 From: PatrickNicholas Date: Sat, 16 Jan 2021 12:02:29 +0800 Subject: [PATCH] *: add network cost estimater for adpative follower scan --- distsql/request_builder.go | 6 +++--- executor/table_reader.go | 15 ++++++++------- kv/kv.go | 5 ++--- store/tikv/coprocessor.go | 23 +++++++++++++++++++++-- store/tikv/snapshot.go | 5 +++-- 5 files changed, 37 insertions(+), 17 deletions(-) diff --git a/distsql/request_builder.go b/distsql/request_builder.go index 9e075cc04bffe..ca05f07a4b31a 100644 --- a/distsql/request_builder.go +++ b/distsql/request_builder.go @@ -243,9 +243,9 @@ func (builder *RequestBuilder) SetStreaming(streaming bool) *RequestBuilder { return builder } -// SetRecommendLocalScan sets "RecommendLocalScan" flag for "kv.Request". -func (builder *RequestBuilder) SetRecommendLocalScan(recommendLocalScan bool) *RequestBuilder { - builder.Request.RecommendLocalScan = recommendLocalScan +// SetNetworkCostEstimater sets "NetworkCostEstimater" flag for "kv.Request". +func (builder *RequestBuilder) SetNetworkCostEstimater(estimator func(*kv.KeyRange, []kv.KeyRange) float64) *RequestBuilder { + builder.Request.NetworkCostEstimater = estimator return builder } diff --git a/executor/table_reader.go b/executor/table_reader.go index 4e12a2883cea1..de1d317bf3f5d 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -36,10 +36,6 @@ import ( "github.com/pingcap/tipb/go-tipb" ) -// CostThreshold marks the cost threshold to determine whether given request is -// a large scan request. -const CostThreshold = 100000000.0 - // make sure `TableReaderExecutor` implements `Executor`. var _ Executor = &TableReaderExecutor{} @@ -228,9 +224,14 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra reqBuilder = builder.SetHandleRanges(e.ctx.GetSessionVars().StmtCtx, getPhysicalTableID(e.table), e.table.Meta() != nil && e.table.Meta().IsCommonHandle, ranges, e.feedback) } - totalCst := 0.0 + factor := e.ctx.GetSessionVars().NetworkFactor + totalCost := 0.0 for _, plan := range e.plans { - totalCst += plan.StatsCount() + totalCost += plan.StatsCount()*factor + + plan.Stats().HistColl.GetAvgRowSize(e.ctx, plan.Schema().Columns, false, true) + } + estimater := func(r *kv.KeyRange, totalRanges []kv.KeyRange) float64 { + return totalCost * 1.0 / (float64(len(totalRanges)) + 1.0) } kvReq, err := reqBuilder. @@ -244,7 +245,7 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra SetStoreType(e.storeType). SetAllowBatchCop(e.batchCop). SetFromInfoSchema(infoschema.GetInfoSchema(e.ctx)). - SetRecommendLocalScan(totalCst >= CostThreshold). // TODO: need to replace with real cost, this is just row count + SetNetworkCostEstimater(estimater). Build() if err != nil { return nil, err diff --git a/kv/kv.go b/kv/kv.go index 30e132a361367..b20741bd23e47 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -408,9 +408,8 @@ type Request struct { TaskID uint64 // TiDBServerID is the specified TiDB serverID to execute request. `0` means all TiDB instances. TiDBServerID uint64 - // RecommendLocalScan indicates whether the planner recommends a local tikv replica scan, when - // the tidb-server and leader are not in the same AZ/DC. - RecommendLocalScan bool + + NetworkCostEstimater func(*KeyRange, []KeyRange) float64 } // ResultSubset represents a result subset from a single storage unit. diff --git a/store/tikv/coprocessor.go b/store/tikv/coprocessor.go index 1d150753117e4..5b876c0179c8c 100644 --- a/store/tikv/coprocessor.go +++ b/store/tikv/coprocessor.go @@ -45,6 +45,10 @@ import ( "go.uber.org/zap" ) +// CostThreshold marks the cost threshold to determine whether given request is +// a large scan request. +const CostThreshold = 64 * 1024 // 64KB + var ( tikvTxnRegionsNumHistogramWithCoprocessor = metrics.TiKVTxnRegionsNumHistogram.WithLabelValues("coprocessor") tikvTxnRegionsNumHistogramWithBatchCoprocessor = metrics.TiKVTxnRegionsNumHistogram.WithLabelValues("batch_coprocessor") @@ -870,8 +874,23 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch } } + recommendLocalScan := false + if worker.req.NetworkCostEstimater != nil { + networkCost := 0.0 + for _, r := range copReq.Ranges { + kvRange := kv.KeyRange{ + StartKey: r.Start, + EndKey: r.End, + } + networkCost += worker.req.NetworkCostEstimater(&kvRange, worker.req.KeyRanges) + } + if networkCost > float64(CostThreshold) { + recommendLocalScan = true + } + } + req := tikvrpc.NewReplicaReadRequest(task.cmdType, &copReq, worker.req.ReplicaRead, - &worker.replicaReadSeed, worker.req.RecommendLocalScan, + &worker.replicaReadSeed, recommendLocalScan, kvrpcpb.Context{ IsolationLevel: pbIsolationLevel(worker.req.IsolationLevel), Priority: kvPriorityToCommandPri(worker.req.Priority), @@ -879,7 +898,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch RecordTimeStat: true, RecordScanStat: true, TaskId: worker.req.TaskID, - }) + }) req.StoreTp = task.storeType startTime := time.Now() if worker.Stats == nil { diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index d6b7867b50211..541f85cedb847 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -281,10 +281,11 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll pending := batch.keys for { s.mu.RLock() + // TODO(patrick) set recommend local scan for this request. req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdBatchGet, &pb.BatchGetRequest{ Keys: pending, Version: s.version.Ver, - }, s.mu.replicaRead, &s.replicaReadSeed, pb.Context{ + }, s.mu.replicaRead, &s.replicaReadSeed, false, pb.Context{ Priority: s.priority, NotFillCache: s.notFillCache, TaskId: s.mu.taskID, @@ -429,7 +430,7 @@ func (s *tikvSnapshot) get(ctx context.Context, bo *Backoffer, k kv.Key) ([]byte &pb.GetRequest{ Key: k, Version: s.version.Ver, - }, s.mu.replicaRead, &s.replicaReadSeed, pb.Context{ + }, s.mu.replicaRead, &s.replicaReadSeed, false, pb.Context{ Priority: s.priority, NotFillCache: s.notFillCache, TaskId: s.mu.taskID,