planner, executor: push limit down into IndexLookUpReader executor (#12262) #12378

Merged (2 commits, Sep 25, 2019)
13 changes: 6 additions & 7 deletions cmd/explaintest/r/explain_easy.result
@@ -122,15 +122,14 @@ MemTableScan_4 10000.00 root
explain select c2 = (select c2 from t2 where t1.c1 = t2.c1 order by c1 limit 1) from t1;
id count task operator info
Projection_12 10000.00 root eq(test.t1.c2, test.t2.c2)
└─Apply_14 10000.00 root CARTESIAN left outer join, inner:Limit_21
└─Apply_14 10000.00 root CARTESIAN left outer join, inner:Projection_41
├─IndexReader_16 10000.00 root index:IndexScan_15
│ └─IndexScan_15 10000.00 cop table:t1, index:c2, range:[NULL,+inf], keep order:false, stats:pseudo
└─Limit_21 1.00 root offset:0, count:1
└─Projection_41 1.00 root test.t2.c1, test.t2.c2
└─IndexLookUp_40 1.00 root
├─Limit_39 1.00 cop offset:0, count:1
│ └─IndexScan_37 1.00 cop table:t2, index:c1, range: decided by [eq(test.t1.c1, test.t2.c1)], keep order:true, stats:pseudo
└─TableScan_38 1.00 cop table:t2, keep order:false, stats:pseudo
└─Projection_41 1.00 root test.t2.c1, test.t2.c2
└─IndexLookUp_40 1.00 root limit embedded(offset:0, count:1)
├─Limit_39 1.00 cop offset:0, count:1
│ └─IndexScan_37 1.00 cop table:t2, index:c1, range: decided by [eq(test.t1.c1, test.t2.c1)], keep order:true, stats:pseudo
└─TableScan_38 1.00 cop table:t2, keep order:false, stats:pseudo
explain select * from t1 order by c1 desc limit 1;
id count task operator info
Limit_10 1.00 root offset:0, count:1
31 changes: 14 additions & 17 deletions cmd/explaintest/r/explain_easy_stats.result
@@ -108,15 +108,14 @@ MemTableScan_4 10000.00 root
explain select c2 = (select c2 from t2 where t1.c1 = t2.c1 order by c1 limit 1) from t1;
id count task operator info
Projection_12 1999.00 root eq(test.t1.c2, test.t2.c2)
└─Apply_14 1999.00 root CARTESIAN left outer join, inner:Limit_21
└─Apply_14 1999.00 root CARTESIAN left outer join, inner:Projection_41
├─IndexReader_16 1999.00 root index:IndexScan_15
│ └─IndexScan_15 1999.00 cop table:t1, index:c2, range:[NULL,+inf], keep order:false
└─Limit_21 1.00 root offset:0, count:1
└─Projection_41 1.00 root test.t2.c1, test.t2.c2
└─IndexLookUp_40 1.00 root
├─Limit_39 1.00 cop offset:0, count:1
│ └─IndexScan_37 1.25 cop table:t2, index:c1, range: decided by [eq(test.t1.c1, test.t2.c1)], keep order:true
└─TableScan_38 1.00 cop table:t2, keep order:false, stats:pseudo
└─Projection_41 1.00 root test.t2.c1, test.t2.c2
└─IndexLookUp_40 1.00 root limit embedded(offset:0, count:1)
├─Limit_39 1.00 cop offset:0, count:1
│ └─IndexScan_37 1.25 cop table:t2, index:c1, range: decided by [eq(test.t1.c1, test.t2.c1)], keep order:true
└─TableScan_38 1.00 cop table:t2, keep order:false, stats:pseudo
explain select * from t1 order by c1 desc limit 1;
id count task operator info
Limit_10 1.00 root offset:0, count:1
@@ -169,18 +168,16 @@ id count task operator info
TableDual_5 0.00 root rows:0
explain select * from index_prune WHERE a = 1010010404050976781 AND b = 26467085526790 LIMIT 1, 1;
id count task operator info
Limit_9 1.00 root offset:1, count:1
└─IndexLookUp_14 1.00 root
├─Limit_13 1.00 cop offset:0, count:2
│ └─IndexScan_11 1.00 cop table:index_prune, index:a, b, range:[1010010404050976781 26467085526790,1010010404050976781 26467085526790], keep order:false
└─TableScan_12 1.00 cop table:index_prune, keep order:false, stats:pseudo
IndexLookUp_14 1.00 root limit embedded(offset:1, count:1)
├─Limit_13 1.00 cop offset:0, count:2
│ └─IndexScan_11 1.00 cop table:index_prune, index:a, b, range:[1010010404050976781 26467085526790,1010010404050976781 26467085526790], keep order:false
└─TableScan_12 1.00 cop table:index_prune, keep order:false, stats:pseudo
explain select * from index_prune WHERE a = 1010010404050976781 AND b = 26467085526790 LIMIT 1, 0;
id count task operator info
Limit_9 0.00 root offset:1, count:0
└─IndexLookUp_14 0.00 root
├─Limit_13 0.00 cop offset:0, count:1
│ └─IndexScan_11 1.00 cop table:index_prune, index:a, b, range:[1010010404050976781 26467085526790,1010010404050976781 26467085526790], keep order:false
└─TableScan_12 0.00 cop table:index_prune, keep order:false, stats:pseudo
IndexLookUp_14 0.00 root limit embedded(offset:1, count:0)
├─Limit_13 0.00 cop offset:0, count:1
│ └─IndexScan_11 1.00 cop table:index_prune, index:a, b, range:[1010010404050976781 26467085526790,1010010404050976781 26467085526790], keep order:false
└─TableScan_12 0.00 cop table:index_prune, keep order:false, stats:pseudo
explain select * from index_prune WHERE a = 1010010404050976781 AND b = 26467085526790 LIMIT 0, 1;
id count task operator info
Point_Get_1 1.00 root table:index_prune, index:a b
1 change: 1 addition & 0 deletions executor/builder.go
@@ -1998,6 +1998,7 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn
colLens: is.IdxColLens,
idxPlans: v.IndexPlans,
tblPlans: v.TablePlans,
PushedLimit: v.PushedLimit,
}

if containsLimit(indexReq.Executors) {
50 changes: 39 additions & 11 deletions executor/distsql.go
@@ -391,6 +391,8 @@ type IndexLookUpExecutor struct {
corColInAccess bool
idxCols []*expression.Column
colLens []int
// PushedLimit is used to skip the preceding and trailing handles when Limit is sunk into IndexLookUpReader.
PushedLimit *plannercore.PushedDownLimit
}

type checkIndexValue struct {
@@ -503,6 +505,7 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k
checkIndexValue: e.checkIndexValue,
maxBatchSize: e.ctx.GetSessionVars().IndexLookupSize,
maxChunkSize: e.maxChunkSize,
PushedLimit: e.PushedLimit,
}
if worker.batchSize > worker.maxBatchSize {
worker.batchSize = worker.maxBatchSize
@@ -520,9 +523,9 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k
}
if e.runtimeStats != nil {
copStats := e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(e.idxPlans[len(e.idxPlans)-1].ExplainID().String())
copStats.SetRowNum(count)
copStats.SetRowNum(int64(count))
copStats = e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(e.tblPlans[0].ExplainID().String())
copStats.SetRowNum(count)
copStats.SetRowNum(int64(count))
}
e.ctx.StoreQueryFeedback(e.feedback)
close(workCh)
@@ -664,12 +667,14 @@ type indexWorker struct {

// checkIndexValue is used to check the consistency of the index data.
*checkIndexValue
// PushedLimit is used to skip the preceding and trailing handles when Limit is sunk into IndexLookUpReader.
PushedLimit *plannercore.PushedDownLimit
}

// fetchHandles fetches a batch of handles from index data and builds the index lookup tasks.
// The tasks are sent to workCh to be further processed by tableWorker, and sent to e.resultCh
// at the same time to keep data ordered.
func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectResult) (count int64, err error) {
func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectResult) (count uint64, err error) {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
@@ -694,7 +699,7 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes
chk = chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, w.idxLookup.maxChunkSize)
}
for {
handles, retChunk, err := w.extractTaskHandles(ctx, chk, result)
handles, retChunk, scannedKeys, err := w.extractTaskHandles(ctx, chk, result, count)
if err != nil {
doneCh := make(chan error, 1)
doneCh <- err
@@ -703,10 +708,10 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes
}
return count, err
}
count += scannedKeys
if len(handles) == 0 {
return count, nil
}
count += int64(len(handles))
task := w.buildTableTask(handles, retChunk)
select {
case <-ctx.Done():
@@ -719,20 +724,43 @@ }
}
}

func (w *indexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, idxResult distsql.SelectResult) (
handles []int64, retChk *chunk.Chunk, err error) {
func (w *indexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, idxResult distsql.SelectResult, count uint64) (
handles []int64, retChk *chunk.Chunk, scannedKeys uint64, err error) {
handleOffset := chk.NumCols() - 1
handles = make([]int64, 0, w.batchSize)
// PushedLimit is always nil for CheckIndex or CheckTable; we add this check just as a safeguard.
checkLimit := (w.PushedLimit != nil) && (w.checkIndexValue == nil)
for len(handles) < w.batchSize {
chk.SetRequiredRows(w.batchSize-len(handles), w.maxChunkSize)
requiredRows := w.batchSize - len(handles)
if checkLimit {
if w.PushedLimit.Offset+w.PushedLimit.Count <= scannedKeys+count {
return handles, nil, scannedKeys, nil
}
leftCnt := w.PushedLimit.Offset + w.PushedLimit.Count - scannedKeys - count
if uint64(requiredRows) > leftCnt {
requiredRows = int(leftCnt)
}
}
chk.SetRequiredRows(requiredRows, w.maxChunkSize)
err = errors.Trace(idxResult.Next(ctx, chk))
if err != nil {
return handles, nil, err
return handles, nil, scannedKeys, err
}
if chk.NumRows() == 0 {
return handles, retChk, nil
return handles, retChk, scannedKeys, nil
}
for i := 0; i < chk.NumRows(); i++ {
scannedKeys++
if checkLimit {
if (count + scannedKeys) <= w.PushedLimit.Offset {
// Skip the preceding Offset handles.
continue
}
if (count + scannedKeys) > (w.PushedLimit.Offset + w.PushedLimit.Count) {
// Skip the handles after Offset+Count.
return handles, nil, scannedKeys, nil
}
}
h := chk.GetRow(i).GetInt64(handleOffset)
handles = append(handles, h)
}
@@ -747,7 +775,7 @@ if w.batchSize > w.maxBatchSize {
if w.batchSize > w.maxBatchSize {
w.batchSize = w.maxBatchSize
}
return handles, retChk, nil
return handles, retChk, scannedKeys, nil
}

func (w *indexWorker) buildTableTask(handles []int64, retChk *chunk.Chunk) *lookupTableTask {
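For reference, a minimal stand-alone sketch (illustrative names only, not code from this patch) of the windowing rule the new extractTaskHandles logic applies once a limit has been pushed into the reader: a handle is forwarded to the table workers only if its overall scan position falls in (offset, offset+count], and scanning stops as soon as that window is exhausted.

```go
package main

import "fmt"

// pushedDownLimit mirrors the Offset/Count pair carried by the reader.
type pushedDownLimit struct {
	offset, count uint64
}

// selectHandles keeps only the handles the table workers still need to look up.
// scanned is the number of index keys already consumed by earlier batches
// (the "count" accumulated in fetchHandles); batch is the next chunk of handles
// produced by the index scan.
func selectHandles(limit pushedDownLimit, scanned uint64, batch []int64) (kept []int64, newScanned uint64) {
	for _, h := range batch {
		scanned++
		if scanned <= limit.offset {
			// Still inside the skipped prefix: the first offset handles are dropped.
			continue
		}
		if scanned > limit.offset+limit.count {
			// Everything past offset+count can never be returned, so stop early.
			break
		}
		kept = append(kept, h)
	}
	return kept, scanned
}

func main() {
	// LIMIT 2,1 over five scanned handles keeps only the third one.
	kept, _ := selectHandles(pushedDownLimit{offset: 2, count: 1}, 0, []int64{101, 102, 103, 104, 105})
	fmt.Println(kept) // [103]
}
```

Used this way, a LIMIT 2,1 triggers a table lookup for only the third scanned handle, which is the work the embedded limit saves compared with the old plan that kept a root-level Limit above the reader.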
15 changes: 15 additions & 0 deletions executor/distsql_test.go
@@ -237,3 +237,18 @@ func (s *testSuite3) TestInconsistentIndex(c *C) {
c.Assert(err, IsNil)
}
}

func (s *testSuite3) TestPushLimitDownIndexLookUpReader(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists tbl")
tk.MustExec("create table tbl(a int, b int, c int, key idx_b_c(b,c))")
tk.MustExec("insert into tbl values(1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5)")
tk.MustQuery("select * from tbl use index(idx_b_c) where b > 1 limit 2,1").Check(testkit.Rows("4 4 4"))
tk.MustQuery("select * from tbl use index(idx_b_c) where b > 4 limit 2,1").Check(testkit.Rows())
tk.MustQuery("select * from tbl use index(idx_b_c) where b > 3 limit 2,1").Check(testkit.Rows())
tk.MustQuery("select * from tbl use index(idx_b_c) where b > 2 limit 2,1").Check(testkit.Rows("5 5 5"))
tk.MustQuery("select * from tbl use index(idx_b_c) where b > 1 limit 1").Check(testkit.Rows("2 2 2"))
tk.MustQuery("select * from tbl use index(idx_b_c) where b > 1 order by b desc limit 2,1").Check(testkit.Rows("3 3 3"))
tk.MustQuery("select * from tbl use index(idx_b_c) where b > 1 and c > 1 limit 2,1").Check(testkit.Rows("4 4 4"))
}
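As a quick sanity check on the expected rows in the test above, a small stand-alone sketch (helper names are assumptions, not part of the patch) that applies the same LIMIT offset,count arithmetic to the rows matching each predicate:

```go
package main

import "fmt"

// applyLimit mimics SQL's "LIMIT offset, count" over an already filtered,
// already ordered row set.
func applyLimit(rows [][3]int, offset, count int) [][3]int {
	if offset >= len(rows) {
		return nil
	}
	end := offset + count
	if end > len(rows) {
		end = len(rows)
	}
	return rows[offset:end]
}

// filterBGreaterThan keeps the rows whose second column exceeds v.
func filterBGreaterThan(rows [][3]int, v int) (out [][3]int) {
	for _, r := range rows {
		if r[1] > v {
			out = append(out, r)
		}
	}
	return out
}

func main() {
	all := [][3]int{{1, 1, 1}, {2, 2, 2}, {3, 3, 3}, {4, 4, 4}, {5, 5, 5}}

	// b > 1 limit 2,1: four rows match, skip two, keep one -> (4,4,4)
	fmt.Println(applyLimit(filterBGreaterThan(all, 1), 2, 1)) // [[4 4 4]]

	// b > 4 limit 2,1: only (5,5,5) matches, the offset skips past it -> empty
	fmt.Println(applyLimit(filterBGreaterThan(all, 4), 2, 1)) // []
}
```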
3 changes: 3 additions & 0 deletions planner/core/explain.go
@@ -141,6 +141,9 @@ func (p *PhysicalIndexReader) ExplainInfo() string {
// ExplainInfo implements PhysicalPlan interface.
func (p *PhysicalIndexLookUpReader) ExplainInfo() string {
// The children can be inferred by the relation symbol.
if p.PushedLimit != nil {
return fmt.Sprintf("limit embedded(offset:%v, count:%v)", p.PushedLimit.Offset, p.PushedLimit.Count)
}
return ""
}

8 changes: 4 additions & 4 deletions planner/core/physical_plan_test.go
@@ -106,7 +106,7 @@ func (s *testPlanSuite) TestDAGPlanBuilderSimpleCase(c *C) {
// Test TopN to Limit in double read.
{
sql: "select * from t where t.c = 1 and t.e = 1 order by t.d limit 1",
best: "IndexLookUp(Index(t.c_d_e)[[1,1]]->Sel([eq(test.t.e, 1)])->Limit, Table(t))->Limit",
best: "IndexLookUp(Index(t.c_d_e)[[1,1]]->Sel([eq(test.t.e, 1)])->Limit, Table(t))",
},
// Test TopN to Limit in index single read.
{
@@ -151,7 +151,7 @@ func (s *testPlanSuite) TestDAGPlanBuilderSimpleCase(c *C) {
// Test Limit push down in double single read.
{
sql: "select c, b from t where c = 1 limit 1",
best: "IndexLookUp(Index(t.c_d_e)[[1,1]]->Limit, Table(t))->Limit->Projection",
best: "IndexLookUp(Index(t.c_d_e)[[1,1]]->Limit, Table(t))->Projection",
},
// Test Selection + Limit push down in double single read.
{
@@ -182,7 +182,7 @@ func (s *testPlanSuite) TestDAGPlanBuilderSimpleCase(c *C) {
// Test PK in index double read.
{
sql: "select * from t where t.c = 1 and t.a > 1 order by t.d limit 1",
best: "IndexLookUp(Index(t.c_d_e)[[1,1]]->Sel([gt(test.t.a, 1)])->Limit, Table(t))->Limit",
best: "IndexLookUp(Index(t.c_d_e)[[1,1]]->Sel([gt(test.t.a, 1)])->Limit, Table(t))",
},
// Test index filter condition push down.
{
@@ -574,7 +574,7 @@ func (s *testPlanSuite) TestDAGPlanTopN(c *C) {
},
{
sql: "select * from t where c = 1 order by c limit 1",
best: "IndexLookUp(Index(t.c_d_e)[[1,1]]->Limit, Table(t))->Limit",
best: "IndexLookUp(Index(t.c_d_e)[[1,1]]->Limit, Table(t))",
},
{
sql: "select * from t order by a limit 1",
9 changes: 9 additions & 0 deletions planner/core/physical_plans.go
@@ -72,6 +72,12 @@ type PhysicalIndexReader struct {
OutputColumns []*expression.Column
}

// PushedDownLimit is the limit operator pushed down into PhysicalIndexLookUpReader.
type PushedDownLimit struct {
Offset uint64
Count uint64
}

// PhysicalIndexLookUpReader is the index look up reader in tidb. It's used in case of double reading.
type PhysicalIndexLookUpReader struct {
physicalSchemaProducer
Expand All @@ -82,6 +88,9 @@ type PhysicalIndexLookUpReader struct {
TablePlans []PhysicalPlan
indexPlan PhysicalPlan
tablePlan PhysicalPlan

// PushedLimit is used to avoid unnecessary table scan tasks of IndexLookUpReader.
PushedLimit *PushedDownLimit
}

// PhysicalIndexScan represents an index scan plan.
38 changes: 36 additions & 2 deletions planner/core/task.go
@@ -264,6 +264,7 @@ func (t *rootTask) plan() PhysicalPlan {

func (p *PhysicalLimit) attach2Task(tasks ...task) task {
t := tasks[0].copy()
sunk := false
if cop, ok := t.(*copTask); ok {
// If the table/index scans data by order and applies a double read, the limit cannot be pushed to the table side.
if !cop.keepOrder || !cop.indexPlanFinished || cop.indexPlan == nil {
@@ -272,9 +273,42 @@ func (p *PhysicalLimit) attach2Task(tasks ...task) task {
cop = attachPlan2Task(pushedDownLimit, cop).(*copTask)
}
t = finishCopTask(p.ctx, cop)
sunk = p.sinkIntoIndexLookUp(t)
}
t = attachPlan2Task(p, t)
return t
if sunk {
return t
}
return attachPlan2Task(p, t)
}

func (p *PhysicalLimit) sinkIntoIndexLookUp(t task) bool {
root := t.(*rootTask)
reader, isDoubleRead := root.p.(*PhysicalIndexLookUpReader)
proj, isProj := root.p.(*PhysicalProjection)
if !isDoubleRead && !isProj {
return false
}
if isProj {
reader, isDoubleRead = proj.Children()[0].(*PhysicalIndexLookUpReader)
if !isDoubleRead {
return false
}
}
// We can sink Limit into IndexLookUpReader only if tablePlan contains no Selection.
ts, isTableScan := reader.tablePlan.(*PhysicalTableScan)
if !isTableScan {
return false
}
reader.PushedLimit = &PushedDownLimit{
Offset: p.Offset,
Count: p.Count,
}
ts.stats = p.stats
reader.stats = p.stats
if isProj {
proj.stats = p.stats
}
return true
}

// GetCost computes the cost of in memory sort.
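To make the shape check above easier to follow, here is a simplified sketch with toy types (all names and structure here are assumptions for illustration, not planner code): a Limit can be sunk only when its child is an IndexLookUpReader, possibly under a single Projection, and the reader's table side is a bare TableScan.

```go
package main

import "fmt"

// Toy stand-ins for the planner types; only the structure relevant to the
// shape check is modelled.
type plan interface{ isPlan() }

type indexLookUpReader struct {
	// true when the table plan is a plain TableScan, i.e. no Selection on the table side
	tableSideIsBareScan bool
}
type projection struct{ child plan }
type hashJoin struct{} // any other root operator

func (*indexLookUpReader) isPlan() {}
func (*projection) isPlan()        {}
func (*hashJoin) isPlan()          {}

// canSinkLimit mirrors the conditions checked by sinkIntoIndexLookUp: the root
// of the finished cop task must be an IndexLookUpReader, or a Projection
// directly on top of one, and the reader's table plan must be a bare TableScan.
func canSinkLimit(root plan) bool {
	if proj, ok := root.(*projection); ok {
		root = proj.child
	}
	reader, ok := root.(*indexLookUpReader)
	if !ok {
		return false
	}
	return reader.tableSideIsBareScan
}

func main() {
	fmt.Println(canSinkLimit(&indexLookUpReader{tableSideIsBareScan: true}))                     // true
	fmt.Println(canSinkLimit(&projection{child: &indexLookUpReader{tableSideIsBareScan: true}})) // true
	fmt.Println(canSinkLimit(&indexLookUpReader{tableSideIsBareScan: false}))                    // false: Selection on the table side
	fmt.Println(canSinkLimit(&hashJoin{}))                                                       // false: not a double read
}
```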