stats: incremental analyze for index with feedback updates #10355

Merged (8 commits) on May 8, 2019
28 changes: 11 additions & 17 deletions executor/analyze.go
@@ -1075,13 +1075,13 @@ func (e *AnalyzeTestFastExec) TestFastSample() error {

type analyzeIndexIncrementalExec struct {
AnalyzeIndexExec
index *statistics.Index
oldHist *statistics.Histogram
oldCMS *statistics.CMSketch
}

func analyzeIndexIncremental(idxExec *analyzeIndexIncrementalExec) analyzeResult {
idx := idxExec.index
highBound := idx.Histogram.GetUpper(idx.Len() - 1)
values, err := codec.Decode(highBound.GetBytes(), len(idxExec.idxInfo.Columns))
startPos := idxExec.oldHist.GetUpper(idxExec.oldHist.Len() - 1)
values, err := codec.DecodeRange(startPos.GetBytes(), len(idxExec.idxInfo.Columns))
if err != nil {
return analyzeResult{Err: err, job: idxExec.job}
}
@@ -1090,16 +1090,12 @@ func analyzeIndexIncremental(idxExec *analyzeIndexIncrementalExec) analyzeResult
if err != nil {
return analyzeResult{Err: err, job: idxExec.job}
}
oldHist, oldCMS, err := idx.RemoveUpperBound(idxExec.ctx.GetSessionVars().StmtCtx, values)
hist, err = statistics.MergeHistograms(idxExec.ctx.GetSessionVars().StmtCtx, idxExec.oldHist, hist, int(idxExec.maxNumBuckets))
if err != nil {
return analyzeResult{Err: err, job: idxExec.job}
}
hist, err = statistics.MergeHistograms(idxExec.ctx.GetSessionVars().StmtCtx, oldHist, hist, int(idxExec.maxNumBuckets))
if err != nil {
return analyzeResult{Err: err, job: idxExec.job}
}
if oldCMS != nil && cms != nil {
err = cms.MergeCMSketch(oldCMS)
if idxExec.oldCMS != nil && cms != nil {
err = cms.MergeCMSketch4IncrementalAnalyze(idxExec.oldCMS)
if err != nil {
return analyzeResult{Err: err, job: idxExec.job}
}
@@ -1120,26 +1116,24 @@ func analyzeIndexIncremental(idxExec *analyzeIndexIncrementalExec) analyzeResult

type analyzePKIncrementalExec struct {
AnalyzeColumnsExec
pkStats *statistics.Column
oldHist *statistics.Histogram
}

func analyzePKIncremental(colExec *analyzePKIncrementalExec) analyzeResult {
pkStats := colExec.pkStats
high := pkStats.GetUpper(pkStats.Len() - 1)
var maxVal types.Datum
if mysql.HasUnsignedFlag(colExec.pkInfo.Flag) {
maxVal = types.NewUintDatum(math.MaxUint64)
} else {
maxVal = types.NewIntDatum(math.MaxInt64)
}
ran := ranger.Range{LowVal: []types.Datum{*high}, LowExclude: true, HighVal: []types.Datum{maxVal}}
startPos := *colExec.oldHist.GetUpper(colExec.oldHist.Len() - 1)
ran := ranger.Range{LowVal: []types.Datum{startPos}, LowExclude: true, HighVal: []types.Datum{maxVal}}
hists, _, err := colExec.buildStats([]*ranger.Range{&ran})
if err != nil {
return analyzeResult{Err: err, job: colExec.job}
}
hist := hists[0]
oldHist := pkStats.Histogram.Copy()
hist, err = statistics.MergeHistograms(colExec.ctx.GetSessionVars().StmtCtx, oldHist, hist, int(colExec.maxNumBuckets))
hist, err = statistics.MergeHistograms(colExec.ctx.GetSessionVars().StmtCtx, colExec.oldHist, hist, int(colExec.maxNumBuckets))
if err != nil {
return analyzeResult{Err: err, job: colExec.job}
}
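Note: with this change the incremental executors no longer carry the whole `statistics.Index`/`statistics.Column`; they only receive the prepared `oldHist` (plus `oldCMS` for indexes), decode the old histogram's last upper bound as the start position, scan from there, and merge the freshly built histogram into the old one. The sketch below is a minimal, self-contained illustration of that merge step; `bucket` and `mergeHistograms` are simplified stand-ins, not TiDB's real `statistics.MergeHistograms`.

```go
package main

import "fmt"

// bucket is a simplified histogram bucket: values in [lower, upper] with a row count.
type bucket struct {
	lower, upper int64
	count        int64
}

// mergeHistograms concatenates the old buckets (covering values up to the last
// analyzed position) with the freshly built ones (covering only values past it),
// then pairwise-merges adjacent buckets until at most maxBuckets remain.
func mergeHistograms(old, fresh []bucket, maxBuckets int) []bucket {
	merged := append(append([]bucket{}, old...), fresh...)
	for len(merged) > maxBuckets {
		next := make([]bucket, 0, (len(merged)+1)/2)
		for i := 0; i < len(merged); i += 2 {
			if i+1 == len(merged) {
				next = append(next, merged[i])
				break
			}
			next = append(next, bucket{
				lower: merged[i].lower,
				upper: merged[i+1].upper,
				count: merged[i].count + merged[i+1].count,
			})
		}
		merged = next
	}
	return merged
}

func main() {
	// Old stats cover a <= 2; the incremental scan only produced a bucket for a = 3.
	old := []bucket{{1, 1, 1}, {2, 2, 1}}
	fresh := []bucket{{3, 3, 1}}
	fmt.Println(mergeHistograms(old, fresh, 256)) // [{1 1 1} {2 2 1} {3 3 1}]
}
```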
36 changes: 36 additions & 0 deletions executor/analyze_test.go
@@ -24,9 +24,13 @@ import (
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/mockstore/mocktikv"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/testkit"
)

@@ -303,4 +307,36 @@ func (s *testSuite1) TestAnalyzeIncremental(c *C) {
tk.MustExec("analyze incremental table t index")
// Result should not change.
tk.MustQuery("show stats_buckets").Check(testkit.Rows("test t a 0 0 1 1 1 1", "test t a 0 1 2 1 2 2", "test t idx 1 0 1 1 1 1", "test t idx 1 1 2 1 2 2"))

// Test analyze incremental with feedback.
tk.MustExec("insert into t values (3,3)")
oriProbability := statistics.FeedbackProbability.Load()
defer func() {
statistics.FeedbackProbability.Store(oriProbability)
}()
statistics.FeedbackProbability.Store(1)
is := s.dom.InfoSchema()
table, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
c.Assert(err, IsNil)
tblInfo := table.Meta()
tk.MustQuery("select * from t use index(idx) where b = 3")
tk.MustQuery("select * from t where a > 1")
h := s.dom.StatsHandle()
c.Assert(h.DumpStatsDeltaToKV(handle.DumpAll), IsNil)
c.Assert(h.DumpStatsFeedbackToKV(), IsNil)
c.Assert(h.HandleUpdateStats(is), IsNil)
c.Assert(h.Update(is), IsNil)
tk.MustQuery("show stats_buckets").Check(testkit.Rows("test t a 0 0 1 1 1 1", "test t a 0 1 3 0 2 2147483647", "test t idx 1 0 1 1 1 1", "test t idx 1 1 2 1 2 2"))
tblStats := h.GetTableStats(tblInfo)
val, err := codec.EncodeKey(tk.Se.GetSessionVars().StmtCtx, nil, types.NewIntDatum(3))
c.Assert(err, IsNil)
c.Assert(tblStats.Indices[tblInfo.Indices[0].ID].CMSketch.QueryBytes(val), Equals, uint64(1))
c.Assert(statistics.IsAnalyzed(tblStats.Indices[tblInfo.Indices[0].ID].Flag), IsFalse)
c.Assert(statistics.IsAnalyzed(tblStats.Columns[tblInfo.Columns[0].ID].Flag), IsFalse)

tk.MustExec("analyze incremental table t index")
tk.MustQuery("show stats_buckets").Check(testkit.Rows("test t a 0 0 1 1 1 1", "test t a 0 1 2 1 2 2", "test t a 0 2 3 1 3 3",
"test t idx 1 0 1 1 1 1", "test t idx 1 1 2 1 2 2", "test t idx 1 2 3 1 3 3"))
tblStats = h.GetTableStats(tblInfo)
c.Assert(tblStats.Indices[tblInfo.Indices[0].ID].CMSketch.QueryBytes(val), Equals, uint64(1))
}
47 changes: 36 additions & 11 deletions executor/builder.go
@@ -1398,18 +1398,28 @@ func (b *executorBuilder) buildAnalyzeIndexIncremental(task plannercore.AnalyzeI
return analyzeTask
}
idx, ok := statsTbl.Indices[task.IndexInfo.ID]
// TODO: If the index contains feedback, we may use other strategy.
if !ok || idx.Len() == 0 || idx.ContainsFeedback() {
if !ok || idx.Len() == 0 || idx.LastAnalyzePos.IsNull() {
return analyzeTask
}
exec := analyzeTask.idxExec
if idx.CMSketch != nil {
width, depth := idx.CMSketch.GetWidthAndDepth()
exec.analyzePB.IdxReq.CmsketchWidth = &width
exec.analyzePB.IdxReq.CmsketchDepth = &depth
var oldHist *statistics.Histogram
if statistics.IsAnalyzed(idx.Flag) {
exec := analyzeTask.idxExec
if idx.CMSketch != nil {
width, depth := idx.CMSketch.GetWidthAndDepth()
exec.analyzePB.IdxReq.CmsketchWidth = &width
exec.analyzePB.IdxReq.CmsketchDepth = &depth
}
oldHist = idx.Histogram.Copy()
} else {
_, bktID := idx.LessRowCountWithBktIdx(idx.LastAnalyzePos)
if bktID == 0 {
return analyzeTask
}
oldHist = idx.TruncateHistogram(bktID)
}
oldHist = oldHist.RemoveUpperBound()
analyzeTask.taskType = idxIncrementalTask
analyzeTask.idxIncrementalExec = &analyzeIndexIncrementalExec{AnalyzeIndexExec: *analyzeTask.idxExec, index: idx}
analyzeTask.idxIncrementalExec = &analyzeIndexIncrementalExec{AnalyzeIndexExec: *analyzeTask.idxExec, oldHist: oldHist, oldCMS: idx.CMSketch}
analyzeTask.job = &statistics.AnalyzeJob{DBName: task.DBName, TableName: task.TableName, PartitionName: task.PartitionName, JobInfo: "analyze incremental index " + task.IndexInfo.Name.O}
return analyzeTask
}
@@ -1458,13 +1468,28 @@ func (b *executorBuilder) buildAnalyzePKIncremental(task plannercore.AnalyzeColu
return analyzeTask
}
col, ok := statsTbl.Columns[task.PKInfo.ID]
// TODO: If the primary key contains feedback, we may use other strategy.
if !ok || col.Len() == 0 || col.ContainsFeedback() {
if !ok || col.Len() == 0 || col.LastAnalyzePos.IsNull() {
return analyzeTask
}
var oldHist *statistics.Histogram
if statistics.IsAnalyzed(col.Flag) {
oldHist = col.Histogram.Copy()
} else {
d, err := col.LastAnalyzePos.ConvertTo(b.ctx.GetSessionVars().StmtCtx, col.Tp)
if err != nil {
b.err = err
return nil
}
_, bktID := col.LessRowCountWithBktIdx(d)
if bktID == 0 {
return analyzeTask
}
oldHist = col.TruncateHistogram(bktID)
oldHist.NDV = int64(oldHist.TotalRowCount())
}
exec := analyzeTask.colExec
analyzeTask.taskType = pkIncrementalTask
analyzeTask.colIncrementalExec = &analyzePKIncrementalExec{AnalyzeColumnsExec: *exec, pkStats: col}
analyzeTask.colIncrementalExec = &analyzePKIncrementalExec{AnalyzeColumnsExec: *exec, oldHist: oldHist}
analyzeTask.job = &statistics.AnalyzeJob{DBName: task.DBName, TableName: task.TableName, PartitionName: task.PartitionName, JobInfo: "analyze incremental primary key"}
return analyzeTask
}
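Note: the builder now keys off `LastAnalyzePos` instead of refusing incremental analyze whenever feedback exists. If the stats are still flagged as analyzed, the old histogram is reused as-is; otherwise it is cut back to the bucket at the last analyze position, and if even that is impossible (`bktID == 0`) the task falls back to a plain analyze. Below is a rough stand-alone sketch of that decision under simplified types; `histogram`, `bucketContaining`, and `truncateAt` are illustrative names, not TiDB's `LessRowCountWithBktIdx`/`TruncateHistogram`.

```go
package main

import "fmt"

// histogram is a toy stand-in: one upper bound per bucket, in ascending order.
type histogram struct {
	upperBounds []int64
}

// bucketContaining returns the index of the bucket whose range would hold pos.
func bucketContaining(h *histogram, pos int64) int {
	for i, ub := range h.upperBounds {
		if pos <= ub {
			return i
		}
	}
	return len(h.upperBounds)
}

// truncateAt keeps only the first bktID buckets, dropping everything that may
// have been distorted by feedback merged after the last analyze position.
func truncateAt(h *histogram, bktID int) *histogram {
	return &histogram{upperBounds: append([]int64{}, h.upperBounds[:bktID]...)}
}

// oldHistForIncremental mirrors the builder's new branch: reuse the histogram
// untouched when the stats are still marked as analyzed, otherwise truncate at
// the last analyze position; nil means "skip incremental, run a full analyze".
func oldHistForIncremental(h *histogram, analyzed bool, lastAnalyzePos int64) *histogram {
	if analyzed {
		return h
	}
	bktID := bucketContaining(h, lastAnalyzePos)
	if bktID == 0 {
		return nil
	}
	return truncateAt(h, bktID)
}

func main() {
	h := &histogram{upperBounds: []int64{1, 2}}
	fmt.Println(oldHistForIncremental(h, false, 2)) // keeps only the bucket(s) below position 2
}
```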
1 change: 1 addition & 0 deletions executor/executor_test.go
@@ -2514,6 +2514,7 @@ func (s *testSuite1) SetUpSuite(c *C) {
c.Assert(err, IsNil)
s.dom, err = session.BootstrapSession(s.store)
c.Assert(err, IsNil)
s.dom.SetStatsUpdating(true)
}

func (s *testSuite1) TearDownSuite(c *C) {
10 changes: 10 additions & 0 deletions session/bootstrap.go
@@ -171,6 +171,7 @@ const (
stats_ver bigint(64) NOT NULL DEFAULT 0,
flag bigint(64) NOT NULL DEFAULT 0,
correlation double NOT NULL DEFAULT 0,
last_analyze_pos blob DEFAULT NULL,
unique index tbl(table_id, is_index, hist_id)
);`

@@ -328,6 +329,7 @@ const (
version28 = 28
version29 = 29
version30 = 30
version31 = 31
)

func checkBootstrapped(s Session) (bool, error) {
@@ -507,6 +509,10 @@ func upgrade(s Session) {
upgradeToVer30(s)
}

if ver < version31 {
upgradeToVer31(s)
}

updateBootstrapVer(s)
_, err = s.Execute(context.Background(), "COMMIT")

@@ -799,6 +805,10 @@ func upgradeToVer30(s Session) {
mustExecute(s, CreateStatsTopNTable)
}

func upgradeToVer31(s Session) {
doReentrantDDL(s, "ALTER TABLE mysql.stats_histograms ADD COLUMN `last_analyze_pos` blob default null", infoschema.ErrColumnExists)
}

// updateBootstrapVer updates bootstrap version variable in mysql.TiDB table.
func updateBootstrapVer(s Session) {
// Update bootstrap version.
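Note: `upgradeToVer31` follows the same reentrant pattern as the earlier upgrade steps: the `ALTER TABLE` is allowed to fail only with a "column already exists" error, so the bootstrap step can be safely re-run. A minimal sketch of that pattern, with `runDDL` and `errColumnExists` as placeholders for TiDB's `doReentrantDDL` and `infoschema.ErrColumnExists`:

```go
package main

import (
	"errors"
	"fmt"
)

// errColumnExists stands in for infoschema.ErrColumnExists.
var errColumnExists = errors.New("column already exists")

// runDDL stands in for actually executing the statement; here it pretends a
// previous bootstrap attempt already added the column.
func runDDL(stmt string) error {
	fmt.Println("exec:", stmt)
	return errColumnExists
}

// reentrantDDL runs the statement and ignores only the error that signals the
// work was already done, so the upgrade step can be retried safely.
func reentrantDDL(stmt string, ignorable error) error {
	if err := runDDL(stmt); err != nil && !errors.Is(err, ignorable) {
		return err
	}
	return nil
}

func main() {
	err := reentrantDDL("ALTER TABLE mysql.stats_histograms ADD COLUMN `last_analyze_pos` blob DEFAULT NULL", errColumnExists)
	fmt.Println("upgrade ok:", err == nil)
}
```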
2 changes: 1 addition & 1 deletion session/session.go
@@ -1559,7 +1559,7 @@ func createSessionWithDomain(store kv.Storage, dom *domain.Domain) (*session, er

const (
notBootstrapped = 0
currentBootstrapVersion = 30
currentBootstrapVersion = 31
)

func getStoreBootstrapVersion(store kv.Storage) int64 {
25 changes: 25 additions & 0 deletions statistics/cmsketch.go
@@ -298,6 +298,31 @@ func (c *CMSketch) MergeCMSketch(rc *CMSketch) error {
return nil
}

// MergeCMSketch4IncrementalAnalyze merges two CM sketches for incremental analyze. Since no value
// appears partially in both `c` and `rc` during incremental analyze, it uses `max` to merge them.
// Here is a simple proof: when we query the CM sketch, we use `min` to get the answer:
// (1): For values that appear only in `c`, using `max` to merge them affects the `min` query result less than using `sum`;
// (2): For values that appear only in `rc`, it is the same as condition (1);
// (3): For values that appear in both `c` and `rc`, the newer sketch `c` already holds the full count: for example,
// if `v` appears 5 times in the table, it can appear 5 times in `c` and 3 times in `rc`, so `max` still gives the correct answer.
// So, in fact, if we knew the number of appearances of each value in the first place, it would be better to build the CM sketch with `max` rather than `sum`.
func (c *CMSketch) MergeCMSketch4IncrementalAnalyze(rc *CMSketch) error {
if c.depth != rc.depth || c.width != rc.width {
return errors.New("Dimensions of Count-Min Sketch should be the same")
}
if c.topN != nil || rc.topN != nil {
return errors.New("CMSketch with Top-N does not support merge")
}
for i := range c.table {
c.count = 0
for j := range c.table[i] {
c.table[i][j] = mathutil.MaxUint32(c.table[i][j], rc.table[i][j])
c.count += uint64(c.table[i][j])
}
}
return nil
}

// CMSketchToProto converts CMSketch to its protobuf representation.
func CMSketchToProto(c *CMSketch) *tipb.CMSketch {
protoSketch := &tipb.CMSketch{Rows: make([]*tipb.CMSketchRow, c.depth)}
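Note: the new merge exists because the incremental scan re-reads every row whose index value equals the old upper bound, so that value's full count is already in the freshly built sketch, and a plain `sum` merge (the old `MergeCMSketch`) would double-count the rows the old sketch already recorded. A tiny numeric illustration of the comment's example, ignoring hash collisions and using plain integers rather than the real `CMSketch` counters:

```go
package main

import "fmt"

func main() {
	// Boundary value v: 3 rows existed at the previous ANALYZE, 2 were added since.
	oldCount := uint32(3) // count of v recorded in the old sketch rc
	newCount := uint32(5) // the incremental scan re-reads all 5 rows of v, so c records 5

	sumMerge := oldCount + newCount // 8: the 3 re-scanned rows are counted twice
	maxMerge := oldCount
	if newCount > maxMerge {
		maxMerge = newCount
	}
	fmt.Println("sum merge:", sumMerge, "max merge:", maxMerge) // max merge: 5 is the true count
}
```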
16 changes: 9 additions & 7 deletions statistics/handle/bootstrap.go
@@ -109,7 +109,7 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, tables Stat
terror.Log(errors.Trace(err))
}
hist := statistics.NewHistogram(id, ndv, nullCount, version, types.NewFieldType(mysql.TypeBlob), chunk.InitialCapacity, 0)
table.Indices[hist.ID] = &statistics.Index{Histogram: *hist, CMSketch: cms, Info: idxInfo, StatsVer: row.GetInt64(8)}
table.Indices[hist.ID] = &statistics.Index{Histogram: *hist, CMSketch: cms, Info: idxInfo, StatsVer: row.GetInt64(8), Flag: row.GetInt64(10), LastAnalyzePos: row.GetDatum(11, types.NewFieldType(mysql.TypeBlob))}
} else {
var colInfo *model.ColumnInfo
for _, col := range tbl.Meta().Columns {
@@ -124,11 +124,13 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, tables Stat
hist := statistics.NewHistogram(id, ndv, nullCount, version, &colInfo.FieldType, 0, totColSize)
hist.Correlation = row.GetFloat64(9)
table.Columns[hist.ID] = &statistics.Column{
Histogram: *hist,
PhysicalID: table.PhysicalID,
Info: colInfo,
Count: nullCount,
IsHandle: tbl.Meta().PKIsHandle && mysql.HasPriKeyFlag(colInfo.Flag),
Histogram: *hist,
PhysicalID: table.PhysicalID,
Info: colInfo,
Count: nullCount,
IsHandle: tbl.Meta().PKIsHandle && mysql.HasPriKeyFlag(colInfo.Flag),
Flag: row.GetInt64(10),
LastAnalyzePos: row.GetDatum(11, types.NewFieldType(mysql.TypeBlob)),
}
}
}
@@ -137,7 +139,7 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, tables Stat
func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, tables StatsCache) error {
h.mu.Lock()
defer h.mu.Unlock()
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, correlation from mysql.stats_histograms"
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms"
rc, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
if len(rc) > 0 {
defer terror.Call(rc[0].Close)