Skip to content

Commit

Permalink
Merge pull request pingcap#3 from PatrickNicholas/2020-hackathon
Browse files Browse the repository at this point in the history
Add network cost estimater for adpative follower local read
  • Loading branch information
zhangjinpeng87 authored Jan 16, 2021
2 parents ca08051 + b936543 commit 40d9bf5
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 17 deletions.
6 changes: 3 additions & 3 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,9 +243,9 @@ func (builder *RequestBuilder) SetStreaming(streaming bool) *RequestBuilder {
return builder
}

// SetRecommendLocalScan sets "RecommendLocalScan" flag for "kv.Request".
func (builder *RequestBuilder) SetRecommendLocalScan(recommendLocalScan bool) *RequestBuilder {
builder.Request.RecommendLocalScan = recommendLocalScan
// SetNetworkCostEstimater sets "NetworkCostEstimater" flag for "kv.Request".
func (builder *RequestBuilder) SetNetworkCostEstimater(estimator func(*kv.KeyRange, []kv.KeyRange) float64) *RequestBuilder {
builder.Request.NetworkCostEstimater = estimator
return builder
}

Expand Down
15 changes: 8 additions & 7 deletions executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,6 @@ import (
"github.com/pingcap/tipb/go-tipb"
)

// CostThreshold marks the cost threshold to determine whether given request is
// a large scan request.
const CostThreshold = 100000000.0

// make sure `TableReaderExecutor` implements `Executor`.
var _ Executor = &TableReaderExecutor{}

Expand Down Expand Up @@ -228,9 +224,14 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra
reqBuilder = builder.SetHandleRanges(e.ctx.GetSessionVars().StmtCtx, getPhysicalTableID(e.table), e.table.Meta() != nil && e.table.Meta().IsCommonHandle, ranges, e.feedback)
}

totalCst := 0.0
factor := e.ctx.GetSessionVars().NetworkFactor
totalCost := 0.0
for _, plan := range e.plans {
totalCst += plan.StatsCount()
totalCost += plan.StatsCount()*factor +
plan.Stats().HistColl.GetAvgRowSize(e.ctx, plan.Schema().Columns, false, true)
}
estimater := func(r *kv.KeyRange, totalRanges []kv.KeyRange) float64 {
return totalCost * 1.0 / (float64(len(totalRanges)) + 1.0)
}

kvReq, err := reqBuilder.
Expand All @@ -244,7 +245,7 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra
SetStoreType(e.storeType).
SetAllowBatchCop(e.batchCop).
SetFromInfoSchema(infoschema.GetInfoSchema(e.ctx)).
SetRecommendLocalScan(totalCst >= CostThreshold). // TODO: need to replace with real cost, this is just row count
SetNetworkCostEstimater(estimater).
Build()
if err != nil {
return nil, err
Expand Down
5 changes: 2 additions & 3 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,9 +408,8 @@ type Request struct {
TaskID uint64
// TiDBServerID is the specified TiDB serverID to execute request. `0` means all TiDB instances.
TiDBServerID uint64
// RecommendLocalScan indicates whether the planner recommends a local tikv replica scan, when
// the tidb-server and leader are not in the same AZ/DC.
RecommendLocalScan bool

NetworkCostEstimater func(*KeyRange, []KeyRange) float64
}

// ResultSubset represents a result subset from a single storage unit.
Expand Down
23 changes: 21 additions & 2 deletions store/tikv/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ import (
"go.uber.org/zap"
)

// CostThreshold marks the cost threshold to determine whether given request is
// a large scan request.
const CostThreshold = 64 * 1024 // 64KB

var (
tikvTxnRegionsNumHistogramWithCoprocessor = metrics.TiKVTxnRegionsNumHistogram.WithLabelValues("coprocessor")
tikvTxnRegionsNumHistogramWithBatchCoprocessor = metrics.TiKVTxnRegionsNumHistogram.WithLabelValues("batch_coprocessor")
Expand Down Expand Up @@ -870,16 +874,31 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
}
}

recommendLocalScan := false
if worker.req.NetworkCostEstimater != nil {
networkCost := 0.0
for _, r := range copReq.Ranges {
kvRange := kv.KeyRange{
StartKey: r.Start,
EndKey: r.End,
}
networkCost += worker.req.NetworkCostEstimater(&kvRange, worker.req.KeyRanges)
}
if networkCost > float64(CostThreshold) {
recommendLocalScan = true
}
}

req := tikvrpc.NewReplicaReadRequest(task.cmdType, &copReq, worker.req.ReplicaRead,
&worker.replicaReadSeed, worker.req.RecommendLocalScan,
&worker.replicaReadSeed, recommendLocalScan,
kvrpcpb.Context{
IsolationLevel: pbIsolationLevel(worker.req.IsolationLevel),
Priority: kvPriorityToCommandPri(worker.req.Priority),
NotFillCache: worker.req.NotFillCache,
RecordTimeStat: true,
RecordScanStat: true,
TaskId: worker.req.TaskID,
})
})
req.StoreTp = task.storeType
startTime := time.Now()
if worker.Stats == nil {
Expand Down
5 changes: 3 additions & 2 deletions store/tikv/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,10 +281,11 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll
pending := batch.keys
for {
s.mu.RLock()
// TODO(patrick) set recommend local scan for this request.
req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdBatchGet, &pb.BatchGetRequest{
Keys: pending,
Version: s.version.Ver,
}, s.mu.replicaRead, &s.replicaReadSeed, pb.Context{
}, s.mu.replicaRead, &s.replicaReadSeed, false, pb.Context{
Priority: s.priority,
NotFillCache: s.notFillCache,
TaskId: s.mu.taskID,
Expand Down Expand Up @@ -429,7 +430,7 @@ func (s *tikvSnapshot) get(ctx context.Context, bo *Backoffer, k kv.Key) ([]byte
&pb.GetRequest{
Key: k,
Version: s.version.Ver,
}, s.mu.replicaRead, &s.replicaReadSeed, pb.Context{
}, s.mu.replicaRead, &s.replicaReadSeed, false, pb.Context{
Priority: s.priority,
NotFillCache: s.notFillCache,
TaskId: s.mu.taskID,
Expand Down

0 comments on commit 40d9bf5

Please sign in to comment.