Skip to content

Commit

Permalink
stats: fix data race when updating stats cache (pingcap#13647)
Browse files Browse the repository at this point in the history
  • Loading branch information
alivxxx authored and XiaTianliang committed Dec 21, 2019
1 parent a71a8fe commit 02f256b
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 66 deletions.
47 changes: 24 additions & 23 deletions statistics/handle/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"

"github.com/cznic/mathutil"
"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
Expand All @@ -31,7 +32,7 @@ import (
"go.uber.org/zap"
)

func (h *Handle) initStatsMeta4Chunk(is infoschema.InfoSchema, tables StatsCache, iter *chunk.Iterator4Chunk) {
func (h *Handle) initStatsMeta4Chunk(is infoschema.InfoSchema, cache *statsCache, iter *chunk.Iterator4Chunk) {
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
physicalID := row.GetInt64(1)
table, ok := h.getTableByPhysicalID(is, physicalID)
Expand All @@ -53,11 +54,11 @@ func (h *Handle) initStatsMeta4Chunk(is infoschema.InfoSchema, tables StatsCache
Version: row.GetUint64(0),
Name: getFullTableName(is, tableInfo),
}
tables[physicalID] = tbl
cache.tables[physicalID] = tbl
}
}

func (h *Handle) initStatsMeta(is infoschema.InfoSchema) (StatsCache, error) {
func (h *Handle) initStatsMeta(is infoschema.InfoSchema) (statsCache, error) {
h.mu.Lock()
defer h.mu.Unlock()
sql := "select HIGH_PRIORITY version, table_id, modify_count, count from mysql.stats_meta"
Expand All @@ -66,27 +67,27 @@ func (h *Handle) initStatsMeta(is infoschema.InfoSchema) (StatsCache, error) {
defer terror.Call(rc[0].Close)
}
if err != nil {
return nil, errors.Trace(err)
return statsCache{}, errors.Trace(err)
}
tables := StatsCache{}
tables := statsCache{tables: make(map[int64]*statistics.Table)}
req := rc[0].NewChunk()
iter := chunk.NewIterator4Chunk(req)
for {
err := rc[0].Next(context.TODO(), req)
if err != nil {
return nil, errors.Trace(err)
return statsCache{}, errors.Trace(err)
}
if req.NumRows() == 0 {
break
}
h.initStatsMeta4Chunk(is, tables, iter)
h.initStatsMeta4Chunk(is, &tables, iter)
}
return tables, nil
}

func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, tables StatsCache, iter *chunk.Iterator4Chunk) {
func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache *statsCache, iter *chunk.Iterator4Chunk) {
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
table, ok := tables[row.GetInt64(0)]
table, ok := cache.tables[row.GetInt64(0)]
if !ok {
continue
}
Expand Down Expand Up @@ -137,7 +138,7 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, tables Stat
}
}

func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, tables StatsCache) error {
func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, cache *statsCache) error {
h.mu.Lock()
defer h.mu.Unlock()
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms"
Expand All @@ -158,15 +159,15 @@ func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, tables StatsCache
if req.NumRows() == 0 {
break
}
h.initStatsHistograms4Chunk(is, tables, iter)
h.initStatsHistograms4Chunk(is, cache, iter)
}
return nil
}

func initStatsBuckets4Chunk(ctx sessionctx.Context, tables StatsCache, iter *chunk.Iterator4Chunk) {
func initStatsBuckets4Chunk(ctx sessionctx.Context, cache *statsCache, iter *chunk.Iterator4Chunk) {
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
tableID, isIndex, histID := row.GetInt64(0), row.GetInt64(1), row.GetInt64(2)
table, ok := tables[tableID]
table, ok := cache.tables[tableID]
if !ok {
continue
}
Expand Down Expand Up @@ -209,7 +210,7 @@ func initStatsBuckets4Chunk(ctx sessionctx.Context, tables StatsCache, iter *chu
}
}

func (h *Handle) initStatsBuckets(tables StatsCache) error {
func (h *Handle) initStatsBuckets(cache *statsCache) error {
h.mu.Lock()
defer h.mu.Unlock()
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, count, repeats, lower_bound, upper_bound from mysql.stats_buckets order by table_id, is_index, hist_id, bucket_id"
Expand All @@ -230,12 +231,11 @@ func (h *Handle) initStatsBuckets(tables StatsCache) error {
if req.NumRows() == 0 {
break
}
initStatsBuckets4Chunk(h.mu.ctx, tables, iter)
initStatsBuckets4Chunk(h.mu.ctx, cache, iter)
}
for _, table := range tables {
if h.mu.lastVersion < table.Version {
h.mu.lastVersion = table.Version
}
lastVersion := uint64(0)
for _, table := range cache.tables {
lastVersion = mathutil.MaxUint64(lastVersion, table.Version)
for _, idx := range table.Indices {
for i := 1; i < idx.Len(); i++ {
idx.Buckets[i].Count += idx.Buckets[i-1].Count
Expand All @@ -249,24 +249,25 @@ func (h *Handle) initStatsBuckets(tables StatsCache) error {
col.PreCalculateScalar()
}
}
cache.version = lastVersion
return nil
}

// InitStats will init the stats cache using full load strategy.
func (h *Handle) InitStats(is infoschema.InfoSchema) error {
tables, err := h.initStatsMeta(is)
cache, err := h.initStatsMeta(is)
if err != nil {
return errors.Trace(err)
}
err = h.initStatsHistograms(is, tables)
err = h.initStatsHistograms(is, &cache)
if err != nil {
return errors.Trace(err)
}
err = h.initStatsBuckets(tables)
err = h.initStatsBuckets(&cache)
if err != nil {
return errors.Trace(err)
}
h.StatsCache.Store(tables)
h.updateStatsCache(cache)
return nil
}

Expand Down
9 changes: 9 additions & 0 deletions statistics/handle/dump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package handle_test

import (
"fmt"
"sync"

. "github.com/pingcap/check"
"github.com/pingcap/parser/model"
Expand Down Expand Up @@ -48,7 +49,15 @@ func (s *testStatsSuite) TestConversion(c *C) {
tbl := h.GetTableStats(tableInfo.Meta())
assertTableEqual(c, loadTbl, tbl)

cleanEnv(c, s.store, s.do)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
c.Assert(h.Update(is), IsNil)
wg.Done()
}()
err = h.LoadStatsFromJSON(is, jsonTbl)
wg.Wait()
c.Assert(err, IsNil)
loadTblInStorage := h.GetTableStats(tableInfo.Meta())
assertTableEqual(c, loadTblInStorage, tbl)
Expand Down
87 changes: 50 additions & 37 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,18 @@ import (
"go.uber.org/zap"
)

// StatsCache caches the tables in memory for Handle.
type StatsCache map[int64]*statistics.Table
// statsCache caches the tables in memory for Handle.
type statsCache struct {
// tables maps a physical table ID to the statistics cached for that table.
tables map[int64]*statistics.Table
// version is the latest version of cache.
// updateStatsCache only stores a cache whose version is not lower than the
// version of the cache it would replace.
version uint64
}

// Handle can update stats info periodically.
type Handle struct {
mu struct {
sync.Mutex
ctx sessionctx.Context
// lastVersion is the latest update version before last lease.
lastVersion uint64
// rateMap contains the error rate delta from feedback.
rateMap errorRateDeltaMap
// pid2tid is the map from partition ID to table ID.
Expand All @@ -57,9 +59,15 @@ type Handle struct {
schemaVersion int64
}

// It can be read by multiple readers at the same time without acquiring the lock, but it can be
// written only after acquiring the lock.
statsCache struct {
sync.Mutex
atomic.Value
}

restrictedExec sqlexec.RestrictedSQLExecutor

StatsCache atomic.Value
// ddlEventCh is a channel to notify a ddl operation has happened.
// It is sent only by owner or the drop stats executor, and read by stats handle.
ddlEventCh chan *util.Event
Expand All @@ -73,11 +81,10 @@ type Handle struct {
lease atomic2.Duration
}

// Clear the StatsCache, only for test.
// Clear the statsCache, only for test.
func (h *Handle) Clear() {
h.mu.Lock()
h.StatsCache.Store(StatsCache{})
h.mu.lastVersion = 0
h.statsCache.Store(statsCache{tables: make(map[int64]*statistics.Table)})
for len(h.ddlEventCh) > 0 {
<-h.ddlEventCh
}
Expand Down Expand Up @@ -110,7 +117,7 @@ func NewHandle(ctx sessionctx.Context, lease time.Duration) *Handle {
}
handle.mu.ctx = ctx
handle.mu.rateMap = make(errorRateDeltaMap)
handle.StatsCache.Store(StatsCache{})
handle.statsCache.Store(statsCache{tables: make(map[int64]*statistics.Table)})
return handle
}

Expand Down Expand Up @@ -139,14 +146,15 @@ func DurationToTS(d time.Duration) uint64 {

// Update reads stats meta from store and updates the stats map.
func (h *Handle) Update(is infoschema.InfoSchema) error {
lastVersion := h.LastUpdateVersion()
oldCache := h.statsCache.Load().(statsCache)
lastVersion := oldCache.version
// We need this because, for two tables, the one with the smaller version may be written later than the one with the larger version.
// Consider two tables A and B whose (version, commit time) pairs are (A0, A1) and (B0, B1),
// with A0 < B0 < B1 < A1. We will first read the stats of B and update lastVersion to B0, but then we cannot read
// the table stats of A0 if we only read stats whose version is greater than lastVersion, which is B0.
// We can read the stats if the diff between commit time and version is less than three leases.
offset := DurationToTS(3 * h.Lease())
if lastVersion >= offset {
if oldCache.version >= offset {
lastVersion = lastVersion - offset
} else {
lastVersion = 0
Expand Down Expand Up @@ -190,10 +198,7 @@ func (h *Handle) Update(is infoschema.InfoSchema) error {
tbl.Name = getFullTableName(is, tableInfo)
tables = append(tables, tbl)
}
h.mu.Lock()
h.mu.lastVersion = lastVersion
h.UpdateTableStats(tables, deletedTableIDs)
h.mu.Unlock()
h.updateStatsCache(oldCache.update(tables, deletedTableIDs, lastVersion))
return nil
}

Expand Down Expand Up @@ -232,43 +237,54 @@ func (h *Handle) GetTableStats(tblInfo *model.TableInfo) *statistics.Table {

// GetPartitionStats retrieves the partition stats from cache.
func (h *Handle) GetPartitionStats(tblInfo *model.TableInfo, pid int64) *statistics.Table {
tbl, ok := h.StatsCache.Load().(StatsCache)[pid]
statsCache := h.statsCache.Load().(statsCache)
tbl, ok := statsCache.tables[pid]
if !ok {
tbl = statistics.PseudoTable(tblInfo)
tbl.PhysicalID = pid
h.UpdateTableStats([]*statistics.Table{tbl}, nil)
h.updateStatsCache(statsCache.update([]*statistics.Table{tbl}, nil, statsCache.version))
return tbl
}
return tbl
}

func (h *Handle) copyFromOldCache() StatsCache {
newCache := StatsCache{}
oldCache := h.StatsCache.Load().(StatsCache)
for k, v := range oldCache {
newCache[k] = v
// updateStatsCache publishes newCache as the current stats cache.
//
// The swap happens while holding the h.statsCache mutex, and newCache is
// stored only when its version is not lower than the version of the cache
// currently held; an older cache is silently dropped.
func (h *Handle) updateStatsCache(newCache statsCache) {
	h.statsCache.Lock()
	// Release the lock via defer so that it is not left held if the type
	// assertion below panics (for example when nothing has been stored yet).
	defer h.statsCache.Unlock()

	oldCache := h.statsCache.Load().(statsCache)
	if oldCache.version <= newCache.version {
		h.statsCache.Store(newCache)
	}
}

// copy returns a statsCache carrying the same version as sc and a newly
// allocated tables map with the same entries. The map is new, but the
// *statistics.Table values it holds are the same pointers as in sc.
func (sc statsCache) copy() statsCache {
	cloned := make(map[int64]*statistics.Table, len(sc.tables))
	for id, tbl := range sc.tables {
		cloned[id] = tbl
	}
	return statsCache{tables: cloned, version: sc.version}
}

// UpdateTableStats updates the statistics table cache using copy on write.
func (h *Handle) UpdateTableStats(tables []*statistics.Table, deletedIDs []int64) {
newCache := h.copyFromOldCache()
// update updates the statistics table cache using copy on write.
func (sc statsCache) update(tables []*statistics.Table, deletedIDs []int64, newVersion uint64) statsCache {
newCache := sc.copy()
newCache.version = newVersion
for _, tbl := range tables {
id := tbl.PhysicalID
newCache[id] = tbl
newCache.tables[id] = tbl
}
for _, id := range deletedIDs {
delete(newCache, id)
delete(newCache.tables, id)
}
h.StatsCache.Store(newCache)
return newCache
}

// LoadNeededHistograms will load histograms for those needed columns.
func (h *Handle) LoadNeededHistograms() error {
cols := statistics.HistogramNeededColumns.AllCols()
for _, col := range cols {
tbl, ok := h.StatsCache.Load().(StatsCache)[col.TableID]
statsCache := h.statsCache.Load().(statsCache)
tbl, ok := statsCache.tables[col.TableID]
if !ok {
continue
}
Expand All @@ -294,24 +310,21 @@ func (h *Handle) LoadNeededHistograms() error {
Count: int64(hg.TotalRowCount()),
IsHandle: c.IsHandle,
}
h.UpdateTableStats([]*statistics.Table{tbl}, nil)
h.updateStatsCache(statsCache.update([]*statistics.Table{tbl}, nil, statsCache.version))
statistics.HistogramNeededColumns.Delete(col)
}
return nil
}

// LastUpdateVersion gets the last update version.
func (h *Handle) LastUpdateVersion() uint64 {
h.mu.Lock()
defer h.mu.Unlock()
return h.mu.lastVersion
return h.statsCache.Load().(statsCache).version
}

// SetLastUpdateVersion sets the last update version.
func (h *Handle) SetLastUpdateVersion(version uint64) {
h.mu.Lock()
defer h.mu.Unlock()
h.mu.lastVersion = version
statsCache := h.statsCache.Load().(statsCache)
h.updateStatsCache(statsCache.update(nil, nil, version))
}

// FlushStats flushes the cached stats update into store.
Expand Down Expand Up @@ -477,7 +490,7 @@ func (h *Handle) columnStatsFromStorage(row chunk.Row, table *statistics.Table,

// tableStatsFromStorage loads table stats info from storage.
func (h *Handle) tableStatsFromStorage(tableInfo *model.TableInfo, physicalID int64, loadAll bool, historyStatsExec sqlexec.RestrictedSQLExecutor) (_ *statistics.Table, err error) {
table, ok := h.StatsCache.Load().(StatsCache)[physicalID]
table, ok := h.statsCache.Load().(statsCache).tables[physicalID]
// If table stats is pseudo, we also need to copy it, since we will use the column stats when
// the average error rate of it is small.
if !ok || historyStatsExec != nil {
Expand Down
Loading

0 comments on commit 02f256b

Please sign in to comment.