From 5eb71514d241e3edeb6a492caa96f085e186be30 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 31 May 2022 14:51:18 +0800 Subject: [PATCH] distsql,executor: using paging protocol for all coprocessor requests --- distsql/request_builder.go | 3 +++ executor/builder.go | 6 ++++++ executor/distsql.go | 1 + executor/index_merge_reader.go | 2 ++ executor/table_reader.go | 6 +++++- 5 files changed, 17 insertions(+), 1 deletion(-) diff --git a/distsql/request_builder.go b/distsql/request_builder.go index c8d7f0ebb3c0c..d7a7f2a267492 100644 --- a/distsql/request_builder.go +++ b/distsql/request_builder.go @@ -264,6 +264,9 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req builder.Request.Priority = builder.getKVPriority(sv) builder.Request.ReplicaRead = replicaReadType builder.SetResourceGroupTagger(sv.StmtCtx.GetResourceGroupTagger()) + if sv.EnablePaging { + builder.SetPaging(true) + } return builder } diff --git a/executor/builder.go b/executor/builder.go index 1300aeb643f7f..f4b29aa817936 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -3216,6 +3216,7 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea if err != nil { return nil, err } + paging := b.ctx.GetSessionVars().EnablePaging e := &TableReaderExecutor{ baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()), dagPB: dagReq, @@ -3227,6 +3228,7 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea desc: ts.Desc, columns: ts.Columns, streaming: streaming, + paging: paging, corColInFilter: b.corColInDistPlan(v.TablePlans), corColInAccess: b.corColInAccess(v.TablePlans[0]), plans: v.TablePlans, @@ -3489,6 +3491,7 @@ func buildNoRangeIndexReader(b *executorBuilder, v *plannercore.PhysicalIndexRea if err != nil { return nil, err } + paging := b.ctx.GetSessionVars().EnablePaging e := &IndexReaderExecutor{ baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()), dagPB: dagReq, @@ -3502,6 +3505,7 @@ func buildNoRangeIndexReader(b *executorBuilder, v *plannercore.PhysicalIndexRea desc: is.Desc, columns: is.Columns, streaming: streaming, + paging: paging, corColInFilter: b.corColInDistPlan(v.IndexPlans), corColInAccess: b.corColInAccess(v.IndexPlans[0]), idxCols: is.IdxCols, @@ -3815,6 +3819,7 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd return nil, err } + paging := b.ctx.GetSessionVars().EnablePaging e := &IndexMergeReaderExecutor{ baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()), dagPBs: partialReqs, @@ -3830,6 +3835,7 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd tblPlans: v.TablePlans, dataReaderBuilder: readerBuilder, feedbacks: feedbacks, + paging: paging, handleCols: ts.HandleCols, isCorColInPartialFilters: isCorColInPartialFilters, isCorColInTableFilter: isCorColInTableFilter, diff --git a/executor/distsql.go b/executor/distsql.go index 2ec1dca348d0f..bf7484a094656 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -182,6 +182,7 @@ type IndexReaderExecutor struct { feedback *statistics.QueryFeedback streaming bool + paging bool keepOrder bool desc bool diff --git a/executor/index_merge_reader.go b/executor/index_merge_reader.go index 660eaa71499b3..4ce7f4f24a888 100644 --- a/executor/index_merge_reader.go +++ b/executor/index_merge_reader.go @@ -101,6 +101,7 @@ type IndexMergeReaderExecutor struct { // memTracker is used to track the memory usage of this executor. memTracker *memory.Tracker + paging bool // checkIndexValue is used to check the consistency of the index data. *checkIndexValue // nolint:unused @@ -310,6 +311,7 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, SetIsStaleness(e.isStaleness). SetFromSessionVars(e.ctx.GetSessionVars()). SetMemTracker(e.memTracker). + SetPaging(e.paging). SetFromInfoSchema(e.ctx.GetInfoSchema()) for parTblIdx, keyRange := range keyRanges { diff --git a/executor/table_reader.go b/executor/table_reader.go index a9decdd55b1a7..2f08a989d5816 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -97,6 +97,7 @@ type TableReaderExecutor struct { keepOrder bool desc bool streaming bool + paging bool storeType kv.StoreType // corColInFilter tells whether there's correlated column in filter. corColInFilter bool @@ -338,6 +339,7 @@ func (e *TableReaderExecutor) buildKVReqSeparately(ctx context.Context, ranges [ SetFromInfoSchema(e.ctx.GetInfoSchema()). SetMemTracker(e.memTracker). SetStoreType(e.storeType). + SetPaging(e.paging). SetAllowBatchCop(e.batchCop).Build() if err != nil { return nil, err @@ -376,6 +378,7 @@ func (e *TableReaderExecutor) buildKVReqForPartitionTableScan(ctx context.Contex SetFromInfoSchema(e.ctx.GetInfoSchema()). SetMemTracker(e.memTracker). SetStoreType(e.storeType). + SetPaging(e.paging). SetAllowBatchCop(e.batchCop).Build() if err != nil { return nil, err @@ -407,7 +410,8 @@ func (e *TableReaderExecutor) buildKVReq(ctx context.Context, ranges []*ranger.R SetFromInfoSchema(e.ctx.GetInfoSchema()). SetMemTracker(e.memTracker). SetStoreType(e.storeType). - SetAllowBatchCop(e.batchCop) + SetAllowBatchCop(e.batchCop). + SetPaging(e.paging) return reqBuilder.Build() }