Skip to content

Commit

Permalink
set batchSize to the first RequiredRows
Browse files Browse the repository at this point in the history
  • Loading branch information
qw4990 committed Mar 28, 2019
1 parent 6a16eca commit 4179218
Showing 1 changed file with 19 additions and 10 deletions.
29 changes: 19 additions & 10 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"time"
"unsafe"

"github.com/cznic/mathutil"
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
Expand Down Expand Up @@ -346,8 +345,8 @@ type IndexLookUpExecutor struct {
tblWorkerWg sync.WaitGroup
finished chan struct{}

// parentReqRows indicates how many rows the parent want now.
parentReqRows int64
requiredRowsCh chan int
batchSizeIsInit bool

resultCh chan *lookupTableTask
resultCurr *lookupTableTask
Expand Down Expand Up @@ -399,7 +398,7 @@ func (e *IndexLookUpExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)

e.finished = make(chan struct{})
e.resultCh = make(chan *lookupTableTask, atomic.LoadInt32(&LookupTableTaskChannelSize))
e.parentReqRows = int64(e.maxChunkSize)
e.requiredRowsCh = make(chan int, 1)

var err error
if e.corColInIdxSide {
Expand Down Expand Up @@ -466,6 +465,14 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k
}
e.idxWorkerWg.Add(1)
go func() {
// init worker's batchSize to the first RequiredRows to
// reduce the memory usage and network-waiting time
requiredRow, ok := <-e.requiredRowsCh
if !ok {
return
}
worker.batchSize = int(requiredRow)

ctx1, cancel := context.WithCancel(ctx)
count, err := worker.fetchHandles(ctx1, result)
if err != nil {
Expand Down Expand Up @@ -538,6 +545,7 @@ func (e *IndexLookUpExecutor) Close() error {
}

close(e.finished)
close(e.requiredRowsCh)
// Drain the resultCh and discard the result, in case that Next() doesn't fully
// consume the data, background worker still writing to resultCh and block forever.
for range e.resultCh {
Expand All @@ -560,7 +568,10 @@ func (e *IndexLookUpExecutor) Next(ctx context.Context, req *chunk.RecordBatch)
start := time.Now()
defer func() { e.runtimeStats.Record(time.Since(start), req.NumRows()) }()
}
atomic.StoreInt64(&e.parentReqRows, int64(req.RequiredRows()))
if !e.batchSizeIsInit {
e.requiredRowsCh <- req.RequiredRows()
e.batchSizeIsInit = true
}
req.Reset()
for {
resultTask, err := e.getResultTask()
Expand Down Expand Up @@ -663,11 +674,9 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes
}

func (w *indexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, idxResult distsql.SelectResult) (handles []int64, err error) {
requiredNow := int(atomic.LoadInt64(&w.idxLookup.parentReqRows))
requiredNow = mathutil.Min(requiredNow, w.batchSize)
handles = make([]int64, 0, requiredNow)
for len(handles) < requiredNow {
chk.SetRequiredRows(requiredNow-len(handles), w.maxChunkSize)
handles = make([]int64, 0, w.batchSize)
for len(handles) < w.batchSize {
chk.SetRequiredRows(w.batchSize-len(handles), w.maxChunkSize)
err = errors.Trace(idxResult.Next(ctx, chk))
if err != nil {
return handles, err
Expand Down

0 comments on commit 4179218

Please sign in to comment.