Skip to content

Commit

Permalink
statistics: support tracking column cmsketch (#35135)
Browse files Browse the repository at this point in the history
ref #34052
  • Loading branch information
Yisaer authored Jun 17, 2022
1 parent 580a68d commit 9652651
Show file tree
Hide file tree
Showing 11 changed files with 271 additions and 132 deletions.
18 changes: 9 additions & 9 deletions statistics/handle/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,15 +322,15 @@ func TableStatsFromJSON(tableInfo *model.TableInfo, physicalID int64, jsonTbl *J
statsVer = *jsonCol.StatsVer
}
col := &statistics.Column{
PhysicalID: physicalID,
Histogram: *hist,
CMSketch: cm,
TopN: topN,
FMSketch: fms,
Info: colInfo,
IsHandle: tableInfo.PKIsHandle && mysql.HasPriKeyFlag(colInfo.GetFlag()),
StatsVer: statsVer,
Loaded: true,
PhysicalID: physicalID,
Histogram: *hist,
CMSketch: cm,
TopN: topN,
FMSketch: fms,
Info: colInfo,
IsHandle: tableInfo.PKIsHandle && mysql.HasPriKeyFlag(colInfo.GetFlag()),
StatsVer: statsVer,
ColLoadedStatus: statistics.NewColFullLoadStatus(),
}
col.Count = int64(col.TotalRowCount())
tbl.Columns[col.ID] = col
Expand Down
44 changes: 22 additions & 22 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ func (h *Handle) LoadNeededHistograms() (err error) {
continue
}
c, ok := tbl.Columns[col.ColumnID]
if !ok || c.IsLoaded() {
if !ok || !c.IsLoadNeeded() {
statistics.HistogramNeededColumns.Delete(col)
continue
}
Expand All @@ -680,15 +680,15 @@ func (h *Handle) LoadNeededHistograms() (err error) {
logutil.BgLogger().Error("fail to get stats version for this histogram", zap.Int64("table_id", col.TableID), zap.Int64("hist_id", col.ColumnID))
}
colHist := &statistics.Column{
PhysicalID: col.TableID,
Histogram: *hg,
Info: c.Info,
CMSketch: cms,
TopN: topN,
FMSketch: fms,
IsHandle: c.IsHandle,
StatsVer: rows[0].GetInt64(0),
Loaded: true,
PhysicalID: col.TableID,
Histogram: *hg,
Info: c.Info,
CMSketch: cms,
TopN: topN,
FMSketch: fms,
IsHandle: c.IsHandle,
StatsVer: rows[0].GetInt64(0),
ColLoadedStatus: statistics.NewColFullLoadStatus(),
}
// Column.Count is calculated by Column.TotalRowCount(). Hence we don't set Column.Count when initializing colHist.
colHist.Count = int64(colHist.TotalRowCount())
Expand Down Expand Up @@ -831,7 +831,7 @@ func (h *Handle) columnStatsFromStorage(reader *statsReader, row chunk.Row, tabl
// 4. loadAll is false.
notNeedLoad := h.Lease() > 0 &&
!isHandle &&
(col == nil || !col.IsNecessaryLoaded() && col.LastUpdateVersion < histVer) &&
(col == nil || !col.IsStatsInitialized() && col.LastUpdateVersion < histVer) &&
!loadAll
if notNeedLoad {
count, err := h.columnCountFromStorage(reader, table.PhysicalID, histID, statsVer)
Expand Down Expand Up @@ -868,17 +868,17 @@ func (h *Handle) columnStatsFromStorage(reader *statsReader, row chunk.Row, tabl
return errors.Trace(err)
}
col = &statistics.Column{
PhysicalID: table.PhysicalID,
Histogram: *hg,
Info: colInfo,
CMSketch: cms,
TopN: topN,
FMSketch: fmSketch,
ErrorRate: errorRate,
IsHandle: tableInfo.PKIsHandle && mysql.HasPriKeyFlag(colInfo.GetFlag()),
Flag: flag,
StatsVer: statsVer,
Loaded: true,
PhysicalID: table.PhysicalID,
Histogram: *hg,
Info: colInfo,
CMSketch: cms,
TopN: topN,
FMSketch: fmSketch,
ErrorRate: errorRate,
IsHandle: tableInfo.PKIsHandle && mysql.HasPriKeyFlag(colInfo.GetFlag()),
Flag: flag,
StatsVer: statsVer,
ColLoadedStatus: statistics.NewColFullLoadStatus(),
}
// Column.Count is calculated by Column.TotalRowCount(). Hence we don't set Column.Count when initializing col.
col.Count = int64(col.TotalRowCount())
Expand Down
18 changes: 9 additions & 9 deletions statistics/handle/handle_hist.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,15 +274,15 @@ func (h *Handle) readStatsForOne(col model.TableColumnID, c *statistics.Column,
logutil.BgLogger().Error("fail to get stats version for this histogram", zap.Int64("table_id", col.TableID), zap.Int64("hist_id", col.ColumnID))
}
colHist := &statistics.Column{
PhysicalID: col.TableID,
Histogram: *hg,
Info: c.Info,
CMSketch: cms,
TopN: topN,
FMSketch: fms,
IsHandle: c.IsHandle,
StatsVer: rows[0].GetInt64(0),
Loaded: true,
PhysicalID: col.TableID,
Histogram: *hg,
Info: c.Info,
CMSketch: cms,
TopN: topN,
FMSketch: fms,
IsHandle: c.IsHandle,
StatsVer: rows[0].GetInt64(0),
ColLoadedStatus: statistics.NewColFullLoadStatus(),
}
// Column.Count is calculated by Column.TotalRowCount(). Hence, we don't set Column.Count when initializing colHist.
colHist.Count = int64(colHist.TotalRowCount())
Expand Down
30 changes: 30 additions & 0 deletions statistics/handle/handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3336,6 +3336,36 @@ func TestAnalyzeIncrementalEvictedIndex(t *testing.T) {
require.Nil(t, failpoint.Disable("github.com/pingcap/tidb/executor/assertEvictIndex"))
}

func TestEvictedColumnLoadedStatus(t *testing.T) {
restore := config.RestoreFunc()
defer restore()
config.UpdateGlobal(func(conf *config.Config) {
conf.Performance.EnableStatsCacheMemQuota = true
})
store, dom, clean := testkit.CreateMockStoreAndDomain(t)
defer clean()
dom.StatsHandle().SetLease(0)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@tidb_analyze_version = 1")
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int)")
tk.MustExec("analyze table test.t")
tbl, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
require.Nil(t, err)
tblStats := domain.GetDomain(tk.Session()).StatsHandle().GetTableStats(tbl.Meta())
for _, col := range tblStats.Columns {
require.True(t, col.IsStatsInitialized())
}

domain.GetDomain(tk.Session()).StatsHandle().SetStatsCacheCapacity(1)
tblStats = domain.GetDomain(tk.Session()).StatsHandle().GetTableStats(tbl.Meta())
for _, col := range tblStats.Columns {
require.True(t, col.IsStatsInitialized())
require.True(t, col.IsCMSEvicted())
}
}

func TestAnalyzeTableLRUPut(t *testing.T) {
restore := config.RestoreFunc()
defer restore()
Expand Down
58 changes: 46 additions & 12 deletions statistics/handle/lru_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ func (s *statsInnerCache) GetByQuery(tblID int64) (*statistics.Table, bool) {
for idxID := range element.tbl.Indices {
s.lru.get(tblID, idxID, true)
}
// move column element
for colID := range element.tbl.Columns {
s.lru.get(tblID, colID, false)
}
return element.tbl, true
}

Expand Down Expand Up @@ -133,12 +137,14 @@ func (s *statsInnerCache) Put(tblID int64, tbl *statistics.Table) {
func (s *statsInnerCache) put(tblID int64, tbl *statistics.Table, tblMemUsage *statistics.TableMemoryUsage, needMove bool) {
element, exist := s.elements[tblID]
if exist {
s.updateColumns(tblID, tbl, tblMemUsage, needMove)
s.updateIndices(tblID, tbl, tblMemUsage, needMove)
// idx mem usage might be changed before, thus we recalculate the tblMem usage
element.tbl = tbl
element.tblMemUsage = tbl.MemoryUsage()
return
}
s.updateColumns(tblID, tbl, tblMemUsage, needMove)
s.updateIndices(tblID, tbl, tblMemUsage, needMove)
// mem usage might be changed due to evict, thus we recalculate the tblMem usage
s.elements[tblID] = &lruMapElement{
Expand All @@ -148,31 +154,47 @@ func (s *statsInnerCache) put(tblID int64, tbl *statistics.Table, tblMemUsage *s
}

func (s *statsInnerCache) updateIndices(tblID int64, tbl *statistics.Table, tblMemUsage *statistics.TableMemoryUsage, needMove bool) {
element, exist := s.elements[tblID]
_, exist := s.elements[tblID]
if exist {
oldtbl := element.tbl
oldIdxs := s.lru.elements[tblID][true]
deletedIdx := make([]int64, 0)
for oldIdxID := range oldtbl.Indices {
for oldIdxID := range oldIdxs {
_, exist := tbl.Indices[oldIdxID]
if !exist {
deletedIdx = append(deletedIdx, oldIdxID)
}
}
for idxID, index := range tbl.Indices {
idxMem := tblMemUsage.IndicesMemUsage[idxID]
s.lru.put(tblID, idxID, true, index, idxMem, true, needMove)
}
for _, idxID := range deletedIdx {
s.lru.del(tblID, idxID, true)
}
return
}
for idxID, idx := range tbl.Indices {
idxMem := tblMemUsage.IndicesMemUsage[idxID]
s.lru.put(tblID, idxID, true, idx, idxMem, true, needMove)
}
}

func (s *statsInnerCache) updateColumns(tblID int64, tbl *statistics.Table, tblMemUsage *statistics.TableMemoryUsage, needMove bool) {
_, exist := s.elements[tblID]
if exist {
oldCols := s.lru.elements[tblID][false]
deletedCol := make([]int64, 0)
for oldColID := range oldCols {
_, exist := tbl.Columns[oldColID]
if !exist {
deletedCol = append(deletedCol, oldColID)
}
}
for _, colID := range deletedCol {
s.lru.del(tblID, colID, false)
}
}
for colID, col := range tbl.Columns {
colMem := tblMemUsage.ColumnsMemUsage[colID]
s.lru.put(tblID, colID, false, col, colMem, true, needMove)
}
}

// Del implements statsCacheInner
func (s *statsInnerCache) Del(tblID int64) {
s.Lock()
Expand All @@ -185,6 +207,10 @@ func (s *statsInnerCache) Del(tblID int64) {
for idxID := range element.tbl.Indices {
s.lru.del(tblID, idxID, true)
}
// remove columns
for colID := range element.tbl.Columns {
s.lru.del(tblID, colID, false)
}
delete(s.elements, tblID)
}

Expand Down Expand Up @@ -301,9 +327,7 @@ func (s *statsInnerCache) onEvict(tblID int64) {

func (s *statsInnerCache) freshTableCost(tblID int64, element *lruMapElement) {
element.tblMemUsage = element.tbl.MemoryUsage()
for idxID, idx := range element.tbl.Indices {
s.lru.put(tblID, idxID, true, idx, element.tblMemUsage.IndicesMemUsage[idxID], true, false)
}
s.put(tblID, element.tbl, element.tblMemUsage, false)
}

func (s *statsInnerCache) capacity() int64 {
Expand Down Expand Up @@ -345,8 +369,14 @@ func (c *innerItemLruCache) del(tblID, id int64, isIndex bool) {
return
}
delCounter.Inc()
memUsage := c.elements[tblID][isIndex][id].Value.(*lruCacheItem).innerMemUsage
delete(c.elements[tblID][isIndex], id)
c.cache.Remove(ele)
if isIndex {
c.calculateCost(&statistics.IndexMemUsage{}, memUsage)
} else {
c.calculateCost(&statistics.ColumnMemUsage{}, memUsage)
}
}

func (c *innerItemLruCache) put(tblID, id int64, isIndex bool, item statistics.TableCacheItem, itemMem statistics.CacheItemMemoryUsage,
Expand Down Expand Up @@ -388,7 +418,11 @@ func (c *innerItemLruCache) put(tblID, id int64, isIndex bool, item statistics.T
}
newElement := c.cache.PushFront(newItem)
v[id] = newElement
c.calculateCost(itemMem, &statistics.IndexMemUsage{})
if isIndex {
c.calculateCost(itemMem, &statistics.IndexMemUsage{})
} else {
c.calculateCost(itemMem, &statistics.ColumnMemUsage{})
}
}

func (c *innerItemLruCache) evictIfNeeded() {
Expand Down
Loading

0 comments on commit 9652651

Please sign in to comment.