diff --git a/cmd/explaintest/r/explain_easy.result b/cmd/explaintest/r/explain_easy.result
index 2c9ed5f634306..387073b759936 100644
--- a/cmd/explaintest/r/explain_easy.result
+++ b/cmd/explaintest/r/explain_easy.result
@@ -122,15 +122,14 @@ MemTableScan_4 10000.00 root
 explain select c2 = (select c2 from t2 where t1.c1 = t2.c1 order by c1 limit 1) from t1;
 id count task operator info
 Projection_12 10000.00 root eq(test.t1.c2, test.t2.c2)
-└─Apply_14 10000.00 root CARTESIAN left outer join, inner:Limit_21
+└─Apply_14 10000.00 root CARTESIAN left outer join, inner:Projection_41
   ├─IndexReader_16 10000.00 root index:IndexScan_15
   │ └─IndexScan_15 10000.00 cop table:t1, index:c2, range:[NULL,+inf], keep order:false, stats:pseudo
-  └─Limit_21 1.00 root offset:0, count:1
-    └─Projection_41 1.00 root test.t2.c1, test.t2.c2
-      └─IndexLookUp_40 1.00 root
-        ├─Limit_39 1.00 cop offset:0, count:1
-        │ └─IndexScan_37 1.00 cop table:t2, index:c1, range: decided by [eq(test.t1.c1, test.t2.c1)], keep order:true, stats:pseudo
-        └─TableScan_38 1.00 cop table:t2, keep order:false, stats:pseudo
+  └─Projection_41 1.00 root test.t2.c1, test.t2.c2
+    └─IndexLookUp_40 1.00 root limit embedded(offset:0, count:1)
+      ├─Limit_39 1.00 cop offset:0, count:1
+      │ └─IndexScan_37 1.00 cop table:t2, index:c1, range: decided by [eq(test.t1.c1, test.t2.c1)], keep order:true, stats:pseudo
+      └─TableScan_38 1.00 cop table:t2, keep order:false, stats:pseudo
 explain select * from t1 order by c1 desc limit 1;
 id count task operator info
 Limit_10 1.00 root offset:0, count:1
diff --git a/cmd/explaintest/r/explain_easy_stats.result b/cmd/explaintest/r/explain_easy_stats.result
index 9c82b0923467a..4c0329fd824de 100644
--- a/cmd/explaintest/r/explain_easy_stats.result
+++ b/cmd/explaintest/r/explain_easy_stats.result
@@ -108,15 +108,14 @@ MemTableScan_4 10000.00 root
 explain select c2 = (select c2 from t2 where t1.c1 = t2.c1 order by c1 limit 1) from t1;
 id count task operator info
 Projection_12 1999.00 root eq(test.t1.c2, test.t2.c2)
-└─Apply_14 1999.00 root CARTESIAN left outer join, inner:Limit_21
+└─Apply_14 1999.00 root CARTESIAN left outer join, inner:Projection_41
   ├─IndexReader_16 1999.00 root index:IndexScan_15
   │ └─IndexScan_15 1999.00 cop table:t1, index:c2, range:[NULL,+inf], keep order:false
-  └─Limit_21 1.00 root offset:0, count:1
-    └─Projection_41 1.00 root test.t2.c1, test.t2.c2
-      └─IndexLookUp_40 1.00 root
-        ├─Limit_39 1.00 cop offset:0, count:1
-        │ └─IndexScan_37 1.25 cop table:t2, index:c1, range: decided by [eq(test.t1.c1, test.t2.c1)], keep order:true
-        └─TableScan_38 1.00 cop table:t2, keep order:false, stats:pseudo
+  └─Projection_41 1.00 root test.t2.c1, test.t2.c2
+    └─IndexLookUp_40 1.00 root limit embedded(offset:0, count:1)
+      ├─Limit_39 1.00 cop offset:0, count:1
+      │ └─IndexScan_37 1.25 cop table:t2, index:c1, range: decided by [eq(test.t1.c1, test.t2.c1)], keep order:true
+      └─TableScan_38 1.00 cop table:t2, keep order:false, stats:pseudo
 explain select * from t1 order by c1 desc limit 1;
 id count task operator info
 Limit_10 1.00 root offset:0, count:1
@@ -169,18 +168,16 @@ id count task operator info
 TableDual_5 0.00 root rows:0
 explain select * from index_prune WHERE a = 1010010404050976781 AND b = 26467085526790 LIMIT 1, 1;
 id count task operator info
-Limit_9 1.00 root offset:1, count:1
-└─IndexLookUp_14 1.00 root
-  ├─Limit_13 1.00 cop offset:0, count:2
-  │ └─IndexScan_11 1.00 cop table:index_prune, index:a, b, range:[1010010404050976781 26467085526790,1010010404050976781 26467085526790], keep order:false
-  └─TableScan_12 1.00 cop table:index_prune, keep order:false, stats:pseudo
+IndexLookUp_14 1.00 root limit embedded(offset:1, count:1)
+├─Limit_13 1.00 cop offset:0, count:2
+│ └─IndexScan_11 1.00 cop table:index_prune, index:a, b, range:[1010010404050976781 26467085526790,1010010404050976781 26467085526790], keep order:false
+└─TableScan_12 1.00 cop table:index_prune, keep order:false, stats:pseudo
 explain select * from index_prune WHERE a = 1010010404050976781 AND b = 26467085526790 LIMIT 1, 0;
 id count task operator info
-Limit_9 0.00 root offset:1, count:0
-└─IndexLookUp_14 0.00 root
-  ├─Limit_13 0.00 cop offset:0, count:1
-  │ └─IndexScan_11 1.00 cop table:index_prune, index:a, b, range:[1010010404050976781 26467085526790,1010010404050976781 26467085526790], keep order:false
-  └─TableScan_12 0.00 cop table:index_prune, keep order:false, stats:pseudo
+IndexLookUp_14 0.00 root limit embedded(offset:1, count:0)
+├─Limit_13 0.00 cop offset:0, count:1
+│ └─IndexScan_11 1.00 cop table:index_prune, index:a, b, range:[1010010404050976781 26467085526790,1010010404050976781 26467085526790], keep order:false
+└─TableScan_12 0.00 cop table:index_prune, keep order:false, stats:pseudo
 explain select * from index_prune WHERE a = 1010010404050976781 AND b = 26467085526790 LIMIT 0, 1;
 id count task operator info
 Point_Get_1 1.00 root table:index_prune, index:a b
diff --git a/executor/builder.go b/executor/builder.go
index 591c826005f56..35e0b969fd229 100644
--- a/executor/builder.go
+++ b/executor/builder.go
@@ -1998,6 +1998,7 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn
         colLens:     is.IdxColLens,
         idxPlans:    v.IndexPlans,
         tblPlans:    v.TablePlans,
+        PushedLimit: v.PushedLimit,
     }
     if containsLimit(indexReq.Executors) {
diff --git a/executor/distsql.go b/executor/distsql.go
index e3c4bfa6d2cdb..f499070c91e7e 100644
--- a/executor/distsql.go
+++ b/executor/distsql.go
@@ -391,6 +391,8 @@ type IndexLookUpExecutor struct {
     corColInAccess bool
     idxCols        []*expression.Column
     colLens        []int
+    // PushedLimit is used to skip the preceding and trailing handles when Limit is sunk into IndexLookUpReader.
+    PushedLimit *plannercore.PushedDownLimit
 }
 
 type checkIndexValue struct {
@@ -503,6 +505,7 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k
         checkIndexValue: e.checkIndexValue,
         maxBatchSize:    e.ctx.GetSessionVars().IndexLookupSize,
         maxChunkSize:    e.maxChunkSize,
+        PushedLimit:     e.PushedLimit,
     }
     if worker.batchSize > worker.maxBatchSize {
         worker.batchSize = worker.maxBatchSize
@@ -520,9 +523,9 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k
         }
         if e.runtimeStats != nil {
             copStats := e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(e.idxPlans[len(e.idxPlans)-1].ExplainID().String())
-            copStats.SetRowNum(count)
+            copStats.SetRowNum(int64(count))
             copStats = e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(e.tblPlans[0].ExplainID().String())
-            copStats.SetRowNum(count)
+            copStats.SetRowNum(int64(count))
         }
         e.ctx.StoreQueryFeedback(e.feedback)
         close(workCh)
@@ -664,12 +667,14 @@ type indexWorker struct {
 
     // checkIndexValue is used to check the consistency of the index data.
     *checkIndexValue
+    // PushedLimit is used to skip the preceding and trailing handles when Limit is sunk into IndexLookUpReader.
+    PushedLimit *plannercore.PushedDownLimit
 }
 
 // fetchHandles fetches a batch of handles from index data and builds the index lookup tasks.
 // The tasks are sent to workCh to be further processed by tableWorker, and sent to e.resultCh
 // at the same time to keep data ordered.
-func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectResult) (count int64, err error) {
+func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectResult) (count uint64, err error) {
     defer func() {
         if r := recover(); r != nil {
             buf := make([]byte, 4096)
@@ -694,7 +699,7 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes
         chk = chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, w.idxLookup.maxChunkSize)
     }
     for {
-        handles, retChunk, err := w.extractTaskHandles(ctx, chk, result)
+        handles, retChunk, scannedKeys, err := w.extractTaskHandles(ctx, chk, result, count)
         if err != nil {
             doneCh := make(chan error, 1)
             doneCh <- err
@@ -703,10 +708,10 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes
             }
             return count, err
         }
+        count += scannedKeys
         if len(handles) == 0 {
             return count, nil
         }
-        count += int64(len(handles))
         task := w.buildTableTask(handles, retChunk)
         select {
         case <-ctx.Done():
@@ -719,20 +724,43 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes
     }
 }
 
-func (w *indexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, idxResult distsql.SelectResult) (
-    handles []int64, retChk *chunk.Chunk, err error) {
+func (w *indexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, idxResult distsql.SelectResult, count uint64) (
+    handles []int64, retChk *chunk.Chunk, scannedKeys uint64, err error) {
     handleOffset := chk.NumCols() - 1
     handles = make([]int64, 0, w.batchSize)
+    // PushedLimit is always nil for CheckIndex or CheckTable; this check is just for insurance.
+    checkLimit := (w.PushedLimit != nil) && (w.checkIndexValue == nil)
     for len(handles) < w.batchSize {
-        chk.SetRequiredRows(w.batchSize-len(handles), w.maxChunkSize)
+        requiredRows := w.batchSize - len(handles)
+        if checkLimit {
+            if w.PushedLimit.Offset+w.PushedLimit.Count <= scannedKeys+count {
+                return handles, nil, scannedKeys, nil
+            }
+            leftCnt := w.PushedLimit.Offset + w.PushedLimit.Count - scannedKeys - count
+            if uint64(requiredRows) > leftCnt {
+                requiredRows = int(leftCnt)
+            }
+        }
+        chk.SetRequiredRows(requiredRows, w.maxChunkSize)
         err = errors.Trace(idxResult.Next(ctx, chk))
         if err != nil {
-            return handles, nil, err
+            return handles, nil, scannedKeys, err
         }
         if chk.NumRows() == 0 {
-            return handles, retChk, nil
+            return handles, retChk, scannedKeys, nil
         }
         for i := 0; i < chk.NumRows(); i++ {
+            scannedKeys++
+            if checkLimit {
+                if (count + scannedKeys) <= w.PushedLimit.Offset {
+                    // Skip the preceding Offset handles.
+                    continue
+                }
+                if (count + scannedKeys) > (w.PushedLimit.Offset + w.PushedLimit.Count) {
+                    // Skip the handles after Offset+Count.
+                    return handles, nil, scannedKeys, nil
+                }
+            }
             h := chk.GetRow(i).GetInt64(handleOffset)
             handles = append(handles, h)
         }
@@ -747,7 +775,7 @@ func (w *indexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk,
     if w.batchSize > w.maxBatchSize {
         w.batchSize = w.maxBatchSize
     }
-    return handles, retChk, nil
+    return handles, retChk, scannedKeys, nil
 }
 
 func (w *indexWorker) buildTableTask(handles []int64, retChk *chunk.Chunk) *lookupTableTask {
diff --git a/executor/distsql_test.go b/executor/distsql_test.go
index c68850092505c..dfbf03221cebf 100644
--- a/executor/distsql_test.go
+++ b/executor/distsql_test.go
@@ -237,3 +237,18 @@ func (s *testSuite3) TestInconsistentIndex(c *C) {
         c.Assert(err, IsNil)
     }
 }
+
+func (s *testSuite3) TestPushLimitDownIndexLookUpReader(c *C) {
+    tk := testkit.NewTestKit(c, s.store)
+    tk.MustExec("use test")
+    tk.MustExec("drop table if exists tbl")
+    tk.MustExec("create table tbl(a int, b int, c int, key idx_b_c(b,c))")
+    tk.MustExec("insert into tbl values(1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5)")
+    tk.MustQuery("select * from tbl use index(idx_b_c) where b > 1 limit 2,1").Check(testkit.Rows("4 4 4"))
+    tk.MustQuery("select * from tbl use index(idx_b_c) where b > 4 limit 2,1").Check(testkit.Rows())
+    tk.MustQuery("select * from tbl use index(idx_b_c) where b > 3 limit 2,1").Check(testkit.Rows())
+    tk.MustQuery("select * from tbl use index(idx_b_c) where b > 2 limit 2,1").Check(testkit.Rows("5 5 5"))
+    tk.MustQuery("select * from tbl use index(idx_b_c) where b > 1 limit 1").Check(testkit.Rows("2 2 2"))
+    tk.MustQuery("select * from tbl use index(idx_b_c) where b > 1 order by b desc limit 2,1").Check(testkit.Rows("3 3 3"))
+    tk.MustQuery("select * from tbl use index(idx_b_c) where b > 1 and c > 1 limit 2,1").Check(testkit.Rows("4 4 4"))
+}
diff --git a/planner/core/explain.go b/planner/core/explain.go
index e3ed7f5a18e55..1f865d663ae11 100644
--- a/planner/core/explain.go
+++ b/planner/core/explain.go
@@ -141,6 +141,9 @@ func (p *PhysicalIndexReader) ExplainInfo() string {
 
 // ExplainInfo implements PhysicalPlan interface.
 func (p *PhysicalIndexLookUpReader) ExplainInfo() string {
     // The children can be inferred by the relation symbol.
+    if p.PushedLimit != nil {
+        return fmt.Sprintf("limit embedded(offset:%v, count:%v)", p.PushedLimit.Offset, p.PushedLimit.Count)
+    }
     return ""
 }
diff --git a/planner/core/physical_plan_test.go b/planner/core/physical_plan_test.go
index a6c65e1e8c25c..6a794fcb80f9b 100644
--- a/planner/core/physical_plan_test.go
+++ b/planner/core/physical_plan_test.go
@@ -106,7 +106,7 @@ func (s *testPlanSuite) TestDAGPlanBuilderSimpleCase(c *C) {
         // Test TopN to Limit in double read.
         {
             sql:  "select * from t where t.c = 1 and t.e = 1 order by t.d limit 1",
-            best: "IndexLookUp(Index(t.c_d_e)[[1,1]]->Sel([eq(test.t.e, 1)])->Limit, Table(t))->Limit",
+            best: "IndexLookUp(Index(t.c_d_e)[[1,1]]->Sel([eq(test.t.e, 1)])->Limit, Table(t))",
         },
         // Test TopN to Limit in index single read.
         {
@@ -151,7 +151,7 @@ func (s *testPlanSuite) TestDAGPlanBuilderSimpleCase(c *C) {
         // Test Limit push down in double single read.
         {
             sql:  "select c, b from t where c = 1 limit 1",
-            best: "IndexLookUp(Index(t.c_d_e)[[1,1]]->Limit, Table(t))->Limit->Projection",
+            best: "IndexLookUp(Index(t.c_d_e)[[1,1]]->Limit, Table(t))->Projection",
         },
         // Test Selection + Limit push down in double single read.
         {
@@ -182,7 +182,7 @@ func (s *testPlanSuite) TestDAGPlanBuilderSimpleCase(c *C) {
         // Test PK in index double read.
{ sql: "select * from t where t.c = 1 and t.a > 1 order by t.d limit 1", - best: "IndexLookUp(Index(t.c_d_e)[[1,1]]->Sel([gt(test.t.a, 1)])->Limit, Table(t))->Limit", + best: "IndexLookUp(Index(t.c_d_e)[[1,1]]->Sel([gt(test.t.a, 1)])->Limit, Table(t))", }, // Test index filter condition push down. { @@ -574,7 +574,7 @@ func (s *testPlanSuite) TestDAGPlanTopN(c *C) { }, { sql: "select * from t where c = 1 order by c limit 1", - best: "IndexLookUp(Index(t.c_d_e)[[1,1]]->Limit, Table(t))->Limit", + best: "IndexLookUp(Index(t.c_d_e)[[1,1]]->Limit, Table(t))", }, { sql: "select * from t order by a limit 1", diff --git a/planner/core/physical_plans.go b/planner/core/physical_plans.go index a43f3fa03f340..b49992f301197 100644 --- a/planner/core/physical_plans.go +++ b/planner/core/physical_plans.go @@ -72,6 +72,12 @@ type PhysicalIndexReader struct { OutputColumns []*expression.Column } +// PushedDownLimit is the limit operator pushed down into PhysicalIndexLookUpReader. +type PushedDownLimit struct { + Offset uint64 + Count uint64 +} + // PhysicalIndexLookUpReader is the index look up reader in tidb. It's used in case of double reading. type PhysicalIndexLookUpReader struct { physicalSchemaProducer @@ -82,6 +88,9 @@ type PhysicalIndexLookUpReader struct { TablePlans []PhysicalPlan indexPlan PhysicalPlan tablePlan PhysicalPlan + + // PushedLimit is used to avoid unnecessary table scan tasks of IndexLookUpReader. + PushedLimit *PushedDownLimit } // PhysicalIndexScan represents an index scan plan. diff --git a/planner/core/task.go b/planner/core/task.go index a1dabd6bc4ef3..f19621dc2552a 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -264,6 +264,7 @@ func (t *rootTask) plan() PhysicalPlan { func (p *PhysicalLimit) attach2Task(tasks ...task) task { t := tasks[0].copy() + sunk := false if cop, ok := t.(*copTask); ok { // If the table/index scans data by order and applies a double read, the limit cannot be pushed to the table side. if !cop.keepOrder || !cop.indexPlanFinished || cop.indexPlan == nil { @@ -272,9 +273,42 @@ func (p *PhysicalLimit) attach2Task(tasks ...task) task { cop = attachPlan2Task(pushedDownLimit, cop).(*copTask) } t = finishCopTask(p.ctx, cop) + sunk = p.sinkIntoIndexLookUp(t) } - t = attachPlan2Task(p, t) - return t + if sunk { + return t + } + return attachPlan2Task(p, t) +} + +func (p *PhysicalLimit) sinkIntoIndexLookUp(t task) bool { + root := t.(*rootTask) + reader, isDoubleRead := root.p.(*PhysicalIndexLookUpReader) + proj, isProj := root.p.(*PhysicalProjection) + if !isDoubleRead && !isProj { + return false + } + if isProj { + reader, isDoubleRead = proj.Children()[0].(*PhysicalIndexLookUpReader) + if !isDoubleRead { + return false + } + } + // We can sink Limit into IndexLookUpReader only if tablePlan contains no Selection. + ts, isTableScan := reader.tablePlan.(*PhysicalTableScan) + if !isTableScan { + return false + } + reader.PushedLimit = &PushedDownLimit{ + Offset: p.Offset, + Count: p.Count, + } + ts.stats = p.stats + reader.stats = p.stats + if isProj { + proj.stats = p.stats + } + return true } // GetCost computes the cost of in memory sort.