From c51870be591a5850fba3705e4518bdaa504db533 Mon Sep 17 00:00:00 2001
From: Weizhen Wang
Date: Mon, 9 Sep 2024 18:41:17 +0800
Subject: [PATCH] This is an automated cherry-pick of #55943

Signed-off-by: ti-chi-bot
---
 pkg/statistics/handle/cache/statscache.go | 287 ++++++++++++++++++++++
 1 file changed, 287 insertions(+)
 create mode 100644 pkg/statistics/handle/cache/statscache.go

diff --git a/pkg/statistics/handle/cache/statscache.go b/pkg/statistics/handle/cache/statscache.go
new file mode 100644
index 0000000000000..938b6ebf7e231
--- /dev/null
+++ b/pkg/statistics/handle/cache/statscache.go
@@ -0,0 +1,287 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package cache
+
+import (
+    "context"
+    "sync/atomic"
+    "time"
+
+    "github.com/pingcap/errors"
+    "github.com/pingcap/failpoint"
+    "github.com/pingcap/tidb/pkg/config"
+    "github.com/pingcap/tidb/pkg/infoschema"
+    tidbmetrics "github.com/pingcap/tidb/pkg/metrics"
+    "github.com/pingcap/tidb/pkg/sessionctx"
+    "github.com/pingcap/tidb/pkg/statistics"
+    "github.com/pingcap/tidb/pkg/statistics/handle/cache/internal/metrics"
+    statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil"
+    handle_metrics "github.com/pingcap/tidb/pkg/statistics/handle/metrics"
+    "github.com/pingcap/tidb/pkg/statistics/handle/types"
+    "github.com/pingcap/tidb/pkg/statistics/handle/util"
+    "github.com/pingcap/tidb/pkg/util/chunk"
+    "github.com/pingcap/tidb/pkg/util/logutil"
+    "go.uber.org/zap"
+)
+
+// StatsCacheImpl implements util.StatsCache.
+type StatsCacheImpl struct {
+    atomic.Pointer[StatsCache]
+
+    statsHandle types.StatsHandle
+}
+
+// NewStatsCacheImpl creates a new StatsCache.
+func NewStatsCacheImpl(statsHandle types.StatsHandle) (types.StatsCache, error) {
+    newCache, err := NewStatsCache()
+    if err != nil {
+        return nil, err
+    }
+
+    result := &StatsCacheImpl{
+        statsHandle: statsHandle,
+    }
+    result.Store(newCache)
+
+    return result, nil
+}
+
+// NewStatsCacheImplForTest creates a new StatsCache for testing purposes.
+func NewStatsCacheImplForTest() (types.StatsCache, error) {
+    return NewStatsCacheImpl(nil)
+}
+
+// Update reads stats meta from the store and updates the stats map.
+func (s *StatsCacheImpl) Update(ctx context.Context, is infoschema.InfoSchema) error {
+    start := time.Now()
+    lastVersion := s.getLastVersion()
+    var (
+        rows []chunk.Row
+        err  error
+    )
+    if err := util.CallWithSCtx(s.statsHandle.SPool(), func(sctx sessionctx.Context) error {
+        rows, _, err = util.ExecRows(
+            sctx,
+            "SELECT version, table_id, modify_count, count, snapshot FROM mysql.stats_meta WHERE version > %? ORDER BY version",
+            lastVersion,
+        )
+        return err
+    }); err != nil {
+        return errors.Trace(err)
+    }
+
+    tables := make([]*statistics.Table, 0, len(rows))
+    deletedTableIDs := make([]int64, 0, len(rows))
+
+    for _, row := range rows {
+        version := row.GetUint64(0)
+        physicalID := row.GetInt64(1)
+        modifyCount := row.GetInt64(2)
+        count := row.GetInt64(3)
+        snapshot := row.GetUint64(4)
+
+        // Detect the context cancellation signal, since the loop may take a long time.
+        // TODO: add context to TableInfoByID and remove this code block?
+        if ctx.Err() != nil {
+            return ctx.Err()
+        }
+
+        table, ok := s.statsHandle.TableInfoByID(is, physicalID)
+        if !ok {
+            logutil.BgLogger().Debug(
+                "unknown physical ID in stats meta table, maybe it has been dropped",
+                zap.Int64("ID", physicalID),
+            )
+            deletedTableIDs = append(deletedTableIDs, physicalID)
+            continue
+        }
+        tableInfo := table.Meta()
+        // If the table has not been updated, we can skip it.
+        if oldTbl, ok := s.Get(physicalID); ok &&
+            oldTbl.Version >= version &&
+            tableInfo.UpdateTS == oldTbl.TblInfoUpdateTS {
+            continue
+        }
+        tbl, err := s.statsHandle.TableStatsFromStorage(
+            tableInfo,
+            physicalID,
+            false,
+            0,
+        )
+        // A non-nil error may mean that there were some DDL changes on this table, so we do not update its stats.
+        if err != nil {
+            statslogutil.StatsLogger().Error(
+                "error occurred when reading table stats",
+                zap.String("table", tableInfo.Name.O),
+                zap.Error(err),
+            )
+            continue
+        }
+        if tbl == nil {
+            deletedTableIDs = append(deletedTableIDs, physicalID)
+            continue
+        }
+        tbl.Version = version
+        tbl.RealtimeCount = count
+        tbl.ModifyCount = modifyCount
+        tbl.TblInfoUpdateTS = tableInfo.UpdateTS
+        // A zero LastAnalyzeVersion with a non-zero snapshot only occurs in the following situations:
+        // 1. The table has already been analyzed, but because the predicate columns feature
+        //    is enabled and no columns or indexes were analyzed, only _row_id was analyzed
+        //    and stats_meta was refreshed, so the snapshot is not zero.
+        // 2. LastAnalyzeVersion is 0 because it has never been loaded.
+        // In either case, we can initialize LastAnalyzeVersion to the snapshot;
+        // otherwise auto-analyze would assume the table has never been analyzed and try to analyze it again.
+        if tbl.LastAnalyzeVersion == 0 && snapshot != 0 {
+            tbl.LastAnalyzeVersion = snapshot
+        }
+        tables = append(tables, tbl)
+    }
+
+    s.UpdateStatsCache(tables, deletedTableIDs)
+    dur := time.Since(start)
+    tidbmetrics.StatsDeltaLoadHistogram.Observe(dur.Seconds())
+    return nil
+}
+
+func (s *StatsCacheImpl) getLastVersion() uint64 {
+    // Get the greatest version of the stats meta table.
+    lastVersion := s.MaxTableStatsVersion()
+    // We need this offset because, for two tables, the row with the smaller version may be committed
+    // later than the one with the larger version. Consider two tables A and B whose versions and
+    // commit times are (A0, A1) and (B0, B1), with A0 < B0 < B1 < A1. We first read the stats of B and
+    // update lastVersion to B0, but we would then miss the stats of A if we only read versions greater
+    // than lastVersion (B0). Reading with the offset is safe if commit time minus version is under five leases.
+    offset := util.DurationToTS(5 * s.statsHandle.Lease()) // 5 leases is 15s with the default 3s stats lease.
+    if s.MaxTableStatsVersion() >= offset {
+        lastVersion = lastVersion - offset
+    } else {
+        lastVersion = 0
+    }
+
+    return lastVersion
+}
+
+// Replace replaces this cache.
+func (s *StatsCacheImpl) Replace(cache types.StatsCache) {
+    x := cache.(*StatsCacheImpl)
+    s.replace(x.Load())
+}
+
+// replace replaces the cache with the new cache.
+func (s *StatsCacheImpl) replace(newCache *StatsCache) {
+    old := s.Swap(newCache)
+    if old != nil {
+        old.Close()
+    }
+    metrics.CostGauge.Set(float64(newCache.Cost()))
+}
+
+// UpdateStatsCache updates the cache with the new tables and deleted table IDs.
+func (s *StatsCacheImpl) UpdateStatsCache(tables []*statistics.Table, deletedIDs []int64) {
+    if enableQuota := config.GetGlobalConfig().Performance.EnableStatsCacheMemQuota; enableQuota {
+        s.Load().Update(tables, deletedIDs)
+    } else {
+        // TODO: remove this branch because we will always enable the quota.
+        newCache := s.Load().CopyAndUpdate(tables, deletedIDs)
+        s.replace(newCache)
+    }
+}
+
+// Close closes this cache.
+func (s *StatsCacheImpl) Close() {
+    s.Load().Close()
+}
+
+// Clear clears this cache.
+// It creates an empty cache and replaces the old one.
+func (s *StatsCacheImpl) Clear() {
+    cache, err := NewStatsCache()
+    if err != nil {
+        logutil.BgLogger().Warn("create stats cache failed", zap.Error(err))
+        return
+    }
+    s.replace(cache)
+}
+
+// MemConsumed returns the cache's memory usage.
+func (s *StatsCacheImpl) MemConsumed() (size int64) {
+    return s.Load().Cost()
+}
+
+// Get returns the specified table's stats.
+func (s *StatsCacheImpl) Get(tableID int64) (*statistics.Table, bool) {
+    failpoint.Inject("StatsCacheGetNil", func() {
+        failpoint.Return(nil, false)
+    })
+    return s.Load().Get(tableID)
+}
+
+// Put puts the table's stats into the cache.
+func (s *StatsCacheImpl) Put(id int64, t *statistics.Table) {
+    s.Load().put(id, t)
+}
+
+// MaxTableStatsVersion returns the version of the current cache, which is defined as
+// the max table stats version the cache has seen in its lifecycle.
+func (s *StatsCacheImpl) MaxTableStatsVersion() uint64 {
+    return s.Load().Version()
+}
+
+// Values returns all values in this cache.
+func (s *StatsCacheImpl) Values() []*statistics.Table {
+    return s.Load().Values()
+}
+
+// Len returns the length of this cache.
+func (s *StatsCacheImpl) Len() int {
+    return s.Load().Len()
+}
+
+// SetStatsCacheCapacity sets the cache's capacity.
+func (s *StatsCacheImpl) SetStatsCacheCapacity(c int64) {
+    s.Load().SetCapacity(c)
+}
+
+// UpdateStatsHealthyMetrics updates stats healthy distribution metrics according to the stats cache.
+func (s *StatsCacheImpl) UpdateStatsHealthyMetrics() {
+    distribution := make([]int64, 5)
+    uneligibleAnalyze := 0
+    for _, tbl := range s.Values() {
+        distribution[4]++ // total table count
+        isEligibleForAnalysis := tbl.IsEligibleForAnalysis()
+        if !isEligibleForAnalysis {
+            uneligibleAnalyze++
+            continue
+        }
+        healthy, ok := tbl.GetStatsHealthy()
+        if !ok {
+            continue
+        }
+        if healthy < 50 {
+            distribution[0]++ // healthy score in [0, 50)
+        } else if healthy < 80 {
+            distribution[1]++ // healthy score in [50, 80)
+        } else if healthy < 100 {
+            distribution[2]++ // healthy score in [80, 100)
+        } else {
+            distribution[3]++ // fully healthy: score of 100
+        }
+    }
+    for i, val := range distribution {
+        handle_metrics.StatsHealthyGauges[i].Set(float64(val))
+    }
+    handle_metrics.StatsHealthyGauges[5].Set(float64(uneligibleAnalyze))
+}
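
Notes for reviewers (illustrative sketches, not part of the diff):

On the concurrency design: replace implements a copy-on-write swap. Readers always go through s.Load() and never block; a writer builds a complete new cache, atomically swaps it in with Swap, and closes the old snapshot, which stays valid for any reader that loaded it before the swap. Below is a minimal, self-contained sketch of that pattern; the cache and holder types are illustrative stand-ins, not the actual StatsCache API.

package main

import (
    "fmt"
    "sync/atomic"
)

// cache is a stand-in for StatsCache: an immutable snapshot plus a Close hook.
type cache struct {
    tables map[int64]string
}

// Close would release the snapshot's inner resources (e.g. an LFU's goroutines).
func (c *cache) Close() {}

// holder mirrors StatsCacheImpl: an atomic pointer to the current snapshot.
type holder struct {
    atomic.Pointer[cache]
}

// replace swaps in the new snapshot and closes the old one, like
// StatsCacheImpl.replace; concurrent readers keep using whichever
// snapshot they loaded before the swap.
func (h *holder) replace(newCache *cache) {
    old := h.Swap(newCache)
    if old != nil {
        old.Close()
    }
}

func main() {
    h := &holder{}
    h.Store(&cache{tables: map[int64]string{1: "t1"}})

    // Writer path: copy, update the copy, then swap it in atomically.
    next := &cache{tables: map[int64]string{1: "t1", 2: "t2"}}
    h.replace(next)

    fmt.Println(len(h.Load().tables)) // 2
}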
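On the offset arithmetic in getLastVersion: stats_meta versions are TiDB TSO timestamps, whose high bits encode a physical wall-clock time in milliseconds (with 18 low bits for a logical counter), so subtracting DurationToTS(5 * lease) rewinds the read horizon by five leases of wall-clock time. The helpers below are an assumed, illustrative equivalent of that conversion, not the util.DurationToTS source.

package main

import (
    "fmt"
    "time"
)

// composeTS packs a physical millisecond timestamp and a logical counter into
// one TSO, following the layout TiDB/PD use: physical in the high bits,
// 18 logical bits in the low bits.
func composeTS(physicalMS, logical int64) uint64 {
    return uint64(physicalMS)<<18 | uint64(logical)
}

// durationToTS converts a wall-clock duration into TSO units so it can be
// subtracted from another TSO.
func durationToTS(d time.Duration) uint64 {
    return composeTS(d.Milliseconds(), 0)
}

func main() {
    lease := 3 * time.Second
    offset := durationToTS(5 * lease) // rewind the horizon by 15s of physical time

    maxVersion := composeTS(time.Now().UnixMilli(), 0)
    lastVersion := uint64(0)
    if maxVersion >= offset {
        lastVersion = maxVersion - offset
    }
    fmt.Printf("read stats_meta rows with version > %d\n", lastVersion)
}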
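On UpdateStatsHealthyMetrics: every cached table is bucketed by its healthy score into [0, 50), [50, 80), [80, 100), and exactly 100, alongside a total count and an ineligible-for-analysis count. A small stand-alone sketch of the same bucketing, with hard-coded scores for illustration:

package main

import "fmt"

// bucketHealthy reproduces the bucketing in UpdateStatsHealthyMetrics:
// indexes 0..3 are the healthy-score buckets, index 4 is the total.
func bucketHealthy(scores []int64) [5]int64 {
    var distribution [5]int64
    for _, healthy := range scores {
        distribution[4]++ // total table count
        switch {
        case healthy < 50:
            distribution[0]++
        case healthy < 80:
            distribution[1]++
        case healthy < 100:
            distribution[2]++
        default:
            distribution[3]++
        }
    }
    return distribution
}

func main() {
    // 30 and 49 land in [0, 50), 75 in [50, 80), 95 in [80, 100), 100 in the last bucket.
    fmt.Println(bucketHealthy([]int64{30, 49, 75, 95, 100})) // [2 1 1 1 5]
}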