Skip to content

Commit

Permalink
feat: recreate the job
Browse files Browse the repository at this point in the history
Signed-off-by: Rustin170506 <[email protected]>
  • Loading branch information
Rustin170506 committed Sep 6, 2024
1 parent d7588c4 commit 40c202a
Show file tree
Hide file tree
Showing 2 changed files with 351 additions and 3 deletions.
3 changes: 3 additions & 0 deletions pkg/statistics/handle/autoanalyze/priorityqueue/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ go_library(
importpath = "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/priorityqueue",
visibility = ["//visibility:public"],
deps = [
"//pkg/infoschema",
"//pkg/meta/model",
"//pkg/sessionctx",
"//pkg/sessionctx/sysproctrack",
"//pkg/sessionctx/variable",
Expand All @@ -25,6 +27,7 @@ go_library(
"//pkg/statistics/handle/types",
"//pkg/statistics/handle/util",
"//pkg/util",
"@com_github_tikv_client_go_v2//oracle",
"@org_uber_go_zap//:zap",
],
)
Expand Down
351 changes: 348 additions & 3 deletions pkg/statistics/handle/autoanalyze/priorityqueue/queue2.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,28 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/internal/heap"
"github.com/pingcap/tidb/pkg/statistics/handle/types"
statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types"
"github.com/pingcap/tidb/pkg/util"
"github.com/tikv/client-go/v2/oracle"
)

const (
	// unanalyzedTableDefaultChangePercentage is the default change percentage of unanalyzed table.
	// NOTE(review): nothing visible in this file references this constant;
	// CalculateChangePercentage returns the literal 1 for unanalyzed tables.
	unanalyzedTableDefaultChangePercentage = 1
	// unanalyzedTableDefaultLastUpdateDuration is the default last update duration of unanalyzed table.
	// It is negative on purpose: findLastAnalyzeTime adds it to the current time,
	// so an unanalyzed table is treated as if it was last analyzed 30 minutes ago.
	unanalyzedTableDefaultLastUpdateDuration = -30 * time.Minute
)

// AnalysisPriorityQueueV2 is a priority queue for TableAnalysisJobs.
type AnalysisPriorityQueueV2 struct {
inner *heap.Heap[int64, AnalysisJob]
inner *heap.Heap[int64, AnalysisJob]
statsHandle statstypes.StatsHandle

ctx context.Context
cancel context.CancelFunc
Expand Down Expand Up @@ -91,8 +104,340 @@ func (pq *AnalysisPriorityQueueV2) fetchDMLUpdate() {
}

// handleTableStats inspects the latest statistics of one table and, when the
// table needs to be analyzed, builds a weighted analysis job and adds it to
// the queue.
//
// Nothing is enqueued when the stats are not eligible for analysis, when the
// change percentage is zero, when a session or the table info cannot be
// obtained, or when the job constructors decide that no analysis is needed.
func (pq *AnalysisPriorityQueueV2) handleTableStats(stats *statistics.Table) {
	const (
		// TODO: use the configured auto-analyze ratio instead of a fixed value.
		autoAnalyzeRatio = 0.2
		// NOTE(review): a zero timestamp is forwarded to the job constructors,
		// so the "last analyze duration" they derive is measured from TS 0
		// rather than from the current time. Confirm whether a real start TS
		// should be used here.
		currentTs = 0
	)

	if !stats.IsEligibleForAnalysis() {
		return
	}

	// NOTE(review): returning here also skips tables whose only pending work
	// would be newly added indexes, because indexes are checked later.
	if CalculateChangePercentage(stats, autoAnalyzeRatio) == 0 {
		return
	}

	ctx, err := pq.statsHandle.SPool().Get()
	if err != nil {
		// Best effort: without a session the table is simply skipped this round.
		return
	}
	defer pq.statsHandle.SPool().Put(ctx)

	sctx := ctx.(sessionctx.Context)
	is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
	tableInfo, ok := pq.statsHandle.TableInfoByID(is, stats.PhysicalID)
	if !ok {
		return
	}
	tblMeta := tableInfo.Meta()

	var job AnalysisJob
	if partitionInfo := tblMeta.GetPartitionInfo(); partitionInfo == nil {
		job = CreateTableAnalysisJob(sctx, tblMeta.Name.O, tblMeta, stats, autoAnalyzeRatio, currentTs)
	} else {
		partitionStats := getPartitionStats(pq.statsHandle, tblMeta, partitionInfo.Definitions)
		job = createTableAnalysisJobForPartitions(
			sctx,
			tblMeta.Name.O,
			tblMeta,
			stats,
			partitionStats,
			autoAnalyzeRatio,
			currentTs,
		)
	}

	// Both constructors return nil when there is nothing to analyze (for
	// example, no partition crosses the threshold and no index is missing).
	// Calling a method on that nil interface would panic, so bail out first.
	if job == nil {
		return
	}

	calculator := NewPriorityCalculator()
	job.SetWeight(calculator.CalculateWeight(job))

	pq.inner.Add(job)
}

// CreateTableAnalysisJob builds the analysis job for a non-partitioned
// (physical) table.
//
// It returns nil when the stats are not eligible for analysis, or when the
// change percentage is zero and no index is missing statistics.
func CreateTableAnalysisJob(
	sctx sessionctx.Context,
	tableSchema string,
	tblInfo *model.TableInfo,
	tblStats *statistics.Table,
	autoAnalyzeRatio float64,
	currentTs uint64,
) AnalysisJob {
	if !tblStats.IsEligibleForAnalysis() {
		return nil
	}

	// Start from the session's analyze version and let the stats adjust it.
	statsVer := sctx.GetSessionVars().AnalyzeVersion
	statistics.CheckAnalyzeVerOnTable(tblStats, &statsVer)

	changePct := CalculateChangePercentage(tblStats, autoAnalyzeRatio)
	pendingIndexes := CheckIndexesNeedAnalyze(tblInfo, tblStats)

	// The two conditions are checked together on purpose: a user may set the
	// auto analyze ratio to 0 and still expect newly added indexes and
	// never-analyzed tables to be picked up.
	if changePct == 0 && len(pendingIndexes) == 0 {
		return nil
	}

	return NewNonPartitionedTableAnalysisJob(
		tableSchema,
		tblInfo.Name.O,
		tblInfo.ID,
		pendingIndexes,
		statsVer,
		changePct,
		calculateTableSize(tblInfo, tblStats),
		GetTableLastAnalyzeDuration(tblStats, currentTs),
	)
}

// calculateTableSize estimates the table size as the realtime row count
// multiplied by the number of columns.
func calculateTableSize(
	tblInfo *model.TableInfo,
	tblStats *statistics.Table,
) float64 {
	// TODO: Ignore unanalyzable columns.
	return float64(tblStats.RealtimeCount) * float64(len(tblInfo.Columns))
}

// GetTableLastAnalyzeDuration returns how much time separates currentTs from
// the table's last analysis, as reported by findLastAnalyzeTime.
func GetTableLastAnalyzeDuration(
	tblStats *statistics.Table,
	currentTs uint64,
) time.Duration {
	now := oracle.GetTimeFromTS(currentTs)
	return now.Sub(findLastAnalyzeTime(tblStats, currentTs))
}

// CheckIndexesNeedAnalyze lists the names of the public indexes that have no
// statistics yet.
//
// For a table that has never been analyzed it returns nil: the whole table is
// going to be analyzed anyway, so individual indexes are not inspected.
func CheckIndexesNeedAnalyze(
	tblInfo *model.TableInfo,
	tblStats *statistics.Table,
) []string {
	if !tblStats.IsAnalyzed() {
		return nil
	}

	missing := make([]string, 0, len(tblInfo.Indices))
	for _, index := range tblInfo.Indices {
		// Stats for this index are already present.
		if tblStats.GetIdx(index.ID) != nil {
			continue
		}
		// The existence map reports the index as analyzed.
		if tblStats.ColAndIdxExistenceMap.HasAnalyzed(index.ID, true) {
			continue
		}
		// Only public indexes are considered.
		if index.State != model.StatePublic {
			continue
		}
		missing = append(missing, index.Name.O)
	}

	return missing
}

// findLastAnalyzeTime returns the time at which the table was last analyzed.
// For an analyzed table it is decoded from `LastAnalyzeVersion`.
// For a table that has never been analyzed there is no such version, so a
// fake time is composed by shifting currentTs by
// unanalyzedTableDefaultLastUpdateDuration.
func findLastAnalyzeTime(
	tblStats *statistics.Table,
	currentTs uint64,
) time.Time {
	if tblStats.IsAnalyzed() {
		return oracle.GetTimeFromTS(tblStats.LastAnalyzeVersion)
	}

	// Table is not analyzed, compose a fake last analyze time.
	return oracle.GetTimeFromTS(currentTs).Add(unanalyzedTableDefaultLastUpdateDuration)
}

// PartitionIDAndName is a struct that contains the ID and name of a partition.
// It is used as the key of the per-partition statistics maps in this file.
// Exported for testing purposes. Do not use it in other packages.
type PartitionIDAndName struct {
	// Name is the partition name; getPartitionStats fills it from the partition definition's Name.O.
	Name string
	// ID is the partition ID; getPartitionStats fills it from the partition definition's ID.
	ID int64
}

// createTableAnalysisJobForPartitions builds the analysis job for a
// partitioned table from the statistics of its partitions.
//
// It returns nil when the table stats are not eligible for analysis, or when
// no partition needs analysis and no index is missing statistics.
func createTableAnalysisJobForPartitions(
	sctx sessionctx.Context,
	tableSchema string,
	tblInfo *model.TableInfo,
	tblStats *statistics.Table,
	partitionStats map[PartitionIDAndName]*statistics.Table,
	autoAnalyzeRatio float64,
	currentTs uint64,
) AnalysisJob {
	if !tblStats.IsEligibleForAnalysis() {
		return nil
	}

	// TODO: figure out how to check the table stats version correctly for partitioned tables.
	statsVer := sctx.GetSessionVars().AnalyzeVersion
	statistics.CheckAnalyzeVerOnTable(tblStats, &statsVer)

	avgChangePct, avgSize, avgLastAnalyzeDuration, names := CalculateIndicatorsForPartitions(
		tblInfo,
		partitionStats,
		autoAnalyzeRatio,
		currentTs,
	)
	indexesByPartition := CheckNewlyAddedIndexesNeedAnalyzeForPartitionedTable(tblInfo, partitionStats)

	// The two conditions are checked together on purpose: a user may set the
	// auto analyze ratio to 0 and still expect newly added indexes and
	// never-analyzed tables to be picked up.
	nothingToDo := len(names) == 0 && len(indexesByPartition) == 0
	if nothingToDo {
		return nil
	}

	return NewDynamicPartitionedTableAnalysisJob(
		tableSchema,
		tblInfo.Name.O,
		tblInfo.ID,
		names,
		indexesByPartition,
		statsVer,
		avgChangePct,
		avgSize,
		avgLastAnalyzeDuration,
	)
}

// CalculateChangePercentage returns the ratio of modified rows to the row
// count of the table, or 0 when that ratio does not exceed autoAnalyzeRatio.
//
// A table that has never been analyzed always yields 1. An autoAnalyzeRatio
// of 0 yields 0 for analyzed tables.
// TODO: DO NOT COPY THIS FUNCTION
func CalculateChangePercentage(
	tblStats *statistics.Table,
	autoAnalyzeRatio float64,
) float64 {
	switch {
	case !tblStats.IsAnalyzed():
		return 1
	case autoAnalyzeRatio == 0:
		// Auto analyze based on the change percentage is disabled.
		// Index analysis is decided elsewhere and is not affected by this.
		return 0
	}

	// Prefer the row count recorded by the last analyze; fall back to the
	// realtime count when that is not positive.
	rowCount := float64(tblStats.RealtimeCount)
	if analyzedRows := tblStats.GetAnalyzeRowCount(); analyzedRows > 0 {
		rowCount = analyzedRows
	}

	if changed := float64(tblStats.ModifyCount) / rowCount; changed > autoAnalyzeRatio {
		return changed
	}
	return 0
}

// CalculateIndicatorsForPartitions aggregates the indicators of the
// partitions whose change percentage is non-zero.
//
// It returns the average change percentage, the average size (realtime row
// count times number of columns), the average duration since the last
// analyze, and the names of the partitions that were taken into account.
// When no partition qualifies, all averages are zero and the name slice is
// empty. The names follow map iteration order, which is unspecified.
func CalculateIndicatorsForPartitions(
	tblInfo *model.TableInfo,
	partitionStats map[PartitionIDAndName]*statistics.Table,
	autoAnalyzeRatio float64,
	currentTs uint64,
) (
	avgChange float64,
	avgSize float64,
	avgLastAnalyzeDuration time.Duration,
	partitionNames []string,
) {
	var (
		changeSum   float64
		sizeSum     float64
		durationSum time.Duration
	)
	columnCount := float64(len(tblInfo.Columns))
	partitionNames = make([]string, 0, len(partitionStats))

	for key, stats := range partitionStats {
		change := CalculateChangePercentage(stats, autoAnalyzeRatio)
		// A zero change percentage means the partition is below the threshold,
		// or the user set the auto analyze ratio to 0; either way skip it.
		if change == 0 {
			continue
		}

		changeSum += change
		// size = count * cols
		sizeSum += float64(stats.RealtimeCount) * columnCount
		durationSum += GetTableLastAnalyzeDuration(stats, currentTs)
		partitionNames = append(partitionNames, key.Name)
	}

	if len(partitionNames) == 0 {
		return 0, 0, 0, partitionNames
	}

	selected := float64(len(partitionNames))
	avgChange = changeSum / selected
	avgSize = sizeSum / selected
	avgLastAnalyzeDuration = durationSum / time.Duration(selected)

	return avgChange, avgSize, avgLastAnalyzeDuration, partitionNames
}

// CheckNewlyAddedIndexesNeedAnalyzeForPartitionedTable finds, for every public
// index of a partitioned table, the partitions that have no statistics for it.
// The result maps an index name to the names of those partitions; indexes
// that are covered in every partition are left out.
// NOTE: This is only for newly added indexes.
func CheckNewlyAddedIndexesNeedAnalyzeForPartitionedTable(
	tblInfo *model.TableInfo,
	partitionStats map[PartitionIDAndName]*statistics.Table,
) map[string][]string {
	result := make(map[string][]string, len(tblInfo.Indices))

	for _, index := range tblInfo.Indices {
		// Only public indexes are worth analyzing.
		if index.State != model.StatePublic {
			continue
		}

		// Collect every partition that still lacks stats for this index.
		lacking := make([]string, 0, len(partitionStats))
		for key, stats := range partitionStats {
			if stats.GetIdx(index.ID) != nil {
				continue
			}
			if stats.ColAndIdxExistenceMap.HasAnalyzed(index.ID, true) {
				continue
			}
			lacking = append(lacking, key.Name)
		}

		if len(lacking) == 0 {
			continue
		}
		result[index.Name.O] = lacking
	}

	return result
}

// getPartitionStats loads the auto-analyze statistics of each given partition
// definition and returns them keyed by partition ID and name. Partitions
// whose stats are not eligible for analysis are omitted.
func getPartitionStats(
	statsHandle statstypes.StatsHandle,
	tblInfo *model.TableInfo,
	defs []model.PartitionDefinition,
) map[PartitionIDAndName]*statistics.Table {
	result := make(map[PartitionIDAndName]*statistics.Table, len(defs))

	for i := range defs {
		partitionID := defs[i].ID
		stats := statsHandle.GetPartitionStatsForAutoAnalyze(tblInfo, partitionID)
		// Only keep partitions that are ready to be analyzed.
		if stats.IsEligibleForAnalysis() {
			key := PartitionIDAndName{Name: defs[i].Name.O, ID: partitionID}
			result[key] = stats
		}
	}

	return result
}

func (pq *AnalysisPriorityQueueV2) Close() {
Expand Down

0 comments on commit 40c202a

Please sign in to comment.