statistics: add AnalysisPriorityQueueV2 (#56335)
ref #55906
Rustin170506 committed Sep 27, 2024
1 parent 2419267 commit f7eba7e
Showing 6 changed files with 442 additions and 2 deletions.
2 changes: 1 addition & 1 deletion pkg/statistics/handle/autoanalyze/internal/heap/BUILD.bazel
@@ -17,7 +17,7 @@ go_test(
],
embed = [":heap"],
flaky = True,
shard_count = 14,
shard_count = 15,
deps = [
"//pkg/testkit/testsetup",
"@com_github_stretchr_testify//require",
8 changes: 8 additions & 0 deletions pkg/statistics/handle/autoanalyze/internal/heap/heap.go
@@ -16,6 +16,7 @@
// 1. Use the `errors` package from PingCAP.
// 2. Use generics to define the `heapData` struct.
// 3. Add a Peek API.
// 4. Add an IsEmpty API.

package heap

@@ -291,6 +292,13 @@ func (h *Heap[K, V]) IsClosed() bool {
return h.closed
}

// IsEmpty returns true if the heap is empty.
func (h *Heap[K, V]) IsEmpty() bool {
h.lock.RLock()
defer h.lock.RUnlock()
return len(h.data.queue) == 0
}

// NewHeap returns a Heap which can be used to queue up items to process.
func NewHeap[K comparable, V any](keyFn KeyFunc[K, V], lessFn LessFunc[V]) *Heap[K, V] {
h := &Heap[K, V]{
12 changes: 12 additions & 0 deletions pkg/statistics/handle/autoanalyze/internal/heap/heap_test.go
@@ -16,6 +16,7 @@
// 1. Use "github.com/stretchr/testify/require" to do assertions.
// 2. Test max heap instead of min heap.
// 3. Add a test for the Peek API.
// 4. Add a test for the IsEmpty API.

package heap

@@ -345,3 +346,14 @@ func TestHeapAddAfterClose(t *testing.T) {
err = h.BulkAdd([]testHeapObject{mkHeapObj("test", 1)})
require.EqualError(t, err, closedMsg)
}

func TestHeap_IsEmpty(t *testing.T) {
h := NewHeap(testHeapObjectKeyFunc, compareInts)
require.True(t, h.IsEmpty())

h.Add(mkHeapObj("foo", 10))
require.False(t, h.IsEmpty())

h.Pop()
require.True(t, h.IsEmpty())
}
7 changes: 6 additions & 1 deletion pkg/statistics/handle/autoanalyze/priorityqueue/BUILD.bazel
@@ -10,25 +10,29 @@ go_library(
"job.go",
"non_partitioned_table_analysis_job.go",
"queue.go",
"queue_v2.go",
"static_partitioned_table_analysis_job.go",
],
importpath = "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/priorityqueue",
visibility = ["//visibility:public"],
deps = [
"//pkg/ddl/notifier",
"//pkg/infoschema",
"//pkg/meta/model",
"//pkg/sessionctx",
"//pkg/sessionctx/sysproctrack",
"//pkg/sessionctx/variable",
"//pkg/statistics",
"//pkg/statistics/handle/autoanalyze/exec",
"//pkg/statistics/handle/autoanalyze/internal/heap",
"//pkg/statistics/handle/lockstats",
"//pkg/statistics/handle/logutil",
"//pkg/statistics/handle/types",
"//pkg/statistics/handle/util",
"//pkg/util",
"//pkg/util/intest",
"//pkg/util/timeutil",
"@com_github_pingcap_errors//:errors",
"@com_github_tikv_client_go_v2//oracle",
"@org_uber_go_zap//:zap",
],
@@ -46,10 +50,11 @@ go_test(
"main_test.go",
"non_partitioned_table_analysis_job_test.go",
"queue_test.go",
"queue_v2_test.go",
"static_partitioned_table_analysis_job_test.go",
],
flaky = True,
shard_count = 29,
shard_count = 32,
deps = [
":priorityqueue",
"//pkg/meta/model",
256 changes: 256 additions & 0 deletions pkg/statistics/handle/autoanalyze/priorityqueue/queue_v2.go
@@ -0,0 +1,256 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package priorityqueue

import (
"context"
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/ddl/notifier"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/exec"
"github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/internal/heap"
statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil"
statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types"
statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/util"
"go.uber.org/zap"
)

const (
lastAnalysisDurationRefreshInterval = time.Minute * 10
)

// AnalysisPriorityQueueV2 is a priority queue for TableAnalysisJobs.
type AnalysisPriorityQueueV2 struct {
inner *heap.Heap[int64, AnalysisJob]
statsHandle statstypes.StatsHandle
calculator *PriorityCalculator
autoAnalysisTimeWindow atomic.Value // stores *AutoAnalysisTimeWindow

ctx context.Context
cancel context.CancelFunc
wg util.WaitGroupWrapper

// initialized is a flag to check if the queue is initialized.
initialized atomic.Bool
}

// NewAnalysisPriorityQueueV2 creates a new AnalysisPriorityQueueV2.
func NewAnalysisPriorityQueueV2(handle statstypes.StatsHandle) *AnalysisPriorityQueueV2 {
ctx, cancel := context.WithCancel(context.Background())

return &AnalysisPriorityQueueV2{
statsHandle: handle,
calculator: NewPriorityCalculator(),
ctx: ctx,
cancel: cancel,
}
}

// IsInitialized checks if the priority queue is initialized.
func (pq *AnalysisPriorityQueueV2) IsInitialized() bool {
return pq.initialized.Load()
}

// Initialize initializes the priority queue.
func (pq *AnalysisPriorityQueueV2) Initialize() error {
if pq.initialized.Load() {
statslogutil.StatsLogger().Warn("Priority queue already initialized")
return nil
}

start := time.Now()
defer func() {
statslogutil.StatsLogger().Info("Priority queue initialized", zap.Duration("duration", time.Since(start)))
}()

keyFunc := func(job AnalysisJob) (int64, error) {
return job.GetTableID(), nil
}
// We want the job with the highest weight to be at the top of the priority queue.
lessFunc := func(a, b AnalysisJob) bool {
return a.GetWeight() > b.GetWeight()
}
pq.inner = heap.NewHeap(keyFunc, lessFunc)
if err := pq.init(); err != nil {
pq.Close()
return errors.Trace(err)
}

// Start a goroutine to maintain the priority queue.
pq.wg.Run(pq.run)
pq.initialized.Store(true)
return nil
}

// init initializes the priority queue and adds jobs to it.
func (pq *AnalysisPriorityQueueV2) init() error {
return statsutil.CallWithSCtx(pq.statsHandle.SPool(), func(sctx sessionctx.Context) error {
parameters := exec.GetAutoAnalyzeParameters(sctx)
err := pq.setAutoAnalysisTimeWindow(parameters)
if err != nil {
return err
}
if !pq.IsWithinTimeWindow() {
return nil
}
timeWindow := pq.autoAnalysisTimeWindow.Load().(*AutoAnalysisTimeWindow)

err = FetchAllTablesAndBuildAnalysisJobs(
sctx,
parameters,
*timeWindow,
pq.statsHandle,
pq.Push,
)
if err != nil {
return errors.Trace(err)
}
return nil
}, statsutil.FlagWrapTxn)
}

// run maintains the priority queue.
func (pq *AnalysisPriorityQueueV2) run() {
defer func() {
if r := recover(); r != nil {
statslogutil.StatsLogger().Error("Priority queue panicked", zap.Any("recover", r), zap.Stack("stack"))
}
}()

timeRefreshInterval := time.NewTicker(lastAnalysisDurationRefreshInterval)
defer timeRefreshInterval.Stop()

for {
select {
case <-pq.ctx.Done():
statslogutil.StatsLogger().Info("Priority queue stopped")
return
case <-timeRefreshInterval.C:
statslogutil.StatsLogger().Info("Start to refresh last analysis durations of jobs")
pq.RefreshLastAnalysisDuration()
}
}
}

// RefreshLastAnalysisDuration refreshes the last analysis duration of all jobs in the priority queue.
func (pq *AnalysisPriorityQueueV2) RefreshLastAnalysisDuration() {
if err := statsutil.CallWithSCtx(pq.statsHandle.SPool(), func(sctx sessionctx.Context) error {
parameters := exec.GetAutoAnalyzeParameters(sctx)
if err := pq.setAutoAnalysisTimeWindow(parameters); err != nil {
return errors.Trace(err)
}
if !pq.IsWithinTimeWindow() {
statslogutil.StatsLogger().Debug("Not within the auto analyze time window, skip refreshing last analysis duration")
return nil
}
start := time.Now()
defer func() {
statslogutil.StatsLogger().Info("Last analysis duration refreshed", zap.Duration("duration", time.Since(start)))
}()
jobs := pq.inner.List()
for _, job := range jobs {
indicators := job.GetIndicators()
currentTs, err := getStartTs(sctx)
if err != nil {
return errors.Trace(err)
}

jobFactory := NewAnalysisJobFactory(sctx, 0, currentTs)
tableStats, ok := pq.statsHandle.Get(job.GetTableID())
if !ok {
statslogutil.StatsLogger().Warn("Table stats not found during refreshing last analysis duration",
zap.Int64("tableID", job.GetTableID()),
zap.String("job", job.String()),
)
err := pq.inner.Delete(job)
if err != nil {
statslogutil.StatsLogger().Error("Failed to delete job from priority queue",
zap.Error(err),
zap.String("job", job.String()),
)
}
// The table stats are gone (e.g. the table was dropped), so skip the job that was just removed.
continue
}
indicators.LastAnalysisDuration = jobFactory.GetTableLastAnalyzeDuration(tableStats)
job.SetIndicators(indicators)
job.SetWeight(pq.calculator.CalculateWeight(job))
if err := pq.inner.Update(job); err != nil {
statslogutil.StatsLogger().Error("Failed to add job to priority queue",
zap.Error(err),
zap.String("job", job.String()),
)
}
}
return nil
}, statsutil.FlagWrapTxn); err != nil {
statslogutil.StatsLogger().Error("Failed to refresh last analysis duration", zap.Error(err))
}
}

func (*AnalysisPriorityQueueV2) handleDDLEvent(_ context.Context, _ sessionctx.Context, _ notifier.SchemaChangeEvent) {
// TODO: Handle the ddl event.
// Only care about the add index event.
}

// Push pushes a job into the priority queue.
func (pq *AnalysisPriorityQueueV2) Push(job AnalysisJob) error {
return pq.inner.Add(job)
}

// Pop pops a job from the priority queue.
func (pq *AnalysisPriorityQueueV2) Pop() (AnalysisJob, error) {
return pq.inner.Pop()
}

// IsEmpty checks whether the priority queue is empty.
func (pq *AnalysisPriorityQueueV2) IsEmpty() bool {
return pq.inner.IsEmpty()
}

func (pq *AnalysisPriorityQueueV2) setAutoAnalysisTimeWindow(
parameters map[string]string,
) error {
start, end, err := exec.ParseAutoAnalysisWindow(
parameters[variable.TiDBAutoAnalyzeStartTime],
parameters[variable.TiDBAutoAnalyzeEndTime],
)
if err != nil {
return errors.Wrap(err, "parse auto analyze period failed")
}
timeWindow := NewAutoAnalysisTimeWindow(start, end)
pq.autoAnalysisTimeWindow.Store(&timeWindow)
return nil
}

// IsWithinTimeWindow checks if the current time is within the auto analyze time window.
func (pq *AnalysisPriorityQueueV2) IsWithinTimeWindow() bool {
window := pq.autoAnalysisTimeWindow.Load().(*AutoAnalysisTimeWindow)
return window.IsWithinTimeWindow(time.Now())
}

// Close closes the priority queue.
func (pq *AnalysisPriorityQueueV2) Close() {
if !pq.initialized.Load() {
return
}

pq.cancel()
pq.wg.Wait()
pq.inner.Close()
}
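
For reference, a minimal lifecycle sketch of the new queue (not part of this commit; the helper name drainQueue and the way the stats handle is obtained are illustrative): construct it with NewAnalysisPriorityQueueV2, call Initialize, pop jobs while it is non-empty, and Close it when done.

package example

import (
	"log"

	"github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/priorityqueue"
	statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types"
)

// drainQueue is an illustrative helper (not part of this commit) showing the
// intended lifecycle of AnalysisPriorityQueueV2.
func drainQueue(handle statstypes.StatsHandle) {
	pq := priorityqueue.NewAnalysisPriorityQueueV2(handle)
	if err := pq.Initialize(); err != nil {
		log.Printf("initialize priority queue: %v", err)
		return
	}
	// Close cancels the background refresh goroutine and closes the inner heap.
	defer pq.Close()

	for !pq.IsEmpty() {
		// Pop returns the highest-weight job first because lessFunc orders the
		// underlying heap by descending weight.
		job, err := pq.Pop()
		if err != nil {
			log.Printf("pop job: %v", err)
			return
		}
		_ = job // the caller would execute or schedule the analysis job here
	}
}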