diff --git a/store/mockstore/unistore/cophandler/cop_handler.go b/store/mockstore/unistore/cophandler/cop_handler.go index 75fa686ff8fca..5f375f2bfdc30 100644 --- a/store/mockstore/unistore/cophandler/cop_handler.go +++ b/store/mockstore/unistore/cophandler/cop_handler.go @@ -179,6 +179,7 @@ func buildAndRunMPPExecutor(dagCtx *dagContext, dagReq *tipb.DAGRequest, pagingS if pagingSize > 0 { lastRange = &coprocessor.KeyRange{} builder.paging = lastRange + builder.pagingSize = pagingSize } exec, err := builder.buildMPPExecutor(rootExec) if err != nil { @@ -221,7 +222,7 @@ func mppExecute(exec mppExec, dagCtx *dagContext, dagReq *tipb.DAGRequest, pagin if pagingSize > 0 { totalRows += uint64(chk.NumRows()) if totalRows > pagingSize { - break + return } } default: diff --git a/store/mockstore/unistore/cophandler/mpp.go b/store/mockstore/unistore/cophandler/mpp.go index fb9cfeaf1aff1..1cf0746e861dd 100644 --- a/store/mockstore/unistore/cophandler/mpp.go +++ b/store/mockstore/unistore/cophandler/mpp.go @@ -51,14 +51,15 @@ const ( ) type mppExecBuilder struct { - sc *stmtctx.StatementContext - dbReader *dbreader.DBReader - mppCtx *MPPCtx - dagReq *tipb.DAGRequest - dagCtx *dagContext - counts []int64 - ndvs []int64 - paging *coprocessor.KeyRange + sc *stmtctx.StatementContext + dbReader *dbreader.DBReader + mppCtx *MPPCtx + dagReq *tipb.DAGRequest + dagCtx *dagContext + counts []int64 + ndvs []int64 + paging *coprocessor.KeyRange + pagingSize uint64 } func (b *mppExecBuilder) buildMPPTableScan(pb *tipb.TableScan) (*tableScanExec, error) { @@ -199,7 +200,7 @@ func (b *mppExecBuilder) buildLimit(pb *tipb.Limit) (*limitExec, error) { return exec, nil } -func (b *mppExecBuilder) buildTopN(pb *tipb.TopN) (*topNExec, error) { +func (b *mppExecBuilder) buildTopN(pb *tipb.TopN) (mppExec, error) { child, err := b.buildMPPExecutor(pb.Child) if err != nil { return nil, err @@ -227,6 +228,12 @@ func (b *mppExecBuilder) buildTopN(pb *tipb.TopN) (*topNExec, error) { row: newTopNSortRow(len(conds)), topn: pb.Limit, } + + // When using paging protocol, if paging size < topN limit, the topN exec degenerate to do nothing. + if b.paging != nil && b.pagingSize < pb.Limit { + exec.dummy = true + } + return exec, nil } diff --git a/store/mockstore/unistore/cophandler/mpp_exec.go b/store/mockstore/unistore/cophandler/mpp_exec.go index 8e079991c7daa..3e164da24d458 100644 --- a/store/mockstore/unistore/cophandler/mpp_exec.go +++ b/store/mockstore/unistore/cophandler/mpp_exec.go @@ -319,7 +319,8 @@ func (e *indexScanExec) Process(key, value []byte) error { if e.chk.IsFull() { e.chunks = append(e.chunks, e.chk) if e.paging != nil { - e.chunkLastProcessedKeys = append(e.chunkLastProcessedKeys, key) + lastProcessed := kv.Key(append([]byte{}, key...)) // need a deep copy to store the key + e.chunkLastProcessedKeys = append(e.chunkLastProcessedKeys, lastProcessed) } e.chk = chunk.NewChunkWithCapacity(e.fieldTypes, DefaultBatchSize) } @@ -423,6 +424,9 @@ type topNExec struct { conds []expression.Expression row *sortRow recv []*chunk.Chunk + + // When dummy is true, topNExec just copy what it read from children to its parent. + dummy bool } func (e *topNExec) open() error { @@ -432,6 +436,11 @@ func (e *topNExec) open() error { if err != nil { return err } + + if e.dummy { + return nil + } + for { chk, err = e.children[0].next() if err != nil { @@ -466,6 +475,10 @@ func (e *topNExec) open() error { } func (e *topNExec) next() (*chunk.Chunk, error) { + if e.dummy { + return e.children[0].next() + } + chk := chunk.NewChunkWithCapacity(e.getFieldTypes(), DefaultBatchSize) for ; !chk.IsFull() && e.idx < e.topn && e.idx < uint64(e.heap.heapSize); e.idx++ { row := e.heap.rows[e.idx]