Skip to content

Commit

Permalink
executor: record index usage for the clustered primary keys (#55602)
Browse files Browse the repository at this point in the history
close #55601
  • Loading branch information
YangKeao committed Sep 5, 2024
1 parent b04ea3a commit edf1001
Show file tree
Hide file tree
Showing 9 changed files with 188 additions and 14 deletions.
8 changes: 6 additions & 2 deletions pkg/executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,15 @@ func (e *BatchPointGetExec) Close() error {
if e.RuntimeStats() != nil && e.snapshot != nil {
e.snapshot.SetOption(kv.CollectRuntimeStats, nil)
}
if e.indexUsageReporter != nil && e.idxInfo != nil {
if e.indexUsageReporter != nil {
kvReqTotal := e.stats.GetCmdRPCCount(tikvrpc.CmdBatchGet)
// We cannot distinguish how many rows are coming from each partition. Here, we calculate all index usages
// percentage according to the row counts for the whole table.
e.indexUsageReporter.ReportPointGetIndexUsage(e.tblInfo.ID, e.tblInfo.ID, e.idxInfo.ID, e.ID(), kvReqTotal)
if e.idxInfo != nil {
e.indexUsageReporter.ReportPointGetIndexUsage(e.tblInfo.ID, e.tblInfo.ID, e.idxInfo.ID, e.ID(), kvReqTotal)
} else {
e.indexUsageReporter.ReportPointGetIndexUsageForHandle(e.tblInfo, e.tblInfo.ID, e.ID(), kvReqTotal)
}
}
e.inited = 0
e.index = 0
Expand Down
13 changes: 9 additions & 4 deletions pkg/executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3468,6 +3468,7 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea
e := &TableReaderExecutor{
BaseExecutorV2: exec.NewBaseExecutorV2(b.ctx.GetSessionVars(), v.Schema(), v.ID()),
tableReaderExecutorContext: newTableReaderExecutorContext(b.ctx),
indexUsageReporter: b.buildIndexUsageReporter(v),
dagPB: dagReq,
startTS: startTS,
txnScope: b.txnScope,
Expand Down Expand Up @@ -4168,13 +4169,13 @@ type tableStatsPreloader interface {
LoadTableStats(sessionctx.Context)
}

func (b *executorBuilder) buildIndexUsageReporter(plan tableStatsPreloader) (indexUsageReporter *exec.IndexUsageReporter) {
sc := b.ctx.GetSessionVars().StmtCtx
if b.ctx.GetSessionVars().StmtCtx.IndexUsageCollector != nil &&
func buildIndexUsageReporter(ctx sessionctx.Context, plan tableStatsPreloader) (indexUsageReporter *exec.IndexUsageReporter) {
sc := ctx.GetSessionVars().StmtCtx
if ctx.GetSessionVars().StmtCtx.IndexUsageCollector != nil &&
sc.RuntimeStatsColl != nil {
// Preload the table stats. If the statement is a point-get or execute, the planner may not have loaded the
// stats.
plan.LoadTableStats(b.ctx)
plan.LoadTableStats(ctx)

statsMap := sc.GetUsedStatsInfo(false)
indexUsageReporter = exec.NewIndexUsageReporter(
Expand All @@ -4185,6 +4186,10 @@ func (b *executorBuilder) buildIndexUsageReporter(plan tableStatsPreloader) (ind
return indexUsageReporter
}

func (b *executorBuilder) buildIndexUsageReporter(plan tableStatsPreloader) (indexUsageReporter *exec.IndexUsageReporter) {
return buildIndexUsageReporter(b.ctx, plan)
}

func (b *executorBuilder) buildIndexMergeReader(v *plannercore.PhysicalIndexMergeReader) exec.Executor {
ts := v.TablePlans[0].(*plannercore.PhysicalTableScan)
if err := b.validCanReadTemporaryOrCacheTable(ts.Table); err != nil {
Expand Down
10 changes: 5 additions & 5 deletions pkg/executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -928,12 +928,12 @@ func (e *IndexMergeReaderExecutor) Close() error {
}
if e.indexUsageReporter != nil {
for _, p := range e.partialPlans {
is, ok := p[0].(*plannercore.PhysicalIndexScan)
if !ok {
continue
switch p := p[0].(type) {
case *plannercore.PhysicalTableScan:
e.indexUsageReporter.ReportCopIndexUsageForHandle(e.table, p.ID())
case *plannercore.PhysicalIndexScan:
e.indexUsageReporter.ReportCopIndexUsageForTable(e.table, p.Index.ID, p.ID())
}

e.indexUsageReporter.ReportCopIndexUsageForTable(e.table, is.Index.ID, is.ID())
}
}
if e.finished == nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/executor/internal/exec/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
deps = [
"//pkg/domain",
"//pkg/expression",
"//pkg/meta/model",
"//pkg/parser",
"//pkg/sessionctx",
"//pkg/sessionctx/stmtctx",
Expand All @@ -36,7 +37,7 @@ go_test(
timeout = "short",
srcs = ["indexusage_test.go"],
flaky = True,
shard_count = 5,
shard_count = 6,
deps = [
":exec",
"//pkg/domain",
Expand Down
43 changes: 43 additions & 0 deletions pkg/executor/internal/exec/indexusage.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package exec

import (
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/handle/usage/indexusage"
Expand All @@ -40,6 +41,17 @@ func NewIndexUsageReporter(reporter *indexusage.StmtIndexUsageCollector,
}
}

// ReportCopIndexUsageForHandle wraps around `ReportCopIndexUsageForTable` to get the `indexID` automatically
// from the `table.Table` if the table has a clustered index or integer primary key.
func (e *IndexUsageReporter) ReportCopIndexUsageForHandle(tbl table.Table, planID int) {
idxID, ok := getClusterIndexID(tbl.Meta())
if !ok {
return
}

e.ReportCopIndexUsageForTable(tbl, idxID, planID)
}

// ReportCopIndexUsageForTable wraps around `ReportCopIndexUsage` to get `tableID` and `physicalTableID` from the
// `table.Table`. If it's expected to calculate the percentage according to the size of partition, the `tbl` argument
// should be a `table.PhysicalTable`, or the percentage will be calculated using the size of whole table.
Expand Down Expand Up @@ -75,6 +87,17 @@ func (e *IndexUsageReporter) ReportCopIndexUsage(tableID int64, physicalTableID
e.reporter.Update(tableID, indexID, sample)
}

// ReportPointGetIndexUsageForHandle wraps around `ReportPointGetIndexUsage` to get the `indexID` automatically
// from the `table.Table` if the table has a clustered index or integer primary key.
func (e *IndexUsageReporter) ReportPointGetIndexUsageForHandle(tblInfo *model.TableInfo, physicalTableID int64, planID int, kvRequestTotal int64) {
idxID, ok := getClusterIndexID(tblInfo)
if !ok {
return
}

e.ReportPointGetIndexUsage(tblInfo.ID, physicalTableID, idxID, planID, kvRequestTotal)
}

// ReportPointGetIndexUsage reports the index usage of a point get or batch point get
func (e *IndexUsageReporter) ReportPointGetIndexUsage(tableID int64, physicalTableID int64, indexID int64, planID int, kvRequestTotal int64) {
tableRowCount, ok := e.getTableRowCount(physicalTableID)
Expand Down Expand Up @@ -104,3 +127,23 @@ func (e *IndexUsageReporter) getTableRowCount(tableID int64) (int64, bool) {
}
return stats.RealtimeCount, true
}

// getClusterIndexID returns the indexID of the clustered index. If the table doesn't have a clustered index, it returns
// (0, false).
func getClusterIndexID(tblInfo *model.TableInfo) (int64, bool) {
var idxID int64
if tblInfo.PKIsHandle {
idxID = 0
} else if tblInfo.IsCommonHandle {
for _, idx := range tblInfo.Indices {
if idx.Primary {
idxID = idx.ID
}
}
} else {
// just ignore, this table is read through rowid.
return 0, false
}

return idxID, true
}
101 changes: 101 additions & 0 deletions pkg/executor/internal/exec/indexusage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,3 +447,104 @@ func TestDisableIndexUsageReporter(t *testing.T) {
time.Sleep(time.Millisecond * 100)
}
}

func TestIndexUsageReporterWithClusterIndex(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t0 (id int primary key, a int)")
tk.MustExec("create table t1 (id char(255) primary key, a int)")
tk.MustExec("create table t2 (id char(255) primary key nonclustered, a int)")
tk.MustExec("create table t3 (id int primary key, a int, unique key idx_a(a))")

type testTableInfo struct {
tableID int64
pkID int64
extraIdxID int64
}
testTableInfos := []testTableInfo{}
for i := 0; i < 4; i++ {
table, err := dom.InfoSchema().TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr(fmt.Sprintf("t%d", i)))
require.NoError(t, err)
tableID := table.Meta().ID
pkID := int64(0)
extraIdxID := int64(0)
for _, idx := range table.Indices() {
if idx.Meta().Primary {
pkID = idx.Meta().ID
} else {
extraIdxID = idx.Meta().ID
}
}
testTableInfos = append(testTableInfos, testTableInfo{tableID, pkID, extraIdxID})
}

for i := 0; i < 4; i++ {
for val := 0; val < 100; val++ {
tk.MustExec(fmt.Sprintf("insert into t%d values (?, ?)", i), val, val)
}
tk.MustExec(fmt.Sprintf("analyze table t%d", i))
}
tk.RefreshSession()
tk.MustExec("use test")

cases := []testCase{
// TableReader on PKAsHandle
{
"select id from t0 where id >= 30",
"TableReader",
[]indexStatsExpect{{testTableInfos[0].tableID, testTableInfos[0].pkID, []indexusage.Sample{indexusage.NewSample(1, 1, 70, 100)}}},
},
// TableReader on CommonHandle
{
"select id from t1 where id >= \"30\"",
"TableReader",
// It'll scan 76 rows according to the string order
[]indexStatsExpect{{testTableInfos[1].tableID, testTableInfos[1].pkID, []indexusage.Sample{indexusage.NewSample(1, 1, 76, 100)}}},
},
// IndexRangeScan on NonClustered PK
{
"select id from t2 where id >= \"30\"",
"IndexRangeScan",
// It'll scan 76 rows according to the string order
[]indexStatsExpect{{testTableInfos[2].tableID, testTableInfos[2].pkID, []indexusage.Sample{indexusage.NewSample(1, 1, 76, 100)}}},
},
// IndexMerge on PK and a normal Unique Key
{
"select /*+ USE_INDEX_MERGE(t3) */ * from t3 where id >= 30 or id < 5 or a >= 50",
"IndexMerge",
// It'll scan 76 rows according to the string order
[]indexStatsExpect{
{testTableInfos[3].tableID, testTableInfos[3].pkID, []indexusage.Sample{indexusage.NewSample(1, 1, 70, 100)}},
{testTableInfos[3].tableID, testTableInfos[3].pkID, []indexusage.Sample{indexusage.NewSample(0, 1, 5, 100)}},
{testTableInfos[3].tableID, testTableInfos[3].extraIdxID, []indexusage.Sample{indexusage.NewSample(1, 1, 50, 100)}},
},
},
// PointGet on PKAsHandle
{
"select * from t0 where id = 1",
"Point_Get",
[]indexStatsExpect{{testTableInfos[0].tableID, testTableInfos[0].pkID, []indexusage.Sample{indexusage.NewSample(1, 1, 1, 100)}}},
},
// PointGet on CommonHandle
{
"select * from t1 where id = \"1\"",
"Point_Get",
[]indexStatsExpect{{testTableInfos[1].tableID, testTableInfos[1].pkID, []indexusage.Sample{indexusage.NewSample(1, 1, 1, 100)}}},
},
// BatchPointGet on PKAsHandle
{
"select * from t0 where id in (1,3,5,9)",
"Batch_Point_Get",
[]indexStatsExpect{{testTableInfos[0].tableID, testTableInfos[0].pkID, []indexusage.Sample{indexusage.NewSample(1, 1, 4, 100)}}},
},
// BatchPointGet on CommonHandle
{
"select * from t1 where id in (\"1\",\"3\",\"5\",\"9\")",
"Batch_Point_Get",
[]indexStatsExpect{{testTableInfos[1].tableID, testTableInfos[1].pkID, []indexusage.Sample{indexusage.NewSample(1, 1, 4, 100)}}},
},
}

runIndexUsageTestCases(t, dom, tk, append(cases, wrapTestCaseWithPrepare(cases)...))
}
13 changes: 11 additions & 2 deletions pkg/executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,11 @@ func (e *PointGetExecutor) Init(p *plannercore.PointGetPlan) {
e.partitionDefIdx = p.PartitionIdx
e.columns = p.Columns
e.buildVirtualColumnInfo()

// It's necessary to at least reset the `runtimeStats` of the `BaseExecutor`.
// As the `StmtCtx` may have changed, a new index usage reporter should also be created.
e.BaseExecutor = exec.NewBaseExecutor(e.Ctx(), p.Schema(), p.ID())
e.indexUsageReporter = buildIndexUsageReporter(e.Ctx(), p)
}

// buildVirtualColumnInfo saves virtual column indices and sort them in definition order
Expand Down Expand Up @@ -252,11 +257,15 @@ func (e *PointGetExecutor) Close() error {
if e.RuntimeStats() != nil && e.snapshot != nil {
e.snapshot.SetOption(kv.CollectRuntimeStats, nil)
}
if e.indexUsageReporter != nil && e.idxInfo != nil {
if e.indexUsageReporter != nil {
tableID := e.tblInfo.ID
physicalTableID := GetPhysID(e.tblInfo, e.partitionDefIdx)
kvReqTotal := e.stats.SnapshotRuntimeStats.GetCmdRPCCount(tikvrpc.CmdGet)
e.indexUsageReporter.ReportPointGetIndexUsage(tableID, physicalTableID, e.idxInfo.ID, e.ID(), kvReqTotal)
if e.idxInfo != nil {
e.indexUsageReporter.ReportPointGetIndexUsage(tableID, physicalTableID, e.idxInfo.ID, e.ID(), kvReqTotal)
} else {
e.indexUsageReporter.ReportPointGetIndexUsageForHandle(e.tblInfo, physicalTableID, e.ID(), kvReqTotal)
}
}
e.done = false
return nil
Expand Down
5 changes: 5 additions & 0 deletions pkg/executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func newTableReaderExecutorContext(sctx sessionctx.Context) tableReaderExecutorC
type TableReaderExecutor struct {
tableReaderExecutorContext
exec.BaseExecutorV2
indexUsageReporter *exec.IndexUsageReporter

table table.Table

Expand Down Expand Up @@ -341,6 +342,10 @@ func (e *TableReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error

// Close implements the Executor Close interface.
func (e *TableReaderExecutor) Close() error {
if e.indexUsageReporter != nil {
e.indexUsageReporter.ReportCopIndexUsageForHandle(e.table, e.plans[0].ID())
}

var err error
if e.resultHandler != nil {
err = e.resultHandler.Close()
Expand Down
6 changes: 6 additions & 0 deletions pkg/planner/core/physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ type PhysicalTableReader struct {
TableScanAndPartitionInfos []tableScanAndPartitionInfo `plan-cache-clone:"must-nil"`
}

// LoadTableStats loads the stats of the table read by this plan.
func (p *PhysicalTableReader) LoadTableStats(ctx sessionctx.Context) {
ts := p.TablePlans[0].(*PhysicalTableScan)
loadTableStats(ctx, ts.Table, ts.physicalTableID)
}

// PhysPlanPartInfo indicates partition helper info in physical plan.
type PhysPlanPartInfo struct {
PruningConds []expression.Expression
Expand Down

0 comments on commit edf1001

Please sign in to comment.