Skip to content

Commit

Permalink
statistics: stop loading too many stats when to init stats (#53999)
Browse files Browse the repository at this point in the history
close #54000
  • Loading branch information
hawkingrei committed Jun 18, 2024
1 parent 8f56847 commit 2cea994
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 34 deletions.
2 changes: 2 additions & 0 deletions pkg/statistics/handle/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ go_library(
"//pkg/parser/terror",
"//pkg/sessionctx",
"//pkg/sessionctx/sysproctrack",
"//pkg/sessionctx/variable",
"//pkg/statistics",
"//pkg/statistics/handle/autoanalyze",
"//pkg/statistics/handle/cache",
Expand All @@ -34,6 +35,7 @@ go_library(
"//pkg/types",
"//pkg/util/chunk",
"//pkg/util/logutil",
"//pkg/util/memory",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@org_uber_go_zap//:zap",
Expand Down
108 changes: 74 additions & 34 deletions pkg/statistics/handle/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/handle/cache"
"github.com/pingcap/tidb/pkg/statistics/handle/initstats"
Expand All @@ -37,6 +38,7 @@ import (
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/memory"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -177,7 +179,7 @@ func (h *Handle) initStatsHistograms4ChunkLite(is infoschema.InfoSchema, cache s
}
}

func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache statstypes.StatsCache, iter *chunk.Iterator4Chunk) {
func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache statstypes.StatsCache, iter *chunk.Iterator4Chunk, isCacheFull bool) {
var table *statistics.Table
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
tblID, statsVer := row.GetInt64(0), row.GetInt64(8)
Expand Down Expand Up @@ -210,10 +212,17 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache stats
if idxInfo == nil {
continue
}
cms, topN, err := statistics.DecodeCMSketchAndTopN(row.GetBytes(6), nil)
if err != nil {
cms = nil
terror.Log(errors.Trace(err))

var cms *statistics.CMSketch
var topN *statistics.TopN
var err error
if !isCacheFull {
// stats cache is full. we should not put it into cache. but we must set LastAnalyzeVersion
cms, topN, err = statistics.DecodeCMSketchAndTopN(row.GetBytes(6), nil)
if err != nil {
cms = nil
terror.Log(errors.Trace(err))
}
}
hist := statistics.NewHistogram(id, ndv, nullCount, version, types.NewFieldType(mysql.TypeBlob), chunk.InitialCapacity, 0)
index := &statistics.Index{
Expand All @@ -226,7 +235,8 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache stats
PhysicalID: tblID,
}
if statsVer != statistics.Version0 {
index.StatsLoadedStatus = statistics.NewStatsFullLoadStatus()
// We first set the StatsLoadedStatus as AllEvicted. when completing to load bucket, we will set it as ALlLoad.
index.StatsLoadedStatus = statistics.NewStatsAllEvictedStatus()
// The LastAnalyzeVersion is added by ALTER table so its value might be 0.
table.LastAnalyzeVersion = max(table.LastAnalyzeVersion, version)
}
Expand Down Expand Up @@ -254,6 +264,8 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache stats
Flag: row.GetInt64(10),
StatsVer: statsVer,
}
// primary key column has no stats info, because primary key's is_index is false. so it cannot load the topn
col.StatsLoadedStatus = statistics.NewStatsAllEvictedStatus()
lastAnalyzePos.Copy(&col.LastAnalyzePos)
table.Columns[hist.ID] = col
table.ColAndIdxExistenceMap.InsertCol(colInfo.ID, colInfo, statsVer != statistics.Version0 || ndv > 0 || nullCount > 0)
Expand Down Expand Up @@ -309,12 +321,12 @@ func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, cache statstypes.
if req.NumRows() == 0 {
break
}
h.initStatsHistograms4Chunk(is, cache, iter)
h.initStatsHistograms4Chunk(is, cache, iter, false)
}
return nil
}

func (h *Handle) initStatsHistogramsByPaging(is infoschema.InfoSchema, cache statstypes.StatsCache, task initstats.Task) error {
func (h *Handle) initStatsHistogramsByPaging(is infoschema.InfoSchema, cache statstypes.StatsCache, task initstats.Task, totalMemory uint64) error {
se, err := h.Pool.SPool().Get()
if err != nil {
return err
Expand All @@ -324,6 +336,7 @@ func (h *Handle) initStatsHistogramsByPaging(is infoschema.InfoSchema, cache sta
h.Pool.SPool().Put(se)
}
}()

sctx := se.(sessionctx.Context)
// Why do we need to add `is_index=1` in the SQL?
// because it is aligned to the `initStatsTopN` function, which only loads the topn of the index too.
Expand All @@ -345,16 +358,16 @@ func (h *Handle) initStatsHistogramsByPaging(is infoschema.InfoSchema, cache sta
if req.NumRows() == 0 {
break
}
h.initStatsHistograms4Chunk(is, cache, iter)
h.initStatsHistograms4Chunk(is, cache, iter, isFullCache(cache, totalMemory))
}
return nil
}

func (h *Handle) initStatsHistogramsConcurrency(is infoschema.InfoSchema, cache statstypes.StatsCache) error {
func (h *Handle) initStatsHistogramsConcurrency(is infoschema.InfoSchema, cache statstypes.StatsCache, totalMemory uint64) error {
var maxTid = maxTidRecord.tid.Load()
tid := int64(0)
ls := initstats.NewRangeWorker("histogram", func(task initstats.Task) error {
return h.initStatsHistogramsByPaging(is, cache, task)
return h.initStatsHistogramsByPaging(is, cache, task, totalMemory)
}, uint64(maxTid), uint64(initStatsStep))
ls.LoadStats()
for tid <= maxTid {
Expand All @@ -368,7 +381,10 @@ func (h *Handle) initStatsHistogramsConcurrency(is infoschema.InfoSchema, cache
return nil
}

func (*Handle) initStatsTopN4Chunk(cache statstypes.StatsCache, iter *chunk.Iterator4Chunk) {
func (*Handle) initStatsTopN4Chunk(cache statstypes.StatsCache, iter *chunk.Iterator4Chunk, totalMemory uint64) {
if isFullCache(cache, totalMemory) {
return
}
affectedIndexes := make(map[*statistics.Index]struct{})
var table *statistics.Table
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
Expand Down Expand Up @@ -404,7 +420,7 @@ func (*Handle) initStatsTopN4Chunk(cache statstypes.StatsCache, iter *chunk.Iter
}
}

func (h *Handle) initStatsTopN(cache statstypes.StatsCache) error {
func (h *Handle) initStatsTopN(cache statstypes.StatsCache, totalMemory uint64) error {
sql := "select /*+ ORDER_INDEX(mysql.stats_top_n,tbl)*/ HIGH_PRIORITY table_id, hist_id, value, count from mysql.stats_top_n where is_index = 1 order by table_id"
rc, err := util.Exec(h.initStatsCtx, sql)
if err != nil {
Expand All @@ -422,12 +438,12 @@ func (h *Handle) initStatsTopN(cache statstypes.StatsCache) error {
if req.NumRows() == 0 {
break
}
h.initStatsTopN4Chunk(cache, iter)
h.initStatsTopN4Chunk(cache, iter, totalMemory)
}
return nil
}

func (h *Handle) initStatsTopNByPaging(cache statstypes.StatsCache, task initstats.Task) error {
func (h *Handle) initStatsTopNByPaging(cache statstypes.StatsCache, task initstats.Task, totalMemory uint64) error {
se, err := h.Pool.SPool().Get()
if err != nil {
return err
Expand Down Expand Up @@ -455,19 +471,28 @@ func (h *Handle) initStatsTopNByPaging(cache statstypes.StatsCache, task initsta
if req.NumRows() == 0 {
break
}
h.initStatsTopN4Chunk(cache, iter)
h.initStatsTopN4Chunk(cache, iter, totalMemory)
}
return nil
}

func (h *Handle) initStatsTopNConcurrency(cache statstypes.StatsCache) error {
func (h *Handle) initStatsTopNConcurrency(cache statstypes.StatsCache, totalMemory uint64) error {
if isFullCache(cache, totalMemory) {
return nil
}
var maxTid = maxTidRecord.tid.Load()
tid := int64(0)
ls := initstats.NewRangeWorker("TopN", func(task initstats.Task) error {
return h.initStatsTopNByPaging(cache, task)
if isFullCache(cache, totalMemory) {
return nil
}
return h.initStatsTopNByPaging(cache, task, totalMemory)
}, uint64(maxTid), uint64(initStatsStep))
ls.LoadStats()
for tid <= maxTid {
if isFullCache(cache, totalMemory) {
break
}
ls.SendTask(initstats.Task{
StartTid: tid,
EndTid: tid + initStatsStep,
Expand Down Expand Up @@ -534,6 +559,9 @@ func (*Handle) initStatsBuckets4Chunk(cache statstypes.StatsCache, iter *chunk.I
tableID, isIndex, histID := row.GetInt64(0), row.GetInt64(1), row.GetInt64(2)
if table == nil || table.PhysicalID != tableID {
if table != nil {
for _, index := range table.Indices {
index.StatsLoadedStatus = statistics.NewStatsFullLoadStatus()
}
cache.Put(table.PhysicalID, table) // put this table in the cache because all statstics of the table have been read.
}
var ok bool
Expand Down Expand Up @@ -584,9 +612,12 @@ func (*Handle) initStatsBuckets4Chunk(cache statstypes.StatsCache, iter *chunk.I
}
}

func (h *Handle) initStatsBuckets(cache statstypes.StatsCache) error {
func (h *Handle) initStatsBuckets(cache statstypes.StatsCache, totalMemory uint64) error {
if isFullCache(cache, totalMemory) {
return nil
}
if config.GetGlobalConfig().Performance.ConcurrentlyInitStats {
err := h.initStatsBucketsConcurrency(cache)
err := h.initStatsBucketsConcurrency(cache, totalMemory)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -663,10 +694,16 @@ func (h *Handle) initStatsBucketsByPaging(cache statstypes.StatsCache, task init
return nil
}

func (h *Handle) initStatsBucketsConcurrency(cache statstypes.StatsCache) error {
func (h *Handle) initStatsBucketsConcurrency(cache statstypes.StatsCache, totalMemory uint64) error {
if isFullCache(cache, totalMemory) {
return nil
}
var maxTid = maxTidRecord.tid.Load()
tid := int64(0)
ls := initstats.NewRangeWorker("bucket", func(task initstats.Task) error {
if isFullCache(cache, totalMemory) {
return nil
}
return h.initStatsBucketsByPaging(cache, task)
}, uint64(maxTid), uint64(initStatsStep))
ls.LoadStats()
Expand All @@ -676,6 +713,9 @@ func (h *Handle) initStatsBucketsConcurrency(cache statstypes.StatsCache) error
EndTid: tid + initStatsStep,
})
tid += initStatsStep
if isFullCache(cache, totalMemory) {
break
}
}
ls.Wait()
return nil
Expand Down Expand Up @@ -715,6 +755,10 @@ func (h *Handle) InitStatsLite(is infoschema.InfoSchema) (err error) {
// 1. Basic stats meta data is loaded.(count, modify count, etc.)
// 2. Column/index stats are loaded. (histogram, topn, buckets, FMSketch)
func (h *Handle) InitStats(is infoschema.InfoSchema) (err error) {
totalMemory, err := memory.MemTotal()
if err != nil {
return err
}
loadFMSketch := config.GetGlobalConfig().Performance.EnableLoadFMSketch
defer func() {
_, err1 := util.Exec(h.initStatsCtx, "commit")
Expand All @@ -733,7 +777,7 @@ func (h *Handle) InitStats(is infoschema.InfoSchema) (err error) {
}
statslogutil.StatsLogger().Info("complete to load the meta")
if config.GetGlobalConfig().Performance.ConcurrentlyInitStats {
err = h.initStatsHistogramsConcurrency(is, cache)
err = h.initStatsHistogramsConcurrency(is, cache, totalMemory)
} else {
err = h.initStatsHistograms(is, cache)
}
Expand All @@ -742,9 +786,9 @@ func (h *Handle) InitStats(is infoschema.InfoSchema) (err error) {
return errors.Trace(err)
}
if config.GetGlobalConfig().Performance.ConcurrentlyInitStats {
err = h.initStatsTopNConcurrency(cache)
err = h.initStatsTopNConcurrency(cache, totalMemory)
} else {
err = h.initStatsTopN(cache)
err = h.initStatsTopN(cache, totalMemory)
}
statslogutil.StatsLogger().Info("complete to load the topn")
if err != nil {
Expand All @@ -757,20 +801,16 @@ func (h *Handle) InitStats(is infoschema.InfoSchema) (err error) {
}
statslogutil.StatsLogger().Info("complete to load the FM Sketch")
}
err = h.initStatsBuckets(cache)
err = h.initStatsBuckets(cache, totalMemory)
statslogutil.StatsLogger().Info("complete to load the bucket")
if err != nil {
return errors.Trace(err)
}
// Set columns' stats status.
for _, table := range cache.Values() {
for _, col := range table.Columns {
if col.StatsAvailable() {
// primary key column has no stats info, because primary key's is_index is false. so it cannot load the topn
col.StatsLoadedStatus = statistics.NewStatsAllEvictedStatus()
}
}
}
h.Replace(cache)
return nil
}

func isFullCache(cache statstypes.StatsCache, total uint64) bool {
memQuota := variable.StatsCacheMemQuota.Load()
return (uint64(cache.MemConsumed()) >= total/4) || (cache.MemConsumed() >= memQuota && memQuota != 0)
}

0 comments on commit 2cea994

Please sign in to comment.