diff --git a/pkg/planner/core/BUILD.bazel b/pkg/planner/core/BUILD.bazel index 7958973d7bd96..755eb4de296fc 100644 --- a/pkg/planner/core/BUILD.bazel +++ b/pkg/planner/core/BUILD.bazel @@ -33,6 +33,7 @@ go_library( "physical_plans.go", "plan.go", "plan_cache.go", + "plan_cache_instance.go", "plan_cache_lru.go", "plan_cache_param.go", "plan_cache_rebuild.go", @@ -224,6 +225,7 @@ go_test( "partition_pruning_test.go", "physical_plan_test.go", "physical_plan_trace_test.go", + "plan_cache_instance_test.go", "plan_cache_lru_test.go", "plan_cache_param_test.go", "plan_cache_test.go", diff --git a/pkg/planner/core/plan_cache_instance.go b/pkg/planner/core/plan_cache_instance.go new file mode 100644 index 0000000000000..0f274ac295431 --- /dev/null +++ b/pkg/planner/core/plan_cache_instance.go @@ -0,0 +1,229 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package core + +import ( + "sort" + "sync" + "time" + + "github.com/pingcap/tidb/pkg/sessionctx" + "go.uber.org/atomic" +) + +// InstancePlanCache represents the instance/node level plan cache. +// Value and Opts should always be *PlanCacheValue and *PlanCacheMatchOpts, use any to avoid cycle-import. +type InstancePlanCache interface { + // Get gets the cached value from the cache according to key and opts. + Get(sctx sessionctx.Context, key string, opts any) (value any, ok bool) + // Put puts the key and value into the cache. + Put(sctx sessionctx.Context, key string, value, opts any) (succ bool) + // Evict evicts some cached values. + Evict(sctx sessionctx.Context) (evicted bool) + // MemUsage returns the total memory usage of this plan cache. + MemUsage() int64 +} + +// NewInstancePlanCache creates a new instance level plan cache. +func NewInstancePlanCache(softMemLimit, hardMemLimit int64) InstancePlanCache { + planCache := new(instancePlanCache) + planCache.softMemLimit.Store(softMemLimit) + planCache.hardMemLimit.Store(hardMemLimit) + return planCache +} + +type instancePCNode struct { + value *PlanCacheValue + lastUsed atomic.Time + next atomic.Pointer[instancePCNode] +} + +// instancePlanCache is a lock-free implementation of InstancePlanCache interface. +// [key1] --> [headNode1] --> [node1] --> [node2] --> [node3] +// [key2] --> [headNode2] --> [node4] --> [node5] +// [key3] --> [headNode3] --> [node6] --> [node7] --> [node8] +// headNode.value is always empty, headNode is designed to make it easier to implement. 
+type instancePlanCache struct { + heads sync.Map + totCost atomic.Int64 + + evictMutex sync.Mutex + softMemLimit atomic.Int64 + hardMemLimit atomic.Int64 +} + +func (pc *instancePlanCache) getHead(key string, create bool) *instancePCNode { + headNode, ok := pc.heads.Load(key) + if ok { // cache hit + return headNode.(*instancePCNode) + } + if !create { // cache miss + return nil + } + newHeadNode := pc.createNode(nil) + actual, _ := pc.heads.LoadOrStore(key, newHeadNode) + if headNode, ok := actual.(*instancePCNode); ok { // for safety + return headNode + } + return nil +} + +// Get gets the cached value according to key and opts. +func (pc *instancePlanCache) Get(sctx sessionctx.Context, key string, opts any) (value any, ok bool) { + headNode := pc.getHead(key, false) + if headNode == nil { // cache miss + return nil, false + } + return pc.getPlanFromList(sctx, headNode, opts) +} + +func (*instancePlanCache) getPlanFromList(sctx sessionctx.Context, headNode *instancePCNode, opts any) (any, bool) { + for node := headNode.next.Load(); node != nil; node = node.next.Load() { + var matchOpts *PlanCacheMatchOpts + if opts != nil { + matchOpts = opts.(*PlanCacheMatchOpts) + } + if matchCachedPlan(sctx, node.value, matchOpts) { // v.Plan is read-only, no need to lock + node.lastUsed.Store(time.Now()) // atomically update the lastUsed field + return node.value, true + } + } + return nil, false +} + +// Put puts the key and values into the cache. +// Due to some thread-safety issues, this Put operation might fail, use the returned succ to indicate it. +func (pc *instancePlanCache) Put(sctx sessionctx.Context, key string, value, opts any) (succ bool) { + vMem := value.(*PlanCacheValue).MemoryUsage() + if vMem+pc.totCost.Load() > pc.hardMemLimit.Load() { + return // do nothing if it exceeds the hard limit + } + headNode := pc.getHead(key, true) + if headNode == nil { + return false // for safety + } + if _, ok := pc.getPlanFromList(sctx, headNode, opts); ok { + return // some other thread has inserted the same plan before + } + + firstNode := headNode.next.Load() + currNode := pc.createNode(value) + currNode.next.Store(firstNode) + if headNode.next.CompareAndSwap(firstNode, currNode) { // if failed, some other thread has updated this node, + pc.totCost.Add(vMem) // then skip this Put and wait for the next time. + succ = true + } + return +} + +// Evict evicts some values. There should be a background thread to perform the eviction. 
+// step 1: iterate all values to collect their last_used +// step 2: estimate an eviction threshold time based on all last_used values +// step 3: iterate all values again and evict qualified values +func (pc *instancePlanCache) Evict(_ sessionctx.Context) (evicted bool) { + pc.evictMutex.Lock() // make sure only one thread to trigger eviction for safety + defer pc.evictMutex.Unlock() + if pc.totCost.Load() < pc.softMemLimit.Load() { + return // do nothing + } + lastUsedTimes := make([]time.Time, 0, 64) + pc.foreach(func(_, this *instancePCNode) bool { // step 1 + lastUsedTimes = append(lastUsedTimes, this.lastUsed.Load()) + return false + }) + threshold := pc.calcEvictionThreshold(lastUsedTimes) // step 2 + pc.foreach(func(prev, this *instancePCNode) bool { // step 3 + if !this.lastUsed.Load().After(threshold) { // if lastUsed<=threshold, evict this value + if prev.next.CompareAndSwap(this, this.next.Load()) { // have to use CAS since + pc.totCost.Sub(this.value.MemoryUsage()) // it might have been updated by other thread + evicted = true + return true + } + } + return false + }) + + // post operation: clear empty heads in pc.Heads + keys, headNodes := pc.headNodes() + for i, headNode := range headNodes { + if headNode.next.Load() == nil { + pc.heads.Delete(keys[i]) + } + } + return +} + +// MemUsage returns the memory usage of this plan cache. +func (pc *instancePlanCache) MemUsage() int64 { + return pc.totCost.Load() +} + +func (pc *instancePlanCache) calcEvictionThreshold(lastUsedTimes []time.Time) (t time.Time) { + if len(lastUsedTimes) == 0 { + return + } + totCost, softMemLimit := pc.totCost.Load(), pc.softMemLimit.Load() + avgPerPlan := totCost / int64(len(lastUsedTimes)) + if avgPerPlan <= 0 { + return + } + memToRelease := totCost - softMemLimit + // (... +avgPerPlan-1) is used to try to keep the final memory usage below the soft mem limit. + numToEvict := (memToRelease + avgPerPlan - 1) / avgPerPlan + if numToEvict <= 0 { + return + } + sort.Slice(lastUsedTimes, func(i, j int) bool { + return lastUsedTimes[i].Before(lastUsedTimes[j]) + }) + if len(lastUsedTimes) < int(numToEvict) { + return // for safety, avoid index-of-range panic + } + return lastUsedTimes[numToEvict-1] +} + +func (pc *instancePlanCache) foreach(callback func(prev, this *instancePCNode) (thisRemoved bool)) { + _, headNodes := pc.headNodes() + for _, headNode := range headNodes { + for prev, this := headNode, headNode.next.Load(); this != nil; { + thisRemoved := callback(prev, this) + if !thisRemoved { // this node is removed, no need to update the prev node in this case + prev, this = this, this.next.Load() + } else { + this = this.next.Load() + } + } + } +} + +func (pc *instancePlanCache) headNodes() ([]string, []*instancePCNode) { + keys := make([]string, 0, 64) + headNodes := make([]*instancePCNode, 0, 64) + pc.heads.Range(func(k, v any) bool { + keys = append(keys, k.(string)) + headNodes = append(headNodes, v.(*instancePCNode)) + return true + }) + return keys, headNodes +} + +func (*instancePlanCache) createNode(value any) *instancePCNode { + node := new(instancePCNode) + if value != nil { + node.value = value.(*PlanCacheValue) + } + node.lastUsed.Store(time.Now()) + return node +} diff --git a/pkg/planner/core/plan_cache_instance_test.go b/pkg/planner/core/plan_cache_instance_test.go new file mode 100644 index 0000000000000..e619a7afb6850 --- /dev/null +++ b/pkg/planner/core/plan_cache_instance_test.go @@ -0,0 +1,259 @@ +// Copyright 2024 PingCAP, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package core + +import ( + "fmt" + "math/rand" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/pingcap/tidb/pkg/domain" + "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/stretchr/testify/require" +) + +func _put(sctx sessionctx.Context, pc InstancePlanCache, testKey, memUsage, statsHash int64) (succ bool) { + v := &PlanCacheValue{testKey: testKey, memoryUsage: memUsage, matchOpts: &PlanCacheMatchOpts{StatsVersionHash: uint64(statsHash)}} + return pc.Put(sctx, fmt.Sprintf("%v", testKey), v, &PlanCacheMatchOpts{StatsVersionHash: uint64(statsHash)}) +} + +func _hit(t *testing.T, sctx sessionctx.Context, pc InstancePlanCache, testKey, statsHash int) { + v, ok := pc.Get(sctx, fmt.Sprintf("%v", testKey), &PlanCacheMatchOpts{StatsVersionHash: uint64(statsHash)}) + require.True(t, ok) + require.Equal(t, v.(*PlanCacheValue).testKey, int64(testKey)) +} + +func _miss(t *testing.T, sctx sessionctx.Context, pc InstancePlanCache, testKey, statsHash int) { + _, ok := pc.Get(sctx, fmt.Sprintf("%v", testKey), &PlanCacheMatchOpts{StatsVersionHash: uint64(statsHash)}) + require.False(t, ok) +} + +func TestInstancePlanCacheBasic(t *testing.T) { + sctx := MockContext() + defer func() { + domain.GetDomain(sctx).StatsHandle().Close() + }() + + pc := NewInstancePlanCache(1000, 1000) + _put(sctx, pc, 1, 100, 0) + _put(sctx, pc, 2, 100, 0) + _put(sctx, pc, 3, 100, 0) + require.Equal(t, pc.MemUsage(), int64(300)) + _hit(t, sctx, pc, 1, 0) + _hit(t, sctx, pc, 2, 0) + _hit(t, sctx, pc, 3, 0) + + // exceed the hard limit during Put + pc = NewInstancePlanCache(250, 250) + _put(sctx, pc, 1, 100, 0) + _put(sctx, pc, 2, 100, 0) + _put(sctx, pc, 3, 100, 0) + require.Equal(t, pc.MemUsage(), int64(200)) + _hit(t, sctx, pc, 1, 0) + _hit(t, sctx, pc, 2, 0) + _miss(t, sctx, pc, 3, 0) + + // can't Put 2 same values + pc = NewInstancePlanCache(250, 250) + _put(sctx, pc, 1, 100, 0) + _put(sctx, pc, 1, 101, 0) + require.Equal(t, pc.MemUsage(), int64(100)) // the second one will be ignored + + // eviction + pc = NewInstancePlanCache(320, 500) + _put(sctx, pc, 1, 100, 0) + _put(sctx, pc, 2, 100, 0) + _put(sctx, pc, 3, 100, 0) + _put(sctx, pc, 4, 100, 0) + _put(sctx, pc, 5, 100, 0) + _hit(t, sctx, pc, 1, 0) // access 1-3 to refresh their last_used + _hit(t, sctx, pc, 2, 0) + _hit(t, sctx, pc, 3, 0) + require.Equal(t, pc.Evict(sctx), true) + require.Equal(t, pc.MemUsage(), int64(300)) + _hit(t, sctx, pc, 1, 0) // access 1-3 to refresh their last_used + _hit(t, sctx, pc, 2, 0) + _hit(t, sctx, pc, 3, 0) + _miss(t, sctx, pc, 4, 0) // 4-5 have been evicted + _miss(t, sctx, pc, 5, 0) + + // no need to eviction if mem < softLimit + pc = NewInstancePlanCache(320, 500) + _put(sctx, pc, 1, 100, 0) + _put(sctx, pc, 2, 100, 0) + _put(sctx, pc, 3, 100, 0) + require.Equal(t, pc.Evict(sctx), false) + require.Equal(t, pc.MemUsage(), int64(300)) + _hit(t, sctx, pc, 1, 0) + _hit(t, sctx, pc, 2, 0) + _hit(t, sctx, pc, 3, 0) + + // empty head should be dropped after eviction + 
pc = NewInstancePlanCache(1, 500) + _put(sctx, pc, 1, 100, 0) + _put(sctx, pc, 2, 100, 0) + _put(sctx, pc, 3, 100, 0) + require.Equal(t, pc.MemUsage(), int64(300)) + pcImpl := pc.(*instancePlanCache) + numHeads := 0 + pcImpl.heads.Range(func(k, v any) bool { numHeads++; return true }) + require.Equal(t, numHeads, 3) + require.Equal(t, pc.Evict(sctx), true) + require.Equal(t, pc.MemUsage(), int64(0)) + numHeads = 0 + pcImpl.heads.Range(func(k, v any) bool { numHeads++; return true }) + require.Equal(t, numHeads, 0) +} + +func TestInstancePlanCacheWithMatchOpts(t *testing.T) { + sctx := MockContext() + defer func() { + domain.GetDomain(sctx).StatsHandle().Close() + }() + sctx.GetSessionVars().PlanCacheInvalidationOnFreshStats = true + + // same key with different statsHash + pc := NewInstancePlanCache(1000, 1000) + _put(sctx, pc, 1, 100, 1) + _put(sctx, pc, 1, 100, 2) + _put(sctx, pc, 1, 100, 3) + _hit(t, sctx, pc, 1, 1) + _hit(t, sctx, pc, 1, 2) + _hit(t, sctx, pc, 1, 3) + _miss(t, sctx, pc, 1, 4) + _miss(t, sctx, pc, 2, 1) + + // multiple keys with same statsHash + pc = NewInstancePlanCache(1000, 1000) + _put(sctx, pc, 1, 100, 1) + _put(sctx, pc, 1, 100, 2) + _put(sctx, pc, 2, 100, 1) + _put(sctx, pc, 2, 100, 2) + _hit(t, sctx, pc, 1, 1) + _hit(t, sctx, pc, 1, 2) + _miss(t, sctx, pc, 1, 3) + _hit(t, sctx, pc, 2, 1) + _hit(t, sctx, pc, 2, 2) + _miss(t, sctx, pc, 2, 3) + _miss(t, sctx, pc, 3, 1) + _miss(t, sctx, pc, 3, 2) + _miss(t, sctx, pc, 3, 3) + + // hard limit can take effect in this case + pc = NewInstancePlanCache(200, 200) + _put(sctx, pc, 1, 100, 1) + _put(sctx, pc, 1, 100, 2) + _put(sctx, pc, 1, 100, 3) // the third one will be ignored + require.Equal(t, pc.MemUsage(), int64(200)) + _hit(t, sctx, pc, 1, 1) + _hit(t, sctx, pc, 1, 2) + _miss(t, sctx, pc, 1, 3) + + // eviction this case + pc = NewInstancePlanCache(300, 500) + _put(sctx, pc, 1, 100, 1) + _put(sctx, pc, 1, 100, 2) + _put(sctx, pc, 1, 100, 3) + _put(sctx, pc, 1, 100, 4) + _put(sctx, pc, 1, 100, 5) + _hit(t, sctx, pc, 1, 1) // refresh 1-3's last_used + _hit(t, sctx, pc, 1, 2) + _hit(t, sctx, pc, 1, 3) + require.True(t, pc.Evict(sctx)) + require.Equal(t, pc.MemUsage(), int64(300)) + _hit(t, sctx, pc, 1, 1) + _hit(t, sctx, pc, 1, 2) + _hit(t, sctx, pc, 1, 3) + _miss(t, sctx, pc, 1, 4) + _miss(t, sctx, pc, 1, 5) +} + +func TestInstancePlanCacheConcurrentRead(t *testing.T) { + sctx := MockContext() + defer func() { + domain.GetDomain(sctx).StatsHandle().Close() + }() + sctx.GetSessionVars().PlanCacheInvalidationOnFreshStats = true + + pc := NewInstancePlanCache(300, 100000) + var flag [100][100]bool + for k := 0; k < 100; k++ { + for statsHash := 0; statsHash < 100; statsHash++ { + if rand.Intn(10) < 7 { + _put(sctx, pc, int64(k), 1, int64(statsHash)) + flag[k][statsHash] = true + } + } + } + + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 10000; i++ { + k, statsHash := rand.Intn(100), rand.Intn(100) + if flag[k][statsHash] { + _hit(t, sctx, pc, k, statsHash) + } else { + _miss(t, sctx, pc, k, statsHash) + } + time.Sleep(time.Nanosecond * 10) + } + }() + } + wg.Wait() +} + +func TestInstancePlanCacheConcurrentWriteRead(t *testing.T) { + sctx := MockContext() + defer func() { + domain.GetDomain(sctx).StatsHandle().Close() + }() + sctx.GetSessionVars().PlanCacheInvalidationOnFreshStats = true + var flag [100][100]atomic.Bool + pc := NewInstancePlanCache(300, 100000) + var wg sync.WaitGroup + for i := 0; i < 5; i++ { // writers + wg.Add(1) + go func() { + defer 
wg.Done() + for i := 0; i < 1000; i++ { + k, statsHash := rand.Intn(100), rand.Intn(100) + if _put(sctx, pc, int64(k), 1, int64(statsHash)) { + flag[k][statsHash].Store(true) + } + time.Sleep(time.Nanosecond * 10) + } + }() + } + for i := 0; i < 5; i++ { // readers + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 2000; i++ { + k, statsHash := rand.Intn(100), rand.Intn(100) + if flag[k][statsHash].Load() { + _hit(t, sctx, pc, k, statsHash) + } + time.Sleep(time.Nanosecond * 5) + } + }() + } + wg.Wait() +} diff --git a/pkg/planner/core/plan_cache_utils.go b/pkg/planner/core/plan_cache_utils.go index 1e56a31c1fb30..58f5b47a4fdc3 100644 --- a/pkg/planner/core/plan_cache_utils.go +++ b/pkg/planner/core/plan_cache_utils.go @@ -302,6 +302,7 @@ type PlanCacheValue struct { OutputColumns types.NameSlice TblInfo2UnionScan map[*model.TableInfo]bool memoryUsage int64 + testKey int64 // this is only for test // matchOpts stores some fields help to choose a suitable plan matchOpts *PlanCacheMatchOpts
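
A note for readers of this patch on the lock-free design: each cache key maps to a sentinel head node stored in a `sync.Map`, `Put` prepends a node with a single compare-and-swap on the sentinel's `next` pointer, `Get` walks the per-key list without any lock, and eviction unlinks nodes with a CAS on the predecessor. The following standalone sketch shows the same technique in isolation; all names are illustrative stand-ins (string plans instead of `*PlanCacheValue`, standard-library atomics instead of `go.uber.org/atomic`, Go 1.19+ assumed), not the TiDB API.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type entry struct {
	plan     string       // stand-in for *PlanCacheValue
	lastUsed atomic.Int64 // unix nanos, refreshed on every hit
	next     atomic.Pointer[entry]
}

type cache struct {
	heads sync.Map // key(string) -> *entry (sentinel; its plan field is unused)
}

// Put prepends a new entry with a single CAS on the sentinel's next pointer.
// LoadOrStore lets racing creators converge on one sentinel per key; if the
// CAS loses a race with another writer, the put is simply skipped, mirroring
// the "succ" return value of the real Put.
func (c *cache) Put(key, plan string) bool {
	h, _ := c.heads.LoadOrStore(key, &entry{})
	head := h.(*entry)
	first := head.next.Load()
	n := &entry{plan: plan}
	n.lastUsed.Store(time.Now().UnixNano())
	n.next.Store(first)
	return head.next.CompareAndSwap(first, n)
}

// Get traverses the per-key list without locks; linked entries are never
// mutated (only the atomic lastUsed field), so readers always see consistent nodes.
func (c *cache) Get(key, plan string) bool {
	h, ok := c.heads.Load(key)
	if !ok {
		return false
	}
	for n := h.(*entry).next.Load(); n != nil; n = n.next.Load() {
		if n.plan == plan { // stand-in for matchCachedPlan
			n.lastUsed.Store(time.Now().UnixNano())
			return true
		}
	}
	return false
}

// evictOlderThan unlinks entries whose lastUsed is at or before cutoff with a
// CAS on the predecessor's next pointer; a lost race just leaves the entry for
// the next round, and readers already holding the node can keep using it.
func (c *cache) evictOlderThan(cutoff int64) (evicted int) {
	c.heads.Range(func(_, v any) bool {
		prev := v.(*entry)
		for this := prev.next.Load(); this != nil; {
			next := this.next.Load()
			if this.lastUsed.Load() <= cutoff && prev.next.CompareAndSwap(this, next) {
				evicted++
				this = next
				continue
			}
			prev, this = this, next
		}
		return true
	})
	return
}

func main() {
	c := &cache{}
	fmt.Println(c.Put("SELECT * FROM t WHERE a=?", "plan-a")) // true
	fmt.Println(c.Get("SELECT * FROM t WHERE a=?", "plan-a")) // true
	fmt.Println(c.evictOlderThan(time.Now().UnixNano()))      // 1
	fmt.Println(c.Get("SELECT * FROM t WHERE a=?", "plan-a")) // false
}
```

The sentinel head exists so that every real node has a predecessor, which keeps the CAS-unlink uniform; that is what the patch's comment "headNode is designed to make it easier to implement" refers to.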
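
The threshold estimate in `calcEvictionThreshold` can also be read on its own: divide the total cost by the number of cached plans to get an average cost per plan, round the amount to release up against that average, and take the last-used time of that many oldest entries. Below is a self-contained re-derivation of that logic; the function and variable names are illustrative, not the patch's API.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// evictionThreshold estimates how many plans must go (rounding up against the
// average cost per plan) and returns the last-used time of the newest plan
// among that many oldest ones; everything not used after the returned
// threshold can then be evicted.
func evictionThreshold(lastUsed []time.Time, totCost, softLimit int64) (time.Time, bool) {
	if len(lastUsed) == 0 || totCost <= softLimit {
		return time.Time{}, false
	}
	avg := totCost / int64(len(lastUsed)) // average cost per cached plan
	if avg <= 0 {
		return time.Time{}, false
	}
	// Round up so the post-eviction usage lands at or below the soft limit.
	numToEvict := (totCost - softLimit + avg - 1) / avg
	if numToEvict <= 0 || int(numToEvict) > len(lastUsed) {
		return time.Time{}, false
	}
	sort.Slice(lastUsed, func(i, j int) bool { return lastUsed[i].Before(lastUsed[j]) })
	return lastUsed[numToEvict-1], true
}

func main() {
	now := time.Now()
	lastUsed := []time.Time{now.Add(-5 * time.Minute), now.Add(-time.Minute), now.Add(-3 * time.Minute)}
	// Three plans costing 300 in total against a soft limit of 100: the two
	// oldest must go, so the threshold is the 2nd-oldest last-used time.
	th, ok := evictionThreshold(lastUsed, 300, 100)
	fmt.Println(ok, th.Equal(now.Add(-3*time.Minute))) // true true
}
```

Note that the real function additionally guards against `numToEvict` exceeding the number of collected timestamps, which is what the "avoid index-of-range panic" comment in the patch is about.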
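
Working through the eviction case in `TestInstancePlanCacheBasic` makes the soft/hard limit interplay concrete: five plans of 100 units each are cached under `NewInstancePlanCache(320, 500)`, so the total cost is 500 and above the soft limit of 320. `calcEvictionThreshold` then sees an average cost of 500/5 = 100 per plan and needs to release 500 - 320 = 180 units, which rounds up to `numToEvict` = 2, so the threshold becomes the second-oldest last-used time. Because keys 1-3 were just re-read, the two stale entries (keys 4 and 5) are the ones at or below that threshold; they are unlinked and `MemUsage` drops to 300, exactly what the test asserts.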
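
The `Evict` doc comment notes that eviction is expected to be driven by a background thread, which this patch does not wire up yet. One possible wiring, sketched purely as an assumption (the helper name, the ticker-based loop, and the interval are hypothetical and not part of this change):

```go
package cacheutil // illustrative package; not part of this patch

import (
	"time"

	"github.com/pingcap/tidb/pkg/planner/core"
	"github.com/pingcap/tidb/pkg/sessionctx"
)

// StartEvictionLoop is a hypothetical helper: it periodically calls Evict on
// the instance-level plan cache until the done channel is closed.
func StartEvictionLoop(sctx sessionctx.Context, pc core.InstancePlanCache, interval time.Duration, done <-chan struct{}) {
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				if pc.Evict(sctx) {
					_ = pc.MemUsage() // a metrics hook could record the new usage here
				}
			case <-done:
				return
			}
		}
	}()
}
```

How such a loop is actually started, and which interval it uses, would presumably be decided by whatever follow-up change enables the instance-level cache.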