From aef1c85d98a4d0a70502468c276bfdc03d233224 Mon Sep 17 00:00:00 2001 From: you06 Date: Mon, 6 Dec 2021 15:54:58 +0800 Subject: [PATCH 01/30] add optimizer support Signed-off-by: you06 --- executor/builder.go | 6 ++++++ executor/distsql.go | 1 + planner/core/exhaust_physical_plans.go | 6 ++++-- planner/core/physical_plans.go | 1 + planner/core/task.go | 19 ++++++++++++++++--- sessionctx/variable/session.go | 4 ++++ 6 files changed, 32 insertions(+), 5 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index 37cb9b63d5f01..72ac4a157420b 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -3392,6 +3392,11 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn if err != nil { return nil, err } + indexPaging := false + if v.Paging { + indexPaging = true + indexStreaming = false + } tableReq, tableStreaming, tbl, err := buildTableReq(b, v.Schema().Len(), v.TablePlans) if err != nil { return nil, err @@ -3413,6 +3418,7 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn columns: ts.Columns, indexStreaming: indexStreaming, tableStreaming: tableStreaming, + indexPaging: indexPaging, dataReaderBuilder: &dataReaderBuilder{executorBuilder: b}, corColInIdxSide: b.corColInDistPlan(v.IndexPlans), corColInTblSide: b.corColInDistPlan(v.TablePlans), diff --git a/executor/distsql.go b/executor/distsql.go index 3edb1dd709168..bb0de74317ef3 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -359,6 +359,7 @@ type IndexLookUpExecutor struct { indexStreaming bool tableStreaming bool + indexPaging bool corColInIdxSide bool corColInTblSide bool diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index fb6a02d34b484..2630d0941a99c 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -845,7 +845,7 @@ func (p *LogicalJoin) buildIndexJoinInner2IndexScan( maxOneRow = ok && (sf.FuncName.L == ast.EQ) } } - innerTask := p.constructInnerIndexScanTask(ds, helper.chosenPath, helper.chosenRemained, outerJoinKeys, us, rangeInfo, false, false, avgInnerRowCnt, maxOneRow) + innerTask := p.constructInnerIndexScanTask(prop, ds, helper.chosenPath, helper.chosenRemained, outerJoinKeys, us, rangeInfo, false, false, avgInnerRowCnt, maxOneRow) failpoint.Inject("MockOnlyEnableIndexHashJoin", func(val failpoint.Value) { if val.(bool) { failpoint.Return(p.constructIndexHashJoin(prop, outerIdx, innerTask, helper.chosenRanges, keyOff2IdxOff, helper.chosenPath, helper.lastColManager)) @@ -860,7 +860,7 @@ func (p *LogicalJoin) buildIndexJoinInner2IndexScan( // Because we can't keep order for union scan, if there is a union scan in inner task, // we can't construct index merge join. if us == nil { - innerTask2 := p.constructInnerIndexScanTask(ds, helper.chosenPath, helper.chosenRemained, outerJoinKeys, us, rangeInfo, true, !prop.IsEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt, maxOneRow) + innerTask2 := p.constructInnerIndexScanTask(prop, ds, helper.chosenPath, helper.chosenRemained, outerJoinKeys, us, rangeInfo, true, !prop.IsEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt, maxOneRow) if innerTask2 != nil { joins = append(joins, p.constructIndexMergeJoin(prop, outerIdx, innerTask2, helper.chosenRanges, keyOff2IdxOff, helper.chosenPath, helper.lastColManager)...) } @@ -1011,6 +1011,7 @@ func (p *LogicalJoin) constructInnerUnionScan(us *LogicalUnionScan, reader Physi // constructInnerIndexScanTask is specially used to construct the inner plan for PhysicalIndexJoin. func (p *LogicalJoin) constructInnerIndexScanTask( + prop *property.PhysicalProperty, ds *DataSource, path *util.AccessPath, filterConds []expression.Expression, @@ -1090,6 +1091,7 @@ func (p *LogicalJoin) constructInnerIndexScanTask( cop.originSchema = ds.schema } cop.tablePlan = ts + cop.expectCnt = prop.ExpectedCnt } if cop.tablePlan != nil && ds.tableInfo.IsCommonHandle { cop.commonHandleCols = ds.commonHandleCols diff --git a/planner/core/physical_plans.go b/planner/core/physical_plans.go index 17e84b6efcf47..6293bba4b5073 100644 --- a/planner/core/physical_plans.go +++ b/planner/core/physical_plans.go @@ -270,6 +270,7 @@ type PhysicalIndexLookUpReader struct { TablePlans []PhysicalPlan indexPlan PhysicalPlan tablePlan PhysicalPlan + Paging bool ExtraHandleCol *expression.Column // PushedLimit is used to avoid unnecessary table scan tasks of IndexLookUpReader. diff --git a/planner/core/task.go b/planner/core/task.go index 187140c613aa5..25621ad6c3d2c 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -89,6 +89,8 @@ type copTask struct { // For table partition. partitionInfo PartitionInfo + + expectCnt float64 } func (t *copTask) invalid() bool { @@ -914,7 +916,20 @@ func buildIndexLookUpTask(ctx sessionctx.Context, t *copTask) *rootTask { // (indexRows / batchSize) * batchSize * CPUFactor // Since we don't know the number of copTasks built, ignore these network cost now. indexRows := t.indexPlan.statsInfo().RowCount - newTask.cst += indexRows * sessVars.CPUFactor + tableRows := t.tablePlan.statsInfo().RowCount + selectivity := tableRows / indexRows + idxCst := indexRows * sessVars.CPUFactor + // try paging API + if ctx.GetSessionVars().EnablePaging { + expectIndexRows := t.expectCnt / selectivity + rpcCnt := float64(int(math.Log(expectIndexRows/64) / math.Log(2))) + pagingCst := rpcCnt*sessVars.CPUFactor*40 + expectIndexRows*sessVars.CPUFactor + if pagingCst < idxCst { + idxCst = pagingCst + p.Paging = true + } + } + newTask.cst += idxCst // Add cost of worker goroutines in index lookup. numTblWorkers := float64(sessVars.IndexLookupConcurrency()) newTask.cst += (numTblWorkers + 1) * sessVars.ConcurrencyFactor @@ -930,8 +945,6 @@ func buildIndexLookUpTask(ctx sessionctx.Context, t *copTask) *rootTask { // Also, we need to sort the retrieved rows if index lookup reader is expected to return // ordered results. Note that row count of these two sorts can be different, if there are // operators above table scan. - tableRows := t.tablePlan.statsInfo().RowCount - selectivity := tableRows / indexRows batchSize = math.Min(indexLookupSize*selectivity, tableRows) if t.keepOrder && batchSize > 2 { sortCPUCost := (tableRows * math.Log2(batchSize) * sessVars.CPUFactor) / numTblWorkers diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 680e6f2c1367b..6b20a8ec41067 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -435,6 +435,7 @@ type SessionVars struct { Concurrency MemQuota BatchSize + Paging // DMLBatchSize indicates the number of rows batch-committed for a statement. // It will be used when using LOAD DATA or BatchInsert or BatchDelete is on. DMLBatchSize int @@ -1904,6 +1905,9 @@ type BatchSize struct { MaxChunkSize int } +type Paging struct { +} + const ( // SlowLogRowPrefixStr is slow log row prefix. SlowLogRowPrefixStr = "# " From 922304424fbe06af64452b5cdbaeb7359087440b Mon Sep 17 00:00:00 2001 From: you06 Date: Mon, 6 Dec 2021 17:03:41 +0800 Subject: [PATCH 02/30] fix compare cost Signed-off-by: you06 --- planner/core/physical_plans.go | 9 +++++++++ planner/core/task.go | 34 ++++++++++++++++++++++++++++++---- 2 files changed, 39 insertions(+), 4 deletions(-) diff --git a/planner/core/physical_plans.go b/planner/core/physical_plans.go index 6293bba4b5073..cebf2bfe3cee3 100644 --- a/planner/core/physical_plans.go +++ b/planner/core/physical_plans.go @@ -322,6 +322,14 @@ func (p *PhysicalIndexLookUpReader) ExtractCorrelatedCols() (corCols []*expressi return corCols } +// ExplainNormalizedInfo implements Plan interface. +func (p *PhysicalIndexLookUpReader) ExplainNormalizedInfo() string { + if p.Paging { + return "paging: true" + } + return "paging: false" +} + // PhysicalIndexMergeReader is the reader using multiple indexes in tidb. type PhysicalIndexMergeReader struct { physicalSchemaProducer @@ -390,6 +398,7 @@ type PhysicalIndexScan struct { isPartition bool Desc bool KeepOrder bool + Paging bool // DoubleRead means if the index executor will read kv two times. // If the query requires the columns that don't belong to index, DoubleRead will be true. DoubleRead bool diff --git a/planner/core/task.go b/planner/core/task.go index 25621ad6c3d2c..9fb1c29ee6293 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -920,13 +920,38 @@ func buildIndexLookUpTask(ctx sessionctx.Context, t *copTask) *rootTask { selectivity := tableRows / indexRows idxCst := indexRows * sessVars.CPUFactor // try paging API - if ctx.GetSessionVars().EnablePaging { - expectIndexRows := t.expectCnt / selectivity - rpcCnt := float64(int(math.Log(expectIndexRows/64) / math.Log(2))) - pagingCst := rpcCnt*sessVars.CPUFactor*40 + expectIndexRows*sessVars.CPUFactor + if ctx.GetSessionVars().EnablePaging && t.expectCnt > 0 { + //expectIndexRows := t.expectCnt / selectivity + rpcCnt := float64(1) + if t.expectCnt > 64 { + rpcCnt = float64(int(1 + math.Log(t.expectCnt/64)/math.Log(2))) + } + scanCst := t.expectCnt * sessVars.CPUFactor + var extractRows func(p PhysicalPlan) float64 + extractRows = func(p PhysicalPlan) float64 { + f := float64(0) + for _, c := range t.indexPlan.Children() { + if len(c.Children()) != 0 { + f += extractRows(c) + } else { + f += c.statsInfo().RowCount + } + } + return f + } + cmpRate := float64(1) + if sourceRows := extractRows(t.indexPlan); sourceRows > scanCst { + cmpRate = indexRows / sourceRows + } + t.indexPlan.Children() + pagingCst := (rpcCnt-1)*sessVars.CPUFactor*40 + t.expectCnt*sessVars.CPUFactor + pagingCst *= cmpRate if pagingCst < idxCst { idxCst = pagingCst p.Paging = true + if p, ok := t.indexPlan.(*PhysicalIndexScan); ok { + p.Paging = true + } } } newTask.cst += idxCst @@ -1155,6 +1180,7 @@ func (p *PhysicalLimit) attach2Task(tasks ...task) task { pushedDownLimit.SetSchema(pushedDownLimit.children[0].Schema()) pushedDownLimit.cost = cop.cost() } + cop.expectCnt = float64(p.Count) t = cop.convertToRootTask(p.ctx) sunk = p.sinkIntoIndexLookUp(t) } else if mpp, ok := t.(*mppTask); ok { From 7d75aca6c8950a08e1c71384a8843b0ce5248477 Mon Sep 17 00:00:00 2001 From: you06 Date: Mon, 6 Dec 2021 17:15:27 +0800 Subject: [PATCH 03/30] set paging Signed-off-by: you06 --- executor/distsql.go | 1 + 1 file changed, 1 insertion(+) diff --git a/executor/distsql.go b/executor/distsql.go index bb0de74317ef3..5d38730d6c96a 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -559,6 +559,7 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan< SetDesc(e.desc). SetKeepOrder(e.keepOrder). SetStreaming(e.indexStreaming). + SetPaging(e.indexPaging). SetReadReplicaScope(e.readReplicaScope). SetIsStaleness(e.isStaleness). SetFromSessionVars(e.ctx.GetSessionVars()). From bcccba159558e55e1de9a06be8da3cfea73720c2 Mon Sep 17 00:00:00 2001 From: you06 Date: Tue, 7 Dec 2021 14:07:34 +0800 Subject: [PATCH 04/30] handle expect cnt more than max paging size Signed-off-by: you06 --- planner/core/task.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/planner/core/task.go b/planner/core/task.go index 9fb1c29ee6293..129cfe6889e52 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -923,7 +923,9 @@ func buildIndexLookUpTask(ctx sessionctx.Context, t *copTask) *rootTask { if ctx.GetSessionVars().EnablePaging && t.expectCnt > 0 { //expectIndexRows := t.expectCnt / selectivity rpcCnt := float64(1) - if t.expectCnt > 64 { + if t.expectCnt > 16320 { + rpcCnt = float64(int(9 + (t.expectCnt-16320)/64)) + } else if t.expectCnt > 64 { rpcCnt = float64(int(1 + math.Log(t.expectCnt/64)/math.Log(2))) } scanCst := t.expectCnt * sessVars.CPUFactor @@ -943,7 +945,6 @@ func buildIndexLookUpTask(ctx sessionctx.Context, t *copTask) *rootTask { if sourceRows := extractRows(t.indexPlan); sourceRows > scanCst { cmpRate = indexRows / sourceRows } - t.indexPlan.Children() pagingCst := (rpcCnt-1)*sessVars.CPUFactor*40 + t.expectCnt*sessVars.CPUFactor pagingCst *= cmpRate if pagingCst < idxCst { From 00c4ef86d62c02191e0fb6e76fa6e4f66c59ea80 Mon Sep 17 00:00:00 2001 From: you06 Date: Wed, 8 Dec 2021 16:22:07 +0800 Subject: [PATCH 05/30] add metrics Signed-off-by: you06 --- distsql/distsql.go | 1 + distsql/select_result.go | 9 +++-- metrics/distsql.go | 2 +- metrics/session.go | 1 + planner/core/explain.go | 11 ++++-- planner/core/physical_plans.go | 8 ----- planner/core/plan_test.go | 63 ++++++++++++++++++++++++++++++++ planner/core/task.go | 66 ++++++++++++++++++---------------- 8 files changed, 117 insertions(+), 44 deletions(-) diff --git a/distsql/distsql.go b/distsql/distsql.go index 2f952da2a7d3c..3bc59f58542b2 100644 --- a/distsql/distsql.go +++ b/distsql/distsql.go @@ -128,6 +128,7 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie memTracker: kvReq.MemTracker, encodeType: encodetype, storeType: kvReq.StoreType, + paging: kvReq.Paging, }, nil } diff --git a/distsql/select_result.go b/distsql/select_result.go index 3ac7f1db94a97..b2eef6e6f5300 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -152,7 +152,8 @@ type selectResult struct { durationReported bool memTracker *memory.Tracker - stats *selectResultRuntimeStats + stats *selectResultRuntimeStats + paging bool } func (r *selectResult) fetchResp(ctx context.Context) error { @@ -206,7 +207,11 @@ func (r *selectResult) fetchResp(ctx context.Context) error { // final round of fetch // TODO: Add a label to distinguish between success or failure. // https://github.com/pingcap/tidb/issues/11397 - metrics.DistSQLQueryHistogram.WithLabelValues(r.label, r.sqlType).Observe(r.fetchDuration.Seconds()) + if r.paging { + metrics.DistSQLQueryHistogram.WithLabelValues(r.label, r.sqlType, "paging").Observe(r.fetchDuration.Seconds()) + } else { + metrics.DistSQLQueryHistogram.WithLabelValues(r.label, r.sqlType, "common").Observe(r.fetchDuration.Seconds()) + } r.durationReported = true } return nil diff --git a/metrics/distsql.go b/metrics/distsql.go index 9bec9d7646827..3a4527da510ae 100644 --- a/metrics/distsql.go +++ b/metrics/distsql.go @@ -27,7 +27,7 @@ var ( Name: "handle_query_duration_seconds", Help: "Bucketed histogram of processing time (s) of handled queries.", Buckets: prometheus.ExponentialBuckets(0.0005, 2, 29), // 0.5ms ~ 1.5days - }, []string{LblType, LblSQLType}) + }, []string{LblType, LblSQLType, LblCoprType}) DistSQLScanKeysPartialHistogram = prometheus.NewHistogram( prometheus.HistogramOpts{ diff --git a/metrics/session.go b/metrics/session.go index 0058104788f21..83df91439d311 100644 --- a/metrics/session.go +++ b/metrics/session.go @@ -142,6 +142,7 @@ const ( LblDb = "db" LblResult = "result" LblSQLType = "sql_type" + LblCoprType = "copr_type" LblGeneral = "general" LblInternal = "internal" LbTxnMode = "txn_mode" diff --git a/planner/core/explain.go b/planner/core/explain.go index 26bfa775fc417..d0ae474b79e7f 100644 --- a/planner/core/explain.go +++ b/planner/core/explain.go @@ -456,17 +456,22 @@ func (p *PhysicalIndexReader) accessObject(sctx sessionctx.Context) string { // ExplainInfo implements Plan interface. func (p *PhysicalIndexLookUpReader) ExplainInfo() string { + var str strings.Builder // The children can be inferred by the relation symbol. if p.PushedLimit != nil { - var str strings.Builder str.WriteString("limit embedded(offset:") str.WriteString(strconv.FormatUint(p.PushedLimit.Offset, 10)) str.WriteString(", count:") str.WriteString(strconv.FormatUint(p.PushedLimit.Count, 10)) str.WriteString(")") - return str.String() } - return "" + if p.Paging { + if p.PushedLimit != nil { + str.WriteString(", ") + } + str.WriteString("paging:true") + } + return str.String() } func (p *PhysicalIndexLookUpReader) accessObject(sctx sessionctx.Context) string { diff --git a/planner/core/physical_plans.go b/planner/core/physical_plans.go index cebf2bfe3cee3..9af71bbd0ab2d 100644 --- a/planner/core/physical_plans.go +++ b/planner/core/physical_plans.go @@ -322,14 +322,6 @@ func (p *PhysicalIndexLookUpReader) ExtractCorrelatedCols() (corCols []*expressi return corCols } -// ExplainNormalizedInfo implements Plan interface. -func (p *PhysicalIndexLookUpReader) ExplainNormalizedInfo() string { - if p.Paging { - return "paging: true" - } - return "paging: false" -} - // PhysicalIndexMergeReader is the reader using multiple indexes in tidb. type PhysicalIndexMergeReader struct { physicalSchemaProducer diff --git a/planner/core/plan_test.go b/planner/core/plan_test.go index 4437a354b1757..a1faf06e100e7 100644 --- a/planner/core/plan_test.go +++ b/planner/core/plan_test.go @@ -18,6 +18,7 @@ import ( "bytes" "fmt" "strings" + "testing" . "github.com/pingcap/check" "github.com/pingcap/tidb/config" @@ -26,6 +27,7 @@ import ( "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/sessionctx/variable" + kit "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/util/israce" "github.com/pingcap/tidb/util/plancodec" "github.com/pingcap/tidb/util/testkit" @@ -639,3 +641,64 @@ func (s *testPlanNormalize) TestIssue25729(c *C) { tk.MustExec("insert into t1 values(\"a\", \"adwa\");") tk.MustQuery("select * from t1 where concat(a, b) like \"aadwa\" and a = \"a\";").Check(testkit.Rows("a adwa")) } + +func TestCopPaging(t *testing.T) { + t.Parallel() + + store, clean := kit.CreateMockStore(t) + defer clean() + + tk := kit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("set session tidb_enable_paging = 1") + tk.MustExec("create table t(id int, c1 int, c2 int, primary key (id), key i(c1))") + defer tk.MustExec("drop table t") + for i := 0; i < 200; i++ { + tk.MustExec("insert into t values(?, ?, ?)", i, i, i) + } + tk.MustExec("analyze table t") + + // limit 1 in 100 should go paging + for i := 0; i < 1; i++ { + tk.MustQuery("explain format='brief' select * from t force index(i) where id <= 99 and c1 >= 0 and c1 <= 99 and c2 in (2, 4, 6, 8) order by c1 limit 1").Check(kit.Rows( + "Limit 1.00 root offset:0, count:1", + "└─IndexLookUp 1.00 root paging:true", + " ├─Selection(Build) 50.00 cop[tikv] le(test.t.id, 99)", + " │ └─IndexRangeScan 100.00 cop[tikv] table:t, index:i(c1) range:[0,99], keep order:true", + " └─Selection(Probe) 1.00 cop[tikv] in(test.t.c2, 2, 4, 6, 8)", + " └─TableRowIDScan 50.00 cop[tikv] table:t keep order:false")) + } + // limit 64 with paging has only 1 seek cost, less than non-paging + for i := 0; i < 10; i++ { + tk.MustQuery("explain format='brief' select * from t force index(i) where id <= 99 and c1 >= 0 and c1 <= 99 and c2 in (2, 4, 6, 8) order by c1 limit 64").Check(kit.Rows( + "Limit 1.00 root offset:0, count:64", + "└─IndexLookUp 1.00 root paging:true", + " ├─Selection(Build) 50.00 cop[tikv] le(test.t.id, 99)", + " │ └─IndexRangeScan 100.00 cop[tikv] table:t, index:i(c1) range:[0,99], keep order:true", + " └─Selection(Probe) 1.00 cop[tikv] in(test.t.c2, 2, 4, 6, 8)", + " └─TableRowIDScan 50.00 cop[tikv] table:t keep order:false")) + } + + // limit 65 with paging has 2 seek cost, also less than non-paging (SeekFactor + 65 * CPUFactor < 100 * CPUFactor) + for i := 0; i < 10; i++ { + tk.MustQuery("explain format='brief' select * from t force index(i) where id <= 99 and c1 >= 0 and c1 <= 99 and c2 in (2, 4, 6, 8) order by c1 limit 65").Check(kit.Rows( + "Limit 1.00 root offset:0, count:65", + "└─IndexLookUp 1.00 root paging:true", + " ├─Selection(Build) 50.00 cop[tikv] le(test.t.id, 99)", + " │ └─IndexRangeScan 100.00 cop[tikv] table:t, index:i(c1) range:[0,99], keep order:true", + " └─Selection(Probe) 1.00 cop[tikv] in(test.t.c2, 2, 4, 6, 8)", + " └─TableRowIDScan 50.00 cop[tikv] table:t keep order:false")) + } + + // limit 94 with paging has 2 seek cost, but more than non-paging (SeekFactor + 94 * CPUFactor < 100 * CPUFactor) + for i := 0; i < 10; i++ { + tk.MustQuery("explain format='brief' select * from t force index(i) where id <= 99 and c1 >= 0 and c1 <= 99 and c2 in (2, 4, 6, 8) order by c1 limit 94").Check(kit.Rows( + "Limit 1.00 root offset:0, count:94", + "└─IndexLookUp 1.00 root ", + " ├─Selection(Build) 50.00 cop[tikv] le(test.t.id, 99)", + " │ └─IndexRangeScan 100.00 cop[tikv] table:t, index:i(c1) range:[0,99], keep order:true", + " └─Selection(Probe) 1.00 cop[tikv] in(test.t.c2, 2, 4, 6, 8)", + " └─TableRowIDScan 50.00 cop[tikv] table:t keep order:false")) + } +} diff --git a/planner/core/task.go b/planner/core/task.go index 129cfe6889e52..91a23b8b452f4 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -46,6 +46,15 @@ var ( _ task = &mppTask{} ) +const ( + minPagingSize float64 = 64 + maxPagingSize float64 = 8192 + pagingSizeGrow float64 = 2 + // pagingGrowingSum is the sum of paging sizes during growing to the max page size + // pagingGrowingSum = (pagingSize ^ n - 1) * minPagingSize = (2 ^ 8 - 1) * 64 = 16320 + pagingGrowingSum float64 = 16320 +) + // task is a new version of `PhysicalPlanInfo`. It stores cost information for a task. // A task may be CopTask, RootTask, MPPTaskMeta or a ParallelTask. type task interface { @@ -919,40 +928,25 @@ func buildIndexLookUpTask(ctx sessionctx.Context, t *copTask) *rootTask { tableRows := t.tablePlan.statsInfo().RowCount selectivity := tableRows / indexRows idxCst := indexRows * sessVars.CPUFactor - // try paging API + // try paging API. + // paging API reduces the count of index and table rows, however introduces more seek cost. if ctx.GetSessionVars().EnablePaging && t.expectCnt > 0 { - //expectIndexRows := t.expectCnt / selectivity - rpcCnt := float64(1) - if t.expectCnt > 16320 { - rpcCnt = float64(int(9 + (t.expectCnt-16320)/64)) - } else if t.expectCnt > 64 { - rpcCnt = float64(int(1 + math.Log(t.expectCnt/64)/math.Log(2))) - } - scanCst := t.expectCnt * sessVars.CPUFactor - var extractRows func(p PhysicalPlan) float64 - extractRows = func(p PhysicalPlan) float64 { - f := float64(0) - for _, c := range t.indexPlan.Children() { - if len(c.Children()) != 0 { - f += extractRows(c) - } else { - f += c.statsInfo().RowCount - } - } - return f - } - cmpRate := float64(1) - if sourceRows := extractRows(t.indexPlan); sourceRows > scanCst { - cmpRate = indexRows / sourceRows - } - pagingCst := (rpcCnt-1)*sessVars.CPUFactor*40 + t.expectCnt*sessVars.CPUFactor - pagingCst *= cmpRate + seekCnt := float64(1) + if t.expectCnt > pagingGrowingSum { + seekCnt += float64(int(9 + (t.expectCnt-pagingGrowingSum)/maxPagingSize)) + } else if t.expectCnt > minPagingSize { + seekCnt += float64(int(1 + math.Log((pagingSizeGrow-1)*t.expectCnt/minPagingSize)/math.Log(pagingSizeGrow))) + } + indexSelectivity := float64(1) + sourceRows := extractRows(t.indexPlan) + if sourceRows > indexRows { + indexSelectivity = indexRows / sourceRows + } + pagingCst := (seekCnt-1)*sessVars.GetSeekFactor(nil) + t.expectCnt*sessVars.CPUFactor + pagingCst *= indexSelectivity if pagingCst < idxCst { idxCst = pagingCst p.Paging = true - if p, ok := t.indexPlan.(*PhysicalIndexScan); ok { - p.Paging = true - } } } newTask.cst += idxCst @@ -990,6 +984,18 @@ func buildIndexLookUpTask(ctx sessionctx.Context, t *copTask) *rootTask { return newTask } +func extractRows(p PhysicalPlan) float64 { + f := float64(0) + for _, c := range p.Children() { + if len(c.Children()) != 0 { + f += extractRows(c) + } else { + f += c.statsInfo().RowCount + } + } + return f +} + func (t *rootTask) convertToRootTask(_ sessionctx.Context) *rootTask { return t.copy().(*rootTask) } From 7f9509b64d02a999005df21a157a29d7026d62de Mon Sep 17 00:00:00 2001 From: you06 Date: Wed, 8 Dec 2021 16:58:06 +0800 Subject: [PATCH 06/30] clean code Signed-off-by: you06 --- planner/core/exhaust_physical_plans.go | 6 ++---- planner/core/physical_plans.go | 1 - planner/core/task.go | 4 ++-- sessionctx/variable/session.go | 4 ---- 4 files changed, 4 insertions(+), 11 deletions(-) diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index 2630d0941a99c..fb6a02d34b484 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -845,7 +845,7 @@ func (p *LogicalJoin) buildIndexJoinInner2IndexScan( maxOneRow = ok && (sf.FuncName.L == ast.EQ) } } - innerTask := p.constructInnerIndexScanTask(prop, ds, helper.chosenPath, helper.chosenRemained, outerJoinKeys, us, rangeInfo, false, false, avgInnerRowCnt, maxOneRow) + innerTask := p.constructInnerIndexScanTask(ds, helper.chosenPath, helper.chosenRemained, outerJoinKeys, us, rangeInfo, false, false, avgInnerRowCnt, maxOneRow) failpoint.Inject("MockOnlyEnableIndexHashJoin", func(val failpoint.Value) { if val.(bool) { failpoint.Return(p.constructIndexHashJoin(prop, outerIdx, innerTask, helper.chosenRanges, keyOff2IdxOff, helper.chosenPath, helper.lastColManager)) @@ -860,7 +860,7 @@ func (p *LogicalJoin) buildIndexJoinInner2IndexScan( // Because we can't keep order for union scan, if there is a union scan in inner task, // we can't construct index merge join. if us == nil { - innerTask2 := p.constructInnerIndexScanTask(prop, ds, helper.chosenPath, helper.chosenRemained, outerJoinKeys, us, rangeInfo, true, !prop.IsEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt, maxOneRow) + innerTask2 := p.constructInnerIndexScanTask(ds, helper.chosenPath, helper.chosenRemained, outerJoinKeys, us, rangeInfo, true, !prop.IsEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt, maxOneRow) if innerTask2 != nil { joins = append(joins, p.constructIndexMergeJoin(prop, outerIdx, innerTask2, helper.chosenRanges, keyOff2IdxOff, helper.chosenPath, helper.lastColManager)...) } @@ -1011,7 +1011,6 @@ func (p *LogicalJoin) constructInnerUnionScan(us *LogicalUnionScan, reader Physi // constructInnerIndexScanTask is specially used to construct the inner plan for PhysicalIndexJoin. func (p *LogicalJoin) constructInnerIndexScanTask( - prop *property.PhysicalProperty, ds *DataSource, path *util.AccessPath, filterConds []expression.Expression, @@ -1091,7 +1090,6 @@ func (p *LogicalJoin) constructInnerIndexScanTask( cop.originSchema = ds.schema } cop.tablePlan = ts - cop.expectCnt = prop.ExpectedCnt } if cop.tablePlan != nil && ds.tableInfo.IsCommonHandle { cop.commonHandleCols = ds.commonHandleCols diff --git a/planner/core/physical_plans.go b/planner/core/physical_plans.go index 9af71bbd0ab2d..6293bba4b5073 100644 --- a/planner/core/physical_plans.go +++ b/planner/core/physical_plans.go @@ -390,7 +390,6 @@ type PhysicalIndexScan struct { isPartition bool Desc bool KeepOrder bool - Paging bool // DoubleRead means if the index executor will read kv two times. // If the query requires the columns that don't belong to index, DoubleRead will be true. DoubleRead bool diff --git a/planner/core/task.go b/planner/core/task.go index 91a23b8b452f4..3c4ed9a0269b4 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -925,8 +925,6 @@ func buildIndexLookUpTask(ctx sessionctx.Context, t *copTask) *rootTask { // (indexRows / batchSize) * batchSize * CPUFactor // Since we don't know the number of copTasks built, ignore these network cost now. indexRows := t.indexPlan.statsInfo().RowCount - tableRows := t.tablePlan.statsInfo().RowCount - selectivity := tableRows / indexRows idxCst := indexRows * sessVars.CPUFactor // try paging API. // paging API reduces the count of index and table rows, however introduces more seek cost. @@ -965,6 +963,8 @@ func buildIndexLookUpTask(ctx sessionctx.Context, t *copTask) *rootTask { // Also, we need to sort the retrieved rows if index lookup reader is expected to return // ordered results. Note that row count of these two sorts can be different, if there are // operators above table scan. + tableRows := t.tablePlan.statsInfo().RowCount + selectivity := tableRows / indexRows batchSize = math.Min(indexLookupSize*selectivity, tableRows) if t.keepOrder && batchSize > 2 { sortCPUCost := (tableRows * math.Log2(batchSize) * sessVars.CPUFactor) / numTblWorkers diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 6b20a8ec41067..680e6f2c1367b 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -435,7 +435,6 @@ type SessionVars struct { Concurrency MemQuota BatchSize - Paging // DMLBatchSize indicates the number of rows batch-committed for a statement. // It will be used when using LOAD DATA or BatchInsert or BatchDelete is on. DMLBatchSize int @@ -1905,9 +1904,6 @@ type BatchSize struct { MaxChunkSize int } -type Paging struct { -} - const ( // SlowLogRowPrefixStr is slow log row prefix. SlowLogRowPrefixStr = "# " From 28fe7e3ed6ef4ccba349f7db614b79a990098c23 Mon Sep 17 00:00:00 2001 From: you06 Date: Wed, 8 Dec 2021 17:14:18 +0800 Subject: [PATCH 07/30] fix streaming metric label Signed-off-by: you06 --- distsql/stream.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distsql/stream.go b/distsql/stream.go index 73d8f96b8fe79..2f10e72b5c0f5 100644 --- a/distsql/stream.go +++ b/distsql/stream.go @@ -82,7 +82,7 @@ func (r *streamResult) readDataFromResponse(ctx context.Context, resp kv.Respons if !r.durationReported { // TODO: Add a label to distinguish between success or failure. // https://github.com/pingcap/tidb/issues/11397 - metrics.DistSQLQueryHistogram.WithLabelValues(r.label, r.sqlType).Observe(r.fetchDuration.Seconds()) + metrics.DistSQLQueryHistogram.WithLabelValues(r.label, r.sqlType, "streaming").Observe(r.fetchDuration.Seconds()) r.durationReported = true } return true, nil From 2f31fe479c09b4e2c8e04281d19ed5966d2088b1 Mon Sep 17 00:00:00 2001 From: you06 Date: Wed, 8 Dec 2021 17:48:24 +0800 Subject: [PATCH 08/30] use a threshold to decied whether go paging Signed-off-by: you06 --- planner/core/plan_test.go | 20 ++++++++++---------- planner/core/task.go | 32 +++++++++++++++++--------------- 2 files changed, 27 insertions(+), 25 deletions(-) diff --git a/planner/core/plan_test.go b/planner/core/plan_test.go index a1faf06e100e7..0fcf8bcc243af 100644 --- a/planner/core/plan_test.go +++ b/planner/core/plan_test.go @@ -669,10 +669,10 @@ func TestCopPaging(t *testing.T) { " └─Selection(Probe) 1.00 cop[tikv] in(test.t.c2, 2, 4, 6, 8)", " └─TableRowIDScan 50.00 cop[tikv] table:t keep order:false")) } - // limit 64 with paging has only 1 seek cost, less than non-paging + // limit 19 is still under the threshold 0.2, it should go paging for i := 0; i < 10; i++ { - tk.MustQuery("explain format='brief' select * from t force index(i) where id <= 99 and c1 >= 0 and c1 <= 99 and c2 in (2, 4, 6, 8) order by c1 limit 64").Check(kit.Rows( - "Limit 1.00 root offset:0, count:64", + tk.MustQuery("explain format='brief' select * from t force index(i) where id <= 99 and c1 >= 0 and c1 <= 99 and c2 in (2, 4, 6, 8) order by c1 limit 19").Check(kit.Rows( + "Limit 1.00 root offset:0, count:19", "└─IndexLookUp 1.00 root paging:true", " ├─Selection(Build) 50.00 cop[tikv] le(test.t.id, 99)", " │ └─IndexRangeScan 100.00 cop[tikv] table:t, index:i(c1) range:[0,99], keep order:true", @@ -680,21 +680,21 @@ func TestCopPaging(t *testing.T) { " └─TableRowIDScan 50.00 cop[tikv] table:t keep order:false")) } - // limit 65 with paging has 2 seek cost, also less than non-paging (SeekFactor + 65 * CPUFactor < 100 * CPUFactor) + // limit 21 exceeds the threshold, it should not go paging for i := 0; i < 10; i++ { - tk.MustQuery("explain format='brief' select * from t force index(i) where id <= 99 and c1 >= 0 and c1 <= 99 and c2 in (2, 4, 6, 8) order by c1 limit 65").Check(kit.Rows( - "Limit 1.00 root offset:0, count:65", - "└─IndexLookUp 1.00 root paging:true", + tk.MustQuery("explain format='brief' select * from t force index(i) where id <= 99 and c1 >= 0 and c1 <= 99 and c2 in (2, 4, 6, 8) order by c1 limit 21").Check(kit.Rows( + "Limit 1.00 root offset:0, count:21", + "└─IndexLookUp 1.00 root ", " ├─Selection(Build) 50.00 cop[tikv] le(test.t.id, 99)", " │ └─IndexRangeScan 100.00 cop[tikv] table:t, index:i(c1) range:[0,99], keep order:true", " └─Selection(Probe) 1.00 cop[tikv] in(test.t.c2, 2, 4, 6, 8)", " └─TableRowIDScan 50.00 cop[tikv] table:t keep order:false")) } - // limit 94 with paging has 2 seek cost, but more than non-paging (SeekFactor + 94 * CPUFactor < 100 * CPUFactor) + // limit 100 certainly not go paging. for i := 0; i < 10; i++ { - tk.MustQuery("explain format='brief' select * from t force index(i) where id <= 99 and c1 >= 0 and c1 <= 99 and c2 in (2, 4, 6, 8) order by c1 limit 94").Check(kit.Rows( - "Limit 1.00 root offset:0, count:94", + tk.MustQuery("explain format='brief' select * from t force index(i) where id <= 99 and c1 >= 0 and c1 <= 99 and c2 in (2, 4, 6, 8) order by c1 limit 100").Check(kit.Rows( + "Limit 1.00 root offset:0, count:100", "└─IndexLookUp 1.00 root ", " ├─Selection(Build) 50.00 cop[tikv] le(test.t.id, 99)", " │ └─IndexRangeScan 100.00 cop[tikv] table:t, index:i(c1) range:[0,99], keep order:true", diff --git a/planner/core/task.go b/planner/core/task.go index 3c4ed9a0269b4..09d96a4e3a2ed 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -53,6 +53,8 @@ const ( // pagingGrowingSum is the sum of paging sizes during growing to the max page size // pagingGrowingSum = (pagingSize ^ n - 1) * minPagingSize = (2 ^ 8 - 1) * 64 = 16320 pagingGrowingSum float64 = 16320 + // if the desired rows are below the threshold, use paging + pagingThreshold float64 = 0.2 ) // task is a new version of `PhysicalPlanInfo`. It stores cost information for a task. @@ -926,23 +928,23 @@ func buildIndexLookUpTask(ctx sessionctx.Context, t *copTask) *rootTask { // Since we don't know the number of copTasks built, ignore these network cost now. indexRows := t.indexPlan.statsInfo().RowCount idxCst := indexRows * sessVars.CPUFactor - // try paging API. + // if the expectCnt is below the paging threshold, using paging API, recalculate cost. // paging API reduces the count of index and table rows, however introduces more seek cost. if ctx.GetSessionVars().EnablePaging && t.expectCnt > 0 { - seekCnt := float64(1) - if t.expectCnt > pagingGrowingSum { - seekCnt += float64(int(9 + (t.expectCnt-pagingGrowingSum)/maxPagingSize)) - } else if t.expectCnt > minPagingSize { - seekCnt += float64(int(1 + math.Log((pagingSizeGrow-1)*t.expectCnt/minPagingSize)/math.Log(pagingSizeGrow))) - } - indexSelectivity := float64(1) - sourceRows := extractRows(t.indexPlan) - if sourceRows > indexRows { - indexSelectivity = indexRows / sourceRows - } - pagingCst := (seekCnt-1)*sessVars.GetSeekFactor(nil) + t.expectCnt*sessVars.CPUFactor - pagingCst *= indexSelectivity - if pagingCst < idxCst { + if sourceRows := extractRows(t.indexPlan); t.expectCnt < sourceRows*pagingThreshold { + seekCnt := float64(1) + if t.expectCnt > pagingGrowingSum { + seekCnt += float64(int(8 + (t.expectCnt-pagingGrowingSum)/maxPagingSize)) + } else if t.expectCnt > minPagingSize { + seekCnt += float64(int(math.Log((pagingSizeGrow-1)*t.expectCnt/minPagingSize) / math.Log(pagingSizeGrow))) + } + indexSelectivity := float64(1) + sourceRows := extractRows(t.indexPlan) + if sourceRows > indexRows { + indexSelectivity = indexRows / sourceRows + } + pagingCst := (seekCnt-1)*sessVars.GetSeekFactor(nil) + t.expectCnt*sessVars.CPUFactor + pagingCst *= indexSelectivity idxCst = pagingCst p.Paging = true } From b5ab23b05dbcab0277e44f7586cce9dd4a390283 Mon Sep 17 00:00:00 2001 From: you06 Date: Wed, 8 Dec 2021 19:29:12 +0800 Subject: [PATCH 09/30] decide go paging only by limit count Signed-off-by: you06 --- planner/core/plan_test.go | 55 ++++++++++++--------------------------- planner/core/task.go | 41 ++++++++++++++--------------- 2 files changed, 37 insertions(+), 59 deletions(-) diff --git a/planner/core/plan_test.go b/planner/core/plan_test.go index 0fcf8bcc243af..8b8952de12089 100644 --- a/planner/core/plan_test.go +++ b/planner/core/plan_test.go @@ -654,51 +654,30 @@ func TestCopPaging(t *testing.T) { tk.MustExec("set session tidb_enable_paging = 1") tk.MustExec("create table t(id int, c1 int, c2 int, primary key (id), key i(c1))") defer tk.MustExec("drop table t") - for i := 0; i < 200; i++ { + for i := 0; i < 1024; i++ { tk.MustExec("insert into t values(?, ?, ?)", i, i, i) } tk.MustExec("analyze table t") - // limit 1 in 100 should go paging + // limit 1000 should go paging for i := 0; i < 1; i++ { - tk.MustQuery("explain format='brief' select * from t force index(i) where id <= 99 and c1 >= 0 and c1 <= 99 and c2 in (2, 4, 6, 8) order by c1 limit 1").Check(kit.Rows( - "Limit 1.00 root offset:0, count:1", - "└─IndexLookUp 1.00 root paging:true", - " ├─Selection(Build) 50.00 cop[tikv] le(test.t.id, 99)", - " │ └─IndexRangeScan 100.00 cop[tikv] table:t, index:i(c1) range:[0,99], keep order:true", - " └─Selection(Probe) 1.00 cop[tikv] in(test.t.c2, 2, 4, 6, 8)", - " └─TableRowIDScan 50.00 cop[tikv] table:t keep order:false")) - } - // limit 19 is still under the threshold 0.2, it should go paging - for i := 0; i < 10; i++ { - tk.MustQuery("explain format='brief' select * from t force index(i) where id <= 99 and c1 >= 0 and c1 <= 99 and c2 in (2, 4, 6, 8) order by c1 limit 19").Check(kit.Rows( - "Limit 1.00 root offset:0, count:19", - "└─IndexLookUp 1.00 root paging:true", - " ├─Selection(Build) 50.00 cop[tikv] le(test.t.id, 99)", - " │ └─IndexRangeScan 100.00 cop[tikv] table:t, index:i(c1) range:[0,99], keep order:true", - " └─Selection(Probe) 1.00 cop[tikv] in(test.t.c2, 2, 4, 6, 8)", - " └─TableRowIDScan 50.00 cop[tikv] table:t keep order:false")) - } - - // limit 21 exceeds the threshold, it should not go paging - for i := 0; i < 10; i++ { - tk.MustQuery("explain format='brief' select * from t force index(i) where id <= 99 and c1 >= 0 and c1 <= 99 and c2 in (2, 4, 6, 8) order by c1 limit 21").Check(kit.Rows( - "Limit 1.00 root offset:0, count:21", - "└─IndexLookUp 1.00 root ", - " ├─Selection(Build) 50.00 cop[tikv] le(test.t.id, 99)", - " │ └─IndexRangeScan 100.00 cop[tikv] table:t, index:i(c1) range:[0,99], keep order:true", - " └─Selection(Probe) 1.00 cop[tikv] in(test.t.c2, 2, 4, 6, 8)", - " └─TableRowIDScan 50.00 cop[tikv] table:t keep order:false")) + tk.MustQuery("explain format='brief' select * from t force index(i) where id <= 1024 and c1 >= 0 and c1 <= 1024 and c2 in (2, 4, 6, 8) order by c1 limit 1000").Check(kit.Rows( + "Limit 4.00 root offset:0, count:1000", + "└─IndexLookUp 4.00 root paging:true", + " ├─Selection(Build) 1024.00 cop[tikv] le(test.t.id, 1024)", + " │ └─IndexRangeScan 1024.00 cop[tikv] table:t, index:i(c1) range:[0,1024], keep order:true", + " └─Selection(Probe) 4.00 cop[tikv] in(test.t.c2, 2, 4, 6, 8)", + " └─TableRowIDScan 1024.00 cop[tikv] table:t keep order:false")) } - // limit 100 certainly not go paging. + // limit 1001 exceeds the threshold, it should not go paging for i := 0; i < 10; i++ { - tk.MustQuery("explain format='brief' select * from t force index(i) where id <= 99 and c1 >= 0 and c1 <= 99 and c2 in (2, 4, 6, 8) order by c1 limit 100").Check(kit.Rows( - "Limit 1.00 root offset:0, count:100", - "└─IndexLookUp 1.00 root ", - " ├─Selection(Build) 50.00 cop[tikv] le(test.t.id, 99)", - " │ └─IndexRangeScan 100.00 cop[tikv] table:t, index:i(c1) range:[0,99], keep order:true", - " └─Selection(Probe) 1.00 cop[tikv] in(test.t.c2, 2, 4, 6, 8)", - " └─TableRowIDScan 50.00 cop[tikv] table:t keep order:false")) + tk.MustQuery("explain format='brief' select * from t force index(i) where id <= 1024 and c1 >= 0 and c1 <= 1024 and c2 in (2, 4, 6, 8) order by c1 limit 1001").Check(kit.Rows( + "Limit 4.00 root offset:0, count:1001", + "└─IndexLookUp 4.00 root ", + " ├─Selection(Build) 1024.00 cop[tikv] le(test.t.id, 1024)", + " │ └─IndexRangeScan 1024.00 cop[tikv] table:t, index:i(c1) range:[0,1024], keep order:true", + " └─Selection(Probe) 4.00 cop[tikv] in(test.t.c2, 2, 4, 6, 8)", + " └─TableRowIDScan 1024.00 cop[tikv] table:t keep order:false")) } } diff --git a/planner/core/task.go b/planner/core/task.go index 09d96a4e3a2ed..06641f39f2a60 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -54,7 +54,7 @@ const ( // pagingGrowingSum = (pagingSize ^ n - 1) * minPagingSize = (2 ^ 8 - 1) * 64 = 16320 pagingGrowingSum float64 = 16320 // if the desired rows are below the threshold, use paging - pagingThreshold float64 = 0.2 + pagingThreshold uint64 = 1000 ) // task is a new version of `PhysicalPlanInfo`. It stores cost information for a task. @@ -101,7 +101,7 @@ type copTask struct { // For table partition. partitionInfo PartitionInfo - expectCnt float64 + expectCnt uint64 } func (t *copTask) invalid() bool { @@ -930,24 +930,23 @@ func buildIndexLookUpTask(ctx sessionctx.Context, t *copTask) *rootTask { idxCst := indexRows * sessVars.CPUFactor // if the expectCnt is below the paging threshold, using paging API, recalculate cost. // paging API reduces the count of index and table rows, however introduces more seek cost. - if ctx.GetSessionVars().EnablePaging && t.expectCnt > 0 { - if sourceRows := extractRows(t.indexPlan); t.expectCnt < sourceRows*pagingThreshold { - seekCnt := float64(1) - if t.expectCnt > pagingGrowingSum { - seekCnt += float64(int(8 + (t.expectCnt-pagingGrowingSum)/maxPagingSize)) - } else if t.expectCnt > minPagingSize { - seekCnt += float64(int(math.Log((pagingSizeGrow-1)*t.expectCnt/minPagingSize) / math.Log(pagingSizeGrow))) - } - indexSelectivity := float64(1) - sourceRows := extractRows(t.indexPlan) - if sourceRows > indexRows { - indexSelectivity = indexRows / sourceRows - } - pagingCst := (seekCnt-1)*sessVars.GetSeekFactor(nil) + t.expectCnt*sessVars.CPUFactor - pagingCst *= indexSelectivity - idxCst = pagingCst - p.Paging = true - } + if ctx.GetSessionVars().EnablePaging && t.expectCnt > 0 && t.expectCnt <= pagingThreshold { + seekCnt := float64(1) + expectCnt := float64(t.expectCnt) + if expectCnt > pagingGrowingSum { + seekCnt += float64(int(8 + (expectCnt-pagingGrowingSum)/maxPagingSize)) + } else if expectCnt > minPagingSize { + seekCnt += float64(int(math.Log((pagingSizeGrow-1)*expectCnt/minPagingSize) / math.Log(pagingSizeGrow))) + } + indexSelectivity := float64(1) + sourceRows := extractRows(t.indexPlan) + if sourceRows > indexRows { + indexSelectivity = indexRows / sourceRows + } + pagingCst := (seekCnt-1)*sessVars.GetSeekFactor(nil) + expectCnt*sessVars.CPUFactor + pagingCst *= indexSelectivity + idxCst = pagingCst + p.Paging = true } newTask.cst += idxCst // Add cost of worker goroutines in index lookup. @@ -1189,7 +1188,7 @@ func (p *PhysicalLimit) attach2Task(tasks ...task) task { pushedDownLimit.SetSchema(pushedDownLimit.children[0].Schema()) pushedDownLimit.cost = cop.cost() } - cop.expectCnt = float64(p.Count) + cop.expectCnt = p.Count t = cop.convertToRootTask(p.ctx) sunk = p.sinkIntoIndexLookUp(t) } else if mpp, ok := t.(*mppTask); ok { From 72a7cfb10b3e9f52f2ce8f25bcdab00b1553614d Mon Sep 17 00:00:00 2001 From: you06 Date: Thu, 9 Dec 2021 10:59:35 +0800 Subject: [PATCH 10/30] correct expectCnt Signed-off-by: you06 --- planner/core/task.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/planner/core/task.go b/planner/core/task.go index 06641f39f2a60..2b6e658316d15 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -933,13 +933,14 @@ func buildIndexLookUpTask(ctx sessionctx.Context, t *copTask) *rootTask { if ctx.GetSessionVars().EnablePaging && t.expectCnt > 0 && t.expectCnt <= pagingThreshold { seekCnt := float64(1) expectCnt := float64(t.expectCnt) + sourceRows := extractRows(t.indexPlan) + expectCnt = math.Min(expectCnt, sourceRows) if expectCnt > pagingGrowingSum { seekCnt += float64(int(8 + (expectCnt-pagingGrowingSum)/maxPagingSize)) } else if expectCnt > minPagingSize { seekCnt += float64(int(math.Log((pagingSizeGrow-1)*expectCnt/minPagingSize) / math.Log(pagingSizeGrow))) } indexSelectivity := float64(1) - sourceRows := extractRows(t.indexPlan) if sourceRows > indexRows { indexSelectivity = indexRows / sourceRows } From aba1a8c0c671632db1f0853826a17b8345cdb37c Mon Sep 17 00:00:00 2001 From: you06 Date: Thu, 9 Dec 2021 11:52:40 +0800 Subject: [PATCH 11/30] paging do not enlarge idx cost Signed-off-by: you06 --- infoschema/metric_table_def.go | 2 +- metrics/grafana/tidb.json | 2 +- planner/core/plan_test.go | 11 +++++++++++ planner/core/task.go | 7 +++++-- 4 files changed, 18 insertions(+), 4 deletions(-) diff --git a/infoschema/metric_table_def.go b/infoschema/metric_table_def.go index b27217a316c03..ebbc1e2389c96 100644 --- a/infoschema/metric_table_def.go +++ b/infoschema/metric_table_def.go @@ -260,7 +260,7 @@ var MetricTableMap = map[string]MetricTableDef{ Comment: "The quantile durations of distsql execution(second)", }, "tidb_distsql_qps": { - PromQL: "sum(rate(tidb_distsql_handle_query_duration_seconds_count{$LABEL_CONDITIONS}[$RANGE_DURATION]))", + PromQL: "sum(rate(tidb_distsql_handle_query_duration_seconds_count{$LABEL_CONDITIONS}[$RANGE_DURATION])) by (copr_type)", Labels: []string{"instance", "type"}, Comment: "distsql query handling durations per second", }, diff --git a/metrics/grafana/tidb.json b/metrics/grafana/tidb.json index 2776e7625570f..dee67797869cf 100644 --- a/metrics/grafana/tidb.json +++ b/metrics/grafana/tidb.json @@ -6168,7 +6168,7 @@ "steppedLine": false, "targets": [ { - "expr": "sum(rate(tidb_distsql_handle_query_duration_seconds_count{tidb_cluster=\"$tidb_cluster\"}[1m]))", + "expr": "sum(rate(tidb_distsql_handle_query_duration_seconds_count{tidb_cluster=\"$tidb_cluster\"}[1m])) by (copr_type)", "format": "time_series", "intervalFactor": 2, "legendFormat": "", diff --git a/planner/core/plan_test.go b/planner/core/plan_test.go index 8b8952de12089..2a63a35b6d3aa 100644 --- a/planner/core/plan_test.go +++ b/planner/core/plan_test.go @@ -680,4 +680,15 @@ func TestCopPaging(t *testing.T) { " └─Selection(Probe) 4.00 cop[tikv] in(test.t.c2, 2, 4, 6, 8)", " └─TableRowIDScan 1024.00 cop[tikv] table:t keep order:false")) } + + // limit 1000 should go paging + for i := 0; i < 1; i++ { + tk.MustQuery("explain format='brief' select * from t force index(i) where id <= 255 and c1 >= 0 and c1 <= 255 and c2 in (2, 4, 6, 8) order by c1 limit 1000").Check(kit.Rows( + "Limit 0.25 root offset:0, count:1000", + "└─IndexLookUp 0.25 root paging:true", + " ├─Selection(Build) 64.00 cop[tikv] le(test.t.id, 255)", + " │ └─IndexRangeScan 256.00 cop[tikv] table:t, index:i(c1) range:[0,255], keep order:true", + " └─Selection(Probe) 0.25 cop[tikv] in(test.t.c2, 2, 4, 6, 8)", + " └─TableRowIDScan 64.00 cop[tikv] table:t keep order:false")) + } } diff --git a/planner/core/task.go b/planner/core/task.go index 2b6e658316d15..c0839d6416864 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -928,8 +928,11 @@ func buildIndexLookUpTask(ctx sessionctx.Context, t *copTask) *rootTask { // Since we don't know the number of copTasks built, ignore these network cost now. indexRows := t.indexPlan.statsInfo().RowCount idxCst := indexRows * sessVars.CPUFactor - // if the expectCnt is below the paging threshold, using paging API, recalculate cost. + // if the expectCnt is below the paging threshold, using paging API, recalculate idxCst. // paging API reduces the count of index and table rows, however introduces more seek cost. + if ctx.GetSessionVars().EnablePaging { + logutil.BgLogger().Info("MYLOG try paging", zap.Uint64("expectCnt", t.expectCnt)) + } if ctx.GetSessionVars().EnablePaging && t.expectCnt > 0 && t.expectCnt <= pagingThreshold { seekCnt := float64(1) expectCnt := float64(t.expectCnt) @@ -946,7 +949,7 @@ func buildIndexLookUpTask(ctx sessionctx.Context, t *copTask) *rootTask { } pagingCst := (seekCnt-1)*sessVars.GetSeekFactor(nil) + expectCnt*sessVars.CPUFactor pagingCst *= indexSelectivity - idxCst = pagingCst + idxCst = math.Min(idxCst, pagingCst) p.Paging = true } newTask.cst += idxCst From 9e9d34d807d305700bab96a75d4529f685131863 Mon Sep 17 00:00:00 2001 From: you06 Date: Thu, 9 Dec 2021 16:19:42 +0800 Subject: [PATCH 12/30] fix metric schame test Signed-off-by: you06 --- infoschema/metric_table_def.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/infoschema/metric_table_def.go b/infoschema/metric_table_def.go index ebbc1e2389c96..b27217a316c03 100644 --- a/infoschema/metric_table_def.go +++ b/infoschema/metric_table_def.go @@ -260,7 +260,7 @@ var MetricTableMap = map[string]MetricTableDef{ Comment: "The quantile durations of distsql execution(second)", }, "tidb_distsql_qps": { - PromQL: "sum(rate(tidb_distsql_handle_query_duration_seconds_count{$LABEL_CONDITIONS}[$RANGE_DURATION])) by (copr_type)", + PromQL: "sum(rate(tidb_distsql_handle_query_duration_seconds_count{$LABEL_CONDITIONS}[$RANGE_DURATION]))", Labels: []string{"instance", "type"}, Comment: "distsql query handling durations per second", }, From cee8aaabaaa24c4d4d3082714efadfe60192b809 Mon Sep 17 00:00:00 2001 From: you06 Date: Fri, 10 Dec 2021 11:43:59 +0800 Subject: [PATCH 13/30] adjust the paging threshold Signed-off-by: you06 --- planner/core/task.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/planner/core/task.go b/planner/core/task.go index c0839d6416864..8d756aa5ee066 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -53,8 +53,8 @@ const ( // pagingGrowingSum is the sum of paging sizes during growing to the max page size // pagingGrowingSum = (pagingSize ^ n - 1) * minPagingSize = (2 ^ 8 - 1) * 64 = 16320 pagingGrowingSum float64 = 16320 - // if the desired rows are below the threshold, use paging - pagingThreshold uint64 = 1000 + // if the desired rows are below the threshold, use paging, threshold is the sum of 4 pages + pagingThreshold uint64 = 960 ) // task is a new version of `PhysicalPlanInfo`. It stores cost information for a task. From 653f16573348cb26acc6cf371daf1cb23917c8e7 Mon Sep 17 00:00:00 2001 From: you06 Date: Fri, 10 Dec 2021 14:20:48 +0800 Subject: [PATCH 14/30] fix test Signed-off-by: you06 --- planner/core/plan_test.go | 25 +++++++------------------ 1 file changed, 7 insertions(+), 18 deletions(-) diff --git a/planner/core/plan_test.go b/planner/core/plan_test.go index 2a63a35b6d3aa..a40cfcc4fa94e 100644 --- a/planner/core/plan_test.go +++ b/planner/core/plan_test.go @@ -659,10 +659,10 @@ func TestCopPaging(t *testing.T) { } tk.MustExec("analyze table t") - // limit 1000 should go paging - for i := 0; i < 1; i++ { - tk.MustQuery("explain format='brief' select * from t force index(i) where id <= 1024 and c1 >= 0 and c1 <= 1024 and c2 in (2, 4, 6, 8) order by c1 limit 1000").Check(kit.Rows( - "Limit 4.00 root offset:0, count:1000", + // limit 960 should go paging + for i := 0; i < 10; i++ { + tk.MustQuery("explain format='brief' select * from t force index(i) where id <= 1024 and c1 >= 0 and c1 <= 1024 and c2 in (2, 4, 6, 8) order by c1 limit 960").Check(kit.Rows( + "Limit 4.00 root offset:0, count:960", "└─IndexLookUp 4.00 root paging:true", " ├─Selection(Build) 1024.00 cop[tikv] le(test.t.id, 1024)", " │ └─IndexRangeScan 1024.00 cop[tikv] table:t, index:i(c1) range:[0,1024], keep order:true", @@ -670,25 +670,14 @@ func TestCopPaging(t *testing.T) { " └─TableRowIDScan 1024.00 cop[tikv] table:t keep order:false")) } - // limit 1001 exceeds the threshold, it should not go paging + // limit 961 exceeds the threshold, it should not go paging for i := 0; i < 10; i++ { - tk.MustQuery("explain format='brief' select * from t force index(i) where id <= 1024 and c1 >= 0 and c1 <= 1024 and c2 in (2, 4, 6, 8) order by c1 limit 1001").Check(kit.Rows( - "Limit 4.00 root offset:0, count:1001", + tk.MustQuery("explain format='brief' select * from t force index(i) where id <= 1024 and c1 >= 0 and c1 <= 1024 and c2 in (2, 4, 6, 8) order by c1 limit 961").Check(kit.Rows( + "Limit 4.00 root offset:0, count:961", "└─IndexLookUp 4.00 root ", " ├─Selection(Build) 1024.00 cop[tikv] le(test.t.id, 1024)", " │ └─IndexRangeScan 1024.00 cop[tikv] table:t, index:i(c1) range:[0,1024], keep order:true", " └─Selection(Probe) 4.00 cop[tikv] in(test.t.c2, 2, 4, 6, 8)", " └─TableRowIDScan 1024.00 cop[tikv] table:t keep order:false")) } - - // limit 1000 should go paging - for i := 0; i < 1; i++ { - tk.MustQuery("explain format='brief' select * from t force index(i) where id <= 255 and c1 >= 0 and c1 <= 255 and c2 in (2, 4, 6, 8) order by c1 limit 1000").Check(kit.Rows( - "Limit 0.25 root offset:0, count:1000", - "└─IndexLookUp 0.25 root paging:true", - " ├─Selection(Build) 64.00 cop[tikv] le(test.t.id, 255)", - " │ └─IndexRangeScan 256.00 cop[tikv] table:t, index:i(c1) range:[0,255], keep order:true", - " └─Selection(Probe) 0.25 cop[tikv] in(test.t.c2, 2, 4, 6, 8)", - " └─TableRowIDScan 64.00 cop[tikv] table:t keep order:false")) - } } From 0b45c0b846f02d1dd996489a4586e601ee8b2e58 Mon Sep 17 00:00:00 2001 From: you06 Date: Wed, 15 Dec 2021 11:48:57 +0800 Subject: [PATCH 15/30] address comment Signed-off-by: you06 --- planner/core/task.go | 49 +++++++++++++++++++++++++++----------------- 1 file changed, 30 insertions(+), 19 deletions(-) diff --git a/planner/core/task.go b/planner/core/task.go index 8d756aa5ee066..918394f0e95c1 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -47,6 +47,7 @@ var ( ) const ( + // The size of a paging distsql grows from minPagingSize to maxPagingSize minPagingSize float64 = 64 maxPagingSize float64 = 8192 pagingSizeGrow float64 = 2 @@ -930,27 +931,9 @@ func buildIndexLookUpTask(ctx sessionctx.Context, t *copTask) *rootTask { idxCst := indexRows * sessVars.CPUFactor // if the expectCnt is below the paging threshold, using paging API, recalculate idxCst. // paging API reduces the count of index and table rows, however introduces more seek cost. - if ctx.GetSessionVars().EnablePaging { - logutil.BgLogger().Info("MYLOG try paging", zap.Uint64("expectCnt", t.expectCnt)) - } if ctx.GetSessionVars().EnablePaging && t.expectCnt > 0 && t.expectCnt <= pagingThreshold { - seekCnt := float64(1) - expectCnt := float64(t.expectCnt) - sourceRows := extractRows(t.indexPlan) - expectCnt = math.Min(expectCnt, sourceRows) - if expectCnt > pagingGrowingSum { - seekCnt += float64(int(8 + (expectCnt-pagingGrowingSum)/maxPagingSize)) - } else if expectCnt > minPagingSize { - seekCnt += float64(int(math.Log((pagingSizeGrow-1)*expectCnt/minPagingSize) / math.Log(pagingSizeGrow))) - } - indexSelectivity := float64(1) - if sourceRows > indexRows { - indexSelectivity = indexRows / sourceRows - } - pagingCst := (seekCnt-1)*sessVars.GetSeekFactor(nil) + expectCnt*sessVars.CPUFactor - pagingCst *= indexSelectivity - idxCst = math.Min(idxCst, pagingCst) p.Paging = true + idxCst = math.Min(idxCst, calcPagingCost(ctx, t)) } newTask.cst += idxCst // Add cost of worker goroutines in index lookup. @@ -1001,6 +984,34 @@ func extractRows(p PhysicalPlan) float64 { return f } +func calcPagingCost(ctx sessionctx.Context, t *copTask) float64 { + sessVars := ctx.GetSessionVars() + indexRows := t.indexPlan.statsInfo().RowCount + expectCnt := float64(t.expectCnt) + sourceRows := extractRows(t.indexPlan) + // with paging, the scanned rows is always less than or equal to source rows. + expectCnt = math.Min(expectCnt, sourceRows) + seekCnt := float64(1) + if expectCnt > pagingGrowingSum { + // if the expectCnt is larger than pagingGrowingSum, calculate the seekCnt for the excess. + seekCnt += float64(int(8 + (expectCnt-pagingGrowingSum)/maxPagingSize)) + } else if expectCnt > minPagingSize { + // if the expectCnt is less than pagingGrowingSum, + // calculate the seekCnt(number of terms) from the sum of a geometric progression. + // expectCnt = minPagingSize * (pagingSizeGrow ^ seekCnt - 1) / (pagingSizeGrow - 1) + // simplify (pagingSizeGrow ^ seekCnt - 1) to pagingSizeGrow ^ seekCnt, we can infer that + // seekCnt = log((pagingSizeGrow - 1) * expectCnt / minPagingSize) / log(pagingSizeGrow) + seekCnt += float64(int(math.Log((pagingSizeGrow-1)*expectCnt/minPagingSize) / math.Log(pagingSizeGrow))) + } + indexSelectivity := float64(1) + if sourceRows > indexRows { + indexSelectivity = indexRows / sourceRows + } + pagingCst := (seekCnt-1)*sessVars.GetSeekFactor(nil) + expectCnt*sessVars.CPUFactor + pagingCst *= indexSelectivity + return pagingCst +} + func (t *rootTask) convertToRootTask(_ sessionctx.Context) *rootTask { return t.copy().(*rootTask) } From 4d096f1f7a08af9f1e7f9e5ac406ec211a1d7f63 Mon Sep 17 00:00:00 2001 From: you06 Date: Thu, 16 Dec 2021 11:34:42 +0800 Subject: [PATCH 16/30] Update planner/core/task.go Co-authored-by: Lei Zhao --- planner/core/task.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/planner/core/task.go b/planner/core/task.go index 918394f0e95c1..7c3305789a72b 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -52,7 +52,7 @@ const ( maxPagingSize float64 = 8192 pagingSizeGrow float64 = 2 // pagingGrowingSum is the sum of paging sizes during growing to the max page size - // pagingGrowingSum = (pagingSize ^ n - 1) * minPagingSize = (2 ^ 8 - 1) * 64 = 16320 + // pagingGrowingSum = (pagingSizeGrow ^ n - 1) * minPagingSize = (2 ^ 8 - 1) * 64 = 16320 pagingGrowingSum float64 = 16320 // if the desired rows are below the threshold, use paging, threshold is the sum of 4 pages pagingThreshold uint64 = 960 From 1f0b0e488e04d24aca5a5fe18e1ecc3e4a3d87c2 Mon Sep 17 00:00:00 2001 From: you06 Date: Thu, 16 Dec 2021 12:13:39 +0800 Subject: [PATCH 17/30] fix legend format Signed-off-by: you06 --- metrics/grafana/tidb.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metrics/grafana/tidb.json b/metrics/grafana/tidb.json index c150b503c407c..a84dd0e044dff 100644 --- a/metrics/grafana/tidb.json +++ b/metrics/grafana/tidb.json @@ -6171,7 +6171,7 @@ "expr": "sum(rate(tidb_distsql_handle_query_duration_seconds_count{tidb_cluster=\"$tidb_cluster\"}[1m])) by (copr_type)", "format": "time_series", "intervalFactor": 2, - "legendFormat": "", + "legendFormat": "{{copr_type}}", "metric": "tidb_distsql_query_total", "refId": "A", "step": 4 From 5d23006dad30fd17e334f6015b5130b0deb76551 Mon Sep 17 00:00:00 2001 From: you06 Date: Thu, 16 Dec 2021 13:21:19 +0800 Subject: [PATCH 18/30] address comment Signed-off-by: you06 --- planner/core/task.go | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/planner/core/task.go b/planner/core/task.go index 7c3305789a72b..672daefb92c85 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -47,13 +47,14 @@ var ( ) const ( + pagingSizeGrow int = 2 + maxPagingSizeShift int = 7 // The size of a paging distsql grows from minPagingSize to maxPagingSize - minPagingSize float64 = 64 - maxPagingSize float64 = 8192 - pagingSizeGrow float64 = 2 + minPagingSize int = 64 + maxPagingSize int = minPagingSize << maxPagingSizeShift // pagingGrowingSum is the sum of paging sizes during growing to the max page size // pagingGrowingSum = (pagingSizeGrow ^ n - 1) * minPagingSize = (2 ^ 8 - 1) * 64 = 16320 - pagingGrowingSum float64 = 16320 + pagingGrowingSum int = ((2 << maxPagingSizeShift) - 1) * minPagingSize // if the desired rows are below the threshold, use paging, threshold is the sum of 4 pages pagingThreshold uint64 = 960 ) @@ -102,6 +103,8 @@ type copTask struct { // For table partition. partitionInfo PartitionInfo + // expectCnt is the expected row count of upper task, 0 for unlimited. + // It's used for deciding whether using paging distsql. expectCnt uint64 } @@ -987,27 +990,30 @@ func extractRows(p PhysicalPlan) float64 { func calcPagingCost(ctx sessionctx.Context, t *copTask) float64 { sessVars := ctx.GetSessionVars() indexRows := t.indexPlan.statsInfo().RowCount - expectCnt := float64(t.expectCnt) + expectCnt := int(t.expectCnt) sourceRows := extractRows(t.indexPlan) // with paging, the scanned rows is always less than or equal to source rows. - expectCnt = math.Min(expectCnt, sourceRows) + if int(sourceRows) < expectCnt { + expectCnt = int(sourceRows) + } seekCnt := float64(1) if expectCnt > pagingGrowingSum { // if the expectCnt is larger than pagingGrowingSum, calculate the seekCnt for the excess. - seekCnt += float64(int(8 + (expectCnt-pagingGrowingSum)/maxPagingSize)) + seekCnt += float64(8 + (expectCnt-pagingGrowingSum)/maxPagingSize) } else if expectCnt > minPagingSize { // if the expectCnt is less than pagingGrowingSum, // calculate the seekCnt(number of terms) from the sum of a geometric progression. // expectCnt = minPagingSize * (pagingSizeGrow ^ seekCnt - 1) / (pagingSizeGrow - 1) // simplify (pagingSizeGrow ^ seekCnt - 1) to pagingSizeGrow ^ seekCnt, we can infer that // seekCnt = log((pagingSizeGrow - 1) * expectCnt / minPagingSize) / log(pagingSizeGrow) - seekCnt += float64(int(math.Log((pagingSizeGrow-1)*expectCnt/minPagingSize) / math.Log(pagingSizeGrow))) + seekCnt += float64(int(math.Log(float64((pagingSizeGrow-1)*expectCnt)/float64(minPagingSize)) / math.Log(float64(pagingSizeGrow)))) } indexSelectivity := float64(1) if sourceRows > indexRows { indexSelectivity = indexRows / sourceRows } - pagingCst := (seekCnt-1)*sessVars.GetSeekFactor(nil) + expectCnt*sessVars.CPUFactor + // only calculate the diff of seek cost here, so (seekCnt - 1) + pagingCst := (seekCnt-1)*sessVars.GetSeekFactor(nil) + float64(expectCnt)*sessVars.CPUFactor pagingCst *= indexSelectivity return pagingCst } From 381aa2aed85387212b7a1bfd892477188ac29de2 Mon Sep 17 00:00:00 2001 From: you06 Date: Thu, 16 Dec 2021 19:59:39 +0800 Subject: [PATCH 19/30] extract paging constants to individual package Signed-off-by: you06 --- planner/core/task.go | 45 +++++++++------------------------ store/copr/coprocessor.go | 25 +++--------------- store/copr/coprocessor_test.go | 3 ++- util/paging/paging.go | 46 ++++++++++++++++++++++++++++++++++ util/paging/paging_test.go | 24 ++++++++++++++++++ 5 files changed, 87 insertions(+), 56 deletions(-) create mode 100644 util/paging/paging.go create mode 100644 util/paging/paging_test.go diff --git a/planner/core/task.go b/planner/core/task.go index 672daefb92c85..16c8e1c3d5724 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -15,6 +15,7 @@ package core import ( + "github.com/pingcap/tidb/sessionctx" "math" "github.com/cznic/mathutil" @@ -29,12 +30,12 @@ import ( "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/planner/property" "github.com/pingcap/tidb/planner/util" - "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/collate" "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/paging" "github.com/pingcap/tidb/util/plancodec" "github.com/pingcap/tipb/go-tipb" "go.uber.org/zap" @@ -46,19 +47,6 @@ var ( _ task = &mppTask{} ) -const ( - pagingSizeGrow int = 2 - maxPagingSizeShift int = 7 - // The size of a paging distsql grows from minPagingSize to maxPagingSize - minPagingSize int = 64 - maxPagingSize int = minPagingSize << maxPagingSizeShift - // pagingGrowingSum is the sum of paging sizes during growing to the max page size - // pagingGrowingSum = (pagingSizeGrow ^ n - 1) * minPagingSize = (2 ^ 8 - 1) * 64 = 16320 - pagingGrowingSum int = ((2 << maxPagingSizeShift) - 1) * minPagingSize - // if the desired rows are below the threshold, use paging, threshold is the sum of 4 pages - pagingThreshold uint64 = 960 -) - // task is a new version of `PhysicalPlanInfo`. It stores cost information for a task. // A task may be CopTask, RootTask, MPPTaskMeta or a ParallelTask. type task interface { @@ -934,9 +922,12 @@ func buildIndexLookUpTask(ctx sessionctx.Context, t *copTask) *rootTask { idxCst := indexRows * sessVars.CPUFactor // if the expectCnt is below the paging threshold, using paging API, recalculate idxCst. // paging API reduces the count of index and table rows, however introduces more seek cost. - if ctx.GetSessionVars().EnablePaging && t.expectCnt > 0 && t.expectCnt <= pagingThreshold { + if ctx.GetSessionVars().EnablePaging && t.expectCnt > 0 && t.expectCnt <= paging.Threshold { p.Paging = true - idxCst = math.Min(idxCst, calcPagingCost(ctx, t)) + // we want the diff between idxCst and pagingCst here, + // however, the idxCst does not contain seekFactor, so a seekFactor needs to be removed + pagingCstDiff := calcPagingCost(ctx, t) - sessVars.GetSeekFactor(nil) + idxCst = math.Min(idxCst, pagingCstDiff) } newTask.cst += idxCst // Add cost of worker goroutines in index lookup. @@ -990,30 +981,18 @@ func extractRows(p PhysicalPlan) float64 { func calcPagingCost(ctx sessionctx.Context, t *copTask) float64 { sessVars := ctx.GetSessionVars() indexRows := t.indexPlan.statsInfo().RowCount - expectCnt := int(t.expectCnt) + expectCnt := t.expectCnt sourceRows := extractRows(t.indexPlan) // with paging, the scanned rows is always less than or equal to source rows. - if int(sourceRows) < expectCnt { - expectCnt = int(sourceRows) - } - seekCnt := float64(1) - if expectCnt > pagingGrowingSum { - // if the expectCnt is larger than pagingGrowingSum, calculate the seekCnt for the excess. - seekCnt += float64(8 + (expectCnt-pagingGrowingSum)/maxPagingSize) - } else if expectCnt > minPagingSize { - // if the expectCnt is less than pagingGrowingSum, - // calculate the seekCnt(number of terms) from the sum of a geometric progression. - // expectCnt = minPagingSize * (pagingSizeGrow ^ seekCnt - 1) / (pagingSizeGrow - 1) - // simplify (pagingSizeGrow ^ seekCnt - 1) to pagingSizeGrow ^ seekCnt, we can infer that - // seekCnt = log((pagingSizeGrow - 1) * expectCnt / minPagingSize) / log(pagingSizeGrow) - seekCnt += float64(int(math.Log(float64((pagingSizeGrow-1)*expectCnt)/float64(minPagingSize)) / math.Log(float64(pagingSizeGrow)))) + if uint64(sourceRows) < expectCnt { + expectCnt = uint64(sourceRows) } + seekCnt := paging.CalculateSeekCnt(expectCnt) indexSelectivity := float64(1) if sourceRows > indexRows { indexSelectivity = indexRows / sourceRows } - // only calculate the diff of seek cost here, so (seekCnt - 1) - pagingCst := (seekCnt-1)*sessVars.GetSeekFactor(nil) + float64(expectCnt)*sessVars.CPUFactor + pagingCst := seekCnt*sessVars.GetSeekFactor(nil) + float64(expectCnt)*sessVars.CPUFactor pagingCst *= indexSelectivity return pagingCst } diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index 25ed965ea15e3..02db6678e7ab0 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -17,6 +17,7 @@ package copr import ( "context" "fmt" + "github.com/pingcap/tidb/util/paging" "io" "strconv" "strings" @@ -61,18 +62,6 @@ const ( copNextMaxBackoff = 20000 ) -// A paging request may be separated into multi requests if there are more data than a page. -// The paging size grows from min to max, it's not well tuned yet. -// e.g. a paging request scans over range (r1, r200), it requires 64 rows in the first batch, -// if it's not drained, then the paging size grows, the new range is calculated like (r100, r200), then send a request again. -// Compare with the common unary request, paging request allows early access of data, it offers a streaming-like way processing data. -// TODO: may make the paging parameters configurable. -const ( - minPagingSize uint64 = 64 - maxPagingSize = minPagingSize * 128 - pagingSizeGrow uint64 = 2 -) - // CopClient is coprocessor client. type CopClient struct { kv.RequestTypeSupportedChecker @@ -212,7 +201,7 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv // the size will grow every round. pagingSize := uint64(0) if req.Paging { - pagingSize = minPagingSize + pagingSize = paging.MinPagingSize } tasks = append(tasks, &copTask{ region: loc.Location.Region, @@ -928,7 +917,7 @@ func (worker *copIteratorWorker) handleCopPagingResult(bo *Backoffer, rpcCtx *ti if task.ranges.Len() == 0 { return nil, nil } - task.pagingSize = growPagingSize(task.pagingSize) + task.pagingSize = paging.GrowPagingSize(task.pagingSize) return []*copTask{task}, nil } @@ -1332,11 +1321,3 @@ func isolationLevelToPB(level kv.IsoLevel) kvrpcpb.IsolationLevel { return kvrpcpb.IsolationLevel_SI } } - -func growPagingSize(size uint64) uint64 { - size *= pagingSizeGrow - if size > maxPagingSize { - return maxPagingSize - } - return size -} diff --git a/store/copr/coprocessor_test.go b/store/copr/coprocessor_test.go index 88ad5568f68eb..b2d104e201b30 100644 --- a/store/copr/coprocessor_test.go +++ b/store/copr/coprocessor_test.go @@ -16,6 +16,7 @@ package copr import ( "context" + "github.com/pingcap/tidb/util/paging" "testing" "github.com/pingcap/kvproto/pkg/coprocessor" @@ -318,7 +319,7 @@ func TestBuildPagingTasks(t *testing.T) { require.Len(t, tasks, 1) taskEqual(t, tasks[0], regionIDs[0], "a", "c") require.True(t, tasks[0].paging) - require.Equal(t, tasks[0].pagingSize, minPagingSize) + require.Equal(t, tasks[0].pagingSize, paging.MinPagingSize) } func toCopRange(r kv.KeyRange) *coprocessor.KeyRange { diff --git a/util/paging/paging.go b/util/paging/paging.go new file mode 100644 index 0000000000000..496dbc0689058 --- /dev/null +++ b/util/paging/paging.go @@ -0,0 +1,46 @@ +package paging + +import "math" + +// A paging request may be separated into multi requests if there are more data than a page. +// The paging size grows from min to max, it's not well tuned yet. +// e.g. a paging request scans over range (r1, r200), it requires 64 rows in the first batch, +// if it's not drained, then the paging size grows, the new range is calculated like (r100, r200), then send a request again. +// Compare with the common unary request, paging request allows early access of data, it offers a streaming-like way processing data. +// TODO: may make the paging parameters configurable. +const ( + MinPagingSize uint64 = 64 + maxPagingSizeShift = 7 + pagingSizeGrow = 2 + MaxPagingSize = MinPagingSize << maxPagingSizeShift + pagingGrowingSum = ((2 << maxPagingSizeShift) - 1) * MinPagingSize + Threshold uint64 = 960 +) + +// GrowPagingSize grows the paging size and ensures it does not exceed MaxPagingSize +func GrowPagingSize(size uint64) uint64 { + size <<= 1 + if size > MaxPagingSize { + return MaxPagingSize + } + return size +} + +func CalculateSeekCnt(expectCnt uint64) float64 { + if expectCnt == 0 { + return 0 + } + if expectCnt > pagingGrowingSum { + // if the expectCnt is larger than pagingGrowingSum, calculate the seekCnt for the excess. + return float64(8 + (expectCnt-pagingGrowingSum+MaxPagingSize-1)/MaxPagingSize) + } + if expectCnt > MinPagingSize { + // if the expectCnt is less than pagingGrowingSum, + // calculate the seekCnt(number of terms) from the sum of a geometric progression. + // expectCnt = minPagingSize * (pagingSizeGrow ^ seekCnt - 1) / (pagingSizeGrow - 1) + // simplify (pagingSizeGrow ^ seekCnt - 1) to pagingSizeGrow ^ seekCnt, we can infer that + // seekCnt = log((pagingSizeGrow - 1) * expectCnt / minPagingSize) / log(pagingSizeGrow) + return 1 + float64(int(math.Log(float64((pagingSizeGrow-1)*expectCnt)/float64(MinPagingSize))/math.Log(float64(pagingSizeGrow)))) + } + return 1 +} diff --git a/util/paging/paging_test.go b/util/paging/paging_test.go new file mode 100644 index 0000000000000..21cb3970db931 --- /dev/null +++ b/util/paging/paging_test.go @@ -0,0 +1,24 @@ +package paging + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestGrowPagingSize(t *testing.T) { + t.Parallel() + + require.Equal(t, GrowPagingSize(MaxPagingSize), MaxPagingSize) + require.Equal(t, GrowPagingSize(MaxPagingSize/pagingSizeGrow+1), MaxPagingSize) +} + +func TestCalculateSeekCnt(t *testing.T) { + t.Parallel() + require.InDelta(t, CalculateSeekCnt(0), 0, 0.1) + require.InDelta(t, CalculateSeekCnt(1), 1, 0.1) + require.InDelta(t, CalculateSeekCnt(MinPagingSize), 1, 0.1) + require.InDelta(t, CalculateSeekCnt(pagingGrowingSum), maxPagingSizeShift+1, 0.1) + require.InDelta(t, CalculateSeekCnt(pagingGrowingSum+1), maxPagingSizeShift+2, 0.1) + require.InDelta(t, CalculateSeekCnt(pagingGrowingSum+MaxPagingSize), maxPagingSizeShift+2, 0.1) +} From 367aeadc87cd1f2bb37c6e4169e035ae6fe1291e Mon Sep 17 00:00:00 2001 From: you06 Date: Thu, 16 Dec 2021 20:00:35 +0800 Subject: [PATCH 20/30] add test Signed-off-by: you06 --- util/paging/paging_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/util/paging/paging_test.go b/util/paging/paging_test.go index 21cb3970db931..29da1f764f43b 100644 --- a/util/paging/paging_test.go +++ b/util/paging/paging_test.go @@ -9,12 +9,14 @@ import ( func TestGrowPagingSize(t *testing.T) { t.Parallel() + require.Equal(t, GrowPagingSize(MinPagingSize), MinPagingSize*pagingSizeGrow) require.Equal(t, GrowPagingSize(MaxPagingSize), MaxPagingSize) require.Equal(t, GrowPagingSize(MaxPagingSize/pagingSizeGrow+1), MaxPagingSize) } func TestCalculateSeekCnt(t *testing.T) { t.Parallel() + require.InDelta(t, CalculateSeekCnt(0), 0, 0.1) require.InDelta(t, CalculateSeekCnt(1), 1, 0.1) require.InDelta(t, CalculateSeekCnt(MinPagingSize), 1, 0.1) From 90a628befd8ebb934463067e17ea5c8328b5d153 Mon Sep 17 00:00:00 2001 From: you06 Date: Thu, 16 Dec 2021 20:06:28 +0800 Subject: [PATCH 21/30] add licsnee Signed-off-by: you06 --- util/paging/paging.go | 14 ++++++++++++++ util/paging/paging_test.go | 14 ++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/util/paging/paging.go b/util/paging/paging.go index 496dbc0689058..41bf62e8bc9e1 100644 --- a/util/paging/paging.go +++ b/util/paging/paging.go @@ -1,3 +1,17 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package paging import "math" diff --git a/util/paging/paging_test.go b/util/paging/paging_test.go index 29da1f764f43b..3243e9dec9fe4 100644 --- a/util/paging/paging_test.go +++ b/util/paging/paging_test.go @@ -1,3 +1,17 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package paging import ( From c992665e212ff46f5c5f92e4345350e1ffe6550d Mon Sep 17 00:00:00 2001 From: you06 Date: Thu, 16 Dec 2021 20:15:18 +0800 Subject: [PATCH 22/30] add comment Signed-off-by: you06 --- util/paging/paging.go | 1 + 1 file changed, 1 insertion(+) diff --git a/util/paging/paging.go b/util/paging/paging.go index 41bf62e8bc9e1..5f2618ea341db 100644 --- a/util/paging/paging.go +++ b/util/paging/paging.go @@ -40,6 +40,7 @@ func GrowPagingSize(size uint64) uint64 { return size } +// CalculateSeekCnt calculates the seek count from expect count func CalculateSeekCnt(expectCnt uint64) float64 { if expectCnt == 0 { return 0 From e2838686a6e033724af6d5148c5eca714ae517ad Mon Sep 17 00:00:00 2001 From: you06 Date: Fri, 17 Dec 2021 11:46:49 +0800 Subject: [PATCH 23/30] sort imports Signed-off-by: you06 --- planner/core/task.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/planner/core/task.go b/planner/core/task.go index 16c8e1c3d5724..b1241472360e7 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -15,7 +15,6 @@ package core import ( - "github.com/pingcap/tidb/sessionctx" "math" "github.com/cznic/mathutil" @@ -30,6 +29,7 @@ import ( "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/planner/property" "github.com/pingcap/tidb/planner/util" + "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" From ab251f0c849922f1f75ec1b462e1eeded09739b3 Mon Sep 17 00:00:00 2001 From: you06 Date: Fri, 17 Dec 2021 11:52:36 +0800 Subject: [PATCH 24/30] sort imports Signed-off-by: you06 --- store/copr/coprocessor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index 02db6678e7ab0..04e29a8507656 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -17,7 +17,6 @@ package copr import ( "context" "fmt" - "github.com/pingcap/tidb/util/paging" "io" "strconv" "strings" @@ -43,6 +42,7 @@ import ( "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/memory" + "github.com/pingcap/tidb/util/paging" "github.com/pingcap/tidb/util/trxevents" "github.com/pingcap/tipb/go-tipb" "github.com/tikv/client-go/v2/metrics" From fb4b003c8e66952b399b3552329d4b5df610e6ed Mon Sep 17 00:00:00 2001 From: you06 Date: Fri, 17 Dec 2021 17:19:49 +0800 Subject: [PATCH 25/30] fix lint Signed-off-by: you06 --- store/copr/coprocessor_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/copr/coprocessor_test.go b/store/copr/coprocessor_test.go index b2d104e201b30..b628b4eaab831 100644 --- a/store/copr/coprocessor_test.go +++ b/store/copr/coprocessor_test.go @@ -16,12 +16,12 @@ package copr import ( "context" - "github.com/pingcap/tidb/util/paging" "testing" "github.com/pingcap/kvproto/pkg/coprocessor" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/driver/backoff" + "github.com/pingcap/tidb/util/paging" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/testutils" "github.com/tikv/client-go/v2/tikv" From 4a8e5725d5e3e320f0dc8488368dcd157160de53 Mon Sep 17 00:00:00 2001 From: you06 Date: Mon, 20 Dec 2021 11:59:37 +0800 Subject: [PATCH 26/30] add bridge to satisfy test Signed-off-by: you06 --- util/paging/main_test.go | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 util/paging/main_test.go diff --git a/util/paging/main_test.go b/util/paging/main_test.go new file mode 100644 index 0000000000000..94766559896e1 --- /dev/null +++ b/util/paging/main_test.go @@ -0,0 +1,27 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package paging + +import ( + "testing" + + "github.com/pingcap/tidb/util/testbridge" + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + testbridge.WorkaroundGoCheckFlags() + goleak.VerifyTestMain(m) +} From 1f2167eca9ea18d8658169a4e2ab8f12afd93f07 Mon Sep 17 00:00:00 2001 From: you06 Date: Mon, 20 Dec 2021 15:33:26 +0800 Subject: [PATCH 27/30] address comment Signed-off-by: you06 --- planner/core/task.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/planner/core/task.go b/planner/core/task.go index b1241472360e7..833ec08a4b787 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -924,10 +924,8 @@ func buildIndexLookUpTask(ctx sessionctx.Context, t *copTask) *rootTask { // paging API reduces the count of index and table rows, however introduces more seek cost. if ctx.GetSessionVars().EnablePaging && t.expectCnt > 0 && t.expectCnt <= paging.Threshold { p.Paging = true - // we want the diff between idxCst and pagingCst here, - // however, the idxCst does not contain seekFactor, so a seekFactor needs to be removed - pagingCstDiff := calcPagingCost(ctx, t) - sessVars.GetSeekFactor(nil) - idxCst = math.Min(idxCst, pagingCstDiff) + pagingCst := calcPagingCost(ctx, t) + idxCst = math.Min(idxCst, pagingCst) } newTask.cst += idxCst // Add cost of worker goroutines in index lookup. @@ -978,6 +976,7 @@ func extractRows(p PhysicalPlan) float64 { return f } +// calcPagingCost calculates the cost for paging processing which may increase the seekCnt and reduce scanned rows. func calcPagingCost(ctx sessionctx.Context, t *copTask) float64 { sessVars := ctx.GetSessionVars() indexRows := t.indexPlan.statsInfo().RowCount @@ -994,7 +993,10 @@ func calcPagingCost(ctx sessionctx.Context, t *copTask) float64 { } pagingCst := seekCnt*sessVars.GetSeekFactor(nil) + float64(expectCnt)*sessVars.CPUFactor pagingCst *= indexSelectivity - return pagingCst + + // we want the diff between idxCst and pagingCst here, + // however, the idxCst does not contain seekFactor, so a seekFactor needs to be removed + return pagingCst - sessVars.GetSeekFactor(nil) } func (t *rootTask) convertToRootTask(_ sessionctx.Context) *rootTask { From 8c6794c17e279190238ce90b6853ff9446fe343e Mon Sep 17 00:00:00 2001 From: you06 Date: Tue, 21 Dec 2021 16:52:26 +0800 Subject: [PATCH 28/30] remote parallel test Signed-off-by: you06 --- planner/core/plan_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/planner/core/plan_test.go b/planner/core/plan_test.go index a40cfcc4fa94e..821740a45ebe8 100644 --- a/planner/core/plan_test.go +++ b/planner/core/plan_test.go @@ -643,8 +643,6 @@ func (s *testPlanNormalize) TestIssue25729(c *C) { } func TestCopPaging(t *testing.T) { - t.Parallel() - store, clean := kit.CreateMockStore(t) defer clean() From a4d48efa058643c28cc578057ca06ea158dfac07 Mon Sep 17 00:00:00 2001 From: you06 Date: Thu, 23 Dec 2021 17:16:23 +0800 Subject: [PATCH 29/30] use prop to pass expectCnt, add tests Signed-off-by: you06 --- planner/core/find_best_task.go | 1 + planner/core/plan_test.go | 24 ++++++++++++++++++++++++ planner/core/task.go | 3 ++- 3 files changed, 27 insertions(+), 1 deletion(-) diff --git a/planner/core/find_best_task.go b/planner/core/find_best_task.go index 166d3adc298b3..86ee7c30db627 100644 --- a/planner/core/find_best_task.go +++ b/planner/core/find_best_task.go @@ -1216,6 +1216,7 @@ func (ds *DataSource) convertToIndexScan(prop *property.PhysicalProperty, candid indexPlan: is, tblColHists: ds.TblColHists, tblCols: ds.TblCols, + expectCnt: uint64(prop.ExpectedCnt), } cop.partitionInfo = PartitionInfo{ PruningConds: ds.allConds, diff --git a/planner/core/plan_test.go b/planner/core/plan_test.go index 821740a45ebe8..288444c9f2c12 100644 --- a/planner/core/plan_test.go +++ b/planner/core/plan_test.go @@ -668,6 +668,18 @@ func TestCopPaging(t *testing.T) { " └─TableRowIDScan 1024.00 cop[tikv] table:t keep order:false")) } + // selection between limit and indexlookup, limit 960 should also go paging + for i := 0; i < 10; i++ { + tk.MustQuery("explain format='brief' select * from t force index(i) where mod(id, 2) > 0 and id <= 1024 and c1 >= 0 and c1 <= 1024 and c2 in (2, 4, 6, 8) order by c1 limit 960").Check(kit.Rows( + "Limit 3.20 root offset:0, count:960", + "└─Selection 2.56 root gt(mod(test.t.id, 2), 0)", + " └─IndexLookUp 3.20 root paging:true", + " ├─Selection(Build) 819.20 cop[tikv] le(test.t.id, 1024)", + " │ └─IndexRangeScan 1024.00 cop[tikv] table:t, index:i(c1) range:[0,1024], keep order:true", + " └─Selection(Probe) 3.20 cop[tikv] in(test.t.c2, 2, 4, 6, 8)", + " └─TableRowIDScan 819.20 cop[tikv] table:t keep order:false")) + } + // limit 961 exceeds the threshold, it should not go paging for i := 0; i < 10; i++ { tk.MustQuery("explain format='brief' select * from t force index(i) where id <= 1024 and c1 >= 0 and c1 <= 1024 and c2 in (2, 4, 6, 8) order by c1 limit 961").Check(kit.Rows( @@ -678,4 +690,16 @@ func TestCopPaging(t *testing.T) { " └─Selection(Probe) 4.00 cop[tikv] in(test.t.c2, 2, 4, 6, 8)", " └─TableRowIDScan 1024.00 cop[tikv] table:t keep order:false")) } + + // selection between limit and indexlookup, limit 961 should not go paging too + for i := 0; i < 10; i++ { + tk.MustQuery("explain format='brief' select * from t force index(i) where mod(id, 2) > 0 and id <= 1024 and c1 >= 0 and c1 <= 1024 and c2 in (2, 4, 6, 8) order by c1 limit 961").Check(kit.Rows( + "Limit 3.20 root offset:0, count:961", + "└─Selection 2.56 root gt(mod(test.t.id, 2), 0)", + " └─IndexLookUp 3.20 root ", + " ├─Selection(Build) 819.20 cop[tikv] le(test.t.id, 1024)", + " │ └─IndexRangeScan 1024.00 cop[tikv] table:t, index:i(c1) range:[0,1024], keep order:true", + " └─Selection(Probe) 3.20 cop[tikv] in(test.t.c2, 2, 4, 6, 8)", + " └─TableRowIDScan 819.20 cop[tikv] table:t keep order:false")) + } } diff --git a/planner/core/task.go b/planner/core/task.go index 833ec08a4b787..1e44a18d9322e 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -925,6 +925,8 @@ func buildIndexLookUpTask(ctx sessionctx.Context, t *copTask) *rootTask { if ctx.GetSessionVars().EnablePaging && t.expectCnt > 0 && t.expectCnt <= paging.Threshold { p.Paging = true pagingCst := calcPagingCost(ctx, t) + // prevent enlarging the cost because we take paging as a better plan, + // if the cost is enlarged, it'll be easier to go another plan. idxCst = math.Min(idxCst, pagingCst) } newTask.cst += idxCst @@ -1190,7 +1192,6 @@ func (p *PhysicalLimit) attach2Task(tasks ...task) task { pushedDownLimit.SetSchema(pushedDownLimit.children[0].Schema()) pushedDownLimit.cost = cop.cost() } - cop.expectCnt = p.Count t = cop.convertToRootTask(p.ctx) sunk = p.sinkIntoIndexLookUp(t) } else if mpp, ok := t.(*mppTask); ok { From 905738b8f97445adeeb852342f189681fcefe6b8 Mon Sep 17 00:00:00 2001 From: you06 Date: Fri, 24 Dec 2021 17:44:06 +0800 Subject: [PATCH 30/30] remove parallel in test T^T Signed-off-by: you06 --- util/paging/main_test.go | 2 +- util/paging/paging_test.go | 4 ---- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/util/paging/main_test.go b/util/paging/main_test.go index 94766559896e1..af568af279474 100644 --- a/util/paging/main_test.go +++ b/util/paging/main_test.go @@ -22,6 +22,6 @@ import ( ) func TestMain(m *testing.M) { - testbridge.WorkaroundGoCheckFlags() + testbridge.SetupForCommonTest() goleak.VerifyTestMain(m) } diff --git a/util/paging/paging_test.go b/util/paging/paging_test.go index 3243e9dec9fe4..1890b4d754d54 100644 --- a/util/paging/paging_test.go +++ b/util/paging/paging_test.go @@ -21,16 +21,12 @@ import ( ) func TestGrowPagingSize(t *testing.T) { - t.Parallel() - require.Equal(t, GrowPagingSize(MinPagingSize), MinPagingSize*pagingSizeGrow) require.Equal(t, GrowPagingSize(MaxPagingSize), MaxPagingSize) require.Equal(t, GrowPagingSize(MaxPagingSize/pagingSizeGrow+1), MaxPagingSize) } func TestCalculateSeekCnt(t *testing.T) { - t.Parallel() - require.InDelta(t, CalculateSeekCnt(0), 0, 0.1) require.InDelta(t, CalculateSeekCnt(1), 1, 0.1) require.InDelta(t, CalculateSeekCnt(MinPagingSize), 1, 0.1)