Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statistics: extract the common function to rebuild the queue #56251

Merged
merged 5 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/statistics/handle/autoanalyze/priorityqueue/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,18 @@ go_library(
importpath = "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/priorityqueue",
visibility = ["//visibility:public"],
deps = [
"//pkg/infoschema",
"//pkg/meta/model",
"//pkg/sessionctx",
"//pkg/sessionctx/sysproctrack",
"//pkg/sessionctx/variable",
"//pkg/statistics",
"//pkg/statistics/handle/autoanalyze/exec",
"//pkg/statistics/handle/lockstats",
"//pkg/statistics/handle/logutil",
"//pkg/statistics/handle/types",
"//pkg/statistics/handle/util",
"//pkg/util",
"//pkg/util/intest",
"//pkg/util/timeutil",
"@com_github_tikv_client_go_v2//oracle",
Expand Down
160 changes: 158 additions & 2 deletions pkg/statistics/handle/autoanalyze/priorityqueue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,162 @@

package priorityqueue

import "container/heap"
import (
"container/heap"
"context"
"time"

"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/exec"
"github.com/pingcap/tidb/pkg/statistics/handle/lockstats"
statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil"
statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types"
"github.com/pingcap/tidb/pkg/util"
"go.uber.org/zap"
)

// PushJobFunc is a function that pushes an AnalysisJob to a queue.
type PushJobFunc func(job AnalysisJob) error

// FetchAllTablesAndBuildAnalysisJobs builds analysis jobs for all eligible tables and partitions.
func FetchAllTablesAndBuildAnalysisJobs(
sctx sessionctx.Context,
parameters map[string]string,
autoAnalysisTimeWindow AutoAnalysisTimeWindow,
statsHandle statstypes.StatsHandle,
jobFunc PushJobFunc,
) error {
autoAnalyzeRatio := exec.ParseAutoAnalyzeRatio(parameters[variable.TiDBAutoAnalyzeRatio])
pruneMode := variable.PartitionPruneMode(sctx.GetSessionVars().PartitionPruneMode.Load())
is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
// Query locked tables once to minimize overhead.
// Outdated lock info is acceptable as we verify table lock status pre-analysis.
lockedTables, err := lockstats.QueryLockedTables(sctx)
if err != nil {
return err
}
// Get current timestamp from the session context.
currentTs, err := getStartTs(sctx)
if err != nil {
return err
}

jobFactory := NewAnalysisJobFactory(sctx, autoAnalyzeRatio, currentTs)
calculator := NewPriorityCalculator()

dbs := is.AllSchemaNames()
for _, db := range dbs {
// Sometimes the tables are too many. Auto-analyze will take too much time on it.
// so we need to check the available time.
if !autoAnalysisTimeWindow.IsWithinTimeWindow(time.Now()) {
return nil
}

// Ignore the memory and system database.
if util.IsMemOrSysDB(db.L) {
continue
}

tbls, err := is.SchemaTableInfos(context.Background(), db)
if err != nil {
return err
}

// We need to check every partition of every table to see if it needs to be analyzed.
for _, tblInfo := range tbls {
// If table locked, skip analyze all partitions of the table.
if _, ok := lockedTables[tblInfo.ID]; ok {
continue
}

if tblInfo.IsView() {
continue
}

pi := tblInfo.GetPartitionInfo()
if pi == nil {
job := jobFactory.CreateNonPartitionedTableAnalysisJob(
db.O,
tblInfo,
statsHandle.GetTableStatsForAutoAnalyze(tblInfo),
)
err := setWeightAndPushJob(jobFunc, job, calculator)
if err != nil {
return err
}
continue
}

// Only analyze the partition that has not been locked.
partitionDefs := make([]model.PartitionDefinition, 0, len(pi.Definitions))
for _, def := range pi.Definitions {
if _, ok := lockedTables[def.ID]; !ok {
partitionDefs = append(partitionDefs, def)
}
}
partitionStats := GetPartitionStats(statsHandle, tblInfo, partitionDefs)
// If the prune mode is static, we need to analyze every partition as a separate table.
if pruneMode == variable.Static {
for pIDAndName, stats := range partitionStats {
job := jobFactory.CreateStaticPartitionAnalysisJob(
db.O,
tblInfo,
pIDAndName.ID,
pIDAndName.Name,
stats,
)
err := setWeightAndPushJob(jobFunc, job, calculator)
if err != nil {
return err
}
}
} else {
job := jobFactory.CreateDynamicPartitionedTableAnalysisJob(
db.O,
tblInfo,
statsHandle.GetPartitionStatsForAutoAnalyze(tblInfo, tblInfo.ID),
partitionStats,
)
err := setWeightAndPushJob(jobFunc, job, calculator)
if err != nil {
return err
}
}
}
}

return nil
}

func setWeightAndPushJob(pushFunc PushJobFunc, job AnalysisJob, calculator *PriorityCalculator) error {
if job == nil {
return nil
}
// We apply a penalty to larger tables, which can potentially result in a negative weight.
// To prevent this, we filter out any negative weights. Under normal circumstances, table sizes should not be negative.
weight := calculator.CalculateWeight(job)
if weight <= 0 {
statslogutil.SingletonStatsSamplerLogger().Warn(
"Table gets a negative weight",
zap.Float64("weight", weight),
zap.Stringer("job", job),
)
}
job.SetWeight(weight)
// Push the job onto the queue.
return pushFunc(job)
}

func getStartTs(sctx sessionctx.Context) (uint64, error) {
txn, err := sctx.Txn(true)
if err != nil {
return 0, err
}
return txn.StartTS(), nil
}

// AnalysisPriorityQueue is a priority queue for TableAnalysisJobs.
type AnalysisPriorityQueue struct {
Expand All @@ -31,8 +186,9 @@ func NewAnalysisPriorityQueue() *AnalysisPriorityQueue {
}

// Push adds a job to the priority queue with the given weight.
func (apq *AnalysisPriorityQueue) Push(job AnalysisJob) {
func (apq *AnalysisPriorityQueue) Push(job AnalysisJob) error {
Rustin170506 marked this conversation as resolved.
Show resolved Hide resolved
heap.Push(apq.inner, job)
return nil
}

// Pop removes the highest priority job from the queue.
Expand Down
3 changes: 0 additions & 3 deletions pkg/statistics/handle/autoanalyze/refresher/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,11 @@ go_library(
importpath = "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/refresher",
visibility = ["//visibility:public"],
deps = [
"//pkg/infoschema",
"//pkg/meta/model",
"//pkg/sessionctx",
"//pkg/sessionctx/sysproctrack",
"//pkg/sessionctx/variable",
"//pkg/statistics/handle/autoanalyze/exec",
"//pkg/statistics/handle/autoanalyze/priorityqueue",
"//pkg/statistics/handle/lockstats",
"//pkg/statistics/handle/logutil",
"//pkg/statistics/handle/types",
"//pkg/statistics/handle/util",
Expand Down
121 changes: 1 addition & 120 deletions pkg/statistics/handle/autoanalyze/refresher/refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,16 @@
package refresher

import (
"context"
"time"

"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/sysproctrack"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/exec"
"github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/priorityqueue"
"github.com/pingcap/tidb/pkg/statistics/handle/lockstats"
statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil"
statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types"
statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/intest"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -157,7 +152,6 @@ func (r *Refresher) RebuildTableAnalysisJobQueue() error {
r.statsHandle.SPool(),
func(sctx sessionctx.Context) error {
parameters := exec.GetAutoAnalyzeParameters(sctx)
autoAnalyzeRatio := exec.ParseAutoAnalyzeRatio(parameters[variable.TiDBAutoAnalyzeRatio])
// Get the available time period for auto analyze and check if the current time is in the period.
start, end, err := exec.ParseAutoAnalysisWindow(
parameters[variable.TiDBAutoAnalyzeStartTime],
Expand All @@ -176,93 +170,7 @@ func (r *Refresher) RebuildTableAnalysisJobQueue() error {
if !r.autoAnalysisTimeWindow.IsWithinTimeWindow(time.Now()) {
return nil
}
calculator := priorityqueue.NewPriorityCalculator()
pruneMode := variable.PartitionPruneMode(sctx.GetSessionVars().PartitionPruneMode.Load())
is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
// Query locked tables once to minimize overhead.
// Outdated lock info is acceptable as we verify table lock status pre-analysis.
lockedTables, err := lockstats.QueryLockedTables(sctx)
if err != nil {
return err
}
// Get current timestamp from the session context.
currentTs, err := getStartTs(sctx)
if err != nil {
return err
}

jobFactory := priorityqueue.NewAnalysisJobFactory(sctx, autoAnalyzeRatio, currentTs)

dbs := is.AllSchemaNames()
for _, db := range dbs {
// Sometimes the tables are too many. Auto-analyze will take too much time on it.
// so we need to check the available time.
if !r.autoAnalysisTimeWindow.IsWithinTimeWindow(time.Now()) {
return nil
}
// Ignore the memory and system database.
if util.IsMemOrSysDB(db.L) {
continue
}

tbls, err := is.SchemaTableInfos(context.Background(), db)
if err != nil {
return err
}
// We need to check every partition of every table to see if it needs to be analyzed.
for _, tblInfo := range tbls {
// If table locked, skip analyze all partitions of the table.
if _, ok := lockedTables[tblInfo.ID]; ok {
continue
}

if tblInfo.IsView() {
continue
}
pi := tblInfo.GetPartitionInfo()
if pi == nil {
job := jobFactory.CreateNonPartitionedTableAnalysisJob(
db.O,
tblInfo,
r.statsHandle.GetTableStatsForAutoAnalyze(tblInfo),
)
r.pushJob(job, calculator)
continue
}

// Only analyze the partition that has not been locked.
partitionDefs := make([]model.PartitionDefinition, 0, len(pi.Definitions))
for _, def := range pi.Definitions {
if _, ok := lockedTables[def.ID]; !ok {
partitionDefs = append(partitionDefs, def)
}
}
partitionStats := priorityqueue.GetPartitionStats(r.statsHandle, tblInfo, partitionDefs)
// If the prune mode is static, we need to analyze every partition as a separate table.
if pruneMode == variable.Static {
for pIDAndName, stats := range partitionStats {
job := jobFactory.CreateStaticPartitionAnalysisJob(
db.O,
tblInfo,
pIDAndName.ID,
pIDAndName.Name,
stats,
)
r.pushJob(job, calculator)
}
} else {
job := jobFactory.CreateDynamicPartitionedTableAnalysisJob(
db.O,
tblInfo,
r.statsHandle.GetPartitionStatsForAutoAnalyze(tblInfo, tblInfo.ID),
partitionStats,
)
r.pushJob(job, calculator)
}
}
}

return nil
return priorityqueue.FetchAllTablesAndBuildAnalysisJobs(sctx, parameters, r.autoAnalysisTimeWindow, r.statsHandle, r.Jobs.Push)
},
statsutil.FlagWrapTxn,
); err != nil {
Expand All @@ -272,25 +180,6 @@ func (r *Refresher) RebuildTableAnalysisJobQueue() error {
return nil
}

func (r *Refresher) pushJob(job priorityqueue.AnalysisJob, calculator *priorityqueue.PriorityCalculator) {
if job == nil {
return
}
// We apply a penalty to larger tables, which can potentially result in a negative weight.
// To prevent this, we filter out any negative weights. Under normal circumstances, table sizes should not be negative.
weight := calculator.CalculateWeight(job)
if weight <= 0 {
statslogutil.SingletonStatsSamplerLogger().Warn(
"Table gets a negative weight",
zap.Float64("weight", weight),
zap.Stringer("job", job),
)
}
job.SetWeight(weight)
// Push the job onto the queue.
r.Jobs.Push(job)
}

// WaitAutoAnalyzeFinishedForTest waits for the auto analyze job to be finished.
// Only used in the test.
func (r *Refresher) WaitAutoAnalyzeFinishedForTest() {
Expand All @@ -307,11 +196,3 @@ func (r *Refresher) GetRunningJobs() map[int64]struct{} {
func (r *Refresher) Close() {
r.worker.Stop()
}

func getStartTs(sctx sessionctx.Context) (uint64, error) {
txn, err := sctx.Txn(true)
if err != nil {
return 0, err
}
return txn.StartTS(), nil
}