Skip to content

Commit

Permalink
planner, executor: push limit down into IndexLookUpReader exec… (#12378)
Browse files Browse the repository at this point in the history
  • Loading branch information
eurekaka authored and zz-jason committed Sep 25, 2019
1 parent 7c48202 commit 3716ac5
Show file tree
Hide file tree
Showing 9 changed files with 127 additions and 41 deletions.
13 changes: 6 additions & 7 deletions cmd/explaintest/r/explain_easy.result
Original file line number Diff line number Diff line change
Expand Up @@ -122,15 +122,14 @@ MemTableScan_4 10000.00 root
explain select c2 = (select c2 from t2 where t1.c1 = t2.c1 order by c1 limit 1) from t1;
id count task operator info
Projection_12 10000.00 root eq(test.t1.c2, test.t2.c2)
└─Apply_14 10000.00 root CARTESIAN left outer join, inner:Limit_21
└─Apply_14 10000.00 root CARTESIAN left outer join, inner:Projection_41
├─IndexReader_16 10000.00 root index:IndexScan_15
│ └─IndexScan_15 10000.00 cop table:t1, index:c2, range:[NULL,+inf], keep order:false, stats:pseudo
└─Limit_21 1.00 root offset:0, count:1
└─Projection_41 1.00 root test.t2.c1, test.t2.c2
└─IndexLookUp_40 1.00 root
├─Limit_39 1.00 cop offset:0, count:1
│ └─IndexScan_37 1.00 cop table:t2, index:c1, range: decided by [eq(test.t1.c1, test.t2.c1)], keep order:true, stats:pseudo
└─TableScan_38 1.00 cop table:t2, keep order:false, stats:pseudo
└─Projection_41 1.00 root test.t2.c1, test.t2.c2
└─IndexLookUp_40 1.00 root limit embedded(offset:0, count:1)
├─Limit_39 1.00 cop offset:0, count:1
│ └─IndexScan_37 1.00 cop table:t2, index:c1, range: decided by [eq(test.t1.c1, test.t2.c1)], keep order:true, stats:pseudo
└─TableScan_38 1.00 cop table:t2, keep order:false, stats:pseudo
explain select * from t1 order by c1 desc limit 1;
id count task operator info
Limit_10 1.00 root offset:0, count:1
Expand Down
31 changes: 14 additions & 17 deletions cmd/explaintest/r/explain_easy_stats.result
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,14 @@ MemTableScan_4 10000.00 root
explain select c2 = (select c2 from t2 where t1.c1 = t2.c1 order by c1 limit 1) from t1;
id count task operator info
Projection_12 1999.00 root eq(test.t1.c2, test.t2.c2)
└─Apply_14 1999.00 root CARTESIAN left outer join, inner:Limit_21
└─Apply_14 1999.00 root CARTESIAN left outer join, inner:Projection_41
├─IndexReader_16 1999.00 root index:IndexScan_15
│ └─IndexScan_15 1999.00 cop table:t1, index:c2, range:[NULL,+inf], keep order:false
└─Limit_21 1.00 root offset:0, count:1
└─Projection_41 1.00 root test.t2.c1, test.t2.c2
└─IndexLookUp_40 1.00 root
├─Limit_39 1.00 cop offset:0, count:1
│ └─IndexScan_37 1.25 cop table:t2, index:c1, range: decided by [eq(test.t1.c1, test.t2.c1)], keep order:true
└─TableScan_38 1.00 cop table:t2, keep order:false, stats:pseudo
└─Projection_41 1.00 root test.t2.c1, test.t2.c2
└─IndexLookUp_40 1.00 root limit embedded(offset:0, count:1)
├─Limit_39 1.00 cop offset:0, count:1
│ └─IndexScan_37 1.25 cop table:t2, index:c1, range: decided by [eq(test.t1.c1, test.t2.c1)], keep order:true
└─TableScan_38 1.00 cop table:t2, keep order:false, stats:pseudo
explain select * from t1 order by c1 desc limit 1;
id count task operator info
Limit_10 1.00 root offset:0, count:1
Expand Down Expand Up @@ -169,18 +168,16 @@ id count task operator info
TableDual_5 0.00 root rows:0
explain select * from index_prune WHERE a = 1010010404050976781 AND b = 26467085526790 LIMIT 1, 1;
id count task operator info
Limit_9 1.00 root offset:1, count:1
└─IndexLookUp_14 1.00 root
├─Limit_13 1.00 cop offset:0, count:2
│ └─IndexScan_11 1.00 cop table:index_prune, index:a, b, range:[1010010404050976781 26467085526790,1010010404050976781 26467085526790], keep order:false
└─TableScan_12 1.00 cop table:index_prune, keep order:false, stats:pseudo
IndexLookUp_14 1.00 root limit embedded(offset:1, count:1)
├─Limit_13 1.00 cop offset:0, count:2
│ └─IndexScan_11 1.00 cop table:index_prune, index:a, b, range:[1010010404050976781 26467085526790,1010010404050976781 26467085526790], keep order:false
└─TableScan_12 1.00 cop table:index_prune, keep order:false, stats:pseudo
explain select * from index_prune WHERE a = 1010010404050976781 AND b = 26467085526790 LIMIT 1, 0;
id count task operator info
Limit_9 0.00 root offset:1, count:0
└─IndexLookUp_14 0.00 root
├─Limit_13 0.00 cop offset:0, count:1
│ └─IndexScan_11 1.00 cop table:index_prune, index:a, b, range:[1010010404050976781 26467085526790,1010010404050976781 26467085526790], keep order:false
└─TableScan_12 0.00 cop table:index_prune, keep order:false, stats:pseudo
IndexLookUp_14 0.00 root limit embedded(offset:1, count:0)
├─Limit_13 0.00 cop offset:0, count:1
│ └─IndexScan_11 1.00 cop table:index_prune, index:a, b, range:[1010010404050976781 26467085526790,1010010404050976781 26467085526790], keep order:false
└─TableScan_12 0.00 cop table:index_prune, keep order:false, stats:pseudo
explain select * from index_prune WHERE a = 1010010404050976781 AND b = 26467085526790 LIMIT 0, 1;
id count task operator info
Point_Get_1 1.00 root table:index_prune, index:a b
Expand Down
1 change: 1 addition & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1998,6 +1998,7 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn
colLens: is.IdxColLens,
idxPlans: v.IndexPlans,
tblPlans: v.TablePlans,
PushedLimit: v.PushedLimit,
}

if containsLimit(indexReq.Executors) {
Expand Down
50 changes: 39 additions & 11 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,8 @@ type IndexLookUpExecutor struct {
corColInAccess bool
idxCols []*expression.Column
colLens []int
// PushedLimit is used to skip the preceding and tailing handles when Limit is sunk into IndexLookUpReader.
PushedLimit *plannercore.PushedDownLimit
}

type checkIndexValue struct {
Expand Down Expand Up @@ -503,6 +505,7 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k
checkIndexValue: e.checkIndexValue,
maxBatchSize: e.ctx.GetSessionVars().IndexLookupSize,
maxChunkSize: e.maxChunkSize,
PushedLimit: e.PushedLimit,
}
if worker.batchSize > worker.maxBatchSize {
worker.batchSize = worker.maxBatchSize
Expand All @@ -520,9 +523,9 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k
}
if e.runtimeStats != nil {
copStats := e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(e.idxPlans[len(e.idxPlans)-1].ExplainID().String())
copStats.SetRowNum(count)
copStats.SetRowNum(int64(count))
copStats = e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(e.tblPlans[0].ExplainID().String())
copStats.SetRowNum(count)
copStats.SetRowNum(int64(count))
}
e.ctx.StoreQueryFeedback(e.feedback)
close(workCh)
Expand Down Expand Up @@ -664,12 +667,14 @@ type indexWorker struct {

// checkIndexValue is used to check the consistency of the index data.
*checkIndexValue
// PushedLimit is used to skip the preceding and tailing handles when Limit is sunk into IndexLookUpReader.
PushedLimit *plannercore.PushedDownLimit
}

// fetchHandles fetches a batch of handles from index data and builds the index lookup tasks.
// The tasks are sent to workCh to be further processed by tableWorker, and sent to e.resultCh
// at the same time to keep data ordered.
func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectResult) (count int64, err error) {
func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectResult) (count uint64, err error) {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
Expand All @@ -694,7 +699,7 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes
chk = chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, w.idxLookup.maxChunkSize)
}
for {
handles, retChunk, err := w.extractTaskHandles(ctx, chk, result)
handles, retChunk, scannedKeys, err := w.extractTaskHandles(ctx, chk, result, count)
if err != nil {
doneCh := make(chan error, 1)
doneCh <- err
Expand All @@ -703,10 +708,10 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes
}
return count, err
}
count += scannedKeys
if len(handles) == 0 {
return count, nil
}
count += int64(len(handles))
task := w.buildTableTask(handles, retChunk)
select {
case <-ctx.Done():
Expand All @@ -719,20 +724,43 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes
}
}

func (w *indexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, idxResult distsql.SelectResult) (
handles []int64, retChk *chunk.Chunk, err error) {
func (w *indexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, idxResult distsql.SelectResult, count uint64) (
handles []int64, retChk *chunk.Chunk, scannedKeys uint64, err error) {
handleOffset := chk.NumCols() - 1
handles = make([]int64, 0, w.batchSize)
// PushedLimit would always be nil for CheckIndex or CheckTable, we add this check just for insurance.
checkLimit := (w.PushedLimit != nil) && (w.checkIndexValue == nil)
for len(handles) < w.batchSize {
chk.SetRequiredRows(w.batchSize-len(handles), w.maxChunkSize)
requiredRows := w.batchSize - len(handles)
if checkLimit {
if w.PushedLimit.Offset+w.PushedLimit.Count <= scannedKeys+count {
return handles, nil, scannedKeys, nil
}
leftCnt := w.PushedLimit.Offset + w.PushedLimit.Count - scannedKeys - count
if uint64(requiredRows) > leftCnt {
requiredRows = int(leftCnt)
}
}
chk.SetRequiredRows(requiredRows, w.maxChunkSize)
err = errors.Trace(idxResult.Next(ctx, chk))
if err != nil {
return handles, nil, err
return handles, nil, scannedKeys, err
}
if chk.NumRows() == 0 {
return handles, retChk, nil
return handles, retChk, scannedKeys, nil
}
for i := 0; i < chk.NumRows(); i++ {
scannedKeys++
if checkLimit {
if (count + scannedKeys) <= w.PushedLimit.Offset {
// Skip the preceding Offset handles.
continue
}
if (count + scannedKeys) > (w.PushedLimit.Offset + w.PushedLimit.Count) {
// Skip the handles after Offset+Count.
return handles, nil, scannedKeys, nil
}
}
h := chk.GetRow(i).GetInt64(handleOffset)
handles = append(handles, h)
}
Expand All @@ -747,7 +775,7 @@ func (w *indexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk,
if w.batchSize > w.maxBatchSize {
w.batchSize = w.maxBatchSize
}
return handles, retChk, nil
return handles, retChk, scannedKeys, nil
}

func (w *indexWorker) buildTableTask(handles []int64, retChk *chunk.Chunk) *lookupTableTask {
Expand Down
15 changes: 15 additions & 0 deletions executor/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,3 +237,18 @@ func (s *testSuite3) TestInconsistentIndex(c *C) {
c.Assert(err, IsNil)
}
}

func (s *testSuite3) TestPushLimitDownIndexLookUpReader(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists tbl")
tk.MustExec("create table tbl(a int, b int, c int, key idx_b_c(b,c))")
tk.MustExec("insert into tbl values(1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5)")
tk.MustQuery("select * from tbl use index(idx_b_c) where b > 1 limit 2,1").Check(testkit.Rows("4 4 4"))
tk.MustQuery("select * from tbl use index(idx_b_c) where b > 4 limit 2,1").Check(testkit.Rows())
tk.MustQuery("select * from tbl use index(idx_b_c) where b > 3 limit 2,1").Check(testkit.Rows())
tk.MustQuery("select * from tbl use index(idx_b_c) where b > 2 limit 2,1").Check(testkit.Rows("5 5 5"))
tk.MustQuery("select * from tbl use index(idx_b_c) where b > 1 limit 1").Check(testkit.Rows("2 2 2"))
tk.MustQuery("select * from tbl use index(idx_b_c) where b > 1 order by b desc limit 2,1").Check(testkit.Rows("3 3 3"))
tk.MustQuery("select * from tbl use index(idx_b_c) where b > 1 and c > 1 limit 2,1").Check(testkit.Rows("4 4 4"))
}
3 changes: 3 additions & 0 deletions planner/core/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ func (p *PhysicalIndexReader) ExplainInfo() string {
// ExplainInfo implements PhysicalPlan interface.
func (p *PhysicalIndexLookUpReader) ExplainInfo() string {
// The children can be inferred by the relation symbol.
if p.PushedLimit != nil {
return fmt.Sprintf("limit embedded(offset:%v, count:%v)", p.PushedLimit.Offset, p.PushedLimit.Count)
}
return ""
}

Expand Down
8 changes: 4 additions & 4 deletions planner/core/physical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (s *testPlanSuite) TestDAGPlanBuilderSimpleCase(c *C) {
// Test TopN to Limit in double read.
{
sql: "select * from t where t.c = 1 and t.e = 1 order by t.d limit 1",
best: "IndexLookUp(Index(t.c_d_e)[[1,1]]->Sel([eq(test.t.e, 1)])->Limit, Table(t))->Limit",
best: "IndexLookUp(Index(t.c_d_e)[[1,1]]->Sel([eq(test.t.e, 1)])->Limit, Table(t))",
},
// Test TopN to Limit in index single read.
{
Expand Down Expand Up @@ -151,7 +151,7 @@ func (s *testPlanSuite) TestDAGPlanBuilderSimpleCase(c *C) {
// Test Limit push down in double single read.
{
sql: "select c, b from t where c = 1 limit 1",
best: "IndexLookUp(Index(t.c_d_e)[[1,1]]->Limit, Table(t))->Limit->Projection",
best: "IndexLookUp(Index(t.c_d_e)[[1,1]]->Limit, Table(t))->Projection",
},
// Test Selection + Limit push down in double single read.
{
Expand Down Expand Up @@ -182,7 +182,7 @@ func (s *testPlanSuite) TestDAGPlanBuilderSimpleCase(c *C) {
// Test PK in index double read.
{
sql: "select * from t where t.c = 1 and t.a > 1 order by t.d limit 1",
best: "IndexLookUp(Index(t.c_d_e)[[1,1]]->Sel([gt(test.t.a, 1)])->Limit, Table(t))->Limit",
best: "IndexLookUp(Index(t.c_d_e)[[1,1]]->Sel([gt(test.t.a, 1)])->Limit, Table(t))",
},
// Test index filter condition push down.
{
Expand Down Expand Up @@ -574,7 +574,7 @@ func (s *testPlanSuite) TestDAGPlanTopN(c *C) {
},
{
sql: "select * from t where c = 1 order by c limit 1",
best: "IndexLookUp(Index(t.c_d_e)[[1,1]]->Limit, Table(t))->Limit",
best: "IndexLookUp(Index(t.c_d_e)[[1,1]]->Limit, Table(t))",
},
{
sql: "select * from t order by a limit 1",
Expand Down
9 changes: 9 additions & 0 deletions planner/core/physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ type PhysicalIndexReader struct {
OutputColumns []*expression.Column
}

// PushedDownLimit is the limit operator pushed down into PhysicalIndexLookUpReader.
type PushedDownLimit struct {
Offset uint64
Count uint64
}

// PhysicalIndexLookUpReader is the index look up reader in tidb. It's used in case of double reading.
type PhysicalIndexLookUpReader struct {
physicalSchemaProducer
Expand All @@ -82,6 +88,9 @@ type PhysicalIndexLookUpReader struct {
TablePlans []PhysicalPlan
indexPlan PhysicalPlan
tablePlan PhysicalPlan

// PushedLimit is used to avoid unnecessary table scan tasks of IndexLookUpReader.
PushedLimit *PushedDownLimit
}

// PhysicalIndexScan represents an index scan plan.
Expand Down
38 changes: 36 additions & 2 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ func (t *rootTask) plan() PhysicalPlan {

func (p *PhysicalLimit) attach2Task(tasks ...task) task {
t := tasks[0].copy()
sunk := false
if cop, ok := t.(*copTask); ok {
// If the table/index scans data by order and applies a double read, the limit cannot be pushed to the table side.
if !cop.keepOrder || !cop.indexPlanFinished || cop.indexPlan == nil {
Expand All @@ -272,9 +273,42 @@ func (p *PhysicalLimit) attach2Task(tasks ...task) task {
cop = attachPlan2Task(pushedDownLimit, cop).(*copTask)
}
t = finishCopTask(p.ctx, cop)
sunk = p.sinkIntoIndexLookUp(t)
}
t = attachPlan2Task(p, t)
return t
if sunk {
return t
}
return attachPlan2Task(p, t)
}

func (p *PhysicalLimit) sinkIntoIndexLookUp(t task) bool {
root := t.(*rootTask)
reader, isDoubleRead := root.p.(*PhysicalIndexLookUpReader)
proj, isProj := root.p.(*PhysicalProjection)
if !isDoubleRead && !isProj {
return false
}
if isProj {
reader, isDoubleRead = proj.Children()[0].(*PhysicalIndexLookUpReader)
if !isDoubleRead {
return false
}
}
// We can sink Limit into IndexLookUpReader only if tablePlan contains no Selection.
ts, isTableScan := reader.tablePlan.(*PhysicalTableScan)
if !isTableScan {
return false
}
reader.PushedLimit = &PushedDownLimit{
Offset: p.Offset,
Count: p.Count,
}
ts.stats = p.stats
reader.stats = p.stats
if isProj {
proj.stats = p.stats
}
return true
}

// GetCost computes the cost of in memory sort.
Expand Down

0 comments on commit 3716ac5

Please sign in to comment.