From d7588c412212385471f2bec27f31b918e2c0aaf3 Mon Sep 17 00:00:00 2001 From: Rustin170506 <29879298+Rustin170506@users.noreply.github.com> Date: Thu, 5 Sep 2024 17:55:05 +0800 Subject: [PATCH] statistics: add AnalysisPriorityQueueV2 Signed-off-by: Rustin170506 <29879298+Rustin170506@users.noreply.github.com> --- .../handle/autoanalyze/internal/heap/heap.go | 70 ++++++------ .../autoanalyze/priorityqueue/BUILD.bazel | 4 + .../autoanalyze/priorityqueue/queue2.go | 102 ++++++++++++++++++ 3 files changed, 141 insertions(+), 35 deletions(-) create mode 100644 pkg/statistics/handle/autoanalyze/priorityqueue/queue2.go diff --git a/pkg/statistics/handle/autoanalyze/internal/heap/heap.go b/pkg/statistics/handle/autoanalyze/internal/heap/heap.go index f3d677167814b..76b497cedddc0 100644 --- a/pkg/statistics/handle/autoanalyze/internal/heap/heap.go +++ b/pkg/statistics/handle/autoanalyze/internal/heap/heap.go @@ -34,23 +34,23 @@ const ( type LessFunc[T any] func(T, T) bool // KeyFunc is used to generate a key for an object. -type KeyFunc[T any, K comparable] func(T) (K, error) +type KeyFunc[K comparable, T any] func(T) (K, error) -type heapItem[T any] struct { +type heapItem[K comparable, T any] struct { obj T // The object which is stored in the heap. index int // The index of the object's key in the Heap.queue. } -type itemKeyValue[T any, K comparable] struct { - obj T +type itemKeyValue[K comparable, T any] struct { key K + obj T } // heapData is an internal struct that implements the standard heap interface // and keeps the data stored in the heap. -type heapData[T any, K comparable] struct { - items map[K]*heapItem[T] - keyFunc KeyFunc[T, K] +type heapData[K comparable, T any] struct { + items map[K]*heapItem[K, T] + keyFunc KeyFunc[K, T] lessFunc LessFunc[T] queue []K } @@ -60,7 +60,7 @@ var ( ) // Less is a standard heap interface function. 
-func (h *heapData[T, K]) Less(i, j int) bool { +func (h *heapData[K, T]) Less(i, j int) bool { if i >= len(h.queue) || j >= len(h.queue) { return false } @@ -76,10 +76,10 @@ func (h *heapData[T, K]) Less(i, j int) bool { } // Len is a standard heap interface function. -func (h *heapData[T, K]) Len() int { return len(h.queue) } +func (h *heapData[K, T]) Len() int { return len(h.queue) } // Swap is a standard heap interface function. -func (h *heapData[T, K]) Swap(i, j int) { +func (h *heapData[K, T]) Swap(i, j int) { h.queue[i], h.queue[j] = h.queue[j], h.queue[i] item := h.items[h.queue[i]] item.index = i @@ -88,15 +88,15 @@ func (h *heapData[T, K]) Swap(i, j int) { } // Push is a standard heap interface function. -func (h *heapData[T, K]) Push(kv any) { - keyValue := kv.(*itemKeyValue[T, K]) +func (h *heapData[K, T]) Push(kv any) { + keyValue := kv.(*itemKeyValue[K, T]) n := len(h.queue) - h.items[keyValue.key] = &heapItem[T]{keyValue.obj, n} + h.items[keyValue.key] = &heapItem[K, T]{keyValue.obj, n} h.queue = append(h.queue, keyValue.key) } // Pop is a standard heap interface function. -func (h *heapData[T, K]) Pop() any { +func (h *heapData[K, T]) Pop() any { key := h.queue[len(h.queue)-1] h.queue = h.queue[:len(h.queue)-1] item, ok := h.items[key] @@ -108,15 +108,15 @@ func (h *heapData[T, K]) Pop() any { } // Heap is a thread-safe producer/consumer queue that implements a heap data structure. -type Heap[T any, K comparable] struct { - data *heapData[T, K] +type Heap[K comparable, T any] struct { + data *heapData[K, T] cond sync.Cond lock sync.RWMutex closed bool } // Close closes the heap. -func (h *Heap[T, K]) Close() { +func (h *Heap[K, T]) Close() { h.lock.Lock() defer h.lock.Unlock() h.closed = true @@ -124,7 +124,7 @@ func (h *Heap[T, K]) Close() { } // Add adds an object or updates it if it already exists. 
-func (h *Heap[T, K]) Add(obj T) error { +func (h *Heap[K, T]) Add(obj T) error { key, err := h.data.keyFunc(obj) if err != nil { return errors.Errorf("key error: %v", err) @@ -145,7 +145,7 @@ func (h *Heap[T, K]) Add(obj T) error { } // BulkAdd adds a list of objects to the heap. -func (h *Heap[T, K]) BulkAdd(list []T) error { +func (h *Heap[K, T]) BulkAdd(list []T) error { h.lock.Lock() defer h.lock.Unlock() if h.closed { @@ -168,7 +168,7 @@ func (h *Heap[T, K]) BulkAdd(list []T) error { } // AddIfNotPresent adds an object if it does not already exist. -func (h *Heap[T, K]) AddIfNotPresent(obj T) error { +func (h *Heap[K, T]) AddIfNotPresent(obj T) error { id, err := h.data.keyFunc(obj) if err != nil { return errors.Errorf("key error: %v", err) @@ -183,20 +183,20 @@ func (h *Heap[T, K]) AddIfNotPresent(obj T) error { return nil } -func (h *Heap[T, K]) addIfNotPresentLocked(key K, obj T) { +func (h *Heap[K, T]) addIfNotPresentLocked(key K, obj T) { if _, exists := h.data.items[key]; exists { return } - heap.Push(h.data, &itemKeyValue[T, K]{obj, key}) + heap.Push(h.data, &itemKeyValue[K, T]{key, obj}) } // Update is an alias for Add. -func (h *Heap[T, K]) Update(obj T) error { +func (h *Heap[K, T]) Update(obj T) error { return h.Add(obj) } // Delete removes an object from the heap. -func (h *Heap[T, K]) Delete(obj T) error { +func (h *Heap[K, T]) Delete(obj T) error { key, err := h.data.keyFunc(obj) if err != nil { return errors.Errorf("key error: %v", err) @@ -211,7 +211,7 @@ func (h *Heap[T, K]) Delete(obj T) error { } // Peek returns the top object from the heap without removing it. -func (h *Heap[T, K]) Peek() (T, error) { +func (h *Heap[K, T]) Peek() (T, error) { h.lock.RLock() defer h.lock.RUnlock() if len(h.data.queue) == 0 { @@ -222,7 +222,7 @@ func (h *Heap[T, K]) Peek() (T, error) { } // Pop removes the top object from the heap and returns it. 
-func (h *Heap[T, K]) Pop() (T, error) { +func (h *Heap[K, T]) Pop() (T, error) { h.lock.Lock() defer h.lock.Unlock() for len(h.data.queue) == 0 { @@ -241,7 +241,7 @@ func (h *Heap[T, K]) Pop() (T, error) { } // List returns a list of all objects in the heap. -func (h *Heap[T, K]) List() []T { +func (h *Heap[K, T]) List() []T { h.lock.RLock() defer h.lock.RUnlock() list := make([]T, 0, len(h.data.items)) @@ -252,7 +252,7 @@ func (h *Heap[T, K]) List() []T { } // ListKeys returns a list of all keys in the heap. -func (h *Heap[T, K]) ListKeys() []K { +func (h *Heap[K, T]) ListKeys() []K { h.lock.RLock() defer h.lock.RUnlock() list := make([]K, 0, len(h.data.items)) @@ -263,7 +263,7 @@ func (h *Heap[T, K]) ListKeys() []K { } // Get returns an object from the heap. -func (h *Heap[T, K]) Get(obj T) (T, bool, error) { +func (h *Heap[K, T]) Get(obj T) (T, bool, error) { key, err := h.data.keyFunc(obj) if err != nil { var zero T @@ -273,7 +273,7 @@ func (h *Heap[T, K]) Get(obj T) (T, bool, error) { } // GetByKey returns an object from the heap by key. -func (h *Heap[T, K]) GetByKey(key K) (T, bool, error) { +func (h *Heap[K, T]) GetByKey(key K) (T, bool, error) { h.lock.RLock() defer h.lock.RUnlock() item, exists := h.data.items[key] @@ -285,17 +285,17 @@ func (h *Heap[T, K]) GetByKey(key K) (T, bool, error) { } // IsClosed returns true if the heap is closed. -func (h *Heap[T, K]) IsClosed() bool { +func (h *Heap[K, T]) IsClosed() bool { h.lock.RLock() defer h.lock.RUnlock() return h.closed } // NewHeap returns a Heap which can be used to queue up items to process. 
-func NewHeap[T any, K comparable](keyFn KeyFunc[T, K], lessFn LessFunc[T]) *Heap[T, K] { - h := &Heap[T, K]{ - data: &heapData[T, K]{ - items: map[K]*heapItem[T]{}, +func NewHeap[K comparable, T any](keyFn KeyFunc[K, T], lessFn LessFunc[T]) *Heap[K, T] { + h := &Heap[K, T]{ + data: &heapData[K, T]{ + items: map[K]*heapItem[K, T]{}, queue: []K{}, keyFunc: keyFn, lessFunc: lessFn, diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/BUILD.bazel b/pkg/statistics/handle/autoanalyze/priorityqueue/BUILD.bazel index 927b1f0b50522..6d2cf3f1b96d2 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/BUILD.bazel +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "job.go", "non_partitioned_table_analysis_job.go", "queue.go", + "queue2.go", "static_partitioned_table_analysis_job.go", ], importpath = "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/priorityqueue", @@ -17,10 +18,13 @@ go_library( "//pkg/sessionctx", "//pkg/sessionctx/sysproctrack", "//pkg/sessionctx/variable", + "//pkg/statistics", "//pkg/statistics/handle/autoanalyze/exec", + "//pkg/statistics/handle/autoanalyze/internal/heap", "//pkg/statistics/handle/logutil", "//pkg/statistics/handle/types", "//pkg/statistics/handle/util", + "//pkg/util", "@org_uber_go_zap//:zap", ], ) diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/queue2.go b/pkg/statistics/handle/autoanalyze/priorityqueue/queue2.go new file mode 100644 index 0000000000000..13c4df7ddb390 --- /dev/null +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/queue2.go @@ -0,0 +1,102 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package priorityqueue + +import ( + "context" + "sync/atomic" + "time" + + "github.com/pingcap/tidb/pkg/statistics" + "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/internal/heap" + "github.com/pingcap/tidb/pkg/statistics/handle/types" + "github.com/pingcap/tidb/pkg/util" +) + +// AnalysisPriorityQueueV2 is a priority queue of AnalysisJob values keyed by table ID. +type AnalysisPriorityQueueV2 struct { + inner *heap.Heap[int64, AnalysisJob] + + ctx context.Context + cancel context.CancelFunc + wg util.WaitGroupWrapper + statsCache types.StatsCache + // lastDMLUpdateFetchTimestamp is the largest stats version seen by the last DML update fetch. + lastDMLUpdateFetchTimestamp atomic.Uint64 +} + +// NewAnalysisPriorityQueue2 creates a new AnalysisPriorityQueueV2. 
+func NewAnalysisPriorityQueue2(statsCache types.StatsCache) *AnalysisPriorityQueueV2 { + keyFunc := func(job AnalysisJob) (int64, error) { + return job.GetTableID(), nil + } + lessFunc := func(a, b AnalysisJob) bool { + return a.GetWeight() > b.GetWeight() + } + ctx, cancel := context.WithCancel(context.Background()) + pq := &AnalysisPriorityQueueV2{ + ctx: ctx, + cancel: cancel, + statsCache: statsCache, + inner: heap.NewHeap(keyFunc, lessFunc), + } + + pq.wg.Run(pq.run) + return pq +} + +func (pq *AnalysisPriorityQueueV2) run() { + fetchInterval := time.NewTicker(time.Minute * 5) + defer fetchInterval.Stop() + + for { + select { + case <-pq.ctx.Done(): + return + case <-fetchInterval.C: + pq.fetchDMLUpdate() + } + } +} + +func (pq *AnalysisPriorityQueueV2) fetchDMLUpdate() { + values := pq.statsCache.Values() + lastFetchTimestamp := pq.lastDMLUpdateFetchTimestamp.Load() + var newMaxVersion uint64 + + for _, value := range values { + if value.Version > lastFetchTimestamp { + // The version advanced since the last fetch, so pass this table to handleTableStats. + pq.handleTableStats(value) + } + newMaxVersion = max(newMaxVersion, value.Version) + } + + // Store the new maximum only when it is newer than the last recorded version. + if newMaxVersion > lastFetchTimestamp { + pq.lastDMLUpdateFetchTimestamp.Store(newMaxVersion) + } +} + +func (pq *AnalysisPriorityQueueV2) handleTableStats(stats *statistics.Table) { + // TODO: Implement the logic to handle table stats. + // This might include updating the priority queue based on the new stats. +} + +func (pq *AnalysisPriorityQueueV2) Close() { + pq.cancel() + pq.wg.Wait() + pq.inner.Close() +}