Skip to content

Commit

Permalink
store/mockstore/unistore: fix several issues of coprocessor paging in…
Browse files Browse the repository at this point in the history
… unistore
  • Loading branch information
tiancaiamao committed Jul 12, 2022
1 parent 613c5dc commit 4a3a584
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 11 deletions.
3 changes: 2 additions & 1 deletion store/mockstore/unistore/cophandler/cop_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ func buildAndRunMPPExecutor(dagCtx *dagContext, dagReq *tipb.DAGRequest, pagingS
if pagingSize > 0 {
lastRange = &coprocessor.KeyRange{}
builder.paging = lastRange
builder.pagingSize = pagingSize
}
exec, err := builder.buildMPPExecutor(rootExec)
if err != nil {
Expand Down Expand Up @@ -221,7 +222,7 @@ func mppExecute(exec mppExec, dagCtx *dagContext, dagReq *tipb.DAGRequest, pagin
if pagingSize > 0 {
totalRows += uint64(chk.NumRows())
if totalRows > pagingSize {
break
return
}
}
default:
Expand Down
25 changes: 16 additions & 9 deletions store/mockstore/unistore/cophandler/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,15 @@ const (
)

type mppExecBuilder struct {
sc *stmtctx.StatementContext
dbReader *dbreader.DBReader
mppCtx *MPPCtx
dagReq *tipb.DAGRequest
dagCtx *dagContext
counts []int64
ndvs []int64
paging *coprocessor.KeyRange
sc *stmtctx.StatementContext
dbReader *dbreader.DBReader
mppCtx *MPPCtx
dagReq *tipb.DAGRequest
dagCtx *dagContext
counts []int64
ndvs []int64
paging *coprocessor.KeyRange
pagingSize uint64
}

func (b *mppExecBuilder) buildMPPTableScan(pb *tipb.TableScan) (*tableScanExec, error) {
Expand Down Expand Up @@ -199,7 +200,7 @@ func (b *mppExecBuilder) buildLimit(pb *tipb.Limit) (*limitExec, error) {
return exec, nil
}

func (b *mppExecBuilder) buildTopN(pb *tipb.TopN) (*topNExec, error) {
func (b *mppExecBuilder) buildTopN(pb *tipb.TopN) (mppExec, error) {
child, err := b.buildMPPExecutor(pb.Child)
if err != nil {
return nil, err
Expand Down Expand Up @@ -227,6 +228,12 @@ func (b *mppExecBuilder) buildTopN(pb *tipb.TopN) (*topNExec, error) {
row: newTopNSortRow(len(conds)),
topn: pb.Limit,
}

// When using paging protocol, if paging size < topN limit, the topN exec degenerate to do nothing.
if b.paging != nil && b.pagingSize < pb.Limit {
exec.dummy = true
}

return exec, nil
}

Expand Down
15 changes: 14 additions & 1 deletion store/mockstore/unistore/cophandler/mpp_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,8 @@ func (e *indexScanExec) Process(key, value []byte) error {
if e.chk.IsFull() {
e.chunks = append(e.chunks, e.chk)
if e.paging != nil {
e.chunkLastProcessedKeys = append(e.chunkLastProcessedKeys, key)
lastProcessed := kv.Key(append([]byte{}, key...)) // need a deep copy to store the key
e.chunkLastProcessedKeys = append(e.chunkLastProcessedKeys, lastProcessed)
}
e.chk = chunk.NewChunkWithCapacity(e.fieldTypes, DefaultBatchSize)
}
Expand Down Expand Up @@ -423,6 +424,9 @@ type topNExec struct {
conds []expression.Expression
row *sortRow
recv []*chunk.Chunk

// When dummy is true, topNExec just copy what it read from children to its parent.
dummy bool
}

func (e *topNExec) open() error {
Expand All @@ -432,6 +436,11 @@ func (e *topNExec) open() error {
if err != nil {
return err
}

if e.dummy {
return nil
}

for {
chk, err = e.children[0].next()
if err != nil {
Expand Down Expand Up @@ -466,6 +475,10 @@ func (e *topNExec) open() error {
}

func (e *topNExec) next() (*chunk.Chunk, error) {
if e.dummy {
return e.children[0].next()
}

chk := chunk.NewChunkWithCapacity(e.getFieldTypes(), DefaultBatchSize)
for ; !chk.IsFull() && e.idx < e.topn && e.idx < uint64(e.heap.heapSize); e.idx++ {
row := e.heap.rows[e.idx]
Expand Down

0 comments on commit 4a3a584

Please sign in to comment.