Skip to content

Commit

Permalink
statistics: extract the common function to rebuild the queue (#56251)
Browse files Browse the repository at this point in the history
ref #55906
  • Loading branch information
Rustin170506 authored Sep 26, 2024
1 parent f399e91 commit f75100c
Show file tree
Hide file tree
Showing 4 changed files with 162 additions and 125 deletions.
3 changes: 3 additions & 0 deletions pkg/statistics/handle/autoanalyze/priorityqueue/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,18 @@ go_library(
importpath = "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/priorityqueue",
visibility = ["//visibility:public"],
deps = [
"//pkg/infoschema",
"//pkg/meta/model",
"//pkg/sessionctx",
"//pkg/sessionctx/sysproctrack",
"//pkg/sessionctx/variable",
"//pkg/statistics",
"//pkg/statistics/handle/autoanalyze/exec",
"//pkg/statistics/handle/lockstats",
"//pkg/statistics/handle/logutil",
"//pkg/statistics/handle/types",
"//pkg/statistics/handle/util",
"//pkg/util",
"//pkg/util/intest",
"//pkg/util/timeutil",
"@com_github_tikv_client_go_v2//oracle",
Expand Down
160 changes: 158 additions & 2 deletions pkg/statistics/handle/autoanalyze/priorityqueue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,162 @@

package priorityqueue

import "container/heap"
import (
"container/heap"
"context"
"time"

"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/exec"
"github.com/pingcap/tidb/pkg/statistics/handle/lockstats"
statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil"
statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types"
"github.com/pingcap/tidb/pkg/util"
"go.uber.org/zap"
)

// PushJobFunc is a function that pushes an AnalysisJob to a queue.
type PushJobFunc func(job AnalysisJob) error

// FetchAllTablesAndBuildAnalysisJobs builds analysis jobs for all eligible tables and partitions.
func FetchAllTablesAndBuildAnalysisJobs(
sctx sessionctx.Context,
parameters map[string]string,
autoAnalysisTimeWindow AutoAnalysisTimeWindow,
statsHandle statstypes.StatsHandle,
jobFunc PushJobFunc,
) error {
autoAnalyzeRatio := exec.ParseAutoAnalyzeRatio(parameters[variable.TiDBAutoAnalyzeRatio])
pruneMode := variable.PartitionPruneMode(sctx.GetSessionVars().PartitionPruneMode.Load())
is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
// Query locked tables once to minimize overhead.
// Outdated lock info is acceptable as we verify table lock status pre-analysis.
lockedTables, err := lockstats.QueryLockedTables(sctx)
if err != nil {
return err
}
// Get current timestamp from the session context.
currentTs, err := getStartTs(sctx)
if err != nil {
return err
}

jobFactory := NewAnalysisJobFactory(sctx, autoAnalyzeRatio, currentTs)
calculator := NewPriorityCalculator()

dbs := is.AllSchemaNames()
for _, db := range dbs {
// Sometimes the tables are too many. Auto-analyze will take too much time on it.
// so we need to check the available time.
if !autoAnalysisTimeWindow.IsWithinTimeWindow(time.Now()) {
return nil
}

// Ignore the memory and system database.
if util.IsMemOrSysDB(db.L) {
continue
}

tbls, err := is.SchemaTableInfos(context.Background(), db)
if err != nil {
return err
}

// We need to check every partition of every table to see if it needs to be analyzed.
for _, tblInfo := range tbls {
// If table locked, skip analyze all partitions of the table.
if _, ok := lockedTables[tblInfo.ID]; ok {
continue
}

if tblInfo.IsView() {
continue
}

pi := tblInfo.GetPartitionInfo()
if pi == nil {
job := jobFactory.CreateNonPartitionedTableAnalysisJob(
db.O,
tblInfo,
statsHandle.GetTableStatsForAutoAnalyze(tblInfo),
)
err := setWeightAndPushJob(jobFunc, job, calculator)
if err != nil {
return err
}
continue
}

// Only analyze the partition that has not been locked.
partitionDefs := make([]model.PartitionDefinition, 0, len(pi.Definitions))
for _, def := range pi.Definitions {
if _, ok := lockedTables[def.ID]; !ok {
partitionDefs = append(partitionDefs, def)
}
}
partitionStats := GetPartitionStats(statsHandle, tblInfo, partitionDefs)
// If the prune mode is static, we need to analyze every partition as a separate table.
if pruneMode == variable.Static {
for pIDAndName, stats := range partitionStats {
job := jobFactory.CreateStaticPartitionAnalysisJob(
db.O,
tblInfo,
pIDAndName.ID,
pIDAndName.Name,
stats,
)
err := setWeightAndPushJob(jobFunc, job, calculator)
if err != nil {
return err
}
}
} else {
job := jobFactory.CreateDynamicPartitionedTableAnalysisJob(
db.O,
tblInfo,
statsHandle.GetPartitionStatsForAutoAnalyze(tblInfo, tblInfo.ID),
partitionStats,
)
err := setWeightAndPushJob(jobFunc, job, calculator)
if err != nil {
return err
}
}
}
}

return nil
}

func setWeightAndPushJob(pushFunc PushJobFunc, job AnalysisJob, calculator *PriorityCalculator) error {
if job == nil {
return nil
}
// We apply a penalty to larger tables, which can potentially result in a negative weight.
// To prevent this, we filter out any negative weights. Under normal circumstances, table sizes should not be negative.
weight := calculator.CalculateWeight(job)
if weight <= 0 {
statslogutil.SingletonStatsSamplerLogger().Warn(
"Table gets a negative weight",
zap.Float64("weight", weight),
zap.Stringer("job", job),
)
}
job.SetWeight(weight)
// Push the job onto the queue.
return pushFunc(job)
}

func getStartTs(sctx sessionctx.Context) (uint64, error) {
txn, err := sctx.Txn(true)
if err != nil {
return 0, err
}
return txn.StartTS(), nil
}

// AnalysisPriorityQueue is a priority queue for TableAnalysisJobs.
type AnalysisPriorityQueue struct {
Expand All @@ -31,8 +186,9 @@ func NewAnalysisPriorityQueue() *AnalysisPriorityQueue {
}

// Push adds a job to the priority queue with the given weight.
func (apq *AnalysisPriorityQueue) Push(job AnalysisJob) {
func (apq *AnalysisPriorityQueue) Push(job AnalysisJob) error {
heap.Push(apq.inner, job)
return nil
}

// Pop removes the highest priority job from the queue.
Expand Down
3 changes: 0 additions & 3 deletions pkg/statistics/handle/autoanalyze/refresher/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,11 @@ go_library(
importpath = "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/refresher",
visibility = ["//visibility:public"],
deps = [
"//pkg/infoschema",
"//pkg/meta/model",
"//pkg/sessionctx",
"//pkg/sessionctx/sysproctrack",
"//pkg/sessionctx/variable",
"//pkg/statistics/handle/autoanalyze/exec",
"//pkg/statistics/handle/autoanalyze/priorityqueue",
"//pkg/statistics/handle/lockstats",
"//pkg/statistics/handle/logutil",
"//pkg/statistics/handle/types",
"//pkg/statistics/handle/util",
Expand Down
121 changes: 1 addition & 120 deletions pkg/statistics/handle/autoanalyze/refresher/refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,16 @@
package refresher

import (
"context"
"time"

"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/sysproctrack"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/exec"
"github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/priorityqueue"
"github.com/pingcap/tidb/pkg/statistics/handle/lockstats"
statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil"
statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types"
statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/intest"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -157,7 +152,6 @@ func (r *Refresher) RebuildTableAnalysisJobQueue() error {
r.statsHandle.SPool(),
func(sctx sessionctx.Context) error {
parameters := exec.GetAutoAnalyzeParameters(sctx)
autoAnalyzeRatio := exec.ParseAutoAnalyzeRatio(parameters[variable.TiDBAutoAnalyzeRatio])
// Get the available time period for auto analyze and check if the current time is in the period.
start, end, err := exec.ParseAutoAnalysisWindow(
parameters[variable.TiDBAutoAnalyzeStartTime],
Expand All @@ -176,93 +170,7 @@ func (r *Refresher) RebuildTableAnalysisJobQueue() error {
if !r.autoAnalysisTimeWindow.IsWithinTimeWindow(time.Now()) {
return nil
}
calculator := priorityqueue.NewPriorityCalculator()
pruneMode := variable.PartitionPruneMode(sctx.GetSessionVars().PartitionPruneMode.Load())
is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
// Query locked tables once to minimize overhead.
// Outdated lock info is acceptable as we verify table lock status pre-analysis.
lockedTables, err := lockstats.QueryLockedTables(sctx)
if err != nil {
return err
}
// Get current timestamp from the session context.
currentTs, err := getStartTs(sctx)
if err != nil {
return err
}

jobFactory := priorityqueue.NewAnalysisJobFactory(sctx, autoAnalyzeRatio, currentTs)

dbs := is.AllSchemaNames()
for _, db := range dbs {
// Sometimes the tables are too many. Auto-analyze will take too much time on it.
// so we need to check the available time.
if !r.autoAnalysisTimeWindow.IsWithinTimeWindow(time.Now()) {
return nil
}
// Ignore the memory and system database.
if util.IsMemOrSysDB(db.L) {
continue
}

tbls, err := is.SchemaTableInfos(context.Background(), db)
if err != nil {
return err
}
// We need to check every partition of every table to see if it needs to be analyzed.
for _, tblInfo := range tbls {
// If table locked, skip analyze all partitions of the table.
if _, ok := lockedTables[tblInfo.ID]; ok {
continue
}

if tblInfo.IsView() {
continue
}
pi := tblInfo.GetPartitionInfo()
if pi == nil {
job := jobFactory.CreateNonPartitionedTableAnalysisJob(
db.O,
tblInfo,
r.statsHandle.GetTableStatsForAutoAnalyze(tblInfo),
)
r.pushJob(job, calculator)
continue
}

// Only analyze the partition that has not been locked.
partitionDefs := make([]model.PartitionDefinition, 0, len(pi.Definitions))
for _, def := range pi.Definitions {
if _, ok := lockedTables[def.ID]; !ok {
partitionDefs = append(partitionDefs, def)
}
}
partitionStats := priorityqueue.GetPartitionStats(r.statsHandle, tblInfo, partitionDefs)
// If the prune mode is static, we need to analyze every partition as a separate table.
if pruneMode == variable.Static {
for pIDAndName, stats := range partitionStats {
job := jobFactory.CreateStaticPartitionAnalysisJob(
db.O,
tblInfo,
pIDAndName.ID,
pIDAndName.Name,
stats,
)
r.pushJob(job, calculator)
}
} else {
job := jobFactory.CreateDynamicPartitionedTableAnalysisJob(
db.O,
tblInfo,
r.statsHandle.GetPartitionStatsForAutoAnalyze(tblInfo, tblInfo.ID),
partitionStats,
)
r.pushJob(job, calculator)
}
}
}

return nil
return priorityqueue.FetchAllTablesAndBuildAnalysisJobs(sctx, parameters, r.autoAnalysisTimeWindow, r.statsHandle, r.Jobs.Push)
},
statsutil.FlagWrapTxn,
); err != nil {
Expand All @@ -272,25 +180,6 @@ func (r *Refresher) RebuildTableAnalysisJobQueue() error {
return nil
}

func (r *Refresher) pushJob(job priorityqueue.AnalysisJob, calculator *priorityqueue.PriorityCalculator) {
if job == nil {
return
}
// We apply a penalty to larger tables, which can potentially result in a negative weight.
// To prevent this, we filter out any negative weights. Under normal circumstances, table sizes should not be negative.
weight := calculator.CalculateWeight(job)
if weight <= 0 {
statslogutil.SingletonStatsSamplerLogger().Warn(
"Table gets a negative weight",
zap.Float64("weight", weight),
zap.Stringer("job", job),
)
}
job.SetWeight(weight)
// Push the job onto the queue.
r.Jobs.Push(job)
}

// WaitAutoAnalyzeFinishedForTest waits for the auto analyze job to be finished.
// Only used in the test.
func (r *Refresher) WaitAutoAnalyzeFinishedForTest() {
Expand All @@ -307,11 +196,3 @@ func (r *Refresher) GetRunningJobs() map[int64]struct{} {
func (r *Refresher) Close() {
r.worker.Stop()
}

func getStartTs(sctx sessionctx.Context) (uint64, error) {
txn, err := sctx.Txn(true)
if err != nil {
return 0, err
}
return txn.StartTS(), nil
}

0 comments on commit f75100c

Please sign in to comment.