Skip to content

Commit

Permalink
statistics: add AnalysisPriorityQueueV2
Browse files Browse the repository at this point in the history
Signed-off-by: Rustin170506 <[email protected]>
  • Loading branch information
Rustin170506 committed Sep 5, 2024
1 parent fef43c5 commit d7588c4
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 35 deletions.
70 changes: 35 additions & 35 deletions pkg/statistics/handle/autoanalyze/internal/heap/heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,23 @@ const (
type LessFunc[T any] func(T, T) bool

// KeyFunc is used to generate a key for an object.
type KeyFunc[T any, K comparable] func(T) (K, error)
type KeyFunc[K comparable, T any] func(T) (K, error)

type heapItem[T any] struct {
type heapItem[K comparable, T any] struct {
obj T // The object which is stored in the heap.
index int // The index of the object's key in the Heap.queue.
}

type itemKeyValue[T any, K comparable] struct {
obj T
type itemKeyValue[K comparable, T any] struct {
key K
obj T
}

// heapData is an internal struct that implements the standard heap interface
// and keeps the data stored in the heap.
type heapData[T any, K comparable] struct {
items map[K]*heapItem[T]
keyFunc KeyFunc[T, K]
type heapData[K comparable, T any] struct {
items map[K]*heapItem[K, T]
keyFunc KeyFunc[K, T]
lessFunc LessFunc[T]
queue []K
}
Expand All @@ -60,7 +60,7 @@ var (
)

// Less is a standard heap interface function.
func (h *heapData[T, K]) Less(i, j int) bool {
func (h *heapData[K, T]) Less(i, j int) bool {
if i >= len(h.queue) || j >= len(h.queue) {
return false
}
Expand All @@ -76,10 +76,10 @@ func (h *heapData[T, K]) Less(i, j int) bool {
}

// Len is a standard heap interface function.
func (h *heapData[T, K]) Len() int { return len(h.queue) }
func (h *heapData[K, T]) Len() int { return len(h.queue) }

// Swap is a standard heap interface function.
func (h *heapData[T, K]) Swap(i, j int) {
func (h *heapData[K, T]) Swap(i, j int) {
h.queue[i], h.queue[j] = h.queue[j], h.queue[i]
item := h.items[h.queue[i]]
item.index = i
Expand All @@ -88,15 +88,15 @@ func (h *heapData[T, K]) Swap(i, j int) {
}

// Push is a standard heap interface function.
func (h *heapData[T, K]) Push(kv any) {
keyValue := kv.(*itemKeyValue[T, K])
func (h *heapData[K, T]) Push(kv any) {
keyValue := kv.(*itemKeyValue[K, T])
n := len(h.queue)
h.items[keyValue.key] = &heapItem[T]{keyValue.obj, n}
h.items[keyValue.key] = &heapItem[K, T]{keyValue.obj, n}
h.queue = append(h.queue, keyValue.key)
}

// Pop is a standard heap interface function.
func (h *heapData[T, K]) Pop() any {
func (h *heapData[K, T]) Pop() any {
key := h.queue[len(h.queue)-1]
h.queue = h.queue[:len(h.queue)-1]
item, ok := h.items[key]
Expand All @@ -108,23 +108,23 @@ func (h *heapData[T, K]) Pop() any {
}

// Heap is a thread-safe producer/consumer queue that implements a heap data structure.
type Heap[T any, K comparable] struct {
data *heapData[T, K]
type Heap[K comparable, T any] struct {
data *heapData[K, T]
cond sync.Cond
lock sync.RWMutex
closed bool
}

// Close closes the heap.
func (h *Heap[T, K]) Close() {
func (h *Heap[K, T]) Close() {
h.lock.Lock()
defer h.lock.Unlock()
h.closed = true
h.cond.Broadcast()
}

// Add adds an object or updates it if it already exists.
func (h *Heap[T, K]) Add(obj T) error {
func (h *Heap[K, T]) Add(obj T) error {
key, err := h.data.keyFunc(obj)
if err != nil {
return errors.Errorf("key error: %v", err)
Expand All @@ -145,7 +145,7 @@ func (h *Heap[T, K]) Add(obj T) error {
}

// BulkAdd adds a list of objects to the heap.
func (h *Heap[T, K]) BulkAdd(list []T) error {
func (h *Heap[K, T]) BulkAdd(list []T) error {
h.lock.Lock()
defer h.lock.Unlock()
if h.closed {
Expand All @@ -168,7 +168,7 @@ func (h *Heap[T, K]) BulkAdd(list []T) error {
}

// AddIfNotPresent adds an object if it does not already exist.
func (h *Heap[T, K]) AddIfNotPresent(obj T) error {
func (h *Heap[K, T]) AddIfNotPresent(obj T) error {
id, err := h.data.keyFunc(obj)
if err != nil {
return errors.Errorf("key error: %v", err)
Expand All @@ -183,20 +183,20 @@ func (h *Heap[T, K]) AddIfNotPresent(obj T) error {
return nil
}

func (h *Heap[T, K]) addIfNotPresentLocked(key K, obj T) {
func (h *Heap[K, T]) addIfNotPresentLocked(key K, obj T) {
if _, exists := h.data.items[key]; exists {
return
}
heap.Push(h.data, &itemKeyValue[T, K]{obj, key})
heap.Push(h.data, &itemKeyValue[K, T]{key, obj})
}

// Update is an alias for Add.
func (h *Heap[T, K]) Update(obj T) error {
func (h *Heap[K, T]) Update(obj T) error {
return h.Add(obj)
}

// Delete removes an object from the heap.
func (h *Heap[T, K]) Delete(obj T) error {
func (h *Heap[K, T]) Delete(obj T) error {
key, err := h.data.keyFunc(obj)
if err != nil {
return errors.Errorf("key error: %v", err)
Expand All @@ -211,7 +211,7 @@ func (h *Heap[T, K]) Delete(obj T) error {
}

// Peek returns the top object from the heap without removing it.
func (h *Heap[T, K]) Peek() (T, error) {
func (h *Heap[K, T]) Peek() (T, error) {
h.lock.RLock()
defer h.lock.RUnlock()
if len(h.data.queue) == 0 {
Expand All @@ -222,7 +222,7 @@ func (h *Heap[T, K]) Peek() (T, error) {
}

// Pop removes the top object from the heap and returns it.
func (h *Heap[T, K]) Pop() (T, error) {
func (h *Heap[K, T]) Pop() (T, error) {
h.lock.Lock()
defer h.lock.Unlock()
for len(h.data.queue) == 0 {
Expand All @@ -241,7 +241,7 @@ func (h *Heap[T, K]) Pop() (T, error) {
}

// List returns a list of all objects in the heap.
func (h *Heap[T, K]) List() []T {
func (h *Heap[K, T]) List() []T {
h.lock.RLock()
defer h.lock.RUnlock()
list := make([]T, 0, len(h.data.items))
Expand All @@ -252,7 +252,7 @@ func (h *Heap[T, K]) List() []T {
}

// ListKeys returns a list of all keys in the heap.
func (h *Heap[T, K]) ListKeys() []K {
func (h *Heap[K, T]) ListKeys() []K {
h.lock.RLock()
defer h.lock.RUnlock()
list := make([]K, 0, len(h.data.items))
Expand All @@ -263,7 +263,7 @@ func (h *Heap[T, K]) ListKeys() []K {
}

// Get returns an object from the heap.
func (h *Heap[T, K]) Get(obj T) (T, bool, error) {
func (h *Heap[K, T]) Get(obj T) (T, bool, error) {
key, err := h.data.keyFunc(obj)
if err != nil {
var zero T
Expand All @@ -273,7 +273,7 @@ func (h *Heap[T, K]) Get(obj T) (T, bool, error) {
}

// GetByKey returns an object from the heap by key.
func (h *Heap[T, K]) GetByKey(key K) (T, bool, error) {
func (h *Heap[K, T]) GetByKey(key K) (T, bool, error) {
h.lock.RLock()
defer h.lock.RUnlock()
item, exists := h.data.items[key]
Expand All @@ -285,17 +285,17 @@ func (h *Heap[T, K]) GetByKey(key K) (T, bool, error) {
}

// IsClosed returns true if the heap is closed.
func (h *Heap[T, K]) IsClosed() bool {
func (h *Heap[K, T]) IsClosed() bool {
h.lock.RLock()
defer h.lock.RUnlock()
return h.closed
}

// NewHeap returns a Heap which can be used to queue up items to process.
func NewHeap[T any, K comparable](keyFn KeyFunc[T, K], lessFn LessFunc[T]) *Heap[T, K] {
h := &Heap[T, K]{
data: &heapData[T, K]{
items: map[K]*heapItem[T]{},
func NewHeap[K comparable, T any](keyFn KeyFunc[K, T], lessFn LessFunc[T]) *Heap[K, T] {
h := &Heap[K, T]{
data: &heapData[K, T]{
items: map[K]*heapItem[K, T]{},
queue: []K{},
keyFunc: keyFn,
lessFunc: lessFn,
Expand Down
4 changes: 4 additions & 0 deletions pkg/statistics/handle/autoanalyze/priorityqueue/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
"job.go",
"non_partitioned_table_analysis_job.go",
"queue.go",
"queue2.go",
"static_partitioned_table_analysis_job.go",
],
importpath = "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/priorityqueue",
Expand All @@ -17,10 +18,13 @@ go_library(
"//pkg/sessionctx",
"//pkg/sessionctx/sysproctrack",
"//pkg/sessionctx/variable",
"//pkg/statistics",
"//pkg/statistics/handle/autoanalyze/exec",
"//pkg/statistics/handle/autoanalyze/internal/heap",
"//pkg/statistics/handle/logutil",
"//pkg/statistics/handle/types",
"//pkg/statistics/handle/util",
"//pkg/util",
"@org_uber_go_zap//:zap",
],
)
Expand Down
102 changes: 102 additions & 0 deletions pkg/statistics/handle/autoanalyze/priorityqueue/queue2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package priorityqueue

import (
"context"
"sync/atomic"
"time"

"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/internal/heap"
"github.com/pingcap/tidb/pkg/statistics/handle/types"
"github.com/pingcap/tidb/pkg/util"
)

// AnalysisPriorityQueueV2 is a priority queue for TableAnalysisJobs.
type AnalysisPriorityQueueV2 struct {
inner *heap.Heap[int64, AnalysisJob]

ctx context.Context
cancel context.CancelFunc
wg util.WaitGroupWrapper
statsCache types.StatsCache
// lastDMLUpdateFetchTimestamp is the timestamp of the last DML update fetch.
lastDMLUpdateFetchTimestamp atomic.Uint64
}

// NewAnalysisPriorityQueue2 builds an AnalysisPriorityQueueV2 backed by
// statsCache and starts its background loop before returning.
//
// Jobs are keyed by GetTableID, and a job with a greater GetWeight is ordered
// ahead of a job with a smaller one.
func NewAnalysisPriorityQueue2(statsCache types.StatsCache) *AnalysisPriorityQueueV2 {
	queueCtx, stop := context.WithCancel(context.Background())

	queue := &AnalysisPriorityQueueV2{
		ctx:        queueCtx,
		cancel:     stop,
		statsCache: statsCache,
		inner: heap.NewHeap[int64, AnalysisJob](
			// Key function: the table ID identifies a job; it never fails.
			func(job AnalysisJob) (int64, error) {
				return job.GetTableID(), nil
			},
			// Less function: heavier jobs sort first.
			func(a, b AnalysisJob) bool {
				return a.GetWeight() > b.GetWeight()
			},
		),
	}

	// The loop exits once stop is called (see run).
	queue.wg.Run(queue.run)
	return queue
}

// run is the background loop: every five minutes it calls fetchDMLUpdate, and
// it returns as soon as the queue's context is done.
func (pq *AnalysisPriorityQueueV2) run() {
	const fetchPeriod = 5 * time.Minute

	ticker := time.NewTicker(fetchPeriod)
	// Release the ticker's resources when the loop exits.
	defer ticker.Stop()

	done := pq.ctx.Done()
	for {
		select {
		case <-done:
			return
		case <-ticker.C:
			pq.fetchDMLUpdate()
		}
	}
}

// fetchDMLUpdate scans every entry of the stats cache, passes each entry whose
// Version is greater than the stored timestamp to handleTableStats, and then
// raises the stored timestamp to the largest Version seen during the scan.
func (pq *AnalysisPriorityQueueV2) fetchDMLUpdate() {
	tables := pq.statsCache.Values()
	watermark := pq.lastDMLUpdateFetchTimestamp.Load()

	highest := uint64(0)
	for _, tbl := range tables {
		// Only entries newer than the previous watermark are processed.
		if tbl.Version > watermark {
			pq.handleTableStats(tbl)
		}
		// Every entry contributes to the maximum, processed or not.
		if tbl.Version > highest {
			highest = tbl.Version
		}
	}

	// Nothing newer was observed: leave the stored timestamp untouched.
	if highest <= watermark {
		return
	}
	pq.lastDMLUpdateFetchTimestamp.Store(highest)
}

// handleTableStats is a placeholder invoked by fetchDMLUpdate for each stats
// entry newer than the last fetch timestamp. Its body is currently empty, so
// the stats argument is ignored.
func (pq *AnalysisPriorityQueueV2) handleTableStats(stats *statistics.Table) {
	// TODO: Implement the logic to handle table stats
	// This might include updating the priority queue based on the new stats
}

// Close shuts the queue down: it cancels the queue's context, waits for the
// background goroutine to return, and then closes the underlying heap.
func (pq *AnalysisPriorityQueueV2) Close() {
	// Signal the background loop to stop.
	pq.cancel()

	// Block until the loop has actually returned.
	pq.wg.Wait()

	// Close the heap only after the background goroutine has finished.
	pq.inner.Close()
}

0 comments on commit d7588c4

Please sign in to comment.