Skip to content

Commit

Permalink
statistics: add priority calculator (#51346)
Browse files Browse the repository at this point in the history
ref #50132
  • Loading branch information
Rustin170506 authored Feb 29, 2024
1 parent 2558b0c commit 42c8d2d
Show file tree
Hide file tree
Showing 6 changed files with 209 additions and 17 deletions.
3 changes: 2 additions & 1 deletion pkg/statistics/handle/autoanalyze/priorityqueue/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@ go_test(
name = "priorityqueue_test",
timeout = "short",
srcs = [
"calculator_test.go",
"interval_test.go",
"job_test.go",
"main_test.go",
"queue_test.go",
],
embed = [":priorityqueue"],
flaky = True,
shard_count = 15,
shard_count = 17,
deps = [
"//pkg/parser/model",
"//pkg/session",
Expand Down
60 changes: 47 additions & 13 deletions pkg/statistics/handle/autoanalyze/priorityqueue/calculator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,57 @@

package priorityqueue

// WeightCalculator is an interface for calculating weights of analysis jobs.
type WeightCalculator interface {
// CalculateWeight returns the weight computed for job as a float64.
CalculateWeight(job *TableAnalysisJob) float64
}
import "math"

// Score bonuses returned by getSpecialEvent and added to the weight in CalculateWeight.
const (
// eventNone is the bonus used when the job has no special event.
eventNone = 0.0
// eventNewIndex is the bonus used when job.HasNewlyAddedIndex reports true.
eventNewIndex = 2.0
)

// Weights of the three terms summed by CalculateWeight.
// TODO: make these configurable.
const (
// changeRatioWeight scales the log10 term of the table change ratio.
changeRatioWeight = 0.6
// sizeWeight scales the table size penalty term.
sizeWeight = 0.1
// analysisInterval scales the log10 term of the time since the last analysis.
// NOTE(review): despite its name this is a weight, not a duration; consider
// renaming it to analysisIntervalWeight together with its use in CalculateWeight.
analysisInterval = 0.3
)

// PriorityCalculator computes the priority weight of a TableAnalysisJob
// through its CalculateWeight method.
type PriorityCalculator struct {
threshold float64
}
type PriorityCalculator struct{}

// NewPriorityCalculator creates a new PriorityCalculator with the given threshold.
func NewPriorityCalculator(threshold float64) *PriorityCalculator {
return &PriorityCalculator{threshold: threshold}
// NewPriorityCalculator returns a pointer to a freshly allocated PriorityCalculator.
//
// The design of the priority queue is described in:
// https://github.com/pingcap/tidb/blob/master/docs/design/2023-11-29-priority-queue-for-auto-analyze.md
func NewPriorityCalculator() *PriorityCalculator {
	return new(PriorityCalculator)
}

// CalculateWeight calculates the weight based on the given rules.
func (*PriorityCalculator) CalculateWeight(_ *TableAnalysisJob) float64 {
// TODO: implement the weight calculation
return 1
//   - Table Change Ratio (Change Ratio): Accounts for 60%
//   - Table Size (Size): Accounts for 10%
//   - Analysis Interval (Analysis Interval): Accounts for 30%
//
// The priority score is computed with the following formula:
//
//	priority_score = (0.6 * math.Log10(1 + 100*ChangeRatio) +
//	                  0.1 * (1 - math.Log10(1 + TableSize)) +
//	                  0.3 * math.Log10(1 + math.Sqrt(AnalysisInterval)) +
//	                  special_event[event])
//
// ChangeRatio is job.ChangePercentage, TableSize is job.TableSize,
// AnalysisInterval is job.LastAnalysisDuration expressed in seconds and
// special_event is the bonus returned by getSpecialEvent.
//
// Negative ChangePercentage, TableSize or LastAnalysisDuration values are
// treated as zero, so they cannot turn the result into NaN or an infinity.
// NaN inputs are still propagated unchanged (math.Max returns NaN for them).
// The size term is a penalty, so the returned weight can be zero or negative
// for large tables; callers decide what to do with such weights.
func (pc *PriorityCalculator) CalculateWeight(job *TableAnalysisJob) float64 {
	// Clamp every indicator at zero before it reaches Log10/Sqrt: the square
	// root of a negative interval is NaN and Log10 of a non-positive argument
	// is -Inf or NaN. A NaN weight compares false against any threshold, so a
	// caller filtering with `weight <= 0` would not reject it.
	changePercentage := math.Max(0, job.ChangePercentage)
	tableSize := math.Max(0, job.TableSize)
	intervalSeconds := math.Max(0, job.LastAnalysisDuration.Seconds())

	// The change ratio (not the final score) is scaled by 100 so that the
	// log10 of typical ratios such as 0.5 or 1 yields a meaningful magnitude.
	changeRatio := 100 * changePercentage
	return changeRatioWeight*math.Log10(1+changeRatio) +
		sizeWeight*(1-math.Log10(1+tableSize)) +
		analysisInterval*math.Log10(1+math.Sqrt(intervalSeconds)) +
		pc.getSpecialEvent(job)
}

// getSpecialEvent maps the job to the score bonus of its special event:
// eventNewIndex when job.HasNewlyAddedIndex reports true, eventNone otherwise.
func (*PriorityCalculator) getSpecialEvent(job *TableAnalysisJob) float64 {
	bonus := eventNone
	if job.HasNewlyAddedIndex() {
		bonus = eventNewIndex
	}
	return bonus
}
144 changes: 144 additions & 0 deletions pkg/statistics/handle/autoanalyze/priorityqueue/calculator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package priorityqueue

import (
"testing"
"time"

"github.com/stretchr/testify/require"
)

// testData describes one table observation used to build a TableAnalysisJob
// in the weight calculation tests.
type testData struct {
// ID is not set or read by any code visible in this file.
// NOTE(review): looks unused — confirm before removing.
ID int
// ChangePercentage is copied into TableAnalysisJob.ChangePercentage.
ChangePercentage float64
// TableSize is copied into TableAnalysisJob.TableSize.
TableSize float64
// LastAnalysisDuration is copied into TableAnalysisJob.LastAnalysisDuration.
LastAnalysisDuration time.Duration
}

// TestCalculateWeight feeds several groups of observations to the calculator.
// Within every group the rows are listed in the order of strictly increasing
// expected weight, which testWeightCalculation verifies.
func TestCalculateWeight(t *testing.T) {
	calc := NewPriorityCalculator()

	// sample builds one observation; the ID field keeps its zero value.
	sample := func(change, size float64, sinceAnalysis time.Duration) testData {
		return testData{
			ChangePercentage:     change,
			TableSize:            size,
			LastAnalysisDuration: sinceAnalysis,
		}
	}

	groups := [][]testData{
		// Only the change percentage varies: a bigger change means a higher weight.
		{
			sample(0.6, 1000, time.Hour),
			sample(1, 1000, time.Hour),
			sample(10, 1000, time.Hour),
		},
		// Only the table size varies: a bigger table means a lower weight.
		{
			sample(0.6, 100000, time.Hour),
			sample(0.6, 10000, time.Hour),
			sample(0.6, 1000, time.Hour),
		},
		// Only the time since the last analysis varies: a longer interval
		// means a higher weight.
		{
			sample(0.6, 1000, time.Hour),
			sample(0.6, 1000, time.Hour*12),
			sample(0.6, 1000, time.Hour*24),
		},
		// Trade-off between change and recency: the table analyzed 10 minutes
		// ago with twice the change percentage is expected to rank above the
		// one analyzed 2 hours ago.
		// NOTE(review): confirm this is the intended ranking for a table that
		// has just been analyzed.
		{
			sample(0.5, 1000, 2*time.Hour),
			sample(1, 1000, 10*time.Minute),
		},
	}
	for _, group := range groups {
		testWeightCalculation(t, calc, group)
	}
}

// testWeightCalculation is a helper that builds a TableAnalysisJob from every
// row of group, computes its weight with pc, and requires each weight to be
// positive and strictly greater than the weight of the previous row.
func testWeightCalculation(t *testing.T, pc *PriorityCalculator, group []testData) {
	// Mark this function as a helper so failures are attributed to the
	// calling test's line rather than to this shared loop.
	t.Helper()

	// Start below any acceptable weight; every weight must also be > 0.
	prevWeight := -1.0
	for i, tc := range group {
		job := &TableAnalysisJob{
			ChangePercentage:     tc.ChangePercentage,
			TableSize:            tc.TableSize,
			LastAnalysisDuration: tc.LastAnalysisDuration,
		}
		weight := pc.CalculateWeight(job)
		// The row index in the message identifies which entry broke the ordering.
		require.Greater(t, weight, 0.0, "row %d: weight must be positive", i)
		require.Greater(t, weight, prevWeight, "row %d: weight must exceed the previous row's weight", i)
		prevWeight = weight
	}
}

// TestGetSpecialEvent checks that getSpecialEvent returns eventNewIndex for
// jobs carrying partition indexes or indexes, and eventNone for a job whose
// index collections are both empty.
func TestGetSpecialEvent(t *testing.T) {
	calc := NewPriorityCalculator()

	cases := []struct {
		job  *TableAnalysisJob
		want float64
	}{
		{
			// Only partition-level indexes are present.
			job: &TableAnalysisJob{
				PartitionIndexes: map[string][]string{
					"index1": {"p1", "p2"},
				},
			},
			want: eventNewIndex,
		},
		{
			// Only table-level indexes are present.
			job: &TableAnalysisJob{
				Indexes: []string{"index1"},
			},
			want: eventNewIndex,
		},
		{
			// Both collections are empty.
			job: &TableAnalysisJob{
				PartitionIndexes: map[string][]string{},
				Indexes:          []string{},
			},
			want: eventNone,
		},
	}
	for _, c := range cases {
		require.Equal(t, c.want, calc.getSpecialEvent(c.job))
	}
}
5 changes: 5 additions & 0 deletions pkg/statistics/handle/autoanalyze/priorityqueue/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ type TableAnalysisJob struct {
Weight float64
}

// HasNewlyAddedIndex reports whether the job carries at least one entry in
// Indexes or PartitionIndexes.
func (j *TableAnalysisJob) HasNewlyAddedIndex() bool {
	if len(j.Indexes) > 0 {
		return true
	}
	return len(j.PartitionIndexes) > 0
}

// IsValidToAnalyze checks whether the table is valid to analyze.
// It checks the last failed analysis duration and the average analysis duration.
// If the last failed analysis duration is less than 2 times the average analysis duration,
Expand Down
11 changes: 9 additions & 2 deletions pkg/statistics/handle/autoanalyze/refresher/refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (r *Refresher) rebuildTableAnalysisJobQueue() error {
func(sctx sessionctx.Context) error {
parameters := exec.GetAutoAnalyzeParameters(sctx)
autoAnalyzeRatio := exec.ParseAutoAnalyzeRatio(parameters[variable.TiDBAutoAnalyzeRatio])
calculator := priorityqueue.NewPriorityCalculator(autoAnalyzeRatio)
calculator := priorityqueue.NewPriorityCalculator()
pruneMode := variable.PartitionPruneMode(sctx.GetSessionVars().PartitionPruneMode.Load())
is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
// Query locked tables once to minimize overhead.
Expand Down Expand Up @@ -158,7 +158,14 @@ func (r *Refresher) rebuildTableAnalysisJobQueue() error {
}
// Calculate the weight of the job.
job.Weight = calculator.CalculateWeight(job)
if job.Weight == 0 {
// We apply a penalty to larger tables, which can potentially result in a negative weight.
// To prevent this, we filter out any non-positive weights (weight <= 0). Under normal circumstances, table sizes should not be negative.
if job.Weight <= 0 {
statslogutil.StatsLogger().Info(
"Table is not ready to analyze",
zap.String("reason", "weight is not positive"),
zap.Stringer("job", job),
)
return
}
// Push the job onto the queue.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package refresher

import (
"math"
"sort"
"testing"
"time"
Expand Down Expand Up @@ -213,7 +214,7 @@ func TestRebuildTableAnalysisJobQueue(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, r.jobs.Len())
job1 := r.jobs.Pop()
require.Equal(t, float64(1), job1.Weight)
require.Equal(t, 1.2, math.Round(job1.Weight*10)/10)
require.Equal(t, float64(1), job1.ChangePercentage)
require.Equal(t, float64(6*2), job1.TableSize)
require.GreaterOrEqual(t, job1.LastAnalysisDuration, time.Duration(0))
Expand Down

0 comments on commit 42c8d2d

Please sign in to comment.