executor: control Chunk size for TableReader&IndexReader&IndexLookup (#…
qw4990 authored and zz-jason committed Apr 1, 2019
1 parent bf9570a commit 435a081
Showing 5 changed files with 297 additions and 17 deletions.
12 changes: 7 additions & 5 deletions executor/builder.go
@@ -1557,7 +1557,7 @@ func (b *executorBuilder) buildIndexLookUpJoin(v *plannercore.PhysicalIndexJoin)
 			filter: outerFilter,
 		},
 		innerCtx: innerCtx{
-			readerBuilder: &dataReaderBuilder{innerPlan, b},
+			readerBuilder: &dataReaderBuilder{Plan: innerPlan, executorBuilder: b},
 			rowTypes: innerTypes,
 		},
 		workerWg: new(sync.WaitGroup),
@@ -1799,6 +1799,8 @@ func (b *executorBuilder) buildIndexLookUpReader(v *plannercore.PhysicalIndexLookUpReader)
 type dataReaderBuilder struct {
 	plannercore.Plan
 	*executorBuilder
+
+	selectResultHook // for testing
 }
 
 func (builder *dataReaderBuilder) buildExecutorForIndexJoin(ctx context.Context, datums [][]types.Datum,
@@ -1818,7 +1820,7 @@ func (builder *dataReaderBuilder) buildExecutorForIndexJoin(ctx context.Context,
 
 func (builder *dataReaderBuilder) buildUnionScanForIndexJoin(ctx context.Context, v *plannercore.PhysicalUnionScan,
 	values [][]types.Datum, indexRanges []*ranger.Range, keyOff2IdxOff []int) (Executor, error) {
-	childBuilder := &dataReaderBuilder{v.Children()[0], builder.executorBuilder}
+	childBuilder := &dataReaderBuilder{Plan: v.Children()[0], executorBuilder: builder.executorBuilder}
 	reader, err := childBuilder.buildExecutorForIndexJoin(ctx, values, indexRanges, keyOff2IdxOff)
 	if err != nil {
 		return nil, err
@@ -1863,7 +1865,7 @@ func (builder *dataReaderBuilder) buildTableReaderFromHandles(ctx context.Context,
 		return nil, errors.Trace(err)
 	}
 	e.resultHandler = &tableResultHandler{}
-	result, err := distsql.SelectWithRuntimeStats(ctx, builder.ctx, kvReq, e.retTypes(), e.feedback, getPhysicalPlanIDs(e.plans))
+	result, err := builder.SelectResult(ctx, builder.ctx, kvReq, e.retTypes(), e.feedback, getPhysicalPlanIDs(e.plans))
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
@@ -1892,11 +1894,11 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context.Context,
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
-	kvRanges, err := buildKvRangesForIndexJoin(e.ctx.GetSessionVars().StmtCtx, getPhysicalTableID(e.table), e.index.ID, values, indexRanges, keyOff2IdxOff)
+	e.kvRanges, err = buildKvRangesForIndexJoin(e.ctx.GetSessionVars().StmtCtx, getPhysicalTableID(e.table), e.index.ID, values, indexRanges, keyOff2IdxOff)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
-	err = e.open(ctx, kvRanges)
+	err = e.open(ctx)
 	return e, errors.Trace(err)
 }
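The selectResultHook embedded above gives dataReaderBuilder (and, in the files below, the reader executors) a SelectResult method that normally delegates to distsql.SelectWithRuntimeStats, but that a test can redirect. Adding the embedded field is also why the positional literal &dataReaderBuilder{innerPlan, b} had to become a keyed literal: a new field would otherwise break it. A minimal, self-contained sketch of this embedded-hook idiom, with illustrative names that are not from the TiDB codebase:

package main

import "fmt"

// fetchHook mirrors the idea behind selectResultHook: production code calls
// Fetch, while tests override fetchFunc to intercept the call.
type fetchHook struct {
	fetchFunc func(key string) (string, error) // nil in production
}

func (h fetchHook) Fetch(key string) (string, error) {
	if h.fetchFunc == nil {
		return realFetch(key) // fall back to the real implementation
	}
	return h.fetchFunc(key) // test override
}

func realFetch(key string) (string, error) { return "real:" + key, nil }

// reader embeds the hook, so every reader gets Fetch for free.
type reader struct {
	fetchHook
}

func main() {
	r := reader{}
	v, _ := r.Fetch("a")
	fmt.Println(v) // real:a

	r.fetchFunc = func(key string) (string, error) { return "mock:" + key, nil }
	v, _ = r.Fetch("a")
	fmt.Println(v) // mock:a
}

Because the hook's zero value selects the production path, no construction site has to set it, which keeps the change to the builders small.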
39 changes: 28 additions & 11 deletions executor/distsql.go
@@ -236,6 +236,8 @@ type IndexReaderExecutor struct {
 	idxCols []*expression.Column
 	colLens []int
 	plans []plannercore.PhysicalPlan
+
+	selectResultHook // for testing
 }
 
 // Close clears all resources held by the current object.
@@ -310,7 +312,7 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) error {
 		e.feedback.Invalidate()
 		return errors.Trace(err)
 	}
-	e.result, err = distsql.SelectWithRuntimeStats(ctx, e.ctx, kvReq, e.retTypes(), e.feedback, getPhysicalPlanIDs(e.plans))
+	e.result, err = e.SelectResult(ctx, e.ctx, kvReq, e.retTypes(), e.feedback, getPhysicalPlanIDs(e.plans))
 	if err != nil {
 		e.feedback.Invalidate()
 		return errors.Trace(err)
@@ -343,6 +345,9 @@ type IndexLookUpExecutor struct {
 	tblWorkerWg sync.WaitGroup
 	finished chan struct{}
 
+	kvRanges []kv.KeyRange
+	workerStarted bool
+
 	resultCh chan *lookupTableTask
 	resultCurr *lookupTableTask
 	feedback *statistics.QueryFeedback
@@ -371,19 +376,19 @@ func (e *IndexLookUpExecutor) Open(ctx context.Context) error {
 			return errors.Trace(err)
 		}
 	}
-	kvRanges, err := distsql.IndexRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, getPhysicalTableID(e.table), e.index.ID, e.ranges, e.feedback)
+	e.kvRanges, err = distsql.IndexRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, getPhysicalTableID(e.table), e.index.ID, e.ranges, e.feedback)
 	if err != nil {
 		e.feedback.Invalidate()
 		return errors.Trace(err)
 	}
-	err = e.open(ctx, kvRanges)
+	err = e.open(ctx)
 	if err != nil {
 		e.feedback.Invalidate()
 	}
 	return errors.Trace(err)
 }
 
-func (e *IndexLookUpExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) error {
+func (e *IndexLookUpExecutor) open(ctx context.Context) error {
 	// We have to initialize "memTracker" and other execution resources in here
 	// instead of in function "Open", because this "IndexLookUpExecutor" may be
 	// constructed by a "IndexLookUpJoin" and "Open" will not be called in that
@@ -408,20 +413,23 @@ func (e *IndexLookUpExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
 			return errors.Trace(err)
 		}
 	}
+	return nil
+}
+
+func (e *IndexLookUpExecutor) startWorkers(ctx context.Context, initBatchSize int) error {
 	// indexWorker will write to workCh and tableWorker will read from workCh,
 	// so fetching index and getting table data can run concurrently.
 	workCh := make(chan *lookupTableTask, 1)
-	err = e.startIndexWorker(ctx, kvRanges, workCh)
-	if err != nil {
+	if err := e.startIndexWorker(ctx, e.kvRanges, workCh, initBatchSize); err != nil {
 		return errors.Trace(err)
 	}
 	e.startTableWorker(ctx, workCh)
+	e.workerStarted = true
 	return nil
 }
 
 // startIndexWorker launches a background goroutine to fetch handles and sends the results to workCh.
-func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []kv.KeyRange, workCh chan<- *lookupTableTask) error {
+func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []kv.KeyRange, workCh chan<- *lookupTableTask, initBatchSize int) error {
@@ -445,11 +453,12 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []kv.KeyRange,
 	}
 	result.Fetch(ctx)
 	worker := &indexWorker{
+		idxLookup: e,
 		workCh: workCh,
 		finished: e.finished,
 		resultCh: e.resultCh,
 		keepOrder: e.keepOrder,
-		batchSize: e.maxChunkSize,
+		batchSize: initBatchSize,
 		maxBatchSize: e.ctx.GetSessionVars().IndexLookupSize,
 		maxChunkSize: e.maxChunkSize,
 	}
@@ -525,7 +534,7 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, handles []int64)
 
 // Close implements Exec Close interface.
 func (e *IndexLookUpExecutor) Close() error {
-	if e.finished == nil {
+	if !e.workerStarted || e.finished == nil {
 		return nil
 	}
 
@@ -537,6 +546,7 @@
 	e.idxWorkerWg.Wait()
 	e.tblWorkerWg.Wait()
 	e.finished = nil
+	e.workerStarted = false
 	e.memTracker.Detach()
 	e.memTracker = nil
 	if e.runtimeStats != nil {
@@ -552,6 +562,11 @@ func (e *IndexLookUpExecutor) Next(ctx context.Context, req *chunk.RecordBatch)
 		start := time.Now()
 		defer func() { e.runtimeStats.Record(time.Since(start), req.NumRows()) }()
 	}
+	if !e.workerStarted {
+		if err := e.startWorkers(ctx, req.RequiredRows()); err != nil {
+			return errors.Trace(err)
+		}
+	}
 	req.Reset()
 	for {
 		resultTask, err := e.getResultTask()
@@ -564,7 +579,7 @@ func (e *IndexLookUpExecutor) Next(ctx context.Context, req *chunk.RecordBatch)
 		for resultTask.cursor < len(resultTask.rows) {
 			req.AppendRow(resultTask.rows[resultTask.cursor])
 			resultTask.cursor++
-			if req.NumRows() >= e.maxChunkSize {
+			if req.IsFull() {
 				return nil
 			}
 		}
@@ -593,6 +608,7 @@ func (e *IndexLookUpExecutor) getResultTask() (*lookupTableTask, error) {
 
 // indexWorker is used by IndexLookUpExecutor to maintain index lookup background goroutines.
 type indexWorker struct {
+	idxLookup *IndexLookUpExecutor
 	workCh chan<- *lookupTableTask
 	finished <-chan struct{}
 	resultCh chan<- *lookupTableTask
@@ -625,7 +641,7 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectResult)
 			}
 		}
 	}()
-	chk := chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, w.maxChunkSize)
+	chk := chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, w.idxLookup.maxChunkSize)
 	for {
 		handles, err := w.extractTaskHandles(ctx, chk, result)
 		if err != nil {
@@ -655,6 +671,7 @@ func (w *indexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, idxResult distsql.SelectResult) (handles []int64, err error) {
 	handles = make([]int64, 0, w.batchSize)
 	for len(handles) < w.batchSize {
+		chk.SetRequiredRows(w.batchSize-len(handles), w.maxChunkSize)
 		err = errors.Trace(idxResult.Next(ctx, chk))
 		if err != nil {
 			return handles, err
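Taken together, these changes make IndexLookUpExecutor demand-driven: Open now only computes e.kvRanges, the worker goroutines start on the first call to Next with req.RequiredRows() as the initial batch size, and extractTaskHandles caps every index fetch with chk.SetRequiredRows so the worker never asks the storage layer for more handles than the current batch needs. A toy sketch of the lazy-start control flow; the type and helpers below are simplified stand-ins, not the real executor:

package main

import "fmt"

// lookupExec is a toy stand-in for IndexLookUpExecutor: workers start on the
// first Next call, sized by how many rows the caller actually requires.
type lookupExec struct {
	workerStarted bool
	maxBatchSize  int // upper bound for later batches (cf. IndexLookupSize)
}

func (e *lookupExec) startWorkers(initBatchSize int) {
	// The real executor launches the index worker and table workers here;
	// initBatchSize bounds only the first round of handle fetching.
	fmt.Printf("workers started, initial batch size %d\n", initBatchSize)
	e.workerStarted = true
}

// Next mirrors the shape of the new IndexLookUpExecutor.Next.
func (e *lookupExec) Next(requiredRows int) {
	if !e.workerStarted {
		e.startWorkers(requiredRows)
	}
	// ... drain result tasks until the chunk is full ...
}

func main() {
	e := &lookupExec{maxBatchSize: 20000}
	e.Next(1)    // a LIMIT 1 query starts the workers with a batch of 1
	e.Next(1024) // later calls reuse the running workers
}

The first batch is therefore proportional to what the parent executor actually asked for, instead of a fixed e.maxChunkSize, while maxBatchSize still bounds how large later batches may grow.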
1 change: 1 addition & 0 deletions executor/executor_required_rows_test.go
@@ -206,6 +206,7 @@ func defaultCtx() sessionctx.Context {
 	ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize
 	ctx.GetSessionVars().MemQuotaSort = variable.DefTiDBMemQuotaSort
 	ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker("", ctx.GetSessionVars().MemQuotaQuery)
+	ctx.GetSessionVars().SnapshotTS = uint64(1)
 	return ctx
 }

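The one-line change above pins SnapshotTS in the shared test context, presumably so the executors built in these tests can construct kv requests without opening a real transaction. For reference, a small sketch of the chunk required-rows semantics that the required-rows tests exercise; it assumes the chunk API of this TiDB version:

package main

import (
	"fmt"

	"github.com/pingcap/parser/mysql"
	"github.com/pingcap/tidb/types"
	"github.com/pingcap/tidb/util/chunk"
)

func main() {
	fieldTypes := []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}
	// chunk.New(fieldTypes, initCap, maxChunkSize)
	chk := chunk.New(fieldTypes, 32, 1024)

	// Ask the producer to stop after 3 rows (capped by maxChunkSize).
	chk.SetRequiredRows(3, 1024)
	for i := int64(0); !chk.IsFull(); i++ {
		chk.AppendInt64(0, i)
	}
	fmt.Println(chk.NumRows()) // 3: IsFull tracks RequiredRows, not capacity
}

This is the same mechanism the diff uses twice: extractTaskHandles calls SetRequiredRows before each idxResult.Next, and IndexLookUpExecutor.Next replaces the req.NumRows() >= e.maxChunkSize check with req.IsFull().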
21 changes: 20 additions & 1 deletion executor/table_reader.go
@@ -21,9 +21,12 @@ import (
 	"github.com/pingcap/errors"
 	"github.com/pingcap/parser/model"
 	"github.com/pingcap/tidb/distsql"
+	"github.com/pingcap/tidb/kv"
 	plannercore "github.com/pingcap/tidb/planner/core"
+	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/statistics"
 	"github.com/pingcap/tidb/table"
+	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/util/chunk"
 	"github.com/pingcap/tidb/util/ranger"
 	"github.com/pingcap/tipb/go-tipb"
@@ -32,6 +35,20 @@
 // make sure `TableReaderExecutor` implements `Executor`.
 var _ Executor = &TableReaderExecutor{}
 
+// selectResultHook is used to hack distsql.SelectWithRuntimeStats safely for testing.
+type selectResultHook struct {
+	selectResultFunc func(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request,
+		fieldTypes []*types.FieldType, fb *statistics.QueryFeedback, copPlanIDs []string) (distsql.SelectResult, error)
+}
+
+func (sr selectResultHook) SelectResult(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request,
+	fieldTypes []*types.FieldType, fb *statistics.QueryFeedback, copPlanIDs []string) (distsql.SelectResult, error) {
+	if sr.selectResultFunc == nil {
+		return distsql.SelectWithRuntimeStats(ctx, sctx, kvReq, fieldTypes, fb, copPlanIDs)
+	}
+	return sr.selectResultFunc(ctx, sctx, kvReq, fieldTypes, fb, copPlanIDs)
+}
+
 // TableReaderExecutor sends DAG request and reads table data from kv layer.
 type TableReaderExecutor struct {
 	baseExecutor
@@ -55,6 +72,8 @@ type TableReaderExecutor struct {
 	// corColInAccess tells whether there's a correlated column in access conditions.
 	corColInAccess bool
 	plans []plannercore.PhysicalPlan
+
+	selectResultHook // for testing
 }
 
 // Open initializes necessary variables for using this executor.
@@ -148,7 +167,7 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Range)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
-	result, err := distsql.SelectWithRuntimeStats(ctx, e.ctx, kvReq, e.retTypes(), e.feedback, getPhysicalPlanIDs(e.plans))
+	result, err := e.SelectResult(ctx, e.ctx, kvReq, e.retTypes(), e.feedback, getPhysicalPlanIDs(e.plans))
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
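With selectResultHook defined here and embedded in TableReaderExecutor, IndexReaderExecutor, and dataReaderBuilder, a test inside package executor can intercept the distsql layer at the SelectResult seam. A hypothetical helper showing the override; injectMockSelectResult and its mock argument are assumptions for illustration, not part of this diff:

package executor

import (
	"context"

	"github.com/pingcap/tidb/distsql"
	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/sessionctx"
	"github.com/pingcap/tidb/statistics"
	"github.com/pingcap/tidb/types"
)

// injectMockSelectResult makes the reader return mock instead of issuing a
// real distsql request; useful for asserting on required-rows behavior.
func injectMockSelectResult(e *TableReaderExecutor, mock distsql.SelectResult) {
	e.selectResultFunc = func(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request,
		fieldTypes []*types.FieldType, fb *statistics.QueryFeedback, copPlanIDs []string) (distsql.SelectResult, error) {
		// A test could also capture kvReq here to assert on its ranges or limits.
		return mock, nil
	}
}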
(The diff for the fifth changed file did not load and is not shown.)
