From 468d42928896f0b82c930c6917331aaaf081bb67 Mon Sep 17 00:00:00 2001 From: Rustin <29879298+Rustin170506@users.noreply.github.com> Date: Tue, 10 Sep 2024 01:30:56 -0700 Subject: [PATCH] statistics: reorder key and value generic parameters of the heap (#55974) ref pingcap/tidb#55906 --- .../autoanalyze/internal/heap/BUILD.bazel | 11 ++- .../handle/autoanalyze/internal/heap/heap.go | 90 +++++++++---------- .../autoanalyze/internal/heap/main_test.go | 34 +++++++ 3 files changed, 88 insertions(+), 47 deletions(-) create mode 100644 pkg/statistics/handle/autoanalyze/internal/heap/main_test.go diff --git a/pkg/statistics/handle/autoanalyze/internal/heap/BUILD.bazel b/pkg/statistics/handle/autoanalyze/internal/heap/BUILD.bazel index 2f4ba1e9b6ae2..49f341c7b164e 100644 --- a/pkg/statistics/handle/autoanalyze/internal/heap/BUILD.bazel +++ b/pkg/statistics/handle/autoanalyze/internal/heap/BUILD.bazel @@ -11,9 +11,16 @@ go_library( go_test( name = "heap_test", timeout = "short", - srcs = ["heap_test.go"], + srcs = [ + "heap_test.go", + "main_test.go", + ], embed = [":heap"], flaky = True, shard_count = 14, - deps = ["@com_github_stretchr_testify//require"], + deps = [ + "//pkg/testkit/testsetup", + "@com_github_stretchr_testify//require", + "@org_uber_go_goleak//:goleak", + ], ) diff --git a/pkg/statistics/handle/autoanalyze/internal/heap/heap.go b/pkg/statistics/handle/autoanalyze/internal/heap/heap.go index f3d677167814b..67b42ea428e26 100644 --- a/pkg/statistics/handle/autoanalyze/internal/heap/heap.go +++ b/pkg/statistics/handle/autoanalyze/internal/heap/heap.go @@ -31,27 +31,27 @@ const ( ) // LessFunc is used to compare two objects in the heap. -type LessFunc[T any] func(T, T) bool +type LessFunc[V any] func(V, V) bool // KeyFunc is used to generate a key for an object. -type KeyFunc[T any, K comparable] func(T) (K, error) +type KeyFunc[K comparable, V any] func(V) (K, error) -type heapItem[T any] struct { - obj T // The object which is stored in the heap. +type heapItem[K comparable, V any] struct { + obj V // The object which is stored in the heap. index int // The index of the object's key in the Heap.queue. } -type itemKeyValue[T any, K comparable] struct { - obj T +type itemKeyValue[K comparable, V any] struct { key K + obj V } // heapData is an internal struct that implements the standard heap interface // and keeps the data stored in the heap. -type heapData[T any, K comparable] struct { - items map[K]*heapItem[T] - keyFunc KeyFunc[T, K] - lessFunc LessFunc[T] +type heapData[K comparable, V any] struct { + items map[K]*heapItem[K, V] + keyFunc KeyFunc[K, V] + lessFunc LessFunc[V] queue []K } @@ -60,7 +60,7 @@ var ( ) // Less is a standard heap interface function. -func (h *heapData[T, K]) Less(i, j int) bool { +func (h *heapData[K, V]) Less(i, j int) bool { if i >= len(h.queue) || j >= len(h.queue) { return false } @@ -76,10 +76,10 @@ func (h *heapData[T, K]) Less(i, j int) bool { } // Len is a standard heap interface function. -func (h *heapData[T, K]) Len() int { return len(h.queue) } +func (h *heapData[K, V]) Len() int { return len(h.queue) } // Swap is a standard heap interface function. -func (h *heapData[T, K]) Swap(i, j int) { +func (h *heapData[K, V]) Swap(i, j int) { h.queue[i], h.queue[j] = h.queue[j], h.queue[i] item := h.items[h.queue[i]] item.index = i @@ -88,15 +88,15 @@ func (h *heapData[T, K]) Swap(i, j int) { } // Push is a standard heap interface function. -func (h *heapData[T, K]) Push(kv any) { - keyValue := kv.(*itemKeyValue[T, K]) +func (h *heapData[K, V]) Push(kv any) { + keyValue := kv.(*itemKeyValue[K, V]) n := len(h.queue) - h.items[keyValue.key] = &heapItem[T]{keyValue.obj, n} + h.items[keyValue.key] = &heapItem[K, V]{keyValue.obj, n} h.queue = append(h.queue, keyValue.key) } // Pop is a standard heap interface function. -func (h *heapData[T, K]) Pop() any { +func (h *heapData[K, V]) Pop() any { key := h.queue[len(h.queue)-1] h.queue = h.queue[:len(h.queue)-1] item, ok := h.items[key] @@ -108,15 +108,15 @@ func (h *heapData[T, K]) Pop() any { } // Heap is a thread-safe producer/consumer queue that implements a heap data structure. -type Heap[T any, K comparable] struct { - data *heapData[T, K] +type Heap[K comparable, V any] struct { + data *heapData[K, V] cond sync.Cond lock sync.RWMutex closed bool } // Close closes the heap. -func (h *Heap[T, K]) Close() { +func (h *Heap[K, V]) Close() { h.lock.Lock() defer h.lock.Unlock() h.closed = true @@ -124,7 +124,7 @@ func (h *Heap[T, K]) Close() { } // Add adds an object or updates it if it already exists. -func (h *Heap[T, K]) Add(obj T) error { +func (h *Heap[K, V]) Add(obj V) error { key, err := h.data.keyFunc(obj) if err != nil { return errors.Errorf("key error: %v", err) @@ -145,7 +145,7 @@ func (h *Heap[T, K]) Add(obj T) error { } // BulkAdd adds a list of objects to the heap. -func (h *Heap[T, K]) BulkAdd(list []T) error { +func (h *Heap[K, V]) BulkAdd(list []V) error { h.lock.Lock() defer h.lock.Unlock() if h.closed { @@ -168,7 +168,7 @@ func (h *Heap[T, K]) BulkAdd(list []T) error { } // AddIfNotPresent adds an object if it does not already exist. -func (h *Heap[T, K]) AddIfNotPresent(obj T) error { +func (h *Heap[K, V]) AddIfNotPresent(obj V) error { id, err := h.data.keyFunc(obj) if err != nil { return errors.Errorf("key error: %v", err) @@ -183,20 +183,20 @@ func (h *Heap[T, K]) AddIfNotPresent(obj T) error { return nil } -func (h *Heap[T, K]) addIfNotPresentLocked(key K, obj T) { +func (h *Heap[K, V]) addIfNotPresentLocked(key K, obj V) { if _, exists := h.data.items[key]; exists { return } - heap.Push(h.data, &itemKeyValue[T, K]{obj, key}) + heap.Push(h.data, &itemKeyValue[K, V]{key, obj}) } // Update is an alias for Add. -func (h *Heap[T, K]) Update(obj T) error { +func (h *Heap[K, V]) Update(obj V) error { return h.Add(obj) } // Delete removes an object from the heap. -func (h *Heap[T, K]) Delete(obj T) error { +func (h *Heap[K, V]) Delete(obj V) error { key, err := h.data.keyFunc(obj) if err != nil { return errors.Errorf("key error: %v", err) @@ -211,40 +211,40 @@ func (h *Heap[T, K]) Delete(obj T) error { } // Peek returns the top object from the heap without removing it. -func (h *Heap[T, K]) Peek() (T, error) { +func (h *Heap[K, V]) Peek() (V, error) { h.lock.RLock() defer h.lock.RUnlock() if len(h.data.queue) == 0 { - var zero T + var zero V return zero, errors.New("heap is empty") } return h.data.items[h.data.queue[0]].obj, nil } // Pop removes the top object from the heap and returns it. -func (h *Heap[T, K]) Pop() (T, error) { +func (h *Heap[K, V]) Pop() (V, error) { h.lock.Lock() defer h.lock.Unlock() for len(h.data.queue) == 0 { if h.closed { - var zero T + var zero V return zero, errors.New("heap is closed") } h.cond.Wait() } obj := heap.Pop(h.data) if obj == nil { - var zero T + var zero V return zero, errors.New("object was removed from heap data") } - return obj.(T), nil + return obj.(V), nil } // List returns a list of all objects in the heap. -func (h *Heap[T, K]) List() []T { +func (h *Heap[K, V]) List() []V { h.lock.RLock() defer h.lock.RUnlock() - list := make([]T, 0, len(h.data.items)) + list := make([]V, 0, len(h.data.items)) for _, item := range h.data.items { list = append(list, item.obj) } @@ -252,7 +252,7 @@ func (h *Heap[T, K]) List() []T { } // ListKeys returns a list of all keys in the heap. -func (h *Heap[T, K]) ListKeys() []K { +func (h *Heap[K, V]) ListKeys() []K { h.lock.RLock() defer h.lock.RUnlock() list := make([]K, 0, len(h.data.items)) @@ -263,39 +263,39 @@ func (h *Heap[T, K]) ListKeys() []K { } // Get returns an object from the heap. -func (h *Heap[T, K]) Get(obj T) (T, bool, error) { +func (h *Heap[K, V]) Get(obj V) (V, bool, error) { key, err := h.data.keyFunc(obj) if err != nil { - var zero T + var zero V return zero, false, errors.Errorf("key error: %v", err) } return h.GetByKey(key) } // GetByKey returns an object from the heap by key. -func (h *Heap[T, K]) GetByKey(key K) (T, bool, error) { +func (h *Heap[K, V]) GetByKey(key K) (V, bool, error) { h.lock.RLock() defer h.lock.RUnlock() item, exists := h.data.items[key] if !exists { - var zero T + var zero V return zero, false, nil } return item.obj, true, nil } // IsClosed returns true if the heap is closed. -func (h *Heap[T, K]) IsClosed() bool { +func (h *Heap[K, V]) IsClosed() bool { h.lock.RLock() defer h.lock.RUnlock() return h.closed } // NewHeap returns a Heap which can be used to queue up items to process. -func NewHeap[T any, K comparable](keyFn KeyFunc[T, K], lessFn LessFunc[T]) *Heap[T, K] { - h := &Heap[T, K]{ - data: &heapData[T, K]{ - items: map[K]*heapItem[T]{}, +func NewHeap[K comparable, V any](keyFn KeyFunc[K, V], lessFn LessFunc[V]) *Heap[K, V] { + h := &Heap[K, V]{ + data: &heapData[K, V]{ + items: map[K]*heapItem[K, V]{}, queue: []K{}, keyFunc: keyFn, lessFunc: lessFn, diff --git a/pkg/statistics/handle/autoanalyze/internal/heap/main_test.go b/pkg/statistics/handle/autoanalyze/internal/heap/main_test.go new file mode 100644 index 0000000000000..c3e3b61659f27 --- /dev/null +++ b/pkg/statistics/handle/autoanalyze/internal/heap/main_test.go @@ -0,0 +1,34 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package heap + +import ( + "testing" + + "github.com/pingcap/tidb/pkg/testkit/testsetup" + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + opts := []goleak.Option{ + goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"), + goleak.IgnoreTopFunction("github.com/bazelbuild/rules_go/go/tools/bzltestutil.RegisterTimeoutHandler.func1"), + goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"), + goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"), + goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), + } + testsetup.SetupForCommonTest() + goleak.VerifyTestMain(m, opts...) +}