Skip to content

Commit

Permalink
statistics: reorder key and value generic parameters of the heap (pin…
Browse files Browse the repository at this point in the history
  • Loading branch information
Rustin170506 authored and Benjamin2037 committed Sep 11, 2024
1 parent 7eef7e4 commit 468d429
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 47 deletions.
11 changes: 9 additions & 2 deletions pkg/statistics/handle/autoanalyze/internal/heap/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,16 @@ go_library(
go_test(
name = "heap_test",
timeout = "short",
srcs = ["heap_test.go"],
srcs = [
"heap_test.go",
"main_test.go",
],
embed = [":heap"],
flaky = True,
shard_count = 14,
deps = ["@com_github_stretchr_testify//require"],
deps = [
"//pkg/testkit/testsetup",
"@com_github_stretchr_testify//require",
"@org_uber_go_goleak//:goleak",
],
)
90 changes: 45 additions & 45 deletions pkg/statistics/handle/autoanalyze/internal/heap/heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,27 +31,27 @@ const (
)

// LessFunc is used to compare two objects in the heap.
type LessFunc[T any] func(T, T) bool
type LessFunc[V any] func(V, V) bool

// KeyFunc is used to generate a key for an object.
type KeyFunc[T any, K comparable] func(T) (K, error)
type KeyFunc[K comparable, V any] func(V) (K, error)

type heapItem[T any] struct {
obj T // The object which is stored in the heap.
type heapItem[K comparable, V any] struct {
obj V // The object which is stored in the heap.
index int // The index of the object's key in the Heap.queue.
}

type itemKeyValue[T any, K comparable] struct {
obj T
type itemKeyValue[K comparable, V any] struct {
key K
obj V
}

// heapData is an internal struct that implements the standard heap interface
// and keeps the data stored in the heap.
type heapData[T any, K comparable] struct {
items map[K]*heapItem[T]
keyFunc KeyFunc[T, K]
lessFunc LessFunc[T]
type heapData[K comparable, V any] struct {
items map[K]*heapItem[K, V]
keyFunc KeyFunc[K, V]
lessFunc LessFunc[V]
queue []K
}

Expand All @@ -60,7 +60,7 @@ var (
)

// Less is a standard heap interface function.
func (h *heapData[T, K]) Less(i, j int) bool {
func (h *heapData[K, V]) Less(i, j int) bool {
if i >= len(h.queue) || j >= len(h.queue) {
return false
}
Expand All @@ -76,10 +76,10 @@ func (h *heapData[T, K]) Less(i, j int) bool {
}

// Len is a standard heap interface function.
func (h *heapData[T, K]) Len() int { return len(h.queue) }
func (h *heapData[K, V]) Len() int { return len(h.queue) }

// Swap is a standard heap interface function.
func (h *heapData[T, K]) Swap(i, j int) {
func (h *heapData[K, V]) Swap(i, j int) {
h.queue[i], h.queue[j] = h.queue[j], h.queue[i]
item := h.items[h.queue[i]]
item.index = i
Expand All @@ -88,15 +88,15 @@ func (h *heapData[T, K]) Swap(i, j int) {
}

// Push is a standard heap interface function.
func (h *heapData[T, K]) Push(kv any) {
keyValue := kv.(*itemKeyValue[T, K])
func (h *heapData[K, V]) Push(kv any) {
keyValue := kv.(*itemKeyValue[K, V])
n := len(h.queue)
h.items[keyValue.key] = &heapItem[T]{keyValue.obj, n}
h.items[keyValue.key] = &heapItem[K, V]{keyValue.obj, n}
h.queue = append(h.queue, keyValue.key)
}

// Pop is a standard heap interface function.
func (h *heapData[T, K]) Pop() any {
func (h *heapData[K, V]) Pop() any {
key := h.queue[len(h.queue)-1]
h.queue = h.queue[:len(h.queue)-1]
item, ok := h.items[key]
Expand All @@ -108,23 +108,23 @@ func (h *heapData[T, K]) Pop() any {
}

// Heap is a thread-safe producer/consumer queue that implements a heap data structure.
type Heap[T any, K comparable] struct {
data *heapData[T, K]
type Heap[K comparable, V any] struct {
data *heapData[K, V]
cond sync.Cond
lock sync.RWMutex
closed bool
}

// Close closes the heap.
func (h *Heap[T, K]) Close() {
func (h *Heap[K, V]) Close() {
h.lock.Lock()
defer h.lock.Unlock()
h.closed = true
h.cond.Broadcast()
}

// Add adds an object or updates it if it already exists.
func (h *Heap[T, K]) Add(obj T) error {
func (h *Heap[K, V]) Add(obj V) error {
key, err := h.data.keyFunc(obj)
if err != nil {
return errors.Errorf("key error: %v", err)
Expand All @@ -145,7 +145,7 @@ func (h *Heap[T, K]) Add(obj T) error {
}

// BulkAdd adds a list of objects to the heap.
func (h *Heap[T, K]) BulkAdd(list []T) error {
func (h *Heap[K, V]) BulkAdd(list []V) error {
h.lock.Lock()
defer h.lock.Unlock()
if h.closed {
Expand All @@ -168,7 +168,7 @@ func (h *Heap[T, K]) BulkAdd(list []T) error {
}

// AddIfNotPresent adds an object if it does not already exist.
func (h *Heap[T, K]) AddIfNotPresent(obj T) error {
func (h *Heap[K, V]) AddIfNotPresent(obj V) error {
id, err := h.data.keyFunc(obj)
if err != nil {
return errors.Errorf("key error: %v", err)
Expand All @@ -183,20 +183,20 @@ func (h *Heap[T, K]) AddIfNotPresent(obj T) error {
return nil
}

func (h *Heap[T, K]) addIfNotPresentLocked(key K, obj T) {
func (h *Heap[K, V]) addIfNotPresentLocked(key K, obj V) {
if _, exists := h.data.items[key]; exists {
return
}
heap.Push(h.data, &itemKeyValue[T, K]{obj, key})
heap.Push(h.data, &itemKeyValue[K, V]{key, obj})
}

// Update is an alias for Add.
func (h *Heap[T, K]) Update(obj T) error {
func (h *Heap[K, V]) Update(obj V) error {
return h.Add(obj)
}

// Delete removes an object from the heap.
func (h *Heap[T, K]) Delete(obj T) error {
func (h *Heap[K, V]) Delete(obj V) error {
key, err := h.data.keyFunc(obj)
if err != nil {
return errors.Errorf("key error: %v", err)
Expand All @@ -211,48 +211,48 @@ func (h *Heap[T, K]) Delete(obj T) error {
}

// Peek returns the top object from the heap without removing it.
func (h *Heap[T, K]) Peek() (T, error) {
func (h *Heap[K, V]) Peek() (V, error) {
h.lock.RLock()
defer h.lock.RUnlock()
if len(h.data.queue) == 0 {
var zero T
var zero V
return zero, errors.New("heap is empty")
}
return h.data.items[h.data.queue[0]].obj, nil
}

// Pop removes the top object from the heap and returns it.
func (h *Heap[T, K]) Pop() (T, error) {
func (h *Heap[K, V]) Pop() (V, error) {
h.lock.Lock()
defer h.lock.Unlock()
for len(h.data.queue) == 0 {
if h.closed {
var zero T
var zero V
return zero, errors.New("heap is closed")
}
h.cond.Wait()
}
obj := heap.Pop(h.data)
if obj == nil {
var zero T
var zero V
return zero, errors.New("object was removed from heap data")
}
return obj.(T), nil
return obj.(V), nil
}

// List returns a list of all objects in the heap.
func (h *Heap[T, K]) List() []T {
func (h *Heap[K, V]) List() []V {
h.lock.RLock()
defer h.lock.RUnlock()
list := make([]T, 0, len(h.data.items))
list := make([]V, 0, len(h.data.items))
for _, item := range h.data.items {
list = append(list, item.obj)
}
return list
}

// ListKeys returns a list of all keys in the heap.
func (h *Heap[T, K]) ListKeys() []K {
func (h *Heap[K, V]) ListKeys() []K {
h.lock.RLock()
defer h.lock.RUnlock()
list := make([]K, 0, len(h.data.items))
Expand All @@ -263,39 +263,39 @@ func (h *Heap[T, K]) ListKeys() []K {
}

// Get returns an object from the heap.
func (h *Heap[T, K]) Get(obj T) (T, bool, error) {
func (h *Heap[K, V]) Get(obj V) (V, bool, error) {
key, err := h.data.keyFunc(obj)
if err != nil {
var zero T
var zero V
return zero, false, errors.Errorf("key error: %v", err)
}
return h.GetByKey(key)
}

// GetByKey returns an object from the heap by key.
func (h *Heap[T, K]) GetByKey(key K) (T, bool, error) {
func (h *Heap[K, V]) GetByKey(key K) (V, bool, error) {
h.lock.RLock()
defer h.lock.RUnlock()
item, exists := h.data.items[key]
if !exists {
var zero T
var zero V
return zero, false, nil
}
return item.obj, true, nil
}

// IsClosed returns true if the heap is closed.
func (h *Heap[T, K]) IsClosed() bool {
func (h *Heap[K, V]) IsClosed() bool {
h.lock.RLock()
defer h.lock.RUnlock()
return h.closed
}

// NewHeap returns a Heap which can be used to queue up items to process.
func NewHeap[T any, K comparable](keyFn KeyFunc[T, K], lessFn LessFunc[T]) *Heap[T, K] {
h := &Heap[T, K]{
data: &heapData[T, K]{
items: map[K]*heapItem[T]{},
func NewHeap[K comparable, V any](keyFn KeyFunc[K, V], lessFn LessFunc[V]) *Heap[K, V] {
h := &Heap[K, V]{
data: &heapData[K, V]{
items: map[K]*heapItem[K, V]{},
queue: []K{},
keyFunc: keyFn,
lessFunc: lessFn,
Expand Down
34 changes: 34 additions & 0 deletions pkg/statistics/handle/autoanalyze/internal/heap/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package heap

import (
"testing"

"github.com/pingcap/tidb/pkg/testkit/testsetup"
"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
opts := []goleak.Option{
goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"),
goleak.IgnoreTopFunction("github.com/bazelbuild/rules_go/go/tools/bzltestutil.RegisterTimeoutHandler.func1"),
goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"),
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
}
testsetup.SetupForCommonTest()
goleak.VerifyTestMain(m, opts...)
}

0 comments on commit 468d429

Please sign in to comment.