Skip to content

Commit

Permalink
planner, statistics: async load should load all column meta info for …
Browse files Browse the repository at this point in the history
…lite init (#53297) (#53307)

ref #53141
  • Loading branch information
ti-chi-bot authored May 16, 2024
1 parent b14e28f commit 1f8f3ab
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 35 deletions.
2 changes: 1 addition & 1 deletion pkg/planner/core/collect_column_stats_usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ func CollectColumnStatsUsage(lp LogicalPlan, predicate, histNeeded bool) (
if histNeeded {
collector.histNeededCols[*colToTriggerLoad] = true
} else {
statistics.HistogramNeededItems.Insert(*colToTriggerLoad)
statistics.HistogramNeededItems.Insert(*colToTriggerLoad, true)
}
})
var (
Expand Down
15 changes: 15 additions & 0 deletions pkg/planner/core/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,21 @@ func (ds *DataSource) initStats(colGroups [][]*expression.Column) {
ds.tableStats = tableStats
ds.tableStats.GroupNDVs = ds.getGroupNDVs(colGroups)
ds.TblColHists = ds.statisticTable.ID2UniqueID(ds.TblCols)
for _, col := range ds.tableInfo.Columns {
if col.State != model.StatePublic {
continue
}
// If we enable lite stats init or we just found out the meta info of the column is missed, we need to register columns for async load.
_, isLoadNeeded, _ := ds.statisticTable.ColumnIsLoadNeeded(col.ID, false)
if isLoadNeeded {
statistics.HistogramNeededItems.Insert(model.TableItemID{
TableID: ds.tableInfo.ID,
ID: col.ID,
IsIndex: false,
IsSyncLoadFailed: ds.SCtx().GetSessionVars().StmtCtx.StatsLoad.Timeout > 0,
}, false)
}
}
}

func (ds *DataSource) deriveStatsByFilter(conds expression.CNFExprs, filledPaths []*util.AccessPath) *property.StatsInfo {
Expand Down
2 changes: 1 addition & 1 deletion pkg/statistics/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func ColumnStatsIsInvalid(colStats *Column, sctx context.PlanContext, histColl *
ID: cid,
IsIndex: false,
IsSyncLoadFailed: sctx.GetSessionVars().StmtCtx.StatsLoad.Timeout > 0,
})
}, true)
}
}
if histColl.Pseudo {
Expand Down
2 changes: 2 additions & 0 deletions pkg/statistics/handle/handletest/handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1406,9 +1406,11 @@ func TestInitStatsLite(t *testing.T) {
require.NoError(t, h.LoadNeededHistograms())
statsTbl2 := h.GetTableStats(tblInfo)
colBStats1 := statsTbl2.Columns[colBID]
colCStats := statsTbl2.Columns[colCID]
require.True(t, colBStats1.IsFullLoad())
idxBStats1 := statsTbl2.Indices[idxBID]
require.True(t, idxBStats1.IsFullLoad())
require.True(t, colCStats.IsAllEvicted())

// sync stats load
tk.MustExec("set @@tidb_stats_load_sync_wait = 60000")
Expand Down
51 changes: 31 additions & 20 deletions pkg/statistics/handle/storage/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,9 +542,10 @@ func LoadNeededHistograms(sctx sessionctx.Context, statsCache statstypes.StatsCa
items := statistics.HistogramNeededItems.AllItems()
for _, item := range items {
if !item.IsIndex {
err = loadNeededColumnHistograms(sctx, statsCache, item, loadFMSketch)
err = loadNeededColumnHistograms(sctx, statsCache, item.TableItemID, loadFMSketch, item.FullLoad)
} else {
err = loadNeededIndexHistograms(sctx, statsCache, item, loadFMSketch)
// Index is always full load.
err = loadNeededIndexHistograms(sctx, statsCache, item.TableItemID, loadFMSketch)
}
if err != nil {
return err
Expand All @@ -560,27 +561,27 @@ func CleanFakeItemsForShowHistInFlights(statsCache statstypes.StatsCache) int {
for _, item := range items {
tbl, ok := statsCache.Get(item.TableID)
if !ok {
statistics.HistogramNeededItems.Delete(item)
statistics.HistogramNeededItems.Delete(item.TableItemID)
continue
}
loadNeeded := false
if item.IsIndex {
_, loadNeeded = tbl.IndexIsLoadNeeded(item.ID)
} else {
var analyzed bool
_, loadNeeded, analyzed = tbl.ColumnIsLoadNeeded(item.ID, true)
_, loadNeeded, analyzed = tbl.ColumnIsLoadNeeded(item.ID, item.FullLoad)
loadNeeded = loadNeeded && analyzed
}
if !loadNeeded {
statistics.HistogramNeededItems.Delete(item)
statistics.HistogramNeededItems.Delete(item.TableItemID)
continue
}
reallyNeeded++
}
return reallyNeeded
}

func loadNeededColumnHistograms(sctx sessionctx.Context, statsCache statstypes.StatsCache, col model.TableItemID, loadFMSketch bool) (err error) {
func loadNeededColumnHistograms(sctx sessionctx.Context, statsCache statstypes.StatsCache, col model.TableItemID, loadFMSketch bool, fullLoad bool) (err error) {
tbl, ok := statsCache.Get(col.TableID)
if !ok {
return nil
Expand All @@ -592,25 +593,31 @@ func loadNeededColumnHistograms(sctx sessionctx.Context, statsCache statstypes.S
return nil
}
colInfo = tbl.ColAndIdxExistenceMap.GetCol(col.ID)
hgMeta, _, statsVer, _, err := HistMetaFromStorage(sctx, &col, colInfo)
if hgMeta == nil || err != nil {
hg, _, statsVer, _, err := HistMetaFromStorage(sctx, &col, colInfo)
if hg == nil || err != nil {
statistics.HistogramNeededItems.Delete(col)
return err
}
hg, err := HistogramFromStorage(sctx, col.TableID, col.ID, &colInfo.FieldType, hgMeta.NDV, 0, hgMeta.LastUpdateVersion, hgMeta.NullCount, hgMeta.TotColSize, hgMeta.Correlation)
if err != nil {
return errors.Trace(err)
}
cms, topN, err := CMSketchAndTopNFromStorage(sctx, col.TableID, 0, col.ID)
if err != nil {
return errors.Trace(err)
}
var fms *statistics.FMSketch
if loadFMSketch {
fms, err = FMSketchFromStorage(sctx, col.TableID, 0, col.ID)
var (
cms *statistics.CMSketch
topN *statistics.TopN
fms *statistics.FMSketch
)
if fullLoad {
hg, err = HistogramFromStorage(sctx, col.TableID, col.ID, &colInfo.FieldType, hg.NDV, 0, hg.LastUpdateVersion, hg.NullCount, hg.TotColSize, hg.Correlation)
if err != nil {
return errors.Trace(err)
}
cms, topN, err = CMSketchAndTopNFromStorage(sctx, col.TableID, 0, col.ID)
if err != nil {
return errors.Trace(err)
}
if loadFMSketch {
fms, err = FMSketchFromStorage(sctx, col.TableID, 0, col.ID)
if err != nil {
return errors.Trace(err)
}
}
}
colHist := &statistics.Column{
PhysicalID: col.TableID,
Expand All @@ -630,7 +637,11 @@ func loadNeededColumnHistograms(sctx sessionctx.Context, statsCache statstypes.S
}
tbl = tbl.Copy()
if colHist.StatsAvailable() {
colHist.StatsLoadedStatus = statistics.NewStatsFullLoadStatus()
if fullLoad {
colHist.StatsLoadedStatus = statistics.NewStatsFullLoadStatus()
} else {
colHist.StatsLoadedStatus = statistics.NewStatsAllEvictedStatus()
}
tbl.LastAnalyzeVersion = max(tbl.LastAnalyzeVersion, colHist.LastUpdateVersion)
if statsVer != statistics.Version0 {
tbl.StatsVer = int(statsVer)
Expand Down
2 changes: 1 addition & 1 deletion pkg/statistics/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func IndexStatsIsInvalid(sctx context.PlanContext, idxStats *Index, coll *HistCo
ID: cid,
IsIndex: true,
IsSyncLoadFailed: sctx.GetSessionVars().StmtCtx.StatsLoad.Timeout > 0,
})
}, true)
// TODO: we can return true here. But need to fix some tests first.
}
if idxStats == nil {
Expand Down
35 changes: 23 additions & 12 deletions pkg/statistics/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,23 +677,34 @@ func (t *Table) IndexIsLoadNeeded(id int64) (*Index, bool) {
}

type neededStatsInternalMap struct {
items map[model.TableItemID]struct{}
// the bool value indicates whether is a full load or not.
items map[model.TableItemID]bool
m sync.RWMutex
}

func (n *neededStatsInternalMap) AllItems() []model.TableItemID {
func (n *neededStatsInternalMap) AllItems() []model.StatsLoadItem {
n.m.RLock()
keys := make([]model.TableItemID, 0, len(n.items))
for key := range n.items {
keys = append(keys, key)
keys := make([]model.StatsLoadItem, 0, len(n.items))
for key, val := range n.items {
keys = append(keys, model.StatsLoadItem{
TableItemID: key,
FullLoad: val,
})
}
n.m.RUnlock()
return keys
}

func (n *neededStatsInternalMap) Insert(col model.TableItemID) {
func (n *neededStatsInternalMap) Insert(col model.TableItemID, fullLoad bool) {
n.m.Lock()
n.items[col] = struct{}{}
cur := n.items[col]
if cur {
// If the existing one is full load. We don't need to update it.
n.m.Unlock()
return
}
n.items[col] = fullLoad
// Otherwise, we could safely update it.
n.m.Unlock()
}

Expand Down Expand Up @@ -729,23 +740,23 @@ func newNeededStatsMap() *neededStatsMap {
result := neededStatsMap{}
for i := 0; i < shardCnt; i++ {
result.items[i] = neededStatsInternalMap{
items: make(map[model.TableItemID]struct{}),
items: make(map[model.TableItemID]bool),
}
}
return &result
}

func (n *neededStatsMap) AllItems() []model.TableItemID {
var result []model.TableItemID
func (n *neededStatsMap) AllItems() []model.StatsLoadItem {
var result []model.StatsLoadItem
for i := 0; i < shardCnt; i++ {
keys := n.items[i].AllItems()
result = append(result, keys...)
}
return result
}

func (n *neededStatsMap) Insert(col model.TableItemID) {
n.items[getIdx(col)].Insert(col)
func (n *neededStatsMap) Insert(col model.TableItemID, fullLoad bool) {
n.items[getIdx(col)].Insert(col, fullLoad)
}

func (n *neededStatsMap) Delete(col model.TableItemID) {
Expand Down

0 comments on commit 1f8f3ab

Please sign in to comment.